一.實(shí)現(xiàn)思路
1.生產(chǎn)者發(fā)起調(diào)用
2.消費(fèi)者消費(fèi)消息
3.定時(shí)任務(wù)定時(shí)拉取投遞失敗的消息重新投遞
4.各種異常清空的測試驗(yàn)證
5.使用動(dòng)態(tài)代理實(shí)現(xiàn)消費(fèi)端幕等性校驗(yàn)和消息確認(rèn)
二.項(xiàng)目搭建
1.pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.rabbitmq配置
spring:
rabbitmq:
port: 5672
username: liming
host: 114.116.18.120
password: liming
virtual-host: /
##開啟回調(diào)
publisher-returns: true
publisher-confirms: true
##設(shè)置手動(dòng)確認(rèn)
listener:
simple:
acknowledge-mode: manual
prefetch: 100
3.表結(jié)構(gòu)
CREATE TABLE `msg_log` (
`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標(biāo)識',
`msg` text COMMENT '消息體, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機(jī)',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態(tài): 0投遞中 1投遞成功 2投遞失敗 3已消費(fèi)',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時(shí)間',
`create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時(shí)間',
`update_time` datetime DEFAULT NULL COMMENT '更新時(shí)間',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日志';
4.rabbitConfig
@Configuration
public class RabbitConfig{
@Bean
public Queue queue1(){
return new Queue("hello.queue1", true);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
}
}
5.生產(chǎn)者生產(chǎn)消息
@Slf4j
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendList(String name){
//設(shè)置消息消費(fèi)成功回調(diào)操作
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->{
if(ack){
log.info("消息發(fā)送成功胸蛛!");
String msgId = correlationData.getId();
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
}else{
log.info("消息發(fā)送到Exchange失敗, {}, cause: {}", correlationData, cause);
}
});
// 觸發(fā)setReturnCallback回調(diào)必須設(shè)置mandatory=true, 否則Exchange沒有找到Queue就會(huì)丟棄掉消息, 而不會(huì)觸發(fā)回調(diào)
rabbitTemplate.setMandatory(true);
// 消息是否從Exchange路由到Queue, 注意: 這是一個(gè)失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會(huì)回調(diào)這個(gè)方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
//創(chuàng)建消息日志
String msgId = UUID.randomUUID().toString()磕道;
MsgLog msgLog = new MsgLog(msgId, name, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
msgLogMapper.insert(msgLog);// 消息入庫
Message message = new Message(name.getBytes(), new MessageProperties());
//設(shè)置消息唯一id 發(fā)送消息
CorrelationData correlationData = new CorrelationData(msgId );
rabbitTemplate.convertAndSend("topicExchange", "key.2", message, correlationData);
}
6.消費(fèi)者 消費(fèi)消息
@Component
@Slf4j
public class TopicConsumer {
@Autowired
private MsgLogService msgLogService;
@RabbitListener(queues = "hello.queue1")
public void consume(Message message, Channel channel) throws IOException {
String name = new String(message.getBody())愤炸;
log.info("收到消息: {}", name);
MsgLog msgLog = msgLogService.selectByMsgId(msgId);
if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {// 消費(fèi)冪等性
log.info("重復(fù)消費(fèi), msgId: {}", msgId);
return;
}
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
boolean success = mailUtil.send(mail);
if (success) {
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
channel.basicAck(tag, false);// 消費(fèi)確認(rèn)
} else {
channel.basicNack(tag, false, true);
}
}
}
7.定時(shí)任務(wù) 處理消費(fèi)失敗的消息
@Component
@Slf4j
public class ResendMsg {
@Autowired
private MsgLogService msgLogService;
@Autowired
private RabbitTemplate rabbitTemplate;
// 最大投遞次數(shù)
private static final int MAX_TRY_COUNT = 3;
/**
* 每30s拉取投遞失敗的消息, 重新投遞
*/
@Scheduled(cron = "0/30 * * * * ?")
public void resend() {
log.info("開始執(zhí)行定時(shí)任務(wù)(重新投遞消息)");
List<MsgLog> msgLogs = msgLogService.selectTimeoutMsg();
msgLogs.forEach(msgLog -> {
String msgId = msgLog.getMsgId();
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
log.info("超過最大重試次數(shù), 消息投遞失敗, msgId: {}", msgId);
} else {
msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數(shù)+1
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投遞
log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投遞消息");
}
});
log.info("定時(shí)任務(wù)執(zhí)行結(jié)束(重新投遞消息)");
}
}