RocketMQ源碼解析——存儲部分(5)IndexFile消息索引日志文件相關(guān)的`IndexService`類

IndexFile文件講解

?之前說了RocketMQ的物理日志文件CommitLog和邏輯日志文件ConsumeQueue。現(xiàn)在說的是對應(yīng)的消息索引文件IndexFile富腊。

概述

?IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法玻蝌。Index文件的存儲位置是:HOME\store\index{fileName}词疼,文件名fileName是以創(chuàng)建時的時間戳命名的帘腹,固定的單個IndexFile文件大小約為400M许饿,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu)球化,故rocketmq的索引文件其底層實現(xiàn)為hash索引瓦糟。

文件結(jié)構(gòu)

?文件的結(jié)構(gòu)這里參考網(wǎng)上的一張圖


在這里插入圖片描述

?這個圖是整個IndexFile的文件結(jié)構(gòu),主要分為3部分巢掺。第一部分劲蜻,文件頭信息(大小為40 byte);第二部分先嬉,hash槽位(單個槽位4byte,一共500w個)含懊;第三部分,索引信息鏈表部分(單個index為20 byte绢要,一共2000w個)拗小。分三部分進(jìn)行說明:

  • 頭信息Index Head部分:主要記錄整個文件的相關(guān)信息
    • beginTimestamp:第一個索引消息落在Broker的時間戳;
    • endTimestamp:最后一個索引消息落在Broker的時間戳剿配;
    • beginPhyOffset:第一個索引消息在commitlog的偏移量阅束;
    • endPhyOffset:最后一個索引消息在commitlog的偏移量;
    • hashSlotCount:構(gòu)建索引占用的槽位數(shù)息裸;
    • indexCount:構(gòu)建的索引個數(shù)沪编;
  • Hash槽 Slot Table 部分:保存的是消息key在Index部分的位置蚁廓,槽位的確定方式是消息的topic和key中間用#拼接起來(topic#key)然后對總槽樹取模厨幻,計算槽位。
  • Index鏈表部分:Index中存儲的是消息相關(guān)的詳細(xì)信息况脆,和hash沖突時的處理方式
    • keyHash:topic#key結(jié)構(gòu)的Hash值(key是消息的key)
    • phyOffset:commitLog真實的物理位移
    • timeOffset:時間位移,消息的存儲時間與Index Header中beginTimestamp的時間差
    • slotValue解決hash槽沖突的值):當(dāng)topic-key(key是消息的key)的Hash值取500W的余之后得到的Slot Table的slot位置中已經(jīng)有值了(即Hash值取余后在Slot Table中有hash沖突時)看铆,則會用最新的Index值覆蓋盛末,并且將上一個值寫入最新Index的slotValue中,從而形成了一個鏈表的結(jié)構(gòu)肤频。

IndexFile文件相關(guān)的類

IndexFile頭文件相關(guān)的IndexHead

?IndexHead類關(guān)聯(lián)的其實就是IndexFile文件的頭文件相關(guān)的信息,沒有復(fù)雜的方法宵荒,都是一些字段的get和set方法

    //IndexFile的頭大小
    public static final int INDEX_HEADER_SIZE = 40;
    //beginTimestamp:第一個索引消息落在Broker的時間戳
    private static int beginTimestampIndex = 0;
    //endTimestamp:最后一個索引消息落在Broker的時間戳
    private static int endTimestampIndex = 8;
    //beginPhyOffset:第一個索引消息在commitlog的偏移量净嘀;
    private static int beginPhyoffsetIndex = 16;
    //endPhyOffset:最后一個索引消息在commitlog的偏移量;
    private static int endPhyoffsetIndex = 24;
    //hashSlotCount:構(gòu)建索引占用的槽位數(shù)
    private static int hashSlotcountIndex = 32;
    //indexCount:構(gòu)建的索引個數(shù)
    private static int indexCountIndex = 36;
    
    //記錄對應(yīng)信息用的原子類
    private AtomicLong beginTimestamp = new AtomicLong(0);
    private AtomicLong endTimestamp = new AtomicLong(0);
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    private AtomicLong endPhyOffset = new AtomicLong(0);
    private AtomicInteger hashSlotCount = new AtomicInteger(0);
    private AtomicInteger indexCount = new AtomicInteger(1);

