kafka源碼剖析(二)之kafka-server的啟動(dòng)

KAFKA的啟動(dòng)

直接運(yùn)行Kafka.scala中的main方法(需要指定啟動(dòng)參數(shù)靶累,也就是server.properties的位置)來(lái)啟動(dòng)Kafka宏蛉。因?yàn)閗afka依賴(lài)zookeeper,所以我們需要提前啟動(dòng)zookeeper俘种,然后在server.properties中指定zk地址后,啟動(dòng)。

看一下main()方法:

  def main(args: Array[String]): Unit = {
    try {   
// 加載對(duì)應(yīng)的server.properties配置文件,并生成Properties實(shí)例.
      val serverProps = getPropsFromArgs(args)
//這里生成一個(gè)KafkaServer的實(shí)例,這個(gè)實(shí)例生成時(shí),會(huì)在實(shí)例中同時(shí)生成一個(gè)KafkaServer的實(shí)例,
// 生成KafkaServer實(shí)例前,需要先通過(guò)serverProps生成出一個(gè)KafkaConfig的實(shí)例.

      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown
        }
      })
// 停止 服務(wù) 
      kafkaServerStartable.startup
      kafkaServerStartable.awaitShutdown
    }
    catch {
      case e: Throwable =>
        fatal(e)
        System.exit(1)
    }
    System.exit(0)
  }

根據(jù)properties生成server實(shí)例

getPropsFromArgs(args)缚柳,這一行很明確,就是從配置文件中讀取我們配置的內(nèi)容搪锣,然后賦值給serverProps秋忙。
KafkaServerStartable.fromProps(serverProps),

    object KafkaServerStartable {
      def fromProps(serverProps: Properties) = {
        KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
        new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
      }
    }

這塊主要是啟動(dòng)了一個(gè)內(nèi)部的監(jiān)控服務(wù)(內(nèi)部狀態(tài)監(jiān)控)构舟。

KafkaServer的啟動(dòng)

下面是一個(gè)在java中常見(jiàn)的鉤子函數(shù)灰追,在關(guān)閉時(shí)會(huì)啟動(dòng)一些銷(xiāo)毀程序,保證程序安全關(guān)閉。kafkaServerStartable.startup弹澎。跟進(jìn)去可以很清楚的看到朴下,里面調(diào)用的方法是KafkaServer中的startup方法:

// 啟動(dòng)kafka的調(diào)度器,這個(gè)KafkaScheduler的實(shí)例生成時(shí)需要得到background.threads配置的值,默認(rèn)是10個(gè),用于配置后臺(tái)線程池的個(gè)數(shù)

def startup() {
    try {
      info("starting")

      if(isShuttingDown.get)
        throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

      if(startupComplete.get)
        return

      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

        brokerState.newState(Starting)
    // 啟動(dòng)scheduler 實(shí)例  
        /* start scheduler */
        kafkaScheduler.startup()
   
//  生產(chǎn)zk 初始化 并依賴(lài)  判斷  broker 是否發(fā)生變化  
        /* setup zookeeper */
        zkUtils = initZk()
     
     // 初始化創(chuàng)建并啟動(dòng)LogManager的實(shí)例,
        /* start log manager */
        logManager = createLogManager(zkUtils.zkClient, brokerState)
        logManager.startup()

// 如果broker.id的配置沒(méi)有配置(小于0的值時(shí)),同時(shí)broker.id.generation.enable配置為true,默認(rèn)也就是true,
// 這個(gè)時(shí)候根據(jù)zk中/brokers/seqid路徑的version值,第一次從0開(kāi)始,每次增加.并加上reserved.broker.max.id配置的值,默認(rèn)是1000,
//來(lái)充當(dāng)這個(gè)server的broker.id,同時(shí)把這個(gè)broker.id更新到logDir目錄下的meta.properties文件中,
//下次讀取時(shí),直接讀取這個(gè)配置文件中的broker.id的值,而不需要重新進(jìn)行創(chuàng)建.
        /* generate brokerId */
        config.brokerId =  getBrokerId
        this.logIdent = "[Kafka Server " + config.brokerId + "], "
  // 啟動(dòng) kafka 的sockerServer 
        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()

//,生成并啟動(dòng)ReplicaManager,此實(shí)例依賴(lài)kafkaScheduler與logManager實(shí)例.
       /* start replica manager */
        replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
          isShuttingDown)
        replicaManager.startup()

//生成并啟動(dòng)KafkaController實(shí)例,此使用用于控制當(dāng)前的broker中的所有的leader的partition的操作.        
   /* start kafka controller */
        kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
        kafkaController.startup()

   //生成并啟動(dòng)GroupCoordinator的實(shí)例,這個(gè)是0.9新加入的一個(gè)玩意,用于對(duì)consumer中新加入的與partition的檢查,并對(duì)partition與consumer進(jìn)行平衡操作.

        /* start group coordinator */
        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
        groupCoordinator.startup()

    // 根據(jù)authorizer.class.name配置項(xiàng)配置的Authorizer的實(shí)現(xiàn)類(lèi),生成一個(gè)用于認(rèn)證的實(shí)例,用于對(duì)用戶(hù)的操作進(jìn)行認(rèn)證.這個(gè)默認(rèn)為不認(rèn)證.
        /* Get the authorizer and initialize it if one is specified.*/
        authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
          val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
          authZ.configure(config.originals())
          authZ
        }

   // 成用于對(duì)外對(duì)外提供服務(wù)的KafkaApis實(shí)例,并設(shè)置當(dāng)前的broker的狀態(tài)為運(yùn)行狀態(tài)
        /* start processing requests */
        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
          kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
        brokerState.newState(RunningAsBroker)

        Mx4jLoader.maybeLoad()


