Flink -- Watermark 那些事

Watermark 是 Flink 實(shí)時(shí)處理計(jì)算平臺(tái)的一個(gè)重要概念斥废,也是 Google 的著名實(shí)時(shí)計(jì)算論文 The Data?ow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing 里面經(jīng)常提到的名詞。看了很多的參考文獻(xiàn),和文檔蝗柔。有了一些自己的理解和體會(huì)粒督,做個(gè)筆記。最近 Flink 1.5 版本正式發(fā)布灭忠,也算蹭個(gè)熱度吧。


1. Watermark 的理解

? 最早看到 Watermark 的概念就是在 Flink 的官方文檔里面:

The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark)

?上面這句話中有一個(gè)詞叫做 event-time座硕,了解 DataFlow 模型的同學(xué)都知道弛作, event-time 表示的是每個(gè)獨(dú)立事件在各自設(shè)備上產(chǎn)生的時(shí)間,是這個(gè)事件特有的屬性华匾,好比是一個(gè)人的生日映琳。與之對(duì)應(yīng)的還有 processing-timeingestion-time。理解起來也不難瘦真,processing-time 就是 Flink 的 window 操作該事件的時(shí)間刊头,ingestion-time 是事件作為數(shù)據(jù)源進(jìn)入 Flink 系統(tǒng)的時(shí)間。參考下面這個(gè)圖1诸尽,來源于 Flink 文檔

圖1:Event-Time Ingestion-Time Processing-Time 示例

? 重新回到引用中的這句英文原杂,翻譯過來的大致意思是," Watermarks 是 Flink 用來度量 event-time 事件處理進(jìn)度的一種機(jī)制您机,Watermarks 作為數(shù)據(jù)流的一部分穿肄,攜帶一個(gè)時(shí)間 t。表示這次的數(shù)據(jù)流中 event-time 已經(jīng)處理到時(shí)間 t 了际看,所有 event-time 早于或等于 t 時(shí)刻的事件都不應(yīng)該再出現(xiàn)在這個(gè)數(shù)據(jù)流中了咸产。" 有些朋友可能會(huì)有疑問,為什么要弄這么一個(gè)奇怪的限制仲闽。這是因?yàn)橐话銇碚f脑溢,在實(shí)際環(huán)境下,由于網(wǎng)絡(luò)阻塞赖欣,延遲等問題屑彻,processing-time 相比 event-time 會(huì)有延遲滯后的現(xiàn)象,而且這種現(xiàn)象非常普遍顶吮。也就是說社牲,并不是生產(chǎn)出一個(gè)事件, 就能在第一時(shí)間被處理悴了。參考下面這個(gè)圖2搏恤,來源于 Dataflow Model 論文违寿。

圖2:時(shí)間域的偏離

?圖2可以看到,橫坐標(biāo)是 event-time熟空,縱坐標(biāo)是 processing-time藤巢。理想的處理機(jī)制是,12:01分的 event-time 時(shí)間痛阻,應(yīng)該就在 12:01分的 processing-time 時(shí)刻被處理菌瘪。但是實(shí)際情況下腮敌,橫向藍(lán)色實(shí)線對(duì)應(yīng)的 processing-time 晚于12:01分阱当。所以圖中橫向黑色實(shí)線表示 event-time 的偏離,縱向紅色實(shí)線表示 processing-time 的偏離糜工,這些都比較容易理解弊添。 接下來,按照原圖的注釋捌木,淺色虛線表示 理想的 watermark油坝,深色虛線表示 實(shí)際的 watermark∨亳桑看到這里似乎有些困惑澈圈,該如何理解呢?

?直到帆啃,我看到另外一篇由 Tyler Akidau 大神寫的文章 Stream 102瞬女,是這樣描述 watermark 的:

Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.

? 大致意思是," Watermark 是從 event-time 維度描述輸入數(shù)據(jù)的完整性努潘, 換句話說诽偷,是系統(tǒng)評(píng)價(jià)數(shù)據(jù)流中處理 event-time 事件進(jìn)度和完整性的方式"。在圖2中疯坤,隨著 processing-time 的推移报慕,深色虛線逐步獲取了 event-time 的完成進(jìn)度。從概念上說压怠,可以理解成一個(gè)函數(shù)F(P) -> E眠冈,給定一個(gè) processing-time 值,函數(shù)返回對(duì)應(yīng)的 event-time 值菌瘫。這個(gè) event-time 時(shí)間值 E 就被系統(tǒng)認(rèn)為所有早于 E 時(shí)刻的 event-time 事件都已經(jīng)被觀察處理過蜗顽,或者說系統(tǒng)斷言,以后都不會(huì)再有早于 E 時(shí)刻的 event-time 事件出現(xiàn)了突梦。至此诫舅,對(duì)于流處理的算子(window ... )而言,對(duì)于這個(gè)時(shí)間段的計(jì)算告一段落宫患,可以生成結(jié)果了刊懈。

