前言
在之前介紹了可靠性投遞方案和項(xiàng)目搭建::https://juejin.im/post/5c3f43dae51d45731470a18c
唯一ID:https://juejin.im/post/5c3eb6e7518825253b5ea545
前期的一些準(zhǔn)備:https://juejin.im/post/5c3f4bafe51d4551ec60b521
先給出完整代碼:
https://github.com/hmilyos/common.git
https://github.com/hmilyos/snowFlakeDemo.git
https://github.com/hmilyos/rabbitmq-common.git available 分支
先來回顧一下我們可靠性投遞的流程
流程的示意圖如上所示,比如我下單成功了必盖,這時(shí)進(jìn)行 step1韧献,對我的業(yè)務(wù)數(shù)據(jù)進(jìn)行入庫钦椭,業(yè)務(wù)數(shù)據(jù)入庫完畢(這里要特別注意一定要保證業(yè)務(wù)數(shù)據(jù)入庫)再對要發(fā)送的消息進(jìn)行入庫氨鹏,圖中采用了兩個(gè)數(shù)據(jù)庫集歇,可以根據(jù)實(shí)際業(yè)務(wù)場景來確定是否采用兩個(gè)數(shù)據(jù)庫暴构,如果采用了兩個(gè)數(shù)據(jù)庫盗飒,有人可能就像到了采用分布式事務(wù)來保證數(shù)據(jù)的一致性烈拒,但是在大型互聯(lián)網(wǎng)中圆裕,基本很少采用事務(wù)广鳍,都是采用補(bǔ)償機(jī)制。
對業(yè)務(wù)數(shù)據(jù)和消息入庫完畢就進(jìn)入 setp2吓妆,發(fā)送消息到 MQ 服務(wù)上赊时,按照正常的流程就是消費(fèi)者監(jiān)聽到該消息,就根據(jù)唯一 id 修改該消息的狀態(tài)為已消費(fèi)耿战,并給一個(gè)確認(rèn)應(yīng)答 ack 到 Listener蛋叼。如果出現(xiàn)意外情況,消費(fèi)者未接收到或者 Listener 接收確認(rèn)時(shí)發(fā)生網(wǎng)絡(luò)閃斷剂陡,接收不到狈涮,這時(shí)候就需要用到我們的分布式定時(shí)任務(wù)來從 msg 數(shù)據(jù)庫抓取那些超時(shí)了還未被消費(fèi)的消息,重新發(fā)送一遍鸭栖。重試機(jī)制里面要設(shè)置重試次數(shù)限制歌馍,因?yàn)橐恍┩獠康脑驅(qū)е乱恢卑l(fā)送失敗的,不能重試太多次晕鹊,要不然會拖垮整個(gè)服務(wù)松却。例如重試三次還是失敗的,就把消息的 status 設(shè)置成 發(fā)送失敗溅话,然后通過補(bǔ)償機(jī)制晓锻,人工去處理。實(shí)際生產(chǎn)中飞几,這種情況還是比較少的砚哆,但是你不能沒有這個(gè)補(bǔ)償機(jī)制,要不然就做不到可靠性了屑墨。
下面就讓我們用代碼來實(shí)現(xiàn)這個(gè)方案吧躁锁。
1. 簡單的創(chuàng)建訂單接口
@RestController
public class OrderController {
@Autowired
private IMessageService messageService;
@GetMapping("/createOrder")
public ServerResponse createOrder(long userId){
return messageService.createOrder(userId);
}
}
2. IMessageService 接口以及實(shí)現(xiàn)類
public interface IMessageService {
@Transactional
ServerResponse createOrder(long userId);
}
@Service
public class MessageServiceImpl implements IMessageService {
private final static Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
private ISnowFlakeService snowFlakeService;
@Autowired
private MessageMapper messageMapper;
@Autowired
private RabbitOrderSender rabbitOrderSender;
@Override
public ServerResponse createOrder(long userId) {
// 首先是針對業(yè)務(wù)邏輯,進(jìn)行下單的業(yè)務(wù)卵史,保存到數(shù)據(jù)庫后
// 業(yè)務(wù)落庫后战转,再對消息進(jìn)行落庫,
long msgId = snowFlakeService.getSnowFlakeID();
Message message = new Message(msgId, TypeEnum.CREATE_ORDER.getCode(), userId + "創(chuàng)建訂單:" + msgId,
0, MSGStatusEnum.SENDING.getCode(), DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
int row = messageMapper.insertSelective(message);
if (row == 0){
throw new CustomException(500, "消息入庫異常");
}
// 消息落庫后就可以發(fā)送消息了
try {
rabbitOrderSender.sendOrder(message);
} catch (Exception e) {
// 因?yàn)闃I(yè)務(wù)已經(jīng)落庫了
// 所以 即使發(fā)送失敗也不影響以躯,因?yàn)榭煽啃酝哆f槐秧,我回去再次嘗試發(fā)送消息
log.error("sendOrder mq msg error: ", e);
messageMapper.updataNextRetryTimeForNow(message.getMessageId());
}
return ServerResponse.createBySuccess();
}
}
注意了,我這里是直接拿消息的實(shí)體當(dāng)做業(yè)務(wù)去落庫了忧设,實(shí)際上應(yīng)該是先對訂單實(shí)體落庫色鸳,然后再對消息實(shí)體落庫,最后發(fā)送消息见转!
3. 發(fā)送消息的具體實(shí)現(xiàn) RabbitOrderSender
具體代碼如下,注釋也寫得很詳細(xì)了:
@Component
public class RabbitOrderSender {
private static final Logger log = LoggerFactory.getLogger(RabbitOrderSender.class);
//自動(dòng)注入RabbitTemplate模板類
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageMapper messageMapper;
@Value("${order.rabbitmq.listener.order.exchange.name}")
private String exchangeName;
@Value("${order.rabbitmq.send.create.key}")
private String routingKey;
//發(fā)送消息方法調(diào)用: 構(gòu)建自定義對象消息
public void sendOrder(Message message) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
CorrelationData correlationData = new CorrelationData(message.getMessageId() + "");
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, correlationData);
// throw new CustomException("--test--");
}
//回調(diào)函數(shù): confirm確認(rèn)
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData: {}", correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 則進(jìn)行更新
messageMapper.changeMessageStatus(Long.parseLong(messageId), MSGStatusEnum.SEND_SUCCESS.getCode());
} else {
//失敗則進(jìn)行具體的后續(xù)操作:重試 或者補(bǔ)償?shù)仁侄? log.error("消息發(fā)送失敗蒜哀,需要進(jìn)行異常處理...");
messageMapper.updataNextRetryTimeForNow(Long.parseLong(messageId));
}
}
};
//回調(diào)函數(shù): return返回斩箫, 這里是預(yù)防消息不可達(dá)的情況吏砂,比如 MQ 里面沒有對應(yīng)的 exchange、queue 等情況乘客,
// 如果消息真的不可達(dá)狐血,那么就要根據(jù)你實(shí)際的業(yè)務(wù)去做對應(yīng)處理,比如是直接落庫易核,記錄補(bǔ)償匈织,還是放到死信隊(duì)列里面,之后再進(jìn)行落庫
// 這里脫開實(shí)際業(yè)務(wù)場景牡直,不大好描述
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode,
String replyText, String exchange, String routingKey) {
log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}",
exchange, routingKey, replyCode, replyText);
}
};
}
4. 消費(fèi)端代碼 RabbitOrderReceiver
@Component
public class RabbitOrderReceiver {
private static final Logger log = LoggerFactory.getLogger(RabbitOrderReceiver.class);
@Autowired
private MessageMapper messageMapper;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${order.rabbitmq.listener.order.queue.name}",
durable="${order.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${order.rabbitmq.listener.order.exchange.name}",
durable="${order.rabbitmq.listener.order.exchange.durable}",
type= "${order.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${order.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${order.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload com.hmily.rabbitmq.rabbitmqcommon.entity.Message msg,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
log.info("-----------------RabbitOrderReceiver---------------------");
log.info("消費(fèi)端 order msg: {} ", msg.toString());
msg.setStatus(MSGStatusEnum.PROCESSING_IN.getCode());
int row = messageMapper.updateByPrimaryKeySelective(msg);
if (row != 0) {
Thread.sleep(2000L);
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
// 接著去執(zhí)行你對應(yīng)的業(yè)務(wù)邏輯缀匕,
// 注意,這是可靠性投遞碰逸,執(zhí)行業(yè)務(wù)邏輯一定要做冪等性
}
}
}
注意:我這里并沒有做冪等性去重乡小,實(shí)際業(yè)務(wù)時(shí)必須做冪等性!饵史!
6.我們的定時(shí)重試機(jī)制 SendMessageTask
@Component
public class SendMessageTask {
private static final Logger log = LoggerFactory.getLogger(SendMessageTask.class);
@Autowired
private MessageMapper messageMapper;
@Autowired
private IMessageFailedService messageFailedService;
@Autowired
private RabbitOrderSender rabbitOrderSender;
@Scheduled(initialDelay = 3000, fixedDelay = 10000)
public void reSend(){
log.info("---------------定時(shí)任務(wù)開始---------------");
List<Message> msgs = messageMapper.getNotProcessingInByType(TypeEnum.CREATE_ORDER.getCode(), null,
new int[]{MSGStatusEnum.SENDING.getCode()});
msgs.forEach(msg -> {
if (msg.getTryCount() >= Constants.MAX_TRY_COUNT) {
// 如果重試次數(shù)大于最大重試次數(shù)就不再重試满钟,記錄失敗
msg.setStatus(MSGStatusEnum.SEND_FAILURE.getCode());
msg.setUpdateTime(new Date());
messageMapper.updateByPrimaryKeySelective(msg);
MessageFailed failed = new MessageFailed(msg.getMessageId(), "消息發(fā)送失敗", "已達(dá)到最大重試次數(shù),但是還是發(fā)送失敗");
messageFailedService.add(failed);
} else {
// 未達(dá)到最大重試次數(shù)胳喷,可以進(jìn)行重發(fā)消息
// 先改一下消息記錄湃番,保存好再發(fā)送消息
msg.setNextRetry(DateUtils.addMinutes(new Date(), Constants.TRY_TIMEOUT));
int row = messageMapper.updateTryCount(msg);
try {
rabbitOrderSender.sendOrder(msg);
} catch (Exception e) {
log.error("sendOrder mq msg error: ", e);
messageMapper.updataNextRetryTimeForNow(msg.getMessageId());
}
}
});
}
}
至此,我們的可靠性投遞就完成了吭露。