Flink State晚岭、CheckPoint與Savepoint鸥印、Flink的Exactly Once

State

State簡述

State分類:

  • Keyed state
  • Operator state

State兩種形態(tài):

  • Raw State(原始狀態(tài))
    只需要繼承RichFunction系列而不需要額外繼承其他接口,因此從getRunntime中獲取State
    以字節(jié)流的形式寫入進 checkpoint
  • Managed State(托管狀態(tài))
    托管狀態(tài)可以使用 Flink runtime 提供的數(shù)據(jù)結構來表示坦报,例如內部哈希表或者 RocksDB辅甥。具體有 ValueState,ListState 等燎竖。Flink runtime 會對這些狀態(tài)進行編碼然后將它們寫入到 checkpoint 中璃弄。需要繼承實現(xiàn) CheckpointedFunction 或者 ListCheckpointed 接口。這兩個接口實現(xiàn)的方法中都可以通過context去獲取state构回。

推薦使用托管狀態(tài)夏块,因為如果使用托管狀態(tài),當并行度發(fā)生改變時纤掸,F(xiàn)link 可以自動的幫你重分配 state脐供,同時還可以更好的管理內存。

分配策略:

  • Event Split
  • Union redistribution

下面代碼片對CheckpointedFunction使用案例:

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                //將數(shù)據(jù)發(fā)到外部系統(tǒng)
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

ListCheckpointed
是一種受限的 CheckpointedFunction借跪,只支持 List 風格的狀態(tài)和 even-spit 的重分配策略政己。


State狀態(tài)存儲

  • MemoryStateBackend
    將狀態(tài)保存在JobManager,因此這種情況會對JobManager分配到的內存帶來壓力
    支持異步和同步
  • FsStateBackend
    工作狀態(tài)仍然是存儲在 Task Manager 中的內存中掏愁,雖然在 Checkpoint 的時候會存在文件中歇由,所以還是得注意這個狀態(tài)要保證不超過 Task Manager 的內存
  • RocksDBStateBackend
    相對于上述兩種狀態(tài)保存的方案卵牍,該中方式不會將State保存在內存中,而是保存在HDFS或者文件中沦泌,當恢復的時候再去讀取恢復接口糊昙,因此相比于狀態(tài)直接保存在內存中,效率會有所降低谢谦。但這種方式是Flink推薦的释牺。
    RocksDB支持增量保存,其原理如下:
    當使用 RocksDBStateBackend 時回挽,增量 Checkpoint 是如何實現(xiàn)的呢没咙?RocksDB 是一個基于 LSM 實現(xiàn)的 KV 數(shù)據(jù)庫。LSM 全稱 Log Structured Merge Trees千劈,LSM 樹本質是將大量的磁盤隨機寫操作轉換成磁盤的批量寫操作來極大地提升磁盤數(shù)據(jù)寫入效率镜撩。一般 LSM Tree 實現(xiàn)上都會有一個基于內存的 MemTable 介質,所有的增刪改操作都是寫入到 MemTable 中队塘,當 MemTable 足夠大以后,將 MemTable 中的數(shù)據(jù) flush 到磁盤中生成不可變且內部有序的 ssTable(Sorted String Table)文件宜鸯,全量數(shù)據(jù)保存在磁盤的多個 ssTable 文件中憔古。HBase 也是基于 LSM Tree 實現(xiàn)的,HBase 磁盤上的 HFile 就相當于這里的 ssTable 文件淋袖,每次生成的 HFile 都是不可變的而且內部有序的文件鸿市。基于 ssTable 不可變的特性即碗,才實現(xiàn)了增量 Checkpoint焰情,具體流程如下所示:


    image.png

第一次 Checkpoint 時生成的狀態(tài)快照信息包含了兩個 sstable 文件:sstable1 和 sstable2 及 Checkpoint1 的元數(shù)據(jù)文件 MANIFEST-chk1,所以第一次 Checkpoint 時需要將 sstable1剥懒、sstable2 和 MANIFEST-chk1 上傳到外部持久化存儲中内舟。第二次 Checkpoint 時生成的快照信息為 sstable1、sstable2初橘、sstable3 及元數(shù)據(jù)文件 MANIFEST-chk2验游,由于 sstable 文件的不可變特性,所以狀態(tài)快照信息的 sstable1保檐、sstable2 這兩個文件并沒有發(fā)生變化耕蝉,sstable1、sstable2 這兩個文件不需要重復上傳到外部持久化存儲中夜只,因此第二次 Checkpoint 時垒在,只需要將 sstable3 和 MANIFEST-chk2 文件上傳到外部持久化存儲中即可。這里只將新增的文件上傳到外部持久化存儲扔亥,也就是所謂的增量 Checkpoint场躯。

