Spark Streaming 解決這 4 個(gè)問題的不同 focus拓哟,可以將 Spark Streaming 劃分為四個(gè)大的模塊:
模塊 1:DAG 靜態(tài)定義
模塊 2:Job 動(dòng)態(tài)生成
模塊 3:數(shù)據(jù)產(chǎn)生與導(dǎo)入
模塊 4:長(zhǎng)時(shí)容錯(cuò)
模塊 1:DAG 靜態(tài)定義
2.2 模塊 2:Job 動(dòng)態(tài)生成
現(xiàn)在有了DStreamGraph和DStream,也就是靜態(tài)定義了的計(jì)算邏輯,下面我們來看 Spark Streaming 是如何將其動(dòng)態(tài)調(diào)度的。
在 Spark Streaming 程序的入口疹蛉,我們都會(huì)定義一個(gè) batchDuration枝恋,就是需要每隔多長(zhǎng)時(shí)間就比照靜態(tài)的DStreamGraph來動(dòng)態(tài)生成一個(gè) RDD DAG 實(shí)例。在 Spark Streaming 里禁悠,總體負(fù)責(zé)動(dòng)態(tài)作業(yè)調(diào)度的具體類是JobScheduler,在 Spark Streaming 程序開始運(yùn)行的時(shí)候兑宇,會(huì)生成一個(gè)JobScheduler的實(shí)例碍侦,并被 start() 運(yùn)行起來。
JobScheduler有兩個(gè)非常重要的成員:JobGenerator和ReceiverTracker隶糕。JobScheduler將每個(gè) batch 的 RDD DAG 具體生成工作委托給JobGenerator瓷产,而將源頭輸入數(shù)據(jù)的記錄工作委托給ReceiverTracker。
JobGenerator維護(hù)了一個(gè)定時(shí)器枚驻,周期就是我們剛剛提到的 batchDuration濒旦,定時(shí)為每個(gè) batch 生成 RDD DAG 的實(shí)例。具體的再登,每次 RDD DAG 實(shí)際生成包含 5 個(gè)步驟:
(1)要求ReceiverTracker將目前已收到的數(shù)據(jù)進(jìn)行一次 allocate尔邓,即將上次 batch 切分后的數(shù)據(jù)切分到到本次新的 batch 里;
(2)要求DStreamGraph復(fù)制出一套新的 RDD DAG 的實(shí)例锉矢,具體過程是:DStreamGraph將要求圖里的尾DStream節(jié)點(diǎn)生成具體的 RDD 實(shí)例梯嗽,并遞歸的調(diào)用尾DStream的上游DStream節(jié)點(diǎn)……以此遍歷整個(gè)DStreamGraph,遍歷結(jié)束也就正好生成了 RDD DAG 的實(shí)例沽损;
(3)獲取第 1 步ReceiverTracker分配到本 batch 的源頭數(shù)據(jù)的 meta 信息灯节;
(4) 將第 2 步生成的本 batch 的 RDD DAG,和第 3 步獲取到的 meta 信息,一同提交給JobScheduler異步執(zhí)行炎疆;
(5) 只要提交結(jié)束(不管是否已開始異步執(zhí)行)卡骂,就馬上對(duì)整個(gè)系統(tǒng)的當(dāng)前運(yùn)行狀態(tài)做一個(gè) checkpoint。
DStream有一個(gè)重要而特殊的子類ReceiverInputDStream:它除了需要像其它DStream那樣在某個(gè) batch 里實(shí)例化RDD以外形入,還需要額外的Receiver為這個(gè)RDD生產(chǎn)數(shù)據(jù)全跨!
具體的,Spark Streaming 在程序剛開始運(yùn)行時(shí):
(1) 由Receiver的總指揮ReceiverTracker分發(fā)多個(gè) job(每個(gè) job 有 1 個(gè) task)亿遂,到多個(gè) executor 上分別啟動(dòng)ReceiverSupervisor實(shí)例浓若;
(2) 每個(gè)ReceiverSupervisor啟動(dòng)后將馬上生成一個(gè)用戶提供的Receiver實(shí)現(xiàn)的實(shí)例 —— 該Receiver實(shí)現(xiàn)可以持續(xù)產(chǎn)生或者持續(xù)接收系統(tǒng)外數(shù)據(jù),比如TwitterReceiver可以實(shí)時(shí)爬取 twitter 數(shù)據(jù) —— 并在Receiver實(shí)例生成后調(diào)用Receiver.onStart()崩掘;
注意到這里采用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一樣少办。Checkpoint通常也是落地到可靠存儲(chǔ)如 HDFS苞慢。Checkpoint發(fā)起的間隔默認(rèn)的是和batchDuration 一致;即每次 batch 發(fā)起英妓、提交了需要運(yùn)行的 job 后就做Checkpoint挽放,另外在 job 完成了更新任務(wù)狀態(tài)的時(shí)候再次做一下Checkpoint。
這樣一來蔓纠,在 driver 失效并恢復(fù)后辑畦,可以讀取最近一次的Checkpoint來恢復(fù)作業(yè)的DStreamGraph和 job 的運(yùn)行及完成狀態(tài)。
模塊長(zhǎng)時(shí)容錯(cuò)保障方式
模塊 1-DAG 靜態(tài)定義driver 端定時(shí)對(duì) DStreamGraph 做 Checkpoint腿倚,來記錄整個(gè) DStreamGraph 的變化
模塊 2-job 動(dòng)態(tài)生成driver 端定時(shí)對(duì) JobScheduler 做 Checkpoint纯出,來記錄每個(gè) batch 的 job 的完成情況
模塊 3-數(shù)據(jù)產(chǎn)生與導(dǎo)入driver 端源頭塊數(shù)據(jù)的 meta 信息上報(bào) ReceiverTracker 時(shí),寫入 WAL
模塊 3-數(shù)據(jù)產(chǎn)生與導(dǎo)入executor 端對(duì)源頭塊數(shù)據(jù)的保障:(1) 熱備敷燎;(2) 冷備暂筝;(3) 重放;(4) 忽略
總結(jié)一下“模塊4:長(zhǎng)時(shí)容錯(cuò)”的內(nèi)容為上述表格硬贯,可以看到焕襟,Spark Streaming 的長(zhǎng)時(shí)容錯(cuò)特性,能夠提供不重饭豹、不丟鸵赖,exactly-once 的處理語(yǔ)義。