flutter mqtt的使用看這里,持續(xù)更新吮播。变屁。。

mqtt網(wǎng)絡(luò)協(xié)議意狠,相信跟物聯(lián)網(wǎng)相關(guān)的公司都會遇到粟关,在Android,iOS原生開發(fā)是可以很好的實現(xiàn),相關(guān)的資料也是很多环戈!但是在flutter里面還算比較嘗鮮的一個領(lǐng)域吧闷板!

幸虧flutter里面 已經(jīng)有一個還不錯的第三庫mqtt_client,我在項目中用的版本是比較低一點的版本,如下圖

image

此版本在項目中,已經(jīng)運行了有一段時間了,相關(guān)的app,也已經(jīng)上線了,目前沒有發(fā)現(xiàn)有什么問題.但是,秉承我一貫的原則,干貨,干貨,干貨;當(dāng)然是要為大家?guī)碜钚掳姹镜膍qtt的示例啦~~此時應(yīng)該有掌聲

這篇文章介紹 怎么封裝一個屬于自己的工具院塞,解決在使用過程 中文亂碼等問題遮晚。

首先我希望用單例模式,這樣整個項目隨處都可以調(diào)用拦止,使用起來比較方便

然后希望支持加密與不加密的情況县遣!

最后希望支持中文

話不多說,新建一個工程flutter_app_mqtt,在pubspec.yaml文件中,添加依賴庫mqtt_client,然后pub get一下,下載庫

mqtt_client: ^7.3.0

準(zhǔn)備工作好了,我們準(zhǔn)備封裝工具類MqttTool


file.png

核心代碼

MqttTool工具類代碼如下:

import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_buffers.dart';

typedef ConnectedCallback = void Function();

class MqttTool {
  MqttQos qos = MqttQos.atLeastOnce;
  MqttServerClient mqttClient;
  static MqttTool _instance;
  static MqttTool getInstance() {
    if (_instance == null) {
      _instance = MqttTool();
    }
    return _instance;
  }

  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false}) {
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);

    mqttClient.onConnected = onConnected;

    mqttClient.onSubscribed = _onSubscribed;

    mqttClient.onSubscribeFail = _onSubscribeFail;

    mqttClient.onUnsubscribed = _onUnSubscribed;

    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("_正在連接中...");
    return mqttClient.connect(username, password);
  }

  disconnect() {
    mqttClient.disconnect();
    _log("_disconnect");
  }

  int publishMessage(String pTopic, String msg) {

    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");
    Uint8Buffer uint8buffer = Uint8Buffer();
    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(codeUnits);

    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  int publishRawMessage(String pTopic, List<int> list) {
    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$list");
    Uint8Buffer uint8buffer = Uint8Buffer();
//    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(list);
    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  Subscription subscribeMessage(String subtopic) {
    return mqttClient.subscribe(subtopic, qos);
  }

  unsubscribeMessage(String unSubtopic) {
    mqttClient.unsubscribe(unSubtopic);
  }

  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient.connectionStatus;
  }

  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_監(jiān)聽成功!");
    return mqttClient.updates;
  }

  onConnected() {
//    mqttClient.onConnected = callback;
    _log("_onConnected");
  }

  onDisConnected(ConnectedCallback callback) {
    mqttClient.onDisconnected = callback;
  }

  _onDisconnected() {
    _log("_onDisconnected");
  }

  _onSubscribed(String topic) {
    _log("_訂閱主題成功---topic:$topic");
  }

  _onUnSubscribed(String topic) {
    _log("_取消訂閱主題成功---topic:$topic");
  }

  _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  _log(String msg) {
    print("MQTT-->$msg");
  }
}

工具類封裝完成了,現(xiàn)在就需要去項目中應(yīng)用了,因此,我們改造一下HomePage的布局,來完成測試驗證工作

Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

對應(yīng)的UI圖是這樣的


changeImage.png

RaiseButton對應(yīng)的點擊事件函數(shù)如下

//  建立連接
  _connect() async {
    String server = "your server name";
    int port = 1883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-18859995315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

//  訂閱主題
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }
  
//  取消訂閱
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }
  
//  發(fā)布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }
  
//  監(jiān)聽消息的具體實現(xiàn)
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  開啟監(jiān)聽消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }
  
