TaskScheduler
TaskScheduler 負責對 DAGScheduler 提交過來的 Task 與最佳位置的 Executor 進行綁定喇完,然后通過 SchedulerBackend 發(fā)送到 Executor 上去執(zhí)行。
在這個版本中层坠,TaskScheduler 只有一個實現類,就是 TaskSchedulerImpl买猖,在 Spark-Core 的 org.apache.spark.scheduler 包下改橘。
源碼
在 SparkContext 中對 TaskScheduler 進行了初始化操作,在 SparkContext 概覽中提到過:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_taskScheduler = ts
// 對 TaskScheduler 進行了啟動
_taskScheduler.start()
SparkContext.createTaskScheduler() 會根據運行模式的不同玉控,創(chuàng)建不同類型的 SchedulerBackend飞主,我這里以 Standalone 模式為例:
private def createTaskScheduler(...) = {
import SparkMasterRegex._
master match {
// Standalone 模式
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
// ...
}
}
我們先看下 TaskScheduler.initialize() 方法:
def initialize(backend: SchedulerBackend) {
this.backend = backend
// 根據 conf 中的設置,選擇不同模式的任務調度器
// 通過設定 SCHEDULER_MODE_PROPERTY 這個值來更改
// 默認為 FIFO
schedulableBuilder = {
schedulingMode match {
// 先進先出
case SchedulingMode.FIFO =>
// RootPool => TaskSetManager 的調度池(一個隊列)
new FIFOSchedulableBuilder(rootPool)
// 公平
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
// 創(chuàng)建樹形節(jié)點
schedulableBuilder.buildPools()
}
在 DAGScheduler 劃分完 Stage 后高诺,會將其封裝成 TaskSet 并提交給 TaskScheduler.submitTasks() 來做進一步的工作碌识,我們先看下它的實現細節(jié):
override def submitTasks(taskSet: TaskSet) {
// 取出 TaskSet 中的 Task
val tasks = taskSet.tasks
this.synchronized {
// 為每個 TaskSet 創(chuàng)建一個 TaskSetManager
// TaskSetManager 負責任務失敗時的重試工作
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 將 manager 添加到 schedulableBuilder 中
// schedulableBuilder 負責 TaskSetManager 的調度
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 其它操作
hasReceivedTask = true
}
// 資源的分配
backend.reviveOffers()
}
我們先看下 SchedulableBuilder.addTaskSetManager() 的實現細節(jié),以 FIFO 模式為例:
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager)
}
rootPool.addSchedulable() 的實現細節(jié):
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
// 將 TaskSetManager 放入到 schedulable 隊列中
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
從代碼中可以看出虱而,當 DAGScheduler 提交完 TaskSet 后筏餐,就會為其創(chuàng)建一個 TaskSetManager,然后將 TaskSetManager 放入到 TaskSetManager 隊列(池)中去等待執(zhí)行牡拇。
接下來我們看看 backend.reviveOffers() 的實現細節(jié)魁瞪,看看 TaskSetManager 是如何被調用的:
override def reviveOffers() {
// 發(fā)送一個 ReviveOffers 消息
// DriverEndpoint 上文提到過
driverEndpoint.send(ReviveOffers)
}
DriverEndpoint.receive() 會對這個消息進行處理 (CoarseGrainedSchedulerBackend 的內部類):
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
makeOffers()
// 略略略
}
makeOffers() 的實現細節(jié):
private def makeOffers() {
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// executorDataMap 為 executor 字典
// 上文提到過,將反注冊過來的 Executor 都放到了這里
// 找出活躍的 Executor
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
// 封裝 Executor 可用的資源量和聯(lián)系方式
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
// 將可用的 Executor 資源信息發(fā)送給 TaskScheduler.resourceOffers()
// TaskScheduler 會按最優(yōu)的條件將 Task 與 Executor 進行綁定并返回其集合
scheduler.resourceOffers(workOffers)
}
// 將與 Executor 綁定完的 Task 交給 Eexecutor 去執(zhí)行
if (!taskDescs.isEmpty) {
// 在每個 Executor 上啟動分別啟動其對應的 Task
launchTasks(taskDescs)
}
TaskScheduler.resourceOffers() 的實現細節(jié):
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// 要執(zhí)行的任務集
// TaskDescription 中有 Task ID 和 Executor ID
// 下面的代碼會將 Task 與 Executor 進行綁定惠呼,確定任務要到哪個 Executor 上去執(zhí)行
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
// 獲取排序后的等待執(zhí)行的 TaskSetManager
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
if (newExecAvail) {
// 計算每個 Task 執(zhí)行的位置(多個)
// 有興趣的可以點進去看看
taskSet.executorAdded()
}
}
// 根據優(yōu)先級(本地导俘、機架...) 將每個 Task 與 Executor 進行綁定
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
// 將與 Executor 綁定完的 Task 集合返回給 SchedulerBackend
// SchedulerBackend 接下來會提交給 Executor 去執(zhí)行
return tasks
}
SchedulerBackend.launchTasks() 的實現細節(jié):
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
// 資源不夠用
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
// 其它操作
}
}
else {
// 其它操作
// 向 Executor 發(fā)送了一個啟動任務的請求
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
// Executor 收到請求后
// 先簡單看一下
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
// 啟動任務
executor.launchTask(this, taskDesc)
}
總的來說,TaskScheduler 負責著任務的調度剔蹋、喚醒與重啟的工作旅薄。