Flink的狀態(tài)與容錯是這個框架很核心的知識點奠涌。其中一致檢查點也就是Checkpoints也是Flink故障恢復機制的核心固灵,這篇文章將詳細介紹Flink的狀態(tài)管理和Checkpoints的概念以及在生產(chǎn)環(huán)境中的參數(shù)設(shè)置。
什么是State狀態(tài)轴术?
在使用Flink進行窗口聚合統(tǒng)計难衰,排序等操作的時候,數(shù)據(jù)流的處理離不開狀態(tài)管理
是一個Operator的運行的狀態(tài)/歷史值逗栽,在內(nèi)存中進行維護
流程:一個算子的子任務(wù)接收輸入流盖袭,獲取對應(yīng)的狀態(tài),計算新的結(jié)果祭陷,然后把結(jié)果更新到狀態(tài)里面
有狀態(tài)和無狀態(tài)介紹
無狀態(tài)計算: 同個數(shù)據(jù)進到算子里面多少次苍凛,都是一樣的輸出,比如 filter
有狀態(tài)計算:需要考慮歷史狀態(tài)兵志,同個輸入會有不同的輸出醇蝴,比如sum、reduce聚合操作
-
狀態(tài)管理分類
-
ManagedState(用的多)
Flink管理想罕,自動存儲恢復
-
細分兩類
-
Keyed State 鍵控狀態(tài)(用的多)
有KeyBy才用這個悠栓,僅限用在KeyStream中,每個key都有state 按价,是基于KeyedStream上的狀態(tài)
一般是用richFlatFunction,或者其他richfunction里面惭适,在open()聲明周期里面進行初始化
ValueState、ListState楼镐、MapState等數(shù)據(jù)結(jié)構(gòu)
-
Operator State 算子狀態(tài)(用的少,部分source會用)
- ListState癞志、UnionListState、BroadcastState等數(shù)據(jù)結(jié)構(gòu)
-
-
RawState(用的少)
用戶自己管理和維護
存儲結(jié)構(gòu):二進制數(shù)組
-
-
State數(shù)據(jù)結(jié)構(gòu)(狀態(tài)值可能存在內(nèi)存框产、磁盤凄杯、DB或者其他分布式存儲中)
-
ValueState 簡單的存儲一個值(ThreadLocal / String)
ValueState.value()
ValueState.update(T value)
-
ListState 列表
ListState.add(T value)
ListState.get() //得到一個Iterator
-
MapState 映射類型
MapState.get(key)
MapState.put(key, value)
-
State狀態(tài)后端:存儲在哪里
-
Flink 內(nèi)置了以下這些開箱即用的 state backends :
-
(新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
- 如果沒有其他配置秉宿,系統(tǒng)將使用 HashMapStateBackend戒突。
-
(舊版)MemoryStateBackend、FsStateBackend描睦、RocksDBStateBackend
- 如果不設(shè)置膊存,默認使用 MemoryStateBackend。
-
-
狀態(tài)詳解
-
HashMapStateBackend 保存數(shù)據(jù)在內(nèi)部作為Java堆的對象忱叭。
鍵/值狀態(tài)和窗口操作符持有哈希表,用于存儲值韵丑、觸發(fā)器等
非常快埂息,因為每個狀態(tài)訪問和更新都對 Java 堆上的對象進行操作
但是狀態(tài)大小受集群內(nèi)可用內(nèi)存的限制
-
場景:
具有大狀態(tài)、長窗口千康、大鍵/值狀態(tài)的作業(yè)享幽。
所有高可用性設(shè)置拾弃。
-
EmbeddedRocksDBStateBackend 在RocksDB數(shù)據(jù)庫中保存狀態(tài)數(shù)據(jù)
該數(shù)據(jù)庫(默認)存儲在 TaskManager 本地數(shù)據(jù)目錄中
與HashMapStateBackend在java存儲 對象不同,數(shù)據(jù)存儲為序列化的字節(jié)數(shù)組
RocksDB可以根據(jù)可用磁盤空間進行擴展豪椿,并且是唯一支持增量快照的狀態(tài)后端奔坟。
但是每個狀態(tài)訪問和更新都需要(反)序列化并可能從磁盤讀取,這導致平均性能比內(nèi)存狀態(tài)后端慢一個數(shù)量級
-
場景
具有非常大狀態(tài)咳秉、長窗口鸯隅、大鍵/值狀態(tài)的作業(yè)。
所有高可用性設(shè)置
舊版
-
MemoryStateBackend(內(nèi)存炕舵,不推薦在生產(chǎn)場景使用)
FsStateBackend(文件系統(tǒng)上跟畅,本地文件系統(tǒng)、HDFS, 性能更好,常用)
RocksDBStateBackend (無需擔心 OOM 風險徊件,是大部分時候的選擇)
代碼配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
什么是Checkpoint檢查點
Flink中所有的Operator的當前State的全局快照
默認情況下 checkpoint 是禁用的
Checkpoint是把State數(shù)據(jù)定時持久化存儲庇忌,防止丟失
手工調(diào)用checkpoint,叫 savepoint疏橄,主要是用于flink集群維護升級等
底層使用了Chandy-Lamport 分布式快照算法,保證數(shù)據(jù)在分布式環(huán)境下的一致性
有狀態(tài)流應(yīng)用的一致檢查點略就,其實就是所有任務(wù)的狀態(tài),在某個時間點的一份 拷貝(一份快照)窄绒;這個時間點崔兴,應(yīng)該是所有任務(wù)都恰好處理完一個相同的輸入數(shù)據(jù)的時候
Flink 捆綁的些檢查點存儲類型:
作業(yè)管理器檢查點存儲 JobManagerCheckpointStorage
文件系統(tǒng)檢查點存儲 FileSystemCheckpointStorage
端到端(end-to-end)狀態(tài)一致性
數(shù)據(jù)一致性保證都是由流處理器實現(xiàn)的蛔翅,也就是說都是在Flink流處理器內(nèi)部保證的
在真實應(yīng)用中山析,了流處理器以外還包含了數(shù)據(jù)源(例如Kafka掏父、Mysql)和輸出到持久化系統(tǒng)(Kafka、Mysql爵政、Hbase、CK)
端到端的一致性保證陶缺,是意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的各個環(huán)節(jié),每一個組件都要保證自己的一致性等龙。
-
Source
- 需要外部數(shù)據(jù)源可以重置讀取位置伶贰,當發(fā)生故障的時候重置偏移量到故障之前的位置
-
內(nèi)部
- 依賴Checkpoints機制黍衙,在發(fā)生故障的時可以恢復各個環(huán)節(jié)的數(shù)據(jù)
-
Sink:
- 當故障恢復時,數(shù)據(jù)不會重復寫入外部系統(tǒng)位仁,常見的就是 冪等和事務(wù)寫入(和checkpoint配合)
有關(guān)檢查點配置的常用參數(shù)配置介紹
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設(shè)置checkpoint的周期, 每隔1000 ms進行啟動一個檢查點
env.getCheckpointConfig().setCheckpointInterval(1000);
// 設(shè)置狀態(tài)級別模式為exactly-once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//超時時間方椎,可能是保存太耗費時間或者是狀態(tài)后端的問題,任務(wù)同步執(zhí)行不能一直阻塞
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 設(shè)置取消和故障時是否保留Checkpoint數(shù)據(jù)琳疏,這個設(shè)置較為重要闸拿,沒有正確的選擇好可能會導致檢查點數(shù)據(jù)失效
//有兩個參數(shù)可以設(shè)置
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點新荤。必須在取消后手動清理檢查點狀態(tài)。
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點篱瞎。只有在作業(yè)失敗時,檢查點狀態(tài)才可用俐筋。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
實戰(zhàn)部分:
為了模擬生產(chǎn)環(huán)境中實時產(chǎn)生的訂單數(shù)據(jù)校哎,這里我們自己定義一個數(shù)據(jù)源來源源不斷的產(chǎn)生模擬訂單數(shù)據(jù)
訂單類:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
@Override
public String toString() {
return "VideoOrder{" +
"tradeNo='" + tradeNo + '\'' +
", title='" + title + '\'' +
", money=" + money +
", userId=" + userId +
", createTime=" + createTime +
'}';
}
}
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<VideoOrder> list = new ArrayList<>();
static {
list.add(new VideoOrder("","java",10,0,null));
list.add(new VideoOrder("","spring boot",15,0,null));
}
/**
* run 方法調(diào)用前 用于初始化連接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 產(chǎn)生數(shù)據(jù)的邏輯
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString().substring(30);
int userId = random.nextInt(10);
int videoNum = random.nextInt(list.size());
VideoOrder videoOrder = list.get(videoNum);
videoOrder.setUserId(userId);
videoOrder.setCreateTime(new Date());
videoOrder.setTradeNo(id);
System.out.println("產(chǎn)生:"+videoOrder.getTitle()+"闷哆,價格:"+videoOrder.getMoney()+", 時間:"+ TimeUtil.format(videoOrder.getCreateTime()));
ctx.collect(videoOrder);
}
}
/**
* 控制任務(wù)取消
*/
@Override
public void cancel() {
flag = false;
}
}
產(chǎn)生數(shù)據(jù)的格式如下:
主程序:使用reduce算子對數(shù)據(jù)進訂單價格進行滾動計算抱怔,并設(shè)置Checkpoint保證數(shù)據(jù)狀態(tài)可以存取
public class FlinkKeyByReduceApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//這是我本機的ip地址
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));
DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());
KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
@Override
public String getKey(VideoOrder value) throws Exception {
return value.getTitle();
}
});
SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
@Override
public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
VideoOrder videoOrder = new VideoOrder();
videoOrder.setTitle(value1.getTitle());
videoOrder.setMoney(value1.getMoney() + value2.getMoney());
return videoOrder;
}
});
reduce.print();
env.execute("job");
}
}
在本地測試運行結(jié)果屈留,可以看到數(shù)據(jù)根據(jù)訂單分組不斷的進行滾動計算
進入服務(wù)器的HDFS查看檢查點數(shù)據(jù)是否存在
之后將應(yīng)用進行打包灌危,上傳到服務(wù)器進行測試碳胳,可以使用Flink的Web頁面進行手動提交jar包運行,也可以使用命令進行提交味混,之后可以看到程序運行過程中的相關(guān)日志輸出
./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar
模擬宕機
運行程序的時候我們可以在Flink看到任務(wù)進行的id號翁锡,這個時候我們手動的cancel掉或者是直接把服務(wù)kill掉夕土,這個時候任務(wù)被強制暫停。
進入到HDFS可以看到我們設(shè)置的檢查點的數(shù)據(jù)依舊存在哈踱,我們使用如下命令梨熙,讓程序從上次宕機前的訂單計算狀態(tài)繼續(xù)往下計算。
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="" cid="n204" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">-s : 指定檢查點的元數(shù)據(jù)的位置邪财,這個位置記錄著宕機前程序的計算狀態(tài)
./bin/flink run -s /checkpoint/id號/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar </pre>
運行命令树埠,進入WEB頁面進行查看,是否成功又碌。
可以看到出現(xiàn)一次close的時候绊袋,代表我們的程序以及停止,服務(wù)器已經(jīng)宕機皂岔,這個時候訂單的計算結(jié)果如上圖的紅色方框展姐。在我們運行了上面那條命令后再次查看日志的數(shù)據(jù),從open開始可以看到這次就不是從訂單最初的狀態(tài)開始進行的了教馆,而是從上一次宕機前計算的結(jié)果墅拭,繼續(xù)往下計算,到這里Checkponit的實戰(zhàn)應(yīng)用測試就完成了舒憾。