在前面一篇文章中分析到了 SparkContext 中的 TaskScheduler 創(chuàng)建及啟動。
在 StandaloneSchedulerBackend start 代碼里除了創(chuàng)建了一個 DriverEndpoint 用于 standalone 模式下用來和 Executor 通信之外還會創(chuàng)建一個 AppClient漫谷。
這個 AppClient 會向 Master 注冊 Application窗轩,然后 Master 會通過 Application 的信息為它分配 Worker。
創(chuàng)建這個 AppClient 對象之前會去獲取一些必要的參數(shù)缘滥。
// 拿到 Driver RpcEndpoint 的地址
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
sc.conf.get("spark.driver.port").toInt,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// 一些啟動參數(shù)
val args = Seq(
"--driver-url", driverUrl,
"--executor-id", "{{EXECUTOR_ID}}",
"--hostname", "{{HOSTNAME}}",
"--cores", "{{CORES}}",
"--app-id", "{{APP_ID}}",
"--worker-url", "{{WORKER_URL}}")
// executor 額外的 java 選項
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
// executor 額外的環(huán)境變量
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
// executor 額外的依賴
val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
.map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
val testingClassPath =
if (sys.props.contains("spark.testing")) {
sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
} else {
Nil
}
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
// 將上面的那些信息封裝成一個 command 對象
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
// 獲取 application UI 的地址
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
// 獲取 executor 配置的 core 數(shù)量
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
然后用上面的這些參數(shù)結(jié)合 SparkContext 中的一些數(shù)據(jù)封裝一個 ApplicationDescription 對象获讳,這對象里封裝了一些信息诵原,可以看看风钻。
private[spark] case class ApplicationDescription(
name: String,// 名字
maxCores: Option[Int],// 最多使用的 core 數(shù)量
memoryPerExecutorMB: Int,// 每個 Executor 分配的內(nèi)存
command: Command,// 啟動命令
appUiUrl: String,// application 的 UI Url 地址
eventLogDir: Option[URI] = None,// event 日志目錄
// short name of compression codec used when writing event logs, if any (e.g. lzf)
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
override def toString: String = "ApplicationDescription(" + name + ")"
}
封裝好 ApplicationDescription 對象之后顷蟀,根據(jù)這個對象創(chuàng)建一個 StandaloneAppClient 對象,然后調(diào)用 StandaloneAppClient 對象的 start 方法骡技。
// 封裝成一個 AppclientDescription 對象
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
// 創(chuàng)建 StandaloneAppClient 對象
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
// 調(diào)用 StandaloneAppClient 的 start 方法
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
// 等待注冊狀態(tài)的更新
waitForRegistration()
launcherBackend.setState(SparkAppHandle.State.RUNNING)
StandaloneAppClient 的 start 方法會去創(chuàng)建并注冊一個 ClientEndpoint 用來向 master 注冊 Application鸣个。
def start() {
// Just launch an rpcEndpoint; it will call back into the listener.
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
ClientEndPoint 是一個 RpcEndpoint,在初始化的時候會去調(diào)用其 onstart 方法布朦。
override def onStart(): Unit = {
try {
// 向 master 注冊
registerWithMaster(1)
} catch {
case e: Exception =>
logWarning("Failed to connect to master", e)
markDisconnected()
stop()
}
}
registerWithMaster 方法實際上會去調(diào)用 tryRegisterAllMasters 方法囤萤,向所有的 Master 去注冊。
在 Spark 中喝滞,Master 可能會是高可靠的 (HA)阁将,這種情況會有可能有多個 Master,不過只有一個 Master 處于 alive 狀態(tài)右遭,其它處于 standby 狀態(tài)做盅。
/**
* 向所有的 master 進行一步注冊,將會一直調(diào)用 tryRegisterAllMasters 方法進行注冊窘哈,知道超出注冊時間
* 當(dāng)成功連接到 master 吹榴,所有調(diào)度的工作和 Futures 都會被取消
*/
private def registerWithMaster(nthRetry: Int) {
// 實際上調(diào)用了 tryRegisterAllMasters ,想所有 master 進行注冊
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
tryRegisterAllMasters 方法的調(diào)用滚婉,向所有 master 注冊图筹。
// 異步向所有 master 注冊,返回一個 [Future] 的數(shù)組用來以后取消
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// 向 master 發(fā)送注冊消息
masterRef.send(RegisterApplication(appDescription, self))
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
向 master 發(fā)送注冊消息后让腹,master 收到消息后注冊完 application 之后會回復(fù)一條消息远剩。
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
// 將 master 中內(nèi)存中等待調(diào)度的 app 隊列更新,
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
// 向 driver 回復(fù)一條注冊 Application 的處理結(jié)果的消息
driver.send(RegisteredApplication(app.id, self))
// 調(diào)度資源
schedule()
}
master 調(diào)用 shedule 方法骇窍,這個方法做兩件事瓜晤,一個是給等待調(diào)度的 driver 分配資源,一個是給等待調(diào)度的 application 分配資源去啟動 executor腹纳。
在給等待調(diào)度的 application 分配資源的時候最后會走到 launchExecutor 方法痢掠,這個方法會通過給符合要求的 worker 發(fā)送啟動 executor 的消息去啟動 executor。
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
// 給 worker 發(fā)送 LaunchExecutor 消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
在 worker 收到啟動 Executor 的消息后會去根據(jù)消息去啟動對應(yīng)的 Executor嘲恍。
至此足画,Application 的注冊就完成了。