Flink是一個分布式的流處理引擎史飞,而流處理的其中一個特點就是7X24尖昏。那么,如何保障Flink作業(yè)的持續(xù)運行呢构资?Flink的內(nèi)部會將應用狀態(tài)(state)存儲到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中抽诉,由于采用的是分布式架構,F(xiàn)link需要對本地生成的狀態(tài)進行持久化存儲吐绵,以避免因應用或者節(jié)點機器故障等原因導致數(shù)據(jù)的丟失迹淌,F(xiàn)link是通過checkpoint(檢查點)的方式將狀態(tài)寫入到遠程的持久化存儲,從而就可以實現(xiàn)不同語義的結果保障己单。通過本文唉窃,你可以了解到什么是Flink的狀態(tài),F(xiàn)link的狀態(tài)是怎么存儲的荷鼠,F(xiàn)link可選擇的狀態(tài)后端(statebackend)有哪些句携,什么是全局一致性檢查點,F(xiàn)link內(nèi)部如何通過檢查點實現(xiàn)Exactly Once的結果保障允乐。另外矮嫉,本文內(nèi)容較長削咆,建議關注加收藏。
什么是狀態(tài)
引子
關于什么是狀態(tài)蠢笋,我們先不做過多的分析拨齐。首先看一個代碼案例,其中案例1是Spark的WordCount代碼昨寞,案例2是Flink的WorkCount代碼瞻惋。
案例1:Spark WC
object WordCount {
? def main(args:Array[String]){
? val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
? val ssc = new StreamingContext(conf, Seconds(5))
? val lines = ssc.socketTextStream("localhost", 9999)
? val words = lines.flatMap(_.split(" "))
? val pairs = words.map(word => (word, 1))
? val wordCounts = pairs.reduceByKey(_ + _)
? wordCounts.print()
? ssc.start()
? ssc.awaitTermination()}}
輸入:
C:\WINDOWS\system32>nc -lp 9999
hello spark
hello spark
輸出:
案例2:Flink WC
public class WordCount {
? ? public static void main(String[] args) throws Exception {
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
? ? ? ? DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
? ? ? ? SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
? ? ? ? ? ? ? ? String[] splits = value.split("\\s");
? ? ? ? ? ? ? ? for (String word : splits) {
? ? ? ? ? ? ? ? ? ? out.collect(Tuple2.of(word, 1));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? words.keyBy(0).sum(1).print();
? ? ? ? env.execute("WC");
? ? }}
輸入:
C:\WINDOWS\system32>nc -lp 9999
hello
Flink
hello
Flink
輸出:
從上面的兩個例子可以看出,在使用Spark進行詞頻統(tǒng)計時援岩,當前的統(tǒng)計結果不受歷史統(tǒng)計結果的影響歼狼,只計算接收的當前數(shù)據(jù)的結果,這個就可以理解為無狀態(tài)的計算享怀。再來看一下Flink的例子羽峰,可以看出當?shù)诙卧~頻統(tǒng)計時,把第一次的結果值也統(tǒng)計在了一起添瓷,即Flink把上一次的計算結果保存在了狀態(tài)里梅屉,第二次計算的時候會先拿到上一次的結果狀態(tài)隧土,然后結合新到來的數(shù)據(jù)再進行計算秽褒,這就可以理解成有狀態(tài)的計算喝峦,如下圖所示郑藏。
狀態(tài)的類別
Flink提供了兩種基本類型的狀態(tài):分別是Keyed State和Operator State。根據(jù)不同的狀態(tài)管理方式醒串,每種狀態(tài)又有兩種存在形式帖鸦,分別為:managed(托管狀態(tài))和raw(原生狀態(tài))易遣。具體如下表格所示咱筛。需要注意的是庶近,由于Flink推薦使用managed state,所以下文主要討論managed state眷蚓,對于raw state,本文不會做過多的討論反番。
managed state & raw state區(qū)別
Keyed State & Operator State
Keyed State
Keyed State只能由作用在KeyedStream上面的函數(shù)使用沙热,該狀態(tài)與某個key進行綁定,即每一個key對應一個state罢缸。Keyed State按照key進行維護和訪問的篙贸,F(xiàn)link會為每一個Key都維護一個狀態(tài)實例,該狀態(tài)實例總是位于處理該key記錄的算子任務上枫疆,因此同一個key的記錄可以訪問到一樣的狀態(tài)爵川。如下圖所示,可以通過在一條流上使用keyBy()方法來生成一個KeyedStream息楔。Flink提供了很多種keyed state寝贡,具體如下:
ValueState<T>
用于保存類型為T的單個值扒披。用戶可以通過ValueState.value()來獲取該狀態(tài)值,通過ValueState.update()來更新該狀態(tài)圃泡。使用ValueStateDescriptor來獲取狀態(tài)句柄碟案。
ListState<T>
用于保存類型為T的元素列表,即key的狀態(tài)值是一個列表颇蜡。用戶可以使用ListState.add()或者ListState.addAll()將新元素添加到列表中价说,通過ListState.get()訪問狀態(tài)元素,該方法會返回一個可遍歷所有元素的Iterable對象风秤,注意ListState不支持刪除單個元素鳖目,但是用戶可以使用update(List values)來更新整個列表。使用ListStateDescriptor來獲取狀態(tài)句柄缤弦。
ReducingState<T>
調用add()方法添加值時领迈,會立即返回一個使用ReduceFunction聚合后的值,用戶可以使用ReducingState.get()來獲取該狀態(tài)值甸鸟。使用ReducingStateDescriptor來獲取狀態(tài)句柄惦费。
AggregatingState<IN, OUT>
與ReducingState類似,不同的是它使用的是AggregateFunction來聚合內(nèi)部的值抢韭,AggregatingState.get()方法會計算最終的結果并將其返回薪贫。使用AggregatingStateDescriptor來獲取狀態(tài)句柄
MapState<UK, UV>
用于保存一組key、value的映射刻恭,類似于java的Map集合瞧省。用戶可以通過get(UK key)方法獲取key對應的狀態(tài),可以通過put(UK k,UV value)方法添加一個鍵值鳍贾,可以通過remove(UK key)刪除給定key的值鞍匾,可以通過contains(UK key)判斷是否存在對應的key。使用MapStateDescriptor來獲取狀態(tài)句柄骑科。
FoldingState<T, ACC>
在Flink 1.4的版本中標記過時橡淑,在未來的版本中會被移除,使用AggregatingState進行代替咆爽。
值得注意的是梁棠,上面的狀態(tài)原語都支持通過State.clear()方法來進行清除狀態(tài)。另外斗埂,上述的狀態(tài)原語僅用于與狀態(tài)進行交互符糊,真正的狀態(tài)是存儲在狀態(tài)后端(后面會介紹狀態(tài)后端)的,通過該狀態(tài)原語相當于持有了狀態(tài)的句柄(handle)呛凶。
keyed State使用案例
下面給出一個MapState的使用案例男娄,關于ValueState的使用情況可以參考官網(wǎng),具體如下:
public class MapStateExample {
? ? //統(tǒng)計每個用戶每種行為的個數(shù)
? ? public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {
? ? ? ? //定義一個MapState句柄
? ? ? ? private transient MapState<String, Integer> behaviorCntState;
? ? ? ? // 初始化狀態(tài)
? ? ? ? @Override
? ? ? ? public void open(Configuration parameters) throws Exception {
? ? ? ? ? ? super.open(parameters);
? ? ? ? ? ? MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
? ? ? ? ? ? ? ? ? ? "userBehavior",? // 狀態(tài)描述符的名稱
? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<String>() {}),? // MapState狀態(tài)的key的數(shù)據(jù)類型
? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<Integer>() {})? // MapState狀態(tài)的value的數(shù)據(jù)類型
? ? ? ? ? ? );
? ? ? ? ? ? behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)
? ? ? ? }
? ? ? ? @Override
? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
? ? ? ? ? ? Integer behaviorCnt = 1;
? ? ? ? ? ? // 如果當前狀態(tài)包括該行為,則+1
? ? ? ? ? ? if (behaviorCntState.contains(value.f1)) {
? ? ? ? ? ? ? ? behaviorCnt = behaviorCntState.get(value.f1) + 1;
? ? ? ? ? ? }
? ? ? ? ? ? // 更新狀態(tài)
? ? ? ? ? ? behaviorCntState.put(value.f1, behaviorCnt);
? ? ? ? ? ? out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws Exception {
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]
? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));
? ? ? ? userBehaviors
? ? ? ? ? ? ? ? .keyBy(0)
? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())
? ? ? ? ? ? ? ? .print();
? ? ? ? env.execute("MapStateExample");
? ? }}
結果輸出:
狀態(tài)的生命周期管理(TTL)
對于任何類型Keyed State都可以設定狀態(tài)的生命周期(TTL),即狀態(tài)的存活時間模闲,以確保能夠在規(guī)定時間內(nèi)及時地清理狀態(tài)數(shù)據(jù)建瘫。如果配置了狀態(tài)的TTL,那么當狀態(tài)過期時围橡,存儲的狀態(tài)會被清除暖混。狀態(tài)生命周期功能可以通過StateTtlConfig配置,然后將StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可翁授。代碼示例如下:
StateTtlConfig ttlConfig = StateTtlConfig
? ? ? ? ? ? ? ? // 指定TTL時長為10S
? ? ? ? ? ? ? ? .newBuilder(Time.seconds(10))
? ? ? ? ? ? ? ? // 只對創(chuàng)建和寫入操作有效
? ? ? ? ? ? ? ? .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
? ? ? ? ? ? ? ? // 不返回過期的數(shù)據(jù)
? ? ? ? ? ? ? ? .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
? ? ? ? ? ? ? ? .build();
? ? ? ? // 初始化狀態(tài)
? ? ? ? @Override
? ? ? ? public void open(Configuration parameters) throws Exception {
? ? ? ? ? ? super.open(parameters);
? ? ? ? ? ? MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
? ? ? ? ? ? ? ? ? ? "userBehavior",? // 狀態(tài)描述符的名稱
? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<String>() {}),? // MapState狀態(tài)的key的數(shù)據(jù)類型
? ? ? ? ? ? ? ? ? ? TypeInformation.of(new TypeHint<Integer>() {})? // MapState狀態(tài)的value的數(shù)據(jù)類型
? ? ? ? ? ? );
? ? ? ? ? ? // 設置stateTtlConfig
? ? ? ? ? ? userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
? ? ? ? ? ? behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)
? ? ? ? }
在StateTtlConfig創(chuàng)建時拣播,newBuilder方法是必須要指定的,newBuilder中設定過期時間的參數(shù)收擦。對于其他參數(shù)都是可選的或使用默認值贮配。其中setUpdateType方法中傳入的類型有三種:
public enum UpdateType {
? ? ? ? //禁用TTL,永遠不會過期
? ? ? ? Disabled,
? ? ? ? // 創(chuàng)建和寫入時更新TTL
? ? ? ? OnCreateAndWrite,
? ? ? ? // 與OnCreateAndWrite類似,但是在讀操作時也會更新TTL
? ? ? ? OnReadAndWrite
? ? }
值得注意的是塞赂,過期的狀態(tài)數(shù)據(jù)根據(jù)UpdateType參數(shù)進行配置泪勒,只有被寫入或者讀取的時間才會更新TTL,也就是說如果某個狀態(tài)指標一直不被使用或者更新宴猾,則永遠不會觸發(fā)對該狀態(tài)數(shù)據(jù)的清理操作圆存,這種情況可能會導致系統(tǒng)中的狀態(tài)數(shù)據(jù)越來越大。目前用戶可以使用StateTtlConfig.cleanupFullSnapshot設定當觸發(fā)State Snapshot的時候清理狀態(tài)數(shù)據(jù)仇哆,但是改配置不適合用于RocksDB做增量Checkpointing的操作沦辙。
上面的StateTtlConfig創(chuàng)建時,可以指定setStateVisibility讹剔,用于狀態(tài)的可見性配置油讯,根據(jù)過期數(shù)據(jù)是否被清理來確定是否返回狀態(tài)數(shù)據(jù)。
/**
? ? * 是否返回過期的數(shù)據(jù)
? ? */
? ? public enum StateVisibility {
? ? ? ? //如果數(shù)據(jù)沒有被清理延欠,就可以返回
? ? ? ? ReturnExpiredIfNotCleanedUp,
? ? ? ? //永遠不返回過期的數(shù)據(jù),默認值
? ? ? ? NeverReturnExpired
? ? }
Operator State
Operator State的作用于是某個算子任務陌兑,這意味著所有在同一個并行任務之內(nèi)的記錄都能訪問到相同的狀態(tài) 。算子狀態(tài)不能通過其他任務訪問由捎,無論該任務是相同的算子兔综。如下圖所示。
Operator State是一種non-keyed state狞玛,與并行的操作算子實例相關聯(lián)邻奠,例如在Kafka Connector中,每個Kafka消費端算子實例都對應到Kafka的一個分區(qū)中为居,維護Topic分區(qū)和Offsets偏移量作為算子的Operator State。在Flink中可以實現(xiàn)ListCheckpointed<T extends Serializable>接口或者CheckpointedFunction 接口來實現(xiàn)一個Operator State杀狡。
首先蒙畴,我們先看一下這兩個接口的具體實現(xiàn),然后再給出這兩種接口的具體使用案例。先看一下ListCheckpointed接口的源碼膳凝,如下:
public interface ListCheckpointed<T extends Serializable> {
/**
* 獲取某個算子實例的當前狀態(tài)碑隆,該狀態(tài)包括該算子實例之前被調用時的所有結果
* 以列表的形式返回一個函數(shù)狀態(tài)的快照
* Flink觸發(fā)生成檢查點時調用該方法
* @param checkpointId checkpoint的ID,是一個唯一的、單調遞增的值
* @param timestamp Job Manager觸發(fā)checkpoint時的時間戳
* @return? 返回一個operator state list,如果為null時,返回空list
* @throws Exception
*/
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;/**
* 初始化函數(shù)狀態(tài)時調用蹬音,可能是在作業(yè)啟動時或者故障恢復時
* 根據(jù)提供的列表恢復函數(shù)狀態(tài)
* 注意:當實現(xiàn)該方法時上煤,需要在RichFunction#open()方法之前調用該方法
* @param state 被恢復算子實例的state列表 ,可能為空
* @throws Exception
*/
void restoreState(List<T> state) throws Exception;}
使用Operator ListState時著淆,在進行擴縮容時劫狠,重分布的策略(狀態(tài)恢復的模式)如下圖所示:
上面的重分布策略為Even-split Redistribution,即每個算子實例中含有部分狀態(tài)元素的List列表永部,整個狀態(tài)數(shù)據(jù)是所有List列表的合集独泞。當觸發(fā)restore/redistribution動作時,通過將狀態(tài)數(shù)據(jù)平均分配成與算子并行度相同數(shù)量的List列表苔埋,每個task實例中有一個List懦砂,其可以為空或者含有多個元素。
我們再來看一下CheckpointedFunction接口组橄,源碼如下:
public interface CheckpointedFunction {
/**
* 會在生成檢查點之前調用
* 該方法的目的是確保檢查點開始之前所有狀態(tài)對象都已經(jīng)更新完畢
* @param context 使用FunctionSnapshotContext作為參數(shù)
*? ? ? ? ? ? ? ? 從FunctionSnapshotContext可以獲取checkpoint的元數(shù)據(jù)信息荞膘,
*? ? ? ? ? ? ? ? 比如checkpoint編號,JobManager在初始化checkpoint時的時間戳
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* 在創(chuàng)建checkpointedFunction的并行實例時被調用玉工,
* 在應用啟動或者故障重啟時觸發(fā)該方法的調用
* @param context 傳入FunctionInitializationContext對象羽资,
*? ? ? ? ? ? ? ? ? 可以使用該對象訪問OperatorStateStore和 KeyedStateStore對象,
*? ? ? ? ? ? ? ? ? 這兩個對象可以獲取狀態(tài)的句柄瓮栗,即通過Flink runtime來注冊函數(shù)狀態(tài)并返回state對象
*? ? ? ? ? ? ? ? ? 比如:ValueState削罩、ListState等
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;}
CheckpointedFunction接口是用于指定有狀態(tài)函數(shù)的最底層的接口,該接口提供了用于注冊和維護keyed state 與operator state的hook(即可以同時使用keyed state 和operator state)费奸,另外也是唯一支持使用list union state弥激。關于Union List State,使用的是Flink為Operator state提供的另一種重分布的策略:Union Redistribution,即每個算子實例中含有所有狀態(tài)元素的List列表愿阐,當觸發(fā)restore/redistribution動作時微服,每個算子都能夠獲取到完整的狀態(tài)元素列表。具體如下圖所示:
ListCheckpointed
ListCheckpointed接口和CheckpointedFunction接口相比在靈活性上相對弱一些缨历,只能支持List類型的狀態(tài)以蕴,并且在數(shù)據(jù)恢復的時候僅支持even-redistribution策略。該接口不像Flink提供的Keyed State(比如Value State辛孵、ListState)那樣直接在狀態(tài)后端(state backend)注冊丛肮,需要將operator state實現(xiàn)為成員變量,然后通過接口提供的回調函數(shù)與狀態(tài)后端進行交互魄缚。使用代碼案例如下
public class ListCheckpointedExample {
? ? private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {
? ? ? ? private Long userBuyBehaviorCnt = 0L;
? ? ? ? @Override
? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {
? ? ? ? ? ? if(value.f1.equals("buy")){
? ? ? ? ? ? ? ? userBuyBehaviorCnt ++;
? ? ? ? ? ? ? ? out.collect(Tuple2.of("buy",userBuyBehaviorCnt));
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? @Override
? ? ? ? public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
? ? ? ? ? ? //返回單個元素的List集合宝与,該集合元素是用戶購買行為的數(shù)量
? ? ? ? ? ? return Collections.singletonList(userBuyBehaviorCnt);
? ? ? ? }
? ? ? ? @Override
? ? ? ? public void restoreState(List<Long> state) throws Exception {
? ? ? ? ? ? // 在進行擴縮容之后焚廊,進行狀態(tài)恢復,需要把其他subtask的狀態(tài)加在一起
? ? ? ? ? ? for (Long cnt : state) {
? ? ? ? ? ? ? ? userBuyBehaviorCnt += 1;
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws Exception {
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]
? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));
? ? ? ? userBehaviors
? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())
? ? ? ? ? ? ? ? .print();
? ? ? ? env.execute("ListCheckpointedExample");
? ? }
}
CheckpointedFunction
CheckpointedFunction接口提供了更加豐富的操作习劫,比如支持Union list state咆瘟,可以訪問keyedState,關于重分布策略诽里,如果使用Even-split Redistribution策略袒餐,則通過context. getListState(descriptor)獲取Operator State;如果使用UnionRedistribution策略谤狡,則通過context. getUnionList State(descriptor)來獲取灸眼。使用案例如下:
public class CheckpointFunctionExample {
? ? private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {
? ? ? ? // 統(tǒng)計每個operator實例的用戶行為數(shù)量的本地變量
? ? ? ? private Long opUserBehaviorCnt = 0L;
? ? ? ? // 每個key的state,存儲key對應的相關狀態(tài)
? ? ? ? private ValueState<Long> keyedCntState;
? ? ? ? // 定義operator state,存儲算子的狀態(tài)
? ? ? ? private ListState<Long> opCntState;
? ? ? ? @Override
? ? ? ? public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {
? ? ? ? ? ? if (value.f1.equals("buy")) {
? ? ? ? ? ? ? ? // 更新算子狀態(tài)本地變量值
? ? ? ? ? ? ? ? opUserBehaviorCnt += 1;
? ? ? ? ? ? ? ? Long keyedCount = keyedCntState.value();
? ? ? ? ? ? ? ? // 更新keyedstate的狀態(tài) ,判斷狀態(tài)是否為null豌汇,否則空指針異常
? ? ? ? ? ? ? ? keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 );
? ? ? ? ? ? ? ? // 結果輸出
? ? ? ? ? ? ? ? out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? @Override
? ? ? ? public void snapshotState(FunctionSnapshotContext context) throws Exception {
? ? ? ? ? ? // 使用opUserBehaviorCnt本地變量更新operator state
? ? ? ? ? ? opCntState.clear();
? ? ? ? ? ? opCntState.add(opUserBehaviorCnt);
? ? ? ? }
? ? ? ? @Override
? ? ? ? public void initializeState(FunctionInitializationContext context) throws Exception {
? ? ? ? ? ? // 通過KeyedStateStore,定義keyedState的StateDescriptor描述符
? ? ? ? ? ? ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {
? ? ? ? ? ? }));
? ? ? ? ? ? // 通過OperatorStateStore,定義OperatorState的StateDescriptor描述符
? ? ? ? ? ? ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {
? ? ? ? ? ? }));
? ? ? ? ? ? // 初始化keyed state狀態(tài)值
? ? ? ? ? ? keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);
? ? ? ? ? ? // 初始化operator state狀態(tài)
? ? ? ? ? ? opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);
? ? ? ? ? ? // 初始化本地變量operator state
? ? ? ? ? ? for (Long state : opCntState.get()) {
? ? ? ? ? ? ? ? opUserBehaviorCnt += state;
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void main(String[] args) throws Exception {
? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
? ? ? ? // 模擬數(shù)據(jù)源[userId,behavior,product]
? ? ? ? DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "iphone"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "cart", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "buy", "logi"),
? ? ? ? ? ? ? ? Tuple3.of(1L, "fav", "oppo"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "huawei"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "buy", "onemore"),
? ? ? ? ? ? ? ? Tuple3.of(2L, "fav", "iphone"));
? ? ? ? userBehaviors
? ? ? ? ? ? ? ? .keyBy(0)
? ? ? ? ? ? ? ? .flatMap(new UserBehaviorCnt())
? ? ? ? ? ? ? ? .print();
? ? ? ? env.execute("CheckpointFunctionExample");
? ? }
}
什么是狀態(tài)后端
上面使用的狀態(tài)都需要存儲到狀態(tài)后端(StateBackend)幢炸,然后在checkpoint觸發(fā)時,將狀態(tài)持久化到外部存儲系統(tǒng)拒贱。Flink提供了三種類型的狀態(tài)后端宛徊,分別是基于內(nèi)存的狀態(tài)后端(MemoryStateBackend、基于文件系統(tǒng)的狀態(tài)后端(FsStateBackend)以及基于RockDB作為存儲介質的RocksDB StateBackend逻澳。這三種類型的StateBackend都能夠有效地存儲Flink流式計算過程中產(chǎn)生的狀態(tài)數(shù)據(jù)闸天,在默認情況下Flink使用的是MemoryStateBackend,區(qū)別見下表斜做。下面分別對每種狀態(tài)后端的特點進行說明苞氮。
狀態(tài)后端的類別
MemoryStateBackend
MemoryStateBackend將狀態(tài)數(shù)據(jù)全部存儲在JVM堆內(nèi)存中,包括用戶在使用DataStream API中創(chuàng)建的Key/Value State瓤逼,窗口中緩存的狀態(tài)數(shù)據(jù)笼吟,以及觸發(fā)器等數(shù)據(jù)。MemoryStateBackend具有非嘲云欤快速和高效的特點贷帮,但也具有非常多的限制,最主要的就是內(nèi)存的容量限制诱告,一旦存儲的狀態(tài)數(shù)據(jù)過多就會導致系統(tǒng)內(nèi)存溢出等問題撵枢,從而影響整個應用的正常運行。同時如果機器出現(xiàn)問題精居,整個主機內(nèi)存中的狀態(tài)數(shù)據(jù)都會丟失锄禽,進而無法恢復任務中的狀態(tài)數(shù)據(jù)。因此從數(shù)據(jù)安全的角度建議用戶盡可能地避免在生產(chǎn)環(huán)境中使用MemoryStateBackend靴姿。Flink將MemoryStateBackend作為默認狀態(tài)后端沃但。
MemoryStateBackend比較適合用于測試環(huán)境中,并用于本地調試和驗證佛吓,不建議在生產(chǎn)環(huán)境中使用宵晚。但如果應用狀態(tài)數(shù)據(jù)量不是很大恨旱,例如使用了大量的非狀態(tài)計算算子,也可以在生產(chǎn)環(huán)境中使MemoryStateBackend.
FsStateBackend
FsStateBackend是基于文件系統(tǒng)的一種狀態(tài)后端坝疼,這里的文件系統(tǒng)可以是本地文件系統(tǒng),也可以是HDFS分布式文件系統(tǒng)谆沃。創(chuàng)建FsStateBackend的構造函數(shù)如下:
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
其中path如果為本地路徑钝凶,其格式為“file:///data/flink/checkpoints”,如果path為HDFS路徑唁影,其格式為“hdfs://nameservice/flink/checkpoints”耕陷。FsStateBackend中第二個Boolean類型的參數(shù)指定是否以同步的方式進行狀態(tài)數(shù)據(jù)記錄,默認采用異步的方式將狀態(tài)數(shù)據(jù)同步到文件系統(tǒng)中据沈,異步方式能夠盡可能避免在Checkpoint的過程中影響流式計算任務哟沫。如果用戶想采用同步的方式進行狀態(tài)數(shù)據(jù)的檢查點數(shù)據(jù),則將第二個參數(shù)指定為True即可锌介。
相比于MemoryStateBackend, FsStateBackend更適合任務狀態(tài)非常大的情況嗜诀,例如應用中含有時間范圍非常長的窗口計算,或Key/value State狀態(tài)數(shù)據(jù)量非常大的場景孔祸,這時系統(tǒng)內(nèi)存不足以支撐狀態(tài)數(shù)據(jù)的存儲隆敢。同時FsStateBackend最大的好處是相對比較穩(wěn)定,在checkpoint時崔慧,將狀態(tài)持久化到像HDFS分布式文件系統(tǒng)中拂蝎,能最大程度保證狀態(tài)數(shù)據(jù)的安全性。
RocksDBStateBackend
與前面的狀態(tài)后端不同惶室,RocksDBStateBackend需要單獨引入相關的依賴包温自。RocksDB 是一個 key/value 的內(nèi)存存儲系統(tǒng),類似于HBase皇钞,是一種內(nèi)存磁盤混合的 LSM DB悼泌。當寫數(shù)據(jù)時會先寫進write buffer(類似于HBase的memstore),然后在flush到磁盤文件鹅士,當讀取數(shù)據(jù)時會現(xiàn)在block cache(類似于HBase的block cache)券躁,所以速度會很快。
RocksDBStateBackend在性能上要比FsStateBackend高一些掉盅,主要是因為借助于RocksDB存儲了最新熱數(shù)據(jù)也拜,然后通過異步的方式再同步到文件系統(tǒng)中,但RocksDBStateBackend和MemoryStateBackend相比性能就會較弱一些趾痘。
需要注意 RocksDB 不支持同步的 Checkpoint慢哈,構造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint永票,也是目前唯一增量 Checkpoint 的 Backend卵贱,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄滥沫,僅需要上傳新生成的 sst 文件即可。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS)键俱,其容量限制只要單個 TaskManager 上 State 總量不超過它的內(nèi)存+磁盤兰绣,單 Key最大 2G,總大小不超過配置的文件系統(tǒng)容量即可编振。對于超大狀態(tài)的作業(yè)缀辩,例如天級窗口聚合等場景下可以使會用該狀態(tài)后端。
配置狀態(tài)后端
Flink默認使用的狀態(tài)后端是MemoryStateBackend踪央,所以不需要顯示配置臀玄。對于其他的狀態(tài)后端,都需要進行顯性配置畅蹂。在Flink中包含了兩種級別的StateBackend配置:一種是在程序中進行配置健无,該配置只對當前應用有效;另外一種是通過flink-conf.yaml進行全局配置液斜,一旦配置就會對整個Flink集群上的所有應用有效累贤。
應用級別配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果使用RocksDBStateBackend則需要單獨引入rockdb依賴庫,如下:
<dependency>
? ? <groupId>org.apache.flink</groupId>
? ? <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
? ? <version>1.10.0</version>
? ? <scope>provided</scope></dependency>
使用方式與FsStateBackend類似,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
集群級別配置
具體的配置項在flink-conf.yaml文件中旗唁,如下代碼所示畦浓,參數(shù)state.backend指明StateBackend類型,state.checkpoints.dir配置具體的狀態(tài)存儲路徑检疫,代碼中使用filesystem作為StateBackend讶请,然后指定相應的HDFS文件路徑作為state的checkpoint文件夾。
# 使用filesystem存儲
state.backend:filesystem
# checkpoint存儲路徑
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
如果想用RocksDBStateBackend配置集群級別的狀態(tài)后端屎媳,可以使用下面的配置:
# 操作RocksDBStateBackend的線程數(shù)量夺溢,默認值為1
state.backend.rocksdb.checkpoint.transfer.thread.num: 1# 指定RocksDB存儲狀態(tài)數(shù)據(jù)的本地文件路徑
state.backend.rocksdb.localdir: /var/rockdb/checkpoints
# 用于指定定時器服務的工廠類實現(xiàn)類,默認為“HEAP”烛谊,也可以指定為“RocksDB”
state.backend.rocksdb.timer-service.factory: HEAP
什么是Checkpoint(檢查點)
上面講解了Flink的狀態(tài)以及狀態(tài)后端风响,狀態(tài)是存儲在狀態(tài)后端。為了保證state容錯丹禀,F(xiàn)link提供了處理故障的措施状勤,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現(xiàn)容錯的核心功能双泪,主要是周期性地觸發(fā)checkpoint持搜,將state生成快照持久化到外部存儲系統(tǒng)(比如HDFS)。這樣一來焙矛,如果Flink程序出現(xiàn)故障葫盼,那么就可以從上一次checkpoint中進行狀態(tài)恢復,從而提供容錯保障村斟。另外贫导,通過checkpoint機制抛猫,F(xiàn)link可以實現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實現(xiàn)的)。下面將會詳細分析Flink的checkpoint機制孩灯。
檢查點的生成
如上圖闺金,輸入流是用戶行為數(shù)據(jù),包括購買(buy)和加入購物車(cart)兩種峰档,每種行為數(shù)據(jù)都有一個偏移量掖看,統(tǒng)計每種行為的個數(shù)。
第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint面哥。
第二步:假設當消費到[cart,3]這條數(shù)據(jù)時毅待,觸發(fā)了checkpoint尚卫。那么此時數(shù)據(jù)源會把消費的偏移量3寫入持久化存儲。
第三步:當寫入結束后尸红,source會將state handle(狀態(tài)存儲路徑)反饋給JobManager的checkpoint coordinator吱涉。
第四步:接著算子count buy與count cart也會進行同樣的步驟
第五步:等所有的算子都完成了上述步驟之后,即當 Checkpoint coordinator 收集齊所有 task 的 state handle外里,就認為這一次的 Checkpoint 全局完成了怎爵,向持久化存儲中再備份一個 Checkpoint meta 文件,那么整個checkpoint也就完成了盅蝗,如果中間有一個不成功鳖链,那么本次checkpoin就宣告失敗。
檢查點的恢復
通過上面的分析墩莫,或許你已經(jīng)對Flink的checkpoint有了初步的認識芙委。那么接下來,我們看一下是如何從檢查點恢復的狂秦。
任務失敗
重啟作業(yè)
恢復檢查點
繼續(xù)處理數(shù)據(jù)
上述過程具體總結如下:
第一步:重啟作業(yè)
第二步:從上一次檢查點恢復狀態(tài)數(shù)據(jù)
第三步:繼續(xù)處理新的數(shù)據(jù)
Flink內(nèi)部Exactly-Once實現(xiàn)
Flink提供了精確一次的處理語義灌侣,精確一次的處理語義可以理解為:數(shù)據(jù)可能會重復計算,但是結果狀態(tài)只有一個裂问。Flink通過Checkpoint機制實現(xiàn)了精確一次的處理語義侧啼,F(xiàn)link在觸發(fā)Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的堪簿,并且會向下游算子進行傳遞痊乾。checkpoint barriers攜帶一個checkpoint ID,用于標識屬于哪一個checkpoint戴甩,checkpoint barriers將流邏輯是哪個分為了兩部分符喝。對于雙流的情況,通過barrier對齊的方式實現(xiàn)精確一次的處理語義甜孤。
關于什么是checkpoint barrier协饲,可以看一下CheckpointBarrier類的源碼描述畏腕,如下:
/**
* Checkpoint barriers用來在數(shù)據(jù)流中實現(xiàn)checkpoint對齊的.
* Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
* Source會把barrier廣播發(fā)送到下游算子,當一個算子接收到了其中一個輸入流的Checkpoint barrier時,
* 它就會知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù).
*
* 一旦某個算子接收到了所有輸入流的checkpoint barrier時,
* 意味著該算子的已經(jīng)處理完了截止到當前checkpoint的數(shù)據(jù)茉稠,
* 可以觸發(fā)checkpoint描馅,并將barrier向下游傳遞
*
* 根據(jù)用戶選擇的處理語義,在checkpoint完成之前會緩存后一次checkpoint的數(shù)據(jù)而线,
* 直到本次checkpoint完成(exactly once)
*
* checkpoint barrier的id是嚴格單調遞增的
*
*/
public class CheckpointBarrier extends RuntimeEvent {...}
可以看出checkpoint barrier主要功能是實現(xiàn)checkpoint對齊的铭污,從而可以實現(xiàn)Exactly-Once處理語義。
下面將會對checkpoint過程進行分解膀篮,具體如下:
圖1嘹狞,包括兩個流,每個任務都會消費一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart))誓竿,數(shù)字代表該數(shù)據(jù)的偏移量磅网,count buy任務統(tǒng)計購買行為的個數(shù),coun cart統(tǒng)計加購行為的個數(shù)筷屡。
圖2涧偷,觸發(fā)checkpoint,JobManager會向每個數(shù)據(jù)源發(fā)送一個新的checkpoint編號毙死,以此來啟動檢查點生成流程燎潮。
圖3,當Source任務收到消息后扼倘,會停止發(fā)出數(shù)據(jù)确封,然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點,并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)再菊。狀態(tài)后端會在checkpoint完成之后通知任務隅肥,隨后任務會向Job Manager發(fā)送確認消息。在將checkpoint barrier發(fā)出之后袄简,Source任務恢復正常工作腥放。
圖4,Source任務發(fā)出的checkpoint barrier會發(fā)送到與之相連的下游算子任務绿语,當任務收到一個新的checkpoint barrier時秃症,會繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來,這個過程稱之為barrier 對齊吕粹,checkpoint barrier到來之前會把到來的數(shù)據(jù)線緩存起來种柑。
圖5,任務收齊了全部輸入分區(qū)的checkpoint barrier之后匹耕,會通知狀態(tài)后端開始生成checkpoint聚请,同時會把checkpoint barrier廣播至下游算子。
圖6,任務在發(fā)出checkpoint barrier之后驶赏,開始處理因barrier對齊產(chǎn)生的緩存數(shù)據(jù)炸卑,在緩存的數(shù)據(jù)處理完之后,就會繼續(xù)處理輸入流數(shù)據(jù)煤傍。
圖7盖文,最終checkpoint barrier會被傳送到sink端,sink任務接收到checkpoint barrier之后蚯姆,會向其他算子任務一樣五续,將自身的狀態(tài)寫入checkpoint,之后向Job Manager發(fā)送確認消息龄恋。Job Manager接收到所有任務返回的確認消息之后疙驾,就會將此次檢查點標記為完成。
使用案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint的時間間隔郭毕,如果狀態(tài)比較大荆萤,可以適當調大該值
env.enableCheckpointing(1000);
// 配置處理語義,默認是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩個checkpoint之間的最小時間間隔铣卡,防止因checkpoint時間過長,導致checkpoint積壓
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint執(zhí)行的上限時間偏竟,如果超過該閾值煮落,則會中斷checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行執(zhí)行的檢查點數(shù)量,默認為1踊谋,可以指定多個蝉仇,從而同時出發(fā)多個checkpoint,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 設定周期性外部檢查點殖蚕,將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中轿衔,
// 使用該方式不會在任務正常停止的過程中清理掉檢查點數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
總結
本文首先從Flink的狀態(tài)入手,通過Spark的WordCount和Flink的Work Count進行說明什么是狀態(tài)睦疫。接著對狀態(tài)的分類以及狀態(tài)的使用進行了詳細說明害驹。然后對Flink提供的三種狀態(tài)后端進行討論,并給出了狀態(tài)后端的使用說明蛤育。最后,以圖解加文字的形式詳細解釋了Flink的checkpoint機制,并給出了使用Checkpoint時的程序配置使碾。