RocketMQ MappedFile 預(yù)熱原理解析

創(chuàng)建 MappedFile 文件

創(chuàng)建 MappedFile 文件實現(xiàn)如下:

private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        // 從 requestQueue 阻塞隊列中獲取 AllocateRequest  任務(wù)炊林。
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            // 判斷是否開啟 isTransientStorePoolEnable 豌注,如果開啟則使用直接內(nèi)存進行寫入數(shù)據(jù),最后從直接內(nèi)存中 commit 到 FileChannel 中。
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                // 使用 mmap 方式創(chuàng)建 MappedFile
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }
            // 判斷 mappedFile 大小麸恍,只有 CommitLog 才進行文件預(yù)熱
            // 預(yù)寫入數(shù)據(jù)译仗。按照系統(tǒng)的 pagesize 進行每個pagesize 寫入一個字節(jié)數(shù)據(jù)。
            //為了把mmap 方式映射的文件都加載到內(nèi)存中略号。
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}

從代碼中可以看出刑峡,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且開啟文件預(yù)熱功能才會預(yù)加載文件。
CommitLog 文件的大小默認為 1 G玄柠。

文件預(yù)熱

文件預(yù)熱的時候需要了解的知識點 操作系統(tǒng)的 Page Cache 和 內(nèi)存映射技術(shù) mmap 突梦。

Page Cache

Page Cache 叫做頁緩存,而每一頁的大小通常是4K羽利,在Linux系統(tǒng)中寫入數(shù)據(jù)的時候并不會直接寫到硬盤上宫患,而是會先寫到Page Cache中,并打上dirty標(biāo)識这弧,由內(nèi)核線程flusher定期將被打上dirty的頁發(fā)送給IO調(diào)度層娃闲,最后由IO調(diào)度決定何時落地到磁盤中,而Linux一般會把還沒有使用的內(nèi)存全拿來給Page Cache使用匾浪。而讀的過程也是類似皇帮,會先到Page Cache中尋找是否有數(shù)據(jù),有的話直接返回蛋辈,如果沒有才會到磁盤中去讀取并寫入Page Cache然后再次讀取Page Cache并返回属拾。而且讀的這個過程中操作系統(tǒng)也會有一個預(yù)讀的操作,你的每一次讀取操作系統(tǒng)都會幫你預(yù)讀出后面一部分數(shù)據(jù),而且當(dāng)你一直在使用預(yù)讀數(shù)據(jù)的時候捌年,系統(tǒng)會幫你預(yù)讀出更多的數(shù)據(jù)(最大到128K)瓢娜。

mmap

mmap是一種將文件映射到虛擬內(nèi)存的技術(shù),可以將文件在磁盤位置的地址和在虛擬內(nèi)存中的虛擬地址通過映射對應(yīng)起來礼预,之后就可以在內(nèi)存這塊區(qū)域進行讀寫數(shù)據(jù)眠砾,而不必調(diào)用系統(tǒng)級別的read,wirte這些函數(shù),從而提升IO操作性能托酸,另外一點就是mmap后的虛擬內(nèi)存大小必須是內(nèi)存頁大小(通常是4K)的倍數(shù)褒颈,之所以這么做是為了匹配內(nèi)存操作。

預(yù)熱代碼

public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // 如果是同步寫盤操作励堡,則進行強行刷盤操作
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc  (有什么用谷丸?)
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // 把剩余的數(shù)據(jù)強制刷新到磁盤中
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);

        this.mlock();
    }

這里 MappedFile 已經(jīng)創(chuàng)建,對應(yīng)的 Buffer 為 mappedByteBuffer应结。
mappedByteBuffer 已經(jīng)通過 mmap 映射刨疼,此時操作系統(tǒng)中只是記錄了該文件和該 Buffer 的映射關(guān)系,而沒有映射到物理內(nèi)存中鹅龄。這里就通過對該 MappedFile 的每個 Page Cache 進行寫入一個字節(jié)揩慕,通過讀寫操作把 mmap 映射全部加載到物理內(nèi)存中。

鎖定內(nèi)存 mlock()

    public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    }

該方法主要是實現(xiàn)文件預(yù)熱后扮休,防止把預(yù)熱過的文件被操作系統(tǒng)調(diào)到swap空間中迎卤。當(dāng)程序在次讀取交換出去的數(shù)據(jù)的時候會產(chǎn)生缺頁異常。

LibC.INSTANCE.mlock 和 LibC.INSTANCE.madvise 都是調(diào)用的 Native 方法玷坠。

  • LibC.INSTANCE.mlock 方法
    實現(xiàn)是將鎖住指定的內(nèi)存區(qū)域避免被操作系統(tǒng)調(diào)到swap空間中蜗搔。
  • LibC.INSTANCE.madvise 方法
    實現(xiàn)是一次性先將一段數(shù)據(jù)讀入到映射內(nèi)存區(qū)域,這樣就減少了缺頁異常的產(chǎn)生八堡。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末樟凄,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子兄渺,更是在濱河造成了極大的恐慌不同,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件溶耘,死亡現(xiàn)場離奇詭異二拐,居然都是意外死亡,警方通過查閱死者的電腦和手機凳兵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門百新,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人庐扫,你說我怎么就攤上這事饭望≌躺冢” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵铅辞,是天一觀的道長厌漂。 經(jīng)常有香客問我,道長斟珊,這世上最難降的妖魔是什么苇倡? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮囤踩,結(jié)果婚禮上旨椒,老公的妹妹穿的比我還像新娘。我一直安慰自己堵漱,他們只是感情好综慎,可當(dāng)我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著勤庐,像睡著了一般示惊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上愉镰,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天米罚,我揣著相機與錄音,去河邊找鬼岛杀。 笑死,一個胖子當(dāng)著我的面吹牛崭孤,可吹牛的內(nèi)容都是我干的类嗤。 我是一名探鬼主播,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼辨宠,長吁一口氣:“原來是場噩夢啊……” “哼遗锣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嗤形,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤精偿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后赋兵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體笔咽,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年霹期,在試婚紗的時候發(fā)現(xiàn)自己被綠了叶组。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡历造,死狀恐怖甩十,靈堂內(nèi)的尸體忽然破棺而出船庇,到底是詐尸還是另有隱情,我是刑警寧澤侣监,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布鸭轮,位于F島的核電站,受9級特大地震影響橄霉,放射性物質(zhì)發(fā)生泄漏窃爷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一酪劫、第九天 我趴在偏房一處隱蔽的房頂上張望吞鸭。 院中可真熱鬧,春花似錦覆糟、人聲如沸刻剥。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽造虏。三九已至,卻和暖如春麦箍,著一層夾襖步出監(jiān)牢的瞬間漓藕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工挟裂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留享钞,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓诀蓉,卻偏偏與公主長得像栗竖,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子渠啤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,554評論 2 349