?可以看到這里通過6個字段來表示對應(yīng)的6個字段的偏移量暑刃,其中6個字段的值都是用原子類來記錄表示的膜眠。

IndexFile讀寫相關(guān)的IndexFile

?IndexFile文件相關(guān)操作對應(yīng)的就是IndexFile類,提供對IndexFile文件的插入信息和對應(yīng)的查詢操作架谎。

字段屬性
    //hash曹的大小
    private static int hashSlotSize = 4;
    //一個index結(jié)構(gòu)的大小
    private static int indexSize = 20;
    //無效的index
    private static int invalidIndex = 0;
    //hash槽總數(shù)
    private final int hashSlotNum;
    //index的數(shù)量
    private final int indexNum;
    //IndexFile文件的映射文件對象
    private final MappedFile mappedFile;
    private final FileChannel fileChannel;
    private final MappedByteBuffer mappedByteBuffer;
    //IndexFile的頭信息
    private final IndexHeader indexHeader;

?記錄的主要是整個文件中相應(yīng)的單元的單個大小辟躏,和對應(yīng)hash槽和index鏈表的大小。和對應(yīng)文件的映射對象等信息

內(nèi)部方法分析
構(gòu)造方法

?構(gòu)造方法主要就是對應(yīng)的主要參數(shù)的設(shè)置捎琐,根據(jù)入?yún)⒂嬎阏麄€文件的大泄住(IndexFile的頭大小 + hash槽的大小 x hash槽的數(shù)量 + index結(jié)構(gòu)的大小 x index結(jié)構(gòu)的數(shù)量)概页,然后創(chuàng)建文件,設(shè)置文件頭對象IndexHead

public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
        final long endPhyOffset, final long endTimestamp) throws IOException {
        //計算文件的大小 = IndexFile的頭大小 + hash槽的大小*hash槽的數(shù)量 + index結(jié)構(gòu)的大小*index結(jié)構(gòu)的數(shù)量
        int fileTotalSize =
            IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
        //獲取映射文件對象
        this.mappedFile = new MappedFile(fileName, fileTotalSize);
        //獲取對應(yīng)的channel
        this.fileChannel = this.mappedFile.getFileChannel();
        //獲取對應(yīng)文件的緩存
        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
        //設(shè)置hash槽數(shù)量
        this.hashSlotNum = hashSlotNum;
        //設(shè)置index結(jié)構(gòu)的數(shù)量
        this.indexNum = indexNum;

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        //創(chuàng)建文件對應(yīng)的IndexHead對象
        this.indexHeader = new IndexHeader(byteBuffer);
        //初始化頭文件的beginPhyOffset 和 endPhyOffset
        if (endPhyOffset > 0) {
            this.indexHeader.setBeginPhyOffset(endPhyOffset);
            this.indexHeader.setEndPhyOffset(endPhyOffset);
        }
        //初始化頭文件的beginTimestamp 和 endTimestamp
        if (endTimestamp > 0) {
            this.indexHeader.setBeginTimestamp(endTimestamp);
            this.indexHeader.setEndTimestamp(endTimestamp);
        }
    }
保存key對應(yīng)index的putKey方法

?這個方法的調(diào)用绰沥,是在消息存入CommitLog之后贺待,進(jìn)行消息轉(zhuǎn)存的時候會調(diào)用麸塞。這里簡單貼一些調(diào)用鏈。

ReputMessageService#run
    ReputMessageService#doReput
        DefaultMessageStore#doDispatch
            CommitLogDispatcherBuildIndex#dispatch
                DefaultMessageStore#putMessagePositionInfo
                    IndexService#buildIndex
                        IndexService#putKey
                            IndexFile#putKey

