rocketMQ存儲 NO.2

在存儲第一篇中主要說了一些存儲文件的載體,和其他的管理類绞吁。至于消息的轉(zhuǎn)換存儲幢痘,中間的一些設(shè)計只是聊了一部分。

DefaultAppendMessageCallback 繼續(xù)聊

之前將了消息的大小計算家破,計算好了以后颜说,就可以進(jìn)行其他的驗證了

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                // 消息太大了
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                // 在消息放入到buffer中,放入失敗汰聋,需要將buffer標(biāo)識為文件已滿门粪。
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // END_FILE_MIN_BLANK_LENGTH 這個長度放在文件的末尾,當(dāng)讀取到該位置時烹困,發(fā)現(xiàn)時BLANK_MAGIC_CODE玄妈,
                // 說明文件讀取到盡頭了,該換一個文件讀取
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank); // 先寫入剩余的內(nèi)容的長度
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 設(shè)置一個標(biāo)識位髓梅,表示讀到盡頭了
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

首先消息的長度不能太大了拟蜻,太大就是非法的消息。maxBlank參數(shù)是mappedFile中計算剩余多大的容量女淑。但是這里的判斷是否大于剩余量時,是通過消息的長度加上END_FILE_MIN_BLANK_LENGTH 的長度比較的辜御。并且END_FILE_MIN_BLANK_LENGTH是默認(rèn)的8個字節(jié)鸭你,有什么用意呢?往下看擒权,重置了msgStoreItemMemory袱巨,并且limit為maxBlank。但是先放入4個字節(jié)的maxBlank長度碳抄,然后又放入4個字節(jié)的CommitLog.BLANK_MAGIC_CODE愉老。在計算消息的大小時,也有4個字節(jié)的消息大小剖效,和一個4個字節(jié)的MAGICCODE標(biāo)識嫉入。

   public final static int MESSAGE_MAGIC_CODE = -626843481;
    protected final static int BLANK_MAGIC_CODE = -875286124;

看一下焰盗,他們的code,一個標(biāo)識消息咒林,一個標(biāo)識空白熬拒。可以看出來垫竞,在怎么放消息到MappedFile中澎粟,文件是需要滿或者結(jié)束的,那怎么標(biāo)識這個文件內(nèi)容獲取時欢瞪,沒有消息了呢活烙,就可以通過BLANK_MAGIC_CODE標(biāo)識,說明該文件的存儲的消息已經(jīng)結(jié)束遣鼓,后面的內(nèi)容都是空啸盏。在查詢消息功能時,讀取到BLANK狀態(tài)時就可以停止了譬正,往下查詢了宫补。所以每個文件的結(jié)尾必須要包含BLANK_MAGIC_CODE,從而就需要自動占用8個字節(jié)了曾我。最后返回的AppendMessageResult中狀態(tài)為END_OF_FILE粉怕,告訴調(diào)用方,文件滿了抒巢,需要重新創(chuàng)建新的MappedFile贫贝。在考慮一下,MappedFile在調(diào)用callBack方法時蛉谜,會將自身的wrote值對result中寫入的數(shù)量進(jìn)行累加的稚晚,那么就算文件不能繼續(xù)寫了,也要告訴MappedFile本次寫入多少長度型诚,所以在AppendMessageResult中的wroteBytes參數(shù)值就是maxBlank值了客燕。

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 表示是個消息
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(hostHolder, 8);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(hostHolder, 8);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
            //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    // 當(dāng)消息沒有事務(wù),或者事務(wù)提交狰贯,則更新queue偏移量
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;

