[Spark源碼剖析]Task的調(diào)度與執(zhí)行源碼剖析

本文基于Spark 1.3.1,Standalone模式

一個Spark Application分為stage級別和task級別的調(diào)度,stage級別的調(diào)度已經(jīng)用[DAGScheduler劃分stage]和[DAGScheduler提交stage]兩片文章進(jìn)行源碼層面的說明暑椰,本文將從源碼層面剖析task是如何被調(diào)度和執(zhí)行的。

函數(shù)調(diào)用流程

先給出task調(diào)度的總體函數(shù)調(diào)用流程桶错,并說明每個關(guān)鍵函數(shù)是干嘛的漫谷。這樣一開始就在心里有個大概的流程圖,便于之后的理解屋匕。

//< DAGScheduler調(diào)用該taskScheduler.submitTasks提交一個stage對應(yīng)的taskSet葛碧,一個taskSet包含多個task
TaskSchedulerImpl.submitTasks(taskSet: TaskSet)
    //< TaskScheduler(實(shí)際上是TaskSchedulerImpl)為DAGScheduler提交的每個taskSet創(chuàng)建一個對應(yīng)的TaskSetManager對象,TaskSetManager用于調(diào)度同一個taskSet中的task
    val manager = TaskSchedulerImpl.createTaskSetManager(taskSet, maxTaskFailures)
    //< 將新創(chuàng)建的manager加入到調(diào)度樹中过吻,調(diào)度樹由SchedulableBulider維護(hù)进泼。有FIFO、Fair兩種實(shí)現(xiàn)
    SchedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
        //< 觸發(fā)調(diào)用CoarseGrainedSchedulerBackend.reviveOffers()纤虽,它將通過發(fā)送事件觸發(fā)makeOffers方法調(diào)用
        CoarseGrainedSchedulerBackend.reviveOffers()
            //< 此處為發(fā)送ReviveOffers事件
            driverEndpoint.send(ReviveOffers)
        //< 此處為接收事件并處理
        CoarseGrainedSchedulerBackend.receive 
            CoarseGrainedSchedulerBackend.makeOffers
                //< 查找各個節(jié)點(diǎn)空閑資源(這里是cores)乳绕,并返回要在哪些節(jié)點(diǎn)上啟動哪些tasks的對應(yīng)關(guān)系,用Seq[Seq[TaskDescription]]表示
                TaskSchedulerImpl.resourceOffers
            //< 啟動對應(yīng)task
            CoarseGrainedSchedulerBackend.launchTasks
                executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

看了上述流程可能不那么明白廓推,沒關(guān)系刷袍,不明白才要往下看。

TaskSchedulerImpl.submitTasks(...)

在Spark 1.3.1版本中樊展,TaskSchedulerImpl是TaskScheduler的唯一實(shí)現(xiàn)呻纹。submitTasks函數(shù)主要作用如下源碼及注釋所示:

  1. 為taskSet創(chuàng)建對應(yīng)的TaskSetManager對象。TaskManager的主要功能在于對Task的細(xì)粒度調(diào)度专缠,比如
    • 決定在某個executor上是否啟動及啟動哪個task
    • 為了達(dá)到Locality aware雷酪,將Task的調(diào)度做相應(yīng)的延遲
    • 當(dāng)一個Task失敗的時候,在約定的失敗次數(shù)之內(nèi)時涝婉,將Task重新提交
    • 處理拖后腿的task
  2. 調(diào)用SchedulerBackend.makeOffers進(jìn)入下一步
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    //< 為stage對應(yīng)的taskSet創(chuàng)建TaskSetManager對象
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    //< 建立taskset與TaskSetManager的對應(yīng)關(guān)系
    activeTaskSets(taskSet.id) = manager

    //< TaskSetManager會被放入調(diào)度池(Pool)當(dāng)中哥力。
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    //< 設(shè)置定時器,若task還沒啟動墩弯,則一直輸出未分配到資源報(bào)警(輸出警告日志)
    if (!isLocal && !hasReceivedTask) { 
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }

  //< 將處觸發(fā)調(diào)用SchedulerBackend.makeOffers來為tasks分配資源吩跋,調(diào)度任務(wù)
  backend.reviveOffers()
}

