Flink狀態(tài)(state)管理在代碼里配置checkpoint


checkPoint簡介

  • 為了保證state的容錯性腾降,F(xiàn)link需要對state進行checkpoint翁都。

  • Checkpoint是Flink實現(xiàn)容錯機制最核心的功能潮罪,它能夠根據(jù)配置周期性地基于Stream中各個Operator/task的狀態(tài)來生成快照,從而將這些狀態(tài)數(shù)據(jù)定期持久化存儲下來逗宁,當Flink程序一旦意外崩潰時映九,重新運行程序時可以有選擇地從這些快照進行恢復,從而修正因為故障帶來的程序數(shù)據(jù)異常

  • Flink的checkpoint機制可以與(stream和state)的持久化存儲交互的前提:

    • 持久化的source瞎颗,它需要支持在一定時間內(nèi)重放事件件甥。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka捌议,RabbitMQ等)或文件系統(tǒng)(比如HDFS,S3引有,GFS等)

    • 用于state的持久化存儲瓣颅,例如分布式文件系統(tǒng)(比如HDFS,S3譬正,GFS等)

checkPoint配置

  • 默認checkpoint功能是disabled的宫补,想要使用的時候需要先啟用

  • checkpoint開啟之后,默認的checkPointMode是Exactly-once

  • checkpoint的checkPointMode有兩種曾我,Exactly-once和At-least-once

  • Exactly-once對于大多數(shù)應(yīng)用來說是最合適的粉怕。At-least-once可能用在某些延遲超低的應(yīng)用程序(始終延遲為幾毫秒)

  • 默認checkpoint功能是disabled的,想要使用的時候需要先啟用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
// 每隔1000 ms進行啟動一個檢查點【設(shè)置checkpoint的周期】
env.enableCheckpointing(1000); 
// 高級選項:
// 設(shè)置模式為exactly-once (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// 檢查點必須在一分鐘內(nèi)完成抒巢,或者被丟棄【checkpoint的超時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 同一時間只允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 表示一旦Flink處理程序被cancel后斋荞,會保留Checkpoint數(shù)據(jù),以便根據(jù)實際需要恢復到指定的Checkpoint【詳細解釋見備注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后虐秦,會保留Checkpoint數(shù)據(jù)平酿,以便根據(jù)實際需要恢復到指定的Checkpoint
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數(shù)據(jù)悦陋,只有job執(zhí)行失敗的時候才會保存checkpoint

State Backend(狀態(tài)的后端存儲)

  • 默認情況下蜈彼,state會保存在taskmanager的內(nèi)存中,checkpoint會存儲在JobManager的內(nèi)存中俺驶。

  • state 的store和checkpoint的位置取決于State Backend的配置

    • env.setStateBackend(…)
  • 一共有三種State Backend

    • MemoryStateBackend

    • FsStateBackend

    • RocksDBStateBackend

  • MemoryStateBackend

    • state數(shù)據(jù)保存在java堆內(nèi)存中幸逆,執(zhí)行checkpoint的時候,會把state的快照數(shù)據(jù)保存到j(luò)obmanager的內(nèi)存中

    • 基于內(nèi)存的Memory state backend在生產(chǎn)環(huán)境下不建議使用

  • FsStateBackend

    • state數(shù)據(jù)保存在taskmanager的內(nèi)存中暮现,執(zhí)行checkpoint的時候还绘,會把state的快照數(shù)據(jù)保存到配置的文件系統(tǒng)中

    • 可以使用hdfs等分布式文件系統(tǒng)

  • RocksDBStateBackend

    • RocksDB跟上面的都略有不同,它會在本地文件系統(tǒng)中維護狀態(tài)栖袋,state會直接寫入本地rocksdb中拍顷。同時它需要配置一個遠端的filesystem uri(一般是HDFS),在做checkpoint的時候塘幅,會把本地的數(shù)據(jù)直接復制到filesystem中昔案。fail over的時候從filesystem中恢復到本地

    • RocksDB克服了state受內(nèi)存限制的缺點,同時又能夠持久化到遠端文件系統(tǒng)中电媳,比較適合在生產(chǎn)中使用

State Backend使用方式

修改State Backend的兩種方式

  • 第一種:單任務(wù)調(diào)整

    • 修改當前任務(wù)代碼

    • env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

    • 或者new MemoryStateBackend()

    • 或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

  • 第二種:全局調(diào)整

    • 修改flink-conf.yaml
    state.backend: filesystem 
    state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 
    
    • 注意:state.backend的值可以是下面幾種:

      • jobmanager(MemoryStateBackend)

      • filesystem(FsStateBackend)

      • rocksdb(RocksDBStateBackend)

State backend演示

第一種:單任務(wù)調(diào)整

