JobScheduler有兩個重要成員豪诲,一是上文介紹的 ReceiverTracker挥等,負責分發(fā) receivers 及源源不斷地接收數(shù)據(jù);二是本文將要介紹的 JobGenerator抛腕,負責定時的生成 jobs 并 checkpoint诈悍。
定時邏輯
在 JobScheduler 的主構(gòu)造函數(shù)中,會創(chuàng)建 JobGenerator 對象兽埃。在 JobGenerator 的主構(gòu)造函數(shù)中,會創(chuàng)建一個定時器:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
該定時器每隔 ssc.graph.batchDuration.milliseconds
會執(zhí)行一次 eventLoop.post(GenerateJobs(new Time(longTime)))
向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))
消息适袜,eventLoop收到消息后會進行這個 batch 對應(yīng)的 jobs 的生成及提交執(zhí)行柄错,eventLoop 是一個消息接收處理器。
需要注意的是,timer 在創(chuàng)建之后并不會馬上啟動售貌,將在 StreamingContext#start()
啟動 Streaming Application 時間接調(diào)用到 timer.start(restartTime.milliseconds)
才啟動给猾。
為 batch 生成 jobs
eventLoop 在接收到 GenerateJobs(new Time(longTime))
消息后的主要處理流程有以上圖中三步:
- 將已接收到的 blocks 分配給 batch
- 生成該 batch 對應(yīng)的 jobs
- 將 jobs 封裝成 JobSet 并提交執(zhí)行
接下來我們就將逐一展開這三步進行分析
將已接受到的 blocks 分配給 batch
上圖是根據(jù)源碼畫出的為 batch 分配 blocks 的流程圖,這里對 『獲得 batchTime 各個 InputDStream 未分配的 blocks』作進一步說明:
在文章 『文章鏈接』 中我們知道了各個 ReceiverInputDStream 對應(yīng)的 receivers 接收并保存的 blocks 信息會保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues
颂跨,該成員 key 為 streamId敢伸,value 為該 streamId 對應(yīng)的 InputDStream 已接收保存但尚未分配的 blocks 信息。
所以獲取某 InputDStream 未分配的 blocks 只要以該 InputDStream 的 streamId 來從 streamIdToUnallocatedBlockQueues 來 get 就好恒削。獲取之后池颈,會清楚該 streamId 對應(yīng)的value,以保證 block 不會被重復(fù)分配钓丰。
在實際調(diào)用中躯砰,為 batchTime 分配 blocks 時,會從streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞進 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks]
中携丁,以在之后作為該 batchTime 對應(yīng)的 RDD 的輸入數(shù)據(jù)琢歇。
通過以上步驟,就可以為 batch 的所有 InputDStream 分配 blocks梦鉴。也就是為 batch 分配了 blocks李茫。
生成該 batch 對應(yīng)的 jobs
為指定 batchTime 生成 jobs 的邏輯如上圖所示。你可能會疑惑肥橙,為什么 DStreamGraph#generateJobs(time: Time)
為什么返回 Seq[Job]
魄宏,而不是單個 job。這是因為快骗,在一個 batch 內(nèi)娜庇,可能會有多個 OutputStream 執(zhí)行了多次 output 操作,每次 output 操作都將產(chǎn)生一個 Job方篮,最終就會產(chǎn)生多個 Jobs名秀。
我們結(jié)合上圖對執(zhí)行流程進一步分析。
在DStreamGraph#generateJobs(time: Time)
中藕溅,對于DStreamGraph成員ArrayBuffer[DStream[_]]的每一項匕得,調(diào)用DStream#generateJob(time: Time)
來生成這個 outputStream 在該 batchTime 的 job。該生成過程主要有三步:
Step1: 獲取該 outputStream 在該 batchTime 對應(yīng)的 RDD
每個 DStream 實例都有一個 generatedRDDs: HashMap[Time, RDD[T]]
成員巾表,用來保存該 DStream 在每個 batchTime 生成的 RDD汁掠,當 DStream#getOrCompute(time: Time)
調(diào)用時
首先會查看generatedRDDs中是否已經(jīng)有該 time 對應(yīng)的 RDD,若有則直接返回
-
若無集币,則調(diào)用
compute(validTime: Time)
來生成 RDD考阱,這一步根據(jù)每個 InputDStream繼承 compute 的實現(xiàn)不同而不同。例如鞠苟,對于 FileInputDStream乞榨,其 compute 實現(xiàn)邏輯如下:- 先通過一個 findNewFiles() 方法秽之,找到多個新 file
- 對每個新 file,都將其作為參數(shù)調(diào)用 sc.newAPIHadoopFile(file)吃既,生成一個 RDD 實例
- 將 2 中的多個新 file 對應(yīng)的多個 RDD 實例進行 union考榨,返回一個 union 后的 UnionRDD
Step2: 根據(jù) Step1中得到的 RDD 生成最終 job 要執(zhí)行的函數(shù) jobFunc
jobFunc定義如下:
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
可以看到,每個 outputStream 的 output 操作生成的 Job 其實與 RDD action 一樣鹦倚,最終調(diào)用 SparkContext#runJob 來提交 RDD DAG 定義的任務(wù)
Step3: 根據(jù) Step2中得到的 jobFunc 生成最終要執(zhí)行的 Job 并返回
Step2中得到了定義 Job 要干嘛的函數(shù)-jobFunc河质,這里便以 jobFunc及 batchTime 生成 Job 實例:
Some(new Job(time, jobFunc))
該Job實例將最終封裝在 JobHandler 中被執(zhí)行
至此,我們搞明白了 JobScheduler 是如何通過一步步調(diào)用來動態(tài)生成每個 batchTime 的 jobs震叙。下文我們將分析這些動態(tài)生成的 jobs 如何被分發(fā)及如何執(zhí)行掀鹅。