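What is state
First, be clear that state here means the state of an operator. Why would an operator need state? It serves two purposes:
- Implementing the operator's logic (as a kind of intermediate state)
- Failure recovery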
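Implementing the operator's logic
Take the example from the official site. Suppose the records in a stream look like this: <1,3><1,2><1,3><2,3><2,5>
For all tuples that share the same first element, I want the average of the second element. How can that be done?
You might think of using one of Flink's built-in window functions that buffers every element with the same key and then loops over them to sum and average. That works, but it has one drawback: all the data has to be buffered.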
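To make the drawback concrete, the buffering approach can be sketched in plain Java without Flink. The names `BufferedAverage` and `averageByKey` are made up for illustration, and a `HashMap` of lists is only an assumed stand-in for whatever a window would buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (no Flink): average per key by buffering every value first. */
final class BufferedAverage {

    /** Groups the second field by the first, then averages each buffered group. */
    static Map<Long, Double> averageByKey(long[][] tuples) {
        // The buffer grows with the input: every value is kept until the end.
        Map<Long, List<Long>> buffer = new HashMap<>();
        for (long[] t : tuples) {
            buffer.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t[1]);
        }
        Map<Long, Double> averages = new HashMap<>();
        for (Map.Entry<Long, List<Long>> e : buffer.entrySet()) {
            long sum = 0;
            for (long v : e.getValue()) {
                sum += v;
            }
            averages.put(e.getKey(), (double) sum / e.getValue().size());
        }
        return averages;
    }

    public static void main(String[] args) {
        long[][] stream = {{1, 3}, {1, 2}, {1, 3}, {2, 3}, {2, 5}};
        System.out.println(averageByKey(stream));
    }
}
```

The memory used by `buffer` is proportional to the number of records, which is exactly the cost the text objects to.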
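What if you had to do it with a map operation instead, and without buffering all the data?
This is where state comes in. Imagine the map operator maintains a variable <a,b>, where a is the number of records seen for the operator's current key (for key 1 in the data above, a=3) and b is the sum of all the second elements.
For the stream above, the map operator then holds the state <3,8> for key 1 and <2,8> for key 2, and the averages follow directly: 8/3 ≈ 2.67 and 8/2 = 4. Moreover, this state is updated once per incoming record, so nothing needs to be buffered.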
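The <a,b> idea can be tried out in plain Java before bringing Flink in. This is a minimal sketch under stated assumptions: `RunningAverage`, `update`, and `stateOf` are hypothetical names, and a `HashMap` stands in for Flink's keyed state.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch (no Flink): keep only <count, sum> per key. */
final class RunningAverage {
    // key -> {count, sum}; a HashMap stands in for Flink's keyed state here.
    private final Map<Long, long[]> state = new HashMap<>();

    /** Folds one tuple into the state and returns the current average for its key. */
    double update(long key, long value) {
        long[] s = state.computeIfAbsent(key, k -> new long[2]);
        s[0] += 1;      // a: how many records this key has seen
        s[1] += value;  // b: running sum of the second field
        return (double) s[1] / s[0];
    }

    /** Returns the {count, sum} pair held for a key, or null if the key was never seen. */
    long[] stateOf(long key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        RunningAverage avg = new RunningAverage();
        long[][] stream = {{1, 3}, {1, 2}, {1, 3}, {2, 3}, {2, 5}};
        for (long[] t : stream) {
            System.out.println("key=" + t[0] + " avg=" + avg.update(t[0], t[1]));
        }
    }
}
```

Each key costs two `long` values no matter how many records arrive, which is the point of keeping state instead of a buffer.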
Here is the code:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        // access the state value; it is null until the first update for this key
        Tuple2<Long, Long> currentSum = sum.value();
        if (currentSum == null) {
            currentSum = Tuple2.of(0L, 0L);
        }

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(value -> value.f0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
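Why the output is (1,4) and (1,5) can be checked by hand with a small plain-Java trace of the same logic for one key. The class `CountWindowTrace` and its method `emitted` are hypothetical, and two local variables stand in for the `ValueState`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java trace (no Flink) of the emit-every-two-elements logic for one key. */
final class CountWindowTrace {

    /** Returns the averages emitted for a sequence of values that share one key. */
    static List<Long> emitted(long[] values) {
        List<Long> out = new ArrayList<>();
        long count = 0;
        long sum = 0;
        for (long v : values) {
            count += 1;
            sum += v;
            if (count >= 2) {
                out.add(sum / count); // long division truncates: 11 / 2 == 5
                count = 0;            // mirrors sum.clear()
                sum = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(new long[] {3, 5, 7, 4, 2})); // prints [4, 5]
    }
}
```

The pairs (3,5) and (7,4) give 8/2 = 4 and 11/2 = 5 under integer division. The final value 2 stays in the state and is never emitted, because no second element arrives for it.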
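Failure recovery
Consider this scenario:
Every record in the stream must be written to a database, and when the job restarts after a failure, no record may be written twice. How can that be implemented?
First, the writes cannot happen one record at a time. For performance reasons we set a threshold and trigger a write once the buffer reaches it.
So once the job fails, where should it resume from? It clearly needs to know where the last write happened.
Again this calls for state, this time kept in the sink operator (SinkFunction). It records the records that were buffered but not yet written when the job failed; Flink snapshots this state at each checkpoint. After the restart, only that batch still has to be written to the database, and processing then continues as usual.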
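The buffer, snapshot, and restore cycle described above can be simulated in plain Java. This is only a sketch under simplifying assumptions: `BufferSketch` and its methods are hypothetical names, an in-memory list stands in for the database, and the failure is assumed to happen right after the snapshot, so records written after a checkpoint and then replayed are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (no Flink): a threshold buffer that can be snapshotted and restored. */
final class BufferSketch {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> database; // stands in for the external store

    BufferSketch(int threshold, List<String> database) {
        this.threshold = threshold;
        this.database = database;
    }

    /** Buffers one record and writes the whole batch once the threshold is reached. */
    void invoke(String record) {
        buffer.add(record);
        if (buffer.size() >= threshold) {
            database.addAll(buffer);
            buffer.clear();
        }
    }

    /** Copies the records that are buffered but not yet written. */
    List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    /** Refills the buffer from a snapshot after a restart. */
    void restore(List<String> snapshot) {
        buffer.addAll(snapshot);
    }

    public static void main(String[] args) {
        List<String> db = new ArrayList<>();
        BufferSketch sink = new BufferSketch(3, db);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            sink.invoke(r);
        }
        List<String> checkpoint = sink.snapshot();        // holds only "d"

        BufferSketch restarted = new BufferSketch(3, db); // simulated restart
        restarted.restore(checkpoint);
        restarted.invoke("e");
        restarted.invoke("f");                            // "d", "e", "f" are written together
        System.out.println(db);                           // prints [a, b, c, d, e, f]
    }
}
```

Without the restore step, "d" would be lost on restart, since it had left the stream but had not yet reached the database.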
The code is as follows:
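import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    // operator state that holds the buffered elements across checkpoints
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    // in-memory buffer of elements not yet sent to the sink
    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferedElements.add(value);
        // >= rather than ==, so a restored buffer that already exceeds the threshold still flushes
        if (bufferedElements.size() >= threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // replace the previous snapshot with the current buffer contents
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // after a failure, refill the buffer from the last checkpoint
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}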