前言
前段時(shí)間寫了三個(gè)Spark Streaming程序侍匙,負(fù)責(zé)從Kafka訂閱群和用戶消息筷登,并做輿情監(jiān)控必須的ETL工作。它們消費(fèi)的Topic各自不同腋粥,但是分配的group.id
都相同蝗岖。運(yùn)行一段時(shí)間之后侥猩,發(fā)現(xiàn)偶爾拋出如下所示的異常。
ERROR [org.apache.spark.streaming.scheduler.JobScheduler] - Error generating jobs for time 1579570560000 ms
java.lang.IllegalStateException: No current assignment for partition my_dummy_topic-11
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:192)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
......
經(jīng)過Google確定是由于我們使用的Spark Streaming 2.3版本無法處理Kafka Consumer Rebalance而引起的抵赢,見SPARK-22968及其對(duì)應(yīng)的PR欺劳。我們通過進(jìn)一步查看日志,確實(shí)發(fā)生了Rebalance铅鲤。
那么本文就來說說Rebalance機(jī)制划提,先從執(zhí)行Rebalance的主體——Consumer Group談起。
Consumer Group
為了使得Consumer易于組織邢享、可擴(kuò)展以及更好地容錯(cuò)鹏往,Kafka將一個(gè)或多個(gè)Consumer組織為Consumer Group,即消費(fèi)者組骇塘。Consumer Group的唯一標(biāo)識(shí)就是group.id
伊履。Group內(nèi)的所有Consumer共同消費(fèi)已訂閱的各個(gè)Topic的所有Partition韩容,并且保證每個(gè)Partition只分配給該Group內(nèi)的唯一一個(gè)Consumer。對(duì)Consumer Group的管理就是實(shí)現(xiàn)以下三個(gè)職責(zé):
- 分配并維護(hù)Consumer與Partition的消費(fèi)對(duì)應(yīng)關(guān)系唐瀑,即"who owns what"群凶。(重新)分配消費(fèi)對(duì)應(yīng)關(guān)系的過程就叫Rebalance;
- 記錄并提交Consumer消費(fèi)Partition的消費(fèi)位點(diǎn)(Offset值)哄辣,即"consumed up to where"请梢;
- 維護(hù)Consumer Group的配置信息與Consumer元數(shù)據(jù)。
下面簡單介紹Consumer Group的管理力穗。
Group Coordinator
在0.9版本之前毅弧,Kafka強(qiáng)依賴于ZooKeeper實(shí)現(xiàn)Consumer Group的管理:
- Group內(nèi)每個(gè)Consumer通過在ZK內(nèi)搶注節(jié)點(diǎn)來決定消費(fèi)哪些Partition,并注冊對(duì)Group和Broker相關(guān)節(jié)點(diǎn)的監(jiān)聽当窗,以獲知消費(fèi)環(huán)境的變化(其他Consumer掉線够坐、Broker宕機(jī)等),進(jìn)而觸發(fā)Rebalance超全;
- Offset值也維護(hù)在ZK中咆霜,老生常談了。
這種方式除了過于依賴ZK嘶朱,導(dǎo)致ZK壓力偏大之外,還有兩個(gè)分布式系統(tǒng)中常見且嚴(yán)重的問題:
- 羊群效應(yīng)(herd effect)——一個(gè)被監(jiān)聽的ZK節(jié)點(diǎn)發(fā)生變化光酣,導(dǎo)致大量的通知發(fā)送給所有監(jiān)聽者(即Consumer)疏遏;
- 腦裂(split brain)——ZK只保證最終一致性,不同的Consumer在同一時(shí)刻可能看到不同的Group和Broker狀態(tài)救军,造成Rebalance混亂财异。
所以從0.9版本開始,Kafka重新設(shè)計(jì)了名為Group Coordinator的“協(xié)調(diào)者”服務(wù)負(fù)責(zé)實(shí)現(xiàn)上述職責(zé)唱遭,將這部分工作從ZK剝離開來戳寸。每個(gè)Broker在啟動(dòng)時(shí),都會(huì)順帶啟動(dòng)一個(gè)Group Coordinator實(shí)例拷泽。每個(gè)Consumer Group在初始化時(shí)疫鹊,都會(huì)分配給一個(gè)Group Coordinator實(shí)例來管理消費(fèi)關(guān)系和Offset,如下簡圖所示司致。
Group Coordinator提交Offset時(shí)也不再是向ZK寫拆吆,而是寫入那個(gè)廣為人知的特殊Topic——__consumer_offsets里。key是group-topic-partition
格式的脂矫,value為Offset值枣耀。
那么該如何確定一個(gè)Consumer Group被分配給哪個(gè)Group Coordinator呢?Kafka根據(jù)groupId.hashCode() % offsets.topic.num.partitions
取絕對(duì)值來得出該Group的Offset信息寫入__consumer_offsets的分區(qū)號(hào)庭再,并將Group分配給該分區(qū)Leader所在的Broker上的那個(gè)Group Coordinator捞奕。
上面粗略講了Group Coordinator和它的第二個(gè)職責(zé)的實(shí)現(xiàn)方式牺堰,接下來仔細(xì)分析第一個(gè)職責(zé),就是Rebalance的機(jī)制颅围。
Rebalance觸發(fā)條件與協(xié)議
我們已經(jīng)知道伟葫,Rebalance就是一個(gè)Consumer Group內(nèi)的所有Consumer分配消費(fèi)各已訂閱的Topic的各Partition的過程。Partition的分配有內(nèi)置的三種算法實(shí)現(xiàn)(range谷浅、round-robin扒俯、sticky),用戶也可以自定義分配規(guī)則一疯,這個(gè)與本文關(guān)系不太大撼玄,就不再展開說。
那么有哪些條件會(huì)觸發(fā)Rebalance呢墩邀?列舉如下:
- Group發(fā)生變更掌猛,即新的Consumer加入,或原有的Consumer離開眉睹。Consumer離開可以是主動(dòng)的(退出)荔茬,也可以是被動(dòng)的(崩潰);
- Partition發(fā)生變更竹海,包含訂閱的Topic被創(chuàng)建或刪除慕蔚,以及現(xiàn)有訂閱Topic新增Partition等。
本文只考慮Group發(fā)生變更的情況斋配。Kafka定義了一個(gè)簡單的協(xié)議來處理這種情況下的Rebalance過程孔飒,包含以下4對(duì)請(qǐng)求/響應(yīng):
- heartbeat:Group中的Consumer周期性地給Coordinator發(fā)送該心跳信號(hào),表示自己存活艰争;
- join-group:Consumer請(qǐng)求加入Group的信號(hào)坏瞄;
- leave-group:Consumer主動(dòng)退出Group的信號(hào);
- sync-group:Coordinator將已生成的Consumer-Partition消費(fèi)對(duì)應(yīng)關(guān)系分發(fā)給Consumer的信號(hào)甩卓。
下面三張圖示出心跳超時(shí)(被動(dòng)退出)鸠匀、主動(dòng)退出和加入Group的情況。
圖中的紅色嘆號(hào)表示Coordinator感知到了變化逾柿,并觸發(fā)Rebalance過程缀棍。
Rebalance過程
整個(gè)Rebalance分為兩個(gè)大步驟:JOIN和SYNC。
JOIN
正如其名鹿寻,在這一步中睦柴,所有Consumer都會(huì)向Coordinator發(fā)送join-group,請(qǐng)求重新加入Group(那些原本已經(jīng)在Group內(nèi)的也不例外)毡熏,同時(shí)放棄掉已分配給自己的Partition坦敌。以有新的Consumer主動(dòng)加入Group(即上一節(jié)中的第三種情況)為例,圖示如下。
圖中的sync. barrier線就是表示JOIN步驟與SYNC步驟之間的分界狱窘,即從發(fā)起Rebalance開始到接受最后一個(gè)join-group之間的超時(shí)杜顺。需要特別注意的是,Broker端和Consumer端判定這個(gè)階段超時(shí)的標(biāo)準(zhǔn)是不同的蘸炸,Broker端使用rebalance.timeout.ms
躬络,而Consumer端使用max.poll.interval.ms
。所以當(dāng)遇到不明原因的長時(shí)間Rebalance時(shí)搭儒,應(yīng)考慮max.poll.interval.ms
的設(shè)定穷当。
SYNC
這一步需要做的事情是:
- Coordinator在所有Consumer里選擇一個(gè)擔(dān)任Leader,并由Leader調(diào)用Partition分配規(guī)則來確定消費(fèi)對(duì)應(yīng)關(guān)系淹禾。
- 各個(gè)Consumer發(fā)送sync-group請(qǐng)求馁菜。Leader發(fā)送的請(qǐng)求里包含有已經(jīng)確定的消費(fèi)分配信息,其他Consumer的請(qǐng)求為空铃岔。
- Coordinator將消費(fèi)分配信息原樣封裝在sync-group響應(yīng)中汪疮,并投遞給各個(gè)Consumer,最終使Group內(nèi)所有成員都獲知自己該消費(fèi)的Partition毁习。
圖示如下智嚷,其中C1被選為Leader。SYNC步驟結(jié)束后纺且,原有的1~6六個(gè)分區(qū)被分配給了C1~C3三個(gè)Consumer盏道。Rebalance就完成了。
Rebalance的改進(jìn)方案
上述Rebalance機(jī)制沿用了很長時(shí)間载碌,但是它同樣存在問題:
- stop-the-world問題——需要收回(revoke)所有Partition再重新分配(reassign)摇天,在此時(shí)間內(nèi),所有Consumer都無法進(jìn)行消費(fèi)恐仑。如果Rebalance時(shí)間長,會(huì)造成lag为鳄。
- back-and-forth問題——如果多次觸發(fā)Rebalance裳仆,很有可能造成一個(gè)Consumer消費(fèi)的Partition被分配給其他Consumer,然后又分配回來孤钦,做了無用功歧斟。
為了解決該問題,在2.4版本特別提出了Incremental(增量)Rebalance偏形。顧名思義静袖,就是Rebalance時(shí)不再讓所有Consumer都放棄掉所有已分配的Partition,而是每次先記錄俊扭,并轉(zhuǎn)化成多次少量的Rebalance過程队橙,且Consumer在此期間不會(huì)STW。簡圖如下所示。
關(guān)于該方案的細(xì)節(jié)捐康,請(qǐng)參考KIP-429仇矾。
The End
民那晚安。祝身體健康解总,百毒不侵贮匕。