AMQP簡(jiǎn)介
AMQP吆玖,即 Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn)碌嘀,為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦和通訊虱而。
AMQP的主要特征是面向消息筏餐、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)牡拇、可靠性魁瞪、安全。
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn)惠呼,服務(wù)器端用 Erlang 語(yǔ)言編寫导俘,支持多種客戶端,如:Python剔蹋、Ruby旅薄、.NET、Java泣崩、JMS少梁、C、PHP矫付、ActionScript凯沪、XMPP、STOMP等买优,支持AJAX妨马。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,具有很高的易用性和可用性杀赢。
安裝部署
安裝部署請(qǐng)參考Docker 安裝部署RabbitMQ
AMQP協(xié)議中的幾個(gè)重要概念
ConnectionFactory烘跺、Connection、Channel
ConnectionFactory脂崔、Connection滤淳、Channel都是RabbitMQ對(duì)外提供的API中最基本的對(duì)象。
- ConnectionFactory:ConnectionFactory為Connection的制造工廠砌左。
- Connection:Connection是RabbitMQ的socket鏈接娇钱,它封裝了socket協(xié)議相關(guān)部分邏輯伤柄。
- Channel(信道):信道是建立在“真實(shí)的”TCP連接上的虛擬連接,在一條TCP鏈接上創(chuàng)建多少條信道是沒(méi)有限制的文搂,把他想象成光纖就是可以了适刀。它是我們與RabbitMQ打交道的最重要的一個(gè)接口,我們大部分的業(yè)務(wù)操作是在Channel這個(gè)接口中完成的煤蹭,包括定義Queue笔喉、定義Exchange、綁定Queue與Exchange硝皂、發(fā)布消息等常挚。
Queue(隊(duì)列)
Queue 是 RabbitMQ 的內(nèi)部對(duì)象,用于存儲(chǔ)消息稽物。
RabbitMQ中的消息只能存儲(chǔ)在 Queue 中奄毡。生產(chǎn)者(下圖中的P)生產(chǎn)消息并最終投遞到Queue中,消費(fèi)者(下圖中的C)可以從Queue中獲取消息并消費(fèi)贝或,消費(fèi)者可以是一個(gè)或者多個(gè)吼过。
Message acknowledgment(ack 消息的確認(rèn)):
在實(shí)際應(yīng)用中,可能會(huì)發(fā)生消費(fèi)者收到Queue中的消息咪奖,但沒(méi)有處理完成就宕機(jī)(或出現(xiàn)其他意外)的情況盗忱,這種情況下就可能會(huì)導(dǎo)致消息丟失。為了避免這種情況發(fā)生羊赵,我們可以要求消費(fèi)者在消費(fèi)完消息后發(fā)送一個(gè)回執(zhí)給RabbitMQ趟佃,RabbitMQ收到消息回執(zhí)(ack)后才將該消息從Queue中移除;如果RabbitMQ沒(méi)有收到回執(zhí)并檢測(cè)到消費(fèi)者的RabbitMQ連接斷開昧捷,則RabbitMQ會(huì)將該消息發(fā)送給其他消費(fèi)者(如果存在多個(gè)消費(fèi)者)進(jìn)行處理闲昭。這里不存在timeout概念,一個(gè)消費(fèi)者處理消息時(shí)間再長(zhǎng)也不會(huì)導(dǎo)致該消息被發(fā)送給其他消費(fèi)者靡挥,除非它的RabbitMQ連接斷開序矩。
這里會(huì)產(chǎn)生另外一個(gè)問(wèn)題,如果我們的開發(fā)人員在處理完業(yè)務(wù)邏輯后芹血,忘記發(fā)送回執(zhí)給RabbitMQ贮泞,這將會(huì)導(dǎo)致嚴(yán)重的bug——Queue中堆積的消息會(huì)越來(lái)越多楞慈;消費(fèi)者重啟后會(huì)重復(fù)消費(fèi)這些消息并重復(fù)執(zhí)行業(yè)務(wù)邏輯…
另外pub message是沒(méi)有ack的幔烛。
Message durability(消息的持久化)
如果我們希望即使在RabbitMQ服務(wù)重啟的情況下,也不會(huì)丟失消息囊蓝,我們可以將Queue與Message都設(shè)置為可持久化的(durable)饿悬,這樣可以保證絕大部分情況下我們的RabbitMQ消息不會(huì)丟失。但依然解決不了小概率丟失事件的發(fā)生(比如RabbitMQ服務(wù)器已經(jīng)接收到生產(chǎn)者的消息聚霜,但還沒(méi)來(lái)得及持久化該消息時(shí)RabbitMQ服務(wù)器就斷電了)狡恬,如果我們需要對(duì)這種小概率事件也要管理起來(lái)珠叔,那么我們要用到事務(wù)。由于這里僅為RabbitMQ的簡(jiǎn)單介紹弟劲,所以這里將不講解RabbitMQ相關(guān)的事務(wù)祷安。具體可以參考 RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
Prefetch count(每次向消費(fèi)者發(fā)送消息的總數(shù))
前面我們講到如果有多個(gè)消費(fèi)者同時(shí)訂閱同一個(gè)Queue中的消息,Queue中的消息會(huì)被平攤給多個(gè)消費(fèi)者兔乞。這時(shí)如果每個(gè)消息的處理時(shí)間不同汇鞭,就有可能會(huì)導(dǎo)致某些消費(fèi)者一直在忙,而另外一些消費(fèi)者很快就處理完手頭工作并一直空閑的情況庸追。我們可以通過(guò)設(shè)置prefetchCount來(lái)限制Queue每次發(fā)送給每個(gè)消費(fèi)者的消息數(shù)霍骄,比如我們?cè)O(shè)置prefetchCount=1,則Queue每次給每個(gè)消費(fèi)者發(fā)送一條消息淡溯;消費(fèi)者處理完這條消息后Queue會(huì)再給該消費(fèi)者發(fā)送一條消息读整。
Exchange(交換器)
Exchange生產(chǎn)者將消息發(fā)送到 Exchange(交換器,下圖中的X)咱娶,由 Exchange 根據(jù)一定的規(guī)則將消息路由到一個(gè)或多個(gè) Queue 中(或者丟棄)米间。
Routing key(路由key)
生產(chǎn)者在將消息發(fā)送給 Exchange 的時(shí)候,一般會(huì)指定一個(gè) routing key豺总,來(lái)指定這個(gè)消息的路由規(guī)則车伞。 Exchange 會(huì)根據(jù) routing key 和 Exchange Type(交換器類型) 以及 Binding key 的匹配情況來(lái)決定把消息路由到哪個(gè) Queue。RabbitMQ為routing key設(shè)定的長(zhǎng)度限制為255 bytes喻喳。
Binding(綁定)
RabbitMQ中通過(guò) Binding 將 Exchange 與 Queue 關(guān)聯(lián)起來(lái)另玖。
Binding key
在綁定(Binding) Exchange 與 Queue 關(guān)系的同時(shí),一般會(huì)指定一個(gè) binding key表伦。
Exchange Types (交換器類型)
RabbitMQ常用的Exchange Type有 Fanout谦去、 Direct、 Topic蹦哼、 Headers 這四種鳄哭。
-
Fanout
這種類型的Exchange路由規(guī)則非常簡(jiǎn)單,它會(huì)把所有發(fā)送到該Exchange的消息路由到所有與它綁定的Queue中纲熏,這時(shí) Routing key 不起作用妆丘。
image.png -
Direct
這種類型的Exchange路由規(guī)則也很簡(jiǎn)單,它會(huì)把消息路由到那些 binding key 與 routing key完全匹配的Queue中局劲。
image.png
在這個(gè)設(shè)置中勺拣,我們可以看到兩個(gè)隊(duì)列Q1、Q2直接綁定到了交換器X上鱼填。 第一個(gè)隊(duì)列用綁定key橙色(orange)綁定药有,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定key為黑色(black),另一個(gè)為綠色(green)愤惰。
在這種設(shè)置中苇经,通過(guò)路由鍵橙色發(fā)布到交換器的消息將被路由到隊(duì)列Q1。 帶有黑色或綠色的路由鍵的消息將進(jìn)入Q2宦言。 所有其他消息將被丟棄扇单。
在上面列子中,routingKey=”error”的消息發(fā)送Exchange后奠旺,Exchange會(huì)將消息路由到Queue1(amqp.gen-S9b…令花,這是由RabbitMQ自動(dòng)生成的Queue名稱)和Queue2(amqp.gen-Agl…);如果routingKey=”info”或routingKey=”warning”的消息發(fā)到Exchange凉倚,Exchange只會(huì)將消息路由到Queue2兼都。 所有其他消息將被丟棄。
- Topic
這種類型的Exchange的路由規(guī)則支持 binding key 和 routing key 的模糊匹配稽寒,會(huì)把消息路由到滿足條件的Queue扮碧。 binding key 中可以存在兩種特殊字符 *與 #,用于做模糊匹配杏糙,其中 * 用于匹配一個(gè)單詞慎王,# 用于匹配0個(gè)或多個(gè)單詞,單詞以符號(hào)“.”為分隔符宏侍。
以上圖中的配置為例赖淤,routingKey=”quick.orange.rabbit”的消息會(huì)同時(shí)路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會(huì)路由到Q1與Q2谅河,routingKey=”lazy.brown.fox”的消息會(huì)路由到Q2咱旱,routingKey=”lazy.pink.rabbit”的消息會(huì)路由到Q2(只會(huì)投遞給Q2一次,雖然這個(gè)routingKey與Q2的兩個(gè)bindingKey都匹配)绷耍;routingKey=”quick.brown.fox”吐限、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會(huì)被丟棄褂始,因?yàn)樗鼈儧](méi)有匹配任何bindingKey诸典。
- Headers
這種類型的Exchange不依賴于 routing key 與 binding key 的匹配規(guī)則來(lái)路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的 headers 屬性進(jìn)行匹配崎苗。
RPC
MQ本身是基于異步的消息處理狐粱,前面的示例中所有的生產(chǎn)者(P)將消息發(fā)送到RabbitMQ后不會(huì)知道消費(fèi)者(C)處理成功或者失敗(甚至連有沒(méi)有消費(fèi)者來(lái)處理這條消息都不知道)胆数。
但實(shí)際的應(yīng)用場(chǎng)景中肌蜻,我們很可能需要一些同步處理,需要同步等待服務(wù)端將我的消息處理完成后再進(jìn)行下一步處理幅慌。這相當(dāng)于RPC(Remote Procedure Call宋欺,遠(yuǎn)程過(guò)程調(diào)用)轰豆。在RabbitMQ中也支持RPC胰伍。
RabbitMQ中實(shí)現(xiàn)RPC的機(jī)制是:
- 客戶端發(fā)送請(qǐng)求(消息)時(shí)齿诞,在消息的屬性中(MessageProperties,在AMQP協(xié)議中定義了14中properties骂租,這些屬性會(huì)隨著消息一起發(fā)送)設(shè)置兩個(gè)值replyTo(一個(gè)Queue名稱祷杈,用于告訴服務(wù)器處理完成后將通知我的消息發(fā)送到這個(gè)Queue中)和correlationId(此次請(qǐng)求的標(biāo)識(shí)號(hào),服務(wù)器處理完成后需要將此屬性返還渗饮,客戶端將根據(jù)這個(gè)id了解哪條請(qǐng)求被成功執(zhí)行了或執(zhí)行失數)
- 服務(wù)器端收到消息并處理
- 服務(wù)器端處理完消息后,將生成一條應(yīng)答消息到replyTo指定的Queue互站,同時(shí)帶上correlationId屬性
客戶端之前已訂閱replyTo指定的Queue私蕾,從中收到服務(wù)器的應(yīng)答消息后,根據(jù)其中的correlationId屬性分析哪條請(qǐng)求被執(zhí)行了胡桃,根據(jù)執(zhí)行結(jié)果進(jìn)行后續(xù)業(yè)務(wù)處理
參考: