Flink檢查點(diǎn)機(jī)制與狀態(tài)管理

1 檢查點(diǎn)機(jī)制

1.1 CheckPoints

為了使 Flink 的狀態(tài)具有良好的容錯(cuò)性宏侍,F(xiàn)link 提供了檢查點(diǎn)機(jī)制 (CheckPoints) 徽千。通過檢查點(diǎn)機(jī)制爱榔,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier 挟伙,當(dāng)某個(gè)算子收到 barrier 時(shí),即會(huì)基于當(dāng)前狀態(tài)生成一份快照荐开,然后再將該 barrier 傳遞到下游算子付翁,下游算子接收到該 barrier 后,也基于當(dāng)前狀態(tài)生成一份快照晃听,依次傳遞直至到最后的 Sink 算子上胆敞。當(dāng)出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)杂伟。

檢查點(diǎn)機(jī)制

1.2 開啟檢查點(diǎn)

默認(rèn)情況下移层,檢查點(diǎn)機(jī)制是關(guān)閉的,需要在程序中進(jìn)行開啟:

// 開啟檢查點(diǎn)機(jī)制赫粥,并指定狀態(tài)檢查點(diǎn)之間的時(shí)間間隔
env.enableCheckpointing(1000); 

// 其他可選配置如下:
// 設(shè)置語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置兩個(gè)檢查點(diǎn)之間的最小時(shí)間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 設(shè)置執(zhí)行Checkpoint操作時(shí)的超時(shí)時(shí)間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設(shè)置最大并發(fā)執(zhí)行的檢查點(diǎn)的數(shù)量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 將檢查點(diǎn)持久化到外部存儲(chǔ)
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存點(diǎn)時(shí)观话,是否將作業(yè)回退到該檢查點(diǎn)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

1.3 保存點(diǎn)機(jī)制

保存點(diǎn)機(jī)制 (Savepoints) 是檢查點(diǎn)機(jī)制的一種特殊的實(shí)現(xiàn),它允許你通過手工的方式來觸發(fā) Checkpoint越平,并將結(jié)果持久化存儲(chǔ)到指定路徑中频蛔,主要用于避免 Flink 集群在重啟或升級(jí)時(shí)導(dǎo)致狀態(tài)丟失。

1.4 RichFunction 檢查點(diǎn)實(shí)戰(zhàn)

public class OperatorWarning implements CheckpointedFunction {
    // 非正常數(shù)據(jù)
    private List<Tuple2<String, Long>> bufferedData;
    // checkPointedState
    private transient ListState<Tuple2<String, Long>> checkPointedState;
   
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意這里獲取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
                getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        // 如果發(fā)生重啟秦叛,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 在進(jìn)行快照時(shí)晦溪,將數(shù)據(jù)存儲(chǔ)到checkPointedState
        checkPointedState.clear();
        for (Tuple2<String, Long> element : bufferedData) {
            checkPointedState.add(element);
        }
    }
}

2 狀態(tài)管理

2.1 算子狀態(tài)

算子狀態(tài) (Operator State):顧名思義,狀態(tài)是和算子進(jìn)行綁定的挣跋,一個(gè)算子的狀態(tài)不能被其他算子所訪問到三圆。官方文檔上對(duì) Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的避咆,即假設(shè)算子的并行度是 2舟肉,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài):


算子狀態(tài)

2.2 鍵控狀態(tài)

鍵控狀態(tài) (Keyed State) :是一種特殊的算子狀態(tài),即狀態(tài)是根據(jù) key 值進(jìn)行區(qū)分的查库,F(xiàn)link 會(huì)為每類鍵值維護(hù)一個(gè)狀態(tài)實(shí)例路媚。如下圖所示,每個(gè)顏色代表不同 key 值樊销,對(duì)應(yīng)四個(gè)不同的狀態(tài)實(shí)例整慎。需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進(jìn)行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 围苫。


鍵控狀態(tài)

2.3 監(jiān)控狀態(tài)編程

Flink 提供了以下數(shù)據(jù)格式來管理和存儲(chǔ)鍵控狀態(tài) (Keyed State):

  • ValueState:存儲(chǔ)單值類型的狀態(tài)裤园。可以使用 update(T) 進(jìn)行更新够吩,并通過 T value() 進(jìn)行檢索比然。
  • ListState:存儲(chǔ)列表類型的狀態(tài)≈苎可以使用 add(T) 或 addAll(List) 添加元素强法;并通過 get() 獲得整個(gè)列表。
  • ReducingState:用于存儲(chǔ)經(jīng)過 ReduceFunction 計(jì)算后的結(jié)果湾笛,使用 add(T) 增加元素饮怯。
  • AggregatingState:用于存儲(chǔ)經(jīng)過 AggregatingState 計(jì)算后的結(jié)果,使用 add(IN) 添加元素嚎研。
  • FoldingState:已被標(biāo)識(shí)為廢棄蓖墅,會(huì)在未來版本中移除,官方推薦使用 AggregatingState 代替临扮。
  • MapState:維護(hù) Map 類型的狀態(tài)论矾。
 @Override
    public void open(Configuration parameters) {
        // 通過狀態(tài)名稱(句柄)獲取狀態(tài)實(shí)例,如果不存在則會(huì)自動(dòng)創(chuàng)建
//        abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<>("abnormalData", Long.class));

        StateTtlConfig ttlConfig = StateTtlConfig
                // 設(shè)置有效期為 10 秒
                .newBuilder(Time.seconds(10))
                // 設(shè)置有效期更新規(guī)則杆勇,這里設(shè)置為當(dāng)創(chuàng)建和寫入時(shí)贪壳,都重置其有效期到規(guī)定的10秒
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                /*設(shè)置只要值過期就不可見,另外一個(gè)可選值是ReturnExpiredIfNotCleanedUp蚜退,
                 代表即使值過期了闰靴,但如果還沒有被物理刪除,就是可見的*/
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        abnormalData = getRuntimeContext().getListState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        // 如果輸入值超過閾值钻注,則記錄該次不正常的數(shù)據(jù)信息
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }

        ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
        // 如果不正常的數(shù)據(jù)出現(xiàn)達(dá)到一定次數(shù)蚂且,則輸出報(bào)警信息
        if (list.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));
            // 報(bào)警信息輸出后,清空狀態(tài)
            abnormalData.clear();
        }
    }

