2021-01-11 mqtt協(xié)議簡(jiǎn)介及消息總線EMQX與客戶端Paho快速上手

1. MQTT簡(jiǎn)介

MQTT(Message Queuing Telemetry Transport慕蔚,消息隊(duì)列遙測(cè)傳輸協(xié)議)彼绷,是基于“訂閱/發(fā)布”模式的輕量級(jí)通信協(xié)議巍佑,該協(xié)議基于TCP/IP,能以極低的帶寬為海量(百萬級(jí))跨域設(shè)備提供可靠的消息服務(wù)寄悯,因此在物聯(lián)網(wǎng)萤衰、小型移動(dòng)終端、邊緣計(jì)算方面有廣泛應(yīng)用猜旬。
所謂可靠的消息傳輸脆栋,體現(xiàn)為可配置消息的服務(wù)質(zhì)量(QoS),有三種服務(wù)質(zhì)量可選:

  • 至多一次:
    消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)洒擦。會(huì)發(fā)生消息丟失或重復(fù)椿争。應(yīng)用場(chǎng)景如環(huán)境傳感器的數(shù)據(jù)采集,丟失一次記錄無所謂熟嫩,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送秦踪。
  • 至少一次:
    確保消息送達(dá)訂閱者,但消息可能重復(fù)掸茅,適用于冪等性操作椅邓。
  • 只有一次:
    最嚴(yán)格的消息服務(wù)質(zhì)量,確保消息到達(dá)且僅到達(dá)一次訂閱者昧狮。應(yīng)用場(chǎng)景如計(jì)費(fèi)系統(tǒng)等景馁。

MQTT協(xié)議中存在三種身份:消息總線(Broker)、發(fā)布者(Publish)和訂閱者(Subscribe)陵且,其中消息總線屬于服務(wù)器裁僧,后兩者都屬于客戶端个束。發(fā)布者和訂閱者可以是各種物聯(lián)網(wǎng)設(shè)備和小型終端,消息發(fā)布者可以同時(shí)也是消息訂閱者聊疲,如下圖所示茬底。


MQTT.png

MQTT有一個(gè)底層覆蓋網(wǎng):它將建立客戶端到服務(wù)器的連接,提供兩者之間有序获洲、無損阱表、基于字節(jié)流的雙向傳輸。當(dāng)應(yīng)用數(shù)據(jù)通過MQTT網(wǎng)絡(luò)發(fā)送時(shí)贡珊,MQTT會(huì)把與之相關(guān)的服務(wù)質(zhì)量(QoS)和主題名(Topic)相關(guān)連最爬。
客戶端與消息總線建立連接后就是一個(gè)會(huì)話(Session),它們之間有周期性的狀態(tài)交互门岔,且可跨越多個(gè)連續(xù)的網(wǎng)絡(luò)連接爱致。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:

  • Topic,可以理解為消息的類型寒随,訂閱者訂閱(Subscribe)后糠悯,就會(huì)收到該主題的消息內(nèi)容(payload);
  • payload妻往,可以理解為消息的內(nèi)容互艾,是指訂閱者具體要使用的內(nèi)容。

訂閱消息時(shí)讯泣,可以在訂閱表達(dá)式中使用通配符篩選器對(duì)主題進(jìn)行篩選纫普,可同時(shí)訂閱所匹配的多個(gè)主題。
MQTT協(xié)議中主要有以下5個(gè)方法:

  • connect:客戶端建立與服務(wù)器的連接
  • disconnect:等待客戶端完成工作后好渠,端口與總線的會(huì)話
  • subscribe:客戶端向消息總線注冊(cè)訂閱主題
  • unsubscribe:客戶端等待消息總線取消所注冊(cè)的訂閱
  • publish:客戶端向消息總線發(fā)送某主題的消息

2. 消息總線EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker)昨稼,是基于Erlang語言開發(fā)的開源物聯(lián)網(wǎng)MQTT消息總線。其是一款由前華為員工開發(fā)的開源軟件晦墙,官網(wǎng)地址在此悦昵,配套有豐富的文檔和使用說明。我們可以使用docker來快速部署晌畅,官網(wǎng)也有簡(jiǎn)潔明了的安裝說明但指,這里不再冗述。
docker部署后抗楔,綁定了1883端口和18083端口棋凳。其中1883端口是服務(wù)總線對(duì)外提供服務(wù)的端口,客戶端通過該端口與消息總線通訊连躏。18083端口是網(wǎng)頁版后臺(tái)監(jiān)控端口剩岳,打開localhost:18083,默認(rèn)的管理員名為Admin入热,密碼為public拍棕。監(jiān)控界面可以看到所有連接到總線的客戶節(jié)點(diǎn)晓铆、注冊(cè)的主題、傳遞消息的統(tǒng)計(jì)數(shù)據(jù)等都一目了然绰播,還提供了測(cè)試工具骄噪,可以測(cè)試連接消息總線并訂閱主題,也可向總線發(fā)送主題消息測(cè)試客戶端是否能收到訂閱內(nèi)容蠢箩。

EMQ.png

Docker部署后链蕊,系統(tǒng)已具備基本使用條件。關(guān)于如何使用插件谬泌、配置安全認(rèn)證等滔韵,在官方文檔有詳細(xì)步驟說明,可按圖索翼深入研究掌实,這里不再展開陪蜻,以后有需要再行探索。

3. 客戶端Paho

Paho是Eclipse基金會(huì)支持的MQTT客戶端開源實(shí)現(xiàn)贱鼻,官網(wǎng)地址在此舱呻。其提供了如下語言庫:java触幼、Python例书、JavaScript夷蚊、GoLang鼠锈、C/C++/C#暇咆、Rust诈豌、Android Service等组题。

3.1 最簡(jiǎn)搭建

這里橱脸,我選擇用Python基于Paho快速搭建一個(gè)最簡(jiǎn)單的MQTT客戶端础米。

  • 安裝Paho模塊
    先利用virtualenv創(chuàng)建python虛擬環(huán)境,在該虛擬環(huán)境下使用pip安裝Paho:

pip install paho-mqtt

  • 編寫一個(gè)publish客戶端:publish.py
import paho.mqtt.client as mqtt

client=mqtt.Client()
client.connect('127.0.0.1',1883,600)
client.loop_start()
while True:
    topic=input('Enter the topic name: ')
    message=input('Enter the message to send: ')
    client.publish(topic,payload=message,qos=0)
  • 編寫一個(gè)subscribe客戶端:subscribe.py
import paho.mqtt.client as mqtt

def on_message(client,userdata,msg):
    print(msg.topic+" "+str(msg.payload))

client=mqtt.Client()
client.on_message=on_message
client.connect('127.0.0.1',1883,600)
topic=input('Enter the topic you want to subscribe: ')
client.subscribe(topic,qos=0)
client.loop_forever()

先運(yùn)行subscribe.py添诉,進(jìn)行持續(xù)監(jiān)聽屁桑。再另起一個(gè)終端,進(jìn)入虛擬環(huán)境栏赴,并運(yùn)行publish.py蘑斧,輸入對(duì)應(yīng)的主題和消息內(nèi)容,之前的客戶端就能收到消息了须眷。
也可以到EMQX的監(jiān)控后臺(tái)中竖瘾,通過測(cè)試工具發(fā)送消息和訂閱消息,也可在監(jiān)控視圖中查看到當(dāng)前連接的客戶端花颗、消息統(tǒng)計(jì)數(shù)據(jù)等捕传。

3.2 常用方法

