??【Alibaba中間件技術(shù)系列】「RocketMQ技術(shù)專題」系統(tǒng)服務(wù)底層原理以及高性能存儲(chǔ)設(shè)計(jì)分析

設(shè)計(jì)背景

消息中間件的本身定義來(lái)考慮,應(yīng)該盡量減少對(duì)于外部第三方中間件的依賴谒出。一般來(lái)說(shuō)依賴的外部系統(tǒng)越多隅俘,也會(huì)使得本身的設(shè)計(jì)越復(fù)雜,采用文件系統(tǒng)作為消息存儲(chǔ)的方式笤喳。

RocketMQ存儲(chǔ)機(jī)制

消息中間件的存儲(chǔ)一般都是利用磁盤为居,一般是使用機(jī)械硬盤,但機(jī)械硬盤的速度比訪問內(nèi)存慢了n個(gè)數(shù)量級(jí)杀狡,一款優(yōu)秀的消息中間件必然會(huì)將硬件資源壓榨到極致颜骤,接下來(lái)看看rocketMq是如何做到高效存儲(chǔ)的。

RocketMQ存儲(chǔ)模型

CommitLog

消息主體以及元數(shù)據(jù)的存儲(chǔ)媒介捣卤,存儲(chǔ)Producer端寫入的消息主體內(nèi)容。單個(gè)文件大小默認(rèn)1G 八孝,文件名長(zhǎng)度為20位董朝,左邊補(bǔ)零,剩余為起始偏移量干跛,比如子姜,00000000000000000000代表了第一個(gè)文件,起始偏移量為0楼入,文件大小為1G=1073741824哥捕;當(dāng)?shù)谝粋€(gè)文件寫滿了,第二個(gè)文件為00000000001073741824嘉熊,起始偏移量為1073741824遥赚,以此類推。消息主要是順序?qū)懭肴罩疚募簦?dāng)文件滿了凫佛,寫入下一個(gè)文件讲坎;

ConsumeQueue

  • 消息消費(fèi)的邏輯隊(duì)列,其中包含了這個(gè)MessageQueue在CommitLog中的起始物理位置偏移量offset愧薛,消息實(shí)體內(nèi)容的大小和Message Tag的哈希值晨炕。

  • 實(shí)際物理存儲(chǔ)來(lái)說(shuō),ConsumeQueue對(duì)應(yīng)每個(gè)Topic和QueueId下面的文件毫炉,單個(gè)文件大小約5.72M瓮栗,每個(gè)文件由30W條數(shù)據(jù)組成,每個(gè)文件默認(rèn)大小為600萬(wàn)個(gè)字節(jié)瞄勾,當(dāng)一個(gè)ConsumeQueue類型的文件寫滿了费奸,則寫入下一個(gè)文件;

IndexFile

生成的索引文件提供訪問服務(wù)丰榴,通過消息Key值查詢消息真正的實(shí)體內(nèi)容货邓。在實(shí)際的物理存儲(chǔ)上,文件名則是以創(chuàng)建時(shí)的時(shí)間戳命名的四濒,固定的單個(gè)IndexFile文件大小約為400M换况,一個(gè)IndexFile可以保存2000W個(gè)索引;

MapedFileQueue

對(duì)連續(xù)物理存儲(chǔ)的抽象封裝類盗蟆,可以通過消息存儲(chǔ)的物理偏移量位置快速定位該offset所在MappedFile(具體物理存儲(chǔ)位置的抽象)戈二、創(chuàng)建、刪除MappedFile等操作喳资;

MappedFile

文件存儲(chǔ)的直接內(nèi)存映射業(yè)務(wù)抽象封裝類觉吭,通過操作該類,可以把消息字節(jié)寫入PageCache緩存區(qū)(commit)仆邓,或者原子性地將消息持久化的刷盤(flush)鲜滩;

RocketMQ消息架構(gòu)

集群有一個(gè)Broker,Topic為binlog的隊(duì)列(Consume Queue)數(shù)量為4节值,如下圖所示徙硅,按順序發(fā)送消息。Consumer端線程采用的是負(fù)載訂閱的方式進(jìn)行消費(fèi)搞疗。

Commit Log和Consume Queue

消息發(fā)送流程架構(gòu)

image

發(fā)送到相關(guān)服務(wù)節(jié)點(diǎn)

image

生產(chǎn)到消費(fèi)的轉(zhuǎn)換

image

總體核心流程

RocketMQ的消息整體是有序的嗓蘑,所以消息按順序?qū)?nèi)容持久化在Commit Log中。Consume Queue則是用于將消息均衡地按序排列在不同的邏輯隊(duì)列匿乃,集群模式下多個(gè)消費(fèi)者就可以并行消費(fèi)Consume Queue的消息桩皿。

  1. MappedFile 所有的topic數(shù)據(jù)都寫到同一個(gè)文件中,文件的大小默認(rèn)為1G幢炸,使用mmap與磁盤文件做映射泄隔,初始化時(shí)使用mlock將內(nèi)存鎖定,防止pagecache被os交換到swap區(qū)域宛徊。數(shù)據(jù)是順序?qū)懨酚龋瑪?shù)據(jù)寫滿后自動(dòng)創(chuàng)建下個(gè)MappedFile順序?qū)懭搿?/li>
  2. MappedFileQueue MappedFile的隊(duì)列柜思,存儲(chǔ)封裝了所有的MappedFile實(shí)例。
  3. CommitLog 封裝了寫入消息和讀取消息的實(shí)現(xiàn)巷燥,根據(jù)MappedFileQueue找到正在寫的MappedFile赡盘,之后將消息寫入到pagecache,最后同步到硬盤上缰揪。
  4. ConsumerQueue 一個(gè)topic可以設(shè)置多個(gè)queue陨享,每個(gè)consumerQueue對(duì)應(yīng)一個(gè)topic下的queue,相當(dāng)于kafka里的partition概念钝腺。里面存儲(chǔ)了msg在commitLog中的offset抛姑、size、tagsCode艳狐,固定長(zhǎng)度是20字節(jié)定硝,consumer可以根據(jù)消息的offset在commitLog找到具體的消息。
image

詳細(xì)分析MQ發(fā)送和消費(fèi)流程

消息生產(chǎn)和消費(fèi)通過CommitLog

生產(chǎn)者發(fā)送消息最終寫入的是CommitLog(消息存儲(chǔ)的日志數(shù)據(jù)文件)毫目,Consumer端先從ConsumeQueue(消息邏輯隊(duì)列)讀取持久化消息的起始物理位置偏移量offset蔬啡、大小size和消息Tag的HashCode值,隨后再?gòu)腃ommitLog中進(jìn)行讀取待拉取消費(fèi)消息的真正實(shí)體內(nèi)容部分镀虐;

IndexFile(索引文件)

為了消息查詢提供了一種通過key或時(shí)間區(qū)間來(lái)查詢消息的方法箱蟆, 通過IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程。

RocketMQ的CommitLog文件采用混合型存儲(chǔ)

所有Topic下的消息隊(duì)列共用同一個(gè)CommitLog的日志數(shù)據(jù)文件刮便,并通過建立類似索引文件—ConsumeQueue的方式來(lái)區(qū)分不同Topic下面的不同MessageQueue的消息空猜,同時(shí)為消費(fèi)消息起到一定的緩沖作用。

  • 只有ReputMessageService異步服務(wù)線程通過doDispatch異步生成了ConsumeQueue隊(duì)列的元素后恨旱,Consumer端才能進(jìn)行消費(fèi)辈毯。

  • 只要消息寫入并刷盤至CommitLog文件后,消息就不會(huì)丟失搜贤,即使ConsumeQueue中的數(shù)據(jù)丟失漓摩,也可以通過CommitLog來(lái)恢復(fù)。

RocketMQ順序讀寫

  • 發(fā)送消息時(shí)入客,生產(chǎn)者端的消息確實(shí)是順序?qū)懭隒ommitLog;

  • 消費(fèi)消息時(shí)腿椎,消費(fèi)者端也是順序讀取ConsumeQueue桌硫,根據(jù)其中的起始物理位置偏移量offset讀取消息是隨機(jī)讀取CommitLog。

在RocketMQ集群整體的吞吐量啃炸、并發(fā)量非常高的情況下铆隘,隨機(jī)讀取文件帶來(lái)的性能開銷影響還是比較大的

RocketMQ存儲(chǔ)架構(gòu)的優(yōu)缺點(diǎn):

優(yōu)點(diǎn):
  1. ConsumeQueue消息邏輯隊(duì)列較為輕量級(jí);
  2. 磁盤的訪問串行化南用,避免磁盤竟?fàn)幇蚰疲粫?huì)因?yàn)殛?duì)列增加導(dǎo)致IOWAIT增高掏湾;
缺點(diǎn):
  1. CommitLog來(lái)說(shuō)寫入消息雖然是順序?qū)懀亲x卻變成了完全的隨機(jī)讀肿嘲;
  2. Consumer端訂閱消費(fèi)一條消息融击,需要先讀ConsumeQueue,再讀Commit Log雳窟,一定程度上增加了開銷尊浪;

RocketMQ存儲(chǔ)模型

RocketMQ文件存儲(chǔ)模型,根據(jù)類別和作用從概念模型上大致可以劃分為5層

image
  1. RocketMQ業(yè)務(wù)處理器層: Broker端對(duì)消息進(jìn)行讀取和寫入的業(yè)務(wù)邏輯入口封救,這一層主要包含了業(yè)務(wù)邏輯相關(guān)處理操作(根據(jù)解析RemotingCommand中的RequestCode來(lái)區(qū)分具體的業(yè)務(wù)操作類型拇涤,進(jìn)而執(zhí)行不同的業(yè)務(wù)處理流程),比如前置的檢查和校驗(yàn)步驟誉结、構(gòu)造MessageExtBrokerInner對(duì)象鹅士、decode反序列化、構(gòu)造Response返回對(duì)象等惩坑;

  2. RocketMQ數(shù)據(jù)存儲(chǔ)組件層:該層主要是RocketMQ的存儲(chǔ)核心類—DefaultMessageStore掉盅,其為RocketMQ消息數(shù)據(jù)文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對(duì)CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件進(jìn)行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對(duì)象模型提供的方法)旭贬;在該組件初始化時(shí)候怔接,還會(huì)啟動(dòng)很多存儲(chǔ)相關(guān)的后臺(tái)服務(wù)線程:

    • AllocateMappedFileService(MappedFile預(yù)分配服務(wù)線程)
    • ReputMessageService(回放存儲(chǔ)消息服務(wù)線程)
    • HAService(Broker主從同步高可用服務(wù)線程)
    • StoreStatsService(消息存儲(chǔ)統(tǒng)計(jì)服務(wù)線程)
    • IndexService(索引文件服務(wù)線程)等;
  3. RocketMQ存儲(chǔ)邏輯對(duì)象層: 該層主要包含了RocketMQ數(shù)據(jù)文件存儲(chǔ)直接相關(guān)的三個(gè)模型類IndexFile稀轨、ConsumerQueue和CommitLog扼脐。

    • IndexFile為索引數(shù)據(jù)文件提供訪問服務(wù)
    • ConsumerQueue為邏輯消息隊(duì)列提供訪問服務(wù)
    • CommitLog則為消息存儲(chǔ)的日志數(shù)據(jù)文件提供訪問服務(wù)。

這三個(gè)模型類也是構(gòu)成了RocketMQ存儲(chǔ)層的整體結(jié)構(gòu)奋刽;

  1. 封裝的文件內(nèi)存映射層: RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數(shù)據(jù)文件的讀寫瓦侮。

    • 采用MappedByteBuffer這種內(nèi)存映射磁盤文件的方式完成對(duì)大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類佣谐。
    • 對(duì)于每類大文件(IndexFile/ConsumerQueue/CommitLog)肚吏,在存儲(chǔ)時(shí)分隔成多個(gè)固定大小的文件(單個(gè)IndexFile文件大小約為400M、單個(gè)ConsumerQueue文件大小約5.72M狭魂、單個(gè)CommitLog文件大小為1G)罚攀,其中每個(gè)分隔文件的文件名為前面所有文件的字節(jié)大小數(shù)+1,即為文件的起始偏移量雌澄,從而實(shí)現(xiàn)了整個(gè)大文件的串聯(lián)斋泄。

每一種類的單個(gè)文件均由MappedFile類提供讀寫操作服務(wù)(其中,MappedFile類提供了順序?qū)?隨機(jī)讀镐牺、內(nèi)存數(shù)據(jù)刷盤炫掐、內(nèi)存清理等和文件相關(guān)的服務(wù));

  1. 磁盤存儲(chǔ)層: 主要指的是部署RocketMQ服務(wù)器所用的磁盤睬涧。

RocketMQ存儲(chǔ)技術(shù)

主要采用mmap與PageCache募胃,其中mmap內(nèi)存映射技術(shù)—Java中的MappedByteBuffer旗唁。

先簡(jiǎn)單介紹下mmap

