==Spark系列(九)DAGScheduler工作原理

Spark系列(九)DAGScheduler工作原理 - 會飛的紙盒 - 博客園
http://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BDAGScheduler%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html

1肚豺、textFile方法的實(shí)現(xiàn)內(nèi)部先通過hadoopFile創(chuàng)建HadoopRDD(key-value對格式上真,key為文本文件的每一行偏移量,value為每行的內(nèi)容)仓手,再轉(zhuǎn)換為MapPartitionsRDD(每個集合元素只包含每行的內(nèi)容)


以wordcount為示例進(jìn)行深入分析

1
object wordcount {

2

3
def main(args: Array[String]) {

4
val conf = new SparkConf()

5
conf.setAppName("****wordcount****").setMaster("****local****")

6

7
val sc = new SparkContext(conf)

8
// 產(chǎn)生HadoopRDD->MapPartitionsRDD

9
val lines = sc.textFile("****C://Users//Administrator//Desktop//wordcount.txt****", 1)

10
// 產(chǎn)生FlatMappedRDD

11
val words = lines.flatMap(line=>line.split("**** ****"))

12
// 產(chǎn)生MapPartitionsRDD

13
val pairs = words.map(word=>(word,1))

14
//產(chǎn)生MapPartitionsRDD -> ShuffleRDD -> MapPartitionsRDD, 產(chǎn)生三個RDD

15
val result= pairs.reduceByKey(_ + _);

16
// foreach為action操作,通過SparkContext的runJob方法去觸發(fā)job(DAGScheduler)

17
result.foreach(count=>println(count))

18
}

19
}

說明:

1玻淑、textFile方法的實(shí)現(xiàn)內(nèi)部先通過hadoopFile創(chuàng)建HadoopRDD(key-value對格式嗽冒,key為文本文件的每一行偏移量,value為每行的內(nèi)容)补履,再轉(zhuǎn)換為MapPartitionsRDD(每個集合元素只包含每行的內(nèi)容)

2添坊、RDD里是沒有reduceByKey的,因此對RDD調(diào)用reduceByKey()方法的時候箫锤,會觸發(fā)scala的隱式轉(zhuǎn)換贬蛙;此時就會在作用域內(nèi)雨女,尋找隱式轉(zhuǎn)換,會在RDD中找到rddToPairRDDFunctions()隱式轉(zhuǎn)換阳准,然后將RDD轉(zhuǎn)換為PairRDDFunctions氛堕。

stage劃分算法說明

從觸發(fā)action操作的rdd開始往前倒推,首先會為最后一個rdd創(chuàng)建一個stage,繼續(xù)往前倒退的時候野蝇,如果發(fā)現(xiàn)對某個 rdd是寬依賴讼稚,那么就會將該寬依賴的rdd創(chuàng)建一個新的stage,之前面的那個rdd就是新的stage的最后一個rdd。然后以次類推绕沈,繼續(xù)往前倒退锐想,根據(jù)窄依賴和寬依賴進(jìn)行stage的劃分,知道所有的rdd全部遍歷完成七冲。

劃分stage的作用

在spark中提交的應(yīng)用都會以job的形式進(jìn)行執(zhí)行痛倚,job提交后會被劃分為多個stage,然后把stage封裝為TaskSet提交到TaskScheduler到executor中執(zhí)行。

源碼分析

以上wordcount程序action操作后執(zhí)行流程:

foreach(RDD.scala) -> runJob(SparkContext.scala) -> runJob(DAGScheduler.scala) -> submitJob(DAGScheduler.scala) -> eventProcessLoop.post發(fā)送JobSubmitted(DAGScheduler.scala) -> onReceive(DAGScheduler.scala)->case JobSubmitted -> handleJobSubmitted (入口)

DAGScheduler實(shí)現(xiàn)類所屬包:org.apache.spark.scheduler

**handleJobSubmitted
**
**功能:stage的依賴分析及生成stage和對應(yīng)的Job提交
**
1
private[scheduler] def handleJobSubmitted(jobId: Int,

2
finalRDD: RDD[_],

3
func: (TaskContext, Iterator[_]) => _,

4
partitions: Array[Int],

5
allowLocal: Boolean,

6
callSite: CallSite,

7
listener: JobListener,

8
properties: Properties = null)

9
{

10
var finalStage: Stage = null

11
try {

12
// New stage creation may throw an exception if, for example, jobs are run on a

13
// HadoopRDD whose underlying HDFS files have been deleted.

14
// 使用job的最后一個rdd創(chuàng)建finalStage,并加入到DAGScheduler內(nèi)部緩存中(stageIdToStage)

15
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)

16
} catch {

17
case e: Exception =>

18
logWarning("****Creating**** ****new**** ****stage**** ****failed**** ****due**** ****to**** ****exception**** ****-**** ****job:**** ****" + jobId, e)

19
listener.jobFailed(e)

20
return

21
}

22
if (finalStage != null) {

23
// 使用finalStage創(chuàng)建一個Job澜躺,也就是該Job的最后一個stage

24
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)

25
clearCacheLocs()

26
logInfo("****Got**** ****job**** ****%s**** ****(%s)**** ****with**** ****%d**** ****output**** ****partitions**** ****(allowLocal=%s)****".format(

27
job.jobId, callSite.shortForm, partitions.length, allowLocal))

28
logInfo("****Final**** ****stage:**** ****" + finalStage + "****(****" + finalStage.name + "****)****")

29
logInfo("****Parents**** ****of**** ****final**** ****stage:**** ****" + finalStage.parents)

30
logInfo("****Missing**** ****parents:**** ****" + getMissingParentStages(finalStage))

31
val shouldRunLocally =

32
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1

33
val jobSubmissionTime = clock.getTimeMillis()

34
// 對于沒有父stage的job 本地執(zhí)行

35
if (shouldRunLocally) {

36
// Compute very short actions like first() or take() with no parent stages locally.

37
listenerBus.post(

38
SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))

39
// 本地執(zhí)行Job

40
runLocally(job)

41
} else {

42
// 將Job加入內(nèi)存緩存中

43
jobIdToActiveJob(jobId) = job

44
activeJobs += job

45
finalStage.resultOfJob = Some(job)

46
val stageIds = jobIdToStageIds(jobId).toArray

47
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

48
listenerBus.post(

49
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

50
// 提交stage,所有的stage都放入waitingStages隊(duì)列里

51
submitStage(finalStage)

52
}

53
}

54
submitWaitingStages()

55
}

submitStage

功能:stage劃分算法實(shí)現(xiàn)入口

1
private def submitStage(stage: Stage) {

2
val jobId = activeJobForStage(stage)

3
if (jobId.isDefined) {

4
logDebug("****submitStage(****" + stage + "****)****")

5
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {

6
//獲取當(dāng)前stage的父stage

7
val missing = getMissingParentStages(stage).sortBy(_.id)

8
logDebug("****missing:**** ****" + missing)

9
if (missing == Nil) {

10
logInfo("****Submitting**** ****" + stage + "**** ****(****" + stage.rdd + "****),**** ****which**** ****has**** ****no**** ****missing**** ****parents****")

11
// 為stage創(chuàng)建task,且task數(shù)據(jù)與partition數(shù)量相同

12
submitMissingTasks(stage, jobId.get)

13
} else {

14
// 提交父stage

15
for (parent <- missing) {

16
submitStage(parent)

17
}

18
// 將stage加入waitingStages緩存中

19
waitingStages += stage

20
}

21
}

22
} else {

23
abortStage(stage, "****No**** ****active**** ****job**** ****for**** ****stage**** ****" + stage.id)

24
}

25
}


getMissingParentStages

**
**
功能:
stage劃分算法的具體實(shí)現(xiàn)

實(shí)現(xiàn)原理:

對于一個stage如果它的最后一個rdd的所有依賴都是窄依賴蝉稳,那么不會創(chuàng)建新的stage,但如果存在寬依賴,就用寬依賴的那個rdd

創(chuàng)建一個新的stage并返回

1
// stage劃分算法的具體實(shí)現(xiàn)

2
// 對于一個stage如果它的最后一個rdd的所有依賴都是窄依賴掘鄙,那么不會創(chuàng)建新的stage,

3
// 但如果存在寬依賴耘戚,就用寬依賴的那個rdd創(chuàng)建一個新的stage并返回

4
private def getMissingParentStages(stage: Stage): List[Stage] = {

5
val missing = new HashSet[Stage]

6
val visited = new HashSet[RDD[_]]

7
// We are manually maintaining a stack here to prevent StackOverflowError

8
// caused by recursively visiting

9
val waitingForVisit = new Stack[RDD[_]]

10
def visit(rdd: RDD[_]) {

11
if (!visited(rdd)) {

12
visited += rdd

13
if (getCacheLocs(rdd).contains(Nil)) {

14
// 遍歷RDD

15
for (dep <- rdd.dependencies) {

16
dep match {

17
// 寬依賴處理

18
case shufDep: ShuffleDependency[_, _, _] =>

19
// 創(chuàng)建stage,并將isShuffleMap設(shè)置為true

20
val mapStage = getShuffleMapStage(shufDep, stage.jobId)

21
if (!mapStage.isAvailable) {

22
// 將新創(chuàng)建的stage緩存到missing中

23
missing += mapStage

24
}

25
// 窄依賴處理

26
case narrowDep: NarrowDependency[_] =>

27
// 將依賴的rdd放入棧中

28
waitingForVisit.push(narrowDep.rdd)

29
}

30
}

31
}

32
}

33
}

34
// 向waitingForVisit棧中壓rdd

35
waitingForVisit.push(stage.rdd)

36
while (!waitingForVisit.isEmpty) {

37
visit(waitingForVisit.pop())

38
}

39
// 返回stage列表

40
missing.toList

41
}

說明:

stage劃分算法由submitStage()方法和getMissingStages()方法共同組成

submitMissingTasks

功能:

為stage創(chuàng)建一批task,且task數(shù)量與partition數(shù)量相同

1
// 為stage創(chuàng)建一批task操漠,且task數(shù)量與partition數(shù)量相同