啟動連接socket zzy:9001的程序

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
[iknow@data-5-63 flink-1.7.2]$ ./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,057 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-iknow.
2019-03-06 12:03:15,325 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,415 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,421 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-06 12:03:15,511 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=1}
2019-03-06 12:03:15,819 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/zhangzhiyong/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-06 12:03:16,386 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1551789318445_0004
2019-03-06 12:03:16,412 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-06 12:03:16,414 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-06 12:03:19,940 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program

如果zzy上未開啟9001端口,到j(luò)obManager的web ui上看到會報下面的錯

代碼里設(shè)置了checkpoint

//獲取flink的運行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默認checkpoint功能是disabled的踏揣,想要使用的時候需要先啟用;每隔10000ms進行啟動一個檢查點【設(shè)置checkpoint的周期】
env.enableCheckpointing(10000);
// 高級選項:
// 設(shè)置模式為exactly-once (這是默認值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確保檢查點之間有至少500ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘內(nèi)完成,或者被丟棄【checkpoint的超時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間只允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程序被cancel后匾乓,會保留Checkpoint數(shù)據(jù)捞稿,以便根據(jù)實際需要恢復到指定的Checkpoint【詳細解釋見備注】
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數(shù)據(jù),以便根據(jù)實際需要恢復到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后娱局,會刪除Checkpoint數(shù)據(jù)彰亥,只有job執(zhí)行失敗的時候才會保存checkpoint


//設(shè)置statebackend

//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://zzy:9000/flink/checkpoints"));
//rocksDB需要引入依賴flink-statebackend-rocksdb_2.11
//env.setStateBackend(new RocksDBStateBackend("hdfs://zzy:9000/flink/checkpoints",true));
env.setStateBackend(new FsStateBackend("hdfs://192.168.5.63:9000/flink/checkpoints"));

但是JobManager的web ui上checkpoint并未觸發(fā)

報錯如下,應(yīng)該是連接不到zzy 9001铃辖,識別不了zzy


選擇監(jiān)聽50.63上的9001端口,如果沒有nc命令猪叙,用

yum install -y nc

安裝下娇斩,用下面的命令啟動flink程序,采用flink on yarn的方式

./bin/flink run -m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 -c com.zzy.bigdata.flink.SocketWindowWordCountJavaCheckPoint zzy_flink_learn.jar --port 9001
2019-03-06 16:00:24,680 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

如果一直出現(xiàn)Deployment xxx穴翩,此時可能是集群上沒有資源了犬第,
這里殺掉application_1551789318445_0007和application_1551789318445_0008(這兩臺是測試機器,資源很緊張)

然后再次重啟程序


注意yarn是不是successfully.的狀態(tài)


Yarn上啟動了應(yīng)用application_1551789318445_0009
點擊AM進去jobManager的web ui界面


Checkpoint的UI

可以看到每隔10s進行一次checkpoint

Hdfs上查看checkpoint數(shù)據(jù)芒帕,看到保存了最近10次的checkpoint數(shù)據(jù)


95d75e802ba1eceefeaf98636e907883跟job ID是對應(yīng)的


說明flink配置文件conf/flink-conf.yaml里的配置生效了

flink可以保存多個checkpoint,添加如下配置歉嗓,指定最多需要保存Checkpoint的個數(shù)

state.checkpoints.num-retained: 10
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市背蟆,隨后出現(xiàn)的幾起案子鉴分,更是在濱河造成了極大的恐慌,老刑警劉巖带膀,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件志珍,死亡現(xiàn)場離奇詭異,居然都是意外死亡垛叨,警方通過查閱死者的電腦和手機伦糯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嗽元,“玉大人敛纲,你說我怎么就攤上這事〖涟” “怎么了淤翔?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長佩谷。 經(jīng)常有香客問我办铡,道長,這世上最難降的妖魔是什么琳要? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任寡具,我火速辦了婚禮,結(jié)果婚禮上稚补,老公的妹妹穿的比我還像新娘童叠。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布厦坛。 她就那樣靜靜地躺著五垮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪杜秸。 梳的紋絲不亂的頭發(fā)上放仗,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音撬碟,去河邊找鬼诞挨。 笑死,一個胖子當著我的面吹牛呢蛤,可吹牛的內(nèi)容都是我干的惶傻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼其障,長吁一口氣:“原來是場噩夢啊……” “哼银室!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起励翼,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤蜈敢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后汽抚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扶认,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年殊橙,在試婚紗的時候發(fā)現(xiàn)自己被綠了辐宾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡膨蛮,死狀恐怖叠纹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情敞葛,我是刑警寧澤誉察,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站惹谐,受9級特大地震影響持偏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜氨肌,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一鸿秆、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧怎囚,春花似錦卿叽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贩虾。三九已至,卻和暖如春沥阱,著一層夾襖步出監(jiān)牢的瞬間缎罢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工考杉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留策精,地道東北人。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓奔则,卻偏偏與公主長得像蛮寂,于是被迫代替她去往敵國和親蔽午。 傳聞我的和親對象是個殘疾皇子易茬,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容