- Producer發(fā)送消息到Broker减拭,Broker向Producer確認消息蔽豺,這個步驟叫Publisher Confirms,詳細在前面文章中有介紹——【RabbitMQ的那點事】發(fā)送方確認機制(Publisher Confirms):http://www.reibang.com/p/15f0c1a105fb
- 消息從Broker再發(fā)送至Consumer拧粪,Consumer向Broker確認消息修陡,這個步驟叫acknowledgements,也就是消費端的消息可靠性可霎,即本文內(nèi)容魄鸦。
1. 關(guān)于消費端確認模式(acknowledgements modes),官網(wǎng)文章:
- https://www.rabbitmq.com/consumers.html#acknowledgement-modes
- https://www.rabbitmq.com/confirms.html#acknowledgement-modes
2. 當注冊一個Consumer程序時癣朗,可以選擇多種投遞方式:
- 無ack模式(AcknowledgeMode.NONE)拾因,默認模式,意思是發(fā)送方不需要確認旷余,即fire and forget(發(fā)后即忘绢记,在消息發(fā)出后就立即將這條消息刪除,而不管消費端是否接收到正卧,是否處理完)
- 發(fā)送方需要消費端確認消息模式蠢熄,(消費端需要告知消息已經(jīng)收到)。這其中分兩種:
- 一種是自動確認:AcknowledgeMode.AUTO模式下炉旷,由Container自動應(yīng)答签孔,正確處理發(fā)出ack信息,處理失敗發(fā)出nack信息砾跃,rabbitmq發(fā)出消息后將會等待consumer端的應(yīng)答骏啰,只有收到ack確認信息才會把消息清除掉,收到nack信息的處理辦法由setDefaultRequeueRejected()方法設(shè)置抽高,所以在這種模式下,發(fā)生錯誤的消息是可以恢復的透绩。
- 另一種是手動確認翘骂,AcknowledgeMode.MANUAL≈愫溃基本同AUTO模式碳竟,區(qū)別是需要人為調(diào)用方法給應(yīng)答。
3. 想要消息的可靠消息狸臣,需要采用手動確認模式莹桅。
以下是manual mode的代碼示例,參考了文章:https://javamana.com/2021/09/20210912132719181S.html
3.1 配置:
spring:
rabbitmq:
port: 5672
host: localhost
virtual-host: spring-boot-test
listener:
simple:
acknowledge-mode: manual
如果我們使用了manual(需要手動在消費端確認消息模式)烛亦,然后使用了之前的RabbitListener去接收消息:
@RabbitListener(queues = "direct.queue")
public void listen(String in) {
System.out.println("Direct Message Listener: " + in);
}
可以看到確實也能接收到消息诈泼,但看控制臺懂拾,消息依舊還是處于Ready的狀態(tài),也就是消息雖然被投遞過铐达,但Broker并沒有收到確認:3.2 那么怎樣進行手動確認岖赋?
還是使用@RabbitListener,但需要在參數(shù)中加上:
- message瓮孙,用來查看是否是再次投遞過(getRedelivered)
- channel用來回復ack
- deliveryTag:消息投遞序號唐断,每個channel對應(yīng)一個(long類型),從1開始到9223372036854775807范圍杭抠,在手動消息確認時可以對指定delivery_tag的消息進行ack脸甘、nack、reject等操作偏灿。例如上面Ready中有4個消息斤程,那么deliveryTag分別是:1, 2菩混, 3忿墅, 4。
示例做了什么事情:
a. 接收消息
b. 處理(1除以0沮峡,業(yè)務(wù)處理報錯)疚脐,即沒來得及做act
c. 在異常處理中嘗試重新投遞,如果發(fā)現(xiàn)已經(jīng)重新投遞過一次(通過判斷messga的redelivered flag是否為true)邢疙,如果是棍弄,那么拒絕消息。
d. 在異常中嘗試再次返回隊列處理
以上的處理涉及到的方法:
- 告知Broker消息已收到疟游,使用void basicAck(long deliveryTag, boolean multiple)荔睹,其中deliveryTag在上述有介紹孔飒,是消息投遞的序號,每個channel的唯一值。multiple如果設(shè)為true哨鸭,則表示會確實該channel中的所有delivery tags。例如這個channel中有5爆班, 6南吮, 7, 8需要確定笋籽,如果調(diào)用basicAct(8, true)蹦漠,則會把5-8的delivery tag的消息都確認掉。如果multiple為false车海,那么例子中只會確認delivery tag = 8的消息笛园。
- 告知Broker消息消費失敗:basicNack(long deliveryTag, boolean multiple, boolean requeue),此方法有三個參數(shù)研铆,前兩個和basicAct相似埋同,最后一個是requeue,如果為true蚜印,則會將消息放回原隊列頭部莺禁。如果為false,在配置了dead letter exchange(死信通道)那么則會放這里窄赋,否則會丟掉哟冬。
- 另一個方法basicReject(long deliveryTag, boolean requeue),和basicNack類似忆绰,也是告知Broker消息消費失敗浩峡,只不過不能multiple確認。
@Slf4j
@Component
public class RabbitMQListener {
@RabbitListener(queues = {"direct.queue"})
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
try {
log.info("Get message: {}", message.toString());
log.info("DeliveryTag: {}", tag);
int a = 1/0; // 主動拋出錯誤
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("met exception......{}", e.getMessage());
// 當前的消息是否重新投遞的消息,也就是該消息是重新回到隊列里的消息
log.info("messageProperties.getRedelivered(): {}", messageProperties.getRedelivered());
if (messageProperties.getRedelivered()) {
log.info("消息已重復處理失敗,拒絕再次接收...");
// 拒絕消息
channel.basicReject(tag, false);
} else {
log.info("消息即將再次返回隊列處理...");
channel.basicNack(tag, false, true);
}
}
}
}
測試結(jié)果:
2022-05-08 15:17:07.232 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=1, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.235 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 1
2022-05-08 15:17:07.236 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): false
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息即將再次返回隊列處理...
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=2, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 2
2022-05-08 15:17:07.238 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): true
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息已重復處理失敗,拒絕再次接收...
關(guān)于配置:
有些文章用的是listener.direct.acknowledge-mode: manual
參考Spring Boot 2.x的文檔:https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html
原因是Spring AMQP現(xiàn)在支持兩種container type了错敢。
關(guān)于兩種type的區(qū)別翰灾,可以看網(wǎng)友的文章——RabbitMQ筆記(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer:https://blog.csdn.net/yingziisme/article/details/86418580
4. 總結(jié)
消費方的ACK機制可以有效的解決消息從Broker到Consumer丟失的問題。但也要注意一點:消息的無限消費稚茅。
5. 額外的一些測試
5.1 上述關(guān)于deliveryTag纸淮,范圍是每個channel,例如:
啟動兩個Consumer亚享,都去訂閱同一個queue
這時候Producer往Broker中發(fā)10個消息咽块,那么每個Consumer都能收到5個消息,DeliveryTag都是1欺税,2侈沪,3,4晚凿,5
首先是Consumer端:
啟動兩個Instance:
@Slf4j
@Component
public class MultipleRabbitMQListener {
@RabbitListener(queues = {"direct.queue"})
public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
log.info("Get message: {}", new String(message.getBody()));
log.info("DeliveryTag: {}", tag);
ThreadUtils.sleep(30000); // 模擬業(yè)務(wù)處理時間
channel.basicAck(tag, false);
}
}
其次是Producer端亭罪,在兩個Consumer instance啟動后再啟動:
@Test
public void sendMessageToDirectExchange1() {
for (int i = 0; i < 10; i ++) {
rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
}
}
測試結(jié)果:可以看到每個Channel的DeliveryTag都是自增的。
另外歼秽,就算業(yè)務(wù)需要處理30秒应役,Broker在沒有收到Consumer的ack之前,不會再把消息給另一個消費者哲银。即:保證了同一個Queue消息只會被消費一次扛吞。
Consumer-1的log:
Consumer-2的log:
參考:
spring-rabbit消費過程解析及AcknowledgeMode選擇https://blog.csdn.net/weixin_38380858/article/details/84963944