mmap一種內(nèi)存映射文件的方法,即將一個(gè)文件或者其它對(duì)象映射到進(jìn)程的地址空間痹束,實(shí)現(xiàn)文件磁盤地址和進(jìn)程虛擬地址空間中一段虛擬地址的一一對(duì)映關(guān)系检疫。實(shí)現(xiàn)這樣的映射關(guān)系后,進(jìn)程就可以采用指針的方式讀寫操作這一段內(nèi)存,而系統(tǒng)會(huì)自動(dòng)回寫臟頁(yè)面到對(duì)應(yīng)的文件磁盤上。內(nèi)核空間對(duì)這段區(qū)域的修改也直接反映用戶空間薇缅,從而可以實(shí)現(xiàn)不同進(jìn)程間的文件共享件炉。

mmap內(nèi)存映射和普通標(biāo)準(zhǔn)IO操作的本質(zhì)區(qū)別在于它并不需要將文件中的數(shù)據(jù)先拷貝至OS的內(nèi)核IO緩沖區(qū),而是可以直接將用戶進(jìn)程私有地址空間中的一塊區(qū)域與文件對(duì)象建立映射關(guān)系,這樣程序就好像可以直接從內(nèi)存中完成對(duì)文件讀/寫操作一樣。

只有當(dāng)缺頁(yè)中斷發(fā)生時(shí),直接將文件從磁盤拷貝至用戶態(tài)的進(jìn)程空間內(nèi)晒来,只進(jìn)行了一次數(shù)據(jù)拷貝。對(duì)于容量較大的文件來(lái)說(shuō)(文件大小一般需要限制在1.5~2G以下郑现,這也是CommitLog設(shè)置成1G的原因)湃崩,采用Mmap的方式其讀/寫的效率和性能都非常高。

image
  • RocketMq默認(rèn)的文件大小為1G接箫,即將1G的文件映射到物理內(nèi)存上攒读。但mmap初始化時(shí)只是將文件磁盤地址和進(jìn)程虛擬地址做了個(gè)映射,并沒有真正的將整個(gè)文件都映射到內(nèi)存中辛友,當(dāng)程序真正訪問這片內(nèi)存時(shí)產(chǎn)生缺頁(yè)異常薄扁,這時(shí)候才會(huì)將文件的內(nèi)容拷貝到page cache。
image

如果一開始只是做個(gè)映射废累,而到具體寫消息時(shí)才將文件的部分頁(yè)加載到pagecache邓梅,那效率將會(huì)是多么的低下。MappedFile初始化的操作是由單獨(dú)的線程(AllocateMappedFileService)實(shí)現(xiàn)的邑滨,就是對(duì)應(yīng)的生產(chǎn)消費(fèi)模型日缨。RocketMq在初始化MappedFile時(shí)做了內(nèi)存預(yù)熱,事先向page cache 中寫入一些數(shù)據(jù)flush到磁盤掖看,使整個(gè)文件都加載到page cache中匣距。

MappedByteBuffer技術(shù)分析

MappedByteBuffer繼承自ByteBuffer,其內(nèi)部維護(hù)了一個(gè)邏輯地址變量—address哎壳。在建立映射關(guān)系時(shí)毅待,

MappedByteBuffer利用了JDK NIO的FileChannel類提供的map()方法把文件對(duì)象映射到虛擬內(nèi)存。

源碼中map()方法的實(shí)現(xiàn)耳峦,可以發(fā)現(xiàn)最終其通過調(diào)用native方法map0()完成文件對(duì)象的映射工作,同時(shí)使用Util.newMappedByteBuffer()方法初始化MappedByteBuffer實(shí)例焕毫,但最終返回的是DirectByteBuffer的實(shí)例蹲坷。

在Java程序中使用MappedByteBuffer的get()方法來(lái)獲取內(nèi)存數(shù)據(jù)是最終通過DirectByteBuffer.get()方法實(shí)現(xiàn)(底層通過unsafe.getByte()方法驶乾,以“地址 + 偏移量”的方式獲取指定映射至內(nèi)存中的數(shù)據(jù))。

使用Mmap的限制
  • mmap映射的內(nèi)存空間釋放的問題

由于映射的內(nèi)存空間本身就不屬于JVM的堆內(nèi)存區(qū)(Java Heap)循签,因此其不受JVM GC的控制级乐,卸載這部分內(nèi)存空間需要通過系統(tǒng)調(diào)用unmap()方法來(lái)實(shí)現(xiàn)。

然而unmap()方法是FileChannelImpl類里實(shí)現(xiàn)的私有方法县匠,無(wú)法直接顯示調(diào)用风科。RocketMQ中的做法是,通過Java反射的方式調(diào)用“sun.misc”包下的Cleaner類的clean()方法來(lái)釋放映射占用的內(nèi)存空間乞旦;

  • MappedByteBuffer內(nèi)存映射大小限制

因?yàn)槠湔加玫氖翘摂M內(nèi)存(非JVM的堆內(nèi)存)贼穆,大小不受JVM的-Xmx參數(shù)限制,但其大小也受到OS虛擬內(nèi)存大小的限制兰粉。一般來(lái)說(shuō)故痊,一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存空間,RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了玖姑;

  • 使用MappedByteBuffe的其他問題

會(huì)存在內(nèi)存占用率較高和文件關(guān)閉不確定性的問題愕秫;

OS的PageCache機(jī)制

PageCache是OS對(duì)文件的緩存,用于加速對(duì)文件的讀寫焰络。程序?qū)ξ募M(jìn)行順序讀寫的速度幾乎接近于內(nèi)存的讀寫訪問戴甩,這里的主要原因就是在于OS使用PageCache機(jī)制對(duì)讀寫訪問操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache闪彼。

對(duì)于數(shù)據(jù)文件的讀取

如果一次讀取文件時(shí)出現(xiàn)未命中PageCache的情況甜孤,OS從物理磁盤上訪問讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取备蚓。這樣课蔬,只要下次訪問的文件已經(jīng)被加載至PageCache時(shí),讀取操作的速度基本等于訪問內(nèi)存郊尝。

對(duì)于數(shù)據(jù)文件的寫入

OS會(huì)先寫入至Cache內(nèi)二跋,隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。

對(duì)于文件的順序讀寫操作來(lái)說(shuō)流昏,讀和寫的區(qū)域都在OS的PageCache內(nèi)扎即,此時(shí)讀寫性能接近于內(nèi)存。

  • RocketMQ的大致做法是况凉,將數(shù)據(jù)文件映射到OS的虛擬內(nèi)存中(通過JDK NIO的MappedByteBuffer)谚鄙,寫消息的時(shí)候首先寫入PageCache,并通過異步刷盤的方式將消息批量的做持久化(同時(shí)也支持同步刷盤)刁绒;

  • 訂閱消費(fèi)消息時(shí)(對(duì)CommitLog操作是隨機(jī)讀让朴),由于PageCache的局部性熱點(diǎn)原理且整體情況下還是從舊到新的有序讀,因此大部分情況下消息還是可以直接從Page Cache中讀取傻盟,不會(huì)產(chǎn)生太多的缺頁(yè)(Page Fault)中斷而從磁盤讀取速蕊。

image

PageCache機(jī)制也不是完全無(wú)缺點(diǎn)的,當(dāng)遇到OS進(jìn)行臟頁(yè)回寫娘赴,內(nèi)存回收规哲,內(nèi)存swap等情況時(shí),就會(huì)引起較大的消息讀寫延遲诽表。

對(duì)于這些情況唉锌,RocketMQ采用了多種優(yōu)化技術(shù),比如內(nèi)存預(yù)分配竿奏,文件預(yù)熱袄简,mlock系統(tǒng)調(diào)用等,來(lái)保證在最大可能地發(fā)揮PageCache機(jī)制優(yōu)點(diǎn)的同時(shí)议双,盡可能地減少其缺點(diǎn)帶來(lái)的消息讀寫延遲痘番。

RocketMQ存儲(chǔ)優(yōu)化技術(shù)

RocketMQ存儲(chǔ)層采用的幾項(xiàng)優(yōu)化技術(shù)方案在一定程度上可以減少PageCache的缺點(diǎn)帶來(lái)的影響,主要包括內(nèi)存預(yù)分配平痰,文件預(yù)熱和mlock系統(tǒng)調(diào)用汞舱。

預(yù)先分配MappedFile

在消息寫入過程中(調(diào)用CommitLog的putMessage()方法),CommitLog會(huì)先從MappedFileQueue隊(duì)列中獲取一個(gè) MappedFile宗雇,如果沒有就新建一個(gè)昂芜。

