Normally we start Kafka from the command line, but a startup command ultimately just invokes the main method in the code, so we will focus on the entry class Kafka. Once the source is checked out, we can also start Kafka by running the main method in Kafka.scala directly (it needs one startup argument: the location of server.properties). Because Kafka depends on ZooKeeper, we have to start ZooKeeper first, point the zk address in server.properties at it, and then start the broker.
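As a quick illustration, a minimal sketch of launching the broker programmatically (for example from an IDE scratch object) might look like the following; it assumes config/server.properties exists in the working directory and already points at a running ZooKeeper:

```scala
// Hypothetical launcher: calls the same entry point the startup script uses.
// Assumes "config/server.properties" is present and its ZooKeeper address is reachable.
object LocalKafkaLauncher {
  def main(args: Array[String]): Unit =
    kafka.Kafka.main(Array("config/server.properties"))
}
```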
Let's start by looking at the main() method:
```scala
def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread() {
      override def run() = {
        kafkaServerStartable.shutdown
      }
    })

    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown
  }
  catch {
    case e: Throwable =>
      fatal(e)
      System.exit(1)
  }
  System.exit(0)
}
```
Let's walk through it step by step. The first line, getPropsFromArgs(args), is straightforward: it reads the contents of our configuration file and assigns them to serverProps. The second step is KafkaServerStartable.fromProps(serverProps):
```scala
object KafkaServerStartable {
  def fromProps(serverProps: Properties) = {
    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
  }
}
```
This mainly starts an internal monitoring service (the metrics reporters used for internal state monitoring) and then builds a KafkaServerStartable from the parsed KafkaConfig.
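Putting the first two steps together, a minimal sketch of what they boil down to might look like this. The file path and the way the Properties are loaded are assumptions for illustration, not the exact body of getPropsFromArgs (which also handles extra command-line overrides):

```scala
import java.io.FileInputStream
import java.util.Properties

import kafka.server.KafkaServerStartable

// Sketch only: load a properties file and build the startable server from it.
// "config/server.properties" is an assumed path.
val serverProps = new Properties()
val in = new FileInputStream("config/server.properties")
try serverProps.load(in) finally in.close()

val startable = KafkaServerStartable.fromProps(serverProps)
```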
Next comes a shutdown hook, a pattern that is very common in Java: when the process is shut down, it runs some teardown logic so the program can exit safely. After that we reach the heart of the startup: kafkaServerStartable.startup. Stepping into it, you can see very clearly that it simply delegates to the startup method of KafkaServer, so let's focus on that method next (it is fairly long):
```scala
def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

      brokerState.newState(Starting)

      /* start scheduler */
      kafkaScheduler.startup()

      /* setup zookeeper */
      zkUtils = initZk()

      /* start log manager */
      logManager = createLogManager(zkUtils.zkClient, brokerState)
      logManager.startup()

      /* generate brokerId */
      config.brokerId = getBrokerId
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
      socketServer.startup()

      /* start replica manager */
      replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
        isShuttingDown)
      replicaManager.startup()

      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
      kafkaController.startup()

      /* start group coordinator */
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
      groupCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified. */
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)

      Mx4jLoader.maybeLoad()

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                         ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

      // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
      // TODO: Move this logic to DynamicConfigManager
      AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
        case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
      }

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map { case (protocol, endpoint) =>
        if (endpoint.port == 0)
          (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
        else
          (protocol, endpoint)
      }
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* register broker metrics */
      registerStats()

      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}
```
The method first checks whether the server is currently shutting down or has already finished starting: a shutdown in progress throws an IllegalStateException, and a completed startup simply returns. Then comes a CAS on isStartingUp, which guards against concurrent startup attempts and decides whether this thread is allowed to proceed. If it is, the actual startup sequence begins.
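The guard itself is easy to reproduce in isolation. Below is a small, self-contained sketch of the same idea; the class and field names are made up for illustration, only the compareAndSet/startupComplete dance mirrors what KafkaServer does:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical service using the same "only one thread may run startup" guard.
class GuardedService {
  private val isStartingUp    = new AtomicBoolean(false)
  private val startupComplete = new AtomicBoolean(false)

  def startup(): Unit = {
    if (startupComplete.get)
      return                                        // already started: nothing to do
    // Only the thread that flips false -> true proceeds; every other caller loses the race.
    if (isStartingUp.compareAndSet(false, true)) {
      try {
        // ... expensive one-time initialization goes here ...
        startupComplete.set(true)
      } finally {
        isStartingUp.set(false)
      }
    }
  }
}
```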
- 構(gòu)造Metrics類
- 定義broker狀態(tài)為啟動中starting
- 啟動定時器kafkaScheduler.startup()
- 構(gòu)造zkUtils:利用參數(shù)中的zk信息芳誓,啟動一個zk客戶端
- 啟動文件管理器:讀取zk中的配置信息余舶,包含__consumer_offsets和system.topic。重點是啟動一些定時任務(wù)锹淌,來刪除符合條件的記錄(cleanupLogs)匿值,清理臟記錄(flushDirtyLogs),把所有記錄寫到一個文本文件中赂摆,防止在啟動時重啟所有的記錄文件(checkpointRecoveryPointOffsets)挟憔。
```scala
/**
 * Start the background threads to flush logs and do log cleanup
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}
```
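KafkaScheduler is essentially a thin wrapper around a JDK scheduled thread pool. As a rough, hedged sketch of the same scheduling pattern built directly on the JDK (the task names, delays, and periods below are placeholders, not Kafka's defaults, which come from broker config such as log.retention.check.interval.ms):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// A rough JDK-only equivalent of the three schedule() calls above.
object PeriodicTasksSketch {
  private val executor = Executors.newScheduledThreadPool(1)

  private def schedulePeriodic(name: String, delayMs: Long, periodMs: Long)(task: => Unit): Unit =
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        println(s"[$name] firing")   // placeholder for real logging
        task
      }
    }, delayMs, periodMs, TimeUnit.MILLISECONDS)

  def main(args: Array[String]): Unit = {
    schedulePeriodic("log-retention", delayMs = 30000L, periodMs = 300000L) { /* delete eligible logs */ }
    schedulePeriodic("log-flusher", delayMs = 30000L, periodMs = 60000L) { /* flush dirty logs */ }
    schedulePeriodic("recovery-point-checkpoint", delayMs = 30000L, periodMs = 60000L) { /* write checkpoint file */ }
  }
}
```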
- Next, obtain the brokerId
- Start the NIO socket server
- Start the replica manager, which starts the ISR expiration threads
- Start the Kafka controller: register the ZooKeeper session expiration listener and kick off the controller leader election
- Start the group coordinator
- Set up authorization: instantiate and configure the Authorizer, if one is configured
- Start the request handler threads and begin processing requests
- Start the dynamic config manager, which mainly watches zk nodes for data changes and then broadcasts them to all brokers
- Start the health check: currently this just registers the broker node in zk; a successful registration means the broker is alive, otherwise it is considered dead
- Register the startup (app info) metrics
- Startup is complete
- Wait on the shutdown CountDownLatch: once shutdownLatch reaches 0, Kafka shuts down (see the sketch after this list)