闡述 Flink 提供的容錯機制,解釋分布式快照 Chandy Lamport 算法邏輯奇颠,剖析 Flink Checkpoint 具體實現(xiàn)流程烈拒。
1 容錯機制
Flink 容錯機制主要是狀態(tài)的保存和恢復(fù)广鳍,涉及 state backends 狀態(tài)后端赊时、checkpoint 和 savepoint,還有 Job 和 Task 的錯誤恢復(fù)焊傅。
1.1 State Backends 狀態(tài)后端
Flink 狀態(tài)后端是指保存 Checkpoint 數(shù)據(jù)的容器狐胎,其分類有MemoryStateBackend歌馍、FsStateBackend、RocksDBStateBackend暴浦,狀態(tài)的分類有 operator state 和 keyed state歌焦。
① MemoryStateBackend:默認独撇,本地調(diào)試使用躁锁,小狀態(tài)战转。
② FsStateBackend:高可用場景使用,大狀態(tài)啄踊、長窗口颠通。
③ RocksDBStateBackend:高可用場景使用命雀,可增量 checkpoint吏砂,超大狀態(tài)、長窗口淀歇。
1.2 State 狀態(tài)的保存和恢復(fù)
Flink 狀態(tài)保存和恢復(fù)主要依靠 Checkpoint 機制和 Savepoint 機制匈织,兩者的區(qū)別如下表所示。
Checkpoint 機制 | Savepoint 機制 | |
---|---|---|
保存 | 定時制作分布式快照 | 用戶手動觸發(fā)備份和停止作業(yè) |
恢復(fù) | 將整個作業(yè)的所有 Task 都回滾到最后一次成功 Checkpoint 中的狀態(tài) | 允許用戶修改代碼碰逸、調(diào)整并發(fā)后啟動作業(yè) |
1.2.1 相關(guān)概念
(1)Snapshot
快照的概念來源于相片饵史,指照相館的一種沖洗過程短的照片胜榔。在計算機領(lǐng)域夭织,快照是數(shù)據(jù)存儲的某一時刻的狀態(tài)記錄。Flink Snapshot 快照是指作業(yè)狀態(tài)的全局一致記錄讲竿。一個完整的快照是包括 source 算子的狀態(tài)(例如戴卜,消費 kafka partition 的 offset)琢岩、狀態(tài)算子的緩存數(shù)據(jù)和 sink 算子的狀態(tài)(批量緩存數(shù)據(jù)担孔、事務(wù)數(shù)據(jù)等)。
(2)Checkpoint
Checkpoint 檢查點可以自動產(chǎn)生快照啄育,用于Flink 故障恢復(fù)挑豌。Checkpoint 具有分布式墩崩、異步鹦筹、增量的特點。
(3)Savepoint
Savepoint 保存點是用戶手動觸發(fā)的徘键,保存全量的作業(yè)狀態(tài)數(shù)據(jù)吹害。一般使用場景是作業(yè)的升級、作業(yè)的并發(fā)度縮放赂摆、遷移集群等。
1.2.2 Snapshot 快照機制
Flink 是采用輕量級的分布式異步快照政恍,其實現(xiàn)是采用柵欄 barrier 作為 checkpoint 的傳遞信號达传,與業(yè)務(wù)數(shù)據(jù)一樣無差別地傳遞下去宪赶,目的是使得數(shù)據(jù)流被切分成微批,進行 checkpoint 保存為 snapshot蒙保。當 barrier 經(jīng)過流圖節(jié)點的時候邓厕,F(xiàn)link 進行 checkpoint 保存狀態(tài)數(shù)據(jù)扁瓢。
如下圖所示引几,checkpoint n 包含每個算子的狀態(tài)伟桅,該狀態(tài)是指checkpoint n 之前的全部事件,而不包含它之后的所有事件渐逃。
Checkpoint Barrier 對齊機制茄菊,如下圖所示面殖。當 ExecutionGraph 物理執(zhí)行圖中的 subtask 算子實例接收到 barrier 的時候脊僚,subtask 會記錄它的狀態(tài)數(shù)據(jù)。如果 subtask 有2個上游(例如 KeyedProcessFunction增淹、CoProcessFunction等)乌企,subtask 會收到上游的2個 barrier 后再觸發(fā) checkpoint(即 barrier 對齊)加酵。
Flink 全量快照:
StateBackend 采用 copy-on-write 寫時復(fù)制機制猪腕,即當舊狀態(tài)數(shù)據(jù)在進行異步快照的同時陋葡,可以不阻塞業(yè)務(wù)數(shù)據(jù)的實時處理脖岛。只有快照數(shù)據(jù)被持久化后,舊狀態(tài)數(shù)據(jù)才會被垃圾回收陨溅。
1.2.3 保證 Exactly-Once 語義
針對用戶作業(yè)出現(xiàn)故障而導(dǎo)致結(jié)果丟失或者重復(fù)的問題门扇,F(xiàn)link 提供3種語義:
① At-Least-Once 最少一次:不會丟失數(shù)據(jù)臼寄,但可能會有重復(fù)結(jié)果溜宽。
② Exactly-Once 精確一次:checkpoint barrier 對齊機制可以保障精確一次适揉。
// 最少一次
CheckpointingMode.AT_LEAST_ONCE
// 精確一次
CheckpointingMode.EXACTLY_ONCE
特別說明:
此處 Exactly-Once 語義是指 Flink 內(nèi)部精確一次煤惩,而不是端到端精確一次魄揉。如果需要端到端 Exactly-Once洛退,需要外部存儲的客戶端提供回滾和事務(wù)杰标,即對應(yīng)的 source 有回滾功能和 sink 有事務(wù)功能(例如在旱,kafka connector 提供回滾和事務(wù)桶蝎,相關(guān)內(nèi)容后續(xù)更新)。
1.2.4 Job 和 Task 的錯誤恢復(fù)策略
(1)Job Restart 策略
① FailureRateRestartStrategy:允許在指定時間間隔內(nèi)的最大失敗次數(shù)谅畅,同時可以設(shè)置重啟延時時間登渣。
② FixedDelayRestartStrategy:允許指定的失敗次數(shù),同時可以設(shè)置重啟延時時間毡泻。
③ NoRestartStrategy:不需要重啟胜茧,即 Job 直接失敗。
④ ThrowingRestartStrategy:不需要重啟仇味,直接拋異常呻顽。
Job Restart 策略可以通過 env 設(shè)置。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
上述策略的父類接口是RestartStrategy丹墨,其關(guān)鍵是restart(重啟操作)。
/**
* Strategy for {@link ExecutionGraph} restarts.
*/
public interface RestartStrategy {
/**
* True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
*
* @return true if restart is possible, otherwise false
*/
boolean canRestart();
/**
* Called by the ExecutionGraph to eventually trigger a full recovery.
* The recovery must be triggered on the given callback object, and may be delayed
* with the help of the given scheduled executor.
*
* <p>The thread that calls this method is not supposed to block/sleep.
*
* @param restarter The hook to restart the ExecutionGraph
* @param executor An scheduled executor to delay the restart
* @return A {@link CompletableFuture} that will be completed when the restarting process is done.
*/
CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
}
(2)Task Failover 策略
① RestartAllStrategy:重啟全部 task贩挣,默認策略喉前。
② RestartIndividualStrategy:恢復(fù)單個 task。如果該 task 沒有source王财,可能導(dǎo)致數(shù)據(jù)丟失卵迂。
③ NoOpFailoverStrategy:不恢復(fù) task。
上述策略的父類接口是FailoverStrategy绒净,其關(guān)鍵是Factory的create(創(chuàng)建 strategy)见咒、onTaskFailure(處理錯誤)。
說明:
① FailoverStrategy 的設(shè)置是在 flink-config.yaml 的配置 jobmanager.execution.failover-strategy挂疆。
② ExecutionGraph 是 Job 重啟的對象即作業(yè)的物理執(zhí)行圖改览,Execution 是 Task 重啟的對象即 subtask哎垦。
/**
* A {@code FailoverStrategy} describes how the job computation recovers from task
* failures.
*
* <p>Failover strategies implement recovery logic for failures of tasks. The execution
* graph still implements "global failure / recovery" (which restarts all tasks) as
* a fallback plan or safety net in cases where it deems that the state of the graph
* may have become inconsistent.
*/
public abstract class FailoverStrategy {
// ------------------------------------------------------------------------
// failover implementation
// ------------------------------------------------------------------------
/**
* Called by the execution graph when a task failure occurs.
*
* @param taskExecution The execution attempt of the failed task.
* @param cause The exception that caused the task failure.
*/
public abstract void onTaskFailure(Execution taskExecution, Throwable cause);
/**
* Called whenever new vertices are added to the ExecutionGraph.
*
* @param newJobVerticesTopological The newly added vertices, in topological order.
*/
public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);
/**
* Gets the name of the failover strategy, for logging purposes.
*/
public abstract String getStrategyName();
/**
* Tells the FailoverStrategy to register its metrics.
*
* <p>The default implementation does nothing
*
* @param metricGroup The metric group to register the metrics at
*/
public void registerMetrics(MetricGroup metricGroup) {}
// ------------------------------------------------------------------------
// factory
// ------------------------------------------------------------------------
/**
* This factory is a necessary indirection when creating the FailoverStrategy to that
* we can have both the FailoverStrategy final in the ExecutionGraph, and the
* ExecutionGraph final in the FailOverStrategy.
*/
public interface Factory {
/**
* Instantiates the {@code FailoverStrategy}.
*
* @param executionGraph The execution graph for which the strategy implements failover.
* @return The instantiated failover strategy.
*/
FailoverStrategy create(ExecutionGraph executionGraph);
}
}
2 Chandy Lamport 算法詳解
2.1 背景
如何產(chǎn)生可靠的全局一致性快照是分布式系統(tǒng)的難點,其傳統(tǒng)方案是使用的全局時鐘恃疯,但存在單點故障漏设、數(shù)據(jù)不一致等可靠性問題。為了解決該問題今妄,Chandy-Lamport 算法采用 marker 的傳播來代替全局時鐘郑口。
全局快照的概念:Global Snapshot 即全局狀態(tài) Global State,應(yīng)用于系統(tǒng) Failure Recovery盾鳞。
2.2 Chandy Lamport 算法
分布式系統(tǒng)的簡化:一個有向圖犬性,其中節(jié)點是進程,邊是channel腾仅。
(1)快照初始化
① 進程 Pi 記錄自己的進程狀態(tài)乒裆,同時生產(chǎn)一個標識信息 marker(與正常 message 不同),通過 ouput channel 發(fā)送給系統(tǒng)里面的其他進程推励。
② 進程 Pi 開始記錄所有 input channel 接收到的 message
(2)快照進行
進程 Pj 從 input channel Ckj 接收到 marker鹤耍。如果 Pj 還沒有記錄自己的進程狀態(tài),則 Pj 記錄自己的進程狀態(tài)验辞,向 output channel 發(fā)送 marker稿黄;否則 Pj 正在記錄自己的進程狀態(tài)(該 marker 之前的 message)。
marker的作用:marker相當于一個分隔符跌造,把無限的數(shù)據(jù)流分隔為一批一批數(shù)據(jù)杆怕。每一批數(shù)據(jù)進都行快照,例如進程Pj,處理的 message 為[n6,n5,marker2,n4,marker1,n3,n2,n1],Pj 接收到 marker1 后琼蚯,快照記錄n3,n2,n1,接受到 marker2后互纯,快照記錄n4。
(3)快照完成
所有的進程都收到 marker 信息并且記錄下自己的狀態(tài)和 channel 的狀態(tài)(包含的 message)醉拓。
2.3 總結(jié)
Flink 的分布式異步快照實現(xiàn)了Chandy Lamport 算法伟姐,其核心思想是在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通過控制 barrier 的同步來實現(xiàn) snapshot 的備份和 Exactly-Once 語義亿卤。
3 Checkpoint 實現(xiàn)流程
第一步:Checkpoint Coordinator觸發(fā)Checkpoint
Checkpoint Coordinator 向所有 source 節(jié)點 trigger Checkpoint愤兵。
第二步:source向下游廣播barrier
source task向下游廣播barrier。
說明:每個source task都會產(chǎn)生同批次的barrier排吴,向下游廣播秆乳。例如上圖,source task1 和 source task2 產(chǎn)生barrier n, 向下游廣播屹堰。
第三步:source通知coordinator完成備份
當source task備份完自己的狀態(tài)后肛冶,會將備份數(shù)據(jù)的地址(state handle)通知 Checkpoint Coordinator。
snapshot數(shù)據(jù)備份:同步階段或者異步階段(默認)
1.同步階段:task執(zhí)行狀態(tài)快照扯键,并寫入外部存儲系統(tǒng)睦袖,其執(zhí)行快照的過程
a.深拷貝state。
b.將寫操作封裝在異步的FutureTask中荣刑,其FutureTask的作用包括:=》打開輸入流 =》寫入狀態(tài)的元數(shù)據(jù)信息 =》寫入狀態(tài) =》關(guān)閉輸入流馅笙。
2.異步階段:執(zhí)行同步階段創(chuàng)建的FutureTask,向Checkpoint Coordinator發(fā)送ACK響應(yīng)厉亏。
如何配置同步和異步snapshot董习?
fink-config.yaml的state.backend.async配置異步/同步snapshot,默認是異步snapshot爱只。
第四步:map和sink執(zhí)行快照
map和sink task收集齊上游source的barrier n皿淋,執(zhí)行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB會全量保存到磁盤上(紅色大三角表示)恬试,然后Flink會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)窝趣。
第五步:map和sink通知coordinator完成備份
map和sink task在完成Checkpoint 之后,將狀態(tài)地址state handle返回通知 Coordinator忘渔。
第六步:coordinator確定完成checkpoint
當Checkpoint Coordinator收到全部task的state handle高帖,就確定該Checkpoint已完成,并向持久化存儲中備份一個Checkpoint Meta(元數(shù)據(jù)畦粮,包括該checkpoint狀態(tài)數(shù)據(jù)的備份地址)。