最近看了看Flink中state方面的知識俱箱,F(xiàn)link中的state是啥廊移?state的作用是啥导披?為什么Flink中引入了state這個概念笋粟?既然最近的項目需要用到state怀挠,結(jié)合官網(wǎng)再加上網(wǎng)上大佬state的博客,那就簡單了解一下Flink中的state吧害捕!
1.State是什么
看到state這個詞绿淋,給人的第一印象就是中文翻譯:狀態(tài),那么state在Flink中有什么具體的涵義呢吨艇?state是Flink程序某個時刻某個task/operator的狀態(tài),state數(shù)據(jù)是程序運行中某一時刻數(shù)據(jù)結(jié)果腾啥,這個state數(shù)據(jù)會保存在taskmanager的內(nèi)存中东涡,也就是java的堆內(nèi)存中。官方推薦使用先存儲到本地的RocksDB中倘待,然后再將RocksDB中的狀態(tài)數(shù)據(jù)異步checkpoint到hdfs上疮跑。
2.State的作用
首先要將state和checkpoint概念區(qū)分開,可以理解為checkpoint是要把state數(shù)據(jù)持久化存儲起來凸舵,checkpoint默認情況下會存儲在JoManager的內(nèi)存中祖娘。checkpoint表示一個Flink job在一個特定時刻的一份全局狀態(tài)快照,方便在任務(wù)失敗的情況下數(shù)據(jù)的恢復(fù)啊奄。
3.State 狀態(tài)值存儲
State 的store和checkpoint的位置最終取決于State Backend的配置env.setStateBackend()渐苏。一共有三種env.setStateBackend(new RocksDBStateBackend()), env.setStateBackend(new MemoryStateBackend()), env.setStateBackend(new FsStateBackend())。
→→→→→→→→→→→→→→MemoryStateBackend ←←←←←←←←←←←←
?原理
①MemoryStateBackend 在java堆內(nèi)存上維護狀態(tài)菇夸。
②Checkpoint時琼富,MemoryStateBackend 對State做一次快照,并在向JobManager發(fā)送checkpoint確認完成的消息中帶上次快照數(shù)據(jù)庄新,然后快照就會存儲在JobManager的堆內(nèi)存中鞠眉。
③MemoryStateBackend 可以使用異步的方式進行快照,推薦使用異步的方式避免阻塞。如果不希望異步择诈,可以在構(gòu)造的時候傳入false
val backend: MemoryStateBackend = new MemoryStateBackend(10 *1024 *1024,false)
env.setStateBackend(backend)
? 限制
單個state的大小默認值為5MB械蹋,可以在MemoryStateBackend的構(gòu)造函數(shù)中增加。無論如何配置羞芍,State大小都無法大于akka.framesize(JobManager和TaskManager之間發(fā)送的最大消息的大小哗戈,默認是10MB)。JobManager必須有足夠的內(nèi)存荷科。
state默認大小
? 適用場景
本地開發(fā)和調(diào)試
→→→→→→→→→→→→→→→→FsStateBackend←←←←←←←←←←←←←←
? 原理
①FsStateBackend需要配置一個文件系統(tǒng)的url谱醇,如hdfs上的路徑:hdfs://hfflink/flink/checkpoints/... 暇仲。
②FsStateBackend在TaskManager的內(nèi)存中持有正在處理的數(shù)據(jù)。Checkpoint時將state snapshot寫入文件系統(tǒng)目錄下的文件中副渴。文件的路徑等元數(shù)據(jù)會傳遞給JobManager奈附。
③FsStateBackend可以使用異步的方式進行快照,推薦使用異步的方式避免阻塞煮剧。如果不希望異步斥滤,可以在構(gòu)造的時候傳入false
val backend = new FsStateBackend("hdfs://hfflink/flink/checkpoints/...",false)
env.setStateBackend(backend)
? 適用場景
大狀態(tài),長窗口勉盅,大鍵/值狀態(tài)的job
→→→→→→→→→→→→→→RocksDBStateBackend←←←←←←←←←←←←
? 原理
①RocksDBStateBackend需要配置一個文件系統(tǒng)的url佑颇,如hdfs上的路徑:hdfs://hfflink/flink/checkpoints/... 。
②RocksDBStateBackend將運行中的數(shù)據(jù)保存在RocksDB數(shù)據(jù)庫中草娜,默認情況下存儲在TaskManager數(shù)據(jù)目錄中挑胸。在checkpoint的時候,整個RocksDB數(shù)據(jù)庫將被checkpointed到配文件的文件系統(tǒng)和目錄中宰闰。文件的路徑等元數(shù)據(jù)會 傳遞給JobManager茬贵,存在其內(nèi)存中。
③RocksDBStateBackend總是執(zhí)行異步快照移袍。
? 限制
RocksDB JNI API是基于byte[],因此key和value最大支持大小為2^31個字節(jié)(2GB)解藻,RocksDB自身在支持較大value時候會有限制。
? 使用場景
超大狀態(tài)葡盗,超長窗口螟左,大鍵/值狀態(tài)的job.
? 與前兩種狀態(tài)后端對比
①狀態(tài)保存在數(shù)據(jù)庫RocksDB中,相比其他狀態(tài)后端可保存更大的狀態(tài)觅够,但開銷更大(讀/寫需要反序列化/序列化去檢索/存儲狀態(tài))胶背。
②目前只有RocksDBStateBackend支持增量checkpoint(默認全量),false默認全量,true代表增量喘先。
val checkpoint = "hdfs://hfflink/flink/checkpoints/..."
env.setStateBackend(new RocksDBStateBackend(checkpoint, true))
4.State的應(yīng)用
-
State->KeyedState
KeyedState是基于KeyedStream上的狀態(tài)奄妨,這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每個key都有對應(yīng)的state苹祟。Keyed State 僅僅可以被使用在基于KeyStream上的functions和operators中砸抛。
-
State->OperatorState
OperatorState就是non-keyed state,每個Operator State都綁定到一個并行的算子實例中树枫。Kafka Connector就是一個使用Operator State的一個很好的例子直焙,他會在每個connector實例中,保存改實例中消費topic的所有(partition砂轻,offset)映射奔誓。
5.State存在形式
Keyed State和Operator State存在兩種形式:managed (托管狀態(tài))和 raw(原始狀態(tài))。托管狀態(tài)是由Flink框架管理的狀態(tài);而原始狀態(tài)是由用戶自行管理狀態(tài)的具體數(shù)據(jù)結(jié)構(gòu)厨喂,框架在做checkpoint的時候和措,使用bytes 數(shù)組讀寫狀態(tài)內(nèi)容,對其內(nèi)部數(shù)據(jù)結(jié)構(gòu)一無所知蜕煌。通常所有的datastream functions都可以使用托管狀態(tài),但是原始狀態(tài)接口僅僅能夠在實現(xiàn)operators的時候使用派阱。推薦使用managed state而不是使用raw state,因為使用托管狀態(tài)的時候Flink可以在parallelism發(fā)生改變的情況下能夠動態(tài)重新分配狀態(tài)斜纪,而且還能更好的進行內(nèi)存管理贫母。
6.Keyed State在托管狀態(tài)下使用
托管的keyed state接口提供了不同類型的state,它們都是作用于當前輸入流元素的key上。這就意味著這種類型的state僅僅可以被使用在keyedStream之上盒刚,keyedStream可以通過stream.keyBy(...)來創(chuàng)建腺劣。
接下來我們首先來看一看下面所提供的不同類型的state,然后再看看它們在代碼中是如果使用的。
- ValueState<T>:即類型為T的單值狀態(tài)因块。這個狀態(tài)與對應(yīng)的key綁定橘原,是最簡單的狀態(tài)了。它可以通過update()方法更新狀態(tài)值涡上,通過value()方法獲取狀態(tài)值趾断。
case class valueStateFunction extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("value-state-name", createTypeInformation[(Long, Long)])
)
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new valueStateFunction())
.print()
env.execute("job-name")
- ListState<T>:即key上的狀態(tài)值為一個列表∠判福可以通過add()方法往列表中附加值歼冰;也可以通過get()方法返回一個Iterable<T>來遍歷狀態(tài)值靡狞。
case class CountListState() extends RichFlatMapFunction[(Long, Long), List[(Long, Long)]] {
private var listState: ListState[(Long, Long)] = null
//業(yè)務(wù)邏輯根據(jù)自己的實際情況設(shè)計
override def flatMap(value: (Long, Long), out: Collector[List[(Long, Long)]]): Unit = {
var list: List[(Long, Long)] = List()
listState.add(value)
val iterable: lang.Iterable[(Long, Long)] = listState.get()
val itt = iterable.iterator()
while (itt.hasNext) {
list = list :+ itt.next()
}
out.collect(list)
}
override def open(parameters: Configuration): Unit = {
listState = getRuntimeContext.getListState(new ListStateDescriptor[(Long, Long)]("list-state-name", createTypeInformation[(Long, Long)]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new listStateFunction())
.print()
env.execute("job-name")
- ReducingState<T>:這種狀態(tài)通過用戶傳入的reduceFunction耻警,每次調(diào)用add()方法添加值的時候,會調(diào)用reduceFunction甸怕,最后合并到一個單一的狀態(tài)值甘穿。
case class reducingStateFunction() extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var state: ReducingState[Long] = null
//業(yè)務(wù)邏輯根據(jù)自己的實際情況設(shè)計
override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {
state.add(value._2)
out.collect(value)
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Long]("reducing-state-name", new ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = {
value1 + value2
}
}, createTypeInformation[Long]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new reducingStateFunction())
.print()
env.execute("job-name")
- AggregatingState<IN,OUT>:這種狀態(tài)值保留一個值,該值是添加的所有值的聚合梢杭。和ReducingState不同的是温兼,聚合類型可能與添加到狀態(tài)中的元素類型不同派近。
case class aggregatingStateFunction() extends RichMapFunction[(Long, Long), Long] {
private var state: AggregatingState[(Long, Long), Long] = null
//業(yè)務(wù)邏輯根據(jù)自己的實際情況設(shè)計
override def map(value: (Long, Long)): Long = {
state.add(value)
value._2
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[(Long, Long), (Long, Long), Long]("aggr-state-name", aggregateFunction, createTypeInformation[(Long, Long)]))
}
}
val aggregateFunction = new AggregateFunction[(Long, Long), (Long, Long), Long] {
override def createAccumulator(): (Long, Long) = {
(0L, 0L)
}
override def add(value: (Long, Long), accumulator: (Long, Long)): (Long, Long) = {
(accumulator._1 + 1, accumulator._2 + value._2)
}
override def getResult(accumulator: (Long, Long)): Long = {
val result = accumulator._1 / accumulator._2
result
}
override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
(a._1 + b._1, a._2 + b._2)
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new aggregatingStateFunction())
.print()
env.execute("job-name")
- FoldingState<T,ACC>:跟ReducingState有點類似案疲,不過它的狀態(tài)值類型可以與add()方法中傳入的元素類型不同酌予。FoldingState在Flink-1.4版本已經(jīng)被棄用爷贫,在未來會被完全廢棄掉诗茎《Щ罚可以使用AggregatingState代替之驮肉。
case class foldingStateFunction() extends RichMapFunction[(Long, Long), (Long, Long)] {
private var state: FoldingState[(Long, Long), Long] = null
//業(yè)務(wù)邏輯根據(jù)自己的實際情況設(shè)計
override def map(value: (Long, Long)): (Long, Long) = {
state.add(value)
value
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getFoldingState(new FoldingStateDescriptor[(Long, Long), Long]("folding-state-name", 10L, new FoldFunction[(Long, Long), Long] {
override def fold(accumulator: Long, value: (Long, Long)): Long = {
accumulator + value._1 + value._2
}
}, createTypeInformation[Long]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new foldingStateFunction())
.print()
env.execute("job-name")
- MapState<UK,UV>:即狀態(tài)值是一個map,用戶可以通過put() 或者putAll()方法添加元素涣雕。
case class mapStateFunction() extends RichMapFunction[(Long, Long), ((Long, Long), Long)] {
private var state: MapState[(Long, Long), Long] = null
//業(yè)務(wù)邏輯根據(jù)自己的實際情況設(shè)計
override def map(value: (Long, Long)): ((Long, Long), Long) = {
state.put(value, value._1 + value._2)
(value, state.get(value))
}
override def open(parameters: Configuration): Unit = {
val keyMapState = createTypeInformation[(Long, Long)]
val valueMapState = createTypeInformation[Long]
state = getRuntimeContext.getMapState(new MapStateDescriptor[(Long, Long), Long]("map-state-name", keyMapState, valueMapState))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new mapStateFunction())
.print()
env.execute("job-name")
6-1.State生存周期
time-to-live(TTL)能夠被使用在任何類型的keyed state全释。如果TTL被配置而且狀態(tài)值已經(jīng)過期装处,則將以最佳的方式清理。為了使用狀態(tài)TTL浸船,必須首先構(gòu)建一個StateTtlConfig 對象妄迁。通過配置可以在任何state descriptor 中使用寝蹈。
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("value_state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
配置中有下面幾個配置項可以選擇:StateTtlConfig中的newBuilder這個方法是必須的,它是設(shè)置生存周期的值登淘。
- TTL 刷新策略(默認OnCreateAndWrite)
策略類型 | 描述 |
---|---|
StateTtlConfig.UpdateType.Disabled | 禁用TTL箫老,永不過期 |
StateTtlConfig.UpdateType.OnCreateAndWrite | 每次寫操作都會更新State的最后訪問時間 |
StateTtlConfig.UpdateType.OnReadAndWrite | 每次讀寫操作都會跟新State的最后訪問時間 |
- 狀態(tài)可見性(默認NeverReturnExpired)
策略類型 | 描述 |
---|---|
StateTtlConfig.StateVisibility.NeverReturnExpired | 永不返回過期狀態(tài) |
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp | 可以返回過期但尚未被清理的狀態(tài)值 |
Notes:
- 狀態(tài)后端存儲最后一次修改的時間戳和值,這意味著啟用該特性會增加狀態(tài)存儲的消耗形帮。堆狀態(tài)后端在內(nèi)存中存儲一個附加的Java對象槽惫,其中包含對用戶狀態(tài)對象的引用和一個原始長值。RocksDB狀態(tài)后端為每個存儲值辩撑、列表條目或映射條目添加8個字節(jié);
- 目前只支持與處理時間相關(guān)的TTLs;
- 如果試圖使用啟用TTL的描述符或使用啟用TTL的描述符恢復(fù)先前在沒有TTL的情況下配置的狀態(tài)界斜,將導(dǎo)致兼容性失敗和statmigration異常;
- TTL配置不是check- or savepoints的一部分,而是Flink在當前運行的作業(yè)中如何處理它的一種方式
6-1-1.State清除策略
Cleanup in full snapshot↓
默認情況下合冀,過期值只有在顯式讀出時才會被刪除各薇,例如通過調(diào)用ValueState.value()方法。此外君躺,您可以在獲取完整狀態(tài)快照時激活清理操作峭判,這將減少其大小。在當前實現(xiàn)下棕叫,本地狀態(tài)不會被清除林螃,但在從前一個快照恢復(fù)時,它不會包含已刪除的過期狀態(tài)俺泣×迫希可以在StateTtlConfig中配置.
1:下面的配置選項不適用于RocksDB state backend上的increamental checkpointing:;
2:對于現(xiàn)有作業(yè)伏钠,此清理策略可以在StateTtlConfig中隨時激活或停用横漏,例如從保存點重新啟動后可以使用。
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
Incremental cleanup↓
另一個選項是增量地觸發(fā)對某些狀態(tài)項的清理熟掂。觸發(fā)器可以是來自每個狀態(tài)訪問或/和每個記錄處理的回調(diào)缎浇。如果這個清理策略在某個狀態(tài)下活躍的,那么存儲后端會在其所有條目上為該狀態(tài)保留一個惰性全局迭代器赴肚。每次觸發(fā)增量清理時素跺,迭代器都會被提升。檢查遍歷的狀態(tài)項誉券,并清理過期的狀態(tài)項指厌。
這個特性可以在StateTtlConfig中激活:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
.newBuilder(Time.seconds(1))
.cleanupIncrementally
.build
上面的策略有兩個參數(shù),第一個參數(shù):第是每次清理觸發(fā)的檢查狀態(tài)的條件横朋。如果啟用仑乌,則每次狀態(tài)訪問都將觸發(fā)它。第二個參數(shù):是否為每個記錄處理額外觸發(fā)清理。
Notes:
- 如果對狀態(tài)沒有訪問或者沒有任何處理的記錄晰甚,那么狀態(tài)會一直保留衙传;
- 增量狀態(tài)的清理增加了記錄處理的延遲;
- 目前厕九,增量狀態(tài)的清理策略僅僅在對堆狀態(tài)后端被實現(xiàn)了蓖捶,對于設(shè)置了RocksDB的將沒有效果;
- 如果使用堆狀態(tài)后端進行同步快照扁远,全局迭代器在跌倒時會保留所有鍵的副本俊鱼,因為它的特性不支持對并發(fā)數(shù)的修改。使用此功能將增加內(nèi)存消耗畅买。異步快照進行對狀態(tài)的保存就沒有這種情況發(fā)生并闲;
- 對于現(xiàn)有的作業(yè),可以通過在StateTtlConfig中設(shè)置這種清理策略能夠隨時被激活和停用谷羞,例如:從保存點重新啟動后帝火。
Cleanup during RocksDB compaction↓
如果使用RocksDB進行狀態(tài)的管理,另一個清理策略就是激活Flink的壓縮過濾這個策略湃缎。RocksDB會定期使用異步壓縮來合并狀態(tài)的更新和減少儲存犀填。Flink壓縮過濾器使用TTL檢查狀態(tài)的過期時間戳,并排除過期值嗓违。
默認情況下是關(guān)閉該特性的九巡。對于RocksDB進行狀態(tài)管理首先要做的就是要激活,通過Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled蹂季,或者對于一個Flink job來說如果一個自定義的RocksDB 狀態(tài)管理被創(chuàng)建那么它可以調(diào)用 RocksDBStateBackend::enableTtlCompactionFilter冕广。然后任何帶有TTL的狀態(tài)都可以配置來去使用過濾器。
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter
.build
RocksDB compaction filter將會從Flink每次處理完一定數(shù)據(jù)量的狀態(tài)之后乏盐,從Flink查詢用于檢查過期的當前時間戳,這個數(shù)字默認是1000佳窑。你也可以選擇更改它制恍,并將自定義值傳遞給StateTtlConfig.newBuilder(…)父能。cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。頻繁的跟新時間錯可以提高清理的數(shù)據(jù)但是會降低壓縮性能净神,因為它使用了來自本地的JNI的調(diào)用何吝。
Notes:
- 在壓縮過程中調(diào)用TTL過濾器會減慢它的速度。TTL過濾器必須解析上次訪問的時間戳鹃唯,并檢查每個正在壓縮的鍵的每個存儲狀態(tài)條目的過期時間爱榕。對于集合狀態(tài)類型(列表或映射),每個存儲的元素也調(diào)用該檢查;
- 對于現(xiàn)有作業(yè)坡慌,此清理策略可以在StateTtlConfig中隨時激活或停用黔酥,例如從保存點重新啟動后。
目前,管理operator state僅僅支持使用List類型跪者。當前棵帽,支持List樣式的托管運算符狀態(tài),彼此之間相互獨立,因此可以在重新縮放時可以重新分配渣玲。換句話說逗概,這些對象是可以重新分配non-keyed state的最佳粒度。根據(jù)狀態(tài)訪問方法忘衍,定義一下重新分配方案逾苫。
6-2.State在scala DataStream API中的使用
除了上面描述的接口之外,Scala API還為有狀態(tài)map()或flatMap()函數(shù)提供了快捷方式枚钓,這些函數(shù)在KeyedStream上只有一個ValueState铅搓。方法中在一個Option中獲取ValueState的當前值,并且必須返回一個更新后的值搀捷,該值將用于更新狀態(tài)狸吞。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
7. Operator State在托管狀態(tài)下使用
為了使用的operator state,有狀態(tài)的函數(shù)可以通過CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>接口實現(xiàn)指煎。
CheckpointedFunction↓
CheckpointedFunction 接口通過不同的重新分發(fā)方案提供對非鍵狀態(tài)(no-keyed state)的訪問蹋偏。它需要實現(xiàn)下面兩種方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
每當必須執(zhí)行檢查點時,都會調(diào)用SnapshotState()至壤。每次初始化用戶定義的函數(shù)時威始,都會調(diào)用對應(yīng)的initializeState()方法,急當函數(shù)首次初始化時像街,或者當函數(shù)實際從早期的檢查點恢復(fù)時黎棠。因此,initializeState() 方法不僅僅時初始化不同類型狀態(tài)的地方镰绎,而且也是狀態(tài)恢復(fù)邏輯的地方脓斩。
當前,托管的 operator state支持List 類型畴栖。狀態(tài)應(yīng)該是可序列化對象的List随静,彼此之間獨立,因此在重新調(diào)節(jié)之后可以重新分配吗讶。換句話說燎猛,這些對象是non-keyed state重新分配的最佳粒度。根據(jù)狀態(tài)訪問方法照皆,定義了一下重新分配的方案:
Even-split redistribution:每一個operator 返回一個狀態(tài)元素的List重绷。整個狀態(tài)在邏輯上是所有列表的串聯(lián)。在恢復(fù)或者重新分配時膜毁,這個List會被均勻劃分為和operator并行度一樣的子list(sublist)昭卓。每個operator獲得一個sublist,它可能是為空或者包含一個或者多個元素愤钾。舉個例子:如果并行度為1的operator的checkpointed state包含element1和element2 兩個,當增減并行度到2時候醒,element1可能在operator的實例0中绰垂,element2可能在operator的實例1中。
Union redistribution:*每個operator 返回一個狀態(tài)元素的List火焰。整個狀態(tài)在邏輯上是所有列表的串聯(lián)劲装。在恢復(fù)或者重新分配時,每一個operator都將獲得包含整個state elements的List昌简。 *
下面是一個有狀態(tài)的SinkFunction的例子占业,它使用CheckpointedFunction 在將元素發(fā)送到下游之前進將它們緩沖起來。它演示了even-split redistribution這個策略:
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int), context: Context): Unit = {
bufferedElements += value
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if(context.isRestored) {
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
initializeState ()方法中有一個FunctionInitializationContext參數(shù)纯赎。這個參數(shù)是用來初始化non-keyed state的“containers”谦疾。這有一個ListState類型的container,non-keyed state 的對象將會在進行檢查點時保存到這犬金。
注意怎么初始化這個狀態(tài)念恍,和keyed state類似,使用一個StateDescriptor 包含了狀態(tài)的名字和所持有的狀態(tài)值類型的信息:
val descriptor = new ListStateDescriptor[(String, Long)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
狀態(tài)訪問方法的命名約定包含其重新分配模式和狀態(tài)結(jié)構(gòu)晚顷。例如峰伙,當在還原的時候?qū)ist state和Union redistribution方案一起使用,通過使用getUnionListState(descriptor)獲取state该默。如果方法名不包含重分配的模式瞳氓,例如,getListState(descriptor)栓袖,這將意味著使用基本的Even-split redistribution 方案匣摘。
初始化容器之后,我們使用FunctionInitializationContext的isRestored()方法檢查當我們的任務(wù)失敗后是否正在恢復(fù)裹刮。如果isRestored()方法返回的結(jié)果是true音榜,說明正在恢復(fù),則使用恢復(fù)邏輯捧弃。
如修改后的BufferingSink代碼所示赠叼,狀態(tài)初始化期間恢復(fù)的ListState保存在類變量中,以備將來在Snapshotstate()中使用塔橡。ListState被清除了前一個checkpoint包含的所有對象梅割,然后被我們想要填充的新對象進行填充霜第。
另一方面葛家,keyed state可以在initializeState()方法中初始化。使用FunctionInitializationContext就可以完成泌类。
override def initializeState(context: FunctionInitializationContext): Unit = {
try {
val descriptor: ListStateDescriptor[List[RealTimeContent]] = new ListStateDescriptor[List[RealTimeContent]]("content-sink", createTypeInformation[List[RealTimeContent]])
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if (context.isRestored) {
val itt = checkpointedState.get().iterator()
while (itt.hasNext) {
val element = itt.next()
bufferElements += element
}
}
} catch {
case e: Exception => println(e.getMessage)
}
}
ListCheckpointed↓
ListCheckpointed() 接口是 CheckpointedFunction()有限的一個變體癞谒。它只支持List 類型的state 和 even-split redistribution 策略結(jié)合的恢復(fù)底燎。它要求實現(xiàn)下面兩個方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
在snapshotState()方法中,operator應(yīng)該返回一個對象的列表給到checkpoint弹砚,restoreState ()方法則是在恢復(fù)中使用這個列表的數(shù)據(jù)双仍。如果這個state是不可再分區(qū)的,你可以在snapshotState()一直返回一個 Collections.singletonList(MY_STATE)桌吃。
7-1.有State的Source Functions
與其它的operators相比朱沃,有狀態(tài)的Sources需要更多的關(guān)注。為了對狀態(tài)和輸出集合原子性進行更新(要求故障恢復(fù)時的 exactly-once來說)茅诱,則要求開發(fā)者從source的context中獲取一個lock逗物。
class CounterSource
extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
val lock = ctx.getCheckpointLock
while (isRunning) {
// output and state update are atomic
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}