【Kafka源碼】Kafka啟動過程

一般來說劲件,我們是通過命令來啟動kafka,但是命令的本質(zhì)還是調(diào)用代碼中的main方法约急,所以零远,我們重點看下啟動類Kafka。源碼下下來之后厌蔽,我們也可以通過直接運行Kafka.scala中的main方法(需要指定啟動參數(shù)牵辣,也就是server.properties的位置)來啟動Kafka。因為kafka依賴zookeeper奴饮,所以我們需要提前啟動zookeeper纬向,然后在server.properties中指定zk地址后,啟動戴卜。

下面我們首先看一下main()方法:

  def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown
        }
      })

      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        System.exit(1)
    }
    System.exit(0)
  }

我們慢慢來分析下逾条,首先是getPropsFromArgs(args),這一行很明確投剥,就是從配置文件中讀取我們配置的內(nèi)容师脂,然后賦值給serverProps。第二步江锨,KafkaServerStartable.fromProps(serverProps)吃警,

object KafkaServerStartable {
  def fromProps(serverProps: Properties) = {
    KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
  }
}

這塊主要是啟動了一個內(nèi)部的監(jiān)控服務(wù)(內(nèi)部狀態(tài)監(jiān)控)。

下面是一個在java中常見的鉤子函數(shù)啄育,在關(guān)閉時會啟動一些銷毀程序酌心,保證程序安全關(guān)閉。之后就是我們啟動的重頭戲了:kafkaServerStartable.startup挑豌。跟進(jìn)去可以很清楚的看到安券,里面調(diào)用的方法是KafkaServer中的startup方法墩崩,下面我們重點看下這個方法(比較長):

def startup() {
    try {
      info("starting")

      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

        brokerState.newState(Starting)

        /* start scheduler */
        kafkaScheduler.startup()

        /* setup zookeeper */
        zkUtils = initZk()

        /* start log manager */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

        /* generate brokerId */
        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "

        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()

        /* start replica manager */
        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
          isShuttingDown)
        replicaManager.startup()

        /* start kafka controller */
        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
        kafkaController.startup()

        /* start group coordinator */
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
        groupCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
        // TODO: Move this logic to DynamicConfigManager
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
        }

        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        /* register broker metrics */
        registerStats()

        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }

首先判斷是否目前正在關(guān)閉中或者已經(jīng)啟動了,這兩種情況直接拋出異常完疫。然后是一個CAS的操作isStartingUp泰鸡,防止線程并發(fā)操作啟動,判斷是否可以啟動壳鹤。如果可以啟動盛龄,就開始我們的啟動過程。

  • 構(gòu)造Metrics類
  • 定義broker狀態(tài)為啟動中starting
  • 啟動定時器kafkaScheduler.startup()
  • 構(gòu)造zkUtils:利用參數(shù)中的zk信息芳誓,啟動一個zk客戶端
  • 啟動文件管理器:讀取zk中的配置信息余舶,包含__consumer_offsets和system.topic。重點是啟動一些定時任務(wù)锹淌,來刪除符合條件的記錄(cleanupLogs)匿值,清理臟記錄(flushDirtyLogs),把所有記錄寫到一個文本文件中赂摆,防止在啟動時重啟所有的記錄文件(checkpointRecoveryPointOffsets)挟憔。
  /**
   *  Start the background threads to flush logs and do log cleanup
   */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }
  • 下一步,獲取brokerId
  • 啟動一個NIO socket服務(wù)
  • 啟動復(fù)制管理器:啟動ISR超時處理線程
  • 啟動kafka控制器:注冊session過期監(jiān)聽器烟号,同時啟動控制器leader選舉
  • 啟動協(xié)調(diào)器
  • 權(quán)限認(rèn)證
  • 開啟線程绊谭,開始處理請求
  • 開啟配置監(jiān)聽,主要是監(jiān)聽zk節(jié)點數(shù)據(jù)變化汪拥,然后廣播到所有機(jī)器
  • 開啟健康檢查:目前只是把broker節(jié)點注冊到zk上达传,注冊成功就是活的,否則就是dead
  • 注冊啟動數(shù)據(jù)信息
  • 啟動成功
  • 等待關(guān)閉countDownLatch迫筑,如果shutdownLatch變?yōu)?宪赶,則關(guān)閉Kafka
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市脯燃,隨后出現(xiàn)的幾起案子搂妻,更是在濱河造成了極大的恐慌,老刑警劉巖辕棚,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件叽讳,死亡現(xiàn)場離奇詭異,居然都是意外死亡坟募,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門邑狸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來懈糯,“玉大人,你說我怎么就攤上這事单雾∽” “怎么了她紫?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長屿储。 經(jīng)常有香客問我贿讹,道長,這世上最難降的妖魔是什么够掠? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任民褂,我火速辦了婚禮,結(jié)果婚禮上疯潭,老公的妹妹穿的比我還像新娘赊堪。我一直安慰自己,他們只是感情好竖哩,可當(dāng)我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布哭廉。 她就那樣靜靜地躺著,像睡著了一般相叁。 火紅的嫁衣襯著肌膚如雪遵绰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天增淹,我揣著相機(jī)與錄音椿访,去河邊找鬼。 笑死埠通,一個胖子當(dāng)著我的面吹牛赎离,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播端辱,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼梁剔,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了舞蔽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤个盆,失蹤者是張志新(化名)和其女友劉穎颊亮,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雹有,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡溜宽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年临梗,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖降铸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情谅畅,我是刑警寧澤毡泻,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布雹顺,位于F島的核電站嬉愧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏四康。R本人自食惡果不足惜闪金,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望墨闲。 院中可真熱鬧,春花似錦瞻离、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽格侯。三九已至,卻和暖如春朝墩,著一層夾襖步出監(jiān)牢的瞬間亿卤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工肛冶, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人近范。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像延蟹,于是被迫代替她去往敵國和親评矩。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容