2.4 算子狀態(tài)編程

相比于鍵控狀態(tài)幅恋,算子狀態(tài)目前支持的存儲(chǔ)類型只有以下三種:

  • ListState:存儲(chǔ)列表類型的狀態(tài)杏死。
  • UnionListState:存儲(chǔ)列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化捆交,ListState 會(huì)將該算子的所有并發(fā)的狀態(tài)實(shí)例進(jìn)行匯總识埋,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實(shí)例匯總起來零渐,具體的劃分行為則由用戶進(jìn)行定義窒舟。
  • BroadcastState:用于廣播的算子狀態(tài)。
 @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 注意這里獲取的是OperatorStateStore
        checkPointedState = context.getOperatorStateStore().
                getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
        // 如果發(fā)生重啟诵盼,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }

備注:一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的惠豺,即假設(shè)算子的并行度是 2,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài)

3 狀態(tài)后端

3.1 狀態(tài)管理實(shí)現(xiàn)方式

狀態(tài)管理的實(shí)現(xiàn)方式
  • MemoryStateBackend
    默認(rèn)的方式风宁,即基于 JVM 的堆內(nèi)存進(jìn)行存儲(chǔ)洁墙,主要適用于本地開發(fā)和調(diào)試。

  • FsStateBackend
    基于文件系統(tǒng)進(jìn)行存儲(chǔ)戒财,可以是本地文件系統(tǒng)热监,也可以是 HDFS 等分布式文件系統(tǒng)。 需要注意而是雖然選擇使用了 FsStateBackend 饮寞,但正在進(jìn)行的數(shù)據(jù)仍然是存儲(chǔ)在 TaskManager 的內(nèi)存中的孝扛,只有在 checkpoint 時(shí)列吼,才會(huì)將狀態(tài)快照寫入到指定文件系統(tǒng)上。

  • RocksDBStateBackend
    RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器苦始,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲(chǔ)正在進(jìn)行的數(shù)據(jù)寞钥。等到 checkpoint 時(shí),再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中陌选,所以采用 RocksDBStateBackend 時(shí)也需要配置持久化存儲(chǔ)的文件系統(tǒng)理郑。之所以這樣做是因?yàn)?RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式咨油,其讀取速率更快您炉;比起全內(nèi)存的方式,其存儲(chǔ)空間更大役电,因此它是一種比較均衡的方案赚爵。

3.2 配置方式

  • 基于代碼方式進(jìn)行配置,只對(duì)當(dāng)前作業(yè)生效:
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

// 配置 RocksDBStateBackend 時(shí)宴霸,需要額外導(dǎo)入下面的依賴:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.12</version>
</dependency>
  • 基于 flink-conf.yaml 配置文件的方式進(jìn)行配置囱晴,對(duì)所有部署在該集群上的作業(yè)都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市瓢谢,隨后出現(xiàn)的幾起案子畸写,更是在濱河造成了極大的恐慌,老刑警劉巖氓扛,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拯欧,死亡現(xiàn)場(chǎng)離奇詭異珍剑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門合陵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來起意,“玉大人玖详,你說我怎么就攤上這事沸柔。” “怎么了整份?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵待错,是天一觀的道長。 經(jīng)常有香客問我烈评,道長火俄,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任讲冠,我火速辦了婚禮瓜客,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己谱仪,他們只是感情好玻熙,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芽卿,像睡著了一般揭芍。 火紅的嫁衣襯著肌膚如雪胳搞。 梳的紋絲不亂的頭發(fā)上卸例,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音肌毅,去河邊找鬼筷转。 笑死,一個(gè)胖子當(dāng)著我的面吹牛悬而,可吹牛的內(nèi)容都是我干的呜舒。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼笨奠,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼袭蝗!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起般婆,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤到腥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后蔚袍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乡范,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年啤咽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了晋辆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宇整,死狀恐怖瓶佳,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鳞青,我是刑警寧澤霸饲,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站盼玄,受9級(jí)特大地震影響贴彼,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜埃儿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一器仗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦精钮、人聲如沸威鹿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽忽你。三九已至,卻和暖如春臂容,著一層夾襖步出監(jiān)牢的瞬間科雳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工脓杉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留糟秘,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓球散,卻偏偏與公主長得像尿赚,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蕉堰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容