設(shè)計(jì)背景
消息中間件的本身定義來(lái)考慮,應(yīng)該盡量減少對(duì)于外部第三方中間件的依賴谒出。一般來(lái)說(shuō)依賴的外部系統(tǒng)越多隅俘,也會(huì)使得本身的設(shè)計(jì)越復(fù)雜,采用文件系統(tǒng)作為消息存儲(chǔ)的方式笤喳。
RocketMQ存儲(chǔ)機(jī)制
消息中間件的存儲(chǔ)一般都是利用磁盤为居,一般是使用機(jī)械硬盤,但機(jī)械硬盤的速度比訪問內(nèi)存慢了n個(gè)數(shù)量級(jí)杀狡,一款優(yōu)秀的消息中間件必然會(huì)將硬件資源壓榨到極致颜骤,接下來(lái)看看rocketMq是如何做到高效存儲(chǔ)的。
RocketMQ存儲(chǔ)模型
CommitLog
消息主體以及元數(shù)據(jù)的存儲(chǔ)媒介捣卤,存儲(chǔ)Producer端寫入的消息主體內(nèi)容。單個(gè)文件大小默認(rèn)1G 八孝,文件名長(zhǎng)度為20位董朝,左邊補(bǔ)零,剩余為起始偏移量干跛,比如子姜,00000000000000000000代表了第一個(gè)文件,起始偏移量為0楼入,文件大小為1G=1073741824哥捕;當(dāng)?shù)谝粋€(gè)文件寫滿了,第二個(gè)文件為00000000001073741824嘉熊,起始偏移量為1073741824遥赚,以此類推。消息主要是順序?qū)懭肴罩疚募簦?dāng)文件滿了凫佛,寫入下一個(gè)文件讲坎;
ConsumeQueue
消息消費(fèi)的邏輯隊(duì)列,其中包含了這個(gè)MessageQueue在CommitLog中的起始物理位置偏移量offset愧薛,消息實(shí)體內(nèi)容的大小和Message Tag的哈希值晨炕。
實(shí)際物理存儲(chǔ)來(lái)說(shuō),ConsumeQueue對(duì)應(yīng)每個(gè)Topic和QueueId下面的文件毫炉,單個(gè)文件大小約5.72M瓮栗,每個(gè)文件由30W條數(shù)據(jù)組成,每個(gè)文件默認(rèn)大小為600萬(wàn)個(gè)字節(jié)瞄勾,當(dāng)一個(gè)ConsumeQueue類型的文件寫滿了费奸,則寫入下一個(gè)文件;
IndexFile
生成的索引文件提供訪問服務(wù)丰榴,通過消息Key值查詢消息真正的實(shí)體內(nèi)容货邓。在實(shí)際的物理存儲(chǔ)上,文件名則是以創(chuàng)建時(shí)的時(shí)間戳命名的四濒,固定的單個(gè)IndexFile文件大小約為400M换况,一個(gè)IndexFile可以保存2000W個(gè)索引;
MapedFileQueue
對(duì)連續(xù)物理存儲(chǔ)的抽象封裝類盗蟆,可以通過消息存儲(chǔ)的物理偏移量位置快速定位該offset所在MappedFile(具體物理存儲(chǔ)位置的抽象)戈二、創(chuàng)建、刪除MappedFile等操作喳资;
MappedFile
文件存儲(chǔ)的直接內(nèi)存映射業(yè)務(wù)抽象封裝類觉吭,通過操作該類,可以把消息字節(jié)寫入PageCache緩存區(qū)(commit)仆邓,或者原子性地將消息持久化的刷盤(flush)鲜滩;
RocketMQ消息架構(gòu)
集群有一個(gè)Broker,Topic為binlog的隊(duì)列(Consume Queue)數(shù)量為4节值,如下圖所示徙硅,按順序發(fā)送消息。Consumer端線程采用的是負(fù)載訂閱的方式進(jìn)行消費(fèi)搞疗。
Commit Log和Consume Queue
消息發(fā)送流程架構(gòu)
發(fā)送到相關(guān)服務(wù)節(jié)點(diǎn)
生產(chǎn)到消費(fèi)的轉(zhuǎn)換
總體核心流程
RocketMQ的消息整體是有序的嗓蘑,所以消息按順序?qū)?nèi)容持久化在Commit Log中。Consume Queue則是用于將消息均衡地按序排列在不同的邏輯隊(duì)列匿乃,集群模式下多個(gè)消費(fèi)者就可以并行消費(fèi)Consume Queue的消息桩皿。
- MappedFile 所有的topic數(shù)據(jù)都寫到同一個(gè)文件中,文件的大小默認(rèn)為1G幢炸,使用mmap與磁盤文件做映射泄隔,初始化時(shí)使用mlock將內(nèi)存鎖定,防止pagecache被os交換到swap區(qū)域宛徊。數(shù)據(jù)是順序?qū)懨酚龋瑪?shù)據(jù)寫滿后自動(dòng)創(chuàng)建下個(gè)MappedFile順序?qū)懭搿?/li>
- MappedFileQueue MappedFile的隊(duì)列柜思,存儲(chǔ)封裝了所有的MappedFile實(shí)例。
- CommitLog 封裝了寫入消息和讀取消息的實(shí)現(xiàn)巷燥,根據(jù)MappedFileQueue找到正在寫的MappedFile赡盘,之后將消息寫入到pagecache,最后同步到硬盤上缰揪。
- ConsumerQueue 一個(gè)topic可以設(shè)置多個(gè)queue陨享,每個(gè)consumerQueue對(duì)應(yīng)一個(gè)topic下的queue,相當(dāng)于kafka里的partition概念钝腺。里面存儲(chǔ)了msg在commitLog中的offset抛姑、size、tagsCode艳狐,固定長(zhǎng)度是20字節(jié)定硝,consumer可以根據(jù)消息的offset在commitLog找到具體的消息。
詳細(xì)分析MQ發(fā)送和消費(fèi)流程
消息生產(chǎn)和消費(fèi)通過CommitLog
生產(chǎn)者發(fā)送消息最終寫入的是CommitLog(消息存儲(chǔ)的日志數(shù)據(jù)文件)毫目,Consumer端先從ConsumeQueue(消息邏輯隊(duì)列)讀取持久化消息的起始物理位置偏移量offset蔬啡、大小size和消息Tag的HashCode值,隨后再?gòu)腃ommitLog中進(jìn)行讀取待拉取消費(fèi)消息的真正實(shí)體內(nèi)容部分镀虐;
IndexFile(索引文件)
為了消息查詢提供了一種通過key或時(shí)間區(qū)間來(lái)查詢消息的方法箱蟆, 通過IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程。
RocketMQ的CommitLog文件采用混合型存儲(chǔ)
所有Topic下的消息隊(duì)列共用同一個(gè)CommitLog的日志數(shù)據(jù)文件刮便,并通過建立類似索引文件—ConsumeQueue的方式來(lái)區(qū)分不同Topic下面的不同MessageQueue的消息空猜,同時(shí)為消費(fèi)消息起到一定的緩沖作用。
只有ReputMessageService異步服務(wù)線程通過doDispatch異步生成了ConsumeQueue隊(duì)列的元素后恨旱,Consumer端才能進(jìn)行消費(fèi)辈毯。
只要消息寫入并刷盤至CommitLog文件后,消息就不會(huì)丟失搜贤,即使ConsumeQueue中的數(shù)據(jù)丟失漓摩,也可以通過CommitLog來(lái)恢復(fù)。
RocketMQ順序讀寫
發(fā)送消息時(shí)入客,生產(chǎn)者端的消息確實(shí)是順序?qū)懭隒ommitLog;
消費(fèi)消息時(shí)腿椎,消費(fèi)者端也是順序讀取ConsumeQueue桌硫,根據(jù)其中的起始物理位置偏移量offset讀取消息是隨機(jī)讀取CommitLog。
在RocketMQ集群整體的吞吐量啃炸、并發(fā)量非常高的情況下铆隘,隨機(jī)讀取文件帶來(lái)的性能開銷影響還是比較大的
RocketMQ存儲(chǔ)架構(gòu)的優(yōu)缺點(diǎn):
優(yōu)點(diǎn):
- ConsumeQueue消息邏輯隊(duì)列較為輕量級(jí);
- 磁盤的訪問串行化南用,避免磁盤竟?fàn)幇蚰疲粫?huì)因?yàn)殛?duì)列增加導(dǎo)致IOWAIT增高掏湾;
缺點(diǎn):
- CommitLog來(lái)說(shuō)寫入消息雖然是順序?qū)懀亲x卻變成了完全的隨機(jī)讀肿嘲;
- Consumer端訂閱消費(fèi)一條消息融击,需要先讀ConsumeQueue,再讀Commit Log雳窟,一定程度上增加了開銷尊浪;
RocketMQ存儲(chǔ)模型
RocketMQ文件存儲(chǔ)模型,根據(jù)類別和作用從概念模型上大致可以劃分為5層
RocketMQ業(yè)務(wù)處理器層: Broker端對(duì)消息進(jìn)行讀取和寫入的業(yè)務(wù)邏輯入口封救,這一層主要包含了業(yè)務(wù)邏輯相關(guān)處理操作(根據(jù)解析RemotingCommand中的RequestCode來(lái)區(qū)分具體的業(yè)務(wù)操作類型拇涤,進(jìn)而執(zhí)行不同的業(yè)務(wù)處理流程),比如前置的檢查和校驗(yàn)步驟誉结、構(gòu)造MessageExtBrokerInner對(duì)象鹅士、decode反序列化、構(gòu)造Response返回對(duì)象等惩坑;
-
RocketMQ數(shù)據(jù)存儲(chǔ)組件層:該層主要是RocketMQ的存儲(chǔ)核心類—DefaultMessageStore掉盅,其為RocketMQ消息數(shù)據(jù)文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對(duì)CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件進(jìn)行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對(duì)象模型提供的方法)旭贬;在該組件初始化時(shí)候怔接,還會(huì)啟動(dòng)很多存儲(chǔ)相關(guān)的后臺(tái)服務(wù)線程:
- AllocateMappedFileService(MappedFile預(yù)分配服務(wù)線程)
- ReputMessageService(回放存儲(chǔ)消息服務(wù)線程)
- HAService(Broker主從同步高可用服務(wù)線程)
- StoreStatsService(消息存儲(chǔ)統(tǒng)計(jì)服務(wù)線程)
- IndexService(索引文件服務(wù)線程)等;
-
RocketMQ存儲(chǔ)邏輯對(duì)象層: 該層主要包含了RocketMQ數(shù)據(jù)文件存儲(chǔ)直接相關(guān)的三個(gè)模型類IndexFile稀轨、ConsumerQueue和CommitLog扼脐。
- IndexFile為索引數(shù)據(jù)文件提供訪問服務(wù)
- ConsumerQueue為邏輯消息隊(duì)列提供訪問服務(wù)
- CommitLog則為消息存儲(chǔ)的日志數(shù)據(jù)文件提供訪問服務(wù)。
這三個(gè)模型類也是構(gòu)成了RocketMQ存儲(chǔ)層的整體結(jié)構(gòu)奋刽;
-
封裝的文件內(nèi)存映射層: RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫瓦侮。
- 采用MappedByteBuffer這種內(nèi)存映射磁盤文件的方式完成對(duì)大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類佣谐。
- 對(duì)于每類大文件(IndexFile/ConsumerQueue/CommitLog)肚吏,在存儲(chǔ)時(shí)分隔成多個(gè)固定大小的文件(單個(gè)IndexFile文件大小約為400M、單個(gè)ConsumerQueue文件大小約5.72M狭魂、單個(gè)CommitLog文件大小為1G)罚攀,其中每個(gè)分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量雌澄,從而實(shí)現(xiàn)了整個(gè)大文件的串聯(lián)斋泄。
每一種類的單個(gè)文件均由MappedFile類提供讀寫操作服務(wù)(其中,MappedFile類提供了順序?qū)?隨機(jī)讀镐牺、內(nèi)存數(shù)據(jù)刷盤炫掐、內(nèi)存清理等和文件相關(guān)的服務(wù));
- 磁盤存儲(chǔ)層: 主要指的是部署RocketMQ服務(wù)器所用的磁盤睬涧。
RocketMQ存儲(chǔ)技術(shù)
主要采用mmap與PageCache募胃,其中mmap內(nèi)存映射技術(shù)—Java中的MappedByteBuffer旗唁。
先簡(jiǎn)單介紹下mmap
mmap一種內(nèi)存映射文件的方法,即將一個(gè)文件或者其它對(duì)象映射到進(jìn)程的地址空間痹束,實(shí)現(xiàn)文件磁盤地址和進(jìn)程虛擬地址空間中一段虛擬地址的一一對(duì)映關(guān)系检疫。實(shí)現(xiàn)這樣的映射關(guān)系后,進(jìn)程就可以采用指針的方式讀寫操作這一段內(nèi)存,而系統(tǒng)會(huì)自動(dòng)回寫臟頁(yè)面到對(duì)應(yīng)的文件磁盤上。內(nèi)核空間對(duì)這段區(qū)域的修改也直接反映用戶空間薇缅,從而可以實(shí)現(xiàn)不同進(jìn)程間的文件共享件炉。
mmap內(nèi)存映射和普通標(biāo)準(zhǔn)IO操作的本質(zhì)區(qū)別在于它并不需要將文件中的數(shù)據(jù)先拷貝至OS的內(nèi)核IO緩沖區(qū),而是可以直接將用戶進(jìn)程私有地址空間中的一塊區(qū)域與文件對(duì)象建立映射關(guān)系,這樣程序就好像可以直接從內(nèi)存中完成對(duì)文件讀/寫操作一樣。
只有當(dāng)缺頁(yè)中斷發(fā)生時(shí),直接將文件從磁盤拷貝至用戶態(tài)的進(jìn)程空間內(nèi)晒来,只進(jìn)行了一次數(shù)據(jù)拷貝。對(duì)于容量較大的文件來(lái)說(shuō)(文件大小一般需要限制在1.5~2G以下郑现,這也是CommitLog設(shè)置成1G的原因)湃崩,采用Mmap的方式其讀/寫的效率和性能都非常高。
- RocketMq默認(rèn)的文件大小為1G接箫,即將1G的文件映射到物理內(nèi)存上攒读。但mmap初始化時(shí)只是將文件磁盤地址和進(jìn)程虛擬地址做了個(gè)映射,并沒有真正的將整個(gè)文件都映射到內(nèi)存中辛友,當(dāng)程序真正訪問這片內(nèi)存時(shí)產(chǎn)生缺頁(yè)異常薄扁,這時(shí)候才會(huì)將文件的內(nèi)容拷貝到page cache。
如果一開始只是做個(gè)映射废累,而到具體寫消息時(shí)才將文件的部分頁(yè)加載到pagecache邓梅,那效率將會(huì)是多么的低下。MappedFile初始化的操作是由單獨(dú)的線程(AllocateMappedFileService)實(shí)現(xiàn)的邑滨,就是對(duì)應(yīng)的生產(chǎn)消費(fèi)模型日缨。RocketMq在初始化MappedFile時(shí)做了內(nèi)存預(yù)熱,事先向page cache 中寫入一些數(shù)據(jù)flush到磁盤掖看,使整個(gè)文件都加載到page cache中匣距。
MappedByteBuffer技術(shù)分析
MappedByteBuffer繼承自ByteBuffer,其內(nèi)部維護(hù)了一個(gè)邏輯地址變量—address哎壳。在建立映射關(guān)系時(shí)毅待,
MappedByteBuffer利用了JDK NIO的FileChannel類提供的map()方法把文件對(duì)象映射到虛擬內(nèi)存。
源碼中map()方法的實(shí)現(xiàn)耳峦,可以發(fā)現(xiàn)最終其通過調(diào)用native方法map0()完成文件對(duì)象的映射工作,同時(shí)使用Util.newMappedByteBuffer()方法初始化MappedByteBuffer實(shí)例焕毫,但最終返回的是DirectByteBuffer的實(shí)例蹲坷。
在Java程序中使用MappedByteBuffer的get()方法來(lái)獲取內(nèi)存數(shù)據(jù)是最終通過DirectByteBuffer.get()方法實(shí)現(xiàn)(底層通過unsafe.getByte()方法驶乾,以“地址 + 偏移量”的方式獲取指定映射至內(nèi)存中的數(shù)據(jù))。
使用Mmap的限制
- mmap映射的內(nèi)存空間釋放的問題
由于映射的內(nèi)存空間本身就不屬于JVM的堆內(nèi)存區(qū)(Java Heap)循签,因此其不受JVM GC的控制级乐,卸載這部分內(nèi)存空間需要通過系統(tǒng)調(diào)用unmap()方法來(lái)實(shí)現(xiàn)。
然而unmap()方法是FileChannelImpl類里實(shí)現(xiàn)的私有方法县匠,無(wú)法直接顯示調(diào)用风科。RocketMQ中的做法是,通過Java反射的方式調(diào)用“sun.misc”包下的Cleaner類的clean()方法來(lái)釋放映射占用的內(nèi)存空間乞旦;
- MappedByteBuffer內(nèi)存映射大小限制
因?yàn)槠湔加玫氖翘摂M內(nèi)存(非JVM的堆內(nèi)存)贼穆,大小不受JVM的-Xmx參數(shù)限制,但其大小也受到OS虛擬內(nèi)存大小的限制兰粉。一般來(lái)說(shuō)故痊,一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存空間,RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了玖姑;
- 使用MappedByteBuffe的其他問題
會(huì)存在內(nèi)存占用率較高和文件關(guān)閉不確定性的問題愕秫;
OS的PageCache機(jī)制
PageCache是OS對(duì)文件的緩存,用于加速對(duì)文件的讀寫焰络。程序?qū)ξ募M(jìn)行順序讀寫的速度幾乎接近于內(nèi)存的讀寫訪問戴甩,這里的主要原因就是在于OS使用PageCache機(jī)制對(duì)讀寫訪問操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache闪彼。
對(duì)于數(shù)據(jù)文件的讀取
如果一次讀取文件時(shí)出現(xiàn)未命中PageCache的情況甜孤,OS從物理磁盤上訪問讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取备蚓。這樣课蔬,只要下次訪問的文件已經(jīng)被加載至PageCache時(shí),讀取操作的速度基本等于訪問內(nèi)存郊尝。
對(duì)于數(shù)據(jù)文件的寫入
OS會(huì)先寫入至Cache內(nèi)二跋,隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。
對(duì)于文件的順序讀寫操作來(lái)說(shuō)流昏,讀和寫的區(qū)域都在OS的PageCache內(nèi)扎即,此時(shí)讀寫性能接近于內(nèi)存。
RocketMQ的大致做法是况凉,將數(shù)據(jù)文件映射到OS的虛擬內(nèi)存中(通過JDK NIO的MappedByteBuffer)谚鄙,寫消息的時(shí)候首先寫入PageCache,并通過異步刷盤的方式將消息批量的做持久化(同時(shí)也支持同步刷盤)刁绒;
訂閱消費(fèi)消息時(shí)(對(duì)CommitLog操作是隨機(jī)讀让朴),由于PageCache的局部性熱點(diǎn)原理且整體情況下還是從舊到新的有序讀,因此大部分情況下消息還是可以直接從Page Cache中讀取傻盟,不會(huì)產(chǎn)生太多的缺頁(yè)(Page Fault)中斷而從磁盤讀取速蕊。
PageCache機(jī)制也不是完全無(wú)缺點(diǎn)的,當(dāng)遇到OS進(jìn)行臟頁(yè)回寫娘赴,內(nèi)存回收规哲,內(nèi)存swap等情況時(shí),就會(huì)引起較大的消息讀寫延遲诽表。
對(duì)于這些情況唉锌,RocketMQ采用了多種優(yōu)化技術(shù),比如內(nèi)存預(yù)分配竿奏,文件預(yù)熱袄简,mlock系統(tǒng)調(diào)用等,來(lái)保證在最大可能地發(fā)揮PageCache機(jī)制優(yōu)點(diǎn)的同時(shí)议双,盡可能地減少其缺點(diǎn)帶來(lái)的消息讀寫延遲痘番。
RocketMQ存儲(chǔ)優(yōu)化技術(shù)
RocketMQ存儲(chǔ)層采用的幾項(xiàng)優(yōu)化技術(shù)方案在一定程度上可以減少PageCache的缺點(diǎn)帶來(lái)的影響,主要包括內(nèi)存預(yù)分配平痰,文件預(yù)熱和mlock系統(tǒng)調(diào)用汞舱。
預(yù)先分配MappedFile
在消息寫入過程中(調(diào)用CommitLog的putMessage()方法),CommitLog會(huì)先從MappedFileQueue隊(duì)列中獲取一個(gè) MappedFile宗雇,如果沒有就新建一個(gè)昂芜。
MappedFile的創(chuàng)建過程是將構(gòu)建好的一個(gè)AllocateRequest請(qǐng)求(具體做法是,將下一個(gè)文件的路徑赔蒲、下下個(gè)文件的路徑泌神、文件大小為參數(shù)封裝為AllocateRequest對(duì)象)添加至隊(duì)列中,后臺(tái)運(yùn)行的AllocateMappedFileService服務(wù)線程(在Broker啟動(dòng)時(shí)舞虱,該線程就會(huì)創(chuàng)建并運(yùn)行)欢际,會(huì)不停地run,只要請(qǐng)求隊(duì)列里存在請(qǐng)求矾兜,就會(huì)去執(zhí)行MappedFile映射文件的創(chuàng)建和預(yù)分配工作损趋。
分配的時(shí)候有兩種策略,
一種是使用Mmap的方式來(lái)構(gòu)建MappedFile實(shí)例椅寺,另外一種是從TransientStorePool堆外內(nèi)存池中獲取相應(yīng)的DirectByteBuffer來(lái)構(gòu)建MappedFile浑槽。并且,在創(chuàng)建分配完下個(gè)MappedFile后返帕,還會(huì)將下下個(gè)MappedFile預(yù)先創(chuàng)建并保存至請(qǐng)求隊(duì)列中等待下次獲取時(shí)直接返回桐玻。RocketMQ中預(yù)分配MappedFile的設(shè)計(jì)非常巧妙,下次獲取時(shí)候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時(shí)間延遲荆萤。
文件預(yù)熱
預(yù)熱的目的主要有兩點(diǎn)镊靴;
第一點(diǎn),由于僅分配內(nèi)存并進(jìn)行mlock系統(tǒng)調(diào)用后并不會(huì)為程序完全鎖定這些內(nèi)存,因?yàn)槠渲械姆猪?yè)可能是寫時(shí)復(fù)制的偏竟。因此算行,就有必要對(duì)每個(gè)內(nèi)存頁(yè)面中寫入一個(gè)假的值。其中苫耸,RocketMQ是在創(chuàng)建并分配MappedFile的過程中,預(yù)先寫入一些隨機(jī)值至Mmap映射出的內(nèi)存空間里儡陨。
第二褪子,調(diào)用Mmap進(jìn)行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表骗村,而實(shí)際并沒有加載任何文件至內(nèi)存中嫌褪。程序要訪問數(shù)據(jù)時(shí)OS會(huì)檢查該部分的分頁(yè)是否已經(jīng)在內(nèi)存中,如果不在胚股,則發(fā)出一次缺頁(yè)中斷笼痛。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁(yè)中斷琅拌,才能使得對(duì)應(yīng)的數(shù)據(jù)才能完全加載至物理內(nèi)存中缨伊。
RocketMQ的做法是,在做Mmap內(nèi)存映射的同時(shí)進(jìn)行madvise系統(tǒng)調(diào)用进宝,目的是使OS做一次內(nèi)存映射后對(duì)應(yīng)的文件數(shù)據(jù)盡可能多的預(yù)加載至內(nèi)存中刻坊,從而達(dá)到內(nèi)存預(yù)熱的效果。
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
// mappedByteBuffer在java里面對(duì)應(yīng)了mmap的實(shí)現(xiàn)
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
// 同步刷盤機(jī)制党晋,OS_PAGE_SIZE為4K
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
// 將page cache 這片內(nèi)存鎖定
this.mlock();
}
mlock 內(nèi)存鎖定
OS在內(nèi)存充足的情況下谭胚,會(huì)將文件加載到 page cache 提高文件的讀寫效率,但是當(dāng)內(nèi)存不夠用時(shí)未玻,os會(huì)將page cache回收掉灾而。試想如果MappedFile對(duì)應(yīng)的pagecache 被os回收,那就又產(chǎn)生缺頁(yè)異常再次從磁盤加載到pagecache扳剿,會(huì)對(duì)系統(tǒng)性能產(chǎn)生很大的影響旁趟。
將進(jìn)程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間舞终。對(duì)于RocketMQ這種的高吞吐量的分布式消息隊(duì)列來(lái)說(shuō)轻庆,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內(nèi)存敛劝,提高數(shù)據(jù)讀寫訪問的操作效率余爆。
RocketMq在創(chuàng)建完MappedFile并且內(nèi)存預(yù)熱完成后調(diào)用了c的mlock函數(shù)將這片內(nèi)存鎖定了,具體來(lái)看下是怎么實(shí)現(xiàn)的
// java 調(diào)用c
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 具體實(shí)現(xiàn)
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
RocketMQ刷盤機(jī)制
寫消息時(shí)是先寫入到pagecache夸盟,rocketMq提供了兩種刷盤機(jī)制蛾方,同步刷盤和異步刷盤,同步刷盤適用于對(duì)消息可靠性比較高的場(chǎng)合,同步刷盤性能比較低下桩砰,這樣即使系統(tǒng)宕機(jī)消息也不會(huì)丟失拓春。
同步刷盤
RocketMQ的Broker端才會(huì)真正地返回給Producer端一個(gè)成功的ACK響應(yīng)。同步刷盤對(duì)MQ消息可靠性來(lái)說(shuō)是一種不錯(cuò)的保障亚隅,但是性能上會(huì)有較大影響硼莽,一般適用于金融業(yè)務(wù)應(yīng)用領(lǐng)域。
RocketMQ同步刷盤的大致做法是煮纵,基于生產(chǎn)者消費(fèi)者模型懂鸵,主線程創(chuàng)建刷盤請(qǐng)求實(shí)例—GroupCommitRequest并在放入刷盤寫隊(duì)列后喚醒同步刷盤線程—GroupCommitService,來(lái)執(zhí)行刷盤動(dòng)作(其中用了CAS變量和CountDownLatch來(lái)保證線程間的同步)行疏。RocketMQ中用讀寫雙緩存隊(duì)列(requestsWrite/requestsRead)來(lái)實(shí)現(xiàn)讀寫分離匆光,其帶來(lái)的好處在于內(nèi)部消費(fèi)生成的同步刷盤請(qǐng)求可以不用加鎖,提高并發(fā)度酿联。
刷盤線程從阻塞隊(duì)列中獲取终息,刷盤其實(shí)就是調(diào)用了mappedByteBuffer.force()方法,刷盤成功后通過countdownlatch喚醒刷盤等待的線程贞让,原理很簡(jiǎn)單>
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 同步刷盤
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 對(duì)應(yīng)一個(gè)單獨(dú)的線程
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
// GroupCommitRequest 封裝了CountDownLatch周崭,GroupCommitService刷盤完畢后喚醒等待線程
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// 異步刷盤
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
異步刷盤
異步刷盤原理 發(fā)送消息線程寫到pagecache成功之后就返回,消息保存在page cache 中喳张,異步刷盤對(duì)應(yīng)了一個(gè)單獨(dú)線程休傍,刷盤默認(rèn)一次刷4個(gè)pageSize,也就是16k的數(shù)據(jù)蹲姐。異步刷盤有可能會(huì)丟失數(shù)據(jù)磨取,當(dāng)jvm程序死掉 但機(jī)器沒有宕機(jī),pagecache中的臟頁(yè)還是能人工刷到磁盤的柴墩,但是當(dāng)機(jī)器宕機(jī)之后忙厌,數(shù)據(jù)就永遠(yuǎn)丟失了。
能夠充分利用OS的PageCache的優(yōu)勢(shì)江咳,只要消息寫入PageCache即可將成功的ACK返回給Producer端逢净。消息刷盤采用后臺(tái)異步線程提交的方式進(jìn)行,降低了讀寫延遲歼指,提高了MQ的性能和吞吐量爹土。異步和同步刷盤的區(qū)別在于,異步刷盤時(shí)踩身,主線程并不會(huì)阻塞胀茵,在將刷盤線程wakeup后,就會(huì)繼續(xù)執(zhí)行挟阻。
RocketMQ的堆外存儲(chǔ)機(jī)制
RocketMq提供了堆外內(nèi)存池機(jī)制即TransientStorePool琼娘,TransientStorePool初始化時(shí)實(shí)例化5個(gè)堆外內(nèi)存峭弟,大小和MappedFile的大小1G,然后mlock鎖定此內(nèi)存區(qū)域脱拼。
發(fā)送消息時(shí)如果開啟了堆外內(nèi)存機(jī)制瞒瘸,MappedFile在實(shí)例化時(shí)從堆外內(nèi)存池中獲取一個(gè)directBuffer實(shí)例,寫消息先寫到堆外內(nèi)存中熄浓,然后有單獨(dú)的線程(CommitRealTimeService)刷到pagecache情臭,之后再由單獨(dú)的線程(FlushRealTimeService)從pagecahce刷到磁盤。
開啟堆外內(nèi)存池的好處:寫消息時(shí)先寫到堆外內(nèi)存赌蔑,純內(nèi)存操作非郴驯快。讀消息時(shí)是從pagecache中讀惯雳,相當(dāng)于實(shí)現(xiàn)了讀寫分離,但是會(huì)存在延時(shí)性機(jī)制問題鸿摇,以及對(duì)外內(nèi)存宕機(jī)了會(huì)丟失石景,數(shù)據(jù)一致性會(huì)存在問題。
消息生產(chǎn)
所有發(fā)送消息的線程是串行執(zhí)行的拙吉,所有topic的數(shù)據(jù)放一塊順序?qū)懙絧agecache中潮孽,因此效率十分的高。在寫 page cache 成功后筷黔,再由單獨(dú)的線程異步構(gòu)建consumerQueue和 indexFile(基于磁盤實(shí)現(xiàn)的hashMap往史,實(shí)現(xiàn)消息的查找),構(gòu)建完成consumerQueue成功后 consumer 就能消費(fèi)到最新的消息了佛舱,當(dāng)然構(gòu)建consumerQueue也是順序?qū)懽道看沃粚懭?0個(gè)字節(jié),占用的空間也不大请祖。
消息消費(fèi)
每個(gè)topic可以對(duì)應(yīng)多個(gè)consumerQueue订歪,就相當(dāng)于kafka里面的分區(qū)概念,Rocketmq里面的消費(fèi)者與consumerQueue的分配算法和kafka的相似肆捕。由于consumerQueue中只保存了消息在commitLog中的offset刷晋、msgSize、tagsCode慎陵,因此需要拿到offset去commitlog中把這條消息撈出來(lái)眼虱,這時(shí)候讀相當(dāng)與隨機(jī)讀。
注意席纽,由前面的mlock內(nèi)存鎖定再加上消費(fèi)的數(shù)據(jù)一般是最近生產(chǎn)的捏悬,數(shù)據(jù)還在pagecache中,對(duì)性能的影響也不大润梯,當(dāng)consumer消費(fèi)很遠(yuǎn)的數(shù)據(jù)時(shí)邮破,pagecache中肯定是沒有緩存的诈豌,這時(shí)候rocketMq建議consumer去slave上讀
總結(jié)
RocketMq所有topic共用一個(gè)commitLog,磁盤順序?qū)懯愫停@一點(diǎn)實(shí)現(xiàn)也是參考了kafka矫渔,讀消息時(shí)根據(jù)consumerQueue去commitLog中吧數(shù)據(jù)撈出來(lái),雖然是隨機(jī)讀摧莽,但是最新的數(shù)據(jù)一般在pagecahce中也無(wú)關(guān)緊要庙洼。使用內(nèi)存鎖定避免內(nèi)存swap交換,堆外內(nèi)存和pagecache的讀寫分離镊辕。