//  斷開連接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }

點擊RaiseButton順序 connect--->subscribe topic---->start listen---->publish topic----> disconnect

應(yīng)的控制臺log 如下:


log.png

至此,Mqtt協(xié)議的封裝,加應(yīng)用完成得差不多了.

還有二點需要補(bǔ)充一下,

一個是加密問題,
另一個是 整個工程在監(jiān)聽mqtt回來的數(shù)據(jù)該如何處理

加密問題,其實MqttTool工具類代碼,已經(jīng)處理好了,相關(guān)代碼如下

    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }

所以用加密端口時,_connect()方法里面要多傳一個參數(shù) isSsl:true

//  建立連接
  _connect() async {
    String server = "connect.owon.com";
    int port = 8883;
    String clientId = "86-1885999fuehxz5f3ced1e";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password,isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

另一個是 整個工程在監(jiān)聽mqtt回來的數(shù)據(jù)該如何處理
我這邊是把mqtt工具類接收到數(shù)據(jù)用evenbus發(fā)送出去了, 其他需要監(jiān)聽數(shù)據(jù)的page,就去用evenbus去監(jiān)聽接收的數(shù)據(jù).

ListEventBus.getDefault().post(p);

evenbus類的代碼如下

import 'dart:async';

class ListEventBus {
  static ListEventBus _instance;
  StreamController _streamController;
  factory ListEventBus.getDefault() {
    if (_instance == null) {
      _instance = ListEventBus._init();
    }
    return _instance;
  }

  ListEventBus._init() {
    _streamController = StreamController.broadcast();
  }

  StreamSubscription<T> register<T>(void onData(T event)) {
    ///需要返回訂閱者糜颠,所以不能使用下面這種形式
//   return _streamController.stream.listen((event) {
//      if (event is T) {
//        onData(event);
//      }
//    });
    ///沒有指定類型,全類型注冊
    if (T == dynamic) {
      return _streamController.stream.listen(onData);
    } else {
      ///篩選出 類型為 T 的數(shù)據(jù),獲得只包含T的Stream
      Stream<T> stream =
          _streamController.stream.where((type) => type is T).cast<T>();
      return stream.listen(onData);
    }
  }

  void post(event) {
    _streamController.add(event);
  }

  void unregister() {
    _streamController.close();
  }

  void pause() {
    _streamController.onPause();
  }

  void resume() {
    _streamController.onResume();
  }
}

在需要監(jiān)聽數(shù)據(jù)的page里面添加以下代碼就可以啦

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;
@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];

        });
  }

main.dart文件里面的完整代碼

import 'dart:async';
import 'dart:convert';
import 'live_even_bus.dart';
import 'package:flutter/material.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'mqtt_tool.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
        visualDensity: VisualDensity.adaptivePlatformDensity,
      ),
      home: MyHomePage(title: 'Flutter Demo Home Page'),
    );
  }
}

class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);
  final String title;
  @override
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>
      _listenSubscription;

  StreamSubscription<Map<dynamic, dynamic>> _listEvenBusSubscription;

  void _incrementCounter() {
    setState(() {
      _counter++;
    });
  }

@override
  void initState() {
    // TODO: implement initState
    super.initState();
    _listEvenBusSubscription =
        ListEventBus.getDefault().register<Map<dynamic, dynamic>>((msg) {
          String topic = msg["topic"];
          Map<String, dynamic> payload = msg["payload"];
          print("監(jiān)聽的 topc= $topic, payload= $payload");
        });
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'You have pushed the button this many times:',
            ),
            Text(
              '$_counter',
              style: Theme.of(context).textTheme.headline4,
            ),
            Container(
              child: RaisedButton(
                onPressed: _connect,
                child: Text(
                  "connect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _subscribeTopic,
                child: Text(
                  "subscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _unSubscribeTopic,
                child: Text(
                  "unSubscribe topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _publishTopic,
                child: Text(
                  "publish topic",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _startListen,
                child: Text(
                  "start listen",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            ),
            Container(
              child: RaisedButton(
                onPressed: _disconnect,
                child: Text(
                  "disconnect",
                  style: TextStyle(fontSize: 20, color: Colors.purple),
                ),
              ),
            )
          ],
        ),
      ),
      floatingActionButton: FloatingActionButton(
        onPressed: _incrementCounter,
        tooltip: 'Increment',
        child: Icon(Icons.add),
      ), // This trailing comma makes auto-formatting nicer for build methods.
    );
  }

//  建立連接
  _connect() async {
    String server = "your server name";
    int port = 8883;
    String clientId = "86-1885999713jlb5f3d01f3";
    String userName = "86-188599895315";
    String password = "63ab9508485e131f946ce59ab9b3b687";
    MqttTool.getInstance()
        .connect(server, port, clientId, userName, password, isSsl: true)
        .then((v) {
      if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
        print("恭喜你~ ====mqtt連接成功");
      } else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
        print("有事做了~ ====mqtt連接失敗 --密碼錯誤!!!");
      } else {
        print("有事做了~ ====mqtt連接失敗!!!");
      }
    });
  }