?當(dāng)然,很多朋友會(huì)問,如果過了 watermark 之后還有比估計(jì)時(shí)刻早的數(shù)據(jù)虚汛,姍姍來遲匾浪,怎么辦?Flink 框架自帶 Allowed Lateness 機(jī)制卷哩,這部分又是涉及到另一個(gè)概念 trigger蛋辈,這篇文章先不提了。

2. Watermark 的例子

? 了解了 watermark 概念将谊,引用 vishnuviswanath 論壇上的例子更好的加深理解冷溶。

case 1: 消息到達(dá)沒有延遲

? 如圖3,假設(shè)數(shù)據(jù)源生成3條消息尊浓,分別是第13秒逞频,第13秒,第16秒栋齿。計(jì)算窗口為10秒苗胀,每隔5秒滑動(dòng)一次。


圖3:數(shù)據(jù)生成

?這些消息在圖4中會(huì)落入對(duì)應(yīng)時(shí)間窗口瓦堵。前兩個(gè)在第13秒生成的消息會(huì)落入 [5s - 15s] 的窗口1和 [10s - 20s] 的窗口2基协。而第16秒生成的消息 會(huì)落入 [10s - 20s] 的窗口2和 [15s - 25s] 的窗口3。最終每隔窗口期望得到的結(jié)果分別是 (a,2), (a,3) 和 (a,1)菇用。

圖4:期望結(jié)果

case 2: 消息到達(dá)存在延遲

?假設(shè)有一個(gè)第13秒生成的消息到達(dá)時(shí)澜驮,延遲了6秒(在第19秒到達(dá)),原因可能是網(wǎng)絡(luò)阻塞刨疼。那么情況就變成了圖5這樣泉唁。


圖5:數(shù)據(jù)到達(dá)存在延遲

? 圖中可以看到,延遲的消息落入了窗口2和窗口3揩慕,因?yàn)?9秒屬于 10-20 和 15-25的范圍內(nèi)亭畜。雖然這個(gè)例子中,對(duì)于窗口2的結(jié)果沒有影響迎卤,但是窗口1和窗口3的結(jié)果都發(fā)生了變化拴鸵,導(dǎo)致發(fā)生錯(cuò)誤的原因就是處理消息時(shí)間的時(shí)候用的是 processing-time 而不是 event-time

case 3:用基于 event-time 的系統(tǒng)來處理問題

?使用 event-time 處理機(jī)制蜗搔,需要對(duì)每個(gè)消息提取 event-time 信息劲藐。這個(gè)時(shí)間信息一般消息自帶,或者手動(dòng)生成樟凄∑肝撸看下面一個(gè)例子,暫時(shí)忽略 getCurrentWatermark 這個(gè)函數(shù)缝龄,之后來討論汰现。

public class TimestampExtractor implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis());
    }

    @Override
    public long extractTimestamp(String e, long previousElementTimestamp) {
        return Long.parseLong(e.split(",")[1]);
    }
}

? 有了 event-time 信息提取器之后挂谍,主函數(shù)流程如下:

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> text = env.socketTextStream("localhost", 9999)
                                     .assignTimestampsAndWatermarks(new TimestampExtractor());

        DataStream<Tuple2<String, Integer>> count = text.map((MapFunction<String, Tuple2<String, Integer>>) m -> Tuple2.of(m.split(",")[0], 1))
                                                        .keyBy(0)
                                                        .timeWindow(Time.seconds(10), Time.seconds(5))
                                                        .sum(1);
        count.print();
        env.execute("EventTime processing example");
}

? 產(chǎn)生的結(jié)果見圖6,圖中可以看出窗口2和窗口3產(chǎn)生了正確的結(jié)果瞎饲,但是窗口1還是錯(cuò)的口叙。Flink 不把延遲的消息指派給窗口3是因?yàn)樗老⒌?event-time不屬于窗口3。但是為什么 Flink 不將延遲消息指派給窗口1呢嗅战?是因?yàn)橥铮?dāng)延遲消息到達(dá)的時(shí)候(第19秒)窗口1已經(jīng)完成計(jì)算了。那好驮捍,接下來解決這個(gè)問題疟呐。


圖6:基于 event-time 的計(jì)算

case 4:使用 Watermark 機(jī)制

