RocketMQ 事務(wù)消息

1屏富、前言

我們在事務(wù)中經(jīng)常會遇到這種情況脂男,向數(shù)據(jù)庫進(jìn)行一些數(shù)據(jù)修改后养叛,再將消息塞進(jìn) mq,供其他系統(tǒng)消費宰翅。但是這個過程中弃甥,數(shù)據(jù)庫修改失敗可以進(jìn)行回滾,但 mq 塞消息卻回滾不了汁讼;或者數(shù)據(jù)庫修改能保證一定修改成功淆攻,mq 卻不能保證消息一定到達(dá)。

針對數(shù)據(jù)庫回滾了嘿架,其實我們可以使用一個補償定時任務(wù)瓶珊,針對數(shù)據(jù)狀態(tài)沒有改變的數(shù)據(jù)(數(shù)據(jù)必須有狀態(tài)),再進(jìn)行撈取進(jìn)行狀態(tài)修改再塞到 mq耸彪,這就要求另一邊消費消息的系統(tǒng)實現(xiàn)消費冪等伞芹;或者說我們啥都不用做,因為如果 provider 方是接口提供者蝉娜,數(shù)據(jù)庫異常對于外部來說是接口調(diào)用異常唱较,它會重新進(jìn)行調(diào)用,與定時任務(wù)的邏輯相同召川。

針對向 mq 投遞消息丟失的情況南缓,數(shù)據(jù)需要兩個狀態(tài),首先數(shù)據(jù)修改成功會修改第一個狀態(tài)扮宠,但是只有消費方消費消息成功修改數(shù)據(jù)才會修改第二個狀態(tài)(針對消費者如何修改生產(chǎn)者的數(shù)據(jù)庫狀態(tài)西乖,生產(chǎn)者可以提供一個接口給消費者調(diào)用狐榔,不用怕調(diào)用量多大,因為消費者是以恒定的速率消費的)获雕。此時也應(yīng)該有一個定時任務(wù)薄腻,會掃描第一個狀態(tài)改變,而第二個狀態(tài)沒改變的數(shù)據(jù)届案,塞進(jìn) mq庵楷。

其實我本人覺得,這種設(shè)計方式可能比所謂的事務(wù)消息的效率要高楣颠,因為它沒有阻塞原來的流程尽纽,只是針對原來流程的異常場景做一些補償措施。但是它需要數(shù)據(jù)有狀態(tài)機(jī)童漩,針對原來無狀態(tài)的數(shù)據(jù)弄贿,用這種方式意味著對原來的系統(tǒng)進(jìn)行改造,成本可能會高矫膨。對于 RocketMQ 事務(wù)消息來說差凹,它自身實現(xiàn)了事務(wù),即它將自己與用戶提供的業(yè)務(wù)邏輯綁定在一起侧馅。

2危尿、流程與示例

事務(wù)消息
1)事務(wù)消息發(fā)送及提交

(1) 發(fā)送消息(half消息)。
(2) 服務(wù)端響應(yīng)消息寫入結(jié)果馁痴。
(3) 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫入失敗谊娇,此時half消息對業(yè)務(wù)不可見,本地邏輯不執(zhí)行)罗晕。
(4) 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引济欢,消息對消費者可見)

針對 (3)而言,生產(chǎn)者需要提供 TransactionListener 接口 executeLocalTransaction 實現(xiàn)方法攀例,即當(dāng)發(fā)送事務(wù)性prepare(half)消息成功時船逮,將調(diào)用此方法以執(zhí)行本地事務(wù)。

2)事務(wù)補償

(1) 對沒有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息)粤铭,從服務(wù)端發(fā)起一次“回查”
(2) Producer收到回查消息挖胃,檢查回查消息對應(yīng)的本地事務(wù)的狀態(tài)
(3) 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback

其中梆惯,補償階段用于解決消息Commit或者Rollback發(fā)生超時或者失敗的情況酱鸭。
針對事務(wù)補償而言,生產(chǎn)者需要 TransactionListener 接口的 checkLocalTransaction 實現(xiàn)方法垛吗,即當(dāng)沒有對 prepare(half) 進(jìn)行響應(yīng)的時候凹髓,broker 會發(fā)送 check 消息檢查本地的事務(wù)狀態(tài)。

3)事務(wù)消息狀態(tài)

事務(wù)消息共有三種狀態(tài)怯屉,提交狀態(tài)蔚舀、回滾狀態(tài)饵沧、中間狀態(tài):

  • TransactionStatus.CommitTransaction: 提交事務(wù),它允許消費者消費此消息赌躺。
  • TransactionStatus.RollbackTransaction: 回滾事務(wù)狼牺,它代表該消息將被刪除,不允許被消費礼患。
  • TransactionStatus.Unknown: 中間狀態(tài)是钥,它代表需要檢查消息隊列來確定狀態(tài)。
示例

