消息隊(duì)列之RabbitMQ《一》

經(jīng)常聽人說到消息隊(duì)列。顧名思義赐稽,消息隊(duì)列叫榕,處理的對象是消息,而隊(duì)列是先進(jìn)先出姊舵。我們隨隨便便的一個(gè)請求晰绎,可能涉及到多個(gè)服務(wù),服務(wù)之間需要互相通信蠢莺,那就是消息寒匙。消息隊(duì)列是一種進(jìn)程之間通信或者同一進(jìn)程不同線程之間的通信方式。主要解決應(yīng)用的耦合躏将、異步消息、流量削峰考蕾。
雖然kafka很有名祸憋,但是rabbitMQ的文檔更好看,對初學(xué)者很友好肖卧。

學(xué)習(xí)消息隊(duì)列蚯窥,我們首先要了解2個(gè)東西,生產(chǎn)者(Produce)和消費(fèi)者(Consumer)塞帐。生產(chǎn)者發(fā)送消息拦赠,消費(fèi)者消費(fèi)消息。


Producer發(fā)送消息hello隊(duì)列葵姥,然后Consumer從hello隊(duì)列去獲取消息

RabbitMQ是一套開源(MPL)的消息隊(duì)列服務(wù)軟件荷鼠,它實(shí)現(xiàn)了高級消息隊(duì)列協(xié)議(AMQP)。它提供server服務(wù)榔幸,同時(shí)支持多種語言的客戶端連接允乐。
首先我們要確定自己需要將rabbitMQ的Server服務(wù)部署在哪臺(或哪幾臺)機(jī)器上矮嫉。然后我在那臺機(jī)器上下載安裝rabbitMQ或者使用docker啟動(dòng) 。我在我本機(jī)上(mac)用docker啟動(dòng)牍疏。


截屏2022-04-08 下午6.14.28.png

docker命令啟動(dòng)如下:

# for RabbitMQ 3.9, the latest series
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

rabbitMQ服務(wù)起來之后蠢笋,要使用它上面的消息隊(duì)列。我們這邊需要與之進(jìn)行連接通訊鳞陨。python官方推薦使用pika昨寞。所以我們需要下載pika

pip install pika

一切準(zhǔn)備就緒,我們可以開始了厦滤。首先是我們的生產(chǎn)者producer编矾。它需要:

1、與rabbitMQ建立連接馁害。創(chuàng)建一個(gè)pika. BlockingConnection的連接對象窄俏,并創(chuàng)建獲取一個(gè)與之通信的channel。
2碘菜、使用queue_declare聲明一個(gè)隊(duì)列凹蜈,如果需要?jiǎng)t創(chuàng)建。因?yàn)槲覀儼l(fā)送消息之前必須確保相應(yīng)的隊(duì)列是存在的忍啸,如果不存在仰坦,則rabbitMQ會把該消息丟棄掉。
3计雌、通過channel發(fā)送消息到rabbitMQ Server悄晃。通常情況下,消息都不會直接發(fā)送到隊(duì)列凿滤,而是會經(jīng)過exchange交換機(jī)妈橄。但是RabbitMQ提供了一個(gè)默認(rèn)的exchange,當(dāng)使用默認(rèn)的exchange時(shí)翁脆,我們可以通過routing_key直接指定發(fā)送給哪個(gè)隊(duì)列眷蚓。
具體代碼如下:sender.py

import pika

#與broker建立連接,默認(rèn)端口是5672
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#創(chuàng)建一個(gè)新的channel,可以指定channel_num,未指定的話由系統(tǒng)分配一個(gè)有效的反番。
channel = connection.channel()

#聲明一個(gè)"hello"隊(duì)列沙热,如果不存在,有需要就創(chuàng)建
channel.queue_declare(queue='hello')

#發(fā)送5次Hello World消息
for i in range(5):
#exchange設(shè)置為' '表示使用默認(rèn)的exchange罢缸,通過routing_key指定直接發(fā)送到哪個(gè)隊(duì)列篙贸,routing_key的值即為queue的名稱。
    channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!'+str(i))
    print("[X] Sent 'Hello World!"+str(i))

#關(guān)閉連接   
connection.close()

