Flink 提供了容錯機制淋样,可以恢復數(shù)據(jù)流應用到一致狀態(tài)嘹悼。該機制確保在發(fā)生故障時排苍,程序的狀態(tài)最終將只反映數(shù)據(jù)流中的每個記錄一次(exactly once)
,有一個開關可以降級為至少一次(at-least-once)袁铐。容錯機制不斷地創(chuàng)建分布式數(shù)據(jù)流的快照揭蜒,對于小狀態(tài)的流式程序,快照非常輕量剔桨,可以高頻率創(chuàng)建而對性能影響很小屉更。流式程序的狀態(tài)存儲在可配置的位置(如主節(jié)點或 HDFS 上)。當程序失斎髯骸(由于機器瑰谜、網(wǎng)絡或軟件故障),F(xiàn)link 停止分布式數(shù)據(jù)流树绩。然后系統(tǒng)重新啟動 operator 并將其重置為最新成功的 checkpoint萨脑,輸入流重置為相應的狀態(tài)快照位置,保證被重啟的并行數(shù)據(jù)流中處理的任何一個記錄都不是 checkpoint 狀態(tài)之前的一部分饺饭。
默認情況下渤早,禁用檢查點。
為了容錯機制生效瘫俊,數(shù)據(jù)源(例如 queue 或者 broker)需要能夠回滾到指定位置重放數(shù)據(jù)流鹊杖。Apache Kafka 有這個特性,F(xiàn)link 中 Kafka 的 connector 利用了這個功能扛芽。
由于 Flink 的 checkpoint 是通過分布式快照實現(xiàn)的,接下來我們將 snapshot 和 checkpoint 這兩個詞交替使用。
由于 Flink checkpoint 是通過分布式 snapshot 實現(xiàn)的赡鲜,因此我們在說法上 snapshot 和 checkpoint 可以互換使用。
Checkpoint
Flink 容錯機制的核心就是持續(xù)地創(chuàng)建分布式數(shù)據(jù)流和操作算子狀態(tài)的 snapshot银酬。這些快照(snapshot)在程序出錯失敗時可以回退到一致性檢查點嘲更,F(xiàn)link 用于繪制這些 snapshots 的機制在 Lightweight Asynchronous Snapshots for Distributed Dataflows 中進行了描述。受分布式快照算法的標準 Chandy-Lamport 算法啟發(fā)赋朦,并針對 Flink 執(zhí)行模型量身定制。
Barriers
Flink 分布式快照的核心是 stream barriers宠哄。這些 barriers 被插入到數(shù)據(jù)流中壹将,并作為數(shù)據(jù)流的一部分和記錄一起向下游。Barriers 永遠不會超過正常數(shù)據(jù)毛嫉,數(shù)據(jù)流嚴格有序诽俯。一個 barrier 將數(shù)據(jù)流中的記錄分割為進入當前快照的一組記錄和進入下一個快照的記錄承粤。每個 barrier 都帶有快照ID,并且 barrier 之前的記錄都進入了此快照仙粱。Barriers 不會中斷數(shù)據(jù)流彻舰,所以非常的輕量。多個不同快照的多個 barriers 可以同時在 stream 中出現(xiàn)淹遵,即多個快照可能同時創(chuàng)建。
Stream barriers 在 source stream 的并行數(shù)據(jù)流中插入济炎。當 snapshot n 被插入(計作Sn)辐真,Sn點是 source stream 中 snapshot 覆蓋數(shù)據(jù)的位置。例如在 Apache Kafka 中耐床,此位置表示某個分區(qū)中最后一條數(shù)據(jù)的偏移量(offset)楔脯。Sn點被發(fā)送給 checkpoint coordinator(Flink JobManger)。
然后 barrier 繼續(xù)移動堪嫂。當中間算子從其所有的輸入流(input stream)中收到 snapshot n 的 barrier 時木柬,會向其所有輸出流(outgoing stream)插入 snapshot n 的 barrier。一旦 Sink operator(流式DAG的末端)從其所有輸入流中接受到 barrier n眉枕,向 checkpoint coordinator 確認 snapshot n 已完成,在所有 sinks 確認之后谤牡,該 snapshot 被認為已完成。
一旦 snapshot n 完成想许,作業(yè)將永遠不會再向 source 請求Sn之前的記錄断序,因為這些記錄已經(jīng)都走完了整個拓撲圖。
接收超過一個輸入流的 operator 需要基于 snapshot barrier 對齊(align)輸入漱凝。參見上圖:
- 當算子從輸入流接收到 snapshot 的 barrier n诸迟,就不能繼續(xù)處理此數(shù)據(jù)流的后續(xù)數(shù)據(jù),知道其接收到其余流的 barrier n為止壁公。否則會將屬于 snapshot n 和 snapshot n+1的數(shù)據(jù)混淆
- 接收到 barrier n 的流的數(shù)據(jù)會被放在一個 input buffer 中绅项,暫時不會處理
- 當從最后一個流中接收到 barrier n 時,算子會 emit 所有暫存在 buffer 中的數(shù)據(jù)快耿,然后自己向下游發(fā)送 Snapshot n
- 最后算子恢復所有輸入流數(shù)據(jù)的處理,優(yōu)先處理輸入緩存中的數(shù)據(jù)
State
當算子包含任何形式的 state 時撞反,此狀態(tài)(state)也必須是快照的一部分搪花。算子狀態(tài)有不同的形式:
- 用戶定義的狀態(tài)(User-defined state),由 transformation 函數(shù)(如map()或filter())直接創(chuàng)建或修改的狀態(tài)丁稀。
- 系統(tǒng)狀態(tài)(System state)倚聚,作為算子計算一部分的緩存數(shù)據(jù)凿可。典型例子就是窗口緩存(window buffers)授账,系統(tǒng)在其中收集(聚合)窗口的記錄惨驶,直到窗口被處理。
當算子接收到所有輸入流中的 barriers 后屋确,會對其狀態(tài)進行快照续扔,在提交 barriers 到輸出流之前。這時纱昧,在 barrier 設置之前所有數(shù)據(jù)會對狀態(tài)進行更新,并且之后不會再依賴 barrier 之前數(shù)據(jù)设联。由于 snapshote 的狀態(tài)可能會很大灼捂,因此存儲在可配置的 state backend 中。默認在 JobManager 的內(nèi)存中宫蛆,但是對于生產(chǎn)使用偎球,應使用可靠的分布式存儲系統(tǒng)(如HDFS)。在狀態(tài)存儲后衰絮,算子確認 checkpoint 完成猫牡,將 snapshot barrier 發(fā)送到輸出流后恢復處理。
生成的 snapshot 包含:
- 對于每個并行輸入數(shù)據(jù)源:快照創(chuàng)建時數(shù)據(jù)流中的位置偏移
- 對于每個 算子:存儲在快照中的狀態(tài)指針
Exactly Once vs. At Least Once
對齊操作可能會增加流處理的延遲煌恢,通常這種額外的延遲在毫秒級震庭,但是我們也遇到過延遲顯著增加的異常情況。對于要求所有記錄處理都保持毫秒級低延遲的應用器联,F(xiàn)link 提供了在 checkpoint 時跳過對齊的開關婿崭。一旦算子從每個輸入流接收到檢查點氓栈,就會開始繪制快照婿着。
當跳過對齊時,算子會繼續(xù)處理所有的輸入竟宋,即使當 checkpoint n 的 barrier 到達時。也就是說氯葬,算子在 checkpoint n 創(chuàng)建之前婉陷,繼續(xù)處理屬于 checkpoint n+1 的數(shù)據(jù)。當恢復異常時闯睹,這部分記錄就會重復處理担神,因為它們的處理狀態(tài)已經(jīng)被包含在了 checkpoint n 中,同時也會在之后再次被重放處理妄讯。
對齊操作只會發(fā)生在擁有多輸入運算(join)或者多個輸出(repartition亥贸、shuffle)的算子的場景下。所以炕置,對于只有 map()、flatmap()默垄、fliter() 等并行操作即使在至少一次的模式中仍然會保證嚴格一次甚纲。
Asynchronous State Snapshots
上述機制意味著當算子會停止處理輸入記錄,當算子在存儲 snapshot 時鹃操。這種同步會在創(chuàng)建快照時引入延遲。
可以讓算子在存儲快照時繼續(xù)處理數(shù)據(jù)组民,讓快照存儲異步在后臺運行悲靴。因此癞尚,算子必須生成一個 state 對象以某種方式存儲,保證后續(xù)狀態(tài)的修改不會改變這個 state 對象浇揩。例如 RocksDB 中使用的 copy-on-write(寫時復制)類型的數(shù)據(jù)結(jié)構(gòu)。
在接受輸入的到檢查點 barriers积锅,算子啟動異步的 snapshot 復制其狀態(tài)养盗。會立刻向下游提交 barrier,然后繼續(xù)正常數(shù)據(jù)的處理箫爷。當后臺復制過程完成聂儒,會向 checkpoint coordinator(JobManager)進行確認。檢查點完成的充分條件是:所有 sink 接收到了 barrier衩婚,所有有狀態(tài)算子都確認完成了狀態(tài)備份谅猾。
Recovery
在這種機制下的錯誤恢復:Flink 選擇最近一次完成的檢查點k,系統(tǒng)重新部署整個分布式數(shù)據(jù)流税娜,并為每個算子恢復到檢查點k的狀態(tài)。輸入源從Sk位置開始讀取概行,例如在Kafka中弧岳,從Sk保存的偏移量開始讀取业踏。
如果是增量快照涧卵,算子需要從最新的圈梁快照恢復,然后對此狀態(tài)進行一系列增量更新伐脖。
Operator Snapshot Implementation
算子創(chuàng)建快照由兩部分操作:同步(synchronous)和異步(asynchronous)部分
算子和 state backend 將快照作為Java的 FutureTask乐设。這個任務包含了已完成同步部分和pending的異步部分,異步的部分在后臺線程中執(zhí)行蠕啄。完全同步的檢查點的算子返回已完成的 FutureTask戈锻。如果需要執(zhí)行異步算子操作,則以該 run() 方法執(zhí)行 FutureTask。這些任務是可以取消的楚昭,來釋放流和其他資源叹卷。
Reference
https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html