RocketMq源碼篇-事務消息之broker處理事務消息

一、broker事務消息處理流程

  • 接收事務消息布蔗,存入half topic
    • 接收事務消息定续,存入half topic ,這個其實跟發(fā)送普通消息差不多燥滑,只不過它這里要將原來的topic 渐北,queueId 換成事務消息的,也就是half topic 铭拧,queueId 是half topic 里面queue的id
  • 提交事務
    • 本地事務執(zhí)行完成之后赃蛛,會告訴broker 某個事務消息的本地執(zhí)行結果恃锉。如果是提交事務,會將原來存在half topic 中的事務消息取出來呕臂,換成原來的 topic 與queueId破托,接著就是將消息寫入commitlog中,存入成功之后歧蒋,將這個事務消息的執(zhí)行結果寫入到這個op half topic 中土砂,這步操作就是為了本地事務消息檢查器找出那種還沒有確認的消息。其實存入原來topic 之后谜洽,broker的reput線程就可以將這個消息在commitlog的位置寫到對應queue中了
  • 回滾事務
    • 如果是回滾消息的話萝映,只是將這個事務消息的執(zhí)行結果寫入到這個op half topic 中,消費者不會消費阐虚,等待被清理
  • 檢查本地事務執(zhí)行結果
    • broker 會有一個后臺線程不停的檢查那些沒有告訴broker 本地事務執(zhí)行結果的事務消息序臂,然后回調消息生產者問問這個事務消息對應的本地事務執(zhí)行如何了,是commit還是rollback实束。這里這個線程是60s檢查一次奥秆,然后檢查寫入half topic 超過6s還沒告訴本地事務執(zhí)行結果的消息

二、源碼分析

1咸灿、broker事務處理組件初始化

BrokerController#initialTransaction

   private void initialTransaction() {
        // spi方式加載TransactionalMessageService類构订,默認沒有
        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            // TransactionalMessageBridge是事務消息服務組件用來與存儲器交互使用的
            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
        }
        // spi方式加載AbstractTransactionalMessageCheckListener,默認沒有
        // 事務消息檢查器找出沒有本地事務執(zhí)行結果的消息后避矢,會通知監(jiān)聽器AbstractTransactionalMessageCheckListener悼瘾,該監(jiān)聽器進行響應的處理
        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener.setBrokerController(this);
        // 事務消息檢查服務,檢查沒有返回事務執(zhí)行結果的消息
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }
2谷异、接收事務消息

SendMessageProcessor#asyncSendMessage

       CompletableFuture<PutMessageResult> putMessageResult = null;
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }    
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

TransactionalMessageServiceImpl#asyncPrepareMessage

    @Override
    public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
        return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
    }

TransactionalMessageBridge#asyncPutHalfMessage

    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }

TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // REAL_TOPIC屬性值保存真實的topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // REAL_QID屬性值保存真實的queueId
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 設置topic是RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 設置queueId是0
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
  • TransactionalMessageBridge#parseHalfMessageInner將事務消息重置
  • store.asyncPutMessage(parseHalfMessageInner(messageInner))將重置的事務消息保存至文件存儲系統(tǒng)
3分尸、 提交事務或者回滾事務

EndTransactionProcessor#processRequest

        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 獲取half topic的數(shù)據(jù)
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 檢查數(shù)據(jù)
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 將事務消息轉為原來的消息
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    // 提交至存儲器
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 提交成功后,將RMQ_SYS_TRANS_HALF_TOPIC置為RMQ_SYS_TRANS_OP_HALF_TOPIC歹嘹,為了后面清理
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            // 獲取數(shù)據(jù)
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 檢查數(shù)據(jù)
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 將RMQ_SYS_TRANS_HALF_TOPIC置為RMQ_SYS_TRANS_OP_HALF_TOPIC,為了后面清理
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
  • commitMessage是獲取之前topic為RMQ_SYS_TRANS_HALF_TOPIC的消息
  • endMessageTransaction將消息還原
  • sendFinalMessage提交最終的消息
  • deletePrepareMessage是創(chuàng)建topic為RMQ_SYS_TRANS_OP_HALF_TOPIC孔庭,tag是remove尺上,消息內容是之前的queueOffset的新消息,方便后面清除

