高性能、高穩(wěn)定性的跨平臺MQTT客戶端

開源地址

https://github.com/jiejieTop/mqttclient

mqttclient

一個高性能、高穩(wěn)定性的跨平臺MQTT客戶端

一個高性能硕舆、高穩(wěn)定性的跨平臺MQTT客戶端,基于socket API之上開發(fā)骤公,可以在嵌入式設(shè)備(FreeRTOS/LiteOS/RT-Thread/TencentOS tiny)抚官、Linux、Windows阶捆、Mac上使用凌节,擁有非常簡潔的API接口,以極少的資源實現(xiàn)QOS2的服務(wù)質(zhì)量洒试,并且無縫銜接了mbedtls加密庫倍奢。

優(yōu)勢:

  • 基于標(biāo)準(zhǔn)BSD socket之上開發(fā),只要是兼容BSD socket的系統(tǒng)均可使用垒棋。

  • 穩(wěn)定:無論是掉線重連卒煞,丟包重發(fā),都是嚴格遵循MQTT協(xié)議標(biāo)準(zhǔn)執(zhí)行叼架,除此之外對大數(shù)據(jù)量的測試無論是收是發(fā)畔裕,都是非常穩(wěn)定(一次發(fā)送135K數(shù)據(jù)衣撬,3秒一次),高頻測試也是非常穩(wěn)定(7個主題同時收發(fā)扮饶,每秒一次具练,也就是1秒14個mqtt報文,服務(wù)質(zhì)量QoS0甜无、QoS1扛点、QoS2都有)。因為作者以極少的資源設(shè)計了記錄機制毫蚓,對采用QoS1服務(wù)質(zhì)量的報文必須保證到達一次占键,當(dāng)發(fā)布的主題(qos1、qos2都適用)沒有被服務(wù)器收到時會自動重發(fā)元潘,而對QoS2服務(wù)質(zhì)量的報文保證有且只有處理一次(如果不相信它穩(wěn)定性的同學(xué)可以自己去修改源碼畔乙,專門為QoS2服務(wù)質(zhì)量去做測試,故意不回復(fù)PUBREC包翩概,讓服務(wù)器重發(fā)QoS2報文牲距,且看看客戶端是否有且只有處理一次),而對于掉線重連的穩(wěn)定性钥庇,這種則是基本操作了牍鞠,沒啥好說的,在自動重連后還會自動重新訂閱主題评姨,保證主題不會丟失难述,因此在測試中穩(wěn)定性極好。

  • 輕量級:整個代碼工程極其簡單吐句,不使用mbedtls情況下胁后,占用資源極少,作者曾使用esp8266模組與云端通信嗦枢,整個工程代碼消耗的RAM不足15k(包括系統(tǒng)占用的開銷攀芯,對數(shù)據(jù)的處理開銷,而此次還是未優(yōu)化的情況下文虏,還依舊完美保留了掉線重連的穩(wěn)定性侣诺,但是對應(yīng)qos1、qos2服務(wù)質(zhì)量的報文則未做測試氧秘,因為STM32F103C8T6芯片資源實在是太少了年鸳,折騰不起)。

  • 無縫銜接mbedtls加密傳輸丸相,讓網(wǎng)絡(luò)傳輸更加安全阻星,而且接口層完全不需要用戶理會,無論是否加密,mqttclient對用戶提供的API接口是沒有變化的妥箕,這就很好的兼容了一套代應(yīng)用層的碼可以加密傳輸也可以不加密傳輸。

  • 擁有極簡的API接口更舞,總的來說畦幢,mqttclient的配置都有默認值,基本無需配置都能使用的缆蝉,也可以隨意配置宇葱,對配置都有健壯性檢測,這樣子設(shè)計的API接口也是非常簡單刊头。

  • 有非常好的代碼風(fēng)格與思想:整個代碼采用分層式設(shè)計黍瞧,代碼實現(xiàn)采用異步處理的思想,降低耦合原杂,提高性能印颤,具體體現(xiàn)在什么地方呢?很簡單穿肄,目前市面上很多MQTT客戶端發(fā)布主題都是要阻塞等待ack年局,這是非常暴力的行為,阻塞當(dāng)前線程等待服務(wù)器的應(yīng)答咸产,那如果我想要發(fā)送數(shù)據(jù)怎么辦矢否,或者我要重復(fù)檢測數(shù)據(jù)怎么辦,你可能會說脑溢,指定阻塞時間等待僵朗,那如果網(wǎng)絡(luò)延遲,ack遲遲不來屑彻,我就白等了嗎验庙,對于qos1、qos2的服務(wù)質(zhì)量怎么辦酱酬,所以說這種還是要異步處理的思想壶谒,我發(fā)布主題,那我發(fā)布出去就好了膳沽,不需要等待汗菜,對于qos1、qos2服務(wù)質(zhì)量的MQTT報文挑社,如果服務(wù)器沒收到陨界,那我重發(fā)就可以,這種重發(fā)也是異步的處理痛阻,完全不會阻塞當(dāng)前線程菌瘪。

  • MQTT協(xié)議支持主題通配符“#”、“+”

  • 訂閱的主題與消息處理完全分離俏扩,讓編程邏輯更加簡單易用糜工,用戶無需理會錯綜復(fù)雜的邏輯關(guān)系。

  • mqttclient內(nèi)部已實現(xiàn)甭嫉活處理機制捌木,無需用戶過多關(guān)心理會,用戶只需專心處理應(yīng)用功能即可嫉戚。

  • 無縫銜接salof:它是一個同步異步日志輸出框架刨裆,在空閑時候輸出對應(yīng)的日志信息,也可以將信息寫入flash中保存彬檀,方便調(diào)試帆啃。

  • 不對外產(chǎn)生依賴。

  • 使用 paho mqtt 庫

