RocketMQ應(yīng)用——事務(wù)消息

問(wèn)題引入

這里的一個(gè)需求場(chǎng)景式:工行用戶A想建行用戶B轉(zhuǎn)賬1萬(wàn)元

我們可以使用同步消息來(lái)處理該需求場(chǎng)景


image.png

1掉奄、 工行系統(tǒng)發(fā)送一個(gè)給B贈(zèng)款1萬(wàn)元的同步消息M給Broker
2嗓袱、 消息被Broker成功接收后淮椰,向工行系統(tǒng)發(fā)送成功ACK
3捂贿、 工行系統(tǒng)收到成功ACK后向用戶A中扣款1萬(wàn)元
4泛释、 建行系統(tǒng)從Broker中獲取消息M
5、 建行系統(tǒng)消費(fèi)消息M甚带,即向用戶B中贈(zèng)款1萬(wàn)元

這其中是有問(wèn)題的:若第3步中的扣款操作失敗她肯,但消息已經(jīng)發(fā)送到了Broker佳头。對(duì)于MQ來(lái)說(shuō),只要消息寫(xiě)入成功晴氨,那么這個(gè)消息就可以被消費(fèi)康嘉。此時(shí)建行系統(tǒng)中用戶B增加了1萬(wàn)元。出現(xiàn)了數(shù)據(jù)不一致問(wèn)題

解決思路

解決思路是籽前,讓第1/2/3步具有原子性亭珍,要么全部成功,要么全部失敗枝哄。即消息發(fā)送成功后肄梨,必須要保證扣款成功。如果扣款失敗挠锥,則回滾發(fā)送成功的消息众羡。而該思路即使用事務(wù)消息。這里要使用分布式事務(wù)解決方案蓖租。

image.png

使用事務(wù)消息來(lái)處理該需求場(chǎng)景:
1粱侣、 事務(wù)管理器TM向事務(wù)協(xié)調(diào)器TC發(fā)起指令,開(kāi)啟全局事務(wù)
2蓖宦、 工行系統(tǒng)發(fā)一個(gè)給B贈(zèng)款1萬(wàn)元的事務(wù)消息M給TC
3甜害、 TC會(huì)向Broker發(fā)送半事務(wù)消息prepareHalf,將消息M預(yù)提交到Broker球昨。此時(shí)的建行系統(tǒng)是看不到Broker中的消息M的
4沫浆、 Broker會(huì)將預(yù)提交執(zhí)行結(jié)果Report給TC
5、 如果預(yù)提交失敗匿垄,則TC會(huì)向TM上報(bào)預(yù)提交失敗的相應(yīng)亿乳,全局事務(wù)結(jié)束;如果與提交成功共螺,TC會(huì)調(diào)用工行系統(tǒng)的回調(diào)操作该肴,去完成工行用戶A的預(yù)扣款1萬(wàn)元操作
6、 工行系統(tǒng)會(huì)向TC發(fā)送預(yù)扣款執(zhí)行結(jié)果藐不,即本地事務(wù)的執(zhí)行狀態(tài)
7匀哄、 TC收到預(yù)扣款執(zhí)行結(jié)果后,會(huì)將結(jié)果上報(bào)給TM
8雏蛮、 TM會(huì)根據(jù)上報(bào)結(jié)果向TC發(fā)出不同的確認(rèn)指令

  • 若預(yù)扣款成功(本地事務(wù)狀態(tài)為COMMIT_MESSAGE)涎嚼,則TM向TC發(fā)送Global Commit指令
  • 若預(yù)扣款失敗(本地事務(wù)狀態(tài)為ROLLBACK_MESSAGE)挑秉,則TM向TC發(fā)送Global Rollback指令
  • 若未知狀態(tài)(本地事務(wù)狀態(tài)為UNKNOW)法梯,則會(huì)觸發(fā)工行系統(tǒng)的本地事務(wù)狀態(tài)回查操作。回查操作會(huì)將回查結(jié)果立哑,Report給TC夜惭。TC將結(jié)果上報(bào)給TM,TM會(huì)向TC發(fā)送最終確認(rèn)指令
    9铛绰、 TC在接收到指令后會(huì)向Broker與工行系統(tǒng)發(fā)出確認(rèn)指令
  • TC接收的若是Global Commit指令诈茧,則向Broker與工行系統(tǒng)發(fā)送Branch Commit指令。此時(shí)Broker中的消息M才可被建行系統(tǒng)看到捂掰;此時(shí)的工行用戶A中的扣款操作才真正被確認(rèn)
  • TC接收的若是Global Rollback指令若皱,則向Broker與工行系統(tǒng)發(fā)送Branch Rollback指令。此時(shí)Broker中的消息M將被撤銷(xiāo)尘颓;工行用戶A中的扣款操作將被回滾