2
private def submitMissingTasks(stage: Stage, jobId: Int) {

3
logDebug("****submitMissingTasks(****" + stage + "****)****")

4
// Get our pending tasks and remember them in our pendingTasks entry

5
stage.pendingTasks.clear()

6

7
// First figure out the indexes of partition ids to compute.

8
// 獲取需要創(chuàng)建的partition數(shù)量

9
val partitionsToCompute: Seq[Int] = {

10
if (stage.isShuffleMap) {

11
(0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)

12
} else {

13
val job = stage.resultOfJob.get

14
(0 until job.numPartitions).filter(id => !job.finished(id))

15
}

16
}

17

18
................................

19

20
// 將stae加入到runningStages緩存中

21
runningStages += stage

22

23
................................

24

25
// 為stage創(chuàng)建指定數(shù)量的task,并計(jì)算最佳位置

26
val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {

27
partitionsToCompute.map { id =>

28
// 計(jì)算最佳位置

29
val locs = getPreferredLocs(stage.rdd, id)

30
val part = stage.rdd.partitions(id)

31
// 創(chuàng)建ShuffleMapTask

32
new ShuffleMapTask(stage.id, taskBinary, part, locs)

33
}

34
} else {

35
val job = stage.resultOfJob.get

36
partitionsToCompute.map { id =>

37
val p: Int = job.partitions(id)

38
val part = stage.rdd.partitions(p)

39
val locs = getPreferredLocs(stage.rdd, p)

40
// 給final stage創(chuàng)建ResultTask

41
new ResultTask(stage.id, taskBinary, part, locs, id)

42
}

43
}

44

45
if (tasks.size > 0) {

46
logInfo("****Submitting**** ****" + tasks.size + "**** ****missing**** ****tasks**** ****from**** ****" + stage + "**** ****(****" + stage.rdd + "****)****")

47
stage.pendingTasks ++= tasks

48
logDebug("****New**** ****pending**** ****tasks:**** ****" + stage.pendingTasks)

49
// 對stage的task創(chuàng)建TaskSet對象收津,調(diào)用TaskScheduler的submitTasks()方法提交TaskSet

50
taskScheduler.submitTasks(

51
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

52
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

53
}

54

55
......................

56
}

getPreferredLocsInternal

功能:

計(jì)算每個task對應(yīng)的partition最佳位置,從stage的最后一個rdd開始查找浊伙,看rdd的partition是否有被cache撞秋、chencjpoint,如果有那么task的最佳位置就被cache或者checkpoint的partition的位置

調(diào)用過程:

submitMissingTasks->getPreferredLocs->getPreferredLocsInternal

1
// 計(jì)算每個task對應(yīng)的partition最佳位置

2
// 從stage的最后一個rdd開始查找,看rdd的partition是否有被cache嚣鄙、chencjpoint,

3
// 如果有那么task的最佳位置就被cache或者checkpoint的partition的位置

4
private def getPreferredLocsInternal(

5
rdd: RDD[_],

6
partition: Int,

7
visited: HashSet[(RDD[_],Int)])

8
: Seq[TaskLocation] =

9
{

10
// If the partition has already been visited, no need to re-visit.

11
// This avoids exponential path exploration. SPARK-695

12
if (!visited.add((rdd,partition))) {

13
// Nil has already been returned for previously visited partitions.

14
return Nil

15
}

16
// If the partition is cached, return the cache locations

17
// 尋找rdd是否被緩存

18
val cached = getCacheLocs(rdd)(partition)

19
if (!cached.isEmpty) {

20
return cached

21
}

22
// If the RDD has some placement preferences (as is the case for input RDDs), get those

23
// 尋找當(dāng)前RDD是否被cachepoint

24
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList

25
if (!rddPrefs.isEmpty) {

26
return rddPrefs.map(TaskLocation(_))

27
}

28
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep

29
// that has any placement preferences. Ideally we would choose based on transfer sizes,

30
// but this will do for now.

31
// 遞歸調(diào)用自己尋找rdd的父rdd,檢查對應(yīng)的partition是否被緩存或者checkpoint

32
rdd.dependencies.foreach {

33
case n: NarrowDependency[_] =>

34
for (inPart <- n.getParents(partition)) {

35
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)

36
if (locs != Nil) {

37
return locs

38
}

39
}

40
case _ =>

41
}

42
// 如果stage從最后一個rdd到最開始的rdd吻贿,partiton都沒有被緩存或者cachepoint,

43
// 那么task的最佳位置(preferredLocs)為Nil

44
Nil

45
}

分類: Spark

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末哑子,一起剝皮案震驚了整個濱河市舅列,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌卧蜓,老刑警劉巖帐要,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異弥奸,居然都是意外死亡榨惠,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赠橙,“玉大人伸蚯,你說我怎么就攤上這事〖蚩荆” “怎么了剂邮?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長横侦。 經(jīng)常有香客問我,道長枉侧,這世上最難降的妖魔是什么引瀑? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮榨馁,結(jié)果婚禮上憨栽,老公的妹妹穿的比我還像新娘。我一直安慰自己翼虫,他們只是感情好屑柔,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著珍剑,像睡著了一般掸宛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上招拙,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天唧瘾,我揣著相機(jī)與錄音,去河邊找鬼别凤。 笑死饰序,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的规哪。 我是一名探鬼主播求豫,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼由缆!你這毒婦竟也來了注祖?” 一聲冷哼從身側(cè)響起猾蒂,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤均唉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后肚菠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體舔箭,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了层扶。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片箫章。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖镜会,靈堂內(nèi)的尸體忽然破棺而出檬寂,到底是詐尸還是另有隱情,我是刑警寧澤戳表,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布桶至,位于F島的核電站,受9級特大地震影響匾旭,放射性物質(zhì)發(fā)生泄漏镣屹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一价涝、第九天 我趴在偏房一處隱蔽的房頂上張望女蜈。 院中可真熱鬧,春花似錦色瘩、人聲如沸伪窖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽惰许。三九已至,卻和暖如春史辙,著一層夾襖步出監(jiān)牢的瞬間汹买,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工聊倔, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晦毙,地道東北人。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓耙蔑,卻偏偏與公主長得像见妒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子甸陌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容