在我們平時(shí)使用kafka的過程中吮旅,consumer偶爾會(huì)出現(xiàn)TPS大幅降低的情況指郁,可能會(huì)出現(xiàn)這樣的日志:
??究其原因是發(fā)生了Rebalance麦牺。Rebalance是消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程挺峡。換句話說左胞,它也是一種分配一個(gè)Consumer Group中所有的Consumer訂閱Topic(主題)下所有Partition(分區(qū))的協(xié)議。當(dāng)發(fā)生Rebalance時(shí)你雌,Consumer Group中所有的Consumer全部停止消費(fèi)器联,等待Rebalance結(jié)束后重新消費(fèi),類似JVM中的Stop The World婿崭。除了Stop The World以外拨拓,Rebalance的效率也很低。舉個(gè)例子逛球,我們的Consumer Group中有四個(gè)Consumer千元,分別為ConsumerA,ConsumerB颤绕,ConsumerC幸海,ConsumerD,分別消費(fèi)Topic A 下的 8個(gè)分區(qū)祟身,ConsumerA:P1、P2物独,ConsumerB:P3袜硫、P4,ConsumerC:P5挡篓、P6婉陷,ConsumerD:P7、P8,當(dāng)ConsumerD因?yàn)閽斓粲|發(fā)Rebalance后官研,所有的Consumer重新分配Partition秽澳,而不是ConsumerA,ConsumerB,ConsumerC接管ConsumerD之前消費(fèi)的Partition。當(dāng)我們的Consumer Group中Consumer實(shí)例比較多的時(shí)候戏羽,Rebalance將極其耗費(fèi)時(shí)間担神。
??那么在什么條件下會(huì)觸發(fā)Coordinator的Rebalance呢?當(dāng)滿足以下3個(gè)條件之一后,會(huì)觸發(fā)Rebalance始花。
1.當(dāng)Consumer Group中Consumer實(shí)例數(shù)量發(fā)生變化:無論實(shí)例是增加妄讯、減少還是崩潰都會(huì)觸發(fā)Rebalance。
2.當(dāng)Consumer Group訂閱的Topic數(shù)量發(fā)生變更:在大部分時(shí)候我們都是訂閱一個(gè)Topic酷宵,并消費(fèi)其中的消息亥贸。但是也可以通過正則的方式訂閱多個(gè)滿足正則規(guī)則的Topic,如果此時(shí)新增一個(gè)滿足要求的Topic浇垦,同樣也會(huì)觸發(fā)Rebalance炕置。
3.當(dāng)訂閱的Topic下面的Partition數(shù)量發(fā)生變更時(shí),也會(huì)觸發(fā)Rebalance男韧。
??第2讹俊、3種情況其實(shí)都是在我們的控制之中的情況,在這里就不繼續(xù)討論了煌抒,只討論Consumer實(shí)例數(shù)量發(fā)生變化導(dǎo)致的Rebalance。新增Consumer實(shí)例這種情況比較好理解厕倍,當(dāng)我們新啟動(dòng)一個(gè)已有的group.id
的Consumer時(shí)寡壮,就相當(dāng)于向該Consumer Group中添加了一個(gè)Consumer實(shí)例。同理讹弯,當(dāng)我們停掉某個(gè)Consumer實(shí)例時(shí)况既,該實(shí)例也會(huì)自動(dòng)從Consumer Group中退出。
?以上兩種情況都是Consumer實(shí)例正常增減组民,由此導(dǎo)致的Rebalance都是在意料之中的棒仍。但是還有一種情況也會(huì)導(dǎo)致Rebalance,那就是Consumer被Coordinator踢出消費(fèi)組臭胜。和Redis一樣莫其,Kafka的Consumer實(shí)例也會(huì)向Coordinator定期發(fā)送心跳癞尚,如果在指定時(shí)間內(nèi)Consumer沒有向Coordinator發(fā)送心跳,Coordinator就會(huì)認(rèn)為該Consumer實(shí)例已經(jīng)死亡乱陡,然后將其踢出消費(fèi)組浇揩。上面提到的指定時(shí)間由session.timeout.ms
參數(shù)控制,默認(rèn)值為10s,即Coordinator在10s內(nèi)沒有收到Consumer實(shí)例的心跳憨颠,則會(huì)認(rèn)為該實(shí)例不可用胳徽,但是session.timeout.ms
這個(gè)值不宜設(shè)置過大,因?yàn)檫@樣會(huì)影響Coordinator快速發(fā)現(xiàn)不可用的Consumer實(shí)例爽彤。除了session.timeout.ms
參數(shù)养盗,還有一個(gè)參數(shù)與心跳緊密相關(guān),那就是heartbeat.interval.ms
适篙,該參數(shù)的意思是實(shí)例在多少秒內(nèi)發(fā)出一次心跳往核,該參數(shù)控制著實(shí)例發(fā)送心跳的頻率,默認(rèn)值為3s匙瘪。一般默認(rèn)session.timeout.ms
>= 3*heartbeat.interval.ms
铆铆,之所以這么設(shè)置是因?yàn)槲覀冃枰谝淮?code>heartbeat.interval.ms中盡可能的多發(fā)出心跳,使得實(shí)例能夠盡早的發(fā)現(xiàn)當(dāng)前是否開啟了Rebalance(如果開啟了Rebalance丹喻,Coordinator會(huì)在心跳請(qǐng)求的響應(yīng)中放入REBALANCE_NEEDED標(biāo)識(shí))以及在被Coordinator判定為死亡之前盡量告知Coordinator實(shí)例還活著薄货,但是心跳也不宜頻繁發(fā)送,畢竟發(fā)送心跳需要占用帶寬資源碍论,因此我們采用一個(gè)默認(rèn)的取值方法谅猾,即session.timeout.ms
>= 3*heartbeat.interval.ms
。
??除了上面提到的兩個(gè)參數(shù)鳍悠,Consumer端還有一個(gè)參數(shù)用來控制實(shí)例的實(shí)際消費(fèi)能力對(duì)于Rebalance的影響税娜,那就是max.poll.interval.ms
,該參數(shù)規(guī)定了Consumer每隔多久才能poll一批消息藏研,默認(rèn)值為5min敬矩。如果Consumer在設(shè)定的max.poll.interval.ms
的時(shí)間限制內(nèi)沒有消費(fèi)完這批消息,那么Consumer會(huì)主動(dòng)離開當(dāng)前的Consumer Group蠢挡,繼而引發(fā)新的一輪Rebalance弧岳。
??另外,如果Consumer使用的語言是Java业踏,那么GC的問題同樣需要考慮禽炬。如果使用的是CMS垃圾回收器,可能會(huì)出現(xiàn)最終標(biāo)記階段停頓時(shí)間過長(zhǎng)的問題勤家,在這一階段是會(huì)Stop The World的腹尖,這同樣會(huì)導(dǎo)致Rebalance的發(fā)生。
??說到這里伐脖,我們可以總結(jié)一下热幔,到底什么情況導(dǎo)致的Rebalance是我們不希望發(fā)生的:
1.Consumer實(shí)例沒有及時(shí)向Coordinator發(fā)送它的心跳乐设,導(dǎo)致被Coordinator認(rèn)為實(shí)例死亡,從而被移出消費(fèi)組断凶,導(dǎo)致Rebalance伤提。在這種情況下我們可以通過設(shè)置session.timeout.ms
和heartbeat.interval.ms
兩個(gè)參數(shù)進(jìn)行控制。
2.Consumer實(shí)例消費(fèi)時(shí)間過長(zhǎng)认烁,超過了max.poll.interval.ms
設(shè)置的時(shí)間限制肿男,導(dǎo)致實(shí)例主動(dòng)離開消費(fèi)組,進(jìn)而導(dǎo)致Rebalance却嗡。這種情況如何優(yōu)化會(huì)在后面進(jìn)行分析舶沛。
3.Consumer實(shí)例使用的垃圾回收器不合適同樣可能會(huì)導(dǎo)致Rebalance,如果你使用的JDK版本在1.8以上窗价,還是建議你使用G1垃圾回收器如庭。
??回到我們第一張圖中的日志:Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.這段話的意思是由于發(fā)生了rebalance,分區(qū)已經(jīng)被分配給其他成員撼港,當(dāng)前Consumer實(shí)例無法提交offset坪它。這標(biāo)識(shí)兩次調(diào)用poll()方法的時(shí)間超過了默認(rèn)的max.poll.interval.ms
,這表明實(shí)例花費(fèi)了太多時(shí)間用戶處理消息帝牡。你可以通過增加max.poll.interval.ms
參數(shù)的值或者降低max.poll.records
參數(shù)的值往毡。
??除去Kafka官方提示我們的這兩種解決辦法之外,還有其他兩種解決辦法:一是縮短單條消息的處理時(shí)間靶溜,增加消費(fèi)者的吞吐量开瞭;二是使用線程池等方案,使用多線程加速消息的消費(fèi)速率罩息,同樣可以提升消費(fèi)者的吞吐量嗤详。