一個 Streaming Application 往往需要7*24不間斷的跑贺奠,所以需要有抵御意外的能力(比如機器或者系統(tǒng)掛掉己单,JVM crash等)赛不。為了讓這成為可能睬辐,Spark Streaming需要 checkpoint 足夠多信息至一個具有容錯設(shè)計的存儲系統(tǒng)才能讓 Application 從失敗中恢復(fù)疗垛。Spark Streaming 會 checkpoint 兩種類型的數(shù)據(jù)症汹。
- Metadata(元數(shù)據(jù)) checkpointing - 保存定義了 Streaming 計算邏輯至類似 HDFS 的支持容錯的存儲系統(tǒng)。用來恢復(fù) driver贷腕,元數(shù)據(jù)包括:
- 配置 - 用于創(chuàng)建該 streaming application 的所有配置
- DStream 操作 - DStream 一些列的操作
- 未完成的 batches - 那些提交了 job 但尚未執(zhí)行或未完成的 batches
- Data checkpointing - 保存已生成的RDDs至可靠的存儲背镇。這在某些 stateful 轉(zhuǎn)換中是需要的,在這種轉(zhuǎn)換中泽裳,生成 RDD 需要依賴前面的 batches瞒斩,會導(dǎo)致依賴鏈隨著時間而變長。為了避免這種沒有盡頭的變長涮总,要定期將中間生成的 RDDs 保存到可靠存儲來切斷依賴鏈
總之胸囱,metadata checkpointing 主要用來恢復(fù) driver;而 RDD數(shù)據(jù)的 checkpointing 對于stateful 轉(zhuǎn)換操作是必要的瀑梗。
什么時候需要啟用 checkpoint烹笔?
什么時候該啟用 checkpoint 呢?滿足以下任一條件:
- 使用了 stateful 轉(zhuǎn)換 - 如果 application 中使用了
updateStateByKey
或reduceByKeyAndWindow
等 stateful 操作抛丽,必須提供 checkpoint 目錄來允許定時的 RDD checkpoint - 希望能從意外中恢復(fù) driver
如果 streaming app 沒有 stateful 操作谤职,也允許 driver 掛掉后再次重啟的進度丟失,就沒有啟用 checkpoint的必要了亿鲜。
如何使用 checkpoint柬帕?
啟用 checkpoint,需要設(shè)置一個支持容錯 的狡门、可靠的文件系統(tǒng)(如 HDFS陷寝、s3 等)目錄來保存 checkpoint 數(shù)據(jù)。通過調(diào)用 streamingContext.checkpoint(checkpointDirectory)
來完成其馏。另外凤跑,如果你想讓你的 application 能從 driver 失敗中恢復(fù),你的 application 要滿足:
- 若 application 為首次重啟叛复,將創(chuàng)建一個新的 StreamContext 實例
- 如果 application 是從失敗中重啟仔引,將會從 checkpoint 目錄導(dǎo)入 checkpoint 數(shù)據(jù)來重新創(chuàng)建 StreamingContext 實例
通過 StreamingContext.getOrCreate
可以達到目的:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果 checkpointDirectory 存在扔仓,那么 context 將導(dǎo)入 checkpoint 數(shù)據(jù)。如果目錄不存在咖耘,函數(shù) functionToCreateContext 將被調(diào)用并創(chuàng)建新的 context
除調(diào)用 getOrCreate 外翘簇,還需要你的集群模式支持 driver 掛掉之后重啟之。例如儿倒,在 yarn 模式下版保,driver 是運行在 ApplicationMaster 中,若 ApplicationMaster 掛掉夫否,yarn 會自動在另一個節(jié)點上啟動一個新的 ApplicationMaster彻犁。
需要注意的是,隨著 streaming application 的持續(xù)運行凰慈,checkpoint 數(shù)據(jù)占用的存儲空間會不斷變大汞幢。因此,需要小心設(shè)置checkpoint 的時間間隔微谓。設(shè)置得越小森篷,checkpoint 次數(shù)會越多,占用空間會越大豺型;如果設(shè)置越大仲智,會導(dǎo)致恢復(fù)時丟失的數(shù)據(jù)和進度越多。一般推薦設(shè)置為 batch duration 的5~10倍触创。
導(dǎo)出 checkpoint 數(shù)據(jù)
上文提到坎藐,checkpoint 數(shù)據(jù)會定時導(dǎo)出到可靠的存儲系統(tǒng)为牍,那么
- 在什么時機進行 checkpoint
- checkpoint 的形式是怎么樣的
checkpoint 的時機
在 Spark Streaming 中哼绑,JobGenerator 用于生成每個 batch 對應(yīng)的 jobs,它有一個定時器碉咆,定時器的周期即初始化 StreamingContext 時設(shè)置的 batchDuration抖韩。這個周期一到,JobGenerator 將調(diào)用generateJobs方法來生成并提交 jobs疫铜,這之后調(diào)用 doCheckpoint 方法來進行 checkpoint茂浮。doCheckpoint 方法中,會判斷當(dāng)前時間與 streaming application start 的時間之差是否是 checkpoint duration 的倍數(shù)壳咕,只有在是的情況下才進行 checkpoint席揽。
checkpoint 的形式
最終 checkpoint 的形式是將類 Checkpoint的實例序列化后寫入外部存儲,值得一提的是谓厘,有專門的一條線程來做將序列化后的 checkpoint 寫入外部存儲幌羞。類 Checkpoint 包含以下數(shù)據(jù)
除了 Checkpoint 類,還有 CheckpointWriter 類用來導(dǎo)出 checkpoint竟稳,CheckpointReader 用來導(dǎo)入 checkpoint
Checkpoint 的局限
Spark Streaming 的 checkpoint 機制看起來很美好属桦,卻有一個硬傷熊痴。上文提到最終刷到外部存儲的是類 Checkpoint 對象序列化后的數(shù)據(jù)。那么在 Spark Streaming application 重新編譯后聂宾,再去反序列化 checkpoint 數(shù)據(jù)就會失敗果善。這個時候就必須新建 StreamingContext。
針對這種情況系谐,在我們結(jié)合 Spark Streaming + kafka 的應(yīng)用中巾陕,我們自行維護了消費的 offsets,這樣一來及時重新編譯 application蔚鸥,還是可以從需要的 offsets 來消費數(shù)據(jù)惜论,這里只是舉個例子,不詳細展開了止喷。