Flink狀態(tài)機制

什么是狀態(tài)

首先要知道,狀態(tài)指的是算子的狀態(tài)淑仆。為什么算子需要狀態(tài)涝婉,狀態(tài)的用處無非兩點:

  1. 實現(xiàn)算子的邏輯(作為一種中間狀態(tài))
  2. 錯誤恢復(fù)
實現(xiàn)算子的邏輯

用官網(wǎng)的例子,假設(shè)一段數(shù)據(jù)流格式長這樣<1,3><1,2><1,3><2,3><2,5>
那么我想對相同第一個元素所有tuple蔗怠,求第二個元素的平均值墩弯。該如何實現(xiàn)?

你可能會想到使用Flink自帶的聚合函數(shù)寞射,其中該函數(shù)緩存所有的相同key的元素渔工,在函數(shù)里做遍歷累加求值的操作。這很正確桥温。但有一個不好的點引矩,需要緩存所有數(shù)據(jù)。

如果現(xiàn)在就讓你用map操作實現(xiàn)呢侵浸?而且不緩存所以數(shù)據(jù)

這就需要用到狀態(tài)了旺韭。試想一下,如果在map算子里面維護這樣一個變量<a,b>通惫。a是該算子的key的次數(shù)茂翔,上面數(shù)據(jù)key為1的次數(shù)便是3(a=3),b是所有第二個元素之和。

那么上面數(shù)據(jù)流在每個map算子中維護了<3,8>,<2,8>的狀態(tài)履腋。好了珊燎,平均值就出來了。而且遵湖,這個狀態(tài)悔政,來一次數(shù)據(jù)更新一次,不需要緩存延旧。

貼下代碼:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

錯誤恢復(fù)

試想這樣一個場景:
需要將數(shù)據(jù)流的每個數(shù)據(jù)存入數(shù)據(jù)庫谋国,而且任務(wù)失敗后重啟能保證不將數(shù)據(jù)不重復(fù)落盤。怎么實現(xiàn)迁沫?

首先對于落盤芦瘾,肯定不能來一條存一條,考慮到性能問題集畅,我們設(shè)定一個閾值近弟,達(dá)到這個閾值觸發(fā)落盤操作。

那么任務(wù)一旦失敗了挺智,從哪開始恢復(fù)呢祷愉。這就肯定需要知道上一次落盤在哪發(fā)生的。

這就又需要在落盤算子(SinkFunction)中保存一個狀態(tài),用來記錄在上次任務(wù)失敗時所緩存的還沒有落盤的數(shù)據(jù)二鳄,只要把這批數(shù)據(jù)存數(shù)據(jù)庫赴涵。后面的操作繼續(xù)執(zhí)行就可以了。

代碼如下:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末订讼,一起剝皮案震驚了整個濱河市髓窜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌躯嫉,老刑警劉巖纱烘,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異祈餐,居然都是意外死亡擂啥,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進(jìn)店門帆阳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哺壶,“玉大人,你說我怎么就攤上這事蜒谤∩奖觯” “怎么了?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵鳍徽,是天一觀的道長资锰。 經(jīng)常有香客問我,道長阶祭,這世上最難降的妖魔是什么绷杜? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮濒募,結(jié)果婚禮上鞭盟,老公的妹妹穿的比我還像新娘。我一直安慰自己瑰剃,他們只是感情好齿诉,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著晌姚,像睡著了一般粤剧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挥唠,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天俊扳,我揣著相機與錄音,去河邊找鬼猛遍。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的懊烤。 我是一名探鬼主播梯醒,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼腌紧!你這毒婦竟也來了茸习?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤壁肋,失蹤者是張志新(化名)和其女友劉穎号胚,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體浸遗,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡猫胁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了跛锌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弃秆。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖髓帽,靈堂內(nèi)的尸體忽然破棺而出菠赚,到底是詐尸還是另有隱情,我是刑警寧澤郑藏,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布衡查,位于F島的核電站,受9級特大地震影響必盖,放射性物質(zhì)發(fā)生泄漏拌牲。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一筑悴、第九天 我趴在偏房一處隱蔽的房頂上張望们拙。 院中可真熱鬧,春花似錦阁吝、人聲如沸砚婆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽装盯。三九已至,卻和暖如春甲馋,著一層夾襖步出監(jiān)牢的瞬間埂奈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工定躏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留账磺,地道東北人芹敌。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像垮抗,于是被迫代替她去往敵國和親氏捞。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 3.2 彈性分布式數(shù)據(jù)集 本節(jié)簡單介紹RDD冒版,并介紹RDD與分布式共享內(nèi)存的異同液茎。 3.2.1 RDD簡介 在集群...
    Albert陳凱閱讀 1,475評論 0 0
  • Spark的算子的分類 從大方向來說,Spark 算子大致可以分為以下兩類: 1)Transformation 變...
    達(dá)微閱讀 864評論 0 6
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,090評論 1 32
  • Spark的算子的分類 從大方向來說辞嗡,Spark 算子大致可以分為以下兩類: 1)Transformation 變...
    姚興泉閱讀 1,400評論 0 6
  • 正骨的高手少捆等,練成太極拳高手的也少,不過還是有续室。 小金的老師就是一位練成太極拳的高手栋烤。小金是溫州人,年輕猎贴、聰明班缎、有...
    一代鬃獅閱讀 954評論 0 1