基于 LSM Tree 實現(xiàn)的數(shù)據(jù)庫為了提高查詢效率谈为,都需要定期對磁盤上多個 sstable 文件進行合并操作,合并時會將刪除的推盛、過期的以及舊版本的數(shù)據(jù)進行清理峦阁,從而降低 sstable 文件的總大小。圖中可以看到第三次 Checkpoint 時生成的快照信息為sstable3耘成、sstable4榔昔、sstable5 及元數(shù)據(jù)文件 MANIFEST-chk3, 其中新增了 sstable4 文件且 sstable1 和 sstable2 文件合并成 sstable5 文件瘪菌,因此第三次 Checkpoint 時只需要向外部持久化存儲上傳 sstable4撒会、sstable5 及元數(shù)據(jù)文件 MANIFEST-chk3。

基于 RocksDB 的增量 Checkpoint 從本質上來講每次 Checkpoint 時只將本次 Checkpoint 新增的快照信息上傳到外部的持久化存儲中师妙,依靠的是 LSM Tree 中 sstable 文件不可變的特性诵肛。對 LSM Tree 感興趣的同學可以深入研究 RocksDB 或 HBase 相關原理及實現(xiàn)。


CheckPoint

CheckPoint需要的先決條件:

  • Source需要支持數(shù)據(jù)的短時間重放功能
  • 需要一個能保存狀態(tài)的持久化存儲介質

CheckPoint如何配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 開啟 Checkpoint默穴,每 1000毫秒進行一次 Checkpoint
env.enableCheckpointing(1000);

// Checkpoint 語義設置為 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// CheckPoint 的超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一時間怔檩,只允許 有 1 個 Checkpoint 在發(fā)生
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 兩次 Checkpoint 之間的最小時間間隔為 500 毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// 當 Flink 任務取消時,保留外部保存的 CheckPoint 信息
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 當有較新的 Savepoint 時蓄诽,作業(yè)也會從 Checkpoint 處恢復
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 作業(yè)最多允許 Checkpoint 失敗 1 次(flink 1.9 開始支持)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

// Checkpoint 失敗后薛训,整個 Flink 任務也會失敗(flink 1.9 之前)
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(true)

Checkpoint與SavePoint的區(qū)別

image.png

Operate UID的重要性
如何為算子指定UID

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

CheckPoint流程

  1. JobManager中CheckPointCoordinator 會定期向所有 SourceTask 發(fā)送 CheckPointTrigger仑氛,Source Task 會在數(shù)據(jù)流中安插 Checkpoint barrier


    image.png

2.當 task 收到上游所有實例的 barrier 后乙埃,向自己的下游繼續(xù)傳遞 barrier,然后自身同步進行快照锯岖,并將自己的狀態(tài)異步寫入到持久化存儲中
?? - 如果是增量 Checkpoint介袜,則只是把最新的一部分更新寫入到外部持久化存儲中;
?? - 為了下游盡快進行 Checkpoint,所以 task 會先發(fā)送 barrier 到下游出吹,自身再同步進行快照;


image.png
  1. 當 task 將狀態(tài)信息完成備份后遇伞,會將備份數(shù)據(jù)的地址(state handle)通知給 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持續(xù)時長超過了 Checkpoint 設定的超時時間CheckPointCoordinator 還沒有收集完所有的 State Handle捶牢,CheckPointCoordinator 就會認為本次 Checkpoint 失敗赃额,會把這次 Checkpoint 產生的所有狀態(tài)數(shù)據(jù)全部刪除

  2. 如果 CheckPointCoordinator 收集完所有算子的 State Handle,CheckPointCoordinator 會把整個 StateHandle 封裝成 completed Checkpoint Meta叫确,寫入到外部存儲中跳芳,Checkpoint 結束


    image.png

Flink Exactly Once

在Flink中的Exactly Once有兩種層面,一種是Flink CheckPoint處竹勉,還有一處是研究端到端的飞盆,因此讀者有必要在學習之前先熟悉它的兩個層面。

CheckPoint處的Exactly Once
在上述文章中,我們知道了CheckPoint的流程吓歇,但此時需要了解Barrier機制孽水。當Exactly Once中Task在收到來自Source的Barrier時,如果停下手中的數(shù)據(jù)處理任務并等待所有Source的Barrier城看,與此同時先把數(shù)據(jù)保存在緩存中女气,待所有Barrier收集完畢之后再去做CheckPoint,那么這種方式就稱呼為Barrier對齊测柠。如果不停下手中的任務(不去等待收集到所有Source實例的Barrier)炼鞠,等到收集到所有的Source Barrier再去做CheckPoint,那么這種方式就被稱呼為Barrier不對齊轰胁,就會產生整個系統(tǒng)的At least once谒主。在配置上來看是否配置的ExactlyOnce就是配置了Barrier是否對齊。

