1. MQTT簡(jiǎn)介
MQTT(Message Queuing Telemetry Transport慕蔚,消息隊(duì)列遙測(cè)傳輸協(xié)議)彼绷,是基于“訂閱/發(fā)布”模式的輕量級(jí)通信協(xié)議巍佑,該協(xié)議基于TCP/IP,能以極低的帶寬為海量(百萬級(jí))跨域設(shè)備提供可靠的消息服務(wù)寄悯,因此在物聯(lián)網(wǎng)萤衰、小型移動(dòng)終端、邊緣計(jì)算方面有廣泛應(yīng)用猜旬。
所謂可靠的消息傳輸脆栋,體現(xiàn)為可配置消息的服務(wù)質(zhì)量(QoS),有三種服務(wù)質(zhì)量可選:
- 至多一次:
消息發(fā)布完全依賴底層TCP/IP網(wǎng)絡(luò)洒擦。會(huì)發(fā)生消息丟失或重復(fù)椿争。應(yīng)用場(chǎng)景如環(huán)境傳感器的數(shù)據(jù)采集,丟失一次記錄無所謂熟嫩,因?yàn)椴痪煤筮€會(huì)有第二次發(fā)送秦踪。 - 至少一次:
確保消息送達(dá)訂閱者,但消息可能重復(fù)掸茅,適用于冪等性操作椅邓。 - 只有一次:
最嚴(yán)格的消息服務(wù)質(zhì)量,確保消息到達(dá)且僅到達(dá)一次訂閱者昧狮。應(yīng)用場(chǎng)景如計(jì)費(fèi)系統(tǒng)等景馁。
MQTT協(xié)議中存在三種身份:消息總線(Broker)、發(fā)布者(Publish)和訂閱者(Subscribe)陵且,其中消息總線屬于服務(wù)器裁僧,后兩者都屬于客戶端个束。發(fā)布者和訂閱者可以是各種物聯(lián)網(wǎng)設(shè)備和小型終端,消息發(fā)布者可以同時(shí)也是消息訂閱者聊疲,如下圖所示茬底。
MQTT有一個(gè)底層覆蓋網(wǎng):它將建立客戶端到服務(wù)器的連接,提供兩者之間有序获洲、無損阱表、基于字節(jié)流的雙向傳輸。當(dāng)應(yīng)用數(shù)據(jù)通過MQTT網(wǎng)絡(luò)發(fā)送時(shí)贡珊,MQTT會(huì)把與之相關(guān)的服務(wù)質(zhì)量(QoS)和主題名(Topic)相關(guān)連最爬。
客戶端與消息總線建立連接后就是一個(gè)會(huì)話(Session),它們之間有周期性的狀態(tài)交互门岔,且可跨越多個(gè)連續(xù)的網(wǎng)絡(luò)連接爱致。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:
- Topic,可以理解為消息的類型寒随,訂閱者訂閱(Subscribe)后糠悯,就會(huì)收到該主題的消息內(nèi)容(payload);
- payload妻往,可以理解為消息的內(nèi)容互艾,是指訂閱者具體要使用的內(nèi)容。
訂閱消息時(shí)讯泣,可以在訂閱表達(dá)式中使用通配符篩選器對(duì)主題進(jìn)行篩選纫普,可同時(shí)訂閱所匹配的多個(gè)主題。
MQTT協(xié)議中主要有以下5個(gè)方法:
- connect:客戶端建立與服務(wù)器的連接
- disconnect:等待客戶端完成工作后好渠,端口與總線的會(huì)話
- subscribe:客戶端向消息總線注冊(cè)訂閱主題
- unsubscribe:客戶端等待消息總線取消所注冊(cè)的訂閱
- publish:客戶端向消息總線發(fā)送某主題的消息
2. 消息總線EMQX
EMQX(Erlang/Enterprise/Elastic MQTT Broker)昨稼,是基于Erlang語言開發(fā)的開源物聯(lián)網(wǎng)MQTT消息總線。其是一款由前華為員工開發(fā)的開源軟件晦墙,官網(wǎng)地址在此悦昵,配套有豐富的文檔和使用說明。我們可以使用docker來快速部署晌畅,官網(wǎng)也有簡(jiǎn)潔明了的安裝說明但指,這里不再冗述。
docker部署后抗楔,綁定了1883端口和18083端口棋凳。其中1883端口是服務(wù)總線對(duì)外提供服務(wù)的端口,客戶端通過該端口與消息總線通訊连躏。18083端口是網(wǎng)頁版后臺(tái)監(jiān)控端口剩岳,打開localhost:18083,默認(rèn)的管理員名為Admin入热,密碼為public拍棕。監(jiān)控界面可以看到所有連接到總線的客戶節(jié)點(diǎn)晓铆、注冊(cè)的主題、傳遞消息的統(tǒng)計(jì)數(shù)據(jù)等都一目了然绰播,還提供了測(cè)試工具骄噪,可以測(cè)試連接消息總線并訂閱主題,也可向總線發(fā)送主題消息測(cè)試客戶端是否能收到訂閱內(nèi)容蠢箩。
Docker部署后链蕊,系統(tǒng)已具備基本使用條件。關(guān)于如何使用插件谬泌、配置安全認(rèn)證等滔韵,在官方文檔有詳細(xì)步驟說明,可按圖索翼深入研究掌实,這里不再展開陪蜻,以后有需要再行探索。
3. 客戶端Paho
Paho是Eclipse基金會(huì)支持的MQTT客戶端開源實(shí)現(xiàn)贱鼻,官網(wǎng)地址在此舱呻。其提供了如下語言庫:java触幼、Python例书、JavaScript夷蚊、GoLang鼠锈、C/C++/C#暇咆、Rust诈豌、Android Service等组题。
3.1 最簡(jiǎn)搭建
這里橱脸,我選擇用Python基于Paho快速搭建一個(gè)最簡(jiǎn)單的MQTT客戶端础米。
- 安裝Paho模塊
先利用virtualenv創(chuàng)建python虛擬環(huán)境,在該虛擬環(huán)境下使用pip安裝Paho:
pip install paho-mqtt
- 編寫一個(gè)publish客戶端:publish.py
import paho.mqtt.client as mqtt
client=mqtt.Client()
client.connect('127.0.0.1',1883,600)
client.loop_start()
while True:
topic=input('Enter the topic name: ')
message=input('Enter the message to send: ')
client.publish(topic,payload=message,qos=0)
- 編寫一個(gè)subscribe客戶端:subscribe.py
import paho.mqtt.client as mqtt
def on_message(client,userdata,msg):
print(msg.topic+" "+str(msg.payload))
client=mqtt.Client()
client.on_message=on_message
client.connect('127.0.0.1',1883,600)
topic=input('Enter the topic you want to subscribe: ')
client.subscribe(topic,qos=0)
client.loop_forever()
先運(yùn)行subscribe.py添诉,進(jìn)行持續(xù)監(jiān)聽屁桑。再另起一個(gè)終端,進(jìn)入虛擬環(huán)境栏赴,并運(yùn)行publish.py蘑斧,輸入對(duì)應(yīng)的主題和消息內(nèi)容,之前的客戶端就能收到消息了须眷。
也可以到EMQX的監(jiān)控后臺(tái)中竖瘾,通過測(cè)試工具發(fā)送消息和訂閱消息,也可在監(jiān)控視圖中查看到當(dāng)前連接的客戶端花颗、消息統(tǒng)計(jì)數(shù)據(jù)等捕传。
3.2 常用方法
Paho for python的官方使用手冊(cè)在此,其中列出了詳細(xì)說明扩劝。此處列出其中的常用方法庸论,并簡(jiǎn)要說明职辅。
- username_pw_set(username, password=None)
在 connect()之前設(shè)置client的用戶名和密碼聂示,依據(jù)MQTT配置的mqtt_acl與mqtt_user表中的ACL規(guī)則與用戶信息進(jìn)行用戶驗(yàn)證域携。若MQTT開啟了ACL驗(yàn)證,就必須登錄驗(yàn)證催什。 - connect( host, port=1883, keepalive=60 bind_address="" )
以阻塞的方式進(jìn)行連接
host:消息總線的主機(jī)名或IP
port:端口
keepalive:無通信的最長(zhǎng)時(shí)間(秒)涵亏, 如果沒有交換其他消息,則控制客戶端向總線發(fā)送ping的速率
bind_address:多個(gè)接口綁定本地ip地址 - connect_async( host, port=1883, keepalive=60, bind_address="" )
以異步非阻塞的方式進(jìn)行連接 - disconnect()
斷開連接蒲凶,將不會(huì)等待所有排隊(duì)的消息被發(fā)送气筋。如果想確保所有消息在斷開連接前都已發(fā)送,則需要使用wait_for_publish()函數(shù)來發(fā)送消息旋圆,使用方法同publis()函數(shù)一致宠默。 - publish(topic, payload=None, qos=0, retain=False)
向客戶端代理發(fā)送消息
topic:消息的主題
payload:消息內(nèi)容,字節(jié)流形式灵巧,若想發(fā)送int/float等類型數(shù)據(jù)搀矫,可通過struct.pack()方法來創(chuàng)建payload,并在接收端通過struct.unpack()方法恢復(fù)數(shù)據(jù)刻肄,關(guān)于struct的使用方法見文末瓤球。
qos:服務(wù)質(zhì)量(0,1,2分別對(duì)應(yīng)本文開頭所說的三種質(zhì)量類型)
retain:若設(shè)置為True,則該條消息為保留消息敏弃,消息總線會(huì)保存每個(gè)Topic的最后一條保留消息及其QoS卦羡,當(dāng)訂閱該Topic的客戶新上線,則總線會(huì)將該消息遞送給它麦到。類似于微信公眾號(hào)新關(guān)注者收到一條默認(rèn)消息绿饵。刪除該保留消息的方法是發(fā)送空消息體的保留消息或發(fā)送最新的保留消息進(jìn)行覆蓋。 - subscribe( topic, qos=0 )
訂閱topic瓶颠,可以使用元組數(shù)組的方式訂閱多個(gè)主題:subscribe([("my/topic1",0),("my/topic2",0)...]) - unsubscribe(topic)
取消訂閱 - loop( timeout=1.0, max_packets=1 )
定期調(diào)用網(wǎng)絡(luò)處理
while True:
client.loop()
- loop_start() / loop_stop()
實(shí)現(xiàn)網(wǎng)絡(luò)循環(huán)的線程接口, 可以控制線程的啟動(dòng)和結(jié)束拟赊。loop_start()可以在connect()之前或之后調(diào)用,調(diào)用后會(huì)啟動(dòng)一個(gè)后臺(tái)線程粹淋,自動(dòng)執(zhí)行l(wèi)oop函數(shù)吸祟,從而將主線程從上文的while True死循環(huán)中釋放出來。
client.connect("mqtt.eclipse.org")
client.loop_start()
while True:
temperature = sensor.blocking_read()
client.publish("paho/temperature", temperature)
- loop_forever()
網(wǎng)絡(luò)循環(huán)的阻塞形式桃移,在調(diào)用 disconnect() 之前不會(huì)停止欢搜。 - on_connect(client, userdata, flags, rc)
回調(diào)函數(shù),當(dāng)總線響應(yīng)我們連接請(qǐng)求時(shí)被調(diào)用谴轮。這是一個(gè)被@property和@on_connect.setter修飾的訪問器炒瘟。
client:回調(diào)該函數(shù)的client實(shí)例
userdata:在 Client()\user_data_set()中設(shè)置的私有用戶數(shù)據(jù)
flags:總線的響應(yīng)標(biāo)志,是一個(gè) dict 字典第步,flags['session present']疮装,當(dāng)客戶端重新連接到它先前已連接的總線時(shí)缘琅,此標(biāo)志指示總線是否仍然具有該客戶端的會(huì)話信息。 如果為1廓推,則會(huì)話仍然存在刷袍。
rc:連接狀況
0:連接成功
1:連接拒絕-不正確的協(xié)議版本
2:連接拒絕-無效客戶標(biāo)識(shí)符
3:連接拒絕-服務(wù)器不可用
4:連接拒絕-錯(cuò)誤用戶名或密碼
5:連接拒絕-未授權(quán)
6-255:當(dāng)前未使用。
def on_connect(client, userdata, flags, rc):
print("Connection returned result: "+connack_string(rc))
mqttc.on_connect = on_connect
...
- on_disconnect(client, userdata, rc)
當(dāng)客戶端斷開連接時(shí)使用
rc:斷開狀態(tài), 如果是 0 , 則是調(diào)用 disconnect()斷開的, 如果是其他任何值, 則表示意外斷開
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
client.on_disconnect = on_disconnect
-
on_message(client, userdata, message)
當(dāng)在訂閱的主題上收到了消息, 并且與現(xiàn)在的主題篩選器回調(diào)不匹配時(shí)使用樊展。使用message_callback_add()可以定義一個(gè)特殊主題篩選器的回調(diào)函數(shù)呻纹,on_message()在篩選器無匹配時(shí)被調(diào)用。
message 是一個(gè) MQTTMessage實(shí)例, 它的屬性有topic, payload,qos,retain专缠。
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
client.on_message = on_message
-
message_callback_add(sub, callback)
定義具有特殊主題篩選器的回調(diào)函數(shù)雷酪,可以使用通配符匹配多個(gè)主題,比如sensors/#可以匹配sensors/temperature和sensors/humidity涝婉。
sub:過濾器哥力,一個(gè)過濾器字符串只能對(duì)應(yīng)一個(gè)callback。若能匹配上墩弯,則on_message不會(huì)再被調(diào)用吩跋。若多個(gè)匹配器匹配,則這多個(gè)匹配器對(duì)應(yīng)的callback都會(huì)被調(diào)用渔工。
callback:與on_message函數(shù)格式一致锌钮。 - message_callback_remove(sub)
移除過濾器 - on_publish(client, userdata, mid)
發(fā)布的回調(diào)函數(shù) - on_subscribe(client, userdata, mid, granted_qos)
訂閱的回調(diào)函數(shù) - on_unsubscribe(client, userdata, mid)
取消訂閱的回調(diào)函數(shù) - on_log(client, userdata, level, buf)
客戶端有日志信息時(shí)調(diào)用, 能與Python Log同時(shí)使用
level等級(jí)包括:MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, MQTT_LOG_DEBUG
4. 附錄:struct用法
python中struct是用來處理結(jié)構(gòu)數(shù)據(jù)的,可將結(jié)構(gòu)數(shù)據(jù)轉(zhuǎn)換為字節(jié)流引矩,再將字節(jié)流恢復(fù)為結(jié)構(gòu)數(shù)據(jù)轧粟。在轉(zhuǎn)換過程中,需要使用一個(gè)格式化字符串脓魏,該字符串中的格式需與所處理的結(jié)構(gòu)數(shù)據(jù)格式一一對(duì)應(yīng)。
- struct.pack(format,v1,v2,...)
- struct.unpack(format,string)
其中格式字符串由一個(gè)或多個(gè)格式字符組成通惫,格式字符參照如下:
字節(jié)流存在大小端問題茂翔,在格式字符串首位,有一個(gè)可選字符來決定是大端還是小端履腋,參考如下珊燎。默認(rèn)為@,即使用本機(jī)字符順序遵湖。
這里有個(gè)示例:
import struct
buffer1 = struct.pack( "ihb" , 1 , 2 , 3 )
buffer2 = struct.unpack( "ihb" , buffer )
data = [ 1 , 2 , 3 ]
buffer3 = struct.pack( "!ihb" , *data)
buffer4 = struct.unpack( "!ihb" , buffer3 )