前文有簡單的提到RocketMQ
的底層文件存儲模型犀概,基于該存儲模型之上再簡單的探索一下 CommitLog
的一個底層設(shè)計,思考RocketMQ
如何做到高性能夜惭?
對于RoceketMQ
而言姻灶,所有的消息最終都需要被持久化到CommitLog
文件中。
如上圖所示诈茧,可以很粗淺的理解為产喉,CommitLog
描述的是整個CommitLog
目錄,而MappedFileQueue
描述的則是CommitLog File
數(shù)組容器若皱,而MappedFile
描述一個CommitLog File
镊叁。
CommitLog
// commitlog構(gòu)造器
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());// 創(chuàng)建mapperFileQueue
this.defaultMessageStore = defaultMessageStore;
// 刷盤對象線程
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
// 提交
this.commitLogService = new CommitRealTimeService();
// append消息回調(diào)(描述的是將消息在文件末尾不斷的append上去)
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
// 消息寫入鎖
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
MappedFileQueue#load()
加載CommitLog
目錄下的文件
public boolean load() {
File dir = new File(this.storePath);
File[] files = dir.listFiles();
if (files != null) {
// ascending order
Arrays.sort(files);
for (File file : files) {
if (file.length() != this.mappedFileSize) {// mappedFileSize默認(rèn)1G
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;// 即只加載大小為1G的文件
}
try {
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);// 已寫位置
mappedFile.setFlushedPosition(this.mappedFileSize);// 設(shè)置已刷盤位置
mappedFile.setCommittedPosition(this.mappedFileSize);//設(shè)置已提交位置
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
初始化加載數(shù)據(jù)時尘颓,只加載了1G文件走触,低于1G的文件不做加載處理,那么當(dāng)要寫入時一定要找到一個最新的文件疤苹,或者新建一個文件-->MappedFileQueue#getLastMappedFile()
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);;
} else {
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
由代碼可知互广,創(chuàng)建MappedFile
時,傳入的參數(shù)包括下一個文件路徑卧土,以及下下個文件路徑惫皱,而創(chuàng)建的程序則如AllocateMappedFileService#putRequestAndReturnMappedFile()
創(chuàng)建的方式也特別有意思,通過將創(chuàng)建文件的參數(shù)封裝為一個AllocateRequest
對象并放入阻塞隊列中尤莺,通過另外線程不斷從隊列中取出請求并完成創(chuàng)建旅敷。
不僅創(chuàng)建了當(dāng)前的文件,還可以把下一個文件創(chuàng)建好颤霎,達(dá)到異步預(yù)創(chuàng)建的目的媳谁,減少了創(chuàng)建文件時的時間涂滴,進(jìn)而可以提供系統(tǒng)的吞吐量。
不僅如此晴音,除了預(yù)創(chuàng)建Commitlog File
之外柔纵,從源碼mmapOperation()
方法中可以看到一個方法叫做MappedFile#warmMappedFile()
,字面理解為預(yù)熱锤躁。為什么要做文件預(yù)熱呢搁料?自然是為了提高讀寫性能,提升系統(tǒng)的吞吐量系羞,個人認(rèn)為消息隊列最核心的問題應(yīng)該是消息的堆積能力
郭计,系統(tǒng)吞吐量
,當(dāng)然前提是拋開高可用等問題椒振,畢竟本身消息隊列的作為異步解耦拣宏,削峰填谷的核心訴求注定了業(yè)務(wù)上允許一定的時延。
為了提高IO讀寫的性能杠人,RocketMQ
都做了什么勋乾?
MappedFile#init()
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 內(nèi)存映射手段
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
內(nèi)存映射可以在一定條件下提高IO讀寫效率,但是不見得是必備良藥嗡善。之前在網(wǎng)上看過一篇文章對Java的擊中IO操作api進(jìn)行可對比:傳送門
內(nèi)存映射本質(zhì)上是通過將進(jìn)程使用的虛擬內(nèi)存地址映射到物理地址上辑莫,以此提高IO的讀寫。直接對磁盤IO的讀寫性能非常差罩引,比起內(nèi)存的讀寫簡直是差之千里各吨,而內(nèi)存映射可以讓IO的讀寫近乎對內(nèi)存的讀寫。比肩內(nèi)存的讀寫要求是數(shù)據(jù)必須命中pageCache袁铐,那么在pageCache層面揭蜒,RocketMQ
由做了什么優(yōu)化呢?查閱源碼可以確定的時剔桨,RocketMQ
使用了兩個較為底層的方法mlock
屉更,madvise
。這兩個方法的目的是要做什么洒缀?鎖住內(nèi)存瑰谜,以及內(nèi)存預(yù)熱。
內(nèi)存鎖定
linux系統(tǒng)為了優(yōu)化IO讀寫的效率與速度树绩,引入了一種內(nèi)存機(jī)制(物理內(nèi)存)萨脑,即數(shù)據(jù)從磁盤到內(nèi)存的復(fù)制過程由內(nèi)核實現(xiàn),而實現(xiàn)的基礎(chǔ)則是pageCache饺饭,pageCache的大小默認(rèn)是4kb渤早。關(guān)于pageCache的內(nèi)容很多,筆者對此也了解較淺瘫俊,故不做贅述鹊杖,后續(xù)深入了解后再補(bǔ)充提鸟。
物理內(nèi)存是有操作系統(tǒng)級別控制,當(dāng)運行的Java 進(jìn)程結(jié)束后仅淑,物理內(nèi)存也不會理解釋放称勋,該問題進(jìn)一步導(dǎo)致在Linux系統(tǒng)中程序頻繁讀寫文件后,可用物理內(nèi)存變少涯竟。當(dāng)系統(tǒng)的物理內(nèi)存不夠用的時赡鲜,就需要將物理內(nèi)存中的一部分空間釋放出來,以供當(dāng)前運行的程序使用庐船。那些被釋放的空間可能來自一些很長時間沒有什么操作的程序银酬,這些被釋放的空間被臨時保存到Swap空間中,等到那些程序要運行時筐钟,再從Swap分區(qū)中恢復(fù)保存的數(shù)據(jù)到內(nèi)存中揩瞪。這樣,系統(tǒng)總是在物理內(nèi)存不夠時篓冲,才進(jìn)行Swap交換李破。為了減少系統(tǒng)級別的Swap交換,RocketMQ
通過使用mlock
來鎖定內(nèi)存壹将。
mlock
的作用如下:
- 被鎖定的物理內(nèi)存在被解鎖或進(jìn)程退出前嗤攻,不會被頁回收流程處理。
- 被鎖定的物理內(nèi)存诽俯,不會被交換到swap分區(qū)設(shè)備妇菱。
- 進(jìn)程執(zhí)行mlock操作時,內(nèi)核會立刻分配物理內(nèi)存(注意COW的情況)暴区。
內(nèi)存預(yù)熱
日常中使用緩存來解決系統(tǒng)的性能問題闯团,減少對底層數(shù)據(jù)庫的直接讀寫,降低數(shù)據(jù)庫的讀壓力仙粱,這個過程在操作系統(tǒng)IO讀寫亦是同樣的道理房交。pageCache可以理解為系統(tǒng)緩存,而內(nèi)存預(yù)熱的目的則是建議操作系統(tǒng)預(yù)先將文件內(nèi)容加載至pageCache缰盏,當(dāng)讀取數(shù)據(jù)時會優(yōu)先判斷是否命中pageCache涌萤,如果無法命中則會拋出一次缺頁中斷淹遵,直接從磁盤讀取口猜,一次降低了IO吞吐量。
madivse
函數(shù)的意義是建議操作系統(tǒng)加載數(shù)據(jù)至pageCache中透揣,方法參數(shù):int madvise(void *addr 济炎、長度 size_t , int 建議),如下提供兩個常見的建議
:
- madv_willneed
預(yù)計在不久的將來訪問(因此,可能最好已閱讀一些頁面 .) - madv_dontneed
不要期待在不久的將來訪問(用的時間.用給定的范圍后,使內(nèi)核可以釋放與它關(guān)聯(lián)的資源.)在此范圍內(nèi)的頁的后續(xù)訪問都將成功,但從基礎(chǔ)會在重新裝入存儲器內(nèi)容的映射文件(看到mmap(2))在沒有基本映射的頁面請求或零填充辐真。
RocketMQ
在創(chuàng)建文件時正是使用了madv_willneed
须尚,由于文件創(chuàng)建的方式由異步線程完成崖堤,故而內(nèi)存預(yù)熱對于當(dāng)前的IO讀寫影響不大。
MappedFile#appendMessagesInner()
RocketMQ
提供的刷盤方式有兩種耐床,一種是同步刷盤密幔,一種是異步刷盤,同步刷盤號稱數(shù)據(jù)不可能丟失撩轰,果真如此嗎胯甩?
從源碼上看,消息的寫入首先是寫入到進(jìn)程內(nèi)存中
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
然后再通過異步的線程實現(xiàn)刷盤堪嫂,這種方式其實還是存在一定程度可能出現(xiàn)數(shù)據(jù)丟失的情況偎箫。
前文看過一段程序,在Commitlog
初始化時
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService(); // 同步刷盤
} else {
this.flushCommitLogService = new FlushRealTimeService(); // 異步刷盤
}
GroupCommitService
這個類有個比較精妙的設(shè)計皆串,即設(shè)計了一對讀寫GroupCommitRequest
隊列淹办。
讀是相對于刷盤實例,即this對象恶复,而寫則相對于系統(tǒng)的刷盤請求寫入怜森。這么設(shè)計有什么好處呢?實現(xiàn)了讀寫的分離谤牡,當(dāng)系統(tǒng)發(fā)起刷盤請求時不會影響系統(tǒng)繼續(xù)寫入刷盤請求(刷盤是阻塞操作)塔插,并且在完成一次刷盤之后即可進(jìn)行讀寫隊列互換身份(加了同步鎖),繼續(xù)讀寫拓哟。
while (!this.isStopped()) {
try {
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
每10s換一次互換一次隊列想许,并且每10ms進(jìn)行一次刷盤,那么在這10ms內(nèi)如果發(fā)現(xiàn)了宕機(jī)断序,無疑會丟失一部分?jǐn)?shù)據(jù)流纹。
FlushRealTimeService
該類譯名是實時刷盤,但是果真如此嗎违诗?
// 是否實時刷盤漱凝?默認(rèn)是false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// 時間間隔,默認(rèn)500诸迟,即500ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// 默認(rèn)刷盤頁數(shù)茸炒,默認(rèn)4
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// 物理隊列刷盤吞吐時間間隔,默認(rèn)10s
int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
// 如果當(dāng)前系統(tǒng)時間大于上次刷盤時間+物理隊列刷盤吞吐時間間隔
// 理論上來說阵苇,默認(rèn)時間500ms壁公,即currentTimeMillis+500ms不太可能大于上次currentTimeMillis+10s
// 刷盤是阻塞的,如果一次刷盤時間過程绅项,則會將刷盤的頁數(shù)改為0
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushPhysicQueueLeastPages = 0;
printFlushProgress = (printTimes++ % 10) == 0;
}
// 刷盤頁數(shù)這個值得意義何在呢紊册?
private boolean isAbleToFlush(final int flushLeastPages) {
int flush = this.flushedPosition.get();// 上次刷盤的位置,offset
int write = getReadPosition();// 當(dāng)前寫入位置快耿,offset
if (this.isFull()) {
return true;
}
if (flushLeastPages > 0) {
// (write - flush)/ OS_PAGE_SIZE 表示上次刷盤到現(xiàn)在寫入的字節(jié)數(shù)除以系統(tǒng)默認(rèn)頁面大小囊陡,即4kb
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush;// 如果上次寫入大于上次刷盤芳绩,則允許刷盤
}
MappedFile#flush()
// 文件的寫入可以選擇使用
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
寫入到底是用FileChannel還是MMap好呢?看前文提到的一篇博客撞反,而RocketMQ
提供了一種可選擇性妥色。
看``
this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {// 開啟了該值才可以進(jìn)行初始化
this.transientStorePool.init();
}
// 初始化
/**
* It's a heavy init method.
*/
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();// 使用的是堆外內(nèi)存
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
// 過沒有初始化,writeBuffer則為空
public ByteBuffer borrowBuffer() {
ByteBuffer buffer = availableBuffers.pollFirst();
if (availableBuffers.size() < poolSize * 0.4) {
log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
}
return buffer;
}
這里用到一個DirectBuffer
遏片,可以稱之為堆外內(nèi)存垛膝,亦可以理解為不收J(rèn)VM管控的內(nèi)存區(qū)域。以正確的姿勢使用堆外內(nèi)存可以在提高IO的讀寫效率丁稀。
為何吼拥?從堆內(nèi)堆外的角度來思考一個文件讀取的過程,比如:要完成一個從文件中讀數(shù)據(jù)到堆內(nèi)內(nèi)存的操作线衫,完成這個操作通常有2種方法凿可,一種即FileChannelImpl.read()。這里實際上File I/O會將數(shù)據(jù)讀到堆外內(nèi)存中授账,然后堆外內(nèi)存再將數(shù)據(jù)拷貝到堆內(nèi)內(nèi)存枯跑。
但是堆外內(nèi)存的創(chuàng)建很重,故而RocketMQ
將堆外內(nèi)存進(jìn)行了池化白热,以此達(dá)到復(fù)用的效果敛助,默認(rèn)是讀取5塊堆外內(nèi)存,即5G內(nèi)容屋确。
是否要使用該方案纳击?
/**
* Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
* ASYNC_FLUSH
*
* @return <tt>true</tt> or <tt>false</tt>
*/
public boolean isTransientStorePoolEnable() {
return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
&& BrokerRole.SLAVE != getBrokerRole();
}
使用FileChannel寫操作其實是操作的堆外內(nèi)存。
總結(jié)
在IO讀寫操作上攻臀,RocketMQ
的一些優(yōu)化方案的關(guān)鍵詞包括:
- 異步創(chuàng)建文件
- 內(nèi)存鎖定
- 內(nèi)存預(yù)熱
- 堆外內(nèi)存
當(dāng)然焕数,具體怎么使用,怎么配置還是要業(yè)務(wù)刨啸,但是不可否認(rèn)的是RocketMQ
的設(shè)計確實很精妙