spark源碼閱讀之scheduler模塊①

本文基于Spark 1.6.3版本源碼

整體概述

spark的調(diào)度模塊可以說是非常有特色的模塊設(shè)計(jì),使用DAG(有向無環(huán)圖)刻畫spark任務(wù)的邏輯關(guān)系共螺,將任務(wù)切分為多個(gè)stage聂宾,在每個(gè)stage中根據(jù)并行度又分為多個(gè)task悼粮,這多個(gè)Task的計(jì)算邏輯都一樣细诸,然后把封裝好的task提交給executor執(zhí)行得出結(jié)果绿贞。且每個(gè)stage之間以及stage內(nèi)部又存在著依賴關(guān)系褒墨,通過這些依賴關(guān)系構(gòu)成了lineage炫刷,可以提供很好的容錯(cuò)性。

spark調(diào)度模塊中起主導(dǎo)作用的類有三個(gè):DAGScheduler郁妈,TaskScheduler浑玛,SchedulerBackend

DAGScheduler:被稱為high-level scheduling layer(高階調(diào)度層),主要負(fù)責(zé)根據(jù)ShuffleDependency將Job分為多個(gè)stage噩咪,每個(gè)stage中有一組并行的執(zhí)行相同計(jì)算邏輯的Task顾彰,將這組Task的元數(shù)據(jù)封裝成為TaskSets,然后提交給TaskScheduler來執(zhí)行調(diào)度計(jì)算剧腻。

TaskScheduler:被稱作low-level Task scheduler interface(低階的Task調(diào)度接口)拘央,主要的實(shí)現(xiàn)類為TaskSchedulerImpl,主要負(fù)責(zé)在接受到DAGScheduler發(fā)送來的TaskSets后书在,將其提交給集群灰伟,并在執(zhí)行期間出現(xiàn)問題時(shí)重新提交Tasks,最后將結(jié)果events返回給DAGScheduler儒旬。

SchedulerBackend:作為TaskScheduler的后臺(tái)進(jìn)程栏账,負(fù)責(zé)與各種平臺(tái)的cluster manager交互,并為Application申請相應(yīng)的資源栈源,SchedulerBanckend類有多種實(shí)現(xiàn)挡爵,例如Application如果提交給yarn平臺(tái)進(jìn)行資源的管理調(diào)度,則SchedulerBackend對應(yīng)的實(shí)現(xiàn)類為YarnSchedulerBackend甚垦,如果是采用Deploy模式茶鹃,則實(shí)現(xiàn)類為SparkDeploySchedulerBackend。

以下源碼分析均是基于Deploy模式艰亮,其他模式在SchedulerBackend實(shí)現(xiàn)上略有不同闭翩,不過其調(diào)度原理和實(shí)現(xiàn)都是一樣的。

三個(gè)重要類實(shí)例的初始化及其之間的關(guān)系

我們可以從SparkContext的初始化入手來分析以上三個(gè)重要類的初始化迄埃,當(dāng)提交Application后疗韵,spark會(huì)首先初始化SparkContext實(shí)例并創(chuàng)建driver,來看一下SparkContext中實(shí)例化三個(gè)重要類的代碼:

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

其中TaskScheduler和SchedulerBackend是根據(jù)傳入的master進(jìn)行模式匹配得出的侄非,不同的平臺(tái)有不同的實(shí)現(xiàn)蕉汪,而DAGScheduler是直接new出來的,且DAGScheduler實(shí)例中持有TaskScheduler的引用逞怨,這一點(diǎn)可以從DAGScheduler的構(gòu)造代碼中看出:

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
  this(
    sc,
    taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env)
}
提交Job

通過上述源碼可知者疤,在Application提交之前,SparkContext實(shí)例化的過程中叠赦,就已經(jīng)實(shí)例好了_schedulerBackend 宛渐,_taskScheduler,_dagScheduler這三個(gè)實(shí)例,那么接下來窥翩,我們通過active操作count方法的代碼來看一下Job是如何提交的:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

runJob方法最終調(diào)用的是dagScheduler的runJob方法:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

在DAGScheduler的runJob方法中业岁,生成了一個(gè)JobWaiter實(shí)例來監(jiān)聽Job的執(zhí)行情況,只有當(dāng)Job中的所有Task全都成功完成寇蚊,Job才會(huì)被標(biāo)記成功:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  //生成一個(gè)JobWaiter的實(shí)例來監(jiān)聽Job的執(zhí)行情況笔时,只有當(dāng)Job中的所有的Task全都成功完成,Job才會(huì)被標(biāo)記成功
  val waiter: JobWaiter[U] = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

在submitJob方法中首先創(chuàng)建了JobWaiter實(shí)例仗岸,并且通過eventProcessLoop來發(fā)送JobSubmitted消息允耿,這個(gè)eventProcessLoop使用來監(jiān)聽DAGScheduler自身的一些消息,在實(shí)例化DAGScheduler時(shí)創(chuàng)建該實(shí)例

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  val jobId = nextJobId.getAndIncrement()   //獲取JobId
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // 生成一個(gè)JobWaiter的實(shí)例來監(jiān)聽Job的執(zhí)行情況扒怖,只有當(dāng)Job中的所有的Task全都成功完成较锡,Job才會(huì)被標(biāo)記成功
  val waiter: JobWaiter[U] = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // DAGSchedulerEventProcessLoop這個(gè)實(shí)例的主要職責(zé)是調(diào)用DAGScheduler的相應(yīng)方法來處理DAGScheduler發(fā)送給他的各種消息,起監(jiān)督Job的作用
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))    //DAGScheduler向eventProcessLoop提交該Job盗痒,最終調(diào)用eventProcessLoop的run方法來處理請求
  waiter
}

eventProcessLoop最終調(diào)用其doOnReceive方法來處理所有的Event:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    //如果提交的是一個(gè)JobSubmitted的Event蚂蕴,那么調(diào)用handleJobSubmitted方法來處理這個(gè)請求
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

  ...
}

到這里,Job就已經(jīng)提交了俯邓,接下來是對Job提交的處理骡楼,即DAGScheduler的最主要的功能:劃分stage

劃分stage

我們來看DAGScheduler的handleJobSubmitted方法代碼,其中是如何劃分stage的稽鞭,我們分為幾段來看

var finalStage: ResultStage = null
try {
  // New stage creation may throw an exception if, for example, jobs are run on a
  // HadoopRDD whose underlying HDFS files have been deleted.
  // 首先調(diào)用newResultStage方法來創(chuàng)建finalStage
  finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
  case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}

我們可以看到鸟整,DAGShceduler首先創(chuàng)建最后一個(gè)stage:finalStage,我們看一看newResultStage方法:

private def newResultStage( //創(chuàng)建最后一個(gè)stage的方法
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  //通過調(diào)用getParentStagesAndId方法來劃分stage朦蕴,傳入最后一個(gè)RDD和JobId
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

在創(chuàng)建finalStage的時(shí)候需要傳入其parentStages篮条,這也是構(gòu)成DAG調(diào)度計(jì)劃的一個(gè)重要部分,看其實(shí)現(xiàn)

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages: List[Stage] = getParentStages(rdd, firstJobId)   //找到parentStages
  val id = nextStageId.getAndIncrement()    //nextStageId是一個(gè)AtomicInteger吩抓,自增1
  (parentStages, id)    //返回parentStages的序列和對應(yīng)的Id
}

其中調(diào)用了getParentStages方法涉茧,在getParentStages中實(shí)現(xiàn)了遞歸調(diào)用,返回的是Stage的List

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]    //parents序列
  val visited = new HashSet[RDD[_]]   //已經(jīng)被訪問的RDD
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]   //需要被處理的RDD棧
  def visit(r: RDD[_]) {
    if (!visited(r)) {    //如果棧中的RDD不在被訪問的序列中琴拧,則加進(jìn)去
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      for (dep <- r.dependencies) {   //遍歷這個(gè)RDD的dependencies
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>   //如果匹配到是ShuffleDependency
            parents += getShuffleMapStage(shufDep, firstJobId)    //調(diào)用getShuffleMapStage方法生成一個(gè)stage加入到parents序列中
          case _ =>   //如果是窄依賴將訪問dep對應(yīng)的RDD壓入待訪問棧(這里的RDD應(yīng)該是之前一個(gè)RDD的父RDD,相當(dāng)于實(shí)現(xiàn)了一個(gè)遞歸)
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd) //將最后一個(gè)RDD放入待訪問棧
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())    //如果需要被處理的RDD棧不為空嘱支,則調(diào)用visit方法取出里棧中的RDD
  }
  parents.toList

以上代碼中可以看出蚓胸,劃分stage的依據(jù)是shuffleDependency,以上代碼的精彩之處在于自建了一個(gè)待訪問棧:waitingForVisit除师,通過出棧入棧以及RDD之間的Dependency實(shí)現(xiàn)了一個(gè)遞歸調(diào)用沛膳,體現(xiàn)了spark源碼的優(yōu)雅之處。其中當(dāng)遇到ShuffleDependency的時(shí)候汛聚,調(diào)用getShuffleMapStage方法創(chuàng)建了新的Stage锹安,我們來看一下這個(gè)方法:

private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage   //存在就獲取
    case None =>    //不存在就創(chuàng)建
      // We are going to register ancestor shuffle dependencies
      // 將對應(yīng)的RDD再調(diào)用getAncestorShuffleDependencies方法注冊其祖先的依賴,負(fù)責(zé)確認(rèn)這個(gè)stage它的parentStage是否已經(jīng)生成
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        //拿到還沒有注冊的stage序列遍歷,調(diào)用newOrUsedShuffleStage方法注冊到shuffleToMapStage中
        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}

以上方法中叹哭,維護(hù)了一個(gè)shuffleToMapStage集合忍宋,存有shuffleId和ShuffleMapStage的映射,根據(jù)傳入的shuffleDep风罩,如果存在就返回糠排,如果不存在就創(chuàng)建,其中g(shù)etAncestorShuffleDependencies方法是為了找到那些沒有被注冊到shuffleToMapStage集合的Stage超升,其中遞歸調(diào)用的模樣像極了getParentStages方法入宦,而newOrUsedShuffleStage則是創(chuàng)建shuffle map stage的方法,來看一下newOrUsedShuffleStage

/**
  * 根據(jù)傳入的Dep對應(yīng)的RDD創(chuàng)建一個(gè)shuffle map stage室琢,這個(gè)stage會(huì)包含傳入的JobID
  * 如果這個(gè)stage之前已經(jīng)存在于MapOutputTracker中乾闰,那么會(huì)覆蓋
  */
private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length    //這個(gè)RDD的partitions的數(shù)量就是task的數(shù)量
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)   //創(chuàng)建stage
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {   //如果mapOutputTracker中已經(jīng)存在這個(gè)shuffleDep
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) //把之前的元數(shù)據(jù)信息提取出來
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)   //修改覆蓋
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {    //如果沒有,就直接注冊進(jìn)去
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

以上代碼中盈滴,首先調(diào)用了newShuffleMapStage方法創(chuàng)建了ShuffleMapStage涯肩,其次由于是ShuffleMapStage,存在shuffle的過程雹熬,會(huì)有中間數(shù)據(jù)落地的過程宽菜,所以需要重新注冊修改一下mapOutputTracker,mapOutputTracker是用來管理map端輸出的竿报。其中newShuffleMapStage方法和newResultStage方法如出一轍铅乡,首先調(diào)用getParentStagesAndId方法獲取parentStage,然后創(chuàng)建ShuffleMapStage實(shí)例

private def newShuffleMapStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int,
    callSite: CallSite): ShuffleMapStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
  val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
    firstJobId, callSite, shuffleDep)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(firstJobId, stage)
  stage
}

在方法最后調(diào)用updateJobIdStageIdMaps將新建的stage的stageId與JobId聯(lián)系起來烈菌。

