本篇是對(duì)一篇Flink文章的翻譯:https://training.ververica.com/lessons/stateful.html
實(shí)現(xiàn)一個(gè)場(chǎng)景:我們需要輸出每個(gè)傳感器的每個(gè)采集值(數(shù)值)溉旋,但是傳感器的數(shù)據(jù)波動(dòng)很大关炼,如何對(duì)數(shù)據(jù)處理告材,使其波動(dòng)沒(méi)有那么大叼风。這就需要當(dāng)前的采集值需要聯(lián)系之前的采集值殖蚕,換而言之匙隔,就需要知道當(dāng)前傳感器的狀態(tài)实抡。
為什么Flink需要管理狀態(tài)
flink在狀態(tài)管理上提供了一些引人矚目的的特性:
- local:Flink state可以被保存在本地機(jī)器中抡笼,可以以內(nèi)存的速度訪問(wèn)
- durable:Flink state 可以自動(dòng)被存儲(chǔ)下來(lái)
- vertically scalable(垂直擴(kuò)展):state可以被存在數(shù)據(jù)庫(kù)中秽荤,通過(guò)增加存儲(chǔ)來(lái)擴(kuò)展
- horizontally scalable(水平擴(kuò)展):當(dāng)集群數(shù)目增大甜奄,state可以重新分配
- queryble:state可以通過(guò)rest api查詢的
Rich Function
這點(diǎn)我們已經(jīng)看過(guò)許多這樣的函數(shù)式接口,包括FilterFunction, MapFunction, and FlatMapFunction. 這些都是單一抽象方法模式窃款。
每個(gè)接口课兄,F(xiàn)link都提供了rich 模式,rich接口包含其他的方法晨继,包括:
- open():operator初始化時(shí)調(diào)用一次烟阐。這可以用來(lái)加載靜態(tài)data,或者打開(kāi)鏈接。
- close():釋放資源
- getRuntimeContext()提供對(duì)一切潛在感興趣的東西的訪問(wèn)蜒茄,最重要的是你可以創(chuàng)造和獲取state
一個(gè)keyed state的例子
這個(gè)例子唉擂,我們有一個(gè)傳感器數(shù)據(jù)流<String,double>,指的是傳感器id,和讀的數(shù)據(jù)檀葛。我們的目標(biāo)是將傳來(lái)的數(shù)據(jù)平滑處理玩祟。通過(guò)Smoother(下面的類)
為了完成這個(gè),我們的smoother需要記錄最近的傳感器數(shù)據(jù)屿聋,這可以通過(guò)Flink keyed state接口實(shí)現(xiàn)卵凑。
當(dāng)你處理類似的鍵流,flink將會(huì)維持一個(gè)key -value的存儲(chǔ)胜臊,來(lái)管理每個(gè)事件的state勺卢。
Flink支持多種類型的鍵狀態(tài),這個(gè)例子我們來(lái)討論最簡(jiǎn)單的象对,叫做ValueState黑忱。這意味著,F(xiàn)link會(huì)為每個(gè)key保持一個(gè)對(duì)象勒魔。本例子中甫煞,稱為MovingAverage。因?yàn)橐恍┬阅苌系脑蚬诰睿現(xiàn)Link也有支持特殊類型的State抚吠,包括ListState和MapState。
我們的Smoother有兩個(gè)方法:open(),map()弟胀。在open中楷力,我們通過(guò)ValueStateDescriptor創(chuàng)建了我們需要的state。這個(gè)方法的參數(shù)包含名字孵户,而且提供了序列化需要的信息(MovingAverage.class)萧朝。
public static class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
private ValueState<MovingAverage> averageState;
@Override
public void open (Configuration conf) {
ValueStateDescriptor<MovingAverage> descriptor =
new ValueStateDescriptor<>("moving average", MovingAverage.class);
averageState = getRuntimeContext().getState(descriptor);
}
@Override
public Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {
// access the state for this key
MovingAverage average = averageState.value();
// create a new MovingAverage (with window size 2) if none exists for this key
if (average == null) average = new MovingAverage(2);
// add this event to the moving average
average.add(item.f1);
averageState.update(average);
// return the smoothed result
return new Tuple2(item.f0, average.getAverage());
}
}
這個(gè)map方法會(huì)用來(lái)讓每個(gè)事件值更加平滑。每個(gè)事件都會(huì)調(diào)用一次map夏哭,當(dāng)然這個(gè)事件是與一個(gè)特定的key相關(guān)(一個(gè)傳感器)检柬,valueState保存了某個(gè)傳感器(key)的之前事件的信息,我們可以將本次事件的值與之前事件的傳感器值聯(lián)系起來(lái)竖配,去一個(gè)更加平滑的值何址。可以通過(guò)取平均值进胯。
Clearing State
這里有一個(gè)潛在的問(wèn)題用爪,如果傳感器的key是無(wú)窮的,那么Flink需要為每一個(gè)傳感器保存一個(gè)狀態(tài)值(MovingState)龄减。所以將一些結(jié)束项钮,不再會(huì)用到的key清除掉班眯。像:
averageState.clear()
在key沒(méi)有用到的一段時(shí)間后希停,我們就可以這樣做烁巫。我們將會(huì)看到如何用Timer做這個(gè),這要等到學(xué)了ProcessFunction后宠能。(之后會(huì)更新)
本篇分析了Keyed Stream State亚隙,還有Non-keyed State ,通常叫做operator state违崇,這個(gè)實(shí)現(xiàn)有些特殊阿弃,但是不常用。
Further Reading
- Working with State (Apache Flink Documentation)
- Using Managed Operator State (Apache Flink Documentation)
因?yàn)橄矚g羞延,所以堅(jiān)持