@@ -171,6 +171,98 @@ int MqttClient::messageRetain() const
171
171
return -1 ;
172
172
}
173
173
174
+ void MqttClient::setClient (arduino::Client* client) {
175
+ if (_client != nullptr && _client->connected ()) {
176
+ // TODO if the current client is connected we cannot perform the change, first call disconnect
177
+ return ;
178
+ }
179
+
180
+ _client = client;
181
+ }
182
+
183
+ void MqttClient::setReceiveCallback (MqttReceiveCallback cbk) {
184
+ _cbk = cbk;
185
+ }
186
+
187
+ class MqttReadStream : public IStream {
188
+ public:
189
+ MqttReadStream (MqttClient& ref, int available)
190
+ : ref(ref), _available(available) { }
191
+
192
+ size_t readBytes (uint8_t * buf, size_t s) override {
193
+ size_t to_read = s < _available ? s : _available;
194
+ to_read = ref.readBytes (buf, to_read);
195
+ _available -= to_read;
196
+ return to_read;
197
+ }
198
+
199
+ int available () override { return _available; }
200
+
201
+ int read () override {
202
+ if (_available > 0 ) {
203
+ _available--;
204
+ return ref.read ();
205
+ } else {
206
+ return -1 ; // TODO return proper error code
207
+ }
208
+ }
209
+ private:
210
+ MqttClient& ref;
211
+ int _available;
212
+ };
213
+
214
+ class ArduinoMqttOStream : public MqttOStream {
215
+ public:
216
+ // TODO change pointer to reference, since it won't change
217
+ ArduinoMqttOStream (MqttClient &ref, error_t err=0 )
218
+ : MqttOStream(err), ref(ref) { }// TODO replace err default value with success
219
+
220
+ ~ArduinoMqttOStream () {
221
+ ref.endMessage ();
222
+ }
223
+
224
+ size_t write (uint8_t a) override {
225
+ if (rc == 1 ) {
226
+ return ref.write (a);
227
+ }
228
+ return 0 ;
229
+ }
230
+
231
+ size_t write (const uint8_t *buffer, size_t size) override {
232
+ if (rc == 1 ) {
233
+ return ref.write (buffer, size);
234
+ }
235
+ return 0 ;
236
+ }
237
+
238
+ int availableForWrite () override { return 0 ; }
239
+
240
+ private:
241
+ MqttClient& ref;
242
+ };
243
+
244
+
245
+ error_t MqttClient::publish (Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) {
246
+ int error = this ->beginMessage (t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
247
+
248
+ if (error == 0 ) { // TODO replace this with a proper enum value
249
+ return error;
250
+ }
251
+
252
+ int res = this ->write (payload, size);
253
+ this ->endMessage ();
254
+
255
+ return res;
256
+ }
257
+
258
+ MqttOStream&& MqttClient::publish(Topic t, MqttQos qos, MqttPublishFlag flags) {
259
+ int error = this ->beginMessage (
260
+ t, (flags & RetainEnabled) == RetainEnabled,
261
+ static_cast <uint8_t >(qos), (flags & DupEnabled) == DupEnabled);
262
+
263
+ return std::move (ArduinoMqttOStream (*this , error));
264
+ }
265
+
174
266
int MqttClient::beginMessage (const char * topic, unsigned long size, bool retain, uint8_t qos, bool dup)
175
267
{
176
268
_txMessageTopic = topic;
@@ -259,6 +351,20 @@ int MqttClient::endMessage()
259
351
return 1 ;
260
352
}
261
353
354
+ void MqttClient::setWill (
355
+ Topic willTopic, const uint8_t * will_message, size_t will_size, MqttQos qos, MqttPublishFlag flags) {
356
+ int error = this ->beginWill (willTopic, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
357
+
358
+ if (error == 0 ) { // TODO replace this with a proper enum value
359
+ return ;
360
+ }
361
+
362
+ int res = this ->write (will_message, will_size);
363
+ this ->endWill ();
364
+
365
+ return ;
366
+ }
367
+
262
368
int MqttClient::beginWill (const char * topic, unsigned short size, bool retain, uint8_t qos)
263
369
{
264
370
int topicLength = strlen (topic);
@@ -314,7 +420,7 @@ int MqttClient::endWill()
314
420
return 1 ;
315
421
}
316
422
317
- int MqttClient::subscribe (const char * topic, uint8_t qos)
423
+ error_t MqttClient::subscribe (Topic topic, MqttQos qos)
318
424
{
319
425
int topicLength = strlen (topic);
320
426
int remainingLength = topicLength + 5 ;
@@ -362,12 +468,12 @@ int MqttClient::subscribe(const char* topic, uint8_t qos)
362
468
return 0 ;
363
469
}
364
470
365
- int MqttClient::subscribe (const String& topic, uint8_t qos)
471
+ error_t MqttClient::subscribe (const String& topic, MqttQos qos)
366
472
{
367
473
return subscribe (topic.c_str (), qos);
368
474
}
369
475
370
- int MqttClient::unsubscribe (const char * topic)
476
+ error_t MqttClient::unsubscribe (Topic topic)
371
477
{
372
478
int topicLength = strlen (topic);
373
479
int remainingLength = topicLength + 4 ;
@@ -565,16 +671,19 @@ void MqttClient::poll()
565
671
} else {
566
672
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
567
673
568
- if (_onMessage) {
674
+ if (_cbk) {
675
+ MqttReadStream stream (*this , _rxLength);
676
+ _cbk (_rxMessageTopic.c_str (), stream);
677
+ } else if (_onMessage) {
569
678
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
570
679
_onMessage (this ,_rxLength);
571
680
#else
572
681
_onMessage (_rxLength);
573
682
#endif
683
+ }
574
684
575
- if (_rxLength == 0 ) {
576
- _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
577
- }
685
+ if ((_onMessage || _cbk) && _rxLength == 0 ) {
686
+ _rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
578
687
}
579
688
}
580
689
}
@@ -592,7 +701,10 @@ void MqttClient::poll()
592
701
593
702
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
594
703
595
- if (_onMessage) {
704
+ if (_cbk) {
705
+ MqttReadStream stream (*this , _rxLength);
706
+ _cbk (_rxMessageTopic.c_str (), stream);
707
+ } else if (_onMessage) {
596
708
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
597
709
_onMessage (this ,_rxLength);
598
710
#else
@@ -647,12 +759,12 @@ void MqttClient::poll()
647
759
}
648
760
}
649
761
650
- int MqttClient::connect (IPAddress ip, uint16_t port)
762
+ error_t MqttClient::connect (IPAddress ip, uint16_t port)
651
763
{
652
764
return connect (ip, NULL , port);
653
765
}
654
766
655
- int MqttClient::connect (const char *host, uint16_t port)
767
+ error_t MqttClient::connect (const char *host, uint16_t port)
656
768
{
657
769
return connect ((uint32_t )0 , host, port);
658
770
}
@@ -833,7 +945,7 @@ int MqttClient::subscribeQoS() const
833
945
return _subscribeQos;
834
946
}
835
947
836
- int MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
948
+ error_t MqttClient::connect (IPAddress ip, const char * host, uint16_t port)
837
949
{
838
950
if (clientConnected ()) {
839
951
_client->stop ();
@@ -1041,7 +1153,7 @@ void MqttClient::pubcomp(uint16_t id)
1041
1153
endPacket ();
1042
1154
}
1043
1155
1044
- void MqttClient::ping ()
1156
+ error_t MqttClient::ping ()
1045
1157
{
1046
1158
uint8_t packetBuffer[2 ];
1047
1159
0 commit comments