整體框架

擁有非常明確的分層框架窍帝。

整體框架

目前已實現(xiàn)了Linux努潘、TencentOS tiny、FreeRTOS盯桦、RT-Thread平臺(已做成軟件包慈俯,名字為kawaii-mqtt),除此之外TencentOS tiny的AT框架亦可以使用(RAM消耗不足15K)拥峦,并且穩(wěn)定性極好贴膘!

平臺 代碼位置
Linux https://github.com/jiejieTop/mqttclient
TencentOS tiny https://github.com/Tencent/TencentOS-tiny/tree/master/board/Fire_STM32F429
TencentOS tiny AT 框架 https://github.com/jiejieTop/gokit3-board-mqttclient
RT-Thread https://github.com/jiejieTop/kawaii-mqtt
FreeRTOS https://github.com/jiejieTop/freertos-mqttclient

版本

發(fā)布版本 描述
[v1.0.0] 初次發(fā)布,完成基本框架及其穩(wěn)定性驗證
[v1.0.1] 修復(fù)主動與服務(wù)器斷開連接時的邏輯處理
[v1.0.2] 添加新特性——攔截器略号,修復(fù)一些小bug
[v1.0.3] 避免造成全局污染修改了log刑峡、list相關(guān)函數(shù)的命名

問題

歡迎以 GitHub Issues 的形式提交問題和bug報告

版權(quán)和許可

mqttclient 遵循 Apache License v2.0 開源協(xié)議。鼓勵代碼共享和尊重原作者的著作權(quán)玄柠,可以自由的使用突梦、修改源代碼,也可以將修改后的代碼作為開源或閉源軟件發(fā)布羽利,但必須保留原作者版權(quán)聲明宫患。

linux平臺下測試使用

安裝cmake:

sudo apt-get install cmake

配置

mqttclient/test/test.c文件中修改以下內(nèi)容:

    init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get();    /* CA證書 */
    init_params.connect_params.network_params.addr = "xxxxxxx";                             /* 服務(wù)器域名 */
    init_params.connect_params.network_params.port = "8883";                                /* 服務(wù)器端口號 */
    init_params.connect_params.user_name = "xxxxxxx";                                       /* 用戶名 */
    init_params.connect_params.password = "xxxxxxx";                                        /* 密碼 */
    init_params.connect_params.client_id = "xxxxxxx";                                       /* 客戶端id */

