Spark-Core源碼精讀(11)凶伙、Stage的劃分過程和Task數(shù)據(jù)本地性

本文將主要討論兩個Topic:Stage的劃分過程和Task數(shù)據(jù)本地性

引子

前面的文章中我們已經(jīng)分析了Spark應用程序即Application的注冊以及Executors的啟動注冊流程慕爬,即計算資源已經(jīng)分配完成(粗粒度的資源分配方式),換句話說Driver端的代碼已經(jīng)運行完成(SparkConf、SparkContext)暇昂,接下來就是運行用戶編寫的業(yè)務邏輯代碼舶得。

圖片來自Databricks的Spark-Essentials-SSW2016-TE1

Spark中對RDD的操作大體上可以分為transformation級別的操作和action級別的操作掰烟,transformation是lazy級別的操作,action操作(count、collect等)會觸發(fā)具體job的執(zhí)行纫骑,而每個job又會被劃分成一個或者多個Stage蝎亚,后面的Stage會依賴前面的Stage,而Stage劃分的依據(jù)就是是否為寬依賴(Spark中RDD的依賴關系分成寬依賴和窄依賴)先馆,所有的Stage會形成一個有向無環(huán)圖(DAG)发框,最后依據(jù)Task的數(shù)據(jù)本地性將Task發(fā)送到指定的Executor上運行,下面我們就詳細分析這一過程煤墙。

Stage的劃分

首先從一個Action級別的操作開始梅惯,此處以collect為例:

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

可以看到執(zhí)行了SparkContext的runJob()方法:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

內(nèi)部調(diào)用了DAGScheduler的runJob方法:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  // 記錄Job的開始時間
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

內(nèi)部調(diào)用了submitJob方法用來向scheduler提交Job,并返回一個JobWaiter仿野,使用JobWaiter的awaitResult()方法來等待DAGScheduler執(zhí)行完成铣减,并且當tasks執(zhí)行完畢后將執(zhí)行的結果返回給具體的resultHandler,下面我們就來看一下submitJob()方法:

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  // 獲得jobId
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // 實例化JobWaiter
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  // 可以看到脚作,確實是返回了JobWaiter
  waiter
}

DAGScheduler內(nèi)部有一個DAGSchedulerEventProcessLoop(消息循環(huán)器)徙歼,他繼承自EventLoop,簡單的說每個消息循環(huán)器中會有一個消息隊列:LinkedBlockingDeque[E]鳖枕,E就代表具體EventLoop子類所要處理的消息的類型魄梯,DAGSchedulerEventProcessLoop具體處理的消息類型是DAGSchedulerEvent(當然他有許多子類型),每個消息循環(huán)器中會開辟一條新的線程來循環(huán)處理消息隊列中的消息宾符,DAGScheduler實例化的時候會創(chuàng)建一個消息循環(huán)器(eventProcessLoop)酿秸,并調(diào)用了eventProcessLoop的start方法(這個方法的調(diào)用隱藏在DAGScheduler類的最后一行),而start方法的作用就是開啟上面提到的那條線程開始處理消息隊列中的消息魏烫,當我們使用eventProcessLoop的post方法將JobSubmitted(該消息的類型就繼承自DAGSchedulerEvent)消息放入到消息隊列中后辣苏,消息循環(huán)器中的線程會從隊列中拿出這條消息,然后執(zhí)行消息循環(huán)器的onReceive(event)方法哄褒,而在DAGSchedulerEventProcessLoop中onReceive方法內(nèi)部執(zhí)行的是doOnReceive方法:

override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

而doOnReceive方法在接收到具體的event后會用模式匹配來匹配收到的消息的具體類型稀蟋,這里接收到的是JobSubmitted類型的消息,處理如下:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

  case ...
}

可以看到執(zhí)行的是DAGScheduler的handleJobSubmitted方法:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // 實例化ResultStage
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)
  submitWaitingStages()
}

下面我們逐步分析:

private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

getParentStagesAndId

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}

getParentStages

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  // 實例化一個空的集合用來存儲ResultStage的父Stage
  val parents = new HashSet[Stage]
  // 實例化一個空的集合用來存儲已經(jīng)遍歷過的RDD
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  // 防止遞歸調(diào)用的時候出現(xiàn)StackOverflowError異常
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
      // 循環(huán)遍歷RDD的所有依賴關系
      for (dep <- r.dependencies) {
        dep match {
          // 如果是寬依賴
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          // 如果是窄依賴就將依賴的RDD壓入到waitingForVisit棧中繼續(xù)便利
          case _ =>
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  // 將最后一個RDD放入到等待visit的Stack中
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    // 如果waitingForVisit不為空就將棧頂?shù)腞DD彈出呐赡,并使用上面定義的visit()進行處理
    visit(waitingForVisit.pop())
  }
  // 最后返回ResultStage的所有父Stages組成的List
  parents.toList
}

首先判斷該RDD(也就是最后一個RDD)和依賴的父RDD之間是寬依賴(ShuffleDependency)還是窄依賴(NarrowDependency的子類)退客。可以看到這里是通過RDD的dependencies方法來獲取依賴關系的:

final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}

因為是第一次提交链嘀,所以直接調(diào)用getDependencies方法萌狂,而map操作產(chǎn)生的MapPartitionsRDD并沒有復寫該方法,所以調(diào)用的是抽象類RDD的getDependencies方法:

protected def getDependencies: Seq[Dependency[_]] = deps

而這里的deps就是默認構造方法中的deps:

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

而重載的構造方法中默認就是實例化了OneToOneDependency即默認是窄依賴的:

def this(@transient oneParent: RDD[_]) =
  this(oneParent.context , List(new OneToOneDependency(oneParent)))

再例如reduceByKey操作最后生成的是ShuffleRDD怀泊,而ShuffleRDD復寫了getDependencies方法:

override def getDependencies: Seq[Dependency[_]] = {
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}

很明顯可以看出是寬依賴茫藏。

讓我們再重新回到之前的判斷中,如果是窄依賴就把依賴的RDD壓入到棧中霹琼;如果是寬依賴就使用getShuffleMapStage方法獲得父Stage并放入到parents中务傲,下面來看getShuffleMapStage方法:

private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    // 首先根據(jù)shuffleId判斷shuffleToMapStage是否存在Stage凉当,如果存在就直接返回
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}

首先根據(jù)shuffleId判斷shuffleToMapStage是否存在Stage,如果存在就直接返回售葡;如果不存在看杭,也就是說還沒有注冊到shuffleToMapStage中,會執(zhí)行如下兩個步驟:

  • 獲得祖先的依賴關系為寬依賴的依賴(從右向左查找)然后一次創(chuàng)建并向shuffleToMapStage中注冊ShuffleStage(從左向右創(chuàng)建)
  • 創(chuàng)建并注冊當前的Shuffle Stage

我們先來看getAncestorShuffleDependencies這個方法:

private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
              parents.push(shufDep)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents
}

和getParentStages類似天通,不同的是這里的parents保存的是父依賴關系中的ShuffleDependency,下面就是循環(huán)遍歷這些ShuffleDependency組成的集合執(zhí)行newOrUsedShuffleStage(dep, firstJobId)操作熄驼,dep就是集合中的一個ShuffleDependency像寒,下面我們來看這個newOrUsedShuffleStage方法:

private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  // 即將創(chuàng)建的Stage的最后一個RDD,也就是最右側的RDD
  val rdd = shuffleDep.rdd
  // Tasks的個數(shù)瓜贾,由此可見诺祸,Stage的并行度是由該Stage內(nèi)的最后一個RDD的partitions的個數(shù)所決定的
  val numTasks = rdd.partitions.length
  // 實例化ShuffleMapStage
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    (0 until locs.length).foreach { i =>
      if (locs(i) ne null) {
        // locs(i) will be null if missing
        stage.addOutputLoc(i, locs(i))
      }
    }
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}

下面我們看一下newShuffleMapStage的具體實現(xiàn):

private def newShuffleMapStage(
    rdd: RDD[_],
    numTasks: Int,
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int,
    callSite: CallSite): ShuffleMapStage = {
  // 可以看到此處又是調(diào)用的getParentStagesAndId函數(shù),然后重復上述的步驟
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
  val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
    firstJobId, callSite, shuffleDep)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(firstJobId, stage)
  stage
}

可以看到內(nèi)部又調(diào)用了getParentStagesAndId函數(shù)祭芦,然后再次重復上述的步驟筷笨,至此我們可以得出一個結論:即最左側的Stage最先被創(chuàng)建,然后從左向右一次創(chuàng)建各個Stage(左邊的Stage的id比右邊的小龟劲,從0開始)胃夏,并且后面的Stage保存有上一個Stage(也可能是多個)的引用。然后我們回到newOrUsedShuffleStage方法中(限于篇幅不再重復貼出上面的源碼昌跌,大家可以使用自己的IDEA或者Eclipse查看源碼仰禀,或者向上滑動至newOrUsedShuffleStage方法的部分),下面就是MapOutputTracker相關的邏輯代碼蚕愤,這里先簡單的提議下答恶,以后會專門對MapOutputTracker進行分析,下面就是補充的內(nèi)容:

簡單的說就是:后面的Task可以通過Driver端的MapOutputTracker也就是MapOutputTrackerMaster獲得ShuffleMapTask的運行結果的元數(shù)據(jù)信息(包括數(shù)據(jù)存放的位置萍诱、大小等)悬嗓,然后根據(jù)獲得的元數(shù)據(jù)信息獲取需要處理的數(shù)據(jù),而這里的邏輯大家可以看成是對這些元數(shù)據(jù)信息的占位的作用

然后將最后一個ShuffleStage(為什么是最后一個ShuffleStage裕坊,因為我們上面已經(jīng)得出了結論包竹,Stage是從左向右一次創(chuàng)建的,所以這里是最后一個ShuffleStage籍凝,當然也可能是多個)最終返回到newResultStage方法中映企,由于離得太遠,我們再次貼出newResultStage方法的源碼:

private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

下面終于到了創(chuàng)建ResultStage這一步了静浴,創(chuàng)建完成ResultStage后將其返回給handleJobSubmitted方法中的finalStage堰氓,至此一個完整的DAG(有向無環(huán)圖)就正式完成了。

Task數(shù)據(jù)本地性算法

我們繼續(xù)跟蹤handleJobSubmitted方法:

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  // 打日志
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  // 記錄job提交的事件
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // 提交finalStage
  submitStage(finalStage)
  submitWaitingStages()
}

接下例就是實例化ActiveJob苹享,然后通過submitStage方法提交我們上一部分的到的finalStage:

private def submitStage(stage: Stage) {
  // 得到jobId
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // 確保當前的Stage沒有未完成計算的父Stage双絮,也不是正在運行的Stage浴麻,而且也沒有提示提交失敗
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

上述源碼說明提交Tasks的時候也是先提交父Stage的Tasks,即前面的Stage計算完成后才能計算后面的Stage囤攀,明白這一點之后我們進入到submitMissingTasks方法(此處我們只選取關鍵部分的代碼):

private def submitMissingTasks(stage: Stage, jobId: Int) {
  
  ...
  
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        val job = s.activeJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
  // the serialized copy of the RDD and for each task we will deserialize it, which means each
  // task gets a different copy of the RDD. This provides stronger isolation between tasks that
  // might modify state of objects referenced in their closures. This is necessary in Hadoop
  // where the JobConf/Configuration object is not thread-safe.
  // 如果是ShuffleMapStage就將其中RDD软免,及其依賴關系廣播出去;如果是ResultStage
  // 就將其中的RDD及其計算方法func廣播出去焚挠。由此也可以看出真正觸發(fā)計算的是ResultStage
  // ShuffleMapStage不會觸發(fā)計算膏萧。
  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }
      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }
  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)
    val debugString = stage match {
      case stage: ShuffleMapStage =>
        s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})"
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}

taskIdToLocations就是用來保存partition的id到TaskLocation映射關系的,我們進入到getPreferredLocs方法:

private[spark]
def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  getPreferredLocsInternal(rdd, partition, new HashSet)
}

繼續(xù)追蹤getPreferredLocsInternal:

private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration.  SPARK-695
  if (!visited.add((rdd, partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // 如果partition被緩存了蝌衔,直接返回緩存的信息
  // If the partition is cached, return the cache locations
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // 如果該RDD是從外部讀取數(shù)據(jù)榛泛,則執(zhí)行RDD的preferredLocations方法
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // 如果是窄依賴,就一直遞歸調(diào)用查找該依賴關系上的第一個RDD的Location作為該locs
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  Nil
}

我們進入到RDD的preferredLocations方法:

final def preferredLocations(split: Partition): Seq[String] = {
  checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
    getPreferredLocations(split)
  }
}

可以看見內(nèi)部調(diào)用的是RDD的getPreferredLocations方法:

/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

很顯然最后具體的數(shù)據(jù)本地性計算是被子RDD而具體實現(xiàn)的噩斟,下面是具體實現(xiàn)該方法的RDD列表曹锨,具體的實現(xiàn)方法不再討論,有興趣的朋友可以研究一下剃允,如果開發(fā)者需要開發(fā)自定義的RDD從外部數(shù)據(jù)源中讀取數(shù)據(jù)沛简,為了保證Task的數(shù)據(jù)本地性就必須實現(xiàn)該RDD的getPreferredLocations方法。

在獲取了數(shù)據(jù)本地性信息之后斥废,我們就根據(jù)Stage的類型來生成ShuffleMapTask和ResultTask椒楣,然后使用TaskSet進行封裝,最后調(diào)用TaskScheduler的submitTasks方法提交具體的TaskSet牡肉。

本文參照的是Spark 1.6.3版本的源碼撒顿,同時給出Spark 2.1.0版本的連接:

Spark 1.6.3 源碼

Spark 2.1.0 源碼

本文為原創(chuàng),歡迎轉載荚板,轉載請注明出處凤壁、作者,謝謝跪另!

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拧抖,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子免绿,更是在濱河造成了極大的恐慌唧席,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘲驾,死亡現(xiàn)場離奇詭異淌哟,居然都是意外死亡,警方通過查閱死者的電腦和手機辽故,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門徒仓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人誊垢,你說我怎么就攤上這事掉弛≈⒓” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵殃饿,是天一觀的道長谋作。 經(jīng)常有香客問我,道長乎芳,這世上最難降的妖魔是什么遵蚜? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮奈惑,結果婚禮上吭净,老公的妹妹穿的比我還像新娘。我一直安慰自己携取,他們只是感情好驾诈,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布病毡。 她就那樣靜靜地躺著,像睡著了一般臼勉。 火紅的嫁衣襯著肌膚如雪文兢。 梳的紋絲不亂的頭發(fā)上晤斩,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機與錄音姆坚,去河邊找鬼澳泵。 笑死,一個胖子當著我的面吹牛兼呵,可吹牛的內(nèi)容都是我干的兔辅。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼击喂,長吁一口氣:“原來是場噩夢啊……” “哼维苔!你這毒婦竟也來了?” 一聲冷哼從身側響起懂昂,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤介时,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后凌彬,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沸柔,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年铲敛,在試婚紗的時候發(fā)現(xiàn)自己被綠了褐澎。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡伐蒋,死狀恐怖乱凿,靈堂內(nèi)的尸體忽然破棺而出顽素,到底是詐尸還是另有隱情,我是刑警寧澤徒蟆,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布胁出,位于F島的核電站,受9級特大地震影響段审,放射性物質(zhì)發(fā)生泄漏全蝶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一寺枉、第九天 我趴在偏房一處隱蔽的房頂上張望抑淫。 院中可真熱鬧,春花似錦姥闪、人聲如沸始苇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽催式。三九已至,卻和暖如春避归,著一層夾襖步出監(jiān)牢的瞬間荣月,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工梳毙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哺窄,地道東北人。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓账锹,卻偏偏與公主長得像萌业,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子奸柬,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容