以上方案就是為了確保消息投遞與扣款操作能夠在一個(gè)事務(wù)中走触,要成功都成功,有一個(gè)失敗疤苹,就全部回滾互广。

以上方案并不是一個(gè)典型的XA模式。因?yàn)閄A模式中的分支事務(wù)是異步的卧土,而事務(wù)消息方案中的消息預(yù)提交與預(yù)扣款操作間是同步的惫皱。

基礎(chǔ)

分布式事務(wù)

對(duì)于分布式事務(wù),通俗地說(shuō)就是尤莺,一次操作由若干分支操作組成旅敷,這些分支操作分屬不同應(yīng)用,分布在不同服務(wù)器上颤霎。分布式事務(wù)需要保證這些分支操作要么全部成功媳谁,要么全部失敗。分布式事務(wù)與普通事務(wù)一樣友酱,就是為了保證操作結(jié)果的一致性

事務(wù)消息

RocketMQ提供了類(lèi)似X/Open XA的分布式事務(wù)功能晴音,通過(guò)事務(wù)消息能達(dá)到分布式事務(wù)的最終一致性。XA是一種分布式事務(wù)解決方案缔杉,一種分布式事務(wù)處理模式锤躁。

半事務(wù)消息

咱不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了Broker或详,但是Broker未收到最終確認(rèn)指令系羞,此時(shí)該消息被標(biāo)記成“暫不能投遞”狀態(tài),即不能被消費(fèi)者看到霸琴。處于該種狀態(tài)下的消息即半事務(wù)消息椒振。

本地事務(wù)狀態(tài)

Producer回調(diào)操作執(zhí)行的結(jié)果為本地事務(wù)狀態(tài),其會(huì)發(fā)送給TC沈贝,而TC會(huì)再發(fā)送給TM杠人,TM會(huì)根據(jù)TC發(fā)送的本地事務(wù)狀態(tài)來(lái)決定全局事務(wù)確認(rèn)指令。

消息回查

image.png

消息回查宋下,即重新查詢本地事務(wù)的執(zhí)行狀態(tài)嗡善。本利就是重新到DB中查看預(yù)扣款操作是否執(zhí)行成功。

注意:消息回查不是重新執(zhí)行回調(diào)操作学歧≌忠回調(diào)操作是進(jìn)行預(yù)扣款操作,而消息回查則是查看預(yù)扣款操作執(zhí)行的結(jié)果

引發(fā)消息回查的原因最常見(jiàn)的有兩個(gè):
1枝笨、 回調(diào)操作返回UNKNOW
2袁铐、TC沒(méi)有接收到TM的最終全局事務(wù)確認(rèn)指令

RocketMQ中的消息回查設(shè)置

關(guān)于消息回查,有三個(gè)常見(jiàn)的屬性設(shè)置横浑。他們都在Broker加載的配置文件中配置剔桨,

  • transactionTimeout=20,指定TM在20秒內(nèi)應(yīng)將最終確認(rèn)狀態(tài)發(fā)送給TC徙融,否則引發(fā)消息回查洒缀。默認(rèn)60秒
  • transactionCheckMax=5,指定最多回查5次欺冀,吵過(guò)后將丟棄消息并記錄錯(cuò)誤日志树绩。默認(rèn)15次
  • transactionCheckInterval=10,指定設(shè)置的多次消息回查的時(shí)間間隔為10秒隐轩,默認(rèn)為60秒

XA模式三劍客

XA協(xié)議

XA(Unix Transaction)是一種分布式事務(wù)解決方案饺饭,一種分布式事務(wù)處理模式,是基于XA協(xié)議的职车。XA協(xié)議有Tuxedo(Transaction for Unix has been Extended for Distributed Operation瘫俊,分布式操作擴(kuò)展之后的Unix事務(wù)系統(tǒng))首先提出,并交給X/Open組織悴灵,作為資源管理器與事務(wù)管理器的接口標(biāo)準(zhǔn)军援。

XA模式中有三個(gè)重要組件:TC、TM称勋、RM

TC

Transaction Coordinator胸哥,事務(wù)協(xié)調(diào)者。維護(hù)全局和分支事務(wù)的狀態(tài)赡鲜,驅(qū)動(dòng)全局事務(wù)提交或回滾