? 對(duì)于我們的案例來說,我們的目的是告訴 Flink 一個(gè)消息可以延遲多久厌漂。在 case 3 中提到了 getCurrentWatermark 函數(shù)萨醒,我們?cè)O(shè)置了 watermark 值為當(dāng)前系統(tǒng)時(shí)間斟珊,這表示沒有考慮到延遲的消息∥現(xiàn)在設(shè)置 watermark 值為 當(dāng)前時(shí)間 - 5秒,這就表明 Flink 期望的消息延遲最大是5秒囤踩。這是因?yàn)榇翱谠谳敵鲎罱K計(jì)算結(jié)果的條件是旨椒,當(dāng) watermark 超過窗口結(jié)束時(shí)間。所以只有當(dāng)前時(shí)間到達(dá)第20秒的時(shí)候堵漱,窗口1 [5s - 15s] 才符合輸出條件综慎,同樣的窗口2 [10s - 20s] 要第25秒時(shí)才會(huì)輸出結(jié)果。代碼如下:

public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 5000);
}

? 運(yùn)行的結(jié)果如圖7勤庐,最終示惊,三個(gè)窗口的結(jié)果都正確。

圖7:使用 Watermark 機(jī)制

3. Watermark 的本地驗(yàn)證

? 看了上面的案例愉镰,突然又有了一個(gè)奇葩的想法米罚,如果把 當(dāng)前時(shí)間 - Lateness 的方法改成 當(dāng)前時(shí)間 + Lateness 的情況呢?是不是窗口會(huì)提前給出結(jié)果呢丈探?想到這里录择,打算在本地試一試。Flink 分配消息時(shí)間戳的方式有兩種:1. 直接在數(shù)據(jù)源 (source)指派; 2. 通過定義 timestamp assigner / watermark generator碗降。具體教程參考官方文檔隘竭。上一節(jié)內(nèi)容中的代碼主要是第二種方式,剛好本地驗(yàn)證的時(shí)候試試第一種方式讼渊。

public class EventTimeDoc {
    
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 設(shè)置并行度為1动看,方便調(diào)試
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Long>> input = env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            String[] letters = new String[]{"A", "B", "C", "D", "E", "F"};
            Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (true) {
                    String key = letters[random.nextInt(6)];
                    Tuple2<String, Long> next = Tuple2.of(key, System.currentTimeMillis());

                    // 打印產(chǎn)生的消息
                    long w = showWindowsStart(next.f1, 0L, 7000L);
                    System.out.println(next + " -> " + LocalDateTime.fromDateFields(new Date(next.f1)) + " -> 所屬的窗口起始點(diǎn)是:" + LocalDateTime.fromDateFields(new Date(w)));
                    // event-time 設(shè)置為消息本身時(shí)間
                    ctx.collectWithTimestamp(next, next.f1);
                    // 消息時(shí)間 + Lateness
                    ctx.emitWatermark(new Watermark(next.f1 + 8000));

                    // 生成一條消息,暫停2秒
                    TimeUnit.SECONDS.sleep(2);
                }
            }

            @Override
            public void cancel() {

            }
        });

        input.keyBy(0).timeWindow(Time.seconds(7))
             .reduce((ReduceFunction<Tuple2<String, Long>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
             .map(new MapFunction<Tuple2<String, Long>, Object>() {
                 @Override
                 public Object map(Tuple2<String, Long> value) throws Exception {
                     return Tuple2.of(value.f0, LocalDateTime.fromDateFields(new Date(value.f1)));
                 }
             })
             .print();

        env.execute();
    }

    private static long showWindowsStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}

? 本地測(cè)試構(gòu)造了一個(gè) <String, Long>的 Tuple 消息序列爪幻,String 是 字母 A-F 的隨機(jī)取值菱皆,Long 是每次生成消息時(shí)的時(shí)間戳赋兵。計(jì)算窗口選擇 TumblingEventTimeWindows,固定為 7 秒搔预。伴隨每條消息生成發(fā)送 watermark 值霹期,并休眠 2 秒。其中 watermark 的值設(shè)置成 消息的EventTime + Lateness (Lateness 這里設(shè)置的是8秒)拯田。

? 本地測(cè)試時(shí)历造,第一個(gè)消息產(chǎn)生在第49秒,根據(jù)窗口的生命周期我們知道船庇,所屬窗口的第一個(gè)消息一旦到達(dá)吭产,窗口就立刻生成,窗口范圍計(jì)算方法是 org.apache.flink.streaming.api.windowing.windows.TimeWindow 的:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

