Kafka服務端之KafkaController

[TOC]
在上一節(jié)對副本機制的實現(xiàn)進行了分析,其中提到Broker能夠處理來自KafkaController的LeaderAndIsrRequest胡嘿、StopReplicaRequest、UpdateMetadataRequest等請求钳踊。本節(jié)將介紹KafkaController在集群中扮演的角色以及KafkaController與各個Broker之間如何協(xié)同工作衷敌。

在Kafka集群的多個Broker中勿侯,有一個Broker會被選舉為Controller Leader,負責管理整個集群中所有的分區(qū)和副本的狀態(tài)缴罗。例如:當某分區(qū)的Leader副本出現(xiàn)故障時助琐,由Controller負責為該分區(qū)重新選舉新的Leader副本;當使用kafka-topics腳本增加某Topic的分區(qū)數(shù)量時面氓,由Controller管理分區(qū)的重新分配兵钮;當檢測到分區(qū)的ISR集合發(fā)生變化時,由Controller通知集群中所有的Broker更新其MetadataCache信息舌界。

為了實現(xiàn)Controller的高可用掘譬,一個Broker被選為Leader之后,其他的Broker都會成為Follower(不加特殊說明的情況下呻拌,本節(jié)的“Leader/Follower”指的都是KafkaController的Leader/Follower葱轩,請讀者不要與副本機制中的Leader副本和Follower副本混淆),會從剩下的Follower中選出新的Controller Leader來管理集群藐握。

選舉Controller Leader依賴于ZooKeeper實現(xiàn)靴拱,每個Broker啟動時都會創(chuàng)建一個KafkaController對象,但是集群中只能存在一個Controller Leader來對外提供服務趾娃。在集群啟動時缭嫡,多個Broker上的KafkaController會在指定路徑下競爭創(chuàng)建節(jié)點,只有第一個成功創(chuàng)建節(jié)點的KafkaController才能成為Leader抬闷,而其余的KafkaController則成為Follower妇蛀。當Leader出現(xiàn)故障后,所有的Follower會收到通知笤成,再次競爭在該路徑下創(chuàng)建節(jié)點從而選出新的Leader评架。這也是ZooKeeper的一種常見用法。

在Kafka早期版本中并沒有采用KafkaController的設計來對分區(qū)和副本狀態(tài)進行管理炕泳,而是依賴于ZooKeeper的Watcher和隊列纵诞。在早期版本的設計中,每個Broker都會在ZooKeeper上注冊Watcher培遵,ZooKeeper上就會出現(xiàn)大量Watcher浙芙,當分區(qū)或副本狀態(tài)變化時會喚醒很多不必要的Watcher,這種嚴重依賴ZooKeeper的設計出現(xiàn)了 “腦裂”籽腕、“羊群效應”以及ZooKeeper集群過載的情況嗡呼。在新版本設計中,只有Controller Leader在ZooKeeper上注冊Watcher皇耗,其他Broker幾乎不用再監(jiān)聽ZooKeeper中的數(shù)據(jù)變化南窗。舊版本中Broker之間傳遞事件依賴于ZooKeeper的設計比較低效,在新版設計中Controller Leader直接與Broker交互。舊版本的設計畢竟已經(jīng)廢棄万伤,本節(jié)不做過多介紹窒悔。需要讀者了解的是,在設計分布式系統(tǒng)時要適度依賴ZooKeeper集群敌买,合理利用ZooKeeper Watcher简珠,否則就會出現(xiàn)上述問題。

我們先通過圖4-55了解ZooKeeper中與KafkaController相關的路徑以及該路徑中記錄的內(nèi)容的含義虹钮。

  • ·/brokers/ids/[id]:記錄了集群中可用Broker的id北救。
  • ·/brokers/topics/[topic]/partitions:記錄了一個Topic中所有分區(qū)的分配信息以及AR集合信息。
  • ·/brokers/topics/[topic]/partitions/[partition_id]/state:記錄了某Partition的Leader副本所在BrokerId芜抒、lead_epoch、ISR集合托启、ZKVersion等信息宅倒。
  • ·/controller_epoch:記錄了當前Controller Leader的年代信息。
  • ·/controller:記錄了當前Controller Leader的Id屯耸,也用于Controller Leader的選舉拐迁。
  • ·/admin/reassign_partitions:記錄了需要進行副本重新分配的分區(qū)。
  • ·/admin/preferred_replica_election:記錄了需要進行“優(yōu)先副本”選舉的分區(qū)疗绣∠哒伲“優(yōu)先副本”是在創(chuàng)建分區(qū)時為其指定的第一個副本。
  • ·/admin/delete_topics:記錄了待刪除的Topic多矮。
  • ·/isr_change_notification:記錄了一段時間內(nèi)ISR集合發(fā)生變化的分區(qū)缓淹。
  • ·/config:記錄了一些配置信息。
image.png

在詳細介紹KafkaController的相關組件之前塔逃,先從整體上了解KafkaController的設計讯壶,以及組件之間的依賴關系如圖4-56所示。

image.png
  • ·KafkaController組織并封裝了其他組件湾盗,對外提供API接口伏蚊。
  • ·ZookeeperLeaderElector主要用于Controller Leader的選舉。
  • ·ControllerContextKafkaController的上下文信息格粪,緩存了ZooKeeper中記錄的整個集群的元信息躏吊,例如,可用Broker帐萎、全部的Topic比伏、分區(qū)、副本的信息吓肋。
  • ·ControllerChannelManager維護了Controller Leader與集群中其他Broker之間的網(wǎng)絡連接凳怨,是管理整個集群的基礎。
  • ·TopicDeletionManager用于對指定的Topic進行刪除。
  • ·PartitionStateMachine用于管理集群中所有Partition狀態(tài)的狀態(tài)機肤舞。
  • ·ReplicaStateMachine用于管理集群中所有副本狀態(tài)的狀態(tài)機紫新。
  • ·ControllerBrokerRequestBatch實現(xiàn)了向Broker批量發(fā)送請求的功能。
  • ·*PartitionLeaderSelector實現(xiàn)了多種Leader副本選舉策略李剖。
  • ·*Listener是ZooKeeper上的監(jiān)聽器芒率,實現(xiàn)了對ZooKeeper上某些節(jié)點中的數(shù)據(jù)、子節(jié)點或ZooKeeper Session狀態(tài)的監(jiān)聽篙顺,被觸發(fā)后調(diào)用相應的業(yè)務邏輯偶芍。

從另一個角度來看,KafkaController是ZooKeeper與Kafka集群交互的橋梁:它一方面對ZooKeeper進行監(jiān)聽德玫,其中包括Broker寫入到ZooKeeper中的數(shù)據(jù)匪蟀,也包括管理員使用腳本寫入的數(shù)據(jù);另一方面根據(jù)ZooKeeper中數(shù)據(jù)的變化做出相應的處理宰僧,通過LeaderAndIsrRequest材彪、StopReplicaRequest、UpdateMetadataRequest等請求控制每個Broker的工作琴儿。而且段化,KafkaController本身也通過ZooKeeper提供了高可用的機制。通過上述組件之間的協(xié)調(diào)工作造成,構成了一個統(tǒng)一的整體显熏。

ControllerChannelManager

Controller Leader通過發(fā)送多種請求管理集群中的其他Broker,KafkaController使用ControllerChannelManager管理其與集群中各個Broker之間的網(wǎng)絡交互晒屎。ControllerChannelManager中使用ControllerBrokerStateInfo類表示與一個Broker連接的各種信息喘蟆。ControllerBrokerStateInfo的定義如下:

image.png

RequestSendThread繼承了ShutdownableThread,在線程停止之前會循環(huán)執(zhí)行doWork()方法夷磕,通過NetworkClientBlockingOps完成發(fā)送請求并阻塞等待響應履肃。

image.png

ControllerChannelManager的核心字段是brokerStateInfo(HashMap[Int,ControllerBrokerStateInfo]類型),用于管理集群中每個Broker對應的ControllerBrokerStateInfo對象坐桩。其初始化過程如下:

image.png

ControllerChannelManager.addNewBroker()方法和removeBroker()方法實現(xiàn)了對brokerStateInfo集合的管理尺棋,sendRequest()方法向指定Broker發(fā)送請求。

image.png
image.png

ControllerContext
ControllerContext中維護了Controller使用到的上下文信息绵跷,從其構造函數(shù)也能猜到膘螟,ControllerContext與ZooKeeper有密切的關系,也可以將ControllerContext看作ZooKeeper數(shù)據(jù)的緩存碾局。

image.png

