-
Notifications
You must be signed in to change notification settings - Fork 182
/
Copy pathmqtt_browser_client.dart
187 lines (156 loc) · 7.67 KB
/
mqtt_browser_client.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/*
* Package : mqtt_client
* Author : S. Hamblett <[email protected]>
* Date : 31/05/2017
* Copyright : S.Hamblett
*/
import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_browser_client.dart';
/// An annotated simple subscribe/publish usage example for mqtt_browser_client. Please read in with reference
/// to the MQTT specification. The example is runnable.
/// First create a client, the client is constructed with a broker name, client identifier
/// and port if needed. The client identifier (short ClientId) is an identifier of each MQTT
/// client connecting to a MQTT broker. As the word identifier already suggests, it should be unique per broker.
/// The broker uses it for identifying the client and the current state of the client. If you don’t need a state
/// to be hold by the broker, in MQTT 3.1.1 you can set an empty ClientId, which results in a connection without any state.
/// A condition is that clean session connect flag is true, otherwise the connection will be rejected.
/// The client identifier can be a maximum length of 23 characters. If a port is not specified the standard port
/// of 1883 is used. Only web sockets are supported in the browser client.
/// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker
/// for details.
final client = MqttBrowserClient('ws://test.mosquitto.org', '');
Future<int> main() async {
/// Set logging on if needed, defaults to off
client.logging(on: false);
/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();
/// If you intend to use a keep alive you must set it here otherwise keep alive will be disabled.
client.keepAlivePeriod = 20;
/// The connection timeout period can be set if needed, the default is 5 seconds.
client.connectTimeoutPeriod = 2000; // milliseconds
/// The ws port for Mosquitto is 8080, for wss it is 8081
client.port = 8080;
/// Add the unsolicited disconnection callback
client.onDisconnected = onDisconnected;
/// Add the successful connection callback
client.onConnected = onConnected;
/// Add a subscribed callback, there is also an unsubscribed callback if you need it.
/// You can add these before connection or change them dynamically after connection if
/// you wish. There is also an onSubscribeFail callback for failed subscriptions, these
/// can fail either because you have tried to subscribe to an invalid topic or the broker
/// rejects the subscribe request.
client.onSubscribed = onSubscribed;
/// Set a ping received callback if needed, called whenever a ping response(pong) is received
/// from the broker.
client.pongCallback = pong;
/// Set the appropriate websocket headers for your connection/broker.
/// Mosquito uses the single default header, other brokers may be fine with the
/// default headers.
client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;
/// Create a connection message to use or use the default one. The default one sets the
/// client identifier, any supplied username/password and clean session,
/// an example of a specific one below.
final connMess = MqttConnectMessage()
.withClientIdentifier('Mqtt_MyClientUniqueId')
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;
/// Connect the client, any errors here are communicated by raising of the appropriate exception. Note
/// in some circumstances the broker will just disconnect us, see the spec about this, we however will
/// never send malformed messages.
try {
await client.connect();
} on Exception catch (e) {
print('EXAMPLE::client exception - $e');
client.disconnect();
return -1;
}
/// Check we are connected
if (client.connectionStatus!.state == MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client connected');
} else {
/// Use status here rather than state if you also want the broker return code.
print(
'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
client.disconnect();
return -1;
}
/// Ok, lets try a subscription
print('EXAMPLE::Subscribing to the test/lol topic');
const topic = 'test/lol'; // Not a wildcard topic
client.subscribe(topic, MqttQos.atMostOnce);
/// The client has a change notifier object(see the Observable class) which we then listen to to get
/// notifications of published updates to each subscribed topic.
/// In general you should listen here as soon as possible after connecting, you will not receive any
/// publish messages until you do this.
/// Also you must re-listen after disconnecting.
client.updates!.listen((List<MqttReceivedMessage<MqttMessage?>>? c) {
final recMess = c![0].payload as MqttPublishMessage;
final pt =
MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
/// The above may seem a little convoluted for users only interested in the
/// payload, some users however may be interested in the received publish message,
/// lets not constrain ourselves yet until the package has been in the wild
/// for a while.
/// The payload is a byte buffer, this will be specific to the topic
print(
'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
print('');
});
/// If needed you can listen for published messages that have completed the publishing
/// handshake which is Qos dependant. Any message received on this stream has completed its
/// publishing handshake with the broker.
client.published!.listen((MqttPublishMessage message) {
print(
'EXAMPLE::Published notification:: topic is ${message.variableHeader!.topicName}, with Qos ${message.header!.qos}');
});
/// Lets publish to our topic
/// Use the payload builder rather than a raw buffer
/// Our known topic to publish to
const pubTopic = 'Dart/Mqtt_client/testtopic';
final builder = MqttClientPayloadBuilder();
builder.addString('Hello from mqtt_client');
/// Subscribe to it
print('EXAMPLE::Subscribing to the Dart/Mqtt_client/testtopic topic');
client.subscribe(pubTopic, MqttQos.exactlyOnce);
/// Publish it
print('EXAMPLE::Publishing our topic');
client.publishMessage(pubTopic, MqttQos.exactlyOnce, builder.payload!);
/// Ok, we will now sleep a while, in this gap you will see ping request/response
/// messages being exchanged by the keep alive mechanism.
print('EXAMPLE::Sleeping....');
await MqttUtilities.asyncSleep(60);
/// Finally, unsubscribe and exit gracefully
print('EXAMPLE::Unsubscribing');
client.unsubscribe(topic);
/// Wait for the unsubscribe message from the broker if you wish.
await MqttUtilities.asyncSleep(2);
print('EXAMPLE::Disconnecting');
client.disconnect();
return 0;
}
/// The subscribed callback
void onSubscribed(String topic) {
print('EXAMPLE::Subscription confirmed for topic $topic');
}
/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
if (client.connectionStatus!.disconnectionOrigin ==
MqttDisconnectionOrigin.solicited) {
print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
}
}
/// The successful connect callback
void onConnected() {
print(
'EXAMPLE::OnConnected client callback - Client connection was sucessful');
}
/// Pong callback
void pong() {
print('EXAMPLE::Ping response client callback invoked');
}