Flink帶有狀態(tài)的轉(zhuǎn)換操作

本篇是對(duì)一篇Flink文章的翻譯:https://training.ververica.com/lessons/stateful.html
實(shí)現(xiàn)一個(gè)場(chǎng)景:我們需要輸出每個(gè)傳感器的每個(gè)采集值(數(shù)值)溉旋,但是傳感器的數(shù)據(jù)波動(dòng)很大关炼,如何對(duì)數(shù)據(jù)處理告材,使其波動(dòng)沒(méi)有那么大叼风。這就需要當(dāng)前的采集值需要聯(lián)系之前的采集值殖蚕,換而言之匙隔,就需要知道當(dāng)前傳感器的狀態(tài)实抡。

為什么Flink需要管理狀態(tài)

flink在狀態(tài)管理上提供了一些引人矚目的的特性:

  1. local:Flink state可以被保存在本地機(jī)器中抡笼,可以以內(nèi)存的速度訪問(wèn)
  2. durable:Flink state 可以自動(dòng)被存儲(chǔ)下來(lái)
  3. vertically scalable(垂直擴(kuò)展):state可以被存在數(shù)據(jù)庫(kù)中秽荤,通過(guò)增加存儲(chǔ)來(lái)擴(kuò)展
  4. horizontally scalable(水平擴(kuò)展):當(dāng)集群數(shù)目增大甜奄,state可以重新分配
  5. queryble:state可以通過(guò)rest api查詢的

Rich Function

這點(diǎn)我們已經(jīng)看過(guò)許多這樣的函數(shù)式接口,包括FilterFunction, MapFunction, and FlatMapFunction. 這些都是單一抽象方法模式窃款。
每個(gè)接口课兄,F(xiàn)link都提供了rich 模式,rich接口包含其他的方法晨继,包括:

  • open():operator初始化時(shí)調(diào)用一次烟阐。這可以用來(lái)加載靜態(tài)data,或者打開(kāi)鏈接。
  • close():釋放資源
  • getRuntimeContext()提供對(duì)一切潛在感興趣的東西的訪問(wèn)蜒茄,最重要的是你可以創(chuàng)造和獲取state

一個(gè)keyed state的例子

這個(gè)例子唉擂,我們有一個(gè)傳感器數(shù)據(jù)流<String,double>,指的是傳感器id,和讀的數(shù)據(jù)檀葛。我們的目標(biāo)是將傳來(lái)的數(shù)據(jù)平滑處理玩祟。通過(guò)Smoother(下面的類)

為了完成這個(gè),我們的smoother需要記錄最近的傳感器數(shù)據(jù)屿聋,這可以通過(guò)Flink keyed state接口實(shí)現(xiàn)卵凑。

當(dāng)你處理類似的鍵流,flink將會(huì)維持一個(gè)key -value的存儲(chǔ)胜臊,來(lái)管理每個(gè)事件的state勺卢。

Flink支持多種類型的鍵狀態(tài),這個(gè)例子我們來(lái)討論最簡(jiǎn)單的象对,叫做ValueState黑忱。這意味著,F(xiàn)link會(huì)為每個(gè)key保持一個(gè)對(duì)象勒魔。本例子中甫煞,稱為MovingAverage。因?yàn)橐恍┬阅苌系脑蚬诰睿現(xiàn)Link也有支持特殊類型的State抚吠,包括ListState和MapState。

我們的Smoother有兩個(gè)方法:open(),map()弟胀。在open中楷力,我們通過(guò)ValueStateDescriptor創(chuàng)建了我們需要的state。這個(gè)方法的參數(shù)包含名字孵户,而且提供了序列化需要的信息(MovingAverage.class)萧朝。

public static class Smoother extends RichMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {
  private ValueState<MovingAverage> averageState;

  @Override
  public void open (Configuration conf) {
    ValueStateDescriptor<MovingAverage> descriptor =
      new ValueStateDescriptor<>("moving average", MovingAverage.class);
    averageState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public Tuple2<String, Double> map (Tuple2<String, Double> item) throws Exception {
    // access the state for this key
    MovingAverage average = averageState.value();

    // create a new MovingAverage (with window size 2) if none exists for this key
    if (average == null) average = new MovingAverage(2);

    // add this event to the moving average
    average.add(item.f1);
    averageState.update(average);

    // return the smoothed result
    return new Tuple2(item.f0, average.getAverage());
  }
}

這個(gè)map方法會(huì)用來(lái)讓每個(gè)事件值更加平滑。每個(gè)事件都會(huì)調(diào)用一次map夏哭,當(dāng)然這個(gè)事件是與一個(gè)特定的key相關(guān)(一個(gè)傳感器)检柬,valueState保存了某個(gè)傳感器(key)的之前事件的信息,我們可以將本次事件的值與之前事件的傳感器值聯(lián)系起來(lái)竖配,去一個(gè)更加平滑的值何址。可以通過(guò)取平均值进胯。

Clearing State
這里有一個(gè)潛在的問(wèn)題用爪,如果傳感器的key是無(wú)窮的,那么Flink需要為每一個(gè)傳感器保存一個(gè)狀態(tài)值(MovingState)龄减。所以將一些結(jié)束项钮,不再會(huì)用到的key清除掉班眯。像:

averageState.clear()

在key沒(méi)有用到的一段時(shí)間后希停,我們就可以這樣做烁巫。我們將會(huì)看到如何用Timer做這個(gè),這要等到學(xué)了ProcessFunction后宠能。(之后會(huì)更新)

本篇分析了Keyed Stream State亚隙,還有Non-keyed State ,通常叫做operator state违崇,這個(gè)實(shí)現(xiàn)有些特殊阿弃,但是不常用。

Further Reading


因?yàn)橄矚g羞延,所以堅(jiān)持

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末渣淳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子伴箩,更是在濱河造成了極大的恐慌入愧,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嗤谚,死亡現(xiàn)場(chǎng)離奇詭異棺蛛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)巩步,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門旁赊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人椅野,你說(shuō)我怎么就攤上這事终畅。” “怎么了竟闪?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵声离,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我瘫怜,道長(zhǎng)术徊,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任鲸湃,我火速辦了婚禮赠涮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘暗挑。我一直安慰自己笋除,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布炸裆。 她就那樣靜靜地躺著垃它,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上国拇,一...
    開(kāi)封第一講書(shū)人閱讀 52,196評(píng)論 1 308
  • 那天洛史,我揣著相機(jī)與錄音,去河邊找鬼酱吝。 笑死也殖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的务热。 我是一名探鬼主播忆嗜,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼崎岂!你這毒婦竟也來(lái)了捆毫?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤冲甘,失蹤者是張志新(化名)和其女友劉穎冻璃,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體损合,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡省艳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嫁审。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片跋炕。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖律适,靈堂內(nèi)的尸體忽然破棺而出辐烂,到底是詐尸還是另有隱情,我是刑警寧澤捂贿,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布纠修,位于F島的核電站,受9級(jí)特大地震影響厂僧,放射性物質(zhì)發(fā)生泄漏扣草。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一颜屠、第九天 我趴在偏房一處隱蔽的房頂上張望辰妙。 院中可真熱鬧,春花似錦甫窟、人聲如沸密浑。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)尔破。三九已至街图,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間懒构,已是汗流浹背餐济。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留痴脾,地道東北人颤介。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓梳星,卻偏偏與公主長(zhǎng)得像赞赖,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子冤灾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容