以上這些方法中阵幸,我們首先創(chuàng)建了finalStage,然后通過RDD之間的Dependency芽世,采用遞歸調(diào)用的方法挚赊,找出了這個(gè)finalStage的parentStages隊(duì)列,并維護(hù)到相關(guān)的數(shù)據(jù)結(jié)構(gòu)中济瓢。


下面我們來看一下荠割,如何提交上面創(chuàng)建的這些Stages

我們回到handleJobSubmitted,看一下finalStage創(chuàng)建完成后的代碼

// 拿到finalStage之后就可以創(chuàng)建job了
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()    //清空taskLocation的緩存
logInfo("Got job %s (%s) with %d output partitions".format(
  job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job   //jobId與job的映射放入集合中
activeJobs += job   //job加入activeJobs中
finalStage.setActiveJob(job)    //將finalStage的activeJob屬性指定為當(dāng)前job
val stageIds: Array[Int] = jobIdToStageIds(jobId).toArray   //根據(jù)jobId取出對應(yīng)的stageIds
//根據(jù)stageIds取出stage的lastestInfo
val stageInfos: Array[StageInfo] = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)   //提交finalStage
submitWaitingStages()   //提交waiting隊(duì)列的stages

首先創(chuàng)建了Job實(shí)例旺矾,并維護(hù)了相關(guān)的數(shù)據(jù)結(jié)構(gòu)蔑鹦,最后調(diào)用submitStage方法并傳入了finalStage,我們來看這個(gè)submitStage的具體實(shí)現(xiàn)

/** Submits stage, but first recursively submits any missing parents. */
// 提交這個(gè)stage箕宙,首先遞歸的提交它的missing parents
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)    //拿到stage對應(yīng)的jobId
  if (jobId.isDefined) {    //如果不為空
    logDebug("submitStage(" + stage + ")")
    // 如果這個(gè)stage不在waiting嚎朽、running、failed隊(duì)列中
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing: List[Stage] = getMissingParentStages(stage).sortBy(_.id)    //找到這個(gè)stage的missing parent stages
      logDebug("missing: " + missing)
      if (missing.isEmpty) {    //如果有未提交的parentStages柬帕,那么遞歸的提交它的missing parent stages哟忍, 最后提交這個(gè)stage
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)    //這個(gè)方法會(huì)完成DAGScheduler最后的工作
      } else {
        for (parent <- missing) {
          submitStage(parent)   //這里實(shí)現(xiàn)遞歸
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

在這個(gè)方法中我們又看到了遞歸調(diào)用的精妙之處狡门,對傳入的finalStage,首先確認(rèn)其有沒有未提交的parentStages锅很,如果有首先提交其parentStage其馏,而當(dāng)前的Stage就會(huì)被放入waitingStages中,通過submitWaitingStages方法來調(diào)用粗蔚,針對每一個(gè)提交的Stage調(diào)用submitMissingTasks來完成最后的工作

封裝Tasks

通過以上的方法尝偎,finalStage以及其parentStages都已經(jīng)遞歸提交了,通過submitMissingTasks這個(gè)方法鹏控,我們可以得知提交的Stage都做了什么操作致扯,submitMissingTasks方法代碼較長,首先針對傳入的Stages維護(hù)了像runningStages当辐、outputCommitCoordinator等數(shù)據(jù)結(jié)構(gòu)抖僵,我們截選關(guān)鍵部分來看:

// 這里取到了Tasks的序列
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.internalAccumulators)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, stage.internalAccumulators)
      }
  }
} catch {
  case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceS
    runningStages -= stage
    return
}

這里對傳入的Stages進(jìn)行模式匹配,如果是ResultStage即finalStage缘揪,那么創(chuàng)建ResultTask耍群,如果是ShuffleMapStage ,則創(chuàng)建ShuffleMapTask找筝,接著看下面的代碼:

// 如果tasks序列不為空蹈垢,那么封裝成TaskSet,走你袖裕,接下來看taskScheduler的了
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  val debugString = stage match {
    case stage: ShuffleMapStage =>
      s"Stage ${stage} is actually done; " +
        s"(available: ${stage.isAvailable}," +
        s"available outputs: ${stage.numAvailableOutputs}," +
        s"partitions: ${stage.numPartitions})"
    case stage : ResultStage =>
      s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
  }
  logDebug(debugString)
}

可以看到曹抬,這里將上一步創(chuàng)建的Tasks實(shí)例封裝成為TaskSet,然后調(diào)用TaskScheduler的submitTasks方法提交給集群急鳄,至此DAGScheduler的任務(wù)已經(jīng)圓滿結(jié)束谤民,它剩下的工作僅是通過eventProcessLoop來監(jiān)聽TaskScheduler返回的一些信息,這也是DAGScheduler實(shí)例中持有TaskScheduler引用的原因疾宏。

下一篇文章中我們繼續(xù)分析TaskScheduler在提交Tasks時(shí)做了哪些操作张足,且SchedulerBackend是如何在調(diào)度資源的分配上做到公平公正的,敬請期待坎藐!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末为牍,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子岩馍,更是在濱河造成了極大的恐慌碉咆,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,332評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兼雄,死亡現(xiàn)場離奇詭異吟逝,居然都是意外死亡帽蝶,警方通過查閱死者的電腦和手機(jī)赦肋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,508評論 3 385
  • 文/潘曉璐 我一進(jìn)店門块攒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人佃乘,你說我怎么就攤上這事囱井。” “怎么了趣避?”我有些...
    開封第一講書人閱讀 157,812評論 0 348
  • 文/不壞的土叔 我叫張陵庞呕,是天一觀的道長。 經(jīng)常有香客問我程帕,道長住练,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,607評論 1 284
  • 正文 為了忘掉前任愁拭,我火速辦了婚禮讲逛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘岭埠。我一直安慰自己盏混,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,728評論 6 386
  • 文/花漫 我一把揭開白布惜论。 她就那樣靜靜地躺著许赃,像睡著了一般。 火紅的嫁衣襯著肌膚如雪馆类。 梳的紋絲不亂的頭發(fā)上混聊,一...
    開封第一講書人閱讀 49,919評論 1 290
  • 那天,我揣著相機(jī)與錄音蹦掐,去河邊找鬼技羔。 笑死,一個(gè)胖子當(dāng)著我的面吹牛卧抗,可吹牛的內(nèi)容都是我干的藤滥。 我是一名探鬼主播,決...
    沈念sama閱讀 39,071評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼社裆,長吁一口氣:“原來是場噩夢啊……” “哼拙绊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起泳秀,我...
    開封第一講書人閱讀 37,802評論 0 268
  • 序言:老撾萬榮一對情侶失蹤标沪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后嗜傅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體金句,經(jīng)...
    沈念sama閱讀 44,256評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,576評論 2 327
  • 正文 我和宋清朗相戀三年吕嘀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了违寞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贞瞒。...
    茶點(diǎn)故事閱讀 38,712評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖趁曼,靈堂內(nèi)的尸體忽然破棺而出军浆,到底是詐尸還是另有隱情,我是刑警寧澤挡闰,帶...
    沈念sama閱讀 34,389評論 4 332
  • 正文 年R本政府宣布乒融,位于F島的核電站,受9級特大地震影響摄悯,放射性物質(zhì)發(fā)生泄漏赞季。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,032評論 3 316
  • 文/蒙蒙 一奢驯、第九天 我趴在偏房一處隱蔽的房頂上張望碟摆。 院中可真熱鬧,春花似錦叨橱、人聲如沸典蜕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽愉舔。三九已至,卻和暖如春伙菜,著一層夾襖步出監(jiān)牢的瞬間轩缤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,026評論 1 266
  • 我被黑心中介騙來泰國打工贩绕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留火的,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,473評論 2 360
  • 正文 我出身青樓淑倾,卻偏偏與公主長得像馏鹤,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子娇哆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,606評論 2 350

推薦閱讀更多精彩內(nèi)容