Flink內(nèi)部Exactly Once三板斧:狀態(tài)甸祭、狀態(tài)后端與檢查點

Flink是一個分布式的流處理引擎缕碎,而流處理的其中一個特點就是7X24。那么池户,如何保障Flink作業(yè)的持續(xù)運行呢咏雌?Flink的內(nèi)部會將應(yīng)用狀態(tài)(state)存儲到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中,由于采用的是分布式架構(gòu)校焦,F(xiàn)link需要對本地生成的狀態(tài)進行持久化存儲赊抖,以避免因應(yīng)用或者節(jié)點機器故障等原因?qū)е聰?shù)據(jù)的丟失,F(xiàn)link是通過checkpoint(檢查點)的方式將狀態(tài)寫入到遠程的持久化存儲寨典,從而就可以實現(xiàn)不同語義的結(jié)果保障氛雪。通過本文,你可以了解到什么是Flink的狀態(tài)耸成,F(xiàn)link的狀態(tài)是怎么存儲的报亩,F(xiàn)link可選擇的狀態(tài)后端(statebackend)有哪些,什么是全局一致性檢查點井氢,F(xiàn)link內(nèi)部如何通過檢查點實現(xiàn)Exactly Once的結(jié)果保障弦追。另外,本文內(nèi)容較長毙沾,建議關(guān)注加收藏骗卜。

什么是狀態(tài)

引子

關(guān)于什么是狀態(tài),我們先不做過多的分析左胞。首先看一個代碼案例寇仓,其中案例1是Spark的WordCount代碼,案例2是Flink的WorkCount代碼烤宙。

  • 案例1:Spark WC
object WordCount {
  def main(args:Array[String]){
  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc = new StreamingContext(conf, Seconds(5))
  val lines = ssc.socketTextStream("localhost", 9999)
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}
}

輸入:

C:\WINDOWS\system32>nc -lp 9999
hello spark
hello spark

輸出:

  • 案例2:Flink WC
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        words.keyBy(0).sum(1).print();
        env.execute("WC");
    }
}

輸入:

C:\WINDOWS\system32>nc -lp 9999
hello Flink
hello Flink

輸出:


從上面的兩個例子可以看出遍烦,在使用Spark進行詞頻統(tǒng)計時,當(dāng)前的統(tǒng)計結(jié)果不受歷史統(tǒng)計結(jié)果的影響躺枕,只計算接收的當(dāng)前數(shù)據(jù)的結(jié)果服猪,這個就可以理解為無狀態(tài)的計算。再來看一下Flink的例子拐云,可以看出當(dāng)?shù)诙卧~頻統(tǒng)計時罢猪,把第一次的結(jié)果值也統(tǒng)計在了一起,即Flink把上一次的計算結(jié)果保存在了狀態(tài)里叉瘩,第二次計算的時候會先拿到上一次的結(jié)果狀態(tài)膳帕,然后結(jié)合新到來的數(shù)據(jù)再進行計算,這就可以理解成有狀態(tài)的計算薇缅,如下圖所示危彩。

狀態(tài)的類別

Flink提供了兩種基本類型的狀態(tài):分別是 Keyed StateOperator State攒磨。根據(jù)不同的狀態(tài)管理方式,每種狀態(tài)又有兩種存在形式汤徽,分別為:managed(托管狀態(tài))raw(原生狀態(tài))娩缰。具體如下表格所示。需要注意的是谒府,由于Flink推薦使用managed state拼坎,所以下文主要討論managed state,對于raw state狱掂,本文不會做過多的討論演痒。

managed state & raw state區(qū)別

Keyed State & Operator State

Keyed State

Keyed State只能由作用在KeyedStream上面的函數(shù)使用,該狀態(tài)與某個key進行綁定趋惨,即每一個key對應(yīng)一個state鸟顺。Keyed State按照key進行維護和訪問的,F(xiàn)link會為每一個Key都維護一個狀態(tài)實例器虾,該狀態(tài)實例總是位于處理該key記錄的算子任務(wù)上讯嫂,因此同一個key的記錄可以訪問到一樣的狀態(tài)。如下圖所示兆沙,可以通過在一條流上使用keyBy()方法來生成一個KeyedStream欧芽。Flink提供了很多種keyed state,具體如下:

  • ValueState<T>

用于保存類型為T的單個值葛圃。用戶可以通過ValueState.value()來獲取該狀態(tài)值千扔,通過ValueState.update()來更新該狀態(tài)。使用ValueStateDescriptor來獲取狀態(tài)句柄库正。

  • ListState<T>

