再談Flink事件時間剃诅、水印和遲到數(shù)據(jù)處理

前言

之前的文章中已經(jīng)屢次提到過Flink的事件時間(event time)、水邮患伞(watermark)矛辕、亂序(out-of-order)、遲到數(shù)據(jù)(late element)這些概念,雖然它們都非沉钠罚基礎飞蹂,但筆者還沒有對它們做過像樣的介紹,感覺不太合適翻屈。正好今天腦子比較累陈哑,又是Friday night,不適合寫復雜的東西妖胀,就來談談簡單的吧芥颈。

事件時間與水印

所謂事件時間,就是Flink DataStream中的數(shù)據(jù)元素自身帶有的赚抡、在其實際發(fā)生時記錄的時間戳爬坑,具有業(yè)務含義,并與系統(tǒng)時間獨立涂臣。很顯然盾计,由于外部系統(tǒng)產(chǎn)生的數(shù)據(jù)往往不能及時、按序到達Flink系統(tǒng)赁遗,所以事件時間比處理時間有更強的不可預測性署辉。為了能夠準確地表達事件時間的處理進度,就必須用到水印岩四。

Flink水印的本質是DataStream中的一種特殊元素哭尝,每個水印都攜帶有一個時間戳。當時間戳為T的水印出現(xiàn)時剖煌,表示事件時間t <= T的數(shù)據(jù)都已經(jīng)到達材鹦,即水印后面應該只能流入事件時間t > T的數(shù)據(jù)。也就是說耕姊,水印是Flink判斷遲到數(shù)據(jù)的標準桶唐,同時也是窗口觸發(fā)的標記。

為了形象地說明水印的作用茉兰,參考一下下面的圖尤泽,是一個亂序的基于事件時間的數(shù)據(jù)流示例。

https://www.ververica.com/blog/how-apache-flink-enables-new-streaming-applications-part-1

圖中的方框就是數(shù)據(jù)元素规脸,其中的數(shù)字表示事件時間坯约,W(x)就表示時間戳是x的水印,并有長度為4個時間單位的滾動窗口莫鸭。假設時間單位為秒鬼店,可見事件時間為2、3黔龟、1s的元素都會進入?yún)^(qū)間為[1s, 4s]的窗口妇智,而事件時間為7s的元素會進入?yún)^(qū)間為[5s, 8s]的窗口滥玷。當水印W(4)到達時,表示已經(jīng)沒有t <= 4s的元素了巍棱,[1s, 4s]窗口會被觸發(fā)并計算惑畴。同理,水印W(9)到達時航徙,[5s, 8s]窗口會被觸發(fā)并計算如贷,以此類推。

不過圖中暫時沒有示出遲到數(shù)據(jù)到踏。如果事件時間為6的元素出現(xiàn)在W(9)后面杠袱,就算是遲到了。遲到數(shù)據(jù)的處理后面再說窝稿。

上面的示例只有一個并行度楣富,那么在有多個并行度的情況下,就會有多個流產(chǎn)生水印伴榔,窗口觸發(fā)時該采用哪個水印呢纹蝴?答案是所有流入水印中時間戳最小的那個。來自官方文檔的圖能夠說明問題踪少。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

容易理解塘安,如果所有流入水印中時間戳最小的那個都已經(jīng)達到或超過了窗口的結束時間,那么所有流的數(shù)據(jù)肯定已經(jīng)全部收齊援奢,就可以安全地觸發(fā)窗口計算了兼犯。

提取事件時間、產(chǎn)生水印

上面說了這么多集漾,那么事件時間是如何從數(shù)據(jù)中提取的免都,水印又是如何產(chǎn)生的呢?Flink提供了統(tǒng)一的DataStream.assignTimestampsAndWatermarks()方法來提取事件時間并同時產(chǎn)生水印帆竹,畢竟它們在處理過程中是緊密聯(lián)系的。

