Flink Time和Watermark的理解
1. Time
背景
在實際開發(fā)過程中,我們可能需要接入各種流數據源羽利,比如在線業(yè)務用戶點擊流數據、監(jiān)控系實時收集到的事件流數據、從傳感器采集到的實時數據檀蹋,等等,為了處理方便他們可能會寫入Kafka消息中間件集群中某個/某些topic中云芦,或者選擇其它的緩沖/存儲系統(tǒng)俯逾。這些數據源中數據元素具有固定的時間屬性,是在流數據處理系統(tǒng)之外的其它系統(tǒng)生成的舅逸。比如桌肴,上億用戶通過手機終端操作觸發(fā)生成的事件數據,都具有對應的事件時間琉历;再特殊一點坠七,可能我們希望回放(Replay)上一年手機終端用戶的歷史行為數據,與當前某個流數據集交叉分析才能夠得到支持某類業(yè)務的特定結果旗笔,這種情況下彪置,基于數據所具有的事件時間進行處理,就具有很重要的意義了蝇恶。
下面拳魁,我們先從Flink支持的3個與流數據處理相關的時間概念(Time Notion):ProcessTime、EventTime撮弧、IngestionTime潘懊。有些系統(tǒng)對時間概念的抽象有其它叫法姚糊,比如,Google Cloud Dataflow中稱為時間域(Time Domain)授舟。在Flink中救恨,基于不同的Time Notion來處理流數據,具有不同的意義和結果释树,所以了解這3個Time Notion非常關鍵忿薇。
Time Notion
我們先看下,Apache Flink官網文檔給出的一張概念圖躏哩,非常形象地展示了Process Time署浩、Event Time、Ingestion Time這三個時間分別所處的位置扫尺,如下圖所示:
下面筋栋,分別對這3個Time Notion進行說明如下:
ProcessTime--事件被處理時當前系統(tǒng)的時間
Flink中有對數據處理的操作進行抽象,稱為Transformation Operator正驻,而對于整個Dataflow的開始和結束分別對應著Source Operator和Sink Operator弊攘,這些Operator都是在Flink集群系統(tǒng)所在的主機節(jié)點上,所以在基于ProcessTime的Notion進行與時間相關的數據處理時姑曙,數據處理依賴于Flink程序運行所在的主機節(jié)點系統(tǒng)時鐘(System Clock)襟交。
因為我們關心的是數據處理時間(Process Time),比如進行Time Window操作伤靠,對Window的指派就是基于當前Operator所在主機節(jié)點的系統(tǒng)時鐘捣域。也就是說,每次創(chuàng)建一個Window宴合,計算Window對應的起始時間和結束時間都使用Process Time焕梅,它與外部進入的數據元素的事件時間無關。那么卦洽,后續(xù)作用于Window的操作(Function)都是基于具有Process Time特性的Window進行的贞言。
使用ProcessTime的場景,比如阀蒂,我們需要對某個App應用的用戶行為進行實時統(tǒng)計分析與監(jiān)控该窗,由于用戶可能使用不同的終端設備,這樣可能會造成數據并非是實時的(如用戶手機沒電蚤霞,導致2小時以后才會將操作行為記錄批量上傳上來)酗失。而此時,如果我們按照每分鐘的時間粒度做實時統(tǒng)計監(jiān)控争便,那么這些數據記錄延遲的太嚴重级零,如果為了等到這些記錄上傳上來(無法預測,具體什么時間能獲取到這些數據)再做統(tǒng)計分析,對每分鐘之內的數據進行統(tǒng)計分析的結果恐怕要到幾個小時甚至幾天后才能計算并輸出結果奏纪,這不是我們所希望的鉴嗤。而且,數據處理系統(tǒng)可能也沒有這么大的容量來處理海量數據的情況序调。結合業(yè)務需求醉锅,其實我們只需要每分鐘時間內進入的數據記錄,依賴當前數據處理系統(tǒng)的處理時間(Process Time)生成每分鐘的Window发绢,指派數據記錄到指定Window并計算結果硬耍,這樣就不用考慮數據元素本身自帶的事件時間了。
EventTime--事件產生的時間边酒,它通常由事件中的時間戳描述
流數據中的數據元素可能會具有不變的事件時間(Event Time)屬性经柴,該事件時間是數據元素所代表的行為發(fā)生時就不會改變。最簡單的情況下墩朦,這也最容易理解:所有進入到Flink處理系統(tǒng)的流數據坯认,都是在外部的其它系統(tǒng)中產生的,它們產生后具有了事件時間氓涣,經過傳輸后牛哺,進入到Flink處理系統(tǒng),理論上(如果所有系統(tǒng)都具有相同系統(tǒng)時鐘)該事件時間對應的時間戳要早于進入到Flink處理系統(tǒng)中進行處理的時間戳劳吠,但實際應用中會出現數據記錄亂序引润、延遲到達等問題,這也是非常普遍的痒玩。
基于EventTime的Notion淳附,處理數據的進度(Progress)依賴于數據本身,而不是當前Flink處理系統(tǒng)中Operator所在主機節(jié)點的系統(tǒng)時鐘凰荚。所以燃观,需要有一種機制能夠控制數據處理的進度褒脯,比如一個基于事件時間的Time Window創(chuàng)建后便瑟,具體怎么確定屬于該Window的數據元素都已經到達?如果確定都到達了番川,然后就可以對屬于這個Window的所有數據元素做滿足需要的處理(如匯總到涂、分組等)。這就要用到WaterMark機制颁督,它能夠衡量數據處理進度(表達數據到達的完整性)践啄。
WaterMark帶有一個時間戳,假設為X沉御,進入到數據處理系統(tǒng)中的數據元素具有事件時間屿讽,記為Y,如果Y<X,則所有的數據元素均已到達伐谈,可以計算并輸出結果烂完。反過來說,可能更容易理解一些:要想觸發(fā)對當前Window中的數據元素進行計算诵棵,必須保證對所有進入到系統(tǒng)的數據元素抠蚣,其事件時間Y>=X。如果數據元素的事件時間是有序的履澳,那么當出現一個數據元素的事件時間Y<X嘶窄,則觸發(fā)對當前Window計算,并創(chuàng)建另一個新的Window來指派事件時間Y<X的數據元素到該新的Window中距贷。
可以看到柄冲,有了WaterMark機制,對基于事件時間的流數據處理會變得特別靈活忠蝗,可以根據實際業(yè)務需要選擇各種組件和處理策略羊初。比如,上面我們說到什湘,當Y<X則觸發(fā)當前Window計算长赞,記為t1時刻,如果流數據元素是亂序的闽撤,經過一段時間得哆,假設t2時刻有一個數據元素的事件時間Y>=X,這時該怎么辦呢哟旗?如果t1時刻的Window已經不存在了贩据,但我們還是希望新出現的亂序數據元素加入到t1時刻Window的計算中,這時可以實現自定義的Trigger來滿足各種業(yè)務場景的需要闸餐。
IngestionTime--事件進入Flink的時間
IngestionTime是數據進入到Flink流數據處理系統(tǒng)的時間饱亮,該時間依賴于Source Operator所在主機節(jié)點的系統(tǒng)時鐘,會為到達的數據記錄指派Ingestion Time舍沙〗希基于IngestionTime的Notion,存在多個Source Operator的情況下拂铡,每個Source Operator會使用自己本地系統(tǒng)時鐘指派Ingestion Time壹无。后續(xù)基于時間相關的各種操作,都會使用數據記錄中的Ingestion Time感帅。
與EventTime相比斗锭,IngestionTime不能處理亂序、延遲到達事件的應用場景失球,它也就不用必須指定如何生成WaterMark岖是。
設定時間特性
Flink DataStream 程序的第一部分通常是設置基本時間特性。 該設置定義了數據流源的行為方式(例如:它們是否將分配時間戳),以及像 **KeyedStream.timeWindow(Time.seconds(30)) ** 這樣的窗口操作應該使用上面哪種時間概念豺撑。
以下示例顯示了一個 Flink 程序作箍,該程序在每小時時間窗口中聚合事件。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 其他
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
2. Watermark
Watermark的類型
EventTime和Watermarks
在使用eventTime的時候如何處理亂序數據前硫?
我們知道胞得,流處理從事件產生,到流經source屹电,再到operator阶剑,中間是有一個過程和時間的。雖然大部分情況下危号,流到operator的數據都是按照事件產生的時間順序來的牧愁,但是也不排除由于網絡延遲等原因,導致亂序的產生外莲,特別是使用kafka的話猪半,多個分區(qū)的數據無法保證有序。所以在進行window計算的時候偷线,我們又不能無限期的等下去磨确,必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了声邦。這個特別的機制乏奥,就是watermark,watermark是用于處理亂序事件的亥曹。
watermark可以翻譯為水位線
有序的流的watermarks
無序的流的watermarks
多并行度流的watermarks
注意:多并行度的情況下邓了,watermark對齊會取所有channel最小的watermark
在Apache Flink中使用watermark的4個理解
當人們第一次使用Flink時,經常會對watermark感到困惑媳瞪。但其實watermark并不復雜骗炉。讓我們通過一個簡單的例子來說明為什么我們需要watermark,以及它的工作機制是什么樣的蛇受。
在下文中的例子中句葵,我們有一個帶有時間戳的事件流,但是由于某種原因它們并不是按順序到達的龙巨。圖中的數字代表事件發(fā)生的時間戳笼呆。第一個到達的事件發(fā)生在時間4,然后它后面跟著的是發(fā)生在更早時間(時間2)的事件旨别,以此類推:
注意這是一個按照事件時間處理的例子,這意味著時間戳反映的是事件發(fā)生的時間汗茄,而不是處理事件的時間秸弛。事件時間(Event-Time)處理的強大之處在于,無論是在處理實時的數據還是重新處理歷史的數據,基于事件時間創(chuàng)建的流計算應用都能保證結果是一樣的递览。
現在假設我們正在嘗試創(chuàng)建一個流計算排序算子叼屠。也就是處理一個亂序到達的事件流,并按照事件時間的順序輸出事件绞铃。
理解1
數據流中的第一個元素的時間是4镜雨,但是我們不能直接將它作為排序后數據流的第一個元素并輸出它。因為數據是亂序到達的儿捧,也許有一個更早發(fā)生的數據還沒有到達荚坞。事實上,我們能預見一些這個流的未來菲盾,也就是我們的排序算子至少要等到2這條數據的到達再輸出結果颓影。
有緩存,就必然有延遲懒鉴。
理解2
如果我們做錯了诡挂,我們可能會永遠等待下去。首先临谱,我們的應用程序從看到時間4的數據璃俗,然后看到時間2的數據。是否會有一個比時間2更早的數據到達呢悉默?也許會旧找,也許不會。我們可以一直等下去麦牺,但可能永遠看不到1钮蛛。
最終,我們必須勇敢地輸出 2 作為排序流的第一個結果
理解3
我們需要的是某種策略剖膳,它定義了對于任何帶時間戳的事件流魏颓,何時停止等待更早數據的到來。
這正是 watermark 的作用吱晒,他們定義了何時不再等待更早的數據甸饱。
Flink中的事件時間處理依賴于一種特殊的帶時間戳的元素,成為watermark仑濒,它們會由數據源或是watermark生成器插入數據流中叹话。具有時間戳t的watermark可以被理解為斷言了所有時間戳小于或等于t的事件都(在某種合理的概率上)已經到達了。
注:此處原文是“小于”墩瞳,譯者認為應該是 “小于或等于”驼壶,因為 Flink 源碼中采用的是 “小于或等于” 的機制。
何時我們的排序算子應該停止等待喉酌,然后將事件2作為首個元素輸出热凹?答案是當收到時間戳為2(或更大)的watermark時泵喘。
理解4
我們可以設想不同的策略來生成watermark。
我們知道每個事件都會延遲一段時間才到達般妙,而這些延遲差異會比較大纪铺,所以有些事件會比其他事件延遲更多。一種簡單的方法是假設這些延遲不會超過某個最大值碟渺。Flink 把這種策略稱作 “有界無序生成策略”(bounded-out-of-orderness)鲜锚。當然也有很多更復雜的方式去生成watermark,但是對于大多數應用來說苫拍,固定延遲的方式已經足夠了芜繁。
如果想要構建一個類似排序的流應用,可以使用Flink的ProcessFunction怯疤。它提供了對事件時間計時器(基于watermark觸發(fā)回調)的訪問浆洗,還提供了可以用來緩存數據的托管狀態(tài)接口。
Watermark案例
1.watermarks的生成方式
通常集峦,在接收到source的數據后伏社,應該立刻生成watermark;但是塔淤,也可以在source后摘昌,應用簡單的map或者filter操作后,再生成watermark高蜂。
注意:如果指定多次watermark聪黎,后面指定的會覆蓋前面的值。
生成方式
-
With Periodic Watermarks
周期性的觸發(fā)watermark的生成和發(fā)送备恤,默認是100ms
每隔N秒自動向流里注入一個WATERMARK
時間間隔由ExecutionConfig.setAutoWatermarkInterval 決定.
每次調用getCurrentWatermark 方法, 如果得到的WATERMARK
不為空并且比之前的大就注入流中
可以定義一個最大允許亂序的時間稿饰,這種比較常用
實現AssignerWithPeriodicWatermarks接口
-
With Punctuated Watermarks
基于某些事件觸發(fā)watermark的生成和發(fā)送
基于事件向流里注入一個WATERMARK,每一個元素都有機會判斷是否生成一個WATERMARK.
如果得到的WATERMARK 不為空并且比之前的大就注入流中
實現AssignerWithPunctuatedWatermarks接口
2.watermark和window案例
這里寫了一個watermark&window的flink程序露泊,從socket讀取數據
代碼:
public class StreamingWindowWatermark {
private static final Logger log = LoggerFactory.getLogger(StreamingWindowWatermark.class);
public static void main(String[] args) throws Exception {
//定義socket的端口號
int port = 9000;
//獲取運行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime喉镰,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置并行度為1,默認并行度是當前機器的cpu數量
env.setParallelism(1);
//連接socket獲取輸入的數據
DataStream<String> text = env.socketTextStream("zzy", port, "\n");
//解析輸入的數據,每行數據按逗號分隔
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10s
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯,比當前最大時間戳晚10s
* 默認100ms被調用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
//設置多并行度時獲取線程id
long id = Thread.currentThread().getId();
log.info("extractTimestamp=======>" + ",currentThreadId:" + id + ",key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "]," +
"currentMaxTimestamp:[" + currentMaxTimestamp + "|" +
sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
// System.out.println("currentThreadId:" + id + ",key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" +
// sdf.format(currentMaxTimestamp) + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]");
return timestamp;
}
});
DataStream<String> window = waterMarkStream.keyBy(0)//分組
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口惭笑,和調用TimeWindow效果一樣
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對window內的數據進行排序侣姆,保證數據的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
//時間戳放到了arrarList里
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
}
});
//測試-把結果打印到控制臺即可
window.print();
//注意:因為flink是懶加載的,所以必須調用execute方法沉噩,上面的代碼才會執(zhí)行
env.execute("eventtime-watermark");
}
}
啟動程序StreamingWindowWatermark
打印日志:
2019-02-14 11:57:36,715 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction] [INFO] - Connecting to server socket zzy:9000
2019-02-14 11:57:36,741 [Window(TumblingEventTimeWindows(3000), EventTimeTrigger, WindowFunction$3) -> Sink: Print to Std. Out (1/1)] [org.apache.flink.runtime.state.heap.HeapKeyedStateBackend] [INFO] - Initializing heap keyed state backend with stream factory.
首先捺宗,我們開啟socket,輸入第一條數據川蒙,數據格式是(id,時間戳):
? /data nc -l 9000
0001,1550116440000
輸出如下:
019-02-14 11:58:48,690 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116440000|2019-02-14 11:54:00.000],currentMaxTimestamp:[1550116440000|2019-02-14 11:54:00.000],watermark:[1550116430000|2019-02-14 11:53:50.000]
匯總下表:
此時蚜厉,wartermark的時間按照邏輯,已經落后于currentMaxTimestamp10秒了派歌。
我們繼續(xù)輸入:
0001,1550116444000
輸出內容如下:
2019-02-14 12:08:25,474 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116444000|2019-02-14 11:54:04.000],currentMaxTimestamp:[1550116444000|2019-02-14 11:54:04.000],watermark:[1550116434000|2019-02-14 11:53:54.000]
再次匯總表:
繼續(xù)輸入:
0001,1550116450000
輸出內容如下:
2019-02-14 14:30:27,480 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116450000|2019-02-14 11:54:10.000],currentMaxTimestamp:[1550116450000|2019-02-14 11:54:10.000],watermark:[1550116440000|2019-02-14 11:54:00.000]
匯總下表:
到這里弯囊,window仍然沒有被觸發(fā)痰哨,此時watermark的時間已經等于了第一條數據的Event Time了胶果。那么window到底什么時候被觸發(fā)呢匾嘱?我們再次輸入:
0001,1550116451000
輸出內容如下:
2019-02-14 14:36:01,479 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116451000|2019-02-14 11:54:11.000],currentMaxTimestamp:[1550116451000|2019-02-14 11:54:11.000],watermark:[1550116441000|2019-02-14 11:54:01.000]
匯總如下:
可以看到window仍然沒有觸發(fā),此時早抠,我們的數據已經發(fā)到2019-02-14 11:54:11.000了霎烙,最早的數據已經過去了11秒了,還沒有開始計算蕊连。那是不是要等到13(10+3)秒過去了悬垃,才開始觸發(fā)window呢?答案是否定的甘苍。
我們再次增加1秒尝蠕,輸入:
0001,1550116452000
輸出內容如下:
2019-02-14 14:40:50,332 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116452000|2019-02-14 11:54:12.000],currentMaxTimestamp:[1550116452000|2019-02-14 11:54:12.000],watermark:[1550116442000|2019-02-14 11:54:02.000]
匯總如下:
Window依舊沒有觸發(fā)
我們再次增加1s,輸入:
0001,1550116453000
輸出內容如下:
2019-02-14 14:51:10,020 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116453000|2019-02-14 11:54:13.000],currentMaxTimestamp:[1550116453000|2019-02-14 11:54:13.000],watermark:[1550116443000|2019-02-14 11:54:03.000]
(0001),1,2019-02-14 11:54:00.000,2019-02-14 11:54:00.000,2019-02-14 11:54:00.000,2019-02-14 11:54:03.000
可以看到觸發(fā)了window操作载庭,打印數據到控制臺了
String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
匯總如下:
到這里看彼,我們做一個說明:
window的觸發(fā)機制瞒大,是先按照自然時間將window劃分殴边,如果window大小是3秒,那么1分鐘內會把window劃分為如下的形式(注意window是左閉右開的):
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
如果window大小是10秒捂蕴,則window會被分為如下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
window的設定無關數據本身顽铸,而是系統(tǒng)定義好了的茁计。
輸入的數據中,根據自身的Event Time谓松,將數據劃分到不同的window中星压,如果window中有數據,則當watermark時間>=Event Time時鬼譬,就符合了window觸發(fā)的條件了娜膘,最終決定window觸發(fā),還是由數據本身的Event Time所屬的window中的window_end_time決定拧簸。
上面的測試中劲绪,最后一條數據到達后,其水位線已經升至19:34:24秒盆赤,正好是最早的一條記錄所在window的window_end_time贾富,所以window就被觸發(fā)了。
為了驗證window的觸發(fā)機制牺六,我們繼續(xù)輸入數據:
0001,1550116455000
輸出內容如下:
2019-02-14 15:00:58,535 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]
匯總表:
此時颤枪,watermark時間雖然已經達到了第二條數據的時間,但是由于其沒有達到第二條數據所在window的結束時間淑际,所以window并沒有被觸發(fā)畏纲。那么扇住,第二條數據所在的window時間是:
[2019/2/14 11:54:03, 2019/2/14 11:54:06)
也就是說,我們必須輸入一個11:54:06秒的數據盗胀,第二條數據所在的window才會被觸發(fā)艘蹋。
我們繼續(xù)輸入:
0001,1550116456000
輸出如下:
2019-02-14 15:07:48,879 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116456000|2019-02-14 11:54:16.000],currentMaxTimestamp:[1550116456000|2019-02-14 11:54:16.000],watermark:[1550116446000|2019-02-14 11:54:06.000]
(0001),1,2019-02-14 11:54:04.000,2019-02-14 11:54:04.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000
可以看到是有觸發(fā)windows操作的
匯總:
下面劃重點了
watermark觸發(fā)條件
此時,我們已經看到票灰,window的觸發(fā)要符合以下幾個條件:
1女阀、watermark時間 >= window_end_time
2、在[window_start_time,window_end_time)中有數據存在
同時滿足了以上2個條件屑迂,window才會觸發(fā)浸策。
而且,這里要強調一點惹盼,watermark是一個全局的值庸汗,不是某一個key下的值,所以即使不是同一個key的數據手报,其warmark也會增加蚯舱,例如:
0002,1550116458000
輸出如下:
2019-02-14 15:22:04,219 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:39,key:0002,eventtime:[1550116458000|2019-02-14 11:54:18.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
我們看到昧诱,currentMaxTimestamp也增加到2019-02-14 11:54:08.000了晓淀。
watermark+window處理亂序
我們上面的測試,數據都是按照時間順序遞增的盏档,現在凶掰,我們輸入一些亂序的(late)數據,看看watermark結合window機制蜈亩,是如何處理亂序的懦窘。
輸入:
0001,1550116440000
0001,1550116441000
0001,1550116442000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116456000
0001,1550116460000
0001,1550116461000
0001,1550116462000
0001,1550116464000
輸出如下:
2019-02-14 15:34:49,469 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116452000|2019-02-14 11:54:12.000],currentMaxTimestamp:[1550116452000|2019-02-14 11:54:12.000],watermark:[1550116442000|2019-02-14 11:54:02.000]
2019-02-14 15:34:50,276 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116453000|2019-02-14 11:54:13.000],currentMaxTimestamp:[1550116453000|2019-02-14 11:54:13.000],watermark:[1550116443000|2019-02-14 11:54:03.000]
(0001),3,2019-02-14 11:54:00.000,2019-02-14 11:54:02.000,2019-02-14 11:54:00.000,2019-02-14 11:54:03.000
2019-02-14 15:35:05,916 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116456000|2019-02-14 11:54:16.000],currentMaxTimestamp:[1550116456000|2019-02-14 11:54:16.000],watermark:[1550116446000|2019-02-14 11:54:06.000]
(0001),3,2019-02-14 11:54:03.000,2019-02-14 11:54:05.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116460000|2019-02-14 11:54:20.000],currentMaxTimestamp:[1550116460000|2019-02-14 11:54:20.000],watermark:[1550116450000|2019-02-14 11:54:10.000]
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116461000|2019-02-14 11:54:21.000],currentMaxTimestamp:[1550116461000|2019-02-14 11:54:21.000],watermark:[1550116451000|2019-02-14 11:54:11.000]
2019-02-14 15:35:17,804 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116462000|2019-02-14 11:54:22.000],currentMaxTimestamp:[1550116462000|2019-02-14 11:54:22.000],watermark:[1550116452000|2019-02-14 11:54:12.000]
(0001),1,2019-02-14 11:54:06.000,2019-02-14 11:54:06.000,2019-02-14 11:54:06.000,2019-02-14 11:54:09.000
(0001),2,2019-02-14 11:54:10.000,2019-02-14 11:54:11.000,2019-02-14 11:54:09.000,2019-02-14 11:54:12.000
2019-02-14 15:35:48,356 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116464000|2019-02-14 11:54:24.000],currentMaxTimestamp:[1550116464000|2019-02-14 11:54:24.000],watermark:[1550116454000|2019-02-14 11:54:14.000]
再輸入:
0001,1550116454000
輸出如下:
2019-02-14 15:40:41,051 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116454000|2019-02-14 11:54:14.000],currentMaxTimestamp:[1550116464000|2019-02-14 11:54:24.000],watermark:[1550116454000|2019-02-14 11:54:14.000]
匯總:
可以看到,雖然我們輸入了一個2019/2/14 11:54:14的數據稚配,但是currentMaxTimestamp和watermark都沒變畅涂。
此時,按照我們上面提到的公式:
1道川、watermark時間 >= window_end_time
2午衰、在[window_start_time,window_end_time)中有數據存在
那如果我們再次輸入一條2019/2/14 11:54:25的數據,此時watermark時間會升高到19:34:33冒萄,這時的window一定就會觸發(fā)了臊岸,我們試一試:
輸入:
0001,1550116465000
輸出如下:
2019-02-14 15:48:07,322 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark] [INFO] - extractTimestamp=======>,currentThreadId:37,key:0001,eventtime:[1550116465000|2019-02-14 11:54:25.000],currentMaxTimestamp:[1550116465000|2019-02-14 11:54:25.000],watermark:[1550116455000|2019-02-14 11:54:15.000]
(0001),3,2019-02-14 11:54:12.000,2019-02-14 11:54:14.000,2019-02-14 11:54:12.000,2019-02-14 11:54:15.000
可以看到觸發(fā)了window操作,打印了2019/2/14 11:54:14這條數據
匯總:
上邊的結果尊流,已經表明帅戒,對于out-of-order的數據,Flink可以通過watermark機制結合window的操作崖技,來處理一定范圍內的亂序數據逻住。那么對于“遲到”太多的數據钟哥,Flink是怎么處理的呢?
late element的處理
運行代碼:StreamingWindowWatermark2
public class StreamingWindowWatermark2 {
private static final Logger log = LoggerFactory.getLogger(StreamingWindowWatermark2.class);
public static void main(String[] args) throws Exception {
//定義socket的端口號
int port = 9000;
//獲取運行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//設置使用eventtime瞎访,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置并行度為1,默認并行度是當前機器的cpu數量
env.setParallelism(1);
//連接socket獲取輸入的數據
DataStream<String> text = env.socketTextStream("zzy", port, "\n");
//解析輸入的數據
DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(",");
return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
}
});
//抽取timestamp和生成watermark
DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
Long currentMaxTimestamp = 0L;
final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10s--亂序時間
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* 定義生成watermark的邏輯
* 默認100ms被調用一次
*/
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
//定義如何提取timestamp
@Override
public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
long timestamp = element.f1;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
log.info("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
// System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
// sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
return timestamp;
}
});
//保存被丟棄的數據
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
//注意腻贰,由于getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,所以這里的類型装诡,不能使用它的父類dataStream银受。
SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口践盼,和調用TimeWindow效果一樣
//.allowedLateness(Time.seconds(2))//允許數據遲到2秒--延遲時間
.sideOutputLateData(outputTag)
.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
/**
* 對window內的數據進行排序鸦采,保證數據的順序
* @param tuple
* @param window
* @param input
* @param out
* @throws Exception
*/
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
String key = tuple.toString();
List<Long> arrarList = new ArrayList<Long>();
Iterator<Tuple2<String, Long>> it = input.iterator();
while (it.hasNext()) {
Tuple2<String, Long> next = it.next();
arrarList.add(next.f1);
}
Collections.sort(arrarList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String result = "key:" + key + ",size:" + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
out.collect(result);
}
});
//window.getSideOutput獲取遲到的數據,把遲到的數據暫時打印到控制臺咕幻,實際中可以保存到其他存儲介質中
DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
sideOutput.flatMap(new FlatMapFunction<Tuple2<String,Long>, Tuple2<String,String>>() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Override
public void flatMap(Tuple2<String, Long> stringLongTuple2, Collector<Tuple2<String, String>> collector) throws Exception {
collector.collect(new Tuple2<>(stringLongTuple2.f0,"eventtime:" + stringLongTuple2.f1 + "|"
+ sdf.format(stringLongTuple2.f1)));
}
}).print();
// sideOutput.print();
//測試-把結果打印到控制臺即可
window.print();
//注意:因為flink是懶加載的渔伯,所以必須調用execute方法,上面的代碼才會執(zhí)行
env.execute("eventtime-watermark-late-data");
}
}
我們輸入一個亂序很多的數據來測試下:
輸入:
? /data nc -l 9000
0001,1550116440000
0001,1550116443000
0001,1550116444000
0001,1550116445000
0001,1550116446000
0001,1550116450000
0001,1550116451000
0001,1550116452000
0001,1550116453000
0001,1550116441000
0001,1550116454000
0001,1550116455000
0001,1550116455000
0001,1550116457000
0001,1550116458000
輸出如下:
2019-02-14 16:34:27,881 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]
2019-02-14 16:34:27,881 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116455000|2019-02-14 11:54:15.000],currentMaxTimestamp:[1550116455000|2019-02-14 11:54:15.000],watermark:[1550116445000|2019-02-14 11:54:05.000]
2019-02-14 16:34:27,882 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116457000|2019-02-14 11:54:17.000],currentMaxTimestamp:[1550116457000|2019-02-14 11:54:17.000],watermark:[1550116447000|2019-02-14 11:54:07.000]
key:(0001),size:3,2019-02-14 11:54:03.000,2019-02-14 11:54:05.000,2019-02-14 11:54:03.000,2019-02-14 11:54:06.000
2019-02-14 16:34:28,420 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116458000|2019-02-14 11:54:18.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
輸入數據:
0001,1550116447000
0001,1550116446000
輸出如下:
2019-02-14 16:35:25,902 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116447000|2019-02-14 11:54:07.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
2019-02-14 16:39:11,450 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116446000|2019-02-14 11:54:06.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
沒有觸發(fā)window
550116446000|2019-02-14 11:54:06.000 對應的window是
[2019-02-14 11:54:06.000, 2019-02-14 11:54:09.000)
而現在的watermark是2019-02-14 11:54:08.000 比2019-02-14 11:54:09.000小肄程,輸入eventtime是1550116445000|2019-02-14 11:54:05.000的事件
輸入:
0001,1550116445000
輸出:
2019-02-14 16:40:14,721 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116445000|2019-02-14 11:54:05.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
(0001,eventtime:1550116445000|2019-02-14 11:54:05.000)
我們輸入數據:
0001,1550116444000
輸出:
2019-02-14 16:47:38,607 [Source: Socket Stream -> Map -> Timestamps/Watermarks (1/1)] [xuwei.tech.streaming.watermark.StreamingWindowWatermark2] [INFO] - key:0001,eventtime:[1550116444000|2019-02-14 11:54:04.000],currentMaxTimestamp:[1550116458000|2019-02-14 11:54:18.000],watermark:[1550116448000|2019-02-14 11:54:08.000]
(0001,eventtime:1550116444000|2019-02-14 11:54:04.000)
可以看出來是有觸發(fā)window的
總結
1.Flink如何處理亂序锣吼?
watermark+window機制,window中可以對input進行按照Event Time排序蓝厌,使得完全按照Event Time發(fā)生的順序去處理數據玄叠,以達到處理亂序數據的目的。-
- Flink何時觸發(fā)window拓提?
1读恃、watermark時間 >= window_end_time(對于out-of-order以及正常的數據而言)
2、在[window_start_time,window_end_time)中有數據存在
3.Flink應該如何設置最大亂序時間代态?
這個要結合自己的業(yè)務以及數據情況去設置寺惫。如果maxOutOfOrderness設置的太小,而自身數據發(fā)送時由于網絡等原因導致亂序或者late太多蹦疑,那么最終的結果就是會有很多單條的數據在window中被觸發(fā)西雀,數據的正確性影響太大。
參考:
http://shiyanjun.cn/archives/1785.html
http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/