用于保存類型為T的元素列表曲楚,即key的狀態(tài)值是一個列表。用戶可以使用ListState.add()或者ListState.addAll()將新元素添加到列表中褥符,通過ListState.get()訪問狀態(tài)元素龙誊,該方法會返回一個可遍歷所有元素的Iterable<T>對象,注意ListState不支持刪除單個元素喷楣,但是用戶可以使用update(List<T> values)來更新整個列表趟大。使用 ListStateDescriptor來獲取狀態(tài)句柄。

  • ReducingState<T>

調(diào)用add()方法添加值時铣焊,會立即返回一個使用ReduceFunction聚合后的值逊朽,用戶可以使用ReducingState.get()來獲取該狀態(tài)值。使用 ReducingStateDescriptor來獲取狀態(tài)句柄曲伊。

  • AggregatingState<IN, OUT>

與ReducingState<T>類似惋耙,不同的是它使用的是AggregateFunction來聚合內(nèi)部的值,AggregatingState.get()方法會計算最終的結(jié)果并將其返回。使用 AggregatingStateDescriptor來獲取狀態(tài)句柄

  • MapState<UK, UV>

用于保存一組key绽榛、value的映射,類似于java的Map集合婿屹。用戶可以通過get(UK key)方法獲取key對應(yīng)的狀態(tài)灭美,可以通過put(UK k,UV value)方法添加一個鍵值,可以通過remove(UK key)刪除給定key的值昂利,可以通過contains(UK key)判斷是否存在對應(yīng)的key届腐。使用 MapStateDescriptor來獲取狀態(tài)句柄。

  • FoldingState<T, ACC>

在Flink 1.4的版本中標(biāo)記過時蜂奸,在未來的版本中會被移除犁苏,使用AggregatingState進行代替。

值得注意的是扩所,上面的狀態(tài)原語都支持通過State.clear()方法來進行清除狀態(tài)围详。另外,上述的狀態(tài)原語僅用于與狀態(tài)進行交互祖屏,真正的狀態(tài)是存儲在狀態(tài)后端(后面會介紹狀態(tài)后端)的助赞,通過該狀態(tài)原語相當(dāng)于持有了狀態(tài)的句柄(handle)。

keyed State使用案例

下面給出一個MapState的使用案例袁勺,關(guān)于ValueState的使用情況可以參考官網(wǎng)雹食,具體如下:

public class MapStateExample {

    //統(tǒng)計每個用戶每種行為的個數(shù)
    public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {

        //定義一個MapState句柄
        private transient MapState<String, Integer> behaviorCntState;

        // 初始化狀態(tài)
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
                    "userBehavior",  // 狀態(tài)描述符的名稱
                    TypeInformation.of(new TypeHint<String>() {}),  // MapState狀態(tài)的key的數(shù)據(jù)類型
                    TypeInformation.of(new TypeHint<Integer>() {})  // MapState狀態(tài)的value的數(shù)據(jù)類型
            );
            behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)
        }

        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
            Integer behaviorCnt = 1;
            // 如果當(dāng)前狀態(tài)包括該行為,則+1
            if (behaviorCntState.contains(value.f1)) {
                behaviorCnt = behaviorCntState.get(value.f1) + 1;
            }
            // 更新狀態(tài)
            behaviorCntState.put(value.f1, behaviorCnt);
            out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模擬數(shù)據(jù)源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
                Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));
        userBehaviors
                .keyBy(0)
                .flatMap(new UserBehaviorCnt())
                .print();
        env.execute("MapStateExample");
    }
}

結(jié)果輸出:

狀態(tài)的生命周期管理(TTL)

對于任何類型Keyed State都可以設(shè)定狀態(tài)的生命周期(TTL),即狀態(tài)的存活時間期丰,以確保能夠在規(guī)定時間內(nèi)及時地清理狀態(tài)數(shù)據(jù)群叶。如果配置了狀態(tài)的TTL,那么當(dāng)狀態(tài)過期時钝荡,存儲的狀態(tài)會被清除街立。狀態(tài)生命周期功能可以通過StateTtlConfig配置,然后將StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可化撕。代碼示例如下:

StateTtlConfig ttlConfig = StateTtlConfig
                 // 指定TTL時長為10S
                .newBuilder(Time.seconds(10))
                 // 只對創(chuàng)建和寫入操作有效
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                 // 不返回過期的數(shù)據(jù)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) 
                .build();

        // 初始化狀態(tài)
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
                    "userBehavior",  // 狀態(tài)描述符的名稱
                    TypeInformation.of(new TypeHint<String>() {}),  // MapState狀態(tài)的key的數(shù)據(jù)類型
                    TypeInformation.of(new TypeHint<Integer>() {})  // MapState狀態(tài)的value的數(shù)據(jù)類型

            );
            // 設(shè)置stateTtlConfig
            userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
            behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)

        }

在StateTtlConfig創(chuàng)建時几晤,newBuilder方法是必須要指定的,newBuilder中設(shè)定過期時間的參數(shù)植阴。對于其他參數(shù)都是可選的或使用默認(rèn)值蟹瘾。其中setUpdateType方法中傳入的類型有三種:

public enum UpdateType {
        //禁用TTL,永遠不會過期
        Disabled,
        // 創(chuàng)建和寫入時更新TTL
        OnCreateAndWrite,
        // 與OnCreateAndWrite類似,但是在讀操作時也會更新TTL
        OnReadAndWrite
    }

值得注意的是掠手,過期的狀態(tài)數(shù)據(jù)根據(jù)UpdateType參數(shù)進行配置憾朴,只有被寫入或者讀取的時間才會更新TTL,也就是說如果某個狀態(tài)指標(biāo)一直不被使用或者更新喷鸽,則永遠不會觸發(fā)對該狀態(tài)數(shù)據(jù)的清理操作众雷,這種情況可能會導(dǎo)致系統(tǒng)中的狀態(tài)數(shù)據(jù)越來越大。目前用戶可以使用StateTtlConfig.cleanupFullSnapshot設(shè)定當(dāng)觸發(fā)State Snapshot的時候清理狀態(tài)數(shù)據(jù),但是改配置不適合用于RocksDB做增量Checkpointing的操作砾省。

上面的StateTtlConfig創(chuàng)建時鸡岗,可以指定setStateVisibility,用于狀態(tài)的可見性配置编兄,根據(jù)過期數(shù)據(jù)是否被清理來確定是否返回狀態(tài)數(shù)據(jù)轩性。

    /**
     * 是否返回過期的數(shù)據(jù)
     */
    public enum StateVisibility {
        //如果數(shù)據(jù)沒有被清理,就可以返回
        ReturnExpiredIfNotCleanedUp,
        //永遠不返回過期的數(shù)據(jù),默認(rèn)值
        NeverReturnExpired
    }

Operator State

Operator State的作用于是某個算子任務(wù)狠鸳,這意味著所有在同一個并行任務(wù)之內(nèi)的記錄都能訪問到相同的狀態(tài) 揣苏。算子狀態(tài)不能通過其他任務(wù)訪問,無論該任務(wù)是相同的算子件舵。如下圖所示卸察。

Operator State是一種non-keyed state,與并行的操作算子實例相關(guān)聯(lián)铅祸,例如在Kafka Connector中坑质,每個Kafka消費端算子實例都對應(yīng)到Kafka的一個分區(qū)中,維護Topic分區(qū)和Offsets偏移量作為算子的Operator State个少。在Flink中可以實現(xiàn)ListCheckpointed<T extends Serializable>接口或者CheckpointedFunction 接口來實現(xiàn)一個Operator State洪乍。

首先,我們先看一下這兩個接口的具體實現(xiàn)夜焦,然后再給出這兩種接口的具體使用案例壳澳。先看一下ListCheckpointed接口的源碼,如下:

public interface ListCheckpointed<T extends Serializable> {
    
    /**
     * 獲取某個算子實例的當(dāng)前狀態(tài)茫经,該狀態(tài)包括該算子實例之前被調(diào)用時的所有結(jié)果
     * 以列表的形式返回一個函數(shù)狀態(tài)的快照
     * Flink觸發(fā)生成檢查點時調(diào)用該方法
     * @param checkpointId checkpoint的ID,是一個唯一的巷波、單調(diào)遞增的值
     * @param timestamp Job Manager觸發(fā)checkpoint時的時間戳
     * @return  返回一個operator state list,如果為null時,返回空list
     * @throws Exception
     */
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    /**
     * 初始化函數(shù)狀態(tài)時調(diào)用,可能是在作業(yè)啟動時或者故障恢復(fù)時
     * 根據(jù)提供的列表恢復(fù)函數(shù)狀態(tài)
     * 注意:當(dāng)實現(xiàn)該方法時卸伞,需要在RichFunction#open()方法之前調(diào)用該方法
     * @param state 被恢復(fù)算子實例的state列表 抹镊,可能為空
     * @throws Exception
     */
    void restoreState(List<T> state) throws Exception;
}

使用Operator ListState時,在進行擴縮容時荤傲,重分布的策略(狀態(tài)恢復(fù)的模式)如下圖所示:

.png

上面的重分布策略為Even-split Redistribution垮耳,即每個算子實例中含有部分狀態(tài)元素的List列表,整個狀態(tài)數(shù)據(jù)是所有List列表的合集遂黍。當(dāng)觸發(fā)restore/redistribution動作時终佛,通過將狀態(tài)數(shù)據(jù)平均分配成與算子并行度相同數(shù)量的List列表,每個task實例中有一個List雾家,其可以為空或者含有多個元素铃彰。

