spark源碼分析之任務調(diào)度篇

DAG的生成

概述

spark作為一套高效的分布式運算框架爷辙,但是想要更深入的學習它,就要通過分析spark的源碼,不但可以更好的幫助理解spark的工作過程,還可以提高對集群的排錯能力图谷,本文主要關注的是Spark的Stage任務的執(zhí)行流程的流程。

DAG(Directed Acyclic Graph)叫做有向無環(huán)圖阱洪,原始的RDD通過一系列的轉(zhuǎn)換就就形成了DAG便贵,根據(jù)RDD之間的依賴關系的不同將DAG劃分成不同的Stage,對于窄依賴冗荸,partition的轉(zhuǎn)換處理在Stage中完成計算承璃。對于寬依賴,由于有Shuffle的存在蚌本,只能在parent RDD處理完成后绸硕,才能開始接下來的計算,因此寬依賴是劃分Stage的依據(jù)魂毁。

窄依賴 指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
寬依賴 指的是多個子RDD的Partition會依賴同一個父RDD的Partition

DAGScheduler調(diào)度隊列

當我們看完Executor的創(chuàng)建與啟動流程后,我們繼續(xù)在SparkContext的構(gòu)造方法中繼續(xù)查看

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  出嘹。席楚。。税稼。烦秩。。
    
 private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    //通過SparkEnv來創(chuàng)建createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }
  //在這里調(diào)用了createSparkEnv郎仆,返回一個SparkEnv對象只祠,這個對象里面有很多重要屬性,最重要的ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  //創(chuàng)建taskScheduler
  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  //創(chuàng)建DAGScheduler
  dagScheduler = new DAGScheduler(this)

  //啟動TaksScheduler
  taskScheduler.start()
    扰肌。抛寝。。。盗舰。
}

在構(gòu)造方法中還創(chuàng)建了一個DAGScheduler對象晶府,這個類的任務就是用來劃分Stage任務的,構(gòu)造方法中初始化了 private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
DAGSchedulerEventProcessLoop是一個事件總線對象钻趋,用來負責任務的分發(fā)川陆,在構(gòu)造方法eventProcessLoop.start()被調(diào)用,該方法是父類EventLoop的start

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

調(diào)用了eventThread的start方法蛮位,開啟了一個線程

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

run方法中不斷的從LinkedBlockingDeque阻塞隊列中取消息较沪,然后調(diào)用onReceive(event)方法,該方法是由子類DAGSchedulerEventProcessLoop實現(xiàn)的

  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      //調(diào)用dagScheduler來出來提交任務
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId, fetchFailed = false)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

onReceive中會匹配到傳入的任務類型失仁,執(zhí)行相應的邏輯尸曼。到此DAGScheduler的調(diào)度隊列會一直掛起,不斷輪詢隊列中的任務陶因。

DAG提交Task任務流程

當RDD經(jīng)過一系列的轉(zhuǎn)換Transformation方法后骡苞,最終要執(zhí)行Action動作方法,這里比如WordCount程序中最后調(diào)用collect()方法時會將數(shù)據(jù)提交到Master上運行楷扬,任務真正的被執(zhí)行解幽,這里的方法執(zhí)行過程如下

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

sc 是SparkContext對象,這里調(diào)用 一個runJob該方法調(diào)用多次重載的方法后,該方法最終會調(diào)用 dagScheduler.runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    //dagScheduler出現(xiàn)了烘苹,可以切分stage
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

dagScheduler的runJob是我們比較關心的

 def runJob[T, U: ClassTag](
 
    躲株。。镣衡。霜定。。

    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      }
      case JobFailed(exception: Exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
  }

這里面的我們主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任務

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {

     廊鸥。望浩。。惰说。磨德。。

    //把job加入到任務隊列里面
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
    waiter
  }

這里比較關鍵的地方是eventProcessLoop.post往任務隊列中加入一個JobSubmitted類型的任務吆视,eventProcessLoop是在構(gòu)造方法中就初始化好的事件總線對象典挑,內(nèi)部有一個線程不斷的輪詢隊列里的任務

輪詢到任務后調(diào)用onReceive方法匹配任務類型,在這里我們提交的任務是JobSubmitted類型

    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      //調(diào)用dagScheduler來出來提交任務
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

調(diào)用了handleJobSubmitted方法啦吧,接下來查看該方法

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //最終的stage
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
        您觉。。授滓。琳水。
        submitStage(finalStage)
   }

上面的代碼中肆糕,調(diào)用了newStage進行任務的劃分,該方法是劃分任務的核心方法炫刷,劃分任務的根據(jù)最后一個依賴關系作為開始擎宝,通過遞歸,將每個寬依賴做為切分Stage的依據(jù)浑玛,切分Stage的過程是流程中的一環(huán)绍申,但在這里不詳細闡述,當任務切分完畢后顾彰,代碼繼續(xù)執(zhí)行來到submitStage(finalStage)這里開始進行任務提交
這里以遞歸的方式進行任務的提交

//遞歸的方式提交stage
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
            //提交任務
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

調(diào)用submitMissingTasks(stage, jobId.get)提交任務极阅,將每一個Stage和jobId傳入

  private def submitMissingTasks(stage: Stage, jobId: Int) {
   。涨享。筋搏。。厕隧。

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      //taskScheduler提交task
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }
  }

