時(shí)間語義纺酸、Event Time和Watermark機(jī)制原理與實(shí)踐

[TOC]
在流處理中,時(shí)間是一個(gè)非常核心的概念址否,是整個(gè)系統(tǒng)的基石餐蔬。比如,我們經(jīng)常會(huì)遇到這樣的需求:給定一個(gè)時(shí)間窗口佑附,比如一個(gè)小時(shí)樊诺,統(tǒng)計(jì)時(shí)間窗口的內(nèi)數(shù)據(jù)指標(biāo)。那如何界定哪些數(shù)據(jù)將進(jìn)入這個(gè)窗口呢音同?在窗口的定義之前词爬,首先需要確定一個(gè)應(yīng)用使用什么樣的時(shí)間語義。

本文將介紹Flink的Event Time权均、Processing Time和Ingestion Time三種時(shí)間語義顿膨,接著會(huì)詳細(xì)介紹Event Time和Watermark的工作機(jī)制锅锨,以及如何對(duì)數(shù)據(jù)流設(shè)置Event Time并生成Watermark。

Flink的三種時(shí)間語義


image.png

如上圖所示恋沃,F(xiàn)link支持三種時(shí)間語義:

Event Time
Event Time指的是數(shù)據(jù)流中每個(gè)元素或者每個(gè)事件自帶的時(shí)間屬性必搞,一般是事件發(fā)生的時(shí)間。由于事件從發(fā)生到進(jìn)入Flink時(shí)間算子之間有很多環(huán)節(jié)囊咏,一個(gè)較早發(fā)生的事件因?yàn)檠舆t可能較晚到達(dá)恕洲,因此使用Event Time意味著事件到達(dá)有可能是亂序的。

使用Event Time時(shí)梅割,最理想的情況下霜第,我們可以一直等待所有的事件到達(dá)后再進(jìn)行時(shí)間窗口的處理。假設(shè)一個(gè)時(shí)間窗口內(nèi)的所有數(shù)據(jù)都已經(jīng)到達(dá)户辞,基于Event Time的流處理會(huì)得到正確且一致的結(jié)果:無論我們是將同一個(gè)程序部署在不同的計(jì)算環(huán)境還是在相同的環(huán)境下多次計(jì)算同一份數(shù)據(jù)泌类,都能夠得到同樣的計(jì)算結(jié)果。我們根本不同擔(dān)心亂序到達(dá)的問題咆课。但這只是理想情況末誓,現(xiàn)實(shí)中無法實(shí)現(xiàn)扯俱,因?yàn)槲覀兗炔恢谰烤挂榷嚅L時(shí)間才能確認(rèn)所有事件都已經(jīng)到達(dá)书蚪,更不可能無限地一直等待下去。在實(shí)際應(yīng)用中迅栅,當(dāng)涉及到對(duì)事件按照時(shí)間窗口進(jìn)行統(tǒng)計(jì)時(shí)殊校,F(xiàn)link會(huì)將窗口內(nèi)的事件緩存下來,直到接收到一個(gè)Watermark读存,以確認(rèn)不會(huì)有更晚數(shù)據(jù)的到達(dá)为流。Watermark意味著在一個(gè)時(shí)間窗口下,F(xiàn)link會(huì)等待一個(gè)有限的時(shí)間让簿,這在一定程度上降低了計(jì)算結(jié)果的絕對(duì)準(zhǔn)確性敬察,而且增加了系統(tǒng)的延遲。在流處理領(lǐng)域尔当,比起其他幾種時(shí)間語義莲祸,使用Event Time的好處是某個(gè)事件的時(shí)間是確定的,這樣能夠保證計(jì)算結(jié)果在一定程度上的可預(yù)測性椭迎。

一個(gè)基于Event Time的Flink程序中必須定義Event Time锐帜,以及如何生成Watermark。我們可以使用元素中自帶的時(shí)間畜号,也可以在元素到達(dá)Flink后人為給Event Time賦值缴阎。

使用Event Time的優(yōu)勢(shì)是結(jié)果的可預(yù)測性,缺點(diǎn)是緩存較大简软,增加了延遲蛮拔,且調(diào)試和定位問題更復(fù)雜述暂。

