Flink提供了接口缕陕,允許編程者自定義 timesatmp extractor 與 watermark emitter瀑梗。更確切的說枢冤,可以根據(jù)情況征冷,實(shí)現(xiàn) AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks接口歹颓。簡單來說坯屿,AssignerWithPeriodicWatermarks 會(huì)周期性的發(fā)射watermark,AssignerWithPunctuatedWatermarks 會(huì)根據(jù)數(shù)據(jù)的某些屬性來決定是否發(fā)射watermark巍扛。
為了更大程度的簡化這個(gè)編程過程领跛,flink自帶了一些默認(rèn)實(shí)現(xiàn)的timestamp assigner。這一部分會(huì)介紹它們撤奸。除了它們開箱即用的特性隔节,它們的實(shí)現(xiàn)也是一個(gè)實(shí)現(xiàn)自定義類的很好的參考。
Assigners with ascending timestamps
Periodic Watermarks最簡單的場景便是source中數(shù)據(jù)的timestamp是升序流入的寂呛。這種情況下怎诫,當(dāng)前流入數(shù)據(jù)的timestamp就可以當(dāng)做是watermark,因?yàn)椴粫?huì)有亂序的情況發(fā)生贷痪,也就是不會(huì)有小于當(dāng)前數(shù)據(jù)時(shí)間戳的數(shù)據(jù)在當(dāng)前數(shù)據(jù)之后流入幻妓。
注意,僅僅需要在每個(gè)并發(fā)的source task中保持上述特性就可以劫拢。例如:通過設(shè)置后肉津,每個(gè)Kafka partiton都在一個(gè)獨(dú)立的source task中讀取數(shù)據(jù),這時(shí)舱沧,只要求每個(gè)partition的數(shù)據(jù)的時(shí)間戳是升序的就可以妹沙。Flink的watermark的merge機(jī)制會(huì)在并發(fā)流的shuffled,unionconnected,merged時(shí)生成正確的watermark熟吏。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
Assingers allowing a fixed amount of lateness
另一個(gè)使用 Periodic Watermarks 的場景是watermark始終落后當(dāng)前流中最大時(shí)間戳一個(gè)固定的時(shí)間距糖。也適用于事前就知道流中數(shù)據(jù)的最大遲到時(shí)間的情況玄窝,如:在測試時(shí),使用自定義source讀取在一定時(shí)間范圍內(nèi)亂序的數(shù)據(jù)悍引。(注:最大遲到時(shí)間不是指數(shù)據(jù)的時(shí)間戳與系統(tǒng)時(shí)間的差值恩脂,而是遲到數(shù)據(jù)與該數(shù)據(jù)之前流入的數(shù)據(jù)的最大時(shí)間戳的差值)
對于這種情況,flink提供了 BoundedOutOfOrdernessTimestampExtractor 趣斤,它的入?yún)?maxOutOfOrderness俩块,也就是,對于給定的window來說浓领,它需要在event time坐標(biāo)系下等待多久的遲到數(shù)據(jù)之后再觸發(fā)計(jì)算(超過等待時(shí)間的數(shù)據(jù)會(huì)被忽略)玉凯。數(shù)據(jù)是否遲到取決于 lateness= t - t_w 的值,其中t代表event time坐標(biāo)系下數(shù)據(jù)的時(shí)間戳联贩,t_w代表該數(shù)據(jù)之前的最大watermark壮啊。如果 lateness > 0 ,那么該數(shù)據(jù)會(huì)被認(rèn)為是遲到了撑蒜,默認(rèn)情況下歹啼,當(dāng)進(jìn)行window計(jì)算時(shí),該數(shù)據(jù)會(huì)被忽略座菠,不參與計(jì)算狸眼。(注:根據(jù)這個(gè)公式,我覺得是 lateness < 0 的情況下浴滴,才是遲到數(shù)據(jù))
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});