AMQP 0.9.1介紹
AMQP 是什么
AMQP(高級(jí)消息隊(duì)列協(xié)議)是一個(gè)網(wǎng)絡(luò)協(xié)議箩艺,他支持符合要求的客戶端應(yīng)用(application)和消息中間件代理(messaging middleware broker)之間進(jìn)行通信香追。
消息代理和他們所扮演的角色
消息代理(message brokers)從發(fā)布者(publishers)亦稱生產(chǎn)者(producers)那兒接收消息孕暇,并根據(jù)既定的路由規(guī)則把收到的消息發(fā)送給處理消息的消費(fèi)者(consumers)磕谅。
由于AMQP是一個(gè)網(wǎng)絡(luò)協(xié)議点楼,所以這個(gè)過(guò)程中的發(fā)布者糖儡、消費(fèi)者、消息代理可以存在于不同的設(shè)備上贷币。
AMQP 模型簡(jiǎn)介
消息(message)被發(fā)布者(publisher)發(fā)送給交換機(jī)(exchange)击胜,交換機(jī)常常被比喻成郵局或者郵箱。然后交換機(jī)將收到的消息根據(jù)路由規(guī)則分發(fā)給綁定的隊(duì)列(queue)役纹。最后AMQP代理會(huì)將消息投遞給訂閱了此隊(duì)列的消費(fèi)者偶摔,或者消費(fèi)者按照需求自行獲取。
發(fā)布者(publisher)發(fā)布消息時(shí)可以給消息指定各種消息屬性(message meta-data)促脉。有些屬性有可能會(huì)被消息代理(brokers)使用辰斋,然而其他的屬性則是完全不透明的,它們只能被接收消息的應(yīng)用所使用瘸味。
從安全角度考慮宫仗,網(wǎng)絡(luò)是不可靠的,接收消息的應(yīng)用也有可能在處理消息的時(shí)候失敗旁仿∨悍颍基于此原因,AMQP模塊包含了一個(gè)消息確認(rèn)(message acknowledgements)的概念:當(dāng)一個(gè)消息從隊(duì)列中投遞給消費(fèi)者后(consumer)丁逝,消費(fèi)者會(huì)通知一下消息代理(broker)汁胆,這個(gè)可以是自動(dòng)的也可以由處理消息的應(yīng)用的開發(fā)者執(zhí)行。當(dāng)“消息確認(rèn)”被啟用的時(shí)候霜幼,消息代理不會(huì)完全將消息從隊(duì)列中刪除,直到它收到來(lái)自消費(fèi)者的確認(rèn)回執(zhí)(acknowledgement)誉尖。
在某些情況下罪既,例如當(dāng)一個(gè)消息無(wú)法被成功路由時(shí),消息或許會(huì)被返回給發(fā)布者并被丟棄∽粮校或者丢间,如果消息代理執(zhí)行了延期操作,消息會(huì)被放入一個(gè)所謂的死信隊(duì)列中驹针。此時(shí)烘挫,消息發(fā)布者可以選擇某些參數(shù)來(lái)處理這些特殊情況。
隊(duì)列柬甥,交換機(jī)和綁定統(tǒng)稱為AMQP實(shí)體(AMQP entities)饮六。
AMQP 是一個(gè)可編程的協(xié)議
AMQP 0-9-1是一個(gè)可編程協(xié)議,某種意義上說(shuō)AMQP的實(shí)體和路由規(guī)則是由應(yīng)用本身定義的苛蒲,而不是由消息代理定義卤橄。包括像聲明隊(duì)列和交換機(jī),定義他們之間的綁定臂外,訂閱隊(duì)列等等關(guān)于協(xié)議本身的操作窟扑。
這雖然能讓開發(fā)人員自由發(fā)揮,但也需要他們注意潛在的定義沖突漏健。當(dāng)然這在實(shí)踐中很少會(huì)發(fā)生嚎货,如果發(fā)生,會(huì)以配置錯(cuò)誤(misconfiguration)的形式表現(xiàn)出來(lái)蔫浆。
應(yīng)用程序(Applications)聲明AMQP實(shí)體厂抖,定義需要的路由方案,或者刪除不再需要的AMQP實(shí)體克懊。
交換機(jī)和交換機(jī)類型
交換機(jī)是用來(lái)發(fā)送消息的AMQP實(shí)體忱辅。交換機(jī)拿到一個(gè)消息之后將它路由給一個(gè)或零個(gè)隊(duì)列。它使用哪種路由算法是由交換機(jī)類型和被稱作綁定(bindings)的規(guī)則所決定的谭溉。AMQP 0-9-1的代理提供了四種交換機(jī)
name(交換機(jī)類型) | default pre-declared names(預(yù)聲明的默認(rèn)名稱) |
---|---|
direct exchange直連交換機(jī) | (Empty string)and amq.direct |
fanout exchange扇形交換機(jī) | amq.fanout |
topic exchange主題交換機(jī) | amq.topic |
headers exchange頭交換機(jī) | amq.match (and amq.headers in RabbitMq) |
除交換機(jī)類型外墙懂,在聲明交換機(jī)時(shí)還可以附帶許多其他的屬性,其中最重要的幾個(gè)分別是:Durability (消息代理重啟后扮念,交換機(jī)是否還存在),Auto-delete(當(dāng)所有與之綁定的消息隊(duì)列都完成了對(duì)此交換機(jī)的使用后损搬,刪掉它),Arguments(依賴代理本身)。
交換機(jī)可以有兩個(gè)狀態(tài):持久(durable)柜与、暫存(transient)巧勤。持久化的交換機(jī)會(huì)在消息代理(broker)重啟后依舊存在,而暫存的交換機(jī)則不會(huì)(它們需要在代理再次上線后重新被聲明)弄匕。然而并不是所有的應(yīng)用場(chǎng)景都需要持久化的交換機(jī)颅悉。
默認(rèn)交換機(jī)
默認(rèn)交換機(jī)(default exchange)實(shí)際上是一個(gè)由消息代理預(yù)先聲明好的沒(méi)有名字(名字為空字符串)的直連交換機(jī)(direct exchange)。它有一個(gè)特殊的屬性使得它對(duì)于簡(jiǎn)單應(yīng)用特別有用處:那就是每個(gè)新建隊(duì)列(queue)都會(huì)自動(dòng)綁定到默認(rèn)交換機(jī)上迁匠,綁定的路由鍵(routing key)名稱與隊(duì)列名稱相同剩瓶。
舉個(gè)栗子:當(dāng)你聲明了一個(gè)名為"search-indexing-online"的隊(duì)列驹溃,AMQP代理會(huì)自動(dòng)將其綁定到默認(rèn)交換機(jī)上,綁定(binding)的路由鍵名稱也是為"search-indexing-online"延曙。因此豌鹤,當(dāng)攜帶著名為"search-indexing-online"的路由鍵的消息被發(fā)送到默認(rèn)交換機(jī)的時(shí)候,此消息會(huì)被默認(rèn)交換機(jī)路由至名為"search-indexing-online"的隊(duì)列中枝缔。換句話說(shuō)布疙,默認(rèn)交換機(jī)看起來(lái)貌似能夠直接將消息投遞給隊(duì)列,盡管技術(shù)上并沒(méi)有做相關(guān)的操作愿卸。
直連交換機(jī)
直連型交換機(jī)(direct exchange)是根據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)隊(duì)列的灵临。直連交換機(jī)用來(lái)處理消息的單播路由(unicast routing)(盡管它也可以處理多播路由)。下邊介紹它是如何工作的:
- 將一個(gè)隊(duì)列綁定到某個(gè)交換機(jī)上擦酌,同時(shí)賦予該綁定一個(gè)路由鍵(routing key)
- 當(dāng)一個(gè)攜帶著路由鍵為
R
的消息被發(fā)送給直連交換機(jī)時(shí)俱诸,交換機(jī)會(huì)把它路由給綁定值同樣為R
的隊(duì)列。
直連交換機(jī)經(jīng)常用來(lái)循環(huán)分發(fā)任務(wù)給多個(gè)工作者(workers)赊舶。當(dāng)這樣做的時(shí)候睁搭,我們需要明白一點(diǎn),在AMQP 0-9-1中笼平,消息的負(fù)載均衡是發(fā)生在消費(fèi)者(consumer)之間的园骆,而不是隊(duì)列(queue)之間。
扇形交換機(jī)
扇型交換機(jī)(fanout exchange)將消息路由給綁定到它身上的所有隊(duì)列寓调,而不理會(huì)綁定的路由鍵锌唾。如果N個(gè)隊(duì)列綁定到某個(gè)扇型交換機(jī)上,當(dāng)有消息發(fā)送給此扇型交換機(jī)時(shí)夺英,交換機(jī)會(huì)將消息的拷貝分別發(fā)送給這所有的N個(gè)隊(duì)列晌涕。扇型用來(lái)交換機(jī)處理消息的廣播路由(broadcast routing)。
因?yàn)樯刃徒粨Q機(jī)投遞消息的拷貝到所有綁定到它的隊(duì)列痛悯,所以他的應(yīng)用案例都極其相似:
- 大規(guī)模多用戶在線(MMO)游戲可以使用它來(lái)處理排行榜更新等全局事件
- 體育新聞網(wǎng)站可以用它來(lái)近乎實(shí)時(shí)地將比分更新分發(fā)給移動(dòng)客戶端
- 分發(fā)系統(tǒng)使用它來(lái)廣播各種狀態(tài)和配置更新
-
在群聊的時(shí)候余黎,它被用來(lái)分發(fā)消息給參與群聊的用戶。(AMQP沒(méi)有內(nèi)置presence的概念载萌,因此XMPP可能會(huì)是個(gè)更好的選擇)exchange-fanout.png
主題交換機(jī)
*
代表一個(gè)單詞
#
代表0個(gè)或多個(gè)單詞
主題交換機(jī)(topic exchanges)通過(guò)對(duì)消息的路由鍵和隊(duì)列到交換機(jī)的綁定模式之間的匹配惧财,將消息路由給一個(gè)或多個(gè)隊(duì)列。主題交換機(jī)經(jīng)常用來(lái)實(shí)現(xiàn)各種分發(fā)/訂閱模式及其變種扭仁。主題交換機(jī)通常用來(lái)實(shí)現(xiàn)消息的多播路由(multicast routing)垮衷。
主題交換機(jī)擁有非常廣泛的用戶案例。無(wú)論何時(shí)乖坠,當(dāng)一個(gè)問(wèn)題涉及到那些想要有針對(duì)性的選擇需要接收消息的 多消費(fèi)者/多應(yīng)用(multiple consumers/applications) 的時(shí)候搀突,主題交換機(jī)都可以被列入考慮范圍。
使用案例:
- 分發(fā)有關(guān)于特定地理位置的數(shù)據(jù)瓤帚,例如銷售點(diǎn)
- 由多個(gè)工作者(workers)完成的后臺(tái)任務(wù)描姚,每個(gè)工作者負(fù)責(zé)處理某些特定的任務(wù)
- 股票價(jià)格更新(以及其他類型的金融數(shù)據(jù)更新)
- 涉及到分類或者標(biāo)簽的新聞更新(例如涩赢,針對(duì)特定的運(yùn)動(dòng)項(xiàng)目或者隊(duì)伍)
- 云端的不同種類服務(wù)的協(xié)調(diào)
-
分布式架構(gòu)/基于系統(tǒng)的軟件封裝戈次,其中每個(gè)構(gòu)建者僅能處理一個(gè)特定的架構(gòu)或者系統(tǒng)轩勘。topic.png
頭交換機(jī)
有時(shí)消息的路由操作會(huì)涉及到多個(gè)屬性,此時(shí)使用消息頭就比用路由鍵更容易表達(dá)怯邪,頭交換機(jī)(headers exchange)就是為此而生的绊寻。頭交換機(jī)使用多個(gè)消息屬性來(lái)代替路由鍵建立路由規(guī)則。通過(guò)判斷消息頭的值能否與指定的綁定相匹配來(lái)確立路由規(guī)則悬秉。
我們可以綁定一個(gè)隊(duì)列到頭交換機(jī)上澄步,并給他們之間的綁定使用多個(gè)用于匹配的頭(header)。這個(gè)案例中和泌,消息代理得從應(yīng)用開發(fā)者那兒取到更多一段信息村缸,換句話說(shuō),它需要考慮某條消息(message)是需要部分匹配還是全部匹配武氓。上邊說(shuō)的“更多一段消息”就是"x-match"參數(shù)梯皿。當(dāng)"x-match"設(shè)置為“any”時(shí),消息頭的任意一個(gè)值被匹配就可以滿足條件县恕,而當(dāng)"x-match"設(shè)置為“all”的時(shí)候东羹,就需要消息頭的所有值都匹配成功。
頭交換機(jī)可以視為直連交換機(jī)的另一種表現(xiàn)形式忠烛。頭交換機(jī)能夠像直連交換機(jī)一樣工作属提,不同之處在于頭交換機(jī)的路由規(guī)則是建立在頭屬性值之上,而不是路由鍵美尸。路由鍵必須是一個(gè)字符串冤议,而頭屬性值則沒(méi)有這個(gè)約束,它們甚至可以是整數(shù)或者哈希值(字典)等师坎。
隊(duì)列
AMQP中的隊(duì)列(queue)跟其他消息隊(duì)列或任務(wù)隊(duì)列中的隊(duì)列是很相似的:它們存儲(chǔ)著即將被應(yīng)用消費(fèi)掉的消息恕酸。隊(duì)列跟交換機(jī)共享某些屬性,但是隊(duì)列也有一些另外的屬性屹耐。
- Name
- Durable(消息代理重啟后尸疆,隊(duì)列依舊存在)
- Exclusive(只被一個(gè)連接(connection)使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除)
- Auto-delete(當(dāng)最后一個(gè)消費(fèi)者退訂后即被刪除)
- Arguments(一些消息代理用他來(lái)完成類似與TTL的某些額外功能)
隊(duì)列在聲明(declare)后才能被使用惶岭。如果一個(gè)隊(duì)列尚不存在寿弱,聲明一個(gè)隊(duì)列會(huì)創(chuàng)建它。如果聲明的隊(duì)列已經(jīng)存在按灶,并且屬性完全相同症革,那么此次聲明不會(huì)對(duì)原有隊(duì)列產(chǎn)生任何影響。如果聲明中的屬性與已存在隊(duì)列的屬性有差異鸯旁,那么一個(gè)錯(cuò)誤代碼為406的通道級(jí)異常就會(huì)被拋出噪矛。
隊(duì)列參數(shù)
- x-message-ttl 發(fā)送到隊(duì)列的消息在丟棄之前可以存活多長(zhǎng)時(shí)間(毫秒)量蕊。
- x-expires 隊(duì)列在被自動(dòng)刪除(毫秒)之前可以使用多長(zhǎng)時(shí)間。
- x-max-length 隊(duì)列在開始從頭部刪除之前可以包含多少就緒消息艇挨。
- x-max-length-bytes 隊(duì)列在開始從頭部刪除之前可以包含的就緒消息的總體大小残炮。
- x-dead-letter-exchange 設(shè)置隊(duì)列溢出行為。這決定了在達(dá)到隊(duì)列的最大長(zhǎng)度時(shí)消息會(huì)發(fā)生什么缩滨。有效值為drop-head或reject-publish势就。交換的可選名稱,如果消息被拒絕或過(guò)期脉漏,將重新發(fā)布這些名稱苞冯。
- x-dead-letter-routing-key 可選的替換路由密鑰,用于在消息以字母為單位時(shí)使用侧巨。如果未設(shè)置舅锄,將使用消息的原始路由密鑰。
- x-max-priority 隊(duì)列支持的最大優(yōu)先級(jí)數(shù);如果未設(shè)置司忱,隊(duì)列將不支持消息優(yōu)先級(jí)皇忿。
- x-queue-mode 將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息以減少內(nèi)存使用;如果未設(shè)置烘贴,隊(duì)列將保留內(nèi)存緩存以盡快傳遞消息禁添。
- x-queue-master-locator 將隊(duì)列設(shè)置為主位置模式,確定在節(jié)點(diǎn)集群上聲明時(shí)隊(duì)列主機(jī)所在的規(guī)則桨踪。
隊(duì)列名稱
隊(duì)列的名字可以由應(yīng)用(application)來(lái)取老翘,也可以讓消息代理(broker)直接生成一個(gè)。隊(duì)列的名字可以是最多255字節(jié)的一個(gè)utf-8字符串锻离。若希望AMQP消息代理生成隊(duì)列名铺峭,需要給隊(duì)列的name參數(shù)賦值一個(gè)空字符串:在同一個(gè)通道(channel)的后續(xù)的方法(method)中,我們可以使用空字符串來(lái)表示之前生成的隊(duì)列名稱汽纠。之所以之后的方法可以獲取正確的隊(duì)列名是因?yàn)橥ǖ揽梢阅赜涀∠⒋碜詈笠淮紊傻年?duì)列名稱卫键。
以"amq."開始的隊(duì)列名稱被預(yù)留做消息代理內(nèi)部使用。如果試圖在隊(duì)列聲明時(shí)打破這一規(guī)則的話虱朵,一個(gè)通道級(jí)的403 (ACCESS_REFUSED)錯(cuò)誤會(huì)被拋出莉炉。
隊(duì)列持久化
持久化隊(duì)列(Durable queues)會(huì)被存儲(chǔ)在磁盤上,當(dāng)消息代理(broker)重啟的時(shí)候碴犬,它依舊存在絮宁。沒(méi)有被持久化的隊(duì)列稱作暫存隊(duì)列(Transient queues)。并不是所有的場(chǎng)景和案例都需要將隊(duì)列持久化服协。
持久化的隊(duì)列并不會(huì)使得路由到它的消息也具有持久性绍昂。倘若消息代理掛掉了,重新啟動(dòng),那么在重啟的過(guò)程中持久化隊(duì)列會(huì)被重新聲明窘游,無(wú)論怎樣唠椭,只有經(jīng)過(guò)持久化的消息才能被重新恢復(fù)。
綁定
綁定(Binding)是交換機(jī)(exchange)將消息(message)路由給隊(duì)列(queue)所需遵循的規(guī)則忍饰。如果要指示交換機(jī)“E”將消息路由給隊(duì)列“Q”贪嫂,那么“Q”就需要與“E”進(jìn)行綁定。綁定操作需要定義一個(gè)可選的路由鍵(routing key)屬性給某些類型的交換機(jī)喘批。路由鍵的意義在于從發(fā)送給交換機(jī)的眾多消息中選擇出某些消息撩荣,將其路由給綁定的隊(duì)列铣揉。
打個(gè)比方:
- 隊(duì)列(queue)是我們想要去的位于紐約的目的地
- 交換機(jī)(exchange)是JFK機(jī)場(chǎng)
- 綁定(binding)就是JFK機(jī)場(chǎng)到目的地的路線饶深。能夠到達(dá)目的地的路線可以是一條或者多條
擁有了交換機(jī)這個(gè)中間層,很多由發(fā)布者直接到隊(duì)列難以實(shí)現(xiàn)的路由方案能夠得以實(shí)現(xiàn)逛拱,并且避免了應(yīng)用開發(fā)者的許多重復(fù)勞動(dòng)敌厘。
如果AMQP的消息無(wú)法路由到隊(duì)列(例如,發(fā)送到的交換機(jī)沒(méi)有綁定隊(duì)列)朽合,消息會(huì)被就地銷毀或者返還給發(fā)布者俱两。如何處理取決于發(fā)布者設(shè)置的消息屬性。
消費(fèi)者
消息如果只是存儲(chǔ)在隊(duì)列里是沒(méi)有任何用處的曹步。被應(yīng)用消費(fèi)掉宪彩,消息的價(jià)值才能夠體現(xiàn)。在AMQP 0-9-1 模型中讲婚,有兩種途徑可以達(dá)到此目的:
- 將消息投遞給應(yīng)用 ("push API")
- 應(yīng)用根據(jù)需要主動(dòng)獲取消息 ("pull API")
使用push API尿孔,應(yīng)用(application)需要明確表示出它在某個(gè)特定隊(duì)列里所感興趣的,想要消費(fèi)的消息筹麸。如是活合,我們可以說(shuō)應(yīng)用注冊(cè)了一個(gè)消費(fèi)者,或者說(shuō)訂閱了一個(gè)隊(duì)列物赶。一個(gè)隊(duì)列可以注冊(cè)多個(gè)消費(fèi)者白指,也可以注冊(cè)一個(gè)獨(dú)享的消費(fèi)者(當(dāng)獨(dú)享消費(fèi)者存在時(shí),其他消費(fèi)者即被排除在外)酵紫。
每個(gè)消費(fèi)者(訂閱者)都有一個(gè)叫做消費(fèi)者標(biāo)簽的標(biāo)識(shí)符告嘲。它可以被用來(lái)退訂消息。消費(fèi)者標(biāo)簽實(shí)際上是一個(gè)字符串奖地。
消息確認(rèn)
消費(fèi)者應(yīng)用(Consumer applications) - 用來(lái)接受和處理消息的應(yīng)用 - 在處理消息的時(shí)候偶爾會(huì)失敗或者有時(shí)會(huì)直接崩潰掉橄唬。而且網(wǎng)絡(luò)原因也有可能引起各種問(wèn)題。這就給我們出了個(gè)難題鹉动,AMQP代理在什么時(shí)候刪除消息才是正確的轧坎?AMQP 0-9-1 規(guī)范給我們兩種建議:
- 當(dāng)消息代理(broker)將消息發(fā)送給應(yīng)用后立即刪除。(使用AMQP方法:basic.deliver或basic.get-ok)
- 待應(yīng)用(application)發(fā)送一個(gè)確認(rèn)回執(zhí)(acknowledgement)后再刪除消息泽示。(使用AMQP方法:basic.ack)
前者被稱作自動(dòng)確認(rèn)模式(automatic acknowledgement model)缸血,后者被稱作顯式確認(rèn)模式(explicit acknowledgement model)蜜氨。在顯式模式下,由消費(fèi)者應(yīng)用來(lái)選擇什么時(shí)候發(fā)送確認(rèn)回執(zhí)(acknowledgement)捎泻。應(yīng)用可以在收到消息后立即發(fā)送飒炎,或?qū)⑽刺幚淼南⒋鎯?chǔ)后發(fā)送,或等到消息被處理完畢后再發(fā)送確認(rèn)回執(zhí)(例如笆豁,成功獲取一個(gè)網(wǎng)頁(yè)內(nèi)容并將其存儲(chǔ)之后)郎汪。
如果一個(gè)消費(fèi)者在尚未發(fā)送確認(rèn)回執(zhí)的情況下掛掉了,那AMQP代理會(huì)將消息重新投遞給另一個(gè)消費(fèi)者闯狱。如果當(dāng)時(shí)沒(méi)有可用的消費(fèi)者了煞赢,消息代理會(huì)死等下一個(gè)注冊(cè)到此隊(duì)列的消費(fèi)者,然后再次嘗試投遞哄孤。
拒絕消息
當(dāng)一個(gè)消費(fèi)者接收到某條消息后照筑,處理過(guò)程有可能成功,有可能失敗瘦陈。應(yīng)用可以向消息代理表明凝危,本條消息由于“拒絕消息(Rejecting Messages)”的原因處理失敗了(或者未能在此時(shí)完成)。當(dāng)拒絕某條消息時(shí)晨逝,應(yīng)用可以告訴消息代理如何處理這條消息——銷毀它或者重新放入隊(duì)列蛾默。當(dāng)此隊(duì)列只有一個(gè)消費(fèi)者時(shí),請(qǐng)確認(rèn)不要由于拒絕消息并且選擇了重新放入隊(duì)列的行為而引起消息在同一個(gè)消費(fèi)者身上無(wú)限循環(huán)的情況發(fā)生捉貌。
否定確認(rèn)
在AMQP中支鸡,basic.reject方法用來(lái)執(zhí)行拒絕消息的操作。但basic.reject有個(gè)限制:你不能使用它決絕多個(gè)帶有確認(rèn)回執(zhí)(acknowledgements)的消息昏翰。但是如果你使用的是RabbitMQ苍匆,那么你可以使用被稱作negative acknowledgements(也叫nacks)
預(yù)取消息
在多個(gè)消費(fèi)者共享一個(gè)隊(duì)列的案例中,明確指定在收到下一個(gè)確認(rèn)回執(zhí)前每個(gè)消費(fèi)者一次可以接受多少條消息是非常有用的棚菊。這可以在試圖批量發(fā)布消息的時(shí)候起到簡(jiǎn)單的負(fù)載均衡和提高消息吞吐量的作用浸踩。
注意,RabbitMQ只支持通道級(jí)的預(yù)取計(jì)數(shù)统求,而不是連接級(jí)的或者基于大小的預(yù)取检碗。
配置策略policies
導(dǎo)航對(duì)管理員>策略>添加/更新操作員策略。
在名稱“ ^ amq \”旁邊輸入“ transient-queue-ttl”码邻。模式旁邊的折剃,然后選擇應(yīng)用到旁邊的“隊(duì)列”。
在策略旁邊的第一行中輸入“ expires” = 1800000像屋。
單擊添加策略怕犁。
消息屬性和有效載荷(消息主題)
AMQP模型中的消息(Message)對(duì)象是帶有屬性(Attributes)的。有些屬性及其常見(jiàn),以至于AMQP 0-9-1 明確的定義了它們奏甫,并且應(yīng)用開發(fā)者們無(wú)需費(fèi)心思思考這些屬性名字所代表的具體含義戈轿。例如:
- Content type(內(nèi)容類型)
- Content encoding(內(nèi)容編碼)
- Routing key(路由鍵)
- Delivery mode (persistent or not)
投遞模式(持久化 或 非持久化) - Message priority(消息優(yōu)先權(quán))
- Message publishing timestamp(消息發(fā)布的時(shí)間戳)
- Expiration period(消息有效期)
- Publisher application id(發(fā)布應(yīng)用的ID)
有些屬性是被AMQP代理所使用的,但是大多數(shù)是開放給接收它們的應(yīng)用解釋器用的阵子。有些屬性是可選的也被稱作消息頭(headers)思杯。他們跟HTTP協(xié)議的X-Headers很相似。消息屬性需要在消息被發(fā)布的時(shí)候定義挠进。
AMQP的消息除屬性外色乾,也含有一個(gè)有效載荷 - Payload(消息實(shí)際攜帶的數(shù)據(jù)),它被AMQP代理當(dāng)作不透明的字節(jié)數(shù)組來(lái)對(duì)待领突。消息代理不會(huì)檢查或者修改有效載荷暖璧。消息可以只包含屬性而不攜帶有效載荷。它通常會(huì)使用類似JSON這種序列化的格式數(shù)據(jù)攘须,為了節(jié)省漆撞,協(xié)議緩沖器和MessagePack將結(jié)構(gòu)化數(shù)據(jù)序列化,以便以消息的有效載荷的形式發(fā)布于宙。AMQP及其同行者們通常使用"content-type" 和 "content-encoding" 這兩個(gè)字段來(lái)與消息溝通進(jìn)行有效載荷的辨識(shí)工作,但這僅僅是基于約定而已悍汛。
消息能夠以持久化的方式發(fā)布捞魁,AMQP代理會(huì)將此消息存儲(chǔ)在磁盤上。如果服務(wù)器重啟离咐,系統(tǒng)會(huì)確認(rèn)收到的持久化消息未丟失谱俭。簡(jiǎn)單地將消息發(fā)送給一個(gè)持久化的交換機(jī)或者路由給一個(gè)持久化的隊(duì)列,并不會(huì)使得此消息具有持久化性質(zhì):它完全取決與消息本身的持久模式(persistence mode)宵蛀。將消息以持久化方式發(fā)布時(shí)昆著,會(huì)對(duì)性能造成一定的影響(就像數(shù)據(jù)庫(kù)操作一樣,健壯性的存在必定造成一些性能犧牲)术陶。
消息確認(rèn)
由于網(wǎng)絡(luò)的不確定性和應(yīng)用失敗的可能性凑懂,處理確認(rèn)回執(zhí)(acknowledgement)就變的十分重要。有時(shí)我們確認(rèn)消費(fèi)者收到消息就可以了梧宫,有時(shí)確認(rèn)回執(zhí)意味著消息已被驗(yàn)證并且處理完畢接谨,例如對(duì)某些數(shù)據(jù)已經(jīng)驗(yàn)證完畢并且進(jìn)行了數(shù)據(jù)存儲(chǔ)或者索引操作。
這種情形很常見(jiàn)塘匣,所以 AMQP 0-9-1 內(nèi)置了一個(gè)功能叫做 消息確認(rèn)(message acknowledgements)脓豪,消費(fèi)者用它來(lái)確認(rèn)消息已經(jīng)被接收或者處理。如果一個(gè)應(yīng)用崩潰掉(此時(shí)連接會(huì)斷掉忌卤,所以AMQP代理亦會(huì)得知)扫夜,而且消息的確認(rèn)回執(zhí)功能已經(jīng)被開啟,但是消息代理尚未獲得確認(rèn)回執(zhí),那么消息會(huì)被從新放入隊(duì)列(并且在還有還有其他消費(fèi)者存在于此隊(duì)列的前提下笤闯,立即投遞給另外一個(gè)消費(fèi)者)现拒。
協(xié)議內(nèi)置的消息確認(rèn)功能將幫助開發(fā)者建立強(qiáng)大的軟件。
AMQP 0.9.1方法
讓我們來(lái)看看交換機(jī)類望侈,有一組方法被關(guān)聯(lián)到了交換機(jī)的操作上印蔬。這些方法如下所示:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
(請(qǐng)注意,RabbitMQ網(wǎng)站參考中包含了特用于RabbitMQ的交換機(jī)類的擴(kuò)展脱衙,這里我們不對(duì)其進(jìn)行討論)
以上的操作來(lái)自邏輯上的配對(duì):exchange.declare 和 exchange.declare-ok侥猬,exchange.delete 和 exchange.delete-ok. 這些操作分為“請(qǐng)求 - requests”(由客戶端發(fā)送)和“響應(yīng) - responses”(由代理發(fā)送,用來(lái)回應(yīng)之前提到的“請(qǐng)求”操作)捐韩。
如下的例子:客戶端要求消息代理使用exchange.declare方法聲明一個(gè)新的交換機(jī):
如上圖所示退唠,exchange.declare方法攜帶了好幾個(gè)參數(shù)。這些參數(shù)可以允許客戶端指定交換機(jī)名稱荤胁、類型瞧预、是否持久化等等。
操作成功后仅政,消息代理使用exchange.declare-ok方法進(jìn)行回應(yīng):
exchange.declare-ok方法除了通道號(hào)之外沒(méi)有攜帶任何其他參數(shù)(通道-channel 會(huì)在本指南稍后章節(jié)進(jìn)行介紹)垢油。
AMQP隊(duì)列類的配對(duì)方法 - queue.declare方法 和 queue.declare-ok有著與其他配對(duì)方法非常相似的一系列事件:
不是所有的AMQP方法都有與其配對(duì)的“另一半”。許多(basic.publish是最被廣泛使用的)都沒(méi)有相對(duì)應(yīng)的“響應(yīng)”方法圆丹,另外一些(如basic.get)有著一種以上與之對(duì)應(yīng)的“響應(yīng)”方法滩愁。
連接
AMQP連接通常是長(zhǎng)連接。AMQP是一個(gè)使用TCP提供可靠投遞的應(yīng)用層協(xié)議辫封。AMQP使用認(rèn)證機(jī)制并且提供TLS(SSL)保護(hù)硝枉。當(dāng)一個(gè)應(yīng)用不再需要連接到AMQP代理的時(shí)候,需要優(yōu)雅的釋放掉AMQP連接倦微,而不是直接將TCP連接關(guān)閉妻味。
通道
有些應(yīng)用需要與AMQP代理建立多個(gè)連接欣福。無(wú)論怎樣责球,同時(shí)開啟多個(gè)TCP連接都是不合適的,因?yàn)檫@樣做會(huì)消耗掉過(guò)多的系統(tǒng)資源并且使得防火墻的配置更加困難劣欢。AMQP 0-9-1提供了通道(channels)來(lái)處理多連接棕诵,可以把通道理解成共享一個(gè)TCP連接的多個(gè)輕量化連接。
在涉及多線程/進(jìn)程的應(yīng)用中凿将,為每個(gè)線程/進(jìn)程開啟一個(gè)通道(channel)是很常見(jiàn)的校套,并且這些通道不能被線程/進(jìn)程共享。
一個(gè)特定通道上的通訊與其他通道上的通訊是完全隔離的牧抵,因此每個(gè)AMQP方法都需要攜帶一個(gè)通道號(hào)笛匙,這樣客戶端就可以指定此方法是為哪個(gè)通道準(zhǔn)備的侨把。
虛擬主機(jī)
為了在一個(gè)單獨(dú)的代理上實(shí)現(xiàn)多個(gè)隔離的環(huán)境(用戶、用戶組妹孙、交換機(jī)秋柄、隊(duì)列 等),AMQP提供了一個(gè)虛擬主機(jī)(virtual hosts - vhosts)的概念蠢正。這跟Web servers虛擬主機(jī)概念非常相似骇笔,這為AMQP實(shí)體提供了完全隔離的環(huán)境。當(dāng)連接被建立的時(shí)候嚣崭,AMQP客戶端來(lái)指定使用哪個(gè)虛擬主機(jī)笨触。
AMQP是可擴(kuò)展的
AMQP 0-9-1 擁有多個(gè)擴(kuò)展點(diǎn):
- 定制化交換機(jī)類型 可以讓開發(fā)者們實(shí)現(xiàn)一些開箱即用的交換機(jī)類型尚未很好覆蓋的路由方案。例如 geodata-based routing雹舀。
- 交換機(jī)和隊(duì)列的聲明中可以包含一些消息代理能夠用到的額外屬性芦劣。例如RabbitMQ中的per-queue message TTL即是使用該方式實(shí)現(xiàn)。
- 特定消息代理的協(xié)議擴(kuò)展说榆。例如RabbitMQ所實(shí)現(xiàn)的擴(kuò)展虚吟。
- 新的 AMQP 0-9-1 方法類可被引入。
- 消息代理可以被其他的插件擴(kuò)展签财,例如RabbitMQ的管理前端 和 已經(jīng)被插件化的HTTP API串慰。
這些特性使得AMQP 0-9-1模型更加靈活,并且能夠適用于解決更加寬泛的問(wèn)題荠卷。
AMQP 0.9.1 客戶端生態(tài)系統(tǒng)
AMQP 0-9-1 擁有眾多的適用于各種流行語(yǔ)言和框架的客戶端模庐。其中一部分嚴(yán)格遵循AMQP規(guī)范,提供AMQP方法的實(shí)現(xiàn)油宜。另一部分提供了額外的技術(shù),方便使用的方法和抽象怜姿。有些客戶端是異步的(非阻塞的),有些是同步的(阻塞的),有些將這兩者同時(shí)實(shí)現(xiàn)良姆。有些客戶端支持“供應(yīng)商的特定擴(kuò)展”(例如RabbitMQ的特定擴(kuò)展)嫩实。
因?yàn)锳MQP的主要目標(biāo)之一就是實(shí)現(xiàn)交互性,所以對(duì)于開發(fā)者來(lái)講但狭,了解協(xié)議的操作方法而不是只停留在弄懂特定客戶端的庫(kù)就顯得十分重要披诗。這樣一來(lái),開發(fā)者使用不同類型的庫(kù)與協(xié)議進(jìn)行溝通時(shí)就會(huì)容易的多立磁。
下載安裝
版本
3.7.14
查看操作系統(tǒng)信息
cat /etc/centos-release
CentOS release 6.8 (Final)
安裝rabbit-server
rabbitmq提供的erlang呈队,對(duì)第三方插件支持可能會(huì)有問(wèn)題
rpm -ivh [--force] erlang-21.3.4-1.el6.x86_64.rpm
rabbitmq
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
yum -y install rabbitmq-server
配置文件
- 配置文件 rabbitmq.conf
- 環(huán)境變量文件 rabbitmq-env.conf
- 補(bǔ)充配置文件 advanced.config
/etc/rabbitmq/ (不同安裝方式位置會(huì)有不同)https://www.rabbitmq.com/configure.html#config-location
端口配置 https://www.rabbitmq.com/install-rpm.html#ports
注意:服務(wù)器設(shè)置為以系統(tǒng)用戶rabbitmq身份運(yùn)行 。
啟動(dòng)服務(wù)器
守護(hù)程序
chkconfig rabbitmq-server on
啟動(dòng)停止
/sbin/service rabbitmq-server start
/sbin/service rabbitmq-server stop
圖形化管理插件(端口:15672)
/usr/lib/rabbitmq/bin/rabbitmq-plugins list //查看插件安裝情況
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management //啟用rabbitmq_management服務(wù)
rabbitmqctl常用操作
https://www.rabbitmq.com/rabbitmqctl.8.html
查詢虛擬機(jī)rabbitmqctl list_vhosts
添加虛擬機(jī)rabbitmqctl add_vhost host_name
刪除虛擬機(jī)rabbitmqctl delete_vhost host_name
查詢用戶rabbitmqctl list_users
添加用戶rabbitmqctl add_user username pwd
刪除用戶rabbitmqctl delete_user username
修改密碼rabbitmqctl change_password username newpassword
設(shè)置用戶標(biāo)簽 rabbitmqctl set_user_tags username tag
設(shè)置這個(gè)才能在頁(yè)面上登錄,tag可以為administrator, monitoring, management
設(shè)置權(quán)限rabbitmqctl set_permissions [-p <vhost>] <username> <conf> <write> <read>
權(quán)限配置包括:配置(隊(duì)列和交換機(jī)的創(chuàng)建和刪除)唱歧、寫(發(fā)布消息)宪摧、讀(有關(guān)消息的任何操作粒竖,包括清除這個(gè)隊(duì)列) 。例如:rabbitmqctl set_permissions -p host_1 linyy ".*" ".*" ".*"
conf:一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶訪問(wèn)几于。
write:一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶讀蕊苗。
read:一個(gè)正則表達(dá)式match哪些配置資源能夠被該用戶訪問(wèn)
查詢queuerabbitmqctl list_queues [-p vhost]
查詢exchangerabbitmqctl list_exchanges
查詢bindingrabbitmqctl list_bindings
linux系統(tǒng)限制
用戶打開文件最大數(shù)量(生產(chǎn)環(huán)境建議:65536)
ulimit -n
os內(nèi)核允許最大打開文件數(shù)量
cat /proc/sys/fs/file-max
日志
/var/log/rabbitmq
集群
將多臺(tái)計(jì)算機(jī)連接在一起以形成單個(gè)邏輯代理,所有節(jié)點(diǎn)必須具有相同的Erlang cookie沿彭,計(jì)算機(jī)之間的網(wǎng)絡(luò)鏈接必須可靠朽砰,必須運(yùn)行相同版本的RabbitMQ和Erlang。
虛擬主機(jī)喉刘,交換瞧柔,用戶和權(quán)限將自動(dòng)鏡像到群集中的所有節(jié)點(diǎn)。隊(duì)列可以位于單個(gè)節(jié)點(diǎn)上饱搏,也可以跨多個(gè)節(jié)點(diǎn)進(jìn)行鏡像非剃。連接到群集中任何節(jié)點(diǎn)的客戶端可以查看群集中的所有隊(duì)列,即使它們不在該節(jié)點(diǎn)上也是如此推沸。
節(jié)點(diǎn)(rabbit@主機(jī)名)
所有節(jié)點(diǎn)都是對(duì)等的备绽,只有queue有主子節(jié)點(diǎn)之分。
打破集群 rabbitmqctl reset
默認(rèn)情況下鬓催,如果隊(duì)列的主節(jié)點(diǎn)出現(xiàn)故障肺素,與其對(duì)等節(jié)點(diǎn)斷開連接或從群集中刪除,則最舊的鏡像將被提升為新的主節(jié)點(diǎn)宇驾。在某些情況下倍靡,此鏡像可能 不同步,這將導(dǎo)致數(shù)據(jù)丟失课舍。從RabbitMQ 3.7.5開始塌西,ha-promote-on-failure 策略密鑰控制是否允許不同步的鏡像升級(jí)。設(shè)置為 when-synced時(shí)筝尾,將確保不提升未同步的鏡像捡需。使用when-synced促銷策略的系統(tǒng)必須使用 發(fā)布者確認(rèn),以便檢測(cè)隊(duì)列不可用性和代理無(wú)法排隊(duì)消息筹淫。
策略policy
一般3節(jié)點(diǎn)2個(gè)計(jì)數(shù)站辉,5節(jié)點(diǎn)3個(gè)計(jì)數(shù)
ha-mode | ha-params | result |
---|---|---|
exactly | count | 1意味著只有一個(gè)主隊(duì)列。2意味著1個(gè)主隊(duì)列+1個(gè)鏡像隊(duì)列损姜。如果集群中的節(jié)點(diǎn)數(shù)少于計(jì)數(shù)饰剥,則隊(duì)列鏡像到所有節(jié)點(diǎn)。如果急群眾節(jié)點(diǎn)數(shù)多余計(jì)數(shù)摧阅,并且包含鏡像的節(jié)點(diǎn)關(guān)閉汰蓉,則將在另一個(gè)節(jié)點(diǎn)上創(chuàng)建鏡像,使用具有"ha-promote-on-shutdown"的exactly模式逸尖,"always"可能是危險(xiǎn)的古沥,因?yàn)殛?duì)列可以在集群中遷移并在關(guān)閉時(shí)變得不同步瘸右。(3.7.5版本后,可以用ha-promote-on-failure的when-synced) |
all | (none) | 隊(duì)列在群集中的所有節(jié)點(diǎn)上進(jìn)行鏡像岩齿。將新節(jié)點(diǎn)添加到群集后太颤,該隊(duì)列將鏡像到該節(jié)點(diǎn)。這個(gè)設(shè)置非常保守盹沈。鏡像群集節(jié)點(diǎn)的法定數(shù)量(N / 2 + 1) 龄章,而不是推薦。鏡像到所有節(jié)點(diǎn)將給所有群集節(jié)點(diǎn)帶來(lái)額外的壓力乞封,包括網(wǎng)絡(luò)I / O,磁盤I / O和磁盤空間使用吧碾。 |
nodes | node names | 隊(duì)列鏡像到節(jié)點(diǎn)名稱中列出的節(jié)點(diǎn)睁本。節(jié)點(diǎn)名稱是出現(xiàn)在rabbitmqctl cluster_status中的Erlang節(jié)點(diǎn)名稱; 它們通常具有“ rabbit @ hostname ” 的形式锐借。如果這些節(jié)點(diǎn)名稱中的任何一個(gè)不是群集的一部分哮笆,則這不構(gòu)成錯(cuò)誤滑黔。如果在聲明隊(duì)列時(shí)列表中沒(méi)有任何節(jié)點(diǎn)處于聯(lián)機(jī)狀態(tài)通今,則將在聲明客戶端所連接的節(jié)點(diǎn)上創(chuàng)建隊(duì)列璃氢。 |
發(fā)布者確認(rèn)
要啟用確認(rèn),客戶端將發(fā)送 confirm.select方法舆蝴。根據(jù)是否 設(shè)置了無(wú)等待谦絮,代理可以使用confirm.select-ok進(jìn)行響應(yīng)。一旦在 頻道上使用confirm.select方法洁仗,就會(huì)說(shuō)它處于確認(rèn)模式层皱。事務(wù)渠道不能進(jìn)入確認(rèn)模式,一旦渠道處于確認(rèn)模式赠潦,就不能進(jìn)行事務(wù)叫胖。
什么時(shí)候發(fā)布的消息會(huì)被broker確認(rèn)
對(duì)于可路由消息,當(dāng)所有隊(duì)列都接受消息時(shí)她奥,將發(fā)送basic.ack瓮增。對(duì)于路由到持久隊(duì)列的持久性消息怎棱,這意味著持久化到磁盤。對(duì)于鏡像隊(duì)列绷跑,這意味著所有鏡像都已接受該消息拳恋。
永久消息的ack延遲
在將消息持久保存到磁盤后,將發(fā)送路由到持久隊(duì)列的持久消息的basic.ack你踩。RabbitMQ消息存儲(chǔ)在一段時(shí)間(幾百毫秒)之后批量傳遞消息到磁盤诅岩,以最小化fsync(2)調(diào)用的數(shù)量,或者當(dāng)隊(duì)列空閑時(shí)带膜。這意味著在恒定負(fù)載下吩谦,basic.ack的延遲 可以達(dá)到幾百毫秒。為了提高吞吐量膝藕,強(qiáng)烈建議應(yīng)用程序異步處理確認(rèn)(作為流)或發(fā)布批量消息并等待未完成的確認(rèn)式廷。確切的API因客戶端庫(kù)而異。
發(fā)布者返回(退回模式:未投遞到queue):
對(duì)于返回的消息芭挽,mandatory必須將模板的屬性設(shè)置為true或者mandatory-expression 必須將其評(píng)估true為特定消息滑废。
此功能需要將CachingConnectionFactory其publisherReturns屬性設(shè)置為true。
RabbitTemplate.setReturnCallback通過(guò)調(diào)用進(jìn)行注冊(cè)袜爪,將返回值發(fā)送給客戶端setReturnCallback(ReturnCallback callback)蠕趁。
回調(diào)必須實(shí)現(xiàn)以下方法void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey);
ReturnCallback每個(gè)僅支持一個(gè)RabbitTemplate
發(fā)布者確認(rèn)(確認(rèn)模式:未投遞到exchange):
模板需要CachingConnectionFactory其publisherConfirms屬性設(shè)置為的true。
RabbitTemplate.ConfirmCallback通過(guò)調(diào)用確認(rèn)辛馆,發(fā)送確認(rèn)給客戶端setConfirmCallback(ConfirmCallback callback)俺陋。
回調(diào)必須實(shí)現(xiàn)此方法void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData是發(fā)送原始消息時(shí)由客戶提供的對(duì)象。對(duì)的ack錯(cuò)誤nack昙篙。例如腊状,如果在生成時(shí)可用nack,則原因可能包含的原因:一個(gè)示例是將消息發(fā)送到不存在的交換機(jī)時(shí)苔可。在這種情況下缴挖,代理將關(guān)閉渠道。封閉的原因包括在中焚辅。
一個(gè)ConfirmCallback僅支持一個(gè)RabbitTemplate映屋。
集群搭建
搭建各主機(jī)上自己的rabbitmq
修改主機(jī)名
修改/etc/hosts
拷貝一臺(tái)/var/lib/rabbitmq/.erlang.cookie到其他,保證文件內(nèi)容一致
在RabbitMQ集群集群中同蜻,必須至少有一個(gè)磁盤節(jié)點(diǎn)秧荆,否則隊(duì)列元數(shù)據(jù)無(wú)法寫入到集群中,當(dāng)磁盤節(jié)點(diǎn)宕掉時(shí)埃仪,集群將無(wú)法寫入新的隊(duì)列元數(shù)據(jù)信息。(disc陕赃,ram)
在集群搭建好之后卵蛉,需要配置鏡像策略颁股,才能同步數(shù)據(jù)。
單臺(tái)多實(shí)例(適用于開發(fā)環(huán)境)
開啟
RABBITMQ_NODE_PORT=5772 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit1 rabbitmq-server -detached
RABBITMQ_NODE_PORT=5773 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
檢查端口
netstat -tnlp | grep 5772
(cli端口號(hào)還開著的話:rabbitmqctl -n rabbit2 stop)
加入集群
rabbitmqctl -n rabbit2 stop_app
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 join_cluster rabbit1@hostname -s
--ram
rabbitmqctl -n rabbit2 start_app
查看集群cluster信息
rabbitmqctl -n rabbit1 cluster_status
注意事項(xiàng)
默認(rèn)值
默認(rèn)創(chuàng)建guest/guest,只有l(wèi)ocalhost能用傻丝,生產(chǎn)環(huán)境建議刪除guest用戶增加安全性甘有。
通道并發(fā)
應(yīng)用程序應(yīng)該每個(gè)線程使用一個(gè)channel,而不是跨線程共享相同的channel葡缰。
使用通道池亏掀,可以將通道池視為特定的同步解決方案,建議使用現(xiàn)有的池庫(kù)而不是自行開發(fā)的解決方案泛释。例如滤愕,Spring AMQP 具有即用型通道池功能。
消息確認(rèn)
https://www.rabbitmq.com/confirms.html
確認(rèn)模式和事務(wù)不能同時(shí)使用怜校。
ack的機(jī)制會(huì)觸發(fā)消息重復(fù)消費(fèi)的( 一般出在任務(wù)超時(shí)间影,或者沒(méi)有及時(shí)返回狀態(tài),引起任務(wù)重新入隊(duì)列茄茁,重新消費(fèi)魂贬。在rabbtimq里連接的斷開也會(huì)觸發(fā)消息重新入隊(duì)列)。消費(fèi)任務(wù)類型最好要支持冪等性裙顽,這樣的好處是 任務(wù)執(zhí)行多少次都沒(méi)關(guān)系付燥,頂多消耗一些性能! 如果不支持冪等愈犹,比如發(fā)送信息键科? 那么需要構(gòu)建一個(gè)map來(lái)記錄任務(wù)的執(zhí)行情況! 不僅僅是成功和失敗甘萧,還要有心跳B茑摇!扬卷! 這個(gè)map在消費(fèi)端實(shí)現(xiàn)就可以了Q姥浴!怪得! 這里會(huì)出現(xiàn)一個(gè)問(wèn)題咱枉,有兩個(gè)消費(fèi)者 c1, c2 ,一個(gè)任務(wù)有可能被c1消費(fèi)徒恋,如果再來(lái)一次蚕断,被c2執(zhí)行? 那么如何得知任務(wù)的情況入挣? 任務(wù)派發(fā)亿乳! 任務(wù)做成hash,固定消費(fèi)者!堅(jiān)決不要想方設(shè)法在mq擴(kuò)展這個(gè)future葛假。一句話障陶,要不保證消息冪等性,要不就用map記錄任務(wù)狀態(tài).
順序
單個(gè)queue在多消費(fèi)者下不能保證其先后順序聊训。我們遇到的大多數(shù)場(chǎng)景都不需要消息的有序的抱究,如果對(duì)于消息順序敏感,那么我們這里給出的方法是 消息體通過(guò)hash分派到隊(duì)列里带斑,每個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者鼓寺,多分拆隊(duì)列。為什么要這么設(shè)計(jì)勋磕? 同一組的任務(wù)會(huì)被分配到同一個(gè)隊(duì)列里妈候,每個(gè)隊(duì)列只能有一個(gè)worker來(lái)消費(fèi),這樣避免了同一個(gè)隊(duì)列多個(gè)消費(fèi)者消費(fèi)時(shí)朋凉,亂序的可能州丹! t1, t2 兩個(gè)任務(wù), t1 雖然被c1先pop了杂彭,但是有可能c2先把 t2 任務(wù)給完成了墓毒。一句話,主動(dòng)去分配隊(duì)列亲怠,單個(gè)消費(fèi)者所计。
冪等性
一個(gè)任務(wù)消費(fèi)者消費(fèi)多次,結(jié)果是一致的团秽。
消息重復(fù)消費(fèi)原因:
1主胧、生產(chǎn)者已把消息發(fā)送到mq,在mq給生產(chǎn)者返回ack的時(shí)候網(wǎng)絡(luò)中斷习勤,故生產(chǎn)者未收到確定信息踪栋,生產(chǎn)者認(rèn)為消息未發(fā)送成功,但實(shí)際情況是图毕,mq已成功接收到了消息夷都,在網(wǎng)絡(luò)重連后,生產(chǎn)者會(huì)重新發(fā)送剛才的消息予颤,造成mq接收了重復(fù)的消息
2囤官、消費(fèi)者在消費(fèi)mq中的消息時(shí),mq已把消息發(fā)送給消費(fèi)者蛤虐,消費(fèi)者在給mq返回ack時(shí)網(wǎng)絡(luò)中斷党饮,故mq未收到確認(rèn)信息,該條消息會(huì)重新發(fā)給其他的消費(fèi)者驳庭,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費(fèi)者刑顺,但實(shí)際上該消費(fèi)者已成功消費(fèi)了該條消息,造成消費(fèi)者消費(fèi)了重復(fù)的消息;
解決方案:1.唯一 ID + 指紋碼 機(jī)制捏检;2.利用 redis 的原子性去實(shí)現(xiàn)(mq會(huì)生成一個(gè)唯一id荞驴,或者自己實(shí)現(xiàn)唯一id。第1個(gè)方案需要根據(jù)數(shù)據(jù)庫(kù)的主鍵不重復(fù)來(lái)實(shí)現(xiàn)贯城,數(shù)據(jù)庫(kù)要做分庫(kù)分表。)
延遲隊(duì)列
實(shí)現(xiàn)方法:死信交換機(jī)+消息存活時(shí)間TTL霹娄,不能讓這個(gè)隊(duì)列里面的消息被接受到能犯,否則消息一旦被消費(fèi),就不存在過(guò)期了
作用:定時(shí)任務(wù)(延遲一定時(shí)間執(zhí)行)
進(jìn)入死信交換機(jī)條件:1.消息被consumer拒收犬耻,并且reject方法參數(shù)里requeue是false踩晶。2.消息過(guò)期,即TTL時(shí)間到了枕磁。3.隊(duì)列的長(zhǎng)度限制滿了渡蜻。
整體設(shè)置:x-message-ttl參數(shù),如果整個(gè)隊(duì)列的消息都是相同的计济,可以設(shè)置
單獨(dú)設(shè)置:messageProperties.setExpiration("6000");
x-dead-letter-exchange代表消息過(guò)期后茸苇,消息要進(jìn)入的交換機(jī)
x-dead-letter-routing-key是配置消息過(guò)期后,進(jìn)入死信交換機(jī)的routing-key
spring-amqp
版本
2.1.5.RELEASE
兼容性
Spring Framework的最小版本依賴性是5.1.x.
最小amqp-client
Java客戶端庫(kù)版本為5.4.0沦寂。
代碼
ParameterizedTypeReference 用于復(fù)雜的參數(shù)轉(zhuǎn)換
重試功能学密,可以配置RabbitTemplate
使用RetryTemplate
來(lái)幫助處理代理連接問(wèn)題
RabbitMQ介紹
開源,支持協(xié)議多传藏,重量級(jí)腻暮,路由、負(fù)載均衡毯侦、數(shù)據(jù)持久化支持好
作用
是一個(gè)消息系統(tǒng)哭靖,允許軟件、應(yīng)用相互連接和擴(kuò)展侈离,從而組成一個(gè)更大的應(yīng)用试幽,或者將用戶設(shè)備和數(shù)據(jù)進(jìn)行連接,消息系統(tǒng)通過(guò)將消息的發(fā)送和接收分離來(lái)實(shí)現(xiàn)應(yīng)用程序的異步和解偶霍狰。
進(jìn)行數(shù)據(jù)投遞抡草,非阻塞操作或推送通知。實(shí)現(xiàn)發(fā)布/訂閱蔗坯,或者工作隊(duì)列康震。
思想
避免立即執(zhí)行資源密集型任務(wù)并且必須等待它完成。相反宾濒,我們安排任務(wù)稍后完成腿短。我們將任務(wù)封裝 為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。
技術(shù)亮點(diǎn)
可靠性
持久性機(jī)制橘忱,投遞確認(rèn)赴魁,發(fā)布者證實(shí),高可用性機(jī)制
靈活的路由
提供多種內(nèi)置交換機(jī)钝诚,可以組合起來(lái)使用颖御,甚至可以實(shí)現(xiàn)自己的交換機(jī)類型
集群
多個(gè)rabbitMq聚合成一個(gè)獨(dú)立的邏輯代理
聯(lián)合
對(duì)于服務(wù)器來(lái)說(shuō),它比集群需要更多的松散和非可靠鏈接凝颇。為此RabbitMQ提供了聯(lián)合模型潘拱。
高可用的隊(duì)列
在同一個(gè)集群里,隊(duì)列可以被鏡像到多個(gè)機(jī)器中拧略,以確保當(dāng)其中某些硬件出現(xiàn)故障后芦岂,你的消息仍然安全。
多協(xié)議
支持多種消息協(xié)議的消息傳遞垫蛆,AMQP 0-9-1,0-9和0-8禽最,以及擴(kuò)展,STOMP袱饭,MQTT川无,AMQP 1.0,HTTP
廣泛的客戶端
只要是你能想到的編程語(yǔ)言幾乎都有與其相適配的RabbitMQ客戶端宁赤。
可視化管理
RabbitMQ附帶了一個(gè)易于使用的可視化管理工具舀透,它可以幫助你監(jiān)控消息代理的每一個(gè)環(huán)節(jié)。
追蹤
如果你的消息系統(tǒng)有異常行為决左,RabbitMQ還提供了追蹤的支持愕够,讓你能夠發(fā)現(xiàn)問(wèn)題所在。
插件系統(tǒng)
RabbitMQ附帶了各種各樣的插件來(lái)對(duì)自己進(jìn)行擴(kuò)展佛猛。你甚至也可以寫自己的插件來(lái)使用惑芭。