spark streaming的checkpoint目的是保證長時間運行的任務(wù)在意外掛掉后保證數(shù)據(jù)不丟失寝蹈,checkpoint包含兩種數(shù)據(jù):metadata和data柱锹,本篇主要討論對metadata的checkpoint吹榴。
如何checkpoint
如果要對metadata做checkpoint,首先要有一個可靠的文件系統(tǒng)保證數(shù)據(jù)的安全性,spark支持hdfs等巷帝。通過代碼streamingContext.checkpoint(checkpointDirectory)指定具體的存儲路徑;
jobGenerator在每一個batch時間后調(diào)用generateJobs方法研侣,在jobScheduler.submitJobSet提交任務(wù)后谱邪,執(zhí)行doCheckpoint方法來保存metadata;
doCheckpoint方法中先判斷是否需要checkpoint庶诡,條件為ssc.checkpointDuration != null && ssc.checkpointDir != null惦银,最重要的是指定后面的ssc.checkpointDir指定路徑,再判斷是否到時間末誓,如果滿足條件進(jìn)行正式代碼扯俱;
-
通過ssc.graph.updateCheckpointData(time)調(diào)用DStream的updateCheckpointData,從而執(zhí)行每個DStream子類的checkpointData.update(currentTime)基显,以DirectKafkaInputDStream為例蘸吓,最后執(zhí)行的是DirectKafkaInputDStreamCheckpointData的update,目的是更新要持久的源數(shù)據(jù)checkpointData.data撩幽;通過dependencies.foreach(_.updateCheckpointData(currentTime))使所有依賴的DStream執(zhí)行库继;
所有DStream都執(zhí)行完update后,執(zhí)行CheckpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)窜醉,本次batchcheckpoint完成宪萄;
當(dāng)jobGenerator接收到batch完成事件后,通過jobGenerator.onBatchCompletion(jobSet.time)調(diào)用clearMetadata方法榨惰,最后執(zhí)行DStream的clearMetadata刪除generatedRDDs的過期RDD的metadata拜英。
如何恢復(fù)
-
要從checkpoint中恢復(fù),在創(chuàng)建StreamingContext時略有不同琅催,代碼如圖
StreamingContext的getOrCreate方法中居凶,先通過CheckpointReader.read( checkpointPath, new SparkConf(), hadoopConf, createOnError)反序列化出Checkpoint,如果Checkpoint不為空即路徑存在且有數(shù)據(jù)藤抡,使用StreamingContext(null, _, null)構(gòu)造方法創(chuàng)建StreamingContext侠碧;
StreamingContext.start后,在使用DStreamGraph的實例時時會判斷此實例為新創(chuàng)建或從checkpoint中恢復(fù)缠黍,如從checkpoint中恢復(fù)弄兜,則執(zhí)行g(shù)raph.restoreCheckpointData(),通過DStream的restoreCheckpointData最終調(diào)用DStream子類內(nèi)部的DStreamCheckpointData.restore將保存的RDD metadata寫回到generatedRDDs里;
-
同時jobGenerator在start時瓷式,判斷ssc.isCheckpointPresent替饿,實際就是判斷ssc里面的cp_是否有值從而執(zhí)行restart方法。restart方法首先從checkpoint的時間開始恢復(fù)任務(wù)贸典,然后生成從最后時間到restartTime時間序列视卢;
-
調(diào)用graph.generateJobs生成job,在方法內(nèi)會調(diào)用DStream的generateJobs時廊驼,在getOrCompute方法通過上面還原的generatedRDDs獲取對應(yīng)時間的RDD源數(shù)據(jù)信息据过,如果沒有再重新生成颊埃,最后提交任務(wù)。
創(chuàng)建與恢復(fù)區(qū)別
-
先看一下Checkpoint中包括哪些信息:
val master = ssc.sc.master val framework = ssc.sc.appName val jars = ssc.sc.jars val graph = ssc.graph val checkpointDir = ssc.checkpointDir val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll
以上數(shù)據(jù)都是通過反序列化恢復(fù)得到的蝶俱,對新程序的所有的配置都不會生效,比如隊列饥漫、資源數(shù)等榨呆。
恢復(fù)checkpoint時,從文件系統(tǒng)反序列化數(shù)據(jù)成CheckPoint的具體代碼為Checkpoint.deserialize(fis, conf)庸队,所以還原的信息要與當(dāng)前編譯的serialVersion一致积蜻,否則會出現(xiàn)異常
在jobGenerator中,新創(chuàng)建的StreamingContext調(diào)用的是startFirstTime方法彻消,會初始化DStream的一些數(shù)據(jù)竿拆;而checkpoint恢復(fù)調(diào)用的是restart。