1. 消費者與消費者組
首先簡單了解一下消費者和消費者組:
假設(shè)某 topic 有4個分區(qū)棘催,消費者組中只有一個消費者,那么這個消費者將消費全部 partition 中的數(shù)據(jù)。
如果消費者組中有兩個消費者,那么每個消費者消費兩個 partition。
如果消費者組中有4個消費者钳踊,那么每個消費者消費一個partition。
如果消費者組中有5個消費者勿侯,那么有一個消費者就是空閑的拓瞪。
注意:在同一個消費者組中,不要讓消費者的數(shù)量大于分區(qū)的數(shù)量
多個消費者組之間不會互相影響助琐。
那么消費者和消費者組的運行機制是什么樣的祭埂?它們是怎么通信的?這些都要依賴于本文所探討的協(xié)調(diào)器兵钮。
2. 協(xié)調(diào)器
在 kafka-0.10 版本蛆橡,Kafka 在服務(wù)端引入了組協(xié)調(diào)器(GroupCoordinator),每個 Kafka Server 啟動時都會創(chuàng)建一個 GroupCoordinator 實例掘譬,用于管理部分消費者組和該消費者組下的每個消費者的消費偏移量泰演。同時在客戶端引入了消費者協(xié)調(diào)器(ConsumerCoordinator),實例化一個消費者就會實例化一個 ConsumerCoordinator 對象葱轩,ConsumerCoordinator 負責(zé)同一個消費者組下各消費者與服務(wù)端的 GroupCoordinator 進行通信睦焕。
(1) 消費者協(xié)調(diào)器(ConsumerCoordinator)
ConsumerCoordinator 定義的位置:
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final ConsumerCoordinator coordinator;
}
ConsumerCoordinator 是 KafkaConsumer 的一個私有的成員變量,因此 ConsumerCoordinator 中存儲的信息也只有與之對應(yīng)的消費者可見靴拱,不同消費者之間是看不到彼此的 ConsumerCoordinator 中的信息的垃喊。
ConsumerCoordinator 的作用:
- 處理更新消費者緩存的 Metadata 請求
- 向組協(xié)調(diào)器發(fā)起加入消費者組的請求
- 對本消費者加入消費者前后的相應(yīng)處理
- 請求離開消費者組(例如當消費者取消訂閱時)
- 向組協(xié)調(diào)器發(fā)送提交偏移量的請求
- 通過一個定時的心跳檢測任務(wù)來讓組協(xié)調(diào)器感知自己的運行狀態(tài)
- Leader消費者的 ConsumerCoordinator 還負責(zé)執(zhí)行分區(qū)的分配趾疚,一個消費者組中消費者 leader 由組協(xié)調(diào)器選出绢涡,leader 消費者的 ConsumerCoordinator 負責(zé)消費者與分區(qū)的分配,然后把分配結(jié)果發(fā)送給組協(xié)調(diào)器寝受,然后組協(xié)調(diào)器再把分配結(jié)果返回給其他消費者的消費者協(xié)調(diào)器偎窘,這樣減輕了服務(wù)端的負擔(dān)
ConsumerCoordinator 實現(xiàn)上述功能的組件是 ConsumerCoordinator 類的私有成員或者是其父類的私有成員:
public final class ConsumerCoordinator extends AbstractCoordinator {
private final List<PartitionAssignor> assignors;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final SubscriptionState subscriptions;
private final ConsumerInterceptors<?, ?> interceptors;
private boolean isLeader = false;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
省略了部分代碼....
}
public abstract class AbstractCoordinator implements Closeable {
private enum MemberState {
UNJOINED, // the client is not part of a group
REBALANCING, // the client has begun rebalancing
STABLE, // the client has joined and is sending heartbeats
}
private final Heartbeat heartbeat;
protected final ConsumerNetworkClient client;
private HeartbeatThread heartbeatThread = null;
private MemberState state = MemberState.UNJOINED;
private RequestFuture<ByteBuffer> joinFuture = null;
省略了部分代碼....
}
各組件及其功能如下圖所示:
(2) 組協(xié)調(diào)器(GroupCoordinator)
GroupCoordinator 的作用:
- 負責(zé)對其管理的組員(消費者)提交的相關(guān)請求進行處理
- 與消費者之間建立連接乌助,并從與之連接的消費者之間選出一個 leader
- 當 leader 分配好消費者與分區(qū)的訂閱關(guān)系后,會把結(jié)果發(fā)送給組協(xié)調(diào)器陌知,組協(xié)調(diào)器再把結(jié)果返回給各個消費者
- 管理與之連接的消費者的消費偏移量的提交眷茁,將每個消費者的消費偏移量保存到kafka的內(nèi)部主題中
- 通過心跳檢測消費者與自己的連接狀態(tài)
- 啟動組協(xié)調(diào)器的時候創(chuàng)建一個定時任務(wù),用于清理過期的消費組元數(shù)據(jù)以及過去的消費偏移量信息
GroupCoordinator 依賴的組件及其作用:
- KafkaConfig:實例化 OffsetConfig 和 GroupConfig
- ZkUtils:分消費者分配組協(xié)調(diào)器時從Zookeeper獲取內(nèi)部主題的分區(qū)元數(shù)據(jù)信息纵诞。
- GroupMetadataManager:負責(zé)管理 GroupMetadata以及消費偏移量的提交,并提供了一系列的組管理的方法供組協(xié)調(diào)器調(diào)用培遵。GroupMetadataManager 不僅把 GroupMetadata 寫到kafka內(nèi)部主題中浙芙,而且還在內(nèi)存中緩存了一份GroupMetadata登刺,其中包括了組員(消費者)的元數(shù)據(jù)信息,例如消費者的 memberId嗡呼、leaderId纸俭、分區(qū)分配關(guān)系,狀態(tài)元數(shù)據(jù)等南窗。狀態(tài)元數(shù)據(jù)可以是以下五種狀態(tài):
- PreparingRebalance:消費組準備進行平衡操作
- AwaitingSync:等待leader消費者將分區(qū)分配關(guān)系發(fā)送給組協(xié)調(diào)器
- Stable:消費者正常運行狀態(tài)揍很,心跳檢測正常
- Dead:處于該狀態(tài)的消費組沒有任何消費者成員,且元數(shù)據(jù)信息也已經(jīng)被刪除
- Empty:處于該狀態(tài)的消費組沒有任何消費者成員万伤,但元數(shù)據(jù)信息也沒有被刪除窒悔,知道所有消費者對應(yīng)的消費偏移量元數(shù)據(jù)信息過期。
- ReplicaManager:GroupMetadataManager需要把消費組元數(shù)據(jù)信息以及消費者提交的已消費偏移量信息寫入 Kafka 內(nèi)部主題中敌买,對內(nèi)部主題的操作與對其他主題的操作一樣简珠,先通過 ReplicaManager 將消息寫入 leader 副本,ReplicaManager 負責(zé) leader 副本與其他副本的管理虹钮。
- DelayedJoin:延遲操作類聋庵,用于監(jiān)視處理所有消費組成員與組協(xié)調(diào)器之間的心跳超時
- GroupConfig:定義了組成員與組協(xié)調(diào)器之間session超時時間配置
3. 消費者協(xié)調(diào)器和組協(xié)調(diào)器的交互
(1) 心跳
消費者協(xié)調(diào)器通過和組協(xié)調(diào)器發(fā)送心跳來維持它們和群組的從屬關(guān)系以及它們對分區(qū)的所有權(quán)關(guān)系。只要消費者以正常的時間間隔發(fā)送心跳芙粱,就被認為是活躍的祭玉,說明它還在讀取分區(qū)里的消息。消費者會在輪詢獲取消息或提交偏移量時發(fā)送心跳春畔。
如果消費者停止發(fā)送心跳的時間足夠長脱货,會話就會過期,組協(xié)調(diào)器認為它已經(jīng)死亡拐迁,就會觸發(fā)一次再均衡蹭劈。
在 0.10 版本里,心跳任務(wù)由一個獨立的心跳線程來執(zhí)行线召,可以在輪詢獲取消息的空檔發(fā)送心跳铺韧。這樣一來,發(fā)送心跳的頻率(也就是組協(xié)調(diào)器群檢測消費者運行狀態(tài)的時間)與消息輪詢的頻率(由處理消息所花費的時間來確定)之間就是相互獨立的缓淹。在0.10 版本的 Kafka 里哈打,可以指定消費者在離開群組并觸發(fā)再均衡之前可以有多長時間不進行消息輪詢,這樣可以避免出現(xiàn)活鎖(livelock)讯壶,比如有時候應(yīng)用程序并沒有崩潰料仗,只是由于某些原因?qū)е聼o法正常運行。這個配置與
session.timeout.ms 是相互獨立的伏蚊,后者用于控制檢測消費者發(fā)生崩潰的時間和停止發(fā)送心跳的時間立轧。
(2) 分區(qū)再均衡
發(fā)生分區(qū)再均衡的3種情況:
- 一個新的消費者加入群組時,它讀取的是原本由其他消費者讀取的消息。
- 當一個消費者被關(guān)閉或發(fā)生崩潰時氛改,它就離開群組帐萎,原本由它讀取的分區(qū)將由群組里的其他消費者來讀取。如果一個消費者主動離開消費組胜卤,消費者會通知組協(xié)調(diào)器它將要離開群組疆导,組協(xié)調(diào)器會立即觸發(fā)一次再均衡,盡量降低處理停頓葛躏。如果一個消費者意外發(fā)生崩潰澈段,沒有通知組協(xié)調(diào)器就停止讀取消息,組協(xié)調(diào)器會等待幾秒鐘舰攒,確認它死亡了才會觸發(fā)再均衡败富。在這幾秒鐘時間里,死掉的消費者不會讀取分區(qū)里的消息芒率。
- 在主題發(fā)生變化時囤耳,比如管理員添加了新的分區(qū),會發(fā)生分區(qū)重分配偶芍。
分區(qū)的所有權(quán)從一個消費者轉(zhuǎn)移到另一個消費者充择,這樣的行為被稱為分區(qū)再均衡。再均衡非常重要匪蟀,它為消費者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除消費者)椎麦,不過在正常情況下,我們并不希望發(fā)生這樣的行為材彪。在再均衡期間观挎,消費者無法讀取消息,造成整個群組一小段時間的不可用段化。另外嘁捷,當分區(qū)被重新分配給另一個消費者時,消費者當前的讀取狀態(tài)會丟失显熏,它有可能還需要去刷新緩存雄嚣,在它重新恢復(fù)狀態(tài)之前會拖慢應(yīng)用程序。
(3) leader 消費者分配分區(qū)的策略
當消費者要加入群組時喘蟆,它會向群組協(xié)調(diào)器發(fā)送一個 JoinGroup 請求缓升。第一個加入群組的消費者將成為leader消費者。leader消費者從組協(xié)調(diào)器那里獲得群組的成員列表(列表中包含了所有最近發(fā)送過心跳的消費者蕴轨,它們被認為是活躍的)港谊,并負責(zé)給每一個消費者分配分區(qū)。
每個消費者的消費者協(xié)調(diào)器在向組協(xié)調(diào)器請求加入組時橙弱,都會把自己支持的分區(qū)分配策略報告給組協(xié)調(diào)器(輪詢或者是按跨度分配或者其他)歧寺,組協(xié)調(diào)器選出該消費組下所有消費者都支持的的分區(qū)分配策略發(fā)送給leader消費者燥狰,leader消費者根據(jù)這個分區(qū)分配策略進行分配。
完畢之后成福,leader消費者把分配情況列表發(fā)送給組協(xié)調(diào)器碾局,消費者協(xié)調(diào)器再把這些信息發(fā)送給所有消費者。每個消費者只能看到自己的分配信息奴艾,只有l(wèi)eader消費者知道群組里所有消費者的分配信息。這個過程會在每次再均衡時重復(fù)發(fā)生内斯。
(4) 消費者入組過程
- 消費者創(chuàng)建后蕴潦,消費者協(xié)調(diào)器會選擇一個負載較小的節(jié)點,向該節(jié)點發(fā)送尋找組協(xié)調(diào)器的請求
-
KafkaApis 處理請求俘闯,調(diào)用返回組協(xié)調(diào)器所在的節(jié)點潭苞,過程如下:
- 找到組協(xié)調(diào)器后,消費者協(xié)調(diào)器申請加入消費組真朗,發(fā)送 JoinGroupRequest請求
- KafkaApis 調(diào)用 handleJoinGroup() 方法處理請求
- 把消費者注冊到消費組中
- 把消費者的clientId與一個UUID值生成一個memberId分配給消費者
- 構(gòu)造器該消費者的MemberMetadata信息
- 把該消費者的MemberMetadata信息注冊到GroupMetadata中
- 第一個加入組的消費者將成為leader
- 把處理JoinGroupRequest請求的結(jié)果返回給消費者
- 加入組成功后此疹,進行分區(qū)再均衡