? 對(duì)照?qǐng)D8的分析鸭轮,窗口1 的范圍是 [48s - 55s) 左閉右開區(qū)間臣淤。伴隨著該消息的生成,watermark 也被發(fā)送窃爷,此時(shí) watermark 是57秒邑蒋,已經(jīng)超過了窗口1的結(jié)束時(shí)間,窗口1輸出最終結(jié)果 -- 第49秒的消息按厘,結(jié)束自己的生命周期医吊,窗口1被移除。所以第51秒逮京,第53秒的消息沒有被觀察到卿堂。

? 接下來,第55秒的消息進(jìn)入窗口2 [55s - 1分2s) 左閉右開區(qū)間懒棉,和第49秒消息一樣 watermark 值已經(jīng)超過了窗口2的結(jié)束時(shí)間草描,窗口2輸出最終結(jié)果 -- 第55秒的消息,結(jié)束自己的生命周期策严,窗口2被移除穗慕。所以第 57秒,第59秒享钞,第1分1秒的消息沒有被觀察到揍诽。

? 時(shí)間繼續(xù)推移,當(dāng)?shù)?分1秒的消息到達(dá)的時(shí)候栗竖,對(duì)應(yīng)的 watermark 值為 1分9秒暑脆,已經(jīng)超過了 窗口3 [1分2s - 1分9s) 左閉右開區(qū)間 的結(jié)束時(shí)間。結(jié)果窗口3中的消息還沒到達(dá)狐肢,沒來得及計(jì)算添吗,就已經(jīng)被移除了,第 1分3秒份名,第1分5秒碟联,第1分7秒的消息沒有被觀察到妓美。

? 以此類推,在 窗口4 [1分9s - 1分16s) 左閉右開區(qū)間最終得到的結(jié)果只有第1分9秒的消息鲤孵。

圖8:執(zhí)行分析圖

? 附上控制臺(tái)輸出的結(jié)果壶栋,見圖9∑占啵可以發(fā)現(xiàn)最終結(jié)果和圖8的分析是一致的贵试。(紅色框的內(nèi)容就是各個(gè)窗口最終結(jié)果)

圖9:控制臺(tái)輸出信息

4. 總結(jié)

? 實(shí)時(shí)計(jì)算中,處理延遲消息是系統(tǒng)的重要一部分凯正。整篇文章毙玻,可以了解遲到的消息如何對(duì)系統(tǒng)造成不同結(jié)果,以及不同 watermark 值在 ApacheFlink 中對(duì)窗口結(jié)果的影響廊散。

參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末桑滩,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子允睹,更是在濱河造成了極大的恐慌运准,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,183評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件擂找,死亡現(xiàn)場(chǎng)離奇詭異戳吝,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)贯涎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來慢洋,“玉大人塘雳,你說我怎么就攤上這事∑粘铮” “怎么了败明?”我有些...
    開封第一講書人閱讀 168,766評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)太防。 經(jīng)常有香客問我妻顶,道長(zhǎng),這世上最難降的妖魔是什么蜒车? 我笑而不...
    開封第一講書人閱讀 59,854評(píng)論 1 299
  • 正文 為了忘掉前任讳嘱,我火速辦了婚禮,結(jié)果婚禮上酿愧,老公的妹妹穿的比我還像新娘沥潭。我一直安慰自己,他們只是感情好嬉挡,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評(píng)論 6 398
  • 文/花漫 我一把揭開白布钝鸽。 她就那樣靜靜地躺著汇恤,像睡著了一般但校。 火紅的嫁衣襯著肌膚如雪媳瞪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,457評(píng)論 1 311
  • 那天期犬,我揣著相機(jī)與錄音颜懊,去河邊找鬼蓝角。 笑死,一個(gè)胖子當(dāng)著我的面吹牛饭冬,可吹牛的內(nèi)容都是我干的使鹅。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼昌抠,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼患朱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起炊苫,我...
    開封第一講書人閱讀 39,914評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤裁厅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后侨艾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體执虹,經(jīng)...
    沈念sama閱讀 46,465評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評(píng)論 3 342
  • 正文 我和宋清朗相戀三年唠梨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了袋励。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,675評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡当叭,死狀恐怖茬故,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蚁鳖,我是刑警寧澤磺芭,帶...
    沈念sama閱讀 36,354評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站醉箕,受9級(jí)特大地震影響钾腺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜讥裤,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評(píng)論 3 335
  • 文/蒙蒙 一放棒、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧坞琴,春花似錦哨查、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽邮府。三九已至,卻和暖如春溉奕,著一層夾襖步出監(jiān)牢的瞬間褂傀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工加勤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留仙辟,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,091評(píng)論 3 378
  • 正文 我出身青樓鳄梅,卻偏偏與公主長(zhǎng)得像叠国,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子戴尸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評(píng)論 2 360

推薦閱讀更多精彩內(nèi)容