rabbitMQ如何保證如果消息發(fā)送失敗,保證其消息不丟失狐胎、怎么設(shè)置消息過(guò)期時(shí)間以及死信隊(duì)列是如何在消息消費(fèi)失敗時(shí)保證消息不丟失的掸犬、如何使用過(guò)期時(shí)間來(lái)實(shí)現(xiàn)延遲隊(duì)列以及rabbitMQ的持久化跟匆、消息確認(rèn)的機(jī)制是怎樣的?本博文將具體介紹上述內(nèi)容
本博文中的代碼實(shí)現(xiàn)實(shí)在SpringBoot整合RabbitMQ——消息的發(fā)送和接收的基礎(chǔ)上實(shí)現(xiàn)了咱枉,完整的代碼可以查看Gitee上的項(xiàng)目rabbitmq
rabbitMQ如何保證消息的不丟失
消息的丟失有以下四種情況:
- 消息發(fā)送到RabbitMQ服務(wù)器,交換機(jī)根據(jù)自身的類(lèi)型和路由鍵無(wú)法匹配到隊(duì)列徒恋,導(dǎo)致消息丟失
- 消息設(shè)置了過(guò)期時(shí)間蚕断,消息過(guò)期了導(dǎo)致消息丟失
- 消息不能被正確的消費(fèi),導(dǎo)致消息的丟失
- 因?yàn)榉?wù)器的崩潰導(dǎo)致消息的丟失
針對(duì)以上的情況入挣,rabbitMQ提供了不同的解決方案
消息設(shè)置mandatory參數(shù)和使用備份交換機(jī)
mandatory參數(shù)
在上篇博文中我們其實(shí)已經(jīng)在配置文件中配置了mandatory
的參數(shù)亿乳,并且在connectionFactory
中設(shè)置了其中的參數(shù)
spring:
rabbitmq:
template:
mandatory: true
publisher-confirms: true
publisher-returns: true
然后再發(fā)送消息的時(shí)候設(shè)置回調(diào)函數(shù)
/**
* 確認(rèn)后回調(diào)方法
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public final void confirm(CorrelationData correlationData, boolean ack, String cause) {
this.logger.info("confirm-----correlationData:" + correlationData.toString() + "---ack:" + ack + "----cause:" + cause);
// TODO 記錄日志(數(shù)據(jù)庫(kù)或者es)
this.handleConfirmCallback(correlationData.getId(), ack, cause);
}
/**
* 失敗后回調(diào)方法
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public final void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
this.logger.info("return-----message:" + message.toString() + "---replyCode:" + replyCode + "----replyText:" + replyText + "----exchange:" + exchange + "----routingKey:" + routingKey);
// TODO 記錄日志(數(shù)據(jù)庫(kù)或者es)
this.handleReturnCallback(message, replyCode, replyText, routingKey);
}
用戶(hù)可以在這里對(duì)消息進(jìn)行本地持久化,其實(shí)上面的也叫消息確認(rèn)模式径筏,發(fā)送端將消息發(fā)送給RabbitMQ葛假,rabbitMQ會(huì)異步回調(diào)Confirm方法,告訴發(fā)送方滋恬,RabbitMQ服務(wù)端有沒(méi)有收到消息聊训,如果沒(méi)有收到消息的話,原因是什么夷恍。同時(shí)會(huì)異步回調(diào)設(shè)置的returnedMessage將發(fā)送的消息返回
備份交換機(jī)
除了上面的消息確認(rèn)模式魔眨,還有一種備份交換機(jī)的方案也是可以解決消息的丟失問(wèn)題,具體的邏輯如下:
- 聲明一個(gè)交換機(jī)A酿雪,其類(lèi)型為fanout類(lèi)型
- 聲明一個(gè)交換機(jī)B遏暴,設(shè)置其
alternate-exchange
屬性為交換機(jī)A - 聲明一個(gè)隊(duì)列a,并且與交換機(jī)A綁定
- 聲明一個(gè)隊(duì)列b指黎,并且與交換機(jī)B綁定朋凉,路由鍵為rb
這樣我們就實(shí)現(xiàn)了備份交換機(jī)功能,其業(yè)務(wù)實(shí)現(xiàn)邏輯如下:
我們發(fā)送一個(gè)消息到交換機(jī)B上醋安,當(dāng)路由鍵等于rb時(shí)杂彭,消息會(huì)正確發(fā)送到隊(duì)列b上墓毒,當(dāng)路由鍵不等于rb時(shí),即消息不能正確的發(fā)送到隊(duì)列b上亲怠,此時(shí)就會(huì)發(fā)送給交換機(jī)A所计,由于交換機(jī)A是fanout類(lèi)型的,所以消息會(huì)被進(jìn)一步發(fā)送到隊(duì)列a上团秽。這樣我們就實(shí)現(xiàn)了發(fā)送方消息的不丟失主胧。
[圖片上傳失敗...(image-5a7baf-1562419602501)]
代碼實(shí)現(xiàn)這里給出alternaate-exchange
的實(shí)現(xiàn)
Map<String,Object> map = new HashMap<String,Object>();
map.put("alternate-exchange","A");
MqExchange exchange = new MqExchange().arguments(map).name("B").type(ExchangeTypeEnum.DIRECT.getCode());
amExchangeDeclare.declareExchange(exchange);
過(guò)期時(shí)間和死信隊(duì)列
過(guò)期時(shí)間
我們常見(jiàn)的購(gòu)物車(chē)訂單,一般有這樣的需求习勤,在規(guī)定的時(shí)間內(nèi)沒(méi)有付款的話踪栋,該訂單就會(huì)失效,這里常見(jiàn)就是使用消息的過(guò)期時(shí)間來(lái)控制的图毕,在rabbitMQ中實(shí)現(xiàn)過(guò)期時(shí)間有兩種方式
- 設(shè)置隊(duì)列的過(guò)期時(shí)間夷都,則該隊(duì)列中所有的消息的過(guò)期時(shí)間都是一樣的
// 設(shè)置隊(duì)列的過(guò)期時(shí)間
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-message-ttl",6000);
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
如果不設(shè)置ttl(Time To Live),則這個(gè)消息不會(huì)過(guò)期,如果將TTL設(shè)置為0予颤,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者囤官,否則消息會(huì)立即丟棄
- 設(shè)置消息的過(guò)期時(shí)間,只有這個(gè)消息存在過(guò)期時(shí)間荣瑟,設(shè)置消息的過(guò)期時(shí)間如下:
// 設(shè)置消息的過(guò)期時(shí)間
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(6000);
return message;
}
};
sendService.send(exchangeName,routingKey,data, messagePostProcessor, messageId);
注意:使用第一種方式來(lái)設(shè)置過(guò)期時(shí)間治拿,一旦消息過(guò)期,就會(huì)從隊(duì)列中消除笆焰,而采用第二種方式劫谅,即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中消除嚷掠,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之期間進(jìn)行判斷的
隊(duì)列也是有過(guò)期時(shí)間的捏检,通過(guò)x-expires
屬性來(lái)設(shè)置的
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-expires",6000);
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
死信隊(duì)列
DLX:Dead-Letter-Exchange:當(dāng)一個(gè)消息在一個(gè)隊(duì)列中變成死信之后,他能被重新發(fā)送到另一個(gè)交換機(jī)中不皆,這個(gè)交換機(jī)就是DLX贯城。
那么消息滿(mǎn)足什么條件就會(huì)成為死信呢?
- 消息被拒絕霹娄,并且設(shè)置requeue參數(shù)為false
- 消息過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度
那么如何使用死信隊(duì)列能犯?
死信隊(duì)列一般都是作為其他隊(duì)列的一個(gè)屬性來(lái)用的,當(dāng)這個(gè)隊(duì)列中存在死信時(shí)犬耻,RabbitMQ就會(huì)自動(dòng)將這個(gè)消息重新發(fā)布到設(shè)置的DLX上踩晶,進(jìn)而被路由到另一個(gè)隊(duì)列中
// 定義一個(gè)交換機(jī)
MqExchange dlxExchange = new MqExchange().name("B").type(ExchangeTypeEnum.DIRECT.getCode());
amExchangeDeclare.declareExchange(exchange);
// 聲明一個(gè)隊(duì)列時(shí),設(shè)置他的屬性 x-dead-letter-exchange 為上面定義的交換機(jī)
Map<String,Object> map = new HashMap<String,Object>();
map.put("x-dead-letter-exchange ","dlxExchange");
// 也可以為DLX指定路由鍵 這個(gè)不是必須的枕磁,如果沒(méi)有設(shè)置路由鍵渡蜻,則使用原隊(duì)列的路由鍵
map.put("x-dead-letter-routing-key","dlx-routing-key");
MqQueue queue = new MqQueue().name(queueName).arguments(map);
amQueueDeclare.declareQueue(queue);
這樣就初步完成了死信隊(duì)列的聲明。其業(yè)務(wù)流程圖如下:
延時(shí)隊(duì)列(定時(shí)隊(duì)列)
上述的購(gòu)物車(chē)訂單的示例,其實(shí)最優(yōu)的方案設(shè)計(jì)是使用TTL+DLX來(lái)實(shí)現(xiàn)茸苇,如果用戶(hù)沒(méi)有在規(guī)定的時(shí)間來(lái)支付排苍,則這個(gè)訂單就進(jìn)行一場(chǎng)處理。
延時(shí)隊(duì)列的具體使用方法如下:
方案一:
- 聲明一個(gè)設(shè)置死信隊(duì)列的隊(duì)列学密,該隊(duì)列沒(méi)有消費(fèi)者
- 給每一個(gè)發(fā)送該隊(duì)列的消息設(shè)置過(guò)期時(shí)間
- 消息一旦過(guò)期就會(huì)被死信隊(duì)列消費(fèi)淘衙,這樣就能實(shí)現(xiàn)延時(shí)隊(duì)列的效果
方案二:
- 聲明多個(gè)設(shè)置死信隊(duì)列、不同過(guò)期時(shí)間的隊(duì)列腻暮,該隊(duì)列沒(méi)有消費(fèi)者
- 然后通過(guò)不同的的routingKey來(lái)將消息發(fā)送到不同的隊(duì)列上
- 等隊(duì)列過(guò)期時(shí)間一到幔翰,消息就會(huì)匹配到死信隊(duì)列上,這樣也能實(shí)現(xiàn)延時(shí)隊(duì)列的效果
方案一和方案二的實(shí)現(xiàn)原理基本相同西壮,不同的是一個(gè)是消息的過(guò)期時(shí)間,一個(gè)是隊(duì)列的過(guò)期時(shí)間叫惊,方案二聲明多個(gè)不同過(guò)期時(shí)間的隊(duì)列款青,而方案一只聲明一個(gè)隊(duì)列,這樣的話有優(yōu)點(diǎn)也有缺點(diǎn)霍狰,優(yōu)點(diǎn)是減少了RabbitMQ的隊(duì)列數(shù)量抡草,缺點(diǎn)是降低了RabbitMQ隊(duì)列消息消費(fèi)的速度,而使用哪種方案可以根據(jù)業(yè)務(wù)和流量來(lái)衡量使用
具體的業(yè)務(wù)流程圖如下:
[圖片上傳失敗...(image-1f73c4-1562419602501)]
消息持久化
RabbitMQ中交換機(jī)蔗坯、隊(duì)列和消息都可以持久化康震,其中交換機(jī)和隊(duì)列的持久化只需要在聲明時(shí),其屬性durable為true即可宾濒,而消息的持久化是建立在隊(duì)列持久化的基礎(chǔ)上腿短,因?yàn)樵赗abbitMQ中,消息時(shí)存儲(chǔ)在隊(duì)列上的绘梦,隊(duì)列都沒(méi)有了橘忱,消息肯定也是存儲(chǔ)不了的。
消息的持久化在上面的內(nèi)容已經(jīng)介紹過(guò)了卸奉,不同版本的SpringBoot的RabbitMQ集成钝诚,實(shí)現(xiàn)的方式可能 不一樣,但是本質(zhì)都是一樣的榄棵,最終操作的都是RabbitMQ服務(wù)器
消息和隊(duì)列的持久化都是比較簡(jiǎn)單的凝颇,但是我們這里要清楚的知道,如果我們將所有的消息都設(shè)置了持久化了疹鳄,會(huì)嚴(yán)重影響RabbitMQ的性能拧略,畢竟消息寫(xiě)入到磁盤(pán)的速度比寫(xiě)入內(nèi)存的數(shù)據(jù)慢的不是一點(diǎn)點(diǎn)的。
這個(gè)就要求我們?cè)谠O(shè)計(jì)時(shí)需要注意尚辑,對(duì)于可靠性不是那么高的消息辑鲤,可以不采用持久化處理來(lái)提高吞吐量。在選擇是否將消息持久化時(shí)杠茬,需要在可靠性和吞吐量之間做一個(gè)權(quán)衡
消息消費(fèi)處理
參數(shù)isAck
這里需要注意一點(diǎn)月褥,在我們上一篇博文SpringBoot整合RabbitMQ——消息的發(fā)送和接收中弛随,我們有介紹在為隊(duì)列設(shè)置監(jiān)聽(tīng)時(shí),有個(gè)參數(shù)isAck
宁赤,這里如果設(shè)置成true舀透,則隊(duì)列在接收到消息后,不管業(yè)務(wù)方有沒(méi)有完全消費(fèi)消息决左,都會(huì)給RabbitMQ返回個(gè)消息已經(jīng)消費(fèi)成功的結(jié)果愕够,RabbitMQ在判斷消息已經(jīng)成功消費(fèi)了則會(huì)刪除隊(duì)列中的消息,但是業(yè)務(wù)方其實(shí)沒(méi)有真正完成消息的消費(fèi)佛猛,這樣就會(huì)導(dǎo)致數(shù)據(jù)的丟失惑芭。
那我們?cè)趺磥?lái)處理這個(gè)問(wèn)題呢?
其實(shí)很簡(jiǎn)單继找,我們只需要將isAck
字段的參數(shù)設(shè)置成false即可遂跟,這樣的話就需要業(yè)務(wù)方手動(dòng)去操作該消息有沒(méi)有被成功消費(fèi)。如果沒(méi)有消費(fèi)的話就拒絕這個(gè)消息婴渡,是的拒絕這個(gè)消息幻锁,還記得我們之前介紹過(guò)的死信隊(duì)列,隊(duì)列設(shè)置了死信隊(duì)列边臼,消息一旦被拒絕的話哄尔,消息就會(huì)進(jìn)入死信交換機(jī),進(jìn)而匹配到響應(yīng)的隊(duì)列中柠并,可以在隊(duì)列中將消息持久化到本地岭接,這是一種解決方法。
具體的流程圖可以參考死信隊(duì)列的流程圖:
消息分發(fā)
在實(shí)際的生產(chǎn)過(guò)程中堂鲤,可能一個(gè)隊(duì)列存在多個(gè)消費(fèi)者亿傅,那么隊(duì)列此時(shí)收到的消息就會(huì)以輪詢(xún)的方式發(fā)送給消費(fèi)者。
一般情況下瘟栖,rabbitMQ會(huì)將第m條消息發(fā)送給第m%n個(gè)消費(fèi)者葵擎。這里其實(shí)有個(gè)隱患,在消費(fèi)者任務(wù)非常繁重的情況下半哟,來(lái)不及消費(fèi)那么多的消息酬滤,而其他的消費(fèi)者,由于某些原因寓涨,很快的處理完消息盯串,這種情況就很容易出現(xiàn)某個(gè)消費(fèi)者承受的壓力就比較大,造成整體應(yīng)用的吞吐量下降戒良。
為了解決這個(gè)問(wèn)題体捏, 我們其實(shí)可以設(shè)置Channel信道上的最大處理消息的個(gè)數(shù),代碼設(shè)置如下:
// 在系統(tǒng)初始化啟動(dòng)時(shí),加載Connection几缭,創(chuàng)建Channel河泳,然后設(shè)置BasicQos的個(gè)數(shù)
Channel channel = cachingConnectionFactory.createConnection().createChannel();
channel.basicQos(1000);
basicQos
具體的作用是設(shè)置允許限制Channel上所有消費(fèi)者所能保持的最大未確認(rèn)的消息的數(shù)量
注意如果這里設(shè)置了最大的未確認(rèn)的消息的數(shù)量,那么所有的消費(fèi)者都會(huì)生效