1蒋腮、概念
在Flink中割择,水位線是一種衡量Event Time進展的機制立宜,用來處理實時數(shù)據(jù)中的亂序問題的冒萄,通常是水位線和窗口結(jié)合使用來實現(xiàn)。
從設(shè)備生成實時流事件橙数,到Flink的source宦言,再到多個oparator處理數(shù)據(jù),過程中會受到網(wǎng)絡(luò)延遲商模、背壓等多種因素影響造成數(shù)據(jù)亂序奠旺。在進行窗口處理時,不可能無限期的等待延遲數(shù)據(jù)到達施流,當?shù)竭_特定watermark時,認為在watermark之前的數(shù)據(jù)已經(jīng)全部達到(即使后面還有延遲的數(shù)據(jù)), 可以觸發(fā)窗口計算响疚,這個機制就是 Watermark(水位線),具體如下圖所示瞪醋。
2忿晕、水位線的計算
watermark本質(zhì)上是一個時間戳,且是動態(tài)變化的银受,會根據(jù)當前最大事件時間產(chǎn)生践盼。watermarks具體計算為:
watermark = 進入 Flink 窗口的最大的事件時間(maxEventTime)— 指定的延遲時間(t)
當watermark時間戳大于等于窗口結(jié)束時間時,意味著窗口結(jié)束宾巍,需要觸發(fā)窗口計算咕幻。
3、水位線生成
3.1 生成的時機
水位線生產(chǎn)的最佳位置是在盡可能靠近數(shù)據(jù)源的地方顶霞,因為水位線生成時會做出一些有關(guān)元素順序相對時間戳的假設(shè)肄程。由于數(shù)據(jù)源讀取過程是并行的,一切引起Flink跨行數(shù)據(jù)流分區(qū)進行重新分發(fā)的操作(比如:改變并行度选浑,keyby等)都會導致元素時間戳亂序蓝厌。但是如果是某些初始化的filter、map等不會引起元素重新分發(fā)的操作古徒,可以考慮在生成水位線之前使用拓提。
3.2 水位線分配器
- Periodic Watermarks
周期性分配水位線比較常用,是我們會指示系統(tǒng)以固定的時間間隔發(fā)出的水位線隧膘。在設(shè)置時間為事件時間時代态,會默認設(shè)置這個時間間隔為200ms, 如果需要調(diào)整可以自行設(shè)置狐粱。比如下面的例子是手動設(shè)置每隔1s發(fā)出水位線。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 手動設(shè)置時間間隔為1s
env.getConfig().setAutoWatermarkInterval(1000);
周期水位線需要實現(xiàn)接口:AssignerWithPeriodicWatermarks胆数,下面是示例:
public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 1000L;// 延遲時長是1s
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
}
- Punctuated Watermarks
定點水位線不是太常用肌蜻,主要為輸入流中包含一些用于指示系統(tǒng)進度的特殊元組和標記,方便根據(jù)輸入元素生成水位線的場景使用的必尼。
由于數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark蒋搜。
在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成判莉。
public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
return element.f1;
}
}
4豆挽、水位線與數(shù)據(jù)完整性
水位線可以用于平衡延遲和結(jié)果的完整性,它控制著執(zhí)行某些計算需要等待的時間券盅。這個時間是預(yù)估的帮哈,現(xiàn)實中不存在完美的水位線,因為總會存在延遲的記錄∶潭疲現(xiàn)實處理中娘侍,需要我們足夠了解從數(shù)據(jù)生成到數(shù)據(jù)源的整個過程,來估算延遲的上線泳炉,才能更好的設(shè)置水位線憾筏。
如果水位線設(shè)置的過于寬松,好處是計算時能保證近可能多的數(shù)據(jù)被收集到花鹅,但由于此時的水位線遠落后于處理記錄的時間戳氧腰,導致產(chǎn)生的數(shù)據(jù)結(jié)果延遲較大。
如果設(shè)置的水位線過于緊迫刨肃,數(shù)據(jù)結(jié)果的時效性當然會更好古拴,但由于水位線大于部分記錄的時間戳,數(shù)據(jù)的完整性就會打折扣真友。
所以黄痪,水位線的設(shè)置需要更多的去了解數(shù)據(jù),并在數(shù)據(jù)時效性和完整性上有一個權(quán)衡锻狗。