?這個方法根據(jù)傳入的消息的key哪工,消息在CommitLog的物理偏移量弧哎,消息的存儲時間三個參數(shù)來進(jìn)行構(gòu)建索引。主要邏輯過程為:

  1. 檢查IndexHead類中記錄的indexCount值和IndexFile類中記錄的indexNum進(jìn)行比較偎捎,檢查文件是否已經(jīng)滿了,如果滿了直接返回
  2. 計算傳入key對應(yīng)的hash槽的位置茴她,并檢查要插入的槽位是否已經(jīng)存在值了程奠,如果已經(jīng)存在值了,檢查是不是無效值瞄沙,如果不是則需要記錄。在插入index信息的時候保存
  3. 吧當(dāng)前key的索引值申尼,插入對應(yīng)的hash槽中
  4. 計算對應(yīng)的index鏈表的位置肮疗,然后插入index信息,如果之前hash槽分配存在hash沖突伪货,則在把沖突的上一個key的index的值钾怔,保存在slotValue

?源碼如下

 public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //如果已經(jīng)構(gòu)建的索引index數(shù)量 < 最大的index數(shù)量宗侦,則進(jìn)行插入,否則直接返回 false
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //計算key 的 hash值矾利,使用的是String自帶的hashcode方法計算
            int keyHash = indexKeyHashMethod(key);
            // 計算key對應(yīng)的hash槽的位置
            int slotPos = keyHash % this.hashSlotNum;
            //計算對應(yīng)槽為的偏移量   IndexFile的頭長度+hash槽的位置*hash槽大小  40+位置*4
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
            FileLock fileLock = null;
            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                //從對應(yīng)的槽位的位置開始 獲取4個字節(jié)的長度 得到對應(yīng)topic的key對應(yīng)索引的位置
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                //檢查對應(yīng)槽位的值 是不是無效的索引馋袜,如果不是說明這次插入的key跟之前的key沖突,則要取出之前的keu
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                // 存儲時間 - 頭文件記錄的開始時間得到 時間差
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                //轉(zhuǎn)換時間
                timeDiff = timeDiff / 1000;
                //如果頭文件記錄的開始時間小于0察皇,則時間差記為0 泽台, 如果大于int最大值,則為最大值怀酷,如果時間差小于0,也記錄為0
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                /**
                 * 計算 需要設(shè)置值的index偏移量  IndexFile頭大小+hash槽數(shù)量*hash槽大小+IndexFile的indexCount*index大小
                 * 也就是 40+500w*4+20*indexCount
                 */
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //設(shè)置  index中的 keyHash
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                //設(shè)置 index中的 phyOffset
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                //設(shè)置 index中的 timeDiff
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                //設(shè)置 index中的 slotValue
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                //設(shè)置 在hash槽中的 index
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //如果indexCount 小于1桅锄,則表示是第一個存入的消息信息 則設(shè)置對應(yīng)的初始信息
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                //如果對應(yīng)的 key的索引是無效索引
                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }
                //增加indexCount值
                this.indexHeader.incIndexCount();
                //設(shè)置對應(yīng)的最后一個消息的偏移量
                this.indexHeader.setEndPhyOffset(phyOffset);
                //設(shè)置對應(yīng)的最后一個消息的存儲時間
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        //釋放文件鎖
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
根據(jù)時間區(qū)間查詢和key來進(jìn)行查詢消息的selectPhyOffset方法

?根據(jù)消息和落盤時間段來尋找消息在CommitLog上的偏移量竞滓。主要邏輯如下:

  1. 根據(jù)傳入的key吹缔,計算hash槽的位置
  2. 獲取hash槽記錄的index鏈表的位置的值
  3. 獲取index鏈表中的slotValue值是否大于0,大于0表示存在hash沖突厢塘,也就是存在key相同的消息,需要進(jìn)入步驟4進(jìn)一步尋找晚碾,否則直接返回
  4. 根據(jù)slotValue記錄的值,尋找對應(yīng)的index鏈表的index信息笛求。同時校驗,index記錄的timeOffsetIndexHead記錄的beginTimestamp的和是否在傳入的時間區(qū)間內(nèi)探入。在則繼續(xù)獲取slotValue重復(fù)步驟4,直到找到不符合的消息苗膝。

