使用Data Stream API編寫的程序通常以不同的形式持有狀態(tài):
- 在窗口中收集或聚合元素陈辱,直到觸發(fā)狀態(tài)存儲(chǔ)
- 轉(zhuǎn)換函數(shù)可能使用key/value狀態(tài)接口來存儲(chǔ)元素
- 轉(zhuǎn)換函數(shù)可能實(shí)現(xiàn)
CheckpointedFunction
接口來使得它們的本地變量容錯(cuò)赌渣。
參見流API指南中的狀態(tài)部分
當(dāng)checkpoint被激活時(shí),狀態(tài)會(huì)被持久化到checkpoint却特,以防止數(shù)據(jù)丟失和無縫恢復(fù)尖飞。狀態(tài)在內(nèi)部如何組織和它們?nèi)绾我约霸谀某志没=模蕾囉谒x的狀態(tài)后端。
可選的狀態(tài)后端
Flink內(nèi)部提供了這些狀態(tài)后端:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果沒有其他配置涣脚,系統(tǒng)將使用MemoryStateBackend示辈。
MemoryStateBackend
MemoryStateBackend將內(nèi)部的數(shù)據(jù)保存在Java堆上。 Key/value狀態(tài)和窗口操作符持有存儲(chǔ)值遣蚀,觸發(fā)器等的哈希表矾麻。
當(dāng)進(jìn)行checkpoint時(shí)纱耻,這個(gè)狀態(tài)后端會(huì)對(duì)當(dāng)前的狀態(tài)進(jìn)行快照,并且將其作為checkpoint ACK消息的一部分發(fā)送給JobManager(master)险耀,該JobManager將其存儲(chǔ)在它的堆上弄喘。
MemoryStateBackend可以配置使用異步快照的方式。雖然我們強(qiáng)烈鼓勵(lì)使用異步快照的方式來避免管道阻塞甩牺,但是請注意蘑志,這個(gè)是一個(gè)新特性,目前默認(rèn)情況下不啟用柴灯。為了啟用這個(gè)狀態(tài)卖漫,用戶可以在初始化 MemoryStateBackend
時(shí)將構(gòu)造函數(shù)中相應(yīng)的布爾標(biāo)識(shí)設(shè)為 true
,例如:
new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
MemoryStateBackend的局限性:
- 單個(gè)狀態(tài)的大小默認(rèn)情況下最大為5MB赠群。這個(gè)值可以通過MemoryStateBackend構(gòu)造函數(shù)進(jìn)行增加羊始。
- 無論配置的最大狀態(tài)大小為多少,狀態(tài)的大小不能超過akka幀大小(見Configuration)
- 聚合的狀態(tài)必須在JobManager的內(nèi)存中能存放
MemoryStateBackend適用于:
- 本地開發(fā)和調(diào)試
- 只有很小狀態(tài)的作業(yè)查描,例如作業(yè)只由record-at-a-time函數(shù)組成(Map突委,F(xiàn)latMap,F(xiàn)ilter冬三,...)匀油。Kafka消費(fèi)者只需要非常小的狀態(tài)。
FsStateBackend
FsStateBackend使用文件系統(tǒng)URL(類型勾笆,地址敌蚜,路徑),例如“hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.
FsStateBackend將in-flight數(shù)據(jù)存放在TaskManager的內(nèi)存中窝爪。當(dāng)進(jìn)行checkpoint時(shí)弛车,它將狀態(tài)快照寫入到配置的文件系統(tǒng)和目錄。最小的元數(shù)據(jù)存儲(chǔ)在JobManager的內(nèi)存中(或者蒲每,在高可用模式下纷跛,在元數(shù)據(jù)checkpoint中)。
FsStateBackend默認(rèn)使用異步快照以避免在寫狀態(tài)checkpoint時(shí)阻塞處理管道邀杏。要禁用此特性贫奠,用戶可以初始化 MemoryStateBackend
時(shí)將構(gòu)造函數(shù)中相應(yīng)的布爾標(biāo)識(shí)設(shè)為 false
,例如:
new FsStateBackend(path, false);
FsStateBackend適用于:
- 具有大狀態(tài)望蜡,長窗口唤崭,大key/value狀態(tài)的作業(yè)
- 所有的高可用性設(shè)置
RocksDBStateBackend
RocksDBStateBackend 使用文件系統(tǒng)URL(類型,地址脖律,路徑)浩姥,例如“hdfs://namenode:40010/flink/checkpoints” 或 “file:///data/flink/checkpoints”.
RocksDBStateBackend將in-flight數(shù)據(jù)存儲(chǔ)在RocksDB數(shù)據(jù)庫中,它(默認(rèn))存儲(chǔ)在TaskManager的data目錄下状您。當(dāng)checkpoint時(shí)勒叠,整個(gè)RocksDB數(shù)據(jù)庫將被checkpoint到配置的文件系統(tǒng)和目錄下兜挨。最小的元數(shù)據(jù)存儲(chǔ)在JobManager的內(nèi)存中(或者,在高可用模式下眯分,在元數(shù)據(jù)checkpoint中)拌汇。
RocksDBStateBackend總是執(zhí)行異步快照。
RocksDBStateBackend的限制:
- 作為RocksDB的JNI橋接API是基于byte[]的弊决,每個(gè)key和value的最大的支持大小是 2^31字節(jié)噪舀。重要:在RocksDB中使用合并操作的狀態(tài)(例如,ListState)能夠默默的積累到值的size大于 2^31字節(jié)飘诗,并且在下次檢索時(shí)會(huì)失敗与倡。這是目前 RocksDB JNI的限制。
RocksDBStateBackend適用于:
- 具有大狀態(tài)昆稿,長窗口纺座,大key/value狀態(tài)的作業(yè)
- 所有的高可用性設(shè)置
注意:你可以保持的狀態(tài)的數(shù)量只受限于磁盤可用空間的大小。相比于將狀態(tài)保存到內(nèi)存的FsStateBackend溉潭,這允許保持非常大的狀態(tài)净响。然而,這也意味著喳瓣,可以達(dá)到的最大的吞吐量會(huì)比狀態(tài)后端的吞吐量要低馋贤。
RocksDBStateBackend是目前唯一提供增量checkpoint的狀態(tài)后端(請參閱此處) 。
配置狀態(tài)后端
如果您不做任何指定畏陕,默認(rèn)的狀態(tài)后端是JobManager配乓。如果你希望為你的集群中的所有作業(yè)創(chuàng)建一個(gè)非默認(rèn)的狀態(tài)后端,你可以通過在flink-conf.yaml中指定一個(gè)新的默認(rèn)后端惠毁。默認(rèn)的狀態(tài)后端可以在每個(gè)作業(yè)的基礎(chǔ)上進(jìn)行覆蓋犹芹,如下所示.
設(shè)置一個(gè)作業(yè)級(jí)的狀態(tài)后端
作業(yè)的狀態(tài)后端通過作業(yè)中的 StreamExecutionEnvironment
進(jìn)行設(shè)置,如下述示例所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
設(shè)置默認(rèn)狀態(tài)后端
默認(rèn)狀態(tài)后端可以通過在 flink-conf.yaml
中設(shè)置state.backend
值指定仁讨。
可能的配置項(xiàng)是jobmanager (MemoryStateBackend), filesystem (FsStateBackend)实昨, rocksdb (RocksDBStateBackend)洞豁,或者實(shí)現(xiàn)了狀態(tài)后端工廠FsStateBackendFactory的類的完全限定類名,例如荒给,為RocksDBStateBackend設(shè)置為org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
丈挟。
配置文件中的示例部分如下所示:
# The backend that will be used to store operator state checkpoints
state.backend: filesystem
# Directory for storing checkpoints
state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints