什么是Trigger
Flink中Trigger用于定義何時對窗口進行計算并發(fā)出結果靠闭,它的觸發(fā)條件可以是時間也可以是某些特定條件帐我。對于時間窗口而言,默認Trigger是處理時間或Watermark大于窗口結束時間時觸發(fā)愧膀。
在Flink窗口機制中拦键,還有一個窗格的概念,它將窗口劃分成多個規(guī)則的部分檩淋,這些部分可看作子窗口芬为,可簡單理解為對窗口再次分片。窗格則定義為一組key相同(分區(qū)操作)蟀悦,并且位于同一個窗口中的元素媚朦。每個窗格都有一個Trigger對象。
先Trigger類中的幾個重要函數(shù):
// 每當有元素添加到窗口都會調用
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 在處理時間計時器觸發(fā)時調用
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 在事件時間計時器觸發(fā)時調用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
// 有狀態(tài)的觸發(fā)器相關日戈,并在它們相應的窗口合并時合并兩個觸發(fā)器的狀態(tài)询张,例如使用會話窗口。
public boolean canMerge() {
return false;
}
// 該函數(shù)會在清除窗口是調用
public abstract void clear(W window, TriggerContext ctx) throws Exception;
TriggerResult
每次調用觸發(fā)器都會生成一個TriggerResult浙炼,它用于決定窗口接下來的行為份氧。
CONTINUE: 不做任何處理
FIRE:觸發(fā)計算
PURGE:完全清除窗口內容,并刪除窗口自身及元數(shù)據(jù)鼓拧。
FIRE_AND_PURGE:先進行窗口計算(FIRE)半火,隨后刪除所有狀態(tài)及元數(shù)據(jù)(PURGE)
Flink的內置Trigger:
內置Trigger | 說明 |
---|---|
ProcessingTimeTrigger | 一次觸發(fā),machine time大于窗口結束時間時觸發(fā) |
EventTimeTrigger | 一次觸發(fā),watermark大于窗口結束時間時觸發(fā) |
ContinuousProcessingTimeTrigger | 多次觸發(fā),基于processing time的固定時間間隔 |
ContinuousEventTimeTrigger | 多次觸發(fā) ,基于event time的固定時間間隔 |
CountTrigger | 多次觸發(fā),基于element的固定條數(shù) |
DeltaTrigger | 多次觸發(fā),當前element與上次觸發(fā)trigger的element做delta計算,超過threshold(閾值)時觸發(fā) |
PurgingTrigger | trigger wrapper,當nested trigger時觸發(fā),額外會清理窗口當前的中間狀態(tài) |
- EventTimeTrigger
當任務中時間語義定義為EventTime,做時間窗口計算時季俩,默認觸發(fā)器為EventTimeTrigger钮糖,也可以這樣定義:
.keyBy(_.userId)
.timeWindow(Time.minutes(5))
.trigger(EventTimeTrigger.create())
看看EventTimeTrigger的源碼:
@Override
// 對每一個到達窗口的元素調用
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
// window的最大時間戳比watermark小,該window需要立刻進行計算
return TriggerResult.FIRE;
} else {
// 注冊一個event time事件酌住,當watermark超過window.maxTimestamp時店归,會調用onEventTime方法
// 需要注意的是,在每個窗口中酪我,每個key和特定的時間戳只能有一個計時器
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}