RocketMQ第五講

broker是RockerMQ用來(lái)存儲(chǔ)消息的服務(wù),是RocketMQ的核心組件。NameServer和Producer都比較簡(jiǎn)單,Consumer和Broker都是比較復(fù)雜的。

從Client-Server視角來(lái)看,NameServer屬于Server踏施,Consumer和Producer都屬于Client,而B(niǎo)roker兩者兼具罕邀。Broker是Consumer和Producer的Server畅形,同時(shí)又是NameServer的Client。

此篇文檔是RocketMQ分析的第五篇文檔诉探,介紹Broker日熬。下一篇將對(duì)照RocketMQ官方文檔,介紹一下RocketMQ中設(shè)計(jì)到的一些概念性的知識(shí)阵具。

rocketmq_architecture_3.png

核心業(yè)務(wù)及架構(gòu)設(shè)計(jì)

broker是RocketMQ的核心碍遍,核心工作就是接收生成這的消息定铜,進(jìn)行存儲(chǔ)。同時(shí)怕敬,收到消費(fèi)者的請(qǐng)求后揣炕,從磁盤讀取內(nèi)容,把結(jié)果返回給消費(fèi)者东跪。

CommitLog

消息主體以及元數(shù)據(jù)的存儲(chǔ)主體畸陡,存儲(chǔ)Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G 虽填,文件名長(zhǎng)度為20位丁恭,左邊補(bǔ)零,剩余為起始偏移量斋日,比如00000000000000000000代表了第一個(gè)文件牲览,起始偏移量為0,文件大小為1G=1073741824恶守;當(dāng)?shù)谝粋€(gè)文件寫滿了第献,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824兔港,以此類推庸毫。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿了衫樊,寫入下一個(gè)文件飒赃;

CommitLog文件中保存了消息的全量?jī)?nèi)容。不同的Topic的消息科侈,在CommitLog都是順序存放的载佳。就是來(lái)一個(gè)消息,不管Topic是什么兑徘,直接追加的CommitLog中刚盈。

broker啟動(dòng)了一個(gè)專門的線程來(lái)構(gòu)建索引,把CommitLog中的消息挂脑,構(gòu)建了兩種類型的索引。ConsumerQueue和Index欲侮。正常消費(fèi)的時(shí)候崭闲,是根據(jù)Topic來(lái)消費(fèi),會(huì)用到ConsumerQueue索引威蕉。

rocketmq_design_11.png

也可根據(jù)返回的offsetMsgId刁俭,解析出ip,端口和CommitLog中的物理消息偏移量韧涨,直接去CommitLog中取數(shù)據(jù)牍戚。

ConsumerQueue

引入的目的主要是提高消息消費(fèi)的性能侮繁,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的如孝,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的宪哩。Consumer即可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。

rocketmq_design_7.png

其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset芭届,消息大小size和消息Tag的HashCode值惭嚣。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)遍搞,具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié)圆恤,分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長(zhǎng)度腔稀、8字節(jié)tag hashcode盆昙,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目烧颖,每個(gè)ConsumeQueue文件大小約5.72M弱左。

rocketmq_design_1.png

Index

IndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法。Index文件的存儲(chǔ)位置是:HOME \store\index{fileName}炕淮,文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的拆火,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引涂圆,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu)们镜,故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。

rocketmq_design_13.png

按照Message Key查詢消息的時(shí)候润歉,會(huì)用到這個(gè)索引文件模狭。

IndexFile索引文件為用戶提供通過(guò)“按照Message Key查詢消息”的消息索引查詢服務(wù),IndexFile文件的存儲(chǔ)位置是:HOME\store\index{fileName}踩衩,文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的嚼鹉,文件大小是固定的,等于40+500W4+2000W20= 420000040個(gè)字節(jié)大小驱富。如果消息的properties中設(shè)置了UNIQ_KEY這個(gè)屬性锚赤,就用 topic + “#” + UNIQ_KEY的value作為 key 來(lái)做寫入操作。如果消息設(shè)置了KEYS屬性(多個(gè)KEY以空格分隔)褐鸥,也會(huì)用 topic + “#” + KEY 來(lái)做索引线脚。

其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個(gè)字段,一共20 Byte。NextIndex offset 即前面讀出來(lái)的 slotValue浑侥,如果有 hash沖突姊舵,就可以用這個(gè)字段將所有沖突的索引用鏈表的方式串起來(lái)了。Timestamp記錄的是消息storeTimestamp之間的差寓落,并不是一個(gè)絕對(duì)的時(shí)間括丁。整個(gè)Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計(jì)信息零如,4500W的 Slot Table并不保存真正的索引數(shù)據(jù)躏将,而是保存每個(gè)槽位對(duì)應(yīng)的單向鏈表的頭。202000W 是真正的索引數(shù)據(jù)考蕾,即一個(gè) Index File 可以保存 2000W個(gè)索引祸憋。