我們再來看一下CheckpointedFunction接口,源碼如下:

public interface CheckpointedFunction {

    /**
     * 會在生成檢查點之前調(diào)用
     * 該方法的目的是確保檢查點開始之前所有狀態(tài)對象都已經(jīng)更新完畢
     * @param context 使用FunctionSnapshotContext作為參數(shù)
     *                從FunctionSnapshotContext可以獲取checkpoint的元數(shù)據(jù)信息芯咧,
     *                比如checkpoint編號牙捉,JobManager在初始化checkpoint時的時間戳
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * 在創(chuàng)建checkpointedFunction的并行實例時被調(diào)用竹揍,
     * 在應(yīng)用啟動或者故障重啟時觸發(fā)該方法的調(diào)用
     * @param context 傳入FunctionInitializationContext對象,
     *                   可以使用該對象訪問OperatorStateStore和 KeyedStateStore對象邪铲,
     *                   這兩個對象可以獲取狀態(tài)的句柄芬位,即通過Flink runtime來注冊函數(shù)狀態(tài)并返回state對象
     *                   比如:ValueState、ListState等
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

CheckpointedFunction接口是用于指定有狀態(tài)函數(shù)的最底層的接口阴孟,該接口提供了用于注冊和維護keyed state 與operator state的hook(即可以同時使用keyed state 和operator state),另外也是唯一支持使用list union state撼泛。關(guān)于Union List State,使用的是Flink為Operator state提供的另一種重分布的策略:Union Redistribution杆兵,即每個算子實例中含有所有狀態(tài)元素的List列表灶体,當(dāng)觸發(fā)restore/redistribution動作時精算,每個算子都能夠獲取到完整的狀態(tài)元素列表。具體如下圖所示:

ListCheckpointed

ListCheckpointed接口和CheckpointedFunction接口相比在靈活性上相對弱一些,只能支持List類型的狀態(tài)憋槐,并且在數(shù)據(jù)恢復(fù)的時候僅支持even-redistribution策略。該接口不像Flink提供的Keyed State(比如Value State淑趾、ListState)那樣直接在狀態(tài)后端(state backend)注冊阳仔,需要將operator state實現(xiàn)為成員變量,然后通過接口提供的回調(diào)函數(shù)與狀態(tài)后端進行交互治笨。使用代碼案例如下:

public class ListCheckpointedExample {
    private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {
        private Long userBuyBehaviorCnt = 0L;
        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {
            if(value.f1.equals("buy")){
                userBuyBehaviorCnt ++;
                out.collect(Tuple2.of("buy",userBuyBehaviorCnt));
            }
        }
        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            //返回單個元素的List集合驳概,該集合元素是用戶購買行為的數(shù)量
            return Collections.singletonList(userBuyBehaviorCnt);
        }
        @Override
        public void restoreState(List<Long> state) throws Exception {
            // 在進行擴縮容之后,進行狀態(tài)恢復(fù)旷赖,需要把其他subtask的狀態(tài)加在一起
            for (Long cnt : state) {
                userBuyBehaviorCnt += 1;
            }
        }
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模擬數(shù)據(jù)源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
                Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));

        userBehaviors
                .flatMap(new UserBehaviorCnt())
                .print();

        env.execute("ListCheckpointedExample");
    }
}

CheckpointedFunction

CheckpointedFunction接口提供了更加豐富的操作顺又,比如支持Union list state,可以訪問keyedState等孵,關(guān)于重分布策略稚照,如果使用Even-split Redistribution策略,則通過context. getListState(descriptor)獲取Operator State俯萌;如果使用UnionRedistribution策略果录,則通過context. getUnionList State(descriptor)來獲取。使用案例如下:

public class CheckpointFunctionExample {
    private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {
        // 統(tǒng)計每個operator實例的用戶行為數(shù)量的本地變量
        private Long opUserBehaviorCnt = 0L;
        // 每個key的state,存儲key對應(yīng)的相關(guān)狀態(tài)
        private ValueState<Long> keyedCntState;
        // 定義operator state咐熙,存儲算子的狀態(tài)
        private ListState<Long> opCntState;

        @Override
        public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {
            if (value.f1.equals("buy")) {
                // 更新算子狀態(tài)本地變量值
                opUserBehaviorCnt += 1;
                Long keyedCount = keyedCntState.value();
                // 更新keyedstate的狀態(tài) ,判斷狀態(tài)是否為null弱恒,否則空指針異常
                keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 );
                // 結(jié)果輸出
                out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));
            }
        }
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 使用opUserBehaviorCnt本地變量更新operator state
            opCntState.clear();
            opCntState.add(opUserBehaviorCnt);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            // 通過KeyedStateStore,定義keyedState的StateDescriptor描述符
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {
            }));

            // 通過OperatorStateStore,定義OperatorState的StateDescriptor描述符
            ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {
            }));
            // 初始化keyed state狀態(tài)值
            keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);
            // 初始化operator state狀態(tài)
            opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);
            // 初始化本地變量operator state
            for (Long state : opCntState.get()) {
                opUserBehaviorCnt += state;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模擬數(shù)據(jù)源[userId,behavior,product]
        DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
                Tuple3.of(1L, "buy", "iphone"),
                Tuple3.of(1L, "cart", "huawei"),
                Tuple3.of(1L, "buy", "logi"),
                Tuple3.of(1L, "fav", "oppo"),
                Tuple3.of(2L, "buy", "huawei"),
                Tuple3.of(2L, "buy", "onemore"),
                Tuple3.of(2L, "fav", "iphone"));

        userBehaviors
                .keyBy(0)
                .flatMap(new UserBehaviorCnt())
                .print();
        env.execute("CheckpointFunctionExample");
    }
}

什么是狀態(tài)后端

上面使用的狀態(tài)都需要存儲到狀態(tài)后端(StateBackend),然后在checkpoint觸發(fā)時棋恼,將狀態(tài)持久化到外部存儲系統(tǒng)返弹。Flink提供了三種類型的狀態(tài)后端锈玉,分別是基于內(nèi)存的狀態(tài)后端(MemoryStateBackend、基于文件系統(tǒng)的狀態(tài)后端(FsStateBackend)以及基于RockDB作為存儲介質(zhì)的RocksDB StateBackend义起。這三種類型的StateBackend都能夠有效地存儲Flink流式計算過程中產(chǎn)生的狀態(tài)數(shù)據(jù)拉背,在默認(rèn)情況下Flink使用的是MemoryStateBackend,區(qū)別見下表默终。下面分別對每種狀態(tài)后端的特點進行說明椅棺。

狀態(tài)后端的類別

MemoryStateBackend

MemoryStateBackend將狀態(tài)數(shù)據(jù)全部存儲在JVM堆內(nèi)存中,包括用戶在使用DataStream API中創(chuàng)建的Key/Value State齐蔽,窗口中緩存的狀態(tài)數(shù)據(jù)两疚,以及觸發(fā)器等數(shù)據(jù)。MemoryStateBackend具有非澈危快速和高效的特點鬼雀,但也具有非常多的限制,最主要的就是內(nèi)存的容量限制蛙吏,一旦存儲的狀態(tài)數(shù)據(jù)過多就會導(dǎo)致系統(tǒng)內(nèi)存溢出等問題,從而影響整個應(yīng)用的正常運行鞋吉。同時如果機器出現(xiàn)問題鸦做,整個主機內(nèi)存中的狀態(tài)數(shù)據(jù)都會丟失,進而無法恢復(fù)任務(wù)中的狀態(tài)數(shù)據(jù)谓着。因此從數(shù)據(jù)安全的角度建議用戶盡可能地避免在生產(chǎn)環(huán)境中使用MemoryStateBackend泼诱。Flink將MemoryStateBackend作為默認(rèn)狀態(tài)后端。

MemoryStateBackend比較適合用于測試環(huán)境中赊锚,并用于本地調(diào)試和驗證治筒,不建議在生產(chǎn)環(huán)境中使用。但如果應(yīng)用狀態(tài)數(shù)據(jù)量不是很大舷蒲,例如使用了大量的非狀態(tài)計算算子耸袜,也可以在生產(chǎn)環(huán)境中使MemoryStateBackend.

FsStateBackend

FsStateBackend是基于文件系統(tǒng)的一種狀態(tài)后端,這里的文件系統(tǒng)可以是本地文件系統(tǒng)牲平,也可以是HDFS分布式文件系統(tǒng)堤框。創(chuàng)建FsStateBackend的構(gòu)造函數(shù)如下:

FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)

其中path如果為本地路徑,其格式為“file:///data/flink/checkpoints”纵柿,如果path為HDFS路徑蜈抓,其格式為“hdfs://nameservice/flink/checkpoints”。FsStateBackend中第二個Boolean類型的參數(shù)指定是否以同步的方式進行狀態(tài)數(shù)據(jù)記錄昂儒,默認(rèn)采用異步的方式將狀態(tài)數(shù)據(jù)同步到文件系統(tǒng)中沟使,異步方式能夠盡可能避免在Checkpoint的過程中影響流式計算任務(wù)。如果用戶想采用同步的方式進行狀態(tài)數(shù)據(jù)的檢查點數(shù)據(jù)渊跋,則將第二個參數(shù)指定為True即可腊嗡。

相比于MemoryStateBackend, FsStateBackend更適合任務(wù)狀態(tài)非常大的情況着倾,例如應(yīng)用中含有時間范圍非常長的窗口計算,或Key/value State狀態(tài)數(shù)據(jù)量非常大的場景叽唱,這時系統(tǒng)內(nèi)存不足以支撐狀態(tài)數(shù)據(jù)的存儲屈呕。同時FsStateBackend最大的好處是相對比較穩(wěn)定,在checkpoint時棺亭,將狀態(tài)持久化到像HDFS分布式文件系統(tǒng)中虎眨,能最大程度保證狀態(tài)數(shù)據(jù)的安全性。

RocksDBStateBackend

與前面的狀態(tài)后端不同镶摘,RocksDBStateBackend需要單獨引入相關(guān)的依賴包嗽桩。RocksDB 是一個 key/value 的內(nèi)存存儲系統(tǒng),類似于HBase凄敢,是一種內(nèi)存磁盤混合的 LSM DB碌冶。當(dāng)寫數(shù)據(jù)時會先寫進write buffer(類似于HBase的memstore),然后在flush到磁盤文件涝缝,當(dāng)讀取數(shù)據(jù)時會現(xiàn)在block cache(類似于HBase的block cache)扑庞,所以速度會很快。

RocksDBStateBackend在性能上要比FsStateBackend高一些拒逮,主要是因為借助于RocksDB存儲了最新熱數(shù)據(jù)罐氨,然后通過異步的方式再同步到文件系統(tǒng)中,但RocksDBStateBackend和MemoryStateBackend相比性能就會較弱一些滩援。

需要注意 RocksDB 不支持同步的 Checkpoint栅隐,構(gòu)造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint玩徊,也是目前唯一增量 Checkpoint 的 Backend租悄,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可恩袱。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS)泣棋,其容量限制只要單個 TaskManager 上 State 總量不超過它的內(nèi)存+磁盤,單 Key最大 2G憎蛤,總大小不超過配置的文件系統(tǒng)容量即可外傅。對于超大狀態(tài)的作業(yè),例如天級窗口聚合等場景下可以使會用該狀態(tài)后端俩檬。

配置狀態(tài)后端

Flink默認(rèn)使用的狀態(tài)后端是MemoryStateBackend萎胰,所以不需要顯示配置。對于其他的狀態(tài)后端棚辽,都需要進行顯性配置技竟。在Flink中包含了兩種級別的StateBackend配置:一種是在程序中進行配置,該配置只對當(dāng)前應(yīng)用有效屈藐;另外一種是通過 flink-conf.yaml進行全局配置榔组,一旦配置就會對整個Flink集群上的所有應(yīng)用有效熙尉。

  • 應(yīng)用級別配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果使用RocksDBStateBackend則需要單獨引入rockdb依賴庫,如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

使用方式與FsStateBackend類似,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

  • 集群級別配置

具體的配置項在flink-conf.yaml文件中搓扯,如下代碼所示检痰,參數(shù)state.backend指明StateBackend類型,state.checkpoints.dir配置具體的狀態(tài)存儲路徑锨推,代碼中使用filesystem作為StateBackend铅歼,然后指定相應(yīng)的HDFS文件路徑作為state的checkpoint文件夾。

# 使用filesystem存儲
state.backend: filesystem
# checkpoint存儲路徑
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

如果想用RocksDBStateBackend配置集群級別的狀態(tài)后端换可,可以使用下面的配置:

# 操作RocksDBStateBackend的線程數(shù)量椎椰,默認(rèn)值為1
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
# 指定RocksDB存儲狀態(tài)數(shù)據(jù)的本地文件路徑
state.backend.rocksdb.localdir: /var/rockdb/checkpoints
# 用于指定定時器服務(wù)的工廠類實現(xiàn)類,默認(rèn)為“HEAP”沾鳄,也可以指定為“RocksDB”
state.backend.rocksdb.timer-service.factory: HEAP

什么是Checkpoint(檢查點)

上面講解了Flink的狀態(tài)以及狀態(tài)后端慨飘,狀態(tài)是存儲在狀態(tài)后端。為了保證state容錯译荞,F(xiàn)link提供了處理故障的措施瓤的,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現(xiàn)容錯的核心功能吞歼,主要是周期性地觸發(fā)checkpoint堤瘤,將state生成快照持久化到外部存儲系統(tǒng)(比如HDFS)。這樣一來浆熔,如果Flink程序出現(xiàn)故障,那么就可以從上一次checkpoint中進行狀態(tài)恢復(fù)桥帆,從而提供容錯保障医增。另外,通過checkpoint機制老虫,F(xiàn)link可以實現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關(guān)于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實現(xiàn)的)叶骨。下面將會詳細(xì)分析Flink的checkpoint機制。

檢查點的生成

如上圖祈匙,輸入流是用戶行為數(shù)據(jù)忽刽,包括購買(buy)和加入購物車(cart)兩種,每種行為數(shù)據(jù)都有一個偏移量夺欲,統(tǒng)計每種行為的個數(shù)跪帝。

第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint。

第二步:假設(shè)當(dāng)消費到[cart些阅,3]這條數(shù)據(jù)時伞剑,觸發(fā)了checkpoint。那么此時數(shù)據(jù)源會把消費的偏移量3寫入持久化存儲市埋。

第三步:當(dāng)寫入結(jié)束后黎泣,source會將state handle(狀態(tài)存儲路徑)反饋給JobManager的checkpoint coordinator恕刘。

第四步:接著算子count buy與count cart也會進行同樣的步驟

第五步:等所有的算子都完成了上述步驟之后,即當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle抒倚,就認(rèn)為這一次的 Checkpoint 全局完成了褐着,向持久化存儲中再備份一個 Checkpoint meta 文件,那么整個checkpoint也就完成了托呕,如果中間有一個不成功含蓉,那么本次checkpoin就宣告失敗。

檢查點的恢復(fù)

通過上面的分析镣陕,或許你已經(jīng)對Flink的checkpoint有了初步的認(rèn)識谴餐。那么接下來,我們看一下是如何從檢查點恢復(fù)的呆抑。

  • 任務(wù)失敗
  • 重啟作業(yè)
  • 恢復(fù)檢查點
  • 繼續(xù)處理數(shù)據(jù)

上述過程具體總結(jié)如下:

  • 第一步:重啟作業(yè)
  • 第二步:從上一次檢查點恢復(fù)狀態(tài)數(shù)據(jù)
  • 第三步:繼續(xù)處理新的數(shù)據(jù)

Flink內(nèi)部Exactly-Once實現(xiàn)

Flink提供了精確一次的處理語義岂嗓,精確一次的處理語義可以理解為:數(shù)據(jù)可能會重復(fù)計算,但是結(jié)果狀態(tài)只有一個鹊碍。Flink通過Checkpoint機制實現(xiàn)了精確一次的處理語義厌殉,F(xiàn)link在觸發(fā)Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的侈咕,并且會向下游算子進行傳遞公罕。checkpoint barriers攜帶一個checkpoint ID,用于標(biāo)識屬于哪一個checkpoint耀销,checkpoint barriers將流邏輯是哪個分為了兩部分楼眷。對于雙流的情況,通過barrier對齊的方式實現(xiàn)精確一次的處理語義熊尉。

關(guān)于什么是checkpoint barrier罐柳,可以看一下CheckpointBarrier類的源碼描述,如下:

/**
 * Checkpoint barriers用來在數(shù)據(jù)流中實現(xiàn)checkpoint對齊的.
 * Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
 * Source會把barrier廣播發(fā)送到下游算子,當(dāng)一個算子接收到了其中一個輸入流的Checkpoint barrier時,
 * 它就會知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù).
 * 
 * 一旦某個算子接收到了所有輸入流的checkpoint barrier時狰住,
 * 意味著該算子的已經(jīng)處理完了截止到當(dāng)前checkpoint的數(shù)據(jù)张吉,
 * 可以觸發(fā)checkpoint,并將barrier向下游傳遞
 * 
 * 根據(jù)用戶選擇的處理語義催植,在checkpoint完成之前會緩存后一次checkpoint的數(shù)據(jù)肮蛹,
 * 直到本次checkpoint完成(exactly once)
 * 
 * checkpoint barrier的id是嚴(yán)格單調(diào)遞增的
 *
 */
    public class CheckpointBarrier extends RuntimeEvent {...}

可以看出checkpoint barrier主要功能是實現(xiàn)checkpoint對齊的,從而可以實現(xiàn)Exactly-Once處理語義创南。

下面將會對checkpoint過程進行分解伦忠,具體如下:

圖1,包括兩個流稿辙,每個任務(wù)都會消費一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart))缓苛,數(shù)字代表該數(shù)據(jù)的偏移量,count buy任務(wù)統(tǒng)計購買行為的個數(shù),coun cart統(tǒng)計加購行為的個數(shù)未桥。

