Checkpoint Introduction
To guarantee fault tolerance for state, Flink needs to checkpoint the state.
Checkpointing is the core of Flink's fault-tolerance mechanism. Based on the configuration, it periodically takes snapshots of the state of each operator/task in the stream and persists this state durably. If the Flink program crashes unexpectedly, it can be rerun and selectively restored from these snapshots, correcting any data inconsistencies caused by the failure.
-
Prerequisites for Flink's checkpoint mechanism to interact with persistent storage for streams and state (a minimal sketch combining both follows this list):
A persistent source that can replay events for a certain period of time. Typical examples are persistent message queues (e.g. Apache Kafka, RabbitMQ) or file systems (e.g. HDFS, S3, GFS).
Persistent storage for state, typically a distributed file system (e.g. HDFS, S3, GFS).
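For illustration, here is a minimal sketch of a job that satisfies both prerequisites: it reads from Kafka (a replayable source) and persists checkpoints to HDFS. The broker address, topic name, group id, class name, and HDFS path are all assumptions, and the flink-connector-kafka dependency is required:
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // take a checkpoint every 10 s and persist the snapshots to a durable file system
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // hypothetical broker address
        props.setProperty("group.id", "checkpoint-demo");

        // Kafka retains the events, so they can be replayed from the offsets recorded in a checkpoint
        env.addSource(new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props))
           .print();

        env.execute("checkpointed kafka job");
    }
}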
Checkpoint Configuration
Checkpointing is disabled by default and must be enabled explicitly.
Once enabled, the default checkpointing mode is exactly-once.
There are two checkpointing modes: exactly-once and at-least-once.
Exactly-once is the right choice for most applications; at-least-once may be relevant for applications that need ultra-low latency (consistently a few milliseconds).
Checkpointing is disabled by default; enable and configure it like this:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms [checkpoint interval]
env.enableCheckpointing(1000);
// advanced options:
// set the mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure at least 500 ms pass between checkpoints [minimum pause between checkpoints]
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// a checkpoint must complete within one minute or it is discarded [checkpoint timeout]
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// keep the checkpoint data when the job is cancelled, so the job can later be restored from a chosen checkpoint [see the notes below]
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint data when the Flink job is cancelled, so that it can later be restored from a chosen checkpoint.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint data when the Flink job is cancelled; checkpoints are kept only when the job fails.
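Note: a checkpoint retained with RETAIN_ON_CANCELLATION can later be used to resume the job in the same way as a savepoint, by pointing the CLI's -s option at the checkpoint metadata path, for example (path hypothetical): ./bin/flink run -s hdfs://namenode:9000/flink/checkpoints/<job-id>/chk-45/_metadata -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001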
State Backend (state storage backend)
By default, state is kept in the TaskManager's memory and checkpoints are stored in the JobManager's memory.
-
Where state is stored and where checkpoints are written is determined by the State Backend configuration:
- env.setStateBackend(…)
-
There are three State Backends:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
-
MemoryStateBackend
State is kept on the Java heap; when a checkpoint is taken, the state snapshot is sent to the JobManager's memory.
The memory-based MemoryStateBackend is not recommended for production.
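A minimal sketch of selecting it anyway, e.g. for local testing (the size value is illustrative, not a recommendation):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// MemoryStateBackend keeps state on the heap and sends each snapshot to the JobManager,
// so each individual state is capped in size (5 MB by default); fine for local tests only
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024)); // illustrative 10 MB cap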
-
FsStateBackend
State is kept in the TaskManager's memory; when a checkpoint is taken, the state snapshot is written to the configured file system.
A distributed file system such as HDFS can be used.
-
RocksDBStateBackend
RocksDBStateBackend is slightly different from the two above: it maintains state in the local file system, writing state directly into a local RocksDB instance. It also requires a remote filesystem URI (usually HDFS); during a checkpoint the local data is copied to that filesystem, and on failover it is restored from the filesystem back to the local disk.
RocksDB removes the limitation of state having to fit in memory while still persisting to a remote file system, which makes it a good fit for production.
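A rough sketch of configuring it in code (the HDFS URI is hypothetical; the flink-statebackend-rocksdb_2.11 dependency must be on the classpath, and the constructor declares IOException, so the enclosing method needs to throw or handle it):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// state lives in local RocksDB files; the second argument enables incremental checkpoints,
// so only changes since the last completed checkpoint are copied to the remote file system
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints", true));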
How to Configure the State Backend
There are two ways to change the State Backend:
-
Option 1: per-job configuration
Modify the job code:
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
or new MemoryStateBackend()
or new RocksDBStateBackend(filebackend, true); (requires an additional third-party dependency)
-
Option 2: global configuration
- Modify flink-conf.yaml:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
-
Note: state.backend accepts one of the following values:
jobmanager (MemoryStateBackend)
filesystem (FsStateBackend)
rocksdb (RocksDBStateBackend)
State Backend Demo
Option 1: per-job configuration
Start the program that connects to socket zzy:9001:
./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
[iknow@data-5-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 12:03:15,057 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,057 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,325 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032
2019-03-06 12:03:15,415 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,415 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,421 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,421 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,511 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-03-06 12:03:15,819 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/iknow/zhangzhiyong/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-06 12:03:16,386 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1551789318445_0004
2019-03-06 12:03:16,412 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1551789318445_0004
2019-03-06 12:03:16,412 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-06 12:03:16,414 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-06 12:03:19,940 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
Starting execution of program
If port 9001 is not open on zzy, an error is reported in the JobManager web UI.
Checkpointing is configured in the code:
// get the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is disabled by default and must be enabled explicitly; start a checkpoint every 10000 ms [checkpoint interval]
env.enableCheckpointing(10000);
// advanced options:
// set the mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure at least 500 ms pass between checkpoints [minimum pause between checkpoints]
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// a checkpoint must complete within one minute or it is discarded [checkpoint timeout]
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// keep the checkpoint data when the job is cancelled, so the job can later be restored from a chosen checkpoint [see the notes above]
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint data when the job is cancelled, so it can be restored from a chosen checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint data when the job is cancelled; checkpoints are kept only when the job fails
// set the state backend
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints"));
// RocksDB requires the flink-statebackend-rocksdb_2.11 dependency
//env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true));
env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));
However, no checkpoint is triggered in the JobManager web UI.
The error indicates that the host name zzy cannot be resolved, so the connection to zzy:9001 fails.
Instead, listen on port 9001 on 50.63. If the nc command is not available, install it with
yum install -y nc
and then start the Flink program on YARN with the following command:
./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 16:00:24,680 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
If the "Deployment took more than 60 seconds ..." message keeps repeating, the cluster probably has no free resources.
Here, application_1551789318445_0007 and application_1551789318445_0008 are killed (these are test machines with very tight resources),
and then the program is started again.
Make sure the YARN deployment ends in the "deployed successfully" state.
The application application_1551789318445_0009 is now running on YARN.
Click the AM (ApplicationMaster) link to open the JobManager web UI.
The checkpoint UI
shows that a checkpoint is taken every 10 s.
Looking at the checkpoint data on HDFS, the 10 most recent checkpoints are retained.
The directory 95d75e802ba1eceefeaf98636e907883 corresponds to the job ID.
This shows that the corresponding setting in the Flink configuration file conf/flink-conf.yaml has taken effect.
Flink can retain multiple checkpoints; the following setting specifies the maximum number of checkpoints to keep:
state.checkpoints.num-retained: 10