Spark 源碼分析(四): Application 的注冊

在前面一篇文章中分析到了 SparkContext 中的 TaskScheduler 創(chuàng)建及啟動。

在 StandaloneSchedulerBackend start 代碼里除了創(chuàng)建了一個 DriverEndpoint 用于 standalone 模式下用來和 Executor 通信之外還會創(chuàng)建一個 AppClient漫谷。

這個 AppClient 會向 Master 注冊 Application窗轩,然后 Master 會通過 Application 的信息為它分配 Worker。

創(chuàng)建這個 AppClient 對象之前會去獲取一些必要的參數(shù)缘滥。

        // 拿到 Driver RpcEndpoint 的地址
        val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    // 一些啟動參數(shù)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    // executor 額外的 java 選項
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    // executor 額外的環(huán)境變量
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    // executor 額外的依賴
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
      
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    // 將上面的那些信息封裝成一個 command 對象
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    // 獲取 application UI 的地址
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    // 獲取 executor 配置的 core 數(shù)量
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)

然后用上面的這些參數(shù)結(jié)合 SparkContext 中的一些數(shù)據(jù)封裝一個 ApplicationDescription 對象获讳,這對象里封裝了一些信息诵原,可以看看风钻。

private[spark] case class ApplicationDescription(
    name: String,// 名字
    maxCores: Option[Int],// 最多使用的 core 數(shù)量
    memoryPerExecutorMB: Int,// 每個 Executor 分配的內(nèi)存
    command: Command,// 啟動命令
    appUiUrl: String,// application 的 UI Url 地址
    eventLogDir: Option[URI] = None,// event 日志目錄
    // short name of compression codec used when writing event logs, if any (e.g. lzf)
    eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    // number of executors this application wants to start with,
    // only used if dynamic allocation is enabled
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}

封裝好 ApplicationDescription 對象之后顷蟀,根據(jù)這個對象創(chuàng)建一個 StandaloneAppClient 對象,然后調(diào)用 StandaloneAppClient 對象的 start 方法骡技。

// 封裝成一個 AppclientDescription 對象    
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  // 創(chuàng)建 StandaloneAppClient 對象
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  // 調(diào)用 StandaloneAppClient 的 start 方法
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  // 等待注冊狀態(tài)的更新
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)

StandaloneAppClient 的 start 方法會去創(chuàng)建并注冊一個 ClientEndpoint 用來向 master 注冊 Application鸣个。

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

ClientEndPoint 是一個 RpcEndpoint,在初始化的時候會去調(diào)用其 onstart 方法布朦。

override def onStart(): Unit = {
      try {
        // 向 master 注冊
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

registerWithMaster 方法實際上會去調(diào)用 tryRegisterAllMasters 方法囤萤,向所有的 Master 去注冊。

在 Spark 中喝滞,Master 可能會是高可靠的 (HA)阁将,這種情況會有可能有多個 Master,不過只有一個 Master 處于 alive 狀態(tài)右遭,其它處于 standby 狀態(tài)做盅。

/**
* 向所有的 master 進行一步注冊,將會一直調(diào)用 tryRegisterAllMasters 方法進行注冊窘哈,知道超出注冊時間
* 當(dāng)成功連接到 master 吹榴,所有調(diào)度的工作和 Futures 都會被取消
*/
private def registerWithMaster(nthRetry: Int) {
        // 實際上調(diào)用了 tryRegisterAllMasters ,想所有 master 進行注冊
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }

tryRegisterAllMasters 方法的調(diào)用滚婉,向所有 master 注冊图筹。

// 異步向所有 master 注冊,返回一個 [Future] 的數(shù)組用來以后取消
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            // 向 master 發(fā)送注冊消息
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }

向 master 發(fā)送注冊消息后让腹,master 收到消息后注冊完 application 之后會回復(fù)一條消息远剩。

case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        // 將 master 中內(nèi)存中等待調(diào)度的 app 隊列更新,
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        // 向 driver 回復(fù)一條注冊 Application 的處理結(jié)果的消息
        driver.send(RegisteredApplication(app.id, self))
        // 調(diào)度資源
        schedule()
      }

master 調(diào)用 shedule 方法骇窍,這個方法做兩件事瓜晤,一個是給等待調(diào)度的 driver 分配資源,一個是給等待調(diào)度的 application 分配資源去啟動 executor腹纳。

在給等待調(diào)度的 application 分配資源的時候最后會走到 launchExecutor 方法痢掠,這個方法會通過給符合要求的 worker 發(fā)送啟動 executor 的消息去啟動 executor。

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // 給 worker 發(fā)送 LaunchExecutor 消息
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

在 worker 收到啟動 Executor 的消息后會去根據(jù)消息去啟動對應(yīng)的 Executor嘲恍。

至此足画,Application 的注冊就完成了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末佃牛,一起剝皮案震驚了整個濱河市淹辞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌俘侠,老刑警劉巖象缀,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件彬向,死亡現(xiàn)場離奇詭異,居然都是意外死亡攻冷,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門遍希,熙熙樓的掌柜王于貴愁眉苦臉地迎上來等曼,“玉大人,你說我怎么就攤上這事凿蒜〗” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵废封,是天一觀的道長州泊。 經(jīng)常有香客問我,道長漂洋,這世上最難降的妖魔是什么遥皂? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮刽漂,結(jié)果婚禮上演训,老公的妹妹穿的比我還像新娘。我一直安慰自己贝咙,他們只是感情好样悟,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著庭猩,像睡著了一般窟她。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蔼水,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天震糖,我揣著相機與錄音,去河邊找鬼徙缴。 笑死试伙,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的于样。 我是一名探鬼主播疏叨,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼穿剖!你這毒婦竟也來了蚤蔓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤糊余,失蹤者是張志新(化名)和其女友劉穎秀又,沒想到半個月后单寂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡吐辙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年宣决,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片昏苏。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡尊沸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贤惯,到底是詐尸還是另有隱情洼专,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布孵构,位于F島的核電站屁商,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏颈墅。R本人自食惡果不足惜蜡镶,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望精盅。 院中可真熱鬧帽哑,春花似錦、人聲如沸叹俏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粘驰。三九已至屡谐,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蝌数,已是汗流浹背愕掏。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留顶伞,地道東北人饵撑。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像唆貌,于是被迫代替她去往敵國和親滑潘。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容