Flink狀態(tài)

key狀態(tài)和算子狀態(tài)

key狀態(tài)

key狀態(tài)總是與key有關(guān),只能被用于keyedStream類型的函數(shù)與算子腥放。你可以認(rèn)為key狀態(tài)是一種被分區(qū)的算子狀態(tài),每一個(gè)key有一個(gè)狀態(tài)分區(qū)。每一個(gè)key狀態(tài)邏輯上由<parellel-operator-instance, key>唯一確定册烈,由于每一個(gè)key只分布在key算子的多個(gè)并發(fā)實(shí)例中的一個(gè)實(shí)例上,我們可以將<parellel-operator-instance, key>簡化為<operator, key>.

算子(operator)狀態(tài)

算子狀態(tài)也稱為非key狀態(tài)。每一個(gè)算子狀態(tài)綁定一個(gè)并發(fā)算子實(shí)例赏僧。kafka connector是flink算子狀態(tài)比較好的應(yīng)用范例大猛。每一個(gè) kafka consumer并發(fā)實(shí)例都維護(hù)一個(gè)topic分區(qū)和分區(qū)對應(yīng)的offset的map,并將此map作為算子狀態(tài)淀零。

當(dāng)并發(fā)數(shù)改變的時(shí)候挽绩,算子狀態(tài)支持在并發(fā)實(shí)例間重新分配狀態(tài)。有多種不同的重分配策略驾中。

原始的和被管理的狀態(tài)

key狀態(tài)和算子狀態(tài)以兩種形式存在:被管理的和原始的唉堪。
被管理的狀態(tài)由flink runtime管理,以一種數(shù)據(jù)結(jié)構(gòu)表示肩民,比如內(nèi)部hash表或者RocksDB唠亚,例如: ValueState,ListState等等。flink運(yùn)行時(shí)對狀態(tài)進(jìn)行編碼持痰,然后寫入checkpoints.

原始狀態(tài)是算子保存到它們自己定義的數(shù)據(jù)結(jié)構(gòu)中的一種狀態(tài)灶搜。當(dāng)checkpoint發(fā)生的時(shí)候,flink僅僅將二進(jìn)制寫入到checkpoint中工窍,它不知道狀態(tài)的數(shù)據(jù)結(jié)構(gòu)割卖,僅僅能看見原始的二進(jìn)制字節(jié)數(shù)據(jù)。

所有的stream數(shù)據(jù)流function可以使用被管理的狀態(tài)患雏,但是當(dāng)實(shí)現(xiàn)算子接口的時(shí)候究珊,僅僅能使用原始狀態(tài)接口。推薦使用被管理的狀態(tài)而不是原始狀態(tài)纵苛,因?yàn)槭褂帽还芾頎顟B(tài)剿涮,當(dāng)并發(fā)度變化的時(shí)候,flink能夠自動重新分配狀態(tài)攻人,而且也能夠更好地管理內(nèi)存取试。

注意: 如果你需要自定義被管理狀態(tài)的序列化邏輯,為了確保特性兼容怀吻,請看相應(yīng)的說明瞬浓。Flink默認(rèn)的序列化不需要特別的處理。

使用被管理的Key狀態(tài)

被管理的Key狀態(tài)接口能夠處理不同類型的狀態(tài)蓬坡,包括現(xiàn)有所有輸入元素的key猿棉。這意味著這類狀態(tài)僅僅能被用于KeyedStream上。KeyedStream能夠通過stream.keyBy(...)創(chuàng)建屑咳。

