broker是RockerMQ用來(lái)存儲(chǔ)消息的服務(wù),是RocketMQ的核心組件。NameServer和Producer都比較簡(jiǎn)單,Consumer和Broker都是比較復(fù)雜的。
從Client-Server視角來(lái)看,NameServer屬于Server踏施,Consumer和Producer都屬于Client,而B(niǎo)roker兩者兼具罕邀。Broker是Consumer和Producer的Server畅形,同時(shí)又是NameServer的Client。
此篇文檔是RocketMQ分析的第五篇文檔诉探,介紹Broker日熬。下一篇將對(duì)照RocketMQ官方文檔,介紹一下RocketMQ中設(shè)計(jì)到的一些概念性的知識(shí)阵具。
rocketmq_architecture_3.png
核心業(yè)務(wù)及架構(gòu)設(shè)計(jì)
broker是RocketMQ的核心碍遍,核心工作就是接收生成這的消息定铜,進(jìn)行存儲(chǔ)。同時(shí)怕敬,收到消費(fèi)者的請(qǐng)求后揣炕,從磁盤讀取內(nèi)容,把結(jié)果返回給消費(fèi)者东跪。
CommitLog
消息主體以及元數(shù)據(jù)的存儲(chǔ)主體畸陡,存儲(chǔ)Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G 虽填,文件名長(zhǎng)度為20位丁恭,左邊補(bǔ)零,剩余為起始偏移量斋日,比如00000000000000000000代表了第一個(gè)文件牲览,起始偏移量為0,文件大小為1G=1073741824恶守;當(dāng)?shù)谝粋€(gè)文件寫滿了第献,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824兔港,以此類推庸毫。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿了衫樊,寫入下一個(gè)文件飒赃;
CommitLog文件中保存了消息的全量?jī)?nèi)容。不同的Topic的消息科侈,在CommitLog都是順序存放的载佳。就是來(lái)一個(gè)消息,不管Topic是什么兑徘,直接追加的CommitLog中刚盈。
broker啟動(dòng)了一個(gè)專門的線程來(lái)構(gòu)建索引,把CommitLog中的消息挂脑,構(gòu)建了兩種類型的索引。ConsumerQueue和Index欲侮。正常消費(fèi)的時(shí)候崭闲,是根據(jù)Topic來(lái)消費(fèi),會(huì)用到ConsumerQueue索引威蕉。
也可根據(jù)返回的offsetMsgId刁俭,解析出ip,端口和CommitLog中的物理消息偏移量韧涨,直接去CommitLog中取數(shù)據(jù)牍戚。
ConsumerQueue
引入的目的主要是提高消息消費(fèi)的性能侮繁,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的如孝,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的宪哩。Consumer即可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。
其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset芭届,消息大小size和消息Tag的HashCode值惭嚣。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)遍搞,具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié)圆恤,分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長(zhǎng)度腔稀、8字節(jié)tag hashcode盆昙,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目烧颖,每個(gè)ConsumeQueue文件大小約5.72M弱左。
Index
IndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法。Index文件的存儲(chǔ)位置是:{fileName}炕淮,文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的拆火,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引涂圆,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu)们镜,故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。
按照Message Key查詢消息的時(shí)候润歉,會(huì)用到這個(gè)索引文件模狭。
IndexFile索引文件為用戶提供通過(guò)“按照Message Key查詢消息”的消息索引查詢服務(wù),IndexFile文件的存儲(chǔ)位置是:{fileName}踩衩,文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的嚼鹉,文件大小是固定的,等于40+500W4+2000W20= 420000040個(gè)字節(jié)大小驱富。如果消息的properties中設(shè)置了UNIQ_KEY這個(gè)屬性锚赤,就用 topic + “#” + UNIQ_KEY的value作為 key 來(lái)做寫入操作。如果消息設(shè)置了KEYS屬性(多個(gè)KEY以空格分隔)褐鸥,也會(huì)用 topic + “#” + KEY 來(lái)做索引线脚。
其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個(gè)字段,一共20 Byte。NextIndex offset 即前面讀出來(lái)的 slotValue浑侥,如果有 hash沖突姊舵,就可以用這個(gè)字段將所有沖突的索引用鏈表的方式串起來(lái)了。Timestamp記錄的是消息storeTimestamp之間的差寓落,并不是一個(gè)絕對(duì)的時(shí)間括丁。整個(gè)Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計(jì)信息零如,4500W的 Slot Table并不保存真正的索引數(shù)據(jù)躏将,而是保存每個(gè)槽位對(duì)應(yīng)的單向鏈表的頭。202000W 是真正的索引數(shù)據(jù)考蕾,即一個(gè) Index File 可以保存 2000W個(gè)索引祸憋。
“按照Message Key查詢消息”的方式,RocketMQ的具體做法是肖卧,主要通過(guò)Broker端的QueryMessageProcessor業(yè)務(wù)處理器來(lái)查詢蚯窥,讀取消息的過(guò)程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實(shí)體內(nèi)容塞帐。
接口和類圖
RocketMQ中有兩個(gè)核心模塊拦赠,remoting模塊和store模塊。remoting模塊在NameServer葵姥,Produce荷鼠,Consumer和Broker都用到。store只在Broker中用到榔幸,包含了存儲(chǔ)文件操作的API允乐,對(duì)消息實(shí)體的操作是通過(guò)DefaultMessageStore進(jìn)行操作。
store模塊
- DefaultMessageStore
屬性和方法很多削咆,就不往這里放了牍疏。
- CommitLog
文件存儲(chǔ)實(shí)現(xiàn)類,包括多個(gè)內(nèi)部類
- MappedFile
· 對(duì)于文件夾下的一個(gè)文件
borker模塊
- SendMessageProcessor
- PullMessageProcessor
- QueryMessageProcessor
- BrokerController:核心控制器
remoting模塊
- NettyRemotingClient
- NettyRemotingServer
啟動(dòng)流程
實(shí)例化BrokerController
- 實(shí)例化brokerConfig
- 實(shí)例化nettyServerConfig
- 實(shí)例化nettyClientConfig
- 實(shí)例化messageStoreConfig
- 實(shí)例化brokerController拨齐,并初始化
- 從磁盤加載文件到內(nèi)存:topicConfigManager鳞陨、consumerOffsetManager、subscriptionGroupManager瞻惋、consumerFilterManager厦滤、messageStore。
- 實(shí)例化remotingServer
- 實(shí)例化各種線程池歼狼,sendMessageExecutor馁害、pullMessageExecutor、replyMessageExecutor蹂匹、queryMessageExecutor、adminBrokerExecutor凹蜈、clientManageExecutor限寞、heartbeatExecutor忍啸、endTransactionExecutor、consumerManageExecutor履植。
- 調(diào)用registerProcessor()方法计雌,注冊(cè)各種線程池到各種對(duì)于的process上,process包括:SendMessageProcessor玫霎、PullMessageProcessor凿滤、ReplyMessageProcessor、QueryMessageProcessor庶近、ClientManageProcessor翁脆、ConsumerManageProcessor、EndTransactionProcessor鼻种、AdminBrokerProcessor反番。
- 啟動(dòng)一些定時(shí)任務(wù)
- 啟動(dòng)各種服務(wù)
- messageStore
reputMessageService:構(gòu)建索引線程
flushConsumeQueueService
flushCommitLogService:內(nèi)存數(shù)據(jù)刷到磁盤服務(wù)
storeStatsService - remotingServer:NettyServer服務(wù)
- brokerOuterAPI:與NameServer通信的NettyClient服務(wù)
- pullRequestHoldService
- 啟動(dòng)定時(shí)任務(wù)
線程模型
上面介紹了broker的核心業(yè)務(wù)流程和架構(gòu),關(guān)鍵接口和類叉钥,啟動(dòng)流程罢缸。最后介紹一下broker的線程模型,只有知道了線程模型投队,才能大概知道前面介紹的那些事如何協(xié)同工作的枫疆,對(duì)broker才能有一個(gè)立體的認(rèn)識(shí)。
RocketMQ的RPC通信采用Netty組件作為底層通信庫(kù)敷鸦,同樣也遵循了Reactor多線程模型息楔,同時(shí)又在這之上做了一些擴(kuò)展和優(yōu)化。關(guān)于Reactor線程模型轧膘,可以看看我之前寫的這篇文檔:Reactor線程模型
上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型钞螟。一個(gè) Reactor 主線程(eventLoopGroupBoss,即為上面的1)負(fù)責(zé)監(jiān)聽(tīng) TCP網(wǎng)絡(luò)連接請(qǐng)求谎碍,建立好連接鳞滨,創(chuàng)建SocketChannel,并注冊(cè)到selector上蟆淀。RocketMQ的源碼中會(huì)自動(dòng)根據(jù)OS的類型選擇NIO和Epoll拯啦,也可以通過(guò)參數(shù)配置),然后監(jiān)聽(tīng)真正的網(wǎng)絡(luò)數(shù)據(jù)。拿到網(wǎng)絡(luò)數(shù)據(jù)后熔任,再丟給Worker線程池(eventLoopGroupSelector褒链,即為上面的“N”,源碼中默認(rèn)設(shè)置為3)疑苔,在真正執(zhí)行業(yè)務(wù)邏輯之前需要進(jìn)行SSL驗(yàn)證甫匹、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理兵迅,這些工作交給defaultEventExecutorGroup(即為上面的“M1”抢韭,源碼中默認(rèn)設(shè)置為8)去做。而處理業(yè)務(wù)操作放在業(yè)務(wù)線程池中執(zhí)行恍箭,根據(jù) RomotingCommand 的業(yè)務(wù)請(qǐng)求碼code去processorTable這個(gè)本地緩存變量中找到對(duì)應(yīng)的 processor刻恭,然后封裝成task任務(wù)后,提交給對(duì)應(yīng)的業(yè)務(wù)processor處理線程池來(lái)執(zhí)行(sendMessageExecutor扯夭,以發(fā)送消息為例鳍贾,即為上面的 “M2”)。
上面的圖和這段畫是從官方文檔抄過(guò)來(lái)的交洗,但是文字和圖對(duì)應(yīng)的不是很好骑科,畫的也不夠詳細(xì),但是主要流程是這個(gè)樣子藕筋。以后有時(shí)間了纵散,我重新安裝自己的理解,畫一張更詳細(xì)的圖隐圾。
備注
NameSev線程說(shuō)明
AsyncAppender-Worker-Thread-0:異步打印日志伍掀,logback使用,應(yīng)該是守護(hù)線程
FileWatchService:
NettyEventExecutor:
NettyNIOBoss_:一個(gè)
NettyServerNIOSelector_:默認(rèn)為三個(gè)
NSScheduledThread:定時(shí)任務(wù)線程
ServerHouseKeepingService:守護(hù)線程
ThreadDeathWatch-2-1:守護(hù)線程暇藏,Netty用蜜笤,已經(jīng)廢棄
RemotingExecutorThread(1-8):工作線程池,沒(méi)有共用NettyServerNIOSelector_盐碱,直接初始化8個(gè)線程
Broker線程說(shuō)明
logback日志異步線程
AsyncAppender-Worker-Thread-0:異步打印日志把兔,logback使用,共九個(gè):
RocketmqBrokerAppender_inner
RocketmqFilterAppender_inner
RocketmqProtectionAppender_inner
RocketmqRemotingAppender_inner
RocketmqRebalanceLockAppender_inner
RocketmqStoreAppender_inner
RocketmqStoreErrorAppender_inner
RocketmqWaterMarkAppender_inner
RocketmqTransactionAppender_inner
Processor使用線程
SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE
PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE
ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE
QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE
AdminBrokerThread_:remotingServer.registerDefaultProcessor
ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT
HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT
EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION
ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET
RemotingClient線程
brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);
==================================================================
定時(shí)任務(wù)線程
BrokerControllerScheduledThread:=>
BrokerController.this.getBrokerStats().record();
BrokerController.this.consumerOffsetManager.persist();
BrokerController.this.consumerFilterManager.persist();
BrokerController.this.protectBroker();
BrokerController.this.printWaterMark();
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
BrokerController.this.printMasterAndSlaveDiff();
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
BrokerFastFailureScheduledThread:=>
FilterServerManagerScheduledThread:=>
FilterServerManager.this.createFilterServer();
ClientHousekeepingScheduledThread:=>
ClientHousekeepingService.this.scanExceptionChannel();
服務(wù)線程
PullRequestHoldService
FileWatchService
未知線程
AllocateMappedFileService
AcceptSocketService
BrokerStatsThread1