圖2笔刹,觸發(fā)checkpoint,JobManager會向每個數(shù)據(jù)源發(fā)送一個新的checkpoint編號冬耿,以此來啟動檢查點生成流程舌菜。

  • 圖3士葫,當(dāng)Source任務(wù)收到消息后菇夸,會停止發(fā)出數(shù)據(jù)帜平,然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點灶似,并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)。狀態(tài)后端會在checkpoint完成之后通知任務(wù)磁玉,隨后任務(wù)會向Job Manager發(fā)送確認(rèn)消息驰唬。在將checkpoint barrier發(fā)出之后份汗,Source任務(wù)恢復(fù)正常工作绊起。


  • 圖4精拟,Source任務(wù)發(fā)出的checkpoint barrier會發(fā)送到與之相連的下游算子任務(wù),當(dāng)任務(wù)收到一個新的checkpoint barrier時虱歪,會繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來蜂绎,這個過程稱之為barrier 對齊,checkpoint barrier到來之前會把到來的數(shù)據(jù)線緩存起來笋鄙。
  • 圖5师枣,任務(wù)收齊了全部輸入分區(qū)的checkpoint barrier之后,會通知狀態(tài)后端開始生成checkpoint萧落,同時會把checkpoint barrier廣播至下游算子践美。
  • 圖6,任務(wù)在發(fā)出checkpoint barrier之后找岖,開始處理因barrier對齊產(chǎn)生的緩存數(shù)據(jù)拨脉,在緩存的數(shù)據(jù)處理完之后,就會繼續(xù)處理輸入流數(shù)據(jù)宣增。
  • 圖7,最終checkpoint barrier會被傳送到sink端矛缨,sink任務(wù)接收到checkpoint barrier之后爹脾,會向其他算子任務(wù)一樣,將自身的狀態(tài)寫入checkpoint箕昭,之后向Job Manager發(fā)送確認(rèn)消息灵妨。Job Manager接收到所有任務(wù)返回的確認(rèn)消息之后,就會將此次檢查點標(biāo)記為完成落竹。

