- 我們一般都是使用
bin/kafka-server-start.sh
腳本來啟動; - 從
bin/kafka-server-start.sh
可以知道此腳本用法:
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
(1)server.properties
為配置文件路徑, 這里config/server.properties
有一個配置文件的模板,里面就是一行行的key=value
;
(2)--override property=value
是若干個可項的參數(shù), 用來覆蓋server.properties配置文件中同名的配置項; - 從
bin/kafka-server-start.sh
最后一行exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
可知, Kafka啟動時的入口類為kafka.Kafka
, 我們直接來看這個類;
Kafka啟動入口類:kafk.Kafak
- 所在文件: core/src/main/scala/kafka/Kafka.scala
- 定義:
object Kafka extends Logging
- main函數(shù):
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown //捕獲control-c中斷,停止當前服務
}
})
kafkaServerStartable.startup //啟動服務
kafkaServerStartable.awaitShutdown //等待服務結(jié)束
使用getPropsFromArgs方法來獲取各配置項, 然后將啟動和停止動作全部代理給KafkaServerStartable
類;
Kafka啟動代理類:KafkaServerStartable
- 伴生對象:
object KafkaServerStartable
提供fromProps方法來創(chuàng)建KafkaServerStartable
; - KafkaServerStartable對象創(chuàng)建時會同時創(chuàng)建
KafkaServer
, 這才是真正的主角;
def startup() {
try {
server.startup()
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
// KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
System.exit(1)
}
}
def shutdown() {
try {
server.shutdown()
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Runtime.getRuntime.halt(1)
}
}
/**
* Allow setting broker state from the startable.
* This is needed when a custom kafka server startable want to emit new states that it introduces.
*/
def setServerState(newState: Byte) {
server.brokerState.newState(newState)
}
def awaitShutdown() =
server.awaitShutdown