mbedtls

默認打開mbedtls。

salof 全稱是:Synchronous Asynchronous Log Output Framework(同步異步日志輸出框架)这弧,它是一個同步異步日志輸出框架娃闲,在空閑時候輸出對應(yīng)的日志信息,并且該庫與mqttclient無縫銜接匾浪。

配置對應(yīng)的日志輸出級別:

#define BASE_LEVEL      (0)
#define ASSERT_LEVEL    (BASE_LEVEL + 1)            /* 日志輸出級別:斷言級別(非常高優(yōu)先級) */
#define ERR_LEVEL       (ASSERT_LEVEL + 1)          /* 日志輸出級別:錯誤級別(高優(yōu)先級) */
#define WARN_LEVEL      (ERR_LEVEL + 1)             /* 日志輸出級別:警告級別(中優(yōu)先級) */
#define INFO_LEVEL      (WARN_LEVEL + 1)            /* 日志輸出級別:信息級別(低優(yōu)先級) */
#define DEBUG_LEVEL     (INFO_LEVEL + 1)            /* 日志輸出級別:調(diào)試級別(更低優(yōu)先級) */

#define         LOG_LEVEL                   WARN_LEVEL      /* 日志輸出級別 */

日志其他選項:

  • 終端帶顏色
  • 時間戳
  • 標(biāo)簽

mqttclient的配置

配置mqtt等待應(yīng)答列表的最大值皇帮,對于qos1 qos2服務(wù)質(zhì)量有要求的可以將其設(shè)置大一點,當(dāng)然也必須資源跟得上蛋辈,它主要是保證qos1 qos2的mqtt報文能準(zhǔn)確到達服務(wù)器属拾。

#define     MQTT_ACK_HANDLER_NUM_MAX            64

選擇MQTT協(xié)議的版本,默認為4,表示使用MQTT 3.1.1版本渐白,而3則表示為MQTT 3.1版本尊浓。

#define     MQTT_VERSION                        4           // 4 is mqtt 3.1.1

設(shè)置默認的保活時間纯衍,它主要是保證MQTT客戶端與服務(wù)器的保持活性連接眠砾,單位為 秒 ,比如MQTT客戶端與服務(wù)器100S沒有發(fā)送數(shù)據(jù)了托酸,有沒有接收到數(shù)據(jù),此時MQTT客戶端會發(fā)送一個ping包柒巫,確認一下這個會話是否存在励堡,如果收到服務(wù)器的應(yīng)答,那么說明這個會話還是存在的堡掏,可以隨時收發(fā)數(shù)據(jù)应结,而如果不存在了,就清除會話泉唁。

#define     MQTT_KEEP_ALIVE_INTERVAL            100         // unit: second

默認的命令超時鹅龄,它主要是用于socket讀寫超時,在MQTT初始化時可以指定:

#define     MQTT_DEFAULT_CMD_TIMEOUT            4000

默認主題的長度亭畜,主題是支持通配符的扮休,如果主題太長則會被截斷:

#define     MQTT_TOPIC_LEN_MAX                  64

默認的算法數(shù)據(jù)緩沖區(qū)的大小,如果要發(fā)送大量數(shù)據(jù)則修改大一些拴鸵,在MQTT初始化時可以指定:

#define     MQTT_DEFAULT_BUF_SIZE               1024

線程相關(guān)的配置玷坠,如線程棧,線程優(yōu)先級劲藐,線程時間片等:
在linux環(huán)境下可以是不需要理會這些參數(shù)的八堡,而在RTOS平臺則需要配置,如果不使用mbedtls聘芜,線程棧2048字節(jié)已足夠兄渺,而使用mbedtls加密后,需要配置4096字節(jié)以上汰现。

