rocket源碼 順序消息和事務(wù)消息

順序消息的實(shí)現(xiàn)

順序消息進(jìn)行消費(fèi)時(shí),若是第一次消費(fèi)失敗烈钞,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT,下一次會(huì)繼續(xù)消費(fèi)此消息。

順序消息的消費(fèi)失敗時(shí)的重試邏輯遇西,具體代碼在ProccessQueue中,順序消費(fèi)時(shí)手動(dòng)從processQueue中取消息严嗜,內(nèi)部是從msgTreeMap中取出消息后粱檀,將消息添加到consumingMsgOrderlyTreeMap中,若是消費(fèi)成功漫玄,將該消息從consumingMsgOrderlyTreeMap中刪除即可茄蚯。若是消費(fèi)失敗,執(zhí)行makeMessageToConsumeAgain方法睦优,將這些消息再放回msgTreeMap渗常。

順序消費(fèi)時(shí)有回滾和重試的邏輯,但是新版本不建議使用汗盘≈宓猓回滾和重試的邏輯和上面相同,回滾時(shí)將消息重新放回treeMap隐孽,提交時(shí)不用操作treeMap癌椿,但是需要根據(jù)consumingMsgOrderlyTreeMap找到當(dāng)前消費(fèi)的offset,從下一個(gè)繼續(xù)消費(fèi)菱阵。

順序消息消費(fèi)時(shí)使用同一個(gè)線(xiàn)程踢俄,可以看一下ConsumeMessageOrderlyService

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代碼...
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

因?yàn)閝ueue的長(zhǎng)度是Integer.MAX_VALUE,因此在進(jìn)行消費(fèi)時(shí)使用的是一個(gè)線(xiàn)程晴及,并且有序執(zhí)行都办。

順序消息的消費(fèi)使用同一個(gè)線(xiàn)程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中實(shí)現(xiàn)的。

// ProcessQueue

