1. RabbitMQ 高級(jí)
1.1. 過期時(shí)間TTL
過期時(shí)間TTL表示可以對(duì)消息設(shè)置預(yù)期的時(shí)間款青,在這個(gè)時(shí)間內(nèi)都可以被消費(fèi)者接收獲茸鲂蕖;過了之后消息將自動(dòng)被刪除抡草。RabbitMQ可以對(duì)消息和隊(duì)列設(shè)置TTL饰及。目前有兩種方法可以設(shè)置。
- 第一種方法是通過隊(duì)列屬性設(shè)置康震,隊(duì)列中所有消息都有相同的過期時(shí)間燎含。
- 第二種方法是對(duì)消息進(jìn)行單獨(dú)設(shè)置,每條消息TTL可以不同腿短。
如果上述兩種方法同時(shí)使用屏箍,則消息的過期時(shí)間以兩者之間TTL較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列的生存時(shí)間一旦超過設(shè)置的TTL值橘忱,就稱為dead message被投遞到死信隊(duì)列赴魁, 消費(fèi)者將無法再收到該消
息。
1.1.1. 設(shè)置隊(duì)列TTL
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加
如下內(nèi)容:
<!--定義過期隊(duì)列及其屬性钝诚,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments> <!--投遞到該隊(duì)列的消息如果沒有消費(fèi)都將在6秒之后被刪除-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
</rabbit:queue-arguments>
</rabbit:queue>
然后在測試類 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
中編寫如下方法發(fā)送消息到上述定義的隊(duì)列:
/*
** 過期隊(duì)列消息
* 投遞到該隊(duì)列的消息如果沒有消費(fèi)都將在6秒之后被刪除
*/
@Test public void ttlQueueTest(){
//路由鍵與隊(duì)列同名
rabbitTemplate.convertAndSend("my_ttl_queue", "發(fā)送到過期隊(duì)列my_ttl_queue颖御,
6秒內(nèi)不消費(fèi)則不能再被消費(fèi)。");
}
參數(shù) x-message-ttl 的值 必須是非負(fù) 32 位整數(shù) (0 <= n <= 2^32-1) 凝颇,以毫秒為單位表示 TTL 的值潘拱。這樣疹鳄,值 6000 表示存在于 隊(duì)列 中的當(dāng)前 消息 將最多只存活 6 秒鐘。
如果不設(shè)置TTL,則表示此消息不會(huì)過期芦岂。如果將TTL設(shè)置為0瘪弓,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者,否則該消息會(huì)被立即丟棄盔腔。
1.1.2. 設(shè)置消息TTL
消息的過期時(shí)間杠茬;只需要在發(fā)送消息(可以發(fā)送到任何隊(duì)列,不管該隊(duì)列是否屬于某個(gè)交換機(jī))的時(shí)候設(shè)置過期時(shí)間即可弛随。在測試類中編寫如下方法發(fā)送消息并設(shè)置過期時(shí)間到隊(duì)列:
/*
** 過期消息
* 該消息投遞任何交換機(jī)或隊(duì)列中的時(shí)候瓢喉;如果到了過期時(shí)間則將從該隊(duì)列中刪除
*/
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//設(shè)置消息的過期時(shí)間,5秒
messageProperties.setExpiration("5000");
Message message = new Message("測試過期消息舀透,5秒鐘過期".getBytes(),
messageProperties);
//路由鍵與隊(duì)列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}
expiration 字段以微秒為單位表示 TTL 值栓票。且與 x-message-ttl 具有相同的約束條件。因?yàn)?br> expiration 字段必須為字符串類型愕够,broker 將只會(huì)接受以字符串形式表達(dá)的數(shù)字走贪。
當(dāng)同時(shí)指定了 queue 和 message 的 TTL 值,則兩者中較小的那個(gè)才會(huì)起作用惑芭。
1.2. 死信隊(duì)列
DLX坠狡,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機(jī),也有人稱之為死信郵箱遂跟。當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后逃沿,它能被重新發(fā)送到另一個(gè)交換機(jī)中,這個(gè)交換機(jī)就是DLX 幻锁,綁定DLX的隊(duì)列就稱之為死信隊(duì)列凯亮。
消息變成死信,可能是由于以下的原因:
- 消息被拒絕
- 消息過期
- 隊(duì)列達(dá)到最大長度
DLX也是一個(gè)正常的交換機(jī)哄尔,和一般的交換機(jī)沒有區(qū)別假消,它能在任何的隊(duì)列上被指定,實(shí)際上就是設(shè)置某一個(gè)隊(duì)列的屬性岭接。當(dāng)這個(gè)隊(duì)列中存在死信時(shí)富拗,Rabbitmq就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的DLX上去,進(jìn)而被路由到另一個(gè)隊(duì)列鸣戴,即死信隊(duì)列啃沪。
要想使用死信隊(duì)列,只需要在定義隊(duì)列的時(shí)候設(shè)置隊(duì)列參數(shù) x-dead-letter-exchange 指定交換機(jī)即可葵擎。
具體步驟如下面的章節(jié)谅阿。
1.2.1. 定義死信交換機(jī)
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加
如下內(nèi)容:
<!--定義定向交換機(jī)中的持久化死信隊(duì)列半哟,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定義廣播類型交換機(jī)酬滤;并綁定上述兩個(gè)隊(duì)列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange"
auto- declare="true">
<rabbit:bindings>
<!--綁定路由鍵my_ttl_dlx签餐、my_max_dlx,可以將過期的消息轉(zhuǎn)移到my_dlx_queue隊(duì) 列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
1.2.2. 隊(duì)列設(shè)置死信交換機(jī)
為了測試消息在過期盯串、隊(duì)列達(dá)到最大長度后都將被投遞死信交換機(jī)上氯檐;所以添加配置如下:
在 spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加
如下內(nèi)容:
<!--定義過期隊(duì)列及其屬性,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto- declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊(duì)列的消息如果沒有消費(fèi)都將在6秒之后被投遞到死信交換機(jī)-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--設(shè)置當(dāng)消息過期后投遞到對(duì)應(yīng)的死信交換機(jī)-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義限制長度的隊(duì)列及其屬性体捏,不存在則自動(dòng)創(chuàng)建-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto- declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊(duì)列的消息最多2個(gè)消息冠摄,如果超過則最早的消息被刪除投遞到死信交換機(jī)-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--設(shè)置當(dāng)消息過期后投遞到對(duì)應(yīng)的死信交換機(jī)-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義定向交換機(jī) 根據(jù)不同的路由key投遞消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange"
auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
1.2.3. 消息過期的死信隊(duì)列測試
1)發(fā)送消息代碼
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
方法
/***
過期消息投遞到死信隊(duì)列
* 投遞到一個(gè)正常的隊(duì)列,但是該隊(duì)列有設(shè)置過期時(shí)間几缭,到過期時(shí)間之后消息會(huì)被投遞到死信交換機(jī) (隊(duì)列)
*/
@Test
public void dlxTTLMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "測試過 期消息河泳;
6秒過期后會(huì)被投遞到死信交換機(jī)");
}
2)在rabbitMQ管理界面中結(jié)果
未過期:
過期后:
3)流程
具體因?yàn)殛?duì)列消息過期而被投遞到死信隊(duì)列的流程:
1.2.4. 消息過長的死信隊(duì)列測試
1)發(fā)送消息代碼
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
方法
/**
* 超過隊(duì)列長度消息投遞到死信隊(duì)列
* 投遞到一個(gè)正常的隊(duì)列,但是該隊(duì)列有設(shè)置最大消息數(shù)年栓,到最大消息數(shù)之后隊(duì)列中最早的消息會(huì)被投
遞到死信交換機(jī)(隊(duì)列)
*/
@Test
public void dlxMaxMessageTest(){
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"隊(duì)列my_max_dlx_queue的最大長度為2拆挥;消息超過后會(huì)被投遞到死信交換機(jī);這是 第1個(gè)消息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"隊(duì)列my_max_dlx_queue的最大長度為2某抓;消息超過后會(huì)被投遞到死信交換機(jī)纸兔;這是 第2個(gè)消息");
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
"隊(duì)列my_max_dlx_queue的最大長度為2;消息超過后會(huì)被投遞到死信交換機(jī)否副;這是 第3個(gè)消息");
}
2)在rabbitMQ管理界面中結(jié)果
上面發(fā)送的3條消息中的第1條消息會(huì)被投遞到死信隊(duì)列中(如果啟動(dòng)了消費(fèi)者汉矿,那么隊(duì)列消息很快會(huì)被取走消費(fèi)掉);
3)消費(fèi)者接收死信隊(duì)列消息
與過期消息投遞到死信隊(duì)列的代碼和配置是共用的备禀,并不需要重新編寫洲拇。
4)流程
消息超過隊(duì)列最大消息長度而被投遞到死信隊(duì)列的流程在前面的圖中已包含。
1.3. 延遲隊(duì)列
延遲隊(duì)列存儲(chǔ)的對(duì)象是對(duì)應(yīng)的延遲消息痹届;所謂“延遲消息” 是指當(dāng)消息被發(fā)送以后呻待,并不想讓消費(fèi)者立刻
拿到消息,而是等待特定時(shí)間后队腐,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi)蚕捉。
在RabbitMQ中延遲隊(duì)列可以通過 過期時(shí)間 + 死信隊(duì)列
來實(shí)現(xiàn);具體如下流程圖所示:
在上圖中柴淘;分別設(shè)置了兩個(gè)5秒迫淹、10秒的過期隊(duì)列,然后等到時(shí)間到了則會(huì)自動(dòng)將這些消息轉(zhuǎn)移投遞到對(duì)應(yīng)的死信隊(duì)列中为严,然后消費(fèi)者再從這些死信隊(duì)列接收消息就可以實(shí)現(xiàn)消息的延遲接收敛熬。
延遲隊(duì)列的應(yīng)用場景;如:
- 在電商項(xiàng)目中的支付場景第股;如果在用戶下單之后的幾十分鐘內(nèi)沒有支付成功应民;那么這個(gè)支付的訂單算是支付失敗,要進(jìn)行支付失敗的異常處理(將庫存加回去),這時(shí)候可以通過使用延遲隊(duì)列來處理
- 在系統(tǒng)中如有需要在指定的某個(gè)時(shí)間之后執(zhí)行的任務(wù)都可以通過延遲隊(duì)列處理
1.4. 消息確認(rèn)機(jī)制
確認(rèn)并且保證消息被送達(dá)诲锹,提供了兩種方式:發(fā)布確認(rèn)和事務(wù)繁仁。(兩者不可同時(shí)使用)在channel為事務(wù)時(shí),不可引入確認(rèn)模式归园;同樣channel為確認(rèn)模式下黄虱,不可使用事務(wù)。
1.4.1 發(fā)布確認(rèn)
有兩種方式:消息發(fā)送成功確認(rèn)和消息發(fā)送失敗回調(diào)庸诱。
-
消息發(fā)送成功確認(rèn)
在spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml connectionFactory
中啟用消息確認(rèn):
<!-- publisher-confirms="true" 表示:啟用了消息確認(rèn) -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-confirms="true" />
配置消息確認(rèn)回調(diào)方法如下:
<!-- 消息回調(diào)處理類 -->
<bean id="confirmCallback" class="com.itheima.rabbitmq.MsgSendConfirmCallBack"/>
<!--定義rabbitTemplate對(duì)象操作可以在代碼中方便發(fā)送消息-->
<!-- confirm-callback="confirmCallback" 表示:消息失敗回調(diào) -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback"/>
消息確認(rèn)回調(diào)方法com.itheima.rabbitmq.MsgSendConfirmCallBack如下:
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息確認(rèn)成功...."); }
else {
//處理丟失的消息 System.out.println("消息確認(rèn)失敗," + cause);
}
}
}
功能測試如下:
發(fā)送消息
com.itheima.rabbitmq.ProducerTest#queueTest
@Test public void queueTest(){
//路由鍵與隊(duì)列同名
rabbitTemplate.convertAndSend("spring_queue", "只發(fā)隊(duì)列spring_queue的消 息捻浦。");
}
管理界面確認(rèn)消息發(fā)送成功
消息確認(rèn)回調(diào)
- 消息發(fā)送失敗回調(diào)
在spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
connectionFactory 中啟用回調(diào):
<!-- publisher-returns="true" 表示:啟用了失敗回調(diào) -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
publisher-returns="true" />
配置消息失敗回調(diào)方法如下:
注意:同時(shí)需配置mandatory="true",否則消息則丟失
<!-- 消息失敗回調(diào)類 -->
<bean id="sendReturnCallback"
class="com.itheima.rabbitmq.MsgSendReturnCallback"/>
<!-- return-callback="sendReturnCallback" 表示:消息失敗回調(diào) ,同時(shí)需配置
mandatory="true"桥爽,否則消息則丟失-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback" return-callback="sendReturnCallback"
mandatory="true"/>
消息失敗回調(diào)方法com.itheima.rabbitmq.MsgSendReturnCallback如下:
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
String msgJson = new String(message.getBody());
System.out.println("Returned Message:"+msgJson);
}
}
功能測試如下:
模擬消息發(fā)送失敗
com.itheima.rabbitmq.ProducerTest#testFailQueueTest
@Test
public void testFailQueueTest() throws InterruptedException {
//exchange 正確,queue 錯(cuò)誤 ,confirm被回調(diào), ack=true; return被回調(diào)
replyText:NO_ROUTE
amqpTemplate.convertAndSend("test_fail_exchange", "", "測試消息發(fā)送失敗進(jìn)行確認(rèn) 應(yīng)答朱灿。");
}
失敗回調(diào)結(jié)果如下:
1.4.2 事務(wù)支持
場景:業(yè)務(wù)處理伴隨消息的發(fā)送,業(yè)務(wù)處理失斈扑摹(事務(wù)回滾)后要求消息不發(fā)送母剥。rabbitmq 使用調(diào)用者的外部事務(wù),通常是首選形导,因?yàn)樗欠乔秩胄缘模ǖ婉詈希?/p>
外部事務(wù)的配置:spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
<!-- channel-transacted="true" 表示:支持事務(wù)操作 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
confirm-callback="confirmCallback" return-callback="sendReturnCallback"
channel-transacted="true" />
<!--平臺(tái)事務(wù)管理器-->
<bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
- 模擬業(yè)務(wù)處理失敗的場景:
測試類或者測試方法上加入@Transactional注解
@Transactional
public class ProducerTest
@Test
public void queueTest2(){
//路由鍵與隊(duì)列同名
rabbitTemplate.convertAndSend("spring_queue", "只發(fā)隊(duì)列spring_queue的消息- -01环疼。"); System.out.println("----------------dosoming:可以是數(shù)據(jù)庫的操作,也可以是其他業(yè)務(wù)
類型的操作---------------");
//模擬業(yè)務(wù)處理失敗
System.out.println(1/0);
rabbitTemplate.convertAndSend("spring_queue", "只發(fā)隊(duì)列spring_queue的消息- -02朵耕。");
}
測試結(jié)果:
1.5. 消息追蹤
消息中心的消息追蹤需要使用Trace實(shí)現(xiàn)炫隶,Trace是Rabbitmq用于記錄每一次發(fā)送的消息,方便使用Rabbitmq的開發(fā)者調(diào)試阎曹、排錯(cuò)伪阶。可通過插件形式提供可視化界面处嫌。Trace啟動(dòng)后會(huì)自動(dòng)創(chuàng)建系統(tǒng)Exchange:amq.rabbitmq.trace ,每個(gè)隊(duì)列會(huì)自動(dòng)綁定該Exchange栅贴,綁定后發(fā)送到隊(duì)列的消息都會(huì)記錄到Trace日志。
1.5.1 消息追蹤啟用與查看
以下是trace的相關(guān)命令和使用(要使用需要先rabbitmq啟用插件熏迹,再打開開關(guān)才能使用):
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq啟用trace插件 |
rabbitmqctl trace_on | 打開trace的開關(guān) |
rabbitmqctl trace_on -p itcast | 打開trace的開關(guān)(itcast為需要日志追蹤的 vhost) |
rabbitmqctl trace_off | 關(guān)閉trace的開關(guān) |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq關(guān)閉Trace插件 |
rabbitmqctl set_user_tags heimaadministrator | 只有administrator的角色才能查看日志界面 |
安裝插件并開啟 trace_on 之后檐薯,會(huì)發(fā)現(xiàn)多個(gè) exchange:amq.rabbitmq.trace ,類型為:topic注暗。
1.5.2 日志追蹤
第一步:發(fā)送消息
rabbitTemplate.convertAndSend("spring_queue", "只發(fā)隊(duì)列spring_queue的消息--01坛缕。");
發(fā)送成功,web查看多了一條消息
第二步:查看trace
第三步:點(diǎn)擊Tracing查看Trace log files
第四步:點(diǎn)擊itcast-trace.log確認(rèn)消息軌跡正確性
瀏覽器截圖: