spark應(yīng)用執(zhí)行流程

1.Spark的應(yīng)用執(zhí)行機制

用戶提交一個Application到Spark集群執(zhí)行的基礎(chǔ)流程如下圖所示:


image.png

(1)Driver進(jìn)程啟動,構(gòu)建Spark Application的運行環(huán)境(啟動SparkContext)壶愤,SparkContext向資源管理器(可以是Standalone舅柜、Mesos或YARN)注冊并申請運行Executor資源兴想;

(2)資源管理器接到SparkContext申請后潜索,根據(jù)與woker之間心跳信息幢泼,決定在哪些worker上啟動Executor;

(3)Worker的Executor啟動后梢睛,會向SparkContext注冊。與此同時识椰,SparkContext解析Application绝葡,劃分job構(gòu)建成DAG圖,將DAG圖分解成Stage腹鹉,并把Taskset發(fā)送給Task Scheduler藏畅。

(4)Task Scheduler將Task發(fā)放給Executor運行同時SparkContext將應(yīng)用程序代碼發(fā)放給Executor。

(5)Task在Executor上運行功咒,運行完畢釋放所有資源愉阎。

2.Spark 作業(yè)提交運行詳細(xì)流程

image.png
  1. 通過SparkSubmit提交job后绞蹦,Client就開始構(gòu)建 spark context,即 application 的運行環(huán)境(使用本地的Client類的main函數(shù)來創(chuàng)建spark context并初始化它)

  2. client模式下提交任務(wù)榜旦,Driver在客戶端本地運行幽七;cluster模式下提交任務(wù)的時候,Driver是運行在集群上

  3. SparkContext連接到ClusterManager(Master)章办,向資源管理器注冊并申請運行Executor的資源(內(nèi)核和內(nèi)存)

  4. Master根據(jù)SparkContext提出的申請锉走,根據(jù)worker的心跳報告,來決定到底在那個worker上啟動executor

  5. Worker節(jié)點收到請求后會啟動executor

  6. executor向SparkContext注冊藕届,這樣driver就知道哪些executor運行該應(yīng)用

  7. SparkContext將Application代碼發(fā)送給executor(如果是standalone模式就是StandaloneExecutorBackend)

  8. 同時SparkContext解析Application代碼挪蹭,構(gòu)建DAG圖,提交給DAGScheduler進(jìn)行分解成stage休偶,stage被發(fā)送到TaskScheduler梁厉。

  9. TaskScheduler負(fù)責(zé)將Task分配到相應(yīng)的worker上,最后提交給executor執(zhí)行

  10. executor會建立Executor線程池踏兜,開始執(zhí)行Task词顾,并向SparkContext匯報,直到所有的task執(zhí)行完成

  11. 所有Task完成后,SparkContext向Master注銷

3.Job的調(diào)度執(zhí)行流程

應(yīng)用程序是一系列RDD的操作碱妆,Driver解析代碼時遇到Action算子肉盹,就會觸發(fā)Job的提交(實際底層實現(xiàn)上,Action算子最后調(diào)用rubJob函數(shù)提交Job給spark疹尾。其他操作都是生成對應(yīng)RDD的關(guān)系鏈上忍,job提交是隱式完成的,無需用戶顯示的提交)纳本。

Job的解析窍蓝、調(diào)度執(zhí)行整個流程可以劃分兩個階段:

  1. Stage劃分與提交

    (1)Job按照RDD之間的依賴關(guān)系是否為寬依賴,由DAGScheduler從RDD毅力鏈的末端開始觸發(fā)繁成,遍歷RDD依賴鏈劃分為一個或多個具有依賴關(guān)系個Stage吓笙;

    (2)第一步劃分出Stage后,DAGScheduler生成job實例巾腕,從末端Stage-FinalStage開始面睛,按照一定規(guī)則遞歸調(diào)度Stage,即交給TaskScheduler將每個stage轉(zhuǎn)化為一個TaskSet祠墅;

  2. Task調(diào)度與執(zhí)行:由TaskScheduler負(fù)責(zé)將TaskSet中的Task調(diào)度到Worker節(jié)點的Executor上執(zhí)行侮穿。

3.1Job到DAGScheduler過程

  1. 首先在SparkContext初始化的時候會創(chuàng)建DAGScheduler,這個DAGScheduelr每個應(yīng)用只有一個。然后DAGScheduler創(chuàng)建的時候毁嗦,會初始化一個事件捕獲對象亲茅,并且開啟監(jiān)聽。之后我們的任務(wù)都會發(fā)給這個事件監(jiān)聽器,它會按照任務(wù)的類型創(chuàng)建不同的任務(wù)克锣。

  2. 再從客戶端程序方面說茵肃,當(dāng)我們調(diào)用action操作的時候,就會觸發(fā)runjob袭祟,它內(nèi)部其實就是向前面的那個事件監(jiān)聽器提交一個任務(wù)验残。

  3. 最后事件監(jiān)聽器調(diào)用DAGScheduler的handleJobSubmitted真正的處理

  4. 處理的時候,會先創(chuàng)建一個resultStage巾乳,每個job只有一個resultstage您没,其余的都是shufflestage.然后根據(jù)rdd的依賴關(guān)系,按照廣度優(yōu)先的思想遍歷rdd胆绊,遇到shufflerdd就創(chuàng)建一個新的stage氨鹏。

  5. 形成DAG圖后,遍歷等待執(zhí)行的stage列表压状,如果這個stage所依賴的父stage執(zhí)行完了仆抵,它就可以執(zhí)行了;否則還需要繼續(xù)等待种冬。

  6. 最終stage會以taskset的形式镣丑,提交給TaskScheduler,然后最后提交給excutor娱两。


    image.png

3.1.1Job提交

當(dāng)我們調(diào)用action操作的時候莺匠,就會觸發(fā)runjob,它內(nèi)部其實就是向前面的那個事件監(jiān)聽器提交一個任務(wù)十兢,以 RDD#collect()算子介紹:

collect()算子提交任務(wù):

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

SparkContext#runJob

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    ...
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    ...
  }

在dagScheduler.runJob-》dagScheduler.submitJob方法中慨蛙,會向eventProcessLoop發(fā)送一個‘JobSubmitted’-任務(wù)提交事件;

def runJob[T, U](
     rdd: RDD[T],
     func: (TaskContext, Iterator[T]) => U,
     partitions: Seq[Int],
     callSite: CallSite,
     resultHandler: (Int, U) => Unit,
     properties: Properties): Unit = {
   val start = System.nanoTime
   // 調(diào)用 submitJob 方法
   val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
}
//
def submitJob[T, U](..){
    ...
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, callSite, waiter,
       SerializationUtils.clone(properties)))
   }

事件隊列的處理最后會走到 DAGSchedulerEventProcessLoop 的 onReceive 的回調(diào)方法里面去纪挎。

/**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      // 調(diào)用 doOnReceive 方法
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

后面會去調(diào)用 doOnReceive 方法,根據(jù) event 進(jìn)行模式匹配跟匆,匹配到 JobSubmitted 的 event 后實際上是去調(diào)用 DAGScheduler 的 handleJobSubmitted 這個方法

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // 模式匹配
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      // 調(diào)用 handleJobSubmitted 方法
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

3.1.2Job的劃分和調(diào)度

handleJobSubmitted主要完成下面的工作:

1.使用 觸發(fā) job 的最后一個 rdd來創(chuàng)建 finalStage异袄;注: Stage 是一個抽象類,一共有兩個實現(xiàn)玛臂,一個是 ResultStage烤蜕,是用 action 中的函數(shù)計算結(jié)果的 stage;另一個是 ShuffleMapStage迹冤,是為 shuffle 準(zhǔn)備數(shù)據(jù)的 stage讽营。

2.構(gòu)造一個 Job 對象,將上面創(chuàng)建的 finalStage 封裝進(jìn)去泡徙,這個 Job 的最后一個 stage 也就是這個 finalStage橱鹏;

3.將 Job 的相關(guān)信息保存到內(nèi)存的數(shù)據(jù)結(jié)構(gòu)中;

4.調(diào)用 submitStage 方法提交 finalStage。

構(gòu)造finalStage和Job實例
private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // 使用觸發(fā) job 的最后一個 RDD 創(chuàng)建一個 ResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    // 使用前面創(chuàng)建好的 ResultStage 去創(chuàng)建一個 job
    // 這個 job 的最后一個 stage 就是 finalStage
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    // 將 job 的相關(guān)信息存儲到內(nèi)存中
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    // 提交 finalStage
    submitStage(finalStage)
  }
3.1.2.1DAG劃分莉兰,并調(diào)度Stage

下面就會走進(jìn) submitStage 方法挑围,這個方法是用來提交 stage 的,具體做了這些操作:

1糖荒,首先會驗證 stage 對應(yīng)的 job id 進(jìn)行校驗杉辙,存在才會繼續(xù)執(zhí)行;

2捶朵,在提交這個 stage 之前會判斷當(dāng)前 stage 的狀態(tài)蜘矢。

如果是 running、waiting综看、failed 的話就不做任何操作品腹。

如果不是這三個狀態(tài)則會根據(jù)當(dāng)前 stage 去往前推前面的 stage,如果能找到前面的 stage 則繼續(xù)遞歸調(diào)用 submitStage 方法寓搬,直到當(dāng)前 stage 找不到前面的 stage 為止珍昨,這時候的 stage 就相當(dāng)于當(dāng)前 job 的第一個 stage,然后回去調(diào)用 submitMissingTasks 方法去分配 task句喷。

private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    // 看看當(dāng)前的 job 是否存在
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
       // 判斷當(dāng)前 stage 的狀態(tài)
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        // 根據(jù)當(dāng)前的 stage 去推倒前面的 stage
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        // 如果前面已經(jīng)沒有 stage 了,那么久將當(dāng)前 stage 去執(zhí)行 submitMissingTasks 方法
        // 如果前面還有 stage 的話那么遞歸調(diào)用 submitStage
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          // 將當(dāng)前 stage 加入等待隊列
          waitingStages += stage
        }
      }
    } else {
      // abortStage 終止提交當(dāng)前 stage
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
3.1.2.2劃分stage

getMissingParentStages 這個劃分算法做了哪些操作:

1.創(chuàng)建 missing 和 visited 兩個 HashSet唾琼,分別用來存儲根據(jù)當(dāng)前 stage 向前找到的所有 stage 數(shù)據(jù)和已經(jīng)調(diào)用過 visit 方法的 RDD兄春;

2.創(chuàng)建一個存放 RDD 的棧,然后將傳進(jìn)來的 stage 中的 rdd 也就是 finalStage 中的那個 job 觸發(fā)的最后一個 RDD 放入棧中锡溯;

3.然后將棧中的 RDD 拿出來調(diào)用 visit 方法赶舆,這個 visit 方法內(nèi)部會根據(jù)當(dāng)前 RDD 的依賴鏈逐個遍歷所有 RDD,并且會根據(jù)相鄰兩個 RDD 的依賴關(guān)系來決定下面的操作:

如果是寬依賴祭饭,即 ShuffleDependency 芜茵,那么會調(diào)用 getOrCreateShuffleMapStage 創(chuàng)建一個新的 stage,默認(rèn)每個 job 的最后一個 stage 是 ResultStage倡蝙,剩余的 job 中的其它 stage 均為 ShuffleMapStage九串。然后會將創(chuàng)建的這個 stage 加入前面創(chuàng)建的 missing 的 HashSet 中;

如果是窄依賴寺鸥,即 NarrowDependency猪钮,那么會將該 RDD 加入到前面創(chuàng)建的 RDD 棧中,繼續(xù)遍歷調(diào)用 visit 方法胆建。

直到所有的 RDD 都遍歷結(jié)束后返回前面創(chuàng)建的 missing 的集合烤低。

private def getMissingParentStages(stage: Stage): List[Stage] = {
    // 存放下面找到的所有 stage
    val missing = new HashSet[Stage]
    // 存放已經(jīng)遍歷過的 rdd
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    // 創(chuàng)建一個維護(hù) RDD 的棧
    val waitingForVisit = new Stack[RDD[_]]
    // visit 方法
    def visit(rdd: RDD[_]) {
      // 判斷當(dāng)前 rdd 是否 visit 過
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          // 遍歷當(dāng)前 RDD 的依賴鏈
          for (dep <- rdd.dependencies) {
            dep match {
              // 如果是寬依賴
              case shufDep: ShuffleDependency[_, _, _] =>
                // 創(chuàng)建 ShuffleMapStage 
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  // 加入 missing 集合
                  missing += mapStage
                }
              // 如果是窄依賴
              case narrowDep: NarrowDependency[_] =>
                // 加入等待 visit 的集合中,準(zhǔn)備下一次遍歷
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    // 將傳入的 stage 中的 rdd 拿出來壓入 waitingForVisit 的棧中
    waitingForVisit.push(stage.rdd)
    // 遍歷棧里的所有 RDD 
    while (waitingForVisit.nonEmpty) {
      // 調(diào)用 visit 方法
      visit(waitingForVisit.pop())
    }
    // 返回 missing 這個 stage 集合
    missing.toList
  }
3.1.2.3為創(chuàng)建的task分配最佳位置

submitMissingTasks 方法中做了這些事:

1.拿到 stage 中沒有計算的 partition笆载;

2.獲取 task 對應(yīng)的 partition 的最佳位置扑馁,使用最佳位置算法涯呻;

3.獲取 taskBinary,將 stage 的 RDD 和 ShuffleDependency(或 func)廣播到 Executor檐蚜;

4.為 stage 創(chuàng)建 task和taskSet(當(dāng)tasks長度大于0)

5.提交taskSet給TaskScheduler

submitMissingTasks 主要為task分配最佳位置計算-生成taskId和最佳partition的映射關(guān)系魄懂;

3.2提交Stage給TaskScheduler完成任務(wù)集的調(diào)度

前面已經(jīng)分析到了 DAGScheduler 對 stage 劃分,并對 Task 的最佳位置進(jìn)行計算之后闯第,通過調(diào)用 taskScheduler 的 submitTasks 方法市栗,將每個 stage 的 taskSet 進(jìn)行提交。

在 taskScheduler 的 submitTasks 方法中會為每個 taskSet 創(chuàng)建一個 TaskSetManager咳短,用于管理 taskSet填帽。然后向調(diào)度池中添加該 TaskSetManager,最后會調(diào)用 backend.reviveOffers() 方法為 task 分配資源咙好。

TaskScheduler維護(hù)task和executor對應(yīng)關(guān)系篡腌,executor和物理資源對應(yīng)關(guān)系,在排隊的task和正在跑的task勾效。

3.2.1包裝taskSet嘹悼,為task分配資源

TaskScheduler唯一實現(xiàn)類-TaskSchedulerImpl的submitTasks邏輯

override def submitTasks(taskSet: TaskSet) {
    //獲取 taskSet 中的 task
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // 為每個 taskSet 創(chuàng)建一個 TaskSetManager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      // 拿到 stage 的 id
      val stage = taskSet.stageId
      // 創(chuàng)建一個 HashMap ,用來存儲 stage 對應(yīng)的 TaskSetManager
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      // 將上面創(chuàng)建的 taskSetManager 存入 map 中
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // 向調(diào)度池中添加剛才創(chuàng)建的 TaskSetManager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      // 判斷程序是否為 local 模式层宫,并且 TaskSchedulerImpl 沒有收到 Task
      if (!isLocal && !hasReceivedTask) {
        // 創(chuàng)建一個定時器杨伙,通過指定時間檢查 TaskSchedulerImpl 的饑餓情況
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            // 如果 TaskSchedulerImpl 已經(jīng)安排執(zhí)行了 Task,則取消定時器
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      // 標(biāo)記已經(jīng)接收到 Task
      hasReceivedTask = true
    }
    // 給 Task 分配資源
    backend.reviveOffers()
  }

下面主要看 backend.reviveOffers() 這個方法萌腿,在提交模式是 standalone 模式下限匣,實際上是調(diào)用 StandaloneSchedulerBackend 的 reviveOffers 方法,實則調(diào)用的是其父類 CoarseGrainedSchedulerBackend 的 reviveOffers 方法毁菱,這個方法是向 driverEndpoint 發(fā)送一個 ReviveOffers 消息米死。

代碼塊

override def reviveOffers() {
    // 向 driverEndpoint 發(fā)送 ReviveOffers 消息
    driverEndpoint.send(ReviveOffers)
  }

DriverEndpoint 收到信息后會調(diào)用 makeOffers 方法:

case ReviveOffers =>
        makeOffers()

makeOffers 方法內(nèi)部會將 application 所有可用的 executor 封裝成一個 workOffers,每個 workOffers 內(nèi)部封裝了每個 executor 的資源數(shù)量贮庞。

然后調(diào)用 taskScheduler 的 resourceOffers 從上面封裝的 workOffers 信息為每個 task 分配合適的 executor峦筒。

最后調(diào)用 launchTasks 啟動 task。

private def makeOffers() {
      // 過濾出可用的 executor
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      // 將這些 executor 封裝成 workOffers
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      // 給每個 task 分配 executor窗慎,然后調(diào)用 launchTasks 啟動這些 task
      launchTasks(scheduler.resourceOffers(workOffers))
    }

下面看一下 launchTasks 這個方法勘天。

這個方法主要做了這些操作:

1.遍歷每個 task,然后將每個 task 信息序列化捉邢。

2.判斷序列化后的 task 信息,如果大于 rpc 發(fā)送消息的最大值商膊,則停止伏伐,建議調(diào)整 rpc 的 maxRpcMessageSize,如果小于 rpc 發(fā)送消息的最大值晕拆,則找到 task 對應(yīng)的 executor藐翎,然后更新該 executor 對應(yīng)的一些內(nèi)存資源信息材蹬。

3.向 executor 發(fā)送 LaunchTask 消息。

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
        // 遍歷所有的 task
      for (task <- tasks.flatten) {
        // 序列化 task 信息
        val serializedTask = ser.serialize(task)
        // 判斷序列化后的 task 信息是否大于 rpc 能夠傳送的最大信息量
        if (serializedTask.limit >= maxRpcMessageSize) {
         ....
        }
        else {
          // 找到對應(yīng)的 executor
          val executorData = executorDataMap(task.executorId)
          // 更新 executor 的資源信息
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          // 向 executor 發(fā)送 LaunchTask 消息
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

3.2.2Executor端執(zhí)行Task

Executor 收到消息后做了哪些操作吝镣?這里 executorData.executorEndpoint 實際上就是在創(chuàng)建 Executor 守護(hù)進(jìn)程時候創(chuàng)建的那個 CoarseGrainedExecutorBackend堤器。

CoarseGrainedExecutorBackend處理接收到 LaunchTask 消息后會判斷當(dāng)前的 executor 是不是為空,如果不為空就會反序列化 task 的信息末贾,然后調(diào)用 executor 的 launchTask 方法闸溃。

case LaunchTask(data) =>
        // 判斷當(dāng)前 executor 是不是空
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // 反序列化 task 的信息
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // 調(diào)用 executor 的 lauchTask 方法
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

executor的 launchTask方法首先會為每個 task 創(chuàng)建一個 TaskRunner,然后會將 task 添加到 runningTasks 的集合中拱撵,并標(biāo)記其為運行狀態(tài)辉川,最后將 taskRunner 放到一個線程池中執(zhí)行。

def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    // 創(chuàng)建 TaskRunner
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    // 將 taskRunner 放到線程池中執(zhí)行
    threadPool.execute(tr)
  }

task 運行完成后回向 driver 發(fā)送消息拴测,driver 會更新 executor 的一些資源數(shù)據(jù)乓旗,并標(biāo)記 task 已完成。

TaskScheduler是一個trait接口集索,任務(wù)調(diào)度器的實現(xiàn)只有一種就是TaskSchedulerImpl屿愚。TaskSchedulerImpl主要處理一些通用的邏輯,例如在多個作業(yè)之間決定調(diào)度順序务荆,執(zhí)行推測執(zhí)行的邏輯等等妆距。主要邏輯是:

3.3小結(jié)

任務(wù)在driver中從誕生到最終發(fā)送的過程,主要有一下幾個步驟:

  • DAGScheduler對作業(yè)計算鏈按照shuffle依賴劃分多個stage蛹含,提交一個stage根據(jù)個stage的一些信息創(chuàng)建多個Task毅厚,包括ShuffleMapTask和ResultTask, 并封裝成一個任務(wù)集(TaskSet),把這個任務(wù)集交給TaskScheduler

  • TaskSchedulerImpl將接收到的任務(wù)集加入調(diào)度池中,然后通知調(diào)度后端SchedulerBackend

  • CoarseGrainedSchedulerBackend收到新任務(wù)提交的通知后浦箱,檢查下現(xiàn)在可用 executor有哪些吸耿,并把這些可用的executor交給TaskSchedulerImpl

  • TaskSchedulerImpl根據(jù)獲取到的計算資源,根據(jù)任務(wù)本地性級別的要求以及考慮到黑名單因素酷窥,按照round-robin的方式對可用的executor進(jìn)行輪詢分配任務(wù)咽安,經(jīng)過多個本地性級別分配,多輪分配后最終得出任務(wù)與executor之間的分配關(guān)系蓬推,并封裝成TaskDescription形式返回給SchedulerBackend

  • SchedulerBackend拿到這些分配關(guān)系后妆棒,就知道哪些任務(wù)該發(fā)往哪個executor了,通過調(diào)用rpc接口將任務(wù)通過網(wǎng)絡(luò)發(fā)送即可沸伏。

4.Spark運行架構(gòu)特點

(1)每個Application獲取專屬的executor進(jìn)程糕珊,該進(jìn)程在Application期間一直駐留,并以多線程方式運行tasks毅糟。這種Application隔離機制有其優(yōu)勢的红选,無論是從調(diào)度角度看(每個Driver調(diào)度它自己的任務(wù)),還是從運行角度看(來自不同Application的Task運行在不同的JVM中)姆另。當(dāng)然喇肋,這也意味著Spark Application不能跨應(yīng)用程序共享數(shù)據(jù)坟乾,除非將數(shù)據(jù)寫入到外部存儲系統(tǒng)。

(2)Spark與資源管理器無關(guān)蝶防,只要能夠獲取executor進(jìn)程甚侣,并能保持相互通信就可以了。

(3)提交SparkContext的Client應(yīng)該靠近Worker節(jié)點(運行Executor的節(jié)點)间学,最好是在同一個Rack里殷费,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;如果想在遠(yuǎn)程集群中運行菱鸥,最好使用RPC將SparkContext提交給集群宗兼,不要遠(yuǎn)離Worker運行SparkContext。

(4)Task采用了數(shù)據(jù)本地性和推測執(zhí)行的優(yōu)化機制氮采。

5.參考

  1. https://www.cnblogs.com/xing901022/p/6674966.html

  2. https://www.cnblogs.com/zhuge134/p/10965266.html

  3. https://blog.csdn.net/xianpanjia4616/article/details/84405145

  4. https://juejin.im/post/5d23069a6fb9a07f091bc66e

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末殷绍,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子鹊漠,更是在濱河造成了極大的恐慌主到,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件躯概,死亡現(xiàn)場離奇詭異登钥,居然都是意外死亡,警方通過查閱死者的電腦和手機娶靡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門牧牢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人姿锭,你說我怎么就攤上這事塔鳍。” “怎么了呻此?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵轮纫,是天一觀的道長。 經(jīng)常有香客問我焚鲜,道長危号,這世上最難降的妖魔是什么钮莲? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任快耿,我火速辦了婚禮盏袄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘葱她。我一直安慰自己情连,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布览效。 她就那樣靜靜地躺著却舀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪锤灿。 梳的紋絲不亂的頭發(fā)上挽拔,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天,我揣著相機與錄音但校,去河邊找鬼螃诅。 笑死,一個胖子當(dāng)著我的面吹牛状囱,可吹牛的內(nèi)容都是我干的术裸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼亭枷,長吁一口氣:“原來是場噩夢啊……” “哼袭艺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起叨粘,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤猾编,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后升敲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體答倡,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年驴党,在試婚紗的時候發(fā)現(xiàn)自己被綠了瘪撇。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡港庄,死狀恐怖倔既,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情攘轩,我是刑警寧澤叉存,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站度帮,受9級特大地震影響歼捏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜笨篷,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一瞳秽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧率翅,春花似錦练俐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽燕锥。三九已至,卻和暖如春悯蝉,著一層夾襖步出監(jiān)牢的瞬間归形,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工鼻由, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留暇榴,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓蕉世,卻偏偏與公主長得像蔼紧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子狠轻,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容