MappedFile的創(chuàng)建過程是將構(gòu)建好的一個(gè)AllocateRequest請(qǐng)求(具體做法是,將下一個(gè)文件的路徑赔蒲、下下個(gè)文件的路徑泌神、文件大小為參數(shù)封裝為AllocateRequest對(duì)象)添加至隊(duì)列中,后臺(tái)運(yùn)行的AllocateMappedFileService服務(wù)線程(在Broker啟動(dòng)時(shí)舞虱,該線程就會(huì)創(chuàng)建并運(yùn)行)欢际,會(huì)不停地run,只要請(qǐng)求隊(duì)列里存在請(qǐng)求矾兜,就會(huì)去執(zhí)行MappedFile映射文件的創(chuàng)建和預(yù)分配工作损趋。

分配的時(shí)候有兩種策略,

一種是使用Mmap的方式來(lái)構(gòu)建MappedFile實(shí)例椅寺,另外一種是從TransientStorePool堆外內(nèi)存池中獲取相應(yīng)的DirectByteBuffer來(lái)構(gòu)建MappedFile浑槽。并且,在創(chuàng)建分配完下個(gè)MappedFile后返帕,還會(huì)將下下個(gè)MappedFile預(yù)先創(chuàng)建并保存至請(qǐng)求隊(duì)列中等待下次獲取時(shí)直接返回桐玻。RocketMQ中預(yù)分配MappedFile的設(shè)計(jì)非常巧妙,下次獲取時(shí)候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時(shí)間延遲荆萤。

文件預(yù)熱

預(yù)熱的目的主要有兩點(diǎn)镊靴;

  • 第一點(diǎn),由于僅分配內(nèi)存并進(jìn)行mlock系統(tǒng)調(diào)用后并不會(huì)為程序完全鎖定這些內(nèi)存,因?yàn)槠渲械姆猪?yè)可能是寫時(shí)復(fù)制的偏竟。因此算行,就有必要對(duì)每個(gè)內(nèi)存頁(yè)面中寫入一個(gè)假的值。其中苫耸,RocketMQ是在創(chuàng)建并分配MappedFile的過程中,預(yù)先寫入一些隨機(jī)值至Mmap映射出的內(nèi)存空間里儡陨。

  • 第二褪子,調(diào)用Mmap進(jìn)行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表骗村,而實(shí)際并沒有加載任何文件至內(nèi)存中嫌褪。程序要訪問數(shù)據(jù)時(shí)OS會(huì)檢查該部分的分頁(yè)是否已經(jīng)在內(nèi)存中,如果不在胚股,則發(fā)出一次缺頁(yè)中斷笼痛。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁(yè)中斷琅拌,才能使得對(duì)應(yīng)的數(shù)據(jù)才能完全加載至物理內(nèi)存中缨伊。

RocketMQ的做法是,在做Mmap內(nèi)存映射的同時(shí)進(jìn)行madvise系統(tǒng)調(diào)用进宝,目的是使OS做一次內(nèi)存映射后對(duì)應(yīng)的文件數(shù)據(jù)盡可能多的預(yù)加載至內(nèi)存中刻坊,從而達(dá)到內(nèi)存預(yù)熱的效果。

public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        // mappedByteBuffer在java里面對(duì)應(yīng)了mmap的實(shí)現(xiàn)
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                // 同步刷盤機(jī)制党晋,OS_PAGE_SIZE為4K
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);
        // 將page cache 這片內(nèi)存鎖定
        this.mlock();
    }

mlock 內(nèi)存鎖定

  • OS在內(nèi)存充足的情況下谭胚,會(huì)將文件加載到 page cache 提高文件的讀寫效率,但是當(dāng)內(nèi)存不夠用時(shí)未玻,os會(huì)將page cache回收掉灾而。試想如果MappedFile對(duì)應(yīng)的pagecache 被os回收,那就又產(chǎn)生缺頁(yè)異常再次從磁盤加載到pagecache扳剿,會(huì)對(duì)系統(tǒng)性能產(chǎn)生很大的影響旁趟。

  • 將進(jìn)程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間舞终。對(duì)于RocketMQ這種的高吞吐量的分布式消息隊(duì)列來(lái)說(shuō)轻庆,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內(nèi)存敛劝,提高數(shù)據(jù)讀寫訪問的操作效率余爆。

RocketMq在創(chuàng)建完MappedFile并且內(nèi)存預(yù)熱完成后調(diào)用了c的mlock函數(shù)將這片內(nèi)存鎖定了,具體來(lái)看下是怎么實(shí)現(xiàn)的