然后我們再看下消費(fèi)者怎么消費(fèi)隊(duì)列的消息:
receive.py

import sys
import os
import pika

#定義一個(gè)收到消息后的回調(diào)函數(shù)枫疆,該函數(shù)需要4個(gè)參數(shù):
# - channel: BlockingChannel爵川,表示所在的channel
# - method: spec.Basic.Deliver
# - properties: spec.BasicProperties
# - body: bytes,表示收到的消息
def callback(ch,method,properties,body):
    print("[X] Received %r" %body)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    #同樣,聲明一個(gè)隊(duì)列养铸,如果不存在就創(chuàng)建雁芙。
    channel.queue_declare(queue='hello')

    #調(diào)用basic_consume轧膘,指定queue和回調(diào)函數(shù),接受消息
    # auto_ack: 自動(dòng)確認(rèn)消息
    channel.basic_consume(queue='hello',
                          auto_ack=True,
                          on_message_callback=callback)

    print("[*] Waiting for message.To exit press Ctrl+C")
    channel.start_consuming()



if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

準(zhǔn)備就緒兔甘,我們依次啟動(dòng)receive.py谎碍、sender.py,運(yùn)行結(jié)果如下:


sender.png
receive.png

上面我們是先啟動(dòng)receive,等待著洞焙,然后再啟動(dòng)sender.如果我們先啟動(dòng)sender,再啟動(dòng)receive,打印也還是同上蟆淀,只是不是發(fā)一個(gè)取一個(gè)了,而是發(fā)5個(gè)澡匪,取5個(gè)熔任。也就是只要發(fā)送出去的,存在隊(duì)列里唁情,沒有人來取的話疑苔,就在隊(duì)列里等著別人來取。
當(dāng)然甸鸟,有時(shí)我們可能還不止一個(gè)消費(fèi)者惦费,


多個(gè)消費(fèi)者

我們可以開啟2個(gè)receive同時(shí)接收消息,那么它們會依次取出隊(duì)列的消息抢韭,不重復(fù)薪贫。我們發(fā)送10個(gè)消息,打印如下:

send:
[X] Sent 'Hello World!0
[X] Sent 'Hello World!1
[X] Sent 'Hello World!2
[X] Sent 'Hello World!3
[X] Sent 'Hello World!4
[X] Sent 'Hello World!5
[X] Sent 'Hello World!6
[X] Sent 'Hello World!7
[X] Sent 'Hello World!8
[X] Sent 'Hello World!9

receive1:
[X] Received b'Hello World!0'
[X] Received b'Hello World!2'
[X] Received b'Hello World!4'
[X] Received b'Hello World!6'
[X] Received b'Hello World!8'

receive2:
[X] Received b'Hello World!1'
[X] Received b'Hello World!3'
[X] Received b'Hello World!5'
[X] Received b'Hello World!7'
[X] Received b'Hello World!9'

好了刻恭,以上我們就完成了一個(gè)簡單的消息隊(duì)列的使用了!
接下來瞧省,我們再看兩個(gè)參數(shù):auto_ackx-single-active-consumer

auto_ack:自動(dòng)確認(rèn)消息標(biāo)志。默認(rèn)為False鳍贾。上面我們設(shè)置為True鞍匾,實(shí)際上就是讓它在收到消息后給rabbitMQ發(fā)送一個(gè)消息確認(rèn)消息,通知它我們已經(jīng)收到消息了贾漏,它可以將消息從消息隊(duì)列中刪除了候学。這樣我們?nèi)绻鹯eceive如果中斷,這些已經(jīng)被處理的消息就不在隊(duì)列中了纵散。
上面的例子中,我們的回調(diào)函數(shù)很簡單隐圾,只是打印出消息伍掀。但是大部分情況下,我們收到消息后暇藏,都需要做一些處理蜜笤,甚至有些是耗時(shí)的密集型處理,可能需要稍等幾秒盐碱。這時(shí)我們就不能使用auto_ask了把兔,因?yàn)槿f一該任務(wù)突然中斷沪伙,該消息還沒處理完,我們就直接將其從消息隊(duì)列中刪掉了县好,我們就沒辦法完整地完成這條消息的處理了围橡。我們不想丟掉任何一條消息。我們只想在我們已經(jīng)處理完該條消息了之后再給rabbitMQ發(fā)送確認(rèn)消息缕贡。這時(shí)翁授,就需要用到basic_ack。在消息處理完成之后晾咪,調(diào)用:ch.basic_ack(delivery_tag=method.delivery_tag)收擦,手動(dòng)發(fā)送確認(rèn)消息。

