Driver容錯
- 思想:
- 周期性將Dstream的DAG持久化到文件 系統(tǒng)中泪漂,重新啟動Driver時重新加載DAG
- 實現(xiàn):
- 啟動Driver自動重啟(ClusterManager支持該功能)
- standalone: 提交任務(wù)時添加
--supervise
參數(shù)
官方文檔鏈接 - yarn:設(shè)置
yarn.resourcemanager.am.max-attempts
或者spark.yarn.maxAppAttempts
spark on yarn參數(shù)配置 - mesos: 提交任務(wù)時添加 --supervise 參數(shù)
- standalone: 提交任務(wù)時添加
- 設(shè)置checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
- 支持從checkpoint中重啟
def createContext(checkpointDirectory: String): StreamingContext = {
val ssc = ???
ssc.checkpoint(checkpointDirectory)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
Executor通用容錯
- 思路:
- spark 處理RDD失敗會通過lineage進行重做保證數(shù)據(jù)可靠
- 對于reduceByKey等Stateful操作重做的lineage較長的舌仍,強制啟動checkpoint仇味,減少重做幾率
- 實現(xiàn):
- 啟用checkpoint
ssc.setCheckpoint(checkpointDir)
Receiver容錯
- 思想:
- 接收到數(shù)據(jù)后先寫日志(WAL)到可靠文件系統(tǒng)中汇荐,后才寫入實際的RDD。如果后續(xù)處理失敗則成功寫入WAL的數(shù)據(jù)通過WAL進行恢復(fù),未成功寫入WAL的數(shù)據(jù)通過可回溯的Source進行重放
- 實現(xiàn):
- 啟用checkpoint
ssc.setCheckpoint(checkpointDir)
- 啟用WAL
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
對Receiver使用可靠性存儲
StoreageLevel.MEMORY_AND_DISK_SER
orStoreageLevel.MEMORY_AND_DISK_SER2
效果:
- 常規(guī)的數(shù)據(jù)流:Receiver提供
AtLeastOne
語義(可能重復(fù))
- 基于KafkaDirect的Receiver提供
ExactlyOne
語義,保證數(shù)據(jù)不丟不重復(fù)
Graceful Stop 優(yōu)雅停止
- 思想:
- 結(jié)束任務(wù)時等待處理中的任務(wù)結(jié)束嗤堰,并保留當前工作狀態(tài)到checkpoint中,確保重啟任務(wù)后能獲取到正確的checkpoint
- 等待時間的設(shè)置(如何保證所有數(shù)據(jù)寫入checkpoint度宦?踢匣, 一般設(shè)置成slideWindow的5-10倍)
- 實現(xiàn): 有如下幾種:
- 設(shè)置spark.streaming.stopGracefullyOnShutdown為true
- 添加hook
sys.ShutdownHookThread {
println("Gracefully stopping Spark Streaming Application at"+ new Date())
ssc.stop(true, true)
println("Application stopped at"+ new Date())
}
StreamingContext代碼鏈接,graceful stop 邏輯見stop函數(shù)
TODO
- Kafka Direct如何保證 Exactly One
- 如何支持window容災(zāi)戈抄?(個人YY的一個場景和自我理解离唬,不確定是否正確)
window為5分鐘,slide為1分鐘的任務(wù)划鸽,執(zhí)行到第100分鐘暫停后输莺,checkpoint中的進度應(yīng)該是[96,100], 10分鐘后啟動戚哎,此時需要的進度是[106,110],輸出數(shù)據(jù)會出現(xiàn)丟失?
spark-streaming使用的是系統(tǒng)時間嫂用,而非數(shù)據(jù)時間型凳。雖然數(shù)據(jù)本身是連續(xù)的,但是到達spark-streaming的時間相關(guān)較久導(dǎo)致無法存入同一window嘱函,無法正常計算啰脚?