【Flink 精選】闡述 Flink 的容錯機制渠抹,剖析 Checkpoint 實現(xiàn)流程

闡述 Flink 提供的容錯機制,解釋分布式快照 Chandy Lamport 算法邏輯奇颠,剖析 Flink Checkpoint 具體實現(xiàn)流程烈拒。


1 容錯機制

Flink 容錯機制主要是狀態(tài)的保存和恢復(fù)广鳍,涉及 state backends 狀態(tài)后端赊时、checkpoint 和 savepoint,還有 Job 和 Task 的錯誤恢復(fù)焊傅。

1.1 State Backends 狀態(tài)后端

Flink 狀態(tài)后端是指保存 Checkpoint 數(shù)據(jù)的容器狐胎,其分類有MemoryStateBackend歌馍、FsStateBackend、RocksDBStateBackend暴浦,狀態(tài)的分類有 operator state 和 keyed state歌焦。

StateBackend的分類.JPG

① MemoryStateBackend:默認独撇,本地調(diào)試使用躁锁,小狀態(tài)战转。
② FsStateBackend:高可用場景使用,大狀態(tài)啄踊、長窗口颠通。
③ RocksDBStateBackend:高可用場景使用命雀,可增量 checkpoint吏砂,超大狀態(tài)、長窗口淀歇。

1.2 State 狀態(tài)的保存和恢復(fù)

Flink 狀態(tài)保存和恢復(fù)主要依靠 Checkpoint 機制和 Savepoint 機制匈织,兩者的區(qū)別如下表所示。

Checkpoint 機制 Savepoint 機制
保存 定時制作分布式快照 用戶手動觸發(fā)備份和停止作業(yè)
恢復(fù) 將整個作業(yè)的所有 Task 都回滾到最后一次成功 Checkpoint 中的狀態(tài) 允許用戶修改代碼碰逸、調(diào)整并發(fā)后啟動作業(yè)

1.2.1 相關(guān)概念

(1)Snapshot

快照的概念來源于相片饵史,指照相館的一種沖洗過程短的照片胜榔。在計算機領(lǐng)域夭织,快照是數(shù)據(jù)存儲的某一時刻的狀態(tài)記錄Flink Snapshot 快照是指作業(yè)狀態(tài)的全局一致記錄讲竿。一個完整的快照是包括 source 算子的狀態(tài)(例如戴卜,消費 kafka partition 的 offset)琢岩、狀態(tài)算子的緩存數(shù)據(jù)和 sink 算子的狀態(tài)(批量緩存數(shù)據(jù)担孔、事務(wù)數(shù)據(jù)等)。

(2)Checkpoint

Checkpoint 檢查點可以自動產(chǎn)生快照啄育,用于Flink 故障恢復(fù)挑豌。Checkpoint 具有分布式墩崩、異步鹦筹、增量的特點。

(3)Savepoint

Savepoint 保存點是用戶手動觸發(fā)的徘键,保存全量的作業(yè)狀態(tài)數(shù)據(jù)吹害。一般使用場景是作業(yè)的升級、作業(yè)的并發(fā)度縮放赂摆、遷移集群等。

1.2.2 Snapshot 快照機制

Flink 是采用輕量級的分布式異步快照政恍,其實現(xiàn)是采用柵欄 barrier 作為 checkpoint 的傳遞信號达传,與業(yè)務(wù)數(shù)據(jù)一樣無差別地傳遞下去宪赶,目的是使得數(shù)據(jù)流被切分成微批,進行 checkpoint 保存為 snapshot蒙保。當 barrier 經(jīng)過流圖節(jié)點的時候邓厕,F(xiàn)link 進行 checkpoint 保存狀態(tài)數(shù)據(jù)扁瓢。
如下圖所示引几,checkpoint n 包含每個算子的狀態(tài)伟桅,該狀態(tài)是指checkpoint n 之前的全部事件,而不包含它之后的所有事件渐逃。

barrier工作流程.JPG