assignTimestampsAndWatermarks()方法接受的參數(shù)類型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks兩種脓规,分別對應周期性水印和打點(即由事件本身的屬性觸發(fā))水印栽连,它們的類圖如下所示。

周期性水印

顧名思義侨舆,使用AssignerWithPeriodicWatermarks時秒紧,水印是周期性產(chǎn)生的。該周期默認為200ms挨下,也能通過ExecutionConfig.setAutoWatermarkInterval()方法來指定新的周期熔恢。

由類圖容易看出,我們需要通過實現(xiàn)extractTimestamp()方法來提取事件時間臭笆,實現(xiàn)getCurrentWatermark()方法產(chǎn)生水印叙淌。但好在Flink已經(jīng)提供了3種內置的實現(xiàn)類秤掌,所以我們直接用就可以了,省事鹰霍。

  • AscendingTimestampExtractor
    總說話口干舌燥的(闻鉴?),還是看代碼吧茂洒。
    public abstract long extractAscendingTimestamp(T element);

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

AscendingTimestampExtractor產(chǎn)生的時間戳和水印必須是單調非遞減的孟岛,用戶通過覆寫extractAscendingTimestamp()方法抽取時間戳。如果產(chǎn)生了遞減的時間戳督勺,就要使用名為MonotonyViolationHandler的組件處理異常渠羞,有兩種方式:打印警告日志(默認)和拋出RuntimeException。

單調遞增的事件時間并不太符合實際情況智哀,所以AscendingTimestampExtractor用得不多次询。

  • BoundedOutOfOrdernessTimestampExtractor
    它的出鏡率就非常高了。還是看代碼先盏触。
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

如名字所述渗蟹,BoundedOutOfOrdernessTimestampExtractor產(chǎn)生的時間戳和水印是允許“有界亂序”的,構造它時傳入的參數(shù)maxOutOfOrderness就是亂序區(qū)間的長度赞辩,而實際發(fā)射的水印為通過覆寫extractTimestamp()方法提取出來的時間戳減去亂序區(qū)間雌芽,相當于讓水印把步調“放慢一點”。這是Flink為遲到數(shù)據(jù)提供的第一重保障辨嗽。

當然世落,亂序區(qū)間的長度要根據(jù)實際環(huán)境謹慎設定,設定得太短會丟較多的數(shù)據(jù)糟需,設定得太長會導致窗口觸發(fā)延遲屉佳,實時性減弱。

  • IngestionTimeExtractor
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }

IngestionTimeExtractor基于當前系統(tǒng)時鐘生成時間戳和水印洲押,其實就是Flink三大時間特征里的攝入時間了武花。

打點水印

打點水印比周期性水印用的要少不少,并且Flink沒有內置的實現(xiàn)杈帐,那么就寫個最簡單的栗子吧体箕。

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
      @Nullable
      @Override
      public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
        return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
      }

      @Override
      public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
        return element.getTimestamp();
      }
    });

AssignerWithPunctuatedWatermarks適用于需要依賴于事件本身的某些屬性決定是否發(fā)射水印的情況。我們實現(xiàn)checkAndGetNextWatermark()方法來產(chǎn)生水印挑童,產(chǎn)生的時機完全由用戶控制累铅。上面例子中是收取到用戶ID末位為0的數(shù)據(jù)時才發(fā)射。

還有三點需要提醒:

  • 不管使用哪種方式產(chǎn)生水印站叼,都不能過于頻繁娃兽。因為Watermark對象是會全部流向下游的,也會實打實地占用內存尽楔,水印過多會造成系統(tǒng)性能下降投储。
  • 水印的生成要盡量早第练,一般是在接入Source之后就產(chǎn)生,或者在Source經(jīng)過簡單的變換(map轻要、filter等)之后產(chǎn)生复旬。
  • 如果需求方對事件時間carry的業(yè)務意義并不關心,可以直接使用處理時間冲泥,簡單方便驹碍。