“按照Message Key查詢消息”的方式,RocketMQ的具體做法是肖卧,主要通過(guò)Broker端的QueryMessageProcessor業(yè)務(wù)處理器來(lái)查詢蚯窥,讀取消息的過(guò)程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實(shí)體內(nèi)容塞帐。

接口和類圖

RocketMQ中有兩個(gè)核心模塊拦赠,remoting模塊和store模塊。remoting模塊在NameServer葵姥,Produce荷鼠,Consumer和Broker都用到。store只在Broker中用到榔幸,包含了存儲(chǔ)文件操作的API允乐,對(duì)消息實(shí)體的操作是通過(guò)DefaultMessageStore進(jìn)行操作。

store模塊

  • DefaultMessageStore

屬性和方法很多削咆,就不往這里放了牍疏。

  • CommitLog

文件存儲(chǔ)實(shí)現(xiàn)類,包括多個(gè)內(nèi)部類

  • MappedFile

· 對(duì)于文件夾下的一個(gè)文件

image.png

borker模塊

  • SendMessageProcessor
  • PullMessageProcessor
  • QueryMessageProcessor
  • BrokerController:核心控制器
image.png

remoting模塊

  • NettyRemotingClient
  • NettyRemotingServer
image.png

啟動(dòng)流程

實(shí)例化BrokerController

  • 實(shí)例化brokerConfig
  • 實(shí)例化nettyServerConfig
  • 實(shí)例化nettyClientConfig
  • 實(shí)例化messageStoreConfig
  • 實(shí)例化brokerController拨齐,并初始化
  1. 從磁盤加載文件到內(nèi)存:topicConfigManager鳞陨、consumerOffsetManager、subscriptionGroupManager瞻惋、consumerFilterManager厦滤、messageStore。
  2. 實(shí)例化remotingServer
  3. 實(shí)例化各種線程池歼狼,sendMessageExecutor馁害、pullMessageExecutor、replyMessageExecutor蹂匹、queryMessageExecutor、adminBrokerExecutor凹蜈、clientManageExecutor限寞、heartbeatExecutor忍啸、endTransactionExecutor、consumerManageExecutor履植。
  4. 調(diào)用registerProcessor()方法计雌,注冊(cè)各種線程池到各種對(duì)于的process上,process包括:SendMessageProcessor玫霎、PullMessageProcessor凿滤、ReplyMessageProcessor、QueryMessageProcessor庶近、ClientManageProcessor翁脆、ConsumerManageProcessor、EndTransactionProcessor鼻种、AdminBrokerProcessor反番。
  5. 啟動(dòng)一些定時(shí)任務(wù)
  • 啟動(dòng)各種服務(wù)
  • messageStore
    reputMessageService:構(gòu)建索引線程
    flushConsumeQueueService
    flushCommitLogService:內(nèi)存數(shù)據(jù)刷到磁盤服務(wù)
    storeStatsService
  • remotingServer:NettyServer服務(wù)
  • brokerOuterAPI:與NameServer通信的NettyClient服務(wù)
  • pullRequestHoldService
  • 啟動(dòng)定時(shí)任務(wù)

線程模型

上面介紹了broker的核心業(yè)務(wù)流程和架構(gòu),關(guān)鍵接口和類叉钥,啟動(dòng)流程罢缸。最后介紹一下broker的線程模型,只有知道了線程模型投队,才能大概知道前面介紹的那些事如何協(xié)同工作的枫疆,對(duì)broker才能有一個(gè)立體的認(rèn)識(shí)。

RocketMQ的RPC通信采用Netty組件作為底層通信庫(kù)敷鸦,同樣也遵循了Reactor多線程模型息楔,同時(shí)又在這之上做了一些擴(kuò)展和優(yōu)化。關(guān)于Reactor線程模型轧膘,可以看看我之前寫的這篇文檔:Reactor線程模型

rocketmq_design_6.png

上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型钞螟。一個(gè) Reactor 主線程(eventLoopGroupBoss,即為上面的1)負(fù)責(zé)監(jiān)聽(tīng) TCP網(wǎng)絡(luò)連接請(qǐng)求谎碍,建立好連接鳞滨,創(chuàng)建SocketChannel,并注冊(cè)到selector上蟆淀。RocketMQ的源碼中會(huì)自動(dòng)根據(jù)OS的類型選擇NIO和Epoll拯啦,也可以通過(guò)參數(shù)配置),然后監(jiān)聽(tīng)真正的網(wǎng)絡(luò)數(shù)據(jù)。拿到網(wǎng)絡(luò)數(shù)據(jù)后熔任,再丟給Worker線程池(eventLoopGroupSelector褒链,即為上面的“N”,源碼中默認(rèn)設(shè)置為3)疑苔,在真正執(zhí)行業(yè)務(wù)邏輯之前需要進(jìn)行SSL驗(yàn)證甫匹、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理兵迅,這些工作交給defaultEventExecutorGroup(即為上面的“M1”抢韭,源碼中默認(rèn)設(shè)置為8)去做。而處理業(yè)務(wù)操作放在業(yè)務(wù)線程池中執(zhí)行恍箭,根據(jù) RomotingCommand 的業(yè)務(wù)請(qǐng)求碼code去processorTable這個(gè)本地緩存變量中找到對(duì)應(yīng)的 processor刻恭,然后封裝成task任務(wù)后,提交給對(duì)應(yīng)的業(yè)務(wù)processor處理線程池來(lái)執(zhí)行(sendMessageExecutor扯夭,以發(fā)送消息為例鳍贾,即為上面的 “M2”)。

