Flink--WaterMark: Understanding and Practice

  • Based on flink-1.8.1

What Watermarks Are For

  • Watermarks exist to handle out-of-order events. Handling out-of-order events correctly is usually done by combining the watermark mechanism with windows.
  • In stream processing, some time passes between the moment an event is produced, its passage through the source, and its arrival at an operator. In most cases data reaches the operator in the order the events were produced, but network delays, backpressure, and similar causes can still make elements arrive out of order (out-of-order or late elements).
  • We cannot wait for late elements indefinitely, though. There has to be a mechanism guaranteeing that, after a certain point, the window is triggered and computed. That mechanism is the watermark.
  • For watermark fundamentals, see the article "Flink--A Primer on WaterMark in EventTime".
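The core heuristic can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink code: the class name `OutOfOrderDemo` is hypothetical, and the 5 s bound and millisecond values only mirror the example job later in this article. The watermark trails the largest timestamp seen so far by a fixed bound, so an element that arrives out of order but within that bound is not yet behind the watermark.

```java
// Hypothetical illustration, not Flink code: a watermark that trails the
// largest event timestamp seen so far by a fixed out-of-orderness bound.
final class OutOfOrderDemo {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE; // no event seen yet

    OutOfOrderDemo(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Records an event; only the maximum is kept, so the watermark never moves backwards. */
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Largest timestamp seen minus the bound; Long.MIN_VALUE before the first event. */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
    }

    /** A watermark t asserts that no more elements with timestamp <= t are expected. */
    boolean isBehindWatermark(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        OutOfOrderDemo demo = new OutOfOrderDemo(5_000L);
        long[] arrivals = {33_747L, 36_758L, 38_778L}; // these arrive in order
        for (long ts : arrivals) {
            demo.onEvent(ts);
        }
        long outOfOrder = 37_791L; // arrives after 38_778 although it is older
        System.out.println("watermark=" + demo.currentWatermark()
                + ", behind watermark: " + demo.isBehindWatermark(outOfOrder));
        // prints "watermark=33778, behind watermark: false"
    }
}
```

The out-of-order element at 37 791 ms is still ahead of the watermark (33 778 ms), so it is not considered late; only an element older than "largest timestamp minus 5 s" would be.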

How Windows Are Divided

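  • Window boundaries do not depend on the data itself; they are defined by the system.
  • A window is Flink's basic unit for dividing a stream. The division is fixed: by default windows are aligned to natural time boundaries (multiples of the window size, counted from the Unix epoch), and every window is left-closed, right-open.
  • Window examples: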
Window size | 1st window          | 2nd window          | 3rd window
3s          | [00:00:00~00:00:03) | [00:00:03~00:00:06) | [00:00:06~00:00:09)
5s          | [00:00:00~00:00:05) | [00:00:05~00:00:10) | [00:00:10~00:00:15)
10s         | [00:00:00~00:00:10) | [00:00:10~00:00:20) | [00:00:20~00:00:30)
1min        | [00:00:00~00:01:00) | [00:01:00~00:02:00) | [00:02:00~00:03:00)
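Which window a timestamp belongs to is pure arithmetic. The sketch below is plain Java with a hypothetical class name; to my knowledge Flink's `TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize)` uses the same formula, but treat this helper as an illustration rather than Flink's own code. With an offset of 0, windows are aligned to multiples of the window size counted from the Unix epoch.

```java
// Hypothetical helper illustrating tumbling-window assignment (not Flink's code).
final class TumblingWindowSketch {

    /** Start of the window containing timestamp; windows are [start, start + windowSize). */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 10_000L;                 // 10 s windows, as in the example job in this article
        long ts = 1565082033747L;            // 2019-08-06 17:00:33.747 in UTC+8
        long start = windowStart(ts, 0L, size);
        long end = start + size;             // exclusive end
        System.out.println("[" + start + ", " + end + ")");
        // prints "[1565082030000, 1565082040000)"
    }
}
```

Because alignment is relative to the epoch (UTC), second- and minute-sized windows line up with wall-clock boundaries in UTC+8 too. For day-sized windows that should start at local midnight in UTC+8, Flink's documentation describes passing an offset, for example `TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))`.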

Watermark Assignment

Default Watermark Update Interval

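  • See the Javadoc from the Flink source quoted below for details.
  • In any mode other than processing time (i.e. event time or ingestion time), the default interval is 200 ms; in processing-time mode the interval is set to 0, which disables periodic watermark generation.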
    // --------------------------------------------------------------------------------------------
    //  Time characteristic
    // --------------------------------------------------------------------------------------------

    /**
     * Sets the time characteristic for all streams create from this environment, e.g., processing
     * time, event time, or ingestion time.
     *
     * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
     * watermark update interval of 200 ms. If this is not applicable for your application
     * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }

Tracing Periodic Watermarks

  • Periodic watermarks are the most widely used kind, because they let you set a maximum out-of-orderness.
package github.yahuili1128.watermark;

import github.yahuili1128.pojo.MockUpModel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.TreeSet;

import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;

/**
 * @Description : Reads data from Kafka to practice watermarks
 * @Author : LiYahui
 * @Date : 2019-08-06 11:45
 * @Version : V1.0
 */
public class PeriodicWatermarkTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
        SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> line.gender.equals("male"))
                .assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
                .window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");

        result.print();

        env.execute(PeriodicWatermarkTest.class.getSimpleName());
    }

    public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {

        @Override
        public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
                throws Exception {
            TreeSet<Long> set = new TreeSet<>();
            // number of elements in the window
            int size = Iterables.size(input);
            Iterator<MockUpModel> eles = input.iterator();
            while (eles.hasNext()) {
                set.add(eles.next().timestamp);
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            // (key, element count, earliest timestamp, latest timestamp, window start, window end)
            String first = sdf.format(set.first());
            String last = sdf.format(set.last());
            String start = sdf.format(window.getStart());
            String end = sdf.format(window.getEnd());
            // for debugging
            out.collect("event.key:" + key + ",elements in window:" + size + ",earliest element timestamp:" + first
                    + ",latest element timestamp:" + last + ",window start:" + start + ",window end:" + end
                    + ",all timestamps in window:" + set.toString());

        }
    }


    public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
        // maximum allowed out-of-orderness: 5 s
        private final long maxOutOfOrderness = 5000L;
        private long currentMaxTimestamp;
        private Watermark watermark;

        // formats timestamps for debugging output
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
            return watermark;
        }

        @Override
        public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
            // read the timestamp carried by the event
            long timestamp = element.getTimestamp();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            // print all time information; concatenating `watermark` (instead of calling toString())
            // avoids a NullPointerException if no watermark has been generated yet
            System.out.println("--->>> event.key:" + element.gender + " | event timestamp:" + timestamp + "| "
                    + sdf.format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| "
                    + sdf.format(currentMaxTimestamp) + "| watermark" + watermark);
            // return the event's own timestamp as its event time
            return timestamp;
        }
    }

}

  • Sample printed output:
--->>> event.key:male | event timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
--->>> event.key:male | event timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
8> event.key:male,elements in window:6,earliest element timestamp:2019-08-06 17:00:33.747,latest element timestamp:2019-08-06 17:00:39.815,window start:2019-08-06 17:00:30.000,window end:2019-08-06 17:00:40.000,all timestamps in window:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
--->>> event.key:male | event timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
--->>> event.key:male | event timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
--->>> event.key:male | event timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
--->>> event.key:male | event timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
--->>> event.key:male | event timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
--->>> event.key:male | event timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
--->>> event.key:male | event timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
--->>> event.key:male | event timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
8> event.key:male,elements in window:6,earliest element timestamp:2019-08-06 17:00:41.839,latest element timestamp:2019-08-06 17:00:49.900,window start:2019-08-06 17:00:40.000,window end:2019-08-06 17:00:50.000,all timestamps in window:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
--->>> event.key:male | event timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
--->>> event.key:male | event timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
--->>> event.key:male | event timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
--->>> event.key:male | event timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
--->>> event.key:male | event timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028
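  • Why does the watermark show -5000?
    • An AssignerWithPeriodicWatermarks is invoked periodically rather than per element: getCurrentWatermark() runs on a timer whose interval is set with ExecutionConfig.setAutoWatermarkInterval (200 ms by default), not after each extractTimestamp() call. Before any element has been processed, currentMaxTimestamp still holds its initial value 0, so the watermark is 0 - 5000 = -5000. In addition, each printed line shows whatever the most recent timer call stored in the watermark field, so the printed watermark lags behind currentMaxTimestamp.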
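The polling behaviour can be simulated without Flink. This is a hypothetical sketch, not Flink's implementation; it assumes, as I understand Flink 1.8's periodic-watermark operator to behave, that the assigner is polled every `intervalMs` of processing time and that a polled watermark is forwarded only if it is larger than the last one forwarded. As in the article's `GetWateramrk`, `currentMaxTimestamp` starts at 0, which is where the initial -5000 comes from.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink's implementation: a periodic assigner is
// polled every intervalMs of processing time, and a polled watermark is
// forwarded only if it is larger than the last one forwarded.
final class PeriodicWatermarkSim {

    /**
     * arrivalMs[i] is the processing time at which the event with timestamp eventTs[i]
     * arrives (arrivalMs must be sorted). Returns the watermarks forwarded downstream.
     */
    static List<Long> emittedWatermarks(long[] arrivalMs, long[] eventTs,
                                        long intervalMs, long runMs, long maxOutOfOrderness) {
        List<Long> emitted = new ArrayList<>();
        long currentMaxTimestamp = 0L;      // same initial value as the article's GetWateramrk
        long lastEmitted = Long.MIN_VALUE;  // nothing forwarded yet
        int next = 0;                       // index of the next event to arrive
        for (long now = intervalMs; now <= runMs; now += intervalMs) {
            // extractTimestamp(): every event that arrived by this tick updates the maximum.
            while (next < arrivalMs.length && arrivalMs[next] <= now) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTs[next]);
                next++;
            }
            // Timer tick: getCurrentWatermark() is polled, independently of any event.
            long candidate = currentMaxTimestamp - maxOutOfOrderness;
            if (candidate > lastEmitted) {
                emitted.add(candidate);
                lastEmitted = candidate;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // No events during the first 400 ms, then two events, the second one out of order.
        long[] arrivalMs = {500L, 900L};
        long[] eventTs = {12_000L, 11_000L};
        System.out.println(emittedWatermarks(arrivalMs, eventTs, 200L, 1_000L, 5_000L));
        // prints "[-5000, 7000]"
    }
}
```

The first tick fires before any event has arrived, so the only possible watermark is 0 - 5000 = -5000. The out-of-order event at 900 ms does not move the watermark, because it does not raise the maximum timestamp.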

Conclusion

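  • A window is triggered when the following conditions hold:
    • watermark time >= window_end_time (strictly, Flink's event-time trigger fires once the watermark reaches window.maxTimestamp(), i.e. window_end_time - 1 ms);
    • at least one element exists in [window_start_time, window_end_time).
  • The window fires only when both conditions are satisfied.
  • The watermark is not tracked per key: it advances with the stream (each parallel task tracks its own), so data with a different key also moves it forward.
  • This part deserves careful study.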
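The two trigger conditions can be checked against the sample output with a small replay. This is a hypothetical plain-Java sketch, not Flink code, and it simplifies one thing: the watermark (largest timestamp minus 5 s) is recomputed after every element instead of every 200 ms. A window fires once the watermark reaches its last millisecond (end - 1), a window without elements is never created and so never fires, and an element whose window has already fired is dropped, which corresponds to Flink's default of zero allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical replay of 10 s tumbling event-time windows (not Flink code).
// Simplification: the watermark is recomputed after every element, not every 200 ms.
final class WindowFiringReplay {
    static final long SIZE = 10_000L;  // 10 s tumbling windows
    static final long BOUND = 5_000L;  // 5 s maximum out-of-orderness

    /** Returns "windowStart:elementCount" for each window, in the order the windows fire. */
    static List<String> replay(long[] timestamps) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> elements buffered so far
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long watermark = maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - BOUND;
            long start = ts - (ts + SIZE) % SIZE;    // window [start, start + SIZE)
            if (start + SIZE - 1 <= watermark) {
                continue;                             // window already fired: late element dropped
            }
            open.merge(start, 1, Integer::sum);       // a window exists only once it has data
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - BOUND;
            // Fire every window whose last millisecond (end - 1) the watermark has reached.
            while (!open.isEmpty() && open.firstKey() + SIZE - 1 <= watermark) {
                Map.Entry<Long, Integer> w = open.pollFirstEntry();
                fired.add(w.getKey() + ":" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Event timestamps from the sample output, in arrival order, up to the second window firing.
        long[] ts = {1565082033747L, 1565082036758L, 1565082038778L, 1565082037791L,
                1565082038803L, 1565082039815L, 1565082041839L, 1565082044850L,
                1565082046869L, 1565082047880L, 1565082046890L, 1565082049900L,
                1565082051921L, 1565082050934L, 1565082051945L, 1565082052955L,
                1565082055965L};
        System.out.println(replay(ts));
        // prints "[1565082030000:6, 1565082040000:6]"
    }
}
```

Replayed this way, the sample data yields the same two windows with 6 elements each, and they fire at the same points as in the log: window [17:00:30, 17:00:40) after the element at 17:00:46.869, and window [17:00:40, 17:00:50) after the element at 17:00:55.965.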

©Copyright belongs to the author. For reposting or content collaboration, please contact the author.