Prologue
在很久之前侣颂,筆者曾簡單介紹了Chandy-Lamport分布式快照算法,如果看官還未讀過枪孩,建議作為前置知識補(bǔ)充一下憔晒。
用過Flink的人都會知道檢查點(diǎn)機(jī)制有多重要,而Flink做checkpoint的過程正是依賴于Chandy-Lamport算法的變種——異步屏障快照(asynchronous barrier snapshotting, ABS)算法蔑舞。該算法由五位大佬通過論文《Lightweight Asynchronous Snapshots for Distributed Dataflows》提出拒担。可以說攻询,理解了ABS从撼,就真正理解了Flink檢查點(diǎn)背后的原理。本文來談?wù)勊?/p>
Checkpoint & Snapshot
檢查點(diǎn)是Flink為流計(jì)算過程提供的容錯(cuò)和故障恢復(fù)機(jī)制钧栖。當(dāng)程序出錯(cuò)時(shí)低零,F(xiàn)link會重啟受到影響的那部分算子及計(jì)算邏輯婆翔,并將它們重置到最后一次成功checkpoint時(shí)的狀態(tài)。每次成功的checkpoint產(chǎn)生的“狀態(tài)數(shù)據(jù)”其實(shí)就是這個(gè)流式計(jì)算任務(wù)在那一時(shí)刻的快照掏婶。
Flink作業(yè)可以抽象成有向圖表示啃奴,圖的頂點(diǎn)是算子(operator),邊是數(shù)據(jù)流(data stream)气堕,與Chandy-Lamport算法提出的“進(jìn)程-鏈路”圖模型恰好對應(yīng)纺腊。直接套用C-L算法的思路,我們可以得出如下推論:
- Flink作業(yè)的快照要包含兩部分茎芭,即算子所處的狀態(tài)以及數(shù)據(jù)流承載的數(shù)據(jù)揖膜。算子每收到/發(fā)出一條數(shù)據(jù),以及數(shù)據(jù)流每流入/流出一條數(shù)據(jù)梅桩,都會造成全局狀態(tài)的改變壹粟。
- 算子可以感知到自己的狀態(tài),但數(shù)據(jù)流的狀態(tài)不容易記錄宿百,主要是因?yàn)槌休d的數(shù)據(jù)量太大趁仙,并且總是在變化。
- 時(shí)間是無法靜止的(即數(shù)據(jù)總是在流動的)垦页,并且快照不能stop-the-world雀费,否則會造成延遲和數(shù)據(jù)堆積,降低吞吐量痊焊。
所以解決方案的要點(diǎn)有二:一是通過每個(gè)算子自己記錄的狀態(tài)合并出全局快照盏袄,二是引入一個(gè)標(biāo)記把數(shù)據(jù)流從時(shí)域上切分成段。下面就可以了解ABS算法的基礎(chǔ)——屏障薄啥。
Barrier
之前已經(jīng)講過辕羽,C-L算法引入了marker消息來作為快照的邊界,即區(qū)分“當(dāng)前快照的數(shù)據(jù)”和“下一個(gè)快照的數(shù)據(jù)”垄惧。ABS算法也有自己的marker消息刁愿,不過稱為檢查點(diǎn)屏障(checkpoint barrier),簡稱屏障到逊。
屏障由Flink的JobManager周期性產(chǎn)生(周期長度由StreamExecutionEnvironment.enableCheckpointing()
方法來指定)铣口,并廣播給所有Source算子,沿著數(shù)據(jù)流流動下去觉壶。下圖示出一條帶有屏障的數(shù)據(jù)流枷踏。
可見,第n - 1個(gè)屏障之后掰曾、第n個(gè)屏障之前的所有數(shù)據(jù)都屬于第n個(gè)檢查點(diǎn)。下游算子如果檢測到屏障的存在停团,就會觸發(fā)快照動作旷坦,不必再關(guān)心時(shí)間無法靜止的問題掏熬。下面繼續(xù)了解快照階段是如何執(zhí)行的。
Snapshotting & Barrier Alignment
舉例說明檢查點(diǎn)流程秒梅。下圖是論文中給出的并行度為2的Word Count示例旗芬,注意該作業(yè)的執(zhí)行計(jì)劃為有向無環(huán)圖(DAG)。
快照算法的步驟如下:
a) Source算子接收到JobManager產(chǎn)生的屏障捆蜀,生成自己狀態(tài)的快照(其中包含數(shù)據(jù)源對應(yīng)的offset/position信息)疮丛,并將屏障廣播給下游所有數(shù)據(jù)流;
b)辆它、c) 下游非Source的算子從它的某個(gè)輸入數(shù)據(jù)流接收到屏障后誊薄,會阻塞這個(gè)輸入流,繼續(xù)接收其他輸入流锰茉,直到所有輸入流的屏障都到達(dá)(圖中的count-2算子接收的兩個(gè)屏障就不是同時(shí)到達(dá)的)呢蔫。一旦算子收齊了所有屏障,它就會生成自己狀態(tài)的快照飒筑,并繼續(xù)將屏障廣播給下游所有數(shù)據(jù)流片吊;
d) 快照生成后,算子解除對輸入流的阻塞协屡,繼續(xù)進(jìn)行計(jì)算俏脊。Sink算子接收到屏障之后會向JobManager確認(rèn),所有Sink都確認(rèn)收到屏障標(biāo)記著這一周期checkpoint過程結(jié)束肤晓,快照成功爷贫。
可見,如果算子只有一個(gè)輸入流的話材原,問題就比較簡單沸久,只需要在收到屏障之后立即做快照。但是如果有多個(gè)輸入流余蟹,就必須要等待收到所有屏障才能做快照卷胯,以避免將檢查點(diǎn)n與檢查點(diǎn)n + 1的數(shù)據(jù)混淆。這個(gè)等待的過程就叫做對齊(alignment)威酒,圖來自官方文檔窑睁。注意算子內(nèi)部有個(gè)輸入緩沖區(qū),用來在對齊期間緩存數(shù)據(jù)葵孤。
下圖是從Flink系統(tǒng)的角度示出整個(gè)checkpoint流程里屏障的流動担钮,以及快照數(shù)據(jù)向狀態(tài)后端的寫入。注意Source記錄的offset值以及Sink收到所有屏障后的ack信號尤仍。
Exactly-Once vs At-Least-Once
上面講到的屏障對齊過程是Flink exactly-once語義的基礎(chǔ)箫津,因?yàn)槠琳蠈R能夠保證多輸入流的算子正常處理不同checkpoint區(qū)間的數(shù)據(jù),避免它們發(fā)生交叉,即不會有數(shù)據(jù)被處理兩次苏遥。
但是對齊過程需要時(shí)間饼拍,有一些對延遲特別敏感的應(yīng)用可能對準(zhǔn)確性的要求沒有那么高。所以Flink也允許在StreamExecutionEnvironment.enableCheckpointing()
方法里指定At-Least-Once語義田炭,會取消屏障對齊师抄,即算子收到第一個(gè)輸入的屏障之后不會阻塞,而是觸發(fā)快照教硫。這樣一來叨吮,部分屬于檢查點(diǎn)n + 1的數(shù)據(jù)也會包括進(jìn)檢查點(diǎn)n的數(shù)據(jù)里, 當(dāng)恢復(fù)時(shí)瞬矩,這部分交叉的數(shù)據(jù)就會被重復(fù)處理茶鉴。
Asynchronous
“屏障”和“快照”都講過了,“異步”呢丧鸯?這個(gè)詞實(shí)際上指的是快照數(shù)據(jù)寫入的異步性:算子收齊屏障并觸發(fā)快照之后蛤铜,不會等待快照數(shù)據(jù)全部寫入狀態(tài)后端,而是一邊后臺寫入丛肢,一邊立刻繼續(xù)處理數(shù)據(jù)流围肥,并將屏障發(fā)送到下游,實(shí)現(xiàn)了最小化延遲蜂怎。
當(dāng)然穆刻,引入異步性之后,checkpoint成功的條件除了所有Sink都報(bào)告ack之外杠步,還得加上一條:所有有狀態(tài)的算子都報(bào)告ack氢伟,否則JobManager就無法確認(rèn)異步寫入到底完成沒有。
DCG幽歼?
ABS的精華講完了朵锣。最后看論文中提到的特殊情況,即作業(yè)的執(zhí)行計(jì)劃是個(gè)有向有環(huán)圖(DCG)甸私。很顯然這種情況會造成死鎖诚些,環(huán)內(nèi)的算子就會無限等待收齊屏障。面對該問題皇型,ABS算法會單獨(dú)處理回邊(back edge)——即從下游流回上游的數(shù)據(jù)流诬烹,因?yàn)榛剡叺拇嬖跁?dǎo)致我們無法單純地通過每個(gè)算子的狀態(tài)合并出全局快照。
思路如下圖所示弃鸦,重點(diǎn)在于回邊終點(diǎn)的那個(gè)算子。當(dāng)該算子的非回邊輸入流的屏障都到達(dá)之后唬格,它會生成一個(gè)本地的快照備份家破,并于此同時(shí)開始記錄回邊流入的數(shù)據(jù)颜说,直到再次從回邊收到相同的屏障脑沿。這樣就靠算子的狀態(tài)記錄了回邊的狀態(tài),當(dāng)從快照恢復(fù)時(shí)韭邓,能夠?qū)⒒剡叺臄?shù)據(jù)重新放回?cái)?shù)據(jù)流傳輸。
The End
明天還有很多事辜御,民那晚安晚安鸭你。