TaskScheduler 源碼淺析

TaskScheduler

TaskScheduler 負責對 DAGScheduler 提交過來的 Task 與最佳位置的 Executor 進行綁定喇完,然后通過 SchedulerBackend 發(fā)送到 Executor 上去執(zhí)行。

image

在這個版本中层坠,TaskScheduler 只有一個實現類,就是 TaskSchedulerImpl买猖,在 Spark-Core 的 org.apache.spark.scheduler 包下改橘。

源碼

在 SparkContext 中對 TaskScheduler 進行了初始化操作,在 SparkContext 概覽中提到過:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_taskScheduler = ts

// 對 TaskScheduler 進行了啟動
_taskScheduler.start()

SparkContext.createTaskScheduler() 會根據運行模式的不同玉控,創(chuàng)建不同類型的 SchedulerBackend飞主,我這里以 Standalone 模式為例:

private def createTaskScheduler(...) = {
  import SparkMasterRegex._
    
  master match {
      
    // Standalone 模式
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)
      
      // ...
  }
    
}

我們先看下 TaskScheduler.initialize() 方法:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // 根據 conf 中的設置,選擇不同模式的任務調度器
  // 通過設定 SCHEDULER_MODE_PROPERTY 這個值來更改
  // 默認為 FIFO
  schedulableBuilder = {
    schedulingMode match {
     // 先進先出
      case SchedulingMode.FIFO =>
        // RootPool => TaskSetManager 的調度池(一個隊列)
        new FIFOSchedulableBuilder(rootPool)
      // 公平
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
    }
  }
  // 創(chuàng)建樹形節(jié)點
  schedulableBuilder.buildPools()
}

在 DAGScheduler 劃分完 Stage 后高诺,會將其封裝成 TaskSet 并提交給 TaskScheduler.submitTasks() 來做進一步的工作碌识,我們先看下它的實現細節(jié):

override def submitTasks(taskSet: TaskSet) {
  // 取出 TaskSet 中的 Task
  val tasks = taskSet.tasks
  this.synchronized {
    // 為每個 TaskSet 創(chuàng)建一個 TaskSetManager
    // TaskSetManager 負責任務失敗時的重試工作
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // 將 manager 添加到 schedulableBuilder 中
    // schedulableBuilder 負責 TaskSetManager 的調度
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    // 其它操作

    hasReceivedTask = true
  }
  // 資源的分配
  backend.reviveOffers()
}

我們先看下 SchedulableBuilder.addTaskSetManager() 的實現細節(jié),以 FIFO 模式為例:

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  rootPool.addSchedulable(manager)
}

rootPool.addSchedulable() 的實現細節(jié):

override def addSchedulable(schedulable: Schedulable) {
  require(schedulable != null)
  // 將 TaskSetManager 放入到 schedulable 隊列中
  schedulableQueue.add(schedulable)
  schedulableNameToSchedulable.put(schedulable.name, schedulable)
  schedulable.parent = this
}

從代碼中可以看出虱而,當 DAGScheduler 提交完 TaskSet 后筏餐,就會為其創(chuàng)建一個 TaskSetManager,然后將 TaskSetManager 放入到 TaskSetManager 隊列(池)中去等待執(zhí)行牡拇。

接下來我們看看 backend.reviveOffers() 的實現細節(jié)魁瞪,看看 TaskSetManager 是如何被調用的:

override def reviveOffers() {
  // 發(fā)送一個 ReviveOffers 消息
  // DriverEndpoint 上文提到過
  driverEndpoint.send(ReviveOffers)
}

DriverEndpoint.receive() 會對這個消息進行處理 (CoarseGrainedSchedulerBackend 的內部類):

override def receive: PartialFunction[Any, Unit] = {

  case ReviveOffers =>
    makeOffers()

  // 略略略
    
}

makeOffers() 的實現細節(jié):

private def makeOffers() {
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // executorDataMap 為 executor 字典
    // 上文提到過,將反注冊過來的 Executor 都放到了這里
    // 找出活躍的 Executor
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // 封裝 Executor 可用的資源量和聯(lián)系方式
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // 將可用的 Executor 資源信息發(fā)送給 TaskScheduler.resourceOffers()
    // TaskScheduler 會按最優(yōu)的條件將 Task 與 Executor 進行綁定并返回其集合
    scheduler.resourceOffers(workOffers)
  }
  // 將與 Executor 綁定完的 Task 交給 Eexecutor 去執(zhí)行
  if (!taskDescs.isEmpty) {
    // 在每個 Executor 上啟動分別啟動其對應的 Task
    launchTasks(taskDescs)
  }

TaskScheduler.resourceOffers() 的實現細節(jié):

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

  // 要執(zhí)行的任務集
  // TaskDescription 中有 Task ID 和 Executor ID
  // 下面的代碼會將 Task 與 Executor 進行綁定惠呼,確定任務要到哪個 Executor 上去執(zhí)行
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    
  // 獲取排序后的等待執(zhí)行的 TaskSetManager 
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    if (newExecAvail) {
      // 計算每個 Task 執(zhí)行的位置(多個)
      // 有興趣的可以點進去看看
      taskSet.executorAdded()
    }
  }

  // 根據優(yōu)先級(本地导俘、機架...) 將每個 Task 與 Executor 進行綁定
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  // 將與 Executor 綁定完的 Task 集合返回給 SchedulerBackend
  // SchedulerBackend 接下來會提交給 Executor 去執(zhí)行
  return tasks
}

SchedulerBackend.launchTasks() 的實現細節(jié):

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit >= maxRpcMessageSize) {
      // 資源不夠用
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        // 其它操作
      }
    }
    else {
      // 其它操作

      // 向 Executor 發(fā)送了一個啟動任務的請求
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

// Executor 收到請求后
// 先簡單看一下
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // 啟動任務
    executor.launchTask(this, taskDesc)
  }

總的來說,TaskScheduler 負責著任務的調度剔蹋、喚醒與重啟的工作旅薄。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市泣崩,隨后出現的幾起案子少梁,更是在濱河造成了極大的恐慌,老刑警劉巖矫付,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凯沪,死亡現場離奇詭異,居然都是意外死亡技即,警方通過查閱死者的電腦和手機著洼,發(fā)現死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來而叼,“玉大人,你說我怎么就攤上這事豹悬】辏” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵瞻佛,是天一觀的道長脱篙。 經常有香客問我娇钱,道長,這世上最難降的妖魔是什么绊困? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任文搂,我火速辦了婚禮,結果婚禮上秤朗,老公的妹妹穿的比我還像新娘煤蹭。我一直安慰自己,他們只是感情好取视,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布硝皂。 她就那樣靜靜地躺著,像睡著了一般作谭。 火紅的嫁衣襯著肌膚如雪稽物。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天折欠,我揣著相機與錄音贝或,去河邊找鬼。 笑死锐秦,一個胖子當著我的面吹牛傀缩,可吹牛的內容都是我干的。 我是一名探鬼主播农猬,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼赡艰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了斤葱?” 一聲冷哼從身側響起慷垮,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎揍堕,沒想到半個月后料身,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡衩茸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年芹血,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片楞慈。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡幔烛,死狀恐怖,靈堂內的尸體忽然破棺而出囊蓝,到底是詐尸還是另有隱情饿悬,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布聚霜,位于F島的核電站狡恬,受9級特大地震影響珠叔,放射性物質發(fā)生泄漏。R本人自食惡果不足惜弟劲,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一祷安、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧兔乞,春花似錦汇鞭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锚国,卻和暖如春腕巡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背血筑。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工绘沉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人豺总。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓车伞,卻偏偏與公主長得像,于是被迫代替她去往敵國和親喻喳。 傳聞我的和親對象是個殘疾皇子另玖,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355