遲到數(shù)據(jù)處理

如上所述,水印的亂序區(qū)間能夠保證一些遲到數(shù)據(jù)不被丟棄凡恍,但是亂序區(qū)間往往不很長志秃,那些真正遲到了的數(shù)據(jù)該怎么辦呢?有兩種方法來兜底嚼酝,可以說是Flink為遲到數(shù)據(jù)提供的第二重保障浮还。

窗口允許延遲

Flink提供了WindowedStream.allowedLateness()方法來設定窗口的允許延遲。也就是說闽巩,正常情況下窗口觸發(fā)計算完成之后就會被銷毀钧舌,但是設定了允許延遲之后,窗口會等待allowedLateness的時長后再銷毀涎跨。在該區(qū)間內的遲到數(shù)據(jù)仍然可以進入窗口中洼冻,并觸發(fā)新的計算。當然隅很,窗口也是吃資源大戶撞牢,所以allowedLateness的值要適當。給個完整的代碼示例如下叔营。

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

allowedLateness機制實際上就是DataFlow模型中的回填(backfill)策略的實現(xiàn)屋彪。對于滑動窗口和滾動窗口是累積(accumulating)策略,對于會話窗口則是累積與回撤(accumulating & retracting)策略绒尊。之前講DataFlow模型時提到過畜挥,不廢話了。

側輸出遲到數(shù)據(jù)

側輸出(side output)是Flink的分流機制婴谱。遲到數(shù)據(jù)本身可以當做特殊的流蟹但,我們通過調用WindowedStream.sideOutputLateData()方法將遲到數(shù)據(jù)發(fā)送到指定OutputTag的側輸出流里去,再進行下一步處理(比如存到外部存儲或消息隊列)勘究。代碼如下。

      // 側輸出的OutputTag
      OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(lateOutputTag)   // 側輸出
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

      // 獲取遲到數(shù)據(jù)并寫入對應Sink
      stream.getSideOutput(lateOutputTag).addSink(lateDataSink);

The End

2019年快過去了斟冕,今年真的寫了不少東西呢口糕。

民那晚安。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末磕蛇,一起剝皮案震驚了整個濱河市景描,隨后出現(xiàn)的幾起案子十办,更是在濱河造成了極大的恐慌,老刑警劉巖超棺,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件向族,死亡現(xiàn)場離奇詭異,居然都是意外死亡棠绘,警方通過查閱死者的電腦和手機件相,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來氧苍,“玉大人夜矗,你說我怎么就攤上這事∪门埃” “怎么了紊撕?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長赡突。 經(jīng)常有香客問我对扶,道長,這世上最難降的妖魔是什么惭缰? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任浪南,我火速辦了婚禮,結果婚禮上从媚,老公的妹妹穿的比我還像新娘逞泄。我一直安慰自己,他們只是感情好拜效,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布喷众。 她就那樣靜靜地躺著,像睡著了一般紧憾。 火紅的嫁衣襯著肌膚如雪到千。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天赴穗,我揣著相機與錄音憔四,去河邊找鬼。 笑死般眉,一個胖子當著我的面吹牛了赵,可吹牛的內容都是我干的。 我是一名探鬼主播甸赃,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼柿汛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了埠对?” 一聲冷哼從身側響起络断,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤裁替,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后貌笨,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體弱判,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年锥惋,在試婚紗的時候發(fā)現(xiàn)自己被綠了昌腰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡净刮,死狀恐怖剥哑,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情淹父,我是刑警寧澤株婴,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站暑认,受9級特大地震影響困介,放射性物質發(fā)生泄漏。R本人自食惡果不足惜蘸际,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一座哩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧粮彤,春花似錦根穷、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至惫周,卻和暖如春尘惧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背递递。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工喷橙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人登舞。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓贰逾,卻偏偏與公主長得像,于是被迫代替她去往敵國和親菠秒。 傳聞我的和親對象是個殘疾皇子疙剑,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348