需要使用兩個知識點领虹,一個是多線程,一個是隊列queue求豫。
需要先開一個線程塌衰,該線程是訂閱topic,然后等待消息蝠嘉。
另外一個線程猾蒂,就是發(fā)布消息。
比較坑的是mqtt的python包是晨,只說了回調函數on_message肚菠,但是該回調是通過subscribe函數調用的,無法返回message到主程序罩缴。
經過多次嘗試蚊逢,最后想到用queue层扶,在on_message中put message,然后在主函數中get message烙荷,再在返回message镜会。
on_message函數將消息寫入queue
def on_message(client, userdata, msg):
mss = str((msg.payload).decode('utf-8'))
print(mss)
mp.put(mss)
主函數get后返回給主程序,并且開了兩個線程
def mqtt_send_check(client, topic, msg):
client.loop_start()
thread_sub = threading.Thread(target=mqtt_subscribe, args=(client,topic))
thread_sub.start()
thread_pub = threading.Thread(target=mqtt_publish, args=(client,topic,msg))
thread_pub.start()
time.sleep(4)
client.loop_stop()
return(mp.get())
pytest測試程序
class Test_comm:
TOPIC_REPORT = PRODUCT_KEY + "/" + DEVICE_NAME + "/abc"
def test_comm(self):
MSG = "aaa"
client = mqtt_connect(MQTT_SERVER_ADDR, MQTT_SERVER_PORT,PRODUCT_KEY, DEVICE_NAME,DEVICE_SECRET, TIMESTAMP)
pps = mqtt_send_check(client, self.TOPIC_REPORT, MSG)
print('pps:',pps)
pytest程序通過這種方式终抽,可以得到message消息戳表,實現斷言了。