Introduction
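The previous section, "Task Execution Mechanism and a Brief Look at the Task Source Code (1)", covered how the Executor registers itself.
In this section I look at things from the Executor side and walk through how the Executor runs a task after it receives the LaunchTask message.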
1. The Executor's launchTask function
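When the DriverActor submits a task, it sends a LaunchTask message to CoarseGrainedExecutorBackend. On receiving the message, the backend deserializes the TaskDescription and has its internal executor start the task by calling that executor's launchTask function.
The following is part of the receiveWithLogging code in CoarseGrainedExecutorBackend: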
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val ser = env.closureSerializer.newInstance()
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
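The LaunchTask branch above boils down to "decode the payload, then call launchTask". The Scala sketch below models that round trip. TaskDescription, LaunchTask and receive here are simplified, hypothetical stand-ins, and the DataOutputStream encoding is an assumption made to keep the example self-contained; the Spark code above uses env.closureSerializer for this step.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Hypothetical, simplified stand-in for Spark's TaskDescription (not its real fields or encoding).
final case class TaskDescription(taskId: Long, attemptNumber: Int, name: String, serializedTask: Array[Byte])

// Hypothetical message type carrying the encoded description, like LaunchTask(data).
final case class LaunchTask(data: ByteBuffer)

// Driver side: encode the description into the bytes carried by LaunchTask.
def encode(d: TaskDescription): ByteBuffer = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeLong(d.taskId)
  out.writeInt(d.attemptNumber)
  out.writeUTF(d.name)
  out.writeInt(d.serializedTask.length)
  out.write(d.serializedTask)
  out.flush()
  ByteBuffer.wrap(bytes.toByteArray)
}

// Executor-backend side: decode the bytes back into a TaskDescription.
def decode(buf: ByteBuffer): TaskDescription = {
  val arr = new Array[Byte](buf.remaining())
  buf.duplicate().get(arr) // duplicate() leaves the caller's buffer position untouched
  val in = new DataInputStream(new ByteArrayInputStream(arr))
  val taskId = in.readLong()
  val attemptNumber = in.readInt()
  val name = in.readUTF()
  val payload = new Array[Byte](in.readInt())
  in.readFully(payload)
  TaskDescription(taskId, attemptNumber, name, payload)
}

// Mirrors the LaunchTask branch: decode, then hand the description to the executor.
def receive(msg: LaunchTask, launchTask: TaskDescription => Unit): Unit =
  launchTask(decode(msg.data))

// Usage: round-trip one description through a LaunchTask message.
var received: Option[TaskDescription] = None
val sent = TaskDescription(7L, 0, "task 0.0 in stage 1.0", Array[Byte](1, 2, 3))
receive(LaunchTask(encode(sent)), d => received = Some(d))
```

Keeping decoding in the backend and execution in the executor means the executor never sees raw message bytes, which matches the split in the code above.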
How the Executor runs the task:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
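Internally, the Executor maintains a thread pool, so it can run several tasks at once: every submitted task is wrapped in a TaskRunner, recorded in runningTasks, and handed to threadPool for execution.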
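A minimal sketch of the launchTask pattern described above, assuming a plain java.util.concurrent thread pool: each task body is wrapped in a Runnable, registered in runningTasks, executed by the pool, and removed again when it finishes. MiniExecutor and its members are hypothetical names, and the fixed-size pool is chosen only for the demo; the real Executor also handles kills, attempt numbers and status reporting.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical toy executor: wrap, register, execute, unregister.
final class MiniExecutor(threads: Int) {
  // Fixed-size pool chosen only for this demo.
  private val threadPool = Executors.newFixedThreadPool(threads)
  // taskId -> runner, like Executor.runningTasks
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  private final class TaskRunner(taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()
      finally runningTasks.remove(taskId) // like runningTasks.remove in TaskRunner's finally block
  }

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new TaskRunner(taskId, body)
    runningTasks.put(taskId, tr) // register before execution starts
    threadPool.execute(tr)
  }

  def shutdown(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

val executor = new MiniExecutor(threads = 2)
val done = new CountDownLatch(4)
val counter = new AtomicInteger(0)
(1L to 4L).foreach { id =>
  executor.launchTask(id, () => { counter.incrementAndGet(); done.countDown() })
}
done.await(10, TimeUnit.SECONDS)
executor.shutdown() // waits for the pool, so every runner has left runningTasks
```

Registering the runner before handing it to the pool guarantees that a task is always findable in runningTasks while it can still be running, which is what makes killing it by ID possible.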
2. The run method of TaskRunner
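Inside run, the call val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber) is what actually executes the task's work. Before it, run deserializes the task and its dependencies; after it, run serializes the result and reports it to the driver.
Below is part of TaskRunner's run method: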
try {
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
  // Deserialize the Task
  task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
  // If this task has been killed before we deserialized it, let's quit now. Otherwise,
  // continue executing the task.
  if (killed) {
    // Throw an exception rather than returning, because returning within a try{} block
    // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
    // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
    // for the task.
    throw new TaskKilledException
  }
  attemptedTask = Some(task)
  logDebug("Task " + taskId + "'s epoch is " + task.epoch)
  env.mapOutputTracker.updateEpoch(task.epoch)
  // Run the actual task and measure its runtime.
  // (What "running" means is defined by ResultTask and ShuffleMapTask.)
  taskStart = System.currentTimeMillis()
  val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
  val taskFinish = System.currentTimeMillis()
  // If the task has been killed, let's fail it.
  if (task.killed) {
    throw new TaskKilledException
  }
  // Serialize the result
  val resultSer = env.serializer.newInstance()
  val beforeSerialization = System.currentTimeMillis()
  val valueBytes = resultSer.serialize(value)
  val afterSerialization = System.currentTimeMillis()
  // Update the task's metrics; these show up in the web UI
  for (m <- task.metrics) {
    m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
    m.setExecutorRunTime(taskFinish - taskStart)
    m.setJvmGCTime(gcTime - startGCTime)
    m.setResultSerializationTime(afterSerialization - beforeSerialization)
  }
  val accumUpdates = Accumulators.values
  // Wrap the result together with accumulator updates and metrics, then serialize the wrapper
  val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
  val serializedDirectResult = ser.serialize(directResult)
  val resultSize = serializedDirectResult.limit
  // directSend = sending directly back to the driver
  val serializedResult = {
    if (maxResultSize > 0 && resultSize > maxResultSize) {
      logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
        s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
        s"dropping it.")
      ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
    } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      // The result does not fit in one Akka frame (spark.akka.frameSize, 10 MB by default,
      // minus a reserved margin), so store it in the BlockManager with MEMORY_AND_DISK_SER
      // (whatever does not fit in memory spills to disk) and send only the block ID to the driver
      val blockId = TaskResultBlockId(taskId)
      env.blockManager.putBytes(
        blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
      logInfo(
        s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
      ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
    } else {
      logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
      serializedDirectResult
    }
  }
  // Report task completion and the task result to the driver via statusUpdate
  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
  // Exception handling omitted...
} finally {
  // Release the memory this thread used (shuffles, block unrolling, accumulators),
  // then remove the task from the list of running tasks
  // Release memory used by this thread for shuffles
  env.shuffleMemoryManager.releaseMemoryForThisThread()
  // Release memory used by this thread for unrolling blocks
  env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
  // Release memory used by this thread for accumulators
  Accumulators.clear()
  runningTasks.remove(taskId)
}
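The three-way size check near the end of this code decides how the result travels back to the driver. The sketch below isolates just that decision. ResultRoute, routeResult and the numbers are assumptions for illustration: 10 MB matches the frame-size default quoted in the code comment, while the 200 KB reserve and the 1 GB maxResultSize are assumed values.

```scala
// Hypothetical names; the three branches follow the if / else if / else in TaskRunner.run.
sealed trait ResultRoute
case object Dropped extends ResultRoute         // larger than maxResultSize: the result is dropped, only an IndirectTaskResult is sent
case object ViaBlockManager extends ResultRoute // too big for one Akka frame: stored in the BlockManager
case object Direct extends ResultRoute          // small enough: the serialized DirectTaskResult is sent as-is

def routeResult(resultSize: Long, maxResultSize: Long, frameSize: Long, reservedBytes: Long): ResultRoute =
  if (maxResultSize > 0 && resultSize > maxResultSize) Dropped // maxResultSize <= 0 means "no limit"
  else if (resultSize >= frameSize - reservedBytes) ViaBlockManager
  else Direct

// Assumed values: 10 MB frame, 200 KB reserve, 1 GB maxResultSize.
val MB = 1024L * 1024
val frameSize = 10 * MB
val reserved = 200 * 1024L
val maxResult = 1024 * MB

println(routeResult(1 * MB, maxResult, frameSize, reserved))    // Direct
println(routeResult(20 * MB, maxResult, frameSize, reserved))   // ViaBlockManager
println(routeResult(2048 * MB, maxResult, frameSize, reserved)) // Dropped
```

Note that the maxResultSize check comes first: a result that is over the limit is never written to the BlockManager at all.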
3. The Task execution process
Each TaskRunner runs on a thread from the Executor's thread pool. Inside its run method, the call chain is:
TaskRunner.run
--> Task.run
--> Task.runTask
--> RDD.iterator
--> RDD.computeOrReadCheckpoint
--> RDD.compute
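To make the iterator -> computeOrReadCheckpoint -> compute chain concrete, here is a toy model with a single narrow map dependency. ToyRDD and its checkpointData map are hypothetical; the real RDD.iterator first checks the storage level for cached blocks, which this sketch leaves out.

```scala
// Hypothetical toy RDD reproducing only the shape of the call chain.
abstract class ToyRDD[T] {
  // Stand-in for checkpointed data: partition index -> saved elements.
  var checkpointData: Map[Int, Seq[T]] = Map.empty

  // Subclasses say how to compute one partition (like RDD.compute).
  def compute(split: Int): Iterator[T]

  // Like RDD.computeOrReadCheckpoint: prefer checkpointed data when present.
  final def computeOrReadCheckpoint(split: Int): Iterator[T] =
    checkpointData.get(split) match {
      case Some(saved) => saved.iterator
      case None        => compute(split)
    }

  // Like RDD.iterator (the cache lookup is omitted in this sketch).
  final def iterator(split: Int): Iterator[T] = computeOrReadCheckpoint(split)

  def map[U](f: T => U): ToyRDD[U] = {
    val parent = this
    new ToyRDD[U] {
      // Narrow dependency: pull the parent's partition through its iterator.
      def compute(split: Int): Iterator[U] = parent.iterator(split).map(f)
    }
  }
}

// Source RDD: partition i holds i*10, i*10 + 1, i*10 + 2.
val source = new ToyRDD[Int] {
  def compute(split: Int): Iterator[Int] = (split * 10 until split * 10 + 3).iterator
}
val doubled = source.map(_ * 2)
println(doubled.iterator(1).toList) // List(20, 22, 24)
```

Because each child's compute calls its parent's iterator, a task that asks for the last RDD's partition recursively pulls data through the whole lineage of that stage, and a checkpoint cuts the recursion short.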
The run function of Task:
/**
 * Called by Executor to run this task.
 *
 * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
 * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
 * @return the result of the task
 */
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
  context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
    taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
  TaskContextHelper.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    runTask(context)
  } finally {
    context.markTaskCompleted()
    TaskContextHelper.unset()
  }
}
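Task.run is a template method: it is final, sets up the TaskContext, delegates to the abstract runTask, and always marks the task completed in finally. The sketch below reproduces only that shape with hypothetical ToyTask and ToyTaskContext classes; the thread-local in the companion object stands in for TaskContextHelper.

```scala
// Hypothetical context object.
final class ToyTaskContext(val stageId: Int, val partitionId: Int, val taskAttemptId: Long) {
  @volatile var completed = false
  def markTaskCompleted(): Unit = completed = true
}

// Thread-local holder, playing the role of TaskContextHelper.
object ToyTaskContext {
  private val current = new ThreadLocal[ToyTaskContext]
  def get: ToyTaskContext = current.get()
  def set(ctx: ToyTaskContext): Unit = current.set(ctx)
  def unset(): Unit = current.remove()
}

abstract class ToyTask[T](val stageId: Int, val partitionId: Int) {
  var context: ToyTaskContext = null

  // Final template method: subclasses cannot change the set-up / tear-down order.
  final def run(taskAttemptId: Long): T = {
    context = new ToyTaskContext(stageId, partitionId, taskAttemptId)
    ToyTaskContext.set(context)
    try runTask(context)
    finally {
      context.markTaskCompleted() // runs whether runTask returns or throws
      ToyTaskContext.unset()
    }
  }

  // The only hook subclasses implement, like ShuffleMapTask / ResultTask.runTask.
  def runTask(context: ToyTaskContext): T
}

val ok = new ToyTask[Int](stageId = 1, partitionId = 0) {
  def runTask(ctx: ToyTaskContext): Int = ctx.partitionId + 41
}
println(ok.run(taskAttemptId = 5L)) // 41

val failing = new ToyTask[Int](stageId = 1, partitionId = 1) {
  def runTask(ctx: ToyTaskContext): Int = throw new RuntimeException("boom")
}
val outcome = scala.util.Try(failing.run(taskAttemptId = 6L))
```

Even when runTask throws, the context is marked completed and the thread-local is cleared, so a pooled thread never carries a stale context into its next task.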
ShuffleMapTask and ResultTask each provide their own implementation of runTask.
The runTask function of ShuffleMapTask:
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  // taskBinary is the broadcast variable holding the task binary serialized in
  // org.apache.spark.scheduler.DAGScheduler#submitMissingTasks
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // Compute the RDD partition and write the results to memory or disk
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
The runTask function of ResultTask:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
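The difference between the two runTask implementations can be sketched without Spark: a shuffle map task pushes every record of its partition into one bucket per reduce partition and returns a summary (Spark returns a MapStatus), while a result task simply applies the job's function to the partition iterator. The functions below are hypothetical, hash partitioning with floorMod is an assumed stand-in for the shuffle writer's partitioner, and a partition ID replaces the TaskContext that Spark passes to func.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for MapStatus: records written per reduce partition.
final case class ToyMapStatus(mapId: Int, recordsPerReducer: Vector[Int])

// ShuffleMapTask-like: drain the partition iterator into one bucket per reducer.
def runShuffleMapTask[K, V](
    mapId: Int,
    records: Iterator[(K, V)],
    numReducers: Int,
    buckets: Array[ArrayBuffer[(K, V)]]): ToyMapStatus = {
  val counts = Array.fill(numReducers)(0)
  records.foreach { case kv @ (k, _) =>
    val reducer = java.lang.Math.floorMod(k.hashCode, numReducers) // non-negative bucket index
    buckets(reducer) += kv
    counts(reducer) += 1
  }
  ToyMapStatus(mapId, counts.toVector)
}

// ResultTask-like: apply the job's function to the partition iterator and return its value.
def runResultTask[T, U](partitionId: Int, records: Iterator[T], func: (Int, Iterator[T]) => U): U =
  func(partitionId, records)

val buckets = Array.fill(2)(ArrayBuffer.empty[(Int, String)])
val status = runShuffleMapTask(0, Iterator(1 -> "a", 2 -> "b", 3 -> "c"), 2, buckets)
println(status) // ToyMapStatus(0,Vector(1, 2))
val total = runResultTask[Int, Int](0, Iterator(1, 2, 3), (_, it) => it.sum)
println(total) // 6
```

The shuffle map task's output stays on the executor for the next stage to fetch, which is why it only reports a small status; the result task's return value is what gets shipped back to the driver.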
4. Task status updates
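A task is run by a TaskRunner, which talks to the Driver through the ExecutorBackend using StatusUpdate messages:
- Before the task runs, it tells the Driver that the task's state is TaskState.RUNNING.
- After the task finishes, it tells the Driver that the state is TaskState.FINISHED and sends back the result.
- If an error occurs while the task is running, it tells the Driver that the state is TaskState.FAILED and sends back the cause.
- If the task is killed partway through, it tells the Driver that the state is TaskState.KILLED.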
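The reporting protocol above can be sketched as a small state model. ToyTaskState is modeled on Spark's TaskState enumeration (assumed here to contain LAUNCHING, RUNNING, FINISHED, FAILED, KILLED and LOST), and runAndReport is a hypothetical helper whose report callback stands in for ExecutorBackend.statusUpdate: it always reports RUNNING first and then exactly one terminal state.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical enumeration modeled on TaskState.
object ToyTaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(state: Value): Boolean = finishedStates.contains(state)
}

// Hypothetical exception marking a killed task (in the spirit of TaskKilledException).
final class KilledException extends RuntimeException("killed")

// Report RUNNING, then one terminal state through `report` (a stand-in for statusUpdate).
def runAndReport(body: () => String, report: (ToyTaskState.Value, String) => Unit): Unit = {
  report(ToyTaskState.RUNNING, "")
  try report(ToyTaskState.FINISHED, body())
  catch {
    case _: KilledException => report(ToyTaskState.KILLED, "")
    case e: Exception       => report(ToyTaskState.FAILED, e.getMessage)
  }
}

val updates = ArrayBuffer.empty[(ToyTaskState.Value, String)]
def record(state: ToyTaskState.Value, data: String): Unit = updates += ((state, data))

runAndReport(() => "42", record)
runAndReport(() => throw new KilledException, record)
runAndReport(() => throw new IllegalStateException("bad input"), record)
```

Matching the kill exception before the generic Exception case is what keeps a kill from being reported as an ordinary failure.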
5. Task completion
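When a task finishes, TaskRunner's run function reports it through ExecutorBackend.statusUpdate; the result travels in a DirectTaskResult (or, when it is too large, in an IndirectTaskResult that points to a BlockManager block).
After receiving StatusUpdate, the SchedulerBackend checks the following: if the task has completed successfully, it is removed from the monitoring list; if every task in the whole job has completed, the resources they occupied are released.
TaskSchedulerImpl puts the successfully completed task into the completed queue and takes out the next task waiting to run.
Below is the code in CoarseGrainedSchedulerBackend that handles the StatusUpdate message: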
case StatusUpdate(executorId, taskId, state, data) =>
  // statusUpdate does the scheduler-side bookkeeping, e.g. removing the finished task from its task set
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        // Give the task's cores back to the executor and offer them to waiting tasks
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor $sender with ID $executorId")
    }
  }
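The handler above does two things once a task reaches a finished state: it returns scheduler.CPUS_PER_TASK cores to the executor and calls makeOffers so the freed cores can be used again. The sketch below models that bookkeeping with hypothetical ToyExecutorData and ToySchedulerBackend classes; in Spark the choice of which task to launch is made by the TaskScheduler's resource offers, which is simplified here to a FIFO queue.

```scala
import scala.collection.mutable

// Hypothetical per-executor record, like the entries in executorDataMap.
final class ToyExecutorData(var freeCores: Int)

final class ToySchedulerBackend(cpusPerTask: Int) {
  val executorDataMap = mutable.Map.empty[String, ToyExecutorData]
  val pending = mutable.Queue.empty[Long] // task IDs waiting for cores (simplified FIFO)
  val launched = mutable.ArrayBuffer.empty[(String, Long)]

  // Mirrors the StatusUpdate branch: free cores on a finished state, then re-offer them.
  def statusUpdate(executorId: String, taskId: Long, finished: Boolean): Unit =
    if (finished) {
      executorDataMap.get(executorId) match {
        case Some(data) =>
          data.freeCores += cpusPerTask
          makeOffers(executorId)
        case None =>
          () // unknown executor: ignore the update
      }
    }

  // Launch waiting tasks on this executor while it has enough free cores.
  def makeOffers(executorId: String): Unit = {
    val data = executorDataMap(executorId)
    while (pending.nonEmpty && data.freeCores >= cpusPerTask) {
      data.freeCores -= cpusPerTask
      launched += ((executorId, pending.dequeue()))
    }
  }
}

val backend = new ToySchedulerBackend(cpusPerTask = 1)
backend.executorDataMap("exec-1") = new ToyExecutorData(freeCores = 0)
backend.pending ++= Seq(10L, 11L)
backend.statusUpdate("exec-1", taskId = 3L, finished = true)       // frees 1 core, launches task 10
backend.statusUpdate("exec-unknown", taskId = 4L, finished = true) // ignored
```

Re-offering right after freeing cores is what keeps an executor busy: a finished task immediately makes room for the next waiting one.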
The scheduler.statusUpdate function goes through the following steps:
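- Using the TaskId, TaskScheduler finds the TaskSetManager (the class that manages a batch of tasks) responsible for this task, removes the task from that TaskSetManager, and puts the task into the success queue of TaskResultGetter (the class that fetches task results);
- Once TaskResultGetter has fetched the result, it calls TaskScheduler's handleSuccessfulTask method to hand the result back;
- TaskScheduler calls TaskSetManager's handleSuccessfulTask method to process the successful task;
- TaskSetManager calls DAGScheduler's taskEnded method to tell the DAGScheduler that this task has finished; if all tasks have now succeeded, the TaskSetManager is finished.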
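The TaskSetManager part of these steps amounts to per-task bookkeeping. This hypothetical ToyTaskSetManager, a simplification rather than Spark's class, removes the task from the running set, calls a taskEnded callback standing in for DAGScheduler.taskEnded, counts each task index at most once, and flags the task set as finished when every task has succeeded.

```scala
import scala.collection.mutable

// Hypothetical, simplified TaskSetManager for one task set.
final class ToyTaskSetManager(numTasks: Int, taskEnded: (Int, Any) => Unit) {
  private val successful = Array.fill(numTasks)(false)
  private var tasksSuccessful = 0
  val runningTasks = mutable.Set.empty[Int]
  var finished = false

  def handleSuccessfulTask(index: Int, result: Any): Unit = {
    runningTasks -= index
    taskEnded(index, result)  // tell the DAGScheduler-like callback that this task has ended
    if (!successful(index)) { // count each index once (a speculative copy may also succeed)
      successful(index) = true
      tasksSuccessful += 1
      if (tasksSuccessful == numTasks) finished = true
    }
  }
}

val ended = mutable.ArrayBuffer.empty[Int]
val tsm = new ToyTaskSetManager(numTasks = 2, taskEnded = (index, _) => ended += index)
tsm.runningTasks ++= Seq(0, 1)
tsm.handleSuccessfulTask(0, "r0")
val finishedAfterFirst = tsm.finished
tsm.handleSuccessfulTask(1, "r1")
```

Counting by task index rather than by completion event is what makes duplicate successes harmless.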
In taskEnded, DAGScheduler fires a CompletionEvent; while handling that event it calls its own handleTaskCompletion function, which treats the results of ResultTask and ShuffleMapTask differently:
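1) ResultTask:
The job's numFinished is incremented by 1. If numFinished equals the number of partitions, the stage is done and is marked as finished. Finally, DAGScheduler calls the taskSucceeded method of JobListener (implemented in JobWaiter), which passes the result to resultHandler (a wrapped version of the anonymous function you wrote yourself); once the number of completed tasks equals the total number of tasks, the job is finished.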
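2) ShuffleMapTask:
- Call the Stage's addOutputLoc method to add the result to the Stage's outputLocs list
- If the Stage has no pending tasks left, mark the Stage as finished
- Register the Stage's outputLocs with MapOutputTracker so the next Stage can use them
- If any entry in the Stage's outputLocs is empty, that part of the computation failed, so resubmit the Stage
- Find the next waiting Stages that have no missing parent Stages and submit them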
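The two branches of handleTaskCompletion can be modeled as follows. ToyJob, ToyShuffleStage and shuffleMapTaskSucceeded are hypothetical simplifications: a plain map stands in for MapOutputTracker, and the callbacks stand in for resubmitting the stage and submitting waiting stages. The usage shows a shuffle stage whose output on one host is lost before the stage completes, which leads to resubmission.

```scala
import scala.collection.mutable

// ResultTask branch: count finished partitions and pass each result to a JobWaiter-like handler.
final class ToyJob(val numPartitions: Int, resultHandler: (Int, Any) => Unit) {
  val finished = Array.fill(numPartitions)(false)
  var numFinished = 0
  def isDone: Boolean = numFinished == numPartitions

  def resultTaskSucceeded(outputId: Int, result: Any): Unit =
    if (!finished(outputId)) { // a duplicate completion of the same partition is ignored
      finished(outputId) = true
      numFinished += 1
      resultHandler(outputId, result)
    }
}

// ShuffleMapTask branch: per-partition output locations plus the set of pending tasks.
final class ToyShuffleStage(val id: Int, numPartitions: Int) {
  val outputLocs = Array.fill(numPartitions)(List.empty[String])
  val pendingTasks = mutable.Set.empty[Int] ++= (0 until numPartitions)
  def addOutputLoc(partition: Int, host: String): Unit =
    outputLocs(partition) = host :: outputLocs(partition)
  // E.g. when the executor on `host` is lost, its map outputs disappear.
  def removeOutputsOnHost(host: String): Unit =
    for (p <- outputLocs.indices) outputLocs(p) = outputLocs(p).filterNot(_ == host)
}

def shuffleMapTaskSucceeded(
    stage: ToyShuffleStage,
    partition: Int,
    host: String,
    mapOutputs: mutable.Map[Int, Vector[List[String]]], // stands in for MapOutputTracker
    resubmit: ToyShuffleStage => Unit,
    submitWaitingStages: () => Unit): Unit = {
  stage.pendingTasks -= partition
  stage.addOutputLoc(partition, host)
  if (stage.pendingTasks.isEmpty) { // no pending tasks: the stage is done
    mapOutputs(stage.id) = stage.outputLocs.toVector
    if (stage.outputLocs.exists(_.isEmpty)) resubmit(stage) // some partition has no output
    else submitWaitingStages()                              // children may now be runnable
  }
}

val results = mutable.Map.empty[Int, Any]
val job = new ToyJob(numPartitions = 2, resultHandler = (i, r) => results(i) = r)
job.resultTaskSucceeded(0, "a")
job.resultTaskSucceeded(0, "a") // duplicate, ignored
job.resultTaskSucceeded(1, "b")

val mapOutputs = mutable.Map.empty[Int, Vector[List[String]]]
val resubmitted = mutable.ArrayBuffer.empty[Int]
var childrenSubmitted = 0
val stage = new ToyShuffleStage(id = 1, numPartitions = 2)
shuffleMapTaskSucceeded(stage, 0, "hostA", mapOutputs, s => resubmitted += s.id, () => childrenSubmitted += 1)
stage.removeOutputsOnHost("hostA") // hostA's output is lost before the stage completes
shuffleMapTaskSucceeded(stage, 1, "hostB", mapOutputs, s => resubmitted += s.id, () => childrenSubmitted += 1)
```

The empty-output check only runs once no tasks are pending, so a lost output is detected when the stage would otherwise be declared complete.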
Please credit the author, Jason Ding, and the source when reposting.
GitCafe blog homepage (http://jasonding1354.gitcafe.io/)
GitHub blog homepage (http://jasonding1354.github.io/)
CSDN blog (http://blog.csdn.net/jasonding1354)
Jianshu homepage (http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Search Google for jasonding1354 to find my blog homepage.