接下來就是消息轉(zhuǎn)發(fā)成byte數(shù)組了也搓,依次按規(guī)則寫入到msgStoreItemMemory中,最終msgStoreItemMemory寫入到byteBuffer中涵紊。其中有個queueOffset參數(shù)是在第6個次序?qū)懭氲陌剩⑶以谑聞?wù)提交或者沒有事務(wù)時,進(jìn)行++queueOffset操作摸柄,放入到topicQueueTable中颤练。說明queueOffset依次遞增的,他的作用是什么呢驱负?
DefaultAppendMessageCallback的append方法已經(jīng)大概了解嗦玖,本文只是講了單個消息放置患雇,當(dāng)然還提供了批量消息放置,原理都差不多
再回到CommitLog中putMessag方法剩余片段

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);

        return putMessageResult;

該邏輯就是解鎖掉unlockMappedFile文件踏揣,即釋放掉文件與內(nèi)存映射關(guān)系映射庆亡,因為不需要再寫了,只剩下讀了捞稿。然后做個統(tǒng)計同一個topic的生成次數(shù)又谋,和消息大小。
先是處理磁盤刷新的邏輯娱局,因為broker支持同步刷盤和異步刷盤的彰亥。同步刷屏的好處就是保證數(shù)據(jù)不丟失,但是性能會降低很多衰齐;異步刷屏則就有可能會丟消息數(shù)據(jù)了任斋。那么就看看同步和異步是如何實現(xiàn)的把?

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronization flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

首先看一下異步方式耻涛,他只是通過ThreadService方法中喚醒線程wakeup()废酷,該flush消息線程就可以喚醒∧疲看一下FlushRealTimeService 實時刷新服務(wù)類
在線程實現(xiàn)方法中

                try {
                    if (flushCommitLogTimed) {
                        Thread.sleep(interval);
                    } else {
                        this.waitForRunning(interval);
                    }

                    if (printFlushProgress) {
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        // 物理消息時間戳更新
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }

在刷新之前都會等待或者sleep一段時間澈蟆,然后通過mappedFileQueue執(zhí)行flush方法,并且更新了StoreCheckPoint的存儲消息的時間卓研。異步刷新很簡單趴俘,可以通過其他線程喚醒刷新線程,執(zhí)行刷盤操作奏赘。
同步刷新時寥闪,聲明了GroupCommitRequest請求,并且設(shè)置了內(nèi)部屬性nextOffset的值磨淌,該值是由消息的存儲起始位置+消息的寫入長度組合的疲憋。將該Request放入到了GroupCommitService服務(wù)中的Request列表中。該Request也存在倒計時監(jiān)聽器梁只,所以這段代碼request.waitForFlush()缚柳,進(jìn)行等待刷新完成。
GroupCommitService中代碼片段

        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

        public synchronized void putRequest(final GroupCommitRequest request) {
            synchronized (this.requestsWrite) {
                this.requestsWrite.add(request);
            }
            if (hasNotified.compareAndSet(false, true)) {
                waitPoint.countDown(); // notify
            }
        }

首先定義了2個屬性集合敛纲,一個是請求寫喂击,一個請求讀剂癌。在放入請求時淤翔,是將request對象放入到requestsWrite里面的,并且是鎖住requestsWrite對象佩谷。然后喚醒ServiceThread線程旁壮。在喚起線程是监嗜,會調(diào)用onWaiteEnd方法,而GroupCommitService實現(xiàn)該方法時調(diào)用了swapRequests()方法抡谐,

        private void swapRequests() {
            List<GroupCommitRequest> tmp = this.requestsWrite;
            this.requestsWrite = this.requestsRead;
            this.requestsRead = tmp;
        }

起始就是將讀寫的集合進(jìn)行交換而已裁奇。而線程喚醒后,就會調(diào)用doCommit方法

        private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }

                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

首先對讀集合進(jìn)行鎖定麦撵。在這里依次遍歷所有的請求刽肠,然后判斷mappedFileQueue中FlushedWhere與請求中nextOffset比較,如果大于則刷新成功了免胃,就可以直接喚醒等待request請求的線程音五,如果小于則調(diào)用mappedFileQueue的flush方法。并且可以保證2次刷新羔沙。通過這種方式躺涝,實現(xiàn)消息的同步刷屏的,但是性能的確不是很高扼雏。
刷新磁盤后坚嗜,還有handleHA()方法,該方法是高可用消息的處理方式诗充,如何實現(xiàn)的苍蔬,后面會專門聊聊如何實現(xiàn) Master/Slave功能

