淺析消息隊(duì)列之rabbitMQ

rabbitMQ是一款基于AMQP協(xié)議的消息中間件,它能夠在應(yīng)用之間提供可靠的消息傳輸脓魏。在易用性兰吟,擴(kuò)展性,高可用性上表現(xiàn)優(yōu)秀茂翔。使用消息中間件利于應(yīng)用之間的解耦混蔼,生產(chǎn)者(客戶(hù)端)無(wú)需知道消費(fèi)者(服務(wù)端)的存在。而且兩端可以使用不同的語(yǔ)言編寫(xiě)珊燎,大大提供了靈活性惭嚣。

rabbitMQ工作原理

rabbitMQ里的一些基本定義如下:

exchange

producer只能將消息發(fā)送給exchange。而exchange負(fù)責(zé)將消息發(fā)送到queues悔政。Exchange必須準(zhǔn)確的知道怎么處理它接受到的消息晚吞,是被發(fā)送到一個(gè)特定的queue還是許多quenes,還是被拋棄赐俗,這些規(guī)則則是通過(guò)exchange type來(lái)定義屯远。主要的type有direct,topic,headers,fanout演熟。具體針對(duì)不同的場(chǎng)景使用不同的type婆瓜。

queue

消息隊(duì)列,消息的載體完疫。接收來(lái)自exchange的消息审姓,然后再由consumer取出园担。exchange和queue是可以一對(duì)多的近弟,它們通過(guò)routingKey來(lái)綁定缅糟。

Producer

生產(chǎn)者,消息的來(lái)源,消息必須發(fā)送給exchange祷愉。而不是直接給queue

Consumer

消費(fèi)者窗宦,直接從queue中獲取消息進(jìn)行消費(fèi)赦颇,而不是從exchange。

從以上可以看出rabbitMQ工作原理大致就是producer把一條消息發(fā)送給exchange赴涵。rabbitMQ根據(jù)routingKey負(fù)責(zé)將消息從exchange發(fā)送到對(duì)應(yīng)綁定的queue中去沐扳,這是由rabbitMQ負(fù)責(zé)做的,而consumer只需從queue獲取消息即可句占。

大致流程如下:

rabbitMQ工作模型

下面通過(guò)幾個(gè)列子來(lái)詳細(xì)說(shuō)明一下如何使用rabbitMQ。

1躯嫉、簡(jiǎn)單發(fā)送模型

在rabbitMQ里消息永遠(yuǎn)不能被直接發(fā)送到queue纱烘,這里我們通過(guò)提供一個(gè)空字符串來(lái)使用默認(rèn)的exchange。這個(gè)exchange是特殊的祈餐,它可以根據(jù)routingKey把消息發(fā)送給指定的queue擂啥。所以我們的設(shè)計(jì)看起來(lái)如下所示:

代碼如下

send.py

receive.py

2、工作隊(duì)列模型

這種模式常常用來(lái)處理耗資源耗時(shí)間的任務(wù)在多個(gè)workers中帆阳,主要是為了避免立即去處理一個(gè)耗時(shí)的任務(wù)而等待它的完成哺壶。代替的做法是一個(gè)稍后去處理這個(gè)任務(wù),讓一個(gè)worker process 在后臺(tái)處理這個(gè)任務(wù)蜒谤。當(dāng)有許多workers的時(shí)候山宾,消息將會(huì)以輪詢(xún)的方式被workers獲取。模型如下:

這里就會(huì)有一個(gè)問(wèn)題鳍徽,如果consumer在執(zhí)行任務(wù)時(shí)需要花費(fèi)一些時(shí)間资锰,這個(gè)時(shí)候如果突然掛了,消息還沒(méi)有被完成阶祭,消息豈不是丟失了绷杜,為了不讓消息丟失,rabbitmq提供了消息確認(rèn)機(jī)制濒募,consumer在接收到鞭盟,執(zhí)行完消息后會(huì)發(fā)送一個(gè)ack給rabbitmq告訴它可以從queue中移除消息了。

如果沒(méi)收到ack瑰剃,Rabbitmq會(huì)重新發(fā)送此條消息齿诉,如果有其他的consumer在線,將會(huì)接收并消費(fèi)這條消息培他。消息確認(rèn)機(jī)制是默認(rèn)打開(kāi)的鹃两。如果想關(guān)閉它只需要設(shè)置no_ack=true。在此處我們不需要設(shè)置舀凛。默認(rèn)如下就行:

channel.basic_consume(callback, ?queue='hello')

除了consumer之外我們還得確保rabbitMQ掛了之后消息不被丟失俊扳。這里我們就需要確保隊(duì)列queue和消息messages都得是持久化的。