RocketMQ中Broker充當(dāng)著TC

TM

Transaction Manager空厌,事務(wù)管理器。定義全局事務(wù)的范圍:開(kāi)始全局事務(wù)银酬、提交或回滾全局事務(wù)嘲更。它實(shí)際是全局事務(wù)的發(fā)起者

RocketMQ中事務(wù)消息的Producer充當(dāng)著TM

RM

Resource Manager,資源管理器揩瞪。管理分支事務(wù)處理的資源赋朦,與TC交談并注冊(cè)分支事務(wù)和報(bào)告分支事務(wù)的狀態(tài),并驅(qū)動(dòng)分支事務(wù)提交或回滾

RocketMQ中事務(wù)消息的Producer及Broker均是RM

XA模式架構(gòu)

image.png

XA模式是一個(gè)典型的2PC,其執(zhí)行原理如下:
1宠哄、 TM向TC發(fā)起指令壹将,開(kāi)啟一個(gè)全局事務(wù)。
2毛嫉、 根據(jù)業(yè)務(wù)要求诽俯,各個(gè)RM會(huì)逐個(gè)向TC注冊(cè)分支事務(wù),然后TC會(huì)逐個(gè)向RM發(fā)出預(yù)執(zhí)行指令承粤。
3暴区、 各個(gè)RM在接收到指令后會(huì)進(jìn)行本地事務(wù)預(yù)執(zhí)行。
4辛臊、 RM將預(yù)執(zhí)行結(jié)果Report給TC仙粱。當(dāng)然,這個(gè)結(jié)果可能成功彻舰,也可能失敗伐割。
5、 TC在接收到各個(gè)RM的Report后會(huì)匯總上報(bào)給TM淹遵,根據(jù)匯總結(jié)果TM會(huì)向TC發(fā)出確認(rèn)指令
6口猜、 TC在接收到指令后再次向RM發(fā)送確認(rèn)指令

事務(wù)消息方案并不是一個(gè)典型的XA模式。因?yàn)閄A模式中的分支事務(wù)是異步的透揣,而事務(wù)消息方案中的消息預(yù)提交與預(yù)扣款操作間是同步的济炎。

注意

  • 事務(wù)消息不支持延時(shí)消息
  • 對(duì)于事務(wù)消息要做好冪等性檢查,因?yàn)槭聞?wù)消息可能不止一次被消費(fèi)(因?yàn)榇嬖诨貪L后再提交的情況)

代碼

定義工行事務(wù)監(jiān)聽(tīng)器

public class ICBCTransactionListener implements TransactionListener { 
    // 回調(diào)操作方法 
    // 消息預(yù)提交成功就會(huì)觸發(fā)該方法的執(zhí)行辐真,用于完成本地事務(wù) 
    @Override 
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
        System.out.println("預(yù)提交消息成功:" + msg); 
        // 假設(shè)接收到TAGA的消息就表示扣款操作成功须尚,TAGB的消息表示扣款失敗, 
        // TAGC表示扣款結(jié)果不清楚侍咱,需要執(zhí)行消息回查 
        if (StringUtils.equals("TAGA", msg.getTags())) { 
            return LocalTransactionState.COMMIT_MESSAGE; 
        } else if (StringUtils.equals("TAGB", msg.getTags())) { 
            return LocalTransactionState.ROLLBACK_MESSAGE; 
        } else if (StringUtils.equals("TAGC", msg.getTags())) { 
            return LocalTransactionState.UNKNOW; 
        }
        return LocalTransactionState.UNKNOW; 
    }

    // 消息回查方法 
    // 引發(fā)消息回查的原因最常見(jiàn)的有兩個(gè): 
    // 1)回調(diào)操作返回UNKNWON 
    // 2)TC沒(méi)有接收到TM的最終全局事務(wù)確認(rèn)指令 
    @Override 
    public LocalTransactionState checkLocalTransaction(MessageExt msg) { 
        System.out.println("執(zhí)行消息回查" + msg.getTags()); 
        return LocalTransactionState.COMMIT_MESSAGE; 
    } 
}

定義事務(wù)消息生產(chǎn)者

