使用Rocket MQ事務消息達到數(shù)據(jù)最終一致性
參考
1、 http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/ Rocket MQ事務消息的設計
2鲜结、https://rocketmq.apache.org/docs/transaction-example/ RocketMQ 事務消息的例子
0x00 前言
在分布式環(huán)境下,經(jīng)常會有跨服務的事務需求播掷,典型的例子如: 服務A 為賬戶服務,服務B為包月服務撼班,在服務A扣錢成功之后歧匈,要在服務B上做增加包月時長的操作,需要保證數(shù)據(jù)的一致性砰嘁。本文主要描寫如何在分布式環(huán)境下通過Rocket MQ的事務消息保證數(shù)據(jù)的最終一致性件炉。
注: 最終一致性是指在中間過程中,數(shù)據(jù)有可能不一致矮湘,但經(jīng)過一段時間后(具體時長視網(wǎng)絡延遲以及系統(tǒng)負載而定)斟冕,數(shù)據(jù)最終會保持一致。
0x01 Rocket MQ事務消息原理
Rocket MQ是阿里開發(fā)的一個分布式的開源消息隊列組件缅阳,目前由Apache開源組織維護磕蛇,最新版本是4.3.0,已經(jīng)支持事務消息十办。
事務消息可以確保本地事務 與 發(fā)送消息 之間的原子性秀撇,相關概念:
1、Half(Prepare) Message
Producer已經(jīng)把消息發(fā)送給Mq 服務器向族,但是Mq服務器尚未收到生產(chǎn)者的第二次Ack呵燕,這個時候消息會被標記為"temporarily undeliverable",目前消息的狀態(tài)為 Half Message
件相。
2再扭、Message Status Check
網(wǎng)絡斷開或者Producer應用重啟會導致Mq服務器無法從Producer獲取第二次ACK氧苍,當Mq服務器發(fā)現(xiàn)一個消息長時間處于 Half Message
狀態(tài)時(默認為60S,可配置)泛范,它會主動請求Producer候引,查詢消息Id對應的最新狀態(tài)(commit 或者 rollback)。
具體流程如下:
1敦跌、Producer 向Mq服務器 發(fā)送消息。
2逛揩、Mq服務器收到消息并持久化成功之后柠傍,會向 Producer確認首次ACK,此時消息處于 Half Message
狀態(tài)辩稽,并未發(fā)送給對應的Consumer惧笛。
3、Producer 開始執(zhí)行本地事務邏輯逞泄。
4患整、根據(jù)事務執(zhí)行結果,Producer 向Mq服務器提交二次確認(commit 或者 rollback)喷众。Mq Server 收到 Commit 狀態(tài)則將半消息標記為可投遞各谚,Consumer 最終將收到該消息;MQ Server 收到 Rollback 狀態(tài)則刪除半消息到千,Consumer 將不會接受該消息昌渤。
5、在斷網(wǎng)或者應用重啟的情況下憔四,二次ACK未成功的發(fā)給Mq Server膀息,Mq Server會主動向 Producer 啟動消息回查(Message Status Check),
6了赵、Producer 根據(jù)事務執(zhí)行結果潜支,對消息回查返回對應的結果。
7柿汛、Mq Server根據(jù)返回結果冗酿,決定繼續(xù)投遞消息或者丟棄消息(重復第4步操作)。
流程圖如下:
0x03 實例
針對上面案例苛茂,可以通過如下的方式使用事務消息已烤。
Producer:
1、實現(xiàn) TransactionListener
接口妓羊,在executeLocalTransaction
方法里執(zhí)行本地事務邏輯胯究,在checkLocalTransaction
方法里返回消息id對應的事務狀態(tài),用于Mq的消息回查躁绸。
2裕循、通過 TransactionMQProducer
構造事務消息并發(fā)送臣嚣。
Consumer:
1、從Mq server獲取到消息之后剥哑,即開始處理本地事務硅则,處理成功后返回 CONSUME_SUCCESS
。
2株婴、處理失敗則返回 RECONSUME_LATER
怎虫,Mq server會在稍后重新投遞這個消息,又進入步驟1困介。
注: Consumer 需要做好冪等控制大审,消息可能會被多次投遞到Consumer。
0x04 其他
可以看到座哩,極端情況下徒扶,可能仍然會出現(xiàn) 消費者出錯的情況,不過這種情況建議人工介入處理根穷。對于這種概率非常小的情況姜骡,使用人工介入手動處理的辦法,比實現(xiàn)一套完整的自動回滾事務系統(tǒng)成本要低很多屿良,要更劃算一些圈澈。
0x05 附
1、使用限制
以下是使用Rocket mq 事務消息的一些限制:
1尘惧、事務消息不支持 delay 或者 batch操作士败。
2、為了避免一個 Half Message的消息被檢查多次 或者 消息積壓褥伴,默認對每個消息最多進行15次消息回查谅将,可以通過修改broker的 transactionCheckMax
參數(shù)來指定次數(shù)。如果一個Half Message狀態(tài)的消息檢查次數(shù)超過了transactionCheckMax
重慢,默認情況下會直接丟棄掉并且打印錯誤日志饥臂,可以通過覆蓋 AbstractTransactionCheckListener
類來修改這個行為。
3似踱、通過 transactionMsgTimeout
參數(shù)可以指定消息回查(Message Status Check)間隔隅熙。
4、事務消息可能會被check或者consume多次核芽,要在Consumer端做好冪等控制囚戚。
2、事務狀態(tài)
Rocket MQ里有三種事務狀態(tài)
1轧简、LocalTransactionState.UNKNOW : 中間狀態(tài)驰坊,意味著Mq server需要稍候再次確認。
2哮独、LocalTransactionState.COMMIT_MESSAGE: 事務完成拳芙,意味著消息可以投遞給對應的 Consumer察藐。
3、LocalTransactionState.ROLLBACK_MESSAGE: 事務失敗舟扎,Mq Server會丟棄對應的事務消息分飞,不會投遞給對應的Consumer。