1 檢查點(diǎn)機(jī)制
1.1 CheckPoints
為了使 Flink 的狀態(tài)具有良好的容錯(cuò)性宏侍,F(xiàn)link 提供了檢查點(diǎn)機(jī)制 (CheckPoints) 徽千。通過檢查點(diǎn)機(jī)制爱榔,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier 挟伙,當(dāng)某個(gè)算子收到 barrier 時(shí),即會(huì)基于當(dāng)前狀態(tài)生成一份快照荐开,然后再將該 barrier 傳遞到下游算子付翁,下游算子接收到該 barrier 后,也基于當(dāng)前狀態(tài)生成一份快照晃听,依次傳遞直至到最后的 Sink 算子上胆敞。當(dāng)出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)杂伟。
1.2 開啟檢查點(diǎn)
默認(rèn)情況下移层,檢查點(diǎn)機(jī)制是關(guān)閉的,需要在程序中進(jìn)行開啟:
// 開啟檢查點(diǎn)機(jī)制赫粥,并指定狀態(tài)檢查點(diǎn)之間的時(shí)間間隔
env.enableCheckpointing(1000);
// 其他可選配置如下:
// 設(shè)置語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置兩個(gè)檢查點(diǎn)之間的最小時(shí)間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 設(shè)置執(zhí)行Checkpoint操作時(shí)的超時(shí)時(shí)間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設(shè)置最大并發(fā)執(zhí)行的檢查點(diǎn)的數(shù)量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 將檢查點(diǎn)持久化到外部存儲(chǔ)
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存點(diǎn)時(shí)观话,是否將作業(yè)回退到該檢查點(diǎn)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
1.3 保存點(diǎn)機(jī)制
保存點(diǎn)機(jī)制 (Savepoints) 是檢查點(diǎn)機(jī)制的一種特殊的實(shí)現(xiàn),它允許你通過手工的方式來觸發(fā) Checkpoint越平,并將結(jié)果持久化存儲(chǔ)到指定路徑中频蛔,主要用于避免 Flink 集群在重啟或升級(jí)時(shí)導(dǎo)致狀態(tài)丟失。
1.4 RichFunction 檢查點(diǎn)實(shí)戰(zhàn)
public class OperatorWarning implements CheckpointedFunction {
// 非正常數(shù)據(jù)
private List<Tuple2<String, Long>> bufferedData;
// checkPointedState
private transient ListState<Tuple2<String, Long>> checkPointedState;
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意這里獲取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().
getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
// 如果發(fā)生重啟秦叛,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在進(jìn)行快照時(shí)晦溪,將數(shù)據(jù)存儲(chǔ)到checkPointedState
checkPointedState.clear();
for (Tuple2<String, Long> element : bufferedData) {
checkPointedState.add(element);
}
}
}
2 狀態(tài)管理
2.1 算子狀態(tài)
算子狀態(tài) (Operator State):顧名思義,狀態(tài)是和算子進(jìn)行綁定的挣跋,一個(gè)算子的狀態(tài)不能被其他算子所訪問到三圆。官方文檔上對(duì) Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的避咆,即假設(shè)算子的并行度是 2舟肉,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài):
2.2 鍵控狀態(tài)
鍵控狀態(tài) (Keyed State) :是一種特殊的算子狀態(tài),即狀態(tài)是根據(jù) key 值進(jìn)行區(qū)分的查库,F(xiàn)link 會(huì)為每類鍵值維護(hù)一個(gè)狀態(tài)實(shí)例路媚。如下圖所示,每個(gè)顏色代表不同 key 值樊销,對(duì)應(yīng)四個(gè)不同的狀態(tài)實(shí)例整慎。需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進(jìn)行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 围苫。
2.3 監(jiān)控狀態(tài)編程
Flink 提供了以下數(shù)據(jù)格式來管理和存儲(chǔ)鍵控狀態(tài) (Keyed State):
- ValueState:存儲(chǔ)單值類型的狀態(tài)裤园。可以使用 update(T) 進(jìn)行更新够吩,并通過 T value() 進(jìn)行檢索比然。
- ListState:存儲(chǔ)列表類型的狀態(tài)≈苎可以使用 add(T) 或 addAll(List) 添加元素强法;并通過 get() 獲得整個(gè)列表。
- ReducingState:用于存儲(chǔ)經(jīng)過 ReduceFunction 計(jì)算后的結(jié)果湾笛,使用 add(T) 增加元素饮怯。
- AggregatingState:用于存儲(chǔ)經(jīng)過 AggregatingState 計(jì)算后的結(jié)果,使用 add(IN) 添加元素嚎研。
- FoldingState:已被標(biāo)識(shí)為廢棄蓖墅,會(huì)在未來版本中移除,官方推薦使用 AggregatingState 代替临扮。
- MapState:維護(hù) Map 類型的狀態(tài)论矾。
@Override
public void open(Configuration parameters) {
// 通過狀態(tài)名稱(句柄)獲取狀態(tài)實(shí)例,如果不存在則會(huì)自動(dòng)創(chuàng)建
// abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<>("abnormalData", Long.class));
StateTtlConfig ttlConfig = StateTtlConfig
// 設(shè)置有效期為 10 秒
.newBuilder(Time.seconds(10))
// 設(shè)置有效期更新規(guī)則杆勇,這里設(shè)置為當(dāng)創(chuàng)建和寫入時(shí)贪壳,都重置其有效期到規(guī)定的10秒
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
/*設(shè)置只要值過期就不可見,另外一個(gè)可選值是ReturnExpiredIfNotCleanedUp蚜退,
代表即使值過期了闰靴,但如果還沒有被物理刪除,就是可見的*/
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);
abnormalData = getRuntimeContext().getListState(descriptor);
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
Long inputValue = value.f1;
// 如果輸入值超過閾值钻注,則記錄該次不正常的數(shù)據(jù)信息
if (inputValue >= threshold) {
abnormalData.add(inputValue);
}
ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
// 如果不正常的數(shù)據(jù)出現(xiàn)達(dá)到一定次數(shù)蚂且,則輸出報(bào)警信息
if (list.size() >= numberOfTimes) {
out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));
// 報(bào)警信息輸出后,清空狀態(tài)
abnormalData.clear();
}
}
2.4 算子狀態(tài)編程
相比于鍵控狀態(tài)幅恋,算子狀態(tài)目前支持的存儲(chǔ)類型只有以下三種:
- ListState:存儲(chǔ)列表類型的狀態(tài)杏死。
- UnionListState:存儲(chǔ)列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化捆交,ListState 會(huì)將該算子的所有并發(fā)的狀態(tài)實(shí)例進(jìn)行匯總识埋,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實(shí)例匯總起來零渐,具體的劃分行為則由用戶進(jìn)行定義窒舟。
- BroadcastState:用于廣播的算子狀態(tài)。
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意這里獲取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().
getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
// 如果發(fā)生重啟诵盼,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}
備注:一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的惠豺,即假設(shè)算子的并行度是 2,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài)
3 狀態(tài)后端
3.1 狀態(tài)管理實(shí)現(xiàn)方式
MemoryStateBackend
默認(rèn)的方式风宁,即基于 JVM 的堆內(nèi)存進(jìn)行存儲(chǔ)洁墙,主要適用于本地開發(fā)和調(diào)試。FsStateBackend
基于文件系統(tǒng)進(jìn)行存儲(chǔ)戒财,可以是本地文件系統(tǒng)热监,也可以是 HDFS 等分布式文件系統(tǒng)。 需要注意而是雖然選擇使用了 FsStateBackend 饮寞,但正在進(jìn)行的數(shù)據(jù)仍然是存儲(chǔ)在 TaskManager 的內(nèi)存中的孝扛,只有在 checkpoint 時(shí)列吼,才會(huì)將狀態(tài)快照寫入到指定文件系統(tǒng)上。RocksDBStateBackend
RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器苦始,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲(chǔ)正在進(jìn)行的數(shù)據(jù)寞钥。等到 checkpoint 時(shí),再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中陌选,所以采用 RocksDBStateBackend 時(shí)也需要配置持久化存儲(chǔ)的文件系統(tǒng)理郑。之所以這樣做是因?yàn)?RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式咨油,其讀取速率更快您炉;比起全內(nèi)存的方式,其存儲(chǔ)空間更大役电,因此它是一種比較均衡的方案赚爵。
3.2 配置方式
- 基于代碼方式進(jìn)行配置,只對(duì)當(dāng)前作業(yè)生效:
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend 時(shí)宴霸,需要額外導(dǎo)入下面的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.12</version>
</dependency>
- 基于 flink-conf.yaml 配置文件的方式進(jìn)行配置囱晴,對(duì)所有部署在該集群上的作業(yè)都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints