【Spark Core】任務(wù)執(zhí)行機制和Task源碼淺析2

引言

上一小節(jié)《任務(wù)執(zhí)行機制和Task源碼淺析1》介紹了Executor的注冊過程。
這一小節(jié)趴樱,我將從Executor端馒闷,就接收LaunchTask消息之后Executor的執(zhí)行任務(wù)過程進行介紹。

1. Executor的launchTasks函數(shù)

DriverActor提交任務(wù)叁征,發(fā)送LaunchTask指令給CoarseGrainedExecutorBackend纳账,接收到指令之后,讓它內(nèi)部的executor來發(fā)起任務(wù)捺疼,即調(diào)用空閑的executor的launchTask函數(shù)疏虫。
下面是CoarseGrainedExecutorBackend中receiveWithLogging的部分代碼:

    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val ser = env.closureSerializer.newInstance()
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

Executor執(zhí)行task:

  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

Executor內(nèi)部維護一個線程池,可以跑多個task帅涂,每一個提交的task都會包裝成TaskRunner交由threadPool執(zhí)行议薪。

2. TaskRunner的run方法

run方法中val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)是真正執(zhí)行task中的任務(wù)。

下面是TaskRunner中run方法的部分代碼:

      try {
        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
        updateDependencies(taskFiles, taskJars)
        // 反序列化Task
        task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

        // If this task has been killed before we deserialized it, let's quit now. Otherwise,
        // continue executing the task.
        if (killed) {
          // Throw an exception rather than returning, because returning within a try{} block
          // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
          // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
          // for the task.
          throw new TaskKilledException
        }

        attemptedTask = Some(task)
        logDebug("Task " + taskId + "'s epoch is " + task.epoch)
        env.mapOutputTracker.updateEpoch(task.epoch)

        // Run the actual task and measure its runtime.
        // 運行Task, 具體可以去看ResultTask和ShuffleMapTask
        taskStart = System.currentTimeMillis()
        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
        val taskFinish = System.currentTimeMillis()

        // If the task has been killed, let's fail it.
        if (task.killed) {
          throw new TaskKilledException
        }

        // 對結(jié)果進行序列化
        val resultSer = env.serializer.newInstance()
        val beforeSerialization = System.currentTimeMillis()
        val valueBytes = resultSer.serialize(value)
        val afterSerialization = System.currentTimeMillis()

        // 更新任務(wù)的相關(guān)監(jiān)控信息媳友,會反映到監(jiān)控頁面上的
        for (m <- task.metrics) {
          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
          m.setExecutorRunTime(taskFinish - taskStart)
          m.setJvmGCTime(gcTime - startGCTime)
          m.setResultSerializationTime(afterSerialization - beforeSerialization)
        }

        val accumUpdates = Accumulators.values
        // 對結(jié)果進行再包裝斯议,包裝完再進行序列化
        val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
        val serializedDirectResult = ser.serialize(directResult)
        val resultSize = serializedDirectResult.limit

        // directSend = sending directly back to the driver
        val serializedResult = {
          if (maxResultSize > 0 && resultSize > maxResultSize) {
            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
              s"dropping it.")
            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
            // 如果中間結(jié)果的大小超過了spark.akka.frameSize(默認是10M)的大小,就要提升序列化級別了醇锚,超過內(nèi)存的部分要保存到硬盤的
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
            logInfo(
              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
          } else {
            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
            serializedDirectResult
          }
        }
        // 將任務(wù)完成和taskresult,通過statusUpdate報告給driver
        execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

      } catch {
        //異常處理代碼哼御,略去...
      } finally {
        // 清理為ResultTask注冊的shuffle內(nèi)存,最后把task從正在運行的列表當中刪除
        // Release memory used by this thread for shuffles
        env.shuffleMemoryManager.releaseMemoryForThisThread()
        // Release memory used by this thread for unrolling blocks
        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
        // Release memory used by this thread for accumulators
        Accumulators.clear()
        runningTasks.remove(taskId)
      }
    }

3. Task執(zhí)行過程

TaskRunner會啟動一個新的線程恋昼,我們看一下run方法中的調(diào)用過程:
TaskRunner.run --> Task.run --> Task.runTask --> RDD.iterator --> RDD.computeOrReadCheckpoint --> RDD.compute

Task的run函數(shù)代碼:

  /**
   * Called by Executor to run this task.
   *
   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
   * @return the result of the task
   */
  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
      taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
    TaskContextHelper.setTaskContext(context)
    context.taskMetrics.setHostname(Utils.localHostName())
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    try {
      runTask(context)
    } finally {
      context.markTaskCompleted()
      TaskContextHelper.unset()
    }
  }

ShuffleMapTask和ResultTask分別實現(xiàn)了不同的runTask函數(shù)液肌。

ShuffleMapTask的runTask函數(shù)代碼:

  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    //此處的taskBinary即為在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的廣播變量取得的  

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      // 將rdd計算的結(jié)果寫入memory或者disk  
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

ResultTask的runTask函數(shù)代碼:

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

4. Task狀態(tài)更新

Task執(zhí)行是通過TaskRunner來運行谤祖,它需要通過ExecutorBackend和Driver通信老速,通信消息是StatusUpdate:

  1. Task運行之前橘券,告訴Driver當前Task的狀態(tài)為TaskState.RUNNING。
  1. Task運行之后锋华,告訴Driver當前Task的狀態(tài)為TaskState.FINISHED箭窜,并返回計算結(jié)果绽快。
  2. 如果Task運行過程中發(fā)生錯誤坊罢,告訴Driver當前Task的狀態(tài)為TaskState.FAILED擅耽,并返回錯誤原因乖仇。
  3. 如果Task在中途被Kill掉了,告訴Driver當前Task的狀態(tài)為TaskState.FAILED起趾。