private volatile boolean consuming = false;


    public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                        msgSize.addAndGet(msg.getBody().length);
                    }
                }
                msgCount.addAndGet(validMsgCnt);
                // 如果有消息可以進(jìn)行消費(fèi)虑稼,并且當(dāng)前queue沒(méi)有消費(fèi)琳钉,則將dispatchToConsume和consuming置為true
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }
// ConsumeMessageOrderlyService

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) { // putMessage返回true時(shí),才將request提交到線(xiàn)程池
        // 如果已經(jīng)開(kāi)始對(duì)該queue進(jìn)行消費(fèi)了蛛倦,就不會(huì)再次提交任務(wù)
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
// 提交給線(xiàn)程池的任務(wù)
// 主要代碼
    class ConsumeRequest implements Runnable {

        @Override
        public void run() {
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    // 如果可以繼續(xù)消費(fèi)歌懒,直接在當(dāng)前線(xiàn)程中輪詢(xún)消費(fèi)該P(yáng)rocessQueue即可
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 在consumerImpl中的pullMessage方法中持續(xù)給ProcessQueue添加消息
                        // 手動(dòng)從ProcessQueue中取消息
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            try {
                                this.processQueue.getLockConsume().lock();
                                //消費(fèi)消息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                               
                            } finally {
                                this.processQueue.getLockConsume().unlock();
                            }
                            // 處理消費(fèi)結(jié)果,若是成功繼續(xù)消費(fèi)
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } 
        }

看代碼可以發(fā)現(xiàn)胰蝠,如果順序消息消費(fèi)失敗的話(huà)歼培,即消費(fèi)返回SUSPEND_CURRENT_QUEUE_A_MONENT時(shí)震蒋,當(dāng)前線(xiàn)程會(huì)停止消費(fèi),在processConsumeResult時(shí)躲庄,會(huì)提交新的任務(wù)到線(xiàn)程池查剖,在新的線(xiàn)程中繼續(xù)消費(fèi)該消息。

核心邏輯是保證一個(gè)ProcessQueue只在一個(gè)線(xiàn)程中輪詢(xún)消費(fèi)消息噪窘。

發(fā)送順序消息時(shí)會(huì)添加一個(gè)隊(duì)列選擇器笋庄,將需要有序的消息發(fā)送到同一個(gè)隊(duì)列。消費(fèi)端拉取特定queue的數(shù)據(jù)時(shí)天生有序倔监,在消費(fèi)時(shí)使用同一個(gè)線(xiàn)程進(jìn)行消費(fèi)直砂,因此就實(shí)現(xiàn)了順序消息。

事務(wù)消息

二階段提交加補(bǔ)償機(jī)制

第一階段提交消息到broker浩习,broker將topic修改為RMQ_SYS_TRANS_HALF_TOPIC静暂,存入對(duì)consumer不可見(jiàn)的topic/queue。如果此階段寫(xiě)入成功谱秽,執(zhí)行transactionListener.executeLocalTransaction()洽蛀。

第二階段,根據(jù)本地事務(wù)的執(zhí)行結(jié)果提交或者回滾第一階段提交至broker的消息疟赊,這里使用的是OneWay方法郊供,可靠性低,可能出現(xiàn)失敗或者超時(shí)的情況近哟。

broker端處理RequestCode.END_TRANSACTION的請(qǐng)求驮审,如果是commit,則將原來(lái)的消息取出吉执,更改為正確的topic/queue疯淫,并進(jìn)行落盤(pán),然后添加Op狀態(tài)鼠证。如果是rollback峡竣,則直接添加Op狀態(tài)即可靠抑。

添加Op狀態(tài)是將消息添加到Op隊(duì)列中量九,Op隊(duì)列是為了補(bǔ)償邏輯時(shí)減少判斷。

補(bǔ)償邏輯:

BrokerController啟動(dòng)時(shí)會(huì)啟動(dòng)TransactionMessageCheckService颂碧,默認(rèn)每隔60s檢查一次HALF_TOPIC下所有的queue中的消息荠列,檢查步驟如下

  • 先判斷當(dāng)前queue和對(duì)應(yīng)的opQueue是否添加過(guò)消息,如果沒(méi)有载城,遍歷下一個(gè)queue肌似,若有,進(jìn)行下一步判斷
  • 獲取對(duì)應(yīng)的opQueue中的消息诉瓦,若是沒(méi)有消息川队,遍歷下一個(gè)queue力细,若有,進(jìn)行下一步判斷
  • 遍歷當(dāng)前queue
  • 如果當(dāng)前偏移量已經(jīng)添加了oP狀態(tài)固额,直接遍歷至下一個(gè)偏移量眠蚂,否則進(jìn)行下一步判斷
  • 獲取當(dāng)前消息,若為null斗躏,遍歷下一個(gè)偏移量逝慧,若不為null,進(jìn)行下一步判斷
  • 若當(dāng)前消息需要舍棄或者跳過(guò)啄糙,遍歷下一個(gè)偏移量笛臣,否則進(jìn)行下一步判斷
  • 判斷當(dāng)前消息是否需要check,若暫時(shí)不需要隧饼,重新走判斷流程
  • 若是需要check沈堡,broker端給producer發(fā)送CHECK_TRANSACTION_STATE消息,producer端接收到消息后燕雁,執(zhí)行TransactionListener.checkLocalTransaction踱蛀,將check結(jié)果回發(fā)給broker。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末贵白,一起剝皮案震驚了整個(gè)濱河市率拒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌禁荒,老刑警劉巖猬膨,帶你破解...
    沈念sama閱讀 211,561評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異呛伴,居然都是意外死亡勃痴,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)热康,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)沛申,“玉大人,你說(shuō)我怎么就攤上這事姐军√模” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,162評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵奕锌,是天一觀的道長(zhǎng)著觉。 經(jīng)常有香客問(wèn)我,道長(zhǎng)惊暴,這世上最難降的妖魔是什么饼丘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,470評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮辽话,結(jié)果婚禮上肄鸽,老公的妹妹穿的比我還像新娘卫病。我一直安慰自己,他們只是感情好典徘,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,550評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布忽肛。 她就那樣靜靜地躺著,像睡著了一般烂斋。 火紅的嫁衣襯著肌膚如雪屹逛。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,806評(píng)論 1 290
  • 那天汛骂,我揣著相機(jī)與錄音罕模,去河邊找鬼。 笑死帘瞭,一個(gè)胖子當(dāng)著我的面吹牛淑掌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蝶念,決...
    沈念sama閱讀 38,951評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼抛腕,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了媒殉?” 一聲冷哼從身側(cè)響起担敌,我...
    開(kāi)封第一講書(shū)人閱讀 37,712評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎廷蓉,沒(méi)想到半個(gè)月后全封,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,166評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡桃犬,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,510評(píng)論 2 327
  • 正文 我和宋清朗相戀三年刹悴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片攒暇。...
    茶點(diǎn)故事閱讀 38,643評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡土匀,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出形用,到底是詐尸還是另有隱情就轧,我是刑警寧澤,帶...
    沈念sama閱讀 34,306評(píng)論 4 330
  • 正文 年R本政府宣布尾序,位于F島的核電站钓丰,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏每币。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,930評(píng)論 3 313
  • 文/蒙蒙 一琢歇、第九天 我趴在偏房一處隱蔽的房頂上張望兰怠。 院中可真熱鬧梦鉴,春花似錦、人聲如沸揭保。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,745評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)秸侣。三九已至存筏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間味榛,已是汗流浹背椭坚。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,983評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留搏色,地道東北人善茎。 一個(gè)月前我還...
    沈念sama閱讀 46,351評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像频轿,于是被迫代替她去往敵國(guó)和親垂涯。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,509評(píng)論 2 348