#define     MQTT_THREAD_STACK_SIZE              2048    // 線程棧
#define     MQTT_THREAD_PRIO                    5       // 線程優(yōu)先級
#define     MQTT_THREAD_TICK                    50      // 線程時間片

默認的重連時間間隔挂谍,當(dāng)發(fā)生掉線時,會以這個時間間隔嘗試重連:

#define     MQTT_RECONNECT_DEFAULT_DURATION     1000

其他不需要怎么配置的東西:

#define     MQTT_MAX_PACKET_ID                  (0xFFFF - 1)    // mqtt報文id
#define     MQTT_MAX_CMD_TIMEOUT                20000           //最大的命令超時參數(shù)
#define     MQTT_MIN_CMD_TIMEOUT                1000            //最小的命令超時參數(shù)

ps:以上參數(shù)基本不需要怎么配置的服鹅,直接用即可~

編譯 & 運行

./build.sh

運行build.sh腳本后會在 ./build/bin/目錄下生成可執(zhí)行文件mqtt-client凳兵,直接運行即可。

編譯成動態(tài)庫libmqttclient.so

./make-libmqttclient.sh

運行make-libmqttclient.sh腳本后會在 ./libmqttclient/lib目錄下生成一個動態(tài)庫文件libmqttclient.so企软,并安裝到系統(tǒng)的/usr/lib目錄下庐扫,相關(guān)頭文件已經(jīng)拷貝到./libmqttclient/include目錄下,編譯應(yīng)用程序的時候只需要鏈接動態(tài)庫即可-lmqttclient,動態(tài)庫的配置文件根據(jù)./test/mqtt_config.h配置的形庭。

設(shè)計思想

  • 整體采用分層式設(shè)計铅辞,代碼實現(xiàn)采用異步設(shè)計方式,降低耦合萨醒。
  • 消息的處理使用回調(diào)的方式處理:用戶指定[訂閱的主題]與指定[消息的處理函數(shù)]
  • 不對外產(chǎn)生依賴

API

mqttclient擁有非常簡潔的api接口

int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_list_subscribe_topic(mqtt_client_t* c);
int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler);

核心

mqtt_client_t 結(jié)構(gòu)

typedef struct mqtt_client {
    unsigned short              packet_id;
    unsigned char               ping_outstanding;
    unsigned char               ack_handler_number;
    unsigned char               *read_buf;
    unsigned char               *write_buf;
    unsigned int                cmd_timeout;
    unsigned int                read_buf_size;
    unsigned int                write_buf_size;
    unsigned int                reconnect_try_duration;
    void                        *reconnect_date;
    reconnect_handler_t         reconnect_handler;
    client_state_t              client_state;
    platform_mutex_t            write_lock;
    platform_mutex_t            global_lock;
    mqtt_list_t                 msg_handler_list;
    mqtt_list_t                 ack_handler_list;
    network_t                   *network;
    platform_thread_t           *thread;
    platform_timer_t            reconnect_timer;
    platform_timer_t            last_sent;
    platform_timer_t            last_received;
    connect_params_t            *connect_params;
    interceptor_handler_t       interceptor_handler;
} mqtt_client_t;

該結(jié)構(gòu)主要維護以下內(nèi)容:

  1. 讀寫數(shù)據(jù)緩沖區(qū)read_buf斟珊、write_buf
  2. 命令超時時間cmd_timeout(主要是讀寫阻塞時間、等待響應(yīng)的時間富纸、重連等待時間)
  3. 維護ack鏈表ack_handler_list囤踩,這是異步實現(xiàn)的核心,所有等待響應(yīng)的報文都會被掛載到這個鏈表上
  4. 維護消息處理列表msg_handler_list晓褪,這是mqtt協(xié)議必須實現(xiàn)的內(nèi)容堵漱,所有來自服務(wù)器的publish報文都會被處理(前提是訂閱了對應(yīng)的消息)
  5. 維護一個網(wǎng)卡接口network
  6. 維護一個內(nèi)部線程thread,所有來自服務(wù)器的mqtt包都會在這里被處理涣仿!
  7. 兩個定時器勤庐,分別是掉線重連定時器與保活定時器reconnect_timer好港、last_sent愉镰、last_received
  8. 一些連接的參數(shù)connect_params

