Flink--EventTime中WaterMark知識點掃盲

  • 基于flink-1.8.1
  • 基于flink官網(wǎng)

概述

  • 實時計算中谴返,數(shù)據(jù)時間比較敏感皂岔。有eventTime和processTime區(qū)分庭瑰,一般來說eventTime是從原始的消息中提取過來的晤锹,processTime是Flink自己提供的驼唱,F(xiàn)link中一個亮點就是可以基于eventTime計算寇损,這個功能很有用凸郑,因為實時數(shù)據(jù)可能會經(jīng)過比較長的鏈路,多少會有延時矛市,并且有很大的不確定性芙沥,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的浊吏。


    Flink-eventtime.png

Flink WaterMark介紹

  • watermark是一種衡量Event Time進展的機制而昨,它是數(shù)據(jù)本身的一個隱藏屬性。通痴姨铮基于Event Time的數(shù)據(jù)歌憨,自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件墩衙,通常用watermark機制結合window來實現(xiàn)躺孝。
  • 流處理從事件產(chǎn)生享扔,到流經(jīng)source,再到operator植袍,中間是有一個過程和時間的惧眠。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的于个,但是也不排除由于網(wǎng)絡氛魁、背壓等原因,導致亂序的產(chǎn)生(out-of-order或者說late element)厅篓。
  • 但是對于late element秀存,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后羽氮,必須觸發(fā)window去進行計算了或链。這個特別的機制,就是watermark档押。

并行流中的watermark

  • 在source functions處或者之后生成watermark澳盐,source functions的parallel subtask 通常會獨立的生成watermark;這些watermarks定義了該特定parallel source的事件事件令宿;
  • 對于流式處理中的叼耙,當watermarks達到某個算子時,watermark會將event time提前粒没。每當算子將流中的event time提前筛婉,這個算子都會為下游算子生成新的watermark;
  • 一些算子會有多個source stream癞松, 例如爽撒,一個union,或者跟隨keyBy(...)或partition(...)函數(shù)的算子响蓉。當前輸入stream中的event time是多個source stream中的event time的最小值匆浙;(Such an operator’s current event time is the minimum of its input streams’ event times);由于input stream更新了event time,算子同樣會更新event time厕妖。如下圖所示:
    Flink-eventtime.png

生成 Timestamps / Watermarks

分配Timestamps

  • 為了處理事件時間,F(xiàn)link需要知道事件的時間戳挑庶,這意味著流中的每個元素都需要分配其事件時間戳言秸。 這通常通過從元素中的某個字段訪問/提取時間戳來完成。
  • 時間戳分配與生成watermark密切相關迎捺,watermark告訴系統(tǒng)事件時間的進展举畸。
  • 有兩種方法可以分配時間戳并生成水印:
    • 直接在數(shù)據(jù)流源中凳枝;
    • 通過時間戳分配器/watermatk生成器:在Flink中抄沮,時間戳分配器還定義要發(fā)出的watermark跋核;
  • Attention:Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.

Source Functions with Timestamps and Watermarks

  • source functions可以直接為它們生成的元素分配時間戳,它們也可以發(fā)出watermark叛买。當完成此操作時砂代,不需要再使用時間戳分配器。請注意率挣,如果使用時間戳分配器刻伊,則將覆蓋源提供的任何時間戳和水印。
  • 要直接為source中的元素分配時間戳椒功,源必須在SourceContext上使用collectWithTimestamp(...)方法捶箱。 要生成水印,源必須調用emitWatermark(水佣)功能丁屎。
  • 語法:
    java
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

scala

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

Timestamp Assigners / Watermark Generators

  • Timestamp Assigners獲取stream并生成帶有帶時間戳元素和水印的新流。 如果原始stream中已經(jīng)有時間戳和/或水印旱眯,則時間戳分配器會覆蓋它們晨川。
  • 時間戳分配器通常在數(shù)據(jù)源之后立即指定,但并非嚴格要求這樣做键思。 例如础爬,常見的模式是在時間戳分配器之前解析(MapFunction)和過濾(FilterFunction)。 在任何情況下吼鳞,需要在事件時間的第一個操作之前指定時間戳分配器(例如第一個窗口操作)看蚜。 作為一種特殊情況,當使用Kafka作為流式傳輸作業(yè)的源時赔桌,F(xiàn)link允許在源(或消費者)本身內指定時間戳分配器/水印發(fā)射器供炎。 有關如何執(zhí)行此操作的更多信息,請參閱Kafka Connector文檔疾党。
  • java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
  • scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

With Periodic Watermarks周期性watermark