ControllerContext中各個字段的含義和作用如下所述荆残。

  • ·controllerChannelManager:管理Controller與集群中Broker之間的連接。
  • ·shuttingDownBrokerIds:正在關閉的BrokerId集合净当。
  • ·epoch:Controller的年代信息内斯,初始為0蕴潦。Controller的年代信息存儲的ZK路徑是“/controller_epoch”。每次重新選舉新的Leader Controller俘闯,epoch字段值就會增加1潭苞。
  • ·epochZkVersion:年代信息的ZK版本,初始為0真朗。
  • ·allTopics:整個集群中全部的Topic名稱此疹。
  • ·partitionReplicaAssignment:Map[TopicAndPartition, Seq[Int]]類型,記錄了每個分區(qū)的AR集合遮婶。
  • ·partitionLeadershipInfo:Map[TopicAndPartition, LeaderIsrAndControllerEpoch]類型蝗碎,記錄了每個分區(qū)的Leader副本所在的BrokerId、ISR集合以及controller_epoch等信息旗扑。其中LeaderIsrAndControllerEpoch的定義如下
image.png
  • ·partitionBeingReassigned:Map[TopicAndPartition, ReassignedPartitionsContext]類型蹦骑,記錄了正在重新分配副本的分區(qū)。該集合的value是ReassignedPartitionsContext類型臀防,其中封裝了新分配的AR集合信息以及用于監(jiān)聽ISR集合變化的ReassignedPa rtitionsIsrChangeListener脊串,其定義如下:
image.png
  • ·partitionsUndergoingPreferredReplicaElection:Set[TopicAndPartition]類型,記錄了正在進行“優(yōu)先副本”選舉的分區(qū)清钥。
  • ·liveBrokersUnderlying:Set[Broker]類型,記錄了當前可用的Broker集合放闺。
  • ·liveBrokerIdsUnderlying:Set[Int]類型祟昭,記錄了當前可用的BrokerId集合。

ControllerContext為liveBrokersUnderlying字段怖侦、liveBrokerIdsUnderlying字段和shuttingDownBrokerIds字段提供了相關的集合操縱方法篡悟。

  • ·partitionsOnBroker:獲取在指定Broker中存在有副本的分區(qū)集合。
  • ·replicasOnBrokers :獲取指定Broker集合中保存的所有副本匾寝。
  • ·replicasForTopic:獲取指定Topic的所有副本搬葬。
  • ·partitionsForTopic:獲取指定Topic的所有分區(qū)
  • ·allLiveReplicas:獲取所有可用Broker中保存的副本
  • ·replicasForPartition:獲取指定分區(qū)集合的副本。
  • ·removeTopic:刪除指定Topic艳悔。

ControllerBrokerRequestBatch
為了提高Controller Leader與集群中其他Broker的通信效率急凰,KafkaController使用ControllerBrokerRequestBatch組件實現(xiàn)批量發(fā)送請求的功能。

ControllerBrokerRequestBatch的核心字段如下所述猜年。

  • ·leaderAndIsrRequestMap:Map [Int,Map[TopicPartition, PartitionStateInfo]]類型抡锈,記錄了發(fā)往指定Broker的LeaderAndIsrRequest所需的信息,其中PartitionStateInfo的定義如下:
image.png
  • ·stopReplicaRequestMap:Map[Int, Seq[StopReplicaRequestInfo]]類型乔外,記錄了發(fā)往指定Broker的StopReplicaRequest所需的信息床三,其中StopReplicaRequestInfo的定義如下:
image.png
  • ·updateMetadataRequestMap:Map [Int,Map[TopicPartition, PartitionStateInfo]]類型,記錄了發(fā)往指定Broker的UpdateMetadataRequest集合杨幼。

ControllerBrokerRequestBatch的常規(guī)用法如下:


image.png

ControllerBrokerRequestBatch.newBatch()方法會檢測三個請求集合是否為空撇簿,如果不為空則拋出異常聂渊。ControllerBrokerRequestBatch.clear()方法則會清空三個請求集合。

ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers()方法會向leaderAndIsrRequestMap集合中添加待發(fā)送的LeaderAndIsrRequest所需的數(shù)據(jù)四瘫,同時會調(diào)用addUpdateMetadataRequestForBrokers()方法準備向集群中所有可用的Broker發(fā)送UpdateMetadataRequest汉嗽。

image.png

ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers()方法的代碼如下:


image.png
image.png

addStopReplicaRequestForBrokers()方法會向stopReplicaRequestMap集合中添加StopReplicaRequest所需的數(shù)據(jù),具體實現(xiàn)與上述兩個add*RequestForBroker()類似莲组,不再贅述诊胞。

ControllerBrokerRequestBatch.sendRequestsToBrokers()方法會使用上述三個集合中的數(shù)據(jù)來創(chuàng)建相應的請求,并添加到ControllerChannelManager中對應的messageQueue隊列中锹杈,最終由RequestSendThread線程將請求發(fā)送出去撵孤。

image.png

PartitionStateMachine
PartitionStateMachine是Controller Leader用于維護分區(qū)狀態(tài)的狀態(tài)機。分區(qū)的狀態(tài)是通過PartitionState接口定義的竭望,它有四個子類分別代表了分區(qū)四種可能的狀態(tài)邪码,如表4-2所示。

image.png

分區(qū)各個PartitionState之間的轉(zhuǎn)換如圖4-57所示咬清。


image.png

下面分析各個狀態(tài)之間轉(zhuǎn)換時闭专,需要完成的相關操作。

  • ·NonExistentPartition→NewPartition
    從ZooKeeper中加載分區(qū)的AR集合到ControllerContext的partitionReplicaAssignment集合中旧烧。

  • ·NewPartition→OnlinePartition
    首先將Leader副本和ISR集合的信息寫入到ZooKeeper中影钉,這里會將分區(qū)的AR集合中第一個可用的副本選舉為Leader副本,并將分區(qū)的所有可用副本作為ISR集合掘剪。之后平委,向所有可用的副本發(fā)送LeaderAndIsrRequest,指導這些副本進行Leader/Follower的角色切換夺谁,并向所有可用的Broker發(fā)送UpdateMetadataRequest來更新其上的MetadataCache

  • ·OnlinePartitio/OfflinePartition→OnlinePartition

為分區(qū)選擇新的Leader副本和ISR集合廉赔,并將結果寫入ZooKeeper。之后匾鸥,向需要進行角色切換的副本發(fā)送LeaderAndIsrRequest蜡塌,指導這些副本進行Leader/Follower的角色切換,并向所有可用的Broker發(fā)送UpdateMetadataRequest來更新其上的MetadataCache勿负。

  • ·NewPartition,OnlinePartition→OfflinePartition
    只進行狀態(tài)轉(zhuǎn)換馏艾,并沒有其他的操作。
  • ·OfflinePartition→NonExistentPartition
    只進行狀態(tài)轉(zhuǎn)換奴愉,并沒有其他的操作攒至。

PartitionStateMachine中的各個字段含義和作用如下所述途样。

  • ·controllerContext:ControllerContext對象盖高,用于維護KafkaController的上下文信息敞映。

  • ·zkUtils:ZooKeeper的客戶端献宫,用于與ZooKeeper服務器交互察滑。

  • ·partitionState:Map[TopicAndPartition, PartitionState]類型晃洒,記錄了每個分區(qū)對應的PartitionState狀態(tài)人灼。

  • ·brokerRequestBatch:ControllerBrokerRequestBatch對象坐昙,用于向指定的Broker批量發(fā)送請求。

  • noOpPartitionLeaderSelector:默認的Leader副本選舉類器溉浙,繼承了PartitionLeaderSelector烫止。NoOpLeaderSelector實現(xiàn)并沒有真正進行Leader副本的選舉,其實現(xiàn)是返回當前的Leader副本戳稽、ISR集合和AR集合馆蠕。關于PartitionLeaderSelector的其他實現(xiàn),下文詳述惊奇。

  • ·topicChangeListener:ZooKeeper的監(jiān)聽器互躬,用于監(jiān)聽Topic的變化。

  • ·deleteTopicsListener:ZooKeeper的監(jiān)聽器颂郎,用于監(jiān)聽Topic的刪除吼渡。

  • ·partitionModificationsListeners:用于監(jiān)聽分區(qū)的修改。

關于ZooKeeper監(jiān)聽器的相關介紹乓序,下文詳述寺酪。

PartitionStateMachine啟動時會對partitionState集合進行初始化,并調(diào)用triggerO nlinePartitionStateChange()方法將NewPartition和OfflinePartition狀態(tài)的分區(qū)轉(zhuǎn)換成OnlinePartition狀態(tài)

image.png

每個分區(qū)初始狀態(tài)的依據(jù)是controllerContext.partitionLeadershipInfo中記錄的Leader副本信息和ISR集合信息替劈。