mqttclient實現(xiàn)

以下是整個框架的實現(xiàn)方式,方便大家更容易理解mqttclient的代碼與設(shè)計思想钧汹,讓大家能夠修改源碼與使用丈探,還可以提交pr或者issues,開源的世界期待各位大神的參與崭孤,感謝类嗤!

除此之外以下代碼的記錄機制與其超時處理機制是非常好的編程思想,大家有興趣一定要看源代碼辨宠!

初始化

int mqtt_init(mqtt_client_t* c, client_init_params_t* init)

主要是配置mqtt_client_t結(jié)構(gòu)的相關(guān)信息遗锣,如果沒有指定初始化參數(shù),則系統(tǒng)會提供默認的參數(shù)。
但連接部分的參數(shù)則必須指定:

    init_params.connect_params.network_params.addr = "[你的mqtt服務(wù)器IP地址或者是域名]";
    init_params.connect_params.network_params.port = 1883;  //端口號
    init_params.connect_params.user_name = "jiejietop";
    init_params.connect_params.password = "123456";
    init_params.connect_params.client_id = "clientid";

    mqtt_init(&client, &init_params);

連接服務(wù)器

int mqtt_connect(mqtt_client_t* c);

參數(shù)只有 mqtt_client_t 類型的指針,字符串類型的主題(支持通配符"#" "+")棘劣,主題的服務(wù)質(zhì)量,以及收到報文的處理函數(shù)笔咽,如不指定則有默認處理函數(shù)。連接服務(wù)器則是使用非異步的方式設(shè)計霹期,因為必須等待連接上服務(wù)器才能進行下一步操作叶组。

過程如下:

  1. 調(diào)用底層的連接函數(shù)連接上服務(wù)器:
c->network->connect(c->network);
  1. 序列化mqttCONNECT報文并且發(fā)送
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)
  1. 等待來自服務(wù)器的CONNACK報文
mqtt_wait_packet(c, CONNACK, &connect_timer)
  1. 連接成功后創(chuàng)建一個內(nèi)部線程mqtt_yield_thread,并在合適的時候啟動它:
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)

if (NULL != c->thread) {
    mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
    platform_thread_startup(c->thread);
    platform_thread_start(c->thread);       /* start run mqtt thread */
}
  1. 而對于重連來說則不會重新創(chuàng)建線程历造,直接改變客戶端狀態(tài)為連接狀態(tài)即可:
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);

訂閱報文

int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)

訂閱報文使用異步設(shè)計來實現(xiàn)的:
過程如下:

  1. 序列化訂閱報文并且發(fā)送給服務(wù)器
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)
  1. 創(chuàng)建對應(yīng)的消息處理節(jié)點甩十,這個消息節(jié)點在收到服務(wù)器的SUBACK訂閱應(yīng)答報文后會掛載到消息處理列表msg_handler_list
mqtt_msg_handler_create(topic_filter, qos, handler)
  1. 在發(fā)送了報文給服務(wù)器那就要等待服務(wù)器的響應(yīng)了船庇,先記錄這個等待SUBACK
mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)

取消訂閱

與訂閱報文的邏輯基本差不多的~

  1. 序列化訂閱報文并且發(fā)送給服務(wù)器
MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)
mqtt_send_packet(c, len, &timer)
  1. 創(chuàng)建對應(yīng)的消息處理節(jié)點,這個消息節(jié)點在收到服務(wù)器的UNSUBACK取消訂閱應(yīng)答報文后將消息處理列表msg_handler_list上的已經(jīng)訂閱的主題消息節(jié)點銷毀
mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)
  1. 在發(fā)送了報文給服務(wù)器那就要等待服務(wù)器的響應(yīng)了侣监,先記錄這個等待UNSUBACK
mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)

發(fā)布報文

int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)

