Spark中Job如何劃分為Stage源碼解讀

版權聲明:本文為原創(chuàng)文章渣蜗,未經允許不得轉載屠尊。
復習內容:
Spark中Job的提交 http://www.reibang.com/p/e3f4df04facf

1.Spark中Job如何劃分為Stage

我們在復習內容中介紹了Spark中Job的提交,下面我們看如何將Job劃分為Stage袍睡。
對于JobSubmitted事件類型知染,通過 dagScheduler的handleJobSubmitted方法處理肋僧,方法源碼如下:
<code>
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[],
func: (TaskContext, Iterator[
]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
//根據jobId生成新的Stage,詳見1
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
...
Stage的提交及TaskSet(tasks)的提交
...
}
</code>

1.newResultStage方法如下, 根據jobId生成一個ResultStage
<code>
private def newResultStage(
rdd: RDD[],
func: (TaskContext, Iterator[
]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
//根據jobid和rdd得到父Stages和StageId斑胜,詳見2
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
//根據父Stages和StageId生成ResultStage,詳見4
val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
</code>

  1. getParentStagesAndId方法如下所示:
    <code>
    private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
    val parentStages = getParentStages(rdd, firstJobId),詳見3
    val id = nextStageId.getAndIncrement()
    (parentStages, id)
    }
    </code>

3.getParentStages方法如下所示:
<code>
private def getParentStages(rdd: RDD[], firstJobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[
]]
//將要遍歷的RDD放到棧Stack中
val waitingForVisit = new Stack[RDD[]]
def visit(r: RDD[
]) {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
//判斷rdd的依賴關系嫌吠,如果是ShuffleDependency說明是寬依賴止潘,詳見4
case shufDep: ShuffleDependency[_, _, ] =>
parents += getShuffleMapStage(shufDep, firstJobId)
//是窄依賴
case _ =>
//遍歷rdd的父RDD是否有父Stage存在
waitingForVisit.push(dep.rdd)
} } } }
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
//調用visit方法訪問出棧的RDD
visit(waitingForVisit.pop())
}
parents.toList
}
</code>
4.getShuffleMapStage方法如下所示:
<code>
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[
, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies,詳見5
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
//根據firstJobId生成ShuffleMapStage,詳見6
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
</code>

5.getAncestorShuffleDependencies方法如下:
<code>
private def getAncestorShuffleDependencies(rdd: RDD[]): Stack[ShuffleDependency[, , ]] = {
val parents = new Stack[ShuffleDependency[
, , ]]
val visited = new HashSet[RDD[
]]
val waitingForVisit = new Stack[RDD[
]]
def visit(r: RDD[
]) {
if (!visited(r)) {
visited += r
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}
case _ =>
}
waitingForVisit.push(dep.rdd)
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents
}
</code>

6.newOrUsedShuffleStage方法如下所示,根據給定的RDD生成ShuffleMapStage,如果shuffleId對應的Stage已經存在與MapOutputTracker辫诅,那么number和位置輸出的位置信息都可以從MapOutputTracker找到
<code>
private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, ],
firstJobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
for (i <- 0 until locs.length) {
stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
}
stage.numAvailableOutputs = locs.count(
!= null)
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
</code>

2.Stage描述

一個Stage是一組并行的tasks凭戴;一個Stage可以被多個Job共享;一些Stage可能沒有運行所有的RDD的分區(qū)炕矮,比如first 和 lookup么夫;Stage的劃分是通過是否存在Shuffle為邊界來劃分的者冤,Stage的子類有兩個:ResultStage和ShuffleMapStage
對于窄依賴生成的是ResultStage,對于寬依賴生成的是ShuffleMapStage档痪。當ShuffleMapStages執(zhí)行完后涉枫,產生輸出文件,等待reduce task去獲取腐螟,同時愿汰,ShffleMapStages也可以通過DAGScheduler的submitMapStage方法

獨立作為job被提交
stage劃分示意圖.png

下一篇我們看Stage如何提交的。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末乐纸,一起剝皮案震驚了整個濱河市衬廷,隨后出現的幾起案子,更是在濱河造成了極大的恐慌汽绢,老刑警劉巖吗跋,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異庶喜,居然都是意外死亡小腊,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門久窟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秩冈,“玉大人,你說我怎么就攤上這事斥扛∪胛剩” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵稀颁,是天一觀的道長芬失。 經常有香客問我,道長匾灶,這世上最難降的妖魔是什么棱烂? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮阶女,結果婚禮上颊糜,老公的妹妹穿的比我還像新娘。我一直安慰自己秃踩,他們只是感情好衬鱼,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著憔杨,像睡著了一般鸟赫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天抛蚤,我揣著相機與錄音台谢,去河邊找鬼。 笑死岁经,一個胖子當著我的面吹牛对碌,可吹牛的內容都是我干的。 我是一名探鬼主播蒿偎,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼朽们,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了诉位?” 一聲冷哼從身側響起骑脱,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎苍糠,沒想到半個月后叁丧,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡岳瞭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年拥娄,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞳筏。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡稚瘾,死狀恐怖,靈堂內的尸體忽然破棺而出姚炕,到底是詐尸還是另有隱情摊欠,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布柱宦,位于F島的核電站些椒,受9級特大地震影響,放射性物質發(fā)生泄漏掸刊。R本人自食惡果不足惜免糕,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望忧侧。 院中可真熱鬧石窑,春花似錦、人聲如沸苍柏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至辩块,卻和暖如春匾荆,著一層夾襖步出監(jiān)牢的瞬間抽碌,已是汗流浹背儿普。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工岛宦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留抵皱,地道東北人余耽。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓缚柏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親碟贾。 傳聞我的和親對象是個殘疾皇子币喧,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內容