This article is based on Spark 2.3.0.
As we all know, once an RDD's dependency chain has been formed, we can divide it into Stages along the wide (shuffle) dependencies.
Spark currently has two kinds of stages: org.apache.spark.scheduler.ResultStage and org.apache.spark.scheduler.ShuffleMapStage. Both are produced when org.apache.spark.scheduler.DAGScheduler divides up the RDD graph.
A few questions about the DAGScheduler:
When is the DAGScheduler created?
It is created when SparkContext is initialized:
_dagScheduler = new DAGScheduler(this)
How is the DAGScheduler triggered to do its work?
In short, every piece of work the DAGScheduler does, Stage division included, starts from an event.
Let's analyze this in detail.
The events that drive the DAGScheduler are all defined in DAGSchedulerEvent.scala; the one that kicks off Stage division is org.apache.spark.scheduler.JobSubmitted.
Here is a quick walk-through of how that event gets fired:
First, org.apache.spark.SparkContext#submitJob submits the job:
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] =
{
  assertNotStopped()
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(  // the key call
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
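A quick aside on how this method is reached in practice (my own note, based on a reading of the 2.3.x code, so verify against the source): SparkContext#submitJob is the entry point used by the asynchronous actions in org.apache.spark.rdd.AsyncRDDActions, while blocking actions such as collect() go through SparkContext#runJob and DAGScheduler#runJob, which internally call the same DAGScheduler#submitJob shown next. Either way, the same JobSubmitted path is taken:
// Assuming some existing rdd; both calls end up in DAGScheduler#submitJob:
rdd.countAsync()  // async action    -> SparkContext#submitJob
rdd.collect()     // blocking action -> SparkContext#runJob -> DAGScheduler#runJob -> submitJob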
As you can see, this ultimately calls dagScheduler.submitJob(). Step into the org.apache.spark.scheduler.DAGScheduler#submitJob method:
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ......
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(  // post the event
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
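The JobSubmitted being posted here is just a case class carrying everything the DAGScheduler needs to build the job; in the 2.3.x source its shape is roughly the following (quoted from memory, treat it as a sketch rather than the verbatim definition):
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
  extends DAGSchedulerEvent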
Clearly, eventProcessLoop posts the JobSubmitted event. The post operation itself is trivial: it simply puts the event into the blocking queue org.apache.spark.util.EventLoop#eventQueue.
So what is the eventProcessLoop in this code? It is org.apache.spark.scheduler.DAGSchedulerEventProcessLoop, a subclass of EventLoop, and the queue mentioned above belongs to that DAGSchedulerEventProcessLoop object.
The class org.apache.spark.util.EventLoop contains an important thread that is worth a mention:
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          ......
        }
      }
    } catch {
      ......
    }
  }
}
Once started, this thread keeps taking events from eventQueue and calling onReceive(event).
At this point you can probably already guess how an event triggers the DAGScheduler's work.
DAGScheduler holds a DAGSchedulerEventProcessLoop as a member variable and calls its start method during its own initialization:
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
eventProcessLoop.start()
這個(gè)start()
方法做了什么呢道盏?
def start(): Unit = {
  if (stopped.get) {
    throw new IllegalStateException(name + " has already been stopped")
  }
  // Call onStart before starting the event thread to make sure it happens before onReceive
  onStart()
  eventThread.start()
}
沒(méi)錯(cuò)而柑,啟動(dòng)了eventThread
。到此荷逞,可以看到DAGScheduler
持有eventProcessLoop
, 自己post媒咳,自己消費(fèi)。
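To make this "post to itself, consume by itself" pattern concrete, here is a stripped-down, self-contained imitation of the mechanism (my own sketch, not Spark code):
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// A toy event loop: post() enqueues, a daemon thread dequeues and calls onReceive.
abstract class ToyEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)  // daemon thread, so it will not keep the JVM alive
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take()  // blocks until an event is available
        onReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  protected def onReceive(event: E): Unit
}

// The "scheduler" owns the loop, posts events to it, and handles its own events.
class ToyScheduler {
  private val eventLoop = new ToyEventLoop[String]("toy-event-loop") {
    override protected def onReceive(event: String): Unit = handle(event)
  }
  eventLoop.start()

  def submit(job: String): Unit = eventLoop.post(job)
  private def handle(event: String): Unit = println(s"handling $event")
}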
Remember that the eventThread#run method calls onReceive(event)? Let's see how DAGSchedulerEventProcessLoop implements its parent class's onReceive(event):
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)  // the key call
  } finally {
    timerContext.stop()
  }
}
很簡(jiǎn)單,繼續(xù)跟進(jìn)doOnReceive(event)
, org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ......
}
很簡(jiǎn)單坠敷,繼續(xù)跟進(jìn)dagScheduler.handleJobSubmitted
, org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    // Entry point for Stage division
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  }
  ......
}
Here we see createResultStage, which is where Stage division actually begins.
To sum up so far: once a job is submitted, the DAGScheduler kicks off the Stage division work in response to the JobSubmitted event.
How exactly does the DAGScheduler divide the Stages?
We already met the createResultStage method above; let's step into it:
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  // 1. Create the parent stages of the ResultStage
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  // 2. The ResultStage itself is simply new'ed up
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
There is nothing special about the ResultStage itself; it is created directly with new. The interesting part is its parent stages, so let's focus on getOrCreateParentStages(rdd, jobId):
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
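One detail worth noting: getShuffleDependencies only collects the rdd's nearest shuffle dependencies, walking backwards through narrow dependencies until it hits a ShuffleDependency on each path. A simplified sketch of that traversal (paraphrased, not the verbatim source; uses scala.collection.mutable.{ArrayStack, HashSet}):
private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ArrayStack[RDD[_]]
  waitingForVisit.push(rdd)
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.pop()
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep                 // stop at the first shuffle on this path
        case dependency =>
          waitingForVisit.push(dependency.rdd)  // narrow dependency: keep walking backwards
      }
    }
  }
  parents
}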
In other words, getOrCreateParentStages collects the rdd's shuffle dependencies (its wide dependencies) and turns each of them into a ShuffleMapStage. Following the call to getOrCreateShuffleMapStage(shuffleDep, firstJobId), we eventually arrive at DAGScheduler#createShuffleMapStage:
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd  // the parent rdd on the map side of the shuffle
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)  // recurse further backwards for earlier ShuffleMapStages
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(  // this ShuffleMapStage ends right here, at the shuffle
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
At this point the ShuffleMapStage has been created.
To summarize: the overall process is fairly clear. Starting from the very last rdd and tracing backwards, a ShuffleMapStage is created at every wide dependency, and the final stage is simply a ResultStage. Along the way, the parent-child relationships between stages are built up as well.
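As a worked illustration (my own example, assuming an already created SparkContext sc and a placeholder input path), a lineage with two shuffles ends up as two ShuffleMapStages plus a final ResultStage:
val result = sc.textFile("input.txt")   // "input.txt" is a placeholder path
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)                   // shuffle #1
  .map { case (_, c) => (c % 10, 1) }
  .reduceByKey(_ + _)                   // shuffle #2
result.collect()
// Tracing backwards from collect():
//   the ResultStage depends on the ShuffleMapStage ending at shuffle #2,
//   which in turn depends on the ShuffleMapStage ending at shuffle #1.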
Done!