image.png

PartitionStateMachine.handleStateChange()方法是管理分區(qū)狀態(tài)的核心方法寄雀,該方法控制著PartitionState的轉(zhuǎn)換。這里需要注意該方法的第三個參數(shù)陨献,它指定了用來選舉Leader副本的PartitionLeaderSelector對象咙俩。

image.png
image.png

PartitionState由NewPartition切換為OnlinePartition時,調(diào)用了 initializeLeaderAndIsrF orPartition()方法湿故,其操作的主要步驟是:

(1)從ControllerContext.partitionReplicaAssignment集合中選擇第一個可用的副本作為Leader副本,其余的副本構成ISR集合膜蛔。
(2)將Leader副本和ISR集合的信息寫入到ZooKeeper坛猪。
(3)更新ControllerContext.partitionLeadershipInfo中緩存的Leader副本、ISR集合等信息皂股。
(4)將上述步驟中確定的Leader副本墅茉、ISR集合、AR集合等信息添加到ControllerBrokerRequestBatch呜呐,之后會封裝成LeaderAndIsrRequest發(fā)送給相關的Broker就斤。

PartitionStateMachine.initializeLeaderAndIsrForPartition()方法的具體實現(xiàn)如下:

image.png

當PartitionState由OfflinePartition或OnlinePartition切換為OnlinePartition時調(diào)用了electLeaderForPartition()方法,其操作的主要步驟是:

(1)使用指定的PartitionLeaderSelector為分區(qū)選舉新的Leader副本蘑辑。
(2)將Leader副本和ISR集合的信息寫入到Zookeeper洋机。
(3)更新ControllerContext.partitionLeadershipInfo集合中緩存的Leader副本、ISR集合等信息洋魂。
(4)將上述步驟中確定的Leader副本绷旗、ISR集合喜鼓、AR集合等信息添加到ControllerBrokerRequestBatch,之后會封裝成LeaderAndIsrRequest發(fā)送給相關的Broker衔肢。

PartitionStateMachine.electLeaderForPartition()方法的具體實現(xiàn)如下:!
image.png
image.png

在handleStateChange()方法中對于目標分區(qū)狀態(tài)為NewPartition庄岖、OfflinePartition、NonExistentPartition的處理比較簡單角骤,只是進行了狀態(tài)切換隅忿,并未進行其他處理。有的讀者會說邦尊,當PartitionState由NonExistentPartition轉(zhuǎn)換為NewPartition時背桐,并沒有從ZooKeeper中加載Partition的AR集合的相關操作。這是因為在調(diào)用handleStateChange()的方法中已經(jīng)完成了此操作胳赌,我們以創(chuàng)建Topic的過程為例牢撼。當創(chuàng)建Topic時觸發(fā)TopicChangeListener這個監(jiān)聽器,它會調(diào)用handleStateChange()完成PartitionState由NonExistentPartition到NewPartition的切換疑苫,調(diào)用關系如圖4-58所示熏版。


image.png

在TopicChangeListener中會完成從ZooKeeper加載Partition的AR集合的操作。


image.png

handleStateChange()方法是個private方法捍掺,由PartitionStateMachine.handleStateChanges()方法和triggerOnlinePartitionStateChange()方法調(diào)用撼短,對外提供PartitionState切換。handleStateChanges()方法對指定的分區(qū)集合循環(huán)調(diào)用handleStateChange()方法進行狀態(tài)轉(zhuǎn)換挺勿。

image.png

triggerOnlinePartitionStateChange()方法對partitionState集合中的全部分區(qū)進行遍歷曲横,將OfflinePartition和NewPartition狀態(tài)的分區(qū)轉(zhuǎn)換成OnlinePartition狀態(tài)。狀態(tài)切換成功的分區(qū)即可對外提供服務不瓶。

image.png

PartitionLeaderSelector
通過對前面的分析可知禾嫉,PartitionMachine將Leader副本選舉、確定ISR集合的工作委托給了PartitionLeaderSelector接口實現(xiàn)蚊丐,PartitionMachine可以專注于管理分區(qū)狀態(tài)熙参。這是策略模式的一種典型的應用場景。

圖4-59展示了PartitionLeaderSelector的實現(xiàn)類麦备,這五個不同的實現(xiàn)提供了不同的策略孽椰。PartitionLeaderSelector接口的定義如下:


image.png

NoOpLeaderSelector是其中最簡單的實現(xiàn),它并沒有進行Leader選舉凛篙,而是將currentLeaderAndIsr直接返回黍匾,需要接收LeaderAndIsrRequest的Broker則是分區(qū)的AR集合

OfflinePartitionLeaderSelector會根據(jù)currentLeaderAndIsr選舉新的Leader和ISR集合,策略如下:

1)如果在ISR集合中存在至少一個可用的副本呛梆,則從ISR集合中選擇新的Leader副本锐涯,當前ISR集合為新ISR集合。
(2)如果ISR集合中沒有可用的副本且“Unclean leader election”配置被禁用填物,那么就拋出異常全庸。
(3)如果“Unclean leader election”被開啟秀仲,則從AR集合中選擇新的Leader副本和ISR集合。
(4)如果AR集合中沒有可用的副本壶笼,拋出異常神僵。

OfflinePartitionLeaderSelector.selectLeader()方法的具體實現(xiàn)如下:

image.png
image.png

對于剩余的PartitionLeaderSelector實現(xiàn),這里只介紹其策略覆劈,具體的實現(xiàn)代碼留給讀者自己分析保礼。PreferredReplicaPartitionLeaderSelector的策略是:如果“優(yōu)先副本”可用且在ISR集合中,則選取其為Leader副本责语,當前的ISR集合為新的ISR集合炮障,并向AR集合中所有可用副本發(fā)送LeaderAndIsrRequest,否則會拋出異常坤候。

ReassignedPartitionLeaderSelector涉及到副本的重新分配胁赢,副本重新分配的相關概念后面詳細分析,這里先簡單了解ReassignedPartitionLeaderSelector的策略:選取的新Leader副本必須在新指定的AR集合中且同時在當前ISR集合中白筹,當前ISR集合為新ISR集合智末,接收LeaderAndIsrRequest的副本是新指定的AR集合中的副本。

ControlledShutdownLeaderSelector的策略是:從當前ISR集合中排除正在關閉的副本后作為新的ISR集合徒河,從新ISR集合中選擇新的Leader系馆,需要向AR集合中可用的副本發(fā)送LeaderAndIsrRequest。

ReplicaStateMachine
ReplicaStateMachine是Controller Leader用于維護副本狀態(tài)的狀態(tài)機顽照。副本狀態(tài)由ReplicaState接口表示由蘑,它有七個子類,分別代表了副本的七種不同的狀態(tài)代兵,如表4-3所示尼酿。

image.png

ReplicaState之間的轉(zhuǎn)換如圖4-60所示。下面介紹各個ReplicaState狀態(tài)之間轉(zhuǎn)換時需要完成的相關操作植影。

  • ·NonExistentReplica→NewReplica

Controller向此副本所在Broker發(fā)送LeaderAndIsrRequest裳擎,并向集群中所有可用的Broker發(fā)送 UpdateMetadataRequest。

  • ·NewReplica→OnlineReplica
    Controller將NewReplica加入到AR集合中
  • ·OnlineReplica,OfflineReplica→OnlineReplica

Controller向此副本所在的Broker發(fā)送LeaderAndIsrRequest何乎,并向集群中所有可用的Broker發(fā)送UpdateMetadataRequest。

  • ·NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible →OfflineReplica Controller向副本所在Broker發(fā)送StopReplicaRequest土辩,之后會從ISR集合中清除此副本支救,最后向其他可用副本所在的Broker發(fā)送LeaderAndIsrRequest,并向集群中所有可用的Broker發(fā)送UpdateMetadataRequest拷淘。

  • ·OfflineReplica→ReplicaDeletionStarted
    Controller向副本所在Broker發(fā)送StopReplicaRequest各墨。

  • ·ReplicaDeletionStarted→ReplicaDeletionSuccessful
    只做狀態(tài)轉(zhuǎn)換,并沒有其他操作启涯。

  • ·ReplicaDeletionStarted→ReplicaDeletionIneligible
    只做狀態(tài)轉(zhuǎn)換贬堵,并沒有其他操作恃轩。

  • ·ReplicaDeletionSuccessful→NonExistentReplica
    Controller從AR集合中刪除此副本。

