1 eventTime
一個(gè)基于eventTime的flink程序必須定義:每條數(shù)據(jù)的eventTime時(shí)間戳和如何生成watermark怨喘。一旦設(shè)置了eventTime則必須設(shè)置watermark。
Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述烤咧,例如采集的日志數(shù)據(jù)中,每一條日志都會記錄自己的生成時(shí)間缘圈,F(xiàn)link 通過時(shí)間戳分配器訪問事件時(shí)間戳穿香。
(1)設(shè)定流的時(shí)間屬性為eventtime
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調(diào)用時(shí)刻開始給 env 創(chuàng)建的每一個(gè) stream 追加時(shí)間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
(2)給每條數(shù)據(jù)設(shè)置時(shí)間戳
要讓flink知道,流入的哪個(gè)字段是eventtime贞岭。
需要重載public abstract long extractTimestamp(T var1);
2.watermark
flink會將窗口內(nèi)的事件緩存下來八毯,直到接收到一個(gè)watermark。
(1) 數(shù)據(jù)流中的 Watermark 用于表示 timestamp 小于 Watermark 的數(shù)據(jù)瞄桨,都已經(jīng)到達(dá)了话速,因此, window 的執(zhí)行也是由 Watermark 觸發(fā)的芯侥。
每次系統(tǒng)會校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的 maxEventTime泊交,然后認(rèn)定 eventTime小于 maxEventTime - t 的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t柱查,那么這個(gè)窗口被觸發(fā)執(zhí)行廓俭。
(2)watermark是單調(diào)遞增的,保證時(shí)間不會倒流唉工。
一個(gè)時(shí)間戳為t的watermark意味著研乒,它之后到達(dá)的事件時(shí)間戳都大于t。
(3)周期性的watermark和逐個(gè)生成watermark
為什么會引入周期性watermark淋硝?
如果某個(gè)分區(qū)的watermark遲遲不更新雹熬,這回導(dǎo)致算子的eventtime停滯宽菜,導(dǎo)致大量的數(shù)據(jù)積壓。因此引入了根據(jù)系統(tǒng)時(shí)間周期性生成watermark的方式竿报。
(4)watermark的引入
dataStream.assignTimestampsAndWatermarks//會自動生成周期watermarks铅乡,存在默認(rèn)間隔,也可以自己設(shè)置間隔setAutoWatermarkInterval(long interval)
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {//周期性生成watermark
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = -9223372036854775808L;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0L) {
throw new RuntimeException("Tried to set the maximum allowed lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
} else {
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = -9223372036854775808L + this.maxOutOfOrderness;
}
}
public long getMaxOutOfOrdernessInMillis() {
return this.maxOutOfOrderness;
}
public abstract long extractTimestamp(T var1);
public final Watermark getCurrentWatermark() {
long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness;
if (potentialWM >= this.lastEmittedWatermark) {
this.lastEmittedWatermark = potentialWM;
}
return new Watermark(this.lastEmittedWatermark);
}
//獲取時(shí)間戳仰楚,最大時(shí)間戳是單調(diào)遞增的
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = this.extractTimestamp(element);
if (timestamp > this.currentMaxTimestamp) {
this.currentMaxTimestamp = timestamp;
}
return timestamp;
}
}