一益咬、RabbitMQ基礎(chǔ)概念
1.定義
RabbitMQ是基于AMQP協(xié)議開發(fā)的一個(gè)MQ產(chǎn)品, 首先我們以Web管理頁面為入口庆捺,來了解下RabbitMQ的一些基礎(chǔ)概念朋鞍,這樣我們后續(xù)才好針對(duì)這些基礎(chǔ)概念進(jìn)行編程實(shí)戰(zhàn)殿较。
可以參照下圖來理解RabbitMQ當(dāng)中的基礎(chǔ)概念:
2.虛擬主機(jī) virtual host
這個(gè)在之前搭建時(shí)已經(jīng)體驗(yàn)過了。RabbitMQ出于服務(wù)器復(fù)用的想法辩蛋,可以在一個(gè)RabbitMQ集群中劃分出多個(gè)虛擬主機(jī)凑术,每一個(gè)虛擬主機(jī)都有AMQP的全套基礎(chǔ)組件,并且可以針對(duì)每個(gè)虛擬主機(jī)進(jìn)行權(quán)限以及數(shù)據(jù)分配防嗡,并且不同虛擬主機(jī)之間是完全隔離的变汪。
3.連接 Connection
客戶端與RabbitMQ進(jìn)行交互,首先就需要建立一個(gè)TPC連接本鸣,這個(gè)連接就是Connection疫衩。
4.信道 Channel
一旦客戶端與RabbitMQ建立了連接,就會(huì)分配一個(gè)AMQP信道 Channel。每個(gè)信道都會(huì)被分配一個(gè)唯一的ID闷煤。也可以理解為是客戶端與RabbitMQ實(shí)際進(jìn)行數(shù)據(jù)交互的通道童芹,我們后續(xù)的大多數(shù)的數(shù)據(jù)操作都是在信道 Channel 這個(gè)層面展開的。
RabbitMQ為了減少性能開銷鲤拿,也會(huì)在一個(gè)Connection中建立多個(gè)Channel假褪,這樣便于客戶端進(jìn)行多線程連接,這些連接會(huì)復(fù)用同一個(gè)Connection的TCP通道近顷,所以在實(shí)際業(yè)務(wù)中生音,對(duì)于Connection和Channel的分配也需要根據(jù)實(shí)際情況進(jìn)行考量。
5.交換機(jī) Exchange
這是RabbitMQ中進(jìn)行數(shù)據(jù)路由的重要組件窒升。消息發(fā)送到RabbitMQ中后缀遍,會(huì)首先進(jìn)入一個(gè)交換機(jī),然后由交換機(jī)負(fù)責(zé)將數(shù)據(jù)轉(zhuǎn)發(fā)到不同的隊(duì)列中饱须。RabbitMQ中有多種不同類型的交換機(jī)來支持不同的路由策略域醇。從Web管理界面就能看到,在每個(gè)虛擬主機(jī)中蓉媳,RabbitMQ都會(huì)默認(rèn)創(chuàng)建幾個(gè)不同類型的交換機(jī)來譬挚。
交換機(jī)多用來與生產(chǎn)者打交道。生產(chǎn)者發(fā)送的消息通過Exchange交換機(jī)分配到各個(gè)不同的Queue隊(duì)列上酪呻,而對(duì)于消息消費(fèi)者來說减宣,通常只需要關(guān)注自己感興趣的隊(duì)列就可以了。
6.隊(duì)列 Queue
?隊(duì)列是實(shí)際保存數(shù)據(jù)的最小單位玩荠。隊(duì)列結(jié)構(gòu)天生就具有FIFO的順序漆腌,消息最終都會(huì)被分發(fā)到不同的隊(duì)列當(dāng)中,然后才被消費(fèi)者進(jìn)行消費(fèi)處理姨蟋。這也是最近RabbitMQ功能變動(dòng)最大的地方屉凯。最為常用的是經(jīng)典隊(duì)列Classic。RabbitMQ 3.8.X版本添加了Quorum隊(duì)列眼溶,3.9.X又添加了Stream隊(duì)列悠砚。
6.1Classic 經(jīng)典隊(duì)列
這是RabbitMQ最為經(jīng)典的隊(duì)列類型。在單機(jī)環(huán)境中堂飞,擁有比較高的消息可靠性灌旧。
?在這個(gè)圖中可以看到,經(jīng)典隊(duì)列可以選擇是否持久化(Durability)以及是否自動(dòng)刪除(Auto delete)兩個(gè)屬性绰筛。
其中枢泰,Durability有兩個(gè)選項(xiàng),Durable和Transient铝噩。 Durable表示隊(duì)列會(huì)將消息保存到硬盤衡蚂,這樣消息的安全性更高。但是同時(shí),由于需要有更多的IO操作毛甲,所以生產(chǎn)和消費(fèi)消息的性能年叮,相比Transient會(huì)比較低。
Auto delete屬性如果選擇為是玻募,那隊(duì)列將在至少一個(gè)消費(fèi)者已經(jīng)連接只损,然后所有的消費(fèi)者都斷開連接后刪除自己。后面的Arguments部分七咧,還有非常多的參數(shù)跃惫,可以點(diǎn)擊后面的問號(hào)逐步了解。
其實(shí)這時(shí)艾栋,應(yīng)該結(jié)合kafka和RocketMQ這幾個(gè)MQ產(chǎn)品爆存,對(duì)隊(duì)列有一個(gè)更全面的理解。在MQ當(dāng)中蝗砾,隊(duì)列其實(shí)是MQ集群中的一個(gè)數(shù)據(jù)分片的最小單位终蒂。在MQ集群中,一個(gè)Topic會(huì)對(duì)應(yīng)多個(gè)隊(duì)列遥诉,而這些隊(duì)列會(huì)均勻的分配到集群的各個(gè)節(jié)點(diǎn)當(dāng)中。
6.2Quorum 仲裁隊(duì)列
仲裁隊(duì)列噪叙,是RabbitMQ從3.8.0版本矮锈,引入的一個(gè)新的隊(duì)列類型,整個(gè)3.8.X版本睁蕾,也都是在圍繞仲裁隊(duì)列進(jìn)行完善和優(yōu)化苞笨。仲裁隊(duì)列相比Classic經(jīng)典隊(duì)列,在分布式環(huán)境下對(duì)消息的可靠性保障更高子眶。官方文檔中表示瀑凝,未來會(huì)使用Quorum仲裁隊(duì)列代替?zhèn)鹘y(tǒng)Classic隊(duì)列。
關(guān)于Quorum的詳細(xì)介紹見?https://www.rabbitmq.com/quorum-queues.html臭杰,這里只是對(duì)其中的重點(diǎn)進(jìn)行下解讀粤咪。
?Quorum是基于Raft一致性協(xié)議實(shí)現(xiàn)的一種新型的分布式消息隊(duì)列,它實(shí)現(xiàn)了持久化渴杆,多備份的FIFO隊(duì)列寥枝,主要就是針對(duì)RabbitMQ的鏡像模式設(shè)計(jì)的。簡(jiǎn)單理解就是quorum隊(duì)列中的消息需要有集群中多半節(jié)點(diǎn)同意確認(rèn)后磁奖,才會(huì)寫入到隊(duì)列中囊拜。這種隊(duì)列類似于RocketMQ當(dāng)中的DLedger集群。這種方式可以保證消息在集群內(nèi)部不會(huì)丟失比搭。同時(shí)冠跷,Quorum是以犧牲很多高級(jí)隊(duì)列特性為代價(jià),來進(jìn)一步保證消息在分布式環(huán)境下的高可靠。
從整體功能上來說蜜托,Quorum隊(duì)列是在Classic經(jīng)典隊(duì)列的基礎(chǔ)上做減法抄囚,因此對(duì)于RabbitMQ的長(zhǎng)期使用者而言,其實(shí)是會(huì)影響使用體驗(yàn)的盗冷。它與普通隊(duì)列的區(qū)別:
?從官方這個(gè)比較圖就能看到怠苔,Quorum隊(duì)列大部分功能都是在Classic隊(duì)列基礎(chǔ)上做減法,比如Non-durable queues表示是非持久化的內(nèi)存隊(duì)列仪糖。Exclusivity表示獨(dú)占隊(duì)列柑司,即表示隊(duì)列只能由聲明該隊(duì)列的Connection連接來進(jìn)行使用,包括隊(duì)列創(chuàng)建锅劝、刪除攒驰、收發(fā)消息等,并且獨(dú)占隊(duì)列會(huì)在聲明該隊(duì)列的Connection斷開后自動(dòng)刪除故爵。
其中有個(gè)特例就是這個(gè)Poison Message(有毒的消息)玻粪。所謂毒消息是指消息一直不能被消費(fèi)者正常消費(fèi)(可能是由于消費(fèi)者失敗或者消費(fèi)邏輯有問題等),就會(huì)導(dǎo)致消息不斷的重新入隊(duì)诬垂,這樣這些消息就成為了毒消息劲室。這些讀消息應(yīng)該有保障機(jī)制進(jìn)行標(biāo)記并及時(shí)刪除。Quorum隊(duì)列會(huì)持續(xù)跟蹤消息的失敗投遞嘗試次數(shù)结窘,并記錄在"x-delivery-count"這樣一個(gè)頭部參數(shù)中很洋。然后,就可以通過設(shè)置 Delivery limit參數(shù)來定制一個(gè)毒消息的刪除策略隧枫。當(dāng)消息的重復(fù)投遞次數(shù)超過了Delivery limit參數(shù)閾值時(shí)喉磁,RabbitMQ就會(huì)刪除這些毒消息。當(dāng)然官脓,如果配置了死信隊(duì)列的話协怒,就會(huì)進(jìn)入對(duì)應(yīng)的死信隊(duì)列。
Quorum隊(duì)列更適合于 隊(duì)列長(zhǎng)期存在卑笨,并且對(duì)容錯(cuò)孕暇、數(shù)據(jù)安全方面的要求比低延遲、不持久等高級(jí)隊(duì)列更能要求更嚴(yán)格的場(chǎng)景赤兴。例如:電商系統(tǒng)的訂單芭商,引入MQ后,處理速度可以慢一點(diǎn)搀缠,但是訂單不能丟失铛楣。
也對(duì)應(yīng)以下一些不適合使用的場(chǎng)景:
1、一些臨時(shí)使用的隊(duì)列:比如transient臨時(shí)隊(duì)列艺普,exclusive獨(dú)占隊(duì)列簸州,或者經(jīng)常會(huì)修改和刪除的隊(duì)列鉴竭。
2、對(duì)消息低延遲要求高: 一致性算法會(huì)影響消息的延遲岸浑。
3搏存、對(duì)數(shù)據(jù)安全性要求不高:Quorum隊(duì)列需要消費(fèi)者手動(dòng)通知或者生產(chǎn)者手動(dòng)確認(rèn)。
4矢洲、隊(duì)列消息積壓嚴(yán)重 : 如果隊(duì)列中的消息很大璧眠,或者積壓的消息很多,就不要使用Quorum隊(duì)列读虏。Quorum隊(duì)列當(dāng)前會(huì)將所有消息始終保存在內(nèi)存中责静,直到達(dá)到內(nèi)存使用極限。
6.3Stream隊(duì)列
Stream隊(duì)列是RabbitMQ自3.9.0版本開始引入的一種新的數(shù)據(jù)隊(duì)列類型盖桥,也是目前官方最為推薦的隊(duì)列類型灾螃。這種隊(duì)列類型的消息是持久化到磁盤并且具備分布式備份的,更適合于消費(fèi)者多揩徊,讀消息非常頻繁的場(chǎng)景腰鬼。
Stream隊(duì)列的官方文檔地址:?https://www.rabbitmq.com/streams.html
Stream隊(duì)列的核心是以append-only只添加的日志來記錄消息,整體來說塑荒,就是消息將以append-only的方式持久化到日志文件中熄赡,然后通過調(diào)整每個(gè)消費(fèi)者的消費(fèi)進(jìn)度offset,來實(shí)現(xiàn)消息的多次分發(fā)齿税。下方有幾個(gè)屬性也都是來定義日志文件的大小以及保存時(shí)間本谜。如果你熟悉Kafka或者RocketMQ,會(huì)對(duì)這種日志記錄消息的方式非常熟悉偎窘。這種隊(duì)列提供了RabbitMQ已有的其他隊(duì)列類型不太好實(shí)現(xiàn)的四個(gè)特點(diǎn):
1、large fan-outs 大規(guī)模分發(fā)
當(dāng)想要向多個(gè)訂閱者發(fā)送相同的消息時(shí)溜在,以往的隊(duì)列類型必須為每個(gè)消費(fèi)者綁定一個(gè)專用的隊(duì)列陌知。如果消費(fèi)者的數(shù)量很大,這就會(huì)導(dǎo)致性能低下掖肋。而Stream隊(duì)列允許任意數(shù)量的消費(fèi)者使用同一個(gè)隊(duì)列的消息仆葡,從而消除綁定多個(gè)隊(duì)列的需求。
2志笼、Replay/Time-travelling 消息回溯
RabbitMQ已有的這些隊(duì)列類型沿盅,在消費(fèi)者處理完消息后,消息都會(huì)從隊(duì)列中刪除纫溃,因此腰涧,無法重新讀取已經(jīng)消費(fèi)過的消息。而Stream隊(duì)列允許用戶在日志的任何一個(gè)連接點(diǎn)開始重新讀取數(shù)據(jù)紊浩。
3窖铡、Throughput Performance 高吞吐性能
Strem隊(duì)列的設(shè)計(jì)以性能為主要目標(biāo)疗锐,對(duì)消息傳遞吞吐量的提升非常明顯。
4费彼、Large logs 大日志
RabbitMQ一直以來有一個(gè)讓人詬病的地方滑臊,就是當(dāng)隊(duì)列中積累的消息過多時(shí),性能下降會(huì)非常明顯箍铲。但是Stream隊(duì)列的設(shè)計(jì)目標(biāo)就是以最小的內(nèi)存開銷高效地存儲(chǔ)大量的數(shù)據(jù)雇卷。
整體上來說,RabbitMQ的Stream隊(duì)列颠猴,其實(shí)有很多地方借鑒了其他MQ產(chǎn)品的優(yōu)點(diǎn)关划,在保證消息可靠性的基礎(chǔ)上,著力提高隊(duì)列的消息吞吐量以及消息轉(zhuǎn)發(fā)性能芙粱。因此祭玉,Stream也是在視圖解決一個(gè)RabbitMQ一直以來,讓人詬病的缺點(diǎn)春畔,就是當(dāng)隊(duì)列中積累的消息過多時(shí)脱货,性能下降會(huì)非常明顯的問題。RabbitMQ以往更專注于企業(yè)級(jí)的內(nèi)部使用律姨,但是從這些隊(duì)列功能可以看到振峻,Rabbitmq也在向更復(fù)雜的互聯(lián)網(wǎng)環(huán)境靠攏,未來對(duì)于RabbitMQ的了解择份,也需要隨著版本推進(jìn)扣孟,不斷更新。
但是荣赶,從整體功能上來講凤价,隊(duì)列只不過是一個(gè)實(shí)現(xiàn)FIFO的數(shù)據(jù)結(jié)構(gòu)而已,這種數(shù)據(jù)結(jié)構(gòu)其實(shí)是越簡(jiǎn)單越好拔创。而當(dāng)前RabbitMQ區(qū)分出這么多種隊(duì)列類型利诺,其實(shí)極大的增加了應(yīng)用層面的使用難度,應(yīng)用層面必須有一些不同的機(jī)制兼容各種隊(duì)列剩燥。所以慢逾,在未來版本中,RabbitMQ很可能還是會(huì)將這幾種隊(duì)列類型最終統(tǒng)一成一種類型灭红。例如官方已經(jīng)說明未來會(huì)使用Quorum隊(duì)列類型替代經(jīng)典隊(duì)列侣滩,到那時(shí),應(yīng)用層很多工具就可以得到簡(jiǎn)化变擒,比如不需要再設(shè)置durable和exclusive屬性君珠。雖然Quorum隊(duì)列和Stream隊(duì)列目前還沒有合并的打算,但是在應(yīng)用層面來看娇斑,他們兩者是沖突的葛躏,是一種競(jìng)爭(zhēng)關(guān)系澈段,未來也很有可能最終統(tǒng)一保留成一種類型。至于未來走向如何舰攒,我們可以在后續(xù)版本拭目以待败富。
二、RabbitMQ編程模型
?RabbitMQ的使用生態(tài)已經(jīng)相當(dāng)龐大摩窃,支持非常多的語言兽叮。而就以java而論,也已經(jīng)支持非常多的擴(kuò)展猾愿。我們接下來會(huì)從原生API鹦聪、SpringBoot集成、SpringCloudStream集成蒂秘,三個(gè)角度來詳細(xì)學(xué)習(xí)RabbitMQ的編程模型泽本。在學(xué)習(xí)編程模型時(shí),要注意下姻僧,新推出的Stream隊(duì)列规丽,他的客戶端跟另外兩種隊(duì)列稍有不同。
1.原生API
使用RabbitMQ提供的原生客戶端API進(jìn)行交互撇贺。先來了解下如何使用Classic和Quorum隊(duì)列赌莺。至于Stream隊(duì)列,目前他使用的是和這兩個(gè)隊(duì)列不同的客戶端松嘶,所以會(huì)在后面一個(gè)章節(jié)單獨(dú)討論艘狭。
1.1maven依賴
1.2基礎(chǔ)編程模型
?這些各種各樣的消息模型其實(shí)都對(duì)應(yīng)一個(gè)比較統(tǒng)一的基礎(chǔ)編程模型。
step1翠订、首先創(chuàng)建連接巢音,獲取Channel
step2、聲明queue隊(duì)列
關(guān)鍵代碼:
api說明:
Declare a queue
Parameters:
queue?the name of the queue
durable?true if we are declaring a durable queue (the queue will survive a server restart)
exclusive?true if we are declaring an exclusive queue (restricted to this connection)
autoDelete?true if we are declaring an autodelete queue (server will delete it when no longer in use)
arguments?other properties (construction arguments) for the queue
Returns:
a declaration-confirm method to indicate the queue was successfully declared
Throws:
java.io.IOException - if an error is encountered
See Also:
com.rabbitmq.client.AMQP.Queue.Declare
com.rabbitmq.client.AMQP.Queue.DeclareOk
如果要聲明一個(gè)Quorum隊(duì)列尽超,則只需要在后面的arguments中傳入一個(gè)參數(shù)官撼,x-queue-type,參數(shù)值設(shè)定為quorum橙弱。
注意:1、對(duì)于Quorum類型燥狰,durable參數(shù)就必須是true了棘脐,設(shè)置成false的話,會(huì)報(bào)錯(cuò)龙致。同樣蛀缝,exclusive參數(shù)必須設(shè)置為false
如果要聲明一個(gè)Stream隊(duì)列,則?x-queue-type參數(shù)要設(shè)置為?stream?目代。
注意:1屈梁、同樣嗤练,durable參數(shù)必須是true,exclusive必須是false在讶。 -- 你應(yīng)該會(huì)想到煞抬,對(duì)于這兩種隊(duì)列,這兩個(gè)參數(shù)就是多余的了构哺,未來可以直接刪除革答。
2、x-max-length-bytes 表示日志文件的最大字節(jié)數(shù)曙强。x-stream-max-segment-size-bytes 每一個(gè)日志文件的最大大小残拐。這兩個(gè)是可選參數(shù),通常為了防止stream日志無限制累計(jì)碟嘴,都會(huì)配合stream隊(duì)列一起聲明溪食。
聲明的隊(duì)列,如果服務(wù)端沒有娜扇,那么會(huì)自動(dòng)創(chuàng)建错沃。但是如果服務(wù)端有了這個(gè)隊(duì)列,那么聲明的隊(duì)列屬性必須和服務(wù)端的隊(duì)列屬性一致才行袱衷。
step3捎废、Producer根據(jù)應(yīng)用場(chǎng)景發(fā)送消息到queue
關(guān)鍵代碼:
api說明:
Publish a message. Publishing to a non-existent exchange will result in a channel-level protocol exception, which closes the channel. Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.
Parameters:
exchange?the exchange to publish the message to
routingKey?the routing key
props?other properties for the message - routing headers etc
body?the message body
其中exchange是一個(gè)Producer與queue的中間交互機(jī)制≈略铮可以讓Producer把消息按一定的規(guī)則發(fā)送到不同的queue登疗,不需要的話就傳空字符串。
step4嫌蚤、Consumer消費(fèi)消息
定義消費(fèi)者辐益,消費(fèi)消息進(jìn)行處理,并向RabbitMQ進(jìn)行消息確認(rèn)脱吱。確認(rèn)了之后就表明這個(gè)消息已經(jīng)消費(fèi)完了智政,否則RabbitMQ還會(huì)繼續(xù)讓別的消費(fèi)者實(shí)例來處理。
主要收集了兩種消費(fèi)方式:
1箱蝠、被動(dòng)消費(fèi)模式续捂,Consumer等待rabbitMQ 服務(wù)器將message推送過來再消費(fèi)。一般是啟一個(gè)一直掛起的線程來等待宦搬。
關(guān)鍵代碼:
api說明:
Start a non-nolocal, non-exclusive consumer, with a server-generated consumerTag.
Parameters:
queue?the name of the queue
autoAck?true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
callback?an interface to the consumer object
Returns:
the consumerTag generated by the server
其中autoAck是個(gè)關(guān)鍵牙瓢,autoAck為true則表示消息發(fā)送到該Consumer后就被Consumer消費(fèi)掉了,不需要再往其他Consumer轉(zhuǎn)發(fā)间校;為false則會(huì)繼續(xù)往其他Consumer轉(zhuǎn)發(fā)矾克。要注意如果每個(gè)Consumer一直為false,會(huì)導(dǎo)致消息不停的被轉(zhuǎn)發(fā)憔足,不停的吞噬系統(tǒng)資源胁附,最終造成宕機(jī)酒繁。
2、另一種是主動(dòng)消費(fèi)模式控妻,Comsumer主動(dòng)到rabbitMQ服務(wù)器上去獲取指定的messge進(jìn)行消費(fèi)州袒。
關(guān)鍵代碼:
api說明:
Retrieve a message from a queue using com.rabbitmq.client.AMQP.Basic.Get
Parameters:
queue?the name of the queue
autoAck?true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
Returns:
a GetResponse containing the retrieved message data
Throws:
java.io.IOException - if an error is encountered
See Also:
com.rabbitmq.client.AMQP.Basic.Get
com.rabbitmq.client.AMQP.Basic.GetOk
com.rabbitmq.client.AMQP.Basic.GetEmpty
3、Stream隊(duì)列消費(fèi)饼暑,在當(dāng)前版本下稳析,消費(fèi)Stream隊(duì)列時(shí),需要注意三板斧的設(shè)置弓叛。
a)channel必須設(shè)置basicQos屬性彰居。
b)正確聲明Stream隊(duì)列。
c)消費(fèi)時(shí)需要指定offset撰筷。
具體參看示例代碼陈惰。注意其中的注釋。
這三點(diǎn)要尤其注意毕籽,因?yàn)楫?dāng)前版本的錯(cuò)誤提示非常讓人著急抬闯。
step5、完成以后關(guān)閉連接关筒,釋放資源
channel.close();?
1.3官網(wǎng)的消息場(chǎng)景
原生API重點(diǎn)就是學(xué)習(xí)并理解RabbitMQ的官方消息模型溶握。具體參見?https://www.rabbitmq.com/getstarted.html?。其中可以看到蒸播,RabbitMQ官方提供了總共七種消息模型睡榆,這其中,6 RPC部分是使用RabbitMQ來實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用袍榆,這個(gè)場(chǎng)景通常不需要使用MQ來實(shí)現(xiàn)胀屿,所以也就不當(dāng)作重點(diǎn)來學(xué)習(xí)。而7 Publisher Confirms是當(dāng)前版本新引進(jìn)來的一種消息模型包雀,對(duì)保護(hù)消息可靠性有很重要的意義宿崭。
1:hello world 體驗(yàn)
最直接的方式,P端發(fā)送一個(gè)消息到一個(gè)指定的queue才写,中間不需要任何exchange規(guī)則葡兑。C端按queue方式進(jìn)行消費(fèi)。
關(guān)鍵代碼:(其實(shí)關(guān)鍵的區(qū)別也就是幾個(gè)聲明上的不同)
producer:
consumer:
2: Work queues 工作序列
這就是kafka同一groupId的消息分發(fā)模式
工作任務(wù)模式赞草,領(lǐng)導(dǎo)部署一個(gè)任務(wù)讹堤,由下面的一個(gè)員工來處理。
producer:
consumer:
這個(gè)模式應(yīng)該是最常用的模式房资,也是官網(wǎng)討論比較詳細(xì)的一種模式蜕劝,所以官網(wǎng)上也對(duì)這種模式做了重點(diǎn)講述檀头。
首先轰异。Consumer端的autoAck字段設(shè)置的是false,這表示consumer在接收到消息后不會(huì)自動(dòng)反饋服務(wù)器已消費(fèi)了message岖沛,而要改在對(duì)message處理完成了之后,再調(diào)用channel.basicAck來通知服務(wù)器已經(jīng)消費(fèi)了該message.這樣即使Consumer在執(zhí)行message過程中出問題了搭独,也不會(huì)造成message被忽略婴削,因?yàn)闆]有ack的message會(huì)被服務(wù)器重新進(jìn)行投遞。
但是牙肝,這其中也要注意一個(gè)很常見的BUG唉俗,就是如果所有的consumer都忘記調(diào)用basicAck()了,就會(huì)造成message被不停的分發(fā)配椭,也就造成不斷的消耗系統(tǒng)資源虫溜。這也就是 Poison Message(毒消息)
其次,官方特意提到的message的持久性股缸。關(guān)鍵的message不能因?yàn)榉?wù)出現(xiàn)問題而被忽略衡楞。還要注意,官方特意提到敦姻,所有的queue是不能被多次定義的瘾境。如果一個(gè)queue在開始時(shí)被聲明為durable,那在后面再次聲明這個(gè)queue時(shí)镰惦,即使聲明為 not durable迷守,那這個(gè)queue的結(jié)果也還是durable的。
然后旺入,是中間件最為關(guān)鍵的分發(fā)方式兑凿。這里,RabbitMQ默認(rèn)是采用的fair dispatch眨业,也叫round-robin模式急膀,就是把消息輪詢,在所有consumer中輪流發(fā)送龄捡。這種方式卓嫂,沒有考慮消息處理的復(fù)雜度以及consumer的處理能力。而他們改進(jìn)后的方案聘殖,是consumer可以向服務(wù)器聲明一個(gè)prefetchCount晨雳,我把他叫做預(yù)處理能力值。channel.basicQos(prefetchCount);表示當(dāng)前這個(gè)consumer可以同時(shí)處理幾個(gè)message奸腺。這樣服務(wù)器在進(jìn)行消息發(fā)送前餐禁,會(huì)檢查這個(gè)consumer當(dāng)前正在處理中的message(message已經(jīng)發(fā)送,但是未收到consumer的basicAck)有幾個(gè)突照,如果超過了這個(gè)consumer節(jié)點(diǎn)的能力值帮非,就不再往這個(gè)consumer發(fā)布。
這種模式,官方也指出還是有問題的末盔,消息有可能全部阻塞筑舅,所有consumer節(jié)點(diǎn)都超過了能力值,那消息就阻塞在服務(wù)器上陨舱,這時(shí)需要自己及時(shí)發(fā)現(xiàn)這個(gè)問題翠拣,采取措施,比如增加consumer節(jié)點(diǎn)或者其他策略:
Note about queue size If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
另外官網(wǎng)上沒有深入提到的游盲,就是還是沒有考慮到message處理的復(fù)雜程度误墓。有的message處理可能很簡(jiǎn)單,有的可能很復(fù)雜益缎,現(xiàn)在還是將所有message的處理程度當(dāng)成一樣的谜慌。還是有缺陷的,但是目前也只看到dubbo里對(duì)單個(gè)服務(wù)有權(quán)重值的概念莺奔,涉及到了這個(gè)問題畦娄。
3:Publish/Subscribe 訂閱 發(fā)布 機(jī)制
type為fanout?的exchange:
這個(gè)機(jī)制是對(duì)上面的一種補(bǔ)充。也就是把preducer與Consumer進(jìn)行進(jìn)一步的解耦弊仪。producer只負(fù)責(zé)發(fā)送消息熙卡,至于消息進(jìn)入哪個(gè)queue,由exchange來分配励饵。如上圖驳癌,就是把producer發(fā)送的消息,交由exchange同時(shí)發(fā)送到兩個(gè)queue里役听,然后由不同的Consumer去進(jìn)行消費(fèi)颓鲜。
關(guān)鍵代碼 ===》 producer: //只負(fù)責(zé)往exchange里發(fā)消息,后面的事情不管典予。
receiver: //將消費(fèi)的目標(biāo)隊(duì)列綁定到exchange上甜滨。
關(guān)鍵處就是type為”fanout” 的exchange,這種類型的exchange只負(fù)責(zé)往所有已綁定的隊(duì)列上發(fā)送消息。
4:Routing 基于內(nèi)容的路由
type為”direct” 的exchange
這種模式一看圖就清晰了瘤袖。 在上一章 exchange 往所有隊(duì)列發(fā)送消息的基礎(chǔ)上衣摩,增加一個(gè)路由配置,指定exchange如何將不同類別的消息分發(fā)到不同的queue上捂敌。
關(guān)鍵代碼===> producer:
receiver:
5:Topics 話題
type為"topic" 的exchange
這個(gè)模式也就在上一個(gè)模式的基礎(chǔ)上艾扮,對(duì)routingKey進(jìn)行了模糊匹配
單詞之間用,隔開,* 代表一個(gè)具體的單詞占婉。# 代表0個(gè)或多個(gè)單詞泡嘴。
關(guān)鍵代碼===> producer:
receiver:
6:RPC 遠(yuǎn)程調(diào)用
遠(yuǎn)程調(diào)用是同步阻塞的調(diào)用遠(yuǎn)程服務(wù)并獲取結(jié)果。
RPC遠(yuǎn)程調(diào)用機(jī)制其實(shí)并不是消息中間件的處理強(qiáng)項(xiàng)逆济。畢竟消息隊(duì)列機(jī)制很大程度上來說就是為了緩沖同步RPC調(diào)用造成的瞬間高峰酌予。而RabbitMQ的同步調(diào)用示例磺箕,看著也確實(shí)怪怪的。并且抛虫,RPC遠(yuǎn)程調(diào)用的場(chǎng)景滞磺,也有太多可替代的技術(shù)會(huì)比用消息中間件處理得更優(yōu)雅,更流暢莱褒。
官網(wǎng)上這一大堆說明,其實(shí)我覺得就是表明涎劈,叫你不要用消息中間件來做RPC。所以關(guān)于這個(gè)RPC調(diào)用功能,就不再多做解釋了党觅。代碼實(shí)現(xiàn)可以參見官網(wǎng)萝玷,或者配套的示例代碼。
7:Publisher Confirms 發(fā)送者消息確認(rèn)
?RabbitMQ的消息可靠性是非常高的蹦浦,但是他以往的機(jī)制都是保證消息發(fā)送到了MQ之后扭吁,可以推送到消費(fèi)者消費(fèi),不會(huì)丟失消息盲镶。但是發(fā)送者發(fā)送消息是否成功是沒有保證的侥袜。我們可以回顧下,發(fā)送者發(fā)送消息的基礎(chǔ)API:Producer.basicPublish方法是沒有返回值的溉贿,也就是說枫吧,一次發(fā)送消息是否成功,應(yīng)用是不知道的宇色,這在業(yè)務(wù)上就容易造成消息丟失九杂。而這個(gè)模塊就是通過給發(fā)送者提供一些確認(rèn)機(jī)制,來保證這個(gè)消息發(fā)送的過程是成功的宣蠕。
如果了解了這個(gè)機(jī)制就會(huì)發(fā)現(xiàn)例隆,這個(gè)消息確認(rèn)機(jī)制就是跟RocketMQ的事務(wù)消息機(jī)制差不多的。而對(duì)于這個(gè)機(jī)制抢蚀,RocketMQ的支持明顯更優(yōu)雅镀层。
發(fā)送者確認(rèn)模式默認(rèn)是不開啟的,所以如果需要開啟發(fā)送者確認(rèn)模式皿曲,需要手動(dòng)在channel中進(jìn)行聲明鹿响。
在官網(wǎng)的示例中,重點(diǎn)解釋了三種策略:
a)發(fā)布單條消息
即發(fā)布一條消息就確認(rèn)一條消息谷饿。核心代碼:
channel.waitForConfirmsOrDie(5_000);這個(gè)方法就會(huì)在channel端等待RabbitMQ給出一個(gè)響應(yīng)惶我,用來表明這個(gè)消息已經(jīng)正確發(fā)送到了RabbitMQ服務(wù)端。但是要注意博投,這個(gè)方法會(huì)同步阻塞channel绸贡,在等待確認(rèn)期間,channel將不能再繼續(xù)發(fā)送消息,也就是說會(huì)明顯降低集群的發(fā)送速度即吞吐量听怕。
官方說明了捧挺,其實(shí)channel底層是異步工作的,會(huì)將channel阻塞住尿瞭,然后異步等待服務(wù)端發(fā)送一個(gè)確認(rèn)消息闽烙,才解除阻塞。但是我們?cè)谑褂脮r(shí)声搁,可以把他當(dāng)作一個(gè)同步工具來看待黑竞。
然后如果到了超時(shí)時(shí)間,還沒有收到服務(wù)端的確認(rèn)機(jī)制疏旨,那就會(huì)拋出異常很魂。然后通常處理這個(gè)異常的方式是記錄錯(cuò)誤日志或者嘗試重發(fā)消息,但是嘗試重發(fā)時(shí)一定要注意不要使程序陷入死循環(huán)檐涝。
b)發(fā)送批量消息
之前單條確認(rèn)的機(jī)制會(huì)對(duì)系統(tǒng)的吞吐量造成很大的影響遏匆,所以稍微中和一點(diǎn)的方式就是發(fā)送一批消息后,再一起確認(rèn)谁榜。
核心代碼:
這種方式可以稍微緩解下發(fā)送者確認(rèn)模式對(duì)吞吐量的影響幅聘。但是也有個(gè)固有的問題就是,當(dāng)確認(rèn)出現(xiàn)異常時(shí)窃植,發(fā)送者只能知道是這一批消息出問題了喊暖, 而無法確認(rèn)具體是哪一條消息出了問題。所以接下來就需要增加一個(gè)機(jī)制能夠具體對(duì)每一條發(fā)送出錯(cuò)的消息進(jìn)行處理撕瞧。
c)異步確認(rèn)消息
實(shí)現(xiàn)的方式也比較簡(jiǎn)單陵叽,Producer在channel中注冊(cè)監(jiān)聽器來對(duì)消息進(jìn)行確認(rèn)。核心代碼就是一個(gè):
按說監(jiān)聽只要注冊(cè)一個(gè)就可以了丛版,那為什么這里要注冊(cè)兩個(gè)呢巩掺?如果對(duì)照下RocketMQ的事務(wù)消息機(jī)制,這就很容易理解了页畦。發(fā)送者在發(fā)送完消息后胖替,就會(huì)執(zhí)行第一個(gè)監(jiān)聽器callback1,然后等服務(wù)端發(fā)過來的反饋后豫缨,再執(zhí)行第二個(gè)監(jiān)聽器callback2独令。
然后關(guān)于這個(gè)ConfirmCallback,這是個(gè)監(jiān)聽器接口好芭,里面只有一個(gè)方法: void handle(long sequenceNumber, boolean multiple) throws IOException;?
這方法中的兩個(gè)參數(shù):
1燃箭、sequenceNumer:這個(gè)是一個(gè)唯一的序列號(hào),代表一個(gè)唯一的消息舍败。在RabbitMQ中招狸,他的消息體只是一個(gè)二進(jìn)制數(shù)組敬拓,并不像RocketMQ一樣有一個(gè)封裝的對(duì)象,所以默認(rèn)消息是沒有序列號(hào)的裙戏。而RabbitMQ提供了一個(gè)方法int sequenceNumber = channel.getNextPublishSeqNo());來生成一個(gè)全局遞增的序列號(hào)乘凸。然后應(yīng)用程序需要自己來將這個(gè)序列號(hào)與消息對(duì)應(yīng)起來。沒錯(cuò)累榜!是的营勤!需要客戶端自己去做對(duì)應(yīng)!
2壹罚、multiple:這個(gè)是一個(gè)Boolean型的參數(shù)葛作。如果是false,就表示這一次只確認(rèn)了當(dāng)前一條消息渔嚷。如果是true,就表示RabbitMQ這一次確認(rèn)了一批消息稠曼,在sequenceNumber之前的所有消息都已經(jīng)確認(rèn)完成了形病。
對(duì)比下RocketMQ的事務(wù)消息機(jī)制,有沒有覺得很熟悉霞幅,但是又很別扭漠吻?當(dāng)然,考慮到這個(gè)對(duì)于RabbitMQ來說還是個(gè)新鮮玩意司恳,所以有理由相信這個(gè)機(jī)制在未來會(huì)越來越完善途乃。
2.SpringBoot集成
SpringBoot官方就集成了RabbitMQ,所以RabbitMQ與SpringBoot的集成是非常簡(jiǎn)單的扔傅。不過耍共,SpringBoot集成RabbitMQ的方式是按照Spring的一套統(tǒng)一的MQ模型創(chuàng)建的,因此SpringBoot集成插件中對(duì)于生產(chǎn)者猎塞、消息试读、消費(fèi)者等重要的對(duì)象模型,與RabbitMQ原生的各個(gè)組件有對(duì)應(yīng)關(guān)系荠耽,但是并不完全相同钩骇。這一點(diǎn)需要在后續(xù)試驗(yàn)過程中加深理解。
2.1引入依賴
SpringBoot官方集成了RabbitMQ铝量,只需要快速引入依賴包即可使用倘屹。RabbitMQ與SpringBoot集成的核心maven依賴就下面一個(gè)。
要特別注意下版本慢叨。我們這里采用的是SpringBoot的2.6.7版本的依賴發(fā)布包纽匙。不同版本下的配置方式會(huì)有變化。
然后所有的基礎(chǔ)運(yùn)行環(huán)境都在application.properties中進(jìn)行配置拍谐。所有配置以spring.rabbitmq開頭哄辣。通常按照示例進(jìn)行一些基礎(chǔ)的必要配置就可以跑了请梢。關(guān)于詳細(xì)的配置信息,可以參見RabbitProperties力穗,源碼中有各個(gè)字段說明毅弧。
如果需要更詳細(xì)的配置資料,那就需要到官方的github倉庫上去查了当窗。但是國(guó)內(nèi)訪問github的這個(gè)速度够坐,你是知道的。
2.2配置生產(chǎn)者
基礎(chǔ)的運(yùn)行環(huán)境參數(shù)以及生產(chǎn)者的一些默認(rèn)屬性配置都集中到了application.properties配置文件中崖面。所有配置項(xiàng)都以spring.rabbitmq開頭元咙。
關(guān)于詳細(xì)的配置信息,可以參見RabbitProperties類的源碼巫员,源碼中有各個(gè)字段的簡(jiǎn)單說明庶香。
如果需要更詳細(xì)的配置資料,那就需要去官方的github倉庫上去查了简识。github地址:?https://github.com/spring-projects/spring-amqp?赶掖。但是國(guó)內(nèi)訪問github的速度,你知道的七扰。
2.3聲明隊(duì)列
所有的exchange, queue, binding的配置奢赂,都需要以對(duì)象的方式聲明。默認(rèn)情況下颈走,這些業(yè)務(wù)對(duì)象一經(jīng)聲明膳灶,應(yīng)用就會(huì)自動(dòng)到RabbitMQ上常見對(duì)應(yīng)的業(yè)務(wù)對(duì)象。但是也是可以配置成綁定已有業(yè)務(wù)對(duì)象的立由。
業(yè)務(wù)對(duì)象的聲明方式轧钓,具體請(qǐng)參見示例工程。
詳細(xì)的屬性聲明锐膜,同樣參見github倉庫聋迎。
2.4使用RabbitmqTemplate對(duì)象發(fā)送消息
生產(chǎn)者的所有屬性都已經(jīng)在application.properties配置文件中進(jìn)行配置。項(xiàng)目啟動(dòng)時(shí)枣耀,就會(huì)在Spring容器中初始化一個(gè)RabbitmqTemplate對(duì)象霉晕,然后所有的發(fā)送消息操作都通過這個(gè)對(duì)象來進(jìn)行。
2.5使用@RabbitListener注解聲明消費(fèi)者
消費(fèi)者都是通過@RabbitListener注解來聲明捞奕。注解中包含了聲明消費(fèi)者隊(duì)列時(shí)所需要的重點(diǎn)參數(shù)牺堰。對(duì)照原生API,這些參數(shù)就不難理解了颅围。
但是當(dāng)要消費(fèi)Stream隊(duì)列時(shí)伟葫,還是要重點(diǎn)注意他的三個(gè)必要的步驟:
1、channel必須設(shè)置basicQos屬性院促。 channel對(duì)象可以在@RabbitListener聲明的消費(fèi)者方法中直接引用筏养,Spring框架會(huì)進(jìn)行注入斧抱。
2、正確聲明Stream隊(duì)列渐溶。 通過往Spring容器中注入Queue對(duì)象的方式聲明隊(duì)列辉浦。在Queue對(duì)象中傳入聲明Stream隊(duì)列所需要的參數(shù)。
3茎辐、消費(fèi)時(shí)需要指定offset宪郊。 可以通過注入Channel對(duì)象,使用原生API傳入offset屬性拖陆。
使用SpringBoot框架集成RabbitMQ后弛槐,開發(fā)過程可以得到很大的簡(jiǎn)化,所以使用過程并不難依啰,對(duì)照一下示例就能很快上手乎串。但是,需要理解一下的是速警,SpringBoot集成后的RabbitMQ中的很多概念叹誉,雖然都能跟原生API對(duì)應(yīng)上,但是這些模型中間都是做了轉(zhuǎn)換的坏瞄,比如Message桂对,就不是原生RabbitMQ中的消息了甩卓。使用SpringBoot框架鸠匀,尤其需要加深對(duì)RabbitMQ原生API的理解,這樣才能以不變應(yīng)萬變逾柿,深入理解各種看起來簡(jiǎn)單缀棍,但是其實(shí)坑很多的各種對(duì)象聲明方式。
2.6關(guān)于Stream隊(duì)列
在目前版本下机错,使用RabbitMQ的SpringBoot框架集成爬范,可以正常聲明Stream隊(duì)列,往Stream隊(duì)列發(fā)送消息弱匪,但是無法直接消費(fèi)Stream隊(duì)列了青瀑。
關(guān)于這個(gè)問題,還是需要從Stream隊(duì)列的三個(gè)重點(diǎn)操作入手萧诫。SpringBoot框架集成RabbitMQ后斥难,為了簡(jiǎn)化編程模型,就把channel帘饶,connection等這些關(guān)鍵對(duì)象給隱藏了哑诊,目前框架下,無法直接接入這些對(duì)象的注入過程及刻,所以無法直接使用镀裤。
如果非要使用Stream隊(duì)列竞阐,那么有兩種方式,一種是使用原生API的方式暑劝,在SpringBoot框架下自行封裝骆莹。另一種是使用RabbitMQ的Stream 插件。在服務(wù)端通過Strem插件打開TCP連接接口铃岔,并配合單獨(dú)提供的Stream客戶端使用汪疮。這種方式對(duì)應(yīng)用端的影響太重了,并且并沒有提供與SpringBoot框架的集成毁习,還需要自行完善智嚷,因此選擇使用的企業(yè)還比較少。
這里就不詳細(xì)介紹使用方式了纺且。關(guān)于Stream插件的使用和配置方式參見官方文檔:https://www.rabbitmq.com/stream.html盏道。配合Stream插件使用的客戶端有Java和GO兩個(gè)版本。其中Java版本客戶端參見git倉庫:https://github.com/rabbitmq/rabbitmq-stream-java-client?载碌。
其實(shí)關(guān)于Stream隊(duì)列猜嘱,現(xiàn)在也不需要著急上手,只是把他當(dāng)作一種特殊的隊(duì)列類型嫁艇,上手了解即可朗伶。因?yàn)橐环矫嫣碌募夹g(shù),往往還需要小白鼠多多驗(yàn)證步咪。另一方面论皆,現(xiàn)在RabbitMQ多種隊(duì)列并存的狀態(tài),在不久肯定會(huì)得到簡(jiǎn)化猾漫,到時(shí)候点晴,應(yīng)用層的使用方式也肯定會(huì)跟著變化。
推薦一篇博文?https://blog.csdn.net/roykingw/article/details/78404956?可以作為大家了解SpringBoot集成RabbitMQ的一個(gè)入門博文悯周。