談?wù)凨afka Consumer Group的Coordinator與Rebalance機(jī)制

前言

前段時(shí)間寫了三個(gè)Spark Streaming程序侍匙,負(fù)責(zé)從Kafka訂閱群和用戶消息筷登,并做輿情監(jiān)控必須的ETL工作。它們消費(fèi)的Topic各自不同腋粥,但是分配的group.id都相同蝗岖。運(yùn)行一段時(shí)間之后侥猩,發(fā)現(xiàn)偶爾拋出如下所示的異常。

ERROR [org.apache.spark.streaming.scheduler.JobScheduler] - Error generating jobs for time 1579570560000 ms
java.lang.IllegalStateException: No current assignment for partition my_dummy_topic-11
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:192)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
......

經(jīng)過Google確定是由于我們使用的Spark Streaming 2.3版本無法處理Kafka Consumer Rebalance而引起的抵赢,見SPARK-22968及其對(duì)應(yīng)的PR欺劳。我們通過進(jìn)一步查看日志,確實(shí)發(fā)生了Rebalance铅鲤。

那么本文就來說說Rebalance機(jī)制划提,先從執(zhí)行Rebalance的主體——Consumer Group談起。

Consumer Group

為了使得Consumer易于組織邢享、可擴(kuò)展以及更好地容錯(cuò)鹏往,Kafka將一個(gè)或多個(gè)Consumer組織為Consumer Group,即消費(fèi)者組骇塘。Consumer Group的唯一標(biāo)識(shí)就是group.id伊履。Group內(nèi)的所有Consumer共同消費(fèi)已訂閱的各個(gè)Topic的所有Partition韩容,并且保證每個(gè)Partition只分配給該Group內(nèi)的唯一一個(gè)Consumer。對(duì)Consumer Group的管理就是實(shí)現(xiàn)以下三個(gè)職責(zé):

  • 分配并維護(hù)Consumer與Partition的消費(fèi)對(duì)應(yīng)關(guān)系唐瀑,即"who owns what"群凶。(重新)分配消費(fèi)對(duì)應(yīng)關(guān)系的過程就叫Rebalance
  • 記錄并提交Consumer消費(fèi)Partition的消費(fèi)位點(diǎn)(Offset值)哄辣,即"consumed up to where"请梢;
  • 維護(hù)Consumer Group的配置信息與Consumer元數(shù)據(jù)。

下面簡單介紹Consumer Group的管理力穗。

Group Coordinator

在0.9版本之前毅弧,Kafka強(qiáng)依賴于ZooKeeper實(shí)現(xiàn)Consumer Group的管理:

  • Group內(nèi)每個(gè)Consumer通過在ZK內(nèi)搶注節(jié)點(diǎn)來決定消費(fèi)哪些Partition,并注冊對(duì)Group和Broker相關(guān)節(jié)點(diǎn)的監(jiān)聽当窗,以獲知消費(fèi)環(huán)境的變化(其他Consumer掉線够坐、Broker宕機(jī)等),進(jìn)而觸發(fā)Rebalance超全;
  • Offset值也維護(hù)在ZK中咆霜,老生常談了。

這種方式除了過于依賴ZK嘶朱,導(dǎo)致ZK壓力偏大之外,還有兩個(gè)分布式系統(tǒng)中常見且嚴(yán)重的問題:

  • 羊群效應(yīng)(herd effect)——一個(gè)被監(jiān)聽的ZK節(jié)點(diǎn)發(fā)生變化光酣,導(dǎo)致大量的通知發(fā)送給所有監(jiān)聽者(即Consumer)疏遏;
  • 腦裂(split brain)——ZK只保證最終一致性,不同的Consumer在同一時(shí)刻可能看到不同的Group和Broker狀態(tài)救军,造成Rebalance混亂财异。

所以從0.9版本開始,Kafka重新設(shè)計(jì)了名為Group Coordinator的“協(xié)調(diào)者”服務(wù)負(fù)責(zé)實(shí)現(xiàn)上述職責(zé)唱遭,將這部分工作從ZK剝離開來戳寸。每個(gè)Broker在啟動(dòng)時(shí),都會(huì)順帶啟動(dòng)一個(gè)Group Coordinator實(shí)例拷泽。每個(gè)Consumer Group在初始化時(shí)疫鹊,都會(huì)分配給一個(gè)Group Coordinator實(shí)例來管理消費(fèi)關(guān)系和Offset,如下簡圖所示司致。

https://img.hchstudio.cn/The%20Silver%20Bullet%20for%20Endless%20Rebalancing.pdf

Group Coordinator提交Offset時(shí)也不再是向ZK寫拆吆,而是寫入那個(gè)廣為人知的特殊Topic——__consumer_offsets里。key是group-topic-partition格式的脂矫,value為Offset值枣耀。

那么該如何確定一個(gè)Consumer Group被分配給哪個(gè)Group Coordinator呢?Kafka根據(jù)groupId.hashCode() % offsets.topic.num.partitions取絕對(duì)值來得出該Group的Offset信息寫入__consumer_offsets的分區(qū)號(hào)庭再,并將Group分配給該分區(qū)Leader所在的Broker上的那個(gè)Group Coordinator捞奕。

上面粗略講了Group Coordinator和它的第二個(gè)職責(zé)的實(shí)現(xiàn)方式牺堰,接下來仔細(xì)分析第一個(gè)職責(zé),就是Rebalance的機(jī)制颅围。

Rebalance觸發(fā)條件與協(xié)議

我們已經(jīng)知道伟葫,Rebalance就是一個(gè)Consumer Group內(nèi)的所有Consumer分配消費(fèi)各已訂閱的Topic的各Partition的過程。Partition的分配有內(nèi)置的三種算法實(shí)現(xiàn)(range谷浅、round-robin扒俯、sticky),用戶也可以自定義分配規(guī)則一疯,這個(gè)與本文關(guān)系不太大撼玄,就不再展開說。

那么有哪些條件會(huì)觸發(fā)Rebalance呢墩邀?列舉如下:

  • Group發(fā)生變更掌猛,即新的Consumer加入,或原有的Consumer離開眉睹。Consumer離開可以是主動(dòng)的(退出)荔茬,也可以是被動(dòng)的(崩潰);
  • Partition發(fā)生變更竹海,包含訂閱的Topic被創(chuàng)建或刪除慕蔚,以及現(xiàn)有訂閱Topic新增Partition等。

本文只考慮Group發(fā)生變更的情況斋配。Kafka定義了一個(gè)簡單的協(xié)議來處理這種情況下的Rebalance過程孔飒,包含以下4對(duì)請(qǐng)求/響應(yīng):

  • heartbeat:Group中的Consumer周期性地給Coordinator發(fā)送該心跳信號(hào),表示自己存活艰争;
  • join-group:Consumer請(qǐng)求加入Group的信號(hào)坏瞄;
  • leave-group:Consumer主動(dòng)退出Group的信號(hào);
  • sync-group:Coordinator將已生成的Consumer-Partition消費(fèi)對(duì)應(yīng)關(guān)系分發(fā)給Consumer的信號(hào)甩卓。

下面三張圖示出心跳超時(shí)(被動(dòng)退出)鸠匀、主動(dòng)退出和加入Group的情況。

心跳超時(shí)——注意heartbeat.interval.ms與session.timeout.ms兩個(gè)參數(shù)的作用
主動(dòng)退出Group
加入Group

圖中的紅色嘆號(hào)表示Coordinator感知到了變化逾柿,并觸發(fā)Rebalance過程缀棍。

Rebalance過程

整個(gè)Rebalance分為兩個(gè)大步驟:JOIN和SYNC。

JOIN

正如其名鹿寻,在這一步中睦柴,所有Consumer都會(huì)向Coordinator發(fā)送join-group,請(qǐng)求重新加入Group(那些原本已經(jīng)在Group內(nèi)的也不例外)毡熏,同時(shí)放棄掉已分配給自己的Partition坦敌。以有新的Consumer主動(dòng)加入Group(即上一節(jié)中的第三種情況)為例,圖示如下。

圖中的sync. barrier線就是表示JOIN步驟與SYNC步驟之間的分界狱窘,即從發(fā)起Rebalance開始到接受最后一個(gè)join-group之間的超時(shí)杜顺。需要特別注意的是,Broker端和Consumer端判定這個(gè)階段超時(shí)的標(biāo)準(zhǔn)是不同的蘸炸,Broker端使用rebalance.timeout.ms躬络,而Consumer端使用max.poll.interval.ms。所以當(dāng)遇到不明原因的長時(shí)間Rebalance時(shí)搭儒,應(yīng)考慮max.poll.interval.ms的設(shè)定穷当。

SYNC

這一步需要做的事情是:

  • Coordinator在所有Consumer里選擇一個(gè)擔(dān)任Leader,并由Leader調(diào)用Partition分配規(guī)則來確定消費(fèi)對(duì)應(yīng)關(guān)系淹禾。
  • 各個(gè)Consumer發(fā)送sync-group請(qǐng)求馁菜。Leader發(fā)送的請(qǐng)求里包含有已經(jīng)確定的消費(fèi)分配信息,其他Consumer的請(qǐng)求為空铃岔。
  • Coordinator將消費(fèi)分配信息原樣封裝在sync-group響應(yīng)中汪疮,并投遞給各個(gè)Consumer,最終使Group內(nèi)所有成員都獲知自己該消費(fèi)的Partition毁习。

圖示如下智嚷,其中C1被選為Leader。SYNC步驟結(jié)束后纺且,原有的1~6六個(gè)分區(qū)被分配給了C1~C3三個(gè)Consumer盏道。Rebalance就完成了。

Rebalance的改進(jìn)方案

上述Rebalance機(jī)制沿用了很長時(shí)間载碌,但是它同樣存在問題:

  • stop-the-world問題——需要收回(revoke)所有Partition再重新分配(reassign)摇天,在此時(shí)間內(nèi),所有Consumer都無法進(jìn)行消費(fèi)恐仑。如果Rebalance時(shí)間長,會(huì)造成lag为鳄。
  • back-and-forth問題——如果多次觸發(fā)Rebalance裳仆,很有可能造成一個(gè)Consumer消費(fèi)的Partition被分配給其他Consumer,然后又分配回來孤钦,做了無用功歧斟。

為了解決該問題,在2.4版本特別提出了Incremental(增量)Rebalance偏形。顧名思義静袖,就是Rebalance時(shí)不再讓所有Consumer都放棄掉所有已分配的Partition,而是每次先記錄俊扭,并轉(zhuǎn)化成多次少量的Rebalance過程队橙,且Consumer在此期間不會(huì)STW。簡圖如下所示。

關(guān)于該方案的細(xì)節(jié)捐康,請(qǐng)參考KIP-429仇矾。

The End

民那晚安。祝身體健康解总,百毒不侵贮匕。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市花枫,隨后出現(xiàn)的幾起案子刻盐,更是在濱河造成了極大的恐慌,老刑警劉巖劳翰,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敦锌,死亡現(xiàn)場離奇詭異,居然都是意外死亡磕道,警方通過查閱死者的電腦和手機(jī)供屉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來溺蕉,“玉大人伶丐,你說我怎么就攤上這事》杼兀” “怎么了哗魂?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長漓雅。 經(jīng)常有香客問我录别,道長,這世上最難降的妖魔是什么邻吞? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任组题,我火速辦了婚禮,結(jié)果婚禮上抱冷,老公的妹妹穿的比我還像新娘崔列。我一直安慰自己,他們只是感情好旺遮,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布赵讯。 她就那樣靜靜地躺著,像睡著了一般耿眉。 火紅的嫁衣襯著肌膚如雪边翼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天鸣剪,我揣著相機(jī)與錄音组底,去河邊找鬼丈积。 笑死,一個(gè)胖子當(dāng)著我的面吹牛斤寇,可吹牛的內(nèi)容都是我干的桶癣。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼娘锁,長吁一口氣:“原來是場噩夢啊……” “哼牙寞!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起莫秆,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤间雀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后镊屎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體惹挟,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年缝驳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了连锯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡用狱,死狀恐怖运怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情夏伊,我是刑警寧澤摇展,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站溺忧,受9級(jí)特大地震影響咏连,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鲁森,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一祟滴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧歌溉,春花似錦踱启、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽透罢。三九已至榜晦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間羽圃,已是汗流浹背乾胶。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人识窿。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓斩郎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親喻频。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缩宜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容