回顧
上一章學(xué)習(xí)了如何使用pika第三方庫(kù)連接RabbitMQ,并通過(guò)RabbitMQ傳遞消息订咸,實(shí)現(xiàn)了簡(jiǎn)易的生產(chǎn)者消費(fèi)者模式。
上一章傳送門(mén):RabbitMQ入門(mén)
文章是我學(xué)習(xí)記錄用的酬诀,可以看rabbitmq中文文檔
脏嚷,他們解釋的更專(zhuān)業(yè)。
任務(wù)隊(duì)列
首先我們知道了什么是隊(duì)列瞒御,那么什么是任務(wù)隊(duì)列呢父叙?
任務(wù)隊(duì)列,也有叫工作隊(duì)列的肴裙。一般情況下趾唱,我們會(huì)將耗時(shí)或者大量占用資源的操作封裝成任務(wù),然后把任務(wù)發(fā)送給任務(wù)隊(duì)列(其實(shí)這里的任務(wù)隊(duì)列就是RabbitMQ)蜻懦,然后再由工作進(jìn)程(或者叫做worker)將任務(wù)取出并執(zhí)行甜癞,執(zhí)行結(jié)束后觸發(fā)回調(diào)函數(shù),來(lái)告知RabbitMQ任務(wù)執(zhí)行完畢宛乃。
PS:上面說(shuō)的概念有點(diǎn)亂悠咱,不過(guò)沒(méi)關(guān)系,知道是什么就可以征炼。
準(zhǔn)備工作
現(xiàn)在假設(shè)有這樣一個(gè)需求析既,我們用一個(gè)字符串來(lái)表示任務(wù),后面每個(gè)‘.’來(lái)表示耗時(shí)1秒谆奥,比如'task1...'眼坏,代表任務(wù)名為task1,耗時(shí)3秒雄右。
準(zhǔn)備兩個(gè)腳本空骚,send.py和receive.py纺讲,分別表示生產(chǎn)者和消費(fèi)者。
send.py
import sys
import pika
task = sys.argv[1] or "Hello World."
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
channel.basic_publish(exchange='', routing_key='task_queue', body=task)
print("sent to [task_queue]: {}".format(task))
這里的task囤屹,我是用命令行參數(shù)的形式傳遞進(jìn)去的熬甚。稍后會(huì)在使用時(shí)說(shuō)明。
receive.py
import time
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare('task_queue')
def callback(ch, method, properties, body):
print("Recevied: {}".format(body))
time.sleep(body.count('.'))
print("{} done.".format(body))
channel.basic_consume(on_message_callback=callback, queue='task_queue')
print("waiting for task of task-queue. To exit press CTRL+C.")
channel.start_consuming()
模擬工作隊(duì)列
寫(xiě)好了腳本之后肋坚,我們可以開(kāi)始探索工作隊(duì)列了乡括。
現(xiàn)在,打開(kāi)終端智厌,并開(kāi)啟三個(gè)窗口诲泌。
PS:這里順便說(shuō)個(gè)Mac上的小技巧,按住cmd+空格會(huì)調(diào)出Sportlight搜索框铣鹏,輸入ter彈出的第一項(xiàng)就是終端敷扫,直接回車(chē)就能打開(kāi)終端了。
然后按cmd+t快捷鍵來(lái)新開(kāi)一個(gè)命令窗口诚卸,cmd+w來(lái)關(guān)閉當(dāng)前命令窗口葵第。
啟動(dòng)RabbitMQ
在第一個(gè)命令行窗口,你需要先啟動(dòng)RabbitMQ合溺。執(zhí)行rabbitmq-server命令即可卒密。
在第二個(gè)命令行窗口,執(zhí)行python send.py task1...
在第三個(gè)命令行窗口棠赛,執(zhí)行python receive.py
哎哮奇,為什么會(huì)報(bào)錯(cuò)呢,看到最后一行:time.sleep(body.count('.'))報(bào)錯(cuò)了睛约。
經(jīng)過(guò)斷點(diǎn)調(diào)試才發(fā)現(xiàn)鼎俘,body是bytes類(lèi)型,而count方法是str對(duì)象的痰腮。所以而芥,恍然大悟律罢,得做解碼處理膀值。
time.sleep(body.decode('utf-8').count('.'))
然后,就可以正常運(yùn)行了误辑,結(jié)果如下圖:
這就說(shuō)明沧踏,消費(fèi)者已經(jīng)成功處理了‘task1...’