這里的代碼我們需要關注的是 taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
創(chuàng)建了一個TaskSet對象奔脐,將所有任務的信息封裝,包括task任務列表吁讨,stageId,任務id,分區(qū)數(shù)參數(shù)等

Task任務調(diào)度

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
        //創(chuàng)建TaskSetManager保存了taskSet任務列表
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
     //將任務加入調(diào)度池
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    //接受任務
    backend.reviveOffers()
  }

該方法比較重要髓迎,主要將任務加入調(diào)度池,最后調(diào)用了backend.reviveOffers()這里的backend是CoarseGrainedSchedulerBackend一個Executor任務調(diào)度對象

  override def reviveOffers() {
    //自己給自己發(fā)消息
    driverActor ! ReviveOffers
  }

這里用了內(nèi)部的DriverActor對象發(fā)送了一個內(nèi)部消息給自己建丧,接下來查看receiver方法接受的消息

      case ReviveOffers =>
        makeOffers()

收到消息后調(diào)用了makeOffers()方法

    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

makeOffers方法中排龄,將Executor的信息集合與調(diào)度池中的Tasks封裝成WokerOffers列表傳給了
launchTasks

    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
           。翎朱。橄维。。拴曲。争舞。
        //把task序列化
        val serializedTask = ser.serialize(task)

            。澈灼。兑障。。蕉汪。
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //把序列化好的task發(fā)送給Executor
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

launchTasks方法將遍歷Tasks集合,每個Task任務序列化,發(fā)送啟動Task執(zhí)行消息的給Executor
Executor的onReceive方法

  //DriverActor發(fā)送給Executor的啟動Task的消息
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = env.closureSerializer.newInstance()
        //把Task反序列化
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        //啟動task
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

Executor收到DriverActor發(fā)送的啟動Task的消息逞怨,這里才開始真正執(zhí)行任務了者疤,將收到的Task序列化信息反序列化,調(diào)用ExecutorlaunchTask方法執(zhí)行任務

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    //把task的描述信息放到了一份TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    //然后把TaskRunner丟到線程池里面
    threadPool.execute(tr)
  }

launchTask內(nèi)將Task提交到線程池去運行,TaskRunner是Runnable對象叠赦,里面的run方法執(zhí)行了我們app生成的每一個RDD的鏈上的邏輯驹马。 到此革砸,RDD的整個作業(yè)方式就結(jié)束了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末糯累,一起剝皮案震驚了整個濱河市算利,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌泳姐,老刑警劉巖效拭,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異胖秒,居然都是意外死亡缎患,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進店門阎肝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挤渔,“玉大人,你說我怎么就攤上這事风题∨械迹” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵沛硅,是天一觀的道長眼刃。 經(jīng)常有香客問我,道長稽鞭,這世上最難降的妖魔是什么鸟整? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮朦蕴,結(jié)果婚禮上篮条,老公的妹妹穿的比我還像新娘。我一直安慰自己吩抓,他們只是感情好涉茧,可當我...
    茶點故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著疹娶,像睡著了一般伴栓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上雨饺,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天钳垮,我揣著相機與錄音,去河邊找鬼额港。 笑死饺窿,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的移斩。 我是一名探鬼主播肚医,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼肠套!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起你稚,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤入宦,失蹤者是張志新(化名)和其女友劉穎乾闰,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體涯肩,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡疗垛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年硫朦,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片泽裳。...
    茶點故事閱讀 40,115評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡涮总,死狀恐怖瀑梗,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情抛丽,我是刑警寧澤亿鲜,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布狡门,位于F島的核電站,受9級特大地震影響其馏,放射性物質(zhì)發(fā)生泄漏叛复。R本人自食惡果不足惜扔仓,卻給世界環(huán)境...
    茶點故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一翘簇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呜笑,春花似錦彻犁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疾宏。三九已至,卻和暖如春为牍,著一層夾襖步出監(jiān)牢的瞬間碉咆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工疫铜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留壳咕,地道東北人。 一個月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓幌羞,卻偏偏與公主長得像竟稳,于是被迫代替她去往敵國和親他爸。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容

  • YarnYarn產(chǎn)生背景:Yarn直接來自于MR1.0MR1.0 問題:采用的是master slave結(jié)構(gòu)系谐,ma...
    時待吾閱讀 5,676評論 2 23
  • 上一篇文章講解了RDD的基本概念, 這篇文章嘗試分析當Spark拿到一個RDD之后是如何處理它的. 文中會涉及到S...
    肝蹬福克斯記閱讀 5,986評論 2 17
  • 本文基于spark2.11 1. 前言 1.1 基本概念 RDD關于RDD已經(jīng)有很多文章了止喷,可以參考一下理解Spa...
    aaron1993閱讀 1,806評論 0 3
  • RDD的概述 RDD是只讀的弹谁、分區(qū)記錄的集合句喜,是Spark編程模型的最主要抽象,它是一種特殊的集合植康,支持多種數(shù)據(jù)源...
    木戎閱讀 3,261評論 0 2
  • 成熟的心思嗎销睁? 理想中的自己 要面子的童鞋 失去活動的心思 只想自己嗎 不想笨拙的參與這個游戲
    明霧心閱讀 132評論 0 0