現(xiàn)在萨赁,我們首先看一下當(dāng)前所有的不同類型狀態(tài)接口,然后我們看一下如何在程序中使用兆龙。狀態(tài)接口類型如下:

  • ValueState<T>: 保存一個(gè)值杖爽。這個(gè)值可以被更新或獲取(涉及到上面提到的輸入元素的key, 每個(gè)key中都可能對應(yīng)一個(gè)值)。這個(gè)值可以通過update(T)更新慰安,通過T value()獲取腋寨。
  • ListState<T>: 保存元素列表』溃可以添加元素和獲取當(dāng)前存儲的所有元素Iterable對象萄窜。使用add(T)或addAll(List<T>)方法添加元素。使用Iterable<T> get()方法獲取iterable對象撒桨。也可以通過update(List<T>)方法覆蓋現(xiàn)有的列表查刻。
  • ReducingState<T>: 保存一個(gè)值,這個(gè)值是添加到狀態(tài)中所有值的聚合結(jié)果元莫。這個(gè)接口與ListState相似,但是通過add(T)方法添加的元素會通過指定的ReduceFunction聚合起來蝶押。
  • AggregatingState<IN,OUT>: 保存一個(gè)值踱蠢,這個(gè)值是添加到狀態(tài)的所有值的聚合結(jié)果。與ReducingState相比棋电,聚合后的數(shù)據(jù)類型也許與添加進(jìn)狀態(tài)的元素類型不同茎截。這個(gè)接口與ListState相同,但是通過add(IN)添加的元素使用指定的AggregateFunction對象聚合赶盔。
  • FoldingState<T,ACC>: 保存一個(gè)值企锌,這個(gè)值是添加到狀態(tài)的所有值的聚合結(jié)果。與ReducingState相比于未,聚合結(jié)果的類型可能與添加到狀態(tài)中的元素類型不一樣撕攒。這個(gè)接口與ListState相似,但是通過add(T)添加的元素通過指定的FoldFunction聚合
  • MapState<UK,UV>: 保存一個(gè)map對象烘浦。你可以將kv鍵值對放入狀態(tài)中抖坪,也可以獲取當(dāng)前存儲的鍵值列表一個(gè)Iterable對象。使用put(UK, UK)putAll(Map\<UK,UV\>)方法添加鍵值對闷叉。與key對應(yīng)的value可以通過get(UK)獲取擦俐。map中kv關(guān)系,key握侧,value數(shù)據(jù)分別可以通過entries(),keys()values()方法獲取蚯瞧。

所有類型的state狀態(tài)接口都有一個(gè)clear()方法,能夠清除當(dāng)有輸入元素key的狀態(tài)品擎。

注意: FoldingStateFoldingStateDescriptor已經(jīng)在flink1.4中廢棄了,將來會完全移除埋合。請用AggregatingStateAggregatingStateDescriptor代替。

我們要記住兩件重要的事萄传,第一件事是上面這些state接口類型僅僅用于與狀態(tài)交互饥悴。狀態(tài)不一定必須要保存到flink內(nèi)部,也可以保存到硬盤或其它地方。第二件事是你獲取到的狀態(tài)的值依賴輸入元素的key值西设,所以在同一個(gè)user function中如果兩次輸入流中的key值不一樣的話瓣铣,value也不一樣。

為了獲得一個(gè)狀態(tài)處理類贷揽,你必須要?jiǎng)?chuàng)建一個(gè)StateDescriptor對象棠笑。它里面保存著狀態(tài)的名字(后面我們會看到,你可以創(chuàng)建多個(gè)狀態(tài)禽绪,他們必須有不同的名字蓖救,以便你可以根據(jù)名字獲取狀態(tài)),狀態(tài)存儲值的類型和一個(gè)用戶自定義的function,例如一個(gè)ReduceFunction印屁。根據(jù)你想要存儲狀態(tài)的類型不同循捺,你可以創(chuàng)建ValueStateDescriptor,ListStateDescrptor,ReducingStateDescriptor,FoldingStateDescriptorMapStateDescriptor對象。

狀態(tài)可以通過RuntimeContext獲取雄人,它只能通過富函數(shù)(rich function)獲取从橘。請看這里詳細(xì)了解。但是我們也看一個(gè)簡短的例子础钠。RichFunction中的RuntimeContext對象有如下方法可以獲取狀態(tài)恰力。

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面舉一個(gè)FlatMapFunction的例子說明所有部分如何配合的。

 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>> {
   
   private transient ValueState<Tuple2<Long,Long>> sum;
   
   @Override
   public void flatMap(Tuple2<Long,Long> input, Collector<Tuple2<Long,Long>> out) throws Exception {
     // 獲取狀態(tài)值
     Tuple2<Long,Long> currentSum = sum.value();
     
     // 更新數(shù)量
     currentSum.f0 += 1;
     
     // 將輸入數(shù)據(jù)累加到第2個(gè)字段上
     currentSum.f1 += input.f1;
     
     // 更新狀態(tài)
     sum.update(currentSum);
     
     // 如果數(shù)量達(dá)到2個(gè)旗吁,計(jì)算平均值踩萎,發(fā)送到下游,并清空狀態(tài)
     if (currentSum.f0 >= 2) {
       out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
       sum.clear();
     }
   }
   
   @Override
   public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<Long,Long>> descriptor = 
                                 new ValueStateDescriptor<>(
                                        "average", // 狀態(tài)名稱
                                        TypeInformation.of(new TypeHint<Tuple2<Long,Long>> () {}), //類型信息
                                        Tuple2.of(0L,0L)); // 狀態(tài)默認(rèn)值
         sum = getRuntimeContext().getState(descriptor);
   }
 }
 
 // 可以在流處理程序中像這樣使用(假設(shè)我們有一個(gè)StreamExecutionEnvironment env)
 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
   .keyBy(0)
   .flatMap(new CountWindowAverage())
   .print();
   
 // 將打印輸出(1,4)和(1,5)

