在進(jìn)行spring-kafka消費(fèi)的過(guò)程中诱告,大部分人可能都遇到過(guò)kafka消息堆積的情況悯姊,尤其是大數(shù)據(jù)處理的場(chǎng)景池颈,這時(shí)候就要想辦法提高消費(fèi)能力。
提高消費(fèi)者的數(shù)量可以嗎刺彩?我們知道迷郑,kafka的消費(fèi)者個(gè)數(shù)是與kafka的分區(qū)數(shù)相關(guān)的,一個(gè)分區(qū)最多只能被一個(gè)消費(fèi)者消費(fèi)创倔,也就是說(shuō)嗡害,你在客戶端開(kāi)啟的消費(fèi)者個(gè)數(shù)即使超過(guò)了分區(qū)數(shù),也不能提高消費(fèi)能力畦攘,反而還占資源霸妹!
那既然一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi),那我增加分區(qū)數(shù)不就可以多開(kāi)消費(fèi)者了知押?這在理論上確實(shí)是可行的叹螟,但是在運(yùn)維層面會(huì)涉及到分區(qū)、數(shù)據(jù)的遷移朗徊,而且這期間kafka是不可用的狀態(tài)首妖,成本太過(guò)高昂。
本方案從消費(fèi)端的代碼層面著手爷恳,看如何設(shè)計(jì)實(shí)現(xiàn)一個(gè)高性能的kafka消費(fèi)組件有缆。
一、異步
默認(rèn)情況下温亲,spring-kafka采用的是同步消費(fèi)模式棚壁,這種情況下消費(fèi)能力被分區(qū)數(shù)限制,難以提升栈虚。所以我們首先想到的異步消費(fèi):
異步消費(fèi)袖外,將消息的拉取與處理交由不同的線程處理,在硬件允許的前提下魂务,可自由提高消費(fèi)能力曼验。
異步消費(fèi)雖然提高了消費(fèi)能力,但是對(duì)于offset的提交卻帶來(lái)了挑戰(zhàn):因?yàn)槟J(rèn)情況下粘姜,offset是自動(dòng)提交的鬓照,自動(dòng)提交很可能導(dǎo)致異步未處理完的消息丟失;
二孤紧、手動(dòng)提交
方案:
- consumer將offset加入本地列表進(jìn)行維護(hù)豺裆;
- 異步worker回調(diào)更新offset狀態(tài);
- consumer遍歷offset号显,尋找可提交的最大offset臭猜;
- 提交最大offset到kafka躺酒,并清空本地已提交offset;
三蔑歌、有序offset列表
從上圖可知羹应,為了進(jìn)行手動(dòng)提交,其中最關(guān)鍵的部分是手動(dòng)維護(hù)的offset列表丐膝,為實(shí)現(xiàn)offset的提交量愧,要達(dá)到的目標(biāo):
- 減少提交次數(shù),每次只提交已完成的最大offset帅矗;
- offset列表已提交部分及時(shí)釋放偎肃,保持彈性;
- 盡量保證offset更新的有序性浑此,提交時(shí)前面offset已完成累颂,避免出現(xiàn)頭部真空現(xiàn)象;
但現(xiàn)實(shí)情況是:
- 異步worker執(zhí)行進(jìn)度無(wú)法預(yù)料朱监,對(duì)offset回調(diào)更新無(wú)法保證順序;
- 極有可能出現(xiàn)頭部長(zhǎng)時(shí)間無(wú)更新的真空現(xiàn)象原叮;
- 極端情況下赫编,由于頭部offset一直不更新導(dǎo)致offset列表一直得不到釋放從而導(dǎo)致內(nèi)存溢出;
極端情況下會(huì)出現(xiàn)以下情形:
問(wèn)題:在異步執(zhí)行的情況下怎樣實(shí)現(xiàn)offset列表更新的有序性?
四脐区、傳統(tǒng)線程池
傳統(tǒng)線程池調(diào)度策略:
- 當(dāng)線程數(shù)< corePoolSize時(shí),直接創(chuàng)建新線程執(zhí)行任務(wù)她按;
- 當(dāng)corePoolSize<線程數(shù)<maxPoolSize時(shí)牛隅,如果隊(duì)列未滿炕柔,任務(wù)入隊(duì)等待;
- 當(dāng)corePoolSize<線程數(shù)<maxPoolSize時(shí)媒佣,如果隊(duì)列已滿匕累,創(chuàng)建新線程執(zhí)行任務(wù);
- 當(dāng)線程數(shù)=maxPoolSize默伍,且隊(duì)列已滿欢嘿,則執(zhí)行拒絕策略;
從傳統(tǒng)線程池的調(diào)度策略來(lái)看也糊,它是惰性的炼蹦,這對(duì)我們當(dāng)前場(chǎng)景來(lái)說(shuō)有哪些問(wèn)題?
- 在第3步中狸剃,后到的任務(wù)可能會(huì)比先到的任務(wù)先執(zhí)行完掐隐;
- 在第4步中,拒絕策略會(huì)導(dǎo)致任務(wù)拋棄或由主線程插隊(duì)钞馁;
這兩點(diǎn)都會(huì)導(dǎo)致無(wú)法保證任務(wù)的順序執(zhí)行虑省!
五、饑渴線程池
目標(biāo):
- 確保先到的任務(wù)先執(zhí)行僧凰;
- 保證任務(wù)等待時(shí)的公平性探颈;
- 不能丟棄任務(wù);
方案:
- 改變傳統(tǒng)線程池調(diào)度策略训措;
- 使用公平隊(duì)列保證任務(wù)等待的公平性伪节;
- 完全摒除拒絕策略;
- put instead offer隙弛;
新的調(diào)度策略:
- 當(dāng)線程數(shù)< corePoolSize時(shí)架馋,直接創(chuàng)建新線程執(zhí)行任務(wù);
- 當(dāng)corePoolSize<線程數(shù)<maxPoolSize時(shí)全闷,強(qiáng)制創(chuàng)建新線程執(zhí)行任務(wù)叉寂;
- 當(dāng)線程數(shù)=maxPoolSize,入隊(duì)列等待总珠;
六屏鳍、微批處理
目標(biāo):
- 提高末端業(yè)務(wù)執(zhí)行效率,最大化提升性能局服;
- 降低本地offset列表內(nèi)存開(kāi)銷钓瞭;
- 提高offset提交至kafka的效率;
方案:
- 基于micro-batch微批思想淫奔,consumer線程對(duì)批量消息進(jìn)行切分山涡、整合,將微批數(shù)據(jù)交給不同的worker線程處理;
- offset列表僅記錄micro-batch中最大的offset鸭丛;
- 僅提交最大offset到kafka竞穷;
微批處理要求業(yè)務(wù)必須具有原子性谴垫,micro-batch這一批要么全部成功,要么全部失斈钢搿翩剪!
七、last offset處理
問(wèn)題:
- consumer線程在提交offset時(shí)彩郊,秉著‘有多少就提交多少’的思想前弯,并不等待,所以在異步的情況下會(huì)出現(xiàn)最后一批消息的若干offset得不到提交的情況秫逝;
- 如果后續(xù)沒(méi)有業(yè)務(wù)消息產(chǎn)生恕出,那么這若干offset將永遠(yuǎn)得不到提交,如果此時(shí)發(fā)生rebalance违帆,將發(fā)生重復(fù)消費(fèi)浙巫;
方案:
基于sideCar模式,提供額外的監(jiān)視器刷后,動(dòng)態(tài)監(jiān)測(cè)本節(jié)點(diǎn)的offset列表的畴,如若發(fā)現(xiàn)offset列表仍有未提交的offset,則會(huì)主動(dòng)發(fā)送探針消息尝胆,驅(qū)動(dòng)consumer進(jìn)行poll及commit丧裁;
八、重寫與重排
問(wèn)題:
- 發(fā)生rebalance時(shí)含衔,consumer實(shí)例可能會(huì)發(fā)生變化煎娇,用已過(guò)時(shí)的consumer提交offset會(huì)失敹帧;
- 發(fā)生rebalance時(shí)逊桦,可能會(huì)發(fā)生重復(fù)消費(fèi)眨猎,本地offset列表會(huì)重復(fù)添加;
- 發(fā)生rebalance時(shí)强经,由于micro-batch機(jī)制,拉取的offset可能比已有的所有offset要小寺渗,造成offset列表亂序匿情;
方案:
- 加入offset列表時(shí),檢查consumer實(shí)例信殊,如果發(fā)生變化炬称,則對(duì)consumer進(jìn)行重寫;
- 濾重檢查及offset列表重排序涡拘;
九玲躯、分區(qū)重分配
問(wèn)題:
- 發(fā)生rebalance時(shí),節(jié)點(diǎn)被分配的分區(qū)也可能發(fā)生變化鳄乏,這時(shí)節(jié)點(diǎn)之前保存的未提交offset列表就成了臟數(shù)據(jù)跷车,并常駐內(nèi)存;
- 臟數(shù)據(jù)導(dǎo)致探針消息不停地被發(fā)送橱野,但是本節(jié)點(diǎn)消費(fèi)不到朽缴,引起網(wǎng)絡(luò)濫發(fā);
問(wèn)題思考(怎么判斷offset列表為臟數(shù)據(jù)水援?):
- 判斷offset列表大小不變?yōu)槭裁床豢梢裕?br> ——可能存在每次巡檢時(shí)大小都不變的情況密强;
- 判斷offset列表內(nèi)容不變?yōu)槭裁床豢梢裕?br> ——可能線程池繁忙,新offset添加一直等待導(dǎo)致列表不變化蜗元;
方案:
每次添加offset時(shí)記錄offset列表的更新時(shí)間戳或渤,Offset Monitor定時(shí)檢測(cè),當(dāng)經(jīng)歷了一個(gè)rebalance周期后奕扣,如果時(shí)間戳仍未更新薪鹦,判斷offset列表為臟數(shù)據(jù),予以清除成畦。
方案思考
- 時(shí)間戳長(zhǎng)時(shí)間不更新意味著什么距芬?
—— 1.分區(qū)重分配;2.線程池繁忙循帐,新offset添加不進(jìn)去框仔; - 線程池繁忙的場(chǎng)景,為什么offset列表不會(huì)被誤殺拄养?
—— 把時(shí)間線拉長(zhǎng)离斩,新offset加入等待一個(gè)rebalance周期后银舱,一定會(huì)rebalance;
這個(gè)案例給我們的啟發(fā):從空間維度轉(zhuǎn)變?yōu)闀r(shí)間維度去判斷跛梗,這種思考維度的轉(zhuǎn)變以及思考尺度的放大寻馏,降低了解決問(wèn)題的復(fù)雜性,提供了更多的可能性核偿。
十诚欠、極致性能
問(wèn)題:
- 傳統(tǒng)隊(duì)列在讀寫時(shí)存在鎖爭(zhēng)用,在高并發(fā)場(chǎng)景下漾岳,線程不停地被掛起轰绵、恢復(fù),上下文切換過(guò)程存在著很大的開(kāi)銷尼荆;
- 在x86架構(gòu)下的CPU左腔,在高并發(fā)場(chǎng)景下很容易形成偽共享(多個(gè)線程操作不同的變量,但是變量處于相同的緩存行捅儒,修改變量會(huì)使緩存行失效液样,甚至發(fā)生跨槽讀取);
Disruptor:
Disruptor是一個(gè)高性能的隊(duì)列巧还,通過(guò)以下設(shè)計(jì)解決傳統(tǒng)隊(duì)列的鎖爭(zhēng)用及偽共享問(wèn)題:
- 無(wú)鎖設(shè)計(jì):采用CAS無(wú)鎖方式鞭莽,保證線程安全,并提高效率;
- 環(huán)形數(shù)組:可避免頻繁的垃圾回收狞悲,同時(shí)數(shù)組對(duì)處理器的緩存機(jī)制更加友好;
- 元素定位:環(huán)形數(shù)組長(zhǎng)度為2^n撮抓,通過(guò)位運(yùn)算,能快速取到元素;
- Cache padding:通過(guò)添加額外的無(wú)用信息摇锋,避免偽共享引發(fā)的性能問(wèn)題丹拯。
十一、總體架構(gòu)
十二荸恕、效果測(cè)試
服務(wù)器:3 * 6核8G
消費(fèi)邏輯:將14萬(wàn)CDC消息計(jì)算更新后寫入ES
消費(fèi)配置:kafka 6分區(qū)乖酬、批量拉取500涵卵、32線程宣吱、2048隊(duì)列
效果對(duì)比:
- 不使用組件:81分鐘
- 親緣線程池:6分41秒
- Disruptor:4分51秒
- 饑渴線程池:4分11秒
- 饑渴 + 微批:1分35秒