一袭灯、簡述
本文簡述一下Broker的啟動(dòng)流程绑嘹,主要涉及的步驟及簡要配置工腋,不做過多深入。
二蟋恬、BrokerStartup筋现、BrokerController
啟動(dòng)類箱歧,類結(jié)構(gòu)如下:
首先會(huì)調(diào)用createBrokerController()實(shí)例化一個(gè)BrokerController(它才是核心的啟動(dòng)類)呀邢,然后調(diào)用start()方法价淌,這個(gè)套路和Namesrv的思路一致瞒津。
1巷蚪、createBrokerController()
與Namesrv類似屁柏,通過一頓騷操作有送,將cmd命令行的參數(shù)進(jìn)行解析雀摘,并產(chǎn)出4個(gè)配置(這四個(gè)配置本身有默認(rèn)配置):
-
BrokerConfig:
broker自身的一些配置,例如namesrvAddr(namesrv地址)涯塔,brokerIP1伤塌,brokerIP2轧铁,brokerName齿风,defaultTopicQueueNums(默認(rèn)8個(gè)隊(duì)列)救斑,autoCreateTopicEnable(默認(rèn)居然是true)等等 -
NettyServerConfig
nettyserver的一些配置真屯,最重要的 listenPort (10911)及serverWorkerThreads(worker的線程數(shù)量)绑蔫、serverOnewaySemaphoreValue(one-way模式發(fā)送的最大線程數(shù))、serverAsyncSemaphoreValue(異步模式最大的線程數(shù))携添,serverSocketSndBufSize(發(fā)送消息最大長度)等等 -
NettyClientConfig
netty客戶端(producer烈掠、consumer)的一些配置,connectTimeoutMillis(超時(shí)時(shí)間默認(rèn)3秒)瘾蛋,channelNotActiveInterval(通道異常檢查時(shí)間矫限,默認(rèn)60秒)等等 -
MessageStoreConfig
消息commitLog固化配置奇唤,storePathCommitLog(commitLog目錄),mapedFileSizeCommitLog(commitLog大小咬扇,默認(rèn)1G)懈贺,
messageDelayLevel(消息延遲投遞級別:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)
與Namesrv類似梭灿,如果cmd中含有:
- -c path,則會(huì)加載path指定的配置文件中的配置
- -p 會(huì)打印所有的配置堡妒,并退出
- -m 只打印 @ImportantField 注解標(biāo)注的配置皮迟,并退出
待上面的配置都加載完畢后,調(diào)用initialize進(jìn)行初始化:
boolean initResult = controller.initialize();
2忿檩、controller.initialize()
這個(gè)方法很重要爆阶,做了一堆的事情辨图,在羅列功能之前,先看這個(gè)圖
RocketMQ默認(rèn)會(huì)在/home(我的是windows孽尽,所以home=C:\User\asd)下新建一個(gè)store文件夾杉女,里面存放了一堆一堆很重要的配置,commitlog(固化的消息)速勇,config(topic坎拐、subscriptionGroup、consumerFilter都伪、consumerOffset积担、delayOffset配置)帝璧,consumequeue(消費(fèi)者隊(duì)列配置)等等,都很重要褐耳,而controller.initialize()方法主要就是在broker啟動(dòng)的時(shí)候?qū)@些配置進(jìn)行初始化
-
topicConfigManager.load()
加載C:\Users\asd\store\config\topics.json -
consumerOffsetManager.load()
加載C:\Users\asd\store\config\consumerOffset.json -
subscriptionGroupManager.load()
加載C:\Users\asd\store\config\subscriptionGroup.json -
consumerFilterManager.load()
加載C:\Users\asd\store\config\consumerFilter.json -
messageStore.load()
加載C:\Users\asd\store\commitLog\ 下所有消息日志文件
該加載的都加載完了铃芦,然后啟動(dòng)NettyServer以及一堆的 Executor 線程池
-
remotingServer
默認(rèn)監(jiān)聽端口 10911 -
fastRemotingServer
默認(rèn)監(jiān)聽端口 10911 - 2 = 10909杨帽,莫非和VIP通道有關(guān)嗤军?后續(xù)補(bǔ)充 -
sendMessageExecutor
發(fā)送消息的線程池 -
pullMessageExecutor
拉取消息的線程池 -
queryMessageExecutor
查詢消息的線程池 -
adminBrokerExecutor
不知道干啥的線程池 -
clientManageExecutor
客戶端連接管理線程池 -
heartbeatExecutor
心跳線程池 -
endTransactionExecutor
事務(wù)結(jié)束線程池 -
consumerManageExecutor
消費(fèi)者連接線程池
線程池初始化呢完畢后叙赚,開啟一些定時(shí)任務(wù)
-
BrokerController.this.getBrokerStats().record();
固化broker的狀態(tài)震叮,默認(rèn)1天執(zhí)行一次 -
BrokerController.this.consumerOffsetManager.persist();
固化offset苇瓣,延遲10秒偿乖,每5秒執(zhí)行一次 -
BrokerController.this.consumerFilterManager.persist();
Filter固化哲嘲,延遲10秒眠副,每10秒執(zhí)行一次 -
BrokerController.this.protectBroker();
保護(hù)broker竣稽?后續(xù)完善毫别,延遲3分,每3分執(zhí)行一次 -
BrokerController.this.printWaterMark();
打印流水信息台丛,發(fā)送恋博、拉取、查詢炼吴、結(jié)束事務(wù)消息等日志
延遲10秒疫衩,每1秒一次 -
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
打印日志:獲取已經(jīng)固化到commitlog,但是還沒有被消費(fèi)的日志的byte大小
延遲10秒童芹,每60秒執(zhí)行一次 -
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
如果fetchNamesrvAddrByAddressServer=true假褪,則會(huì)執(zhí)行
主要是通過httpclient從給定的URL動(dòng)態(tài)獲取NameSrv的地址
延遲10秒近顷,每120秒執(zhí)行一次 -
BrokerController.this.slaveSynchronize.syncAll();
如果broker的角色是SLAVE,則會(huì)執(zhí)行窒升,主要是從主Broker定期同步topic饱须、offset、delayOffset譬挚、group信息
延遲10秒,每60秒執(zhí)行一次 -
BrokerController.this.printMasterAndSlaveDiff();
如果broker的角色是不是SLAVE狠角,則會(huì)執(zhí)行丰歌,主要打印SLAVE與Master之間差異的byte大小
如果啟用了TLS安全傳輸配置立帖,則會(huì)啟動(dòng) fileWatchService悠砚?后續(xù)補(bǔ)充
最后調(diào)用了三個(gè)方法:
-
initialTransaction()
初始化事務(wù)消息的一些服務(wù)及Listener -
initialAcl();
初始化ACL的一些服務(wù)灌旧,進(jìn)行訪問控制 -
initialRpcHooks();
暫時(shí)不知道干啥的
到這里枢泰,controller.initialize()算是完成了,回顧一下窿克,其實(shí)思路很清晰:
- 1毛甲、加載配置文件
- 2玻募、啟動(dòng)Netty服務(wù)
- 3七咧、初始化線程池
- 4、啟動(dòng)定時(shí)任務(wù)
- 5坑雅、其他的一些配置
3裹粤、start()
啟動(dòng)就比較直接了遥诉,首先啟動(dòng)一些服務(wù):
-
messageStore.start()
消息固化服務(wù)啟動(dòng) -
remotingServer.start()
nettyServer啟動(dòng) 10911 -
fastRemotingServer.start()
vip nettyServer啟動(dòng)矮锈,端口10909 -
fileWatchService.start();
啟用TLS加密的服務(wù)啟動(dòng) -
brokerOuterAPI.start();
啟用動(dòng)態(tài)獲取NameSrv的服務(wù)啟動(dòng) -
pullRequestHoldService.start();
獲取請求的服務(wù)啟動(dòng) -
clientHousekeepingService.start();
啟動(dòng) -
filterServerManager.start();
過濾器啟動(dòng) -
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
定時(shí)任務(wù)债朵,從Namesrv獲取所有broker注冊的信息序芦,延遲10秒粤咪,每10秒或者registerNameServerPeriod指定的時(shí)間寥枝,但是不能少于10秒執(zhí)行一次 -
brokerStatsManager.start();
broker狀態(tài)檢查啟動(dòng)囊拜,貌似啥也沒干 - this.brokerFastFailure.start();
-
transactionalMessageCheckService.start();
如果不是SLAVE艾疟,則會(huì)啟動(dòng)事務(wù)