Producer.java

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //創(chuàng)建事務(wù)監(jiān)聽器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //創(chuàng)建消息生產(chǎn)者
        TransactionMQProducer producer = new TransactionMQProducer("group6");
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        //生產(chǎn)者這是監(jiān)聽器
        producer.setTransactionListener(transactionListener);
        //啟動消息生產(chǎn)者
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                TimeUnit.SECONDS.sleep(1);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //producer.shutdown();
    }
}

TransactionListenerImpl.java

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        
        // 可以根據(jù)不同表現(xiàn)返回不同狀態(tài)
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        // 查本地事務(wù)狀態(tài)
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3缅叠、原理

1)事務(wù)消息在一階段對用戶不可見
在RocketMQ事務(wù)消息的主要流程中悄泥,一階段的消息如何對用戶不可見。其中肤粱,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的弹囚。那么,如何做到寫入消息但是對用戶不可見呢狼犯?RocketMQ事務(wù)消息的做法是:如果消息是half消息余寥,將備份原消息的主題與消息消費隊列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC悯森。由于消費組未訂閱該主題,故消費端無法消費half類型的消息绪撵,然后RocketMQ會開啟一個定時任務(wù)瓢姻,從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求音诈,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息幻碱。

RocketMQ的具體實現(xiàn)策略是:寫入的如果事務(wù)消息,對消息的Topic和Queue等屬性進(jìn)行替換细溅,同時將原來的Topic和Queue信息存儲到消息的屬性中褥傍,正因為消息主題被替換,故消息并不會轉(zhuǎn)發(fā)到該原主題的消息消費隊列喇聊,消費者無法感知消息的存在恍风,不會消費。其實改變消息主題是RocketMQ的常用“套路”誓篱,延時消息的實現(xiàn)機(jī)制也是如此朋贬。

這種替換思路很常見,比如 dfs 中文件替換思路窜骄。

2)Commit和Rollback操作以及Op消息的引入
在完成一階段寫入一條對用戶不可見的消息后锦募,二階段如果是Commit操作,則需要讓消息對用戶可見邻遏;如果是Rollback則需要撤銷一階段的消息糠亩。

先說Rollback的情況虐骑。對于Rollback,本身一階段的消息對用戶是不可見的赎线,其實不需要真正撤銷消息(實際上RocketMQ也無法去真正的刪除一條消息富弦,因為是順序?qū)懳募模5菂^(qū)別于這條消息沒有確定狀態(tài)(Pending狀態(tài)氛驮,事務(wù)懸而未決)腕柜,需要一個操作來標(biāo)識這條消息的最終狀態(tài)。

RocketMQ事務(wù)消息方案中引入了Op消息的概念矫废,用Op消息標(biāo)識事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)盏缤。如果一條事務(wù)消息沒有對應(yīng)的Op消息,說明這個事務(wù)的狀態(tài)還無法確定(可能是二階段失敗了)蓖扑。引入Op消息后唉铜,事務(wù)消息無論是Commit或者Rollback都會記錄一個Op操作。Commit相對于Rollback只是在寫入Op消息前創(chuàng)建Half消息的索引律杠。

3)Op消息的存儲和對應(yīng)關(guān)系
RocketMQ將Op消息寫入到全局一個特定的Topic中通過源碼中的方法—TransactionalMessageUtil.buildOpTopic()潭流;這個Topic是一個內(nèi)部的Topic(像Half消息的Topic一樣),不會被用戶消費柜去。Op消息的內(nèi)容為對應(yīng)的Half消息的存儲的Offset灰嫉,這樣通過Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作。

half —— op —— user

Op 消息最大的用處是標(biāo)識 half 消息是否到了終態(tài)(commit 或 rollback)嗓奢,因為沒有終態(tài)的消息需要回查來確定狀態(tài)(half 為啥不自己標(biāo)識消息的狀態(tài)呢讼撒?不是很清楚,我也只是從概念上認(rèn)識股耽,沒有深入源碼)

4)Half消息的索引構(gòu)建
在執(zhí)行二階段Commit操作時根盒,需要構(gòu)建出Half消息的索引。一階段的Half消息由于是寫到一個特殊的Topic物蝙,所以二階段構(gòu)建索引時需要讀取出Half消息炎滞,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過一次普通消息的寫入操作來生成一條對用戶可見的消息诬乞。所以RocketMQ事務(wù)消息二階段其實是利用了一階段存儲的消息的內(nèi)容册赛,在二階段時恢復(fù)出一條完整的普通消息,然后走一遍消息寫入流程丽惭。

5)如何處理二階段失敗的消息击奶?
如果在RocketMQ事務(wù)消息的二階段過程中失敗了,例如在做Commit操作時责掏,出現(xiàn)網(wǎng)絡(luò)問題導(dǎo)致Commit失敗柜砾,那么需要通過一定的策略使這條消息最終被Commit。RocketMQ采用了一種補償機(jī)制换衬,稱為“回查”痰驱。Broker端對未確定狀態(tài)的消息發(fā)起回查证芭,將消息發(fā)送到對應(yīng)的Producer端(同一個Group的Producer),由Producer根據(jù)消息來檢查本地事務(wù)的狀態(tài)担映,進(jìn)而執(zhí)行Commit或者Rollback废士。Broker端通過對比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。

