Kafka集群建立過(guò)程分析

  • 從本章開(kāi)始我們來(lái)介紹一個(gè)kafka集群逐步建立的過(guò)程;
  • 集群中只有一臺(tái)broker;
  • topic的創(chuàng)建;
  • 增加多臺(tái)broker;
  • 擴(kuò)展已存在topic的partition;

第一個(gè)broker(我們叫它B1)啟動(dòng)

a. 更新zk上的controller epoch信息;
b. 注冊(cè)zk上的broker/topic節(jié)點(diǎn)變化事件通知;
c. 初始化ControllerContext, 主要是從zk上獲取broker, topic, parition, isr, partition leader, replicas等信息;
d. 啟動(dòng)ReplicaStateMachine;
e. 啟動(dòng)PartitionStateMachine;
f. 發(fā)送所有的partition信息(leader, isr, replica, epoch等)到所有的 live brokers;
g. 如果允許自動(dòng)leader rebalance的話, 則啟動(dòng)AutoRebalanceScheduler;

  • Controller初始化完成后, KafkaHealthcheck啟動(dòng),在zk的/brokers下面注冊(cè)自己的信息,類似下面這樣:
    {"jmx_port":-1,"timestamp":"1477624160337","endpoints":["PLAINTEXT://10.123.81.11:9092"],"host":"10.123.81.11","version":3,"port":9092}
  • KafkaController中的ReplicaStateMachine已經(jīng)啟動(dòng)且注冊(cè)了BrokerChangeListener事件通知, 因?yàn)楫?dāng)KafkaHealthcheck啟動(dòng)結(jié)束后,BrokerChangeListener被觸發(fā):
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              if(newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)
            }
          }
        }
      }
    }
  }

干三件事:

  1. 更新ControllerContext.liveBrokers;
  2. 獲取新增的broker列表, 回調(diào)KafkaController.onBrokerStartup(newBrokerIds.toSeq);
  3. 獲取死掉的broker列表, 回調(diào)KafkaController.onBrokerFailure(deadBrokerIds.toSeq);
  • KafkaController.onBrokerStartup: 針對(duì)新增的brokers作處理, 由于現(xiàn)在只有一個(gè)broker并且也沒(méi)有任何的topic, 因此這里基本上是什么都不會(huì)作;

創(chuàng)建Topic

  • 目前kafka支持兩種方式創(chuàng)建topic:
    1. 如果kafka啟動(dòng)時(shí)允許自動(dòng)創(chuàng)建topic(可以在配置文件中指定auto.create.topics.enable=true), 則發(fā)送消息到kafka時(shí),若topic不存在,會(huì)自動(dòng)創(chuàng)建;
    2. 使用admin工具(bin/kafka-topics.sh)先行創(chuàng)建, 我們這里講解這種方式;
  • 在使用bin/kafka-topic.sh腳本來(lái)創(chuàng)建topic時(shí), topic的config會(huì)被寫(xiě)入zk的/config/topics/[topic]下, topic的parition分配信息會(huì)被寫(xiě)入zk的/brokers/topics/[topic]下.
    其中parition的分配信息用戶可以指定,也可由kafka-topic.sh腳本自動(dòng)產(chǎn)生,產(chǎn)生規(guī)則如下:
    如查未指定開(kāi)始位置,就隨機(jī)選擇一位置開(kāi)始庸疾,通過(guò)輪詢方式分配每個(gè)分區(qū)的第一個(gè)replica的位置, 然后每個(gè)partition剩余的replicas的位置緊跟著其第一個(gè)replica的位置.
    假設(shè)一個(gè)集群有5個(gè)broker, 有個(gè)topic有10個(gè)parition, 每個(gè)parition有3個(gè)復(fù)本,則分配如下圖:
1553745402.jpg
  • KafkaController中的PartitionStateMachine組件在啟動(dòng)時(shí)注冊(cè)了TopicChangeListener(監(jiān)控聽(tīng)/brokers/topics), 此時(shí)被觸發(fā):
           val currentChildren = {
              import JavaConversions._
              (children: Buffer[String]).toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren

            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            if(newTopics.size > 0)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)

干三件事

  1. 更新ControllerContext.allTopics;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回調(diào)KafkaController.onNewTopicCreation;
  • KafkaController.onNewTopicCreation: 處理新topic的創(chuàng)建
    partitionStateMachine.registerPartitionChangeListener(topic)
    onNewPartitionCreation(newPartitions)

onNewPartitionCreation的處理邏輯:

    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  1. 針對(duì)每個(gè)新topic注冊(cè)PartitionChangeListener, 監(jiān)控其partition數(shù)量的改變;
  2. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): 將partition狀態(tài)變?yōu)?code>NewPartition;
  3. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica):將replica狀態(tài)變?yōu)?code>NewReplica, 由于目前partition并沒(méi)有進(jìn)行選主操作,因此無(wú)其他操作被觸發(fā);
  4. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):
    4.1 將partition狀態(tài)由NewPartition -> OnlinePartition;
    4.2 選取replicas列表的head作為leader, 將leader, isr信息寫(xiě)入zk的/brokers/topics/[topic]/partitions/[partition id]/state
    4.3 BrokerRequestBatch.addLeaderAndIsrRequestForBrokers: 構(gòu)造 LeaderAndIsr Request,發(fā)送到各live broker, 這個(gè)request由broker內(nèi)部的ReplicaManager組件處理,我們后面會(huì)有專門(mén)的章節(jié)來(lái)分析它;
    4.4 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): 將replica狀態(tài)由NewReplica -> OnlineReplica;

第二個(gè)broker(我們叫它B2)啟動(dòng)

  • KafkaController組件依然啟動(dòng), 選主時(shí)發(fā)現(xiàn)已有controller存在則不繼續(xù)進(jìn)行選主,但仍監(jiān)聽(tīng)LeaderChangeListener事件;
  • KafkaHealthcheck啟動(dòng),在zk的/brokers下面注冊(cè)自己的信息;
  • 第一個(gè)broker B1此時(shí)作為Controller, BrokerChangeListener被觸發(fā), 獲取新增的broker列表, 回調(diào)KafkaController.onBrokerStartup(newBrokerIds.toSeq);
  1. sendUpdateMetadataRequest: 發(fā)送所有topic的partitionStateInfos到各broker;
  2. 針對(duì)新啟動(dòng)的broker, 調(diào)用replicaStateMachine.handleStateChanges更新replica狀態(tài)到OnlineReplica:
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  1. partitionStateMachine.triggerOnlinePartitionStateChange():針對(duì)new and offline partitions進(jìn)行選主;

針對(duì)已存在的topic, 在第二個(gè)broker B2上新增一個(gè)patition

  • 通過(guò)kafka-topics.sh腳本的alter命令, 在zk的/brokers/topics/[topic]下更新新增的partition的replicas信息;
  • KafkaController中的PartitionStateMachine組件監(jiān)聽(tīng)的AddPartitionsListener事件被觸發(fā):
          val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
          val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
            !controllerContext.partitionReplicaAssignment.contains(p._1))
          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
            error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                  .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
          else {
            if (partitionsToBeAdded.size > 0) {
              info("New partitions to be added %s".format(partitionsToBeAdded))
              controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
            }
          }

干三件事

  1. 獲取topic新增的partition的replicas信息;
  2. 更新ControllerContext.partitionReplicaAssignment;
  3. 回調(diào)KafkaController.onNewPartitionCreation;
  • 處理partition的新增:
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

Kafka源碼分析-匯總

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市家卖,隨后出現(xiàn)的幾起案子畔况,更是在濱河造成了極大的恐慌丈氓,老刑警劉巖局冰,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔚润,死亡現(xiàn)場(chǎng)離奇詭異奸远,居然都是意外死亡既棺,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)懒叛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)丸冕,“玉大人,你說(shuō)我怎么就攤上這事薛窥∨种颍” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵诅迷,是天一觀的道長(zhǎng)佩番。 經(jīng)常有香客問(wèn)我,道長(zhǎng)罢杉,這世上最難降的妖魔是什么趟畏? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮滩租,結(jié)果婚禮上赋秀,老公的妹妹穿的比我還像新娘。我一直安慰自己律想,他們只是感情好猎莲,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著技即,像睡著了一般著洼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上而叼,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天身笤,我揣著相機(jī)與錄音,去河邊找鬼澈歉。 笑死展鸡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的埃难。 我是一名探鬼主播莹弊,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼涤久,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了忍弛?” 一聲冷哼從身側(cè)響起响迂,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎细疚,沒(méi)想到半個(gè)月后蔗彤,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡疯兼,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年然遏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吧彪。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡待侵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出姨裸,到底是詐尸還是另有隱情秧倾,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布傀缩,位于F島的核電站那先,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏赡艰。R本人自食惡果不足惜售淡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瞄摊。 院中可真熱鬧勋又,春花似錦、人聲如沸换帜。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)惯驼。三九已至蹲嚣,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祟牲,已是汗流浹背隙畜。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留说贝,地道東北人议惰。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像乡恕,于是被迫代替她去往敵國(guó)和親言询。 傳聞我的和親對(duì)象是個(gè)殘疾皇子俯萎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容