Checkpoint Barrier 對齊機制茄菊,如下圖所示面殖。當 ExecutionGraph 物理執(zhí)行圖中的 subtask 算子實例接收到 barrier 的時候脊僚,subtask 會記錄它的狀態(tài)數(shù)據(jù)。如果 subtask 有2個上游(例如 KeyedProcessFunction增淹、CoProcessFunction等)乌企,subtask 會收到上游的2個 barrier 后再觸發(fā) checkpoint(即 barrier 對齊)加酵。
barrier對齊機制.JPG

Flink 全量快照:
StateBackend 采用 copy-on-write 寫時復(fù)制機制猪腕,即當舊狀態(tài)數(shù)據(jù)在進行異步快照的同時陋葡,可以不阻塞業(yè)務(wù)數(shù)據(jù)的實時處理脖岛。只有快照數(shù)據(jù)被持久化后,舊狀態(tài)數(shù)據(jù)才會被垃圾回收陨溅。

1.2.3 保證 Exactly-Once 語義

針對用戶作業(yè)出現(xiàn)故障而導(dǎo)致結(jié)果丟失或者重復(fù)的問題门扇,F(xiàn)link 提供3種語義:
At-Least-Once 最少一次:不會丟失數(shù)據(jù)臼寄,但可能會有重復(fù)結(jié)果溜宽。
Exactly-Once 精確一次:checkpoint barrier 對齊機制可以保障精確一次适揉。

// 最少一次
CheckpointingMode.AT_LEAST_ONCE

// 精確一次
CheckpointingMode.EXACTLY_ONCE

特別說明:
此處 Exactly-Once 語義是指 Flink 內(nèi)部精確一次煤惩,而不是端到端精確一次魄揉。如果需要端到端 Exactly-Once洛退,需要外部存儲的客戶端提供回滾和事務(wù)杰标,即對應(yīng)的 source 有回滾功能和 sink 有事務(wù)功能(例如在旱,kafka connector 提供回滾和事務(wù)桶蝎,相關(guān)內(nèi)容后續(xù)更新)。

1.2.4 Job 和 Task 的錯誤恢復(fù)策略

(1)Job Restart 策略

FailureRateRestartStrategy:允許在指定時間間隔內(nèi)的最大失敗次數(shù)谅畅,同時可以設(shè)置重啟延時時間登渣。
FixedDelayRestartStrategy:允許指定的失敗次數(shù),同時可以設(shè)置重啟延時時間毡泻。
NoRestartStrategy:不需要重啟胜茧,即 Job 直接失敗。
ThrowingRestartStrategy:不需要重啟仇味,直接拋異常呻顽。
Job Restart 策略可以通過 env 設(shè)置。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));

上述策略的父類接口是RestartStrategy丹墨,其關(guān)鍵是restart(重啟操作)。

/**
 * Strategy for {@link ExecutionGraph} restarts.
 */
public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     * @return A {@link CompletableFuture} that will be completed when the restarting process is done.
     */
    CompletableFuture<Void> restart(RestartCallback restarter, ScheduledExecutor executor);
}

(2)Task Failover 策略

RestartAllStrategy:重啟全部 task贩挣,默認策略喉前。
RestartIndividualStrategy:恢復(fù)單個 task。如果該 task 沒有source王财,可能導(dǎo)致數(shù)據(jù)丟失卵迂。
NoOpFailoverStrategy:不恢復(fù) task。
上述策略的父類接口是FailoverStrategy绒净,其關(guān)鍵是Factory的create(創(chuàng)建 strategy)见咒、onTaskFailure(處理錯誤)。

說明:
① FailoverStrategy 的設(shè)置是在 flink-config.yaml 的配置 jobmanager.execution.failover-strategy挂疆。
② ExecutionGraph 是 Job 重啟的對象即作業(yè)的物理執(zhí)行圖改览,Execution 是 Task 重啟的對象即 subtask哎垦。

/**
 * A {@code FailoverStrategy} describes how the job computation recovers from task
 * failures.
 * 
 * <p>Failover strategies implement recovery logic for failures of tasks. The execution
 * graph still implements "global failure / recovery" (which restarts all tasks) as
 * a fallback plan or safety net in cases where it deems that the state of the graph
 * may have become inconsistent.
 */
