Spark Series (9): How DAGScheduler Works - 會飛的紙盒 - 博客園
http://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BDAGScheduler%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html
An in-depth analysis using wordcount as an example
import org.apache.spark.{SparkConf, SparkContext}

object wordcount {

  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("wordcount").setMaster("local")

    val sc = new SparkContext(conf)
    // creates a HadoopRDD -> MapPartitionsRDD
    val lines = sc.textFile("C://Users//Administrator//Desktop//wordcount.txt", 1)
    // creates a FlatMappedRDD
    val words = lines.flatMap(line => line.split(" "))
    // creates a MapPartitionsRDD
    val pairs = words.map(word => (word, 1))
    // creates MapPartitionsRDD -> ShuffledRDD -> MapPartitionsRDD (three RDDs)
    val result = pairs.reduceByKey(_ + _)
    // foreach is an action; it triggers a job via SparkContext's runJob method (DAGScheduler)
    result.foreach(count => println(count))
  }
}
Notes:
1. Internally, textFile first creates a HadoopRDD via hadoopFile (a key-value format in which the key is the offset of each line in the text file and the value is the content of that line), and then converts it into a MapPartitionsRDD (whose elements contain only the line contents).
2. reduceByKey is not defined on RDD itself, so calling reduceByKey() on an RDD triggers a Scala implicit conversion: Scala searches the enclosing scope for an implicit conversion, finds rddToPairRDDFunctions() in RDD, and converts the RDD into a PairRDDFunctions.
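To make the implicit-conversion mechanism concrete, here is a minimal, self-contained sketch. MiniRDD and PairFunctions are toy stand-ins (hypothetical types, not Spark's actual classes) that mimic how rddToPairRDDFunctions lets reduceByKey appear on a type that does not define it:

import scala.language.implicitConversions

// Toy stand-ins for RDD and PairRDDFunctions (for illustration only)
class MiniRDD[T](val data: Seq[T])

class PairFunctions[K, V](self: MiniRDD[(K, V)]) {
  // Only key-value MiniRDDs gain this method, just as only RDD[(K, V)] gains reduceByKey
  def reduceByKey(func: (V, V) => V): MiniRDD[(K, V)] =
    new MiniRDD(self.data.groupBy(_._1).map { case (k, vs) =>
      (k, vs.map(_._2).reduce(func))
    }.toSeq)
}

object MiniRDD {
  // The compiler applies this conversion when reduceByKey is called on a MiniRDD[(K, V)],
  // analogous to rddToPairRDDFunctions converting an RDD into PairRDDFunctions
  implicit def toPairFunctions[K, V](rdd: MiniRDD[(K, V)]): PairFunctions[K, V] =
    new PairFunctions(rdd)
}

object ImplicitConversionDemo {
  def main(args: Array[String]) {
    val pairs = new MiniRDD(Seq(("a", 1), ("b", 1), ("a", 1)))
    // MiniRDD has no reduceByKey; the implicit conversion supplies it
    println(pairs.reduceByKey(_ + _).data) // e.g. List((a,2), (b,1))
  }
}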
How the stage-splitting algorithm works
Work backwards from the RDD on which the action was triggered. A stage is first created for the last RDD. While walking further back, whenever a wide dependency on some RDD is found, a new stage is created with that wide-dependency RDD, and that RDD becomes the last RDD of the new stage. The process continues backwards in the same way, splitting stages according to narrow and wide dependencies, until all RDDs have been traversed.
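For the wordcount job above, the only wide dependency is the shuffle introduced by reduceByKey, so the DAG is split into exactly two stages. One quick way to see the boundary (assuming the same result value from the example; the exact output format varies by Spark version) is RDD.toDebugString:

// Print the lineage of the final RDD: it runs from result back to the HadoopRDD created
// by textFile, and the shuffle introduced by reduceByKey is the single stage boundary
println(result.toDebugString)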
Why stages are split
An application submitted to Spark is executed in the form of jobs. After a job is submitted it is split into multiple stages; each stage is then wrapped in a TaskSet and submitted to the TaskScheduler, which runs it on executors.
Source code analysis
Execution flow after the action in the wordcount program above:
foreach (RDD.scala) -> runJob (SparkContext.scala) -> runJob (DAGScheduler.scala) -> submitJob (DAGScheduler.scala) -> eventProcessLoop.post sends JobSubmitted (DAGScheduler.scala) -> onReceive (DAGScheduler.scala) -> case JobSubmitted -> handleJobSubmitted (entry point)
The DAGScheduler implementation lives in the package org.apache.spark.scheduler
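The flow above goes through an internal event loop: submitJob posts a JobSubmitted event, a dedicated thread takes events off a queue, and onReceive pattern-matches to dispatch to handleJobSubmitted. As a rough illustration of that pattern, here is a simplified sketch (hypothetical names, not Spark's actual event-loop class):

import java.util.concurrent.LinkedBlockingDeque

// A simplified model of the event-loop pattern used by DAGScheduler
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent

class SimpleEventLoop(handleJobSubmitted: Int => Unit) {
  private val queue = new LinkedBlockingDeque[SchedulerEvent]()
  private val thread = new Thread("simple-event-loop") {
    override def run(): Unit =
      try {
        while (true) onReceive(queue.take())   // blocks until an event is posted
      } catch {
        case _: InterruptedException => // loop stopped
      }
  }

  def start(): Unit = { thread.setDaemon(true); thread.start() }

  // Corresponds to eventProcessLoop.post(JobSubmitted(...)) in submitJob
  def post(event: SchedulerEvent): Unit = queue.put(event)

  // Corresponds to DAGScheduler's onReceive: pattern-match and dispatch
  private def onReceive(event: SchedulerEvent): Unit = event match {
    case JobSubmitted(jobId) => handleJobSubmitted(jobId)
  }
}

// Usage: val loop = new SimpleEventLoop(id => println("handling job " + id))
//        loop.start(); loop.post(JobSubmitted(0))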
handleJobSubmitted
Purpose: analyzes the stage dependencies, generates the stages, and submits the corresponding job
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
  var finalStage: Stage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Create the finalStage from the job's last RDD and add it to the DAGScheduler's
    // internal cache (stageIdToStage)
    finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  if (finalStage != null) {
    // Create a job from the finalStage, which is that job's last stage
    val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
      job.jobId, callSite.shortForm, partitions.length, allowLocal))
    logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val shouldRunLocally =
      localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
    val jobSubmissionTime = clock.getTimeMillis()
    // Jobs with no parent stages are executed locally
    if (shouldRunLocally) {
      // Compute very short actions like first() or take() with no parent stages locally.
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
      // Run the job locally
      runLocally(job)
    } else {
      // Add the job to the in-memory caches
      jobIdToActiveJob(jobId) = job
      activeJobs += job
      finalStage.resultOfJob = Some(job)
      val stageIds = jobIdToStageIds(jobId).toArray
      val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      listenerBus.post(
        SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
      // Submit the stage; all stages end up in the waitingStages queue
      submitStage(finalStage)
    }
  }
  submitWaitingStages()
}
submitStage
Purpose: entry point of the stage-splitting algorithm
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Get the parent stages of the current stage
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing == Nil) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Create tasks for the stage; the number of tasks equals the number of partitions
        submitMissingTasks(stage, jobId.get)
      } else {
        // Submit the parent stages first
        for (parent <- missing) {
          submitStage(parent)
        }
        // Put this stage into the waitingStages cache
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id)
  }
}
getMissingParentStages
Purpose:
The concrete implementation of the stage-splitting algorithm.
How it works:
For a given stage, if all of the dependencies of its last RDD are narrow, no new stage is created; if a wide dependency is found, a new stage is created from that wide-dependency RDD and returned.
// The concrete implementation of the stage-splitting algorithm.
// For a stage, if all dependencies of its last RDD are narrow, no new stage is created;
// if a wide dependency is found, a new stage is created from that wide-dependency RDD and returned.
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      if (getCacheLocs(rdd).contains(Nil)) {
        // Iterate over the RDD's dependencies
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide dependency
            case shufDep: ShuffleDependency[_, _, _] =>
              // Create a stage with isShuffleMap set to true
              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
              if (!mapStage.isAvailable) {
                // Cache the newly created stage in missing
                missing += mapStage
              }
            // Narrow dependency
            case narrowDep: NarrowDependency[_] =>
              // Push the RDD it depends on onto the stack
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  // Push the stage's last RDD onto the waitingForVisit stack
  waitingForVisit.push(stage.rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  // Return the list of missing parent stages
  missing.toList
}
Note:
The stage-splitting algorithm is implemented jointly by the submitStage() and getMissingParentStages() methods.
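As a small check of the wide/narrow distinction that getMissingParentStages pattern-matches on, the snippet below (a sketch assuming the same pairs and result values from the wordcount example) prints whether each RDD's direct dependencies are shuffle dependencies or narrow ones:

import org.apache.spark.ShuffleDependency

// For each RDD, print whether its direct dependencies are wide (shuffle) or narrow --
// the same distinction getMissingParentStages uses to decide where a new stage starts
Seq(("pairs", pairs), ("result", result)).foreach { case (name, rdd) =>
  rdd.dependencies.foreach {
    case _: ShuffleDependency[_, _, _] => println(name + ": wide (shuffle) dependency")
    case dep                           => println(name + ": narrow dependency on " + dep.rdd)
  }
}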
submitMissingTasks
Purpose:
Creates a batch of tasks for the stage; the number of tasks equals the number of partitions
// Create a batch of tasks for the stage; the number of tasks equals the number of partitions
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingTasks.clear()

  // First figure out the indexes of partition ids to compute.
  // Determine which partitions need to be computed
  val partitionsToCompute: Seq[Int] = {
    if (stage.isShuffleMap) {
      (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
    } else {
      val job = stage.resultOfJob.get
      (0 until job.numPartitions).filter(id => !job.finished(id))
    }
  }

  ................................

  // Add the stage to the runningStages cache
  runningStages += stage

  ................................

  // Create the required number of tasks for the stage and compute each task's preferred locations
  val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
    partitionsToCompute.map { id =>
      // Compute the preferred locations
      val locs = getPreferredLocs(stage.rdd, id)
      val part = stage.rdd.partitions(id)
      // Create a ShuffleMapTask
      new ShuffleMapTask(stage.id, taskBinary, part, locs)
    }
  } else {
    val job = stage.resultOfJob.get
    partitionsToCompute.map { id =>
      val p: Int = job.partitions(id)
      val part = stage.rdd.partitions(p)
      val locs = getPreferredLocs(stage.rdd, p)
      // Create a ResultTask for the final stage
      new ResultTask(stage.id, taskBinary, part, locs, id)
    }
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingTasks ++= tasks
    logDebug("New pending tasks: " + stage.pendingTasks)
    // Wrap the stage's tasks in a TaskSet and submit it via TaskScheduler.submitTasks()
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }

  ......................
}
getPreferredLocsInternal
Purpose:
Computes the preferred location for each task's partition. Starting from the stage's last RDD, it checks whether the RDD's partition has been cached or checkpointed; if so, the location of the cached or checkpointed partition becomes the task's preferred location.
Call chain:
submitMissingTasks -> getPreferredLocs -> getPreferredLocsInternal
// Compute the preferred location for each task's partition.
// Starting from the stage's last RDD, check whether the RDD's partition is cached or checkpointed;
// if so, the cached or checkpointed partition's location is the task's preferred location.
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_],Int)])
  : Seq[TaskLocation] =
{
  // If the partition has already been visited, no need to re-visit.
  // This avoids exponential path exploration. SPARK-695
  if (!visited.add((rdd,partition))) {
    // Nil has already been returned for previously visited partitions.
    return Nil
  }
  // If the partition is cached, return the cache locations
  // Check whether the RDD is cached
  val cached = getCacheLocs(rdd)(partition)
  if (!cached.isEmpty) {
    return cached
  }
  // If the RDD has some placement preferences (as is the case for input RDDs), get those
  // Check whether the current RDD has preferred locations (e.g. from a checkpoint)
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (!rddPrefs.isEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
  // that has any placement preferences. Ideally we would choose based on transfer sizes,
  // but this will do for now.
  // Recurse into the parent RDDs to check whether the corresponding partition is cached or checkpointed
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  // If, from the stage's last RDD back to the first, no partition is cached or checkpointed,
  // the task's preferred locations (preferredLocs) are Nil
  Nil
}
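A small usage sketch of how this plays out from the application side (building on the wordcount example; variable names are illustrative): once an RDD's partitions are cached, getCacheLocs returns their block locations, so later tasks over those partitions are preferentially scheduled onto the executors holding the cached blocks.

// Persist pairs; the first action materializes and caches its partitions
val cachedPairs = pairs.cache()
cachedPairs.count()

// A later job over the same partitions: getPreferredLocsInternal now finds the cached
// block locations via getCacheLocs, so the new tasks prefer the executors holding them
val recounted = cachedPairs.reduceByKey(_ + _).collect()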