Flink 使用之WatermarkStrategy(含源碼分析)

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

前言

概括來說,watermark用于基于event time的流計(jì)算系統(tǒng)數(shù)據(jù)流可能發(fā)生亂序的情況芋膘。對(duì)于event time數(shù)據(jù)流倦淀,接收到數(shù)據(jù)的時(shí)間和上游產(chǎn)生數(shù)據(jù)的時(shí)間是不相關(guān)的蹦掐,因此可能會(huì)出現(xiàn)產(chǎn)生時(shí)間較早的數(shù)據(jù)由于網(wǎng)絡(luò)抖動(dòng)等原因到達(dá)Flink系統(tǒng)較晚的情形。Watermark用于應(yīng)對(duì)數(shù)據(jù)亂序的情況胖眷。Watermark是數(shù)據(jù)流中的一種特殊數(shù)據(jù)榕栏,由Flink內(nèi)部周期(可自定義)產(chǎn)生。下游接收到watermark的時(shí)候屹耐,會(huì)認(rèn)為timestamp在watermark之前的數(shù)據(jù)已經(jīng)到齊尸疆。針對(duì)這些數(shù)據(jù)的運(yùn)算過程可以開始。Watermark之后的數(shù)據(jù)沒有到齊惶岭,需要在緩存的同時(shí),等待后續(xù)數(shù)據(jù)的到來犯眠。Watermark的生成策略可以實(shí)現(xiàn)數(shù)據(jù)亂序的兼容按灶。例如將watermark發(fā)送的時(shí)間設(shè)置為當(dāng)前接收到的數(shù)據(jù)的最大timestamp(記為t)減去5s。這樣下游認(rèn)為t-5s之前的數(shù)據(jù)已經(jīng)到齊筐咧。t-5s之后的數(shù)據(jù)先緩存起來等待鸯旁。從而實(shí)現(xiàn)容忍“亂序的程度”不超過5s的情形。

更為詳細(xì)的Flink watermark講解參見以下文章:

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

Flink 源碼之時(shí)間處理

接下來講解如何配置使用watermark生成策略(WatermarkStrategy)量蕊。

配置watermark自動(dòng)發(fā)送周期

Flink默認(rèn)的watermark自動(dòng)發(fā)送周期為200ms铺罢。Flink支持全局方式和局部方式配置自動(dòng)發(fā)送周期。

全局配置方式:修改flink-conf.yaml文件残炮,增加或修改
pipeline.auto-watermark-interval(對(duì)應(yīng)PipelineOptions.AUTO_WATERMARK_INTERVAL)配置項(xiàng)韭赘。

作業(yè)局部修改方式:調(diào)用ExecutionConfig.setAutoWatermarkInterval(...)方法。

env.getConfig().setAutoWatermarkInterval(100);

WatermarkStrategy使用

forMonotonousTimestamps

生成單調(diào)遞增的watermark势就。數(shù)據(jù)流元素到來的時(shí)候不發(fā)送watermark泉瞻,僅在到達(dá)自動(dòng)發(fā)送周期的時(shí)候才發(fā)送。

這里遞增的意思并非僅僅是watermark時(shí)間戳數(shù)值的嚴(yán)格遞增苞冯,每次發(fā)送的watermark都是最近接收到的元素?cái)y帶的timestamp袖牙。(從元素提取出攜帶的timestamp過程由TimestampAssigner負(fù)責(zé),后面分析)舅锄。一種例外情況是如果遇到遲到的數(shù)據(jù)(watermark比前一個(gè)元素斜薮铩),這個(gè)元素的watermark會(huì)被MonotonousTimestamps排除不做記錄,可以保證向下游發(fā)送的watermark是遞增的畴蹭。

// 使用env創(chuàng)建數(shù)據(jù)源
source = ...

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forMonotonousTimestamps();

forBoundedOutOfOrderness

上面的單調(diào)遞增方式無法解決元素亂序的問題烘贴。這里的BoundedOutOfOrderness是專門為數(shù)據(jù)存在亂序這種場(chǎng)景考慮的。使用時(shí)候需要指定一個(gè)參數(shù)撮胧,即最大可容忍的數(shù)據(jù)遲到時(shí)間桨踪。如果亂序數(shù)據(jù)遲到超過這個(gè)時(shí)間限制,該數(shù)據(jù)將被忽略芹啥。當(dāng)然還可以配置為旁路輸出锻离,參見Flink 使用之?dāng)?shù)據(jù)分流

BoundedOutOfOrderness實(shí)現(xiàn)并不復(fù)雜墓怀,基本和上面單調(diào)遞增的方式一致汽纠。區(qū)別是在周期發(fā)送watermark的時(shí)候,發(fā)送的watermark需要減去最大可容忍的數(shù)據(jù)遲到時(shí)間傀履。從而實(shí)現(xiàn)了數(shù)據(jù)計(jì)算的觸發(fā)時(shí)刻向后拖延虱朵,在拖延的時(shí)間段內(nèi)“等待”亂序數(shù)據(jù)到來。

使用方法如下:

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forBoundedOutOfOrderness(Duration.ofSeconds(30));

forGenerator

前面兩種watermark generator能夠滿足絕大多數(shù)使用場(chǎng)景钓账。如果仍不能滿足要求碴犬,F(xiàn)link提供了創(chuàng)建自定義watermark generator的方式。

這里以Integer類型的數(shù)據(jù)源為例梆暮,說明自定義generator的寫法服协。

.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<Integer>() {
    @Override
    public WatermarkGenerator<Integer> createWatermarkGenerator(Context context) {
        return new WatermarkGenerator<Integer>() {
            @Override
            public void onEvent(Integer integer, long eventTimestamp, WatermarkOutput watermarkOutput) {
                
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {

            }
        };
    }
}));

在自定義WatermarkGenerator的時(shí)候按需實(shí)現(xiàn)兩個(gè)方法:

  • onEvent:接收到元素的時(shí)候觸發(fā)。參數(shù)分別為輸入的元素啦粹,元素提取出來的timestamp(event time)和控制輸出watermark的對(duì)象(后面解釋)偿荷。
  • onPeriodicEmit:到達(dá)自動(dòng)發(fā)送watermark周期的時(shí)候觸發(fā)。參數(shù)只有一個(gè)唠椭,和前面的相同跳纳。

WatermarkOutput用來像下游發(fā)送watermark,或者控制數(shù)據(jù)輸出贪嫂。它有三個(gè)方法:

  • emitWatermark: 發(fā)送watermark到下游寺庄。
  • markIdle: 標(biāo)記output為空閑狀態(tài)。
  • markActive: 標(biāo)記output為活動(dòng)狀態(tài)撩荣。如果output在空閑狀態(tài)發(fā)送了watermark铣揉,也會(huì)自動(dòng)標(biāo)記為活動(dòng)狀態(tài)。

Flink中一個(gè)計(jì)算步驟可能有多個(gè)上游(雙數(shù)據(jù)流或更多)餐曹,計(jì)算步驟會(huì)考慮到所有上游的watermark逛拱。設(shè)想如果一個(gè)流一直不產(chǎn)生watermark,需要等待這個(gè)流的數(shù)據(jù)呢還是這個(gè)流目前就沒有數(shù)據(jù)可以忽略台猴?Flink不好判斷朽合。為了解決這個(gè)問題引入了idle(空閑)機(jī)制俱两。如果一個(gè)數(shù)據(jù)源標(biāo)記了空閑狀態(tài),下游計(jì)算的時(shí)候不會(huì)考慮這個(gè)數(shù)據(jù)源的watermark曹步。未能正確處理數(shù)據(jù)源的idle狀態(tài)會(huì)導(dǎo)致Flink整個(gè)計(jì)算過程的阻塞宪彩。務(wù)必要注意這一點(diǎn)。

noWatermarks

不生成任何watermark讲婚。

source.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());

withTimestampAssigner

該方法用來配置如何從元素中抽取出watermark尿孔。例如到來的數(shù)據(jù)包含數(shù)據(jù)生成時(shí)的timestamp,格式為Tuple2<String, Long>類型筹麸,值為("Hello", 1690437024)活合。我們可以獲取元素的第二個(gè)字段作為Flink內(nèi)部的timestamp使用。

withTimestampAssigner使用方法如下所示:

source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forMonotonousTimestamps().withTimestampAssigner((element, timestamp) -> {
    // element: 到來的元素
    // timestamp:上游為元素指定的timestamp物赶,通常為數(shù)據(jù)源產(chǎn)生的timestamp
    // 需要編寫自己的抽取邏輯
    // 返回抽取出的timestamp
}));

