1. 什么是State
有時候除了一次處理一個event, 我們也想記錄處理多個event的信息,這個時候的操作就是stateful有狀態(tài)的。
2. 按key分組的state
按key分組的state就是一個key-value store, state的分區(qū)和分發(fā)都是嚴格和stream在一起的噩斟,對keyed state的訪問只能在對應的keyed stream上,這保證了keyed state的更新都是在本地發(fā)生的, 既保證了一致性又不需要有事務開銷。keyed state又進一步被組織成Key Groups. Key Groups是Flink能分發(fā)keyed state的最小單元,key groups的數量與最大并行度一致并级。
3.State Persistence
Flink通過 stream replay 和 checkpointting 實現容錯。一個checkpoint記錄了在某一點所有的input streams在所有operators的執(zhí)行狀態(tài)侮腹,通過恢復checkpoint嘲碧,并將stream重置到與checkpoint對應的event,可恢復flink應用的運行父阻。checkpoint是默認關閉的愈涩。為了配合checkpoint實現容錯,stream data source需要是能replay的加矛,比如kafka.
3.1 Checkpoingtting
checkpointting 是異步的履婉,checkpoint barriers可以不一致,每個operation可以異步地拍下各自狀態(tài)的快照斟览。
3.1.1 Barriers
stream barrier 是插入到data stream中的一條記錄谐鼎,用來區(qū)分哪些記錄應該在當前snapshot中,哪些應該到下一個snapshot中趣惠。barrier會攜帶當前snapshot的ID,是非常輕量的不會影響stream的處理身害。一個stream里可以同時存在多個barrier, 即多個snapshot可能在同時發(fā)生味悄。
stream barrier從data source開始插入到data stream, 一個中間operator從它所有的input streams都接收到snapshot n的barrier后,向它的所有output stream發(fā)出一個barrier. 當data sink 從它的所有input stream都接收到barrier后塌鸯,向jobmanager報告snapshot n已完成侍瑟,所有的data sink都完成以后,snapshot n結束。一個snapshot n結束之后涨颜,job不會再處理Sn之前的記錄费韭。
如果一個operator有多個input stream, 它要基于barrier對齊它們:
- operator收到某一個input stream的barrier以后,就不能再處理這個stream之后的event, 而應該把他們放到緩存里
- operator收到最后一個input stream的barrier后庭瑰,先發(fā)出所有要發(fā)出的屬于當前snapshot的記錄星持,再發(fā)出一個barrier給所有output stream
- operator把當前state記錄到snapshot, 恢復處理緩存中的記錄, 再處理新到的記錄
- operator將state異步寫回到state backend
3.1.2 Snapshotting Operator State
operator的state也要包含再snapshot中。
operator接收到它的所有input steam的barrier后弹灭,開始記錄自己的snapshot, 將snapshot存儲到state backend后督暂,向output stream發(fā)出barrier。
snapshot包含:
- 對每一個stream, snapshot開始時處理的數據位置offset/position
- 對每一個operator, 一個指向snapshot中state的指針
3.1.3 Recovery
一旦程序失敗了穷吮,Flink選擇一個最近的完整的checkpoint恢復operator的狀態(tài)和input stream的位置逻翁。如果state是增量創(chuàng)建的,則選擇一個最近的完整的捡鱼,然后依次將增量修改更新到state上八回。
3.2 Unaligned Checkpointting
不對齊的checkpointting過程如下:
- operator對接收到的barrier做出響應
- operator立即向output stream發(fā)出barrier
- operator將提前到達的記錄(在最后一個stream的barrier到達之前處理的其他stream中不屬于本snapshot的記錄)做上標記并創(chuàng)建一個快照
不對齊的checkpointting能保證盡快處理掉所有記錄,降低延遲驾诈,對于多個input stream速度不一致的情況很有效缠诅。
不對齊的checkpointting的做異常恢復時要先處理那些提前到達的記錄翘鸭,再處理新到達的記錄滴铅,其他過程與對齊的checkpointting一樣。
3.3 State Backends
State可以存儲在內存中就乓,也可以存儲在RocksDB中汉匙,選用的數據結果取決于選用的backend.
3.4 Savepoints
所有支持checkpoint的程序都支持savepoint. savepoint由用戶手動觸發(fā),且不會自動過期生蚁,其他與checkpoints一樣噩翠。
3.5 Exactly Once & At Least Once
對齊的checkpoint能夠保證exactly once, 而不對齊的checkpoints只能保證al least once. 因為不對齊的checkpoints中已經包含了應該在之后的snapshot中的數據,而恢復以后又會再處理一次邦投。
4. 批處理程序中的狀態(tài)和容錯
Flink以流處理的方式對待批處理伤锚,即一個DataSet被當作一個有界流處理,所以批處理中的狀態(tài)和容錯與流處理大體一致志衣,僅有以下幾個差一點:
- 批處理的容錯不使用checkpoints屯援,而只是把所有的數據重新處理一遍
- DataSet API中的狀態(tài)處理使用內存數據結構,而不是用key-value索引
- DataSet API引入了只能在有界流上使用的特殊的同步迭代方法