翻譯自:Data Streaming Fault Tolerance
簡介
Apache Flink 提供了容錯機制來恢復數(shù)據(jù)流應用的狀態(tài)。這種機制保證即使在錯誤出現(xiàn)時,應用的狀態(tài)會最終反應數(shù)據(jù)流中的每條記錄恰好一次(exactly once)代兵。注意勒叠,可以選擇降級到至少一次的保證(at least once)
這種容錯機制不斷的為分布式數(shù)據(jù)流建立快照涂邀。對于擁有小狀態(tài)(數(shù)據(jù)量較小)的流應用脊串,這種快照特別的輕量,在不影響太多性能的情況下不斷地建立快照清钥。這個狀態(tài)存放在配置好的地方(例如 主節(jié)點或HDFS)
當遇到應用失斍矸妗(因為機器、網(wǎng)絡或軟件失斔钫选)斩例,F(xiàn)link 就會停止分布式數(shù)據(jù)流。然后从橘,系統(tǒng)會重啟operator念赶,并將它們設置為最新成功的檢查點(checkpoint)。輸入數(shù)據(jù)流會被設置到狀態(tài)快照對應的點恰力。保證重啟后的并行數(shù)據(jù)流所處理的任何記錄都在檢查點狀態(tài)之后叉谜。
注意:為了實現(xiàn)該機制的所有保證,數(shù)據(jù)流源(例如消息隊列或broker)需要能夠回滾到近段歷史上的某個點踩萎。Kafka 擁有這個能力停局,而且Flink的Kafka 連接器實現(xiàn)了這個能力。
注意:因為Flink的檢查點通過分布式快照實現(xiàn),因此我們使用快照(snapshot)和 檢查點(checkpoint) 表示董栽。
個人理解: exactly once 僅對Flink內(nèi)部狀態(tài)而言码倦,對外部系統(tǒng)的影響是at least once。主要作用為:
對于流聚合的應用锭碳,保證聚合狀態(tài)的正確性
如果狀態(tài)會影響流的處理袁稽,保證流處理結(jié)果的正確性
檢查點
Flink 容錯機制的關鍵部分是為分布式數(shù)據(jù)系統(tǒng)建立一致性快照和操作狀態(tài)。這些快照充當一致性檢查點擒抛,在出現(xiàn)失敗時推汽,就可以回滾。Flink這種建立快照的機制在“分布式數(shù)據(jù)流中的輕量級異步快照” (http://arxiv.org/abs/1506.08603)中進行詳細描述歧沪。它受“Chandy-Lamport algorithm”啟發(fā)歹撒,并為Flink的執(zhí)行模型做了適配。
柵欄(Barriers)
Flink分布式快照中的一個關鍵元素是流柵欄(stream barriers)诊胞。這個柵欄被注入到數(shù)據(jù)流中暖夭,這些記錄流作為數(shù)據(jù)流的一部分。Barriers 絕不超車其他的記錄撵孤,會嚴格的保證順序迈着。Barrier將記錄分割成記錄集,并流入不同的快照中早直。每一個barrier都為快照攜帶一個ID寥假。Barries不會打斷數(shù)據(jù)流的流動,因此非常的輕量霞扬。不同快照的多個barrier可以在一個流中同時出現(xiàn)糕韧,這就意味著不同的快照會同時發(fā)生。
流柵欄會在數(shù)據(jù)流源頭被注入到并行數(shù)據(jù)流中喻圃。為快照n(Sn)產(chǎn)生的柵欄注入的點就是在源頭數(shù)據(jù)流中包含這些快照數(shù)據(jù)的位置萤彩。例如,在kafka中斧拍,這個位置就是最后一個記錄在分區(qū)內(nèi)的位置雀扶。Sn的位置會被報告給檢查點的協(xié)調(diào)者(Flink的JobManager)。
柵欄接下來就會向下游流動肆汹。當一個中游的operator從所有的輸入流中收到了快照n的barrier愚墓,他就會相同的所有下游流發(fā)送快照n的barrier。一旦尾operator(sink operator昂勉,流DAG的終點)已經(jīng)從所有的輸入流中收到了barrier n浪册,它就會將快照n想檢查點協(xié)調(diào)者反饋。當所有的sink反饋了該快照岗照,他就被認為已經(jīng)完成了村象。
當快照n已經(jīng)完成了笆环,可以確定Sn之前的所有記錄在source中都不再需要,因為這些記錄(及其后代)已經(jīng)通過了整個拓撲厚者。
接收多個輸入流的Operator需要將多個輸入的快照柵欄對齊躁劣。說明如下:
一旦operator從一個輸入流中收到了快照barrier n,它將不能再處理這個流中的其他記錄知道它從其他的流中也收到了該barrier n库菲。否則账忘,它會將來自于快照n中的記錄和來自于快照n+1中的快照混淆。
報告barrier n的數(shù)據(jù)流會臨時性的挑出來蝙昙。從這些流中收到的記錄不會處理闪萄,但是會放入到輸入緩存中梧却。
一旦從最后一個流收到了barrier n奇颠,這個operator會發(fā)送所有積壓的記錄(個人注:將barrier之前的數(shù)據(jù)都發(fā)送出去),然后發(fā)送快照n的barrier放航。
然后烈拒,它繼續(xù)處理從所有輸入流中的數(shù)據(jù),先處理輸入緩存中的數(shù)據(jù)广鳍,然后處理流中的數(shù)據(jù)荆几。
狀態(tài)
當operator包含了任意類型的狀態(tài),這些狀態(tài)必須加入到快照中赊时。Operator中的狀態(tài)包含幾個類型:
用戶定義的狀態(tài):這種狀態(tài)由transformation 的函數(shù)(如map() 或filter())直接創(chuàng)建和修改吨铸。用戶定義的狀態(tài)可以是在函數(shù)java對象中的一個簡單變量,或者函數(shù)綁定的key/value狀態(tài)祖秒。
系統(tǒng)狀態(tài):這些狀態(tài)是operator計算過程中的數(shù)據(jù)緩存诞吱。這種狀態(tài)的典型例子是窗口緩存,在其中竭缝,系統(tǒng)為窗口收集(或聚合)記錄知道窗口被觸發(fā)房维。
Operator在收到所有輸入流的快照柵欄時,且發(fā)送barrier到輸出流前抬纸,將狀態(tài)存為快照咙俩。這時,barrier之前的記錄會更新狀態(tài)湿故,這些更新不會依賴與barrier之后的的記錄阿趁。因為快照的狀態(tài)可能會比較大,它被存儲在配置好的狀態(tài)后端坛猪。默認脖阵,會存放在JobManager的內(nèi)存中,但是為了正式環(huán)境設置(serious setups)砚哆,應該配置一個分布式的存儲(例如HDFS).在保存完成狀態(tài)后独撇,operator會反饋檢查點屑墨,發(fā)送快照柵欄到輸出流,然后繼續(xù)處理纷铣。
現(xiàn)在快照包含:
對于任意分布式流數(shù)據(jù)源卵史,快照開始時的位置
-
對于operator, 狀態(tài)也會存儲在快照中。
屏幕快照 2018-11-05 上午11.02.12.png
準確一次 vs 至少一次
對齊步驟(barrier對對齊)可能會給流應用帶來延遲搜立。正常情況下以躯,額外的延遲大約在幾毫秒,但是我們見過一寫延遲增長很多的情況啄踊。對于所有記錄需要持續(xù)超低延遲(幾毫秒)的應用忧设,F(xiàn)link有在檢查點中跳過流對齊的選項。一旦從每一個輸入流中收到檢查點barrier颠通,就會建立檢查點快照址晕。
當跳過對齊步驟時,即使在n檢查點的檢查點柵欄之后顿锰,operator持續(xù)處理所有的輸入谨垃。這樣,在檢查點n建立快照之前硼控,operator也會處理屬于檢查點n+1的元素刘陶。在重啟時(on a restore),這些記錄會出現(xiàn)重復,因為他們都會包含在檢查點n內(nèi)牢撼,也會作為檢查點n后的數(shù)據(jù)被重放匙隔。
注意: 對齊之后發(fā)生在operator處理多個輸入時(join)或者有多個發(fā)送者時(stream 重排或重分區(qū))。因此熏版,對于只有一個并發(fā)度的operator(map()纷责、flatMap()、filter()……)及時是至少一次模式下也會保證準確一次纳决。
異步狀態(tài)快照
注意上面描述的機制意味著當operator在狀態(tài)服務中保存狀態(tài)的快照時停止處理輸入的記錄碰逸。同步狀態(tài)快照會在每次建立快照時引入延遲。
如果Operator使得狀態(tài)存儲在后臺異步完成阔加,那么在存儲狀態(tài)快照時繼續(xù)處理數(shù)據(jù)是可能的饵史。為此,Operator必須能夠生產(chǎn)一種對后期對operator狀態(tài)的修改不會影響到當前狀態(tài)對象的狀態(tài)對象胜榔。在RocksDB中使用的copy-on-write式的數(shù)據(jù)結(jié)構就是一個例子胳喷。
在輸入流中收到檢查點柵欄后,operator開始異步復制快照狀態(tài)夭织。它立馬就會向輸出流發(fā)送barrier并繼續(xù)常規(guī)數(shù)據(jù)流處理吭露。一旦后臺復制流程完成,它就會將檢查點協(xié)調(diào)者(JobManager)匯報檢查點尊惰。檢查點只有在所有的sink收到barrier并且有狀態(tài)的operator后臺匯報處理完成時才會結(jié)束(有時會比barrier到達sink的時間要晚)讲竿。
恢復
在這種機制下恢復就簡單了泥兰。一旦失敗,F(xiàn)link選擇最近完成的檢查點k题禀。然后鞋诗,系統(tǒng)會重新部署整個分布式數(shù)據(jù)流,給與每一個operator檢查點k對應的狀態(tài)迈嘹。數(shù)據(jù)源會從Sk對應的位置讀取數(shù)據(jù)流削彬。以Apache Kafka為例,這意味著告訴消費者從offset Sk 處拉取數(shù)據(jù)秀仲。
如果狀態(tài)是增量快照的融痛,那么operator啟動時一最新的全量快照狀態(tài),然后將增量快照應用于該狀態(tài)神僵。
Operator 快照實現(xiàn)
當operator采用快照時雁刷,存在兩部分,同步和異步挑豌。
Operator 和狀態(tài)服務使用Java FutureTask提供快照安券。這個任務包含同步部分完成時的狀態(tài)墩崩,異步部分在阻塞中氓英。然后,該檢查點的一個后臺線程會執(zhí)行異步部分鹦筹。
純異步檢查點的Operator會返回一個已經(jīng)完成的FutureTask铝阐。如果一個異步操作需要執(zhí)行,他會在FutureTask的run()方法內(nèi)執(zhí)行
這個任務是可以取消的铐拐,以便于釋放數(shù)據(jù)流及其他處理操作所需要的資源徘键。