withIdleness

上面forGenerator章節(jié)提到了idle問題白指。大家可能會(huì)問:有沒有一種常見的策略可以自動(dòng)標(biāo)記idle狀態(tài)?比如數(shù)據(jù)流持續(xù)一段時(shí)間沒有數(shù)據(jù)到來的時(shí)候自動(dòng)標(biāo)記為idle狀態(tài)酵紫。withIdleness正好是這種策略告嘲。它對(duì)watermark generator做了包裝。用戶在其中不需要再去編寫何時(shí)標(biāo)記idle的邏輯奖地。

withIdleness的用法如下所示:

.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forBoundedOutOfOrderness(Duration.ofSeconds(30)).withIdleness(Duration.ofSeconds(5)));

WatermarkStrategy源代碼分析

forBoundedOutOfOrderness

static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
    return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}

上面創(chuàng)建了BoundedOutOfOrdernessWatermarks橄唬。繼續(xù)查看它的代碼。分析內(nèi)容在注釋中鹉动。

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** The maximum timestamp encountered so far. */
    // 暫存最大的timestamp
    // 發(fā)送的timestamp一定是遞增(或者大小不變)的
    private long maxTimestamp;

    /** The maximum out-of-orderness that this watermark generator assumes. */
    // 最大可容忍的數(shù)據(jù)遲到的時(shí)間范圍(亂序程度)
    private final long outOfOrdernessMillis;

    /**
     * Creates a new watermark generator with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // start so that our lowest watermark would be Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // ------------------------------------------------------------------------

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // 接收到元素的時(shí)候轧坎,更新maxTimestamp
        // 如果接收到遲到的元素(eventTimestamp比maxTimestamp小)泽示,忽略不更新,確保timestamp遞增
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 在發(fā)送watermark時(shí)間周期到來的時(shí)候蜜氨,發(fā)送watermark
        // 發(fā)送的watermark需要減去outOfOrdernessMillis
        // 含義是讓下游認(rèn)為maxTimestamp - outOfOrdernessMillis - 1之前的數(shù)據(jù)已經(jīng)到齊
        // 只有認(rèn)為到齊的數(shù)據(jù)參會(huì)參與計(jì)算械筛,未到齊的數(shù)據(jù)會(huì)緩存等待
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

forMonotonousTimestamps

static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}

這里創(chuàng)建了AscendingTimestampsWatermarks。它繼承了BoundedOutOfOrdernessWatermarks飒炎,代碼如下所示埋哟。

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    /** Creates a new watermark generator with for ascending timestamps. */
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}

看到構(gòu)造方法很容易明白,AscendingTimestampsWatermarks是一種不容忍任何數(shù)據(jù)遲到的BoundedOutOfOrdernessWatermarks郎汪。

withIdleness

default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    checkNotNull(idleTimeout, "idleTimeout");
    checkArgument(
            !(idleTimeout.isZero() || idleTimeout.isNegative()),
            "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

繼續(xù)查看WatermarkStrategyWithIdlenesscreateWatermarkGenerator方法:

@Override
public WatermarkGenerator<T> createWatermarkGenerator(
        WatermarkGeneratorSupplier.Context context) {
    return new WatermarksWithIdleness<>(
            baseStrategy.createWatermarkGenerator(context), idlenessTimeout);
}

創(chuàng)建出的watermark generator為WatermarksWithIdleness赤赊。該類使用了裝飾器模式。在不改變?cè)衱atermark generator的基礎(chǔ)之上增加了標(biāo)記idle的能力煞赢。它有三個(gè)成員變量抛计。

// 包裝的watermark生成器
private final WatermarkGenerator<T> watermarks;
// idle定時(shí)器,用來判斷是否idle
private final IdlenessTimer idlenessTimer;
// 狀態(tài)標(biāo)記照筑,目前是否處于idle狀態(tài)
private boolean isIdleNow = false;

