Flink提供了不同的狀態(tài)存儲方式兆旬,并說明了狀態(tài)如何存和存儲在哪里假抄。
狀態(tài)可以被存儲在Jvm的堆和堆外。根據(jù)狀態(tài)存儲方式的不同,F(xiàn)link也能代替應(yīng)用管理狀態(tài)宿饱,意思是Flink能夠進(jìn)行內(nèi)存管理(有必要的時候,可能會溢出到硬盤),允許應(yīng)用保存非常大的狀態(tài)熏瞄。默認(rèn)情況下,在配置文件flink-conf.yaml
中為所有Flink作業(yè)配置狀態(tài)存儲方式谬以。
然而强饮,默認(rèn)的狀態(tài)存儲方式配置可以被單獨(dú)的作業(yè)設(shè)置覆蓋,就像下面那樣为黎。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(...);
使用Data Stream API
寫的程序經(jīng)常需要以多種情況保存狀態(tài):
- 在窗口被觸發(fā)之前, 窗口需要保存或聚合元素
- 轉(zhuǎn)換算子也許會使用key/value狀態(tài)接口保存數(shù)據(jù)
- 轉(zhuǎn)換算子也許實現(xiàn)
CheckpointedFunction
接口使本地變量容錯邮丰。
當(dāng)checkpointing被激活的時候,一旦發(fā)生checkpoint铭乾,狀態(tài)會被保存剪廉,這樣數(shù)據(jù)就不會丟失,并且在恢復(fù)的時候能夠保持?jǐn)?shù)據(jù)一致性炕檩。狀態(tài)在內(nèi)部是怎么表示的斗蒋,以及當(dāng)checkpoint的時候,狀態(tài)怎么樣被保存笛质,以及保存到哪里依賴選擇的狀態(tài)存儲方式泉沾。
Flink提供了三種開箱即用的狀態(tài)存儲方式:
- MemoryStateBackend 內(nèi)存存儲
- FsStateBackend 文件系統(tǒng)存儲
- RocksDBStateBackend RocksDB存儲
如果沒有特殊配置,系統(tǒng)默認(rèn)使用內(nèi)存存儲方式妇押。
MemoryStateBackend 內(nèi)存存儲
內(nèi)存存儲:在Java
堆中保存狀態(tài)對象跷究。Key/Value狀態(tài)和窗口算子都會以Hash表的方式保存狀態(tài)值,觸發(fā)器等敲霍。
當(dāng)checkpoint的時候俊马,狀態(tài)存儲將會快照狀態(tài),將當(dāng)checkpoint向JobManager發(fā)送回執(zhí)消息時色冀,作為消息的一部分發(fā)給JobManager(master)潭袱,JobManager會將狀態(tài)存儲到堆內(nèi)存中。
可以配置內(nèi)存存儲使用異步快照锋恬。我們也強(qiáng)烈推薦使用異步快照屯换,避免阻塞流處理通道。請注意默認(rèn)是打開異步快照的与学。如果想要關(guān)閉這個特性彤悔,用戶可以在實現(xiàn)化MemoryStateBackend
對象的時候,給構(gòu)造函數(shù)中相應(yīng)的boolean
參數(shù)傳false
(這應(yīng)該僅用于調(diào)試目的)索守。
new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
內(nèi)存存儲有如下限制:
- 每一個狀態(tài)大小默認(rèn)不超過
5M
晕窑。這個值可以在實例化MemoryStateBackend
的時候增加 - 不管配置的最大狀態(tài)大小是多少,狀態(tài)大小不能超過akka配置的楨(一次RPC傳輸?shù)臄?shù)據(jù))大小(參數(shù): akka.framesize,默認(rèn):10M)卵佛。
- 聚合的狀態(tài)必須適合
JobMaanger
內(nèi)存
以下情況推薦使用內(nèi)存存儲
- 本地開發(fā)或調(diào)用
- 只保存少量狀態(tài)的作業(yè)杨赤。例如僅僅包含一次一條記錄算子(例如:Map,FlatMap,Fliter,....)的作業(yè)敞斋。對于這樣的作業(yè),
Kafka Consumer
僅僅需要非常少的狀態(tài)疾牲。
FsStateBackend 文件系統(tǒng)存儲
通過配置文件系統(tǒng)的URL(類型,地址,路徑)使用文件系統(tǒng)存儲植捎。例如"hdfs://namenode:40010/flink/checkpoints"
或者"file:///data/flink/checkpoints"
。
FsStateBackend
將狀態(tài)數(shù)據(jù)保存在TaskManager’s
內(nèi)存中阳柔。當(dāng)checkpoint的時候焰枢,將狀態(tài)數(shù)據(jù)寫到配置的文件系統(tǒng)或目錄中。最小的元數(shù)據(jù)會存儲到JobManager
內(nèi)存中(或者在HA模式下舌剂,存儲到checkpoint元數(shù)據(jù)中).
FsStateBackend
默認(rèn)使用異步快照济锄,以避免阻塞流處理。如果想禁止該特性霍转,在實現(xiàn)化FsStateBackend
對象的時候荐绝,構(gòu)造函數(shù)中應(yīng)的參數(shù)傳入false
即可。
new FsStateBackend(path, false);
以下情況谴忧,推薦使用FsStateBackend
- 具有大狀態(tài)很泊,長窗口,大的key/value狀態(tài)的作業(yè)
- 所有HA模式下
RocksDBStateBackend RocksDB存儲
要想使用RocksDB存儲沾谓,需要配置文件系統(tǒng)的URL(類型,地址,路徑)。例如"hdfs://namenode:40010/flink/checkpoints"
或者"file:///data/flink/checkpoints"
戳鹅。
RocksDBStateBackend
將狀態(tài)數(shù)據(jù)保存到RocksDB數(shù)據(jù)庫.RocksDB文件默認(rèn)會存儲到TaskManager的數(shù)據(jù)目錄中均驶。當(dāng)checkpoint的時候,整個RocksDB數(shù)據(jù)庫將會保存到配置的文件系統(tǒng)或目錄中枫虏。最小的元數(shù)據(jù)會存儲到JobManager
內(nèi)存中(或者在HA模式下妇穴,存儲到checkpoint元數(shù)據(jù)中).
RocksDBStateBackend
總是執(zhí)行異步快照。
RocksDBStateBackend
具有如下限制:
- 由于 RocksDB JNI通信使用的API基于byte[]隶债,每個key或每個value最大支持2^31字節(jié)腾它。
注意: 在以RocksDB作用存儲情況下,使用merge操作的狀態(tài)(例如:ListState)會默默地將值大小累加到大于2^31字節(jié)死讹,當(dāng)再次讀取的時候會失敗瞒滴,這是目前RocksDB JNI的限制。
以下情況赞警,推薦使用RocksDBStateBackend
- 具有非常大的狀態(tài)妓忍,長窗口,大的key/value狀態(tài)的作業(yè)
- 所有HA模式下
你可以保存的狀態(tài)數(shù)據(jù)量僅僅受限于磁盤剩余空間大小愧旦。與將狀態(tài)保存到內(nèi)存中的``FsStateBackend `相比世剖,可以保存更大的狀態(tài)。然而這也意味著能達(dá)到的最大吞吐量更小笤虫。因為所有從rocksDB讀或?qū)懭雛ocksDB都需要經(jīng)過序列化與反序例化旁瘫,比那些基于Java堆的存儲后端開銷更大祖凫。
RocksDBStateBackend
是目前唯一提供 增量的checkpoint的存儲。
RocksDB的一些指標(biāo)可以被獲取酬凳,但是默認(rèn)沒打開惠况,可以在這里找到全部文檔說明。
配置狀態(tài)存儲
如果你什么也沒配置粱年,默認(rèn)的狀態(tài)存儲在JobManager
內(nèi)存中售滤。如果你希望為所有作業(yè)默認(rèn)一個其它的存儲,你可以在flink-conf.ymal
中配置其它的存儲台诗。當(dāng)然完箩,每一個作業(yè)也能單獨(dú)設(shè)置存儲。
每個作業(yè)單獨(dú)設(shè)置存儲
下面示例顯示StreamExecutionEnvironment
的作業(yè)如何設(shè)置存儲拉队。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
如果你想使用RocksDBStateBackend
弊知,你就必須在你的Flink項目中添加如下Maven依賴。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.8.0</version>
</dependency>
設(shè)置默認(rèn)的狀態(tài)存儲
默認(rèn)的狀態(tài)存儲能夠在flink-conf.yaml
文件中配置粱快,參數(shù)是state.backend
. 值可以選擇jobmanager(MemoryStateBackend)
, filesystem(FsStateBackend)
,rocksdb(RocksDBStateBackend)
三者中的一個秩彤,也可以配置實現(xiàn)了接口StateBackendFactory
的全類名。例如: RocksDBStateBackend
的實現(xiàn)類org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
.
state.checkpoints.dir
參數(shù)定義了checkpoint數(shù)據(jù)和元數(shù)據(jù)文件存儲的位置事哭,你可以在這里發(fā)現(xiàn)更詳細(xì)的checkpoint目錄結(jié)構(gòu)說明
配置示例:
# 狀態(tài)存儲
state.backend: filesystem
# checkpoints數(shù)據(jù)存儲目錄
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
翻譯自: https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/state_backends.html