Copyright notice: this is an original article; do not repost without permission.
Previously covered:
Job submission in Spark: http://www.reibang.com/p/e3f4df04facf
1. How a Job Is Divided into Stages in Spark
In the post above we looked at how a Job is submitted in Spark; now let's see how a Job is divided into Stages.
A JobSubmitted event is handled by the DAGScheduler's handleJobSubmitted method, whose source is as follows:
<code>
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // Create a new Stage for this jobId; see 1 below
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  ...
  // submission of the Stage and of its TaskSet (tasks)
  ...
}
</code>
1. The newResultStage method, shown below, creates a ResultStage for the given jobId:
<code>
private def newResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // Get the parent Stages and a new stage id from the rdd and jobId; see 2 below
  val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
  // Build the ResultStage from the parent Stages and the stage id; see 4 below
  val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
</code>
2. The getParentStagesAndId method is shown below:
<code>
private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
  // Get the parent Stages; see 3 below
  val parentStages = getParentStages(rdd, firstJobId)
  val id = nextStageId.getAndIncrement()
  (parentStages, id)
}
</code>
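The stage id above comes from nextStageId, which in DAGScheduler is a java.util.concurrent.atomic.AtomicInteger, so every new stage gets a unique, monotonically increasing id even under concurrent event handling. A minimal sketch of that allocation (StageIdDemo is a hypothetical stand-in, not a Spark class):

```scala
import java.util.concurrent.atomic.AtomicInteger

object StageIdDemo {
  // Mirrors DAGScheduler's nextStageId counter
  val nextStageId = new AtomicInteger(0)

  // getAndIncrement returns the current value, then bumps it:
  // the first stage gets id 0, the next id 1, and so on.
  def allocateStageId(): Int = nextStageId.getAndIncrement()
}
```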
3. The getParentStages method is shown below:
<code>
private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // Stack of RDDs that still need to be visited
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          // Check the rdd's dependency type: a ShuffleDependency is a wide dependency; see 4 below
          case shufDep: ShuffleDependency[_, _, _] =>
            parents += getShuffleMapStage(shufDep, firstJobId)
          // A narrow dependency
          case _ =>
            // Keep walking up the lineage to see whether a parent Stage exists
            waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    // Visit the RDD popped off the stack
    visit(waitingForVisit.pop())
  }
  parents.toList
}
</code>
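The traversal above can be pictured with a small self-contained model. ToyRDD, NarrowDep, and ShuffleDep below are hypothetical stand-ins, not Spark classes; they carry just enough structure to show the DFS: narrow dependencies are walked through, while each shuffle (wide) dependency marks a stage boundary and contributes a parent stage.

```scala
import scala.collection.mutable

// Hypothetical model of an RDD lineage graph (NOT Spark classes)
sealed trait ToyDep { def rdd: ToyRDD }
case class NarrowDep(rdd: ToyRDD) extends ToyDep  // e.g. map, filter
case class ShuffleDep(rdd: ToyRDD) extends ToyDep // e.g. reduceByKey

case class ToyRDD(name: String, deps: List[ToyDep])

object StageBoundaryDemo {
  // Collects the RDDs sitting directly behind a shuffle boundary,
  // mirroring how getParentStages collects parent stages.
  def shuffleParents(rdd: ToyRDD): Set[ToyRDD] = {
    val parents = mutable.Set[ToyRDD]()
    val visited = mutable.Set[ToyRDD]()
    val waitingForVisit = mutable.Stack[ToyRDD](rdd)
    while (waitingForVisit.nonEmpty) {
      val r = waitingForVisit.pop()
      if (!visited(r)) {
        visited += r
        r.deps.foreach {
          case ShuffleDep(parent) => parents += parent        // wide: stage boundary
          case NarrowDep(parent)  => waitingForVisit.push(parent) // narrow: keep walking
        }
      }
    }
    parents.toSet
  }
}
```

For a lineage like textFile -> map (narrow) -> reduceByKey (shuffle) -> map (narrow), only the RDD behind the reduceByKey shuffle becomes a parent-stage root; the narrow maps stay inside the same stage.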
4. The getShuffleMapStage method is shown below:
<code>
private def getShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies; see 5 below
      getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Create a ShuffleMapStage for firstJobId; see 6 below
        shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
      }
      // Then register current shuffleDep
      val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
      shuffleToMapStage(shuffleDep.shuffleId) = stage
      stage
  }
}
</code>
5. The getAncestorShuffleDependencies method is shown below:
<code>
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
  val parents = new Stack[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      for (dep <- r.dependencies) {
        dep match {
          case shufDep: ShuffleDependency[_, _, _] =>
            if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
              parents.push(shufDep)
            }
          case _ =>
        }
        waitingForVisit.push(dep.rdd)
      }
    }
  }
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  parents
}
</code>
6. The newOrUsedShuffleStage method, shown below, creates a ShuffleMapStage for the given shuffle dependency. If a Stage for this shuffleId is already registered with the MapOutputTracker, the number and locations of its available map outputs can be recovered from the MapOutputTracker:
<code>
private def newOrUsedShuffleStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.length) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
</code>
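The MapOutputTracker branch above can be sketched with a small model (ToyShuffleStage and the tracker map below are hypothetical stand-ins, not Spark classes): when a shuffle is seen for the first time it is registered with no outputs, and when a stage is rebuilt for an already-registered shuffle, the known output locations are restored so only the missing partitions need recomputation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ShuffleMapStage output bookkeeping
class ToyShuffleStage(numPartitions: Int) {
  // One slot per map partition; Some(host) if that output is available
  val outputLocs = Array.fill[Option[String]](numPartitions)(None)
  def numAvailableOutputs: Int = outputLocs.count(_.isDefined)
  def isAvailable: Boolean = numAvailableOutputs == outputLocs.length
}

object MapOutputDemo {
  // Plays the role of MapOutputTracker: shuffleId -> per-partition host (null = missing)
  val tracker = mutable.Map[Int, Array[String]]()

  def newOrUsedStage(shuffleId: Int, numPartitions: Int): ToyShuffleStage = {
    val stage = new ToyShuffleStage(numPartitions)
    tracker.get(shuffleId) match {
      case Some(locs) =>
        // Shuffle already registered: restore the known output locations
        for (i <- locs.indices) stage.outputLocs(i) = Option(locs(i))
      case None =>
        // First time: register the shuffle with no outputs yet
        tracker(shuffleId) = Array.fill[String](numPartitions)(null)
    }
    stage
  }
}
```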
2. Stage Description
A Stage is a set of parallel tasks. A Stage can be shared by multiple Jobs, and some Stages may not run on every partition of the RDD, for example with first and lookup. Stages are divided using shuffle boundaries as the dividing line, and Stage has two subclasses: ResultStage and ShuffleMapStage.
A ShuffleMapStage is created for each wide (shuffle) dependency, while the final stage that computes the job's result is a ResultStage. When a ShuffleMapStage finishes executing, it produces output files that reduce tasks then fetch; a ShuffleMapStage can also be submitted on its own through DAGScheduler's submitMapStage method.
In the next post we will look at how a Stage is submitted.