使用案例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint的時間間隔泌霍,如果狀態(tài)比較大,可以適當(dāng)調(diào)大該值
env.enableCheckpointing(1000);
// 配置處理語義,默認(rèn)是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩個checkpoint之間的最小時間間隔朱转,防止因checkpoint時間過長蟹地,導(dǎo)致checkpoint積壓
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint執(zhí)行的上限時間,如果超過該閾值藤为,則會中斷checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行執(zhí)行的檢查點數(shù)量怪与,默認(rèn)為1,可以指定多個缅疟,從而同時出發(fā)多個checkpoint分别,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 設(shè)定周期性外部檢查點,將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中存淫,
// 使用該方式不會在任務(wù)正常停止的過程中清理掉檢查點數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

總結(jié)

本文首先從Flink的狀態(tài)入手耘斩,通過Spark的WordCount和Flink的Work Count進行說明什么是狀態(tài)。接著對狀態(tài)的分類以及狀態(tài)的使用進行了詳細(xì)說明桅咆。然后對Flink提供的三種狀態(tài)后端進行討論括授,并給出了狀態(tài)后端的使用說明。最后轧邪,以圖解加文字的形式詳細(xì)解釋了Flink的checkpoint機制刽脖,并給出了使用Checkpoint時的程序配置。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末忌愚,一起剝皮案震驚了整個濱河市曲管,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌硕糊,老刑警劉巖院水,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異简十,居然都是意外死亡檬某,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門螟蝙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來恢恼,“玉大人,你說我怎么就攤上這事胰默〕“撸” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵牵署,是天一觀的道長漏隐。 經(jīng)常有香客問我,道長奴迅,這世上最難降的妖魔是什么青责? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上脖隶,老公的妹妹穿的比我還像新娘扁耐。我一直安慰自己,他們只是感情好浩村,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布做葵。 她就那樣靜靜地躺著,像睡著了一般心墅。 火紅的嫁衣襯著肌膚如雪酿矢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天怎燥,我揣著相機與錄音瘫筐,去河邊找鬼。 笑死铐姚,一個胖子當(dāng)著我的面吹牛策肝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播隐绵,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼之众,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了依许?” 一聲冷哼從身側(cè)響起棺禾,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎峭跳,沒想到半個月后膘婶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡蛀醉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年悬襟,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拯刁。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡脊岳,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出垛玻,到底是詐尸還是另有隱情割捅,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布夭谤,位于F島的核電站,受9級特大地震影響巫糙,放射性物質(zhì)發(fā)生泄漏朗儒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望醉锄。 院中可真熱鬧乏悄,春花似錦、人聲如沸恳不。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烟勋。三九已至规求,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間卵惦,已是汗流浹背阻肿。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沮尿,地道東北人丛塌。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像畜疾,于是被迫代替她去往敵國和親赴邻。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355