如果我們忘記調(diào)用basic_ack谍倦,消息就會一直存在消息隊(duì)列中嗎塞赂?那倒也不是,rabbitMQ有一個(gè)默認(rèn)的消息確認(rèn)超時(shí)時(shí)間:30分鐘昼蛀。超過30分鐘未收到確認(rèn)宴猾,它就會因PRECONDITION_FAILED而異常關(guān)閉channel,然后再這個(gè)channel上的所有消費(fèi)者曹洽,都將重新隊(duì)列鳍置。
這個(gè)時(shí)間在rabbitmq.conf可定義:

# 30 minutes in milliseconds
consumer_timeout = 1800000

同時(shí)你還可以在advanced.config中完全禁用這個(gè)超時(shí)時(shí)間,只是不推薦使用

%% advanced.config
[
  {rabbit, [
    {consumer_timeout, undefined}
  ]}
].

x-single-active-consumer:單個(gè)激活狀態(tài)的消費(fèi)者。
這個(gè)是在聲明隊(duì)列的時(shí)候的參數(shù)送淆,我們可以通過arguments參數(shù)税产,將x-single-active-consumer設(shè)置為True:

  arguments = {"x-single-active-consumer":True}
  channel.queue_declare(queue='hello1',arguments=arguments)

一旦將x-single-active-consumer設(shè)置為True,則這個(gè)隊(duì)列只允許存在一個(gè)有效的消費(fèi)者消費(fèi)消息。上面的例子中偷崩,如果我們設(shè)置了這個(gè)參數(shù)辟拷,又啟動(dòng)了2個(gè)receive,則只有一個(gè)receive可以取到消息,它會取到所有的消息阐斜。
tips:如果一個(gè)隊(duì)列已經(jīng)創(chuàng)建為非x-single-active-consumer的衫冻,而你想更改其為x-single-active-consumer,上面的代碼是行不通的谒出,會報(bào)錯(cuò)隅俘,會提示你,聲明的隊(duì)列的和server上的隊(duì)列不一致笤喳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末为居,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子杀狡,更是在濱河造成了極大的恐慌蒙畴,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件呜象,死亡現(xiàn)場離奇詭異膳凝,居然都是意外死亡碑隆,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門蹬音,熙熙樓的掌柜王于貴愁眉苦臉地迎上來上煤,“玉大人,你說我怎么就攤上這事祟绊÷ト耄” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵牧抽,是天一觀的道長嘉熊。 經(jīng)常有香客問我,道長扬舒,這世上最難降的妖魔是什么阐肤? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮讲坎,結(jié)果婚禮上孕惜,老公的妹妹穿的比我還像新娘。我一直安慰自己晨炕,他們只是感情好蹦狂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布净捅。 她就那樣靜靜地躺著缠导,像睡著了一般沦泌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上费奸,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天弥激,我揣著相機(jī)與錄音,去河邊找鬼愿阐。 笑死微服,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缨历。 我是一名探鬼主播以蕴,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼辛孵!你這毒婦竟也來了舒裤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤觉吭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后仆邓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鲜滩,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡伴鳖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了徙硅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片榜聂。...
    茶點(diǎn)故事閱讀 39,711評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖嗓蘑,靈堂內(nèi)的尸體忽然破棺而出须肆,到底是詐尸還是另有隱情,我是刑警寧澤桩皿,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布豌汇,位于F島的核電站,受9級特大地震影響泄隔,放射性物質(zhì)發(fā)生泄漏拒贱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一佛嬉、第九天 我趴在偏房一處隱蔽的房頂上張望逻澳。 院中可真熱鬧,春花似錦暖呕、人聲如沸斜做。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瓤逼。三九已至,卻和暖如春钝腺,著一層夾襖步出監(jiān)牢的瞬間抛姑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工艳狐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留定硝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓毫目,卻偏偏與公主長得像蔬啡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子镀虐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容