參數(shù)只有 mqtt_client_t 類型的指針鸭轮,字符串類型的主題(支持通配符),要發(fā)布的消息(包括服務(wù)質(zhì)量橄霉、消息主體)窃爷。

    mqtt_message_t msg;
    
    msg.qos = 2;
    msg.payload = (void *) buf;
    
    mqtt_publish(&client, "testtopic1", &msg);

核心思想都差不多,過程如下:

  1. 先序列化發(fā)布報文姓蜂,然后發(fā)送到服務(wù)器
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
              topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)
  1. 對于QOS0的邏輯按厘,不做任何處理,對于QOS1和QOS2的報文則需要記錄下來钱慢,在沒收到服務(wù)器應(yīng)答的時候進行重發(fā)
    if (QOS1 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
    } else if (QOS2 == msg->qos) {
        rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
    }
  1. 還有非常重要的一點刻剥,重發(fā)報文的MQTT報文頭部需要設(shè)置DUP標(biāo)志位,這是MQTT協(xié)議的標(biāo)準(zhǔn)滩字,因此,在重發(fā)的時候作者直接操作了報文的DUP標(biāo)志位御吞,因為修改DUP標(biāo)志位的函數(shù)我沒有從MQTT庫中找到麦箍,所以我封裝了一個函數(shù),這與LwIP中的交叉存取思想是一個道理陶珠,它假設(shè)我知道MQTT報文的所有操作挟裂,所以我可以操作它,這樣子可以提高很多效率:
mqtt_set_publish_dup(c,1);  /* may resend this data, set the udp flag in advance */

內(nèi)部線程

static void mqtt_yield_thread(void *arg)

主要是對mqtt_yield函數(shù)的返回值做處理揍诽,比如在disconnect的時候銷毀這個線程诀蓉。

核心的處理函數(shù)

  1. 數(shù)據(jù)包的處理mqtt_packet_handle
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)

對不同的包使用不一樣的處理:

    switch (packet_type) {
        case 0: /* timed out reading packet */
            break;

        case CONNACK:
            break;

        case PUBACK:
        case PUBCOMP:
            rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
            break;

        case SUBACK:
            rc = mqtt_suback_packet_handle(c, timer);
            break;
            
        case UNSUBACK:
            rc = mqtt_unsuback_packet_handle(c, timer);
            break;

        case PUBLISH:
            rc = mqtt_publish_packet_handle(c, timer);
            break;

        case PUBREC:
        case PUBREL:
            rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
            break;

        case PINGRESP:
            c->ping_outstanding = 0;
            break;

        default:
            goto exit;
    }

并且做保活的處理:

mqtt_keep_alive(c)

當(dāng)發(fā)生超時后

if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received)) 

序列號一個心跳包并且發(fā)送給服務(wù)器

MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
mqtt_send_packet(c, len, &timer);

當(dāng)再次發(fā)生超時后暑脆,表示與服務(wù)器的連接已斷開渠啤,需要重連的操作,設(shè)置客戶端狀態(tài)為斷開連接

mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
  1. ack鏈表的掃描添吗,當(dāng)收到服務(wù)器的報文時沥曹,對ack列表進行掃描操作
mqtt_ack_list_scan(c);

當(dāng)超時后就銷毀ack鏈表節(jié)點:

mqtt_ack_handler_destroy(ack_handler);

當(dāng)然下面這幾種報文則需要重發(fā)操作:(PUBACK 、PUBREC碟联、 PUBREL 妓美、PUBCOMP,保證QOS1 QOS2的服務(wù)質(zhì)量)

if ((ack_handler->type ==  PUBACK) || (ack_handler->type ==  PUBREC) || (ack_handler->type ==  PUBREL) || (ack_handler->type ==  PUBCOMP))
    mqtt_ack_handler_resend(c, ack_handler);
  1. 保持活性的時間過去了鲤孵,可能掉線了壶栋,需要重連操作
mqtt_try_reconnect(c);

重連成功后嘗試重新訂閱報文,保證恢復(fù)原始狀態(tài)~

mqtt_try_resubscribe(c)