public class TransactionProducer {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("tpg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        /**
         * 定義一個(gè)線程池
         * @param corePoolSize 線程池中核心線程數(shù)量
         * @param maximumPoolSize 線程池中最多線程數(shù)
         * @param keepAliveTime 這是一個(gè)時(shí)間耐床。當(dāng)線程池中線程數(shù)量大于核心線程數(shù)量 是,
         * 多余空閑線程的存活時(shí)長(zhǎng)
         * @param unit 時(shí)間單位
         * @param workQueue 臨時(shí)存放任務(wù)的隊(duì)列楔脯,其參數(shù)就是隊(duì)列的長(zhǎng)度
         * @param threadFactory 線程工廠
         */
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 為生產(chǎn)者指定一個(gè)線程池 
        producer.setExecutorService(executorService);
        // 為生產(chǎn)者添加事務(wù)監(jiān)聽(tīng)器 
        producer.setTransactionListener(new ICBCTransactionListener());
        producer.start();
        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 3; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("TTopic", tags[i], body);
            // 發(fā)送事務(wù)消息 
            // 第二個(gè)參數(shù)用于指定在執(zhí)行本地事務(wù)時(shí)要使用的業(yè)務(wù)參數(shù) 
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println("發(fā)送結(jié)果為:" + sendResult.getSendStatus());
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末撩轰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子昧廷,更是在濱河造成了極大的恐慌堪嫂,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件木柬,死亡現(xiàn)場(chǎng)離奇詭異皆串,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)眉枕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門(mén)恶复,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)怜森,“玉大人,你說(shuō)我怎么就攤上這事谤牡「惫瑁” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵拓哟,是天一觀的道長(zhǎng)想许。 經(jīng)常有香客問(wèn)我伶授,道長(zhǎng)断序,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任糜烹,我火速辦了婚禮违诗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疮蹦。我一直安慰自己诸迟,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布愕乎。 她就那樣靜靜地躺著阵苇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪感论。 梳的紋絲不亂的頭發(fā)上绅项,一...
    開(kāi)封第一講書(shū)人閱讀 51,462評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音比肄,去河邊找鬼快耿。 笑死,一個(gè)胖子當(dāng)著我的面吹牛芳绩,可吹牛的內(nèi)容都是我干的掀亥。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼妥色,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼搪花!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起嘹害,我...
    開(kāi)封第一講書(shū)人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤撮竿,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后吼拥,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體倚聚,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年凿可,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了惑折。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片授账。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖惨驶,靈堂內(nèi)的尸體忽然破棺而出白热,到底是詐尸還是另有隱情,我是刑警寧澤粗卜,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布屋确,位于F島的核電站,受9級(jí)特大地震影響续扔,放射性物質(zhì)發(fā)生泄漏攻臀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一纱昧、第九天 我趴在偏房一處隱蔽的房頂上張望刨啸。 院中可真熱鬧,春花似錦识脆、人聲如沸设联。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)离例。三九已至,卻和暖如春悉稠,著一層夾襖步出監(jiān)牢的瞬間宫蛆,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工偎球, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留洒扎,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓衰絮,卻偏偏與公主長(zhǎng)得像袍冷,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子猫牡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 分布式事務(wù)場(chǎng)景 1胡诗、跨JVM進(jìn)程會(huì)產(chǎn)生分布式事務(wù)(微服務(wù)間調(diào)用)2、跨數(shù)據(jù)庫(kù)實(shí)例會(huì)產(chǎn)生分布式事務(wù)(同一個(gè)微服務(wù)內(nèi)調(diào)...
    小丑的果實(shí)閱讀 615評(píng)論 0 0
  • https://zhuanlan.zhihu.com/p/437577902[https://zhuanlan.z...
    程序員小2閱讀 664評(píng)論 0 3
  • 分布式事務(wù)解決的用戶最本質(zhì)訴求是什么淌友?數(shù)據(jù)一致煌恢。 大中企業(yè)有一個(gè)共同的訴求是數(shù)據(jù)一致,幾乎覆蓋到各個(gè)行業(yè)震庭。 比如說(shuō)...
    脆皮雞大蝦閱讀 265評(píng)論 0 0
  • 本地事務(wù) 在計(jì)算機(jī)系統(tǒng)中瑰抵,更多的是通過(guò)關(guān)系型數(shù)據(jù)庫(kù)來(lái)控制事務(wù),這是利用數(shù)據(jù)庫(kù)本身的事務(wù)特性來(lái)實(shí)現(xiàn)的器联,因此叫數(shù)據(jù)庫(kù)事...
    知止9528閱讀 594評(píng)論 0 0
  • 最近整理了下分布式事務(wù)相關(guān)知識(shí)及典型應(yīng)用場(chǎng)景解決方案二汛,主要內(nèi)容如下: 1婿崭、分布式事務(wù)1)事務(wù)簡(jiǎn)介2)本地事務(wù)講解3...
    javacoo閱讀 1,586評(píng)論 0 18