本章節(jié)是關(guān)于在event time上執(zhí)行的程序。有關(guān)event time, processing time, and ingestion time的更多介紹仪媒,請(qǐng)參閱事件時(shí)間(Event Time)
為了與event time結(jié)合使用狸眼,流程序需要相應(yīng)地設(shè)置一個(gè)時(shí)間特性藤树。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
分配時(shí)間戳(Assigning Timestamps)
為了讓event time工作,F(xiàn)link需要知道事件的時(shí)間戳份企,這意味著流中的每個(gè)元素都需要分配其事件時(shí)間戳也榄。這個(gè)通常是通過抽取或者訪問事件中某些字段的時(shí)間戳來獲取的。
時(shí)間戳的分配伴隨著水印的生成司志,告訴系統(tǒng)事件時(shí)間中的進(jìn)度甜紫。
這里有兩種方式來分配時(shí)間戳和生成水印:
1. 直接在數(shù)據(jù)流源中進(jìn)行。
2. 通過timestamp assigner
和watermark generator
生成:在Flink中骂远,timestamp
分配器也定義了用來發(fā)射的水印囚霸。
注意:timestamp和watermark都是通過從1970年1月1日0時(shí)0分0秒到現(xiàn)在的毫秒數(shù)來指定的。
帶有Timestamp和Watermark的源函數(shù)(Source Function with Timestamps And Watermarks)
數(shù)據(jù)流源可以直接為它們產(chǎn)生的數(shù)據(jù)元素分配timestamp
激才,并且他們也能發(fā)送水印拓型。這樣做的話,就沒必要再去定義timestamp
分配器了瘸恼,需要注意的是:如果一個(gè)timestamp
分配器被使用的話劣挫,由源提供的任何timestamp
和watermark
都會(huì)被重寫。
為了通過源直接為一個(gè)元素分配一個(gè)timestamp
东帅,源需要調(diào)用SourceContext
中的collectWithTimestamp(...)
方法压固。為了生成watermark,源需要調(diào)用emitWatermark(Watermark)
方法靠闭。
下面是一個(gè)簡(jiǎn)單的(無checkpoint)由源分配timestamp和產(chǎn)生watermark的例子:
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
時(shí)間戳分配器/水印生成器(Timestamp Assigners / Watermark Generators)
Timestamp
分配器獲取一個(gè)流并生成一個(gè)新的帶有Timestamp
元素和水印的流帐我。如果原始流已經(jīng)有時(shí)間戳和/或水印,則Timestamp
分配程序?qū)⒏采w它們
Timestamp分配器通常在數(shù)據(jù)源之后立即指定愧膀,但這并不是嚴(yán)格要求的拦键。通常是在timestamp分配器之前先解析(MapFunction)和過濾(FilterFunction)。在任何情況下檩淋,都需要在事件時(shí)間上的第一個(gè)操作(例如第一個(gè)窗口操作)之前指定timestamp分配程序芬为。有一個(gè)特殊情況,當(dāng)使用Kafka作為流作業(yè)的數(shù)據(jù)源時(shí)蟀悦,F(xiàn)link允許在源內(nèi)部指定timestamp分配器和watermark生成器碳柱。更多關(guān)于如何進(jìn)行的信息請(qǐng)參考Kafka Connector的文檔。
接下來的部分展示了要?jiǎng)?chuàng)建自己的timestamp 抽取器和watermark發(fā)射器熬芜,程序員需要實(shí)現(xiàn)的主要接口。想要查看Flink預(yù)定義的抽取器福稳,
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
周期性水印(With Periodic Watermarks)
AssignerWithPeriodicWatermarks
分配時(shí)間戳并定期生成水印(這可能依賴于流元素涎拉,或者純粹基于處理時(shí)間)。
watermark
生成的時(shí)間間隔(每n毫秒)是通過ExecutionConfig.setAutoWatermarkInterval(…)
定義的。每次調(diào)用分配器的getCurrentWatermark()
方法時(shí)鼓拧,如果返回的watermark
非空且大于前一個(gè)watermark
半火,則會(huì)發(fā)出新的watermark
。
這里我們展示了兩個(gè)使用周期性水印生成的時(shí)間戳分配器的簡(jiǎn)單示例季俩。請(qǐng)注意钮糖,F(xiàn)link附帶了一個(gè)BoundedOutOfOrdernessTimestampExtractor
,類似于下面所示的BoundedOutOfOrdernessGenerator
酌住,您可以在這里閱讀相關(guān)內(nèi)容店归。
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
帶斷點(diǎn)的水印(With Punctuated Watermarks)
無論何時(shí),當(dāng)某一事件表明需要?jiǎng)?chuàng)建新的watermark
時(shí)酪我,使用AssignerWithPunctuatedWatermarks
創(chuàng)建消痛。這個(gè)類首先調(diào)用extractTimestamp(…)
方法來為元素分配一個(gè)時(shí)間戳,然后立即調(diào)用該元素上的checkAndGetNextWatermark(…)
方法都哭。
checkAndGetNextWatermark(…)方法傳入在給extractTimestamp(…)方法中分配的timestamp
秩伞,并可以決定是否要生成watermark
。每當(dāng)checkAndGetNextWatermark(…)
方法返回一個(gè)非空watermark
并且該watermark
大于最新的前一個(gè)watermark
時(shí)欺矫,就會(huì)發(fā)出新的watermark
纱新。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
注意: 可以在每個(gè)事件上生成一個(gè)watermark
。但是穆趴,由于每個(gè)watermark
都會(huì)導(dǎo)致下游的一些計(jì)算脸爱,過多的watermark
會(huì)降低性能。
每個(gè)Kafka分區(qū)的Timestamp(TimeStamps per Kafka Partion)
當(dāng)使用Apache Kafka作為數(shù)據(jù)源時(shí)毡代,每個(gè)Kafka分區(qū)
可能有一個(gè)簡(jiǎn)單的事件時(shí)間模式(遞增timestamp
或有界的無序)阅羹。然而,當(dāng)使用來自Kafka的流時(shí)教寂,多個(gè)分區(qū)通常是并行使用的捏鱼,將事件與分區(qū)交叉,破壞了每個(gè)分區(qū)的數(shù)據(jù)模型(這是Kafka消費(fèi)者
客戶端所固有的工作方式)
在這種情況下,您可以使用Flink支持Kafka-partition-aware
生成水印酪耕。該特性可以在Kafka消費(fèi)者
內(nèi)部生成watermarks
导梆,每個(gè)分區(qū)的watermarks
合并方式與流shuffles
時(shí)合并watermarks
的方式相同。
例如迂烁,如果事件時(shí)間戳嚴(yán)格按照Kafka分區(qū)遞增排列看尼,那么使用升序時(shí)間戳水印生成器生成每個(gè)分區(qū)的水印將產(chǎn)生完美的整體水印。
下圖展示了如何使用每個(gè)kafka分區(qū)生成水印盟步,以及在這種情況下水印如何通過流數(shù)據(jù)傳播藏斩。
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)