1屏富、前言
我們在事務(wù)中經(jīng)常會遇到這種情況脂男,向數(shù)據(jù)庫進(jìn)行一些數(shù)據(jù)修改后养叛,再將消息塞進(jìn) mq,供其他系統(tǒng)消費宰翅。但是這個過程中弃甥,數(shù)據(jù)庫修改失敗可以進(jìn)行回滾,但 mq 塞消息卻回滾不了汁讼;或者數(shù)據(jù)庫修改能保證一定修改成功淆攻,mq 卻不能保證消息一定到達(dá)。
針對數(shù)據(jù)庫回滾了嘿架,其實我們可以使用一個補償定時任務(wù)瓶珊,針對數(shù)據(jù)狀態(tài)沒有改變的數(shù)據(jù)(數(shù)據(jù)必須有狀態(tài)),再進(jìn)行撈取進(jìn)行狀態(tài)修改再塞到 mq耸彪,這就要求另一邊消費消息的系統(tǒng)實現(xiàn)消費冪等伞芹;或者說我們啥都不用做,因為如果 provider 方是接口提供者蝉娜,數(shù)據(jù)庫異常對于外部來說是接口調(diào)用異常唱较,它會重新進(jìn)行調(diào)用,與定時任務(wù)的邏輯相同召川。
針對向 mq 投遞消息丟失的情況南缓,數(shù)據(jù)需要兩個狀態(tài),首先數(shù)據(jù)修改成功會修改第一個狀態(tài)扮宠,但是只有消費方消費消息成功修改數(shù)據(jù)才會修改第二個狀態(tài)(針對消費者如何修改生產(chǎn)者的數(shù)據(jù)庫狀態(tài)西乖,生產(chǎn)者可以提供一個接口給消費者調(diào)用狐榔,不用怕調(diào)用量多大,因為消費者是以恒定的速率消費的)获雕。此時也應(yīng)該有一個定時任務(wù)薄腻,會掃描第一個狀態(tài)改變,而第二個狀態(tài)沒改變的數(shù)據(jù)届案,塞進(jìn) mq庵楷。
其實我本人覺得,這種設(shè)計方式可能比所謂的事務(wù)消息的效率要高楣颠,因為它沒有阻塞原來的流程尽纽,只是針對原來流程的異常場景做一些補償措施。但是它需要數(shù)據(jù)有狀態(tài)機(jī)童漩,針對原來無狀態(tài)的數(shù)據(jù)弄贿,用這種方式意味著對原來的系統(tǒng)進(jìn)行改造,成本可能會高矫膨。對于 RocketMQ 事務(wù)消息來說差凹,它自身實現(xiàn)了事務(wù),即它將自己與用戶提供的業(yè)務(wù)邏輯綁定在一起侧馅。
2危尿、流程與示例
1)事務(wù)消息發(fā)送及提交
(1) 發(fā)送消息(half消息)。
(2) 服務(wù)端響應(yīng)消息寫入結(jié)果馁痴。
(3) 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗谊娇,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行)罗晕。
(4) 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引济欢,消息對消費者可見)
針對 (3)而言,生產(chǎn)者需要提供 TransactionListener 接口 executeLocalTransaction 實現(xiàn)方法攀例,即當(dāng)發(fā)送事務(wù)性prepare(half)消息成功時船逮,將調(diào)用此方法以執(zhí)行本地事務(wù)。
2)事務(wù)補償
(1) 對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)粤铭,從服務(wù)端發(fā)起一次“回查”
(2) Producer收到回查消息挖胃,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)
(3) 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback
其中梆惯,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況酱鸭。
針對事務(wù)補償而言,生產(chǎn)者需要 TransactionListener 接口的 checkLocalTransaction 實現(xiàn)方法垛吗,即當(dāng)沒有對 prepare(half) 進(jìn)行響應(yīng)的時候凹髓,broker 會發(fā)送 check 消息檢查本地的事務(wù)狀態(tài)。
3)事務(wù)消息狀態(tài)
事務(wù)消息共有三種狀態(tài)怯屉,提交狀態(tài)蔚舀、回滾狀態(tài)饵沧、中間狀態(tài):
- TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費者消費此消息赌躺。
- TransactionStatus.RollbackTransaction: 回滾事務(wù)狼牺,它代表該消息將被刪除,不允許被消費礼患。
- TransactionStatus.Unknown: 中間狀態(tài)是钥,它代表需要檢查消息隊列來確定狀態(tài)。
示例
Producer.java
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//創(chuàng)建事務(wù)監(jiān)聽器
TransactionListener transactionListener = new TransactionListenerImpl();
//創(chuàng)建消息生產(chǎn)者
TransactionMQProducer producer = new TransactionMQProducer("group6");
producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
//生產(chǎn)者這是監(jiān)聽器
producer.setTransactionListener(transactionListener);
//啟動消息生產(chǎn)者
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 3; i++) {
try {
Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
TimeUnit.SECONDS.sleep(1);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//producer.shutdown();
}
}
TransactionListenerImpl.java
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
// 可以根據(jù)不同表現(xiàn)返回不同狀態(tài)
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
// 查本地事務(wù)狀態(tài)
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3缅叠、原理
1)事務(wù)消息在一階段對用戶不可見
在RocketMQ事務(wù)消息的主要流程中悄泥,一階段的消息如何對用戶不可見。其中肤粱,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的弹囚。那么,如何做到寫入消息但是對用戶不可見呢狼犯?RocketMQ事務(wù)消息的做法是:如果消息是half消息余寥,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC悯森。由于消費組未訂閱該主題,故消費端無法消費half類型的消息绪撵,然后RocketMQ會開啟一個定時任務(wù)瓢姻,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求音诈,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息幻碱。
RocketMQ的具體實現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進(jìn)行替換细溅,同時將原來的Topic和Queue信息存儲到消息的屬性中褥傍,正因為消息主題被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費隊列喇聊,消費者無法感知消息的存在恍风,不會消費。其實改變消息主題是RocketMQ的常用“套路”誓篱,延時消息的實現(xiàn)機(jī)制也是如此朋贬。
這種替換思路很常見,比如 dfs 中文件替換思路窜骄。
2)Commit和Rollback操作以及Op消息的引入
在完成一階段寫入一條對用戶不可見的消息后锦募,二階段如果是Commit操作,則需要讓消息對用戶可見邻遏;如果是Rollback則需要撤銷一階段的消息糠亩。
先說Rollback的情況虐骑。對于Rollback,本身一階段的消息對用戶是不可見的赎线,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息富弦,因為是順序?qū)懳募模5菂^(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài)氛驮,事務(wù)懸而未決)腕柜,需要一個操作來標(biāo)識這條消息的最終狀態(tài)。
RocketMQ事務(wù)消息方案中引入了Op消息的概念矫废,用Op消息標(biāo)識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)盏缤。如果一條事務(wù)消息沒有對應(yīng)的Op消息,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)蓖扑。引入Op消息后唉铜,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引律杠。
3)Op消息的存儲和對應(yīng)關(guān)系
RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic()潭流;這個Topic是一個內(nèi)部的Topic(像Half消息的Topic一樣),不會被用戶消費柜去。Op消息的內(nèi)容為對應(yīng)的Half消息的存儲的Offset灰嫉,這樣通過Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作。
Op 消息最大的用處是標(biāo)識 half 消息是否到了終態(tài)(commit 或 rollback)嗓奢,因為沒有終態(tài)的消息需要回查來確定狀態(tài)(half 為啥不自己標(biāo)識消息的狀態(tài)呢讼撒?不是很清楚,我也只是從概念上認(rèn)識股耽,沒有深入源碼)
4)Half消息的索引構(gòu)建
在執(zhí)行二階段Commit操作時根盒,需要構(gòu)建出Half消息的索引。一階段的Half消息由于是寫到一個特殊的Topic物蝙,所以二階段構(gòu)建索引時需要讀取出Half消息炎滞,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息诬乞。所以RocketMQ事務(wù)消息二階段其實是利用了一階段存儲的消息的內(nèi)容册赛,在二階段時恢復(fù)出一條完整的普通消息,然后走一遍消息寫入流程丽惭。
5)如何處理二階段失敗的消息击奶?
如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時责掏,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗柜砾,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機(jī)制换衬,稱為“回查”痰驱。Broker端對未確定狀態(tài)的消息發(fā)起回查证芭,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài)担映,進(jìn)而執(zhí)行Commit或者Rollback废士。Broker端通過對比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。
值得注意的是蝇完,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查官硝,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài)短蜕,rocketmq默認(rèn)回滾該消息氢架。
整體流程如下:
3、后記
RocketMQ 的事務(wù)消息能不能用呢朋魔?我覺得見仁見智岖研。它確實提供一個比較簡單的方式讓我們實現(xiàn)事務(wù),但與之而來的就是性能問題警检。所以設(shè)計架構(gòu)時孙援,我們可能不能單一依賴某一個中間件的能力,而是應(yīng)該把思路放開一點扇雕,用多種手段保證系統(tǒng)的準(zhǔn)確性拓售。比如我們可以根據(jù)業(yè)務(wù)上來設(shè)計,用文章開頭我們提到的補償措施來避免失敗的問題洼裤。
我們的 mq 是怎么用的呢邻辉?就是用的上面的補償措施。具體到消息分發(fā)腮鞍,很簡單,每個 message 都有相應(yīng) type 屬性莹菱,根據(jù) message 不同的 type 觸發(fā)相應(yīng)的動作移国。我們有三種 producer道伟,分別是 rmqProducer迹缀、ngProducer、fbProducer蜜徽,分別放到三個不同的 topic 中。consumer 消費相應(yīng)的 topic 進(jìn)行消費祟剔,消費到的消息根據(jù)不同的 type 做動作摩梧。