Paho for python的官方使用手冊(cè)在此,其中列出了詳細(xì)說明扩劝。此處列出其中的常用方法庸论,并簡(jiǎn)要說明职辅。

  • username_pw_set(username, password=None)
    在 connect()之前設(shè)置client的用戶名和密碼聂示,依據(jù)MQTT配置的mqtt_acl與mqtt_user表中的ACL規(guī)則與用戶信息進(jìn)行用戶驗(yàn)證域携。若MQTT開啟了ACL驗(yàn)證,就必須登錄驗(yàn)證催什。
  • connect( host, port=1883, keepalive=60 bind_address="" )
    以阻塞的方式進(jìn)行連接
    host:消息總線的主機(jī)名或IP
    port:端口
    keepalive:無通信的最長(zhǎng)時(shí)間(秒)涵亏, 如果沒有交換其他消息,則控制客戶端向總線發(fā)送ping的速率
    bind_address:多個(gè)接口綁定本地ip地址
  • connect_async( host, port=1883, keepalive=60, bind_address="" )
    以異步非阻塞的方式進(jìn)行連接
  • disconnect()
    斷開連接蒲凶,將不會(huì)等待所有排隊(duì)的消息被發(fā)送气筋。如果想確保所有消息在斷開連接前都已發(fā)送,則需要使用wait_for_publish()函數(shù)來發(fā)送消息旋圆,使用方法同publis()函數(shù)一致宠默。
  • publish(topic, payload=None, qos=0, retain=False)
    向客戶端代理發(fā)送消息
    topic:消息的主題
    payload:消息內(nèi)容,字節(jié)流形式灵巧,若想發(fā)送int/float等類型數(shù)據(jù)搀矫,可通過struct.pack()方法來創(chuàng)建payload,并在接收端通過struct.unpack()方法恢復(fù)數(shù)據(jù)刻肄,關(guān)于struct的使用方法見文末瓤球。
    qos:服務(wù)質(zhì)量(0,1,2分別對(duì)應(yīng)本文開頭所說的三種質(zhì)量類型)
    retain:若設(shè)置為True,則該條消息為保留消息敏弃,消息總線會(huì)保存每個(gè)Topic的最后一條保留消息及其QoS卦羡,當(dāng)訂閱該Topic的客戶新上線,則總線會(huì)將該消息遞送給它麦到。類似于微信公眾號(hào)新關(guān)注者收到一條默認(rèn)消息绿饵。刪除該保留消息的方法是發(fā)送空消息體的保留消息或發(fā)送最新的保留消息進(jìn)行覆蓋。
  • subscribe( topic, qos=0 )
    訂閱topic瓶颠,可以使用元組數(shù)組的方式訂閱多個(gè)主題:subscribe([("my/topic1",0),("my/topic2",0)...])
  • unsubscribe(topic)
    取消訂閱
  • loop( timeout=1.0, max_packets=1 )
    定期調(diào)用網(wǎng)絡(luò)處理
while True:
  client.loop()
  • loop_start() / loop_stop()
    實(shí)現(xiàn)網(wǎng)絡(luò)循環(huán)的線程接口, 可以控制線程的啟動(dòng)和結(jié)束拟赊。loop_start()可以在connect()之前或之后調(diào)用,調(diào)用后會(huì)啟動(dòng)一個(gè)后臺(tái)線程粹淋,自動(dòng)執(zhí)行l(wèi)oop函數(shù)吸祟,從而將主線程從上文的while True死循環(huán)中釋放出來。
client.connect("mqtt.eclipse.org")
client.loop_start()
while True:
    temperature = sensor.blocking_read()
    client.publish("paho/temperature", temperature)
  • loop_forever()
    網(wǎng)絡(luò)循環(huán)的阻塞形式桃移,在調(diào)用 disconnect() 之前不會(huì)停止欢搜。
  • on_connect(client, userdata, flags, rc)
    回調(diào)函數(shù),當(dāng)總線響應(yīng)我們連接請(qǐng)求時(shí)被調(diào)用谴轮。這是一個(gè)被@property和@on_connect.setter修飾的訪問器炒瘟。
    client:回調(diào)該函數(shù)的client實(shí)例
    userdata:在 Client()\user_data_set()中設(shè)置的私有用戶數(shù)據(jù)
    flags:總線的響應(yīng)標(biāo)志,是一個(gè) dict 字典第步,flags['session present']疮装,當(dāng)客戶端重新連接到它先前已連接的總線時(shí)缘琅,此標(biāo)志指示總線是否仍然具有該客戶端的會(huì)話信息。 如果為1廓推,則會(huì)話仍然存在刷袍。
    rc:連接狀況

0:連接成功
1:連接拒絕-不正確的協(xié)議版本
2:連接拒絕-無效客戶標(biāo)識(shí)符
3:連接拒絕-服務(wù)器不可用
4:連接拒絕-錯(cuò)誤用戶名或密碼
5:連接拒絕-未授權(quán)
6-255:當(dāng)前未使用。

def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...
  • on_disconnect(client, userdata, rc)
    當(dāng)客戶端斷開連接時(shí)使用
    rc:斷開狀態(tài), 如果是 0 , 則是調(diào)用 disconnect()斷開的, 如果是其他任何值, 則表示意外斷開
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

client.on_disconnect = on_disconnect
  • on_message(client, userdata, message)
    當(dāng)在訂閱的主題上收到了消息, 并且與現(xiàn)在的主題篩選器回調(diào)不匹配時(shí)使用樊展。使用message_callback_add()可以定義一個(gè)特殊主題篩選器的回調(diào)函數(shù)呻纹,on_message()在篩選器無匹配時(shí)被調(diào)用。
    message 是一個(gè) MQTTMessage實(shí)例, 它的屬性有topic, payload,qos,retain专缠。
