目錄
一评姨、概念
二难述、支付和發(fā)送MQ的一致性?
三、RocketMQ的事務(wù)消息
四吐句、ebay通用方案: 本地消息表 + MQ
五胁后、小結(jié)
一、概念
image-20211031090718024.png
#1.為什么需要事務(wù)消息?
#2.業(yè)務(wù)為什么這樣設(shè)計? 解耦: 計算手續(xù)費可以離線計算,利用MQ解耦支付和計算手續(xù)費流程
(1)流程主要涉及三個步驟:
--支付成功,更新訂單數(shù)據(jù)
--發(fā)送消息給 MQ
--手續(xù)費系統(tǒng)拉取消息,計算手續(xù)費入庫
(2)任何一個步驟失敗,都會導(dǎo)致數(shù)據(jù)不一致
--要么訂單數(shù)據(jù)更新了,手續(xù)費數(shù)據(jù)沒有生成
--要么手續(xù)費生成,但是訂單數(shù)據(jù)沒更新
(3)消息的消費比較簡單,若消費失敗,只要我們沒有提交消息確認(rèn),那么MQ服務(wù)器將會自動重試
(4)最大的問題: 如何保證支付和發(fā)送MQ的一致性?
--1.發(fā)送MQ 2.更新支付數(shù)據(jù): 提交事務(wù)但是回滾了,此時消息發(fā)送出去無法撤回,手續(xù)費計算并入庫了!數(shù)據(jù)不一致
--1.更新支付數(shù)據(jù) 2.發(fā)送MQ :
二嗦枢、支付和發(fā)送MQ的一致性?
image-20211031000505388.png
#1.先更新支付數(shù)據(jù),后發(fā)送MQ,是不是看不出什么問題
(1)偽代碼
// 開始事務(wù)
try {
// 1.執(zhí)行數(shù)據(jù)庫操作
// 2.發(fā)送 mq 消息
// 3.提交事務(wù)
}catch (Exception e){
// 4.回滾事務(wù)
}
(2)存在一種情況:由于網(wǎng)絡(luò)原因未收到MQ Server的響應(yīng)結(jié)果,認(rèn)為消息發(fā)送失敗導(dǎo)致回滾,實際消息已經(jīng)存儲在MQ Server了,手續(xù)費系統(tǒng)正常且計算后入庫,兩邊DB顯示數(shù)據(jù)不一致了
#2.MQ發(fā)送失敗不是可以重試嗎?
(1)消息重試如果耦合在事務(wù)中,顯然會拉長數(shù)據(jù)庫事務(wù)執(zhí)行時間,事務(wù)持有鎖時間過長,影響整體數(shù)據(jù)庫的吞吐量
(2)實際業(yè)務(wù),不太建議將消息耦合到數(shù)據(jù)庫事務(wù)中
三攀芯、RocketMQ的事務(wù)消息
image-20211031001104843.png
#1.概念
(1)可實現(xiàn)分布式事務(wù): 事務(wù)操作和消息發(fā)送要么都成功,要么都失敗
#2.實現(xiàn)原理
(1)first先發(fā)送一個半half消息到MQ中,通知開啟一個事務(wù)
注意:這里的半消息并不是說內(nèi)容不完整,實際包含所有的消息內(nèi)容,半消息和普通消息唯一的區(qū)別在于事務(wù)提交之前message對于consumer來說是"不可見"的
(2)half message 發(fā)送成功,那么可以根據(jù)事務(wù)的執(zhí)行結(jié)果再決定提交/回滾(事務(wù)|消息)
(3)
--若事務(wù)提交成功,那么發(fā)送確認(rèn)消息給MQ,那么手續(xù)費系統(tǒng)可以成功消費這個message
--若事務(wù)被回滾,發(fā)送回滾通知給MQ,那么MQ會將這個half message刪除
1419561-20200330075826122-1719090786.jpg
#3.那提交事務(wù)/回滾事務(wù)過程中 失敗怎么樣?
(1)MQ有"事務(wù)反查"機制,我們需要注冊一個回調(diào)接口,MQ會定時反查本地事務(wù)狀態(tài),根據(jù)結(jié)果決定回滾還是提交事務(wù)
(2)需要繼承 TransactionListener 注解回調(diào)接口
executeLocalTransaction 方法執(zhí)行本地事務(wù)
checkLocalTranscation 用來執(zhí)行檢查本地事務(wù)
(3)消息反查次數(shù)過多,導(dǎo)致半消息隊列堆積,影響性能
--RocketMQ 默認(rèn)將單個消息的檢查次數(shù)限制為 15 次,當(dāng)檢查次數(shù)超過最大次數(shù)后,RocketMQ 將會丟棄消息并且打印錯誤日志净宵。
--修改 broker 配置文件
# N為最大檢查次數(shù)
transactionCheckMax=N
--若想自定義丟棄消息行為敲才,需要修改 RocketMQ broker 端代碼裹纳,繼承 AbstractTransactionalMessageCheckListener 重寫 resolveDiscardMsg 方法择葡,加入自定義邏輯。
(4)反查時間設(shè)置
--設(shè)置 MQ 服務(wù)端多久之后開始反查事務(wù)消息(自事務(wù)消息保存成功之后開始計算),修改 broker 配置文件
#用于控制事務(wù)性消息檢查間隔, broker 每隔 5s 檢查一次事務(wù)消息剃氧,如果此時事務(wù)消息到 MQ 服務(wù)端時間還未超過 60s敏储,此時將不會反查,直到時間大于 60s朋鞍。
transactionTimeout=60000 // 單位為 ms,默認(rèn)為 60s
transactionCheckInterval=5000 // 每隔 5s 檢查一次事務(wù)消息
#4.偽代碼如下
public static class TransactionListenerImpl implements TransactionListener {
/**
* 半消息發(fā)送成功將會自動執(zhí)行該邏輯
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 執(zhí)行本地事務(wù)
try {
} catch (Exception e) {
System.out.println("本地事務(wù)執(zhí)行異常");
}
// 異常情況返回未知狀態(tài)
return LocalTransactionState.UNKNOW;
}
/***
* 若提交/回滾事務(wù)消息失敗已添,rocketmq 自動反查事務(wù)狀態(tài)
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
try {
if (isSuccess) {
// 本地事務(wù)執(zhí)行成功,提交半消息
} else {
// 本地事務(wù)執(zhí)行成功滥酥,回滾半消息
}
} catch (Exception e) {
}
// 異常情況返回未知狀態(tài)
return LocalTransactionState.UNKNOW;
}
}
四更舞、ebay通用方案: 本地消息表 + MQ
image-20211031083104157.png
#1.例子
(1)若有兩個微服務(wù)交互,支付服務(wù)和手續(xù)費服務(wù)坎吻,支付服務(wù)用于更新支付狀態(tài)缆蝉,手續(xù)費服務(wù)從MQ接受支付成功消息計算手續(xù)費,兩者不同庫表
(2)消息的消費有ack可保證不會重復(fù)消費,難點在于如何保證支付service + 消費一致
#2.本地消息表保證支付service + 消息發(fā)送的一致性?
(1)本地事務(wù)新增"消息日志",(支付表和消息日志表通過本地事務(wù)保證一致,保證原子性)
mysql begin transaction
// 1.支付service
// 2.存儲本地消息
commit transaction
(2)定時掃描消息日志
第一步已經(jīng)將消息寫到了消息日志表,可以啟動獨立線程定時任務(wù)job對消息日志表進行掃描
--掃描到記錄并成功發(fā)送到MQ Server,則刪除該消息日志
--若MQ未反饋發(fā)送成功則等到下一個周期重試
(3)如何保證consumer一定能消費到message?
--MQ有ack消息確認(rèn)機制,consumer接受到消息并完成業(yè)務(wù)流程向MQ發(fā)送ack,說明consumer消費message完成,MQ不再推送message,否則producer不斷重試向consumer發(fā)送mesaage
--因為消息中間件可能會重復(fù)投遞此消息,那么手續(xù)費service則需要保證冪等
五、小結(jié)
#1.可靠消息最終一致性
(1)保證message從producer傳遞到consumer的一致性
(2)注意兩個問題
--本地事務(wù)和消息發(fā)送的原子性問題
--consumer消費message的可靠性
#2.使用場景
(1)異步,解耦:同步事務(wù)變成了消息執(zhí)行的異步操作,避免了分布式事務(wù)同步阻塞,并實現(xiàn)了兩個微服務(wù)的解耦
(2)執(zhí)行周期長,實時性不高:可靠消息一致性適合執(zhí)行周期長且實時性不高的場景
ref:
還不知道事務(wù)消息嗎瘦真?這篇文章帶你全面掃盲刊头! - 樓下小黑哥 - 博客園 (cnblogs.com)
小黑十一點半 (studyidea.cn)
分布式事務(wù)有這一篇就夠了! - 知乎 (zhihu.com)