Watermark 是 Flink 實(shí)時(shí)處理計(jì)算平臺(tái)的一個(gè)重要概念斥废,也是 Google 的著名實(shí)時(shí)計(jì)算論文 The Data?ow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing 里面經(jīng)常提到的名詞。看了很多的參考文獻(xiàn),和文檔蝗柔。有了一些自己的理解和體會(huì)粒督,做個(gè)筆記。最近 Flink 1.5 版本正式發(fā)布灭忠,也算蹭個(gè)熱度吧。
1. Watermark 的理解
? 最早看到 Watermark 的概念就是在 Flink 的官方文檔里面:
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark)
?上面這句話中有一個(gè)詞叫做 event-time座硕,了解 DataFlow 模型的同學(xué)都知道弛作, event-time 表示的是每個(gè)獨(dú)立事件在各自設(shè)備上產(chǎn)生的時(shí)間,是這個(gè)事件特有的屬性华匾,好比是一個(gè)人的生日映琳。與之對(duì)應(yīng)的還有 processing-time 和 ingestion-time。理解起來也不難瘦真,processing-time 就是 Flink 的 window 操作該事件的時(shí)間刊头,ingestion-time 是事件作為數(shù)據(jù)源進(jìn)入 Flink 系統(tǒng)的時(shí)間。參考下面這個(gè)圖1诸尽,來源于 Flink 文檔
? 重新回到引用中的這句英文原杂,翻譯過來的大致意思是," Watermarks 是 Flink 用來度量 event-time 事件處理進(jìn)度的一種機(jī)制您机,Watermarks 作為數(shù)據(jù)流的一部分穿肄,攜帶一個(gè)時(shí)間 t。表示這次的數(shù)據(jù)流中 event-time 已經(jīng)處理到時(shí)間 t 了际看,所有 event-time 早于或等于 t 時(shí)刻的事件都不應(yīng)該再出現(xiàn)在這個(gè)數(shù)據(jù)流中了咸产。" 有些朋友可能會(huì)有疑問,為什么要弄這么一個(gè)奇怪的限制仲闽。這是因?yàn)橐话銇碚f脑溢,在實(shí)際環(huán)境下,由于網(wǎng)絡(luò)阻塞赖欣,延遲等問題屑彻,processing-time 相比 event-time 會(huì)有延遲滯后的現(xiàn)象,而且這種現(xiàn)象非常普遍顶吮。也就是說社牲,并不是生產(chǎn)出一個(gè)事件, 就能在第一時(shí)間被處理悴了。參考下面這個(gè)圖2搏恤,來源于 Dataflow Model 論文违寿。
?圖2可以看到,橫坐標(biāo)是 event-time熟空,縱坐標(biāo)是 processing-time藤巢。理想的處理機(jī)制是,12:01分的 event-time 時(shí)間痛阻,應(yīng)該就在 12:01分的 processing-time 時(shí)刻被處理菌瘪。但是實(shí)際情況下腮敌,橫向藍(lán)色實(shí)線對(duì)應(yīng)的 processing-time 晚于12:01分阱当。所以圖中橫向黑色實(shí)線表示 event-time 的偏離,縱向紅色實(shí)線表示 processing-time 的偏離糜工,這些都比較容易理解弊添。 接下來,按照原圖的注釋捌木,淺色虛線表示 理想的 watermark油坝,深色虛線表示 實(shí)際的 watermark∨亳桑看到這里似乎有些困惑澈圈,該如何理解呢?
?直到帆啃,我看到另外一篇由 Tyler Akidau 大神寫的文章 Stream 102瞬女,是這樣描述 watermark 的:
Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.
? 大致意思是," Watermark 是從 event-time 維度描述輸入數(shù)據(jù)的完整性努潘, 換句話說诽偷,是系統(tǒng)評(píng)價(jià)數(shù)據(jù)流中處理 event-time 事件進(jìn)度和完整性的方式"。在圖2中疯坤,隨著 processing-time 的推移报慕,深色虛線逐步獲取了 event-time 的完成進(jìn)度。從概念上說压怠,可以理解成一個(gè)函數(shù)F(P) -> E眠冈,給定一個(gè) processing-time 值,函數(shù)返回對(duì)應(yīng)的 event-time 值菌瘫。這個(gè) event-time 時(shí)間值 E 就被系統(tǒng)認(rèn)為所有早于 E 時(shí)刻的 event-time 事件都已經(jīng)被觀察處理過蜗顽,或者說系統(tǒng)斷言,以后都不會(huì)再有早于 E 時(shí)刻的 event-time 事件出現(xiàn)了突梦。至此诫舅,對(duì)于流處理的算子(window ... )而言,對(duì)于這個(gè)時(shí)間段的計(jì)算告一段落宫患,可以生成結(jié)果了刊懈。
?當(dāng)然,很多朋友會(huì)問,如果過了 watermark 之后還有比估計(jì)時(shí)刻早的數(shù)據(jù)虚汛,姍姍來遲匾浪,怎么辦?Flink 框架自帶 Allowed Lateness 機(jī)制卷哩,這部分又是涉及到另一個(gè)概念 trigger蛋辈,這篇文章先不提了。
2. Watermark 的例子
? 了解了 watermark 概念将谊,引用 vishnuviswanath 論壇上的例子更好的加深理解冷溶。
case 1: 消息到達(dá)沒有延遲
? 如圖3,假設(shè)數(shù)據(jù)源生成3條消息尊浓,分別是第13秒逞频,第13秒,第16秒栋齿。計(jì)算窗口為10秒苗胀,每隔5秒滑動(dòng)一次。
?這些消息在圖4中會(huì)落入對(duì)應(yīng)時(shí)間窗口瓦堵。前兩個(gè)在第13秒生成的消息會(huì)落入 [5s - 15s] 的窗口1和 [10s - 20s] 的窗口2基协。而第16秒生成的消息 會(huì)落入 [10s - 20s] 的窗口2和 [15s - 25s] 的窗口3。最終每隔窗口期望得到的結(jié)果分別是 (a,2), (a,3) 和 (a,1)菇用。
case 2: 消息到達(dá)存在延遲
?假設(shè)有一個(gè)第13秒生成的消息到達(dá)時(shí)澜驮,延遲了6秒(在第19秒到達(dá)),原因可能是網(wǎng)絡(luò)阻塞刨疼。那么情況就變成了圖5這樣泉唁。
? 圖中可以看到,延遲的消息落入了窗口2和窗口3揩慕,因?yàn)?9秒屬于 10-20 和 15-25的范圍內(nèi)亭畜。雖然這個(gè)例子中,對(duì)于窗口2的結(jié)果沒有影響迎卤,但是窗口1和窗口3的結(jié)果都發(fā)生了變化拴鸵,導(dǎo)致發(fā)生錯(cuò)誤的原因就是處理消息時(shí)間的時(shí)候用的是 processing-time 而不是 event-time。
case 3:用基于 event-time 的系統(tǒng)來處理問題
?使用 event-time 處理機(jī)制蜗搔,需要對(duì)每個(gè)消息提取 event-time 信息劲藐。這個(gè)時(shí)間信息一般消息自帶,或者手動(dòng)生成樟凄∑肝撸看下面一個(gè)例子,暫時(shí)忽略 getCurrentWatermark
這個(gè)函數(shù)缝龄,之后來討論汰现。
public class TimestampExtractor implements AssignerWithPeriodicWatermarks<String> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(String e, long previousElementTimestamp) {
return Long.parseLong(e.split(",")[1]);
}
}
? 有了 event-time 信息提取器之后挂谍,主函數(shù)流程如下:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> text = env.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor());
DataStream<Tuple2<String, Integer>> count = text.map((MapFunction<String, Tuple2<String, Integer>>) m -> Tuple2.of(m.split(",")[0], 1))
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1);
count.print();
env.execute("EventTime processing example");
}
? 產(chǎn)生的結(jié)果見圖6,圖中可以看出窗口2和窗口3產(chǎn)生了正確的結(jié)果瞎饲,但是窗口1還是錯(cuò)的口叙。Flink 不把延遲的消息指派給窗口3是因?yàn)樗老⒌?event-time不屬于窗口3。但是為什么 Flink 不將延遲消息指派給窗口1呢嗅战?是因?yàn)橥铮?dāng)延遲消息到達(dá)的時(shí)候(第19秒)窗口1已經(jīng)完成計(jì)算了。那好驮捍,接下來解決這個(gè)問題疟呐。
case 4:使用 Watermark 機(jī)制
? 對(duì)于我們的案例來說,我們的目的是告訴 Flink 一個(gè)消息可以延遲多久厌漂。在 case 3 中提到了 getCurrentWatermark
函數(shù)萨醒,我們?cè)O(shè)置了 watermark 值為當(dāng)前系統(tǒng)時(shí)間斟珊,這表示沒有考慮到延遲的消息∥現(xiàn)在設(shè)置 watermark 值為 當(dāng)前時(shí)間 - 5秒,這就表明 Flink 期望的消息延遲最大是5秒囤踩。這是因?yàn)榇翱谠谳敵鲎罱K計(jì)算結(jié)果的條件是旨椒,當(dāng) watermark 超過窗口結(jié)束時(shí)間。所以只有當(dāng)前時(shí)間到達(dá)第20秒的時(shí)候堵漱,窗口1 [5s - 15s] 才符合輸出條件综慎,同樣的窗口2 [10s - 20s] 要第25秒時(shí)才會(huì)輸出結(jié)果。代碼如下:
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - 5000);
}
? 運(yùn)行的結(jié)果如圖7勤庐,最終示惊,三個(gè)窗口的結(jié)果都正確。
3. Watermark 的本地驗(yàn)證
? 看了上面的案例愉镰,突然又有了一個(gè)奇葩的想法米罚,如果把 當(dāng)前時(shí)間 - Lateness 的方法改成 當(dāng)前時(shí)間 + Lateness 的情況呢?是不是窗口會(huì)提前給出結(jié)果呢丈探?想到這里录择,打算在本地試一試。Flink 分配消息時(shí)間戳的方式有兩種:1. 直接在數(shù)據(jù)源 (source)指派; 2. 通過定義 timestamp assigner / watermark generator碗降。具體教程參考官方文檔隘竭。上一節(jié)內(nèi)容中的代碼主要是第二種方式,剛好本地驗(yàn)證的時(shí)候試試第一種方式讼渊。
public class EventTimeDoc {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 設(shè)置并行度為1动看,方便調(diào)試
env.setParallelism(1);
DataStreamSource<Tuple2<String, Long>> input = env.addSource(new SourceFunction<Tuple2<String, Long>>() {
String[] letters = new String[]{"A", "B", "C", "D", "E", "F"};
Random random = new Random();
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
while (true) {
String key = letters[random.nextInt(6)];
Tuple2<String, Long> next = Tuple2.of(key, System.currentTimeMillis());
// 打印產(chǎn)生的消息
long w = showWindowsStart(next.f1, 0L, 7000L);
System.out.println(next + " -> " + LocalDateTime.fromDateFields(new Date(next.f1)) + " -> 所屬的窗口起始點(diǎn)是:" + LocalDateTime.fromDateFields(new Date(w)));
// event-time 設(shè)置為消息本身時(shí)間
ctx.collectWithTimestamp(next, next.f1);
// 消息時(shí)間 + Lateness
ctx.emitWatermark(new Watermark(next.f1 + 8000));
// 生成一條消息,暫停2秒
TimeUnit.SECONDS.sleep(2);
}
}
@Override
public void cancel() {
}
});
input.keyBy(0).timeWindow(Time.seconds(7))
.reduce((ReduceFunction<Tuple2<String, Long>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
.map(new MapFunction<Tuple2<String, Long>, Object>() {
@Override
public Object map(Tuple2<String, Long> value) throws Exception {
return Tuple2.of(value.f0, LocalDateTime.fromDateFields(new Date(value.f1)));
}
})
.print();
env.execute();
}
private static long showWindowsStart(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
}
? 本地測(cè)試構(gòu)造了一個(gè) <String, Long>的 Tuple 消息序列爪幻,String 是 字母 A-F 的隨機(jī)取值菱皆,Long 是每次生成消息時(shí)的時(shí)間戳赋兵。計(jì)算窗口選擇 TumblingEventTimeWindows
,固定為 7 秒搔预。伴隨每條消息生成發(fā)送 watermark 值霹期,并休眠 2 秒。其中 watermark 的值設(shè)置成 消息的EventTime + Lateness (Lateness 這里設(shè)置的是8秒)拯田。
? 本地測(cè)試時(shí)历造,第一個(gè)消息產(chǎn)生在第49秒,根據(jù)窗口的生命周期我們知道船庇,所屬窗口的第一個(gè)消息一旦到達(dá)吭产,窗口就立刻生成,窗口范圍計(jì)算方法是 org.apache.flink.streaming.api.windowing.windows.TimeWindow
的:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
? 對(duì)照?qǐng)D8的分析鸭轮,窗口1 的范圍是 [48s - 55s) 左閉右開區(qū)間臣淤。伴隨著該消息的生成,watermark 也被發(fā)送窃爷,此時(shí) watermark 是57秒邑蒋,已經(jīng)超過了窗口1的結(jié)束時(shí)間,窗口1輸出最終結(jié)果 -- 第49秒的消息按厘,結(jié)束自己的生命周期医吊,窗口1被移除。所以第51秒逮京,第53秒的消息沒有被觀察到卿堂。
? 接下來,第55秒的消息進(jìn)入窗口2 [55s - 1分2s) 左閉右開區(qū)間懒棉,和第49秒消息一樣 watermark 值已經(jīng)超過了窗口2的結(jié)束時(shí)間草描,窗口2輸出最終結(jié)果 -- 第55秒的消息,結(jié)束自己的生命周期策严,窗口2被移除穗慕。所以第 57秒,第59秒享钞,第1分1秒的消息沒有被觀察到揍诽。
? 時(shí)間繼續(xù)推移,當(dāng)?shù)?分1秒的消息到達(dá)的時(shí)候栗竖,對(duì)應(yīng)的 watermark 值為 1分9秒暑脆,已經(jīng)超過了 窗口3 [1分2s - 1分9s) 左閉右開區(qū)間 的結(jié)束時(shí)間。結(jié)果窗口3中的消息還沒到達(dá)狐肢,沒來得及計(jì)算添吗,就已經(jīng)被移除了,第 1分3秒份名,第1分5秒碟联,第1分7秒的消息沒有被觀察到妓美。
? 以此類推,在 窗口4 [1分9s - 1分16s) 左閉右開區(qū)間最終得到的結(jié)果只有第1分9秒的消息鲤孵。
? 附上控制臺(tái)輸出的結(jié)果壶栋,見圖9∑占啵可以發(fā)現(xiàn)最終結(jié)果和圖8的分析是一致的贵试。(紅色框的內(nèi)容就是各個(gè)窗口最終結(jié)果)
4. 總結(jié)
? 實(shí)時(shí)計(jì)算中,處理延遲消息是系統(tǒng)的重要一部分凯正。整篇文章毙玻,可以了解遲到的消息如何對(duì)系統(tǒng)造成不同結(jié)果,以及不同 watermark 值在 ApacheFlink 中對(duì)窗口結(jié)果的影響廊散。