def on_message(client, userdata, message):
   print("Received message '" + str(message.payload) + "' on topic '"
       + message.topic + "' with QoS " + str(message.qos))

client.on_message = on_message
  • message_callback_add(sub, callback)
    定義具有特殊主題篩選器的回調(diào)函數(shù)雷酪,可以使用通配符匹配多個(gè)主題,比如sensors/#可以匹配sensors/temperature和sensors/humidity涝婉。
    sub:過濾器哥力,一個(gè)過濾器字符串只能對(duì)應(yīng)一個(gè)callback。若能匹配上墩弯,則on_message不會(huì)再被調(diào)用吩跋。若多個(gè)匹配器匹配,則這多個(gè)匹配器對(duì)應(yīng)的callback都會(huì)被調(diào)用渔工。
    callback:與on_message函數(shù)格式一致锌钮。
  • message_callback_remove(sub)
    移除過濾器
  • on_publish(client, userdata, mid)
    發(fā)布的回調(diào)函數(shù)
  • on_subscribe(client, userdata, mid, granted_qos)
    訂閱的回調(diào)函數(shù)
  • on_unsubscribe(client, userdata, mid)
    取消訂閱的回調(diào)函數(shù)
  • on_log(client, userdata, level, buf)
    客戶端有日志信息時(shí)調(diào)用, 能與Python Log同時(shí)使用
    level等級(jí)包括:MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, MQTT_LOG_DEBUG

4. 附錄:struct用法

python中struct是用來處理結(jié)構(gòu)數(shù)據(jù)的,可將結(jié)構(gòu)數(shù)據(jù)轉(zhuǎn)換為字節(jié)流引矩,再將字節(jié)流恢復(fù)為結(jié)構(gòu)數(shù)據(jù)轧粟。在轉(zhuǎn)換過程中,需要使用一個(gè)格式化字符串脓魏,該字符串中的格式需與所處理的結(jié)構(gòu)數(shù)據(jù)格式一一對(duì)應(yīng)。

  • struct.pack(format,v1,v2,...)
  • struct.unpack(format,string)

其中格式字符串由一個(gè)或多個(gè)格式字符組成通惫,格式字符參照如下:


格式字符.png

字節(jié)流存在大小端問題茂翔,在格式字符串首位,有一個(gè)可選字符來決定是大端還是小端履腋,參考如下珊燎。默認(rèn)為@,即使用本機(jī)字符順序遵湖。


大小端.png

這里有個(gè)示例:

import  struct
buffer1 =  struct.pack( "ihb" ,  1 ,  2 ,  3 )
buffer2 = struct.unpack( "ihb" ,  buffer )
data  =  [ 1 ,  2 ,  3 ]
buffer3 =  struct.pack( "!ihb" ,  *data)
buffer4 =  struct.unpack( "!ihb" ,  buffer3 )
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末悔政,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子延旧,更是在濱河造成了極大的恐慌谋国,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件迁沫,死亡現(xiàn)場(chǎng)離奇詭異芦瘾,居然都是意外死亡捌蚊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門近弟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缅糟,“玉大人,你說我怎么就攤上這事祷愉〈盎拢” “怎么了?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵二鳄,是天一觀的道長(zhǎng)赴涵。 經(jīng)常有香客問我,道長(zhǎng)泥从,這世上最難降的妖魔是什么句占? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮躯嫉,結(jié)果婚禮上纱烘,老公的妹妹穿的比我還像新娘。我一直安慰自己祈餐,他們只是感情好擂啥,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著帆阳,像睡著了一般哺壶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蜒谤,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天山宾,我揣著相機(jī)與錄音,去河邊找鬼鳍徽。 笑死资锰,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的阶祭。 我是一名探鬼主播绷杜,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼濒募!你這毒婦竟也來了鞭盟?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤瑰剃,失蹤者是張志新(化名)和其女友劉穎齿诉,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鹃两,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年遗座,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俊扳。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡途蒋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出馋记,到底是詐尸還是另有隱情号坡,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布梯醒,位于F島的核電站宽堆,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏茸习。R本人自食惡果不足惜畜隶,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望号胚。 院中可真熱鬧籽慢,春花似錦、人聲如沸猫胁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽弃秆。三九已至届惋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間菠赚,已是汗流浹背脑豹。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衡查,地道東北人瘩欺。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像峡捡,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子筑悴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容