前言:
消息隊列的主要作用是實現(xiàn)系統(tǒng)間的解耦缓呛、異步處理和削峰填谷扮授。 由于消息隊列的異步使用特性闸天,天然的會存在一定概率消息丟失的情況犬绒。
方案1:消息落庫
消息落庫重發(fā)是基于MQ的confirm機制,在消息發(fā)送失敗后自動重發(fā)挂签。
Step 1: 首先把消息信息(業(yè)務數(shù)據(jù))存儲到數(shù)據(jù)庫中疤祭,緊接著,我們再把這個消息記錄也存儲到一張消息記錄表里(或者另外一個同源數(shù)據(jù)庫的消息記錄表)
Step 2:發(fā)送消息到MQ Broker節(jié)點(采用confirm方式發(fā)送饵婆,會有異步的返回結果)
Step 3勺馆、4:生產者端接受MQ Broker節(jié)點返回的Confirm確認消息結果,然后進行更新消息記錄表里的消息狀態(tài)啦辐。比如默認Status = 0 當收到消息確認成功后谓传,更新為1即可!
Step 5:但是在消息確認這個過程中可能由于網絡閃斷芹关、MQ Broker端異常等原因導致 回送消息失敗或者異常续挟。這個時候就需要發(fā)送方(生產者)對消息進行可靠性投遞了,保障消息不丟失侥衬,100%的投遞成功J觥(有一種極限情況是閃斷,Broker返回的成功確認消息轴总,但是生產端由于網絡閃斷沒收到直颅,這個時候重新投遞可能會造成消息重復,需要消費端去做冪等處理)所以我們需要有一個定時任務怀樟,(比如每5分鐘拉取一下處于中間狀態(tài)的消息功偿,當然這個消息可以設置一個超時時間,比如超過1分鐘 Status = 0 往堡,也就說明了1分鐘這個時間窗口內械荷,我們的消息沒有被確認,那么會被定時任務拉取出來)
Step 6:接下來我們把中間狀態(tài)的消息進行重新投遞 retry send虑灰,繼續(xù)發(fā)送消息到MQ 吨瞎,當然也可能有多種原因導致發(fā)送失敗
Step 7:我們可以采用設置最大努力嘗試次數(shù),比如投遞了3次穆咐,還是失敗颤诀,那么我們可以將最終狀態(tài)設置為Status = 2 字旭,最后 交由人工解決處理此類問題(或者把消息轉儲到失敗表中)。
表結構和代碼示例
CREATE TABLE IF NOT EXISTS `message_log`
(
`message_id` varchar(30) NOT NULL COMMENT '消息唯一ID',
`message` varchar(1000) DEFAULT '' COMMENT '消息內容',
`business_id` varchar(40) NOT NULL COMMENT '業(yè)務id崖叫,比如記錄訂單號',
`try_count` int(4) DEFAULT '0' COMMENT '重試次數(shù)',
`status` tinyint(2) DEFAULT '0' COMMENT ' 消息投遞狀態(tài) 0:投遞中 1:投遞成功 2:投遞失敗',
`next_retry_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '下一次投遞時間',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次更新時間',
PRIMARY KEY (`message_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
創(chuàng)建訂單方法
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
private final MessageLogMapper messageLogMapper;
private final RocketMQProducer rocketMQProducer;
//創(chuàng)建訂單
public void createOrder(Order order) {
//插入業(yè)務數(shù)據(jù)
orderMapper.insert(order);
//插入消息記錄表數(shù)據(jù)
MessageLog messageLog = new MessageLog();
//消息唯一ID
messageLog.setMessageId(messageId);
//保存消息整體
messageLog.setMessage(JSONObject.toJSONString(order));
//設置消息狀態(tài)為0 表示發(fā)送中
messageLog.setStatus(0);
//設置下一次執(zhí)行時間
messageLog.setNextRetryTime(nextRetryTime);
messageLogMapper.insert(brokerMessageLog);
//發(fā)送消息
rocketMQProducer.sendOrder(order);
}
}
消息生產者
@Component
public class RocketMQProducer {
public void sendOrder(Order order) {
//1.創(chuàng)建消息
Message message = new Message("test_quick_topic",// 主題
"TagA",// 標簽
"KeyA",// 用戶自定義的key遗淳,唯一的標識
FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息內容實體(byte[])
try {
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
//如果confirm返回成功 則進行更新
messageLogMapper.changeMessageLogStatus();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
//失敗則進行具體的后續(xù)操作:重試 或者補償?shù)仁侄? System.err.println("-----------異常處理-----------");
}
});
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
定時任務
@Component
public class RetryMessageTasker {
@Scheduled(initialDelay = 5000, fixedDelay = 10000)
public void reSend() {
System.out.println("----------------定時任務開始----------------");
//pull status = 0 and timeout message
List<MessageLog> list = getNeedReSendMsgList();
for (MessageLog messageLog : list) {
if (messageLog.getTryCount() > maxTryCount) {
//update fail message
continue;
}
//更新try_count
// resend
try {
sendOrder(getMessage());
} catch (Exception e) {
e.printStackTrace();
System.err.println("-----------異常處理-----------");
}
}
}
}
該方案只能保證消息從生產者到MQ之間的可靠性投遞,解決辦法:
方式1. 在消息表中新增 消費成功狀態(tài)
,下游消費者變更消費狀態(tài)(要考慮多個業(yè)務消費的情況)
方式2. 使用業(yè)務正確性校驗平臺BCP檢查上下游業(yè)務數(shù)據(jù)是否一致,進行修復
方案2:二次確認檢測
二次確認檢測是基于延時投遞機制實現(xiàn)的心傀,主要目的是為了減少數(shù)據(jù)庫操作洲脂,提高并發(fā)量。
Step 1:先將業(yè)務數(shù)據(jù)進行入庫剧包,然后上游服務將消息M1發(fā)送出去
Step 2:在發(fā)送消息M1之后,緊接著生產端再次發(fā)送一條延遲消息(Second Send Delay Check)往果,即延遲檢查投遞消息M3
Step 3:消費端去監(jiān)聽指定隊列疆液,將收到的消息進行處理
Step 4:處理完成之后,發(fā)送一個confirm消息M2陕贮,也就是回送響應堕油,但是這里響應不是正常的ACK,而是重新生成一條消息肮之,投遞到MQ中
Step 5:
下游Callback Check Service
是一個單獨的服務掉缺,其實它扮演了方案一的存儲消息的DB角色,它通過MQ去監(jiān)聽下游服務發(fā)送的confirm消息M2戈擒,如果下游Callback Check Service
收到下游服務的confirm消息M2眶明,那么就對消息做持久化存儲,即將消息持久化到DB中Step 6:10分鐘之后MQ Server推送了延遲消息發(fā)送M3
Step 7:
下游Callback Check Service
收到延遲消息發(fā)送M3后筐高,Check消息后去檢查DB中是否存在消息M2搜囱,如果存在,則不需要做任何處理柑土,如果不存在或者消費失敗了蜀肘,那么下游Callback Check Service
就需要主動發(fā)起RPC通信給上游服務,上游服務收到信息后就會重新查詢業(yè)務消息然后將消息M1發(fā)送出去
該方案能夠保證消息從生成者端到消費者的可靠性投遞稽屏,消費者都能消費到扮宠,生產者也就自然而然是可靠性的投遞。
方案對比
方案 | 優(yōu)點 | 缺點 |
---|---|---|
消息落庫 | 實現(xiàn)簡單 | 發(fā)送消息前需要2次DB操作狐榔,影響并發(fā)性能 |
二次確認檢測 | 減少了數(shù)據(jù)庫操作坛增,提高并發(fā)量 | 不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的荒叼,有些特別極端的情況只能是使用定時任務去轿偎、BCP或人工去做補償了, |