發(fā)布應(yīng)答與發(fā)布完成報文的處理

static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  1. 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
  1. 取消對應(yīng)的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);

訂閱應(yīng)答報文的處理

static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  1. 反序列化報文
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
  1. 取消對應(yīng)的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
  1. 安裝對應(yīng)的訂閱消息處理函數(shù)普监,如果是已存在的則不會安裝
mqtt_msg_handlers_install(c, msg_handler);

取消訂閱應(yīng)答報文的處理

static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  1. 反序列化報文
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
  1. 取消對應(yīng)的ack記錄贵试,并且獲取到已經(jīng)訂閱的消息處理節(jié)點
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
  1. 銷毀對應(yīng)的訂閱消息處理函數(shù)
mqtt_msg_handler_destory(msg_handler);

來自服務(wù)器的發(fā)布報文的處理

static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  1. 反序列化報文
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
        (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
  1. 對于QOS0琉兜、QOS1的報文,直接去處理消息
mqtt_deliver_message(c, &topic_name, &msg);
  1. 對于QOS1的報文锡移,還需要發(fā)送一個PUBACK應(yīng)答報文給服務(wù)器
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
  1. 而對于QOS2的報文則需要發(fā)送PUBREC報文給服務(wù)器呕童,除此之外還需要記錄PUBREL到ack鏈表上,等待服務(wù)器的發(fā)布釋放報文淆珊,最后再去處理這個消息
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);

說明:一旦注冊到ack列表上的報文夺饲,當(dāng)具有重復(fù)的報文是不會重新被注冊的,它會通過mqtt_ack_list_node_is_exist函數(shù)判斷這個節(jié)點是否存在施符,主要是依賴等待響應(yīng)的消息類型與msgid往声。

發(fā)布收到與發(fā)布釋放報文的處理

static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  1. 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
  1. 產(chǎn)生一個對應(yīng)的應(yīng)答報文
mqtt_publish_ack_packet(c, packet_id, packet_type);
  1. 取消對應(yīng)的ack記錄
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)

開源地址

https://github.com/jiejieTop/mqttclient

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市戳吝,隨后出現(xiàn)的幾起案子浩销,更是在濱河造成了極大的恐慌,老刑警劉巖听哭,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件慢洋,死亡現(xiàn)場離奇詭異,居然都是意外死亡陆盘,警方通過查閱死者的電腦和手機普筹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來隘马,“玉大人太防,你說我怎么就攤上這事∷嵩保” “怎么了蜒车?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長幔嗦。 經(jīng)常有香客問我酿愧,道長,這世上最難降的妖魔是什么邀泉? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任寓娩,我火速辦了婚禮,結(jié)果婚禮上呼渣,老公的妹妹穿的比我還像新娘棘伴。我一直安慰自己,他們只是感情好屁置,可當(dāng)我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布焊夸。 她就那樣靜靜地躺著,像睡著了一般蓝角。 火紅的嫁衣襯著肌膚如雪阱穗。 梳的紋絲不亂的頭發(fā)上饭冬,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天,我揣著相機與錄音揪阶,去河邊找鬼昌抠。 笑死,一個胖子當(dāng)著我的面吹牛鲁僚,可吹牛的內(nèi)容都是我干的炊苫。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼冰沙,長吁一口氣:“原來是場噩夢啊……” “哼侨艾!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拓挥,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤唠梨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后侥啤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體当叭,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年盖灸,在試婚紗的時候發(fā)現(xiàn)自己被綠了科展。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡糠雨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出徘跪,到底是詐尸還是另有隱情甘邀,我是刑警寧澤,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布垮庐,位于F島的核電站松邪,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏哨查。R本人自食惡果不足惜逗抑,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寒亥。 院中可真熱鬧邮府,春花似錦、人聲如沸溉奕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽加勤。三九已至仙辟,卻和暖如春同波,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背叠国。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工未檩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人粟焊。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓冤狡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親吆玖。 傳聞我的和親對象是個殘疾皇子筒溃,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,685評論 2 360