Flink是一個分布式的流處理引擎缕碎,而流處理的其中一個特點就是7X24。那么池户,如何保障Flink作業(yè)的持續(xù)運行呢咏雌?Flink的內(nèi)部會將應(yīng)用狀態(tài)(state)存儲到本地內(nèi)存或者嵌入式的kv數(shù)據(jù)庫(RocksDB)中,由于采用的是分布式架構(gòu)校焦,F(xiàn)link需要對本地生成的狀態(tài)進行持久化存儲赊抖,以避免因應(yīng)用或者節(jié)點機器故障等原因?qū)е聰?shù)據(jù)的丟失,F(xiàn)link是通過checkpoint(檢查點)的方式將狀態(tài)寫入到遠程的持久化存儲寨典,從而就可以實現(xiàn)不同語義的結(jié)果保障氛雪。通過本文,你可以了解到什么是Flink的狀態(tài)耸成,F(xiàn)link的狀態(tài)是怎么存儲的报亩,F(xiàn)link可選擇的狀態(tài)后端(statebackend)有哪些,什么是全局一致性檢查點井氢,F(xiàn)link內(nèi)部如何通過檢查點實現(xiàn)Exactly Once的結(jié)果保障弦追。另外,本文內(nèi)容較長毙沾,建議關(guān)注加收藏骗卜。
什么是狀態(tài)
引子
關(guān)于什么是狀態(tài),我們先不做過多的分析左胞。首先看一個代碼案例寇仓,其中案例1是Spark的WordCount代碼,案例2是Flink的WorkCount代碼烤宙。
- 案例1:Spark WC
object WordCount {
def main(args:Array[String]){
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
輸入:
C:\WINDOWS\system32>nc -lp 9999
hello spark
hello spark
輸出:
- 案例2:Flink WC
public class WordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String,Integer>> words = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(Tuple2.of(word, 1));
}
}
});
words.keyBy(0).sum(1).print();
env.execute("WC");
}
}
輸入:
C:\WINDOWS\system32>nc -lp 9999
hello Flink
hello Flink
輸出:
從上面的兩個例子可以看出遍烦,在使用Spark進行詞頻統(tǒng)計時,當(dāng)前的統(tǒng)計結(jié)果不受歷史統(tǒng)計結(jié)果的影響躺枕,只計算接收的當(dāng)前數(shù)據(jù)的結(jié)果服猪,這個就可以理解為無狀態(tài)的計算。再來看一下Flink的例子拐云,可以看出當(dāng)?shù)诙卧~頻統(tǒng)計時罢猪,把第一次的結(jié)果值也統(tǒng)計在了一起,即Flink把上一次的計算結(jié)果保存在了狀態(tài)里叉瘩,第二次計算的時候會先拿到上一次的結(jié)果狀態(tài)膳帕,然后結(jié)合新到來的數(shù)據(jù)再進行計算,這就可以理解成有狀態(tài)的計算薇缅,如下圖所示危彩。
狀態(tài)的類別
Flink提供了兩種基本類型的狀態(tài):分別是 Keyed State
和Operator State
攒磨。根據(jù)不同的狀態(tài)管理方式,每種狀態(tài)又有兩種存在形式汤徽,分別為:managed(托管狀態(tài))
和raw(原生狀態(tài))
娩缰。具體如下表格所示。需要注意的是谒府,由于Flink推薦使用managed state拼坎,所以下文主要討論managed state,對于raw state狱掂,本文不會做過多的討論演痒。
managed state & raw state區(qū)別
Keyed State & Operator State
Keyed State
Keyed State只能由作用在KeyedStream上面的函數(shù)使用,該狀態(tài)與某個key進行綁定趋惨,即每一個key對應(yīng)一個state鸟顺。Keyed State按照key進行維護和訪問的,F(xiàn)link會為每一個Key都維護一個狀態(tài)實例器虾,該狀態(tài)實例總是位于處理該key記錄的算子任務(wù)上讯嫂,因此同一個key的記錄可以訪問到一樣的狀態(tài)。如下圖所示兆沙,可以通過在一條流上使用keyBy()方法來生成一個KeyedStream欧芽。Flink提供了很多種keyed state,具體如下:
- ValueState<T>
用于保存類型為T的單個值葛圃。用戶可以通過ValueState.value()來獲取該狀態(tài)值千扔,通過ValueState.update()來更新該狀態(tài)。使用ValueStateDescriptor
來獲取狀態(tài)句柄库正。
- ListState<T>
用于保存類型為T的元素列表曲楚,即key的狀態(tài)值是一個列表。用戶可以使用ListState.add()或者ListState.addAll()將新元素添加到列表中褥符,通過ListState.get()訪問狀態(tài)元素龙誊,該方法會返回一個可遍歷所有元素的Iterable<T>對象,注意ListState不支持刪除單個元素喷楣,但是用戶可以使用update(List<T> values)來更新整個列表趟大。使用 ListStateDescriptor
來獲取狀態(tài)句柄。
- ReducingState<T>
調(diào)用add()方法添加值時铣焊,會立即返回一個使用ReduceFunction聚合后的值逊朽,用戶可以使用ReducingState.get()來獲取該狀態(tài)值。使用 ReducingStateDescriptor
來獲取狀態(tài)句柄曲伊。
- AggregatingState<IN, OUT>
與ReducingState<T>類似惋耙,不同的是它使用的是AggregateFunction來聚合內(nèi)部的值,AggregatingState.get()方法會計算最終的結(jié)果并將其返回。使用 AggregatingStateDescriptor
來獲取狀態(tài)句柄
- MapState<UK, UV>
用于保存一組key绽榛、value的映射,類似于java的Map集合婿屹。用戶可以通過get(UK key)方法獲取key對應(yīng)的狀態(tài)灭美,可以通過put(UK k,UV value)方法添加一個鍵值,可以通過remove(UK key)刪除給定key的值昂利,可以通過contains(UK key)判斷是否存在對應(yīng)的key届腐。使用 MapStateDescriptor
來獲取狀態(tài)句柄。
- FoldingState<T, ACC>
在Flink 1.4的版本中標(biāo)記過時蜂奸,在未來的版本中會被移除犁苏,使用AggregatingState進行代替。
值得注意的是扩所,上面的狀態(tài)原語都支持通過State.clear()方法來進行清除狀態(tài)围详。另外,上述的狀態(tài)原語僅用于與狀態(tài)進行交互祖屏,真正的狀態(tài)是存儲在狀態(tài)后端(后面會介紹狀態(tài)后端)的助赞,通過該狀態(tài)原語相當(dāng)于持有了狀態(tài)的句柄(handle)。
keyed State使用案例
下面給出一個MapState的使用案例袁勺,關(guān)于ValueState的使用情況可以參考官網(wǎng)雹食,具體如下:
public class MapStateExample {
//統(tǒng)計每個用戶每種行為的個數(shù)
public static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, String, Integer>> {
//定義一個MapState句柄
private transient MapState<String, Integer> behaviorCntState;
// 初始化狀態(tài)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
"userBehavior", // 狀態(tài)描述符的名稱
TypeInformation.of(new TypeHint<String>() {}), // MapState狀態(tài)的key的數(shù)據(jù)類型
TypeInformation.of(new TypeHint<Integer>() {}) // MapState狀態(tài)的value的數(shù)據(jù)類型
);
behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)
}
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
Integer behaviorCnt = 1;
// 如果當(dāng)前狀態(tài)包括該行為,則+1
if (behaviorCntState.contains(value.f1)) {
behaviorCnt = behaviorCntState.get(value.f1) + 1;
}
// 更新狀態(tài)
behaviorCntState.put(value.f1, behaviorCnt);
out.collect(Tuple3.of(value.f0, value.f1, behaviorCnt));
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模擬數(shù)據(jù)源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.keyBy(0)
.flatMap(new UserBehaviorCnt())
.print();
env.execute("MapStateExample");
}
}
結(jié)果輸出:
狀態(tài)的生命周期管理(TTL)
對于任何類型Keyed State都可以設(shè)定狀態(tài)的生命周期(TTL),即狀態(tài)的存活時間期丰,以確保能夠在規(guī)定時間內(nèi)及時地清理狀態(tài)數(shù)據(jù)群叶。如果配置了狀態(tài)的TTL,那么當(dāng)狀態(tài)過期時钝荡,存儲的狀態(tài)會被清除街立。狀態(tài)生命周期功能可以通過StateTtlConfig配置,然后將StateTtlConfig配置傳入StateDescriptor中的enableTimeToLive方法中即可化撕。代碼示例如下:
StateTtlConfig ttlConfig = StateTtlConfig
// 指定TTL時長為10S
.newBuilder(Time.seconds(10))
// 只對創(chuàng)建和寫入操作有效
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 不返回過期的數(shù)據(jù)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
// 初始化狀態(tài)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor<String, Integer> userBehaviorMapStateDesc = new MapStateDescriptor<>(
"userBehavior", // 狀態(tài)描述符的名稱
TypeInformation.of(new TypeHint<String>() {}), // MapState狀態(tài)的key的數(shù)據(jù)類型
TypeInformation.of(new TypeHint<Integer>() {}) // MapState狀態(tài)的value的數(shù)據(jù)類型
);
// 設(shè)置stateTtlConfig
userBehaviorMapStateDesc.enableTimeToLive(ttlConfig);
behaviorCntState = getRuntimeContext().getMapState(userBehaviorMapStateDesc); // 獲取狀態(tài)
}
在StateTtlConfig創(chuàng)建時几晤,newBuilder方法是必須要指定的,newBuilder中設(shè)定過期時間的參數(shù)植阴。對于其他參數(shù)都是可選的或使用默認(rèn)值蟹瘾。其中setUpdateType方法中傳入的類型有三種:
public enum UpdateType {
//禁用TTL,永遠不會過期
Disabled,
// 創(chuàng)建和寫入時更新TTL
OnCreateAndWrite,
// 與OnCreateAndWrite類似,但是在讀操作時也會更新TTL
OnReadAndWrite
}
值得注意的是掠手,過期的狀態(tài)數(shù)據(jù)根據(jù)UpdateType參數(shù)進行配置憾朴,只有被寫入或者讀取的時間才會更新TTL,也就是說如果某個狀態(tài)指標(biāo)一直不被使用或者更新喷鸽,則永遠不會觸發(fā)對該狀態(tài)數(shù)據(jù)的清理操作众雷,這種情況可能會導(dǎo)致系統(tǒng)中的狀態(tài)數(shù)據(jù)越來越大。目前用戶可以使用StateTtlConfig.cleanupFullSnapshot設(shè)定當(dāng)觸發(fā)State Snapshot的時候清理狀態(tài)數(shù)據(jù),但是改配置不適合用于RocksDB做增量Checkpointing的操作砾省。
上面的StateTtlConfig創(chuàng)建時鸡岗,可以指定setStateVisibility,用于狀態(tài)的可見性配置编兄,根據(jù)過期數(shù)據(jù)是否被清理來確定是否返回狀態(tài)數(shù)據(jù)轩性。
/**
* 是否返回過期的數(shù)據(jù)
*/
public enum StateVisibility {
//如果數(shù)據(jù)沒有被清理,就可以返回
ReturnExpiredIfNotCleanedUp,
//永遠不返回過期的數(shù)據(jù),默認(rèn)值
NeverReturnExpired
}
Operator State
Operator State的作用于是某個算子任務(wù)狠鸳,這意味著所有在同一個并行任務(wù)之內(nèi)的記錄都能訪問到相同的狀態(tài) 揣苏。算子狀態(tài)不能通過其他任務(wù)訪問,無論該任務(wù)是相同的算子件舵。如下圖所示卸察。
Operator State是一種non-keyed state,與并行的操作算子實例相關(guān)聯(lián)铅祸,例如在Kafka Connector中坑质,每個Kafka消費端算子實例都對應(yīng)到Kafka的一個分區(qū)中,維護Topic分區(qū)和Offsets偏移量作為算子的Operator State个少。在Flink中可以實現(xiàn)ListCheckpointed<T extends Serializable>接口或者CheckpointedFunction 接口來實現(xiàn)一個Operator State洪乍。
首先,我們先看一下這兩個接口的具體實現(xiàn)夜焦,然后再給出這兩種接口的具體使用案例壳澳。先看一下ListCheckpointed接口的源碼,如下:
public interface ListCheckpointed<T extends Serializable> {
/**
* 獲取某個算子實例的當(dāng)前狀態(tài)茫经,該狀態(tài)包括該算子實例之前被調(diào)用時的所有結(jié)果
* 以列表的形式返回一個函數(shù)狀態(tài)的快照
* Flink觸發(fā)生成檢查點時調(diào)用該方法
* @param checkpointId checkpoint的ID,是一個唯一的巷波、單調(diào)遞增的值
* @param timestamp Job Manager觸發(fā)checkpoint時的時間戳
* @return 返回一個operator state list,如果為null時,返回空list
* @throws Exception
*/
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
/**
* 初始化函數(shù)狀態(tài)時調(diào)用,可能是在作業(yè)啟動時或者故障恢復(fù)時
* 根據(jù)提供的列表恢復(fù)函數(shù)狀態(tài)
* 注意:當(dāng)實現(xiàn)該方法時卸伞,需要在RichFunction#open()方法之前調(diào)用該方法
* @param state 被恢復(fù)算子實例的state列表 抹镊,可能為空
* @throws Exception
*/
void restoreState(List<T> state) throws Exception;
}
使用Operator ListState時,在進行擴縮容時荤傲,重分布的策略(狀態(tài)恢復(fù)的模式)如下圖所示:
上面的重分布策略為Even-split Redistribution垮耳,即每個算子實例中含有部分狀態(tài)元素的List列表,整個狀態(tài)數(shù)據(jù)是所有List列表的合集遂黍。當(dāng)觸發(fā)restore/redistribution動作時终佛,通過將狀態(tài)數(shù)據(jù)平均分配成與算子并行度相同數(shù)量的List列表,每個task實例中有一個List雾家,其可以為空或者含有多個元素铃彰。
我們再來看一下CheckpointedFunction接口,源碼如下:
public interface CheckpointedFunction {
/**
* 會在生成檢查點之前調(diào)用
* 該方法的目的是確保檢查點開始之前所有狀態(tài)對象都已經(jīng)更新完畢
* @param context 使用FunctionSnapshotContext作為參數(shù)
* 從FunctionSnapshotContext可以獲取checkpoint的元數(shù)據(jù)信息芯咧,
* 比如checkpoint編號牙捉,JobManager在初始化checkpoint時的時間戳
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* 在創(chuàng)建checkpointedFunction的并行實例時被調(diào)用竹揍,
* 在應(yīng)用啟動或者故障重啟時觸發(fā)該方法的調(diào)用
* @param context 傳入FunctionInitializationContext對象,
* 可以使用該對象訪問OperatorStateStore和 KeyedStateStore對象邪铲,
* 這兩個對象可以獲取狀態(tài)的句柄芬位,即通過Flink runtime來注冊函數(shù)狀態(tài)并返回state對象
* 比如:ValueState、ListState等
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
CheckpointedFunction接口是用于指定有狀態(tài)函數(shù)的最底層的接口阴孟,該接口提供了用于注冊和維護keyed state 與operator state的hook(即可以同時使用keyed state 和operator state),另外也是唯一支持使用list union state撼泛。關(guān)于Union List State,使用的是Flink為Operator state提供的另一種重分布的策略:Union Redistribution杆兵,即每個算子實例中含有所有狀態(tài)元素的List列表灶体,當(dāng)觸發(fā)restore/redistribution動作時精算,每個算子都能夠獲取到完整的狀態(tài)元素列表。具體如下圖所示:
ListCheckpointed
ListCheckpointed接口和CheckpointedFunction接口相比在靈活性上相對弱一些,只能支持List類型的狀態(tài)憋槐,并且在數(shù)據(jù)恢復(fù)的時候僅支持even-redistribution策略。該接口不像Flink提供的Keyed State(比如Value State淑趾、ListState)那樣直接在狀態(tài)后端(state backend)注冊阳仔,需要將operator state實現(xiàn)為成員變量,然后通過接口提供的回調(diào)函數(shù)與狀態(tài)后端進行交互治笨。使用代碼案例如下:
public class ListCheckpointedExample {
private static class UserBehaviorCnt extends RichFlatMapFunction<Tuple3<Long, String, String>, Tuple2<String, Long>> implements ListCheckpointed<Long> {
private Long userBuyBehaviorCnt = 0L;
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple2<String, Long>> out) throws Exception {
if(value.f1.equals("buy")){
userBuyBehaviorCnt ++;
out.collect(Tuple2.of("buy",userBuyBehaviorCnt));
}
}
@Override
public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
//返回單個元素的List集合驳概,該集合元素是用戶購買行為的數(shù)量
return Collections.singletonList(userBuyBehaviorCnt);
}
@Override
public void restoreState(List<Long> state) throws Exception {
// 在進行擴縮容之后,進行狀態(tài)恢復(fù)旷赖,需要把其他subtask的狀態(tài)加在一起
for (Long cnt : state) {
userBuyBehaviorCnt += 1;
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模擬數(shù)據(jù)源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.flatMap(new UserBehaviorCnt())
.print();
env.execute("ListCheckpointedExample");
}
}
CheckpointedFunction
CheckpointedFunction接口提供了更加豐富的操作顺又,比如支持Union list state,可以訪問keyedState等孵,關(guān)于重分布策略稚照,如果使用Even-split Redistribution策略,則通過context. getListState(descriptor)獲取Operator State俯萌;如果使用UnionRedistribution策略果录,則通過context. getUnionList State(descriptor)來獲取。使用案例如下:
public class CheckpointFunctionExample {
private static class UserBehaviorCnt implements CheckpointedFunction, FlatMapFunction<Tuple3<Long, String, String>, Tuple3<Long, Long, Long>> {
// 統(tǒng)計每個operator實例的用戶行為數(shù)量的本地變量
private Long opUserBehaviorCnt = 0L;
// 每個key的state,存儲key對應(yīng)的相關(guān)狀態(tài)
private ValueState<Long> keyedCntState;
// 定義operator state咐熙,存儲算子的狀態(tài)
private ListState<Long> opCntState;
@Override
public void flatMap(Tuple3<Long, String, String> value, Collector<Tuple3<Long, Long, Long>> out) throws Exception {
if (value.f1.equals("buy")) {
// 更新算子狀態(tài)本地變量值
opUserBehaviorCnt += 1;
Long keyedCount = keyedCntState.value();
// 更新keyedstate的狀態(tài) ,判斷狀態(tài)是否為null弱恒,否則空指針異常
keyedCntState.update(keyedCount == null ? 1L : keyedCount + 1 );
// 結(jié)果輸出
out.collect(Tuple3.of(value.f0, keyedCntState.value(), opUserBehaviorCnt));
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 使用opUserBehaviorCnt本地變量更新operator state
opCntState.clear();
opCntState.add(opUserBehaviorCnt);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 通過KeyedStateStore,定義keyedState的StateDescriptor描述符
ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("keyedCnt", TypeInformation.of(new TypeHint<Long>() {
}));
// 通過OperatorStateStore,定義OperatorState的StateDescriptor描述符
ListStateDescriptor opStateDescriptor = new ListStateDescriptor("opCnt", TypeInformation.of(new TypeHint<Long>() {
}));
// 初始化keyed state狀態(tài)值
keyedCntState = context.getKeyedStateStore().getState(valueStateDescriptor);
// 初始化operator state狀態(tài)
opCntState = context.getOperatorStateStore().getListState(opStateDescriptor);
// 初始化本地變量operator state
for (Long state : opCntState.get()) {
opUserBehaviorCnt += state;
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模擬數(shù)據(jù)源[userId,behavior,product]
DataStreamSource<Tuple3<Long, String, String>> userBehaviors = env.fromElements(
Tuple3.of(1L, "buy", "iphone"),
Tuple3.of(1L, "cart", "huawei"),
Tuple3.of(1L, "buy", "logi"),
Tuple3.of(1L, "fav", "oppo"),
Tuple3.of(2L, "buy", "huawei"),
Tuple3.of(2L, "buy", "onemore"),
Tuple3.of(2L, "fav", "iphone"));
userBehaviors
.keyBy(0)
.flatMap(new UserBehaviorCnt())
.print();
env.execute("CheckpointFunctionExample");
}
}
什么是狀態(tài)后端
上面使用的狀態(tài)都需要存儲到狀態(tài)后端(StateBackend),然后在checkpoint觸發(fā)時棋恼,將狀態(tài)持久化到外部存儲系統(tǒng)返弹。Flink提供了三種類型的狀態(tài)后端锈玉,分別是基于內(nèi)存的狀態(tài)后端(MemoryStateBackend、基于文件系統(tǒng)的狀態(tài)后端(FsStateBackend)以及基于RockDB作為存儲介質(zhì)的RocksDB StateBackend义起。這三種類型的StateBackend都能夠有效地存儲Flink流式計算過程中產(chǎn)生的狀態(tài)數(shù)據(jù)拉背,在默認(rèn)情況下Flink使用的是MemoryStateBackend,區(qū)別見下表默终。下面分別對每種狀態(tài)后端的特點進行說明椅棺。
狀態(tài)后端的類別
MemoryStateBackend
MemoryStateBackend將狀態(tài)數(shù)據(jù)全部存儲在JVM堆內(nèi)存中,包括用戶在使用DataStream API中創(chuàng)建的Key/Value State齐蔽,窗口中緩存的狀態(tài)數(shù)據(jù)两疚,以及觸發(fā)器等數(shù)據(jù)。MemoryStateBackend具有非澈危快速和高效的特點鬼雀,但也具有非常多的限制,最主要的就是內(nèi)存的容量限制蛙吏,一旦存儲的狀態(tài)數(shù)據(jù)過多就會導(dǎo)致系統(tǒng)內(nèi)存溢出等問題,從而影響整個應(yīng)用的正常運行鞋吉。同時如果機器出現(xiàn)問題鸦做,整個主機內(nèi)存中的狀態(tài)數(shù)據(jù)都會丟失,進而無法恢復(fù)任務(wù)中的狀態(tài)數(shù)據(jù)谓着。因此從數(shù)據(jù)安全的角度建議用戶盡可能地避免在生產(chǎn)環(huán)境中使用MemoryStateBackend泼诱。Flink將MemoryStateBackend作為默認(rèn)狀態(tài)后端。
MemoryStateBackend比較適合用于測試環(huán)境中赊锚,并用于本地調(diào)試和驗證治筒,不建議在生產(chǎn)環(huán)境中使用。但如果應(yīng)用狀態(tài)數(shù)據(jù)量不是很大舷蒲,例如使用了大量的非狀態(tài)計算算子耸袜,也可以在生產(chǎn)環(huán)境中使MemoryStateBackend.
FsStateBackend
FsStateBackend是基于文件系統(tǒng)的一種狀態(tài)后端,這里的文件系統(tǒng)可以是本地文件系統(tǒng)牲平,也可以是HDFS分布式文件系統(tǒng)堤框。創(chuàng)建FsStateBackend的構(gòu)造函數(shù)如下:
FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots)
其中path如果為本地路徑,其格式為“file:///data/flink/checkpoints”纵柿,如果path為HDFS路徑蜈抓,其格式為“hdfs://nameservice/flink/checkpoints”。FsStateBackend中第二個Boolean類型的參數(shù)指定是否以同步的方式進行狀態(tài)數(shù)據(jù)記錄昂儒,默認(rèn)采用異步的方式將狀態(tài)數(shù)據(jù)同步到文件系統(tǒng)中沟使,異步方式能夠盡可能避免在Checkpoint的過程中影響流式計算任務(wù)。如果用戶想采用同步的方式進行狀態(tài)數(shù)據(jù)的檢查點數(shù)據(jù)渊跋,則將第二個參數(shù)指定為True即可腊嗡。
相比于MemoryStateBackend, FsStateBackend更適合任務(wù)狀態(tài)非常大的情況着倾,例如應(yīng)用中含有時間范圍非常長的窗口計算,或Key/value State狀態(tài)數(shù)據(jù)量非常大的場景叽唱,這時系統(tǒng)內(nèi)存不足以支撐狀態(tài)數(shù)據(jù)的存儲屈呕。同時FsStateBackend最大的好處是相對比較穩(wěn)定,在checkpoint時棺亭,將狀態(tài)持久化到像HDFS分布式文件系統(tǒng)中虎眨,能最大程度保證狀態(tài)數(shù)據(jù)的安全性。
RocksDBStateBackend
與前面的狀態(tài)后端不同镶摘,RocksDBStateBackend需要單獨引入相關(guān)的依賴包嗽桩。RocksDB 是一個 key/value 的內(nèi)存存儲系統(tǒng),類似于HBase凄敢,是一種內(nèi)存磁盤混合的 LSM DB碌冶。當(dāng)寫數(shù)據(jù)時會先寫進write buffer(類似于HBase的memstore),然后在flush到磁盤文件涝缝,當(dāng)讀取數(shù)據(jù)時會現(xiàn)在block cache(類似于HBase的block cache)扑庞,所以速度會很快。
RocksDBStateBackend在性能上要比FsStateBackend高一些拒逮,主要是因為借助于RocksDB存儲了最新熱數(shù)據(jù)罐氨,然后通過異步的方式再同步到文件系統(tǒng)中,但RocksDBStateBackend和MemoryStateBackend相比性能就會較弱一些滩援。
需要注意 RocksDB 不支持同步的 Checkpoint栅隐,構(gòu)造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint玩徊,也是目前唯一增量 Checkpoint 的 Backend租悄,意味著并不需要把所有 sst 文件上傳到 Checkpoint 目錄,僅需要上傳新生成的 sst 文件即可恩袱。它的 Checkpoint 存儲在外部文件系統(tǒng)(本地或HDFS)泣棋,其容量限制只要單個 TaskManager 上 State 總量不超過它的內(nèi)存+磁盤,單 Key最大 2G憎蛤,總大小不超過配置的文件系統(tǒng)容量即可外傅。對于超大狀態(tài)的作業(yè),例如天級窗口聚合等場景下可以使會用該狀態(tài)后端俩檬。
配置狀態(tài)后端
Flink默認(rèn)使用的狀態(tài)后端是MemoryStateBackend萎胰,所以不需要顯示配置。對于其他的狀態(tài)后端棚辽,都需要進行顯性配置技竟。在Flink中包含了兩種級別的StateBackend配置:一種是在程序中進行配置,該配置只對當(dāng)前應(yīng)用有效屈藐;另外一種是通過 flink-conf.yaml
進行全局配置榔组,一旦配置就會對整個Flink集群上的所有應(yīng)用有效熙尉。
- 應(yīng)用級別配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果使用RocksDBStateBackend則需要單獨引入rockdb依賴庫,如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
使用方式與FsStateBackend類似,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
- 集群級別配置
具體的配置項在flink-conf.yaml文件中搓扯,如下代碼所示检痰,參數(shù)state.backend指明StateBackend類型,state.checkpoints.dir配置具體的狀態(tài)存儲路徑锨推,代碼中使用filesystem作為StateBackend铅歼,然后指定相應(yīng)的HDFS文件路徑作為state的checkpoint文件夾。
# 使用filesystem存儲
state.backend: filesystem
# checkpoint存儲路徑
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
如果想用RocksDBStateBackend配置集群級別的狀態(tài)后端换可,可以使用下面的配置:
# 操作RocksDBStateBackend的線程數(shù)量椎椰,默認(rèn)值為1
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
# 指定RocksDB存儲狀態(tài)數(shù)據(jù)的本地文件路徑
state.backend.rocksdb.localdir: /var/rockdb/checkpoints
# 用于指定定時器服務(wù)的工廠類實現(xiàn)類,默認(rèn)為“HEAP”沾鳄,也可以指定為“RocksDB”
state.backend.rocksdb.timer-service.factory: HEAP
什么是Checkpoint(檢查點)
上面講解了Flink的狀態(tài)以及狀態(tài)后端慨飘,狀態(tài)是存儲在狀態(tài)后端。為了保證state容錯译荞,F(xiàn)link提供了處理故障的措施瓤的,這種措施稱之為checkpoint(一致性檢查點)。checkpoint是Flink實現(xiàn)容錯的核心功能吞歼,主要是周期性地觸發(fā)checkpoint堤瘤,將state生成快照持久化到外部存儲系統(tǒng)(比如HDFS)。這樣一來浆熔,如果Flink程序出現(xiàn)故障,那么就可以從上一次checkpoint中進行狀態(tài)恢復(fù)桥帆,從而提供容錯保障医增。另外,通過checkpoint機制老虫,F(xiàn)link可以實現(xiàn)Exactly-once語義(Flink內(nèi)部的Exactly-once,關(guān)于端到端的exactly_once,Flink是通過兩階段提交協(xié)議實現(xiàn)的)叶骨。下面將會詳細(xì)分析Flink的checkpoint機制。
檢查點的生成
如上圖祈匙,輸入流是用戶行為數(shù)據(jù)忽刽,包括購買(buy)和加入購物車(cart)兩種,每種行為數(shù)據(jù)都有一個偏移量夺欲,統(tǒng)計每種行為的個數(shù)跪帝。
第一步:JobManager checkpoint coordinator 觸發(fā)checkpoint。
第二步:假設(shè)當(dāng)消費到[cart些阅,3]這條數(shù)據(jù)時伞剑,觸發(fā)了checkpoint。那么此時數(shù)據(jù)源會把消費的偏移量3寫入持久化存儲市埋。
第三步:當(dāng)寫入結(jié)束后黎泣,source會將state handle(狀態(tài)存儲路徑)反饋給JobManager的checkpoint coordinator恕刘。
第四步:接著算子count buy與count cart也會進行同樣的步驟
第五步:等所有的算子都完成了上述步驟之后,即當(dāng) Checkpoint coordinator 收集齊所有 task 的 state handle抒倚,就認(rèn)為這一次的 Checkpoint 全局完成了褐着,向持久化存儲中再備份一個 Checkpoint meta 文件,那么整個checkpoint也就完成了托呕,如果中間有一個不成功含蓉,那么本次checkpoin就宣告失敗。
檢查點的恢復(fù)
通過上面的分析镣陕,或許你已經(jīng)對Flink的checkpoint有了初步的認(rèn)識谴餐。那么接下來,我們看一下是如何從檢查點恢復(fù)的呆抑。
- 任務(wù)失敗
- 重啟作業(yè)
- 恢復(fù)檢查點
- 繼續(xù)處理數(shù)據(jù)
上述過程具體總結(jié)如下:
- 第一步:重啟作業(yè)
- 第二步:從上一次檢查點恢復(fù)狀態(tài)數(shù)據(jù)
- 第三步:繼續(xù)處理新的數(shù)據(jù)
Flink內(nèi)部Exactly-Once實現(xiàn)
Flink提供了精確一次的處理語義岂嗓,精確一次的處理語義可以理解為:數(shù)據(jù)可能會重復(fù)計算,但是結(jié)果狀態(tài)只有一個鹊碍。Flink通過Checkpoint機制實現(xiàn)了精確一次的處理語義厌殉,F(xiàn)link在觸發(fā)Checkpoint時會向Source端插入checkpoint barrier,checkpoint barriers是從source端插入的侈咕,并且會向下游算子進行傳遞公罕。checkpoint barriers攜帶一個checkpoint ID,用于標(biāo)識屬于哪一個checkpoint耀销,checkpoint barriers將流邏輯是哪個分為了兩部分楼眷。對于雙流的情況,通過barrier對齊的方式實現(xiàn)精確一次的處理語義熊尉。
關(guān)于什么是checkpoint barrier罐柳,可以看一下CheckpointBarrier類的源碼描述,如下:
/**
* Checkpoint barriers用來在數(shù)據(jù)流中實現(xiàn)checkpoint對齊的.
* Checkpoint barrier由JobManager的checkpoint coordinator插入到Source中,
* Source會把barrier廣播發(fā)送到下游算子,當(dāng)一個算子接收到了其中一個輸入流的Checkpoint barrier時,
* 它就會知道已經(jīng)處理完了本次checkpoint與上次checkpoint之間的數(shù)據(jù).
*
* 一旦某個算子接收到了所有輸入流的checkpoint barrier時狰住,
* 意味著該算子的已經(jīng)處理完了截止到當(dāng)前checkpoint的數(shù)據(jù)张吉,
* 可以觸發(fā)checkpoint,并將barrier向下游傳遞
*
* 根據(jù)用戶選擇的處理語義催植,在checkpoint完成之前會緩存后一次checkpoint的數(shù)據(jù)肮蛹,
* 直到本次checkpoint完成(exactly once)
*
* checkpoint barrier的id是嚴(yán)格單調(diào)遞增的
*
*/
public class CheckpointBarrier extends RuntimeEvent {...}
可以看出checkpoint barrier主要功能是實現(xiàn)checkpoint對齊的,從而可以實現(xiàn)Exactly-Once處理語義创南。
下面將會對checkpoint過程進行分解伦忠,具體如下:
圖1,包括兩個流稿辙,每個任務(wù)都會消費一條用戶行為數(shù)據(jù)(包括購買(buy)和加購(cart))缓苛,數(shù)字代表該數(shù)據(jù)的偏移量,count buy任務(wù)統(tǒng)計購買行為的個數(shù),coun cart統(tǒng)計加購行為的個數(shù)未桥。
圖2笔刹,觸發(fā)checkpoint,JobManager會向每個數(shù)據(jù)源發(fā)送一個新的checkpoint編號冬耿,以此來啟動檢查點生成流程舌菜。
-
圖3士葫,當(dāng)Source任務(wù)收到消息后菇夸,會停止發(fā)出數(shù)據(jù)帜平,然后利用狀態(tài)后端觸發(fā)生成本地狀態(tài)檢查點灶似,并把該checkpoint barrier以及checkpoint id廣播至所有傳出的數(shù)據(jù)流分區(qū)。狀態(tài)后端會在checkpoint完成之后通知任務(wù)磁玉,隨后任務(wù)會向Job Manager發(fā)送確認(rèn)消息驰唬。在將checkpoint barrier發(fā)出之后份汗,Source任務(wù)恢復(fù)正常工作绊起。
- 圖4精拟,Source任務(wù)發(fā)出的checkpoint barrier會發(fā)送到與之相連的下游算子任務(wù),當(dāng)任務(wù)收到一個新的checkpoint barrier時虱歪,會繼續(xù)等待其他輸入分區(qū)的checkpoint barrier到來蜂绎,這個過程稱之為barrier 對齊,checkpoint barrier到來之前會把到來的數(shù)據(jù)線緩存起來笋鄙。
- 圖5师枣,任務(wù)收齊了全部輸入分區(qū)的checkpoint barrier之后,會通知狀態(tài)后端開始生成checkpoint萧落,同時會把checkpoint barrier廣播至下游算子践美。
- 圖6,任務(wù)在發(fā)出checkpoint barrier之后找岖,開始處理因barrier對齊產(chǎn)生的緩存數(shù)據(jù)拨脉,在緩存的數(shù)據(jù)處理完之后,就會繼續(xù)處理輸入流數(shù)據(jù)宣增。
- 圖7,最終checkpoint barrier會被傳送到sink端矛缨,sink任務(wù)接收到checkpoint barrier之后爹脾,會向其他算子任務(wù)一樣,將自身的狀態(tài)寫入checkpoint箕昭,之后向Job Manager發(fā)送確認(rèn)消息灵妨。Job Manager接收到所有任務(wù)返回的確認(rèn)消息之后,就會將此次檢查點標(biāo)記為完成落竹。
使用案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint的時間間隔泌霍,如果狀態(tài)比較大,可以適當(dāng)調(diào)大該值
env.enableCheckpointing(1000);
// 配置處理語義,默認(rèn)是exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩個checkpoint之間的最小時間間隔朱转,防止因checkpoint時間過長蟹地,導(dǎo)致checkpoint積壓
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoint執(zhí)行的上限時間,如果超過該閾值藤为,則會中斷checkpoint
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 最大并行執(zhí)行的檢查點數(shù)量怪与,默認(rèn)為1,可以指定多個缅疟,從而同時出發(fā)多個checkpoint分别,提升效率
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 設(shè)定周期性外部檢查點,將狀態(tài)數(shù)據(jù)持久化到外部系統(tǒng)中存淫,
// 使用該方式不會在任務(wù)正常停止的過程中清理掉檢查點數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
總結(jié)
本文首先從Flink的狀態(tài)入手耘斩,通過Spark的WordCount和Flink的Work Count進行說明什么是狀態(tài)。接著對狀態(tài)的分類以及狀態(tài)的使用進行了詳細(xì)說明桅咆。然后對Flink提供的三種狀態(tài)后端進行討論括授,并給出了狀態(tài)后端的使用說明。最后轧邪,以圖解加文字的形式詳細(xì)解釋了Flink的checkpoint機制刽脖,并給出了使用Checkpoint時的程序配置。