5. Task執(zhí)行完畢

Task執(zhí)行完畢训裆,在TaskRunner的run函數(shù)中蜀铲,通過statusUpdate通知ExecuteBackend,結(jié)果保存在DirectTaskResult中变姨。
SchedulerBackend接收到StatusUpdate之后做如下判斷:如果任務(wù)已經(jīng)成功處理定欧,則將其從監(jiān)視列表中刪除。如果整個作業(yè)中的所有任務(wù)都已經(jīng)完成忧额,則將占用的資源釋放睦番。
TaskSchedulerImpl將當前順利完成的任務(wù)放入完成隊列,同時取出下一個等待運行的Task巩检。

下面CoarseGrainedSchedulerBackend是中處理StatusUpdate消息的代碼:

      case StatusUpdate(executorId, taskId, state, data) =>
        //statusUpdate函數(shù)處理處理從taskset刪除已完成的task等工作
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                "from unknown executor $sender with ID $executorId")
          }
        }

scheduler.statusUpdate函數(shù)進行如下步驟:

  1. TaskScheduler通過TaskId找到管理這個Task的TaskSetManager(負責(zé)管理一批Task的類)兢哭,從TaskSetManager里面刪掉這個Task迟螺,并把Task插入到TaskResultGetter(負責(zé)獲取Task結(jié)果的類)的成功隊列里舍咖;
  1. TaskResultGetter獲取到結(jié)果之后排霉,調(diào)用TaskScheduler的handleSuccessfulTask方法把結(jié)果返回;
  2. TaskScheduler調(diào)用TaskSetManager的handleSuccessfulTask方法球订,處理成功的Task瑰钮;
  3. TaskSetManager調(diào)用DAGScheduler的taskEnded方法飞涂,告訴DAGScheduler這個Task運行結(jié)束了,如果這個時候Task全部成功了士八,就會結(jié)束TaskSetManager梁呈。

DAGScheduler在taskEnded方法里觸發(fā)CompletionEvent事件,在處理CompletionEvent消息事件中調(diào)用DAGScheduler的handleTaskCompletion函數(shù)醋虏,針對ResultTask和ShuffleMapTask區(qū)別對待結(jié)果:
1)ResultTask:
job的numFinished加1哮翘,如果numFinished等于它的分片數(shù),則表示任務(wù)該Stage結(jié)束阻课,標記這個Stage為結(jié)束限煞,最后調(diào)用JobListener(具體實現(xiàn)在JobWaiter)的taskSucceeded方法员凝,把結(jié)果交給resultHandler(經(jīng)過包裝的自己寫的那個匿名函數(shù))處理,如果完成的Task數(shù)量等于總?cè)蝿?wù)數(shù)旺上,任務(wù)退出抚官。
2)ShuffleMapTask:

  1. 調(diào)用Stage的addOutputLoc方法阶捆,把結(jié)果添加到Stage的outputLocs列表里
  1. 如果該Stage沒有等待的Task了洒试,就標記該Stage為結(jié)束
  2. 把Stage的outputLocs注冊到MapOutputTracker里面垒棋,留個下一個Stage用
  3. 如果Stage的outputLocs為空痪宰,表示它的計算失敗,重新提交Stage
  4. 找出下一個在等待并且沒有父親的Stage提交

轉(zhuǎn)載請注明作者Jason Ding及其出處
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
Github博客主頁(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入我的博客主頁

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市具练,隨后出現(xiàn)的幾起案子扛点,更是在濱河造成了極大的恐慌渡处,老刑警劉巖竞阐,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異仪召,居然都是意外死亡返咱,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門评姨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吐句,“玉大人店读,你說我怎么就攤上這事∥穆玻” “怎么了氧秘?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵丸相,是天一觀的道長彼棍。 經(jīng)常有香客問我,道長弛作,這世上最難降的妖魔是什么华匾? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮黍瞧,結(jié)果婚禮上印颤,老公的妹妹穿的比我還像新娘穿肄。我一直安慰自己,他們只是感情好矢否,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布僵朗。 她就那樣靜靜地躺著屑彻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪粪薛。 梳的紋絲不亂的頭發(fā)上违寿,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天陨界,我揣著相機與錄音痛阻,去河邊找鬼阱当。 笑死糜工,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的油坝。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼彬檀,長吁一口氣:“原來是場噩夢啊……” “哼窍帝!你這毒婦竟也來了诽偷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤深浮,失蹤者是張志新(化名)和其女友劉穎飞苇,沒想到半個月后洋闽,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡羽利,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年这弧,在試婚紗的時候發(fā)現(xiàn)自己被綠了虚汛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片卷哩。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖冷溶,靈堂內(nèi)的尸體忽然破棺而出尊浓,到底是詐尸還是另有隱情,我是刑警寧澤苗胀,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站歌亲,受9級特大地震影響堡掏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鹅龄,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一扮休、第九天 我趴在偏房一處隱蔽的房頂上張望拴鸵。 院中可真熱鬧,春花似錦八堡、人聲如沸聘芜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽口叙。三九已至嗅战,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間疟呐,已是汗流浹背厌漂。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工苇倡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人晓褪。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓综慎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親好港。 傳聞我的和親對象是個殘疾皇子米罚,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容