上海市住房和城乡建设网站,网站源代码怎么下载,网站域名怎么取,wordpress前台文章增加编辑器目录
引言
什么是MQTT#xff1f;
在Flutter中使用MQTT
安装
iOS
安卓
创建一个全局的客户端对象 配置客户端对象 连接#xff08;异步#xff09;
监听接受的消息
发送消息
监听连接状态和订阅的回调 引言 随着移动应用开发技术的发展#xff0c;实时通信成为…
目录
引言
什么是MQTT
在Flutter中使用MQTT
安装
iOS
安卓
创建一个全局的客户端对象 配置客户端对象 连接异步
监听接受的消息
发送消息
监听连接状态和订阅的回调 引言 随着移动应用开发技术的发展实时通信成为了许多应用程序不可或缺的一部分。无论是社交应用中的即时消息传递还是物联网(IoT)设备之间的数据交换都需要一个高效稳定的通信机制。MQTTMessage Queuing Telemetry Transport作为一种轻量级的消息协议非常适合于这种场景。本文将介绍如何在Flutter项目中集成MQTT并通过一个简单的示例来演示其基本用法。 什么是MQTT
MQTT是一种基于发布/订阅模式的轻量级消息协议设计初衷是为了提供低开销、低带宽的网络连接。它特别适合于远程位置的通信如传感器与中央服务器之间的数据传输。MQTT的主要特点包括
轻量级非常小的代码占用空间和带宽使用。发布/订阅模型允许一对多的消息分发即一个消息可以发送给多个客户端。服务质量(QoS)提供了三种不同的服务质量级别以满足不同场景下的需求。安全性支持TLS/SSL加密确保数据传输的安全性。 在Flutter中使用MQTT 首先需要安装mqtt_client这个依赖执行下面命令 flutter pub add mqtt_client 安装
如果您在 Android 或 iOS 设备上的 Flutter 环境中使用客户端则需要进行以下设备权限设置。
iOS
将以下键添加到位于ios/Runner/Info.plist的Info.plist文件中 keyNSLocalNetworkUsageDescription/key
stringLooking for local tcp Bonjour service/string
keyNSBonjourServices/key
arraystringmqtt.tcp/string
/array 安卓
将以下 Android 权限添加到位于android/app/src/main/AndroidManifest.xml的AndroidManifest.xml文件中 uses-permission android:nameandroid.permission.INTERNET /
uses-permission android:nameandroid.permission.ACCESS_NETWORK_STATE / 在页面中导入
import package:mqtt_client/mqtt_client.dart;
使用案例 这里我们使用的是wss协议
import dart:convert;
import package:flutter_diancan/utils/logger_helper.dart;
import package:mqtt_client/mqtt_client.dart;
import package:mqtt_client/mqtt_server_client.dart;
import package:shared_preferences/shared_preferences.dart;class MqttServe {final client MqttServerClient(请求地址, );FutureMqttClient connect() async {try {client.setProtocolV311();client.logging(on: true);client.port 443; // 端口号client.keepAlivePeriod 60;client.websocketProtocols [mqtt];client.useWebSocket true; // 因为我们这里使用的是wss协议所以加这个这个根据自己的需求来定是否需要client.onConnected onConnected;client.onDisconnected onDisconnected;client.onUnsubscribed onUnsubscribed;client.onSubscribed onSubscribed;client.onSubscribeFail onSubscribeFail;client.pongCallback pong;client.connectTimeoutPeriod 60;final connMess MqttConnectMessage().authenticateAs(用户名, 密码).withClientIdentifier(Mqtt_MyClientUniqueId).withWillTopic(willtopic).withWillMessage(My Will message).startClean().withWillQos(MqttQos.atLeastOnce);client.connectionMessage connMess;try {print(Connecting);await client.connect();} catch (e) {print(Exception: $e);client.disconnect();}client.updates!.listen((ListMqttReceivedMessageMqttMessage?? c) async {final recMessage c![0].payload as MqttPublishMessage;final payload MqttPublishPayload.bytesToStringAsString(recMessage.payload.message);print(Received message:$payload from topic: ${c[0].topic});});} catch (e, s) {LoggerHelper.fatal(e, s);}return client;}Futurevoid sendMessage() async {if (client.connectionStatus?.state MqttConnectionState.connected) {final builder MqttClientPayloadBuilder();var payloadObject {MsgData: 发送成功啦};print(发送的信息:${json.encode(payloadObject)} );builder.addUTF8String(json.encode(payloadObject));client.publishMessage(发送消息的订阅地址, MqttQos.atLeastOnce, builder.payload!);}}// Connected callbackvoid onConnected() {print(已连接);try {// 连接后订阅client.subscribe(订阅地址, MqttQos.atLeastOnce);} catch (e, s) {LoggerHelper.fatal(e, s);}}// Disconnected callbackvoid onDisconnected() async {print(已断开);final SharedPreferences prefs await SharedPreferences.getInstance();String? token prefs.getString(token);if (token ! null) {reconnect();}}Futurevoid reconnect() async {print(重连中);int retryCount 0;const maxRetries 10;const baseRetryInterval 2; // 初始重连间隔时间秒while (retryCount maxRetries) {try {print(Reconnecting attempt ${retryCount 1}...);await client.connect();if (client.connectionStatus?.state MqttConnectionState.connected) {print(Reconnected successfully.);break;}} catch (e) {print(Reconnect failed: $e);}// 计算下一次重连间隔时间指数退避int retryInterval baseRetryInterval * (2 retryCount);await Future.delayed(Duration(seconds: retryInterval));retryCount;}}// 关闭Futurevoid close() async {try {// 重新订阅client.unsubscribe(订阅地址);} catch (e, s) {LoggerHelper.fatal(e, s);}client.disconnect();}// Subscribed callbackvoid onSubscribed(String topic) {print(订阅成功主题为: $topic);}// Subscribed failed callbackvoid onSubscribeFail(String topic) {print(订阅失败主题为: $topic);}// Unsubscribed callbackvoid onUnsubscribed(String? topic) {print(Unsubscribed topic: $topic);}// Ping callbackvoid pong() {print(调用Ping响应客户端回调);}
}创建一个全局的客户端对象
final client MqttServerClient(请求地址, ); 配置客户端对象 client.setProtocolV311();client.logging(on: true);client.port 443; // 端口号client.keepAlivePeriod 60;client.websocketProtocols [mqtt];client.useWebSocket true; // 因为我们这里使用的是wss协议所以加这个这个根据自己的需求来定是否需要client.onConnected onConnected;client.onDisconnected onDisconnected;client.onUnsubscribed onUnsubscribed;client.onSubscribed onSubscribed;client.onSubscribeFail onSubscribeFail;client.pongCallback pong;client.connectTimeoutPeriod 60;
设置连接消息 final connMess MqttConnectMessage().authenticateAs(用户名, 密码).withClientIdentifier(Mqtt_MyClientUniqueId).withWillTopic(willtopic).withWillMessage(My Will message).startClean().withWillQos(MqttQos.atLeastOnce);client.connectionMessage connMess; 连接异步 try {print(Connecting);await client.connect();} catch (e) {print(Exception: $e);client.disconnect();}监听接受的消息 client.updates!.listen((ListMqttReceivedMessageMqttMessage?? c) async {final recMessage c![0].payload as MqttPublishMessage;final payload MqttPublishPayload.bytesToStringAsString(recMessage.payload.message);print(Received message:$payload from topic: ${c[0].topic});}); 发送消息
Futurevoid sendMessage() async {if (client.connectionStatus?.state MqttConnectionState.connected) {final builder MqttClientPayloadBuilder();var payloadObject {MsgData: 发送成功啦};print(发送的信息:${json.encode(payloadObject)} );builder.addUTF8String(json.encode(payloadObject));client.publishMessage(发送消息的订阅地址, MqttQos.atLeastOnce, builder.payload!);}} 监听连接状态和订阅的回调
// Connected callbackvoid onConnected() {print(已连接);try {// 连接后订阅client.subscribe(订阅地址, MqttQos.atLeastOnce);} catch (e, s) {LoggerHelper.fatal(e, s);}}// Disconnected callbackvoid onDisconnected() async {print(已断开);final SharedPreferences prefs await SharedPreferences.getInstance();String? token prefs.getString(token);if (token ! null) {reconnect();}}Futurevoid reconnect() async {print(重连中);int retryCount 0;const maxRetries 10;const baseRetryInterval 2; // 初始重连间隔时间秒while (retryCount maxRetries) {try {print(Reconnecting attempt ${retryCount 1}...);await client.connect();if (client.connectionStatus?.state MqttConnectionState.connected) {print(Reconnected successfully.);break;}} catch (e) {print(Reconnect failed: $e);}// 计算下一次重连间隔时间指数退避int retryInterval baseRetryInterval * (2 retryCount);await Future.delayed(Duration(seconds: retryInterval));retryCount;}}// 关闭Futurevoid close() async {try {// 重新订阅client.unsubscribe(订阅地址);} catch (e, s) {LoggerHelper.fatal(e, s);}client.disconnect();}// Subscribed callbackvoid onSubscribed(String topic) {print(订阅成功主题为: $topic);}// Subscribed failed callbackvoid onSubscribeFail(String topic) {print(订阅失败主题为: $topic);}// Unsubscribed callbackvoid onUnsubscribed(String? topic) {print(Unsubscribed topic: $topic);}// Ping callbackvoid pong() {print(调用Ping响应客户端回调);}