//生成動(dòng)態(tài)配置修改的處理管理,主要是topic修改與client端配置的修改,并把已經(jīng)存在的clientid對(duì)應(yīng)的配置進(jìn)行修改.
        /* start dynamic config manager */
        dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
                                                           ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

        // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
        // TODO: Move this logic to DynamicConfigManager
        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
          case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
        }
//   創(chuàng)建一個(gè)配置實(shí)例 并發(fā)起通知給個(gè)個(gè)block
        // Create the config manager. start listening to notifications
        dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
        dynamicConfigManager.startup()
         
        /* tell everyone we are alive */
        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
          if (endpoint.port == 0)
            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
          else
            (protocol, endpoint)
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
        kafkaHealthcheck.startup()

        // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
        checkpointBrokerId(config.brokerId)

        /* register broker metrics */
        registerStats()

        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
        AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
        info("started")
      }
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        isStartingUp.set(false)
        shutdown()
        throw e
    }
  }

首先判斷是否目前正在關(guān)閉中或者已經(jīng)啟動(dòng)了,這兩種情況直接拋出異常苦蒿。然后是一個(gè)CAS的操作isStartingUp殴胧,防止線程并發(fā)操作啟動(dòng),判斷是否可以啟動(dòng)佩迟。如果可以啟動(dòng)团滥,就開(kāi)始我們的啟動(dòng)過(guò)程。

構(gòu)造Metrics類(lèi)
定義broker狀態(tài)為啟動(dòng)中starting
啟動(dòng)定時(shí)器kafkaScheduler.startup()
構(gòu)造zkUtils:利用參數(shù)中的zk信息报强,啟動(dòng)一個(gè)zk客戶(hù)端
啟動(dòng)文件管理器:讀取zk中的配置信息灸姊,包含__consumer_offsets和system.topic。重點(diǎn)是啟動(dòng)一些定時(shí)任務(wù)秉溉,來(lái)刪除符合條件的記錄(cleanupLogs)力惯,清理臟記錄(flushDirtyLogs),把所有記錄寫(xiě)到一個(gè)文本文件中召嘶,防止在啟動(dòng)時(shí)重啟所有的記錄文件(checkpointRecoveryPointOffsets)父晶。

  /**
   *  Start the background threads to flush logs and do log cleanup
   */
  def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市苍蔬,隨后出現(xiàn)的幾起案子诱建,更是在濱河造成了極大的恐慌,老刑警劉巖碟绑,帶你破解...
    沈念sama閱讀 216,997評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件俺猿,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡格仲,警方通過(guò)查閱死者的電腦和手機(jī)押袍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)凯肋,“玉大人谊惭,你說(shuō)我怎么就攤上這事∥甓” “怎么了圈盔?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,359評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)悄雅。 經(jīng)常有香客問(wèn)我驱敲,道長(zhǎng),這世上最難降的妖魔是什么宽闲? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,309評(píng)論 1 292
  • 正文 為了忘掉前任众眨,我火速辦了婚禮握牧,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘娩梨。我一直安慰自己沿腰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,346評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布狈定。 她就那樣靜靜地躺著颂龙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掸冤。 梳的紋絲不亂的頭發(fā)上厘托,一...
    開(kāi)封第一講書(shū)人閱讀 51,258評(píng)論 1 300
  • 那天,我揣著相機(jī)與錄音稿湿,去河邊找鬼。 笑死押赊,一個(gè)胖子當(dāng)著我的面吹牛饺藤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播流礁,決...
    沈念sama閱讀 40,122評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼涕俗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了神帅?” 一聲冷哼從身側(cè)響起再姑,我...
    開(kāi)封第一講書(shū)人閱讀 38,970評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎找御,沒(méi)想到半個(gè)月后元镀,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,403評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡霎桅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,596評(píng)論 3 334
  • 正文 我和宋清朗相戀三年栖疑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片滔驶。...
    茶點(diǎn)故事閱讀 39,769評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡遇革,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出揭糕,到底是詐尸還是另有隱情萝快,我是刑警寧澤,帶...
    沈念sama閱讀 35,464評(píng)論 5 344
  • 正文 年R本政府宣布著角,位于F島的核電站揪漩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏雇寇。R本人自食惡果不足惜氢拥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,075評(píng)論 3 327
  • 文/蒙蒙 一蚌铜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧嫩海,春花似錦冬殃、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,705評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至奕谭,卻和暖如春涣觉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背血柳。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,848評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工官册, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人难捌。 一個(gè)月前我還...
    沈念sama閱讀 47,831評(píng)論 2 370
  • 正文 我出身青樓膝宁,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親根吁。 傳聞我的和親對(duì)象是個(gè)殘疾皇子员淫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,678評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容