RocketMQ源碼解析(九)-Broker#消息存儲ConsumeQueue

ConsumeQueue的作用

上一篇文章講到Broker在收到消息后喷屋,通過MessageStore將消息存儲到commitLog中拦宣,但是consumer在消費消息的時候是按照topic+queue的維度來拉取消息的诉植。為了方便讀取届腐,MessageStoreCommitLog中消息的offset按照topic+queueId劃分后适篙,存儲到不同的文件中往核,這就是ConsumeQueue

文件組織方式

回顧一下數(shù)據(jù)結(jié)構(gòu)圖中ConsumeQueue相關(guān)的部分。

ConsumeQueue存儲結(jié)構(gòu)

底層儲存跟CommitLog一樣使用MappedFile嚷节,每個CQUnit的大小是固定的聂儒,存儲了消息的offset、消息size和tagCode硫痰。存tag是為了在consumer取到消息offset后時候先根據(jù)tag做一次過濾衩婚,剩下的才需要到CommitLog中取消息詳情。
之前講過效斑,MessageStore通過ReputMessageService來將消息的offset寫道ConsumeQueue中非春,我們看下這部分代碼實現(xiàn)

ReputMessageService

這個Service是一個單線程的任務(wù),一直循環(huán)的調(diào)用doReput()方法:

        private boolean isCommitLogAvailable() {
            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
        }

        private void doReput() {
            //1缓屠、判斷commitLog的maxOffset是否比上次讀取的offset大
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                //2奇昙、從上次的結(jié)束offset開始讀取commitLog文件中的消息
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            //3、檢查message數(shù)據(jù)完整性并封裝成DispatchRequest
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    //4敌完、分發(fā)消息到CommitLogDispatcher储耐,1)構(gòu)建索引; 2)更新consumeQueue
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    //5、分發(fā)消息到MessageArrivingListener,喚醒等待的PullReqeust接收消息,Only Master?
                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }
                                    //5滨溉、更新offset
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    //6什湘、如果讀到文件結(jié)尾,則切換到新文件
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {
                                //7业踏、解析消息出錯禽炬,跳過。commitLog文件中消息數(shù)據(jù)損壞的情況下才會進(jìn)來
                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);

                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        //8勤家、release對MappedFile的引用
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }
    /**
     * 消息分發(fā)
     */
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
  • 第1步腹尖,每次處理完讀取消息后,都將當(dāng)前已經(jīng)處理的最大offset記錄下來,下次處理從這個offset開始讀取消息
  • 第2步热幔,從commitLog文件中讀取消息詳情
  • 第4步乐设,分發(fā)讀取到的消息,MessageStore在初始化的時候會往dispatcherList中添加兩個Dispatcher.
this.dispatcherList = new LinkedList<>();
//consumeQueue構(gòu)建Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
//索引更新Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

具體Dispatcher的處理邏輯绎巨,我們下面詳細(xì)說

  • 第8步近尚,在通過commitLog讀取消息時,不會把消息數(shù)據(jù)復(fù)制到堆內(nèi)存中场勤,只是返回文件映射的byteBuffer戈锻,所以MappedFile記錄了有多少個引用,在數(shù)據(jù)使用完后需要釋放和媳。

Dispatcher構(gòu)建ConsumeQueue

CommitLogDispatcherBuildConsumeQueue實現(xiàn)比較簡單格遭,直接調(diào)用的MessageStore的接口

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                /** 對于非事務(wù)消息和commit事務(wù)消息 */
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

MessageStore中的實現(xiàn):

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        //找到對應(yīng)的ComsumeQueue文件
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

前面已經(jīng)講過consumeQueue的數(shù)據(jù)存儲結(jié)構(gòu),每個topic+queueId對應(yīng)一個ConsumeQueue留瞳,每個ConsumeQueue包含一系列MappedFile拒迅。所以,這里第一步就是獲取對應(yīng)的ConsumeQueue她倘,如果不存在的話就會新建一個璧微。后面就是調(diào)用CQ的put方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {
        //1、寫入重試次數(shù)硬梁,最多30次
        final int maxRetries = 30; 
        //2前硫、判斷CQ是否是可寫的
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                //3、如果需要寫ext文件荧止,則將消息的tagscode寫入
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            //4开瞭、寫入文件
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                //5、記錄check point
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                ...
                ...
            }
        }

       ...
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }
  • 第3步罩息,將tagcode和bitMap記錄進(jìn)CQExt文件中,這個是一個過濾的擴(kuò)展功能个扰,采用的bloom過濾器先記錄消息的bitMap瓷炮,這樣consumer來讀取消息時先通過bloom過濾器判斷是否有符合過濾條件的消息
  • 第4步,將消息offset寫入CQ文件中递宅,這邊代碼如下:
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset <= this.maxPhysicOffset) {
            return true;
        }
        //一個CQUnit的大小是固定的20字節(jié)
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        //獲取最后一個MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            //對新創(chuàng)建的文件娘香,寫將所有CQUnit初始化0值
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset;
            //CQUnit寫入文件中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

寫文件的邏輯和寫CommitLog的邏輯是一樣的,首先封裝一個CQUnit办龄,這里面offset占8個字節(jié)烘绽,消息size占用4個字節(jié),tagcode占用8個字節(jié)俐填。然后找最后一個MappedFile安接,對于新建的文件,會有一個預(yù)熱的動作英融,寫把所有CQUnit初始化成0值盏檐。最后將Unit寫入到文件中歇式。

總結(jié)

ConsumeQueue文件數(shù)據(jù)生成的整個步驟就講到這里了。Consumer來讀取文件的時候胡野,只要指定要讀的topic和queueId材失,以及開始o(jì)ffset。因為每個CQUnit的大小是固定的硫豆,所以很容易就可以在文件中定位到龙巨。找到開始的位置后,只需要連續(xù)讀取后面指定數(shù)量的Unit熊响,然后根據(jù)Unit中存的CommitLog的offset就可以到CommitLog中讀取消息詳情了旨别。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市耘眨,隨后出現(xiàn)的幾起案子昼榛,更是在濱河造成了極大的恐慌,老刑警劉巖剔难,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胆屿,死亡現(xiàn)場離奇詭異,居然都是意外死亡偶宫,警方通過查閱死者的電腦和手機非迹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來纯趋,“玉大人憎兽,你說我怎么就攤上這事〕趁埃” “怎么了纯命?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長痹栖。 經(jīng)常有香客問我亿汞,道長,這世上最難降的妖魔是什么揪阿? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任疗我,我火速辦了婚禮,結(jié)果婚禮上南捂,老公的妹妹穿的比我還像新娘吴裤。我一直安慰自己,他們只是感情好溺健,可當(dāng)我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布麦牺。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪枕面。 梳的紋絲不亂的頭發(fā)上愿卒,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天,我揣著相機與錄音潮秘,去河邊找鬼琼开。 笑死,一個胖子當(dāng)著我的面吹牛枕荞,可吹牛的內(nèi)容都是我干的柜候。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼躏精,長吁一口氣:“原來是場噩夢啊……” “哼渣刷!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起矗烛,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤辅柴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瞭吃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碌嘀,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年歪架,在試婚紗的時候發(fā)現(xiàn)自己被綠了股冗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡和蚪,死狀恐怖止状,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情攒霹,我是刑警寧澤怯疤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站催束,受9級特大地震影響旅薄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜泣崩,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望洛口。 院中可真熱鬧矫付,春花似錦、人聲如沸第焰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至杀赢,卻和暖如春烘跺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脂崔。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工滤淳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人砌左。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓脖咐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親汇歹。 傳聞我的和親對象是個殘疾皇子屁擅,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容