??消息中間價(jià)存儲(chǔ)一般都是利用磁盤,在廉價(jià)的PC機(jī)上一般是使用機(jī)械硬盤,但機(jī)械硬盤的速度比訪問內(nèi)存慢了n個(gè)數(shù)量級(jí)粟关,但一款優(yōu)秀的消息中間件必然會(huì)將硬件資源壓榨到極致,接下來看看rocketMq是如何做到高效存儲(chǔ)的环戈。
1闷板、rocketMq存儲(chǔ)結(jié)構(gòu)
這張流程圖簡(jiǎn)單介紹了rocketMq的存儲(chǔ)實(shí)現(xiàn),先簡(jiǎn)單說明下各自的含義
- MappedFile 所有的topic數(shù)據(jù)都寫到同一個(gè)文件中院塞,文件的大小默認(rèn)為1G遮晚,使用mmap與磁盤文件做映射,初始化時(shí)使用
mlock
將內(nèi)存鎖定拦止,防止pagecache被os交換到swap區(qū)域县遣。數(shù)據(jù)是順序?qū)?/strong>,數(shù)據(jù)寫滿后自動(dòng)創(chuàng)建下個(gè)MappedFile順序?qū)懭搿?/li>- MappedFileQueue MappedFile的隊(duì)列创泄,存儲(chǔ)封裝了所有的MappedFile實(shí)例艺玲。
- CommitLog 封裝了寫入消息和讀取消息的實(shí)現(xiàn),根據(jù)MappedFileQueue找到正在寫的MappedFile鞠抑,之后將消息寫入到pagecache饭聚。
- ConsumerQueue 一個(gè)topic可以設(shè)置多個(gè)queue,每個(gè)consumerQueue對(duì)應(yīng)一個(gè)topic下的queue搁拙,相當(dāng)于kafka里的partition概念秒梳。里面存儲(chǔ)了msg在commitLog中的offset、size箕速、tagsCode酪碘,固定長度是20字節(jié),consumer可以根據(jù)消息的offset在commitLog找到具體的消息盐茎。
2兴垦、高性能存儲(chǔ)實(shí)現(xiàn)
2.1、mmap&&page cache
??先簡(jiǎn)單介紹下mmap,mmap一種內(nèi)存映射文件的方法探越,即將一個(gè)文件或者其它對(duì)象映射到進(jìn)程的地址空間狡赐,實(shí)現(xiàn)文件磁盤地址和進(jìn)程虛擬地址空間中一段虛擬地址的一一對(duì)映關(guān)系。實(shí)現(xiàn)這樣的映射關(guān)系后钦幔,進(jìn)程就可以采用指針的方式讀寫操作這一段內(nèi)存枕屉,而系統(tǒng)會(huì)自動(dòng)回寫臟頁面到對(duì)應(yīng)的文件磁盤上。相反鲤氢,內(nèi)核空間對(duì)這段區(qū)域的修改也直接反映用戶空間搀擂,從而可以實(shí)現(xiàn)不同進(jìn)程間的文件共享。
??rocketMq默認(rèn)的文件大小為1G卷玉,即將1G的文件映射到物理內(nèi)存上哨颂。但mmap初始化時(shí)只是將文件磁盤地址和進(jìn)程虛擬地址做了個(gè)映射,并沒有真正的將整個(gè)文件都映射到內(nèi)存中揍庄,當(dāng)程序真正訪問這片內(nèi)存時(shí)產(chǎn)生缺頁異常咆蒿,這時(shí)候才會(huì)將文件的內(nèi)容拷貝到page cache。試想蚂子,如果一開始只是做個(gè)映射沃测,而到具體寫消息時(shí)才將文件的部分頁加載到pagecache,那效率將會(huì)是多么的低下食茎。MappedFile初始化的操作是由單獨(dú)的線程(AllocateMappedFileService)實(shí)現(xiàn)的蒂破,就是對(duì)應(yīng)的生產(chǎn)消費(fèi)模型。還好rocketMq在初始化MappedFile時(shí)做了內(nèi)存預(yù)熱别渔,事先向page cache 中寫入一些數(shù)據(jù)flush到磁盤附迷,使整個(gè)文件都加載到page cache中。接下來簡(jiǎn)單看下如何預(yù)熱的
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
// mappedByteBuffer在java里面對(duì)應(yīng)了mmap的實(shí)現(xiàn)
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
// 同步刷盤機(jī)制哎媚,OS_PAGE_SIZE為4K
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
// 將page cache 這片內(nèi)存鎖定
this.mlock();
}
2.2喇伯、mlock 內(nèi)存鎖定
??os在內(nèi)存充足的情況下,會(huì)將文件加載到 page cache 提高文件的讀寫效率拨与,但是當(dāng)內(nèi)存不夠用時(shí)稻据,os會(huì)將page cache 回收掉。試想如果MappedFile對(duì)應(yīng)的pagecache 被os回收买喧,那就又產(chǎn)生缺頁異常再次從磁盤加載到pagecache捻悯,會(huì)對(duì)系統(tǒng)性能產(chǎn)生很大的影響。rocketMq在創(chuàng)建完MappedFile并且內(nèi)存預(yù)熱完成后調(diào)用了c的mlock函數(shù)將這片內(nèi)存鎖定了淤毛,具體來看下是怎么實(shí)現(xiàn)的
// java 調(diào)用c
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 具體實(shí)現(xiàn)
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
2.3今缚、刷盤機(jī)制
??寫消息時(shí)是先寫入到pagecache,rocketMq提供了兩種刷盤機(jī)制低淡,同步刷盤和異步刷盤姓言,同步刷盤適用于對(duì)消息可靠性比較高的場(chǎng)合瞬项,同步刷盤性能比較低下,這樣即使系統(tǒng)宕機(jī)消息也不會(huì)丟失何荚。如圖所示,此圖來自rocketMq社區(qū)
??下面簡(jiǎn)單介紹下同步刷盤的原理滥壕,同步刷盤機(jī)制下,發(fā)送線程實(shí)例化一個(gè)GroupCommitRequest兽泣,成員變量中有CountDownLatch,然后push到單獨(dú)的刷盤線程(GroupCommitService)中的阻塞隊(duì)列中胁孙,刷盤線程從阻塞隊(duì)列中獲取唠倦,刷盤其實(shí)就是調(diào)用了mappedByteBuffer.force()方法,刷盤成功后通過countdownlatch喚醒刷盤等待的線程涮较,原理很簡(jiǎn)單稠鼻。
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 對(duì)應(yīng)一個(gè)單獨(dú)的線程
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// GroupCommitRequest 封裝了CountDownLatch,GroupCommitService刷盤完畢后喚醒等待線程
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// 異步刷盤
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
??異步刷盤原理 發(fā)送消息線程寫到pagecache成功之后就返回狂票,消息保存在page cache 中候齿,異步刷盤對(duì)應(yīng)了一個(gè)單獨(dú)線程,源碼中刷盤默認(rèn)一次刷4個(gè)pageSize闺属,也就是16k的數(shù)據(jù)慌盯。異步刷盤有可能會(huì)丟失數(shù)據(jù),當(dāng)jvm程序死掉 但機(jī)器沒有宕機(jī)掂器,pagecache 中的臟頁還是能人工刷到磁盤的亚皂,但是當(dāng)機(jī)器宕機(jī)之后,數(shù)據(jù)就永遠(yuǎn)丟失了国瓮。
2.4灭必、堆外內(nèi)存池機(jī)制
??如上圖所示,rocketMq提供了堆外內(nèi)存池機(jī)制即 TransientStorePool乃摹,TransientStorePool初始化時(shí)實(shí)例化5個(gè)堆外內(nèi)存禁漓,大小和MappedFile的大小1G,然后mlock鎖定此內(nèi)存區(qū)域孵睬。發(fā)送消息時(shí)如果開啟了堆外內(nèi)存機(jī)制播歼,MappedFile在實(shí)例化時(shí)從堆外內(nèi)存池中獲取一個(gè)directBuffer實(shí)例,寫消息先寫到堆外內(nèi)存中肪康,然后有單獨(dú)的線程(CommitRealTimeService)刷到pagecache荚恶,之后再由單獨(dú)的線程(FlushRealTimeService)從pagecahce刷到磁盤。
??開啟堆外內(nèi)存池的好處:寫消息時(shí)先寫到堆外內(nèi)存磷支,純內(nèi)存操作非弛撕常快。讀消息時(shí)是從pagecache中讀雾狈,相當(dāng)于實(shí)現(xiàn)了讀寫分離廓潜。
3、消息生產(chǎn)
??由最開始的總體圖可知,所有發(fā)送消息的線程是串行執(zhí)行的辩蛋,所有topic的數(shù)據(jù)放一塊順序?qū)懙絧agecache中呻畸,因此效率十分的高。在寫 page cache 成功后悼院,再由單獨(dú)的線程異步構(gòu)建consumerQueue和 indexFile(基于磁盤實(shí)現(xiàn)的hashMap伤为,實(shí)現(xiàn)消息的查找),構(gòu)建完成consumerQueue成功后 consumer 就能消費(fèi)到最新的消息了据途,當(dāng)然構(gòu)建consumerQueue也是順序?qū)懡视蓿看沃粚懭?0個(gè)字節(jié),占用的空間也不大颖医。
4位衩、消息消費(fèi)
??每個(gè)topic可以對(duì)應(yīng)多個(gè)consumerQueue,就相當(dāng)于kafka里面的分區(qū)概念熔萧。rocketmq里面的消費(fèi)者與consumerQueue的分配算法和kafka的相似糖驴。由于consumerQueue中只保存了消息在commitLog中的offset、msgSize佛致、tagsCode贮缕,因此需要拿到offset去commitlog中把這條消息撈出來,注意,這時(shí)候讀相當(dāng)與隨機(jī)讀,由前面的mlock內(nèi)存鎖定再加上消費(fèi)的數(shù)據(jù)一般是最近生產(chǎn)的晌杰,所有數(shù)據(jù)還在pagecache中跷睦,對(duì)性能的影響也不大。有一點(diǎn)肋演,當(dāng)consumer消費(fèi)很遠(yuǎn)的數(shù)據(jù)時(shí)抑诸,pagecache中肯定是沒有緩存的,這時(shí)候rocketMq建議consumer去slave上讀爹殊,多好的設(shè)計(jì)啊蜕乡。
5、總結(jié)
??rocketMq所有topic共用一個(gè)commitLog梗夸,磁盤順序?qū)懖懔幔@一點(diǎn)實(shí)現(xiàn)也是參考了kafka,讀消息時(shí)根據(jù)consumerQueue去commitLog中吧數(shù)據(jù)撈出來反症,雖然是隨機(jī)讀辛块,但是最新的數(shù)據(jù)一般在pagecahce中也無關(guān)緊要。一款優(yōu)秀的中間件要把硬件的性能發(fā)揮到極致和考慮到操作系統(tǒng)的相關(guān)特性铅碍,比如使用內(nèi)存鎖定避免內(nèi)存swap交換机断,堆外內(nèi)存和pagecache的讀寫分離老充。以上這些都是看了看rocketMq的存儲(chǔ)源碼總結(jié)出來的女坑,如有錯(cuò)誤歡迎指正~