Processing Time
對(duì)于某個(gè)算子來說,Processing Time指算子使用當(dāng)前機(jī)器的系統(tǒng)時(shí)鐘來定義時(shí)間建炫。在Processing Time的時(shí)間窗口場景下贸典,無論事件什么時(shí)候發(fā)生,只要該事件在某個(gè)時(shí)間段達(dá)到了某個(gè)算子踱卵,就會(huì)被歸結(jié)到該窗口下廊驼,不需要Watermark機(jī)制。對(duì)于一個(gè)程序在同一個(gè)計(jì)算環(huán)境來說惋砂,每個(gè)算子都有一定的耗時(shí)妒挎,同一個(gè)事件的Processing Time,第n個(gè)算子和第n+1個(gè)算子不同西饵。如果一個(gè)程序在不同的集群和環(huán)境下執(zhí)行時(shí)酝掩,限于軟硬件因素,不同環(huán)境下前序算子處理速度不同眷柔,對(duì)于下游算子來說期虾,事件的Processing Time也會(huì)不同,不同環(huán)境下時(shí)間窗口的計(jì)算結(jié)果會(huì)發(fā)生變化驯嘱。因此镶苞,Processing Time在時(shí)間窗口下的計(jì)算會(huì)有不確定性。

Processing Time只依賴當(dāng)前執(zhí)行機(jī)器的系統(tǒng)時(shí)鐘鞠评,不需要依賴Watermark茂蚓,無需緩存。Processing Time是實(shí)現(xiàn)起來非常簡單也是延遲最小的一種時(shí)間語義剃幌。

Ingestion Time
Ingestion Time是事件到達(dá)Flink Souce的時(shí)間聋涨。從Source到下游各個(gè)算子中間可能有很多計(jì)算環(huán)節(jié),任何一個(gè)算子的處理速度快慢可能影響到下游算子的Processing Time负乡。而Ingestion Time定義的是數(shù)據(jù)流最早進(jìn)入Flink的時(shí)間牍白,因此不會(huì)被算子處理速度影響。

Ingestion Time通常是Event Time和Processing Time之間的一個(gè)折中方案抖棘。比起Event Time茂腥,Ingestion Time可以不需要設(shè)置復(fù)雜的Watermark,因此也不需要太多緩存钉答,延遲較低欺栗。比起Processing Time匾鸥,Ingestion Time的時(shí)間是Souce賦值的曹仗,一個(gè)事件在整個(gè)處理過程從頭至尾都使用這個(gè)時(shí)間古今,而且后續(xù)算子不受前序算子處理速度的影響,計(jì)算結(jié)果相對(duì)準(zhǔn)確一些右蹦,但計(jì)算成本稍高诊杆。

設(shè)置時(shí)間語義
在Flink中歼捐,我們需要在執(zhí)行環(huán)境層面設(shè)置使用哪種時(shí)間語義。下面的代碼使用Event Time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

如果想用另外兩種時(shí)間語義晨汹,需要替換為:TimeCharacteristic.ProcessingTime和TimeCharacteristic.IngestionTime豹储。

Event Time和Watermark
Flink的三種時(shí)間語義中,Processing Time和Ingestion Time都可以不用設(shè)置Watermark淘这。如果我們要使用Event Time語義剥扣,以下兩項(xiàng)配置缺一不可:第一,使用一個(gè)時(shí)間戳為數(shù)據(jù)流中每個(gè)事件的Event Time賦值铝穷;第二钠怯,生成Watermark。

實(shí)際上曙聂,Event Time是每個(gè)事件的元數(shù)據(jù)晦炊,F(xiàn)link并不知道每個(gè)事件的發(fā)生時(shí)間是什么,我們必須要為每個(gè)事件的Event Time賦值一個(gè)時(shí)間戳宁脊。關(guān)于時(shí)間戳断国,包括Flink在內(nèi)的絕大多數(shù)系統(tǒng)都支持Unix時(shí)間戳系統(tǒng)(Unix time或Unix epoch)。Unix時(shí)間戳系統(tǒng)以1970-01-01 00:00:00.000 為起始點(diǎn)榆苞,其他時(shí)間記為距離該起始時(shí)間的整數(shù)差值稳衬,一般是毫秒(millisecond)精度。

有了Event Time時(shí)間戳语稠,我們還必須生成Watermark宋彼。Watermark是Flink插入到數(shù)據(jù)流中的一種特殊的數(shù)據(jù)結(jié)構(gòu)弄砍,它包含一個(gè)時(shí)間戳仙畦,并假設(shè)后續(xù)不會(huì)有小于該時(shí)間戳的數(shù)據(jù)。下圖展示了一個(gè)亂序數(shù)據(jù)流音婶,其中方框是單個(gè)事件慨畸,方框中的數(shù)字是其對(duì)應(yīng)的Event Time時(shí)間戳,圓圈為Watermark衣式,圓圈中的數(shù)字為Watermark對(duì)應(yīng)的時(shí)間戳寸士。


image.png
  • Watermark與事件的時(shí)間戳緊密相關(guān)。一個(gè)時(shí)間戳為T的Watermark假設(shè)后續(xù)到達(dá)的事件時(shí)間戳都大于T碴卧。
  • 假如Flink算子接收到一個(gè)違背上述規(guī)則的事件弱卡,該事件將被認(rèn)定為遲到數(shù)據(jù),如上圖中時(shí)間戳為19的事件比Watermark(20)更晚到達(dá)住册。Flink提供了一些其他機(jī)制來處理遲到數(shù)據(jù)婶博。
  • Watermark時(shí)間戳必須單調(diào)遞增,以保證時(shí)間不會(huì)倒流荧飞。
  • Watermark機(jī)制允許用戶來控制準(zhǔn)確度和延遲凡人。Watermark設(shè)置得與事件時(shí)間戳相距緊湊名党,會(huì)產(chǎn)生不少遲到數(shù)據(jù),影響計(jì)算結(jié)果的準(zhǔn)確度挠轴,整個(gè)應(yīng)用的延遲很低传睹;Watermark設(shè)置得非常寬松,準(zhǔn)確度能夠得到提升岸晦,但應(yīng)用的延遲較高欧啤,因?yàn)镕link必須等待更長的時(shí)間才進(jìn)行計(jì)算。

Flink的Watermark細(xì)節(jié)介紹

Watermark是什么启上?從不同的維度可以有不同的理解

  1. 從Watermark的計(jì)算角度看:可以將Watermark理解為一個(gè)函數(shù):
    堂油,它的輸入是當(dāng)前的系統(tǒng)時(shí)間,輸出是一個(gè)Event Time(一個(gè)時(shí)間戳)碧绞,而且輸出的這個(gè)時(shí)間戳是嚴(yán)格單調(diào)遞增的府框。這樣看,Watermark就是一個(gè)函數(shù)讥邻。
  2. 從Watermark的具體形式來看:可以將Watermark當(dāng)成一個(gè)個(gè)時(shí)間戳迫靖,值就是1中輸出的那個(gè)時(shí)間戳。
  3. 從Watermark流轉(zhuǎn)的角度看:可以將Watermark理解成夾雜在正常流事件中的一個(gè)個(gè)特殊事件兴使。

這3種描述方式系宜,看似不同,實(shí)則一樣发魄,只是從不同的角度去看了而已盹牧。不管怎么理解,我們必須知道:流處理系統(tǒng)規(guī)定励幼,如果某個(gè)時(shí)刻Watermark的值為T1汰寓,那系統(tǒng)就認(rèn)為凡是早于或等于T1時(shí)間的事件都已經(jīng)收到了。注意苹粟,這個(gè)就是Watermark所代表的含義有滑,實(shí)際因?yàn)楝F(xiàn)實(shí)中各種情況,未必能嚴(yán)格做到這樣嵌削,但目標(biāo)就是要達(dá)到上面規(guī)定的這樣毛好,或者無限逼近。

Why苛秕?
為什么需要Watermark肌访?這個(gè)也有很多種描述方式,往大了說就是提供一種理論上解決分布式系統(tǒng)中消息亂序問題(這是分布式系統(tǒng)中一個(gè)經(jīng)典難題)的方案艇劫。說小點(diǎn)就是在有狀態(tài)的流計(jì)算中吼驶,當(dāng)我們關(guān)注事件的順序或者完整性時(shí),需要有這么一種機(jī)制能實(shí)現(xiàn)這個(gè)需求。

這里的完整性我舉個(gè)例子解釋一下:比如我們基于事件發(fā)生時(shí)間統(tǒng)計(jì)每5min的用戶PV總量旨剥,那比如12:00-12:05這個(gè)5min的統(tǒng)計(jì)該在什么時(shí)間點(diǎn)計(jì)算呢咧欣?假設(shè)沒有Watermark這個(gè)概念,你就永遠(yuǎn)不知道什么時(shí)候12:00-12:05區(qū)間的所有事件才全到齊轨帜。你不能假定收到12:00-12:05的數(shù)據(jù)就認(rèn)為之前的數(shù)據(jù)已經(jīng)全部來了魄咕,因?yàn)閿?shù)據(jù)可能延遲+亂序了。而Watermark就是為了解決這個(gè)問題而提出的蚌父,當(dāng)你收到Watermark的值為12:00-12:05的事件時(shí)哮兰,你就可以認(rèn)為早于這個(gè)時(shí)間的數(shù)據(jù)已經(jīng)都到了,數(shù)據(jù)已經(jīng)完整了苟弛,可以進(jìn)行12:00-12:05這個(gè)5min區(qū)間的數(shù)據(jù)計(jì)算了喝滞。至于如何保證,這個(gè)是框架要做的事情(當(dāng)然一般需要開發(fā)者參與)膏秫。

Where右遭?
哪里需要Watermark?這里我給一個(gè)簡單粗暴的結(jié)論缤削,當(dāng)同時(shí)滿足下面兩個(gè)條件的時(shí)候才會(huì)需要Watermark:

  • 計(jì)算中使用了時(shí)間相關(guān)的算子(time-based operations)窘哈,其實(shí)再明確點(diǎn),就是使用了Window的時(shí)候(注:Flink的Global Window除外亭敢,這個(gè)Window不是基于時(shí)間的)滚婉。
  • 1中使用的時(shí)間相關(guān)的算子選擇使用事件時(shí)間,即Event Time(注:如果是Flink的話帅刀,也包含Ingestion Time)让腹。

這里再解釋一下2。前文我們介紹過有兩種時(shí)間Event Time和Processing Time(Flink獨(dú)有的Ingestion Time在Watermark這里可以歸結(jié)為Event Time扣溺,后文不再另行說明)骇窍,時(shí)間相關(guān)的算子選擇時(shí)間時(shí)必然是二選一。并不是選擇Processing Time的時(shí)候就沒有Watermark了娇妓,只是這個(gè)時(shí)候Processing Time自身就是一個(gè)完美的Watermark(因?yàn)闀r(shí)間一去不復(fù)返像鸡,Processing Time永遠(yuǎn)是單調(diào)遞增的),并不需要產(chǎn)生單獨(dú)的Watermark了哈恰。所以在Processing Time里面,你可以認(rèn)為Watermark沒有意義了志群,所以去掉了着绷,或者認(rèn)為Processing Time自身就是Watermark都行。

實(shí)戰(zhàn)

場景介紹
為了方便說明锌云,我構(gòu)造了一個(gè)簡單的場景荠医,假設(shè)有一個(gè)設(shè)備產(chǎn)生了一組事件,如下:

{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

一共9個(gè)事件,id是事件名稱彬向,timestamp是設(shè)備端事件真實(shí)產(chǎn)生的時(shí)間兼贡。也就是事件真實(shí)產(chǎn)生順序是:

event1, event2, event3, event4, event5, event6, event7, event8, event9

但在傳輸過程中因?yàn)楦鞣N現(xiàn)實(shí)原因亂序了,到Flink這里的時(shí)候娃胆,事件順序變成了:

event1, event2, event4, event5, event7, event3, event6, event8, event9

現(xiàn)在我們要做的事情就是計(jì)算每5秒中的事件個(gè)數(shù)遍希,以此來判斷事件高峰期。

說明:

  1. 這個(gè)計(jì)算是非常有代表性的里烦,比如電商統(tǒng)計(jì)每小時(shí)的pv就能知道每天用戶高峰期發(fā)生在哪幾個(gè)時(shí)段凿蒜,這里為了方便說明問題,把問題簡化了胁黑,并且為了快速出結(jié)果废封,把時(shí)間粒度縮短為5秒鐘。
  2. 計(jì)算時(shí)丧蘸,要想結(jié)果準(zhǔn)確漂洋,就不能使用Processing Time,這樣如果數(shù)據(jù)從產(chǎn)生到被處理延遲比較大的話力喷,最終計(jì)算的結(jié)果也會(huì)不準(zhǔn)確氮发。除非這個(gè)延遲可控或者可接受,則可簡單的使用Processing Time冗懦,否則就必須用Event Time進(jìn)行計(jì)算爽冕。
