前言
之前的文章中已經(jīng)屢次提到過Flink的事件時間(event time)、水邮患伞(watermark)矛辕、亂序(out-of-order)、遲到數(shù)據(jù)(late element)這些概念,雖然它們都非沉钠罚基礎飞蹂,但筆者還沒有對它們做過像樣的介紹,感覺不太合適翻屈。正好今天腦子比較累陈哑,又是Friday night,不適合寫復雜的東西妖胀,就來談談簡單的吧芥颈。
事件時間與水印
所謂事件時間,就是Flink DataStream中的數(shù)據(jù)元素自身帶有的赚抡、在其實際發(fā)生時記錄的時間戳爬坑,具有業(yè)務含義,并與系統(tǒng)時間獨立涂臣。很顯然盾计,由于外部系統(tǒng)產(chǎn)生的數(shù)據(jù)往往不能及時、按序到達Flink系統(tǒng)赁遗,所以事件時間比處理時間有更強的不可預測性署辉。為了能夠準確地表達事件時間的處理進度,就必須用到水印岩四。
Flink水印的本質是DataStream中的一種特殊元素哭尝,每個水印都攜帶有一個時間戳。當時間戳為T的水印出現(xiàn)時剖煌,表示事件時間t <= T的數(shù)據(jù)都已經(jīng)到達材鹦,即水印后面應該只能流入事件時間t > T的數(shù)據(jù)。也就是說耕姊,水印是Flink判斷遲到數(shù)據(jù)的標準桶唐,同時也是窗口觸發(fā)的標記。
為了形象地說明水印的作用茉兰,參考一下下面的圖尤泽,是一個亂序的基于事件時間的數(shù)據(jù)流示例。
圖中的方框就是數(shù)據(jù)元素规脸,其中的數(shù)字表示事件時間坯约,W(x)就表示時間戳是x的水印,并有長度為4個時間單位的滾動窗口莫鸭。假設時間單位為秒鬼店,可見事件時間為2、3黔龟、1s的元素都會進入?yún)^(qū)間為[1s, 4s]的窗口妇智,而事件時間為7s的元素會進入?yún)^(qū)間為[5s, 8s]的窗口滥玷。當水印W(4)到達時,表示已經(jīng)沒有t <= 4s的元素了巍棱,[1s, 4s]窗口會被觸發(fā)并計算惑畴。同理,水印W(9)到達時航徙,[5s, 8s]窗口會被觸發(fā)并計算如贷,以此類推。
不過圖中暫時沒有示出遲到數(shù)據(jù)到踏。如果事件時間為6的元素出現(xiàn)在W(9)后面杠袱,就算是遲到了。遲到數(shù)據(jù)的處理后面再說窝稿。
上面的示例只有一個并行度楣富,那么在有多個并行度的情況下,就會有多個流產(chǎn)生水印伴榔,窗口觸發(fā)時該采用哪個水印呢纹蝴?答案是所有流入水印中時間戳最小的那個。來自官方文檔的圖能夠說明問題踪少。
容易理解塘安,如果所有流入水印中時間戳最小的那個都已經(jīng)達到或超過了窗口的結束時間,那么所有流的數(shù)據(jù)肯定已經(jīng)全部收齊援奢,就可以安全地觸發(fā)窗口計算了兼犯。
提取事件時間、產(chǎn)生水印
上面說了這么多集漾,那么事件時間是如何從數(shù)據(jù)中提取的免都,水印又是如何產(chǎn)生的呢?Flink提供了統(tǒng)一的DataStream.assignTimestampsAndWatermarks()方法來提取事件時間并同時產(chǎn)生水印帆竹,畢竟它們在處理過程中是緊密聯(lián)系的。
assignTimestampsAndWatermarks()方法接受的參數(shù)類型有AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks兩種脓规,分別對應周期性水印和打點(即由事件本身的屬性觸發(fā))水印栽连,它們的類圖如下所示。
周期性水印
顧名思義侨舆,使用AssignerWithPeriodicWatermarks時秒紧,水印是周期性產(chǎn)生的。該周期默認為200ms挨下,也能通過ExecutionConfig.setAutoWatermarkInterval()方法來指定新的周期熔恢。
由類圖容易看出,我們需要通過實現(xiàn)extractTimestamp()方法來提取事件時間臭笆,實現(xiàn)getCurrentWatermark()方法產(chǎn)生水印叙淌。但好在Flink已經(jīng)提供了3種內置的實現(xiàn)類秤掌,所以我們直接用就可以了,省事鹰霍。
-
AscendingTimestampExtractor
總說話口干舌燥的(闻鉴?),還是看代碼吧茂洒。
public abstract long extractAscendingTimestamp(T element);
@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}
@Override
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}
AscendingTimestampExtractor產(chǎn)生的時間戳和水印必須是單調非遞減的孟岛,用戶通過覆寫extractAscendingTimestamp()方法抽取時間戳。如果產(chǎn)生了遞減的時間戳督勺,就要使用名為MonotonyViolationHandler的組件處理異常渠羞,有兩種方式:打印警告日志(默認)和拋出RuntimeException。
單調遞增的事件時間并不太符合實際情況智哀,所以AscendingTimestampExtractor用得不多次询。
-
BoundedOutOfOrdernessTimestampExtractor
它的出鏡率就非常高了。還是看代碼先盏触。
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
如名字所述渗蟹,BoundedOutOfOrdernessTimestampExtractor產(chǎn)生的時間戳和水印是允許“有界亂序”的,構造它時傳入的參數(shù)maxOutOfOrderness就是亂序區(qū)間的長度赞辩,而實際發(fā)射的水印為通過覆寫extractTimestamp()方法提取出來的時間戳減去亂序區(qū)間雌芽,相當于讓水印把步調“放慢一點”。這是Flink為遲到數(shù)據(jù)提供的第一重保障辨嗽。
當然世落,亂序區(qū)間的長度要根據(jù)實際環(huán)境謹慎設定,設定得太短會丟較多的數(shù)據(jù)糟需,設定得太長會導致窗口觸發(fā)延遲屉佳,實時性減弱。
- IngestionTimeExtractor
@Override
public long extractTimestamp(T element, long previousElementTimestamp) {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return now;
}
@Override
public Watermark getCurrentWatermark() {
final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
maxTimestamp = now;
return new Watermark(now - 1);
}
IngestionTimeExtractor基于當前系統(tǒng)時鐘生成時間戳和水印洲押,其實就是Flink三大時間特征里的攝入時間了武花。
打點水印
打點水印比周期性水印用的要少不少,并且Flink沒有內置的實現(xiàn)杈帐,那么就寫個最簡單的栗子吧体箕。
sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<UserActionRecord>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
}
@Override
public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
return element.getTimestamp();
}
});
AssignerWithPunctuatedWatermarks適用于需要依賴于事件本身的某些屬性決定是否發(fā)射水印的情況。我們實現(xiàn)checkAndGetNextWatermark()方法來產(chǎn)生水印挑童,產(chǎn)生的時機完全由用戶控制累铅。上面例子中是收取到用戶ID末位為0的數(shù)據(jù)時才發(fā)射。
還有三點需要提醒:
- 不管使用哪種方式產(chǎn)生水印站叼,都不能過于頻繁娃兽。因為Watermark對象是會全部流向下游的,也會實打實地占用內存尽楔,水印過多會造成系統(tǒng)性能下降投储。
- 水印的生成要盡量早第练,一般是在接入Source之后就產(chǎn)生,或者在Source經(jīng)過簡單的變換(map轻要、filter等)之后產(chǎn)生复旬。
- 如果需求方對事件時間carry的業(yè)務意義并不關心,可以直接使用處理時間冲泥,簡單方便驹碍。
遲到數(shù)據(jù)處理
如上所述,水印的亂序區(qū)間能夠保證一些遲到數(shù)據(jù)不被丟棄凡恍,但是亂序區(qū)間往往不很長志秃,那些真正遲到了的數(shù)據(jù)該怎么辦呢?有兩種方法來兜底嚼酝,可以說是Flink為遲到數(shù)據(jù)提供的第二重保障浮还。
窗口允許延遲
Flink提供了WindowedStream.allowedLateness()方法來設定窗口的允許延遲。也就是說闽巩,正常情況下窗口觸發(fā)計算完成之后就會被銷毀钧舌,但是設定了允許延遲之后,窗口會等待allowedLateness的時長后再銷毀涎跨。在該區(qū)間內的遲到數(shù)據(jù)仍然可以進入窗口中洼冻,并觸發(fā)新的計算。當然隅很,窗口也是吃資源大戶撞牢,所以allowedLateness的值要適當。給個完整的代碼示例如下叔营。
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
allowedLateness機制實際上就是DataFlow模型中的回填(backfill)策略的實現(xiàn)屋彪。對于滑動窗口和滾動窗口是累積(accumulating)策略,對于會話窗口則是累積與回撤(accumulating & retracting)策略绒尊。之前講DataFlow模型時提到過畜挥,不廢話了。
側輸出遲到數(shù)據(jù)
側輸出(side output)是Flink的分流機制婴谱。遲到數(shù)據(jù)本身可以當做特殊的流蟹但,我們通過調用WindowedStream.sideOutputLateData()方法將遲到數(shù)據(jù)發(fā)送到指定OutputTag的側輸出流里去,再進行下一步處理(比如存到外部存儲或消息隊列)勘究。代碼如下。
// 側輸出的OutputTag
OutputTag<UserActionRecord> lateOutputTag = new OutputTag<>("late_data_output_tag");
sourceStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<UserActionRecord>(Time.seconds(30)) {
private static final long serialVersionUID = 1L;
@Override
public long extractTimestamp(UserActionRecord record) {
return record.getTimestamp();
}
}
)
.keyBy("platform")
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(30))
.sideOutputLateData(lateOutputTag) // 側輸出
.aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
// ......
// 獲取遲到數(shù)據(jù)并寫入對應Sink
stream.getSideOutput(lateOutputTag).addSink(lateDataSink);
The End
2019年快過去了斟冕,今年真的寫了不少東西呢口糕。
民那晚安。