隊(duì)列的持久話需要設(shè)置durable屬性:

channel.queue_declare(queue= task_queue, durable=True)

消息的持久話則是通過(guò)delivery_mode屬性猛遍,設(shè)置值為2即可馋记。

channel.basic_publish(exchange='',

routing_key="task_queue",

body=message,

properties=pika.BasicProperties(

delivery_mode = 2, # make message persistent

))

還有一個(gè)屬性相對(duì)比較重要号坡,它可以保證consumer確認(rèn)消費(fèi)完一條消息之后再去獲取下一條消息。如果consumer正在忙碌的狀態(tài)梯醒,消息將會(huì)被分發(fā)到下一個(gè)不是很忙的consumer宽堆。

設(shè)置如下:

channel.basic_qos(prefetch_count=1)

下面貼出部分代碼

producer.py

consumer.py

3、廣播模型

在前面2個(gè)示例我們都適用默認(rèn)的exchange茸习。這里我們將自己定義一個(gè)exchange畜隶。并設(shè)置type為fanout。它可以將消息廣播給綁定的每一個(gè)queue号胚。而不再是某一個(gè)queue籽慢。我們?cè)诖藙?chuàng)建一個(gè)叫l(wèi)ogs的exchange,就像下面這樣:

channel.exchange_declare(exchange='logs', type='fanout')

所以發(fā)布消息就變成了下面這樣:

channel.basic_publish(exchange='logs',routing_key='', body=message)

在這里我們需要將消息發(fā)送給所有的queues猫胁。而不需要指定某些隊(duì)列箱亿。所以我們這里就用臨時(shí)隊(duì)列代替。并設(shè)置在失去連接后刪除隊(duì)列弃秆。當(dāng)然我們也可以不這么做届惋。

設(shè)置臨時(shí)隊(duì)列,讓rabbitmq給我們一個(gè)隨機(jī)的隊(duì)列名字菠赚,設(shè)置exclusive為true確保失去連接的時(shí)候隊(duì)列也被刪除了脑豹。因?yàn)槲覀冞@里不需要持久化隊(duì)列。

result = channel.queue_declare(exclusive=True)

下面就是要綁定queues和exchange:

channel.queue_bind(exchange='logs', queue=result.method.queue)

綜上所述我們的設(shè)計(jì)就像下面這樣:

部分代碼如下

producer.py

consumer.py

4衡查、direct模型

在上個(gè)模型中晨缴,消息被發(fā)送給所有的消費(fèi)者,而在這一部分我們將通過(guò)路由的方式使exchange通過(guò)定義的路由方式將消息發(fā)送給隊(duì)列峡捡。所以我們需要在綁定exchange和queue的時(shí)候指定routing_key字段击碗,注意這里的routing_key不是basic_publish中的routing_key。

見(jiàn)如下:

channel.queue_bind(exchange=exchange_name,queue=queue_name,

routing_key='black')

這里我們將使用type為direct的exchange们拙。這種路由方式exchange將消息通過(guò)綁定的routing_key發(fā)送到指定的隊(duì)列稍途。而且exchange可以通過(guò)多個(gè)routing_key把消息發(fā)送給同一個(gè)queue。

通過(guò)下面這張圖我們來(lái)分析一下:

在上面的圖中砚婆,我們可以看出type為direct的exchange X 綁定了2個(gè)隊(duì)列械拍。隊(duì)列Q1關(guān)聯(lián)路由orange。隊(duì)列Q2關(guān)聯(lián)路由black和green装盯。所以一個(gè)帶有路由健orange消息將被exchange發(fā)送給隊(duì)列Q1坷虑。而帶有路由健black或者green的消息將被發(fā)送給隊(duì)列Q2。

我們還是通過(guò)修改前面的日志系統(tǒng)埂奈,來(lái)展示direct類(lèi)型的exchange如何工作迄损,如圖:

部分代碼如下

producer.py

consumer.py

讓我們運(yùn)行一下看看結(jié)果是什么,我們啟動(dòng)了3個(gè)consumer,routing_key分別指定為warning, error账磺,第三個(gè)同時(shí)指定這2個(gè)芹敌。然后在運(yùn)行producer時(shí)帶上路由信息routing_key痊远。運(yùn)行后可以看出指定了warning的不會(huì)收到error的消息。同時(shí)指定warning 和error的consumer則會(huì)都收到消息氏捞。

發(fā)送消息:

只收到warning的消息:

只收到error的消息:

error和waring的都能收到:

5碧聪、Topic模型

這種模型是最靈活的,相比較于direct的完全匹配和fanout的廣播液茎。Topic可以用類(lèi)似正則的手法更好的匹配來(lái)滿(mǎn)足我們的應(yīng)用逞姿。下面我們首先了解一下topic類(lèi)型的exchange:

topic類(lèi)型的routing_key不可以是隨意的單詞,它必須是一系列的單詞組合捆等,中間以逗號(hào)隔開(kāi)哼凯,譬如“quick.orange.rabbit”這個(gè)樣子。發(fā)送消息的routing_key必須匹配上綁定到隊(duì)列的routing_key楚里。消息才會(huì)被發(fā)送。此外還有個(gè)重要的地方要說(shuō)明猎贴,在如下代碼處綁定的routing_key種可以有*和#兩種字符

channel.queue_bind(exchange='topic_logs',queue=queue_name,

routing_key=binding_key)

它們代表的意義如下

*(星號(hào))可以匹配任意一個(gè)單詞

#(井號(hào))可以匹配0到多個(gè)單詞

我們通過(guò)下圖來(lái)解釋一下:

Q1匹配3個(gè)單詞中間為orange的routing_key ,而Q2可以匹配3個(gè)單詞最后一個(gè)單詞為rabbit和第一個(gè)單詞為lazy后面可以有多個(gè)單詞的routing_key班缎。

下面貼上部分示例:

producer.py

consumer.py

6、RPC應(yīng)用模型

當(dāng)我們需要在遠(yuǎn)程服務(wù)器上執(zhí)行一個(gè)方法并等待它的結(jié)果的時(shí)候她渴,我們將這種模式稱(chēng)為RPC达址。下面我們用rabbitMQ建立一個(gè)RPC系統(tǒng)在rabbit MQ中為了能讓client收到server端的response message。需要定義一個(gè)callback queue ,就像下面這樣:

不過(guò)現(xiàn)在有一個(gè)問(wèn)題趁耗,就是每次請(qǐng)求都會(huì)創(chuàng)建一個(gè)callback queue .這樣的效率是極其低下的沉唠。幸運(yùn)的是我們可以通過(guò)correlation_id為每一個(gè)client創(chuàng)建一個(gè)單獨(dú)的callback queue。通過(guò)指定correlation_id我們可以知道callback queue中的消息屬于哪個(gè)client苛败。要做到這樣只需client每次發(fā)送請(qǐng)求時(shí)帶上這唯一的correlation_id满葛。然后當(dāng)我們從callback queue中收到消息時(shí),我們能基于 correlation_id 匹配上我們的消息罢屈。

匹配不上的消息將被丟棄嘀韧,看上去就像下圖這樣:

總結(jié)一下流程如下:

client發(fā)起請(qǐng)求,請(qǐng)求中帶有2個(gè)參數(shù)reply_to和correlation_id

請(qǐng)求發(fā)往rpc_queue

server獲取到rpc_queue中的消息缠捌,處理完畢后锄贷,將結(jié)果發(fā)往reply_to指定的callback queue

client 獲取到callback queue中的消息,匹配correlation_id,如果匹配就獲取曼月,不匹配就丟棄谊却。

示列代碼參考:http://www.rabbitmq.com/tutorials/tutorial-six-python.html

從上面的6個(gè)示例我們大致了解了如何運(yùn)用rabbitMQ解決我們的實(shí)際需求,下面我們?cè)賮?lái)看看如何管理和監(jiān)控rabbitMQ的實(shí)際運(yùn)行情況哑芹。

rabbitMQ的管理和監(jiān)控

1炎辨、rabbitmq management插件

rabbitMQ提供了一個(gè)管理插件,通過(guò)這個(gè)插件我們可以查看當(dāng)前rabbitMQ服務(wù)的運(yùn)行情況聪姿。在解壓縮官網(wǎng)提供的rabbitMQ安裝包之后蹦魔,在sbin目錄可以看見(jiàn)rabbitmq-plugins文件激率,我們只需運(yùn)行一下命令:

rabbitmq-plugins enable rabbitmq_management

然后再游覽器中輸入http://server-name:15672/就可以查看當(dāng)前rabbitMQ的一些運(yùn)行狀況。如下所示:

在這個(gè)管理控制臺(tái)我可以做很多事情勿决,譬如:

查看運(yùn)行的exchanges,queues,users,virtual hosts還有權(quán)限

添加exchanges,queue,users,virtual host乒躺,以及給用戶(hù)賦予權(quán)限

監(jiān)控消息長(zhǎng)度,通道低缩,消息速度嘉冒。連接數(shù)

發(fā)送接收消息

關(guān)閉連接,清除隊(duì)列

2咆繁、rabbitmqctl使用