Flink提供的Watermark機(jī)制

Flink提供了3種方式來生成Watermark:

  1. 在Source中生成Watermark;
  2. 通過AssignerWithPeriodicWatermarks生成Watermark披蕉;
  3. 通過AssignerWithPunctuatedWatermarks生成Watermark颈畸;

前面介紹過了Watermark是在使用Event Time的場景下才使用的,所以給事件增加Event Time和生成Watermark是一對(duì)操作没讲,一般都是一起使用的眯娱。方式1是直接在Flink的最源頭Source那里就生成了Event Time和Watermark。方式2和方式3則是流處理中的某一步驟(可以理解為一個(gè)特殊點(diǎn)的算子)爬凑,它的輸入是流徙缴,輸出還是流,只不過經(jīng)過這個(gè)流之后事件就會(huì)有Event Timestamp和Watermark了嘁信,一般這一步放在Source之后于样,最晚也要在時(shí)間算子之前,也就是Window之前潘靖。而且他兩的優(yōu)先級(jí)高穿剖,如果Source中生成了Watermark,后面又使用了方式2或3卦溢,則會(huì)覆蓋之前的Event Timestamp和Watermark糊余。

下面我們分別介紹每種方式秀又。

Watermark In Source

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignAtSourceDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }


  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
        ctx.collectWithTimestamp(e, timestampInMills);
        ctx.emitWatermark(new Watermark(timestampInMills));
      });

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }


  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 這里把消息打亂,模擬實(shí)際中的消息亂序
    // 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }
}

這里自定義了一個(gè)Source贬芥,然后接了一個(gè)Window(timeWindowAll)吐辙,做了一個(gè)簡單的處理,最終輸出蘸劈。這里需要注意一個(gè)點(diǎn):timeWindowAll底層其實(shí)是定義了一個(gè)TumblingWindows昏苏,至于使用Processing Time(TumblingProcessingTimeWindows),還是Event Time(TumblingEventTimeWindows)則由env.setStreamTimeCharacteristic來確定的昵时,該選項(xiàng)的默認(rèn)值是TimeCharacteristic.ProcessingTime捷雕,即使用Processing Time。

作為演示壹甥,修改一下上面代碼救巷,先使用Processing Time,看下結(jié)果:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
event1
event2
event4
event5
event7
event3
event6
event8
event9
Total:9

Process finished with exit code 0

可以看到句柠,只有一個(gè)Window浦译,其范圍是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代碼運(yùn)行的時(shí)間溯职,顯然這樣的統(tǒng)計(jì)結(jié)果是沒有意義的精盅,因?yàn)樗w現(xiàn)不出業(yè)務(wù)真正的高峰期。后面我們只討論使用Event Time的情況谜酒。

現(xiàn)在重新改為env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);叹俏,然后運(yùn)行:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

我們看下現(xiàn)在的輸出,有兩個(gè)Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}和window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}僻族,可以看到就是5秒鐘一個(gè)Window粘驰。然后12:00:00-12:00:05這個(gè)Window里面包含了3個(gè)事件:event1,event2述么,event4蝌数;12:00:05-12:00:10這個(gè)Window里面包含了5個(gè)事件:event5、event7度秘、event6顶伞、event8、event9剑梳。

從這個(gè)結(jié)果看event3丟了唆貌,其它數(shù)據(jù)都在,為什么呢阻荒?如果我說因?yàn)閑vent3亂序了挠锥,排在了后邊,你肯定會(huì)說event6也排到了event7后邊侨赡,為什么event6卻沒有丟呢?要解釋清楚這個(gè)問題還需要涉及到觸發(fā)器以及窗口的原理和機(jī)制,為了保證行文的連貫性羊壹,這里我先直接給出結(jié)論:因?yàn)榇翱谀J(rèn)的觸發(fā)器實(shí)現(xiàn)機(jī)制是本該在一個(gè)窗口內(nèi)的數(shù)據(jù)亂序了以后蓖宦,只要在這個(gè)窗口結(jié)束(即被觸發(fā))之前來,那是不影響的油猫,不認(rèn)為是遲到數(shù)據(jù)稠茂,不會(huì)被丟掉;但如果這個(gè)窗口已經(jīng)結(jié)束了才來情妖,就會(huì)被丟掉了睬关。比如event3本應(yīng)該屬于12:00:00-12:00:05這個(gè)窗口,當(dāng)event5這條數(shù)據(jù)來的時(shí)候毡证,這個(gè)窗口就就認(rèn)為數(shù)據(jù)完整了电爹,于是觸發(fā)計(jì)算,接著就銷毀了料睛。等event3來的時(shí)候已經(jīng)是12:00:05-12:00:10窗口了丐箩,所以它直接被丟掉了。也就是在時(shí)間窗口這里恤煞,對(duì)于“亂序”的定義不是要求每個(gè)到來事件的時(shí)間戳都嚴(yán)格升序屎勘,而是看屬于這個(gè)窗口的事件能否在窗口時(shí)間范圍內(nèi)來,如果能來居扒,就不算亂序概漱,至于在這個(gè)時(shí)間范圍內(nèi)來的先后順序無所謂。這個(gè)其實(shí)也是合理的喜喂。

另外還有兩個(gè)細(xì)節(jié)點(diǎn)要注意一下:

  • 當(dāng)Source是有界數(shù)據(jù)時(shí)瓤摧,當(dāng)所有數(shù)據(jù)發(fā)送完畢后,系統(tǒng)會(huì)自動(dòng)發(fā)一個(gè)值為Long.MAX_VALUE的Watermark夜惭,表示數(shù)據(jù)發(fā)送完了姻灶。
  • Window是一個(gè)左閉右開區(qū)間,比如12:00:00的數(shù)據(jù)屬于12:00:00-12:00:05窗口诈茧,而12:00:05的數(shù)據(jù)屬于12:00:05-12:00:10窗口产喉。
AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks

AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks其實(shí)非常像,哪怕是用法都非常像敢会,他兩個(gè)的主要區(qū)別是Watermark的產(chǎn)生機(jī)制或者時(shí)機(jī):AssignerWithPriodicWatermarks是根據(jù)一個(gè)固定的時(shí)間周期性的產(chǎn)生Watermark曾沈,而AssignerWithPunctuatedWatermarks則是由事件驅(qū)動(dòng),然后代碼自己控制何時(shí)以何種方式產(chǎn)生Watermark鸥昏,比如一個(gè)event就產(chǎn)生一個(gè)塞俱,還是幾個(gè)event產(chǎn)生一個(gè),或者滿足什么條件時(shí)產(chǎn)生Watermark等吏垮,就是用戶可以靈活控制障涯。

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignerWatermarksDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
//        .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }

  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        ctx.collect(e);
      });

      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }
  
  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 可以把消息打亂罐旗,模擬實(shí)際中的消息亂序。
    // 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }
}

先分別看下AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks部分吧:

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      // 省略一些邏輯
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些邏輯
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      // 省略一些邏輯
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些邏輯
      return timestamp;
    }
  }

為了突出重點(diǎn)唯蝶,我刪掉了具體實(shí)現(xiàn)九秀。可以看到這兩個(gè)類都有一個(gè)extractTimestamp方法粘我,這個(gè)方法每個(gè)Event都會(huì)調(diào)用鼓蜒,作用就是給Event賦一個(gè)Event Time。另外一個(gè)方法稍微有點(diǎn)差異征字,AssignerWithPeriodicWatermarks的方法叫g(shù)etCurrentWatermark()都弹,而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它們的主要區(qū)別是方法的調(diào)用機(jī)制:

  • getCurrentWatermark()沒有參數(shù)匙姜,它是框架根據(jù)用戶設(shè)置的固定時(shí)間周期性的調(diào)用畅厢。這個(gè)固定的時(shí)間可以通過以下方式設(shè)置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(500);

上面的代碼設(shè)置每500毫秒調(diào)用一次getCurrentWatermark(),即每500毫秒產(chǎn)生一個(gè)Watermark搁料。不顯式的設(shè)置的話或详,默認(rèn)值是0,但實(shí)際效果是每200ms調(diào)用一次郭计。

  • checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有兩個(gè)參數(shù):一個(gè)是event霸琴,一個(gè)是extractTimestamp方法返回的時(shí)間戳。這個(gè)方法被調(diào)用的時(shí)間點(diǎn)是:每個(gè)事件來了先調(diào)用extractTimestamp昭伸,然后馬上調(diào)用checkAndGetNextWatermark梧乘。在checkAndGetNextWatermark中你可以通過返回值控制是否產(chǎn)生新的Watermark,如果你不想返回新的Watermark庐杨,可以返回null或者一個(gè)小于等于上一個(gè)Watermark的時(shí)間戳选调,這樣就相當(dāng)于本次不返回Watermark或者返回的Watermark不是遞增的被丟棄了,繼續(xù)使用原來的Watermark灵份。因?yàn)閃atermark不能為null仁堪,且必須單調(diào)遞增。

AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks的區(qū)別就這些填渠,最佳實(shí)踐的話我個(gè)人覺得優(yōu)先考慮AssignerWithPriodicWatermarks弦聂,如果不能滿足需求,再考慮AssignerWithPunctuatedWatermarks氛什。一方面是前者簡單一些莺葫,另一方面是一般沒有必要每個(gè)事件就計(jì)算一個(gè)Watermark,這樣會(huì)增加不是很有必要的計(jì)算量枪眉。

遲到數(shù)據(jù)

從上面的部分看到event3因?yàn)檫t到被默默的丟掉了捺檬,現(xiàn)實(shí)中數(shù)據(jù)是重要資產(chǎn),肯定是不能隨便丟棄的贸铜。Flink提供了兩種解決方案:

  1. 允許一定的延遲堡纬。這個(gè)延遲可以在兩個(gè)地方設(shè)置:第一種是可以在上面的AssignerWithXXXWatermarks方法里面給計(jì)算出的時(shí)間戳減去一個(gè)時(shí)間聂受,這個(gè)時(shí)間就是你允許延遲的時(shí)間。第二種就是在時(shí)間窗口那里可以通過allowedLateness來設(shè)置一個(gè)允許的延遲時(shí)間隐轩,

但允許一定延遲的方式只能治標(biāo)饺饭,不能治本渤早。我們只能根據(jù)實(shí)際情況允許一定限度的延遲职车,但總歸是有個(gè)限度的,原因主要有兩個(gè):1)延遲太高會(huì)喪失實(shí)時(shí)性鹊杖,如果你的場景對(duì)實(shí)時(shí)性要求比較高悴灵,那就無法設(shè)置太大的延遲。2)延遲實(shí)際是延長了窗口的生命周期骂蓖,所以資源消耗會(huì)增加积瞒。

  1. 在Window那里通過sideOutputLateData將遲到的數(shù)據(jù)以流的形式旁路出去。這個(gè)是治本的手段登下,它沒有時(shí)間的限制茫孔,如果有遲到數(shù)據(jù),就會(huì)發(fā)送到這個(gè)單獨(dú)的流里面去被芳,然后可以為這個(gè)流單獨(dú)設(shè)置處理方式缰贝。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市畔濒,隨后出現(xiàn)的幾起案子剩晴,更是在濱河造成了極大的恐慌,老刑警劉巖侵状,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赞弥,死亡現(xiàn)場離奇詭異,居然都是意外死亡趣兄,警方通過查閱死者的電腦和手機(jī)绽左,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來艇潭,“玉大人拼窥,你說我怎么就攤上這事”┣” “怎么了闯团?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長仙粱。 經(jīng)常有香客問我房交,道長,這世上最難降的妖魔是什么伐割? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任候味,我火速辦了婚禮刃唤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘白群。我一直安慰自己尚胞,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布帜慢。 她就那樣靜靜地躺著笼裳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪粱玲。 梳的紋絲不亂的頭發(fā)上躬柬,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音抽减,去河邊找鬼允青。 笑死,一個(gè)胖子當(dāng)著我的面吹牛卵沉,可吹牛的內(nèi)容都是我干的颠锉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼史汗,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼琼掠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起淹办,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤眉枕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后怜森,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體速挑,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年副硅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了姥宝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恐疲,死狀恐怖腊满,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情培己,我是刑警寧澤碳蛋,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站省咨,受9級(jí)特大地震影響肃弟,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一笤受、第九天 我趴在偏房一處隱蔽的房頂上張望穷缤。 院中可真熱鬧,春花似錦箩兽、人聲如沸津肛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽身坐。三九已至,卻和暖如春芳绩,著一層夾襖步出監(jiān)牢的瞬間掀亥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國打工妥色, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人遏片。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓嘹害,卻偏偏與公主長得像,于是被迫代替她去往敵國和親吮便。 傳聞我的和親對(duì)象是個(gè)殘疾皇子笔呀,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容