Spark-Core Source Code Deep Dive (13): An Analysis of the Task Execution Flow

In the previous article we focused on how Tasks are submitted; in this one we analyze in detail how a Task actually runs.

We start from the point where CoarseGrainedExecutorBackend receives the LaunchTask message sent by CoarseGrainedSchedulerBackend:

case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // Deserialize the TaskDescription
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Call Executor's launchTask to run the task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }

Next we step into Executor's launchTask method:

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Instantiate a TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // Store it in runningTasks, a ConcurrentHashMap[Long, TaskRunner]
  runningTasks.put(taskId, tr)
  // Run the freshly created TaskRunner on the thread pool, i.e. execute its run() method
  threadPool.execute(tr)
}

Executor's launchTask method first instantiates a TaskRunner (which implements the Runnable interface) and then has a thread-pool thread execute that TaskRunner's run() method.
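As a reference point, here is a minimal, self-contained sketch of the Runnable-plus-thread-pool pattern that launchTask relies on. LaunchSketch and its members are illustrative names, not Spark classes; the real Executor uses a daemon cached thread pool, but a plain java.util.concurrent cached pool shows the same shape:

import java.util.concurrent.{ConcurrentHashMap, Executors}

object LaunchSketch {
  // Mirrors Executor's bookkeeping: task id -> running task
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool = Executors.newCachedThreadPool()

  def launch(taskId: Long): Unit = {
    val tr = new Runnable {           // stand-in for TaskRunner
      override def run(): Unit = {
        println(s"task $taskId running on " + Thread.currentThread().getName)
        runningTasks.remove(taskId)   // the real TaskRunner also removes itself when done
      }
    }
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)            // same call shape as threadPool.execute(tr) above
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(launch)
    threadPool.shutdown()
  }
}

With that pattern in mind, let us enter TaskRunner's run() method; for readability we split it into several parts: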

// Instantiate the TaskMemoryManager, which manages this task's memory
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
// Record the deserialization start time
val deserializeStartTime = System.currentTimeMillis()
// Set the ClassLoader
Thread.currentThread.setContextClassLoader(replClassLoader)
// Closure serializer
val ser = env.closureSerializer.newInstance()
// Log what we are running
logInfo(s"Running $taskName (TID $taskId)")
// Report the task's state as RUNNING to the Driver via ExecutorBackend's statusUpdate
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
// Total GC time so far, used later to compute this task's GC cost
startGCTime = computeTotalGcTime()

How the Driver (DriverEndpoint) handles that message is not our focus here; we stay on how the Task runs and continue with the source:

try {
  // Deserialize the task's dependencies, along with taskBytes
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  // Update the dependencies, i.e. download the required files and jars; the download uses synchronized
  // because these dependencies are a shared resource for all the Tasks on this Executor
  updateDependencies(taskFiles, taskJars)
  // Deserialize taskBytes into a Task
  task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
  // Set the memory manager
  task.setTaskMemoryManager(taskMemoryManager)
  // If this task has been killed before we deserialized it, let's quit now. Otherwise,
  // continue executing the task.
  if (killed) {
    // Throw an exception rather than returning, because returning within a try{} block
    // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
    // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
    // for the task.
    throw new TaskKilledException
  }
  logDebug("Task " + taskId + "'s epoch is " + task.epoch)
  env.mapOutputTracker.updateEpoch(task.epoch)
  // Call the task's run() method to execute it and obtain the result
  // Run the actual task and measure its runtime.
  taskStart = System.currentTimeMillis()
  var threwException = true
  val (value, accumUpdates) = try {
    val res = task.run(
      taskAttemptId = taskId,
      attemptNumber = attemptNumber,
      metricsSystem = env.metricsSystem)
    threwException = false
    res
  } finally {
    ...
  }
  ...
  // 后面是對(duì)Task運(yùn)行完成后返回結(jié)果進(jìn)行的處理

The first step is deserializing the dependencies (serialization and deserialization are summarized all together at the end of this article). taskBytes is then deserialized into a Task, and finally the Task's run() method is invoked to execute the task and obtain its result. What comes after that handles the result of the finished Task; we analyze it once we have seen the Task run. Next, we enter Task's run() method:

final def run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
  context = new TaskContextImpl(
    stageId,
    partitionId,
    taskAttemptId,
    attemptNumber,
    taskMemoryManager,
    metricsSystem,
    internalAccumulators,
    runningLocally = false)
  TaskContext.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    (runTask(context), context.collectAccumulators())
  } catch {
    ...
  } finally {
    ...
  }
}

We can see that run() internally delegates to the Task's runTask method; depending on the task type, this is either ShuffleMapTask's or ResultTask's runTask.
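Structurally this is the template-method pattern: Task.run performs the shared bookkeeping (building the TaskContext, collecting accumulators) and delegates to the abstract runTask, which each subclass overrides. A minimal sketch of that shape, with Sketch-prefixed names that are illustrative rather than the real classes:

abstract class SketchTask[T] {
  // Shared bookkeeping lives in run(), cf. Task.run building the TaskContext
  final def run(): T = {
    // ... set up context, metrics, kill checks ...
    runTask()
  }
  // Each concrete task type supplies the actual computation
  def runTask(): T
}

// ShuffleMapTask's runTask returns a MapStatus describing its shuffle output
class SketchShuffleMapTask extends SketchTask[String] {
  override def runTask(): String = "MapStatus(blockManagerId, sizes)"
}

// ResultTask's runTask returns the value of the user function on its partition
class SketchResultTask extends SketchTask[Int] {
  override def runTask(): Int = 42
}

We now examine the two concrete implementations in turn: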

ShuffleMapTask

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // Record the deserialization start time
  val deserializeStartTime = System.currentTimeMillis()
  // Obtain the serializer/deserializer
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize the RDD and its ShuffleDependency
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  // Compute the time spent on deserialization
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // Obtain the ShuffleManager; there are hash- and sort-based implementations, sort-based by default.
    // The ShuffleManager is created in SparkEnv (on both the Driver and the Executors):
    // the Driver registers shuffles with it, and the Executors read and write data through it
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

Because Shuffle is critical to the performance of the whole Spark application, we will devote separate articles to it; what concerns us now is the Task's actual computation. As the code shows, what ultimately executes is the RDD's iterator method. It is the key to computing the Partition that corresponds to the current Task: internally it iterates over the Partition's elements and hands them to our user-defined function.

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

On the first run there is of course no cached data, so compute is called directly. Each concrete RDD implements its own compute logic; we take MapPartitionsRDD's compute method as an example:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

We can clearly see that the function f we wrote is executed directly. Note the last argument: it calls the parent RDD's iterator method in turn, which is how the functions within one Stage are unfolded into a single computation, along the lines of the analogy below (a runnable iterator sketch follows it):

// RDD1
x = 1 + y  // here y stands for the data read from HDFS
// RDD2
z = x + 3

// After unfolding
z = (1 + y) + 3

// This is only an analogy, to aid understanding
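This "unfolding" is really iterator composition: each MapPartitionsRDD wraps its parent's iterator, so the whole Stage runs as a single pass over the data. A tiny, runnable sketch of the same effect on plain Scala iterators (PipelineSketch and its values are made up for illustration):

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the iterator over one partition's input (the "y" read from HDFS)
    val parent: Iterator[Int] = Iterator(1, 2, 3)
    // RDD1's f: x = 1 + y
    val rdd1 = parent.map(y => 1 + y)
    // RDD2's f wraps RDD1's iterator: z = x + 3
    val rdd2 = rdd1.map(x => x + 3)
    // Nothing runs until the final iterator is consumed; each element flows
    // through both functions in one pass, i.e. z = (1 + y) + 3
    println(rdd2.toList)   // List(5, 6, 7)
  }
}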

ResultTask

override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  // Record the deserialization start time
  val deserializeStartTime = System.currentTimeMillis()
  // Obtain the serializer/deserializer
  val ser = SparkEnv.get.closureSerializer.newInstance()
  // Deserialize; unlike the shuffle case, this yields the RDD together with the business logic we wrote
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  // Run our business-logic code
  func(context, rdd.iterator(partition, context))
}

Turning to ResultTask: unlike a ShuffleMapTask, a ResultTask directly produces the final result of the computation.
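To make func concrete: for an action like collect(), the function shipped to each ResultTask is essentially iter => iter.toArray, applied to the partition's iterator. A small stand-alone sketch of what the last line of runTask then amounts to (ResultFuncSketch, partitionIterator, and the sample values are illustrative stand-ins):

object ResultFuncSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for rdd.iterator(partition, context) on one partition
    val partitionIterator: Iterator[String] = Iterator("a", "b", "c")
    // The kind of func an action such as collect() serializes into the ResultTask
    val func: Iterator[String] => Array[String] = iter => iter.toArray
    // runTask boils down to: func(context, rdd.iterator(partition, context))
    println(func(partitionIterator).mkString(", "))   // a, b, c
  }
}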

Now let us go back to TaskRunner's run() method and see how the computed result is handled:

override def run(): Unit = {
  
  ...
  
  try {
  
    ...
    
    // Record the time at which the task finished running
    val taskFinish = System.currentTimeMillis()
    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw new TaskKilledException
    }
    // Result serializer
    val resultSer = env.serializer.newInstance()
    // Record when result serialization starts
    val beforeSerialization = System.currentTimeMillis()
    // Serialize the returned result
    val valueBytes = resultSer.serialize(value)
    // Record when result serialization ends
    val afterSerialization = System.currentTimeMillis()
    // Record a series of metrics
    for (m <- task.metrics) {
      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run
      m.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      m.setJvmGCTime(computeTotalGcTime() - startGCTime)
      m.setResultSerializationTime(afterSerialization - beforeSerialization)
      m.updateAccumulators()
    }
    // Wrap the result and related info in a DirectTaskResult
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
    // Serialize the DirectTaskResult
    val serializedDirectResult = ser.serialize(directResult)
    // Get the size of the serialized result
    val resultSize = serializedDirectResult.limit
    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      // Check whether the serialized size exceeds the maxResultSize limit (1GB by default)
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      // Otherwise check whether the serialized size is at least akkaFrameSize - AkkaUtils.reservedSizeBytes (by default 128MB - 200KB)
      } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        // Obtain the blockId
        val blockId = TaskResultBlockId(taskId)
        // Store it via the blockManager, at storage level MEMORY_AND_DISK_SER
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        // Serialize an IndirectTaskResult pointing at the block
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        // Small enough: return the serialized result directly, bypassing the BlockManager
        serializedDirectResult
      }
    }
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
  
    ...
    
  } finally {
    runningTasks.remove(taskId)
  }
}

Which serializedResult is produced depends on the serialized size, resultSize; a configuration sketch follows the list:

  • If resultSize is greater than maxResultSize (configured via "spark.driver.maxResultSize", and provided maxResultSize is greater than 0), what is returned is the serialized form of IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize), a warning is logged, and the actual value is dropped
  • If resultSize is at most maxResultSize but at least 128MB - 200KB, the result is stored through the BlockManager at level MEMORY_AND_DISK_SER, and the serialized form of the wrapping IndirectTaskResult is returned
  • If resultSize is smaller than 128MB - 200KB, the serialized result itself is returned directly
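maxResultSize is read from spark.driver.maxResultSize (and, in the 1.6 Akka-based RPC, the frame size from spark.akka.frameSize, in MB). A minimal, illustrative way to raise the result cap when building the SparkConf (the object and app names are made up):

import org.apache.spark.SparkConf

object ResultSizeConf {
  // Raise the driver-side cap on total serialized task results from the 1g default to 2g
  val conf: SparkConf = new SparkConf()
    .setAppName("result-size-demo")            // illustrative app name
    .set("spark.driver.maxResultSize", "2g")
}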

Finally the result is sent back to the DriverEndpoint by calling the ExecutorBackend's statusUpdate method (under Standalone mode the ExecutorBackend is CoarseGrainedExecutorBackend); concretely, CoarseGrainedExecutorBackend sends a StatusUpdate message carrying the execution result to the DriverEndpoint:

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  // Wrap the information in a StatusUpdate message
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}

When the DriverEndpoint receives the StatusUpdate message, it does the following:

case StatusUpdate(executorId, taskId, state, data) =>
  // First call TaskSchedulerImpl's statusUpdate method
  scheduler.statusUpdate(taskId, state, data.value)
  // Then release the compute resources the Task was using and offer them out again
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }

The above splits into two steps: first TaskSchedulerImpl's statusUpdate method is invoked; then the compute resources the Task occupied are released and re-offered. We go straight into TaskSchedulerImpl's statusUpdate method:

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        if (executorIdToTaskCount.contains(execId)) {
          removeExecutor(execId,
            SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { execId =>
              if (executorIdToTaskCount.contains(execId)) {
                executorIdToTaskCount(execId) -= 1
              }
            }
          }
          if (state == TaskState.FINISHED) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates)")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    dagScheduler.executorLost(failedExecutor.get)
    backend.reviveOffers()
  }
}

The key part of the code above is handing successful and failed tasks to the TaskResultGetter, via its enqueueSuccessfulTask and enqueueFailedTask methods respectively. We focus here on the success case (the failure case, in short, retries the task), so we enter TaskResultGetter's enqueueSuccessfulTask method (only the essential part is shown below):

// Deserialize the received result
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
  // Match on the type of the received result and handle each case accordingly
  case directResult: DirectTaskResult[_] =>
    if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
      return
    }
    // deserialize "value" without holding any lock so that it won't block other threads.
    // We should call it here, so that when it's called again in
    // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
    directResult.value()
    (directResult, serializedData.limit())
  case IndirectTaskResult(blockId, size) =>
    if (!taskSetManager.canFetchMoreResults(size)) {
      // dropped by executor if size is larger than maxResultSize
      sparkEnv.blockManager.master.removeBlock(blockId)
      return
    }
    logDebug("Fetching indirect task result for TID %s".format(tid))
    scheduler.handleTaskGettingResult(taskSetManager, tid)
    val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
    if (!serializedTaskResult.isDefined) {
      /* We won't be able to get the task result if the machine that ran the task failed
       * between when the task ended and when we tried to fetch the result, or if the
       * block manager had to flush the result. */
      scheduler.handleFailedTask(
        taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
      return
    }
    val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
      serializedTaskResult.get)
    sparkEnv.blockManager.master.removeBlock(blockId)
    (deserializedResult, size)
}
// Record the result size via the metrics system
result.metrics.setResultSize(size)
scheduler.handleSuccessfulTask(taskSetManager, tid, result)

Concretely, the incoming result is pattern-matched on its type, and each case is handled separately:

If a DirectTaskResult arrives, i.e. the serialized size was below 128MB - 200KB, then (directResult, serializedData.limit()) is returned as (result, size);

If an IndirectTaskResult arrives and its size exceeds the maxResultSize limit (1GB by default), it is dropped; otherwise the data previously stored through the BlockManager is fetched and deserialized, and (deserializedResult, size) is returned as (result, size).

最后調(diào)用TaskSchedulerImpl的handleSuccessfulTask方法:

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]): Unit = synchronized {
  taskSetManager.handleSuccessfulTask(tid, taskResult)
}

which in turn calls TaskSetManager's handleSuccessfulTask method:

def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  
  ...
  
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  ...
}

The essential step is the call to DAGScheduler's taskEnded method:

def taskEnded(
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Map[Long, Any],
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics): Unit = {
  eventProcessLoop.post(
    CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
}

eventProcessLoop.post adds the CompletionEvent to the event queue; we jump directly to how the DAGScheduler handles this message:

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
  dagScheduler.handleTaskCompletion(completion)
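eventProcessLoop is the DAGScheduler's single-threaded event loop: post() enqueues an event, and a daemon thread dequeues events one at a time and dispatches them to the handler. A stripped-down sketch of that pattern, where MiniEventLoop is an illustrative name rather than Spark's actual EventLoop class:

import java.util.concurrent.LinkedBlockingDeque

// Minimal post-then-dispatch loop in the spirit of eventProcessLoop
class MiniEventLoop[E](name: String)(handle: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    override def run(): Unit = while (true) handle(queue.take())
  }
  thread.setDaemon(true)
  thread.start()

  // What taskEnded does with its CompletionEvent
  def post(event: E): Unit = queue.put(event)
}

// Usage, with the handler playing the role of dagScheduler.handleTaskCompletion:
//   val loop = new MiniEventLoop[String]("dag-scheduler-event-loop")(e => println(s"handled $e"))
//   loop.post("CompletionEvent")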

We stop the trace here; interested readers can keep following it down. In the upcoming articles we begin a detailed analysis of the Shuffle machinery.

[Figure: a diagram summarizing the Task execution flow described above]

Appendix: a summary of Task serialization and deserialization:

Serialization:
1. In the DAGScheduler: serializing the RDD together with its ShuffleDependency (or, for a ResultStage, its func):
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
    case stage: ResultStage =>
      closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
  }
  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
2蛔添、TaskSetManager:對(duì)Task依賴關(guān)系的序列化
val serializedTask: ByteBuffer = try {
  Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
Once serialized, the task is wrapped in a TaskDescription:
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
  taskName, index, serializedTask))
3. In the DriverEndpoint inside CoarseGrainedSchedulerBackend: serializing the TaskDescription:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)

Deserialization:

1. When CoarseGrainedExecutorBackend receives the LaunchTask message: deserializing into a TaskDescription:
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
2迎瞧、Executor在使用線程池中的線程運(yùn)行TaskRunner的run()方法的時(shí)候:反序列化依賴關(guān)系
try {
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
3夸溶、Executor在使用線程池中的線程運(yùn)行TaskRunner的run()方法的時(shí)候:反序列化成Task
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
4. When ShuffleMapTask or ResultTask executes runTask(): deserializing the RDD together with its ShuffleDependency (or the user function):

ShuffleMapTask:

val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

ResultTask:

val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
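All of the steps above share one shape: one side turns an object graph into bytes (typically carried around as a ByteBuffer), and the peer deserializes it with the appropriate class loader. As a self-contained illustration, here is a round-trip built on plain Java serialization, which is also what Spark's default JavaSerializer wraps; SerDeSketch and the payload are made-up names:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

object SerDeSketch {
  // Serialize to a ByteBuffer, the form in which tasks travel (cf. TaskDescription.serializedTask)
  def serialize(value: AnyRef): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(value)
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  // Deserialize on the receiving side, as the Executor does with taskBytes
  def deserialize[T](buf: ByteBuffer): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val payload: AnyRef = ("taskBinary", Vector(1, 2, 3))
    println(deserialize[(String, Vector[Int])](serialize(payload)))   // (taskBinary,Vector(1, 2, 3))
  }
}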

This article follows the Spark 1.6.3 source; links to the Spark 2.1.0 source are provided as well:

Spark 1.6.3 source

Spark 2.1.0 source

This article is original work. Reposting is welcome; please credit the source and the author. Thanks!