在rabbitMQ中讳推,rabbitctl是一個(gè)被廣泛使用的命令。對(duì)用戶(hù)的增加玩般,刪除银觅,列出列表,創(chuàng)建權(quán)限坏为,都是通過(guò)rabbitmqctl完成的究驴。下面舉幾個(gè)例子來(lái)熟悉一下如何使用

創(chuàng)建一個(gè)用戶(hù)名和密碼都為test的新用戶(hù)

./rabbitmqctl ?add_user test test

刪除的話使用以下命令

./rabbitmqctl ?delete_user test

列出所有用戶(hù)

./rabbitmqctl ?list_users

同樣也可以用此命令為用戶(hù)賦予權(quán)限

譬如我們想為用戶(hù)test在vhost rabbitmq賦予全部訪問(wèn)權(quán)限,只許執(zhí)行如下命令

./rabbitmqctl set_permissions –p rabbitmq test “.*” “.*” “.*”

列出權(quán)限

./rabbitmqctl list_permissions –p rabbitmq

刪除權(quán)限

./rabbitmqctl clear_permissions –p rabbitmq

同樣的rabbitmqctl也可以用來(lái)查看rabbitmq的運(yùn)行狀況匀伏,如下

列出隊(duì)列和消息數(shù)目

./rabbitmqctl ?list_queues –p rabbitmq

如果想要了解更多的隊(duì)列消息洒忧,譬如名字,消息數(shù)目够颠,消費(fèi)者數(shù)目熙侍,內(nèi)存使用情況,以及其他屬性 履磨。則可以發(fā)送一下命令:

./rabbitmqctl list_queues name messages consumers memory durable auto_delete

列出exchanges相關(guān)信息

./rabbitmqctl list_exchanges ?name ?type ?durable ?auto_delete

rabbitmqctl還有很多功能蛉抓,這里不一一例舉了。有興趣的可以去官方網(wǎng)站查看剃诅。

rabbitMQ集群

且聽(tīng)下回分解芝雪。

本文作者:呂翔(點(diǎn)融黑幫),來(lái)自點(diǎn)融北京技術(shù)團(tuán)隊(duì)综苔,從事過(guò)社交和P2P領(lǐng)域惩系,對(duì)互聯(lián)網(wǎng)金融比較感興趣。平時(shí)喜歡爬山打球如筛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末堡牡,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子杨刨,更是在濱河造成了極大的恐慌晤柄,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件妖胀,死亡現(xiàn)場(chǎng)離奇詭異芥颈,居然都是意外死亡惠勒,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)爬坑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)纠屋,“玉大人,你說(shuō)我怎么就攤上這事盾计∈鄣#” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵署辉,是天一觀的道長(zhǎng)族铆。 經(jīng)常有香客問(wèn)我,道長(zhǎng)哭尝,這世上最難降的妖魔是什么哥攘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮材鹦,結(jié)果婚禮上逝淹,老公的妹妹穿的比我還像新娘。我一直安慰自己侠姑,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布箩做。 她就那樣靜靜地躺著莽红,像睡著了一般。 火紅的嫁衣襯著肌膚如雪邦邦。 梳的紋絲不亂的頭發(fā)上安吁,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音燃辖,去河邊找鬼鬼店。 笑死,一個(gè)胖子當(dāng)著我的面吹牛黔龟,可吹牛的內(nèi)容都是我干的妇智。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼氏身,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼巍棱!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起蛋欣,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤航徙,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后陷虎,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體到踏,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡杠袱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了窝稿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片楣富。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖讹躯,靈堂內(nèi)的尸體忽然破棺而出菩彬,到底是詐尸還是另有隱情,我是刑警寧澤潮梯,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布骗灶,位于F島的核電站,受9級(jí)特大地震影響秉馏,放射性物質(zhì)發(fā)生泄漏耙旦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一萝究、第九天 我趴在偏房一處隱蔽的房頂上張望免都。 院中可真熱鬧,春花似錦帆竹、人聲如沸绕娘。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)险领。三九已至,卻和暖如春秒紧,著一層夾襖步出監(jiān)牢的瞬間绢陌,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工熔恢, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留脐湾,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓叙淌,卻偏偏與公主長(zhǎng)得像秤掌,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鹰霍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 1. 歷史 RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,092評(píng)論 3 51
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,223評(píng)論 0 11
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理机杜,服務(wù)發(fā)現(xiàn),斷路器衅谷,智...
    卡卡羅2017閱讀 134,599評(píng)論 18 139
  • 關(guān)于消息隊(duì)列椒拗,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了,但一直沒(méi)騰出空蚀苛,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型在验,是時(shí)...
    預(yù)流閱讀 584,412評(píng)論 51 785
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化堵未、事務(wù)腋舌、擁塞控...
    jiangmo閱讀 10,344評(píng)論 2 34