考慮一下幾點問題?
1.已經(jīng)存儲的消息其障,存儲在commitLog中的消息都是各種類型的topic消息银室,包括有延遲消息,事務(wù)消息励翼,普通消息如何區(qū)分消費蜈敢;
2.由于commitLog中存儲的所有的消息,消息的查詢設(shè)計的不好汽抚,效率特別低抓狭,最終導(dǎo)致消費進(jìn)度緩慢
3.還有一些特別需求,例如通過關(guān)鍵字或者時間段造烁,檢索消息否过,這些都是需要設(shè)計良好的方式,提升查詢效率惭蟋,從而可以加快消費進(jìn)度苗桂。

ReputMessageService

該類是DefaultMessageStore的內(nèi)部類,它繼承與ServiceThread類告组,也是一個線程類煤伟,在該類中只有一個屬性 reputFromOffset 簡單解釋為重放偏移量。既然是實現(xiàn)線程接口,看一下run方法

        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

內(nèi)部也是一個線程自循環(huán)便锨,不停的調(diào)用doReput()方法围辙。

        private void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    // 讀取到了文件的末尾,重新?lián)Q個文件讀取
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    // If user open the dledger pattern or the broker is master node,
                                    // it will not ignore the exception and fix the reputFromOffset variable
                                    if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                        DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);
                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

先通過reputFromOffset偏移量從commitLog中的MappedFile中截取剩余部分的所有消息內(nèi)容SelectMappedBufferResult放案,之前在MappedFile中也講過SelectMappedBufferResult可能會存在多條消息姚建,他不是只有一條數(shù)據(jù),因為他截取的部分是從reputFromOffset到MappedFile的wrotePositon位置的數(shù)據(jù)吱殉。獲取到SelectMappedBufferResult時就開始遍歷數(shù)據(jù)掸冤。由于result中的ByteBuffer是順序讀取,所以內(nèi)部的pos位置隨著讀取也會越來越大友雳,但是不需要重置贩虾。通過CommitLog中的checkMessageAndReturnSize方法,就可以知道一個消息的大致信息沥阱,

public class DispatchRequest {
    private final String topic;
    private final int queueId;
    private final long commitLogOffset;
    private int msgSize;
    private final long tagsCode;
    private final long storeTimestamp;
    private final long consumeQueueOffset;
    private final String keys;
    private final boolean success;
    private final String uniqKey;

    private final int sysFlag;
    private final long preparedTransactionOffset;
    private final Map<String, String> propertiesMap;
    private byte[] bitMap;

    private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
    // .....
}

其中得到的消息內(nèi)容都是一些關(guān)鍵屬性缎罢,例如topic,queueId考杉,msgSize等等策精,這些屬性有什么用,繼續(xù)講崇棠。因為得到dispatchRequest的結(jié)果不太相同的咽袜,例如文件讀到MAGIC_BLANK_CODE怎么處理的。首先dispatchRequest返回成功的枕稀,都是正常去讀的询刹,如果size大于0,存在消息萎坷。如果size=0說明文件末尾了凹联,需要換下一個文件讀取了,在這里commitLog.rollNextFile(reputFromOffset)就是指向了下一個文件的起始偏移量哆档。在存在消息的時蔽挠,首先調(diào)用了了doDispatch(request) 分發(fā)消息的方法,通過判斷條件進(jìn)行執(zhí)行消息到達(dá)監(jiān)聽器瓜浸,將消息的reputFromOffset加上了消息的size長度澳淑,然后做一些統(tǒng)計。重放線程主要功能還是在doDispatch()方法內(nèi)插佛。

    public void doDispatch(DispatchRequest req) {
        // 消息入磁盤成功杠巡,還有后續(xù)處理,例如創(chuàng)建索引雇寇,放入到消費隊列中氢拥,
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

CommitLogDispatcher 消息分發(fā)的接口绑改,在doDispatch方法只是遍歷一遍分發(fā)接口實現(xiàn)類,那么有哪些實現(xiàn)類

CommitLogDispatcherBuildConsumeQueue 構(gòu)建消費隊列

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }

先判斷消息的事務(wù)類型兄一,如果是無事務(wù)或者事務(wù)提交,則執(zhí)行putMessagePositionInfo方法识腿,如果其他事務(wù)則不做任何處理出革。

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        // 得到消費隊列,然后進(jìn)行數(shù)據(jù)更新
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

ok渡讼,這里就引入了ConsumeQueue的消費隊列骂束,在生成的時候已經(jīng)選擇好放入那個topic下的隊列編號,那么對于消費組成箫,也應(yīng)該知道消費的是哪個消費隊列展箱。基本上一個生成隊列對應(yīng)一個消費隊列蹬昌,除非讀寫權(quán)限控制了混驰。

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

查找消費隊列,首先在一個broker下皂贩,topic是唯一的栖榨,但是topic下可以有多個不同編號的queueId組成的消費隊列ConsumeQueue。屬性一下消費隊列的屬性信息

    public ConsumeQueue(
        final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final DefaultMessageStore defaultMessageStore) {
        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;
        this.defaultMessageStore = defaultMessageStore;

        this.topic = topic;
        this.queueId = queueId;

        String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;

        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

        this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

        if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
            this.consumeQueueExt = new ConsumeQueueExt(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
            );
        }
    }

這是一個消費隊列的構(gòu)造器方法明刷,包含了topic婴栽,queueId,也需要MappedFileQueue映射文件隊列辈末,說明該消費隊列也是需要存儲數(shù)據(jù)的愚争,只是他與CommitLog存儲的內(nèi)容可能不同而已。定義了文件的大小mappedFileSize挤聘,和其他的存儲根地址等等轰枝。
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
這段代碼是申請了CQ_STORE_UNIT_SIZE=20長度的字節(jié),為什么是20個字節(jié)组去?下面繼續(xù)說狸膏。在通過topic和queueId查詢得到了一個ConsumeQueue,然后執(zhí)行cq.putMessagePositionInfoWrapper方法添怔。

    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                // ...
            }
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

在執(zhí)行putMessagePositionInfo方法湾戳,然后更新StoreCheckPoint中l(wèi)ogicsMsgTimestamp方法。如果失敗广料,則繼續(xù)嘗試砾脑。

    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        this.byteBufferIndex.flip(); // 長度縮短limit=pos,并且重置pos=0位置
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        // cqOffset是編號艾杏,他的真實地址是CQ_STORE_UNIT_SIZE的倍數(shù)
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                // 當(dāng)文件時隊列中的第一個韧衣,且消費對了的偏移量不為0.文件中寫入的數(shù)據(jù)為0,需要重置一下flush,commit偏移量
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                // 并且填充之前的數(shù)據(jù)畅铭,
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                // 進(jìn)行校驗氏淑,保證expectLogicOffset的偏移量與真正需要寫入的位置時一致的
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset + size;
            // 將byte數(shù)組添加到mappedFile中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

所以putMessagePositionInfo才是真正核心的方法,在方法中參數(shù)包括了物理偏移量 offset硕噩,消息大小size假残,消息tags碼,和消息邏輯順序cqOffset(在存儲消息時炉擅,會通過topic從CommitLog中topicQueueTable 中得到一個順序偏移量辉懒,消息存儲成功就行進(jìn)行 自身+1 操作)。byteBufferIndex之前說過長度為20個字節(jié)谍失,是固定的眶俩。在這里他放了哪些信息,8個字節(jié)的物理偏移量offset快鱼,4個字節(jié)的消息長度size颠印,8個字節(jié)的togasCode,剛剛組成20個字節(jié)抹竹,也說明了一個消息轉(zhuǎn)換成ConsumeQueu信息時嗽仪,只存儲了3個屬性值,并且是固定長度的20個字節(jié)柒莉。expectLogicOffset是存儲byteBufferIndex內(nèi)容的起始位置闻坚,通過MappedFileQueue得到了MappedFile。在確認(rèn)一下ConsumeQueue中一個MappedFile文件大芯ばⅰ:

    public int getMappedFileSizeConsumeQueue() {

        int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
        return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    }