public abstract class FailoverStrategy {


    // ------------------------------------------------------------------------
    //  failover implementation
    // ------------------------------------------------------------------------ 

    /**
     * Called by the execution graph when a task failure occurs.
     * 
     * @param taskExecution The execution attempt of the failed task. 
     * @param cause The exception that caused the task failure.
     */
    public abstract void onTaskFailure(Execution taskExecution, Throwable cause);

    /**
     * Called whenever new vertices are added to the ExecutionGraph.
     * 
     * @param newJobVerticesTopological The newly added vertices, in topological order.
     */
    public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);

    /**
     * Gets the name of the failover strategy, for logging purposes.
     */
    public abstract String getStrategyName();

    /**
     * Tells the FailoverStrategy to register its metrics.
     * 
     * <p>The default implementation does nothing
     * 
     * @param metricGroup The metric group to register the metrics at
     */
    public void registerMetrics(MetricGroup metricGroup) {}

    // ------------------------------------------------------------------------
    //  factory
    // ------------------------------------------------------------------------

    /**
     * This factory is a necessary indirection when creating the FailoverStrategy to that
     * we can have both the FailoverStrategy final in the ExecutionGraph, and the
     * ExecutionGraph final in the FailOverStrategy.
     */
    public interface Factory {

        /**
         * Instantiates the {@code FailoverStrategy}.
         * 
         * @param executionGraph The execution graph for which the strategy implements failover.
         * @return The instantiated failover strategy.
         */
        FailoverStrategy create(ExecutionGraph executionGraph);
    }
}

2 Chandy Lamport 算法詳解

2.1 背景

如何產(chǎn)生可靠的全局一致性快照是分布式系統(tǒng)的難點,其傳統(tǒng)方案是使用的全局時鐘恃疯,但存在單點故障漏设、數(shù)據(jù)不一致等可靠性問題。為了解決該問題今妄,Chandy-Lamport 算法采用 marker 的傳播來代替全局時鐘郑口。

全局快照的概念:Global Snapshot 即全局狀態(tài) Global State,應(yīng)用于系統(tǒng) Failure Recovery盾鳞。

2.2 Chandy Lamport 算法

分布式系統(tǒng)的簡化:一個有向圖犬性,其中節(jié)點是進程,邊是channel腾仅。

(1)快照初始化

① 進程 Pi 記錄自己的進程狀態(tài)乒裆,同時生產(chǎn)一個標識信息 marker(與正常 message 不同),通過 ouput channel 發(fā)送給系統(tǒng)里面的其他進程推励。
② 進程 Pi 開始記錄所有 input channel 接收到的 message

(2)快照進行

進程 Pj 從 input channel Ckj 接收到 marker鹤耍。如果 Pj 還沒有記錄自己的進程狀態(tài),則 Pj 記錄自己的進程狀態(tài)验辞,向 output channel 發(fā)送 marker稿黄;否則 Pj 正在記錄自己的進程狀態(tài)(該 marker 之前的 message)。

marker的作用:marker相當于一個分隔符跌造,把無限的數(shù)據(jù)流分隔為一批一批數(shù)據(jù)杆怕。每一批數(shù)據(jù)進都行快照,例如進程Pj,處理的 message 為[n6,n5,marker2,n4,marker1,n3,n2,n1],Pj 接收到 marker1 后琼蚯,快照記錄n3,n2,n1,接受到 marker2后互纯,快照記錄n4。

(3)快照完成

所有的進程都收到 marker 信息并且記錄下自己的狀態(tài)和 channel 的狀態(tài)(包含的 message)醉拓。

2.3 總結(jié)

Flink 的分布式異步快照實現(xiàn)了Chandy Lamport 算法伟姐,其核心思想是在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通過控制 barrier 的同步來實現(xiàn) snapshot 的備份和 Exactly-Once 語義亿卤。

3 Checkpoint 實現(xiàn)流程

