The previous post analyzed Spark's deployment process in Standalone mode in detail and mentioned that, once a Worker finishes registering, a schedule operation is run to allocate resources. This post looks at exactly how that method allocates resources.
Note: the posts in this series use the Spark-1.6.3 source code as the reference; where Spark-2.1.0 introduces significant changes, they will be pointed out.
When is schedule called?
In fact, the schedule method is called to redistribute resources whenever a new application joins or the available resources change. So how does it allocate them? Let's work through the source code. For example, registering a new application ends with a call to schedule(), as in the abridged sketch below.
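A minimal sketch of one such call site, reconstructed from memory of Master.scala in 1.6.3 rather than quoted verbatim, so treat the details as approximate:

// Abridged: registering a new application triggers a fresh round of scheduling.
case RegisterApplication(description, driver) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    val app = createApplication(description, driver)
    registerApplication(app)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()   // demand/resources changed -> reallocate
  }
}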
schedule
Let's first paste the source of schedule:
private def schedule(): Unit = {
  // To allocate resources at all, the Master must currently be ALIVE
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // As the comment says: handle Drivers first, then executors
  // 1. Shuffle the ALIVE Workers so that Drivers are not always placed on the same Worker
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // 2. Loop over the waiting Drivers and launch them
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // 2.1 Keep going while there are alive Workers we have not visited and the Driver is not launched yet
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // 2.2 Pick a Worker; the first index is 0, later ones follow curPos = (curPos + 1) % numWorkersAlive
      val worker = shuffledAliveWorkers(curPos)
      // 2.3 Count this Worker as visited
      numWorkersVisited += 1
      // 2.4 Check whether this Worker's free memory and cores satisfy the Driver's requirements
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // 2.5 If they do, launch the Driver on this Worker
        launchDriver(worker, driver)
        // 2.6 After launching, remove the Driver from the waiting queue and mark it as launched
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // 3. Launch Executors
  startExecutorsOnWorkers()
}
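To make the round-robin index concrete, here is a tiny self-contained illustration (hypothetical values, not Spark code) of how curPos walks through the shuffled worker list and wraps around:

// Hypothetical stand-alone illustration of the round-robin index used above.
object RoundRobinDemo extends App {
  val numWorkersAlive = 3
  var curPos = 0
  // Successive visits walk the shuffled workers and wrap around modulo the worker count.
  for (visit <- 1 to 5) {
    println(s"visit $visit -> worker index $curPos")
    curPos = (curPos + 1) % numWorkersAlive // prints indices 0, 1, 2, 0, 1
  }
}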
Launching the Driver
The allocation flow is annotated in detail in the source above; now let's look at the launchDriver method:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  // 1. Log the launch
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // 2. Record the driver on the worker, adding its memory and cpu to the worker's used totals
  worker.addDriver(driver)
  // 3. Give the driver a reference to its worker
  driver.worker = Some(worker)
  // 4. Send a LaunchDriver message to the Worker, telling it to start the Driver
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // 5. Mark the driver's state as RUNNING
  driver.state = DriverState.RUNNING
}
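For reference, the WorkerInfo.addDriver called in step 2 is pure bookkeeping on the Master's view of that worker. Roughly, abridged from memory of the 1.6.3 source:

// WorkerInfo.addDriver (abridged): remember the driver and charge its resources to this worker.
def addDriver(driver: DriverInfo) {
  drivers(driver.id) = driver
  memoryUsed += driver.desc.mem
  coresUsed += driver.desc.cores
}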
Next, let's see how the Worker handles the LaunchDriver message it receives:
case LaunchDriver(driverId, driverDesc) => {
  // 1. Log the request
  logInfo(s"Asked to launch driver $driverId")
  // 2. Instantiate a DriverRunner
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // 3. Record the new driver in the drivers map
  drivers(driverId) = driver
  // 4. Start the driver
  driver.start()
  // 5. Account for the resources the driver now occupies
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
Let's continue into driver.start():
// The English doc comment below says it clearly: start a thread to run and manage the driver
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory
        val driverDir = createWorkingDirectory()
        // Download the user's jar into the driver's working directory and return its local path
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
          driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
        // The actual launch of the Driver process; a brief sketch of it follows after this block
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)

      worker.send(DriverStateChanged(driverId, state, finalException))
    }
  }.start()
}
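Although this post does not walk through the driver process launch in detail, DriverRunner.launchDriver is short. Roughly, abridged and reconstructed from memory of the 1.6.3 source (treat names as approximate): it points the ProcessBuilder at the driver's working directory, redirects the child process's stdout/stderr to files there, and runs the command, retrying it when --supervise was requested.

// Abridged sketch of DriverRunner.launchDriver (not a verbatim quote):
private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect the driver process's stdout and stderr to files in its working directory
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)
    val stderr = new File(baseDir, "stderr")
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  // Run the driver command, retrying on failure if the driver was submitted with --supervise
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}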
When the driver finishes (whatever its final state), a DriverStateChanged message is sent to the worker. On receiving it, the Worker calls handleDriverStateChanged for a series of bookkeeping steps; we won't go through those details here, but the main one is forwarding a DriverStateChanged message to the Master, which then removes the Driver's information:
case DriverStateChanged(driverId, state, exception) => {
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
}
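removeDriver itself is worth a glance, because it is another place where resources change and schedule() is triggered again. A simplified sketch, reconstructed from memory of the 1.6.3 source rather than quoted verbatim:

// Abridged sketch of Master.removeDriver: clean up the driver and reschedule.
private def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
  drivers.find(_.id == driverId) match {
    case Some(driver) =>
      drivers -= driver
      completedDrivers += driver
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Give the driver's cores and memory back to the worker that hosted it...
      driver.worker.foreach(w => w.removeDriver(driver))
      // ...and, since the available resources just changed, run another scheduling round
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}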
This completes allocating resources to the Driver and launching it. Next, let's look at launching Executors, i.e. the startExecutorsOnWorkers() flow.
Launching Executors
startExecutorsOnWorkers():
/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Applications are served in FIFO order
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // Keep only ALIVE workers with enough memory and cores, sorted by free cores in descending order
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Decide how many cpu cores to assign on each worker
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // Then perform the actual allocation
    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
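A side note on spreadOutApps: it is not a parameter of this method but a field of the Master, read once from the spark.deploy.spreadOut configuration (true by default), so you can switch to the consolidate behavior by setting that flag to false on the Master. In 1.6.3 it looks roughly like this:

// In Master.scala, the spread-out vs. consolidate choice comes from a single config flag:
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)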
Let's first see how the number of cores to assign on each worker is decided. Here we only quote the English doc comment of scheduleExecutorsOnWorkers and explain it; for the concrete implementation, please read the source:
/**
* Schedule executors to be launched on the workers.
* Returns an array containing number of cores assigned to each worker.
*
* There are two modes of launching executors. The first attempts to spread out an application's
* executors on as many workers as possible, while the second does the opposite (i.e. launch them
* on as few workers as possible). The former is usually better for data locality purposes and is
* the default.
*
* The number of cores assigned to each executor is configurable. When this is explicitly set,
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
*
* It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
* at a time). Consider the following example: cluster has 4 workers with 16 cores each.
* User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
Roughly, there are two allocation modes. The first spreads an application's executors across as many workers as possible; the second does the opposite. The first mode is the default and usually works better for data locality. The number of cores per executor is configurable (via spark-submit or spark-env.sh); when it is set explicitly, multiple executors of the same application may be launched on the same worker, provided that worker has enough cores and memory. Otherwise each executor grabs all the cores available on the worker, in which case only one executor per application is launched on each worker. When assigning cores across the cluster, the scheduler satisfies the request as far as it can: if the number of cores requested exceeds the free cores on the workers, it simply hands out the free cores that exist.
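To make the spread-out mode and the "allocate coresPerExecutor at a time" rule concrete, here is a small self-contained sketch of the idea, reusing the numbers from the SPARK-8881 example in the doc comment above. This is an illustration only, not the actual scheduleExecutorsOnWorkers implementation:

// Illustration only: the idea behind spread-out scheduling with whole-executor core grants.
// Numbers follow the SPARK-8881 example: 4 workers x 16 cores, spark.executor.cores = 16,
// spark.cores.max = 48.
object SpreadOutSketch extends App {
  val workerFreeCores = Array(16, 16, 16, 16)
  val coresPerExecutor = 16
  var coresToAssign = 48
  val assignedCores = Array.fill(workerFreeCores.length)(0)

  // Hand out one whole executor's worth of cores at a time, round-robin across the workers,
  // so no executor ends up smaller than requested.
  var progress = true
  while (coresToAssign >= coresPerExecutor && progress) {
    progress = false
    for (i <- workerFreeCores.indices) {
      if (coresToAssign >= coresPerExecutor &&
          workerFreeCores(i) - assignedCores(i) >= coresPerExecutor) {
        assignedCores(i) += coresPerExecutor
        coresToAssign -= coresPerExecutor
        progress = true
      }
    }
  }
  // Prints "16, 16, 16, 0": three full 16-core executors instead of four unusable 12-core slices.
  println(assignedCores.mkString(", "))
}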
Next comes the code that actually assigns the computed resources to executors and launches them:
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // Record the executor on the application
    val exec = app.addExecutor(worker, coresToAssign)
    // Launch the executor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
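A quick worked example of the arithmetic above, with made-up numbers for illustration:

// Suppose scheduleExecutorsOnWorkers assigned 8 cores on this worker.
val assignedCores = 8

// Case 1: spark.executor.cores = 2 -> four executors with 2 cores each are launched here.
val twoCoreExecutors: Option[Int] = Some(2)
val numExecutors1 = twoCoreExecutors.map(assignedCores / _).getOrElse(1) // 4
val coresToAssign1 = twoCoreExecutors.getOrElse(assignedCores)           // 2

// Case 2: spark.executor.cores not set -> one executor grabs all 8 assigned cores.
val noCoresPerExecutor: Option[Int] = None
val numExecutors2 = noCoresPerExecutor.map(assignedCores / _).getOrElse(1) // 1
val coresToAssign2 = noCoresPerExecutor.getOrElse(assignedCores)           // 8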
Launching the executors:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Tell the worker to launch the executor
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Then tell the driver about the newly added executor
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
When the worker receives the LaunchExecutor message it performs the actual launch and reports back to the Master; a simplified sketch of that handler is shown below. The executors' resource information is also sent to the driver, and once the driver has it, the application runs on those executors. That completes the rough flow of allocating resources to and launching executors.
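For reference, the Worker-side handling looks roughly like this in 1.6.3; it is heavily abridged and reconstructed from memory (for example, the ExecutorRunner argument list is only approximate), so treat it as a sketch rather than a verbatim quote:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
  // Create a working directory for the executor, then wrap the launch in an ExecutorRunner
  val executorDir = new File(workDir, appId + "/" + execId)
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
    self, workerId, host, webUi.boundPort, publicAddress, sparkHome, executorDir,
    workerUri, conf, appLocalDirs, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
  // Account for the resources now in use and report the executor's state to the Master
  coresUsed += cores_
  memoryUsed += memory_
  sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))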
Finally, here is a diagram summarizing the simplified flow of launching the Driver and the Executors:
This post has only broadly analyzed how the Master, when running schedule, allocates resources to the Driver and the Executors and launches them. In a later post we will analyze the complete run of an application and go into more detail there.
This post is original content. Reposts are welcome; please credit the source and the author. Thank you!