self.context.runJob(self, writeToFile)//開始提交任務(wù)狗唉,self就是最后一個rdd械拍,這個rdd通過依賴關(guān)系進行stage切分
runJob(rdd, func)//將最后一個rdd和一個函數(shù)(taskContext, iterator)傳入到該方法中
spark.logLineage = true 打印血統(tǒng)關(guān)系
//DAGScheduler的runJob方法嘴高,切分stage
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties)
這個方法中有
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
并返回一個回調(diào)器
在sparkContext中贸呢,new DAGScheduler
啟動了一個先進先出的隊列W嗨尽I烊小!:
在DAGScheduler的主構(gòu)造器中的最后一行eventProcessLoop.start()
eventProcessLoop:DAGSchedulerEventProcessLoop 繼承 abstract class EventLoop[E](name: String)
這個父類中有
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDequeE
private val eventThread = new Thread(name) 該線程從上面的阻塞隊列中取逢倍,有內(nèi)容取捧颅,沒內(nèi)容阻塞,等著较雕, 先進先出的調(diào)度器碉哑!
其中 onReceive(event) 的實現(xiàn)是它的子類DAGSchedulerEventProcessLoop的onReceive
dagscheduler中先將數(shù)據(jù)封裝在event中,然后放到eventprocessloop阻塞隊列中
DAGScheduler中的handleJobSubmitted用于切分stage(其中的newstage方法)