嘮個(gè)嗑
網(wǎng)絡(luò)上搜羅了多次想知道 RabbitMQ 現(xiàn)實(shí)業(yè)務(wù)種怎么實(shí)現(xiàn)消息的可靠性的,但是大多都不太理想谦屑,站在各位大佬巨人的肩膀上研究了一段時(shí)間,我也整理了一套簡(jiǎn)單可行性的方案伞辛,包括消息異常處理币绩。這篇文章想主要講一些業(yè)務(wù)處理方案,項(xiàng)目中加入 RabbitMQ 中間件很簡(jiǎn)單瘸羡,但是根據(jù)具體業(yè)務(wù)實(shí)現(xiàn)消息的可靠性漩仙,這個(gè)需要多加考慮。當(dāng)然下面也會(huì)通過測(cè)試代碼來分析犹赖,文末也會(huì)附上源碼地址队他。
1、準(zhǔn)備
1.1峻村、環(huán)境準(zhǔn)備
之前博客上寫過一篇編譯安裝的方法 地址麸折,大家可以參考,因?yàn)?RabbitMQ 底層語言的原因可能稍微麻煩點(diǎn)粘昨,那就沒有辦法了嗎垢啼?如果你是先搞測(cè)試,再在項(xiàng)目中使用的話张肾,那可以使用 docker 安裝芭析,2 行代碼,如下
docker pull rabbitmq
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
1.2吞瞪、理論準(zhǔn)備
RabbitMQ 的類型包括:direct馁启、topic、fanout芍秆、headers惯疙、system(翻源碼看到的)
這里主要通過 topic 來分析,bindingkey 可以通過通配符 # 和 * 來匹配多個(gè) 路由鍵 (routingKey)浪听,
bindingkey 是綁定交換機(jī)(exchange)和隊(duì)列(queue)的螟碎, 生產(chǎn)者(publisher)發(fā)消息的時(shí)候會(huì)攜帶 routingKey、exchange 和 消息發(fā)送給 RabbitMQ迹栓,
連接成功后實(shí)際是組件 exchange 接收了生產(chǎn)者的消息掉分,然后通過 bindingkey 匹配 routingKey,決定送給哪個(gè) queue克伊,每個(gè)消費(fèi)者都會(huì)有 queue酥郭,所以 queue接收到消息 后就可以確保消費(fèi)者接可以收到消息了,最后消費(fèi)者再消費(fèi)愿吹。
再詳細(xì)的內(nèi)容可以查看大佬 erlie 的總結(jié) 地址
2不从、消息確認(rèn)
RabbitMQ 基礎(chǔ)配置
pom
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 192.168.1.105
port: 5672
username: guest
password: guest
virtual-host: /
# 交換機(jī)、隊(duì)列和綁定鍵聲明
test:
exchange: test.exchange
one:
queue: one.test.queue
binding-key: one.test.key
consumer
@RabbitListener(bindings=@QueueBinding(
//配置交換器
exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
//配置路由鍵
key="${test.one.binding-key}",
//配置隊(duì)列名稱
value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(String msg) {
log.info("test 收到的消息為:[{}]", msg);
//業(yè)務(wù)代碼...
}
publisher
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${test.exchange}")
private String exchange;
@Value("${test.one.binding-key}")
private String routingKey;
public void test() {
rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");
}
通過上面默認(rèn)的配置基本上就可以使用 RabbitMQ 了,但是這不是本篇的重點(diǎn).我們要知道消息發(fā)送和到消費(fèi)的過程中出現(xiàn)問題怎么辦犁跪?這就需要我們分段確認(rèn)消息是否接收成功椿息,如果失敗了該如何處理歹袁,先想想可以怎么做。
我們先捋一下思路寝优,消息發(fā)送給 RabbitMQ 条舔,如果連接 RabbitMQ 失敗,則記錄該消息乏矾,如果連接成功但是 exchange 接收失敗則記錄下該消息孟抗,如果 exchange 接收成功但是 queue 接收 exchange 的消息失敗則記錄下該消息,消息從生產(chǎn)者到 queue 有 3 個(gè)位置可能因網(wǎng)絡(luò)抖動(dòng)或其他原因出現(xiàn)問題钻心,那我們?cè)谶@三個(gè)位置記錄下問題后凄硼,統(tǒng)一通過計(jì)劃定時(shí)獲取記錄的消息并且重新發(fā)送,如果重發(fā)三次還沒有成功則標(biāo)記該記錄為異常消息捷沸。
2.1摊沉、生產(chǎn)者消息確認(rèn)和回調(diào)
這里分兩步:
- 消息確認(rèn)指的是 RabbitMQ(exchange) 確認(rèn)接收到了消費(fèi)者發(fā)送的消息
- 消息回調(diào)指的是 queue 接收 exchange 的消息失敗,則回調(diào)告訴 RabbitMQ 失敗的消息
2.1.1亿胸、消息確認(rèn)
開啟配置
spring:
rabbitmq:
#publisher-confirms: true #已過時(shí)
publisher-confirm-type: correlated #開啟生產(chǎn)者消息確認(rèn)
還有另外 2 種模式:
- none 值是禁用發(fā)布確認(rèn)模式坯钦,是默認(rèn)值;
- simple 值經(jīng)測(cè)試有兩種效果,其一效果和 correlated 值一樣會(huì)觸發(fā)回調(diào)方法侈玄,其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果婉刀,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是 waitForConfirmsOrDie 方法如果返回false則會(huì)關(guān)閉 channel序仙,則接下來無法發(fā)送消息到 broker突颊。
當(dāng)然還需新建一個(gè)類實(shí)現(xiàn) RabbitTemplate.ConfirmCallback,重寫方法 confirm潘悼,該方法有三個(gè)參數(shù) correlationData律秃、ack、cause治唤,主要說下 ack棒动,值為 true 表示 exchange 成功接收到消息,false 表示 exchange 接收消息失敗宾添,這里 2 種結(jié)果可以分別處理船惨,比如 false 可以把接收失敗的消息入庫,然后通過定時(shí)器來處理缕陕,比較懂的同學(xué)現(xiàn)在可能就有疑問了粱锐,correlationData 只能得到 msgId,根本沒有具體的消息,這里可以發(fā)揮你出色的想象力扛邑,可以通過對(duì)象封裝得到怜浅,也可以通過存內(nèi)存或者磁盤存儲(chǔ)得到,方法總比困難多蔬崩。
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData.getId();
if (ack) {
log.info("成功發(fā)送給 mq, msgId:[{}]", msgId);
} else {
log.error("發(fā)送給 mq 失敗, msgId:[{}], 失敗理由cause:[{}]", msgId, cause);
//消息從生產(chǎn)者沒有到 exchange恶座,那存庫
saveToDB(msgId,...);
}
}
這里還差一步搀暑,就是原生的 rabbitTemplate 怎么知道消息確認(rèn)時(shí)使用剛建的類呢,注入即可
rabbitTemplate.setConfirmCallback(剛建的類);如有疑問可看文末源碼奥裸。
2.1.2险掀、消息回調(diào)
spring:
rabbitmq:
publisher-returns: true #開啟生產(chǎn)者消息回調(diào)
同上沪袭,需要新建類并實(shí)現(xiàn) RabbitTemplate.ReturnCallback,并且重寫方法 returnedMessage湾宙, 最后需要注入如下內(nèi)容
rabbitTemplate.setReturnCallback(剛建的類);
// 要想使 returnCallback 生效,必須設(shè)置為true
rabbitTemplate.setMandatory(true);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
String msg = new String(message.getBody());
log.error("消息回調(diào) msgId:[{}], msg:[{}] 不能被正確路由冈绊,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);
//消息從 exchange 沒有到 queue侠鳄, 那存庫
saveToDB(msgId, exchange, routingKey, msg);
}
如果消息發(fā)送時(shí)走到了回調(diào)方法 returnedMessage 中,說明目前的消息有問題是需要處理的死宣,同上伟恶,入庫。定時(shí)器來處理毅该。
當(dāng)然消息的發(fā)送方法 rabbitTemplate.convertAndSend() 會(huì)多一個(gè)參數(shù) correlationData
具體處理方法可以參考源碼博秫,這里只提供思路。
2.2眶掌、消費(fèi)者消息確認(rèn)
消費(fèi)者消息確認(rèn)是確認(rèn)消費(fèi)了隊(duì)列中的消息挡育,如果出現(xiàn)問題 RabbitMQ 會(huì)有重試機(jī)制,長時(shí)間失敗則需要人工干預(yù)朴爬,這個(gè)和生產(chǎn)者的確認(rèn)是先后關(guān)系即寒,實(shí)際是沒有關(guān)聯(lián)關(guān)系的,說到這召噩,說下我之前轉(zhuǎn)的牛角尖母赵,一直想尋找 exchange 如何確認(rèn)消費(fèi)者成功消費(fèi)消息,但是無果具滴,后來細(xì)想凹嘲,RabbitMQ 應(yīng)該設(shè)計(jì)的就是消費(fèi)者和 queue 交互,沒必要和 exchange 交互构韵。如有大佬知道 exchange 如何 ack 消費(fèi)者消費(fèi)消息可以告訴小弟周蹭,不勝感激。
圓規(guī)正轉(zhuǎn)贞绳,上消費(fèi)者代碼
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #開啟消費(fèi)者消息確認(rèn); none:自動(dòng)確認(rèn)谷醉、auto:根據(jù)情況確認(rèn)
@RabbitListener(bindings=@QueueBinding(
//配置交換器
exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
//配置路由鍵
key="${test.one.binding-key}",
//配置隊(duì)列名稱
value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("test 收到的消息為:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
try {
//業(yè)務(wù)處理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
主要說 3 個(gè)方法:
1、basicAck 是確認(rèn)消息冈闭,需要傳遞兩個(gè)參數(shù)
- deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后俱尼,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息萎攒,這個(gè)方法攜帶了一個(gè) delivery tag遇八, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID矛绘,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
- multiple:為了減少網(wǎng)絡(luò)流量刃永,手動(dòng)確認(rèn)可以被批處理货矮,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息斯够,false則只確認(rèn)傳入值等于 delivery_tag 的消息
2囚玫、basicNack 是拒絕消息,可以拒絕多條读规,比 basicAck 多一個(gè)布爾值的參數(shù)抓督,如果為 true,被 nack 后重新入隊(duì)列然后重新消費(fèi)消費(fèi)束亏;如果為 false 被 nack 就丟了瘸恼。
3肯污、basicReject 只能拒絕一條消息铐料,reject 后消息直接丟了改基。
總結(jié)
這里簡(jiǎn)單實(shí)現(xiàn) RabbitMQ 消息可靠的方式是通過把消息發(fā)送時(shí)出現(xiàn)問題后直接入庫,然后通過計(jì)劃定時(shí)查詢?cè)僦匦掳l(fā)送給 RabbitMQ,如果 exchange 成功 ack 后則標(biāo)記為重發(fā)成功怕敬,如果重發(fā) 3 次還是失敗則標(biāo)記異常揣炕,需要人工處理。
討論
這種處理其實(shí)不算是最優(yōu)的方案赖捌,技術(shù)上還可以有如下方案
- 生產(chǎn)者發(fā)消息時(shí)記錄該條消息祝沸,并設(shè)該記錄 1 分鐘后生效,留 1 分鐘給exchange 確認(rèn)并直接標(biāo)記該消息記錄為成功越庇,然后計(jì)劃任務(wù)定時(shí)掃有效且未確認(rèn)的消息罩锐,再發(fā)送給 RabbitMQ ,如果確認(rèn)后則標(biāo)記為成功卤唉,否則 3 次后標(biāo)記為失敗涩惑。
- 還和小編寫的方案類似,記錄失敗的消息桑驱,但是定時(shí)任務(wù)獲取到失敗的消息后竭恬,直接調(diào)用消費(fèi)者的服務(wù)處理,不通過 RabbitMQ熬的, 但是這就需要維護(hù)消息和消費(fèi)者服務(wù)的關(guān)系了痊硕,稍微復(fù)雜些。
最后希望可以幫到看官押框,如果記錄的不對(duì)煩請(qǐng)?jiān)u論指出岔绸,一起討論
https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo