創(chuàng)建 MappedFile 文件
創(chuàng)建 MappedFile 文件實現(xiàn)如下:
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// 從 requestQueue 阻塞隊列中獲取 AllocateRequest 任務(wù)炊林。
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
// 判斷是否開啟 isTransientStorePoolEnable 豌注,如果開啟則使用直接內(nèi)存進行寫入數(shù)據(jù),最后從直接內(nèi)存中 commit 到 FileChannel 中。
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 使用 mmap 方式創(chuàng)建 MappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// 判斷 mappedFile 大小麸恍,只有 CommitLog 才進行文件預(yù)熱
// 預(yù)寫入數(shù)據(jù)译仗。按照系統(tǒng)的 pagesize 進行每個pagesize 寫入一個字節(jié)數(shù)據(jù)。
//為了把mmap 方式映射的文件都加載到內(nèi)存中略号。
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
從代碼中可以看出刑峡,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且開啟文件預(yù)熱功能才會預(yù)加載文件。
CommitLog 文件的大小默認為 1 G玄柠。
文件預(yù)熱
文件預(yù)熱的時候需要了解的知識點 操作系統(tǒng)的 Page Cache 和 內(nèi)存映射技術(shù) mmap 突梦。
Page Cache
Page Cache 叫做頁緩存,而每一頁的大小通常是4K羽利,在Linux系統(tǒng)中寫入數(shù)據(jù)的時候并不會直接寫到硬盤上宫患,而是會先寫到Page Cache中,并打上dirty標(biāo)識这弧,由內(nèi)核線程flusher定期將被打上dirty的頁發(fā)送給IO調(diào)度層娃闲,最后由IO調(diào)度決定何時落地到磁盤中,而Linux一般會把還沒有使用的內(nèi)存全拿來給Page Cache使用匾浪。而讀的過程也是類似皇帮,會先到Page Cache中尋找是否有數(shù)據(jù),有的話直接返回蛋辈,如果沒有才會到磁盤中去讀取并寫入Page Cache然后再次讀取Page Cache并返回属拾。而且讀的這個過程中操作系統(tǒng)也會有一個預(yù)讀的操作,你的每一次讀取操作系統(tǒng)都會幫你預(yù)讀出后面一部分數(shù)據(jù),而且當(dāng)你一直在使用預(yù)讀數(shù)據(jù)的時候捌年,系統(tǒng)會幫你預(yù)讀出更多的數(shù)據(jù)(最大到128K)瓢娜。
mmap
mmap是一種將文件映射到虛擬內(nèi)存的技術(shù),可以將文件在磁盤位置的地址和在虛擬內(nèi)存中的虛擬地址通過映射對應(yīng)起來礼预,之后就可以在內(nèi)存這塊區(qū)域進行讀寫數(shù)據(jù)眠砾,而不必調(diào)用系統(tǒng)級別的read,wirte這些函數(shù),從而提升IO操作性能托酸,另外一點就是mmap后的虛擬內(nèi)存大小必須是內(nèi)存頁大小(通常是4K)的倍數(shù)褒颈,之所以這么做是為了匹配內(nèi)存操作。
預(yù)熱代碼
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 如果是同步寫盤操作励堡,則進行強行刷盤操作
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc (有什么用谷丸?)
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// 把剩余的數(shù)據(jù)強制刷新到磁盤中
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
this.mlock();
}
這里 MappedFile 已經(jīng)創(chuàng)建,對應(yīng)的 Buffer 為 mappedByteBuffer应结。
mappedByteBuffer 已經(jīng)通過 mmap 映射刨疼,此時操作系統(tǒng)中只是記錄了該文件和該 Buffer 的映射關(guān)系,而沒有映射到物理內(nèi)存中鹅龄。這里就通過對該 MappedFile 的每個 Page Cache 進行寫入一個字節(jié)揩慕,通過讀寫操作把 mmap 映射全部加載到物理內(nèi)存中。
鎖定內(nèi)存 mlock()
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
該方法主要是實現(xiàn)文件預(yù)熱后扮休,防止把預(yù)熱過的文件被操作系統(tǒng)調(diào)到swap空間中迎卤。當(dāng)程序在次讀取交換出去的數(shù)據(jù)的時候會產(chǎn)生缺頁異常。
LibC.INSTANCE.mlock 和 LibC.INSTANCE.madvise 都是調(diào)用的 Native 方法玷坠。
- LibC.INSTANCE.mlock 方法
實現(xiàn)是將鎖住指定的內(nèi)存區(qū)域避免被操作系統(tǒng)調(diào)到swap空間中蜗搔。 - LibC.INSTANCE.madvise 方法
實現(xiàn)是一次性先將一段數(shù)據(jù)讀入到映射內(nèi)存區(qū)域,這樣就減少了缺頁異常的產(chǎn)生八堡。