定時提取watermark音诫,這種方式會定時提取更新wartermark。

  • AssignerWithPeriodicWatermarks定期分配時間戳并生成水友┪弧(可能取決于流元素竭钝,或純粹基于處理時間)。
  • 生成水印的間隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定義雹洗。 每次調用分配器的getCurrentWatermark()方法香罐,如果返回的水印非空且大于前一個水印,則會發(fā)出新的水印时肿。
  • 這里我們展示了兩個使用周期性水印生成的時間戳分配器的簡單示例庇茫。 請注意,F(xiàn)link附帶了一個BoundedOutOfOrdernessTimestampExtractor螃成,類似于下面顯示的BoundedOutOfOrdernessGenerator旦签;
  • java
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
  • scala
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

With Punctuated Watermarks

伴隨event的到來就提取watermark查坪,就是每一個event到來的時候,就會提取一次Watermark宁炫。這樣的方式當然設置watermark更為精準偿曙,但是當數(shù)據(jù)量大的時候,頻繁的更新wartermark會比較影響性能淋淀。通常情況下采用定時提取就足夠了遥昧。

  • 當生成watermark的過程中某個event指示器 可能生成新wateramrk,請使用AssignerWithPunctuatedWatermarks朵纷。 對于此類炭臭,F(xiàn)link將首先調用extractTimestamp(...)方法為元素分配時間戳,然后立即調用該元素上的checkAndGetNextWatermark(...)方法袍辞。
  • checkAndGetNextWatermark(...)方法傳遞在extractTimestamp(...)方法中分配的時間戳鞋仍,并可以決定是否要生成watermark。 每當checkAndGetNextWatermark(...)方法返回非空watermark搅吁,并且該watermark大于最新的先前watermark時威创,將發(fā)出該新watermark。
  • demo
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

  • scala
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

  • 注意:可以在每個事件上生成水印谎懦。 然而肚豺,因為每個水印在下游引起一些計算,所以過多的水印會降低性能.

Timestamps per Kafka Partition

  • 當使用Apache Kafka作為數(shù)據(jù)源時界拦,每個Kafka分區(qū)可能具有簡單的事件時間模式(升序時間戳或有界無序)吸申。但是,當從Kafka消費流數(shù)據(jù)時享甸,多個分區(qū)通常并行消費截碴,交錯來自分區(qū)的事件并破壞每個分區(qū)模式(這是Kafka的消費者客戶端工作的固有方式)。
  • 在這種情況下蛉威,您可以使用Flink的Kafka分區(qū)感知水印生成日丹。使用該功能,根據(jù)Kafka分區(qū)在Kafka使用者內部生成水印蚯嫌,并且每個分區(qū)水印的合并方式與在流shuffle上合并水印的方式相同哲虾。
  • 例如,如果事件時間戳嚴格按每個Kafka分區(qū)升序择示,則使用升序時間戳水印生成器生成每分區(qū)水印將產(chǎn)生完美的整體水印束凑。
  • 下圖顯示了如何使用per-Kafka分區(qū)水印生成,以及在這種情況下水印如何通過流數(shù)據(jù)流傳播对妄。
  • demo
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);
  • scala
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)
kafka.png

Pre-defined Timestamp Extractors / Watermark Emitters

  • 這部分比較簡單,是兩個類的講解敢朱,詳見官網(wǎng)剪菱;
  • 建議大家認真閱讀一下官網(wǎng)摩瞎;

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市孝常,隨后出現(xiàn)的幾起案子旗们,更是在濱河造成了極大的恐慌,老刑警劉巖构灸,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件上渴,死亡現(xiàn)場離奇詭異,居然都是意外死亡喜颁,警方通過查閱死者的電腦和手機稠氮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來半开,“玉大人隔披,你說我怎么就攤上這事〖挪穑” “怎么了奢米?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長纠永。 經(jīng)常有香客問我鬓长,道長,這世上最難降的妖魔是什么尝江? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任涉波,我火速辦了婚禮,結果婚禮上茂装,老公的妹妹穿的比我還像新娘怠蹂。我一直安慰自己,他們只是感情好少态,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布城侧。 她就那樣靜靜地躺著,像睡著了一般彼妻。 火紅的嫁衣襯著肌膚如雪嫌佑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天侨歉,我揣著相機與錄音屋摇,去河邊找鬼。 笑死幽邓,一個胖子當著我的面吹牛炮温,可吹牛的內容都是我干的。 我是一名探鬼主播牵舵,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼柒啤,長吁一口氣:“原來是場噩夢啊……” “哼倦挂!你這毒婦竟也來了?” 一聲冷哼從身側響起担巩,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤方援,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后涛癌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體犯戏,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年拳话,在試婚紗的時候發(fā)現(xiàn)自己被綠了先匪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡假颇,死狀恐怖胚鸯,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情笨鸡,我是刑警寧澤姜钳,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站形耗,受9級特大地震影響哥桥,放射性物質發(fā)生泄漏。R本人自食惡果不足惜激涤,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一拟糕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧倦踢,春花似錦送滞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至晤碘,卻和暖如春褂微,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背园爷。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工宠蚂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人童社。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓求厕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子呀癣,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內容