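The previous post registered the Application with the master. After receiving the registration message, the master performs a series of operations and finally calls its schedule method.
schedule does two things: it allocates resources to drivers that are waiting to be scheduled, and it allocates resources to waiting applications so that their Executors can be launched.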
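To make the two responsibilities of schedule concrete, here is a minimal sketch of the order it follows: waiting drivers are placed first, then the remaining capacity goes to waiting applications as executors. Worker, DriverReq, AppReq and MiniMaster are hypothetical simplifications, not Spark classes. The real Master.schedule also shuffles the alive workers and decides executor placement according to the spark.deploy.spreadOut setting, whereas this sketch simply scans the workers first-fit.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's WorkerInfo / DriverInfo / ApplicationInfo.
final case class Worker(id: String, var freeCores: Int, var freeMemoryMb: Int)
final case class DriverReq(id: String, cores: Int, memoryMb: Int)
final case class AppReq(id: String, coresPerExecutor: Int, memoryPerExecutorMb: Int, var coresLeft: Int) {
  require(coresPerExecutor > 0, "an executor needs at least one core")
}

final class MiniMaster(workers: Seq[Worker]) {
  val waitingDrivers = mutable.Queue.empty[DriverReq]
  val waitingApps = mutable.ArrayBuffer.empty[AppReq]
  val launched = mutable.ArrayBuffer.empty[String] // launch decisions, in the order they were made

  private def fits(w: Worker, cores: Int, memoryMb: Int): Boolean =
    w.freeCores >= cores && w.freeMemoryMb >= memoryMb

  private def take(w: Worker, cores: Int, memoryMb: Int): Unit = {
    w.freeCores -= cores
    w.freeMemoryMb -= memoryMb
  }

  def schedule(): Unit = {
    // Phase 1: place each waiting driver on the first worker with enough free resources.
    val notPlaced = mutable.Queue.empty[DriverReq]
    while (waitingDrivers.nonEmpty) {
      val d = waitingDrivers.dequeue()
      workers.find(fits(_, d.cores, d.memoryMb)) match {
        case Some(w) =>
          take(w, d.cores, d.memoryMb)
          launched += s"driver ${d.id} on ${w.id}"
        case None =>
          notPlaced.enqueue(d) // stays waiting until the next schedule() call
      }
    }
    waitingDrivers ++= notPlaced
    // Phase 2: give the remaining capacity to waiting applications, one executor at a time.
    for (app <- waitingApps; w <- workers) {
      while (app.coresLeft >= app.coresPerExecutor && fits(w, app.coresPerExecutor, app.memoryPerExecutorMb)) {
        take(w, app.coresPerExecutor, app.memoryPerExecutorMb)
        app.coresLeft -= app.coresPerExecutor
        launched += s"executor for ${app.id} on ${w.id}"
      }
    }
  }
}
```

Because drivers are served first, a driver can claim cores that an application would otherwise have used for executors; this matches the order described above.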
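The code path that allocates resources to an application and launches its Executors eventually calls launchExecutor (defined in Master).
launchExecutor first sends a LaunchExecutor message to the worker, then sends an ExecutorAdded message to the driver to tell it about the new executor.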
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Send an ExecutorAdded message to the driver
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
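The two sends in launchExecutor can be modeled with plain message objects and an endpoint that records what it receives. This is a sketch under stated assumptions: RecordingEndpoint and WorkerRef are hypothetical stand-ins for Spark's RPC endpoint references and WorkerInfo, and the LaunchExecutor message here omits the ApplicationDescription field that the real one carries.

```scala
import scala.collection.mutable

// Simplified copies of the two messages (hypothetical: the real LaunchExecutor also carries appDesc).
final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memory: Int)
final case class ExecutorAdded(execId: Int, workerId: String, hostPort: String, cores: Int, memory: Int)

// Hypothetical stand-in for an RPC endpoint reference: it only records the messages sent to it.
final class RecordingEndpoint {
  val inbox = mutable.ArrayBuffer.empty[Any]
  def send(msg: Any): Unit = inbox += msg
}

// Hypothetical stand-in for WorkerInfo: just the fields launchExecutor reads.
final case class WorkerRef(id: String, hostPort: String, endpoint: RecordingEndpoint)

def launchExecutor(masterUrl: String, worker: WorkerRef, driver: RecordingEndpoint,
                   appId: String, execId: Int, cores: Int, memory: Int): Unit = {
  // 1. Ask the worker to start the executor process (fire-and-forget).
  worker.endpoint.send(LaunchExecutor(masterUrl, appId, execId, cores, memory))
  // 2. Tell the driver about the new executor without waiting for the worker to confirm.
  driver.send(ExecutorAdded(execId, worker.id, worker.hostPort, cores, memory))
}
```

Since ExecutorAdded is sent without waiting for any reply from the worker, it tells the driver that an executor has been assigned, not that its process is already running.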
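Next, let's see how the worker handles the LaunchExecutor message.
As before, find the matching case in the worker's receive pattern match. It does the following:
1. Check whether the master that sent the message is the active master; continue only if it is.
2. Create the executor's working directory and its local temporary directories.
3. Wrap the information sent by the master in an ExecutorRunner object, then call its start method to launch a thread.
4. Send a message to the master reporting the executor's current state.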
```scala
// Pattern match: this is the LaunchExecutor message
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // Ignore the request if the sending master is not the active one
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      // Create the executor's local directories; they are passed to the executor through the
      // SPARK_EXECUTOR_DIRS environment variable, and the Worker deletes them after the
      // application finishes
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      // Wrap the application's executor information in an ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      // Start the runner thread
      manager.start()
      // Update the worker's resource usage
      coresUsed += cores_
      memoryUsed += memory_
      // Report the executor's state back to the master
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
```
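The four worker-side steps listed earlier can be condensed into a self-contained sketch. MiniWorker, the string states and the trimmed-down message classes are hypothetical. Instead of building and starting an ExecutorRunner, the sketch only records the executor directory, so it shows the control flow (active-master check, directory creation, resource bookkeeping, reply to the master) rather than actual process launching.

```scala
import java.io.{File, IOException}
import java.nio.file.Files
import scala.collection.mutable

// Simplified message types (hypothetical; Spark's real ones carry more fields).
final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memory: Int)
final case class ExecutorStateChanged(appId: String, execId: Int, state: String, message: Option[String])

final class MiniWorker(activeMasterUrl: String, workDir: File) {
  var coresUsed = 0
  var memoryUsed = 0
  val executors = mutable.Map.empty[String, File] // "appId/execId" -> executor directory
  val sentToMaster = mutable.ArrayBuffer.empty[ExecutorStateChanged]

  def receive(msg: Any): Unit = msg match {
    case LaunchExecutor(masterUrl, appId, execId, cores, memory) =>
      if (masterUrl != activeMasterUrl) {
        // Step 1: ignore requests from a master that is not the active one.
        println(s"Invalid Master ($masterUrl) attempted to launch executor.")
      } else {
        val key = s"$appId/$execId"
        try {
          // Step 2: create the executor's working directory.
          val executorDir = new File(workDir, key)
          if (!executorDir.mkdirs()) throw new IOException(s"Failed to create directory $executorDir")
          // Step 3: register the executor (a real worker builds and starts an ExecutorRunner here).
          executors(key) = executorDir
          coresUsed += cores
          memoryUsed += memory
          // Step 4: report the executor's state back to the master.
          sentToMaster += ExecutorStateChanged(appId, execId, "RUNNING", None)
        } catch {
          case e: Exception =>
            executors -= key
            sentToMaster += ExecutorStateChanged(appId, execId, "FAILED", Some(e.toString))
        }
      }
    case _ => // other messages are out of scope for this sketch
  }
}
```

Note that resource usage is only updated once launching has succeeded, so a failed launch leaves coresUsed and memoryUsed untouched.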
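Here we focus on what ExecutorRunner does once start is called.
It creates a thread that runs fetchAndRunExecutor when started. It also registers a shutdown hook: if the worker shuts down while the executor is still in the RUNNING state, the hook marks it FAILED and kills the executor process.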
```scala
private[worker] def start() {
  // Create the thread that will run fetchAndRunExecutor
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  // Start the thread
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}
```
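The thread-plus-shutdown-hook pattern in start can be sketched with plain JVM APIs. This assumes a hypothetical MiniExecutorRunner whose body function stands in for fetchAndRunExecutor and returns the final state. It registers the hook with Runtime.addShutdownHook, whereas Spark goes through its own ShutdownHookManager, and the hook here only changes the state instead of killing a child process.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical simplified runner; `body` plays the role of fetchAndRunExecutor.
final class MiniExecutorRunner(fullId: String, body: () => String) {
  // Starts as "RUNNING", like ExecutorRunner; updated by the runner thread or the shutdown hook.
  val state = new AtomicReference[String]("RUNNING")
  @volatile private var workerThread: Thread = null
  @volatile private var shutdownHook: Thread = null

  def start(): Unit = {
    // Same pattern as ExecutorRunner.start: a named thread whose run() does the real work.
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = state.set(body())
    }
    workerThread.start()
    // Plain JVM shutdown hook: if the JVM exits while still RUNNING, record FAILED.
    shutdownHook = new Thread(() => {
      state.compareAndSet("RUNNING", "FAILED")
      () // a real runner would also kill the child process here
    })
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }

  def join(): Unit = workerThread.join()

  def removeShutdownHook(): Unit = {
    Runtime.getRuntime.removeShutdownHook(shutdownHook)
    ()
  }
}
```

Using compareAndSet mirrors the `if (state == ExecutorState.RUNNING)` check above: a state already set to a terminal value by the runner thread is not overwritten by the hook.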
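fetchAndRunExecutor uses the received information to do the following:
1. Create a ProcessBuilder to run a local command.
2. Set the ProcessBuilder's working directory and environment variables.
3. Start the ProcessBuilder, which spawns the Executor process; this process is usually named CoarseGrainedExecutorBackend.
4. Redirect the process's standard output and standard error to files.
5. Wait for the executor's exit code and send it to the worker.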
```scala
private def fetchAndRunExecutor() {
  try {
    // Create the ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    // Get the assembled command line for logging
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")
    // Set the working directory and environment variables
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // Start the process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // Report the exit code to the worker
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
```
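The five steps of fetchAndRunExecutor map directly onto java.lang.ProcessBuilder. The following is a sketch, not Spark's code: runExecutorProcess is a hypothetical helper that takes the command as a Seq instead of going through CommandUtils, and it lets ProcessBuilder redirect stdout and stderr to files, while Spark reads the streams with FileAppender (which can also roll log files).

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.jdk.CollectionConverters._

// Hypothetical simplified version of fetchAndRunExecutor's core steps.
// Returns the exit code and the message that would be sent to the worker.
def runExecutorProcess(command: Seq[String], executorDir: File, localDirs: Seq[String]): (Int, String) = {
  // 1. Build the process description.
  val builder = new ProcessBuilder(command.asJava)
  // 2. Set the working directory and environment variables.
  builder.directory(executorDir)
  builder.environment().put("SPARK_EXECUTOR_DIRS", localDirs.mkString(File.pathSeparator))
  builder.environment().put("SPARK_LAUNCH_WITH_SCALA", "0")
  // 4. Send stdout/stderr to files in the executor directory; stderr starts with a header.
  val formattedCommand = command.mkString("\"", "\" \"", "\"")
  val rule = "=" * 40
  val stderr = new File(executorDir, "stderr")
  Files.write(stderr.toPath, s"Spark Executor Command: $formattedCommand\n$rule\n\n".getBytes(StandardCharsets.UTF_8))
  builder.redirectOutput(new File(executorDir, "stdout"))
  builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))
  // 3. Launch the child process.
  val process = builder.start()
  // 5. Block until the process exits, then build the report for the worker.
  val exitCode = process.waitFor()
  (exitCode, "Command exited with code " + exitCode)
}
```

For example, running it with `Seq("sh", "-c", "exit 3")` returns exit code 3 together with the message "Command exited with code 3", the same text the real runner places in ExecutorStateChanged.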
At this point, the Executor has been launched.