[TOC]
一鸭巴、RabbitMQ消息發(fā)送機(jī)制
消息由生產(chǎn)者生產(chǎn)鹃祖,首先并發(fā)送到交換機(jī)(Exchange),然后由交換機(jī)根據(jù)指定的路由規(guī)則將消息路由到不同的隊(duì)列(Queue)中校读。最后由不同的消費(fèi)者去消費(fèi)處理歉秫。
根據(jù)RabbitMQ的機(jī)制分析雁芙,想要確保消息發(fā)送的可靠性兔甘,需要保證兩個(gè)方面:
- 確認(rèn)消息到達(dá)交換機(jī)洞焙,publish
- 確認(rèn)消息到達(dá)隊(duì)列,routes
這兩步如果全成功了熔任,說明消息已經(jīng)成功發(fā)送笋敞;任何一步出現(xiàn)問題荠瘪,消息就沒發(fā)送成功。
二喷兼、方案1: 開啟事務(wù)機(jī)制
在SpringBoot項(xiàng)目中可以通過開啟RabbitMQ事務(wù)機(jī)制的方式解決季惯,具體操作如下勉抓。
-
提供一個(gè)事物管理器
@Bean RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); }
-
生產(chǎn)者上添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式
@Service public class MsgService { @Autowired RabbitTemplate rabbitTemplate; // 添加事務(wù)注解 @Transactional public void send() { // 開啟事務(wù)模式 rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("ecchange", "queue", "Hello World!"); throw new RuntimeException(); } }
開啟事務(wù)模式以后纵散,RabbitMQ生產(chǎn)者發(fā)送消息要經(jīng)過如下4步:
- 客戶端請(qǐng)求伍掀,將信道設(shè)置為事務(wù)模式蜜笤;
- 服務(wù)端響應(yīng)瘩例,設(shè)置完成垛贤;
- 客戶端發(fā)送消息趣倾;
- 客戶端提交事務(wù);
- 服務(wù)端響應(yīng)善绎,確認(rèn)事務(wù)提交禀酱。
正常情況我們發(fā)送消息只需要第三步剂跟,可以看出上面的步驟比較復(fù)雜曹洽,所以事務(wù)模式效率不是很高送淆。
三辟拷、方案2: 發(fā)送方確認(rèn)
添加配置阐斜,啟用發(fā)送方確認(rèn)衫冻。注意,發(fā)送方確認(rèn)和事務(wù)不能同時(shí)存在智听,會(huì)報(bào)錯(cuò)羽杰。
spring:
rabbitmq:
# 消息到達(dá)交換器確認(rèn)回調(diào)
publisher-confirm-type: correlated
# 消息到達(dá)隊(duì)列回調(diào)
publisher-returns: true
publisher-confirm-type
枚舉如下:
public enum ConfirmType {
/**
* Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
* within scoped operations.
*/
SIMPLE,
/**
* Use with {@code CorrelationData} to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
}
配置監(jiān)聽,實(shí)現(xiàn)回調(diào)
@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
public static final String EXCHANGE_NAME = "exchange_name";
public static final String QUEUE_NAME = "queue.name";
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with(QUEUE_NAME);
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("{}:消息成功到達(dá)交換器", correlationData.getId());
} else {
log.error("{}:消息發(fā)送失敗", correlationData.getId());
}
}
/**
* Returned message callback.
*
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("{}:消息未成功路由到隊(duì)列", message.getMessageProperties().getMessageId());
}
}
三到推、方案3: 失敗重試
失敗重試分兩種情況考赛,沒鏈接上mq的鏈接重試和鏈接上了發(fā)送失敗的發(fā)送重試。
1. 鏈接重試
利用Spring Boot自帶的重試機(jī)制莉测,直接通過配置開啟即可:
spring:
rabbitmq:
template:
retry:
enabled: true #開啟重試機(jī)制
initial-interval: 1000ms #重試起始時(shí)間間隔
max-attempts: 10 #最大重試次數(shù)
max-interval: 10000ms #最大重試時(shí)間間隔
multiplier: 2 #時(shí)間間隔系數(shù)
配置完以后颜骤,當(dāng)MQ鏈接斷開后,Spring會(huì)進(jìn)行retry連接。retry時(shí)間間隔為起始時(shí)間間隔*系數(shù)
。
2. 發(fā)送重試
發(fā)送重試主要針對(duì)的是消息沒有到達(dá)交換器祟绊。總體思路就是:利用消息確認(rèn)機(jī)制,當(dāng)消息沒有到達(dá)交換器時(shí),就會(huì)走失敗回調(diào)厚满,在回調(diào)方法中進(jìn)行相應(yīng)的業(yè)務(wù)補(bǔ)償處理即可丰榴。
利用數(shù)據(jù)庫(kù)存儲(chǔ)發(fā)送的消息記錄,創(chuàng)建數(shù)據(jù)表,大致字段內(nèi)容如下:
字段名 | 字段類型 | 字段是否為空 | 默認(rèn)值 | 備注 |
---|---|---|---|---|
id | bigint(20) unsigned | N | 主鍵仆邓,消息id | |
content | text | N | 消息內(nèi)容 | |
state | tinyint(1) | N | 狀態(tài):0-發(fā)送中嗓蘑,1-成功,2-失敗 | |
route_key | varchar(255) | N | 路由key | |
exchange | varcher(255) | N | 交換機(jī) | |
count | tinyint | N | 重試次數(shù) | |
try_time | datetime | N | 重試時(shí)間 | |
create_time | datetime | N | CURRENT_TIMESTAMP | 創(chuàng)建時(shí)間 |
update_time | datetime | N | CURRENT_TIMESTAMP | 更新時(shí)間 |
del_flag | tinyint(1) | N | 邏輯刪除 |
操作步驟如下:
- 消息發(fā)送前巷燥,向表中插入消息發(fā)送記錄,狀態(tài)為0抛姑,try_time根據(jù)實(shí)際情況設(shè)置即可蔬啡。
- 在confirm回調(diào)方法中顽腾,收到發(fā)送成功的回調(diào)漓摩,則將該消息的狀態(tài)修改為1啃炸。
- 通過定時(shí)job掃描發(fā)送記錄,篩選出狀態(tài)為0融击,并且過了重試時(shí)間的消息匣屡,重新發(fā)送。重試次數(shù)根據(jù)實(shí)際情況判斷怔接。
注意瓦侮,在發(fā)送的時(shí)候會(huì)出現(xiàn)重復(fù)發(fā)送的情況斋泄,所以在消費(fèi)的時(shí)候需要做好冪等摔认。