// DAGScheduler.handleJobSubmitted (excerpt)
// Excerpt from handleJobSubmitted (surrounding method body not shown).
// Create the ActiveJob for this job id and its final stage.
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
// Submit the final stage; submitStage submits any missing parent stages first.
submitStage(finalStage)
/**
 * Submits `stage` on behalf of the active job that needs it.
 *
 * If no active job is found for the stage, the stage is aborted. Stages that are already
 * waiting, running or failed are left untouched. Otherwise the stages returned by
 * `getMissingParentStages` (ordered by id) are submitted first, recursively, and `stage`
 * is added to `waitingStages`; if there are none, the stage's tasks are submitted directly.
 *
 * @param stage the stage to submit
 */
private def submitStage(stage: Stage): Unit = {
  activeJobForStage(stage) match {
    case None =>
      abortStage(stage, "No active job for stage " + stage.id, None)
    case Some(activeJobId) =>
      logDebug(s"submitStage($stage (name=${stage.name};" +
        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
      val alreadyTracked = waitingStages(stage) || runningStages(stage) || failedStages(stage)
      if (!alreadyTracked) {
        // Parent stages that still need to run, lowest stage id first.
        val missingParents = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missingParents)
        if (missingParents.nonEmpty) {
          // Recurse into every missing parent, then park this stage in waitingStages.
          missingParents.foreach(submitStage)
          waitingStages += stage
        } else {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, activeJobId)
        }
      }
  }
}
// submitMissingTasks (excerpt)
// Excerpt only: the enclosing try/match is not closed here, and the catch clause is omitted.
// NOTE(review): partitionsToCompute is used in the ShuffleMapStage case below but is only
// declared further down, under the ResultStage case. In the full method it is presumably
// declared before `tasks` is built; confirm against the Spark source.
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
// Build one task per partition id to compute, recording each id in pendingPartitions.
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
// Create the ShuffleMapTask for this partition.
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
// Figure out the indexes of partition ids to compute.
// (partition ids)
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// ShuffleMapStage.findMissingPartitions
/**
 * Returns the ids of the partitions of this stage that still need to be computed.
 *
 * `mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId)` is asked first.
 * If it returns a value, that value is used as-is. If it returns None, every partition
 * `0 until numPartitions` is reported as missing.
 *
 * @return partition ids to compute
 */
override def findMissingPartitions(): Seq[Int] = {
  // Look up the missing partitions for this stage's shuffle id:
  //   1. if the tracker returns a result, use it;
  //   2. otherwise fall back to all partitions, 0 until numPartitions
  //      (presumably the partition count of the stage's final RDD; confirm in Stage).
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}