Starting Kafka
Kafka is started by running the main method of Kafka.scala directly (you must supply a startup argument: the location of server.properties). Because Kafka depends on ZooKeeper, ZooKeeper has to be started first; then specify the zk address in server.properties and start the broker.
Take a look at the main() method:
def main(args: Array[String]): Unit = {
try {
// Load the server.properties configuration file and build a Properties instance from it.
val serverProps = getPropsFromArgs(args)
// Create a KafkaServerStartable instance; constructing it also creates a KafkaServer instance inside it.
// Before the KafkaServer instance can be created, a KafkaConfig instance is built from serverProps.
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
})
// start the server and wait until it shuts down
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
Creating the server instance from the properties
getPropsFromArgs(args) is straightforward: it reads the contents of the configuration file and assigns them to serverProps.
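Under the hood this amounts to loading a java.util.Properties from the file named by args(0). A minimal sketch, with a config fragment inlined for illustration (these keys and values are examples, not taken from a real broker):

```scala
import java.io.StringReader
import java.util.Properties

// In the real broker the content would come from a FileInputStream over
// args(0); here a fragment of server.properties is inlined for the demo.
val configText =
  """broker.id=0
    |zookeeper.connect=localhost:2181
    |log.dirs=/tmp/kafka-logs
    |""".stripMargin

val serverProps = new Properties()
serverProps.load(new StringReader(configText))
// serverProps now holds the key/value pairs parsed from the file
```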
Next, KafkaServerStartable.fromProps(serverProps):
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
}
}
This mainly starts the internal monitoring reporters (internal state monitoring), then builds a KafkaConfig from serverProps and constructs the KafkaServerStartable from it.
Starting KafkaServer
Back in main: a shutdown hook, common in Java programs, is attached so that cleanup logic runs when the JVM shuts down, letting the process exit safely. Then kafkaServerStartable.startup is called. Stepping into it, you can clearly see that it calls the startup method of KafkaServer:
// The KafkaScheduler started below sizes its background thread pool from the background.threads config (default 10 threads)
def startup() {
try {
info("starting")
if(isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
if(startupComplete.get)
return
val canStartup = isStartingUp.compareAndSet(false, true)
if (canStartup) {
metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
brokerState.newState(Starting)
/* start scheduler */
kafkaScheduler.startup()
// initialize the zk client and check whether this broker has changed
/* setup zookeeper */
zkUtils = initZk()
// create and start the LogManager instance
/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()
// If broker.id is not configured (i.e. it has a value below 0) and broker.id.generation.enable is true (the default),
// the broker id is derived from the version of the /brokers/seqid path in zk (starting at 0 the first time and
// incremented on each generation) plus the value of the reserved.broker.max.id config (default 1000).
// The generated broker.id is also written to the meta.properties file under logDir, so on the next startup
// it is read directly from that file instead of being generated again.
/* generate brokerId */
config.brokerId = getBrokerId
this.logIdent = "[Kafka Server " + config.brokerId + "], "
// start Kafka's SocketServer
socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
socketServer.startup()
// create and start the ReplicaManager; it depends on the kafkaScheduler and logManager instances
/* start replica manager */
replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
isShuttingDown)
replicaManager.startup()
// create and start the KafkaController, which controls operations on all the leader partitions of this broker
/* start kafka controller */
kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()
// create and start the GroupCoordinator, added in 0.9; it tracks consumers newly joining groups and rebalances partitions across consumers
/* start group coordinator */
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
groupCoordinator.startup()
// instantiate the Authorizer implementation named by the authorizer.class.name config to authorize user operations; by default no authorizer is configured
/* Get the authorizer and initialize it if one is specified.*/
authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
// create the KafkaApis instance that serves external requests, and set the broker's state to running
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
Mx4jLoader.maybeLoad()
// set up handlers for dynamic config changes, mainly topic-level and client-level configs, and apply any existing per-clientId overrides
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))
// Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
// TODO: Move this logic to DynamicConfigManager
AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
}
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
if (endpoint.port == 0)
(protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
else
(protocol, endpoint)
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
checkpointBrokerId(config.brokerId)
/* register broker metrics */
registerStats()
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
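The broker.id generation described in the comments above can be sketched as follows. This is a hedged illustration: the zk /brokers/seqid interaction is replaced by a local counter, and resolveBrokerId is a hypothetical helper, not the real getBrokerId.

```scala
// reserved.broker.max.id, default 1000: generated ids start at or above this value
val reservedBrokerMaxId = 1000
// stands in for the version of the /brokers/seqid path in zk (0 the first time)
var zkSeqIdVersion = 0

def generateBrokerId(): Int = {
  val id = reservedBrokerMaxId + zkSeqIdVersion
  zkSeqIdVersion += 1 // zk bumps the path version on each generation
  id
}

// Hypothetical helper mirroring the decision logic: use the configured id if it
// is non-negative, else reuse the id persisted in meta.properties, else generate one.
def resolveBrokerId(configuredId: Int, idFromMetaProps: Option[Int]): Int =
  if (configuredId >= 0) configuredId
  else idFromMetaProps.getOrElse(generateBrokerId())
```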
The method first checks whether the server is currently shutting down or has already started: a shutdown in progress raises an exception, while a completed startup simply returns. Then a CAS operation on isStartingUp guards against concurrent startup attempts and determines whether this call may proceed. If it can start up, the startup sequence begins.
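That startup guard can be sketched with java.util.concurrent.atomic.AtomicBoolean; startupOnce here is an illustrative stand-in for the real startup method:

```scala
import java.util.concurrent.atomic.AtomicBoolean

val startupComplete = new AtomicBoolean(false)
val isStartingUp = new AtomicBoolean(false)

// Returns true only for the single caller that wins the CAS and performs startup.
def startupOnce(): Boolean = {
  if (startupComplete.get) return false // already started: nothing to do
  val canStartup = isStartingUp.compareAndSet(false, true)
  if (canStartup) {
    // ... the actual startup work would run here ...
    startupComplete.set(true)
    isStartingUp.set(false)
  }
  canStartup
}
```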
Construct the Metrics instance
Set the broker state to Starting
Start the scheduler: kafkaScheduler.startup()
Construct zkUtils: start a zk client using the zk connection info from the configuration
Start the log manager: it reads configuration from zk, including __consumer_offsets and system.topic. The key part is scheduling tasks that delete eligible log records (cleanupLogs), flush dirty logs (flushDirtyLogs), and write all recovery points to a text file so that not every log file has to be recovered at startup (checkpointRecoveryPointOffsets).
/**
* Start the background threads to flush logs and do log cleanup
*/
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
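KafkaScheduler is backed by a scheduled thread pool, so the periodic scheduling pattern above can be sketched directly with the JDK's ScheduledExecutorService. The task and the 10 ms period below are illustrative, not Kafka's real cleanupLogs or retentionCheckMs:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

val ticks = new CountDownLatch(3) // completes once the task has run 3 times
val pool = Executors.newScheduledThreadPool(1)

// Analogous to scheduler.schedule("kafka-log-retention", cleanupLogs, delay, period, ...):
// run a fixed-rate task with an initial delay of 0 ms and a 10 ms period.
pool.scheduleAtFixedRate(
  new Runnable { def run(): Unit = ticks.countDown() },
  0L, 10L, TimeUnit.MILLISECONDS)

// wait (up to 2 s) for three executions, then stop the pool
val ranThreeTimes = ticks.await(2, TimeUnit.SECONDS)
pool.shutdownNow()
```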