[TOC]
在流處理中,時(shí)間是一個(gè)非常核心的概念址否,是整個(gè)系統(tǒng)的基石餐蔬。比如,我們經(jīng)常會(huì)遇到這樣的需求:給定一個(gè)時(shí)間窗口佑附,比如一個(gè)小時(shí)樊诺,統(tǒng)計(jì)時(shí)間窗口的內(nèi)數(shù)據(jù)指標(biāo)。那如何界定哪些數(shù)據(jù)將進(jìn)入這個(gè)窗口呢音同?在窗口的定義之前词爬,首先需要確定一個(gè)應(yīng)用使用什么樣的時(shí)間語義。
本文將介紹Flink的Event Time权均、Processing Time和Ingestion Time三種時(shí)間語義顿膨,接著會(huì)詳細(xì)介紹Event Time和Watermark的工作機(jī)制锅锨,以及如何對(duì)數(shù)據(jù)流設(shè)置Event Time并生成Watermark。
Flink的三種時(shí)間語義
如上圖所示恋沃,F(xiàn)link支持三種時(shí)間語義:
Event Time
Event Time指的是數(shù)據(jù)流中每個(gè)元素或者每個(gè)事件自帶的時(shí)間屬性必搞,一般是事件發(fā)生的時(shí)間。由于事件從發(fā)生到進(jìn)入Flink時(shí)間算子之間有很多環(huán)節(jié)囊咏,一個(gè)較早發(fā)生的事件因?yàn)檠舆t可能較晚到達(dá)恕洲,因此使用Event Time意味著事件到達(dá)有可能是亂序的。
使用Event Time時(shí)梅割,最理想的情況下霜第,我們可以一直等待所有的事件到達(dá)后再進(jìn)行時(shí)間窗口的處理。假設(shè)一個(gè)時(shí)間窗口內(nèi)的所有數(shù)據(jù)都已經(jīng)到達(dá)户辞,基于Event Time的流處理會(huì)得到正確且一致的結(jié)果:無論我們是將同一個(gè)程序部署在不同的計(jì)算環(huán)境還是在相同的環(huán)境下多次計(jì)算同一份數(shù)據(jù)泌类,都能夠得到同樣的計(jì)算結(jié)果。我們根本不同擔(dān)心亂序到達(dá)的問題咆课。但這只是理想情況末誓,現(xiàn)實(shí)中無法實(shí)現(xiàn)扯俱,因?yàn)槲覀兗炔恢谰烤挂榷嚅L時(shí)間才能確認(rèn)所有事件都已經(jīng)到達(dá)书蚪,更不可能無限地一直等待下去。在實(shí)際應(yīng)用中迅栅,當(dāng)涉及到對(duì)事件按照時(shí)間窗口進(jìn)行統(tǒng)計(jì)時(shí)殊校,F(xiàn)link會(huì)將窗口內(nèi)的事件緩存下來,直到接收到一個(gè)Watermark读存,以確認(rèn)不會(huì)有更晚數(shù)據(jù)的到達(dá)为流。Watermark意味著在一個(gè)時(shí)間窗口下,F(xiàn)link會(huì)等待一個(gè)有限的時(shí)間让簿,這在一定程度上降低了計(jì)算結(jié)果的絕對(duì)準(zhǔn)確性敬察,而且增加了系統(tǒng)的延遲。在流處理領(lǐng)域尔当,比起其他幾種時(shí)間語義莲祸,使用Event Time的好處是某個(gè)事件的時(shí)間是確定的,這樣能夠保證計(jì)算結(jié)果在一定程度上的可預(yù)測性椭迎。
一個(gè)基于Event Time的Flink程序中必須定義Event Time锐帜,以及如何生成Watermark。我們可以使用元素中自帶的時(shí)間畜号,也可以在元素到達(dá)Flink后人為給Event Time賦值缴阎。
使用Event Time的優(yōu)勢(shì)是結(jié)果的可預(yù)測性,缺點(diǎn)是緩存較大简软,增加了延遲蛮拔,且調(diào)試和定位問題更復(fù)雜述暂。
Processing Time
對(duì)于某個(gè)算子來說,Processing Time指算子使用當(dāng)前機(jī)器的系統(tǒng)時(shí)鐘來定義時(shí)間建炫。在Processing Time的時(shí)間窗口場景下贸典,無論事件什么時(shí)候發(fā)生,只要該事件在某個(gè)時(shí)間段達(dá)到了某個(gè)算子踱卵,就會(huì)被歸結(jié)到該窗口下廊驼,不需要Watermark機(jī)制。對(duì)于一個(gè)程序在同一個(gè)計(jì)算環(huán)境來說惋砂,每個(gè)算子都有一定的耗時(shí)妒挎,同一個(gè)事件的Processing Time,第n個(gè)算子和第n+1個(gè)算子不同西饵。如果一個(gè)程序在不同的集群和環(huán)境下執(zhí)行時(shí)酝掩,限于軟硬件因素,不同環(huán)境下前序算子處理速度不同眷柔,對(duì)于下游算子來說期虾,事件的Processing Time也會(huì)不同,不同環(huán)境下時(shí)間窗口的計(jì)算結(jié)果會(huì)發(fā)生變化驯嘱。因此镶苞,Processing Time在時(shí)間窗口下的計(jì)算會(huì)有不確定性。
Processing Time只依賴當(dāng)前執(zhí)行機(jī)器的系統(tǒng)時(shí)鐘鞠评,不需要依賴Watermark茂蚓,無需緩存。Processing Time是實(shí)現(xiàn)起來非常簡單也是延遲最小的一種時(shí)間語義剃幌。
Ingestion Time
Ingestion Time是事件到達(dá)Flink Souce的時(shí)間聋涨。從Source到下游各個(gè)算子中間可能有很多計(jì)算環(huán)節(jié),任何一個(gè)算子的處理速度快慢可能影響到下游算子的Processing Time负乡。而Ingestion Time定義的是數(shù)據(jù)流最早進(jìn)入Flink的時(shí)間牍白,因此不會(huì)被算子處理速度影響。
Ingestion Time通常是Event Time和Processing Time之間的一個(gè)折中方案抖棘。比起Event Time茂腥,Ingestion Time可以不需要設(shè)置復(fù)雜的Watermark,因此也不需要太多緩存钉答,延遲較低欺栗。比起Processing Time匾鸥,Ingestion Time的時(shí)間是Souce賦值的曹仗,一個(gè)事件在整個(gè)處理過程從頭至尾都使用這個(gè)時(shí)間古今,而且后續(xù)算子不受前序算子處理速度的影響,計(jì)算結(jié)果相對(duì)準(zhǔn)確一些右蹦,但計(jì)算成本稍高诊杆。
設(shè)置時(shí)間語義
在Flink中歼捐,我們需要在執(zhí)行環(huán)境層面設(shè)置使用哪種時(shí)間語義。下面的代碼使用Event Time:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果想用另外兩種時(shí)間語義晨汹,需要替換為:TimeCharacteristic.ProcessingTime和TimeCharacteristic.IngestionTime豹储。
Event Time和Watermark
Flink的三種時(shí)間語義中,Processing Time和Ingestion Time都可以不用設(shè)置Watermark淘这。如果我們要使用Event Time語義剥扣,以下兩項(xiàng)配置缺一不可:第一,使用一個(gè)時(shí)間戳為數(shù)據(jù)流中每個(gè)事件的Event Time賦值铝穷;第二钠怯,生成Watermark。
實(shí)際上曙聂,Event Time是每個(gè)事件的元數(shù)據(jù)晦炊,F(xiàn)link并不知道每個(gè)事件的發(fā)生時(shí)間是什么,我們必須要為每個(gè)事件的Event Time賦值一個(gè)時(shí)間戳宁脊。關(guān)于時(shí)間戳断国,包括Flink在內(nèi)的絕大多數(shù)系統(tǒng)都支持Unix時(shí)間戳系統(tǒng)(Unix time或Unix epoch)。Unix時(shí)間戳系統(tǒng)以1970-01-01 00:00:00.000 為起始點(diǎn)榆苞,其他時(shí)間記為距離該起始時(shí)間的整數(shù)差值稳衬,一般是毫秒(millisecond)精度。
有了Event Time時(shí)間戳语稠,我們還必須生成Watermark宋彼。Watermark是Flink插入到數(shù)據(jù)流中的一種特殊的數(shù)據(jù)結(jié)構(gòu)弄砍,它包含一個(gè)時(shí)間戳仙畦,并假設(shè)后續(xù)不會(huì)有小于該時(shí)間戳的數(shù)據(jù)。下圖展示了一個(gè)亂序數(shù)據(jù)流音婶,其中方框是單個(gè)事件慨畸,方框中的數(shù)字是其對(duì)應(yīng)的Event Time時(shí)間戳,圓圈為Watermark衣式,圓圈中的數(shù)字為Watermark對(duì)應(yīng)的時(shí)間戳寸士。
- Watermark與事件的時(shí)間戳緊密相關(guān)。一個(gè)時(shí)間戳為T的Watermark假設(shè)后續(xù)到達(dá)的事件時(shí)間戳都大于T碴卧。
- 假如Flink算子接收到一個(gè)違背上述規(guī)則的事件弱卡,該事件將被認(rèn)定為遲到數(shù)據(jù),如上圖中時(shí)間戳為19的事件比Watermark(20)更晚到達(dá)住册。Flink提供了一些其他機(jī)制來處理遲到數(shù)據(jù)婶博。
- Watermark時(shí)間戳必須單調(diào)遞增,以保證時(shí)間不會(huì)倒流荧飞。
- Watermark機(jī)制允許用戶來控制準(zhǔn)確度和延遲凡人。Watermark設(shè)置得與事件時(shí)間戳相距緊湊名党,會(huì)產(chǎn)生不少遲到數(shù)據(jù),影響計(jì)算結(jié)果的準(zhǔn)確度挠轴,整個(gè)應(yīng)用的延遲很低传睹;Watermark設(shè)置得非常寬松,準(zhǔn)確度能夠得到提升岸晦,但應(yīng)用的延遲較高欧啤,因?yàn)镕link必須等待更長的時(shí)間才進(jìn)行計(jì)算。
Flink的Watermark細(xì)節(jié)介紹
Watermark是什么启上?從不同的維度可以有不同的理解
- 從Watermark的計(jì)算角度看:可以將Watermark理解為一個(gè)函數(shù):
堂油,它的輸入是當(dāng)前的系統(tǒng)時(shí)間,輸出是一個(gè)Event Time(一個(gè)時(shí)間戳)碧绞,而且輸出的這個(gè)時(shí)間戳是嚴(yán)格單調(diào)遞增的府框。這樣看,Watermark就是一個(gè)函數(shù)讥邻。 - 從Watermark的具體形式來看:可以將Watermark當(dāng)成一個(gè)個(gè)時(shí)間戳迫靖,值就是1中輸出的那個(gè)時(shí)間戳。
- 從Watermark流轉(zhuǎn)的角度看:可以將Watermark理解成夾雜在正常流事件中的一個(gè)個(gè)特殊事件兴使。
這3種描述方式系宜,看似不同,實(shí)則一樣发魄,只是從不同的角度去看了而已盹牧。不管怎么理解,我們必須知道:流處理系統(tǒng)規(guī)定励幼,如果某個(gè)時(shí)刻Watermark的值為T1汰寓,那系統(tǒng)就認(rèn)為凡是早于或等于T1時(shí)間的事件都已經(jīng)收到了。注意苹粟,這個(gè)就是Watermark所代表的含義有滑,實(shí)際因?yàn)楝F(xiàn)實(shí)中各種情況,未必能嚴(yán)格做到這樣嵌削,但目標(biāo)就是要達(dá)到上面規(guī)定的這樣毛好,或者無限逼近。
Why苛秕?
為什么需要Watermark肌访?這個(gè)也有很多種描述方式,往大了說就是提供一種理論上解決分布式系統(tǒng)中消息亂序問題(這是分布式系統(tǒng)中一個(gè)經(jīng)典難題)的方案艇劫。說小點(diǎn)就是在有狀態(tài)的流計(jì)算中吼驶,當(dāng)我們關(guān)注事件的順序或者完整性時(shí),需要有這么一種機(jī)制能實(shí)現(xiàn)這個(gè)需求。
這里的完整性我舉個(gè)例子解釋一下:比如我們基于事件發(fā)生時(shí)間統(tǒng)計(jì)每5min的用戶PV總量旨剥,那比如12:00-12:05這個(gè)5min的統(tǒng)計(jì)該在什么時(shí)間點(diǎn)計(jì)算呢咧欣?假設(shè)沒有Watermark這個(gè)概念,你就永遠(yuǎn)不知道什么時(shí)候12:00-12:05區(qū)間的所有事件才全到齊轨帜。你不能假定收到12:00-12:05的數(shù)據(jù)就認(rèn)為之前的數(shù)據(jù)已經(jīng)全部來了魄咕,因?yàn)閿?shù)據(jù)可能延遲+亂序了。而Watermark就是為了解決這個(gè)問題而提出的蚌父,當(dāng)你收到Watermark的值為12:00-12:05的事件時(shí)哮兰,你就可以認(rèn)為早于這個(gè)時(shí)間的數(shù)據(jù)已經(jīng)都到了,數(shù)據(jù)已經(jīng)完整了苟弛,可以進(jìn)行12:00-12:05這個(gè)5min區(qū)間的數(shù)據(jù)計(jì)算了喝滞。至于如何保證,這個(gè)是框架要做的事情(當(dāng)然一般需要開發(fā)者參與)膏秫。
Where右遭?
哪里需要Watermark?這里我給一個(gè)簡單粗暴的結(jié)論缤削,當(dāng)同時(shí)滿足下面兩個(gè)條件的時(shí)候才會(huì)需要Watermark:
- 計(jì)算中使用了時(shí)間相關(guān)的算子(time-based operations)窘哈,其實(shí)再明確點(diǎn),就是使用了Window的時(shí)候(注:Flink的Global Window除外亭敢,這個(gè)Window不是基于時(shí)間的)滚婉。
- 1中使用的時(shí)間相關(guān)的算子選擇使用事件時(shí)間,即Event Time(注:如果是Flink的話帅刀,也包含Ingestion Time)让腹。
這里再解釋一下2。前文我們介紹過有兩種時(shí)間Event Time和Processing Time(Flink獨(dú)有的Ingestion Time在Watermark這里可以歸結(jié)為Event Time扣溺,后文不再另行說明)骇窍,時(shí)間相關(guān)的算子選擇時(shí)間時(shí)必然是二選一。并不是選擇Processing Time的時(shí)候就沒有Watermark了娇妓,只是這個(gè)時(shí)候Processing Time自身就是一個(gè)完美的Watermark(因?yàn)闀r(shí)間一去不復(fù)返像鸡,Processing Time永遠(yuǎn)是單調(diào)遞增的),并不需要產(chǎn)生單獨(dú)的Watermark了哈恰。所以在Processing Time里面,你可以認(rèn)為Watermark沒有意義了志群,所以去掉了着绷,或者認(rèn)為Processing Time自身就是Watermark都行。
實(shí)戰(zhàn)
場景介紹
為了方便說明锌云,我構(gòu)造了一個(gè)簡單的場景荠医,假設(shè)有一個(gè)設(shè)備產(chǎn)生了一組事件,如下:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
一共9個(gè)事件,id是事件名稱彬向,timestamp是設(shè)備端事件真實(shí)產(chǎn)生的時(shí)間兼贡。也就是事件真實(shí)產(chǎn)生順序是:
event1, event2, event3, event4, event5, event6, event7, event8, event9
但在傳輸過程中因?yàn)楦鞣N現(xiàn)實(shí)原因亂序了,到Flink這里的時(shí)候娃胆,事件順序變成了:
event1, event2, event4, event5, event7, event3, event6, event8, event9
現(xiàn)在我們要做的事情就是計(jì)算每5秒中的事件個(gè)數(shù)遍希,以此來判斷事件高峰期。
說明:
- 這個(gè)計(jì)算是非常有代表性的里烦,比如電商統(tǒng)計(jì)每小時(shí)的pv就能知道每天用戶高峰期發(fā)生在哪幾個(gè)時(shí)段凿蒜,這里為了方便說明問題,把問題簡化了胁黑,并且為了快速出結(jié)果废封,把時(shí)間粒度縮短為5秒鐘。
- 計(jì)算時(shí)丧蘸,要想結(jié)果準(zhǔn)確漂洋,就不能使用Processing Time,這樣如果數(shù)據(jù)從產(chǎn)生到被處理延遲比較大的話力喷,最終計(jì)算的結(jié)果也會(huì)不準(zhǔn)確氮发。除非這個(gè)延遲可控或者可接受,則可簡單的使用Processing Time冗懦,否則就必須用Event Time進(jìn)行計(jì)算爽冕。
Flink提供的Watermark機(jī)制
Flink提供了3種方式來生成Watermark:
- 在Source中生成Watermark;
- 通過AssignerWithPeriodicWatermarks生成Watermark披蕉;
- 通過AssignerWithPunctuatedWatermarks生成Watermark颈畸;
前面介紹過了Watermark是在使用Event Time的場景下才使用的,所以給事件增加Event Time和生成Watermark是一對(duì)操作没讲,一般都是一起使用的眯娱。方式1是直接在Flink的最源頭Source那里就生成了Event Time和Watermark。方式2和方式3則是流處理中的某一步驟(可以理解為一個(gè)特殊點(diǎn)的算子)爬凑,它的輸入是流徙缴,輸出還是流,只不過經(jīng)過這個(gè)流之后事件就會(huì)有Event Timestamp和Watermark了嘁信,一般這一步放在Source之后于样,最晚也要在時(shí)間算子之前,也就是Window之前潘靖。而且他兩的優(yōu)先級(jí)高穿剖,如果Source中生成了Watermark,后面又使用了方式2或3卦溢,則會(huì)覆蓋之前的Event Timestamp和Watermark糊余。
下面我們分別介紹每種方式秀又。
Watermark In Source
package com.niyanchun.watermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
/**
* Assign timestamp and watermark at Source Function Demo.
*
* @author NiYanchun
**/
public class AssignAtSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new CustomSource())
.timeWindowAll(Time.seconds(5))
.process(new CustomProcessFunction())
.print();
env.execute();
}
public static class CustomSource extends RichSourceFunction<JSONObject> {
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
System.out.println("event in source:");
getOutOfOrderEvents().forEach(e -> {
System.out.println(e);
long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
ctx.collectWithTimestamp(e, timestampInMills);
ctx.emitWatermark(new Watermark(timestampInMills));
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
/**
* generate out of order events
*
* @return List<JSONObject>
*/
private static List<JSONObject> getOutOfOrderEvents() {
// 2020-05-24 12:00:00
JSONObject event1 = new JSONObject().fluentPut("id", "event1")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
// 2020-05-24 12:00:01
JSONObject event2 = new JSONObject().fluentPut("id", "event2")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
// 2020-05-24 12:00:03
JSONObject event3 = new JSONObject().fluentPut("id", "event3")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
// 2020-05-24 12:00:04
JSONObject event4 = new JSONObject().fluentPut("id", "event4")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
// 2020-05-24 12:00:05
JSONObject event5 = new JSONObject().fluentPut("id", "event5")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
// 2020-05-24 12:00:06
JSONObject event6 = new JSONObject().fluentPut("id", "event6")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
// 2020-05-24 12:00:07
JSONObject event7 = new JSONObject().fluentPut("id", "event7")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
// 2020-05-24 12:00:08
JSONObject event8 = new JSONObject().fluentPut("id", "event8")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
// 2020-05-24 12:00:09
JSONObject event9 = new JSONObject().fluentPut("id", "event9")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
// 這里把消息打亂,模擬實(shí)際中的消息亂序
// 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
// 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
}
public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
@Override
public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
TimeWindow window = context.window();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
int count = 0;
for (JSONObject element : elements) {
System.out.println(element.getString("id"));
count++;
}
System.out.println("Total:" + count);
}
}
}
這里自定義了一個(gè)Source贬芥,然后接了一個(gè)Window(timeWindowAll)吐辙,做了一個(gè)簡單的處理,最終輸出蘸劈。這里需要注意一個(gè)點(diǎn):timeWindowAll底層其實(shí)是定義了一個(gè)TumblingWindows昏苏,至于使用Processing Time(TumblingProcessingTimeWindows),還是Event Time(TumblingEventTimeWindows)則由env.setStreamTimeCharacteristic來確定的昵时,該選項(xiàng)的默認(rèn)值是TimeCharacteristic.ProcessingTime捷雕,即使用Processing Time。
作為演示壹甥,修改一下上面代碼救巷,先使用Processing Time,看下結(jié)果:
event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
event1
event2
event4
event5
event7
event3
event6
event8
event9
Total:9
Process finished with exit code 0
可以看到句柠,只有一個(gè)Window浦译,其范圍是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代碼運(yùn)行的時(shí)間溯职,顯然這樣的統(tǒng)計(jì)結(jié)果是沒有意義的精盅,因?yàn)樗w現(xiàn)不出業(yè)務(wù)真正的高峰期。后面我們只討論使用Event Time的情況谜酒。
現(xiàn)在重新改為env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);叹俏,然后運(yùn)行:
event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3
window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5
Process finished with exit code 0
我們看下現(xiàn)在的輸出,有兩個(gè)Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}和window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}僻族,可以看到就是5秒鐘一個(gè)Window粘驰。然后12:00:00-12:00:05這個(gè)Window里面包含了3個(gè)事件:event1,event2述么,event4蝌数;12:00:05-12:00:10這個(gè)Window里面包含了5個(gè)事件:event5、event7度秘、event6顶伞、event8、event9剑梳。
從這個(gè)結(jié)果看event3丟了唆貌,其它數(shù)據(jù)都在,為什么呢阻荒?如果我說因?yàn)閑vent3亂序了挠锥,排在了后邊,你肯定會(huì)說event6也排到了event7后邊侨赡,為什么event6卻沒有丟呢?要解釋清楚這個(gè)問題還需要涉及到觸發(fā)器以及窗口的原理和機(jī)制,為了保證行文的連貫性羊壹,這里我先直接給出結(jié)論:因?yàn)榇翱谀J(rèn)的觸發(fā)器實(shí)現(xiàn)機(jī)制是本該在一個(gè)窗口內(nèi)的數(shù)據(jù)亂序了以后蓖宦,只要在這個(gè)窗口結(jié)束(即被觸發(fā))之前來,那是不影響的油猫,不認(rèn)為是遲到數(shù)據(jù)稠茂,不會(huì)被丟掉;但如果這個(gè)窗口已經(jīng)結(jié)束了才來情妖,就會(huì)被丟掉了睬关。比如event3本應(yīng)該屬于12:00:00-12:00:05這個(gè)窗口,當(dāng)event5這條數(shù)據(jù)來的時(shí)候毡证,這個(gè)窗口就就認(rèn)為數(shù)據(jù)完整了电爹,于是觸發(fā)計(jì)算,接著就銷毀了料睛。等event3來的時(shí)候已經(jīng)是12:00:05-12:00:10窗口了丐箩,所以它直接被丟掉了。也就是在時(shí)間窗口這里恤煞,對(duì)于“亂序”的定義不是要求每個(gè)到來事件的時(shí)間戳都嚴(yán)格升序屎勘,而是看屬于這個(gè)窗口的事件能否在窗口時(shí)間范圍內(nèi)來,如果能來居扒,就不算亂序概漱,至于在這個(gè)時(shí)間范圍內(nèi)來的先后順序無所謂。這個(gè)其實(shí)也是合理的喜喂。
另外還有兩個(gè)細(xì)節(jié)點(diǎn)要注意一下:
- 當(dāng)Source是有界數(shù)據(jù)時(shí)瓤摧,當(dāng)所有數(shù)據(jù)發(fā)送完畢后,系統(tǒng)會(huì)自動(dòng)發(fā)一個(gè)值為Long.MAX_VALUE的Watermark夜惭,表示數(shù)據(jù)發(fā)送完了姻灶。
- Window是一個(gè)左閉右開區(qū)間,比如12:00:00的數(shù)據(jù)屬于12:00:00-12:00:05窗口诈茧,而12:00:05的數(shù)據(jù)屬于12:00:05-12:00:10窗口产喉。
AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks
AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks其實(shí)非常像,哪怕是用法都非常像敢会,他兩個(gè)的主要區(qū)別是Watermark的產(chǎn)生機(jī)制或者時(shí)機(jī):AssignerWithPriodicWatermarks是根據(jù)一個(gè)固定的時(shí)間周期性的產(chǎn)生Watermark曾沈,而AssignerWithPunctuatedWatermarks則是由事件驅(qū)動(dòng),然后代碼自己控制何時(shí)以何種方式產(chǎn)生Watermark鸥昏,比如一個(gè)event就產(chǎn)生一個(gè)塞俱,還是幾個(gè)event產(chǎn)生一個(gè),或者滿足什么條件時(shí)產(chǎn)生Watermark等吏垮,就是用戶可以靈活控制障涯。
package com.niyanchun.watermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
/**
* Assign timestamp and watermark at Source Function Demo.
*
* @author NiYanchun
**/
public class AssignerWatermarksDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
// .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
.timeWindowAll(Time.seconds(5))
.process(new CustomProcessFunction())
.print();
env.execute();
}
public static class CustomSource extends RichSourceFunction<JSONObject> {
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
System.out.println("event in source:");
getOutOfOrderEvents().forEach(e -> {
System.out.println(e);
ctx.collect(e);
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
/**
* generate out of order events
*
* @return List<JSONObject>
*/
private static List<JSONObject> getOutOfOrderEvents() {
// 2020-05-24 12:00:00
JSONObject event1 = new JSONObject().fluentPut("id", "event1")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
// 2020-05-24 12:00:01
JSONObject event2 = new JSONObject().fluentPut("id", "event2")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
// 2020-05-24 12:00:03
JSONObject event3 = new JSONObject().fluentPut("id", "event3")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
// 2020-05-24 12:00:04
JSONObject event4 = new JSONObject().fluentPut("id", "event4")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
// 2020-05-24 12:00:05
JSONObject event5 = new JSONObject().fluentPut("id", "event5")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
// 2020-05-24 12:00:06
JSONObject event6 = new JSONObject().fluentPut("id", "event6")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
// 2020-05-24 12:00:07
JSONObject event7 = new JSONObject().fluentPut("id", "event7")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
// 2020-05-24 12:00:08
JSONObject event8 = new JSONObject().fluentPut("id", "event8")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
// 2020-05-24 12:00:09
JSONObject event9 = new JSONObject().fluentPut("id", "event9")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
// 可以把消息打亂罐旗,模擬實(shí)際中的消息亂序。
// 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
// 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
}
public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
@Override
public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
TimeWindow window = context.window();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
int count = 0;
for (JSONObject element : elements) {
System.out.println(element.getString("id"));
count++;
}
System.out.println("Total:" + count);
}
}
/**
* AssignerWithPeriodicWatermarks demo
*/
public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark getCurrentWatermark() {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
System.currentTimeMillis(), sdf.format(currentTimestamp)));
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
long timestamp = ((DateTime) element.get("timestamp")).getMillis();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
currentTimestamp = timestamp;
return timestamp;
}
}
/**
* AssignerWithPunctuatedWatermarks demo.
*/
public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
System.currentTimeMillis(), sdf.format(currentTimestamp)));
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
long timestamp = ((DateTime) element.get("timestamp")).getMillis();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
currentTimestamp = timestamp;
return timestamp;
}
}
}
先分別看下AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks部分吧:
/**
* AssignerWithPeriodicWatermarks demo
*/
public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark getCurrentWatermark() {
// 省略一些邏輯
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
// 省略一些邏輯
return timestamp;
}
}
/**
* AssignerWithPunctuatedWatermarks demo.
*/
public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
// 省略一些邏輯
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
// 省略一些邏輯
return timestamp;
}
}
為了突出重點(diǎn)唯蝶,我刪掉了具體實(shí)現(xiàn)九秀。可以看到這兩個(gè)類都有一個(gè)extractTimestamp方法粘我,這個(gè)方法每個(gè)Event都會(huì)調(diào)用鼓蜒,作用就是給Event賦一個(gè)Event Time。另外一個(gè)方法稍微有點(diǎn)差異征字,AssignerWithPeriodicWatermarks的方法叫g(shù)etCurrentWatermark()都弹,而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它們的主要區(qū)別是方法的調(diào)用機(jī)制:
- getCurrentWatermark()沒有參數(shù)匙姜,它是框架根據(jù)用戶設(shè)置的固定時(shí)間周期性的調(diào)用畅厢。這個(gè)固定的時(shí)間可以通過以下方式設(shè)置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(500);
上面的代碼設(shè)置每500毫秒調(diào)用一次getCurrentWatermark(),即每500毫秒產(chǎn)生一個(gè)Watermark搁料。不顯式的設(shè)置的話或详,默認(rèn)值是0,但實(shí)際效果是每200ms調(diào)用一次郭计。
- checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有兩個(gè)參數(shù):一個(gè)是event霸琴,一個(gè)是extractTimestamp方法返回的時(shí)間戳。這個(gè)方法被調(diào)用的時(shí)間點(diǎn)是:每個(gè)事件來了先調(diào)用extractTimestamp昭伸,然后馬上調(diào)用checkAndGetNextWatermark梧乘。在checkAndGetNextWatermark中你可以通過返回值控制是否產(chǎn)生新的Watermark,如果你不想返回新的Watermark庐杨,可以返回null或者一個(gè)小于等于上一個(gè)Watermark的時(shí)間戳选调,這樣就相當(dāng)于本次不返回Watermark或者返回的Watermark不是遞增的被丟棄了,繼續(xù)使用原來的Watermark灵份。因?yàn)閃atermark不能為null仁堪,且必須單調(diào)遞增。
AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks的區(qū)別就這些填渠,最佳實(shí)踐的話我個(gè)人覺得優(yōu)先考慮AssignerWithPriodicWatermarks弦聂,如果不能滿足需求,再考慮AssignerWithPunctuatedWatermarks氛什。一方面是前者簡單一些莺葫,另一方面是一般沒有必要每個(gè)事件就計(jì)算一個(gè)Watermark,這樣會(huì)增加不是很有必要的計(jì)算量枪眉。
遲到數(shù)據(jù)
從上面的部分看到event3因?yàn)檫t到被默默的丟掉了捺檬,現(xiàn)實(shí)中數(shù)據(jù)是重要資產(chǎn),肯定是不能隨便丟棄的贸铜。Flink提供了兩種解決方案:
- 允許一定的延遲堡纬。這個(gè)延遲可以在兩個(gè)地方設(shè)置:第一種是可以在上面的AssignerWithXXXWatermarks方法里面給計(jì)算出的時(shí)間戳減去一個(gè)時(shí)間聂受,這個(gè)時(shí)間就是你允許延遲的時(shí)間。第二種就是在時(shí)間窗口那里可以通過allowedLateness來設(shè)置一個(gè)允許的延遲時(shí)間隐轩,
但允許一定延遲的方式只能治標(biāo)饺饭,不能治本渤早。我們只能根據(jù)實(shí)際情況允許一定限度的延遲职车,但總歸是有個(gè)限度的,原因主要有兩個(gè):1)延遲太高會(huì)喪失實(shí)時(shí)性鹊杖,如果你的場景對(duì)實(shí)時(shí)性要求比較高悴灵,那就無法設(shè)置太大的延遲。2)延遲實(shí)際是延長了窗口的生命周期骂蓖,所以資源消耗會(huì)增加积瞒。
- 在Window那里通過sideOutputLateData將遲到的數(shù)據(jù)以流的形式旁路出去。這個(gè)是治本的手段登下,它沒有時(shí)間的限制茫孔,如果有遲到數(shù)據(jù),就會(huì)發(fā)送到這個(gè)單獨(dú)的流里面去被芳,然后可以為這個(gè)流單獨(dú)設(shè)置處理方式缰贝。