上面的圖和這段畫是從官方文檔抄過(guò)來(lái)的交洗,但是文字和圖對(duì)應(yīng)的不是很好骑科,畫的也不夠詳細(xì),但是主要流程是這個(gè)樣子藕筋。以后有時(shí)間了纵散,我重新安裝自己的理解,畫一張更詳細(xì)的圖隐圾。

備注

NameSev線程說(shuō)明

AsyncAppender-Worker-Thread-0:異步打印日志伍掀,logback使用,應(yīng)該是守護(hù)線程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一個(gè)

NettyServerNIOSelector_:默認(rèn)為三個(gè)

NSScheduledThread:定時(shí)任務(wù)線程

ServerHouseKeepingService:守護(hù)線程

ThreadDeathWatch-2-1:守護(hù)線程暇藏,Netty用蜜笤,已經(jīng)廢棄

RemotingExecutorThread(1-8):工作線程池,沒(méi)有共用NettyServerNIOSelector_盐碱,直接初始化8個(gè)線程

Broker線程說(shuō)明

logback日志異步線程

AsyncAppender-Worker-Thread-0:異步打印日志把兔,logback使用,共九個(gè):

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

Processor使用線程

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

RemotingClient線程

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

==================================================================

定時(shí)任務(wù)線程

BrokerControllerScheduledThread:=>

BrokerController.this.getBrokerStats().record();

BrokerController.this.consumerOffsetManager.persist();

BrokerController.this.consumerFilterManager.persist();

BrokerController.this.protectBroker();

BrokerController.this.printWaterMark();

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

BrokerController.this.printMasterAndSlaveDiff();

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

BrokerFastFailureScheduledThread:=>

FilterServerManagerScheduledThread:=>

FilterServerManager.this.createFilterServer();

ClientHousekeepingScheduledThread:=>

ClientHousekeepingService.this.scanExceptionChannel();

服務(wù)線程

PullRequestHoldService

FileWatchService

未知線程

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瓮顽,一起剝皮案震驚了整個(gè)濱河市县好,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌暖混,老刑警劉巖缕贡,帶你破解...
    沈念sama閱讀 222,807評(píng)論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異拣播,居然都是意外死亡晾咪,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門贮配,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)谍倦,“玉大人,你說(shuō)我怎么就攤上這事泪勒≈缰” “怎么了宴猾?”我有些...
    開(kāi)封第一講書人閱讀 169,589評(píng)論 0 363
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)曹洽。 經(jīng)常有香客問(wèn)我鳍置,道長(zhǎng),這世上最難降的妖魔是什么送淆? 我笑而不...
    開(kāi)封第一講書人閱讀 60,188評(píng)論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮怕轿,結(jié)果婚禮上偷崩,老公的妹妹穿的比我還像新娘。我一直安慰自己撞羽,他們只是感情好阐斜,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著诀紊,像睡著了一般谒出。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上邻奠,一...
    開(kāi)封第一講書人閱讀 52,785評(píng)論 1 314
  • 那天笤喳,我揣著相機(jī)與錄音,去河邊找鬼碌宴。 笑死杀狡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的贰镣。 我是一名探鬼主播呜象,決...
    沈念sama閱讀 41,220評(píng)論 3 423
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼碑隆!你這毒婦竟也來(lái)了恭陡?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 40,167評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤上煤,失蹤者是張志新(化名)和其女友劉穎休玩,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體楼入,經(jīng)...
    沈念sama閱讀 46,698評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡哥捕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嘉熊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遥赚。...
    茶點(diǎn)故事閱讀 40,912評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖阐肤,靈堂內(nèi)的尸體忽然破棺而出凫佛,到底是詐尸還是另有隱情讲坎,我是刑警寧澤,帶...
    沈念sama閱讀 36,572評(píng)論 5 351
  • 正文 年R本政府宣布愧薛,位于F島的核電站晨炕,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏毫炉。R本人自食惡果不足惜瓮栗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瞄勾。 院中可真熱鬧费奸,春花似錦、人聲如沸进陡。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,746評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)趾疚。三九已至缨历,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間糙麦,已是汗流浹背辛孵。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,859評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留喳资,地道東北人觉吭。 一個(gè)月前我還...
    沈念sama閱讀 49,359評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像仆邓,于是被迫代替她去往敵國(guó)和親鲜滩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容