- Starting with this chapter we walk through how a Kafka cluster is built up step by step:
- a cluster with a single broker;
- creating a topic;
- adding more brokers;
- expanding the partitions of an existing topic.
Starting the first broker (we will call it B1)
- For the broker startup flow, see "Kafka initialization and request handling".
- During startup the broker first starts the KafkaController. Since B1 is the only broker at this point, it is elected Controller of the cluster; see "KafkaController analysis 1: leader election and failover".
- Once broker B1 becomes Controller it performs a series of initialization steps (see "KafkaController analysis 7: startup flow", and the outline sketch below), including:
a. updating the controller epoch in ZooKeeper;
b. registering ZooKeeper listeners for broker/topic node changes;
c. initializing the ControllerContext, mainly by reading the brokers, topics, partitions, ISRs, partition leaders, replicas, etc. from ZooKeeper;
d. starting the ReplicaStateMachine;
e. starting the PartitionStateMachine;
f. sending all partition information (leader, ISR, replicas, epoch, etc.) to all live brokers;
g. starting the AutoRebalanceScheduler, if automatic leader rebalancing is enabled.
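To keep the ordering of these steps in mind, here is a minimal, purely illustrative outline. The object and method names below are hypothetical stubs standing in for logic that really lives in KafkaController.onControllerFailover; this is not the actual Kafka code.

```scala
// Illustrative outline of the controller startup sequence (a)-(g) above; all bodies are stubs.
object ControllerFailoverOutline {
  def incrementControllerEpochInZk(): Unit          = println("a. bump the controller epoch in zk")
  def registerBrokerAndTopicChangeListeners(): Unit = println("b. watch /brokers/ids and /brokers/topics")
  def initializeControllerContext(): Unit           = println("c. load brokers/topics/partitions/ISR/leaders from zk")
  def startReplicaStateMachine(): Unit              = println("d. start ReplicaStateMachine")
  def startPartitionStateMachine(): Unit            = println("e. start PartitionStateMachine")
  def sendUpdateMetadataToLiveBrokers(): Unit       = println("f. push partition state to live brokers")
  def maybeStartAutoRebalanceScheduler(enabled: Boolean): Unit =
    if (enabled) println("g. start AutoRebalanceScheduler")

  def onBecomingController(autoLeaderRebalanceEnabled: Boolean): Unit = {
    incrementControllerEpochInZk()
    registerBrokerAndTopicChangeListeners()
    initializeControllerContext()
    startReplicaStateMachine()
    startPartitionStateMachine()
    sendUpdateMetadataToLiveBrokers()
    maybeStartAutoRebalanceScheduler(autoLeaderRebalanceEnabled)
  }

  def main(args: Array[String]): Unit = onBecomingController(autoLeaderRebalanceEnabled = true)
}
```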
- After the Controller finishes initializing, KafkaHealthcheck starts and registers this broker's information under /brokers/ids in ZooKeeper, similar to the following:
{"jmx_port":-1,"timestamp":"1477624160337","endpoints":["PLAINTEXT://10.123.81.11:9092"],"host":"10.123.81.11","version":3,"port":9092}
- The ReplicaStateMachine inside the KafkaController has already been started and has registered for BrokerChangeListener notifications, so once KafkaHealthcheck finishes registering, BrokerChangeListener fires:
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      ControllerStats.leaderElectionTimer.time {
        try {
          // Diff the broker list read from zk against the brokers the controller already knows about.
          val curBrokerIds = currentBrokerList.map(_.toInt).toSet
          val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
          val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
          val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
          val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
          controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
          // Open channels to the new brokers and tear down channels to the dead ones.
          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
          if(newBrokerIds.size > 0)
            controller.onBrokerStartup(newBrokerIds.toSeq)
          if(deadBrokerIds.size > 0)
            controller.onBrokerFailure(deadBrokerIds.toSeq)
        } catch {
          case e: Throwable => error("Error while handling broker changes", e)
        }
      }
    }
  }
}
It does three things:
- updates ControllerContext.liveBrokers;
- collects the newly added brokers and calls back KafkaController.onBrokerStartup(newBrokerIds.toSeq);
- collects the dead brokers and calls back KafkaController.onBrokerFailure(deadBrokerIds.toSeq).
- KafkaController.onBrokerStartup: handles the newly added brokers. Since there is currently only one broker and no topics at all, it does essentially nothing here; the set arithmetic the listener performs is illustrated in the small sketch below.
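As a concrete illustration of that set arithmetic: when B1 registers itself, the controller does not yet know about any broker, so B1 shows up as the only new broker and there are no dead brokers. This is a self-contained sketch with made-up broker ids, not Kafka code.

```scala
// Self-contained illustration of the diff BrokerChangeListener computes (broker ids are made up).
object BrokerDiffSketch {
  def main(args: Array[String]): Unit = {
    val knownBrokerIds   = Set.empty[Int]  // controllerContext.liveOrShuttingDownBrokerIds before B1 registers
    val currentBrokerIds = Set(1)          // children of /brokers/ids after KafkaHealthcheck registers B1
    val newBrokerIds  = currentBrokerIds -- knownBrokerIds
    val deadBrokerIds = knownBrokerIds -- currentBrokerIds
    println(s"new brokers: $newBrokerIds, dead brokers: $deadBrokerIds")  // new brokers: Set(1), dead brokers: Set()
  }
}
```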
Creating a Topic
- Kafka currently supports two ways of creating a topic:
- if automatic topic creation is enabled (auto.create.topics.enable=true in the broker configuration), the topic is created automatically the first time a message is sent to a topic that does not yet exist;
- the topic is created up front with the admin tool (bin/kafka-topics.sh), which is the approach we describe here.
- When a topic is created with the bin/kafka-topics.sh script, the topic's config is written to /config/topics/[topic] in ZooKeeper, and the topic's partition assignment is written to /brokers/topics/[topic].
The partition assignment can either be specified by the user or generated automatically by the kafka-topics.sh script, using the following rule:
if no starting position is specified, a random one is chosen; the first replica of each partition is then placed round-robin across the brokers, and the remaining replicas of each partition are placed on the brokers immediately following its first replica.
For example, with a cluster of 5 brokers and a topic with 10 partitions of 3 replicas each, the assignment follows the layout produced by the sketch below:
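The following is a minimal sketch of that simplified rule. It is not the exact AdminUtils.assignReplicasToBrokers implementation (the real code also shifts the follower replicas in a slightly more involved way as it wraps around the broker list), but it reproduces the layout described above for the 5-broker / 10-partition / 3-replica example.

```scala
import scala.util.Random

// Minimal sketch of the round-robin assignment rule described above: the first replica of each
// partition is placed round-robin starting from a (possibly random) broker, and the remaining
// replicas of a partition sit on the brokers immediately following its first replica.
object ReplicaAssignmentSketch {
  def assign(brokerIds: Seq[Int], numPartitions: Int, replicationFactor: Int,
             startIndex: Int): Map[Int, Seq[Int]] = {
    val n = brokerIds.size
    (0 until numPartitions).map { partition =>
      val first = (startIndex + partition) % n
      val replicas = (0 until replicationFactor).map(r => brokerIds((first + r) % n))
      partition -> replicas
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val brokers = Seq(0, 1, 2, 3, 4)                  // 5 brokers
    val startIndex = Random.nextInt(brokers.size)     // random start when none is specified
    val assignment = assign(brokers, numPartitions = 10, replicationFactor = 3, startIndex)
    assignment.toSeq.sortBy(_._1).foreach { case (p, replicas) =>
      println(s"partition $p -> replicas ${replicas.mkString(",")}")
    }
  }
}
```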
- The PartitionStateMachine component inside the KafkaController registered a TopicChangeListener at startup (watching /brokers/topics), which is now triggered:
val currentChildren = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
// Diff the topic list in zk against the topics the controller already knows about.
val newTopics = currentChildren -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
// Read the replica assignment of the new topics from zk and merge it into the context,
// dropping assignments that belong to deleted topics.
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
  !deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
if(newTopics.size > 0)
  controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
It does three things:
- updates ControllerContext.allTopics;
- updates ControllerContext.partitionReplicaAssignment;
- calls back KafkaController.onNewTopicCreation.
- KafkaController.onNewTopicCreation: handles the creation of the new topic:
partitionStateMachine.registerPartitionChangeListener(topic)
onNewPartitionCreation(newPartitions)
The processing logic of onNewPartitionCreation:
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
- A PartitionChangeListener is registered for each new topic to watch for changes in its partition count.
- partitionStateMachine.handleStateChanges(newPartitions, NewPartition): moves the partitions to the NewPartition state.
- replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica): moves the replicas to the NewReplica state; since no leader election has happened for these partitions yet, nothing else is triggered.
- partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):
4.1 moves the partitions from NewPartition to OnlinePartition;
4.2 picks the head of each partition's replica list as its leader and writes the leader and ISR information to /brokers/topics/[topic]/partitions/[partition id]/state in ZooKeeper (a minimal sketch of this rule follows this list);
4.3 BrokerRequestBatch.addLeaderAndIsrRequestForBrokers: builds LeaderAndIsr requests and sends them to the live brokers; these requests are handled by the ReplicaManager component inside each broker, which we will analyze in a dedicated chapter later;
4.4 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): moves the replicas from NewReplica to OnlineReplica.
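Step 4.2 can be illustrated with a small sketch: for a brand-new partition the leader is simply the first live replica in the assigned list, and the initial ISR is the set of live assigned replicas. The case class and example broker ids below are simplified stand-ins, not the actual PartitionStateMachine code.

```scala
// Minimal sketch of the leader/ISR rule for a NewPartition -> OnlinePartition transition.
object NewPartitionLeaderSketch {
  case class LeaderAndIsr(leader: Int, isr: Seq[Int])

  def electForNewPartition(assignedReplicas: Seq[Int], liveBrokers: Set[Int]): Option[LeaderAndIsr] = {
    val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
    // Leader = head of the live assigned replicas; ISR = all live assigned replicas.
    liveAssigned.headOption.map(leader => LeaderAndIsr(leader, liveAssigned))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical example: replicas assigned to brokers 1,2,3 but broker 1 is down.
    println(electForNewPartition(Seq(1, 2, 3), liveBrokers = Set(2, 3)))
    // -> Some(LeaderAndIsr(2, List(2, 3)))
  }
}
```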
Starting the second broker (we will call it B2)
- The KafkaController component still starts; during leader election it finds that a controller already exists, so it does not proceed with the election, but it keeps watching for LeaderChangeListener events.
- KafkaHealthcheck starts and registers this broker's information under /brokers/ids in ZooKeeper.
- On broker B1, which is currently the Controller, BrokerChangeListener fires, collects the newly added brokers, and calls back KafkaController.onBrokerStartup(newBrokerIds.toSeq), which:
- sendUpdateMetadataRequest: sends the partitionStateInfos of all topics to every broker;
- calls replicaStateMachine.handleStateChanges to move the replicas hosted on the newly started broker to OnlineReplica (a simplified sketch of how those replicas are found follows this list):
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
- partitionStateMachine.triggerOnlinePartitionStateChange(): runs leader election for new and offline partitions.
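The controllerContext.replicasOnBrokers call above boils down to filtering the global partition-to-replicas assignment by broker id. Here is a minimal, self-contained approximation with simplified stand-in types; it is not the real ControllerContext implementation.

```scala
// Finds the replicas hosted on a given set of brokers by filtering the assignment map.
object ReplicasOnBrokersSketch {
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  def replicasOnBrokers(assignment: Map[TopicAndPartition, Seq[Int]],
                        brokerIds: Set[Int]): Set[PartitionAndReplica] =
    assignment.flatMap { case (tp, replicas) =>
      replicas.filter(brokerIds.contains).map(r => PartitionAndReplica(tp.topic, tp.partition, r))
    }.toSet

  def main(args: Array[String]): Unit = {
    // Hypothetical assignment: topic "test" with two partitions spread over brokers 1 and 2.
    val assignment = Map(
      TopicAndPartition("test", 0) -> Seq(1, 2),
      TopicAndPartition("test", 1) -> Seq(2, 1))
    // Replicas hosted on the newly started broker 2:
    println(replicasOnBrokers(assignment, Set(2)))
  }
}
```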
Adding a partition on the second broker B2 for an existing topic
- The alter command of the kafka-topics.sh script updates /brokers/topics/[topic] in ZooKeeper with the replicas of the newly added partition.
- The AddPartitionsListener that the PartitionStateMachine component of the KafkaController is watching is triggered:
val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
// Keep only the partitions that the controller does not know about yet.
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
  !controllerContext.partitionReplicaAssignment.contains(p._1))
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
  error("Skipping adding partitions %s for topic %s since it is currently being deleted"
    .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
else {
  if (partitionsToBeAdded.size > 0) {
    info("New partitions to be added %s".format(partitionsToBeAdded))
    // Merge the new partitions into the cached assignment and kick off partition creation.
    controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
    controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
  }
}
It does three things:
- reads the replicas of the topic's newly added partitions;
- updates ControllerContext.partitionReplicaAssignment;
- calls back KafkaController.onNewPartitionCreation.
A small standalone illustration of how partitionsToBeAdded is computed appears at the end of this section.
- Handling the newly added partitions:
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
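Finally, the diff that AddPartitionsListener performs can be reproduced in isolation. The topic name, partition counts, and broker ids below are made up for illustration, and TopicAndPartition is a simplified stand-in for the real Kafka case class; onNewPartitionCreation then runs the same NewPartition/NewReplica to Online flow described in the topic-creation section above.

```scala
// Self-contained illustration of how AddPartitionsListener detects the newly added partition.
object AddedPartitionsSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  def main(args: Array[String]): Unit = {
    // Assignment currently cached in ControllerContext: topic "test" had a single partition on B1.
    val known = Map(TopicAndPartition("test", 0) -> Seq(1))
    // Assignment re-read from /brokers/topics/test after the alter command added partition 1 on B2.
    val fromZk = Map(TopicAndPartition("test", 0) -> Seq(1),
                     TopicAndPartition("test", 1) -> Seq(2))
    val partitionsToBeAdded = fromZk.filter { case (tp, _) => !known.contains(tp) }
    println(partitionsToBeAdded)  // Map(TopicAndPartition(test,1) -> List(2))
  }
}
```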