第一步:Checkpoint Coordinator觸發(fā)Checkpoint

Checkpoint Coordinator 向所有 source 節(jié)點 trigger Checkpoint愤兵。


coordinator觸發(fā)checkpoint.png

第二步:source向下游廣播barrier

source task向下游廣播barrier。


source發(fā)送barrier和異步備份狀態(tài).png

說明:每個source task都會產(chǎn)生同批次的barrier排吴,向下游廣播秆乳。例如上圖,source task1 和 source task2 產(chǎn)生barrier n, 向下游廣播屹堰。

第三步:source通知coordinator完成備份

當source task備份完自己的狀態(tài)后肛冶,會將備份數(shù)據(jù)的地址(state handle)通知 Checkpoint Coordinator。

source通知coordinator完成備份.png

snapshot數(shù)據(jù)備份:同步階段或者異步階段(默認)
1.同步階段:task執(zhí)行狀態(tài)快照扯键,并寫入外部存儲系統(tǒng)睦袖,其執(zhí)行快照的過程
a.深拷貝state。
b.將寫操作封裝在異步的FutureTask中荣刑,其FutureTask的作用包括:=》打開輸入流 =》寫入狀態(tài)的元數(shù)據(jù)信息 =》寫入狀態(tài) =》關(guān)閉輸入流馅笙。
2.異步階段:執(zhí)行同步階段創(chuàng)建的FutureTask,向Checkpoint Coordinator發(fā)送ACK響應(yīng)厉亏。

如何配置同步和異步snapshot董习?
fink-config.yamlstate.backend.async配置異步/同步snapshot,默認是異步snapshot爱只。

第四步:map和sink執(zhí)行快照

map和sink task收集齊上游source的barrier n皿淋,執(zhí)行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB會全量保存到磁盤上(紅色大三角表示)恬试,然后Flink會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)窝趣。


map和sink執(zhí)行快照.png

第五步:map和sink通知coordinator完成備份

map和sink task在完成Checkpoint 之后,將狀態(tài)地址state handle返回通知 Coordinator忘渔。


map和sink通知coordinator完成備份.png

第六步:coordinator確定完成checkpoint

當Checkpoint Coordinator收到全部task的state handle高帖,就確定該Checkpoint已完成,并向持久化存儲中備份一個Checkpoint Meta(元數(shù)據(jù)畦粮,包括該checkpoint狀態(tài)數(shù)據(jù)的備份地址)。


coordinator持久化checkpoint meta.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乖阵,一起剝皮案震驚了整個濱河市宣赔,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瞪浸,老刑警劉巖儒将,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異对蒲,居然都是意外死亡钩蚊,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進店門蹈矮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來砰逻,“玉大人,你說我怎么就攤上這事泛鸟◎鹋兀” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長刚操。 經(jīng)常有香客問我闸翅,道長,這世上最難降的妖魔是什么菊霜? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任坚冀,我火速辦了婚禮,結(jié)果婚禮上鉴逞,老公的妹妹穿的比我還像新娘记某。我一直安慰自己,他們只是感情好华蜒,可當我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布辙纬。 她就那樣靜靜地躺著,像睡著了一般叭喜。 火紅的嫁衣襯著肌膚如雪贺拣。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天捂蕴,我揣著相機與錄音譬涡,去河邊找鬼。 笑死啥辨,一個胖子當著我的面吹牛涡匀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播溉知,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼陨瘩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了级乍?” 一聲冷哼從身側(cè)響起舌劳,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎玫荣,沒想到半個月后甚淡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡捅厂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年贯卦,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片焙贷。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡撵割,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盈厘,到底是詐尸還是另有隱情睁枕,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站外遇,受9級特大地震影響注簿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜跳仿,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一诡渴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧菲语,春花似錦妄辩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至佩憾,卻和暖如春哮伟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背妄帘。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工楞黄, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人抡驼。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓鬼廓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親致盟。 傳聞我的和親對象是個殘疾皇子碎税,可洞房花燭夜當晚...
    茶點故事閱讀 45,876評論 2 361