值得注意的是蝇完,rocketmq并不會無休止的的信息事務(wù)狀態(tài)回查官硝,默認(rèn)回查15次,如果15次回查還是無法得知事務(wù)狀態(tài)短蜕,rocketmq默認(rèn)回滾該消息氢架。

整體流程如下:


整體流程

3、后記

RocketMQ 的事務(wù)消息能不能用呢朋魔?我覺得見仁見智岖研。它確實提供一個比較簡單的方式讓我們實現(xiàn)事務(wù),但與之而來的就是性能問題警检。所以設(shè)計架構(gòu)時孙援,我們可能不能單一依賴某一個中間件的能力,而是應(yīng)該把思路放開一點扇雕,用多種手段保證系統(tǒng)的準(zhǔn)確性拓售。比如我們可以根據(jù)業(yè)務(wù)上來設(shè)計,用文章開頭我們提到的補償措施來避免失敗的問題洼裤。

我們的 mq 是怎么用的呢邻辉?就是用的上面的補償措施。具體到消息分發(fā)腮鞍,很簡單,每個 message 都有相應(yīng) type 屬性莹菱,根據(jù) message 不同的 type 觸發(fā)相應(yīng)的動作移国。我們有三種 producer道伟,分別是 rmqProducer迹缀、ngProducer、fbProducer蜜徽,分別放到三個不同的 topic 中。consumer 消費相應(yīng)的 topic 進(jìn)行消費祟剔,消費到的消息根據(jù)不同的 type 做動作摩梧。

4笙纤、參考資料

https://juejin.cn/post/6844904193526857742

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子伐弹,更是在濱河造成了極大的恐慌惨好,老刑警劉巖龄句,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件职抡,死亡現(xiàn)場離奇詭異萨蚕,居然都是意外死亡浩蓉,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門嘁字,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事倦炒。” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我躏敢,道長件余,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任遭居,我火速辦了婚禮啼器,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘魏滚。我一直安慰自己镀首,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布鼠次。 她就那樣靜靜地躺著更哄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪腥寇。 梳的紋絲不亂的頭發(fā)上成翩,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機(jī)與錄音赦役,去河邊找鬼麻敌。 笑死,一個胖子當(dāng)著我的面吹牛掂摔,可吹牛的內(nèi)容都是我干的术羔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼乙漓,長吁一口氣:“原來是場噩夢啊……” “哼级历!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起叭披,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤寥殖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚼贡,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡熏纯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了粤策。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片樟澜。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖掐场,靈堂內(nèi)的尸體忽然破棺而出往扔,到底是詐尸還是另有隱情,我是刑警寧澤熊户,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布萍膛,位于F島的核電站,受9級特大地震影響嚷堡,放射性物質(zhì)發(fā)生泄漏蝗罗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一蝌戒、第九天 我趴在偏房一處隱蔽的房頂上張望串塑。 院中可真熱鬧,春花似錦北苟、人聲如沸桩匪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽傻昙。三九已至,卻和暖如春彩扔,著一層夾襖步出監(jiān)牢的瞬間妆档,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工虫碉, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留贾惦,地道東北人。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓敦捧,卻偏偏與公主長得像须板,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子兢卵,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 1逼纸、從本地事務(wù)到分布式事務(wù) 我們經(jīng)常支付寶轉(zhuǎn)賬余額寶,這是日常生活的一件普通小事济蝉,但是我們思考支付寶扣除轉(zhuǎn)賬的錢之...
    冰河winner閱讀 760評論 0 4
  • 系列 RocketMq 消息Tag過濾 RocketMq 廣播模式 RocketMQ 同步調(diào)用的新特性 Rocke...
    晴天哥_王志閱讀 1,393評論 0 1
  • 在微服務(wù)架構(gòu)中,隨著服務(wù)的逐步拆分,數(shù)據(jù)庫私有已經(jīng)成為共識王滤,這也導(dǎo)致所面臨的分布式事務(wù)問題成為微服務(wù)落地過程中一個...
    mingxungu閱讀 466評論 1 1
  • RocketMQ事務(wù)消息接口介紹 當(dāng)我們在業(yè)務(wù)邏輯中發(fā)送消息時贺嫂,消息與業(yè)務(wù)的事務(wù)之間難以保證一致性,如果業(yè)務(wù)代碼出...
    無醉_1866閱讀 2,844評論 0 0
  • 一雁乡、背景 MQ組件是系統(tǒng)架構(gòu)里必不可少的一門利器第喳,設(shè)計層面可以降低系統(tǒng)耦合度,高并發(fā)場景又可以起到削峰填谷的作用踱稍,...
    清茶豆奶閱讀 661評論 0 1