經(jīng)常聽人說到消息隊(duì)列。顧名思義赐稽,消息隊(duì)列叫榕,處理的對象是消息,而隊(duì)列是先進(jìn)先出姊舵。我們隨隨便便的一個(gè)請求晰绎,可能涉及到多個(gè)服務(wù),服務(wù)之間需要互相通信蠢莺,那就是消息寒匙。消息隊(duì)列是一種進(jìn)程之間通信或者同一進(jìn)程不同線程之間的通信方式。主要解決應(yīng)用的耦合躏将、異步消息、流量削峰考蕾。
雖然kafka很有名祸憋,但是rabbitMQ的文檔更好看,對初學(xué)者很友好肖卧。
學(xué)習(xí)消息隊(duì)列蚯窥,我們首先要了解2個(gè)東西,生產(chǎn)者(Produce)和消費(fèi)者(Consumer)塞帐。生產(chǎn)者發(fā)送消息拦赠,消費(fèi)者消費(fèi)消息。
RabbitMQ是一套開源(MPL)的消息隊(duì)列服務(wù)軟件荷鼠,它實(shí)現(xiàn)了高級消息隊(duì)列協(xié)議(AMQP)。它提供server服務(wù)榔幸,同時(shí)支持多種語言的客戶端連接允乐。
首先我們要確定自己需要將rabbitMQ的Server服務(wù)部署在哪臺(或哪幾臺)機(jī)器上矮嫉。然后我在那臺機(jī)器上下載安裝rabbitMQ或者使用docker啟動(dòng) 。我在我本機(jī)上(mac)用docker啟動(dòng)牍疏。
docker命令啟動(dòng)如下:
# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
rabbitMQ服務(wù)起來之后蠢笋,要使用它上面的消息隊(duì)列。我們這邊需要與之進(jìn)行連接通訊鳞陨。python官方推薦使用pika昨寞。所以我們需要下載pika
pip install pika
一切準(zhǔn)備就緒,我們可以開始了厦滤。首先是我們的生產(chǎn)者producer编矾。它需要:
1、與rabbitMQ建立連接馁害。創(chuàng)建一個(gè)pika. BlockingConnection的連接對象窄俏,并創(chuàng)建獲取一個(gè)與之通信的channel。
2碘菜、使用queue_declare聲明一個(gè)隊(duì)列凹蜈,如果需要?jiǎng)t創(chuàng)建。因?yàn)槲覀儼l(fā)送消息之前必須確保相應(yīng)的隊(duì)列是存在的忍啸,如果不存在仰坦,則rabbitMQ會把該消息丟棄掉。
3计雌、通過channel發(fā)送消息到rabbitMQ Server悄晃。通常情況下,消息都不會直接發(fā)送到隊(duì)列凿滤,而是會經(jīng)過exchange交換機(jī)妈橄。但是RabbitMQ提供了一個(gè)默認(rèn)的exchange,當(dāng)使用默認(rèn)的exchange時(shí)翁脆,我們可以通過routing_key直接指定發(fā)送給哪個(gè)隊(duì)列眷蚓。
具體代碼如下:sender.py
import pika
#與broker建立連接,默認(rèn)端口是5672
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#創(chuàng)建一個(gè)新的channel,可以指定channel_num,未指定的話由系統(tǒng)分配一個(gè)有效的反番。
channel = connection.channel()
#聲明一個(gè)"hello"隊(duì)列沙热,如果不存在,有需要就創(chuàng)建
channel.queue_declare(queue='hello')
#發(fā)送5次Hello World消息
for i in range(5):
#exchange設(shè)置為' '表示使用默認(rèn)的exchange罢缸,通過routing_key指定直接發(fā)送到哪個(gè)隊(duì)列篙贸,routing_key的值即為queue的名稱。
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!'+str(i))
print("[X] Sent 'Hello World!"+str(i))
#關(guān)閉連接
connection.close()
然后我們再看下消費(fèi)者怎么消費(fèi)隊(duì)列的消息:
receive.py
import sys
import os
import pika
#定義一個(gè)收到消息后的回調(diào)函數(shù)枫疆,該函數(shù)需要4個(gè)參數(shù):
# - channel: BlockingChannel爵川,表示所在的channel
# - method: spec.Basic.Deliver
# - properties: spec.BasicProperties
# - body: bytes,表示收到的消息
def callback(ch,method,properties,body):
print("[X] Received %r" %body)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
#同樣,聲明一個(gè)隊(duì)列养铸,如果不存在就創(chuàng)建雁芙。
channel.queue_declare(queue='hello')
#調(diào)用basic_consume轧膘,指定queue和回調(diào)函數(shù),接受消息
# auto_ack: 自動(dòng)確認(rèn)消息
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print("[*] Waiting for message.To exit press Ctrl+C")
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
準(zhǔn)備就緒兔甘,我們依次啟動(dòng)receive.py谎碍、sender.py,運(yùn)行結(jié)果如下:
上面我們是先啟動(dòng)receive,等待著洞焙,然后再啟動(dòng)sender.如果我們先啟動(dòng)sender,再啟動(dòng)receive,打印也還是同上蟆淀,只是不是發(fā)一個(gè)取一個(gè)了,而是發(fā)5個(gè)澡匪,取5個(gè)熔任。也就是只要發(fā)送出去的,存在隊(duì)列里唁情,沒有人來取的話疑苔,就在隊(duì)列里等著別人來取。
當(dāng)然甸鸟,有時(shí)我們可能還不止一個(gè)消費(fèi)者惦费,
我們可以開啟2個(gè)receive同時(shí)接收消息,那么它們會依次取出隊(duì)列的消息抢韭,不重復(fù)薪贫。我們發(fā)送10個(gè)消息,打印如下:
send:
[X] Sent 'Hello World!0
[X] Sent 'Hello World!1
[X] Sent 'Hello World!2
[X] Sent 'Hello World!3
[X] Sent 'Hello World!4
[X] Sent 'Hello World!5
[X] Sent 'Hello World!6
[X] Sent 'Hello World!7
[X] Sent 'Hello World!8
[X] Sent 'Hello World!9
receive1:
[X] Received b'Hello World!0'
[X] Received b'Hello World!2'
[X] Received b'Hello World!4'
[X] Received b'Hello World!6'
[X] Received b'Hello World!8'
receive2:
[X] Received b'Hello World!1'
[X] Received b'Hello World!3'
[X] Received b'Hello World!5'
[X] Received b'Hello World!7'
[X] Received b'Hello World!9'
好了刻恭,以上我們就完成了一個(gè)簡單的消息隊(duì)列的使用了!
接下來瞧省,我們再看兩個(gè)參數(shù):auto_ack和x-single-active-consumer
auto_ack:自動(dòng)確認(rèn)消息標(biāo)志。默認(rèn)為False鳍贾。上面我們設(shè)置為True鞍匾,實(shí)際上就是讓它在收到消息后給rabbitMQ發(fā)送一個(gè)消息確認(rèn)消息,通知它我們已經(jīng)收到消息了贾漏,它可以將消息從消息隊(duì)列中刪除了候学。這樣我們?nèi)绻鹯eceive如果中斷,這些已經(jīng)被處理的消息就不在隊(duì)列中了纵散。
上面的例子中,我們的回調(diào)函數(shù)很簡單隐圾,只是打印出消息伍掀。但是大部分情況下,我們收到消息后暇藏,都需要做一些處理蜜笤,甚至有些是耗時(shí)的密集型處理,可能需要稍等幾秒盐碱。這時(shí)我們就不能使用auto_ask了把兔,因?yàn)槿f一該任務(wù)突然中斷沪伙,該消息還沒處理完,我們就直接將其從消息隊(duì)列中刪掉了县好,我們就沒辦法完整地完成這條消息的處理了围橡。我們不想丟掉任何一條消息。我們只想在我們已經(jīng)處理完該條消息了之后再給rabbitMQ發(fā)送確認(rèn)消息缕贡。這時(shí)翁授,就需要用到basic_ack。在消息處理完成之后晾咪,調(diào)用:ch.basic_ack(delivery_tag=method.delivery_tag)收擦,手動(dòng)發(fā)送確認(rèn)消息。
如果我們忘記調(diào)用basic_ack谍倦,消息就會一直存在消息隊(duì)列中嗎塞赂?那倒也不是,rabbitMQ有一個(gè)默認(rèn)的消息確認(rèn)超時(shí)時(shí)間:30分鐘昼蛀。超過30分鐘未收到確認(rèn)宴猾,它就會因PRECONDITION_FAILED而異常關(guān)閉channel,然后再這個(gè)channel上的所有消費(fèi)者曹洽,都將重新隊(duì)列鳍置。
這個(gè)時(shí)間在rabbitmq.conf可定義:
# 30 minutes in milliseconds
consumer_timeout = 1800000
同時(shí)你還可以在advanced.config中完全禁用這個(gè)超時(shí)時(shí)間,只是不推薦使用
%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
x-single-active-consumer:單個(gè)激活狀態(tài)的消費(fèi)者。
這個(gè)是在聲明隊(duì)列的時(shí)候的參數(shù)送淆,我們可以通過arguments參數(shù)税产,將x-single-active-consumer設(shè)置為True:
arguments = {"x-single-active-consumer":True}
channel.queue_declare(queue='hello1',arguments=arguments)
一旦將x-single-active-consumer設(shè)置為True,則這個(gè)隊(duì)列只允許存在一個(gè)有效的消費(fèi)者消費(fèi)消息。上面的例子中偷崩,如果我們設(shè)置了這個(gè)參數(shù)辟拷,又啟動(dòng)了2個(gè)receive,則只有一個(gè)receive可以取到消息,它會取到所有的消息阐斜。
tips:如果一個(gè)隊(duì)列已經(jīng)創(chuàng)建為非x-single-active-consumer的衫冻,而你想更改其為x-single-active-consumer,上面的代碼是行不通的谒出,會報(bào)錯(cuò)隅俘,會提示你,聲明的隊(duì)列的和server上的隊(duì)列不一致笤喳。