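- The Controller role was added in Kafka 0.8 and has many responsibilities:
- creating topics, electing partition leaders, adding partitions, partition reassignment (PartitionReassigned), preferred replica election (PreferredReplicaElection), deleting topics, and so on.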
Leader election
Several parts of Kafka involve leader election and failover, for example the Controller and partition leaders. Let's first look at the classes related to leader election.
LeaderElector
- File: core/src/main/scala/kafka/server/LeaderElector.scala
- It is a trait. The comment in the source code reads:
This trait defines a leader elector. If the existing leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change callback
- Interface:
```scala
trait LeaderElector extends Logging {
  def startup              // start the elector
  def amILeader : Boolean  // whether this node is currently the leader
  def elect: Boolean       // run a leader election
  def close                // shut down
}
```
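To make the contract concrete, here is a minimal in-process sketch. It is not Kafka code: `SimpleLeaderElector` is a local copy of the trait without Kafka's `Logging` mix-in, and `InProcessElector` is a hypothetical class that uses a shared `AtomicReference` in place of ZooKeeper, so that only one candidate can hold leadership at a time.

```scala
import java.util.concurrent.atomic.AtomicReference

// Local copy of the LeaderElector contract. Kafka's trait also mixes in
// kafka.utils.Logging, omitted here so the sketch has no dependencies.
trait SimpleLeaderElector {
  def startup(): Unit
  def amILeader: Boolean
  def elect: Boolean
  def close(): Unit
}

// Hypothetical in-process elector: all candidates share one AtomicReference
// that acts as the election slot, and only one compareAndSet can claim it.
class InProcessElector(slot: AtomicReference[Option[Int]], brokerId: Int,
                       onBecomingLeader: () => Unit) extends SimpleLeaderElector {
  // compareAndSet compares references, so keep the exact Some instance we store.
  private val mine: Option[Int] = Some(brokerId)

  // No watch in this sketch: startup just runs one election (result discarded).
  def startup(): Unit = elect

  def amILeader: Boolean = slot.get.contains(brokerId)

  def elect: Boolean = {
    // Claim the slot only if it is empty (holds the shared None object).
    if (slot.compareAndSet(None, mine)) onBecomingLeader()
    amILeader
  }

  def close(): Unit = {
    // Release the slot only if this elector is the one holding it.
    slot.compareAndSet(mine, None)
  }
}
```

With two electors sharing one slot, the first to call `startup()` becomes leader; once it calls `close()`, the other one's `elect` succeeds. Unlike the ZooKeeper-based implementation, nothing here re-runs the election automatically when the leader goes away.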
ZookeeperLeaderElector
- File: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
- Implements trait LeaderElector
- Uses a preemptive election strategy based on a ZooKeeper ephemeral node: every candidate tries to create the same ephemeral node in ZooKeeper, but ZooKeeper guarantees that only one of them succeeds, and that candidate becomes the leader. All candidates then watch this ephemeral node; once it disappears, the watcher fires and the candidates race for leadership again.
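- `startup` method: first watch the ZooKeeper node, then call `elect`:
```scala
def startup {
  inLock(controllerContext.controllerLock) {
    // watch data changes and deletion of the election node
    controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
    // then take part in the election
    elect
  }
}
```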
- `elect` method:
  - When the `leaderChangeListener` registered by `controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)` is triggered:
    1. The ephemeral node's data changes (`handleDataChange`): if this broker was the leader before the change but is not the leader after it, `onResigningAsLeader()` is called back.
    2. The ephemeral node is deleted (`handleDataDeleted`): if this broker is currently the leader, `onResigningAsLeader()` is called back; then `elect` is called to start a new round of preemptive election.
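The behaviour above can be simulated without a ZooKeeper ensemble. The sketch below is a simplified model, not the real `ZookeeperLeaderElector`: `FakeZkPath` is a hypothetical in-memory stand-in for one ephemeral node (such as `/controller`) that notifies subscribed electors when the node is deleted or its data changes, and `ZkStyleElector` is a hypothetical class reproducing the described `startup`, `elect`, `handleDataChange` and `handleDataDeleted` logic, with calls to `onResigningAsLeader()` modelled as a counter. Notifications are delivered synchronously in subscription order, so the winner of each re-election is deterministic here, whereas with a real ensemble it is whichever create request ZooKeeper processes first.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for one ZooKeeper path such as /controller.
// It models only what the election needs: an ephemeral node holding the
// leader's broker id, plus the electors watching that node.
class FakeZkPath {
  private var owner: Option[Int] = None
  private val watchers = mutable.LinkedHashMap[Int, ZkStyleElector]()

  def subscribe(e: ZkStyleElector): Unit = synchronized { watchers(e.brokerId) = e }

  def read: Option[Int] = synchronized { owner }

  // Like creating an ephemeral node: succeeds only if the node does not exist.
  def createEphemeral(brokerId: Int): Boolean = synchronized {
    if (owner.isEmpty) { owner = Some(brokerId); true } else false
  }

  // A broker dies: its session (and watch) goes away; if it owned the node,
  // the node is deleted and every remaining watcher gets handleDataDeleted.
  def crash(brokerId: Int): Unit = {
    val toNotify = synchronized {
      watchers.remove(brokerId)
      if (owner.contains(brokerId)) { owner = None; watchers.values.toList } else Nil
    }
    toNotify.foreach(_.handleDataDeleted())
  }

  // The node's data is overwritten with another broker id: handleDataChange fires.
  def overwrite(brokerId: Int): Unit = {
    val toNotify = synchronized { owner = Some(brokerId); watchers.values.toList }
    toNotify.foreach(_.handleDataChange(brokerId))
  }
}

// Hypothetical elector following the startup / elect / listener steps above.
// Calls to onResigningAsLeader() are modelled by the `resignations` counter.
class ZkStyleElector(zk: FakeZkPath, val brokerId: Int) {
  var leaderId: Int = -1
  var resignations: Int = 0

  def startup(): Unit = {
    zk.subscribe(this) // first watch the node...
    elect              // ...then take part in the election (result discarded)
  }

  def amILeader: Boolean = leaderId == brokerId

  def elect: Boolean = {
    // Every candidate tries to create the same ephemeral node; only one
    // succeeds, and the others read the winner's id from the node instead.
    leaderId = if (zk.createEphemeral(brokerId)) brokerId else zk.read.getOrElse(-1)
    amILeader
  }

  // Data changed: resign only if this broker was leader before and is not now.
  def handleDataChange(newLeaderId: Int): Unit = {
    val wasLeader = amILeader
    leaderId = newLeaderId
    if (wasLeader && !amILeader) resignations += 1
  }

  // Node deleted: resign if this broker was the leader, then elect again.
  def handleDataDeleted(): Unit = {
    if (amILeader) resignations += 1
    elect
  }
}
```

Starting brokers 1, 2 and 3 in that order makes broker 1 the leader. After `crash(1)` the two survivors race again and broker 2 wins because it is notified first; a later `overwrite(3)` makes broker 2 record one resignation while broker 3 becomes the leader.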
KafkaController leader election and failover
- Uses `ZookeeperLeaderElector` for leader election and failover:
```scala
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
  onControllerResignation, config.brokerId)
```
- Ephemeral node in ZooKeeper: `ZkUtils.ControllerPath` = `/controller`
- `KafkaController::startup`:
```scala
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up")
    registerSessionExpirationListener()
    isRunning = true
    controllerElector.startup
    info("Controller startup complete")
  }
}
```
Here, `registerSessionExpirationListener()` registers a callback for ZooKeeper connection state changes in order to handle session expiration, and `controllerElector.startup` starts leader election and failover.
- `onControllerFailover`: called back when this broker becomes the leader. It sets the current broker's state to `RunningAsController` and does the following:
This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
It does the following things on the become-controller state change -
1. Register controller epoch changed listener
2. Increments the controller epoch
3. Initializes the controller's context object that holds cache objects for current topics, live brokers and leaders for all existing partitions.
4. Starts the controller's channel manager
5. Starts the replica state machine
6. Starts the partition state machine
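The ordering in the comment above can be sketched as follows. This is a hypothetical model, not the actual `KafkaController` code: apart from the epoch increment, each step only records a descriptive label (not a Kafka method name), broker states are plain strings, and `EpochNode` is an in-memory stand-in for the ZooKeeper node that stores the controller epoch. It assumes the epoch is bumped with a version-checked (compare-and-set) write, so a broker holding a stale version cannot overwrite a newer epoch.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the ZooKeeper node holding the controller epoch:
// a value plus a version number, updated with a version-checked write.
final class EpochNode(var epoch: Int, var version: Int) {
  def conditionalSet(newEpoch: Int, expectedVersion: Int): Boolean = synchronized {
    if (version != expectedVersion) false
    else { epoch = newEpoch; version += 1; true }
  }
  def snapshot: (Int, Int) = synchronized { (epoch, version) }
}

// Hypothetical model of onControllerFailover: apart from the epoch bump,
// each step just records its label so the ordering is visible.
class ControllerFailoverSketch(epochNode: EpochNode) {
  val steps = ListBuffer[String]()
  var controllerEpoch: Int = 0
  var brokerState: String = "RunningAsBroker"

  def onControllerFailover(): Unit = {
    steps += "registerControllerEpochChangedListener"   // step 1
    val (current, version) = epochNode.snapshot
    // Step 2: fails if another broker changed the epoch since our read.
    if (!epochNode.conditionalSet(current + 1, version))
      throw new IllegalStateException("controller epoch changed concurrently")
    controllerEpoch = current + 1
    steps += "incrementControllerEpoch"
    steps += "initializeControllerContext"              // step 3
    steps += "startChannelManager"                      // step 4
    steps += "startReplicaStateMachine"                 // step 5
    steps += "startPartitionStateMachine"               // step 6
    brokerState = "RunningAsController"
  }
}
```

If another broker increments the epoch between the read and the write, `conditionalSet` returns `false` and the sketch aborts the failover instead of continuing with a stale epoch; likewise, a write that still carries an old version is rejected after the failover has completed.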