// java 調(diào)用c
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 具體實(shí)現(xiàn)
public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
}

RocketMQ刷盤機(jī)制

寫消息時(shí)是先寫入到pagecache夸盟,rocketMq提供了兩種刷盤機(jī)制蛾方,同步刷盤和異步刷盤,同步刷盤適用于對(duì)消息可靠性比較高的場(chǎng)合,同步刷盤性能比較低下桩砰,這樣即使系統(tǒng)宕機(jī)消息也不會(huì)丟失拓春。

image

同步刷盤

  • RocketMQ的Broker端才會(huì)真正地返回給Producer端一個(gè)成功的ACK響應(yīng)。同步刷盤對(duì)MQ消息可靠性來(lái)說(shuō)是一種不錯(cuò)的保障亚隅,但是性能上會(huì)有較大影響硼莽,一般適用于金融業(yè)務(wù)應(yīng)用領(lǐng)域。

  • RocketMQ同步刷盤的大致做法是煮纵,基于生產(chǎn)者消費(fèi)者模型懂鸵,主線程創(chuàng)建刷盤請(qǐng)求實(shí)例—GroupCommitRequest并在放入刷盤寫隊(duì)列后喚醒同步刷盤線程—GroupCommitService,來(lái)執(zhí)行刷盤動(dòng)作(其中用了CAS變量和CountDownLatch來(lái)保證線程間的同步)行疏。RocketMQ中用讀寫雙緩存隊(duì)列(requestsWrite/requestsRead)來(lái)實(shí)現(xiàn)讀寫分離匆光,其帶來(lái)的好處在于內(nèi)部消費(fèi)生成的同步刷盤請(qǐng)求可以不用加鎖,提高并發(fā)度酿联。

  • 刷盤線程從阻塞隊(duì)列中獲取终息,刷盤其實(shí)就是調(diào)用了mappedByteBuffer.force()方法,刷盤成功后通過countdownlatch喚醒刷盤等待的線程贞让,原理很簡(jiǎn)單>


public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // 同步刷盤
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
           // 對(duì)應(yīng)一個(gè)單獨(dú)的線程
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                 // GroupCommitRequest 封裝了CountDownLatch周崭,GroupCommitService刷盤完畢后喚醒等待線程
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // 異步刷盤
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

異步刷盤

  • 異步刷盤原理 發(fā)送消息線程寫到pagecache成功之后就返回,消息保存在page cache 中喳张,異步刷盤對(duì)應(yīng)了一個(gè)單獨(dú)線程休傍,刷盤默認(rèn)一次刷4個(gè)pageSize,也就是16k的數(shù)據(jù)蹲姐。異步刷盤有可能會(huì)丟失數(shù)據(jù)磨取,當(dāng)jvm程序死掉 但機(jī)器沒有宕機(jī),pagecache中的臟頁(yè)還是能人工刷到磁盤的柴墩,但是當(dāng)機(jī)器宕機(jī)之后忙厌,數(shù)據(jù)就永遠(yuǎn)丟失了。

  • 能夠充分利用OS的PageCache的優(yōu)勢(shì)江咳,只要消息寫入PageCache即可將成功的ACK返回給Producer端逢净。消息刷盤采用后臺(tái)異步線程提交的方式進(jìn)行,降低了讀寫延遲歼指,提高了MQ的性能和吞吐量爹土。異步和同步刷盤的區(qū)別在于,異步刷盤時(shí)踩身,主線程并不會(huì)阻塞胀茵,在將刷盤線程wakeup后,就會(huì)繼續(xù)執(zhí)行挟阻。

RocketMQ的堆外存儲(chǔ)機(jī)制

  • RocketMq提供了堆外內(nèi)存池機(jī)制即TransientStorePool琼娘,TransientStorePool初始化時(shí)實(shí)例化5個(gè)堆外內(nèi)存峭弟,大小和MappedFile的大小1G,然后mlock鎖定此內(nèi)存區(qū)域脱拼。

  • 發(fā)送消息時(shí)如果開啟了堆外內(nèi)存機(jī)制瞒瘸,MappedFile在實(shí)例化時(shí)從堆外內(nèi)存池中獲取一個(gè)directBuffer實(shí)例,寫消息先寫到堆外內(nèi)存中熄浓,然后有單獨(dú)的線程(CommitRealTimeService)刷到pagecache情臭,之后再由單獨(dú)的線程(FlushRealTimeService)從pagecahce刷到磁盤。