端到端的ExactlyOnce
仔細思考CheckPoint的機制就會發(fā)現(xiàn)赃阀,CheckPoint的ExactlyOnce并不能保證數(shù)據(jù)端到端的ExactlyOnce霎肯,比如說Sink在CHK100 ~ CHK101之間掛了,但是最近的一次成功CheckPoint為CHK100榛斯,在這期間以及寫入的Sink數(shù)據(jù)無法被刪除观游,那么端到端的就會退化成At least once。
那么如何解決端到端的Exactly Once呢驮俗,有兩種方案:

  • 假如我們使用的存儲介質支持按照全局主鍵去重
  • Sink數(shù)據(jù)時懂缕,與CheckPoint的時機做一次強綁定,Sink成功CheckPoint才可能成功意述,否則CheckPoint失敗,但這樣做有一個前提就是需要Sink端支持事務吮蛹。
    第一種方式比較好理解荤崇,這里我們主要討論第二種方式:
    首先需要先去了解一下2PC的實現(xiàn)方式,F(xiàn)link保證端到端的Exactly Once其實就是使用了這種思想潮针,將JobManager作為協(xié)調者术荤,所有的Sink不會直接提交已經處理好的數(shù)據(jù),而是先放在緩存中每篷,等到JobManager下發(fā)CHeckPoint之后瓣戚,在snapshot方法中對Sink進行數(shù)據(jù)precommit(只是flush操作但是不能commit事務),然后snapshot繼續(xù)做CheckPont的工作焦读,等JobManager收到所有算子的CheckPoint成功通知后子库,會調用CheckPointLinstener中的complete Hook通知所有的算子CheckPoint已經成功,這個時候各個算子才能真正的提交事務矗晃。
    因此這種方式需要考慮CheckPoint的間隔時間仑嗅,因為數(shù)據(jù)的真正寫入Sink時機完全是和CheckPoint同步的。

補充:
在FlinkKafkaConsumer中分區(qū)分配原則是使用assign的方式實現(xiàn);

在assign中使用的分區(qū)分配其實是使用的是round-robin的策略仓技;

當并行度改變時鸵贬,F(xiàn)linkKafka使用的是UnionSplit的策略方式,因為使用event-split可能獲取到的State和分區(qū)得到的Partition并不匹配脖捻;

在恢復時阔逼,若發(fā)現(xiàn)Partition不在內存中的TopPartition --> Offset映射關系中,則讓當前的Partition從EARLIEST處開始消費地沮。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末嗜浮,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子诉濒,更是在濱河造成了極大的恐慌周伦,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件未荒,死亡現(xiàn)場離奇詭異专挪,居然都是意外死亡,警方通過查閱死者的電腦和手機片排,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門寨腔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人率寡,你說我怎么就攤上這事迫卢。” “怎么了冶共?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵乾蛤,是天一觀的道長。 經常有香客問我捅僵,道長家卖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任庙楚,我火速辦了婚禮上荡,結果婚禮上,老公的妹妹穿的比我還像新娘馒闷。我一直安慰自己酪捡,他們只是感情好,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布纳账。 她就那樣靜靜地躺著逛薇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪疏虫。 梳的紋絲不亂的頭發(fā)上金刁,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天帅涂,我揣著相機與錄音,去河邊找鬼尤蛮。 笑死媳友,一個胖子當著我的面吹牛,可吹牛的內容都是我干的产捞。 我是一名探鬼主播醇锚,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼坯临!你這毒婦竟也來了焊唬?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤看靠,失蹤者是張志新(化名)和其女友劉穎赶促,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挟炬,經...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡鸥滨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了谤祖。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片婿滓。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖粥喜,靈堂內的尸體忽然破棺而出凸主,到底是詐尸還是另有隱情,我是刑警寧澤额湘,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布卿吐,位于F島的核電站,受9級特大地震影響锋华,放射性物質發(fā)生泄漏嗡官。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一供置、第九天 我趴在偏房一處隱蔽的房頂上張望谨湘。 院中可真熱鬧绽快,春花似錦芥丧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至活孩,卻和暖如春物遇,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工询兴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留乃沙,地道東北人。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓诗舰,卻偏偏與公主長得像警儒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子眶根,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內容