- 以下例子代碼可在github或者在gitee下載
github:代碼鏈接
gitee:代碼鏈接 - 前兩篇博文:
springboot rabbitmq入門使用
springboot rabbitmq不同交換機類型實戰(zhàn)
RabbitMQ的高可用主要體現(xiàn)在消息的發(fā)送专缠、傳輸和接收的過程中同廉,可以保證消息成功發(fā)送、不會丟失瞎惫,以及被確認消費/不重復(fù)消費。
- 對于消息是否發(fā)送成功钱豁,主要是針對生產(chǎn)者端的消息生產(chǎn)確認機制;
- 對于消息不會丟失疯汁,主要是rabbitmq消息持久化機制牲尺;
- 對于消息確認消費/不重復(fù)消費,主要是針對消費者端對消息的確認消費機制涛目。
一秸谢、消息生產(chǎn)確認機制
對于消息是否發(fā)送成功,在rabbitmq自定義操作組件中可以統(tǒng)一設(shè)置消息生產(chǎn)確認相關(guān)邏輯rabbitTemplate.setConfirmCallback和rabbitTemplate.setReturnCallback霹肝。
@Slf4j
@Configuration
public class RabbitmqConfig {
//自定義配置RabbitMQ發(fā)送消息的操作組件RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(){
//設(shè)置“發(fā)送消息后進行確認”
connectionFactory.setPublisherConfirms(true);
//設(shè)置“發(fā)送消息后返回確認信息”
connectionFactory.setPublisherReturns(true);
//構(gòu)造發(fā)送消息組件實例對象
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//發(fā)送消息后估蹄,如果發(fā)送成功,則輸出“消息發(fā)送成功”的反饋信息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
//發(fā)送消息后沫换,如果發(fā)送失敗臭蚁,則輸出“消息發(fā)送失敗-消息丟失”的反饋信息
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
//定義消息傳輸?shù)母袷綖镴SON字符串格式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//最終返回RabbitMQ的操作組件實例RabbitTemplate
return rabbitTemplate;
}
}
二、消息持久化
- 在創(chuàng)建交換機和隊列的時候讯赏,有個durable的參數(shù)垮兑,即是否持久化,如果設(shè)置為true漱挎,當(dāng)rabbitmq服務(wù)器重啟的時候系枪,創(chuàng)建的交換機和隊列均還存在著,不會丟失磕谅;
- 在發(fā)送消息的時候可以選擇為該消息設(shè)置持久化私爷,即消息體Message的deliveryMode設(shè)置為MessageDeliveryMode.PERSISTENT持久化雾棺,當(dāng)消息來不及消費rabbitmq服務(wù)器重啟,那么消息依舊存在衬浑,如果將所有消息都設(shè)置持久化捌浩,那么會影響性能,內(nèi)存和磁盤的讀寫速度差異很大工秩。
三尸饺、消息確認消費機制
如何保證消息能夠被準(zhǔn)備消費、不重復(fù)消費助币,RabbitMQ提供了消息確認機制浪听,即ACK模式。RabbitMQ的消息確認機制有3種奠支,分別是NONE(無須確認)馋辈、AUTO(自動確認)和MANUAL(手動確認)。
-
無須確認流程圖如下圖所示倍谜,對于該模式,消息是否消費成功生產(chǎn)者端是不知道的叉抡,存在可能重復(fù)消費/消息消費失敗的情況:
無需確認.jpeg -
代碼目錄如圖所示尔崔,演示自動確認和手動確認:
自動確認和手動確認.png
對于設(shè)置ACK模式,可以在yaml配置文件中設(shè)置spring.rabbitmq.listener.simple.acknowledge-mode: xxx褥民,也可以在聲明的監(jiān)聽器Bean中設(shè)置季春,用簡單監(jiān)聽器SimpleRabbitListenerContainerFactory即可:
@Slf4j
@Configuration
public class RabbitmqConfig {
/**
* 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型
*/
@Bean
public SimpleRabbitListenerContainerFactory singleListenerContainerAuto(){
//定義消息監(jiān)聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//設(shè)置容器工廠所用的實例
factory.setConnectionFactory(connectionFactory);
//設(shè)置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//設(shè)置并發(fā)消費者實例的初始數(shù)量消返。在這里為1個
factory.setConcurrentConsumers(1);
//設(shè)置并發(fā)消費者實例的最大數(shù)量载弄。在這里為1個
factory.setMaxConcurrentConsumers(1);
//設(shè)置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
factory.setPrefetchCount(1);
//確認消費模式為自動確認機制
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型
*/
@Bean
public SimpleRabbitListenerContainerFactory singleListenerContainerManual(){
//定義消息監(jiān)聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//設(shè)置容器工廠所用的實例
factory.setConnectionFactory(connectionFactory);
//設(shè)置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//設(shè)置并發(fā)消費者實例的初始數(shù)量撵颊。在這里為1個
factory.setConcurrentConsumers(1);
//設(shè)置并發(fā)消費者實例的最大數(shù)量宇攻。在這里為1個
factory.setMaxConcurrentConsumers(1);
//設(shè)置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
factory.setPrefetchCount(1);
//確認消費模式為自動確認機制
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
(1)自動確認模式
自動確認模式流程圖如圖所示,RabbitMQ內(nèi)置組件通知生產(chǎn)者端倡勇,當(dāng)消息成功消費/消費失敗都會通知:
對于自動確認模式逞刷,在消費者端可以看到和普通的消息隊列沒什么區(qū)別,而手工確認消費模式則比較靈活妻熊。
- 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型-生產(chǎn)者
@Slf4j
@Component
public class AutoAckPublisher {
//定義RabbitMQ消息操作組件RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息
*/
public void sendMsg(Order order) {
try {
//設(shè)置交換機
rabbitTemplate.setExchange(RabbitMqConstants.AUTO_ACKNOWLEDGE_EXCHANGE);
//設(shè)置路由
rabbitTemplate.setRoutingKey(RabbitMqConstants.AUTO_ACKNOWLEDGE_ROUTING_KEY);
//發(fā)送消息
rabbitTemplate.convertAndSend(order);
log.info("確認消費模式為自動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{} ",order);
}catch (Exception e){
log.error("確認消費模式為自動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ",order, e);
}
}
}
- 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型-消費者
@Slf4j
@Component
public class AutoAckConsumer {
@RabbitListener(queues = RabbitMqConstants.AUTO_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerAuto")
public void consumeMsg(Order order) {
try {
log.info("基于AUTO的自動確認消費模式-消費者監(jiān)聽消費消息-內(nèi)容為:{} ",order);
}catch (Exception e){
log.error("基于AUTO的自動確認消費模式-消費者監(jiān)聽消費消息:{},發(fā)生異常:", order, e);
}
}
}
(2)手工確認流程圖如圖所示夸浅,當(dāng)消息處理過程中出現(xiàn)異常的時候,需要手工確認處理該異常消息扔役,該消息是否重新歸入隊列等處理帆喇。
- 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型-生產(chǎn)者
@Slf4j
@Component
public class ManualAckPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 發(fā)送消息
*/
public void sendMsg(Order order) {
try {
rabbitTemplate.setExchange(RabbitMqConstants.MANUAL_ACKNOWLEDGE_EXCHANGE);
rabbitTemplate.setRoutingKey(RabbitMqConstants.MANUAL_ACKNOWLEDGE_ROUTING_KEY);
rabbitTemplate.convertAndSend(order);
log.info("確認消費模式為手動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{} ", order);
}catch (Exception e){
log.error("確認消費模式為手動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ", order, e);
}
}
}
- 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型-消費者
在監(jiān)聽到消息并且消息成功處理完之后,通過basicAck來確認消息成功消費亿胸,當(dāng)捕獲到異常的時候即該消息處理失敗的時候坯钦,有兩種方式预皇,一種是拒絕該消息并且消息重新歸入隊列中,另一種是拒絕該消息并且丟棄掉葫笼,一般情況下重新歸入隊列深啤,還是會出現(xiàn)異常沒法消費掉,除非把異常修復(fù)了才行路星,并且在未修復(fù)該異常的情況下溯街,后面的消息會被堵塞住沒辦法消費,將消息重新歸入隊列中或許不是一個好的選擇洋丐。
一般情況下可以保留該消息的信息然后把消息丟棄掉呈昔,最后重新發(fā)送消息;或者把該消息丟入到死信隊列中友绝,不對該死信隊列進行監(jiān)聽堤尾,最后在rabbitmq管理后臺取出該消息/重新監(jiān)聽該消息重新發(fā)送到原先隊列進行消費,修復(fù)好異常情況再發(fā)送消息進行處理迁客,保證消息成功消費郭宝。
@Slf4j
@Component
public class ManualAckConsumer {
@RabbitListener(queues = RabbitMqConstants.MANUAL_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerManual")
public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
try {
log.info("基于MANUAL的手工確認消費模式-消費者監(jiān)聽消費消息,消息投遞標(biāo)記:{},內(nèi)容為:{} ", tag, order);
//拋異常,歸入使得消息重新歸入隊列
//int num = 1 / 0;
//執(zhí)行完業(yè)務(wù)邏輯后,手動進行確認消費掷漱,其中第一個參數(shù)為:消息的分發(fā)標(biāo)識(全局唯一);第二個參數(shù):是否允許批量確認消費
channel.basicAck(tag, false);
}catch (Exception e){
//第二個參數(shù)reueue重新歸入隊列,true的話會重新歸入隊列,需要人為地處理此次異常消息,重新歸入隊列也會繼續(xù)異常
channel.basicReject(tag, true);
log.error("基于MANUAL的手工確認消費模式-消費者監(jiān)聽消費消息:{},消息投遞標(biāo)簽:{},發(fā)生異常:", order, tag, e);
}
}
}
出現(xiàn)異常重新歸入隊列的情況粘室,如圖所示,顯示有unacked 1條消息卜范,下面有g(shù)et messages衔统,當(dāng)點擊的時候發(fā)現(xiàn)提示queue is empty隊列為空,確實準(zhǔn)備消費的消息為0條海雪,正在消費的消息一直是unacked狀態(tài)無法取出锦爵。
這個時候只能停止監(jiān)聽重啟項目,這個在線上不是好的辦法奥裸,停止監(jiān)聽之后消息變?yōu)閞eady狀態(tài)险掀,這個時候可以取出,可以看到提示“取出消息是毀滅性的操作”刺彩。
四種取出消息的模式迷郑,分別為:不確認消息重新歸入隊列、確認消息不重新歸入隊列创倔、拒絕該消息重新歸入隊列嗡害、拒絕該消息不重新歸入隊列。當(dāng)取出消息可以看到消息的內(nèi)容畦攘。
對于確認消息消費霸妹,避免消息異常出現(xiàn)上述情況,可以用死信隊列來處理知押,捕獲異常消息叹螟,發(fā)送消息到死信隊列鹃骂,不監(jiān)聽該隊列的消息,最后修復(fù)異常重新發(fā)送消息到原先隊列進行消費罢绽,詳情請看下篇博文
參考資料:
《分布式中間件實戰(zhàn)》
《rabbitmq實戰(zhàn)指南》