Flink生成Timestamps和Watermarks

本章節(jié)是關(guān)于在event time上執(zhí)行的程序。有關(guān)event time, processing time, and ingestion time的更多介紹仪媒,請(qǐng)參閱事件時(shí)間(Event Time)

為了與event time結(jié)合使用狸眼,流程序需要相應(yīng)地設(shè)置一個(gè)時(shí)間特性藤树。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

分配時(shí)間戳(Assigning Timestamps)

為了讓event time工作,F(xiàn)link需要知道事件的時(shí)間戳份企,這意味著流中的每個(gè)元素都需要分配其事件時(shí)間戳也榄。這個(gè)通常是通過抽取或者訪問事件中某些字段的時(shí)間戳來獲取的。

時(shí)間戳的分配伴隨著水印的生成司志,告訴系統(tǒng)事件時(shí)間中的進(jìn)度甜紫。

這里有兩種方式來分配時(shí)間戳和生成水印:
1. 直接在數(shù)據(jù)流源中進(jìn)行。
2. 通過timestamp assignerwatermark generator生成:在Flink中骂远,timestamp分配器也定義了用來發(fā)射的水印囚霸。

注意:timestamp和watermark都是通過從1970年1月1日0時(shí)0分0秒到現(xiàn)在的毫秒數(shù)來指定的。

帶有Timestamp和Watermark的源函數(shù)(Source Function with Timestamps And Watermarks)

數(shù)據(jù)流源可以直接為它們產(chǎn)生的數(shù)據(jù)元素分配timestamp激才,并且他們也能發(fā)送水印拓型。這樣做的話,就沒必要再去定義timestamp分配器了瘸恼,需要注意的是:如果一個(gè)timestamp分配器被使用的話劣挫,由源提供的任何timestampwatermark都會(huì)被重寫。

為了通過源直接為一個(gè)元素分配一個(gè)timestamp东帅,源需要調(diào)用SourceContext中的collectWithTimestamp(...)方法压固。為了生成watermark,源需要調(diào)用emitWatermark(Watermark)方法靠闭。

下面是一個(gè)簡(jiǎn)單的(無checkpoint)由源分配timestamp和產(chǎn)生watermark的例子:

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

時(shí)間戳分配器/水印生成器(Timestamp Assigners / Watermark Generators)

Timestamp分配器獲取一個(gè)流并生成一個(gè)新的帶有Timestamp元素和水印的流帐我。如果原始流已經(jīng)有時(shí)間戳和/或水印,則Timestamp分配程序?qū)⒏采w它們

Timestamp分配器通常在數(shù)據(jù)源之后立即指定愧膀,但這并不是嚴(yán)格要求的拦键。通常是在timestamp分配器之前先解析(MapFunction)和過濾(FilterFunction)。在任何情況下檩淋,都需要在事件時(shí)間上的第一個(gè)操作(例如第一個(gè)窗口操作)之前指定timestamp分配程序芬为。有一個(gè)特殊情況,當(dāng)使用Kafka作為流作業(yè)的數(shù)據(jù)源時(shí)蟀悦,F(xiàn)link允許在源內(nèi)部指定timestamp分配器和watermark生成器碳柱。更多關(guān)于如何進(jìn)行的信息請(qǐng)參考Kafka Connector的文檔。

接下來的部分展示了要?jiǎng)?chuàng)建自己的timestamp 抽取器和watermark發(fā)射器熬芜,程序員需要實(shí)現(xiàn)的主要接口。想要查看Flink預(yù)定義的抽取器福稳,

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

周期性水印(With Periodic Watermarks)

AssignerWithPeriodicWatermarks分配時(shí)間戳并定期生成水印(這可能依賴于流元素涎拉,或者純粹基于處理時(shí)間)。

watermark生成的時(shí)間間隔(每n毫秒)是通過ExecutionConfig.setAutoWatermarkInterval(…)定義的。每次調(diào)用分配器的getCurrentWatermark()方法時(shí)鼓拧,如果返回的watermark非空且大于前一個(gè)watermark半火,則會(huì)發(fā)出新的watermark

這里我們展示了兩個(gè)使用周期性水印生成的時(shí)間戳分配器的簡(jiǎn)單示例季俩。請(qǐng)注意钮糖,F(xiàn)link附帶了一個(gè)BoundedOutOfOrdernessTimestampExtractor,類似于下面所示的BoundedOutOfOrdernessGenerator酌住,您可以在這里閱讀相關(guān)內(nèi)容店归。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

帶斷點(diǎn)的水印(With Punctuated Watermarks)

無論何時(shí),當(dāng)某一事件表明需要?jiǎng)?chuàng)建新的watermark時(shí)酪我,使用AssignerWithPunctuatedWatermarks創(chuàng)建消痛。這個(gè)類首先調(diào)用extractTimestamp(…)方法來為元素分配一個(gè)時(shí)間戳,然后立即調(diào)用該元素上的checkAndGetNextWatermark(…)方法都哭。

checkAndGetNextWatermark(…)方法傳入在給extractTimestamp(…)方法中分配的timestamp秩伞,并可以決定是否要生成watermark。每當(dāng)checkAndGetNextWatermark(…)方法返回一個(gè)非空watermark并且該watermark大于最新的前一個(gè)watermark時(shí)欺矫,就會(huì)發(fā)出新的watermark纱新。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

注意: 可以在每個(gè)事件上生成一個(gè)watermark。但是穆趴,由于每個(gè)watermark都會(huì)導(dǎo)致下游的一些計(jì)算脸爱,過多的watermark會(huì)降低性能。

每個(gè)Kafka分區(qū)的Timestamp(TimeStamps per Kafka Partion)

當(dāng)使用Apache Kafka作為數(shù)據(jù)源時(shí)毡代,每個(gè)Kafka分區(qū)可能有一個(gè)簡(jiǎn)單的事件時(shí)間模式(遞增timestamp或有界的無序)阅羹。然而,當(dāng)使用來自Kafka的流時(shí)教寂,多個(gè)分區(qū)通常是并行使用的捏鱼,將事件與分區(qū)交叉,破壞了每個(gè)分區(qū)的數(shù)據(jù)模型(這是Kafka消費(fèi)者客戶端所固有的工作方式)

在這種情況下,您可以使用Flink支持Kafka-partition-aware生成水印酪耕。該特性可以在Kafka消費(fèi)者內(nèi)部生成watermarks导梆,每個(gè)分區(qū)的watermarks合并方式與流shuffles時(shí)合并watermarks的方式相同。

例如迂烁,如果事件時(shí)間戳嚴(yán)格按照Kafka分區(qū)遞增排列看尼,那么使用升序時(shí)間戳水印生成器生成每個(gè)分區(qū)的水印將產(chǎn)生完美的整體水印。

下圖展示了如何使用每個(gè)kafka分區(qū)生成水印盟步,以及在這種情況下水印如何通過流數(shù)據(jù)傳播藏斩。

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市却盘,隨后出現(xiàn)的幾起案子狰域,更是在濱河造成了極大的恐慌媳拴,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兆览,死亡現(xiàn)場(chǎng)離奇詭異屈溉,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)抬探,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門子巾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人小压,你說我怎么就攤上這事线梗。” “怎么了场航?”我有些...
    開封第一講書人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵缠导,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我溉痢,道長(zhǎng)僻造,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任孩饼,我火速辦了婚禮髓削,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘镀娶。我一直安慰自己立膛,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開白布梯码。 她就那樣靜靜地躺著宝泵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪轩娶。 梳的紋絲不亂的頭發(fā)上儿奶,一...
    開封第一講書人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音鳄抒,去河邊找鬼闯捎。 笑死,一個(gè)胖子當(dāng)著我的面吹牛许溅,可吹牛的內(nèi)容都是我干的瓤鼻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼贤重,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼茬祷!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起并蝗,我...
    開封第一講書人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤牲迫,失蹤者是張志新(化名)和其女友劉穎耐朴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體盹憎,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年铐刘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了陪每。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡镰吵,死狀恐怖檩禾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情疤祭,我是刑警寧澤盼产,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站勺馆,受9級(jí)特大地震影響戏售,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜草穆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一灌灾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧悲柱,春花似錦锋喜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至涯冠,卻和暖如春炉奴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背功偿。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工盆佣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人械荷。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓共耍,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親吨瞎。 傳聞我的和親對(duì)象是個(gè)殘疾皇子痹兜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 本章節(jié)是關(guān)于在event time上執(zhí)行的程序的。想獲取更多關(guān)于event time颤诀,processing tim...
    寫B(tài)ug的張小天閱讀 23,701評(píng)論 0 12
  • Flink 中提供了3種時(shí)間模型:EventTime字旭、ProcessingTime对湃、IngestionTime。底...
    Alex90閱讀 6,532評(píng)論 0 2
  • 這個(gè)連接器提供了對(duì)由Apache Kafka提供的事件流的訪問遗淳。 Flink 提供了特殊的Kafka Connec...
    寫B(tài)ug的張小天閱讀 21,431評(píng)論 2 17
  • 原文鏈接 本節(jié)與在事件時(shí)間上運(yùn)行的程序相關(guān)拍柒。有關(guān)事件時(shí)間、處理時(shí)間和攝入時(shí)間的介紹屈暗,請(qǐng)參見事件時(shí)間簡(jiǎn)介拆讯。 為了處理...
    小C菜鳥閱讀 833評(píng)論 0 1
  • 你認(rèn)為借錢需要慎重思考嗎?你的好心有沒有換來你的煎熬养叛? 相信很多人都會(huì)回答种呐,當(dāng)然需要慎重思考。理由是弃甥,錢很...
    燕子Diana閱讀 554評(píng)論 2 1