image.png
消息投遞時(shí) 可能發(fā)生丟失的場景:
- 生產(chǎn)者------msg------> MQ 耳鸯∈祝可開啟消息投遞結(jié)果回調(diào),確保每條消息都收到了回調(diào)县爬。
- MQ阳啥。將Queue與消息設(shè)置成可持久化,搭建鏡像集群隊(duì)列财喳。
- MQ-------callback---->生產(chǎn)者察迟。回調(diào)時(shí)失敗耳高,某條消息在一段時(shí)間內(nèi)未收到回調(diào)卷拘,則默認(rèn)投遞失敗,生產(chǎn)者需要再次投遞該消息到MQ祝高。(該場景下會(huì)導(dǎo)致同一條消息被重復(fù)投遞栗弟,消費(fèi)者端需要自行保證消息冪等消費(fèi))
一、實(shí)現(xiàn)思路
image.png
使用技術(shù):
- SpringBoot
- RabbitMQ
- Mysql
- MybatisPlus
- XxlJob
二工闺、準(zhǔn)備乍赫,框架搭建
- 數(shù)據(jù)庫Entity:
- Message
/**
* 消息發(fā)送歷史
*
* @author futao
* @date 2020/3/31.
*/
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {
@Tolerate
public Message() {
}
/**
* 消息承載的業(yè)務(wù)數(shù)據(jù)
*/
@TableField("msg_data")
private String msgData;
/**
* 交換機(jī)名稱
*/
@TableField("exchange_name")
private String exchangeName;
/**
* 路由鍵
*/
@TableField("routing_key")
private String routingKey;
/**
* 消息狀態(tài)
*
* @see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
*/
@TableField("status")
private int status;
/**
* 重試次數(shù)
*/
@TableField("retry_times")
private int retryTimes;
/**
* 下一次重試時(shí)間
*/
@TableField("next_retry_date_time")
private LocalDateTime nextRetryDateTime;
}
- 消息狀態(tài)枚舉
/**
* 消息狀態(tài)枚舉
*
* @author futao
* @date 2020/3/31.
*/
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {
/**
* 1=發(fā)送中
*/
SENDING(1, "發(fā)送中"),
/**
* 2=發(fā)送失敗
*/
SUCCESS(2, "發(fā)送成功"),
/**
* 3=發(fā)送失敗
*/
FAIL(3, "發(fā)送失敗");
private int status;
private String description;
}
- RabbitMQ配置
spring:
rabbitmq:
host: localhost
port: 5672
username: futao
password: 123456789
virtual-host: reliable-delivery
connection-timeout: 15000
# 發(fā)送確認(rèn)
publisher-confirms: true
# 路由失敗回調(diào)
publisher-returns: true
template:
# 必須設(shè)置成true 消息路由失敗通知監(jiān)聽者,而不是將消息丟棄
mandatory: true
app:
rabbitmq:
retry:
# 消息最大重試次數(shù)
max-retry-times: 5
# 每次重試時(shí)間間隔
retry-interval: 5s
# 隊(duì)列定義
queue:
user: user-queue
# 交換機(jī)定義
exchange:
user: user-exchange
三陆蟆、編碼
- 隊(duì)列交換機(jī)定義與綁定
/**
* RabbitMQ隊(duì)列定義與綁定
*
* @author futao
* @date 2020/3/31.
*/
@Configuration
public class Declare {
@Bean
public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
return QueueBuilder
.durable(userQueueName)
//.withArgument("x-max-length", 2)
.build();
}
@Bean
public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
return ExchangeBuilder
.topicExchange(userExchangeName)
.durable(true)
.build();
}
@Bean
public Binding userBinding(Queue userQueue, Exchange userExchange) {
return BindingBuilder
.bind(userQueue)
.to(userExchange)
.with("user.*")
.noargs();
}
}
- 對RabbitTemplate進(jìn)行增強(qiáng)雷厂,設(shè)置
confirmCallback()
消息投遞回調(diào)方法與returnCallback()
消息路由失敗回調(diào)方法
/**
* Bean增強(qiáng)
* 【嚴(yán)重警告】: 不可在該類中注入Bean,被注入的Bean不會(huì)被BeanPostProcessor增強(qiáng)叠殷,造成誤傷改鲫。
* 必須通過容器來獲取需要注入的Bean
*
* @author futao
* @date 2020/3/20.
*/
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {
// @Resource
// private MessageMapper messageMapper;
/**
* 消息的最大重試次數(shù)
*/
@Value("${app.rabbitmq.retry.max-retry-times}")
private int maxRetryTimes;
/**
* 每次重試時(shí)間間隔
*/
@Value("${app.rabbitmq.retry.retry-interval}")
private Duration retryInterval;
// @Autowired
// private RabbitTemplate rabbitTemplate;
//
// @Autowired
// private BeanEnhance enhance;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
//增強(qiáng)RabbitTemplate
if (RabbitTemplate.class.equals(bean.getClass())) {
//消息投遞成功與否的監(jiān)聽,可以用來保證消息100%投遞到rabbitMQ林束。(如果某條消息(通過id判定)在一定時(shí)間內(nèi)未收到該回調(diào)像棘,則重發(fā)該消息)
//需要設(shè)置 publisher-confirms: true
((RabbitTemplate) bean).setConfirmCallback((correlationData, ack, cause) -> {
String correlationDataId = correlationData.getId();
if (ack) {
//ACK
log.debug("消息[{}]投遞成功,將DB中的消息狀態(tài)設(shè)置為投遞成功", correlationDataId);
ApplicationContextHolder.getBean(MessageMapper.class).update(null,
Wrappers.<Message>lambdaUpdate()
.set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
.eq(Message::getId, correlationDataId)
);
} else {
log.debug("消息[{}]投遞失敗,cause:{}", correlationDataId, cause);
//NACK壶冒,消息重發(fā)
ApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId);
}
});
//消息路由失敗的回調(diào)--需要設(shè)置 publisher-returns: true 并且 template: mandatory: true 否則rabbit將丟棄該條消息
((RabbitTemplate) bean).setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.warn("消息路由失敗回調(diào)...做一些補(bǔ)償或者記錄.............................................");
log.warn("message{}", message);
log.warn("replyCode{}", replyCode);
log.warn("replyText{}", replyText);
log.warn("exchange{}", exchange);
log.warn("routingKey{}", routingKey);
});
}
return bean;
}
/**
* NACK時(shí)進(jìn)行消息重發(fā)
*
* @param correlationDataId
*/
@Transactional(rollbackFor = Exception.class)
public void reSend(String correlationDataId) {
Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
if (message.getRetryTimes() < maxRetryTimes) {
//進(jìn)行重試
ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
//更新DB消息狀態(tài)
ApplicationContextHolder.getBean(MessageMapper.class).update(null,
Wrappers.<Message>lambdaUpdate()
.set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
.set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
.set(Message::getRetryTimes, message.getRetryTimes() + 1)
.eq(Message::getId, correlationDataId)
);
}
}
}
- 生產(chǎn)者:
/**
* @author futao
* @date 2020/3/31.
*/
@Component
public class Sender {
@Value("${app.rabbitmq.retry.retry-interval}")
private Duration retryInterval;
@Autowired
private RabbitTemplate rabbitTemplate;
@Resource
private MessageMapper messageMapper;
@Value("${app.rabbitmq.exchange.user}")
private String userExchangeName;
public void send(User user) {
Message message = Message.builder()
.msgData(JSON.toJSONString(user))
.exchangeName(userExchangeName)
.routingKey("user.abc")
.status(MessageStatusEnum.SENDING.getStatus())
//下次重試時(shí)間
.nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
.retryTimes(0)
.build();
//消息落庫
messageMapper.insert(
message
);
CorrelationData correlationData = new CorrelationData(message.getId());
//消息投遞到MQ
rabbitTemplate.convertAndSend(userExchangeName, "user.abc", JSON.toJSONString(user), correlationData);
}
}
- 定時(shí)任務(wù)掃描DB中的消息狀態(tài)缕题,如果存在發(fā)送中的消息,且當(dāng)前時(shí)間>=下一次投遞時(shí)間 and 重試次數(shù)<=最大重試次數(shù)胖腾,則再次進(jìn)行投遞烟零。
- XxlJob配置
xxl:
job:
switch: ON
admin:
### 調(diào)度中心部署跟地址 [選填]:如調(diào)度中心集群部署存在多個(gè)地址則用逗號分隔。執(zhí)行器將會(huì)使用該地址進(jìn)行"執(zhí)行器心跳注冊"和"任務(wù)結(jié)果回調(diào)"咸作;為空則關(guān)閉自動(dòng)注冊锨阿;
addresses: http://127.0.0.1:9090/xxl-job-admin
executor:
### 執(zhí)行器AppName [選填]:執(zhí)行器心跳注冊分組依據(jù);為空則關(guān)閉自動(dòng)注冊
appname: xxl-job-executor-rabbitmq
### 執(zhí)行器IP [選填]:默認(rèn)為空表示自動(dòng)獲取IP记罚,多網(wǎng)卡時(shí)可手動(dòng)設(shè)置指定IP墅诡,該IP不會(huì)綁定Host僅作為通訊實(shí)用;地址信息用于 "執(zhí)行器注冊" 和 "調(diào)度中心請求并觸發(fā)任務(wù)"毫胜;
# ip:
### 執(zhí)行器端口號 [選填]:小于等于0則自動(dòng)獲仁樾薄诬辈;默認(rèn)端口為9999,單機(jī)部署多個(gè)執(zhí)行器時(shí)荐吉,注意要配置不同執(zhí)行器端口焙糟;
port: 9999
### 執(zhí)行器運(yùn)行日志文件存儲(chǔ)磁盤路徑 [選填] :需要對該路徑擁有讀寫權(quán)限;為空則使用默認(rèn)路徑样屠;
logpath: data/applogs/xxl-job/jobhandler
### 執(zhí)行器日志文件保存天數(shù) [選填] : 過期日志自動(dòng)清理, 限制值大于等于3時(shí)生效; 否則, 如-1, 關(guān)閉自動(dòng)清理功能穿撮;
logretentiondays: 30
### 執(zhí)行器通訊TOKEN [選填]:非空時(shí)啟用;
accessToken:
- Configuration
/**
* XXL-JOB配置
*
* @author futao
* @date 2020/4/1.
*/
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {
private final Admin admin = new Admin();
private final Executor executor = new Executor();
@Bean
public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(xxlJobConfig.getAdmin().getAddresses());
xxlJobSpringExecutor.setAppName(xxlJobConfig.getExecutor().getAppName());
xxlJobSpringExecutor.setIp(xxlJobConfig.getExecutor().getIp());
xxlJobSpringExecutor.setPort(xxlJobConfig.getExecutor().getPort());
xxlJobSpringExecutor.setLogPath(xxlJobConfig.getExecutor().getLogPath());
xxlJobSpringExecutor.setLogRetentionDays(xxlJobConfig.getExecutor().getLogRetentionDays());
return xxlJobSpringExecutor;
}
@Getter
@Setter
public static class Admin {
private String addresses;
}
@Getter
@Setter
public static class Executor {
private String appName;
private String ip;
private int port;
private String logPath;
private int logRetentionDays;
}
}
- 定時(shí)掃描任務(wù)編寫
/**
* 掃描數(shù)據(jù)庫中需要重新投遞的消息并重新投遞
*
* @author futao
* @date 2020/4/1.
*/
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {
@Autowired
private RabbitTemplate rabbitTemplate;
@Resource
private MessageMapper messageMapper;
@Autowired
private MessageReSendJob messageReSendJob;
/**
* 最大重試次數(shù)
*/
@Value("${app.rabbitmq.retry.max-retry-times}")
private int retryTimes;
/**
* 重試時(shí)間間隔
*/
@Value("${app.rabbitmq.retry.retry-interval}")
private Duration retryInterval;
/**
* 批量從數(shù)據(jù)庫中讀取的消息
*/
private static final int PAGE_SIZE = 100;
@XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
@Override
public ReturnT<String> execute(String s) throws Exception {
long startTime = System.currentTimeMillis();
log.info("開始掃描需要進(jìn)行重試投遞的消息");
XxlJobLogger.log("開始掃描需要進(jìn)行重試投遞的消息");
service(1);
log.info("掃描需要進(jìn)行重試投遞的消息任務(wù)結(jié)束痪欲,耗時(shí)[{}]ms", System.currentTimeMillis() - startTime);
XxlJobLogger.log("掃描需要進(jìn)行重試投遞的消息任務(wù)結(jié)束悦穿,耗時(shí)[{}]ms", System.currentTimeMillis() - startTime);
return ReturnT.SUCCESS;
}
public void service(int pageNum) {
IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
Wrappers.<Message>lambdaQuery()
//發(fā)送中的消息
.eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
//已到達(dá)下次發(fā)送時(shí)間
.le(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
);
List<Message> messages = messageIPage.getRecords();
messages.forEach(message -> {
if (retryTimes <= message.getRetryTimes()) {
//已達(dá)到最大投遞次數(shù),將消息設(shè)置為投遞失敗
messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
} else {
messageReSendJob.reSend(message);
}
});
if (PAGE_SIZE == messages.size()) {
service(++pageNum);
}
}
/**
* 重新投遞消息
*
* @param message
*/
public void reSend(Message message) {
messageMapper.update(null,
Wrappers.<Message>lambdaUpdate()
.set(Message::getRetryTimes, message.getRetryTimes() + 1)
.set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
.eq(Message::getId, message.getId())
);
try {
//再次投遞
rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
} catch (Exception e) {
log.error("消息[{}]投遞失敗", JSON.toJSONString(message));
}
}
public void jobHandlerInit() {
log.info("before job execute...");
XxlJobLogger.log("before job handler init...");
}
public void jobHandlerDestroy() {
log.info("after job execute...");
XxlJobLogger.log("after job execute...");
}
}
-
XxlJob 新增調(diào)度任務(wù)
image.png
四业踢、測試
- 測試接口
/**
* @author futao
* @date 2020/4/1.
*/
@RequestMapping("/user")
@RestController
public class UserController {
@Autowired
private Sender sender;
@RequestMapping("/send")
public void send() {
sender.send(User
.builder()
.userName("天文")
.birthday(LocalDate.of(1995, 1, 31))
.address("浙江杭州")
.build());
}
}
- 正常場景:
消息落庫栗柒,狀態(tài)為1=發(fā)送中
image.png
回調(diào)
image.png
消息設(shè)置成投遞成功
image.png
- 異常場景
啟動(dòng)生產(chǎn)者服務(wù)后停止MQ
發(fā)送消息
image.png
因?yàn)槭詹坏皆摋l消息的ACK。所以一直處于發(fā)送中知举。開啟任務(wù)調(diào)度再次進(jìn)行投遞(投遞次數(shù)+1瞬沦,且更新下次投遞時(shí)間)
image.png
當(dāng)投遞次數(shù)達(dá)到最大投遞次數(shù),下一次雇锡,將消息設(shè)置成投遞失敗
image.png
調(diào)度日志
image.png
image.png
# Next
- 消息可靠消費(fèi)
- 消費(fèi)端限流保護(hù)
- 死信隊(duì)列
- 延遲隊(duì)列