開啟堆外內(nèi)存池的好處:寫消息時(shí)先寫到堆外內(nèi)存赌蔑,純內(nèi)存操作非郴驯快。讀消息時(shí)是從pagecache中讀惯雳,相當(dāng)于實(shí)現(xiàn)了讀寫分離,但是會(huì)存在延時(shí)性機(jī)制問題鸿摇,以及對(duì)外內(nèi)存宕機(jī)了會(huì)丟失石景,數(shù)據(jù)一致性會(huì)存在問題。

image
image

消息生產(chǎn)

所有發(fā)送消息的線程是串行執(zhí)行的拙吉,所有topic的數(shù)據(jù)放一塊順序?qū)懙絧agecache中潮孽,因此效率十分的高。在寫 page cache 成功后筷黔,再由單獨(dú)的線程異步構(gòu)建consumerQueue和 indexFile(基于磁盤實(shí)現(xiàn)的hashMap往史,實(shí)現(xiàn)消息的查找),構(gòu)建完成consumerQueue成功后 consumer 就能消費(fèi)到最新的消息了佛舱,當(dāng)然構(gòu)建consumerQueue也是順序?qū)懽道看沃粚懭?0個(gè)字節(jié),占用的空間也不大请祖。

消息消費(fèi)

每個(gè)topic可以對(duì)應(yīng)多個(gè)consumerQueue订歪,就相當(dāng)于kafka里面的分區(qū)概念,Rocketmq里面的消費(fèi)者與consumerQueue的分配算法和kafka的相似肆捕。由于consumerQueue中只保存了消息在commitLog中的offset刷晋、msgSize、tagsCode慎陵,因此需要拿到offset去commitlog中把這條消息撈出來(lái)眼虱,這時(shí)候讀相當(dāng)與隨機(jī)讀。

注意席纽,由前面的mlock內(nèi)存鎖定再加上消費(fèi)的數(shù)據(jù)一般是最近生產(chǎn)的捏悬,數(shù)據(jù)還在pagecache中,對(duì)性能的影響也不大润梯,當(dāng)consumer消費(fèi)很遠(yuǎn)的數(shù)據(jù)時(shí)邮破,pagecache中肯定是沒有緩存的诈豌,這時(shí)候rocketMq建議consumer去slave上讀

總結(jié)

RocketMq所有topic共用一個(gè)commitLog,磁盤順序?qū)懯愫停@一點(diǎn)實(shí)現(xiàn)也是參考了kafka矫渔,讀消息時(shí)根據(jù)consumerQueue去commitLog中吧數(shù)據(jù)撈出來(lái),雖然是隨機(jī)讀摧莽,但是最新的數(shù)據(jù)一般在pagecahce中也無(wú)關(guān)緊要庙洼。使用內(nèi)存鎖定避免內(nèi)存swap交換,堆外內(nèi)存和pagecache的讀寫分離镊辕。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末油够,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子征懈,更是在濱河造成了極大的恐慌石咬,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卖哎,死亡現(xiàn)場(chǎng)離奇詭異鬼悠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)亏娜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門焕窝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人维贺,你說(shuō)我怎么就攤上這事它掂。” “怎么了溯泣?”我有些...
    開封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵虐秋,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我垃沦,道長(zhǎng)熟妓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任栏尚,我火速辦了婚禮起愈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘译仗。我一直安慰自己抬虽,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開白布纵菌。 她就那樣靜靜地躺著阐污,像睡著了一般。 火紅的嫁衣襯著肌膚如雪咱圆。 梳的紋絲不亂的頭發(fā)上笛辟,一...
    開封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天功氨,我揣著相機(jī)與錄音,去河邊找鬼手幢。 笑死捷凄,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的围来。 我是一名探鬼主播跺涤,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼监透!你這毒婦竟也來(lái)了桶错?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤胀蛮,失蹤者是張志新(化名)和其女友劉穎院刁,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體粪狼,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡退腥,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鸳玩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡演闭,死狀恐怖不跟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情米碰,我是刑警寧澤窝革,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站吕座,受9級(jí)特大地震影響虐译,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜吴趴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一漆诽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧锣枝,春花似錦厢拭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至陨闹,卻和暖如春楞捂,著一層夾襖步出監(jiān)牢的瞬間薄坏,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工寨闹, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留胶坠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓鼻忠,卻偏偏與公主長(zhǎng)得像涵但,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子帖蔓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容