系列
broker的消息存儲做了那些事
????rocketMq的broker消息存儲主要包括3個部分立砸,分別commitLog的存儲涵卵,consumeQueue的存儲泄朴,index的存儲,這章分享會把這三個過程分解清楚间螟,同時會對里面涉及的存儲位置的偏移量著重講解清楚肉迫。
? ? 1、commitLog的存儲是producer發(fā)送消息給broker端broker同步處理的
? ? 2甲喝、consumeQueue和index兩者存儲其實是一個定時任務(wù)從commitLog中獲取偏移量然后存儲過去的
? ? 3、consumeQueue和index的存儲 與 commitLog的存儲是隔離開的铛只,非同步的
broker的消息存儲過程
說明:分享自再說rocketmq消息存儲埠胖。
? ? 1糠溜、commitLog其實有兩層夠?qū)樱渲蠱appendFileQueue是邏輯的存儲隊列概念直撤,里面保存著順序增長的MappedFile文件非竿。
? ? 2、MappedFile文件是真正存儲實際數(shù)據(jù)的文件
? ? 3谋竖、在整個broker的存儲體系中红柱,MappedFile文件保存了commitLog、consumeQueue蓖乘、Index等锤悄,是核心的數(shù)據(jù)結(jié)構(gòu)。
broker的消息存儲過程
說明:參見BrokerController類
? ? 1嘉抒、其中sendProcessor就是broker端接收message的入口函數(shù)
說明:參見SendMessageProcessor類
? ? 1铁蹈、這里我們先看下單個消息的處理過程,也就是sendMessage過程
說明:參見SendMessageProcessor類
? ? 1众眨、putMessage函數(shù)開始執(zhí)行數(shù)據(jù)的保存
說明:參見DefaultMessageStore類
? ? 1握牧、commitLog.putMessage開始進(jìn)入commitLog的保存邏輯
說明:參見CommitLog類
????1、先從mappedFileQueue獲取最后一個MappedFile文件娩梨,如果為空就創(chuàng)建一個commitLog對應(yīng)的MappedFile文件沿腰,文件命名以實際以文件大小命名,分別是00000000000000000000狈定、00000000001073741824颂龙、00000000002147483648,文件名之間差1G=1024*1024*1224B=1073741824纽什。每個commitLog的MappedFile文件大小是1G措嵌,剩余多余無非存入新消息就用填充字符填充到1G。
????2芦缰、mappedFile.appendMessage執(zhí)行保存消息到MappedFile的動作
說明:參見MappedFile類
? ? 1企巢、這里我們以單個消息存儲為例繼續(xù)說明
說明:參見CommitLog類
? ? 1、做一些前置準(zhǔn)備让蕾,包括計算下一個存儲位置等
說明:參見CommitLog類
? ? 1浪规、這里有個特別的地方需要注意,就是如果剩余空間無法裝下消息+8個字節(jié)的結(jié)束標(biāo)識符探孝,就默認(rèn)結(jié)束了笋婿,結(jié)束標(biāo)識符應(yīng)該用于標(biāo)識mappedFile是否結(jié)束。
說明:參見CommitLog類
? ? 1顿颅、這里的消息寫入過程我們分析暫時到寫入緩存就結(jié)束了缸濒,真正實際上是還會有刷盤的動作的,這里暫時不展開分析,后續(xù)單獨開一章刷盤的的分析庇配。
說明:
? ? commitLog的MappedFile文件中保存的數(shù)據(jù)格式如上圖所示斩跌,在末尾不能保存整個消息的時候就會重新生成一個MappedFile文件,當(dāng)然在末尾應(yīng)該會有填充結(jié)束標(biāo)識符讨永,文件結(jié)束符是以特殊magic_code結(jié)束的,為BLANK_MAGIC_CODE=0xBBCCDDEE ^1880681586 +8;
comsumeQueue的存儲過程
說明:參見DefaultMessageStore類
? ? 1遇革、該實現(xiàn)類是一個線程函數(shù)卿闹,內(nèi)部通過run操作循環(huán)去commitLog去消息位移信息保存到consumeQueue當(dāng)中
說明:參見DefaultMessageStore類
說明:參見DefaultMessageStore類
? ? 1、this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
? ? ?2萝快、this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
說明:參見ConsumeQueue類
? ? 1锻霎、ConsumeQueue的消息單元格式如下圖。