文檔解讀
文檔路徑
/Application Development/Streaming (DataStream API)/Event Time
關(guān)于event time和watermark轿塔,內(nèi)容比較多姆吭,本文的順序不一定完全按照原文檔的順序進(jìn)行解讀
Event time
官方的定義
Event time is the time that each individual event occurred on its producing device.
事件時(shí)間就是數(shù)據(jù)流中事件實(shí)際發(fā)生的時(shí)間。事件時(shí)間也是實(shí)際業(yè)務(wù)中需要處理的時(shí)間,由于各種數(shù)據(jù)源的不同特點(diǎn)硬纤,可能在流中計(jì)算的時(shí)候會(huì)遇到事件的延遲或者事件時(shí)間的亂序,為解決這個(gè)問題出現(xiàn)了Watermark敦捧。
Watermark
官方對Watermark概念的說明可以參考兩個(gè)地方袜香,第一就是官網(wǎng)的原文
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark).
第二就是Watermark類的API說明
A Watermark tells operators that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator. Watermarks are emitted at the sources and propagate through the operators of the topology. Operators must themselves emit watermarks to downstream operators using
Output.emitWatermark(Watermark)
. Operators that do not internally buffer elements can always forward the watermark that they receive. Operators that buffer elements, such as window operators, must forward a watermark after emission of elements that is triggered by the arriving watermark.In some cases a watermark is only a heuristic and operators should be able to deal with late elements. They can either discard those or update the result and emit updates/retractions to downstream operations.
When a source closes it will emit a final watermark with timestamp
Long.MAX_VALUE
. When an operator receives this it will know that no more input will be arriving in the future.
本文將針對第二種解釋做進(jìn)一步的說明,通過上文可以總結(jié)2點(diǎn):
- Watermark中包含了一個(gè)時(shí)間戳乞封,作用就是告訴算子在這個(gè)時(shí)間戳之前的數(shù)據(jù)都已經(jīng)到達(dá)了做裙,不會(huì)再有小于或等于這個(gè)時(shí)間戳的數(shù)據(jù)再到達(dá)這個(gè)算子了。
- Watermark可以認(rèn)為有一個(gè)“生命周期”肃晚,即出生锚贱,傳播,死亡关串。
Watermark生命周期
Watermark的“出生”
Watermark的產(chǎn)生有兩種方式拧廊,
- Source Functions中直接指定,請參考官文Source Functions with Timestamps and Watermarks
- 通過
DataStream#assignTimestampsAndWatermarks
方式指定如何生成Watermark晋修,這種方式又有兩種模式吧碾,時(shí)間驅(qū)動(dòng)的周期水位線(Periodic Watermarks)和數(shù)據(jù)驅(qū)動(dòng)的定點(diǎn)水位線(Punctuated Watermarks),對于周期水位線墓卦,官方提供了兩種實(shí)現(xiàn)可以參考BoundedOutOfOrdernessTimestampExtractor.java
和AscendingTimestampExtractor.java
倦春。
Watermark的傳播
此部分的內(nèi)容可以參見筆者之前的文章Watermarks in Parallel Streams
Watermark的“死亡”
當(dāng)watermark的時(shí)間戳變成Long.MAX_VALUE的時(shí)候,也就表示告訴算子再也沒有數(shù)據(jù)會(huì)到達(dá)了
擴(kuò)展閱讀
對于以周期模式產(chǎn)生watermark的時(shí)候落剪,官方給出的說明:
The interval (every n milliseconds) in which the watermark will be generated is defined via ExecutionConfig.setAutoWatermarkInterval(...).
對于這句話讀者要知道是:
- 如果不指定溅漾,默認(rèn)是200毫秒
-
ExecutionConfig.setAutoWatermarkInterval
一定要放到env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
后面才會(huì)生效,因?yàn)樵?code>setStreamTimeCharacteristic里面會(huì)強(qiáng)制設(shè)置周期為200毫秒著榴,如果這個(gè)方法后執(zhí)行添履,就會(huì)覆蓋原有設(shè)置的周期
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
這個(gè)參數(shù)是在TimestampsAndPeriodicWatermarksOperator#open
中會(huì)拿到設(shè)置的watermarkInterval
并將此值傳給timerService
public void open() throws Exception {
super.open();
currentWatermark = Long.MIN_VALUE;
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
這里getProcessingTimeService()
返回的對象就是StreamTask中的timerService。