一致性檢查點
1.什么是一致性檢查點
Flink故障恢復機制的核心,就是應(yīng)用的一致性檢查點驼鹅。有狀態(tài)應(yīng)用的一致性檢查點舰罚,就是所有任務(wù)的狀態(tài)纽门,在某個時間點的一份快照。這個時間點营罢,是所有任務(wù)都恰好處理完同一個相同輸入數(shù)據(jù)的時候赏陵。
在流式應(yīng)用處理過程中饼齿,F(xiàn)link會周期性的為應(yīng)用狀態(tài)生成檢查點。一旦發(fā)生故障蝙搔,F(xiàn)link會利用最新的檢查點來恢復任務(wù)狀態(tài)缕溉,并重新啟動處理程序。如果所有算子的狀態(tài)都可以寫入檢查點并從其中恢復吃型,并且所有輸入流的消費位置都能重置到檢查點的那一刻证鸥,那么檢查點和故障恢復機制就能為整個應(yīng)用狀態(tài)提供精確一次的一致性保障。 值得一提的是勤晚,F(xiàn)link的檢查點機制枉层,只能保證應(yīng)用內(nèi)部狀態(tài)的精確一致性,至于端對端的精確一致性保證赐写,還需結(jié)合外部讀寫系統(tǒng)的支持鸟蜡。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// 啟用checkpoint間隔 1000ms
env.enableCheckpointing(1000)
// 高級配置:
// 設(shè)置 exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設(shè)置checkpoint最小間隔 500ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// 設(shè)置checkpoint在1分鐘內(nèi)完成 否則將被丟棄
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 設(shè)置 checkpoint 失敗時,任務(wù)不會 fail血淌,該 checkpoint 會被丟棄
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
// 設(shè)置checkpoint并發(fā)度為1
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 任務(wù)canel后矩欠,檢查點的狀態(tài)
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);