?整個方法就是根據(jù)key計算消息的偏移量。源碼如下

 /**
     * 根據(jù)偏移量和落盤時間段獲取消息的物理偏移量集合
     * @param phyOffsets  封裝邏輯偏移量值的集合
     * @param key 開始尋找的key  結(jié)構(gòu)為消息的topic#key
     * @param maxNum 尋找的數(shù)量
     * @param begin 落盤時間段開始時間
     * @param end 落盤時間段結(jié)束時間
     * @param lock 是否加文件鎖辱揭,現(xiàn)階段是不加鎖
     */
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            //計算key的hash槽的位置
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            //計算槽位的偏移量信息
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }
                //獲取key對應(yīng)的索引信息
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }
                //如果是無效索引則不處理问窃,意思就是沒有hash沖突的情況下則不進(jìn)一步處理,否則需要獲取之前的沖突的key
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    //迭代獲取沖突的消息直到?jīng)]有沖突
                    for (int nextIndexToRead = slotValue; ; ) {
                        //獲取完畢泡躯,就結(jié)束
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                        //計算index結(jié)構(gòu)的偏移量
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
                        //獲取key的hash值
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        //獲取消息的物理偏移量
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                        //獲取時間位移
                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //獲取槽位沖突的上一個key的index信息
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                        //如果時間偏移小于0丽焊,則進(jìn)行處理
                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;
                        //計算消息的存儲時間
                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        //檢查消息是否符合
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //符合條件的消息的 物理偏移量 添加到結(jié)果集中
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }
                        //如果槽位沖突的上一個key的index信息不合法咕别,則直接跳過,否則處理沖突的key
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

操作IndexFile文件集合的IndexService

?IndexService是對多個IndexFile類的一種封裝雌贱。也是IndexFile文件最外層的操作類偿短。這個類的很多方法和CommitLog文件相關(guān)的CommitLog類以及ConsumeQueue文件相關(guān)的ConsumeQueue類相似∥舳海可以看看前面的兩篇文章,分別分析這兩個類:

  1. ConsumeQueue邏輯日志文件相關(guān)的ConsumeQueue勾怒;
  2. CommitLog物理日志相關(guān)的CommitLog

?這里對于文件的加載,創(chuàng)建和刪除邏輯就不進(jìn)行分析段只,主要看創(chuàng)建消息的索引的方法,和根據(jù)消息以及時間范圍查詢消息集合的方法

字段屬性

?IndexService中的字段赞枕,主要是設(shè)置單個IndexFile文件中的hash槽和index鏈表長度相關(guān)的

 //嘗試創(chuàng)建IndexFile的最大次數(shù)
    private static final int MAX_TRY_IDX_CREATE = 3;
    //消息存儲的操作類
    private final DefaultMessageStore defaultMessageStore;
    //hash槽合數(shù)
    private final int hashSlotNum;
    //index索引鏈表個數(shù)
    private final int indexNum;
    //存儲的路徑
    private final String storePath;
    //IndexFile的集合
    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
    //讀寫鎖
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
內(nèi)部方法
構(gòu)造方法
    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        //獲取默認(rèn)構(gòu)建的索引個數(shù)  默認(rèn)是的 500w個
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        //設(shè)置索引的個數(shù) 默認(rèn)是 5000000 * 4 也就是2000w個
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        //存儲的路徑
        this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }

?這里兩個參數(shù)都是可以通過配置來設(shè)置

參數(shù) 含義
maxHashSlotNum IndexFile的hash槽數(shù)量,默認(rèn)500w
maxIndexNum IndexFile的index鏈長度谍椅,默認(rèn)2000w
創(chuàng)建消息索引和保存的buildIndex

?buildIndex方法的邏輯比較簡單古话。就是根據(jù)請求的中的消息的key和topic來構(gòu)建存儲的key結(jié)構(gòu)。然后調(diào)用IndexFile類中的方法陪踩。其中對于事務(wù)消息的回滾類型的消息不進(jìn)行記錄。

public void buildIndex(DispatchRequest req) {
        //嘗試獲取和創(chuàng)建 IndexFile 最大嘗試次數(shù)為3 次
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            //獲取消息轉(zhuǎn)存請求中消息的 topic 和 key
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            //如果消息的CommitLog的物理偏移量 < IndexFile記錄的最后一個消息物理結(jié)束偏移量摘完,則表示消息已經(jīng)記錄了
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }
            //獲取消息的類型,如果是事務(wù)消息的回滾類型的消息孝治,則直接返回,不進(jìn)行記錄
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }

            if (req.getUniqKey() != null) {
                //保存對應(yīng)的key的 谈飒, key的格式為 topic + "#" + key
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }

            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }
根據(jù)消息以及時間范圍查詢消息集合的queryOffset

?queryOffset方法也比較簡單杭措,先根據(jù)傳入的落盤時間區(qū)間段,獲取合適的IndexFile文件手素,然后調(diào)用IndexFile類從文件中根據(jù)消息的key和topic獲取消息的物理偏移量集合

public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<Long>(maxNum);

        long indexLastUpdateTimestamp = 0;
        long indexLastUpdatePhyoffset = 0;
        //比較此次要獲取的 最大數(shù)量 和 配置的 maxMsgsNumBatch 參數(shù)瘩蚪。 取最大值
        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
        try {
            this.readWriteLock.readLock().lock();
            //indexFile 不為空 則迭代indexFile 集合
            if (!this.indexFileList.isEmpty()) {
                for (int i = this.indexFileList.size(); i > 0; i--) {
                    // 獲取IndexFile
                    IndexFile f = this.indexFileList.get(i - 1);
                    boolean lastFile = i == this.indexFileList.size();
                    //如果是最后一個IndexFile,則記錄對應(yīng)的 最后記錄時間 和 最大偏移量
                    if (lastFile) {
                        indexLastUpdateTimestamp = f.getEndTimestamp();
                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
                    }
                    /**
                     * 檢查時間是不是符合 疹瘦,
                     * 1. 開始時間和結(jié)束 時間在 IndexFile 頭文件記錄的beginTimestamp 和endTimestamp 中
                     * 2. 開始時間 在 beginTimestamp 和endTimestamp 中
                     * 3. 結(jié)束時間 在 beginTimestamp 和endTimestamp 中
                     */
                    if (f.isTimeMatched(begin, end)) {
                        //獲取符合條件的key的物理偏移量
                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                    }

                    if (f.getBeginTimestamp() < begin) {
                        break;
                    }

                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("queryMsg exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
    }

下一篇存儲部分(6)RocketMQ主從同步相關(guān)的HAService和HAConnection

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拱礁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子呢灶,更是在濱河造成了極大的恐慌,老刑警劉巖鸯乃,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跋涣,死亡現(xiàn)場離奇詭異鸟悴,居然都是意外死亡,警方通過查閱死者的電腦和手機细诸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來利赋,“玉大人,你說我怎么就攤上這事媚送】艿椋” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵拿霉,是天一觀的道長。 經(jīng)常有香客問我友浸,道長,這世上最難降的妖魔是什么收恢? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任伦意,我火速辦了婚禮,結(jié)果婚禮上驮肉,老公的妹妹穿的比我還像新娘。我一直安慰自己离钝,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布卵渴。 她就那樣靜靜地躺著鲤竹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上碘橘,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機與錄音仰禽,去河邊找鬼。 笑死坟瓢,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的折联。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼诚镰,長吁一口氣:“原來是場噩夢啊……” “哼祥款!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起刃跛,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎桨昙,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛙酪,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年凹蜂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片玛痊。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡狂打,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出菱父,到底是詐尸還是另有隱情剑逃,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布蛹磺,位于F島的核電站,受9級特大地震影響萤捆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜俗或,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一岁忘、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧干像,春花似錦、人聲如沸麻汰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至位喂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間忆某,已是汗流浹背点待。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留状原,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓颠区,卻偏偏與公主長得像,于是被迫代替她去往敵國和親毕莱。 傳聞我的和親對象是個殘疾皇子颅夺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容