繼續(xù)分析onEventonPeriodicEmit方法:

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
    // 調(diào)用被包裝watermark generator的onEvent方法
    watermarks.onEvent(event, eventTimestamp, output);
    // 告知idlenessTimer有活動(dòng)發(fā)生
    idlenessTimer.activity();
    // 標(biāo)記空閑狀態(tài)為false
    isIdleNow = false;
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
    if (idlenessTimer.checkIfIdle()) {
        // 檢查空閑狀態(tài)吹截,如果為空閑
        if (!isIdleNow) {
            // 如果當(dāng)前狀態(tài)不是空閑
            // 說明剛從活動(dòng)狀態(tài)變?yōu)榭臻e狀態(tài)
            // 標(biāo)記為idle狀態(tài)
            output.markIdle();
            // 記錄空閑狀態(tài)為true
            isIdleNow = true;
        }
    } else {
        // 如果不是idle狀態(tài)瘦陈,調(diào)用包裝的watermark generator的onPeriodicEmit方法
        watermarks.onPeriodicEmit(output);
    }
}

最后的問題就是IdlenessTimer是怎么判斷是否idle的。我們繼續(xù)分析它的構(gòu)造函數(shù)波俄,activity方法和checkIfIdle方法晨逝。

IdlenessTimer(Clock clock, Duration idleTimeout) {
    // 獲取時(shí)鐘
    this.clock = clock;

    long idleNanos;
    // 將idle超時(shí)時(shí)間轉(zhuǎn)換為納秒保存
    try {
        idleNanos = idleTimeout.toNanos();
    } catch (ArithmeticException ignored) {
        // long integer overflow
        idleNanos = Long.MAX_VALUE;
    }

    this.maxIdleTimeNanos = idleNanos;
}

public void activity() {
    // 內(nèi)部有個(gè)計(jì)數(shù)器,只要有活動(dòng)懦铺,該計(jì)數(shù)器自增1
    // counter是long類型捉貌,即便是自增溢出了,也不會(huì)影響
    counter++;
}

public boolean checkIfIdle() {
    if (counter != lastCounter) {
        // lastCounter為最近一次counter計(jì)數(shù)
        // 如果不等冬念,說明期間有活動(dòng)
        // 這里不寫if (counter > lastCounter)的原因是兼容counter溢出的情況
        // activity since the last check. we reset the timer
        // 更新lastCounter趁窃,重設(shè)計(jì)時(shí)器
        lastCounter = counter;
        startOfInactivityNanos = 0L;
        return false;
    } else // timer started but has not yet reached idle timeout
    if (startOfInactivityNanos == 0L) {
        // first time that we see no activity since the last periodic probe
        // begin the timer
        // 首次發(fā)現(xiàn)counter沒有更新,即沒有活動(dòng)刘急,啟用計(jì)時(shí)器
        startOfInactivityNanos = clock.relativeTimeNanos();
        return false;
    } else {
        // 如果當(dāng)前時(shí)間和計(jì)時(shí)器時(shí)間差超過maxIdleTimeNanos棚菊,說明處于空閑狀態(tài)
        return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
    }
}
    

本博客為作者原創(chuàng),歡迎大家參與討論和批評(píng)指正叔汁。如需轉(zhuǎn)載請(qǐng)注明出處统求。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市据块,隨后出現(xiàn)的幾起案子码邻,更是在濱河造成了極大的恐慌,老刑警劉巖另假,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件像屋,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡边篮,警方通過查閱死者的電腦和手機(jī)己莺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來戈轿,“玉大人凌受,你說我怎么就攤上這事∷急” “怎么了胜蛉?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)色乾。 經(jīng)常有香客問我誊册,道長(zhǎng),這世上最難降的妖魔是什么暖璧? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任案怯,我火速辦了婚禮,結(jié)果婚禮上漆撞,老公的妹妹穿的比我還像新娘殴泰。我一直安慰自己于宙,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布悍汛。 她就那樣靜靜地躺著捞魁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪离咐。 梳的紋絲不亂的頭發(fā)上谱俭,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音宵蛀,去河邊找鬼昆著。 笑死,一個(gè)胖子當(dāng)著我的面吹牛术陶,可吹牛的內(nèi)容都是我干的凑懂。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼梧宫,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼接谨!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起塘匣,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤脓豪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后忌卤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扫夜,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年驰徊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了笤闯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡棍厂,死狀恐怖望侈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情勋桶,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布侥猬,位于F島的核電站例驹,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏退唠。R本人自食惡果不足惜鹃锈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瞧预。 院中可真熱鬧屎债,春花似錦仅政、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至躯喇,卻和暖如春辫封,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背廉丽。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工倦微, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人正压。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓欣福,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親焦履。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拓劝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容