揭開Spark Streaming神秘面紗③ - 動態(tài)生成 job

JobScheduler有兩個重要成員豪诲,一是上文介紹的 ReceiverTracker挥等,負責分發(fā) receivers 及源源不斷地接收數(shù)據(jù);二是本文將要介紹的 JobGenerator抛腕,負責定時的生成 jobs 并 checkpoint诈悍。

定時邏輯

在 JobScheduler 的主構(gòu)造函數(shù)中,會創(chuàng)建 JobGenerator 對象兽埃。在 JobGenerator 的主構(gòu)造函數(shù)中,會創(chuàng)建一個定時器:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

該定時器每隔 ssc.graph.batchDuration.milliseconds 會執(zhí)行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))消息适袜,eventLoop收到消息后會進行這個 batch 對應(yīng)的 jobs 的生成及提交執(zhí)行柄错,eventLoop 是一個消息接收處理器。
需要注意的是,timer 在創(chuàng)建之后并不會馬上啟動售貌,將在 StreamingContext#start() 啟動 Streaming Application 時間接調(diào)用到 timer.start(restartTime.milliseconds)才啟動给猾。

為 batch 生成 jobs

eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后的主要處理流程有以上圖中三步:

  1. 將已接收到的 blocks 分配給 batch
  2. 生成該 batch 對應(yīng)的 jobs
  3. 將 jobs 封裝成 JobSet 并提交執(zhí)行

接下來我們就將逐一展開這三步進行分析

將已接受到的 blocks 分配給 batch

上圖是根據(jù)源碼畫出的為 batch 分配 blocks 的流程圖,這里對 『獲得 batchTime 各個 InputDStream 未分配的 blocks』作進一步說明:
在文章 『文章鏈接』 中我們知道了各個 ReceiverInputDStream 對應(yīng)的 receivers 接收并保存的 blocks 信息會保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues颂跨,該成員 key 為 streamId敢伸,value 為該 streamId 對應(yīng)的 InputDStream 已接收保存但尚未分配的 blocks 信息。
所以獲取某 InputDStream 未分配的 blocks 只要以該 InputDStream 的 streamId 來從 streamIdToUnallocatedBlockQueues 來 get 就好恒削。獲取之后池颈,會清楚該 streamId 對應(yīng)的value,以保證 block 不會被重復(fù)分配钓丰。

在實際調(diào)用中躯砰,為 batchTime 分配 blocks 時,會從streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞進 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中携丁,以在之后作為該 batchTime 對應(yīng)的 RDD 的輸入數(shù)據(jù)琢歇。

通過以上步驟,就可以為 batch 的所有 InputDStream 分配 blocks梦鉴。也就是為 batch 分配了 blocks李茫。

生成該 batch 對應(yīng)的 jobs

為指定 batchTime 生成 jobs 的邏輯如上圖所示。你可能會疑惑肥橙,為什么 DStreamGraph#generateJobs(time: Time)為什么返回 Seq[Job]魄宏,而不是單個 job。這是因為快骗,在一個 batch 內(nèi)娜庇,可能會有多個 OutputStream 執(zhí)行了多次 output 操作,每次 output 操作都將產(chǎn)生一個 Job方篮,最終就會產(chǎn)生多個 Jobs名秀。

我們結(jié)合上圖對執(zhí)行流程進一步分析。
DStreamGraph#generateJobs(time: Time)中藕溅,對于DStreamGraph成員ArrayBuffer[DStream[_]]的每一項匕得,調(diào)用DStream#generateJob(time: Time)來生成這個 outputStream 在該 batchTime 的 job。該生成過程主要有三步:

Step1: 獲取該 outputStream 在該 batchTime 對應(yīng)的 RDD

每個 DStream 實例都有一個 generatedRDDs: HashMap[Time, RDD[T]] 成員巾表,用來保存該 DStream 在每個 batchTime 生成的 RDD汁掠,當 DStream#getOrCompute(time: Time)調(diào)用時

  • 首先會查看generatedRDDs中是否已經(jīng)有該 time 對應(yīng)的 RDD,若有則直接返回

  • 若無集币,則調(diào)用compute(validTime: Time)來生成 RDD考阱,這一步根據(jù)每個 InputDStream繼承 compute 的實現(xiàn)不同而不同。例如鞠苟,對于 FileInputDStream乞榨,其 compute 實現(xiàn)邏輯如下:

    1. 先通過一個 findNewFiles() 方法秽之,找到多個新 file
    2. 對每個新 file,都將其作為參數(shù)調(diào)用 sc.newAPIHadoopFile(file)吃既,生成一個 RDD 實例
    3. 將 2 中的多個新 file 對應(yīng)的多個 RDD 實例進行 union考榨,返回一個 union 后的 UnionRDD

Step2: 根據(jù) Step1中得到的 RDD 生成最終 job 要執(zhí)行的函數(shù) jobFunc

jobFunc定義如下:

val jobFunc = () => {
  val emptyFunc = { (iterator: Iterator[T]) => {} }
  context.sparkContext.runJob(rdd, emptyFunc)
}

可以看到,每個 outputStream 的 output 操作生成的 Job 其實與 RDD action 一樣鹦倚,最終調(diào)用 SparkContext#runJob 來提交 RDD DAG 定義的任務(wù)

Step3: 根據(jù) Step2中得到的 jobFunc 生成最終要執(zhí)行的 Job 并返回

Step2中得到了定義 Job 要干嘛的函數(shù)-jobFunc河质,這里便以 jobFunc及 batchTime 生成 Job 實例:

Some(new Job(time, jobFunc))

該Job實例將最終封裝在 JobHandler 中被執(zhí)行

至此,我們搞明白了 JobScheduler 是如何通過一步步調(diào)用來動態(tài)生成每個 batchTime 的 jobs震叙。下文我們將分析這些動態(tài)生成的 jobs 如何被分發(fā)及如何執(zhí)行掀鹅。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市捐友,隨后出現(xiàn)的幾起案子淫半,更是在濱河造成了極大的恐慌,老刑警劉巖匣砖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件科吭,死亡現(xiàn)場離奇詭異,居然都是意外死亡猴鲫,警方通過查閱死者的電腦和手機对人,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拂共,“玉大人牺弄,你說我怎么就攤上這事∫撕” “怎么了势告?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長抚恒。 經(jīng)常有香客問我咱台,道長,這世上最難降的妖魔是什么俭驮? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任回溺,我火速辦了婚禮,結(jié)果婚禮上混萝,老公的妹妹穿的比我還像新娘遗遵。我一直安慰自己,他們只是感情好逸嘀,可當我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布车要。 她就那樣靜靜地躺著,像睡著了一般崭倘。 火紅的嫁衣襯著肌膚如雪翼岁。 梳的紋絲不亂的頭發(fā)上维哈,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天,我揣著相機與錄音登澜,去河邊找鬼。 笑死飘庄,一個胖子當著我的面吹牛脑蠕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播跪削,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谴仙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了碾盐?” 一聲冷哼從身側(cè)響起晃跺,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎毫玖,沒想到半個月后掀虎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡付枫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年烹玉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阐滩。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡二打,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出掂榔,到底是詐尸還是另有隱情继效,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布装获,位于F島的核電站瑞信,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏饱溢。R本人自食惡果不足惜喧伞,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绩郎。 院中可真熱鬧潘鲫,春花似錦、人聲如沸肋杖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽状植。三九已至浊竟,卻和暖如春怨喘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背振定。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工必怜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人后频。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓梳庆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親卑惜。 傳聞我的和親對象是個殘疾皇子膏执,可洞房花燭夜當晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容