1鬼廓、Kafka rebalance機制
kafka是以消費者組進行消費肿仑,一個消費者組,由多個consumer組成,他們和topic的消費規(guī)則如下:
topic的一個分區(qū)只能被消費組中的一個消費者消費尤慰。
消費者組中的一個消費者可以消費topic一個或者多個分區(qū)馏锡。
那么group中每個消費者consumer消費topic中的哪個partition是由誰決定的呢?
1.1 消費者分區(qū)策略
主要有三種rebalance的策略:range(范圍)伟端、round-robin(輪詢)杯道、sticky(粘性)、CooperativeSticky责蝠。
Kafka 提供了消費者客戶端參數(shù)partition.assignment.strategy 來設(shè)置消費者與訂閱主題之間的分區(qū)分配策略党巾。默認情況為range分配策略。
假設(shè)一個主題有10個分區(qū)(0-9)霜医,現(xiàn)在有三個consumer消費:
range策略:就是按照分區(qū)序號排序(范圍分配)齿拂,假設(shè) n=分區(qū)數(shù)/消費者數(shù)量 = 3, m=分區(qū)數(shù)%消費者數(shù)量 = 1肴敛,那么前 m 個消費者每個分配 n+1 個分區(qū)署海,后面的(消費者數(shù)量-m )個消費者每個分配 n 個分區(qū)。
比如分區(qū)03給一個consumer医男,分區(qū)46給一個consumer叹侄,分區(qū)7~9給一個consumer。round-robin策略就是輪詢分配
比如分區(qū)0昨登、3趾代、6、9給一個consumer丰辣,分區(qū)1撒强、4、7給一個consumer笙什,分區(qū)2飘哨、5、8給一個consumerSticky是粘性的意思琐凭,它是從 0.11.x 版本開始引入這種分配策略芽隆,首先會盡量均衡的放置分區(qū)到消費者上面,在出現(xiàn)同一消費者組內(nèi)消費者出現(xiàn)問題的時候统屈,在rebalance會盡量保持原有分配的分區(qū)不變化胚吁,這樣可以節(jié)省開銷。
Cooperative Sticky和Sticky類似愁憔,但是它會將原來的一次大規(guī)模rebalance操作腕扶,拆分成了多次小規(guī)模的rebalance,直至最終平衡完成吨掌,所以體驗上會更好半抱。
默認策略是Range + CooperativeSticky
1.2 rebalance(再均衡)
再均衡:在同一個消費者組當中脓恕,分區(qū)的所有權(quán)從一個消費者轉(zhuǎn)移到另外一個消費者,比如consumer group中某個消費者掛了窿侈,此時會自動把分配給他的分區(qū)交給其他的消費者炼幔,如果他又重啟了,那么又會把一些分區(qū)重新交還給他史简。
Kafka 再平衡是外部觸發(fā)導致的乃秀,觸發(fā) Kafka 再平衡的有以下幾種情況:
- 消費組成員發(fā)生變更,有新消費者加入或者離開乘瓤,或者有消費者宕機环形;
消費者并不一定需要真正下線策泣,例如遇到長時間的 GC 衙傀、網(wǎng)絡(luò)延遲導致消費者長時間未向Group Coordinator發(fā)送心跳等情況時,GroupCoordinator 會認為消費者己下線 - 消費組訂閱的主題數(shù)量發(fā)生變更萨咕;
- 消費組訂閱的分區(qū)數(shù)發(fā)生變更统抬。
rebalance過程中,消費者無法從kafka消費消息危队,這對kafka的TPS會有影響聪建,如果kafka集群內(nèi)節(jié)點較多,比如數(shù)百個茫陆,那重平衡可能會耗時極多金麸,所以應(yīng)盡量避免在系統(tǒng)高峰期的重平衡發(fā)生。
1.3 rebalance的影響
Rebalance對我們數(shù)據(jù)的影響主要有以下幾點:
1簿盅、可能重復消費: Consumer被踢出消費組挥下,可能還沒有提交offset,Rebalance時會Partition重新分配其它Consumer,會造成重復消費桨醋,雖有冪等操作但耗費消費資源棚瘟,亦增加集群壓力
2、集群不穩(wěn)定:Rebalance擴散到整個ConsumerGroup的所有消費者喜最,因為一個消費者的退出偎蘸,導致整個Group進行了Rebalance,并在一個比較慢的時間內(nèi)達到穩(wěn)定狀態(tài)瞬内,影響面較大
3迷雪、影響消費速度:頻繁的Rebalance反而降低了消息的消費速度,大部分時間都在重復消費和Rebalance
1.4 避免rebalance措施
1虫蝶、業(yè)務(wù)需要不可避免(分區(qū)振乏、消費者擴容)
(1)針對分區(qū)個數(shù)的增加, 一般不會常有秉扑,是需要增加的時候都是業(yè)務(wù)及數(shù)據(jù)需求慧邮,不可避免
(2)對Topic的訂閱增加或取消亦不可避免
2调限、合理設(shè)置消費者參數(shù)
下邊是我們遇到的,要格外關(guān)注及重視
(1)未能及時發(fā)送心跳而Rebalance
session.timeout.ms 一次session的連接超時時間
heartbeat.interval.ms 心跳時間误澳,一般為超時時間的1/3耻矮,Consumer在被判定為死亡之前,能夠發(fā)送至少 3 輪的心跳請求
(2)Consumer消費超時而Rebalance
max.poll.interval.ms 每隔多長時間去拉取消息忆谓。合理設(shè)置預期值裆装,盡量但間隔時間消費者處理完業(yè)務(wù)邏輯,否則就會被coordinator判定為死亡倡缠,踢出Consumer Group哨免,進行Rebalance
max.poll.records 一次從拉取出來的數(shù)據(jù)條數(shù)。根據(jù)消費業(yè)務(wù)處理耗費時長合理設(shè)置昙沦,如果每次max.poll.interval.ms 設(shè)置的時間較短琢唾,可以max.poll.records設(shè)置小點兒,少拉取些盾饮,這樣不會超時采桃。
總之,盡可能在max.poll.interval.ms時間間隔內(nèi)處理完max.poll.records條消息丘损,讓Coordinator認為消費Consumer還活著
2普办、消費者位移offset管理
Offset記錄著下一條將要發(fā)送給Consumer的消息的序號。
Offset從語義上來看擁有兩種:Current Offset和Committed Offset徘钥。
2.1 Current Offset
Current Offset保存在Consumer客戶端中衔蹲,它表示Consumer希望收到的下一條消息的序號。它僅僅在poll()方法中使用呈础。例如舆驶,Consumer第一次調(diào)用poll()方法后收到了20條消息,那么Current Offset就被設(shè)置為20猪落。這樣Consumer下一次調(diào)用poll()方法時贞远,Kafka就知道應(yīng)該從序號為21的消息開始讀取。這樣就能夠保證每次Consumer poll消息時笨忌,都能夠收到不重復的消息蓝仲。
2.2 Committed Offset
Committed Offset保存在Broker上,它表示Consumer已經(jīng)確認消費過的消息的序號官疲。主要通過commitSync和commitAsync
API來操作袱结。舉個例子,Consumer通過poll() 方法收到20條消息后途凫,此時Current Offset就是20垢夹,經(jīng)過一系列的邏輯處理后,并沒有調(diào)用consumer.commitAsync()或consumer.commitSync()來提交Committed Offset维费,那么此時Committed Offset依舊是0果元。
Committed Offset主要用于Consumer Rebalance促王。在Consumer Rebalance的過程中,一個partition被分配給了一個Consumer而晒,那么這個Consumer該從什么位置開始消費消息呢蝇狼?答案就是Committed Offset。另外倡怎,如果一個Consumer消費了5條消息(poll并且成功commitSync)之后宕機了迅耘,重新啟動之后它仍然能夠從第6條消息開始消費,因為Committed Offset已經(jīng)被Kafka記錄為5监署。
總結(jié)一下颤专,Current Offset是針對Consumer的poll過程的,它可以保證每次poll都返回不重復的消息钠乏;而Committed Offset是用于Consumer Rebalance過程的栖秕,它能夠保證新的Consumer能夠從正確的位置開始消費一個partition,從而避免重復消費缓熟。
在Kafka 0.9前累魔,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目錄中(zookeeper其實并不適合進行大批量的讀寫操作摔笤,尤其是寫操作)够滑。而在0.9之后,所有的offset信息都保存在了Broker上的一個名為__consumer_offsets的topic中吕世。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的彰触。
2.3 Group Coordinator
Group Coordinator是運行在Kafka集群中每一個Broker內(nèi)的一個進程。它主要負責Consumer Group的管理命辖,Offset位移管理以及Consumer Rebalance况毅。
對于每一個Consumer Group,Group Coordinator都會存儲以下信息:
- 訂閱的topics列表
- Consumer Group配置信息尔艇,包括session timeout等
- 組中每個Consumer的元數(shù)據(jù)尔许。包括主機名,consumer id
- 每個Group正在消費的topic partition的當前offsets
- Partition的ownership元數(shù)據(jù)终娃,包括consumer消費的partitions映射關(guān)系
Consumer Group如何確定自己的coordinator是誰呢味廊? 簡單來說分為兩步:
確定Consumer Group offset信息將要寫入__consumers_offsets topic的哪個分區(qū)。具體計算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % offsets.topic.num.partitions) //offsets.topic.num.partitions默認值為50棠耕。
該分區(qū)leader所在的broker就是被選定的coordinator
2.4 Offset存儲模型
由于一個partition只能固定的交給一個消費者組中的一個消費者消費余佛,因此Kafka保存offset時并不直接為每個消費者保存,而是以 groupid-topic-partition -> offset 的方式保存窍荧。
因此consumer poll消息時辉巡,已知groupid和topic,又通過Coordinator分配partition的方式獲得了對應(yīng)的partition蕊退,自然能夠通過Coordinator查找__consumers_offsets的方式獲得最新的offset了郊楣。