rabbitMQ是一款基于AMQP協(xié)議的消息中間件,它能夠在應(yīng)用之間提供可靠的消息傳輸脓魏。在易用性兰吟,擴(kuò)展性,高可用性上表現(xiàn)優(yōu)秀茂翔。使用消息中間件利于應(yīng)用之間的解耦混蔼,生產(chǎn)者(客戶(hù)端)無(wú)需知道消費(fèi)者(服務(wù)端)的存在。而且兩端可以使用不同的語(yǔ)言編寫(xiě)珊燎,大大提供了靈活性惭嚣。
rabbitMQ工作原理
rabbitMQ里的一些基本定義如下:
exchange
producer只能將消息發(fā)送給exchange。而exchange負(fù)責(zé)將消息發(fā)送到queues悔政。Exchange必須準(zhǔn)確的知道怎么處理它接受到的消息晚吞,是被發(fā)送到一個(gè)特定的queue還是許多quenes,還是被拋棄赐俗,這些規(guī)則則是通過(guò)exchange type來(lái)定義屯远。主要的type有direct,topic,headers,fanout演熟。具體針對(duì)不同的場(chǎng)景使用不同的type婆瓜。
queue
消息隊(duì)列,消息的載體完疫。接收來(lái)自exchange的消息审姓,然后再由consumer取出园担。exchange和queue是可以一對(duì)多的近弟,它們通過(guò)routingKey來(lái)綁定缅糟。
Producer
生產(chǎn)者,消息的來(lái)源,消息必須發(fā)送給exchange祷愉。而不是直接給queue
Consumer
消費(fèi)者窗宦,直接從queue中獲取消息進(jìn)行消費(fèi)赦颇,而不是從exchange。
從以上可以看出rabbitMQ工作原理大致就是producer把一條消息發(fā)送給exchange赴涵。rabbitMQ根據(jù)routingKey負(fù)責(zé)將消息從exchange發(fā)送到對(duì)應(yīng)綁定的queue中去沐扳,這是由rabbitMQ負(fù)責(zé)做的,而consumer只需從queue獲取消息即可句占。
大致流程如下:
rabbitMQ工作模型
下面通過(guò)幾個(gè)列子來(lái)詳細(xì)說(shuō)明一下如何使用rabbitMQ。
1躯嫉、簡(jiǎn)單發(fā)送模型
在rabbitMQ里消息永遠(yuǎn)不能被直接發(fā)送到queue纱烘,這里我們通過(guò)提供一個(gè)空字符串來(lái)使用默認(rèn)的exchange。這個(gè)exchange是特殊的祈餐,它可以根據(jù)routingKey把消息發(fā)送給指定的queue擂啥。所以我們的設(shè)計(jì)看起來(lái)如下所示:
代碼如下
send.py
receive.py
2、工作隊(duì)列模型
這種模式常常用來(lái)處理耗資源耗時(shí)間的任務(wù)在多個(gè)workers中帆阳,主要是為了避免立即去處理一個(gè)耗時(shí)的任務(wù)而等待它的完成哺壶。代替的做法是一個(gè)稍后去處理這個(gè)任務(wù),讓一個(gè)worker process 在后臺(tái)處理這個(gè)任務(wù)蜒谤。當(dāng)有許多workers的時(shí)候山宾,消息將會(huì)以輪詢(xún)的方式被workers獲取。模型如下:
這里就會(huì)有一個(gè)問(wèn)題鳍徽,如果consumer在執(zhí)行任務(wù)時(shí)需要花費(fèi)一些時(shí)間资锰,這個(gè)時(shí)候如果突然掛了,消息還沒(méi)有被完成阶祭,消息豈不是丟失了绷杜,為了不讓消息丟失,rabbitmq提供了消息確認(rèn)機(jī)制濒募,consumer在接收到鞭盟,執(zhí)行完消息后會(huì)發(fā)送一個(gè)ack給rabbitmq告訴它可以從queue中移除消息了。
如果沒(méi)收到ack瑰剃,Rabbitmq會(huì)重新發(fā)送此條消息齿诉,如果有其他的consumer在線,將會(huì)接收并消費(fèi)這條消息培他。消息確認(rèn)機(jī)制是默認(rèn)打開(kāi)的鹃两。如果想關(guān)閉它只需要設(shè)置no_ack=true。在此處我們不需要設(shè)置舀凛。默認(rèn)如下就行:
channel.basic_consume(callback, ?queue='hello')
除了consumer之外我們還得確保rabbitMQ掛了之后消息不被丟失俊扳。這里我們就需要確保隊(duì)列queue和消息messages都得是持久化的。
隊(duì)列的持久話需要設(shè)置durable屬性:
channel.queue_declare(queue= task_queue, durable=True)
消息的持久話則是通過(guò)delivery_mode屬性猛遍,設(shè)置值為2即可馋记。
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
還有一個(gè)屬性相對(duì)比較重要号坡,它可以保證consumer確認(rèn)消費(fèi)完一條消息之后再去獲取下一條消息。如果consumer正在忙碌的狀態(tài)梯醒,消息將會(huì)被分發(fā)到下一個(gè)不是很忙的consumer宽堆。
設(shè)置如下:
channel.basic_qos(prefetch_count=1)
下面貼出部分代碼
producer.py
consumer.py
3、廣播模型
在前面2個(gè)示例我們都適用默認(rèn)的exchange茸习。這里我們將自己定義一個(gè)exchange畜隶。并設(shè)置type為fanout。它可以將消息廣播給綁定的每一個(gè)queue号胚。而不再是某一個(gè)queue籽慢。我們?cè)诖藙?chuàng)建一個(gè)叫l(wèi)ogs的exchange,就像下面這樣:
channel.exchange_declare(exchange='logs', type='fanout')
所以發(fā)布消息就變成了下面這樣:
channel.basic_publish(exchange='logs',routing_key='', body=message)
在這里我們需要將消息發(fā)送給所有的queues猫胁。而不需要指定某些隊(duì)列箱亿。所以我們這里就用臨時(shí)隊(duì)列代替。并設(shè)置在失去連接后刪除隊(duì)列弃秆。當(dāng)然我們也可以不這么做届惋。
設(shè)置臨時(shí)隊(duì)列,讓rabbitmq給我們一個(gè)隨機(jī)的隊(duì)列名字菠赚,設(shè)置exclusive為true確保失去連接的時(shí)候隊(duì)列也被刪除了脑豹。因?yàn)槲覀冞@里不需要持久化隊(duì)列。
result = channel.queue_declare(exclusive=True)
下面就是要綁定queues和exchange:
channel.queue_bind(exchange='logs', queue=result.method.queue)
綜上所述我們的設(shè)計(jì)就像下面這樣:
部分代碼如下
producer.py
consumer.py
4衡查、direct模型
在上個(gè)模型中晨缴,消息被發(fā)送給所有的消費(fèi)者,而在這一部分我們將通過(guò)路由的方式使exchange通過(guò)定義的路由方式將消息發(fā)送給隊(duì)列峡捡。所以我們需要在綁定exchange和queue的時(shí)候指定routing_key字段击碗,注意這里的routing_key不是basic_publish中的routing_key。
見(jiàn)如下:
channel.queue_bind(exchange=exchange_name,queue=queue_name,
routing_key='black')
這里我們將使用type為direct的exchange们拙。這種路由方式exchange將消息通過(guò)綁定的routing_key發(fā)送到指定的隊(duì)列稍途。而且exchange可以通過(guò)多個(gè)routing_key把消息發(fā)送給同一個(gè)queue。
通過(guò)下面這張圖我們來(lái)分析一下:
在上面的圖中砚婆,我們可以看出type為direct的exchange X 綁定了2個(gè)隊(duì)列械拍。隊(duì)列Q1關(guān)聯(lián)路由orange。隊(duì)列Q2關(guān)聯(lián)路由black和green装盯。所以一個(gè)帶有路由健orange消息將被exchange發(fā)送給隊(duì)列Q1坷虑。而帶有路由健black或者green的消息將被發(fā)送給隊(duì)列Q2。
我們還是通過(guò)修改前面的日志系統(tǒng)埂奈,來(lái)展示direct類(lèi)型的exchange如何工作迄损,如圖:
部分代碼如下
producer.py
consumer.py
讓我們運(yùn)行一下看看結(jié)果是什么,我們啟動(dòng)了3個(gè)consumer,routing_key分別指定為warning, error账磺,第三個(gè)同時(shí)指定這2個(gè)芹敌。然后在運(yùn)行producer時(shí)帶上路由信息routing_key痊远。運(yùn)行后可以看出指定了warning的不會(huì)收到error的消息。同時(shí)指定warning 和error的consumer則會(huì)都收到消息氏捞。
發(fā)送消息:
只收到warning的消息:
只收到error的消息:
error和waring的都能收到:
5碧聪、Topic模型
這種模型是最靈活的,相比較于direct的完全匹配和fanout的廣播液茎。Topic可以用類(lèi)似正則的手法更好的匹配來(lái)滿(mǎn)足我們的應(yīng)用逞姿。下面我們首先了解一下topic類(lèi)型的exchange:
topic類(lèi)型的routing_key不可以是隨意的單詞,它必須是一系列的單詞組合捆等,中間以逗號(hào)隔開(kāi)哼凯,譬如“quick.orange.rabbit”這個(gè)樣子。發(fā)送消息的routing_key必須匹配上綁定到隊(duì)列的routing_key楚里。消息才會(huì)被發(fā)送。此外還有個(gè)重要的地方要說(shuō)明猎贴,在如下代碼處綁定的routing_key種可以有*和#兩種字符
channel.queue_bind(exchange='topic_logs',queue=queue_name,
routing_key=binding_key)
它們代表的意義如下
*(星號(hào))可以匹配任意一個(gè)單詞
#(井號(hào))可以匹配0到多個(gè)單詞
我們通過(guò)下圖來(lái)解釋一下:
Q1匹配3個(gè)單詞中間為orange的routing_key ,而Q2可以匹配3個(gè)單詞最后一個(gè)單詞為rabbit和第一個(gè)單詞為lazy后面可以有多個(gè)單詞的routing_key班缎。
下面貼上部分示例:
producer.py
consumer.py
6、RPC應(yīng)用模型
當(dāng)我們需要在遠(yuǎn)程服務(wù)器上執(zhí)行一個(gè)方法并等待它的結(jié)果的時(shí)候她渴,我們將這種模式稱(chēng)為RPC达址。下面我們用rabbitMQ建立一個(gè)RPC系統(tǒng)在rabbit MQ中為了能讓client收到server端的response message。需要定義一個(gè)callback queue ,就像下面這樣:
不過(guò)現(xiàn)在有一個(gè)問(wèn)題趁耗,就是每次請(qǐng)求都會(huì)創(chuàng)建一個(gè)callback queue .這樣的效率是極其低下的沉唠。幸運(yùn)的是我們可以通過(guò)correlation_id為每一個(gè)client創(chuàng)建一個(gè)單獨(dú)的callback queue。通過(guò)指定correlation_id我們可以知道callback queue中的消息屬于哪個(gè)client苛败。要做到這樣只需client每次發(fā)送請(qǐng)求時(shí)帶上這唯一的correlation_id满葛。然后當(dāng)我們從callback queue中收到消息時(shí),我們能基于 correlation_id 匹配上我們的消息罢屈。
匹配不上的消息將被丟棄嘀韧,看上去就像下圖這樣:
總結(jié)一下流程如下:
client發(fā)起請(qǐng)求,請(qǐng)求中帶有2個(gè)參數(shù)reply_to和correlation_id
請(qǐng)求發(fā)往rpc_queue
server獲取到rpc_queue中的消息缠捌,處理完畢后锄贷,將結(jié)果發(fā)往reply_to指定的callback queue
client 獲取到callback queue中的消息,匹配correlation_id,如果匹配就獲取曼月,不匹配就丟棄谊却。
示列代碼參考:http://www.rabbitmq.com/tutorials/tutorial-six-python.html
從上面的6個(gè)示例我們大致了解了如何運(yùn)用rabbitMQ解決我們的實(shí)際需求,下面我們?cè)賮?lái)看看如何管理和監(jiān)控rabbitMQ的實(shí)際運(yùn)行情況哑芹。
rabbitMQ的管理和監(jiān)控
1炎辨、rabbitmq management插件
rabbitMQ提供了一個(gè)管理插件,通過(guò)這個(gè)插件我們可以查看當(dāng)前rabbitMQ服務(wù)的運(yùn)行情況聪姿。在解壓縮官網(wǎng)提供的rabbitMQ安裝包之后蹦魔,在sbin目錄可以看見(jiàn)rabbitmq-plugins文件激率,我們只需運(yùn)行一下命令:
rabbitmq-plugins enable rabbitmq_management
然后再游覽器中輸入http://server-name:15672/就可以查看當(dāng)前rabbitMQ的一些運(yùn)行狀況。如下所示:
在這個(gè)管理控制臺(tái)我可以做很多事情勿决,譬如:
查看運(yùn)行的exchanges,queues,users,virtual hosts還有權(quán)限
添加exchanges,queue,users,virtual host乒躺,以及給用戶(hù)賦予權(quán)限
監(jiān)控消息長(zhǎng)度,通道低缩,消息速度嘉冒。連接數(shù)
發(fā)送接收消息
關(guān)閉連接,清除隊(duì)列
2咆繁、rabbitmqctl使用
在rabbitMQ中讳推,rabbitctl是一個(gè)被廣泛使用的命令。對(duì)用戶(hù)的增加玩般,刪除银觅,列出列表,創(chuàng)建權(quán)限坏为,都是通過(guò)rabbitmqctl完成的究驴。下面舉幾個(gè)例子來(lái)熟悉一下如何使用
創(chuàng)建一個(gè)用戶(hù)名和密碼都為test的新用戶(hù)
./rabbitmqctl ?add_user test test
刪除的話使用以下命令
./rabbitmqctl ?delete_user test
列出所有用戶(hù)
./rabbitmqctl ?list_users
同樣也可以用此命令為用戶(hù)賦予權(quán)限
譬如我們想為用戶(hù)test在vhost rabbitmq賦予全部訪問(wèn)權(quán)限,只許執(zhí)行如下命令
./rabbitmqctl set_permissions –p rabbitmq test “.*” “.*” “.*”
列出權(quán)限
./rabbitmqctl list_permissions –p rabbitmq
刪除權(quán)限
./rabbitmqctl clear_permissions –p rabbitmq
同樣的rabbitmqctl也可以用來(lái)查看rabbitmq的運(yùn)行狀況匀伏,如下
列出隊(duì)列和消息數(shù)目
./rabbitmqctl ?list_queues –p rabbitmq
如果想要了解更多的隊(duì)列消息洒忧,譬如名字,消息數(shù)目够颠,消費(fèi)者數(shù)目熙侍,內(nèi)存使用情況,以及其他屬性 履磨。則可以發(fā)送一下命令:
./rabbitmqctl list_queues name messages consumers memory durable auto_delete
列出exchanges相關(guān)信息
./rabbitmqctl list_exchanges ?name ?type ?durable ?auto_delete
rabbitmqctl還有很多功能蛉抓,這里不一一例舉了。有興趣的可以去官方網(wǎng)站查看剃诅。
rabbitMQ集群
且聽(tīng)下回分解芝雪。
本文作者:呂翔(點(diǎn)融黑幫),來(lái)自點(diǎn)融北京技術(shù)團(tuán)隊(duì)综苔,從事過(guò)社交和P2P領(lǐng)域惩系,對(duì)互聯(lián)網(wǎng)金融比較感興趣。平時(shí)喜歡爬山打球如筛。