Flink 中提供了3種時(shí)間模型:EventTime、ProcessingTime、IngestionTime。
底層實(shí)現(xiàn)上分為2種:Processing Time 與 Event Time,Ingestion Time 本質(zhì)上也是一種 Processing Time悔政,官方文檔 上對(duì)于3者的描述(參考下圖):
- EventTime 是事件創(chuàng)建的時(shí)間,即數(shù)據(jù)產(chǎn)生時(shí)自帶時(shí)間戳延旧。
- IngestionTime 是事件進(jìn)入 Flink 的時(shí)間谋国,即進(jìn)入 source operator 是給定的時(shí)間戳。
- ProcessingTime 是每一個(gè)執(zhí)行 window 操作的本地時(shí)間迁沫。
可以參考以下兩篇 Blog 和 Paper 幫助對(duì)時(shí)間域的理解芦瘾,也是官方推薦的
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/43864.pdf
附筆者 翻譯1 翻譯2 翻譯3
Flink 如何設(shè)置時(shí)間域纪岁?
調(diào)用 setStreamTimeCharacteristic
設(shè)置時(shí)間域喂饥,枚舉類 TimeCharacteristic
預(yù)設(shè)了三種時(shí)間域雏节,不顯式設(shè)置的情況下扰柠,默認(rèn)使用 TimeCharacteristic.ProcessTime
。這也是 Flink 程序一般最開(kāi)始的工作藐吮。
# Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// 可選的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
EventTime 與 WaterMarks
為什么必須處理事件時(shí)間溺拱?
在大多數(shù)情況下,消息進(jìn)入系統(tǒng)中是無(wú)序的(網(wǎng)絡(luò)谣辞、硬件迫摔、分布式邏輯都可能影響),并且會(huì)有消息延遲到達(dá)(例如移動(dòng)場(chǎng)景中泥从,由于手機(jī)無(wú)信號(hào)句占,導(dǎo)致一系列的操作消息在手機(jī)重新連接信號(hào)后發(fā)送),如果按照消息進(jìn)入系統(tǒng)的時(shí)間計(jì)算躯嫉,結(jié)果會(huì)與實(shí)時(shí)嚴(yán)重不符合纱烘。理想情況是 event time 和 processing time 是一致的(發(fā)生時(shí)間即處理時(shí)間),但是現(xiàn)實(shí)情況是不一致的祈餐,兩者存在歪斜(skew)擂啥。
因此,支持事件時(shí)間的流式處理程序需要一種方法來(lái)測(cè)量事件時(shí)間的進(jìn)度帆阳。例如哺壶,有一個(gè)按小時(shí)構(gòu)建的窗口,當(dāng)事件時(shí)間超過(guò)了一小時(shí)的時(shí)間范圍蜒谤,需要通知該窗口山宾,以便關(guān)閉正在進(jìn)行的窗口。
什么是水喻⒒铡(watermarks)
Flink 中檢測(cè)事件時(shí)間處理進(jìn)度的機(jī)制就是水印资锰,Watermark 作為數(shù)據(jù)處理流中的一部分進(jìn)行傳輸,并且攜帶一個(gè)時(shí)間戳t
旬盯。一個(gè) Watermark(t) 表示流中應(yīng)該不再有事件時(shí)間比t
小的元素(某個(gè)事件的時(shí)間戳比 Watermark 時(shí)間大)台妆。
Watermark 有助于解決亂序問(wèn)題
下圖表示一個(gè)順序的事件流中的 Watermark, Watermark 只代表一個(gè)簡(jiǎn)單的標(biāo)記翎猛,
下圖表示一個(gè)亂序的事件流中的 Watermark胖翰,表示所有事件時(shí)間戳小于 Watermark 時(shí)間戳的數(shù)據(jù)都已經(jīng)處理完了,任何事件大于 Watermark 的元素都不應(yīng)該再出現(xiàn)切厘,當(dāng)然這只是一種推測(cè)性的結(jié)果(基于多種信息的推測(cè))萨咳,
并行流中的水印
水印是在 Source function(源函數(shù))處或之后生成的。源函數(shù)的每個(gè)并行子任務(wù)通常獨(dú)立地生成水印疫稿。這些水印定義了該特定并行源的事件時(shí)間培他。
當(dāng)水印經(jīng)過(guò)流處理程序時(shí)鹃两,會(huì)將該算子的事件時(shí)間向前推進(jìn)。當(dāng)算子提前其事件時(shí)間時(shí)舀凛,會(huì)為后續(xù)的算子生成新水印俊扳。
一些算子使用多個(gè)輸入流,例如猛遍,使用 union 或者 keyBy/partition 函數(shù)的算子馋记。此類算子的當(dāng)前事件時(shí)間是其輸入流事件時(shí)間的最小值。
當(dāng)它的輸入流更新它們的事件時(shí)間時(shí)懊烤,算子也會(huì)更新梯醒。
下圖顯示了事件和水印經(jīng)過(guò)并行流的示例,以及跟蹤事件時(shí)間的運(yùn)算符腌紧。
延遲記錄(Late Elements)
某些記錄可能會(huì)違反水印的條件茸习,事件時(shí)間小于t但是晚于水印t到達(dá)。實(shí)際運(yùn)行過(guò)程中壁肋,事件可能被延遲任意的時(shí)間号胚,所以不可能指定一個(gè)時(shí)間,保證該時(shí)間之前的所有事件都被處理了浸遗。而且涕刚,即使延時(shí)時(shí)間是有界限的,過(guò)多的延遲水印的時(shí)間也是不理想的乙帮,會(huì)造成時(shí)間窗口處理的太多延時(shí)杜漠。
生成時(shí)間戳和水印
- 首先設(shè)置時(shí)間域
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 分配時(shí)間戳
處理事件事件需要知道事件發(fā)生時(shí)間的時(shí)間戳,通常從流中數(shù)據(jù)元的某個(gè)字段提取時(shí)間戳察净。時(shí)間戳分配與生成水印密切相關(guān)驾茴,水印告訴系統(tǒng)事件時(shí)間的進(jìn)展。
有兩種方法可以分配時(shí)間戳并生成水忧饪ā:
- 直接在數(shù)據(jù)流 source 中
- 通過(guò)時(shí)間戳分配器/水印生成器:在 Flink 中锈至,時(shí)間戳分配器也會(huì)定義要發(fā)出的水印
帶時(shí)間戳和水印的 Source Functions
Stream source 可以直接為生成的數(shù)據(jù)元分配時(shí)間戳,也可以發(fā)出水印译秦。完成此 算子操作后峡捡,不需要時(shí)間戳分配器。如果使用了時(shí)間戳分配器筑悴,則 source 函數(shù)提供的任何時(shí)間戳和水印都將被覆蓋们拙。
要直接為源中的數(shù)據(jù)元分配時(shí)間戳,源必須使用 collectWithTimestamp(...)
方法作用域 SourceContext
阁吝。要生成水印砚婆,源必須調(diào)用 emitWatermark(Watermark)
函數(shù)。
下面是一個(gè)分配時(shí)間戳并生成水印的簡(jiǎn)單示例:
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
時(shí)間戳分配器/水印生成器
時(shí)間戳分配器(Timestamp assigners)獲取流并生成帶有帶時(shí)間戳數(shù)據(jù)元和水印的新流突勇。如果原始流已經(jīng)有時(shí)間戳或水印装盯,時(shí)間戳分配器會(huì)覆蓋它們坷虑。
時(shí)間戳分配器通常在數(shù)據(jù)源生成之后立即指定,但并非被嚴(yán)格要求這樣做埂奈。常見(jiàn)的模式是在時(shí)間戳分配器之前執(zhí)行解析(MapFunction)和過(guò)濾(FilterFunction)迄损。在任何情況下,需要在第一個(gè)操作事件時(shí)間的算子執(zhí)行之前指定時(shí)間戳分配器(例如第一個(gè)窗口算子操作)账磺。
作為一種特殊情況海蔽,當(dāng)使用 Kafka 作為流式作業(yè)的數(shù)據(jù)源時(shí) ,F(xiàn)link 允許在源(或消費(fèi)者)本身內(nèi)部指定時(shí)間戳分配器/水印發(fā)射器绑谣。更多信息相關(guān)信息請(qǐng)參考 Kafka Connector 文檔党窜。
下面是一個(gè)時(shí)間戳分配器/水印生成器的簡(jiǎn)單示例(只介紹了必須實(shí)現(xiàn)的主要接口):
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// create stream source
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
// assign timestamp and watermark assigner after filter function
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
// window function and sink
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
使用周期性(periodically)水印
AssignerWithPeriodicWatermarks 分配時(shí)間戳并定期生成水印(可能取決于流數(shù)據(jù)元借宵,或純粹基于處理時(shí)間)幌衣。
生成水印的間隔(每n毫秒)使用 ExecutionConfig.setAutoWatermarkInterval(...)
。每次調(diào)用分配器的方法 getCurrentWatermark()
壤玫,如果返回的水印非空并且大于先前的水印豁护,則將發(fā)出新的水印。
下面有兩個(gè)例子欲间,時(shí)間戳分配器使用周期性水映铩:
該例子假定元素到達(dá)時(shí)在一定程度上是無(wú)序的,某個(gè)時(shí)間戳t
的最后達(dá)到元素相比時(shí)間戳t
的最早到達(dá)元素猎贴,最大延遲n毫秒班缎。
/**
The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
該例子假設(shè)元素在有界延遲后到達(dá),生成器生成的水印比處理時(shí)間滯后固定時(shí)間長(zhǎng)度她渴。
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
第二個(gè)例子比較容易理解达址,使用系統(tǒng)時(shí)間減去允許的延時(shí)時(shí)間作為 watermark 的時(shí)間。只跟當(dāng)前系統(tǒng)時(shí)間有關(guān)系趁耗,如果大批事件出現(xiàn)延時(shí)的情況沉唠,可能很多在 watermark 的時(shí)間之后出現(xiàn)了,會(huì)被被丟棄苛败。
第一個(gè)例子满葛,在當(dāng)前事件的事件時(shí)間和當(dāng)前最大時(shí)間(記錄最大的事件時(shí)間)中取最大值,得到最大的事件時(shí)間罢屈。用這個(gè)最大值減去一個(gè)允許的延時(shí)時(shí)間作為 watermark 時(shí)間嘀韧。同樣的如果大批事件發(fā)生延時(shí),那么對(duì)應(yīng)的 watermark 的時(shí)間就會(huì)向后推儡遮。
帶標(biāo)記(Punctuated)水印
使用 AssignerWithPunctuatedWatermarks
在某個(gè)事件指定生成新的水印的時(shí)候生成水印乳蛾。這種情況下,F(xiàn)link 首先會(huì)調(diào)用 extractTimestamp(...)
方法為數(shù)據(jù)分配時(shí)間戳鄙币,然后立即調(diào)用 checkAndGetNextWatermark(...)
肃叶。
checkAndGetNextWatermark(...)
方法傳遞在 extractTimestamp(...)
生成的時(shí)間戳,并且界定是否要生成水印十嘿。每當(dāng) checkAndGetNextWatermark(...)
方法返回非空水印因惭,并且該水印大于先一個(gè)水印時(shí),將向后發(fā)出新水印绩衷。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
每個(gè)事件都可以生成水印蹦魔。但是,由于水印會(huì)導(dǎo)致一些后續(xù)的計(jì)算咳燕,因此過(guò)多的水印會(huì)降低性能勿决。
每個(gè) Kafka 分區(qū)一個(gè)時(shí)間戳
當(dāng)使用 Kafka 作為數(shù)據(jù)源的時(shí)候,每個(gè)分區(qū)可能有一個(gè)簡(jiǎn)單的事件時(shí)間模式(按時(shí)間戳升序或其他)招盲。當(dāng)消費(fèi)來(lái)自 Kafka 的流時(shí)低缩,多個(gè)分區(qū)一般會(huì)并行消費(fèi),分區(qū)中的事件交替消費(fèi)曹货,會(huì)破壞分區(qū)中的模式(Kafka 的消費(fèi)者客戶端工作方式)咆繁。
在這種情況下,可以使用 Flink 的 Kafka-partition-aware(分區(qū)感知)水印生成器顶籽。使用這個(gè)特性的時(shí)候玩般,水印會(huì)在 Kafka 消費(fèi)者內(nèi)部為每個(gè)分區(qū)生成,并且每個(gè)分區(qū)水印的合并方式與在流shuffle時(shí)合并水印的方式相同礼饱。
例如坏为,如果事件時(shí)間戳嚴(yán)格按每個(gè) Kafka 分區(qū)升序排列,那么使用升序時(shí)間戳水印生成器镊绪,為每分區(qū)生成水印 將產(chǎn)生完美的總體水印久脯。
下圖顯示了如何為每個(gè) Kafka 分區(qū)生成水印,以及在這種情況下水印如何通過(guò)流式數(shù)據(jù)流傳播镰吆。
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
// kafka source set timestamp assigner
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
具有遞增時(shí)間戳的 Assigner
定期生成水印的最簡(jiǎn)單的特殊情況是帘撰,給定的源任務(wù)看到的時(shí)間戳按升序出現(xiàn)的情況。在這種情況下万皿,當(dāng)前時(shí)間戳始終可以充當(dāng)水印摧找。
時(shí)間戳只需要在每個(gè)并行數(shù)據(jù)源任務(wù)中是升序的。例如牢硅,如果在特定設(shè)置中蹬耘,每個(gè)并發(fā)的源實(shí)例讀取一個(gè) Kafka 分區(qū),則只需要在每個(gè) Kafka 分區(qū)中時(shí)間戳是遞增减余。水印合并機(jī)制將生成正確的水印综苔,當(dāng)并行流被shuffle,union,connect 或 merge 時(shí)如筛。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允許固定時(shí)間延遲的 Assigner
另一個(gè)定期水印的例子是堡牡,當(dāng)水印滯后于流中看到的最大時(shí)間戳(事件時(shí)間)一段固定的時(shí)間。包括杨刨,預(yù)先知道流中可能遇到的最大延遲的情況晤柄。Flink 提供了 BoundedOutOfOrdernessTimestampExtractor
,使用參數(shù) maxOutOfOrderness
妖胀,計(jì)算給定窗口的最終結(jié)果時(shí)芥颈,允許元素延遲的最長(zhǎng)時(shí)間,超過(guò)的會(huì)被忽略赚抡。延遲為 t - t_w
(t
是數(shù)據(jù)的事件時(shí)間時(shí)間戳爬坑,t_w
是前一個(gè)水印的時(shí)間戳),如果延遲大于0涂臣,數(shù)據(jù)被認(rèn)為是遲到的盾计,默認(rèn)會(huì)在計(jì)算窗口的作業(yè)結(jié)果時(shí)被忽略。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
http://vishnuviswanath.com/flink_eventtime.html