rabbitmq消息可靠性之消息回調(diào)機制
rabbitmq消息回調(diào)機制圖解.png
rabbitmq在消息的發(fā)送與接收中草雕,會經(jīng)過上面的流程,這些流程中每一步都有可能導致消息丟失固以,或者消費失敗甚至直接是服務器宕機等墩虹,這是我們服務接受不了的,為了保證消息的可靠性憨琳,rabbitmq提供了以下幾種機制
生產(chǎn)者確認機制
消息持久化存儲
消費者確認機制
失敗重試機制
本文主要講解生產(chǎn)者確認機制诫钓,也是rabbitmq提供的消息回調(diào)機制,這個機制可以解決生產(chǎn)者發(fā)送消息到交換機和交換機路由到隊列過程中的消息丟失問題
這種機制必須給每個消息指定一個唯一ID篙螟,消息發(fā)送到rabbitmq之后會返回結(jié)果給生產(chǎn)者菌湃,表示消息是否發(fā)送成功,返回結(jié)果有以下兩種
publisher-confirm:發(fā)送者確認:消息成功投遞到交換機遍略,返回 ack惧所;消息未投遞到交換機,返回 nack
publisher-return:發(fā)送者回執(zhí):消息成功投遞到交換機绪杏,但是沒有路由到隊列下愈。返回 ack,及路由失敗原因
spring:
rabbitmq:
# rabbitMQ的ip地址
host: 127.0.0.1
# 端口
port: 5672
# 集群模式配置
# addresses: 127.0.0.1:8071, 127.0.0.1:8072, 127.0.0.1:8073
username: admin
password: 123456
virtual-host: /
# 消費者確認機制相關(guān)配置
# 開啟publisher-confirm蕾久,
# 這里支持兩種類型:simple:同步等待confirm結(jié)果势似,直到超時;# correlated:異步回調(diào)僧著,定義ConfirmCallback履因,MQ返回結(jié)果時會回調(diào)這個ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:開啟publish-return功能,同樣是基于callback機制盹愚,不過是定義ReturnCallback
publisher-returns: true
# 定義消息路由失敗時的策略栅迄。true,則調(diào)用ReturnCallback杯拐;false:則直接丟棄消息
template:
mandatory: true
然后定義 ReturnCallback 回調(diào)霞篡,每個RabbitTemplate只能配置一個ReturnCallback世蔗,因此需要在項目加載時配置
package com.gitee.small.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {
//綁定鍵
public final static String DOG = "topic.dog";
public final static String CAT = "topic.cat";
/**
* Queue構(gòu)造函數(shù)參數(shù)說明
* new Queue(SMS_QUEUE, true);
* 1. 隊列名
* 2. 是否持久化 true:持久化 false:不持久化
*/
@Bean
public Queue firstQueue() {
return new Queue(DOG);
}
@Bean
public Queue secondQueue() {
return new Queue(CAT);
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
* 將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.dog
* 這樣只要是消息攜帶的路由鍵是topic.dog,才會分發(fā)到該隊列
*/
@Bean(name = "binding.dog")
public Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(DOG);
}
/**
* 將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規(guī)則topic.#
* 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發(fā)到該隊列
*/
@Bean(name = "binding.cat")
public Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate對象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判斷是否是延遲消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
// 是一個延遲消息,忽略這個錯誤提示
return;
}
// 記錄日志
log.error("消息發(fā)送到隊列失敗朗兵,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有需要的話污淋,重發(fā)消息
});
}
}
接著定義 ConfirmCallback,ConfirmCallback 可以在發(fā)送消息時指定余掖,因為每個業(yè)務處理 confirm 成功或失敗的邏輯不一定相同寸爆,上面已經(jīng)定義好exchange 和 queue,新建RabbitMqTest測試類
package smallJ;
import com.gitee.small.Application;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class RabbitMqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() throws InterruptedException {
// 1.準備CorrelationData
// 1.1.消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 1.2.準備ConfirmCallback
correlationData.getFuture().addCallback(result -> {
// 判斷結(jié)果
if (result.isAck()) {
// ACK
log.info("消息成功投遞到交換機盐欺!消息ID: {}", correlationData.getId());
} else {
// NACK
log.error("消息投遞到交換機失斄薅埂!消息ID:{}冗美,原因:{}", correlationData.getId(), result.getReason());
// 重發(fā)消息
}
}, ex -> {
// 記錄日志
log.error("消息發(fā)送異常, ID:{}, 原因{}", correlationData.getId(), ex.getMessage());
// 可以重發(fā)消息
});
rabbitTemplate.convertAndSend("topicExchange", "topic.dog", "路由模式測試-dog", correlationData);
// 程序休眠兩秒等待回調(diào)
Thread.sleep(2000);
}
}
加兩個監(jiān)聽器進行測試
package com.gitee.small.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class TopicRabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.dog"),
exchange = @Exchange(value = "bindingExchangeMessage", type = ExchangeTypes.TOPIC)
))
public void process(String msg) {
log.info("dog-收到消息:{}", msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.cat"),
exchange = @Exchange(value = "bindingExchangeMessage2", type = ExchangeTypes.TOPIC)
))
public void process2(String msg){
log.info("cat-收到消息:{}", msg);
}
}
測試結(jié)果如下
smallJ.RabbitMqTest : 消息成功投遞到交換機魔种!消息ID: 83f057fa-042d-4f56-872d-9d31a0444b82
c.g.small.rabbitmq.TopicRabbitReceiver : dog-收到消息:路由模式測試-dog
c.g.small.rabbitmq.TopicRabbitReceiver : cat-收到消息:路由模式測試-dog