4圆到、檢查事務執(zhí)行結果

TransactionalMessageCheckService#run

    public void run() {
        log.info("Start transaction check service thread!");
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
    }

TransactionalMessageCheckService#onWaitEnd

    @Override
    protected void onWaitEnd() {
        // 6s超時怎抛,half topic還在就檢查
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        // 最大檢查次數(shù)15
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

TransactionalMessageServiceImpl#check

if (isNeedCheck) {
    // 重新塞到了commitLog中
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // 發(fā)送
    listener.resolveHalfMsg(msgExt);
} 

AbstractTransactionalMessageCheckListener#resolveHalfMsg

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

AbstractTransactionalMessageCheckListener#sendCheckMessage
發(fā)送RequestCode.CHECK_TRANSACTION_STATE的消息查詢本地事務執(zhí)行結果

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市芽淡,隨后出現(xiàn)的幾起案子马绝,更是在濱河造成了極大的恐慌,老刑警劉巖挣菲,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件富稻,死亡現(xiàn)場離奇詭異掷邦,居然都是意外死亡,警方通過查閱死者的電腦和手機椭赋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進店門抚岗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人哪怔,你說我怎么就攤上這事宣蔚。” “怎么了认境?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵胚委,是天一觀的道長。 經常有香客問我叉信,道長篷扩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任茉盏,我火速辦了婚禮鉴未,結果婚禮上,老公的妹妹穿的比我還像新娘鸠姨。我一直安慰自己铜秆,他們只是感情好,可當我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布讶迁。 她就那樣靜靜地躺著连茧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪巍糯。 梳的紋絲不亂的頭發(fā)上啸驯,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天,我揣著相機與錄音祟峦,去河邊找鬼罚斗。 笑死,一個胖子當著我的面吹牛宅楞,可吹牛的內容都是我干的针姿。 我是一名探鬼主播,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼厌衙,長吁一口氣:“原來是場噩夢啊……” “哼距淫!你這毒婦竟也來了?” 一聲冷哼從身側響起婶希,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤榕暇,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體彤枢,經...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡狰晚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了堂污。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片家肯。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖盟猖,靈堂內的尸體忽然破棺而出讨衣,到底是詐尸還是另有隱情,我是刑警寧澤式镐,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布反镇,位于F島的核電站,受9級特大地震影響娘汞,放射性物質發(fā)生泄漏歹茶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一你弦、第九天 我趴在偏房一處隱蔽的房頂上張望惊豺。 院中可真熱鬧,春花似錦禽作、人聲如沸尸昧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烹俗。三九已至,卻和暖如春萍程,著一層夾襖步出監(jiān)牢的瞬間幢妄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工茫负, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蕉鸳,地道東北人。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓朽褪,卻偏偏與公主長得像置吓,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子缔赠,可洞房花燭夜當晚...
    茶點故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內容

  • 什么是事務消息 首先我們用一個場景來講一下事務消息解決的問題。分布式消息隊列多用來解決多個微服務之間的調用解耦友题,不...
    空擋閱讀 1,541評論 0 3
  • 前言 得益于MQ削峰填谷嗤堰,系統(tǒng)解耦,操作異步等功能特性,在互聯(lián)網行業(yè)踢匣,可以說有分布式服務的地方告匠,MQ都往往不會缺席...
    零點145閱讀 797評論 0 0
  • 總覽 RocketMQ事務消息(Transactional Message)是指應用本地事務和發(fā)送消息操作可以被定...
    93張先生閱讀 1,156評論 1 2
  • 一、案例 創(chuàng)建事務監(jiān)聽器离唬,用來執(zhí)行事務后专,以及檢查事務執(zhí)行的結果;事務監(jiān)聽器需要實現(xiàn)TransactionListe...
    sky_5395閱讀 262評論 0 0
  • 順序消息的實現(xiàn) 順序消息進行消費時输莺,若是第一次消費失敗戚哎,可以返回SUSPEND_CURRENT_QUEUE_A_M...
    Ace_b90f閱讀 249評論 0 0