1.什么是RabbitMQ
RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的開源實(shí)現(xiàn)纺且。AMQP 的出現(xiàn)其實(shí)也是應(yīng)了廣大人民群眾的需求绞吁,雖然在同步消息通訊的世界里有很多公開標(biāo)準(zhǔn)(如 COBAR的 IIOP 狰住,或者是 SOAP 等)灭袁,但是在異步消息處理中卻不是這樣伊佃,只有大企業(yè)有一些商業(yè)實(shí)現(xiàn)(如微軟的 MSMQ 驶鹉,IBM 的 Websphere MQ 等)轰驳,因此眯娱,在 2006 年的 6 月礁苗,Cisco 、Redhat徙缴、iMatix 等聯(lián)合制定了 AMQP 的公開標(biāo)準(zhǔn)试伙。 RabbitMQ是由RabbitMQ Technologies Ltd開發(fā)并且提供商業(yè)支持的。該公司在2010年4月被SpringSource(VMWare的一個(gè)部門)收購于样。在2013年5月被并入Pivotal疏叨。其實(shí)VMWare,Pivotal和EMC本質(zhì)上是一家的穿剖。不同的是VMWare是獨(dú)立上市子公司蚤蔓,而Pivotal是整合了EMC的某些資源,現(xiàn)在并沒有上市糊余。 RabbitMQ的官網(wǎng)是http://www.rabbitmq.com 百度百科amqp協(xié)議介紹https://baike.baidu.com/item/AMQP/8354716?fr=aladdin 注意:RabbitMQ是采用erlang語言開發(fā)的秀又,所以必須有erlang環(huán)境才可以運(yùn)行
2.為什么要使用MQ
3.常用消息中間件的對(duì)比
4.消息隊(duì)列RabbitMq的五種形式隊(duì)列
4.1.點(diǎn)對(duì)點(diǎn)(簡單)的隊(duì)列
點(diǎn)對(duì)點(diǎn)模式:一對(duì)一消費(fèi),一個(gè)生產(chǎn)者投遞消息給隊(duì)列贬芥,只能允許有一個(gè)消費(fèi)者進(jìn)行消費(fèi)吐辙。
注意:如果消費(fèi)集群的話,會(huì)進(jìn)行均攤消費(fèi)蘸劈。前提是服務(wù)器的配置相同昏苏。
均攤消費(fèi)的弊端:假如有2臺(tái)服務(wù)器分別為A、B。如果每個(gè)消費(fèi)處理消息的業(yè)務(wù)時(shí)間不相同的情況下捷雕,可能對(duì)消費(fèi)者處理慢的服務(wù)器不公平(服務(wù)器壓力大)椒丧,A處理比B處理時(shí)間快,應(yīng)該A處理的消息多一些救巷,B處理的消息少一些才合理壶熏。
隊(duì)列以先進(jìn)先出原則進(jìn)行存放消息集合。生產(chǎn)者投遞消息到隊(duì)列中浦译。
當(dāng)消費(fèi)者啟動(dòng)的時(shí)候棒假,會(huì)與隊(duì)列服務(wù)器建立長連接,當(dāng)生產(chǎn)者有消息投遞到隊(duì)列的時(shí)候精盅,隊(duì)列會(huì)立刻將消息通知給消費(fèi)者進(jìn)行消費(fèi)帽哑。
長連接的好處:如果是短鏈接的話,每次訪問都需要建立連接叹俏,比較占內(nèi)存妻枕。建立長連接會(huì)減少三次握手,提高傳輸速度粘驰。
取消息隊(duì)列與推送消息隊(duì)列的區(qū)別:
取消息:生產(chǎn)者投遞消息到隊(duì)列中屡谐,隊(duì)列服務(wù)器緩存消息。這時(shí)候當(dāng)消費(fèi)者啟動(dòng)的時(shí)候蝌数,消費(fèi)者會(huì)去向隊(duì)列服務(wù)器中獲取消息愕掏。
推消息:當(dāng)生產(chǎn)者和消費(fèi)者都啟動(dòng)的時(shí)候,生產(chǎn)者向隊(duì)列投遞消息顶伞,這時(shí)候隊(duì)列會(huì)將消息推送給消費(fèi)者饵撑。
4.2.工作(公平性)隊(duì)列模式
公平隊(duì)列的原理:隊(duì)列服務(wù)器向消費(fèi)者發(fā)送消息的時(shí)候,消費(fèi)者采用手動(dòng)應(yīng)答模式唆貌,隊(duì)列服務(wù)器必須要收到消費(fèi)者發(fā)送ack結(jié)果通知之后滑潘,才會(huì)繼續(xù)發(fā)送下一個(gè)消息。
4.3.發(fā)布訂閱模式
Direct exchange(直連交換機(jī))是根據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)隊(duì)列的挠锥。
發(fā)布訂閱實(shí)現(xiàn)流程:生產(chǎn)者投遞消息給交換機(jī)众羡,交換機(jī)根據(jù)路由策略(routignKey)轉(zhuǎn)發(fā)到不同的隊(duì)列服務(wù)器中緩存侨赡,然后隊(duì)列服務(wù)器在推送消息給消費(fèi)者進(jìn)行消費(fèi)或者消費(fèi)者從隊(duì)列服務(wù)器中拉取消息進(jìn)行消費(fèi)蓖租。
發(fā)布訂閱實(shí)現(xiàn)原理:一對(duì)多。
這個(gè)隊(duì)列模式是消息隊(duì)列中最重要的隊(duì)列了羊壹,其他的都是在它的基礎(chǔ)上進(jìn)行了擴(kuò)展蓖宦。 功能實(shí)現(xiàn):一個(gè)生產(chǎn)者發(fā)送消息,多個(gè)消費(fèi)者獲取消息(同樣的消息)油猫,包括一個(gè)生產(chǎn)者稠茂,一個(gè)交換機(jī),多個(gè)隊(duì)列,多個(gè)消費(fèi)者睬关。
思路解讀(重點(diǎn)理解):
1. 一個(gè)生產(chǎn)者诱担,多個(gè)消費(fèi)者
2. 每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
3. 生產(chǎn)者沒有直接發(fā)消息到隊(duì)列中,而是發(fā)送到交換機(jī)
4. 每個(gè)消費(fèi)者的隊(duì)列都綁定到交換機(jī)上
5. 消息通過交換機(jī)到達(dá)每個(gè)消費(fèi)者的隊(duì)列 該模式就是Fanout Exchange(扇型交換機(jī))將消息路由給綁定到它身上的所有隊(duì)列 以用戶發(fā)郵件案例講解
注意:交換機(jī)沒有存儲(chǔ)消息功能电爹,如果消息發(fā)送沒有綁定消費(fèi)隊(duì)列的交換機(jī)蔫仙,消息則丟失。在消費(fèi)者沒有啟動(dòng)的情況下丐箩,生產(chǎn)者投遞消息到交換機(jī)摇邦,這時(shí)候交換機(jī)不知道把消息轉(zhuǎn)發(fā)給哪個(gè)消費(fèi)者,所以消息會(huì)消失屎勘。因?yàn)榻粨Q機(jī)沒有緩存功能施籍,只做轉(zhuǎn)發(fā)的功能。
使用場(chǎng)景:用戶注冊(cè)→發(fā)送郵件→發(fā)送短信概漱。
4.4.路由模式RoutingKey
Direct exchange(直連交換機(jī))是根據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)隊(duì)列的丑慎。
生產(chǎn)者發(fā)送消息到交換機(jī)并指定一個(gè)路由key,消費(fèi)者隊(duì)列綁定到交換機(jī)時(shí)要制定路由key(key匹配就能接受消息瓤摧,key不匹配就不能接受消息)立哑。
例如:我們可以把路由key設(shè)置為insert ,那么消費(fèi)者隊(duì)列key指定包含insert才可以接收消息姻灶,消費(fèi)者隊(duì)列key定義為update或者delete就不能接收消息铛绰。很好的控制了更新,插入和刪除的操作产喉。 采用交換機(jī)direct模式
流程說明:如果生產(chǎn)者投遞消息到交換機(jī)(exchange),郵件隊(duì)列和短信隊(duì)列也都綁定了交換機(jī)(exchange)捂掰。但是當(dāng)交換機(jī)的類型(type=direct)的時(shí)候,交換機(jī)的轉(zhuǎn)發(fā)(路由)由routingKey決定轉(zhuǎn)發(fā)給誰曾沈。如下如圖所示这嚣,當(dāng)交換機(jī)的rontingKey=email的時(shí)候,消息將轉(zhuǎn)發(fā)到郵件隊(duì)列服務(wù)然后由郵件消費(fèi)者進(jìn)行消費(fèi)塞俱。而短信隊(duì)列是都收不到消息的姐帚,因?yàn)槎绦诺穆酚蓃outingKey=msg。如果短信隊(duì)列也想收到消息就需要修改routingKey=email才可以收到消息障涯。
這就是交換機(jī)類型type=direct的用法及特性罐旗。
4.5.通配符模式Topics
說明:此模式實(shí)在路由key模式的基礎(chǔ)上,使用了通配符來管理消費(fèi)者接收消息唯蝶。生產(chǎn)者P發(fā)送消息到交換機(jī)X九秀,type=topic,交換機(jī)根據(jù)綁定隊(duì)列的routing key的值進(jìn)行通配符匹配粘我;
符號(hào)#:匹配一個(gè)或者多個(gè)詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符號(hào)*:只能匹配一個(gè)詞lazy.* 可以匹配lazy.irs或者lazy.cor
消息隊(duì)列RabbitMQ應(yīng)答模式
為了確保消息不會(huì)丟失鼓蜒,RabbitMQ支持消息應(yīng)答。消費(fèi)者發(fā)送一個(gè)消息應(yīng)答,告訴RabbitMQ這個(gè)消息已經(jīng)接收并且處理完畢了都弹。RabbitMQ就可以刪除它了娇豫。 如果一個(gè)消費(fèi)者掛掉卻沒有發(fā)送應(yīng)答,RabbitMQ會(huì)理解為這個(gè)消息沒有處理完全畅厢,然后交給另一個(gè)消費(fèi)者去重新處理锤躁。這樣,你就可以確認(rèn)即使消費(fèi)者偶爾掛掉也不會(huì)丟失任何消息了或详。 沒有任何消息超時(shí)限制系羞;只有當(dāng)消費(fèi)者掛掉時(shí),RabbitMQ才會(huì)重新投遞霸琴。即使處理一條消息會(huì)花費(fèi)很長的時(shí)間椒振。 消息應(yīng)答是默認(rèn)打開的。我們通過顯示的設(shè)置autoAsk=true關(guān)閉這種機(jī)制∥喑耍現(xiàn)即自動(dòng)應(yīng)答開澎迎,一旦我們完成任務(wù),消費(fèi)者會(huì)自動(dòng)發(fā)送應(yīng)答选调。通知RabbitMQ消息已被處理夹供,可以從內(nèi)存刪除。如果消費(fèi)者因宕機(jī)或鏈接失敗等原因沒有發(fā)送ACK(不同于ActiveMQ仁堪,在RabbitMQ里哮洽,消息沒有過期的概念),則RabbitMQ會(huì)將消息重新發(fā)送給其他監(jiān)聽在隊(duì)列的下一個(gè)消費(fèi)者弦聂。
RabbitMQ的公平轉(zhuǎn)發(fā)
目前消息轉(zhuǎn)發(fā)機(jī)制是平均分配鸟辅,這樣就會(huì)出現(xiàn)倆個(gè)消費(fèi)者,奇數(shù)的任務(wù)很耗時(shí)莺葫,偶數(shù)的任何工作量很小匪凉,造成的原因就是近當(dāng)消息到達(dá)隊(duì)列進(jìn)行轉(zhuǎn)發(fā)消息。并不在乎有多少任務(wù)消費(fèi)者并未傳遞一個(gè)應(yīng)答給RabbitMQ捺檬。僅僅盲目轉(zhuǎn)發(fā)所有的奇數(shù)給一個(gè)消費(fèi)者再层,偶數(shù)給另一個(gè)消費(fèi)者。 為了解決這樣的問題堡纬,我們可以使用basicQos方法聂受,傳遞參數(shù)為prefetchCount= 1。這樣告訴RabbitMQ不要在同一時(shí)間給一個(gè)消費(fèi)者超過一條消息隐轩。 換句話說饺饭,只有在消費(fèi)者空閑的時(shí)候會(huì)發(fā)送下一條信息。調(diào)度分發(fā)消息的方式职车,也就是告訴RabbitMQ每次只給消費(fèi)者處理一條消息,也就是等待消費(fèi)者處理完畢并自己對(duì)剛剛處理的消息進(jìn)行確認(rèn)之后,才發(fā)送下一條消息悴灵,防止消費(fèi)者太過于忙碌扛芽,也防止它太過去清閑。 通過 設(shè)置channel.basicQos(1);
消息隊(duì)列RabbitMQ應(yīng)答模式
案例: 生產(chǎn)者端代碼不變积瞒,消費(fèi)者端代碼這部分就是用于開啟手動(dòng)應(yīng)答模式的川尖。 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 注:第二個(gè)參數(shù)值為false代表關(guān)閉RabbitMQ的自動(dòng)應(yīng)答機(jī)制,改為手動(dòng)應(yīng)答茫孔。 在處理完消息時(shí)叮喳,返回應(yīng)答狀態(tài),true表示為自動(dòng)應(yīng)答模式缰贝。 channel.basicAck(envelope.getDeliveryTag(), false);
傳統(tǒng)簡單隊(duì)列是如何實(shí)現(xiàn)的馍悟?
生產(chǎn)者生產(chǎn)消息直接投遞給隊(duì)列服務(wù)器,隊(duì)列服務(wù)器在以推送消息到消費(fèi)者或者消費(fèi)者從隊(duì)列服務(wù)器拉取消息進(jìn)行消費(fèi)剩晴。消費(fèi)者啟動(dòng)的時(shí)候會(huì)與隊(duì)列服務(wù)器建立長連接锣咒。
RabbitMQ關(guān)鍵名詞
AMQP(高級(jí)消息隊(duì)列協(xié)議)是一個(gè)異步消息傳遞所使用應(yīng)用層協(xié)議規(guī)范,為面向消息中間件設(shè)計(jì)赞弥,基于此協(xié)議的客戶端與消息中間件可以無視消息來源傳遞消息毅整,不受客戶端、消息中間件绽左、不同的開發(fā)語言環(huán)境等條件的限制悼嫉;
涉及概念解釋:
Server(Broker):接收客戶端連接,實(shí)現(xiàn)AMQP協(xié)議的消息隊(duì)列和路由功能的進(jìn)程拼窥;
Virtual Host:虛擬主機(jī)的概念承粤,類似權(quán)限控制組,一個(gè)Virtual Host里可以有多個(gè)Exchange和Queue闯团。? ?
Exchange:交換機(jī)辛臊,接收生產(chǎn)者發(fā)送的消息,并根據(jù)Routing Key將消息路由到服務(wù)器中的隊(duì)列Queue房交。
ExchangeType:交換機(jī)類型決定了路由消息行為彻舰,RabbitMQ中有三種類型Exchange,分別是fanout候味、direct刃唤、topic;? Message Queue:消息隊(duì)列白群,用于存儲(chǔ)還未被消費(fèi)者消費(fèi)的消息尚胞;
Message:由Header和body組成,Header是由生產(chǎn)者添加的各種屬性的集合帜慢,包括Message是否被持久化笼裳、優(yōu)先級(jí)是多少唯卖、由哪個(gè)Message Queue接收等;
body是真正需要發(fā)送的數(shù)據(jù)內(nèi)容躬柬;
BindingKey:綁定關(guān)鍵字拜轨,將一個(gè)特定的Exchange和一個(gè)特定的Queue綁定起來。
RabbitMQ交換機(jī)的作用
生產(chǎn)者發(fā)送消息不會(huì)像傳統(tǒng)方式直接將消息投遞到隊(duì)列中允青,而是先將消息投遞到交換機(jī)中橄碾,在由交換機(jī)轉(zhuǎn)發(fā)到具體的隊(duì)列,隊(duì)列在將消息以推送或者拉取方式給消費(fèi)者進(jìn)行消費(fèi)颠锉,這和我們之前學(xué)習(xí)Nginx有點(diǎn)類似法牲。 交換機(jī)的作用根據(jù)具體的路由策略分發(fā)到不同的隊(duì)列中。
交換機(jī)有四種類型:
Direct exchange(直連交換機(jī)):是根據(jù)消息攜帶的路由鍵琼掠;
routing key:將消息投遞給對(duì)應(yīng)隊(duì)列的 Fanout exchange(扇型交換機(jī))將消息路由給綁定到它身上的所有隊(duì)列 拒垃;
Topic exchange(主題交換機(jī)):隊(duì)列通過路由鍵綁定到交換機(jī)上,然后眉枕,交換機(jī)根據(jù)消息里的路由值恶复,將消息路由給一個(gè)或多個(gè)綁定隊(duì)列;
Headers exchange(頭交換機(jī)):類似主題交換機(jī)速挑,但是頭交換機(jī)使用多個(gè)消息屬性來代替路由鍵建立路由規(guī)則谤牡。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規(guī)則。
RabbitMQ消息確認(rèn)機(jī)制
問題產(chǎn)生背景: 生產(chǎn)者發(fā)送消息出去之后姥宝,不知道到底有沒有發(fā)送到RabbitMQ服務(wù)器翅萤, 默認(rèn)是不知道的。而且有的時(shí)候我們?cè)诎l(fā)送消息之后腊满,后面的邏輯出問題了套么,我們不想要發(fā)送之前的消息了,需要撤回該怎么做碳蛋。
如果RabbitMQ服務(wù)器宕機(jī)了胚泌,消息會(huì)丟失嗎?
? 答案:RabbitMQ服務(wù)器支持消息持久化機(jī)制肃弟,會(huì)把消息持久化在硬盤上保存玷室。代碼設(shè)置? channel.queueDeclare(EMAIL_QUEUE, true, false, false, null); 方法第二個(gè)參數(shù),默認(rèn)情況下我們應(yīng)該設(shè)置為true笤受。
解決方案:
1.AMQP 事務(wù)機(jī)制
2.Confirm 模式
事務(wù)模式::
? txSelect:將當(dāng)前channel設(shè)置為transaction模式
? txCommit :提交當(dāng)前事務(wù)
? txRollback:事務(wù)回滾
生產(chǎn)者? 消費(fèi)者? 隊(duì)列服務(wù)器?
消費(fèi)者如何確保消息一定能夠消費(fèi)成功穷缤?
通過應(yīng)答模式,默認(rèn)為應(yīng)答模式箩兽,可以修改為手動(dòng)應(yīng)答津肛。設(shè)置方法:channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 第二個(gè)參數(shù)。
設(shè)置應(yīng)答模式 :第一個(gè)參數(shù) 隊(duì)列名稱汗贫、第二個(gè)參數(shù) 應(yīng)答模式 如果為true 自動(dòng)應(yīng)答身坐,false 為手動(dòng)應(yīng)答秸脱、第三個(gè)參數(shù) 監(jiān)聽器
自動(dòng)應(yīng)答(true):不在乎消費(fèi)者對(duì)這個(gè)消息處理是否成功,都會(huì)告訴隊(duì)列刪除該消息掀亥。如果處理消息失敗的情況下撞反,應(yīng)該實(shí)現(xiàn)自動(dòng)補(bǔ)償妥色。
手動(dòng)應(yīng)答(false):當(dāng)隊(duì)列把消息推送給消費(fèi)者搪花,消費(fèi)者處理完業(yè)務(wù)邏輯之后,手動(dòng)返回ack(通知)告訴給隊(duì)列服務(wù)器是否要?jiǎng)h除該消息嘹害、如果失敗撮竿,隊(duì)列服務(wù)器做補(bǔ)償,而不會(huì)直接刪除該消息笔呀、
springboot整合rabbitmq項(xiàng)目
springboot整合rabbitmq分為2個(gè)項(xiàng)目幢踏,一個(gè)是生產(chǎn)者服務(wù),一個(gè)是消息服務(wù)平臺(tái)項(xiàng)目许师。消息服務(wù)平臺(tái)項(xiàng)目中包括郵件消費(fèi)者和短信消費(fèi)者房蝉。沒有必要每一個(gè)消費(fèi)者都創(chuàng)建一個(gè)項(xiàng)目,那樣會(huì)浪費(fèi)資源微渠。
在一個(gè)項(xiàng)目中搭幻,可以有多個(gè)生產(chǎn)者和消費(fèi)者。
RabbitMQ消息重試機(jī)制
消費(fèi)者在消費(fèi)消息的時(shí)候逞盆,如果消費(fèi)者業(yè)務(wù)邏輯出現(xiàn)程序異常檀蹋,這時(shí)候應(yīng)該如何處理?
答案:使用消息重試機(jī)制云芦。
如何合適選擇重試機(jī)制:
情況1: 消費(fèi)者獲取到消息后俯逾,調(diào)用第三方接口,但接口暫時(shí)無法訪問舅逸,是否需要重試?
答案:需要重試機(jī)制桌肴。
情況2: 消費(fèi)者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常琉历,是否需要重試?
答案:不需要重試機(jī)制坠七,需要發(fā)布版本進(jìn)行解決。
如何實(shí)現(xiàn)重試機(jī)制
總結(jié):對(duì)于情況2善已,如果消費(fèi)者代碼拋出異常是需要發(fā)布新版本才能解決的問題灼捂,那么不需要重試,重試也無濟(jì)于事换团。應(yīng)該采用日志記錄+定時(shí)任務(wù)job健康檢查+人工進(jìn)行補(bǔ)償悉稠。
消費(fèi)者如果保證消息冪等性,不被重復(fù)消費(fèi)
產(chǎn)生原因:網(wǎng)絡(luò)延遲傳輸中艘包,消費(fèi)出現(xiàn)異车拿停或者是消費(fèi)延遲消費(fèi)耀盗,會(huì)造成MQ進(jìn)行重試補(bǔ)償,在重試過程中卦尊,可能會(huì)造成重復(fù)消費(fèi)叛拷。
消費(fèi)者如何保證消息冪等性,不被重復(fù)消費(fèi)
解決辦法:
①使用全局MessageID判斷消費(fèi)方是否是同一個(gè)岂却,解決冪等性忿薇。
②或者使用業(yè)務(wù)邏輯id保證唯一(比如訂單號(hào)碼)
RabbitMQ死信隊(duì)列
死信隊(duì)列 聽上去像 消息“死”了? ? 其實(shí)也有點(diǎn)這個(gè)意思,死信隊(duì)列? 是 當(dāng)消息在一個(gè)隊(duì)列 因?yàn)橄铝性颍?/p>
消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false
消息超期 (rabbitmq? Time-To-Live -> messageProperties.setExpiration())
隊(duì)列超載
變成了 “死信” 后? ? 被重新投遞(publish)到另一個(gè)Exchange? 該Exchange 就是DLX? ? 然后該Exchange 根據(jù)綁定規(guī)則 轉(zhuǎn)發(fā)到對(duì)應(yīng)的 隊(duì)列上? 監(jiān)聽該隊(duì)列? 就可以重新消費(fèi)? ? 說白了 就是? 沒有被消費(fèi)的消息? 換個(gè)地方重新被消費(fèi)
生產(chǎn)者? -->? 消息 --> 交換機(jī)? --> 隊(duì)列? --> 變成死信? --> DLX交換機(jī) -->隊(duì)列 --> 消費(fèi)者
什么是死信呢躏哩?什么樣的消息會(huì)變成死信呢署浩?
消息被拒絕(basic.reject或basic.nack)并且requeue=false.
消息TTL過期
隊(duì)列達(dá)到最大長度(隊(duì)列滿了,無法再添加數(shù)據(jù)到mq中)
應(yīng)用場(chǎng)景分析
在定義業(yè)務(wù)隊(duì)列的時(shí)候扫尺,可以考慮指定一個(gè)死信交換機(jī)筋栋,并綁定一個(gè)死信隊(duì)列,當(dāng)消息變成死信時(shí)正驻,該消息就會(huì)被發(fā)送到該死信隊(duì)列上弊攘,這樣就方便我們查看消息失敗的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息
如何使用死信交換機(jī)呢?
定義業(yè)務(wù)(普通)隊(duì)列的時(shí)候指定參數(shù)
x-dead-letter-exchange: 用來設(shè)置死信后發(fā)送的交換機(jī)
x-dead-letter-routing-key:用來設(shè)置死信的routingKey
/**
* 定義死信隊(duì)列相關(guān)信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信隊(duì)列 交換機(jī)標(biāo)識(shí)符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信隊(duì)列交換機(jī)綁定鍵標(biāo)識(shí)符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
/**
* 定義短信隊(duì)列? 包括死信隊(duì)列
*
* @return
*/
@Bean
public Queue fanoutMsgQueue() {
? ? //return new Queue(MSG_QUEUE_FANOUT);
? ? // 將普通隊(duì)列綁定到死信隊(duì)列交換機(jī)上
? ? Map<String, Object> args = new HashMap<>(2);
? ? args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
? ? args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
? ? Queue queue = new Queue(MSG_QUEUE_FANOUT, true, false, false, args);
? ? return queue;
}
/**
* 配置死信隊(duì)列
*
* @return
*/
@Bean
public Queue deadQueue() {
? ? Queue queue = new Queue(deadQueueName, true);
? ? return queue;
}
/**
*? 創(chuàng)建死信交換機(jī)
*/
@Bean
public DirectExchange deadExchange() {
? ? return new DirectExchange(deadExchangeName);
}
/**
*? 死信交換機(jī)綁定私信隊(duì)列
* @param deadQueue
* @param deadExchange
* @return
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
? ? return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
RabbitMq 的配置文件
spring:
? rabbitmq:
? ? #### 連接地址
? ? host: 127.0.0.1
? ? ####端口號(hào)
? ? port: 5672
? ? #### 用戶名 自己在rabbitmq服務(wù)器上新建的? 默認(rèn)的用戶名和密碼為guest
? ? username: ming
? ? #### 密碼
? ? password: ming
? ? ###? 虛擬主機(jī)
? ? virtual-host: /member
? ? listener:
? ? ? simple:
? ? ? ? retry:
? ? ? ? ? ####開啟消費(fèi)者(程序出現(xiàn)異常的情況下)進(jìn)行重試機(jī)制
? ? ? ? ? enabled: true
? ? ? ? ? ### 最大重試次數(shù),? 默認(rèn)情況下 一直重試
? ? ? ? ? max-attempts: 5
? ? ? ? ? #### 重試間隔時(shí)間 單位:毫秒
? ? ? ? ? initial-interval: 3000
? ? ? ? ? ##### 開啟手動(dòng)應(yīng)答 ack
? ? ? ? acknowledge-mode: manual
### 服務(wù)端口號(hào)
server:
? port: 8081