Spark 源碼分析(五): Executor 啟動

上一篇已經(jīng)將 Application 注冊到了 master 上了兰吟,在 master 收到注冊消息后會進行一系列操作耕皮,最后調(diào)用 schedule 方法。

這個 schedule 方法會去做兩件事圆恤,一件事是給等待調(diào)度的 driver 分配資源延旧,另一件事是給等待調(diào)度的 application 去分配資源啟動 Executor作岖。

給 application 分配資源啟動 Executor 的代碼最終會調(diào)用一個方法:launchExecutor(是 Master 中的代碼)嵌器。

在 lauchExecutor 方法中會先向 worker 發(fā)送 lauchExecutor 消息枝笨,然后會向 driver 發(fā)送 executor 已經(jīng)啟動的消息。

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // 向 worker 發(fā)送 LaunchExecutor 消息
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
     // 向 driver 發(fā)送 ExecutorAdded 的消息
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

下面看 worker 端收到 launchExecutor 消息后是怎么處理的学密。

同樣的在 receive 的模式匹配中找到該消息的匹配淘衙,可以看到做了這些事情:

1,先判斷發(fā)消息的 master 是否是 alive 狀態(tài)则果,如果是才會繼續(xù)執(zhí)行幔翰;

2漩氨,創(chuàng)建 executor 的工作目錄和本地臨時目錄西壮;

3,將 master 發(fā)來的消息封裝為 ExecutorRunner 對象叫惊,然后調(diào)用其 start 方法啟動線程款青;

4,向 master 發(fā)送消息霍狰,告訴當前 executor 的狀態(tài)抡草;

// 模式匹配饰及,是 LaunchExecutor 消息
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        // 如果發(fā)送消息的 master 不是 active 的則不執(zhí)行    
        if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // 創(chuàng)建 executor 的工作目錄
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // 通過 SPARK_EXECUTOR_DIRS 環(huán)境變量配置創(chuàng)建 Executor 的本地目錄
          // Worker 會在當前 application 執(zhí)行結(jié)束后刪除這個目錄
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          // 將接收到的 application 中 Executor 的相關(guān)信息封裝為一個 ExecutorRunner 對象
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          // 啟動這個線程
          manager.start()
          // 更新 worker 的資源利用情況
          coresUsed += cores_
          memoryUsed += memory_
          // 給 master 回復(fù)消息
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

這里主要看 ExecutorRunner 調(diào)用 start 之后做了什么。

實際上是創(chuàng)建了一個線程康震,線程運行時會去執(zhí)行 fetchAndRunExecutor 這個方法燎含。

private[worker] def start() {
    // 創(chuàng)建線程
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    // 啟動線程
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

fetchAndRunExecutor 這個方法將接收到的信息做如下一些事情:

1,創(chuàng)建 ProcessBuilder腿短,準備執(zhí)行本地命令屏箍;

2,為 ProcessBuilder 創(chuàng)建執(zhí)行目錄橘忱,設(shè)置環(huán)境變量赴魁;

3,啟動 ProcessBuilder钝诚,生成 Executor 進程颖御,這個進程的名稱一般為:CoarseGrainedExecutorBackend;

4凝颇,重定向輸出流和錯誤文件流潘拱;

5,等待獲取 executor 的退出碼拧略,然后發(fā)送給 worker泽铛;

private def fetchAndRunExecutor() {
    try {
      // 創(chuàng)建 ProcessBuilder
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      // 封裝指令
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      // 啟動進程
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // 重定向標準輸出流
      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      // 重定向錯誤輸出流
      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      // 等待退出碼
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      // 將退出碼發(fā)送給 worker
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
    }
  }

至此,Executor 是啟動完成了辑鲤。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盔腔,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子月褥,更是在濱河造成了極大的恐慌弛随,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宁赤,死亡現(xiàn)場離奇詭異舀透,居然都是意外死亡,警方通過查閱死者的電腦和手機决左,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門愕够,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人佛猛,你說我怎么就攤上這事惑芭。” “怎么了继找?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵遂跟,是天一觀的道長。 經(jīng)常有香客問我,道長幻锁,這世上最難降的妖魔是什么凯亮? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮哄尔,結(jié)果婚禮上假消,老公的妹妹穿的比我還像新娘。我一直安慰自己岭接,他們只是感情好置谦,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著亿傅,像睡著了一般媒峡。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上葵擎,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天谅阿,我揣著相機與錄音,去河邊找鬼酬滤。 笑死签餐,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的盯串。 我是一名探鬼主播氯檐,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼体捏!你這毒婦竟也來了冠摄?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤几缭,失蹤者是張志新(化名)和其女友劉穎河泳,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體年栓,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡拆挥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了某抓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纸兔。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖否副,靈堂內(nèi)的尸體忽然破棺而出汉矿,到底是詐尸還是另有隱情,我是刑警寧澤副编,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布负甸,位于F島的核電站流强,受9級特大地震影響痹届,放射性物質(zhì)發(fā)生泄漏呻待。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一队腐、第九天 我趴在偏房一處隱蔽的房頂上張望蚕捉。 院中可真熱鬧,春花似錦柴淘、人聲如沸迫淹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽敛熬。三九已至,卻和暖如春第股,著一層夾襖步出監(jiān)牢的瞬間应民,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工夕吻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留诲锹,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓涉馅,卻偏偏與公主長得像归园,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子稚矿,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容