基于事件模型的調(diào)用

下面源碼及注釋展示了CoarseGrainedSchedulerBackend是如何通過事件模型來進(jìn)一步調(diào)用的。其中ReviveOffers事件有兩種觸發(fā)模式:

  1. 周期性觸發(fā)的渔工,默認(rèn)1秒一次
  2. reviveOffers被TaskSchedulerImpl.reviveOffers()調(diào)用
  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }
  
  override def receive: PartialFunction[Any, Unit] = {
  //< 此處省略n行代碼
   
  case ReviveOffers =>
    makeOffers()

  //< 此處省略n行代碼
  }

CoarseGrainedSchedulerBackend.makeOffers()

該函數(shù)非常重要锌钮,它將集群的資源以O(shè)ffer的方式發(fā)給上層的TaskSchedulerImpl。TaskSchedulerImpl調(diào)用scheduler.resourceOffers獲得要被執(zhí)行的Seq[TaskDescription]引矩,然后將得到的Seq[TaskDescription]交給CoarseGrainedSchedulerBackend分發(fā)到各個executor上執(zhí)行

    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

為便于理解makeOffers調(diào)用及間接調(diào)用的各個流程梁丘,將該函數(shù)實(shí)現(xiàn)分為三個step來分析侵浸,這需要對源碼的表現(xiàn)形式做一點(diǎn)點(diǎn)改動,但并不會有任何影響氛谜。

Step1: val seq = executorDataMap.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq

executorDataMap是HashMap[String, ExecutorData]類型掏觉,在該HashMap中key為executor id,value為ExecutorData類型(包含executor的host值漫,RPC信息澳腹,TotalCores,F(xiàn)reeCores信息)

//< 代表一個executor上的可用資源(這里僅可用cores)
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)

這段代碼惭嚣,返回HashMap[executorId, WorkerOffer]遵湖。每個WorkerOffer包含executor的id,host及其上可用cores信息晚吞。

Step2: val taskDescs = scheduler.resourceOffers( seq )

拿到集群里的executor及其對應(yīng)WorkerOffer后延旧,就要開始第二個步驟,即找出要在哪些Worker上啟動哪些task槽地。這個過程比較長迁沫,也比較復(fù)雜。讓我來一層層撥開迷霧捌蚊。

我把val taskDescs = scheduler.resourceOffers( seq )TaskSchedulerImpl.resourceOffers(offers: Seq[WorkerOffer])集畅,返回的是Seq[Seq[TaskDescription]] 類型,來看看其實(shí)現(xiàn):

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    //< 標(biāo)記每個slave為alive并記錄它們的hostname
    var newExecAvail = false
    //< 此處省略更新executor缅糟,host挺智,rack信息代碼;這里會根據(jù)是否有新的executor更新newExecAvail的值
  
    //< 為了負(fù)載均衡窗宦,打亂offers順序赦颇,Random.shuffle用于將一個集合中的元素打亂
    val shuffledOffers = Random.shuffle(offers)
    //< 事先創(chuàng)建好用于存放要在各個worker上launch的 List[workerId, ArrayBuffer[TaskDescription]]。
    //< 由于task要使用的cores并不一定為1赴涵,所以每個worker上要launch得task并不一定等于可用的cores數(shù)
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    //< 每個executor上可用的cores
    val availableCpus = shuffledOffers.map(o => o.cores).toArray

    //< 返回排序過的TaskSet隊(duì)列媒怯,有FIFO及Fair兩種排序規(guī)則,默認(rèn)為FIFO髓窜,可通過配置修改
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      //< 如果有新的executor added扇苞,更新TaskSetManager可用的executor
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    var launchedTask = false
    //< 依次取出排序過的taskSet列表中的taskSet;
    //< 對于每個taskSet寄纵,取出其tasks覆蓋的所有l(wèi)ocality鳖敷,從高到低依次遍歷每個等級的locality;
    //< 取出了taskSet及本次要處理的locality后程拭,根據(jù)該taskSet及l(fā)ocality遍歷所有可用的worker哄陶,找出可以在各個worker上啟動的task,加到tasks:Seq[Seq[TaskDescription]]中
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        //< 獲取tasks哺壶,tasks代表要在哪些worker上啟動哪些tasks
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

結(jié)合代碼屋吨,概括起來說,Step2又可以分為4個SubStep:

  • 【SubStep1】: executor, host, rack等信息更新
  • 【SubStep2】: 隨機(jī)打亂workers山宾。目的是為了分配tasks能負(fù)載均衡至扰,分配tasks時,是從打亂的workers的序列的0下標(biāo)開始判斷是否能在worker上啟動task的
  • 【SubStep3】: RootPool對它包含的所有的TaskSetManagers進(jìn)行排序并返回已排序的TaskSetManager數(shù)組资锰。這里涉及到RootPool概念及如何排序敢课,將會在下文展開說明
  • 【SubStep4】: 對于RootPool返回的排序后的ArrayBuffer[TaskSetManager]中的每一個TaskSetManager,取出其包含的tasks包含的所有l(wèi)ocality绷杜。根據(jù)locality從高到低直秆,對于每個locality,遍歷所有worker鞭盟,結(jié)合延遲調(diào)度機(jī)制圾结,判斷TaskSetManager的哪些tasks可以在哪些workers上啟動。這里比較需要進(jìn)一步說明的是“延遲調(diào)度機(jī)制”及如何判斷某個TaskSetManager里的tasks是否有可以在某個worker上啟動

下面齿诉,就對SubStep3及SubStep4進(jìn)行展開說明

【SubStep3】

SubStep3的職責(zé)是"RootPool對它包含的所有的TaskSetManagers進(jìn)行排序并返回已排序的TaskSetManager數(shù)組"筝野。那么什么是RootPool呢?每個Spark Application包含唯一一個TaskScheduler對象粤剧,該TaskScheduler對象包含唯一一個RootPool歇竟,Spark Application包含的所有Job的所有stage對應(yīng)的所有未完成的TaskSetManager都會保存在RootPool中,完成后從RootPool中remove抵恋。RootPool為org.apache.spark.scheduler.Pool類型焕议,稱作調(diào)度池。Pool的概念與YARN中隊(duì)列的概念比較類似弧关,一個隊(duì)列可以包含子隊(duì)列盅安,相對的一個Pool可以包含子Pool;YARN隊(duì)列的葉子節(jié)點(diǎn)即提交到該隊(duì)列的Application梯醒,Pool的葉子節(jié)點(diǎn)即分配到該P(yáng)ool的TaskSetManager宽堆。Pool根據(jù)調(diào)度模式的不同,分為FIFO及Fair茸习。FIFO模式下只有一層Pool畜隶,不同于YARN的隊(duì)列可以n多層,Pool的Fair調(diào)度模式下号胚,只能有三層:RootPool籽慢,RootPool的子Pools,子Pools的葉子節(jié)點(diǎn)(即TaskSetManager)猫胁。

不同的調(diào)度模式添加葉子節(jié)點(diǎn)的實(shí)現(xiàn)是一樣的箱亿,如下:

  override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    //< 當(dāng)我們添加一個元素的時候,它會添加到隊(duì)列的尾部弃秆,當(dāng)我們獲取一個元素時届惋,它會返回隊(duì)列頭部的元素
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }

Schedulable類型的參數(shù)schedulable包含成員val parent: Pool髓帽,即父Pool,所以在添加TaskSetManager到Pool的時候就指定了父Pool脑豹。對于FIFO郑藏,所有的TaskSetManager的父Pool都是RootPool;對于Fair瘩欺,TaskSetManager的父Pool即RootPool的某個子Pool必盖。

不同的模式,除了Pool的層級結(jié)構(gòu)不同俱饿,對它包含的TaskSetManagers進(jìn)行排序時使用的算法也不同歌粥。FIFO對應(yīng)FIFOSchedulingAlgorithm類,F(xiàn)air對應(yīng)FairSchedulingAlgorithm()類

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

當(dāng)Pool.getSortedTaskSetQueue被調(diào)用時拍埠,就會用到該排序類失驶,如下:

  //< 利用排序算法taskSetSchedulingAlgorithm先對以本pool作為父pool的子pools做排序,再對排序后的pool中的每個TaskSetManager排序械拍;
  //< 得到最終排好序的 ArrayBuffer[TaskSetManager]
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    //< FIFO不會調(diào)到這里突勇,直接走到下面的return  
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

FIFO排序類中的比較函數(shù)的實(shí)現(xiàn)很簡單:

  1. Schedulable A和Schedulable B的優(yōu)先級,優(yōu)先級值越小坷虑,優(yōu)先級越高
  2. A優(yōu)先級與B優(yōu)先級相同甲馋,若A對應(yīng)stage id越小,優(yōu)先級越高
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

Pool及TaskSetManager都繼承于Schedulable迄损,來看下它的定義:

private[spark] trait Schedulable {
  var parent: Pool
  // child queues
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

    //< 省略若干代碼
}

可以看到定躏,Schedulable包含weight(權(quán)重)、priority(優(yōu)先級)芹敌、minShare(最小共享量)等屬性痊远。其中:

  • weight:權(quán)重,默認(rèn)是1氏捞,設(shè)置為2的話碧聪,就會比其他調(diào)度池獲得2x多的資源,如果設(shè)置為-1000液茎,該調(diào)度池一有任務(wù)就會馬上運(yùn)行
  • minShare:最小共享核心數(shù)逞姿,默認(rèn)是0,在權(quán)重相同的情況下捆等,minShare大的滞造,可以獲得更多的資源

對于Fair調(diào)度模式下的比較,實(shí)現(xiàn)如下:


private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare:Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

結(jié)合以上代碼栋烤,我們可以比較容易看出Fair調(diào)度模式的比較邏輯:

  1. 正在運(yùn)行的task個數(shù)小于最小共享核心數(shù)的要比不小于的優(yōu)先級高
  2. 若兩者正在運(yùn)行的task個數(shù)都小于最小共享核心數(shù)秘通,則比較minShare使用率的值屯换,即runningTasks.toDouble / math.max(minShare, 1.0).toDouble邀泉,越小則優(yōu)先級越高
  3. 若minShare使用率相同,則比較權(quán)重使用率丰泊,即runningTasks.toDouble / s.weight.toDouble,越小則優(yōu)先級越高
  4. 如果權(quán)重使用率還相同蔑祟,則比較兩者的名字

對于Fair調(diào)度模式趁耗,需要先對RootPool的各個子Pool進(jìn)行排序,再對子Pool中的TaskSetManagers進(jìn)行排序疆虚,使用的算法都是FairSchedulingAlgorithm.FairSchedulingAlgorithm

到這里满葛,應(yīng)該說清楚了整個SubStep3的流程径簿。

SubStep4

SubStep4說白了就是已經(jīng)知道了哪些worker上由多少可用cores了,然后要決定要在哪些worker上啟動哪些tasks:

//< 事先創(chuàng)建好用于存放要在各個worker上launch的 List[workerId, ArrayBuffer[TaskDescription]]嘀韧。
//< 由于task要使用的cores并不一定為1篇亭,所以每個worker上要launch得task并不一定等于可用的cores數(shù)
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))

var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    //< 獲取tasks,tasks代表要在哪些worker上啟動哪些tasks
    launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}

從for循環(huán)可以看到锄贷,該過程對排好序的taskSet數(shù)組的每一個元素译蒂,從locality優(yōu)先級從高到低(taskSet.myLocalityLevels返回該taskSet包含的所有task包含的locality,按locality從高到低排列谊却,PROCESS_LOCAL最高)取出locality柔昼,以取出的taskSet和locality調(diào)用TaskSchedulerImpl.resourceOfferSingleTaskSet,來看下它的實(shí)現(xiàn)(為方便閱讀及理解炎辨,刪去一些代碼):

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false

  //< 獲取每個worker上要執(zhí)行的tasks序列
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          //< 將獲得要在index為i的worker上執(zhí)行的task捕透,添加到tasks(i)中;這樣就知道了要在哪個worker上執(zhí)行哪些tasks了
          tasks(i) += task

          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          return launchedTask
      }
    }
  }
  return launchedTask
}

resourceOfferSingleTaskSet拿到worker可用cores碴萧,taskSet和locality后

  1. 遍歷每個worker的可用cores乙嘀,如果可用cores大于task需要的cores數(shù)(即CPUS_PER_TASK),進(jìn)入2
  2. 調(diào)用taskSet.resourceOffer(execId, host, maxLocality)獲取可在指定executor上啟動的task破喻,若返回非空虎谢,把返回的task加到最終的tasks: Seq[ArrayBuffer[TaskDescription]]中,該結(jié)構(gòu)保存要在哪些worker上啟動哪些tasks
  3. 減少2中分配了task的worker的可用cores及更新其他信息

從以上的分析中可以看出曹质,要在某個executor上啟動哪個task最終的實(shí)現(xiàn)在TaskSetManager.resourceOffer中婴噩,由于該函數(shù)比較長,我將函數(shù)分過幾個過程來分析

首先來看第一段:

//< 如果資源是有l(wèi)ocality特征的
if (maxLocality != TaskLocality.NO_PREF) {
  //< 獲取當(dāng)前taskSet允許執(zhí)行的locality咆繁。getAllowedLocalityLevel隨時間變化而變化
  allowedLocality = getAllowedLocalityLevel(curTime)
  //< 如果允許的locality級別低于maxLocality讳推,則使用maxLocality覆蓋允許的locality
  if (allowedLocality > maxLocality) {
    // We're not allowed to search for farther-away tasks
    //< 臨時將允許的locality級別降低到資源允許的最高locality級別
    allowedLocality = maxLocality
  }
}

要判斷task能否在worker上啟動,除了空閑資源是否達(dá)到task要求外玩般,還需要判斷本地性银觅,即locality。locality從高到低共分為PROCESS_LOCAL, NODE_LOCAL,RACK_LOCAL及ANY坏为。若taskSet帶有l(wèi)ocality屬性究驴,則通過getAllowedLocalityLevel函數(shù)獲得該taskSet能容忍的最低界別locality镊绪。

getAllowedLocalityLevel中:

  1. 如果taskset剛剛被提交,taskScheduler開始第一輪對taskset中的task開始提交洒忧,那么當(dāng)時currentLocalityIndex為0蝴韭,直接返回可用的最好的本地性;如果是在以后的提交過程中熙侍,那么如果當(dāng)前的等待時間超過了一個級別榄鉴,就向后跳一個級別
  2. getAllowedLocalityLevel方法返回的是當(dāng)前這次調(diào)度中,能夠容忍的最差的本地性級別蛉抓,在后續(xù)步驟的搜索中就只搜索本地性比這個級別好的情況
  3. 隨著時間的推移庆尘,撇開maxLocality配置不談,對于本地性的容忍程度越來越大巷送。

繼續(xù)返回TaskSetManager.resourceOffer中驶忌,獲得taskSet能容忍的最差locality后,與maxLocality比較去較差的locality作為最終的
能容忍的最差locality笑跛。

進(jìn)入第二段:

dequeueTask(execId, host, allowedLocality) match {
  case Some((index, taskLocality, speculative)) => {
    //< 進(jìn)行各種信息更新操作
    
    addRunningTask(taskId)

    // We used to log the time it takes to serialize the task, but task size is already
    // a good proxy to task serialization time.
    // val timeTaken = clock.getTime() - startTime
    val taskName = s"task ${info.id} in stage ${taskSet.id}"
    sched.dagScheduler.taskStarted(task, info)
    return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
      taskName, index, serializedTask))
  }
  case _ =>
}

可以看到付魔,第二段首先調(diào)用了函數(shù)dequeueTask,如果返回不為空飞蹂,說明為指定的worker分配了task几苍;這之后,進(jìn)行各種信息更新晤柄,將taskId加入到runningTask中擦剑,并通知DAGScheduler,最后返回taskDescription芥颈。來看看dequeueTask的實(shí)現(xiàn):

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    //< dequeueTaskFromList: 該方法獲取list中一個可以launch的task惠勒,同時清除掃描過的已經(jīng)執(zhí)行的task。其實(shí)它從第二次開始首先掃描的一定是已經(jīng)運(yùn)行完成的task爬坑,因此是延遲清除
    // 同一個Executor纠屋,通過execId來查找相應(yīng)的等待的task
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    // 通過主機(jī)名找到相應(yīng)的Task,不過比之前的多了一步判斷
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }


    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    // 通過Rack的名稱查找Task
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    // 查找那些preferredLocations為空的,不指定在哪里執(zhí)行的Task來執(zhí)行
    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    // 最后沒辦法了盾计,拖的時間太長了售担,只能啟動推測執(zhí)行了
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

從該實(shí)現(xiàn)可以看出,不管之前獲得的能容忍的最差locality(即allowedLocality)有多低署辉,每次dequeueTask都是以PROCESS_LOCAL->...->allowedLocality順序來判斷是否可以以該locality啟動task族铆,而并不是必須以allowedLocality啟動task。這也增大了啟動task的機(jī)會哭尝。

到這里應(yīng)該大致說清楚了Step2中的各個流程哥攘。

Step3: launchTasks( taskDescs )

得到要在哪些worker上啟動哪些task后,將調(diào)用launchTasks來啟動各個task,實(shí)現(xiàn)如下:

def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    //< 序列化task
    val serializedTask = ser.serialize(task)
    //< 若序列化后的task的size大于等于Akka可用空間
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes)
          //< 中止taskSet逝淹,標(biāo)記為已完成耕姊;同時將該taskSet的狀態(tài)置為isZombie(Zombie:僵尸)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      //< 若序列化后的task的size小于Akka可用空間,減去對應(yīng)executor上的可用cores數(shù)并向?qū)?yīng)的executor發(fā)送啟動task消息
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

邏輯比較簡單栅葡,先對task進(jìn)行序列化茉兰,若序列化后的task的size大于等于akka可用空間大小,則taskSet標(biāo)記為已完成并置為Zombie狀態(tài)欣簇;若序列化后的task的size小于akka可用空間大小规脸,則通過發(fā)送消息給對應(yīng)executor啟動task


歡迎關(guān)注我的微信公眾號:FunnyBigData

FunnyBigData
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市熊咽,隨后出現(xiàn)的幾起案子燃辖,更是在濱河造成了極大的恐慌,老刑警劉巖网棍,帶你破解...
    沈念sama閱讀 206,013評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異妇智,居然都是意外死亡滥玷,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評論 2 382
  • 文/潘曉璐 我一進(jìn)店門巍棱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惑畴,“玉大人,你說我怎么就攤上這事航徙∪绱” “怎么了?”我有些...
    開封第一講書人閱讀 152,370評論 0 342
  • 文/不壞的土叔 我叫張陵到踏,是天一觀的道長杠袱。 經(jīng)常有香客問我,道長窝稿,這世上最難降的妖魔是什么楣富? 我笑而不...
    開封第一講書人閱讀 55,168評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮伴榔,結(jié)果婚禮上纹蝴,老公的妹妹穿的比我還像新娘。我一直安慰自己踪少,他們只是感情好塘安,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著援奢,像睡著了一般兼犯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,954評論 1 283
  • 那天免都,我揣著相機(jī)與錄音锉罐,去河邊找鬼。 笑死绕娘,一個胖子當(dāng)著我的面吹牛脓规,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播险领,決...
    沈念sama閱讀 38,271評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼侨舆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了绢陌?” 一聲冷哼從身側(cè)響起挨下,我...
    開封第一講書人閱讀 36,916評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脐湾,沒想到半個月后臭笆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,382評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡秤掌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評論 2 323
  • 正文 我和宋清朗相戀三年愁铺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闻鉴。...
    茶點(diǎn)故事閱讀 37,989評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡茵乱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出孟岛,到底是詐尸還是另有隱情瓶竭,我是刑警寧澤,帶...
    沈念sama閱讀 33,624評論 4 322
  • 正文 年R本政府宣布渠羞,位于F島的核電站斤贰,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏堵未。R本人自食惡果不足惜腋舌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望渗蟹。 院中可真熱鬧块饺,春花似錦、人聲如沸雌芽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽世落。三九已至淮腾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谷朝。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評論 1 260
  • 我被黑心中介騙來泰國打工洲押, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人圆凰。 一個月前我還...
    沈念sama閱讀 45,401評論 2 352
  • 正文 我出身青樓杈帐,卻偏偏與公主長得像,于是被迫代替她去往敵國和親专钉。 傳聞我的和親對象是個殘疾皇子挑童,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評論 2 345

推薦閱讀更多精彩內(nèi)容