AMQP大致內(nèi)容就是恒削,將消息和隊列綁定起來,規(guī)定讓進入到交換機中的具有某個路由鍵的消息進入到指定隊列中去扣孟。
RabbitMQ就是赤炒,可以自然的在網(wǎng)絡(luò)上應(yīng)用程序之間傳送數(shù)據(jù)的一類程序吊骤。且支持持久化缎岗。
http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/
1.MQ作用:
現(xiàn)成的、通用的消息隊列(MQ)服務(wù)器——無論是用什么語言寫出的白粉,不需要復(fù)雜的裝配的传泊,可以自然的在網(wǎng)絡(luò)上的應(yīng)用程序之間傳送數(shù)據(jù)的一類程序。
2.MQ經(jīng)歷及RabbitMQ的特點:
過去的4年里鸭巴,人們寫了有好多好多的開源的MQ服務(wù)器啊眷细。
有三個專門設(shè)計用來做及其靈活的消息隊列的程序值得關(guān)注:
Apache ActiveMQ 曝光率最高,不過看起來它有些問題鹃祖,可能會造成丟消息溪椎。不可接受,下一個恬口。
ZeroMQ 和 RabbitMQ 都支持一個開源的消息協(xié)議校读,稱為為AMQP。AMQP的一個優(yōu)點是它是一個靈活和開放的協(xié)議祖能,AMQP歉秫,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標準高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標準,為面向消息的中間件設(shè)計。以便和另外兩個商業(yè)化的Message Queue (IBM和Tibco)競爭养铸,很好雁芙。不過ZeroMQ不支持消息持久化和崩潰恢復(fù),不太好钞螟。剩下的只有RabbitMQ了兔甘。如果你不在意消息持久化和崩潰恢復(fù),試試ZeroMQ吧鳞滨,延遲很低洞焙,而且支持靈活的拓撲。
RabbitMQ支持持久化拯啦。是的闽晦,如果RabbitMQ死掉了,消息并不會丟失提岔,當(dāng)隊列重啟仙蛉,一切都會回來。
Rabbitmq是一個MQ系統(tǒng)碱蒙,也就是消息中間件荠瘪,它實現(xiàn)了AMQP 0.8規(guī)范,簡單來說就是一個TCP的廣播服務(wù)器赛惩。AMQP協(xié)議哀墓,你可以類比JMS,不過JMS僅僅是java領(lǐng)域內(nèi)的API規(guī)范喷兼,而AMQP比JMS更進一步篮绰,它有自己的wire-level protocol,有一套可編程的協(xié)議季惯,中立于語言吠各。
3.RabbitMQ兔子消息隊列上使用AMQP協(xié)議臀突,工作方式解讀:
這里是花了一周時間閱讀關(guān)于AMQP和關(guān)于它如何在RabbitMQ上工作的文檔之后的一個總結(jié),還有贾漏,怎么在Python當(dāng)中使用候学。
3.1AMQP當(dāng)中有四個概念非常重要:虛擬主機(virtual host),交換機(exchange)纵散,隊列(queue)和綁定(binding)梳码。一個虛擬主機持有一組交換機、隊列和綁定伍掀。為什么需要多個虛擬主機呢掰茶?很簡單,RabbitMQ當(dāng)中蜜笤,用戶只能在虛擬主機的粒度進行權(quán)限控制濒蒋。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定瘩例,必須為A和B分別創(chuàng)建一個虛擬主機啊胶。每一個RabbitMQ服務(wù)器都有一個默認的虛擬主機“/”。
3.2交換機垛贤,隊列焰坪,還有綁定的關(guān)系
交換機可以理解成具有路由表的路由程序,僅此而已聘惦。每個消息都有一個稱為路由鍵(routing key)的屬性某饰,就是一個簡單的字符串。交換機當(dāng)中有一系列的綁定(binding)善绎,即路由規(guī)則(routes)黔漂,例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列當(dāng)中去禀酱。
一個綁定就是一個基于路由鍵將交換機和隊列連接起來的路由規(guī)則:
路由規(guī)則炬守,即綁定(binding)。一個綁定就是一個類似這樣的規(guī)則:將交換機“desert(沙漠)”當(dāng)中具有路由鍵“阿里巴巴”的消息送到隊列“hideout(山洞)”里面去剂跟。換句話說减途,一個綁定就是一個基于路由鍵將交換機和隊列連接起來的路由規(guī)則。
交換機不過就是一個由綁定構(gòu)成的路由表:
曹洽,一個綁定就是一個基于路由鍵將交換機和隊列連接起來的路由規(guī)則鳍置。例如,具有路由鍵“audit”的消息需要被送到兩個隊列送淆,“l(fā)og-forever”和“alert-the-big-dude”税产。要做到這個,就需要創(chuàng)建兩個綁定,每個都連接一個交換機和一個隊列辟拷,兩者都是由“audit”路由鍵觸發(fā)撞羽。在這種情況下,交換機會復(fù)制一份消息并且把它們分別發(fā)送到兩個隊列當(dāng)中梧兼。交換機不過就是一個由綁定構(gòu)成的路由表放吩。
交換機有多種類型智听。他們都是做路由的羽杰,不過接受不同類型的綁定。為什么不創(chuàng)建一種交換機來處理所有類型的路由規(guī)則呢到推?因為每種規(guī)則用來做匹配分子的CPU開銷是不同的考赛。常見有以下三種類型交換機:
Fanout Exchange– 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上莉测。一個發(fā)送到交換機的消息都會被轉(zhuǎn)發(fā)到與該交換機綁定的所有隊列上颜骤。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息捣卤。Fanout交換機轉(zhuǎn)發(fā)消息是最快的忍抽。
Direct Exchange– 處理路由鍵。需要將一個隊列綁定到交換機上董朝,要求該消息與一個特定的路由鍵完全匹配鸠项。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”子姜,則只有被標記為“dog”的消息才被轉(zhuǎn)發(fā)祟绊,不會轉(zhuǎn)發(fā)dog.puppy,也不會轉(zhuǎn)發(fā)dog.guard哥捕,只會轉(zhuǎn)發(fā)dog牧抽。
Topic Exchange– 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上遥赚。符號“#”匹配一個或多個詞扬舒,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”凫佛,但是“audit.*” 只會匹配到“audit.irs”讲坎。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:
4.持久化這些小東西們
4.1關(guān)于消息的持久化
如果服務(wù)器程序掛了御蒲,RabbitMQ重啟之后會干凈的像個新生兒衣赶。隊列、交換機和綁定厚满,還有府瞄,放在隊列里面但是尚未處理的消息們,全都丟失了。
隊列和交換機有一個創(chuàng)建時候指定的標志durable遵馆,直譯叫做堅固的鲸郊。durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列當(dāng)中的消息會在重啟后恢復(fù)货邓。那么如何才能做到不只是隊列和交換機秆撮,還有消息都是持久的呢?
對于一個需要在重啟之后回復(fù)的消息來說换况,它需要被寫入到磁盤上职辨,而即使是最簡單的磁盤操作也是要消耗時間的。如果和消息的內(nèi)容相比戈二,你更看重的是消息處理的速度舒裤,那么不要使用持久化的消息。
4.2消息持久化的操作
當(dāng)你將消息發(fā)布到交換機的時候觉吭,可以指定一個標志“Delivery Mode”(投遞模式)腾供。根據(jù)你使用的AMQP的庫不同,指定這個標志的方法可能不太一樣(我們后面會討論如何用Python搞定)鲜滩。簡單的說伴鳖,就是將Delivery Mode設(shè)置成2,也就是持久的(persistent)即可徙硅。一般的AMQP庫都是將Delivery Mode設(shè)置成1榜聂,也就是非持久的。所以要持久化消息的步驟如下:
將交換機設(shè)成 durable闷游。
將隊列設(shè)成 durable峻汉。
將消息的 Delivery Mode 設(shè)置成2 。
另外脐往,如果你綁定了一個durable的隊列和一個durable的交換機休吠,RabbitMQ會自動保留這個綁定。類似的业簿,如果刪除了某個隊列或交換機(無論是不是durable)瘤礁,依賴它的綁定都會自動刪除。
注意兩點:
RabbitMQ 不允許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列梅尤。反之亦然柜思。要想成功必須隊列和交換機都是durable的。
一旦創(chuàng)建了隊列和交換機巷燥,就不能修改其標志了赡盘。例如,如果創(chuàng)建了一個non-durable的隊列缰揪,然后想把它改變成durable的陨享,唯一的辦法就是刪除這個隊列然后重現(xiàn)創(chuàng)建。因此,最好仔細檢查創(chuàng)建的標志抛姑。
5.開始喂蛇了~
【譯注】說喂蛇是因為Python的圖標是條蛇赞厕。
AMQP的一個空白地帶是如何在Python當(dāng)中使用。對于其他語言有一大坨材料定硝。
Java –http://www.rabbitmq.com/java-client.html
Ruby –http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/
但是對Python老兄來說皿桑,你需要花點時間來挖掘一下。所以我寫了這個蔬啡,這樣別的家伙們就不需要經(jīng)歷我這種抓狂的過程了诲侮。
6.實例:一個基于Python語言的rabbitMQ收發(fā)消息代碼段
6.1建立發(fā)送消息的完整結(jié)構(gòu)。
首先星爪,我們需要一個Python的AMQP庫浆西。有兩個可選:
py-amqplib– 通用的AMQP
txAMQP– 使用Twisted框架的AMQP庫粉私,因此允許異步I/O顽腾。
根據(jù)你的需求,py-amqplib或者txAMQP都是可以的诺核。因為是基于Twisted的抄肖,txAMQP可以保證用異步IO構(gòu)建超高性能的AMQP程序。但是Twisted編程本身就是一個很大的主題……因此清晰起見窖杀,我們打算用 py-amqplib漓摩。更新:請參見Esteve Fernandez關(guān)于txAMQP的使用和代碼樣例的回復(fù)。
AMQP支持在一個TCP連接上啟用多個MQ通信channel入客,每個channel都可以被應(yīng)用作為通信流管毙。每個AMQP程序至少要有一個連接和一個channel。
fromamqplibimportclient_0_8asamqpconn = amqp.Connection(host="localhost:5672 ", userid="guest",password="guest", virtual_host="/", insist=False)chan = conn.channel()
每個channel都被分配了一個整數(shù)標識桌硫,自動由Connection()類的.channel()方法維護夭咬。或者铆隘,你可以使用.channel(x)來指定channel標識卓舵,其中x是你想要使用的channel標識。通常情況下膀钠,推薦使用.channel()方法來自動分配channel標識掏湾,以便防止沖突。
現(xiàn)在我們已經(jīng)有了一個可以用的連接和channel≈壮埃現(xiàn)在融击,我們的代碼將分成兩個應(yīng)用,生產(chǎn)者(producer)和消費者(consumer)雳窟。我們先創(chuàng)建一個消費者程序尊浪,他會創(chuàng)建一個叫做“po_box”的隊列和一個叫“sorting_room”的交換機:
chan.queue_declare(queue="po_box", durable=True,exclusive=False, auto_delete=False)chan.exchange_declare(exchange="sorting_room",type="direct", durable=True,auto_delete=False,)
這段代碼干了啥?首先,它創(chuàng)建了一個名叫“po_box”的隊列际长,它是durable的(重啟之后會重新建立)耸采,并且最后一個消費者斷開的時候不會自動刪除(auto_delete=False)。在創(chuàng)建durable的隊列(或者交換機)的時候工育,將auto_delete設(shè)置成false是很重要的虾宇,否則隊列將會在最后一個消費者斷開的時候消失,與durable與否無關(guān)如绸。如果將durable和auto_delete都設(shè)置成True嘱朽,只有尚有消費者活動的隊列可以在RabbitMQ意外崩潰的時候自動恢復(fù)。
(你可以注意到了另一個標志怔接,稱為“exclusive”搪泳。如果設(shè)置成True,只有創(chuàng)建這個隊列的消費者程序才允許連接到該隊列扼脐。這種隊列對于這個消費者程序是私有的)岸军。
還有另一個交換機聲明,創(chuàng)建了一個名字叫“sorting_room”的交換機瓦侮。auto_delete和durable的含義和隊列是一樣的艰赞。但是,.excange_declare() 還有另外一個參數(shù)叫做type肚吏,用來指定要創(chuàng)建的交換機的類型(如前面列出的):fanout,direct和topic.
到此為止方妖,你已經(jīng)有了一個可以接收消息的隊列和一個可以發(fā)送消息的交換機。不過我們需要創(chuàng)建一個綁定罚攀,把它們連接起來党觅。
chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)
這個綁定的過程非常直接。任何送到交換機“sorting_room”的具有路由鍵“jason” 的消息都被路由到名為“po_box” 的隊列斋泄。
6.2從隊列取消息以及反饋:
現(xiàn)在杯瞻,你有兩種方法從隊列當(dāng)中取出消息。第一個是調(diào)用chan.basic_get()是己,主動從隊列當(dāng)中拉出下一個消息(如果隊列當(dāng)中沒有消息又兵,chan.basic_get()會返回None, 因此下面代碼當(dāng)中print msg.body 會在沒有消息的時候崩掉):
msg = chan.basic_get("po_box")printmsg.bodychan.basic_ack(msg.delivery_tag)
但是如果你想要應(yīng)用程序在消息到達的時候立即得到通知怎么辦卒废?這種情況下不能使用chan.basic_get()沛厨,你需要用chan.basic_consume()注冊一個新消息到達的回調(diào)。
defrecv_callback(msg):print'Received: '+ msg.bodychan.basic_consume(queue='po_box', no_ack=True,callback=recv_callback, consumer_tag="testtag")whileTrue:? ? chan.wait()chan.basic_cancel("testtag")
chan.wait()放在一個無限循環(huán)里面摔认,這個函數(shù)會等待在隊列上逆皮,直到下一個消息到達隊列。chan.basic_cancel()用來注銷該回調(diào)函數(shù)参袱。參數(shù)consumer_tag當(dāng)中指定的字符串和chan.basic_consume()注冊的一直电谣。在這個例子當(dāng)中chan.basic_cancel()不會被調(diào)用到秽梅,因為上面是個無限循環(huán)…… 不過你需要知道這個調(diào)用,所以我把它放在了代碼里剿牺。
需要注意的另一個東西是no_ack參數(shù)企垦。這個參數(shù)可以傳給chan.basic_get()和chan.basic_consume(),默認是false晒来。當(dāng)從隊列當(dāng)中取出一個消息的時候钞诡,RabbitMQ需要應(yīng)用顯式地回饋說已經(jīng)獲取到了該消息。如果一段時間內(nèi)不回饋湃崩,RabbitMQ會將該消息重新分配給另外一個綁定在該隊列上的消費者荧降。另一種情況是消費者斷開連接,但是獲取到的消息沒有回饋攒读,則RabbitMQ同樣重新分配朵诫。如果將no_ack參數(shù)設(shè)置為true,則py-amqplib會為下一個AMQP請求添加一個no_ack屬性薄扁,告訴AMQP服務(wù)器不需要等待回饋剪返。但是,大多數(shù)時候泌辫,你也許想要自己手工發(fā)送回饋随夸,例如,需要在回饋之前將消息存入數(shù)據(jù)庫震放。回饋通常是通過調(diào)用chan.basic_ack()方法驼修,使用消息的delivery_tag屬性作為參數(shù)殿遂。參見chan.basic_get()的實例代碼。
好了乙各,這就是消費者的全部代碼墨礁。(下載:amqp_consumer.py)
6.3發(fā)送者將消息發(fā)送到交換機
不過沒有人發(fā)送消息的話,要消費者何用耳峦?所以需要一個生產(chǎn)者恩静。下面的代碼示例表明如何將一個簡單消息發(fā)送到交換區(qū)“sorting_room”,并且標記為路由鍵“jason” :
msg = amqp.Message("Test message!")msg.properties["delivery_mode"]=2chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")
你也許注意到我們設(shè)置消息的delivery_mode屬性為2蹲坷,因為隊列和交換機都設(shè)置為durable的驶乾,這個設(shè)置將保證消息能夠持久化,也就是說循签,當(dāng)它還沒有送達消費者之前如果RabbitMQ重啟則它能夠被恢復(fù)级乐。
剩下的最后一件事情(生產(chǎn)者和消費者都需要調(diào)用的)是關(guān)閉channel和連接:
chan.close()conn.close()
很簡單吧。(下載:amqp_publisher.py)
7.實踐
來真實地跑一下吧……
現(xiàn)在我們已經(jīng)寫好了生產(chǎn)者和消費者县匠,讓他們跑起來吧风科。假設(shè)你的RabbitMQ在localhost上安裝并且運行撒轮。
打開一個終端,執(zhí)行python ./amqp_consumer.py讓消費者運行贼穆,并且創(chuàng)建隊列题山、交換機和綁定。
然后在另一個終端運行python ./amqp_publisher.py “AMQP rocks.”故痊。如果一切良好臀蛛,你應(yīng)該能夠在第一個終端看到輸出的消息。
付諸使用吧
我知道這個教程是非常粗淺的關(guān)于AMQP/RabbitMQ和如何使用Python訪問的教程崖蜜。希望這個可以說明所有的概念如何在Python當(dāng)中被組合起來浊仆。如果你發(fā)現(xiàn)任何錯誤,請聯(lián)系原作者(williamsjj@digitar.com) 【譯注:如果是翻譯問題請聯(lián)系譯者】豫领。同時抡柿,我很高興回答我知道的問題〉瓤郑【譯注:譯者也是一樣的】洲劣。接下來是,集群化(clustering)课蔬!不過我需要先把它弄懂再說囱稽。
注:關(guān)于RabbitMQ的知識我主要來自這些來源,推薦閱讀:
高級消息隊列協(xié)議(Advanced Message Queuing Protocol):協(xié)議規(guī)約0.8 版本
–完–