一定讓你看懂的RocketMQ事務(wù)消息源碼分析

前言

得益于MQ削峰填谷诗赌,系統(tǒng)解耦丢郊,操作異步等功能特性,在互聯(lián)網(wǎng)行業(yè)彩掐,可以說(shuō)有分布式服務(wù)的地方构舟,MQ都往往不會(huì)缺席。由阿里自研的RocketMQ更是經(jīng)歷了多年的雙十一高并發(fā)挑戰(zhàn)堵幽,其中4.3.0版本推出了事務(wù)消息的新特性狗超,本文對(duì)RocketMQ 4.5.0版本事務(wù)消息相關(guān)的源碼跟蹤介紹,通過(guò)閱讀讀者可以知道:

事務(wù)消息解決什么樣的問(wèn)題

事務(wù)消息的實(shí)現(xiàn)原理及其設(shè)計(jì)亮點(diǎn)

解決什么問(wèn)題

假設(shè)我所在的系統(tǒng)現(xiàn)在有這樣一個(gè)場(chǎng)景:

本地開(kāi)啟數(shù)據(jù)庫(kù)事務(wù)進(jìn)行扣款操作朴下,成功后發(fā)送MQ消息給庫(kù)存中心進(jìn)行發(fā)貨努咐。

有人會(huì)想到開(kāi)啟mybatis事務(wù)實(shí)現(xiàn),把本地事務(wù)和MQ消息放在一起不就行了嗎殴胧?如果MQ發(fā)送成功渗稍,就提交事務(wù),發(fā)送失敗就回滾事務(wù)团滥,整套操作一氣呵成竿屹。

transaction{? 扣款();booleansuccess = 發(fā)送MQ();if(success){? ? commit();? }else{? ? rollBack();? }}

看似沒(méi)什么問(wèn)題,但是網(wǎng)絡(luò)是不可靠的灸姊。

假設(shè)MQ返回過(guò)來(lái)的響應(yīng)因?yàn)榫W(wǎng)絡(luò)原因遲遲沒(méi)有收到拱燃,所以在面對(duì)不確定的MQ返回結(jié)果只好進(jìn)行回滾。但是MQ 服務(wù)器又確實(shí)是收到了這條消息的力惯,只是給客戶(hù)端的響應(yīng)丟失了碗誉,所以導(dǎo)致的結(jié)果就是扣款失敗召嘶,成功發(fā)貨。


既然MQ消息的發(fā)送不能和本地事務(wù)寫(xiě)在一起哮缺,那如何來(lái)保證其整體具有原子性的需求呢苍蔬?答案就是今天我們介紹的主角:事務(wù)消息

概覽


總體而言RocketMQ事務(wù)消息分為兩條主線

定時(shí)任務(wù)發(fā)送流程:發(fā)送half message(半消息)蝴蜓,執(zhí)行本地事務(wù)碟绑,發(fā)送事務(wù)執(zhí)行結(jié)果

定時(shí)任務(wù)回查流程:MQ服務(wù)器回查本地事務(wù),發(fā)送事務(wù)執(zhí)行結(jié)果

因此本文也通過(guò)這兩條主線對(duì)源碼進(jìn)行分析

源碼分析

半消息發(fā)送流程

本地應(yīng)用(client)

在本地應(yīng)用發(fā)送事務(wù)消息的核心類(lèi)是TransactionMQProducer茎匠,該類(lèi)通過(guò)繼承DefaultMQProducer來(lái)復(fù)用大部分發(fā)送消息相關(guān)的邏輯格仲,這個(gè)類(lèi)的代碼量非常少只有100來(lái)行,下面是這個(gè)類(lèi)的sendMessageTransaction方法

@OverridepublicTransactionSendResultsendMessageInTransaction(finalMessage msg,finalObject arg)throwsMQClientException{if(null==this.transactionListener) {thrownewMQClientException("TransactionListener is null",null);? ? }returnthis.defaultMQProducerImpl.sendMessageInTransaction(msg,null, arg);}

這個(gè)方法做了兩件事诵冒,

檢查transactionListener是否存在

調(diào)用父類(lèi)執(zhí)行事務(wù)消息發(fā)送

TransactionListener在事務(wù)消息流程中起到至關(guān)重要的作用凯肋,一起看看這個(gè)接口

publicinterfaceTransactionListener{/**? ? * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.? ? *? ? *@parammsg Half(prepare) message? ? *@paramarg Custom business parameter? ? *@returnTransaction state? ? */LocalTransactionStateexecuteLocalTransaction(finalMessage msg,finalObject arg);/**? ? * When no response to prepare(half) message. broker will send check message to check the transaction status, and this? ? * method will be invoked to get local transaction status.? ? *? ? *@parammsg Check message? ? *@returnTransaction state? ? */LocalTransactionStatecheckLocalTransaction(finalMessageExt msg);}

接口注釋說(shuō)的很明白,配合上面的概覽圖來(lái)看就是汽馋,executeLocalTransaction方法對(duì)應(yīng)的就是執(zhí)行本地事務(wù)操作侮东,checkLocalTransaction對(duì)應(yīng)的就是回查本地事務(wù)操作。

下面是DefaultMQProducer類(lèi)的sendMessageInTransaction方法源碼

publicTransactionSendResultsendMessageInTransaction(finalMessage msg,finalLocalTransactionExecuter localTransactionExecuter,finalObject arg)throwsMQClientException{? ? ...? ? SendResult sendResult =null;? ? MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED,"true");? ? MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP,this.defaultMQProducer.getProducerGroup());? ? ...? ? ? ? sendResult =this.send(msg);? ? ...switch(sendResult.getSendStatus()) {caseSEND_OK: {? ? ? ? ? ? ...? ? ? ? localTransactionState = transactionListener.executeLocalTransaction(msg, arg);? ? ? ? ? ? ? ? ...break;caseFLUSH_DISK_TIMEOUT:caseFLUSH_SLAVE_TIMEOUT:caseSLAVE_NOT_AVAILABLE:? ? ? ? ? ? localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;? ? }? ? ...this.endTransaction(sendResult, localTransactionState, localException);...}復(fù)制代碼

為了使源碼的邏輯更加直觀豹芯,筆者精簡(jiǎn)了核心代碼悄雅。sendMessageInTransaction方法主要做了以下事情

給消息打上事務(wù)消息相關(guān)的標(biāo)記,用于MQ服務(wù)端區(qū)分普通消息和事務(wù)消息

發(fā)送半消息(half message)

發(fā)送成功則由transactionListener執(zhí)行本地事務(wù)

執(zhí)行endTransaction方法铁蹈,如果半消息發(fā)送失敗本地事務(wù)執(zhí)行失敗告訴服務(wù)端是刪除半消息宽闲,半消息發(fā)送成功本地事務(wù)執(zhí)行成功則告訴服務(wù)端生效半消息。

發(fā)送半消息流程握牧,Client端代碼到這里差不多就結(jié)束了容诬,接下來(lái)看看RocketMQ Server端是如何處理的

RocketMQ Server

Server在接收到消息過(guò)后會(huì)進(jìn)行一些領(lǐng)域?qū)ο蟮霓D(zhuǎn)化和是否支持事務(wù)消息的權(quán)限校驗(yàn),對(duì)理解事務(wù)消息用處不大沿腰,此處就省略對(duì)旁枝末節(jié)的介紹了览徒。下面是TransactionalMessageBridge類(lèi)處理half message的源碼

publicPutMessageResultputHalfMessage(MessageExtBrokerInner messageInner){returnstore.putMessage(parseHalfMessageInner(messageInner));}privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInner msgInner){? ? MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());? ? MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,? ? ? ? String.valueOf(msgInner.getQueueId()));? ? msgInner.setSysFlag(? ? ? ? MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));? ? msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());? ? msgInner.setQueueId(0);? ? msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));returnmsgInner;}

這兩個(gè)方法主要做了以下事情:

publicclassMessageimplementsSerializable{privatestaticfinallongserialVersionUID =8445773977080406428L;privateString topic;privateintflag;privateMap properties;privatebyte[] body;privateString transactionId;}

將消息的topic,queueId放進(jìn)消息體自身的map里進(jìn)行緩存

將消息的topic 設(shè)置為“RMQ_SYS_TRANS_OP_HALF_TOPIC”颂龙,queueId設(shè)置為0

將消息寫(xiě)入磁盤(pán)持久化

可以看到所有的事務(wù)半消息都會(huì)被放進(jìn)同一個(gè)topic的同一個(gè)queue里面习蓬,通過(guò)對(duì)topic的區(qū)分,從而避免了半消息被consumer給消費(fèi)到

Server將半消息持久化后然后會(huì)發(fā)送結(jié)果給我們本地的應(yīng)用程序厘托。到了這里Server端對(duì)半消息的處理就結(jié)束了友雳,緊接著的是定時(shí)任務(wù)的登場(chǎng)稿湿。

定時(shí)任務(wù)回查流程

RocketMQ Server

定時(shí)任務(wù)是一個(gè)叫TransactionalMessageService類(lèi)的線程铅匹,下面是該類(lèi)的check方法

@Overridepublicvoidcheck(longtransactionTimeout,inttransactionCheckMax,? ? AbstractTransactionalMessageCheckListener listener){? ? ? ? ? ? ? ? ? ...if(!putBackHalfMsgQueue(msgExt, i)) {continue;? ? }? ? ? listener.resolveHalfMsg(msgExt);? } ...}

check方法非常長(zhǎng),省略的代碼大致都是對(duì)半消息進(jìn)行過(guò)濾(如超過(guò)72小時(shí)的事務(wù)消息饺藤,就被算作過(guò)期)包斑,只保留符合條件的半消息對(duì)其進(jìn)行回查流礁。

其中很有意思的是putBackHalfMsgQueue方法,因?yàn)槊看伟寻胂拇疟P(pán)拉到內(nèi)存里進(jìn)行處理都會(huì)對(duì)其屬性進(jìn)行改變(例如TRANSACTION_CHECK_TIMES罗丰,這是是否丟棄事務(wù)消息的關(guān)鍵信息)神帅,所以在發(fā)送回查消息之前需要對(duì)半消息再次放進(jìn)磁盤(pán)。RocketMQ采取的方法是基于最新的物理偏移量重新寫(xiě)入萌抵,而不是對(duì)原有的半消息進(jìn)行修改找御,其中的目的就是RocketMQ的存儲(chǔ)設(shè)計(jì)采用順序?qū)懀绻バ薷南?绍填,無(wú)法做到高性能霎桅。

下面是resolveHalfMsg方法,主要就是開(kāi)啟一個(gè)線程然后發(fā)送check消息讨永。

publicvoidresolveHalfMsg(finalMessageExt msgExt){? ? executorService.execute(newRunnable() {@Overridepublicvoidrun(){try{? ? ? ? ? ? ? ? sendCheckMessage(msgExt);? ? ? ? ? ? }catch(Exception e) {? ? ? ? ? ? ? ? LOGGER.error("Send check message error!", e);? ? ? ? ? ? }? ? ? ? }? ? });}

本地應(yīng)用(client)

下面是DefaultMQProducerImpl的checkTransactionState方法滔驶,是本地應(yīng)用對(duì)回查消息的處理邏輯

@OverridepublicvoidcheckTransactionState(finalString addr,finalMessageExt msg,finalCheckTransactionStateRequestHeader header){? ? Runnable request =newRunnable() {? ? ? ? ...@Overridepublicvoidrun(){? ? ? ? ? ? ...? ? TransactionListener transactionListener = getCheckListener();? ? ? ? ? ? ...? ? localTransactionState = transactionListener.checkLocalTransaction(message);? ? ? ? ? ? ? ...this.processTransactionState(? ? ? ? ? ? ? ? ? ? localTransactionState,? ? ? ? ? ? ? ? ? ? group,? ? ? ? ? ? ? ? ? ? exception);? ? ? ? ? ? ? ? }privatevoidprocessTransactionState(? ? ? ? ? ... DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);? ? ? ? ? ...? ? ? ? }? ? };this.checkExecutor.submit(request);}

精簡(jiǎn)代碼邏輯后可以清晰的看到

開(kāi)啟一個(gè)線程來(lái)執(zhí)行回查的邏輯

執(zhí)行transactionListener的checkLocalTransaction方法來(lái)獲取本地事務(wù)執(zhí)行的結(jié)果

RocketMQ Server

RocketMQ 服務(wù)器在收到Client發(fā)過(guò)來(lái)的Commit消息后會(huì)

讀出半消息——>恢復(fù)topic等原消息體的信息——>和普通消息一樣再次寫(xiě)入磁盤(pán)——>刪除之前的半消息

如果是Rollback消息則直接刪除之前的半消息

到此,整條RocketMQ 事務(wù)消息的調(diào)用鏈就結(jié)束了

思考

1. 分布式事務(wù)等于事務(wù)消息嗎卿闹?

兩者并沒(méi)有關(guān)系揭糕,事務(wù)消息僅僅保證本地事務(wù)和MQ消息發(fā)送形成整體的原子性,而投遞到MQ服務(wù)器后锻霎,消費(fèi)者是否能一定消費(fèi)成功是無(wú)法保證的著角。

2. 源碼設(shè)計(jì)上有什么亮點(diǎn)嗎?

通過(guò)對(duì)整條鏈路源碼的學(xué)習(xí)理解發(fā)現(xiàn)還是有不少亮點(diǎn)的

server端回查消息的發(fā)送旋恼,client端回查消息邏輯的處理雇寇,client端commit/rollback消息的提交都是用了異步進(jìn)行,可以說(shuō)能異步的地方都用了異步蚌铜,通過(guò)異步+重試的方式保證了在分布式環(huán)境中即使短暫的網(wǎng)絡(luò)狀況不良好锨侯,也不會(huì)影響整體邏輯。

引入TransactionListener冬殃,真正做到了開(kāi)閉原則以及依賴(lài)倒置原則囚痴,面向接口編程。整體擴(kuò)展性做得非常好审葬,使用者只需要編寫(xiě)自己的Listener就可以做到事務(wù)消息的發(fā)送深滚,非常方便

TransactionMQProducer通過(guò)繼承DefaultMQProducer極大地復(fù)用了關(guān)于發(fā)送消息相關(guān)的邏輯

3. 源碼設(shè)計(jì)上有什么不足嗎?

RocketMQ作為一款極其成功的消息中間件涣觉,要發(fā)現(xiàn)不足不是那么容易了痴荐,筆者談幾點(diǎn)看法

sendMessageIntransaction等事務(wù)相關(guān)的方法被劃分在了DefaultMQProducer里面,從內(nèi)聚的角度來(lái)說(shuō)這是跟事務(wù)相關(guān)的發(fā)送消息方法應(yīng)該被劃分在TransactionMQProducer官册。

所有topic的半消息都會(huì)寫(xiě)在topic為RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息隊(duì)列里生兆,并且每條半消息,在整個(gè)鏈路里會(huì)被寫(xiě)多次膝宁,如果并發(fā)很大且大部分消息都是事務(wù)消息的話鸦难,可靠性會(huì)存在問(wèn)題根吁。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市合蔽,隨后出現(xiàn)的幾起案子击敌,更是在濱河造成了極大的恐慌,老刑警劉巖拴事,帶你破解...
    沈念sama閱讀 221,548評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沃斤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡刃宵,警方通過(guò)查閱死者的電腦和手機(jī)轰枝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)组去,“玉大人鞍陨,你說(shuō)我怎么就攤上這事〈勇。” “怎么了诚撵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,990評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)键闺。 經(jīng)常有香客問(wèn)我寿烟,道長(zhǎng),這世上最難降的妖魔是什么辛燥? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,618評(píng)論 1 296
  • 正文 為了忘掉前任筛武,我火速辦了婚禮,結(jié)果婚禮上挎塌,老公的妹妹穿的比我還像新娘徘六。我一直安慰自己,他們只是感情好榴都,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,618評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布待锈。 她就那樣靜靜地躺著,像睡著了一般嘴高。 火紅的嫁衣襯著肌膚如雪竿音。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,246評(píng)論 1 308
  • 那天拴驮,我揣著相機(jī)與錄音春瞬,去河邊找鬼。 笑死套啤,一個(gè)胖子當(dāng)著我的面吹牛宽气,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼抹竹,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼线罕!你這毒婦竟也來(lái)了止潮?” 一聲冷哼從身側(cè)響起窃判,我...
    開(kāi)封第一講書(shū)人閱讀 39,725評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎喇闸,沒(méi)想到半個(gè)月后袄琳,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,268評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡燃乍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,356評(píng)論 3 340
  • 正文 我和宋清朗相戀三年唆樊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刻蟹。...
    茶點(diǎn)故事閱讀 40,488評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡逗旁,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出舆瘪,到底是詐尸還是另有隱情片效,我是刑警寧澤,帶...
    沈念sama閱讀 36,181評(píng)論 5 350
  • 正文 年R本政府宣布英古,位于F島的核電站淀衣,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏召调。R本人自食惡果不足惜膨桥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,862評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唠叛。 院中可真熱鬧只嚣,春花似錦、人聲如沸艺沼。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,331評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)澳厢。三九已至环础,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間剩拢,已是汗流浹背线得。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,445評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工徐伐, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人角雷。 一個(gè)月前我還...
    沈念sama閱讀 48,897評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像祈远,于是被迫代替她去往敵國(guó)和親车份。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扫沼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,500評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容