這個(gè)例子實(shí)現(xiàn)了一個(gè)貧血的計(jì)數(shù)窗口很钓。我們以tuple的第一個(gè)字段做為key來分類(這個(gè)例子中所有key都為1)香府。CountWindowAverage類成員變量ValueState中存儲著實(shí)時(shí)計(jì)算的數(shù)量和累加和。一量數(shù)量達(dá)到2個(gè)码倦,它將計(jì)算平均值回还,發(fā)送到下游并清空狀態(tài),從0開始叹洲。需要注意的是柠硕,對于不同輸入的key(輸入元素Tuple的第一個(gè)元素值不同),ValueState將保存不同的值运提。

狀態(tài)存活時(shí)間(TTL)

TTL可以被分配給任何類型的key狀態(tài)蝗柔。如果key狀態(tài)設(shè)置了TTL恩静,并且狀態(tài)過期了牙丽,狀態(tài)中存儲的值將被清空。后面將詳細(xì)說明骤视。

所有狀態(tài)集合的TTL是設(shè)置在每個(gè)元素上的栈妆。這意味著列表和map中每個(gè)元素元素過都是過期處理邏輯都是獨(dú)立的胁编,互不影響

為了使用TTL厢钧,首先要?jiǎng)?chuàng)建一個(gè)StateTtlConfig對象。通過給TTL函數(shù)傳遞這個(gè)狀態(tài)配置對象激活TTL嬉橙。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

配置狀態(tài)需要考慮以下幾個(gè)方面:
newBuilder方法的第一個(gè)參數(shù)是必須的早直,它是TTL過期時(shí)間。
狀態(tài)的TTL時(shí)間戳需要被刷新市框,還需要配置更新類型霞扬,表示在什么情況下刷新,默認(rèn)是OnCreateAndWrite:

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 僅僅當(dāng)創(chuàng)建和寫入時(shí)刷新TTL

  • StateTtlConfig.UpdateType.OnReadAndWrite - 讀和寫時(shí)刷新TTL
    狀態(tài)可見性配置當(dāng)讀取的時(shí)候如果過期的值沒有被清除的話,是否返回枫振,默認(rèn)是NeverReturnExpired

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 從不返回過期的值

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果過期的值仍然可以獲得則返回

如果設(shè)置成NeverReturnExpired喻圃,過期的狀態(tài)數(shù)據(jù)即使沒有被清除也獲取不到,就好像不存在一樣粪滤。這個(gè)選項(xiàng)在數(shù)據(jù)過期后必須不可用的情況下是有用的斧拍,例如對隱私數(shù)據(jù)敏感的應(yīng)用。

另一個(gè)選項(xiàng)是ReturnExpiredIfNotCleanedUp,如果過期的狀態(tài)值沒有被清除的話,允許返回杖小。

注意:

  • 狀態(tài)存儲器除了存儲狀態(tài)值還會存儲數(shù)據(jù)最后一次被修改的時(shí)間戳肆汹,意味著如果啟用TTL這個(gè)特性會增加狀態(tài)存儲的開銷。堆狀態(tài)存儲器會在內(nèi)存中存儲一個(gè)引用用戶狀態(tài)數(shù)據(jù)的一個(gè)java對象窍侧,還有一個(gè)原始的long類型县踢。RocksDB狀態(tài)存儲器會給每一個(gè)值转绷,列表或map中的每個(gè)元素都增加8個(gè)字節(jié)的存儲開銷伟件。
  • 當(dāng)前僅支持處理時(shí)間(processing time)的TTL,不支持事件(event time)的TTL.
  • 沒有配置TTL,卻啟動TTL或反之议经,都會導(dǎo)致兼容失敗和StateMigrationException斧账。
  • TTL配置不是checkpoint或savepoint的一部分,而是flink處理當(dāng)前正運(yùn)行Job的一種方式
  • 設(shè)置TTL的map狀態(tài)如果想支持null值煞肾,僅當(dāng)狀態(tài)值序列化器能夠處理null值的時(shí)候咧织。如果序列化器不支持null值 ,可以使用NullableSerializer包裝類籍救,但這將多消耗一個(gè)字節(jié)的存儲空間习绢。

過期狀態(tài)數(shù)據(jù)的清除

默認(rèn)情況下,過期的狀態(tài)數(shù)據(jù)僅當(dāng)顯示的讀取的時(shí)候才會被清除蝙昙,例如調(diào)用ValueState.value()的時(shí)間闪萄。

注意: 這意味著如果過期狀態(tài)數(shù)據(jù)沒有被讀取,它將不會被清除奇颠,可能導(dǎo)致狀態(tài)數(shù)據(jù)的不斷增長败去。在后來的flink版本中可能會改變。

在完全快照時(shí)清除

另外烈拒,你可以在執(zhí)行狀態(tài)完全快照時(shí)清除過期的狀態(tài)值圆裕,這將減小快照的大小广鳍。在當(dāng)前flink實(shí)現(xiàn)中,本地狀態(tài)不會被清除吓妆,但是當(dāng)從上一個(gè)快照中恢復(fù)的時(shí)候赊时,不會包括過期的狀態(tài)」⒄剑可以在StateTtlConfig中配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

這個(gè)選項(xiàng)不適合于以RocksDB存儲狀態(tài)數(shù)據(jù)的遞增checkpoint.

注意:

  • 對于已經(jīng)存在的job蛋叼,清除策略可以在任意時(shí)間在StateTtlConfig中激活或關(guān)閉激活,例如:從savepoint重啟后

狀態(tài)存儲后端清除

除了在完全快照中清除剂陡,你還可以在后端清除狈涮。如果狀態(tài)后端存儲支持,下面選項(xiàng)可以激活默認(rèn)的后端清除策略鸭栖。

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
   .newBuilder(Time.seconds(1))
   .cleanupInBackground()
   .build();

對于更詳細(xì)的控制某些特別的后端清除策略歌馍,你可以按照下面描述的單獨(dú)配置。當(dāng)前晕鹊,堆狀態(tài)存儲依賴遞增的清除松却,RocksDB則使用壓縮過濾器。

遞增的清除

另一個(gè)選項(xiàng)是觸發(fā)狀態(tài)的遞增清除策略溅话。觸發(fā)可以是每個(gè)狀態(tài)的獲取或每條記錄處理時(shí)的回調(diào)晓锻。如果某些狀態(tài)激活了清除策略,后端的存儲將持有一個(gè)全局的針對所有元素的惰性迭代器飞几。每次遞增清除策略被觸發(fā)時(shí)砚哆,迭代器就向前進(jìn),會檢查經(jīng)過的元素屑墨,過期的狀態(tài)數(shù)據(jù)將被清除躁锁。

這個(gè)特性可以在StateTtlConfig激活

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally()
    .build();

這個(gè)策略有兩個(gè)參數(shù)。第一個(gè)參數(shù)是每次清除觸發(fā)時(shí)檢查的元素個(gè)數(shù)卵史。如果啟用战转,當(dāng)獲取每一個(gè)狀態(tài)數(shù)據(jù)時(shí)都會觸發(fā)。第二個(gè)參數(shù)配置每條記錄處理時(shí)以躯,是否也觸發(fā)清除槐秧。如果你啟用默認(rèn)的后端清除策略,對于堆存儲來說忧设,這個(gè)策略將在每次狀態(tài)數(shù)據(jù)獲取時(shí)被觸發(fā)并檢查5個(gè)元素刁标,并且每條記錄被處理時(shí)不會觸發(fā)清除。

注意:

  • 如果沒有獲取狀態(tài)數(shù)據(jù)或者沒有處理任何記錄见转,過期的狀態(tài)數(shù)據(jù)將一直保存
  • 花費(fèi)在遞增清除上的時(shí)間增加會記錄處理的時(shí)間
  • 當(dāng)前遞增清除僅僅適用于堆存儲命雀,為RocksDB設(shè)置遞增清除沒有作用
  • 如果堆存儲使用同步快照,那么全局的迭代器在迭代時(shí)將保存所有key的副本斩箫。因?yàn)閒link目前的實(shí)現(xiàn)不支持對狀態(tài)的并發(fā)修改吏砂。啟用這個(gè)特性將增加內(nèi)存消耗撵儿。異步快照沒有這個(gè)問題
  • 對于已經(jīng)存在的job,清除策略可以在任意時(shí)間在StateTtlConfig中激活或關(guān)閉激活狐血,例如當(dāng)從savepoint重啟的時(shí)候淀歇。

在RocksDB壓縮過程中清除

如果使用RocksDB存儲狀態(tài),另一個(gè)清除策略是激活flink壓縮過濾器。RocksDB周期性的執(zhí)行異步壓縮來合并狀態(tài)更新和減小存儲匈织。flink壓縮過濾器檢查帶TTL元素的時(shí)間戳浪默,排除掉過期的狀態(tài)數(shù)據(jù)。

這個(gè)特性默認(rèn)沒有啟用缀匕∧删觯可以通過flink配置文件配置,將state.backend.rocksdb.ttl.compaction.filter.enabled設(shè)置為true,或者當(dāng)為某個(gè)Job創(chuàng)建自定義RocksDB狀態(tài)存儲時(shí)調(diào)用RocksDBStateBackend::enableTtlCompactionFilter設(shè)置乡小。這樣設(shè)置了TTL的狀態(tài)將會使用過濾器阔加。

import org.apache.flink.api.common.state.StateTtlConfig

StateTtlConfig ttlConfig = StateTtlConfig
       .newBuilder(Time.seconds(1))
       .cleanupInRocksdbCompactFilter(1000)
       .build();

當(dāng)處理一定數(shù)量的狀態(tài)數(shù)據(jù)后,RocksDB壓縮過濾器將查詢當(dāng)前時(shí)間戳满钟,檢查有沒有過期胜榔。你可以改變它,傳遞一個(gè)自定義值給StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法. 更新時(shí)間戳越頻繁湃番,清除的速度越快夭织,但卻降低了壓縮性能。因?yàn)樾枰{(diào)用JNI本地方法. 如果你啟用默認(rèn)的清除策略(這個(gè)策略適用于RocksDB狀態(tài)存儲)吠撮,在每處理1000個(gè)元素后都查詢一次當(dāng)前的時(shí)間戳尊惰。

如果想為RocksDB過濾器開啟本地方法的debug級別日志,可以為FlinkCompactionFilter設(shè)置日志級別為Debug.

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 在壓縮過程中調(diào)用TTL過濾器會減慢flink處理速度纬向。TTL過濾器必須在key正在被壓縮的過程中择浊,刷新key對應(yīng)的每一個(gè)value元素的時(shí)間戳戴卜,并檢查是否過期逾条。對于集合類型例如list或map,也將檢查其中的每一個(gè)元素。
  • 如果這個(gè)特性用于列表狀態(tài)投剥,列表的長度不固定师脂。 TTL過濾器必須對每一個(gè)元素另外通過JNI調(diào)用Java類型的序列化器,由第一個(gè)過期的元素決定下一個(gè)沒過期元素的偏移量。
  • 對于已經(jīng)存在的job江锨,清除策略可以在任意時(shí)間在StateTtlConfig中激活或關(guān)閉激活吃警,例如當(dāng)從savepoint重啟時(shí)。

Scala DataStream API中的狀態(tài)

除了上面講到的接口, Scala API對于KeyStream中使用ValueState存儲單個(gè)狀態(tài)值的map()或flatmap()函數(shù)有更簡短的寫法啄育。user function從Option對象中得到ValueState當(dāng)前狀態(tài)值酌心,并返回一個(gè)待更新的值。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

使用被管理的算子狀態(tài)

為了使用被管理的算子狀態(tài)挑豌,必須實(shí)現(xiàn)通用的CheckpointedFunction接口安券,或者實(shí)現(xiàn)ListCheckpointed<T extends Serializable>接口墩崩。

CheckpointedFunction

CheckpointedFunction接口可以讓我們保存不同數(shù)據(jù)結(jié)構(gòu)的非key對象的狀態(tài)。我們?nèi)绻褂煤蠲悖仨殞?shí)現(xiàn)以下兩個(gè)方法:

void snapshotState(FunctionSnapshotContext conetxt) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

每當(dāng)checkpoint執(zhí)行的時(shí)候都會調(diào)用snapshotState()方法鹦筹,而另一個(gè)方法,initializeState()址貌,則是用戶自定義的功能初始化的時(shí)候調(diào)用铐拐,包括首次初始化或者從之前的checkpoint恢復(fù)的時(shí)候。鑒于此练对,initializeState()不僅包含不同的需要保存狀態(tài)的數(shù)據(jù)初始化的邏輯遍蟋,還需要包含恢復(fù)狀態(tài)的邏輯。

當(dāng)前螟凭,支持列表類型的被管理的算子狀態(tài)匿值,狀態(tài)應(yīng)該是由互相獨(dú)立的可序列化的對象組成的列表,當(dāng)伸縮的時(shí)候需要重新分配赂摆。換言之挟憔,這些對象是非key狀態(tài)分配的最小粒度。根據(jù)狀態(tài)獲取方式的不同烟号,定義了以下分配策略:

  • 平均分配(Even-split redistribution): 每一個(gè)算子返回一個(gè)狀態(tài)集合绊谭,整個(gè)狀態(tài)邏輯上是所有列表集合的并集。當(dāng)恢復(fù)或重新分配時(shí)汪拥,根據(jù)并發(fā)數(shù)平均分配成多個(gè)子集达传,每一個(gè)算子獲取一個(gè)子集合,可能是空集合迫筑,也可能包含一個(gè)或多個(gè)元素宪赶。例如,并發(fā)度1的時(shí)候脯燃,一個(gè)算子的checkpoint狀態(tài)包含元素1和元素2搂妻,當(dāng)并發(fā)度增加到2的時(shí)候,元素1可能被分配到算子0辕棚,元素2被分配到算子1.
  • 合并分配(union redistribution): 每一個(gè)算子返回一個(gè)狀態(tài)的集合欲主,整個(gè)狀態(tài)邏輯上是所有這些列表集合的并集,當(dāng)恢復(fù)或重新分配時(shí)逝嚎,每一個(gè)算子都將分配到整個(gè)狀態(tài)的列表集合扁瓢。

下面是一個(gè)帶狀態(tài)的SinkFunction示例,使用平均分配策略补君,功能是在將一些元素發(fā)送給外部系統(tǒng)之前引几,使用CheckpointedFunction緩存這些元素.

 public class BufferingSink 
           implements SinkFunction <Tuple2<String,Integer>>, CheckpointedFunction {
        
        private final int threshold;
        private transient ListState<Tuple2<String, Integer>> checkpoinedState;
        private List<Tuple2<String, Integer>> bufferedElements;
        public BufferingSink(int threshold) {
         this.threshold = threshold;
         this.bufferedElements = new ArrayList<>();
        }
        
        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
         bufferedElement.add(value);
         if (bufferedElements.size() == threshold) {
           for (Tuple2<String, Integer> element : bufferedElements) {
               // send it to the sink
           }
           bufferedElement.clear();
         }
        }
        
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
          checkpointedState.clear();
          for (Tuple<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
          }
        }
        
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
          ListStateDescriptor<Tuple2<String, Integer>> descriptor = 
              new ListStateDescriptor<>(
                  "buffered-elements",
                  TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
                  
         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
         
         if(context.isRestored()) {
             for (Tuple2<String, Integer> element : checkpointedStage.get()) {
                bufferedElements.add(element);
             }
          }
     }
 }

initializeState方法接收一個(gè)FunctionInitializationContext參數(shù)。這個(gè)參數(shù)用于初始化非key的狀態(tài)"容器"挽铁。有一個(gè)ListState類型的狀態(tài)容器伟桅,當(dāng)checkpointing發(fā)生的時(shí)候硅堆,非key的狀態(tài)會存儲在ListState對象中。

注意一下狀態(tài)容器是如何初始化的贿讹,和key狀態(tài)相似渐逃,都需要一個(gè)StateDescriptor來定義狀態(tài)名和保存狀態(tài)的數(shù)據(jù)類型信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

獲取狀態(tài)方法名稱的不同代表了不同的分配策略民褂。例如茄菊,如果想在恢復(fù)的時(shí)候使用合并分配策略,獲取狀態(tài)時(shí)面殖,使用getUnionListState(descriptor)方法。如果方法名不包含分配策略名稱脊僚,例如:getListState(descriptor),就默認(rèn)表示將會使用平均分配策略。

在初始化狀態(tài)容器之后辽幌,我們使用isRestored()方法來判斷當(dāng)前是否是失敗后的恢復(fù),如果是乌企,將執(zhí)行恢復(fù)邏輯。

我們再來看看類BufferingSink的代碼加酵,ListState是成員變量,在initializeState方法初始化猪腕,以便在snapshotState方法中使用。在snapshotState方法中钦勘,ListState首先清除上一次checkpont的所有對象陋葡,然后保存這一次需要checkpoint的對象。

順便提一下个盆,key狀態(tài)也能使用initializeState方法初始化脖岛,可以使用FunctionInitializationContext對象實(shí)現(xiàn)朵栖。

ListCheckpointed

ListCheckpointedCheckpointedFunction的一個(gè)變體颊亮,有更多的限制條件,僅支持list類型的狀態(tài)存儲陨溅,并且只能是平均分配终惑。包含以下兩個(gè)方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

snapshotState方法中,算子應(yīng)該返回需要保存的list對象门扇,當(dāng)恢復(fù)的時(shí)候雹有,在restoreState方法中編寫恢復(fù)list數(shù)據(jù)的邏輯偿渡。 如果狀態(tài)不需要重新分區(qū),可以在snapshotState方法中返回Collections.singletonList(MY_STATE)對象霸奕。

Stateful Source函數(shù)

相對于其它算子溜宽,Stateful source算子有一點(diǎn)特別。為了使更新狀態(tài)和輸出狀態(tài)原子化(失敗/恢復(fù)的恰好一次主義要求),必須在source算子的上下文中使用鎖质帅。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  恰好一次語義使用的偏移量 */
    private Long offset = 0L;

    /**job是否取消的標(biāo)識*/
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // 輸出和更新狀態(tài)是原子化的
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

當(dāng)flink checkpoint完全確認(rèn)的時(shí)候,一些算子也許需要與外部系統(tǒng)交換一些信息适揉,這種情況看下org.apache.flink.runtime.state.CheckpointListener接口。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末煤惩,一起剝皮案震驚了整個(gè)濱河市嫉嘀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌魄揉,老刑警劉巖剪侮,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異洛退,居然都是意外死亡瓣俯,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進(jìn)店門兵怯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來降铸,“玉大人,你說我怎么就攤上這事摇零⊥频В” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵谅畅,是天一觀的道長毡泻。 經(jīng)常有香客問我粘优,道長雹顺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任贩挣,我火速辦了婚禮卵迂,結(jié)果婚禮上绒净,老公的妹妹穿的比我還像新娘。我一直安慰自己论颅,他們只是感情好恃疯,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布今妄。 她就那樣靜靜地躺著盾鳞,像睡著了一般瞻离。 火紅的嫁衣襯著肌膚如雪套利。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天验辞,我揣著相機(jī)與錄音跌造,去河邊找鬼壳贪。 笑死寝杖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的醉拓。 我是一名探鬼主播收苏,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼鹿霸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了钻哩?” 一聲冷哼從身側(cè)響起肛冶,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤珊肃,失蹤者是張志新(化名)和其女友劉穎馅笙,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體烈和,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡招刹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年蔗喂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了缰儿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片乖阵。...
    茶點(diǎn)故事閱讀 38,716評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瞪浸,死狀恐怖吏祸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蹈矮,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布蝠咆,位于F島的核電站刚操,受9級特大地震影響再芋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜鉴逞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一华蜒、第九天 我趴在偏房一處隱蔽的房頂上張望豁遭。 院中可真熱鬧,春花似錦捂蕴、人聲如沸啥辨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽级乍。三九已至玫荣,卻和暖如春大诸,著一層夾襖步出監(jiān)牢的瞬間贯卦,已是汗流浹背撵割。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工睁枕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留沸手,地道東北人注簿。 一個(gè)月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓捐晶,卻偏偏與公主長得像妄辩,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子英支,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評論 2 350