//  訂閱主題
  _subscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";

    String topic = "device/F4CFA26F1E43/#";

    String topic2 = "reply/device/F4CFA26F1E43/#";
    MqttTool.getInstance().subscribeMessage(topic);
    MqttTool.getInstance().subscribeMessage(topic2);
  }

//  取消訂閱
  _unSubscribeTopic() {
    String clientId = "86-1885999fuehxz5f3ced1e";
    String topic = "device/F4CFA26F1E43/#";
    MqttTool.getInstance().unsubscribeMessage(topic);
  }

//  發(fā)布消息
  _publishTopic() {
    String topic1 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedCoolingSetpoint";
    String str1 = "2950";

    String topic2 =
        "api/device/F4CFA26F1E43/86-1885999mvqqyy5f3cf0d5/attribute/OccupiedHeatingSetpoint";
    String str2 = "2900";
    MqttTool.getInstance().publishMessage(topic1, str1);
    MqttTool.getInstance().publishMessage(topic2, str2);
  }

//  監(jiān)聽消息的具體實現(xiàn)
  _onData(List<MqttReceivedMessage<MqttMessage>> data) {
    final MqttPublishMessage recMess = data[0].payload;
    final String topic = data[0].topic;
    final String pt = Utf8Decoder().convert(recMess.payload.message);
    String desString = "topic is <$topic>, payload is <-- $pt -->";
    print("string =$desString");
    Map p = Map();
    p["topic"] = topic;
    p["type"] = "string";
    p["payload"] = pt;
    ListEventBus.getDefault().post(p);
  }

//  開啟監(jiān)聽消息
  _startListen() {
    _listenSubscription = MqttTool.getInstance().updates().listen(_onData);
  }

//  斷開連接
  _disconnect() {
    MqttTool.getInstance().disconnect();
  }


}

結(jié)尾

mqtt這個庫也是在不斷的更新萧求,如果從低版本升到高版本其兴,可能會遇到不兼容的問題,希望到家夸政,冷靜沉著應(yīng)對元旬!祝君好運~ 最后,小伙伴們覺得有點幫助秒梳,請幫忙點個贊吧法绵。如果有什么問題需要探討的箕速,也歡迎留言~

2023年4月15日更新,希望能幫助到小伙伴們酪碘,好運~

mqtt庫更新到如下版本

mqtt_client: ^9.6.6

主要變化:

  • MqttTool工具類適配空安全,
  • 增加mqtt連接成功監(jiān)聽
  • 增加mqtt連接斷開監(jiān)聽
  • 增加網(wǎng)絡(luò)異常時提示
  • 解決中文亂碼問題
    如下
import 'dart:async';
import 'dart:convert';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:owoncare/owon_res/owon_constant.dart';
import '../generated/l10n.dart';
import '../owon_utils/owon_log.dart';
import 'package:typed_data/typed_buffers.dart';
import 'owon_toast.dart';



class OwonMqtt {
  MqttQos qos = MqttQos.atLeastOnce;
  MqttServerClient mqttClient;
  static OwonMqtt _instance;
  StreamSubscription<ConnectivityResult> _subscription;
  static Set<String> keepTopics = new Set<String>();

  ConnectivityResult _currentConnectivityResult;
  
  static OwonMqtt getInstance() {
    if (_instance == null) {
      _instance = OwonMqtt();
    }
    return _instance;
  }

  Future<MqttClientConnectionStatus> connect(String server, int port,
      String clientIdentifier, String username, String password,
      {bool isSsl = false, Function onDisconnected, Function onConnected}) {
    _subscription = Connectivity()
        .onConnectivityChanged
        .listen((ConnectivityResult result) {
      _currentConnectivityResult = result;
    });

    print("mqtt connect to $server:$port, clientId:$clientIdentifier");
    mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);

    mqttClient.onSubscribed = _onSubscribed;

    mqttClient.onSubscribeFail = _onSubscribeFail;

    mqttClient.onUnsubscribed = _onUnSubscribed;
    mqttClient.onDisconnected = onDisconnected;
    mqttClient.onConnected = onConnected;

    mqttClient.setProtocolV311();
    mqttClient.logging(on: false);
    mqttClient.keepAlivePeriod = 120;
    if (isSsl) {
      mqttClient.secure = true;
      mqttClient.onBadCertificate = (dynamic a) => true;
    }
    _log("mqtt正在連接中...");
    return mqttClient.connect(username, password);
  }

  disconnect() {
    if (mqttClient != null) {
      mqttClient.disconnect();
      _subscription.cancel();
      _log("mqtt disconnect");
    } else {
      _log("mqtt client is null , disconnect failure");
    }
  }

//   int publishMessage(String pTopic, String msg) {
//     if (_currentConnectivityResult == ConnectivityResult.none) {
//       Future.delayed(Duration(milliseconds: 0), () {
//         OwonToast.show(S.of(OwonConstant.context).login_no_network,);
//       });
//       return null;
//     }
// //    AbnormalType currentType = Provider.of<AbnormalProvider>(currentContext).abnormalType;
// //    if(currentType == AbnormalType.failedTwice && (mqttClient.connectionStatus.state == MqttConnectionState.disconnected)){
// //      print("mqtt.dart第73行");
// //      ListEventBus.getDefault().post(2);
// //    }
//
//     _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");
//     Uint8Buffer uint8buffer = Uint8Buffer();
//     var codeUnits = msg.codeUnits;
//     uint8buffer.addAll(codeUnits);
//
//     return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
//   }

  bool isEnable() {
    if (_currentConnectivityResult == ConnectivityResult.none) {
      OwonLog.e("mqtt 網(wǎng)絡(luò)異常");
      Future.delayed(Duration(milliseconds: 0), () {
        OwonToast.show(
          S.of(OwonConstant.context).login_no_network,
        );
      });
      return false;
    }
    if (mqttClient == null ||
        mqttClient.connectionStatus.state != MqttConnectionState.connected) {
      OwonLog.e("mqtt 連接狀態(tài)異常");
      if (mqttClient != null) {
        OwonLog.e("主動斷開mqtt連接");
        mqttClient.disconnect();
      }
      return false;
    } else {
      return true;
    }
  }

  ///此方法可以兼容中文
  int publishMessage(String pTopic, String msg) {
    if (!isEnable()) return null;

    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$msg");

    final bytes = utf8.encode(msg);
    // 創(chuàng)建MQTT消息并發(fā)送
    final message = MqttClientPayloadBuilder();
    bytes.forEach((element) {
      message.addByte(element);
    });

    return mqttClient.publishMessage(pTopic, qos, message.payload,
        retain: false);
  }

  int publishRawMessage(String pTopic, List<int> list) {
    if (!isEnable()) return null;
    _log("_發(fā)送數(shù)據(jù)-topic:$pTopic,playLoad:$list");
    Uint8Buffer uint8buffer = Uint8Buffer();
//    var codeUnits = msg.codeUnits;
    uint8buffer.addAll(list);
    return mqttClient.publishMessage(pTopic, qos, uint8buffer, retain: false);
  }

  Subscription subscribeMessage(String subtopic, {bool keep = false}) {
    if (keep) {
      keepTopics.add(subtopic);
    }
    if (!isEnable()) return null;
    return mqttClient.subscribe(subtopic, qos);
  }

  unsubscribeMessage(String unSubtopic) {
    if (!isEnable()) return null;
    mqttClient.unsubscribe(unSubtopic);
  }

  MqttClientConnectionStatus getMqttStatus() {
    return mqttClient != null ? mqttClient.connectionStatus : null;
  }

  Stream<List<MqttReceivedMessage<MqttMessage>>> updates() {
    _log("_監(jiān)聽成功!");
    return mqttClient.updates;
  }

  _onSubscribed(String topic) {
    _log("_訂閱主題成功---topic:$topic");
  }

  _onUnSubscribed(String topic) {
    _log("_取消訂閱主題成功---topic:$topic");
  }

  _onSubscribeFail(String topic) {
    _log("_onSubscribeFail");
  }

  _log(String msg) {
    // print("MQTT-->$msg");
    OwonLog.e("MQTT-->$msg");
  }

  static cleanKeepTopics() {
    keepTopics.clear();
  }
}

建立連接的代碼

      OwonMqtt.getInstance()
          .connect(_mMqttServer, _mIsSSLFlag ? _mMqttSSLPort : _mMqttPort,
              _mClientId, _mUserName, accessToken,
              isSsl: _mIsSSLFlag,
              onConnected: _onMqttConnectedState,
              onDisconnected: _onDisconnectedState)
          .then((v) {
        OwonLog.e("mqtt connect result = $v");
        if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
          OwonLog.e("恭喜你~ ====mqtt連接成功");
          
        } else if (v.returnCode ==
            MqttConnectReturnCode.badUsernameOrPassword) {
          OwonLog.e("有事做了~ ====mqtt連接失敗 --密碼錯誤!!!");
        } else {
          OwonLog.e("有事做了~ ====mqtt連接失敗!!!");
        }
      }).catchError((e) {
          OwonLog.e("有事做了~ ====mqtt連接失敗!!!");
      });

監(jiān)聽mqtt連接成功

 _onMqttConnectedState() {
    OwonLog.e("mqtt連接已成功")
    //do something,比如:去訂閱相關(guān)主題盐茎,或者更新UI
  }

監(jiān)聽mqtt連接斷開

 _onDisconnectedState() {
    OwonLog.e("mqtt連接已斷開");
    //do something, 比如:給出提示兴垦,或者去重新連接mqtt等
  }
補(bǔ)充一下,當(dāng)前版本信息
Justin-Mac-mini:care-vefify$ flutter doctor
Doctor summary (to see all details, run flutter doctor -v):
[?] Flutter (Channel stable, 3.0.3, on macOS 13.3 22E252 darwin-x64, locale zh-Hans-CN)
[?] Android toolchain - develop for Android devices (Android SDK version 32.0.0)
[?] Xcode - develop for iOS and macOS (Xcode 14.3)
[?] Chrome - develop for the web
[?] Android Studio (version 2021.2)
[?] IntelliJ IDEA Community Edition (version 2021.2.2)
[?] VS Code (version 1.77.1)
[?] Connected device (2 available)
[?] HTTP Host Availability

? No issues found!
Justin-Mac-mini:care-vefify$ 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末字柠,一起剝皮案震驚了整個濱河市探越,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌窑业,老刑警劉巖钦幔,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異常柄,居然都是意外死亡鲤氢,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門西潘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來卷玉,“玉大人,你說我怎么就攤上這事喷市∠嘀郑” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵品姓,是天一觀的道長寝并。 經(jīng)常有香客問我,道長腹备,這世上最難降的妖魔是什么衬潦? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮馏谨,結(jié)果婚禮上别渔,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好哎媚,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布喇伯。 她就那樣靜靜地躺著,像睡著了一般拨与。 火紅的嫁衣襯著肌膚如雪稻据。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天买喧,我揣著相機(jī)與錄音捻悯,去河邊找鬼。 笑死淤毛,一個胖子當(dāng)著我的面吹牛今缚,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播低淡,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼姓言,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蔗蹋?” 一聲冷哼從身側(cè)響起何荚,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猪杭,沒想到半個月后餐塘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡皂吮,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年戒傻,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涮较。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡稠鼻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出狂票,到底是詐尸還是另有隱情候齿,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布闺属,位于F島的核電站慌盯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏掂器。R本人自食惡果不足惜亚皂,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望国瓮。 院中可真熱鬧灭必,春花似錦狞谱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至播歼,卻和暖如春伶跷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背秘狞。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工叭莫, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人烁试。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓雇初,卻偏偏與公主長得像,于是被迫代替她去往敵國和親廓潜。 傳聞我的和親對象是個殘疾皇子抵皱,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355