1藻丢、現(xiàn)象
最近收到很多RocketMQ使用者把曼,反饋生產(chǎn)環(huán)境中在消息發(fā)送過(guò)程中偶爾會(huì)出現(xiàn)如下4個(gè)錯(cuò)誤信息之一:
1)[REJECTREQUEST]system busy, start flow control for a while
2)too many requests and system thread pool busy, RejectedExecutionException
3)[PC_SYNCHRONIZED]broker busy, start flow control for a while
4)[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d
2杨帽、原理解讀
在進(jìn)行消息中間件的選型時(shí),如果待選中間件在功能上嗤军、性能上都能滿足業(yè)務(wù)的情況下注盈,建議把中間件的實(shí)現(xiàn)語(yǔ)言這個(gè)因素也考慮進(jìn)去,畢竟選擇一門用自己擅長(zhǎng)的語(yǔ)言實(shí)現(xiàn)的中間件會(huì)更具掌控性叙赚。在出現(xiàn)異常的情況下老客,我們可以根據(jù)自己的經(jīng)驗(yàn)提取錯(cuò)誤信息關(guān)鍵字system busy,在RocketMQ源碼中直接搜索震叮,得到拋出上述錯(cuò)誤信息的代碼如下:
其代碼入口為:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand胧砰。從圖中可以看出,拋出上述錯(cuò)誤的關(guān)鍵原因是:pair.getObject1().rejectRequest()和拋出RejectedExecutionException異常苇瓣。
備注:本文偏實(shí)戰(zhàn)尉间,源碼只是作為分析的重點(diǎn)證據(jù),故本文只會(huì)點(diǎn)出關(guān)鍵源碼,并不會(huì)詳細(xì)跟蹤其整個(gè)實(shí)現(xiàn)流程乌妒,如果想詳細(xì)了解其實(shí)現(xiàn)汹想,可以查閱筆者編著的《RocketMQ技術(shù)內(nèi)幕》。
2.1 RocketMQ 網(wǎng)絡(luò)處理機(jī)制概述
RocketMQ的網(wǎng)絡(luò)設(shè)計(jì)非常值得我們學(xué)習(xí)與借鑒撤蚊,首先在客戶端端將不同的請(qǐng)求定義不同的請(qǐng)求命令CODE古掏,服務(wù)端會(huì)將客戶端請(qǐng)求進(jìn)行分類,每個(gè)命令或每類請(qǐng)求命令定義一個(gè)處理器(NettyRequestProcessor)侦啸,然后每一個(gè)NettyRequestProcessor綁定到一個(gè)單獨(dú)的線程池槽唾,進(jìn)行命令處理,不同類型的請(qǐng)求將使用不同的線程池進(jìn)行處理光涂,實(shí)現(xiàn)線程隔離庞萍。
為了方便下文的描述,我們先簡(jiǎn)單的認(rèn)識(shí)一下NettyRequestProcessor忘闻、Pair钝计、RequestCode。其核心關(guān)鍵點(diǎn)如下:
- NettyRequestProcessor
RocketMQ 服務(wù)端請(qǐng)求處理器齐佳,例如SendMessageProcessor是消息發(fā)送處理器私恬、PullMessageProcessor是消息拉取命令處理器。 - RequestCode
請(qǐng)求CODE炼吴,用來(lái)區(qū)分請(qǐng)求的類型本鸣,例如SEND_MESSAGE:表示該請(qǐng)求為消息發(fā)送,PULL_MESSAGE:消息拉取請(qǐng)求硅蹦。 - Pair
用來(lái)封裝NettyRequestProcessor與ExecuteService的綁定關(guān)系荣德。在RocketMQ的網(wǎng)絡(luò)處理模型中,會(huì)為每一個(gè)NettyRequestProcessor與特定的線程池綁定童芹,所有該NettyRequestProcessor的處理邏輯都在該線程池中運(yùn)行涮瞻。
2.2 pair.getObject1().rejectRequest()
由于讀者朋友提出的問(wèn)題,都是發(fā)生在消息發(fā)送過(guò)程中假褪,故本文重點(diǎn)關(guān)注SendMessageProcessor#rejectRequest方法饲宛。
SendMessageProcessor#rejectRequest
public boolean rejectRequest() {
return this.brokerController.getMessageStore().isOSPageCacheBusy() || // @1
this.brokerController.getMessageStore().isTransientStorePoolDeficient(); // @2
}
拒絕請(qǐng)求的條件有兩個(gè),只要其中任意一個(gè)滿足嗜价,則返回true。
代碼@1:Os PageCache busy幕庐,判斷操作系統(tǒng)PageCache是否繁忙久锥,如果忙,則返回true异剥。想必看到這里大家肯定與我一樣好奇瑟由,RocketMQ是如何判斷pageCache是否繁忙呢?下面會(huì)重點(diǎn)分析冤寿。
代碼@2:transientStorePool是否不足歹苦。
2.2.1 isOSPageCacheBusy()
DefaultMessageStore#isOSPageCacheBusy()
public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock(); // @1 start
long diff = this.systemClock.now() - begin; // @1 end
return diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); // @2
}
代碼@1:先重點(diǎn)解釋begin青伤、diff兩個(gè)局部變量的含義:
- begin
通俗的一點(diǎn)講,就是將消息寫入Commitlog文件所持有鎖的時(shí)間殴瘦,精確說(shuō)是將消息體追加到內(nèi)存映射文件(DirectByteBuffer)或pageCache(FileChannel#map)該過(guò)程中開始持有鎖的時(shí)間戳狠角,具體的代碼請(qǐng)參考:CommitLog#putMessage。 - diff
一次消息追加過(guò)程中持有鎖的總時(shí)長(zhǎng)蚪腋,即往內(nèi)存映射文件或pageCache追加一條消息所耗時(shí)間丰歌。
代碼@2:如果一次消息追加過(guò)程的時(shí)間超過(guò)了Broker配置文件osPageCacheBusyTimeOutMills,則認(rèn)為pageCache繁忙屉凯,osPageCacheBusyTimeOutMills默認(rèn)值為1000立帖,表示1s。
2.2.2 isTransientStorePoolDeficient()
DefaultMessageStore#isTransientStorePoolDeficient
public boolean isTransientStorePoolDeficient() {
return remainTransientStoreBufferNumbs() == 0;
}
public int remainTransientStoreBufferNumbs() {
return this.transientStorePool.remainBufferNumbs();
}
最終調(diào)用TransientStorePool#remainBufferNumbs方法悠砚。
public int remainBufferNumbs() {
if (storeConfig.isTransientStorePoolEnable()) {
return availableBuffers.size();
}
return Integer.MAX_VALUE;
}
如果啟用transientStorePoolEnable機(jī)制晓勇,返回當(dāng)前可用的ByteBuffer個(gè)數(shù),即整個(gè)isTransientStorePoolDeficient方法的用意是是否還存在可用的ByteBuffer灌旧,如果不存在绑咱,即表示pageCache繁忙。那什么是transientStorePoolEnable機(jī)制呢节榜?
2.3 漫談transientStorePoolEnable機(jī)制
Java NIO的內(nèi)存映射機(jī)制羡玛,提供了將文件系統(tǒng)中的文件映射到內(nèi)存機(jī)制,實(shí)現(xiàn)對(duì)文件的操作轉(zhuǎn)換對(duì)內(nèi)存地址的操作宗苍,極大的提高了IO特性稼稿,但這部分內(nèi)存并不是常駐內(nèi)存,可以被置換到交換內(nèi)存(虛擬內(nèi)存)讳窟,RocketMQ為了提高消息發(fā)送的性能让歼,引入了內(nèi)存鎖定機(jī)制,即將最近需要操作的commitlog文件映射到內(nèi)存丽啡,并提供內(nèi)存鎖定功能谋右,確保這些文件始終存在內(nèi)存中,該機(jī)制的控制參數(shù)就是transientStorePoolEnable补箍。
2.3.1 MappedFile
重點(diǎn)關(guān)注MappedFile的ByteBuffer writeBuffer改执、MappedByteBuffer mappedByteBuffer這兩個(gè)屬性的初始化,因?yàn)檫@兩個(gè)方法是寫消息與查消息操作的直接數(shù)據(jù)結(jié)構(gòu)坑雅。
兩個(gè)關(guān)鍵點(diǎn)如下:
- ByteBuffer writeBuffer
如果開啟了transientStorePoolEnable,則使用ByteBuffer.allocateDirect(fileSize),創(chuàng)建(java.nio的內(nèi)存映射機(jī)制)辈挂。如果未開啟,則為空裹粤。 - MappedByteBuffer mappedByteBuffer
使用FileChannel#map方法創(chuàng)建终蒂,即真正意義上的PageCache。
消息寫入時(shí):
MappedFile#appendMessagesInner
從中可見(jiàn),在消息寫入時(shí)拇泣,如果writerBuffer不為空噪叙,說(shuō)明開啟了transientStorePoolEnable機(jī)制,則消息首先寫入writerBuffer中霉翔,如果其為空睁蕾,則寫入mappedByteBuffer中。
消息拉取(讀消息):
MappedFile#selectMappedBuffer
消息讀取時(shí)早龟,是從mappedByteBuffer中讀(pageCache)惫霸。
大家是不是發(fā)現(xiàn)了一個(gè)有趣的點(diǎn),如果開啟transientStorePoolEnable機(jī)制葱弟,是不是有了讀寫分離的效果壹店,先寫入writerBuffer中,讀卻是從mappedByteBuffer中讀取芝加。
為了對(duì)transientStorePoolEnable引入意圖闡述的更加明白硅卢,這里我引入Rocketmq社區(qū)貢獻(xiàn)者胡宗棠關(guān)于此問(wèn)題的見(jiàn)解。
通常有如下兩種方式進(jìn)行讀寫:
- 第一種藏杖,Mmap+PageCache的方式将塑,讀寫消息都走的是pageCache,這樣子讀寫都在pagecache里面不可避免會(huì)有鎖的問(wèn)題蝌麸,在并發(fā)的讀寫操作情況下点寥,會(huì)出現(xiàn)缺頁(yè)中斷降低,內(nèi)存加鎖来吩,污染頁(yè)的回寫敢辩。
- 第二種,DirectByteBuffer(堆外內(nèi)存)+PageCache的兩層架構(gòu)方式弟疆,這樣子可以實(shí)現(xiàn)讀寫消息分離戚长,寫入消息時(shí)候?qū)懙降氖荄irectByteBuffer——堆外內(nèi)存中,讀消息走的是PageCache(對(duì)于,DirectByteBuffer是兩步刷盤,一步是刷到PageCache怠苔,還有一步是刷到磁盤文件中)同廉,帶來(lái)的好處就是,避免了內(nèi)存操作的很多容易堵的地方柑司,降低了時(shí)延迫肖,比如說(shuō)缺頁(yè)中斷降低,內(nèi)存加鎖攒驰,污染頁(yè)的回寫咒程。
溫馨提示:如果想與胡宗棠大神進(jìn)一步溝通交流,可以關(guān)注他的github賬號(hào):https://github.com/zongtanghu
不知道大家會(huì)不會(huì)有另外一個(gè)擔(dān)憂讼育,如果開啟了transientStorePoolEnable,內(nèi)存鎖定機(jī)制,那是不是隨著commitlog文件的不斷增加奶段,最終導(dǎo)致內(nèi)存溢出饥瓷?
2.3.2 TransientStorePool初始化
從這里可以看出,TransientStorePool默認(rèn)會(huì)初始化5個(gè)DirectByteBuffer(對(duì)外內(nèi)存)痹籍,并提供內(nèi)存鎖定功能呢铆,即這部分內(nèi)存不會(huì)被置換,可以通過(guò)transientStorePoolSize參數(shù)控制蹲缠。
在消息寫入消息時(shí)棺克,首先從池子中獲取一個(gè)DirectByteBuffer進(jìn)行消息的追加。當(dāng)5個(gè)DirectByteBuffer全部寫滿消息后线定,該如何處理呢娜谊?從RocketMQ的設(shè)計(jì)中來(lái)看,同一時(shí)間斤讥,只會(huì)對(duì)一個(gè)commitlog文件進(jìn)行順序?qū)懮唇裕瑢懲暌粋€(gè)后,繼續(xù)創(chuàng)建一個(gè)新的commitlog文件芭商。故TransientStorePool的設(shè)計(jì)思想是循環(huán)利用這5個(gè)DirectByteBuffer派草,只需要寫入到DirectByteBuffer的內(nèi)容被提交到PageCache后,即可重復(fù)利用铛楣。對(duì)應(yīng)的代碼如下:
TransientStorePool#returnBuffer
public void returnBuffer(ByteBuffer byteBuffer) {
byteBuffer.position(0);
byteBuffer.limit(fileSize);
this.availableBuffers.offerFirst(byteBuffer);
}
其調(diào)用棧如下:
從上面的分析看來(lái)近迁,并不會(huì)隨著消息的不斷寫入而導(dǎo)致內(nèi)存溢出。
3簸州、現(xiàn)象解答
3.1 [REJECTREQUEST]system busy
其拋出的源碼入口點(diǎn):NettyRemotingAbstract#processRequestCommand鉴竭,上面的原理分析部分已經(jīng)詳細(xì)介紹其實(shí)現(xiàn)原理,總結(jié)如下勿侯。
在不開啟transientStorePoolEnable機(jī)制時(shí)拓瞪,如果Broker PageCache繁忙時(shí)則拋出上述錯(cuò)誤,判斷PageCache繁忙的依據(jù)就是向PageCache追加消息時(shí)助琐,如果持有鎖的時(shí)間超過(guò)1s祭埂,則會(huì)拋出該錯(cuò)誤;在開啟transientStorePoolEnable機(jī)制時(shí)兵钮,其判斷依據(jù)是如果TransientStorePool中不存在可用的堆外內(nèi)存時(shí)拋出該錯(cuò)誤蛆橡。
3.2 too many requests and system thread pool busy, RejectedExecutionException
其拋出的源碼入口點(diǎn):NettyRemotingAbstract#processRequestCommand,其調(diào)用地方緊跟3.1,是在向線程池執(zhí)行任務(wù)時(shí)掘譬,被線程池拒絕執(zhí)行時(shí)拋出的泰演,我們可以順便看看Broker消息處理發(fā)送的線程信息:
BrokerController#registerProcessor
該線程池的隊(duì)列長(zhǎng)度默認(rèn)為10000,我們可以通過(guò)sendThreadPoolQueueCapacity來(lái)改變默認(rèn)值葱轩。
3.3 [PC_SYNCHRONIZED]broker busy
其拋出的源碼入口點(diǎn):DefaultMessageStore#putMessage睦焕,在進(jìn)行消息追加時(shí)藐握,再一次判斷PageCache是否繁忙,如果繁忙垃喊,則拋出上述錯(cuò)誤猾普。
3.4 broker busy, period in queue: %sms, size of queue: %d
其拋出源碼的入口點(diǎn):BrokerFastFailure#cleanExpiredRequest。該方法的調(diào)用頻率為每隔10s中執(zhí)行一次本谜,不過(guò)有一個(gè)執(zhí)行前提條件就是Broker端要開啟快速失敗初家,默認(rèn)為開啟,可以通過(guò)參數(shù)brokerFastFailureEnable來(lái)設(shè)置乌助。該方法的實(shí)現(xiàn)要點(diǎn)是每隔10s溜在,檢測(cè)一次,如果檢測(cè)到PageCache繁忙他托,并且發(fā)送隊(duì)列中還有排隊(duì)的任務(wù)掖肋,則直接不再等待,直接拋出系統(tǒng)繁忙錯(cuò)誤上祈,使正在排隊(duì)的線程快速失敗培遵,結(jié)束等待。
4登刺、實(shí)踐建議
經(jīng)過(guò)上面的原理講解與現(xiàn)象分析籽腕,消息發(fā)送時(shí)拋出system busy、broker busy的原因都是PageCache繁忙纸俭,那是不是可以通過(guò)調(diào)整上述提到的某些參數(shù)來(lái)避免拋出錯(cuò)誤呢皇耗?.例如如下參數(shù):
- osPageCacheBusyTimeOutMills
設(shè)置PageCache系統(tǒng)超時(shí)的時(shí)間,默認(rèn)為1000揍很,表示1s郎楼,那是不是可以把增加這個(gè)值,例如設(shè)置為2000或3000窒悔。作者觀點(diǎn):非常不可取呜袁。 - sendThreadPoolQueueCapacity
Broker服務(wù)器處理的排隊(duì)隊(duì)列,默認(rèn)為10000简珠,如果隊(duì)列中積壓了10000個(gè)請(qǐng)求阶界,則會(huì)拋出RejectExecutionException。作者觀點(diǎn):不可取聋庵。 - brokerFastFailureEnable
是否啟用快速失敗膘融,默認(rèn)為true,表示當(dāng)如果發(fā)現(xiàn)Broker服務(wù)器的PageCache繁忙祭玉,如果發(fā)現(xiàn)sendThreadPoolQueue隊(duì)列中不為空氧映,表示還有排隊(duì)的發(fā)送請(qǐng)求在排隊(duì)等待執(zhí)行,則直接結(jié)束等待脱货,返回broker busy岛都。那如果不開啟快速失敗律姨,則同樣可以避免拋出這個(gè)錯(cuò)誤。作者觀點(diǎn):非常不可取疗绣。
修改上述參數(shù)线召,都不可取,原因是出現(xiàn)system busy多矮、broker busy這個(gè)錯(cuò)誤,其本質(zhì)是系統(tǒng)的PageCache繁忙哈打,通俗一點(diǎn)講就是向PageCache追加消息時(shí)塔逃,單個(gè)消息發(fā)送占用的時(shí)間超過(guò)1s了,如果繼續(xù)往該Broker服務(wù)器發(fā)送消息并等待料仗,其TPS根本無(wú)法滿足湾盗,哪還是高性能的消息中間了呀。故才會(huì)采用快速失敗機(jī)制立轧,直接給消息發(fā)送者返回錯(cuò)誤格粪,消息發(fā)送者默認(rèn)情況會(huì)重試2次,將消息發(fā)往其他Broker氛改,保證其高可用帐萎。
下面根據(jù)個(gè)人的見(jiàn)解,提出如下解決辦法:
4.1 開啟transientStorePoolEnable
在broker.config中將transientStorePoolEnable=true胜卤。
方案依據(jù):
啟用“讀寫”分離疆导,消息發(fā)送時(shí)消息先追加到DirectByteBuffer(堆外內(nèi)存)中,然后在異步刷盤機(jī)制下葛躏,會(huì)將DirectByteBuffer中的內(nèi)容提交到PageCache澈段,然后刷寫到磁盤。消息拉取時(shí)舰攒,直接從PageCache中拉取败富,實(shí)現(xiàn)了讀寫分離,減輕了PageCaceh的壓力摩窃,能從根本上解決該問(wèn)題兽叮。方案缺點(diǎn):
會(huì)增加數(shù)據(jù)丟失的可能性,如果Broker JVM進(jìn)程異常退出偶芍,提交到PageCache中的消息是不會(huì)丟失的充择,但存在堆外內(nèi)存(DirectByteBuffer)中但還未提交到PageCache中的這部分消息,將會(huì)丟失匪蟀。但通常情況下椎麦,RocketMQ進(jìn)程退出的可能性不大。
4.2 擴(kuò)容Broker服務(wù)器
方案依據(jù):
當(dāng)Broker服務(wù)器自身比較忙的時(shí)候材彪,快速失敗观挎,并且在接下來(lái)的一段時(shí)間內(nèi)會(huì)規(guī)避該Broker琴儿,這樣該Broker恢復(fù)提供了時(shí)間保證,Broker本身的架構(gòu)是支持分布式水平擴(kuò)容的嘁捷,增加Topic的隊(duì)列數(shù)造成,降低單臺(tái)Broker服務(wù)器的負(fù)載,從而避免出現(xiàn)PageCache雄嚣。
溫馨提示:在Broker擴(kuò)容時(shí)候晒屎,可以復(fù)制集群中任意一臺(tái)Broker服務(wù)下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服務(wù)器指定目錄,避免在新Broker服務(wù)器上為Broker創(chuàng)建隊(duì)列缓升,然后消息發(fā)送者鼓鲁、消息消費(fèi)者都能動(dòng)態(tài)獲取Topic的路由信息。
與之?dāng)U容對(duì)應(yīng)的港谊,也可以通過(guò)對(duì)原有Broker進(jìn)行升配骇吭,例如增加內(nèi)存、把機(jī)械盤換成SSD歧寺,但這種情況燥狰,通常需要重啟Broekr服務(wù)器,沒(méi)有擴(kuò)容來(lái)的方便斜筐。
本文就介紹到這里了龙致,如果大家覺(jué)得文章對(duì)自己有用的話,麻煩幫忙點(diǎn)贊奴艾、轉(zhuǎn)發(fā)净当,謝謝。親愛(ài)的讀者朋友蕴潦,還有更好的方案沒(méi)像啼?歡迎留言與作者互動(dòng),共同探討潭苞。
更多文章請(qǐng)關(guān)注下面微信公眾號(hào):