DAG Generation
Overview
Spark is an efficient distributed computing framework, but to learn it in depth you need to read its source code. Doing so not only builds a better understanding of how Spark works internally, it also improves your ability to troubleshoot a cluster. This article focuses on the execution flow of Spark's Stage tasks.
DAG stands for Directed Acyclic Graph. A series of transformations on the original RDDs forms a DAG, and the DAG is divided into Stages according to the dependencies between RDDs. For narrow dependencies, the partition transformations are computed inside a single Stage. For wide dependencies, a shuffle is involved, so the downstream computation can only start after the parent RDD has been fully processed; wide dependencies are therefore the basis for splitting Stages.
Narrow dependency: each partition of a parent RDD is used by at most one partition of the child RDD.
Wide dependency: multiple partitions of the child RDD depend on the same partition of the parent RDD.
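As a quick illustration of the two dependency types (a sketch that assumes a live SparkContext named sc):

val pairs = sc.parallelize(Seq("a b", "b c"))
  .flatMap(_.split(" "))      // narrow dependency: computed within the same stage
  .map(w => (w, 1))           // narrow dependency: still the same stage
val counts = pairs.reduceByKey(_ + _)  // wide dependency: the shuffle starts a new stage

Everything up to reduceByKey runs in one stage; the shuffle it introduces cuts a second stage.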
The DAGScheduler Event Queue
Having walked through how Executors are created and started, we return to the SparkContext constructor:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  ...
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // Create the driver-side environment via SparkEnv.createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // createSparkEnv is called here; it returns a SparkEnv object holding many
  // important components, most notably the ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // Create the DAGScheduler
  dagScheduler = new DAGScheduler(this)

  // Start the TaskScheduler
  taskScheduler.start()
  ...
}
The constructor also creates a DAGScheduler, the class whose job is to divide the work into Stages. Its own constructor initializes private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this). DAGSchedulerEventProcessLoop is an event-bus object responsible for dispatching events. During construction, eventProcessLoop.start() is called; start is defined in the parent class EventLoop:
def start(): Unit = {
if (stopped.get) {
throw new IllegalStateException(name + " has already been stopped")
}
// Call onStart before starting the event thread to make sure it happens before onReceive
onStart()
eventThread.start()
}
This calls eventThread.start(), which spins up a single thread:
private val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event)
} catch {
case NonFatal(e) => {
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
The run method keeps taking events off a LinkedBlockingDeque blocking queue and passes each one to onReceive(event), which is implemented by the subclass DAGSchedulerEventProcessLoop:
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      // Delegate job submission handling to the DAGScheduler
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
dagScheduler.handleTaskSetFailed(taskSet, reason)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
onReceive matches the incoming event type and executes the corresponding handler. From this point on, the DAGScheduler's event loop stays alive, continuously polling the queue for new events.
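The blocking-queue mechanics are easy to see in isolation. A minimal, Spark-independent sketch of a thread idling on take() until an event arrives:

import java.util.concurrent.LinkedBlockingDeque

object BlockingQueueDemo extends App {
  val queue = new LinkedBlockingDeque[String]()
  val consumer = new Thread(new Runnable {
    // take() blocks until an element is available, so the event thread
    // can sit idle without busy-waiting
    override def run(): Unit = println("received: " + queue.take())
  })
  consumer.start()
  queue.put("JobSubmitted")   // wakes the consumer
  consumer.join()
}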
DAG Task Submission Flow
After an RDD has gone through a series of Transformation methods, an Action method must eventually be invoked. In a WordCount program, for instance, the final collect() call submits the job for execution, and only then does the work actually run. The call proceeds as follows:
/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
sc is the SparkContext object; collect() calls one of its runJob overloads. The call passes through several overloaded variants and ultimately reaches dagScheduler.runJob.
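The overload chain can be sketched roughly like this (abridged and from memory, not the verbatim source):

// collect() enters through the simplest overload:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] =
  runJob(rdd, func, 0 until rdd.partitions.size, allowLocal = false)

// ...which wraps func so it ignores the TaskContext, allocates a results array,
// and forwards everything to the full version with a resultHandler that writes
// each partition's result into that array.

The fully expanded overload, where the DAGScheduler finally appears, is: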
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
    // Here the dagScheduler appears: it can split the job into stages
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
dagScheduler.runJob is the part we care about most:
def runJob[T, U: ClassTag](
    ...
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded => {
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
}
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
throw exception
}
}
The part we mainly care about here is submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties), which submits the job:
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
    ...
    // Post the job to the event queue
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
waiter
}
The key call is eventProcessLoop.post, which adds a JobSubmitted event to the queue. eventProcessLoop is the event-bus object initialized back in the constructor; its internal thread keeps polling the queue and, when it picks up an event, calls onReceive to match its type. The event we just posted is a JobSubmitted:
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    // Delegate job submission handling to the DAGScheduler
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
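For reference, post itself does nothing more than enqueue the event; EventLoop.post in this Spark lineage is essentially:

def post(event: E): Unit = {
  eventQueue.put(event)
}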
This hands off to handleJobSubmitted; let's look at that method next:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
      // The final stage
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
您觉。。授滓。琳水。
submitStage(finalStage)
}
The code above calls newStage to divide the work into stages. This is the core of stage division: starting from the final RDD's dependencies and recursing backwards through the lineage, every wide dependency becomes a cut point for a new Stage. The full walk is not traced in detail here, but the sketch below captures the idea. Once splitting finishes, execution continues to submitStage(finalStage).
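A minimal sketch of the wide-dependency cut (illustrative only, not the actual newStage/getParentStages code; RDD, Dependency, and ShuffleDependency are real Spark types, the function name is made up):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.{Dependency, ShuffleDependency}

// Walk the lineage of `rdd`; every ShuffleDependency marks a stage boundary.
def shuffleBoundaries(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
  val boundaries = mutable.ArrayBuffer[ShuffleDependency[_, _, _]]()
  val visited = mutable.HashSet[RDD[_]]()
  val stack = mutable.Stack[RDD[_]](rdd)
  while (stack.nonEmpty) {
    val r = stack.pop()
    if (visited.add(r)) {
      r.dependencies.foreach {
        case shuffle: ShuffleDependency[_, _, _] =>
          boundaries += shuffle          // wide dependency: cut a new stage here
        case narrow: Dependency[_] =>
          stack.push(narrow.rdd)         // narrow dependency: same stage, keep walking
      }
    }
  }
  boundaries
}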
submitStage then submits the stages recursively: missing parent stages are submitted first, and the current stage waits until its parents complete.
// Submit stages recursively
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Submit the tasks for this stage
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id)
}
}
submitMissingTasks(stage, jobId.get) is then called to submit the tasks, passing in each Stage together with the jobId:
private def submitMissingTasks(stage: Stage, jobId: Int) {
    ...
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
      // Hand the task set to the taskScheduler
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
}
}
The line to focus on here is taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)). It creates a TaskSet object that bundles all the task information: the task list, the stageId, the attempt id, the jobId, and the scheduling properties.
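For reference, TaskSet in this Spark lineage is little more than a value holder (sketched from the source; imports omitted like the other excerpts here):

private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val jobId: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt   // e.g. "0.0": stage 0, first attempt
}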
Task Scheduling
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
      // Create a TaskSetManager that holds the taskSet's task list
val manager = createTaskSetManager(taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
      // Add the task set to the scheduling pool
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
hasReceivedTask = true
}
    // Ask the backend for resources to launch the tasks
backend.reviveOffers()
}
This method is important: it adds the tasks to the scheduling pool and finally calls backend.reviveOffers(). Here backend is a CoarseGrainedSchedulerBackend, the object that schedules tasks onto Executors:
override def reviveOffers() {
    // Send a message to itself
driverActor ! ReviveOffers
}
Here the internal DriverActor sends an internal message to itself. Next, look at how the receive method handles it:
case ReviveOffers =>
makeOffers()
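This message-to-itself is plain Akka. A standalone toy showing the same pattern (every name here is made up; akka.actor is the real API of the Akka 2.3 era this Spark version used):

import akka.actor.{Actor, ActorSystem, Props}

case object ReviveOffers   // stand-in for Spark's internal message

class ToyDriverActor extends Actor {
  // Send a message to ourselves as soon as the actor starts
  override def preStart(): Unit = self ! ReviveOffers
  def receive = {
    case ReviveOffers => println("makeOffers(): match pooled tasks to free executors")
  }
}

object ToyDriver extends App {
  val system = ActorSystem("toy")
  system.actorOf(Props[ToyDriverActor], "driverActor")
  Thread.sleep(500)      // let the message be processed
  system.shutdown()      // Akka 2.3-era shutdown API
}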
Back in Spark: on receiving the message, the actor calls makeOffers():
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
}
In makeOffers, the information for each registered Executor (id, host, free cores) is wrapped into a WorkerOffer; scheduler.resourceOffers matches the pooled tasks against those offers, and the resulting task descriptions are passed to launchTasks.
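WorkerOffer itself is just a small case class describing the free resources on one executor (as defined in this Spark lineage):

// Represents free resources available on an executor
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)

launchTasks, shown next, handles the final dispatch: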
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
。翎朱。橄维。。拴曲。争舞。
//把task序列化
val serializedTask = ser.serialize(task)
。澈灼。兑障。。蕉汪。
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Send the serialized task to the Executor
executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
}
}
launchTasks iterates over the task collection, serializes each Task, and sends a launch-Task message to the corresponding Executor.
The Executor side receives it in onReceive:
    // The message from the DriverActor telling the Executor to launch a Task
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val ser = env.closureSerializer.newInstance()
        // Deserialize the Task description
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
        // Launch the task
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
Once the Executor receives the LaunchTask message from the DriverActor, real execution finally begins: the serialized task information is deserialized, and the Executor's launchTask method is called to run the task:
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer) {
    // Wrap the task description in a TaskRunner
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
    // Then drop the TaskRunner into the thread pool
threadPool.execute(tr)
}
Inside launchTask the Task is handed to a thread pool for execution. TaskRunner is a Runnable whose run method executes the logic along the chain of every RDD our application built. With that, the full lifecycle of an RDD job is complete.
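To round off the picture, the essential steps inside TaskRunner.run can be sketched as follows (heavily abridged and from memory; dependency downloads, metrics, and error handling are omitted):

override def run(): Unit = {
  // 1. Tell the driver the task has started
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  val ser = env.closureSerializer.newInstance()
  // 2. Deserialize the concrete Task (a ShuffleMapTask or a ResultTask)
  val task = ser.deserialize[Task[Any]](serializedTask,
    Thread.currentThread.getContextClassLoader)
  // 3. Run it: this walks the RDD chain (iterator -> compute) for one partition
  val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
  // 4. Serialize the result and report completion back to the driver
  val serializedResult = ser.serialize(value)
  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
}

Each ShuffleMapTask writes its partition's shuffle output for the next stage, while the ResultTasks of the final stage feed their results back to the driver through the resultHandler we saw in runJob.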