他是20的倍數(shù)窿凤,說明一個文件是可以完整的記錄factor數(shù)量的數(shù)據(jù)。不需要類似消息存儲一樣跨蟹,需要有個結(jié)尾結(jié)束標(biāo)識雳殊。在看一下 存儲的消費信息,當(dāng)獲取符合條件的MappedFile時窗轩,判斷了該文件是否第一次創(chuàng)建即完全是沒有寫入過數(shù)據(jù)的夯秃,該文件需要初始化,設(shè)置了最小的邏輯偏移量minLogicOffset 痢艺,更新了刷新和提交的位置仓洼,還執(zhí)行了fillPreBlank填充方法。其他的都是一些偏移量校驗過程堤舒,然后更新maxPhysicOffset色建,相同topic下的最大物理偏移量,然后將byteBufferIndex轉(zhuǎn)換成byte數(shù)組添加到mappedFile中舌缤。
好了ConsumeQueue有什么特點箕戳,
1.他的存儲數(shù)據(jù)格式固定的某残,20個字節(jié)大小。2.他可以側(cè)面看到陵吸,topic下的消息存儲情況玻墅。3.由于commitLog是存儲了所有的消息,但是通過不同的topic和queueId時壮虫,存儲的簡化數(shù)據(jù)澳厢,方便以后數(shù)據(jù)定位及查找。4.有些系統(tǒng)自定義的topic旨指,例如延遲類型的topic,或者重試這樣的topic喳整,系統(tǒng)可以進(jìn)行單獨管理和分配谆构。

CommitLogDispatcherBuildIndex 構(gòu)建索引分發(fā)器

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

為了方便消息內(nèi)容查詢,例如數(shù)據(jù)庫設(shè)計中框都,引入索引搬素,就能快速定位到具體消息的位置。在RocketMQ中的設(shè)計魏保,索引的存儲設(shè)計熬尺,采用數(shù)組及鏈表結(jié)合方式的數(shù)據(jù)結(jié)構(gòu),與HashMap中的結(jié)構(gòu)設(shè)計類似谓罗。索引管理通過IndexService索引服務(wù)控制粱哼。

    public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }

            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }

            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }

索引存儲的結(jié)構(gòu)和內(nèi)容,在該代碼片段中檩咱,首先獲取到IndexFile索引文件揭措,然后通過DispatchRequest中topic,keys刻蚯,uniqKey等屬性進(jìn)行放置绊含。尤其是keys,他是多個關(guān)鍵字組成炊汹,但都會拆分多個key躬充,與topic組合成最終的key進(jìn)行存儲。索引一個消息可以有多個關(guān)鍵字組成讨便,或者一個唯一關(guān)鍵字組成充甚。那么IndexFile是如何存儲索引內(nèi)容的。

    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        // 一個索引文件最大需需要占的子節(jié)霸褒,有頭文件(40)+ 槽 + 索引信息
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        this.fileChannel = this.mappedFile.getFileChannel();
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        this.hashSlotNum = hashSlotNum;
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        this.indexHeader = new IndexHeader(byteBuffer);

        // 初始化時津坑,設(shè)置了起始的頭部信息
        if (endPhyOffset > 0) {
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }

        if (endTimestamp > 0) {
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }

這個索引文件的構(gòu)造器,
1.先是定義了文件的大小fileTotalSize傲霸,并且已經(jīng)確定了他的組成部門疆瑰,包括INDEX_HEADER_SIZE長度眉反,hash槽的長度,索引的長度穆役。也可以看出索引文件是三部分組成的寸五。頭文件,hash槽數(shù)據(jù)耿币,索引數(shù)據(jù)組成梳杏。
2.定義了索引文件的槽數(shù)量,和索引數(shù)量
3.得到了IndexHeader淹接,索引頭數(shù)據(jù)十性。

    private AtomicLong beginTimestamp = new AtomicLong(0);
    private AtomicLong endTimestamp = new AtomicLong(0);
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    private AtomicLong endPhyOffset = new AtomicLong(0);
    private AtomicInteger hashSlotCount = new AtomicInteger(0);

    private AtomicInteger indexCount = new AtomicInteger(1);

indexHeader由6個屬性組成,開始塑悼,結(jié)束時間劲适。開始結(jié)束物理偏移量。槽數(shù)量厢蒜,索引數(shù)量霞势。因為IndexHeader也是存儲在磁盤中的,從屬性中斑鸦,可以確定一個IndexHeader占用了40個字節(jié)愕贡。

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key); // 通過key hash進(jìn)行分配
            int slotPos = keyHash % this.hashSlotNum; // 槽的位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 已經(jīng)占的位置,頭文件和所屬槽的地址

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // 記錄了這個槽對應(yīng)的值巷屿,該值是記錄最近一次put索引時的索引位置固以,但初始都是0
                // 那么索引的位置怎么拿到?通過header中的indexCount獲取
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // 需要put索引的真正位置
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // 一個索引所占的位置,4個byte=hash值嘱巾,8個byte=消息物理偏移量嘴纺,4個byte=時間差,4個byte=上一個索引的位置
                // 這個索引的設(shè)計類似與HashMap的結(jié)構(gòu)設(shè)計浓冒,采用數(shù)組與鏈表的形式
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); // 為什么要記錄時間差栽渴?因為省空間
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                // 重新更新一下槽當(dāng)前的索引位置,提供給下一個索引用
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    // 如果是第一個開始放置索引稳懒,更新開始物理偏移量和開始存儲時間
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

IndexFile是如何存儲key的闲擦,如何結(jié)合數(shù)據(jù)結(jié)構(gòu)存儲的?索引文件的基本結(jié)構(gòu)圖场梆,如下:


索引文件設(shè)計

從文件的橫向看墅冷,和縱向看,了解清楚內(nèi)部結(jié)構(gòu)設(shè)計思路或油。
首先確定索引數(shù)量還能繼續(xù)放置寞忿。通過key得到一個keyHash值,然后通過keyHash百分比槽的數(shù)量顶岸,得到了slotPos腔彰,該位置就是可以對應(yīng)槽的位置叫编。但是slotPos只是相對順序位置,真實的存放位置還需要包含header部分霹抛。所以absSlotPos是槽的絕對位置搓逾。通過absSlotPos位置得到4個字節(jié)長度即slotValue,該值記錄的是最近一次放置索引的順序值杯拐。timeDiff為什么要時間差霞篡,并且轉(zhuǎn)換成了單位秒。因為文件的IndexHeader存儲了文件的開始時間的端逼,如果要得到索引的最終時間朗兵,就可以通過開始時間加上時間差。從而到達(dá)從long8個字節(jié)只需要int 4個字節(jié)存儲顶滩,磁盤空間可以剩下很多余掖。因為indexHeader中存放了當(dāng)前索引存放的順序位置,就能得到absIndexPos絕對索引位置诲祸,在存放索引數(shù)據(jù)時浊吏,一個索引存放需要20個字節(jié)而昨。除了存放的key的hash值救氯,物理偏移量,時間差等歌憨,還存儲了同一個hash槽的上一個索引順序位置着憨,這樣就能組合成了一個單向鏈表了。存儲索引后务嫡,更新了當(dāng)前hash槽中的索引順序編號甲抖。并且增加了indexHeader中的索引數(shù)量,更新了最大的物理偏移量phyOffset心铃,和最大存儲消息的時間准谚。
既然有了存儲的邏輯,那么查詢索引如何實現(xiàn)呢去扣?IndexFile中查詢方法

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            // 找出hashslot 位置柱衔,得到索引編號,通過索引編號找出具體的索引信息愉棱,然后依次找出上一個索引的位置進(jìn)行遍歷
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            // 1.索引已經(jīng)沒有上一個索引位置唆铐。2.前一個索引編號大于了當(dāng)前編號,
                            // 3.索引編號一致奔滑,4艾岂,時間小于查詢的時間
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