在ReplicaStateMachine中也有controllerContext黎做、zkUtils叉跛、brokerRequestBatch字段,它們的功能與PartitionStateMachine中的同名字段相同蒸殿,ReplicaStateMachine剩余的字段如下所述筷厘。

  • ·replicaState:Map[PartitionAndReplica, ReplicaState]類型,記錄每個副本對應的ReplicaState狀態(tài)宏所。
  • ·brokerChangeListener:ZooKeeper的監(jiān)聽器酥艳,用于監(jiān)聽Broker的變化,例如Broker宕機或重新上線等事件爬骤。關于ZooKeeper監(jiān)聽器的相關介紹充石,下文詳述。

ReplicaStateMachine啟動時會對replicaState集合進行初始化霞玄,并調(diào)用handleStateChanges()方法嘗試將可用副本轉(zhuǎn)換為OnlineReplica狀態(tài)骤铃。


image.png

設置每個副本的初始狀態(tài)的依據(jù)是controllerContext.partitionLeadershipInfo中記錄的Broker狀態(tài)。ReplicaStateMachine.initializeReplicaState()方法如下:

image.png

ReplicaStateMachine的核心方法是handleStateChange()方法溃列,其中控制著ReplicaState的轉(zhuǎn)換劲厌。


image.png
image.png
image.png
image.png

ReplicaStateMachine.handleStateChanges()方法對指定的副本集合循環(huán)調(diào)用handleStateChange()方法來完成狀態(tài)轉(zhuǎn)換,與PartitionStateMachine中的實現(xiàn)類似听隐,代碼不貼出來了补鼻。

ZooKeeper Listener

I0Itec-zkClient是一款ZooKeeper客戶端工具,它并沒有像Apache Curator那樣實現(xiàn)高級的功能雅任,而是提供了簡單易用的API來實現(xiàn)一些常見的功能风范。I0Itec-zkClient像大多數(shù)ZooKeeper客戶端框架一樣實現(xiàn)了斷線重連。它還提供了方便使用的監(jiān)聽器沪么,避免了手動反復注冊Watcher的煩瑣操作硼婿,下文會介紹KafkaController中的多個I0Itec-zkClient Listener實現(xiàn)。I0Itec還對ZooKeeper的異常和序列化做了簡單封裝禽车。此處不對I0IteczkClient的使用展開詳述寇漫,感興趣的讀者可以參考相關文檔進行學習。

Listener接口介紹
KafkaController會通過ZooKeeper監(jiān)控整個Kafka集群的運行狀態(tài)殉摔,響應管理員指定的相關操作州胳。具體的實現(xiàn)方式是在ZooKeeper的指定節(jié)點上添加Listener,監(jiān)聽此節(jié)點中的數(shù)據(jù)變化或是其子節(jié)點的變化逸月,從而觸發(fā)相應的業(yè)務邏輯栓撞。

Listener按照接口的類型可以分為三類,如表4-4所示


image.png

IZkDataListener接口的定義如下:
[圖片上傳失敗...(image-eab1f6-1613633270244)]

IZkChildListener接口中的定義如下:


image.png

IZkStateListener接口的定義如下:


image.png

Kafka中提供了五個IZkDataListener接口的實現(xiàn),它們分別是:LeaderChangeListener瓤湘、PartitionModificationsListener瓢颅、PreferredReplicaElectionListener、PartitionsReassignedListener弛说、ReassignedPartitionsIsrChangeListener挽懦。如圖4-61所示。

image.png

Kafka中提供了四個IZkChildListener接口的實現(xiàn)剃浇,它們分別是:DeleteTopicsListener巾兆、TopicChangeListener、IsrChangeNotificationListener虎囚、BrokerChangeListener角塑,如圖4-62所示。

image.png

Kafka中只有了一個IZkStateListener接口的實現(xiàn)淘讥,如圖4-63所示圃伶。

image.png

TopicChangeListener
TopicChangeListener負責管理Topic的增刪,它監(jiān)聽“/brokers/topics”節(jié)點的子節(jié)點變化蒲列。其具體邏輯如下:

image.png

在TopicChangeListener.onNewTopicCreation()方法中還會為每個新增的Topic注冊一個PartitionModificationsListener窒朋,然后調(diào)用onNewPartitionCreation()方法完成新增Topic的分區(qū)狀態(tài)以及副本狀態(tài)轉(zhuǎn)換。

image.png

為了便于讀者理解蝗岖,這里舉例解釋TopicChangeListener的功能〗男桑現(xiàn)在假設有三個Broker,管理人員通過kafka-topics腳本添加了一個名為“test”的Topic抵赢,它有三個分區(qū)欺劳,每個分區(qū)有三個副本。

kafka-topics腳本向ZooKeeper的“/brokers/topics/test”節(jié)點寫入的信息是"partitions": {"0": [0,1, 2],"1": [1,2,0],"2": [2,1,0]}铅鲤,此時觸發(fā)TopicChangeListener划提。TopicChangeListener將test中每個Partition的AR集合加載到ControllerContext中,在進行第一次分區(qū)狀態(tài)轉(zhuǎn)換(NoExistentPartition→NewPartition)和第一次副本狀態(tài)切換(NoExistentReplica→NewReplica)時邢享,只做了狀態(tài)切換并沒有發(fā)送任何請求鹏往。進行第二次分區(qū)狀態(tài)轉(zhuǎn)換(NewPartition→OnlinePartition)時會選取Leader副本和ISR集合信息,結果為如表4-5所示骇塘。

image.png

之后會將此結果寫入ZooKeeper伊履,向所有可用Broker發(fā)送LeaderAndIsrRequest來指導副本的角色切換,然后向所有可用Broker發(fā)送UpdateMetadataRequest來更新其MetadataCache款违。第二次副本狀態(tài)切換(NewReplica→OnlineReplica)時唐瀑,副本已在AR集合中,所以并未做任何操作奠货。

TopicDeletionManager與DeleteTopicsListener
在開始介紹刪除Topic 的實現(xiàn)之前介褥,先來了解一下TopicDeletionManager的功能和實現(xiàn)座掘。在TopicDeletionManager中維護了多個集合递惋,用于管理待刪除的Topic和不可刪除的集合柔滔,它會啟動一個DeleteTopicsThread線程來執(zhí)行刪除Topic的具體邏輯。

當Topic滿足下列三種情況之一時不能被刪除:
(1)如果Topic中的任一分區(qū)正在重新分配副本萍虽,則此Topic不能被刪除睛廊。
(2)如果Topic中的任一分區(qū)正在進行“優(yōu)先副本”選舉,則此Topic不能被刪除杉编。
(3)如果Topic中的任一分區(qū)的任一副本所在的Broker宕機超全,則此Topic不能被刪除。

TopicDeletionManager中各個字段的含義和功能如下所示邓馒。

  • ·partitionStateMachine:用于管理分區(qū)狀態(tài)的狀態(tài)機嘶朱。
  • ·replicaStateMachine:用于管理副本狀態(tài)的狀態(tài)機。
  • ·topicsToBeDeleted:Set[String]類型光酣,用于記錄將要被刪除的Topic集合疏遏,由TopicDeletionManager的構造器參數(shù)initialTopicsToBeDeleted指定其初始化值。
  • ·partitionsToBeDeleted:Set[TopicAndPartition]類型救军,用于記錄將要被刪除的分區(qū)集合
  • ·topicsIneligibleForDeletion:Set[String]類型财异,用于記錄不可刪除的Topic集合,由TopicDeletionManager的構造器參數(shù)initialTopicsIneligibleForDeletion指定其初始化值唱遭。
  • ·deleteTopicStateChanged:AtomicBoolean類型戳寸,用于標識Topic刪除操作是否開始。
  • ·deleteTopicsThread:DeleteTopicsThread類型拷泽,用于刪除Topic的后臺線程疫鹊。
  • ·isDeleteTopicEnabled:配置項delete.topic.enable的值,用于指定是否支持刪除Topic跌穗。
  • ·deleteTopicsCond:Condition對象订晌,用于其他線程與deleteTopicsThread線程同步。

在TopicDeletionManager啟動時蚌吸,會調(diào)用start()方法進行初始化锈拨。它會根據(jù)isDeleteTopicEnabled字段決定是否啟動DeleteTopicsThread線程,如果此時topicsToBeDeleted集合不為空羹唠,則DeleteTopicsThread可以開始進行Topic刪除的相關操作奕枢,并將deleteTopicStateChanged字段設置為true。

DeleteTopicsListener被觸發(fā)后通過enqueueTopicsForDeletion()將待刪除的Topic放入topicsToBeDeleted集合佩微,將待刪除的Topic的分區(qū)集合放入partitionsToBeDeleted集合缝彬,并喚醒DeleteTopicsThread處理。

image.png

DeleteTopicsThread是真正執(zhí)行Topic刪除操作的線程哺眯,它繼承了ShutdownableThread谷浅,入口方法是doWork()方法。刪除Topic的步驟如下:
(1)獲取待刪除Topic的分區(qū)集合,構成UpdateMetadataRequest發(fā)送給所有的Broker一疯,將Broker中MetadataCache的相關信息刪除撼玄。這些分區(qū)不再對外提供服務。
(2)調(diào)用onPartitionDeletion()方法開始對指定分區(qū)進行刪除墩邀。
a)將不可用副本轉(zhuǎn)換成ReplicaDeletionIneligible狀態(tài)掌猛。

b)將可用副本轉(zhuǎn)換成OfflineReplica狀態(tài)。此步驟會發(fā)送StopReplicaRequest到待刪除的副本(不會刪除副本)眉睹,同時還會向可用的Broker發(fā)送LeaderAndIsrRequest和UpdateMetadataRequest荔茬,將副本從ISR集合中刪除。
c)將可用副本由OfflineReplica轉(zhuǎn)換成ReplicaDeletionStarted竹海。此步驟會向可用副本發(fā)送StopReplicaRequest(刪除副本)慕蔚。注意,這里設置了回調(diào)函數(shù)處理StopReplicaResponse。

(3)調(diào)用deleteTopicStopReplicaCallback()回調(diào)函數(shù)處理StopReplicaResponse。
a)如果StopReplicaResponse中的錯誤碼表示出現(xiàn)異常龙屉,則將副本狀態(tài)轉(zhuǎn)換為ReplicaDeletionIneligible,并標記此副本所在Topic不可刪除十偶,也就是將Topic添加到topicsIneligibleForDeletion隊列,最后喚醒DeleteTopicsThread線程园细。

b)如果StopReplicaResponse正常惦积,則將副本狀態(tài)轉(zhuǎn)換為ReplicaDeletionSuccessful,并喚醒DeleteTopicsThread線程猛频。

(4)經(jīng)過上述三個步驟后狮崩,開始第二次doWork()調(diào)用。如果待刪除的Topic的所有副本已經(jīng)處于ReplicaDeletionSuccessful狀態(tài)鹿寻,調(diào)用completeDeleteTopic()方法完成Topic的刪除睦柴。

a)取消partitionModificationsListeners監(jiān)聽。
b)將此Topic的所有副本從ReplicaDeletionSuccessful轉(zhuǎn)換為NonExistentReplica毡熏。此步驟會將副本對應的Replica對象從ControllerContext中刪除坦敌。
c)將Topic的所有分區(qū)轉(zhuǎn)換為OfflinePartition狀態(tài),緊接著會再轉(zhuǎn)換為NonExistentPartition痢法。
d)將Topic和相關的分區(qū)從topicsToBeDeleted集合和partitionsToBeDeleted集合中刪除狱窘。
e)刪除ZooKeeper以及ControllerContext中與此Topic相關的全部信息。

(5)如果還有副本處于ReplicaDeletionStarted狀態(tài)财搁,則表示還沒有收到StopReplicaResponse蘸炸,則繼續(xù)等待。
(6)如果Topic的任一副本處于ReplicaDeletionIneligible狀態(tài)尖奔,則表示此Topic不能被刪除搭儒,調(diào)用markTopicForDeletionRetry()將處于ReplicaDeletionIneligible狀態(tài)的副本重新轉(zhuǎn)換成OfflineReplica狀態(tài)穷当。此步驟的相關操作在步驟(2)→b中已經(jīng)詳細描述,這里不再贅述淹禾。

DeleteTopicsThread.doWork()方法的具體實現(xiàn)如下:


image.png

isTopicEligibleForDeletion()方法根據(jù)下面三個條件判斷Topic能否開始刪除操作膘滨。

image.png

onTopicDeletion()方法的核心是向所有可用的Broker發(fā)送UpdateMetadataRequest,注意其leader字段為LeaderDuringDelete稀拐,通知它們指定的Topic要被刪除,并刪除MetadataCache中與此Topic相關的緩存信息丹弱。

image.png

onPartitionDeletion()方法直接調(diào)用了startReplicaDeletion()方法德撬,在startReplicaDeletion()方法中開始對副本進行刪除。

image.png
image.png

deleteTopicStopReplicaCallback()回調(diào)函數(shù)中會調(diào)用failReplicaDeletion()方法處理異常副本躲胳,調(diào)用completeReplicaDeletion()方法處理返回正常StopReplicaResponse的副本蜓洪。

image.png

回到doWork()方法繼續(xù)分析,在步驟4中調(diào)用completeDeleteTopic()方法對成功刪除的Topic進行處理坯苹。


image.png

在步驟6中調(diào)用markTopicForDeletionRetry()方法處理不可刪除的Topic隆檀,它會將處于ReplicaDeletionIneligible狀態(tài)的副本重新轉(zhuǎn)換成OfflineReplica狀態(tài)。

image.png

在前面我們介紹了三種Topic不可刪除的情況粹湃,在DeleteTopicsThread線程的執(zhí)行過程中也有涉及恐仑。當Topic不再滿足這三種情況時會通過resumeDeletionForTopics()方法從topicsIneligibleForDeletion集合中將其移除,并喚醒DeleteTopicsThread線程進行上述刪除操作为鳄。圖4-64是其被調(diào)用的位置裳仆,依次對應本節(jié)開始描述的三種情況。

image.png

介紹完TopicDeletionManager的相關實現(xiàn)后再來對DeleteTopicsListener進行分析孤钦。DeleteTopicsListener會監(jiān)聽ZooKeeper中“/admin/delete_topics”節(jié)點下的子節(jié)點變化歧斟,當TopicCommand在該路徑下添加需要被刪除的Topic時,DeleteTopicsListener會被觸發(fā)偏形,它會將該待刪除的Topic交由TopicDeletionManager執(zhí)行Topic刪除操作静袖。下面是DeleteTopicsListener.handleChildChange()方法的具體實現(xiàn):

image.png
image.png

PartitionModificationsListener
在上一節(jié)介紹的Topic刪除過程中涉及PartitionModificationsListener的注冊和取消。在新增Topic時會為每個Topic注冊一個PartitionModificationsListener俊扭,在成功刪除Topic之后會將注冊的PartitionModificationsListener刪除队橙。PartitionModificationsListener會監(jiān)聽“/brokers/topics/[topic_name]”節(jié)點中的數(shù)據(jù)變化,主要用于監(jiān)聽一個Topic的分區(qū)變化萨惑。

PartitionModificationsListener.handleDataChange()方法的實現(xiàn)如下:!
image.png

onNewPartitionCreation()方法在TopicChangeListener中分析過喘帚,此處不再贅述。需要注意咒钟,PartitionModificationsListener并不對分區(qū)的刪除進行處理吹由,在第5章中分析kafkatopics腳本時可以看到,是不能減少Topic的分區(qū)數(shù)量的朱嘴。

BrokerChangeListener
BrokerChangeListener是ReplicaStateMachine中唯一的ZooKeeper Listener倾鲫,它會監(jiān)聽“/brokers/ids”節(jié)點下的子節(jié)點變化粗合,主要負責處理Broker的上線和故障下線。當Broker上線時會在“/brokers/ids”下創(chuàng)建臨時節(jié)點乌昔,下線時會刪除對應的臨時節(jié)點隙疚。

BrokerChangeListener.handleChildChange()方法的實現(xiàn)如下:


image.png

KafkaController.onBrokerFailure()方法對故障Broker的處理步驟如下:

(1)將Leader副本分布在故障Broker上的分區(qū)轉(zhuǎn)換為OfflinePartition狀態(tài)。
(2)將OfflinePartition狀態(tài)的分區(qū)轉(zhuǎn)換為OnlinePartition狀態(tài)磕道。此步會使用OfflinePartitionLeaderSelector為其選取Leader副本和ISR集合并寫入ZooKeeper供屉,之后發(fā)送LeaderAndIsrRequest和UpdateMetadataRequest。
3)將故障Broker上的副本轉(zhuǎn)換為OfflineReplica狀態(tài)溺蕉。此步會向故障Broker發(fā)送StopReplicaRequest伶丐,從ISR集合中清除相關副本,并發(fā)送LeaderAndIsrRequest和UpdateMetadataRequest疯特。
(4)檢查故障Broker上是否存在待刪除Topic的副本哗魂,如果存在,則將其副本轉(zhuǎn)換為ReplicaDeletionIneligible狀態(tài)并標記Topic不可刪除漓雅。
(5)如果步驟1中沒有分區(qū)的Leader副本在故障Broker上录别,則上述步驟中可能不會發(fā)送UpdateMetadataRequest,這里向可用Broker發(fā)送UpdateMetadataRequest邻吞。

KafkaController.onBrokerFailure()方法的具體實現(xiàn)如下:

image.png
image.png

這里舉例說明onBrokerFailure()方法的功能:現(xiàn)在假設Broker0上分布了Partition0组题、1、2的各一個副本抱冷,其中Partition0的副本為Leader副本往踢,其余兩個副本在對應的ISR集合中。當Broker0發(fā)生故障下線時徘层,ZooKeeper中的“/brokers/ids/0”臨時節(jié)點會被刪除峻呕,并觸發(fā)BrokerChangeListener進行處理。首先趣效,將Broker0從ControllerContext的Broker列表中刪除瘦癌。然后,將Partition0轉(zhuǎn)換為OfflinePartition狀態(tài)跷敬,緊接著再將其轉(zhuǎn)換成OnlinePartition狀態(tài)讯私,此時會使用OfflinePartitionLeaderSelector為其選舉新的Leader副本和ISR集合并更新到ZooKeeper中,隨后發(fā)送LeaderAndIsrRequest和UpdateMetaRequest西傀。之后斤寇,將三個副本轉(zhuǎn)換成OfflineReplica,并將其從ISR集合刪除拥褂,此時會發(fā)送StopReplicaRequest(不刪除副本)娘锁、LeaderAndIsrRequest和UpdateMetaRequest更新可用Broker的MetadataCache。

KafkaController.onBrokerStartup()方法的實現(xiàn)如下:


image.png
image.png

繼續(xù)上面的示例饺鹃,當Broker0重新上線后會創(chuàng)建臨時節(jié)點“/brokers/ids/0”莫秆,觸發(fā)BrokerChangeListener處理间雀。首先同樣是更新ControllerContext的Broker列表。然后將三個副本轉(zhuǎn)換成OnlineReplica狀態(tài)镊屎,此時每個分區(qū)都已經(jīng)有了Leader副本和ISR集合信息惹挟,所以向?qū)北景l(fā)送LeaderAndIsrRequest使其成為Follower,并向可用Broker發(fā)送UpdateMetadataRequest更新MetadataCache信息缝驳。此示例沒有Partition處于OfflinePartition狀態(tài)连锯,也沒有需要進行重新分配的Partition,所以后續(xù)步驟沒有執(zhí)行用狱。

IsrChangeNotificationListener
在前面介紹過运怖,F(xiàn)ollower副本會與Leader副本會進行消息同步,當Follower副本追上Leader副本時會被添加到ISR集合中齿拂,當Follower副本與Leader副本差距太大時會被踢出ISR集合。Leader副本不僅會在ISR集合變化時將其記錄到ZooKeeper中肴敛,還會調(diào)用ReplicaManager.recordIsrChange()方法記錄到isrChangeSet集合中署海,之后通過isr-changepropagation定時任務將該集合中周期性地寫入到ZooKeeper的“/isr_change_notification”路徑下。KafkaController中定義的IsrChangeNotificationListener用于監(jiān)聽此路徑下的子節(jié)點變化医男,當某些分區(qū)的ISR集合變化時通知整個集群中的所有Broker砸狞。

IsrChangeNotificationListener.handleChildChange()方法的具體實現(xiàn)如下:

image.png

PreferredReplicaElectionListener
PreferredReplicaElectionListener負責監(jiān)聽的ZooKeeper節(jié)點是“/admin/preferred_replica_election”。當我們通過PreferredReplicaLeaderElectionCommand命令指定某些分區(qū)需要進行“優(yōu)先副本”選舉時會將指定分區(qū)的信息寫入該節(jié)點镀梭,從而觸發(fā)PreferredReplicaElectionListener進行處理刀森。進行“優(yōu)先副本”選舉的目的是讓分區(qū)的“優(yōu)先副本”重新成為Leader副本,這是為了讓Leader副本在整個集群中分布得更加均衡报账。

PreferredReplicaElectionListener.handleDataChange()方法的具體實現(xiàn)如下:


image.png

onPreferredReplicaElection()方法的核心是通過PreferredReplicaPartitionLeaderSelector選舉Leader副本和ISR集合研底。PreferredReplicaPartitionLeaderSelector選舉成功的條件是“優(yōu)先副本”不是當前Leader副本,但是要在ISR集合中透罢,否則會拋出異常榜晦。經(jīng)過選舉后的ISR集合還是當前的ISR集合,但分區(qū)的Leader副本變成了“優(yōu)先副本”羽圃。

image.png

onPreferredReplicaElection()方法還會被一個名為"partition-rebalance"的定時任務調(diào)用乾胶,此任務會定期檢測集群中“優(yōu)先副本”與Leader副本的分配情況,并判斷是否觸發(fā)“優(yōu)先副本”選舉朽寞,后面會詳細介紹該定時任務识窿。

副本重新分配的相關Listener

PartitionsReassignedListener監(jiān)聽的ZooKeeper節(jié)點是“/admin/reassign_partitions”。當管理人員通過ReassignPartitionsCommand命令指定某些分區(qū)需要重新分配副本時脑融,會將指定分區(qū)的信息寫入該節(jié)點喻频,從而觸發(fā)PartitionsReassignedListener進行處理。

下面是整個副本重新分配的步驟:
(1)從ZooKeeper的“/admin/reassign_partitions”節(jié)點下讀取分區(qū)重分配信息肘迎。
(2)過濾掉正在進行重新分配的分區(qū)半抱。
(3)檢測其Topic是否為待刪除的Topic脓恕。如果是,則調(diào)用KafkaController. removePartitionFromReassignedPartitions()方法窿侈,其操作如下:
a)取消此分區(qū)注冊的ReassignedPartitionsIsrChangeListener炼幔。
b)刪除ZooKeeper的“/admin/reassign_partitions”節(jié)點中與當前分區(qū)相關的數(shù)據(jù)。
c)從partitionsBeingReassigned集合中刪除分區(qū)相關的數(shù)據(jù)史简。
(4)否則乃秀,創(chuàng)建ReassignedPartitionsContext,調(diào)用initiateReassignReplicasForTopicPa rtition()方法開始為重新分配副本做一些準備工作圆兵。
a)首先跺讯,獲取當前的舊AR集合和指定的新AR集合。
b)比較新舊兩個AR集合殉农,若兩者完全一樣則拋出異常刀脏,執(zhí)行步驟3的操作后結束。
c)判斷新AR集合中涉及的Broker是否都是可用的超凳,若不是愈污,也拋出異常,執(zhí)行步驟3的操作后結束轮傍。
d)為分區(qū)添注冊ReassignedPartitionsIsrChangeListener暂雹,后面會詳細介紹此Listener。
e)將分區(qū)添加到partitionsBeingReassigned集合中创夜,并標識該Topic不能被刪除杭跪。
f)調(diào)用onPartitionReassignment()方法,開始執(zhí)行副本重新分配驰吓。
5)onPartitionReassignment()方法完成了副本重新分配的整個流程涧尿。在下面的描述中使用“新AR+舊AR”表示新AR集合和舊AR集合的并集,“新AR-舊AR”表示新AR集合與舊AR集合的差集檬贰。
(6)判斷新AR集合中的所有副本是否已經(jīng)進入了ISR集合现斋。如果沒有,則執(zhí)行下面的步驟:
a)將分區(qū)在ContextController和ZooKeeper中的AR集合更新成“新AR+舊AR”偎蘸。
b)向“新AR+舊AR”發(fā)送LeaderAndIsrRequest庄蹋,此步驟主要目的是為了增加ZooKeeper中記錄的leader_epoch值。
c)將“新AR-舊AR”中的副本更新成NewReplica狀態(tài)迷雪,此步驟會向這些副本發(fā)送LeaderAndIsrRequest使其成為Follower副本限书,并發(fā)送UpdateMetadataRequest。
(7)如果新AR集合中的副本已經(jīng)都進入了ISR集合章咧,則執(zhí)行下面的步驟:

a)將新AR集合中的所有副本都轉(zhuǎn)換成OnlineReplica狀態(tài)倦西。
b)將ControllerContext中的AR記錄更新為新AR集合。
c)如果當前Leader副本在新AR集合中赁严,則遞增ZooKeeper和ControllerContext中記錄的leader_epoch值扰柠,并發(fā)送LeaderAndIsrRequest和UpdateMetadataRequest粉铐。
d)如果當前Leader不在新AR集合中或Leader副本不可用,則將分區(qū)狀態(tài)轉(zhuǎn)換為OnlinePartition(之前也是OnlinePartition)卤档,主要目的使用ReassignedPartitionLeaderSelec tor選舉新的Leader副本蝙泼,使得新AR集合中的一個副本成為新Leader副本,然后會發(fā)送LeaderAndIsrRequest和UpdateMetadataRequest劝枣。
e)將“舊AR-新AR”中的副本轉(zhuǎn)換成OfflineReplica汤踏,此步驟會發(fā)送StopReplicaRequest(不刪除副本),清理ISR集合中的相關副本舔腾,并發(fā)送LeaderAndISRRequest和UpdateMetadataRequest請求溪胶。
f)接著將“舊AR-新AR”中的副本轉(zhuǎn)換成ReplicaDeletionStarted,此步驟會發(fā)送StopReplicaRequest(刪除副本)稳诚。完成刪除后哗脖,將副本轉(zhuǎn)換為ReplicaDeletionSuccessful,最終轉(zhuǎn)換成NonExistentReplica扳还。
g)更新ZooKeeper中記錄的AR信息才避。
h)將此分區(qū)的相關信息從ZooKeeper的“/admin/reassign_partitions”節(jié)點中移除。
i)向所有可用的Broker發(fā)送一次UpdateMetadataRequest普办。
j)嘗試取消相關的Topic的“不可刪除”標記工扎,并喚醒DeleteTopicsThread線程徘钥。

上面描述的整個流程其實還涉及ReassignedPartitionsIsrChangeListener的相關內(nèi)容衔蹲,其中,步驟7就是在ReassignedPartitionsIsrChangeListener中完成的呈础。

ReassignedPartitionsIsrChangeListener在上述步驟(4)→d中注冊到ZooKeeper的“/broker/topics/[topic_name]/partitions/[partitionId]/state”節(jié)點上監(jiān)聽其數(shù)據(jù)變化舆驶,主要負責處理進行副本重新分配的分區(qū)的ISR集合變化。當ReassignedPartitionsIsrChangeListener監(jiān)聽到分區(qū)的ISR集合發(fā)生變化時而钞,按照下列步驟進行處理:

(1)檢查當前分區(qū)是否正在進行副本的重新分配操作沙廉,若不是,則結束臼节。
(2)從ZooKeeper中讀取當前分區(qū)的Leader和ISR集合撬陵。
(3)如果新AR集合中的副本已完全進入當前ISR集合,則調(diào)用onPartitionReassignment()方法完成步驟7的相關操作网缝。
(4)否則巨税,輸出日志后結束,等待下一次觸發(fā)粉臊。

為了讓讀者更好地理解副本重新分配的過程草添,通過一個示例描述這個過程:現(xiàn)在假設有Broker05這六個Broker,Topic名為“test”的編號為0的分區(qū)(Partition0)三個副本扼仲,分別位于Broker13上远寸,其中Leader副本位于Broker1上抄淑。

管理人員使用ReassignPartitionsCommand命令將Partition0的副本重新分配到Broker3~5上。PartitionsReassignedListener被觸發(fā)后首先為Partition0注冊ReassignedP artitionsIsrChangeListener驰后,并標記test這個Topic不能被刪除肆资。然后,將ZooKeeper和ControllerContext中Partition0的AR集合記錄更新為[1,2,3,4,5]倡怎。發(fā)送LeaderAndIsrRequest使[4,5]副本成為Follower副本迅耘,并發(fā)送UpdateMetadataRequest更新可用Broker的MetadataCache。到這里PartitionsReassignedListener的相關處理就結束了监署。

隨著[4,5]兩個Follower副本與Leader副本進行同步颤专,最終會進入ISR集合,此時會觸發(fā)ReassignedPartitionsIsrChangeListener進行處理钠乏。首先將[3,4,5]副本轉(zhuǎn)換為OnlineReplica狀態(tài)栖秕,更新ControllerContext中對應的AR記錄為[3,4,5]。之后晓避,[3,4,5]副本已經(jīng)處于ISR集合簇捍,且Leader副本不在[3,4,5]中,則需要使用ReassignedPartitionLead erSelector選舉新的Leader俏拱。我們簡單回顧ReassignedPartitionLeaderSelector的策略暑塑,它選取的新Leader副本必須在新指定的AR集合中且在ISR集合中,當前ISR集合為新ISR集合锅必。接收LeaderAndIsrRequest的副本是新指定的AR集合事格。這里假設選舉副本3為Leader副本,并向[3,4,5]三個副本發(fā)送LeaderAndIsrRequest請求搞隐。然后驹愚,將[1,2]副本轉(zhuǎn)換為OfflineReplica,此時發(fā)送StopReplicaRequest停止副本劣纲,將其從ISR中移除逢捺,最終將這兩個副本刪除轉(zhuǎn)換為NonExistentReplica狀態(tài)。最后癞季,更新ZooKeeper中的AR記錄劫瞳,標記取消“test”的“不可刪除”標記,并喚醒DeleteTopicsThread線程绷柒。副本重新分配到此全部完成志于,此時AR集合和ISR集合都為[3,4,5],Leader為副本3辉巡。

PartitionsReassignedListener.handleDataChange()方法的具體實現(xiàn)如下:

image.png

KafkaController.initiateReassignReplicasForTopicPartition()方法的實現(xiàn)如下:


image.png

KafkaController.onPartitionReassignment()是副本重新分配的核心恨憎,其具體實現(xiàn)如下:


image.png
image.png

ReassignedPartitionsIsrChangeListener的核心邏輯也是調(diào)用onPartitionReassignment()方法,這里不再贅述了。

KafkaController初始化與故障轉(zhuǎn)移
KafkaController的主要功能和實現(xiàn)在前面幾節(jié)中已經(jīng)介紹完了憔恳。本節(jié)介紹KafkaController的初始化以及故障轉(zhuǎn)移方面的內(nèi)容瓤荔。在Kafka集群中,只有一個Controller能夠成為Leader來管理整個集群钥组,而其他未成為Controller Leader的Broker上也會創(chuàng)建一個KafkaController對象输硝,它們唯一能做的事情就是當Controller Leader出現(xiàn)故障,不能繼續(xù)管理集群時程梦,競爭成為新的Controller Leader点把。

KafkaController的啟動和故障轉(zhuǎn)移的過程與ZookeeperLeaderElector有著密切的關系,KafkaController. controllerElector字段ZookeeperLeaderElector類型的定義如下:

image.png

KafkaController啟動的過程由KafkaController.startup()方法完成屿附,其中會注冊SessionExpirationListener郎逃,并啟動ZookeeperLeaderElector。

image.png

SessionExpirationListener繼承了IZkStateListener接口挺份,監(jiān)聽KafkaController與ZooKeeper的連接狀態(tài)褒翰。當KafkaController與ZooKeeper的連接超時后創(chuàng)建新連接時會觸發(fā)SessionExpirationListener.handleNewSession()方法。

image.png

KafkaController的啟動邏輯委托給了ZooKeeperLeaderElector.startup()方法完成匀泊。在ZookeeperLeaderElector中有兩個比較重要的字段:

  • ·leaderId:緩存當前的Controller LeaderId优训。
  • ·leaderChangeListener:LeaderChangeListener會監(jiān)聽Zookeeper的“/controller”節(jié)點的數(shù)據(jù)變化,當此節(jié)點中保存的LeaderId發(fā)生變化時各聘,會觸發(fā)LeaderChangeListener進行相應的處理揣非。
image.png

當“/controller”節(jié)點中的數(shù)據(jù)被刪除時會觸發(fā)handleDataDeleted()方法進行處理。

image.png

ZookeeperLeaderElector.startup()方法的邏輯是注冊LeaderChangeListener后躲因,立即調(diào)用elect()方法嘗試進行Controller Leader的選舉早敬。

image.png

KafkaController觸發(fā)選舉的地方有三處,如圖4-65所示毛仪。依次是:第一次啟動的時候搁嗓;LeaderChangeListener監(jiān)聽到“/controller”節(jié)點中數(shù)據(jù)被刪除芯勘;ZooKeeper連接過期并重新建立之后箱靴。


image.png

ZookeeperLeaderElector.elect()方法的具體實現(xiàn)如下:


image.png

onControllerFailover
ZookeeperLeaderElector.elect()方法中調(diào)用的回調(diào)函數(shù)onBecomingLeader()實際上是ZookeeperLeaderElector構造函數(shù)中傳入的KafkaController.onControllerFailover()方法。當前Broker成功選舉為Controller Leader時會通過該方法完成一系列的初始化操作荷愕,其具體步驟如下

image.png
image.png

此過程的組件我們在前面已經(jīng)詳細分析過了衡怀,這里我們關注一下initializeControllerContext()方法,了解ControllerContext到底從ZooKeeper中讀取了哪些數(shù)據(jù)安疗。


image.png

Partition Rebalance
在KafkaController.onControllerFailover()方法中會啟動一個名為“partition-rebalance”的周期性的定時任務抛杨,它提供了分區(qū)的自動均衡功能。該定時任務會周期性地調(diào)用KafkaController.checkAndTriggerPartitionRebalance()方法對失衡的Broker上相關的分區(qū)進行“優(yōu)先副本”選舉荐类,使得相關分區(qū)的“優(yōu)先副本”重新成為Leader副本怖现,整個集群中Leader副本的分布也會重新恢復平衡

checkAndTriggerPartitionRebalance()方法首先獲取“優(yōu)先副本”所在的BrokerId與分區(qū)的對應關系,然后利用此對應關系,計算每個“優(yōu)先副本”所在的Broker的“imbalance”比率壤巷,該值是當前Leader副本為非“優(yōu)先副本”的分區(qū)與此Broker上分區(qū)總數(shù)的比值监嗜,當“imbalance”比率大于一定閾值卒暂,則觸發(fā)“優(yōu)先副本”選舉。

為了讀者更好地理解含義铁追,現(xiàn)舉例說明。假設集群中有0~2三個Broker茫船,某Topic有15個分區(qū)琅束,Leader副本即為“優(yōu)先副本”,它們的分布如表4-6所示算谈。

image.png

當Kafka集群運行一段時間涩禀,期間某些Broker可能出現(xiàn)過宕機,導致Leader副本發(fā)生遷移然眼,現(xiàn)在Leader副本的分布如表4-7所示埋泵。

image.png

此時的Leader副本分布已經(jīng)明顯不均勻了,Broker2上分布了5個分區(qū)的副本罪治,但當前有4個分區(qū)并不是以“優(yōu)先副本”為Leader副本丽声,Broker2的“imbalance”比率為4/5=80%,默認的閾值為10%觉义,此時會觸發(fā)對Partition5雁社、Partition8、Partition11晒骇、Partition14的“優(yōu)先副本”選舉霉撵。

KafkaController.checkAndTriggerPartitionRebalance()方法的實現(xiàn)如下

image.png
image.png

KafkaController成為Controller Leader后進行的一系列操作的具體流程和實現(xiàn)到這里就介紹完了。下面我們來分析KafkaController如何從Controller Leader變成Follower洪囤。

onControllerResignation
通過前面對LeaderChangeListener源碼的分析我們知道徒坡,當它監(jiān)聽到“/controller”中的數(shù)據(jù)被刪除或改變時,舊的Controller Leader需要調(diào)用onResigningAsLeader()回調(diào)函數(shù)進行一些清理工作瘤缩,它實際是ZookeeperLeaderElector構造函數(shù)中傳入的KafkaController.onControllerResignation()方法喇完,其實現(xiàn)如下:

image.png

處理ControlledShutdownRequest
在前面介紹的BrokerChangeListener可以根據(jù)ZooKeeper中“/brokers/ids”的子節(jié)點變化,處理Broker故障宕機的場景剥啤。在有些場景中用戶希望主動關閉正常運行的Broker锦溪,例如,更換硬件府怯、操作系統(tǒng)升級刻诊、修改Kafka的配置等。如果依然使用上述方式關閉Broker牺丙,就略顯粗暴则涯。在Kafka中提供了Controlled Shutdown的方式來關閉一個Broker實例。

使用Controlled Shutdown的方式主動下線Broker有兩個好處:一是可以讓日志文件完全同步到磁盤上,在Broker下次重現(xiàn)上線時不需要進行Log的恢復操作粟判;二是Controller Shutdown方式在關閉Broker之前肖揣,會對其上的Leader副本進行遷移,這樣就可以減少分區(qū)的不可用時間浮入。

在Kafka之前的版本中需要使用命令行工具向Controller Leader發(fā)送ControlledShutdownRequest請求龙优。在本書分析的版本中,已經(jīng)將該過程寫成了JVM的關閉鉤子事秀。在Kafka.main()方法中可以看到這段代碼:

image.png

KafkaController.shutdownBroker()方法是ControlledShutdownRequest的核心彤断,該方法會使用ControlledShutdownLeaderSelector重新選擇Leader副本和ISR集合,實現(xiàn)Leader副本的遷移易迹≡籽茫回顧一下ControlledShutdownLeaderSelector的策略:從當前ISR集合中排除正在關閉的副本后的剩余副本作為新ISR集合,從新ISR集合中選擇新的Leader睹欲,需要向可用的AR發(fā)送LeaderAndIsrRequest供炼。shutdownBroker()方法的代碼如下所示。

image.png
image.png

在本節(jié)中我們介紹了ContextController如何管理集群相關的各種信息窘疮,包括Broker袋哼、Topic、分區(qū)闸衫、副本等元數(shù)據(jù)涛贯,以及讀取、更新ZooKeeper的時機蔚出。了解了ControllerChannelManager和ControllerBrokerRequestBatch如何幫助KafkaController完成與集群中其他的Broker的通信弟翘,以及LeaderAndIsrRequest和UpdateMetadataRequest等請求的格式。介紹了管理分區(qū)狀態(tài)的PartitionStateMachine組件和管理副本狀態(tài)的ReplicaStateMachine骄酗,以及在不同場景下使用的PartitionLeaderSelector策略稀余。通過注冊不同多種不同類型的ZooKeeper Listener,可以實現(xiàn)多種管理功能趋翻,例如:Topic增刪的相關處理睛琳、Broker故障下線的處理、分區(qū)的副本重新分配嘿歌、“優(yōu)先副本”選舉掸掏,等等茁影。我們還介紹了一個Broker如何成為Controller Leader宙帝,成為Controller Leader后的初始化流程以及放棄Controller Leader角色后的清理操作。最后募闲,又分析了Controlled Shutdown方式下一個Broker的具體實現(xiàn)步脓。希望讀者通過本節(jié)的閱讀,能夠?qū)afkaController的功能和實現(xiàn)有清晰的了解。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末靴患,一起剝皮案震驚了整個濱河市仍侥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鸳君,老刑警劉巖农渊,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異或颊,居然都是意外死亡砸紊,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門囱挑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來醉顽,“玉大人,你說我怎么就攤上這事平挑∮翁恚” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵通熄,是天一觀的道長唆涝。 經(jīng)常有香客問我,道長唇辨,這世上最難降的妖魔是什么石抡? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮助泽,結果婚禮上啰扛,老公的妹妹穿的比我還像新娘。我一直安慰自己嗡贺,他們只是感情好隐解,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著诫睬,像睡著了一般煞茫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上摄凡,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天续徽,我揣著相機與錄音,去河邊找鬼亲澡。 笑死钦扭,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的床绪。 我是一名探鬼主播客情,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼其弊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了膀斋?” 一聲冷哼從身側(cè)響起梭伐,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎仰担,沒想到半個月后糊识,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡摔蓝,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年技掏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片项鬼。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡哑梳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出绘盟,到底是詐尸還是另有隱情鸠真,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布龄毡,位于F島的核電站吠卷,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏沦零。R本人自食惡果不足惜祭隔,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望路操。 院中可真熱鬧疾渴,春花似錦、人聲如沸屯仗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽魁袜。三九已至桩撮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間峰弹,已是汗流浹背店量。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留鞠呈,地道東北人融师。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像粟按,于是被迫代替她去往敵國和親诬滩。 傳聞我的和親對象是個殘疾皇子霹粥,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容

  • [TOC] 6.1 協(xié)議設計 在實際應用中灭将, Kafka 經(jīng)常被用作高性能疼鸟、可擴展的消息中間件 。 Kafka 自...
    tracy_668閱讀 1,187評論 0 6
  • 1. Kafka簡介 Kafka是一種分布式的庙曙,基于發(fā)布/訂閱的消息系統(tǒng)空镜。主要設計目標如下: 以時間復雜度為O(1...
    Vernon閱讀 373評論 0 0
  • 一、為什么需要消息系統(tǒng) 1.解耦: 允許你獨立的擴展或修改兩邊的處理過程捌朴,只要確保它們遵守同樣的接口約束吴攒。 2.冗...
    join_a922閱讀 434評論 0 0
  • Kafka的架構 包括Kafka的基本組成,Kafka的拓撲結構以及Kafka的內(nèi)部通信協(xié)議砂蔽。Kafka內(nèi)部的通信...
    陳晨_軟件五千言閱讀 1,891評論 0 9
  • 今天感恩節(jié)哎洼怔,感謝一直在我身邊的親朋好友。感恩相遇左驾!感恩不離不棄镣隶。 中午開了第一次的黨會,身份的轉(zhuǎn)變要...
    迷月閃星情閱讀 10,567評論 0 11