先從查詢的key計算得到keyHash,定位到屬于哪個槽朋其,然后得到這個槽的絕對位置absSlotPos王浴。通過absSlotPos讀取4個字節(jié)脆炎,就能獲取到最近索引的順便編號。最終計算得到absIndexPos絕對索引位置叼耙,然后依次讀取響應(yīng)的數(shù)據(jù)腕窥,與查詢的時間,關(guān)鍵字比較等查找合適的物理偏移量筛婉。然后nextIndexToRead重新賦值到當(dāng)前索引指向的順序編號prevIndexRead簇爆,繼續(xù)循環(huán)。如何結(jié)束循環(huán)呢爽撒?直到索引存儲的上一個索引的編號為0入蛆,才查找結(jié)束,或者查找內(nèi)容滿了等等硕勿。

RocketMQ最重要的存儲有這些數(shù)據(jù)組成哨毁,包括消息元數(shù)據(jù),消費隊列數(shù)據(jù)源武,和索引數(shù)據(jù)等扼褪。在設(shè)計中運用了很多線程方式,解耦很多業(yè)務(wù)關(guān)聯(lián)粱栖。例如在存儲消息的時候话浇,也會將消息存儲到對應(yīng)的隊列中,但是RocketMQ設(shè)計中闹究,消息的存放是加了鎖的同步代碼塊幔崖,為了保證效率,提高代碼執(zhí)行速率渣淤,盡可能減少其他工作赏寇,能解耦的用異步方式處理,所以只將消息存放到MappedFile中价认。我們知道消息要進(jìn)行2次處理后梆砸,才能更加有效的查詢消息幽纷,所以用重放線程來控制消息的二次處理,包括消費隊列的控制,索引的添加等等盈魁。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末窗骑,一起剝皮案震驚了整個濱河市翩迈,隨后出現(xiàn)的幾起案子膘流,更是在濱河造成了極大的恐慌,老刑警劉巖丁屎,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件荠锭,死亡現(xiàn)場離奇詭異,居然都是意外死亡晨川,警方通過查閱死者的電腦和手機证九,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門删豺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人愧怜,你說我怎么就攤上這事呀页。” “怎么了拥坛?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵蓬蝶,是天一觀的道長。 經(jīng)常有香客問我猜惋,道長丸氛,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任著摔,我火速辦了婚禮缓窜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘谍咆。我一直安慰自己禾锤,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布摹察。 她就那樣靜靜地躺著恩掷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪港粱。 梳的紋絲不亂的頭發(fā)上螃成,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天旦签,我揣著相機與錄音查坪,去河邊找鬼。 笑死宁炫,一個胖子當(dāng)著我的面吹牛偿曙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播羔巢,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼望忆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了竿秆?” 一聲冷哼從身側(cè)響起启摄,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎幽钢,沒想到半個月后歉备,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡匪燕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年蕾羊,在試婚紗的時候發(fā)現(xiàn)自己被綠了喧笔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡龟再,死狀恐怖书闸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情利凑,我是刑警寧澤浆劲,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站哀澈,受9級特大地震影響梳侨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜日丹,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一走哺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧哲虾,春花似錦丙躏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至汪诉,卻和暖如春废恋,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背扒寄。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工鱼鼓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人该编。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓迄本,卻偏偏與公主長得像,于是被迫代替她去往敵國和親课竣。 傳聞我的和親對象是個殘疾皇子嘉赎,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容