1.前言
窗口的觸發(fā)器定義了窗口是何時(shí)被觸發(fā)并同時(shí)決定觸發(fā)行為(對(duì)窗口進(jìn)行清理或者計(jì)算)爪飘。
注意:窗口的觸發(fā)在內(nèi)部是設(shè)置定時(shí)器來(lái)實(shí)現(xiàn)的。
2. 觸發(fā)器相關(guān)類
Trigger抽象類:
- onElement:每個(gè)元素到達(dá)觸發(fā)的回調(diào)方法愿阐;
- onProcessingTime:基于處理時(shí)間定時(shí)器觸發(fā)的回調(diào)方法污它;
- onEventTime:基于事件時(shí)間定時(shí)器觸發(fā)的回調(diào)方法诊杆;
- onMerge:窗口在合并時(shí)觸發(fā)的回調(diào)方法(會(huì)話窗口分配器assigner);
TriggerContext接口(定義在Trigger類中)氓拼,用于維持狀態(tài)你画,注冊(cè)定時(shí)器等:
- registerXXXTimeTimer:注冊(cè)(處理/事件)時(shí)間定時(shí)器;
- deleteXXXTimeTimer:刪除(處理/事件)時(shí)間定時(shí)器桃漾;
- getPartitionedState:從Flink狀態(tài)存儲(chǔ)終端獲取狀態(tài)坏匪;
TriggerResult枚舉類,用于決定窗口在觸發(fā)后的行為:
- CONTINUE:不作任何處理撬统;
- FIRE_AND_PURGE:觸發(fā)窗口計(jì)算并輸出結(jié)果同時(shí)清理并釋放窗口(該值只會(huì)被清理觸發(fā)器PurgingTrigger使用)适滓;
- FIRE:觸發(fā)窗口計(jì)算并輸出結(jié)果,但窗口并沒(méi)有被釋放并且數(shù)據(jù)仍然保留恋追;
- PURGE:不觸發(fā)窗口計(jì)算凭迹,不輸出結(jié)果,只清除窗口中的所有數(shù)據(jù)并釋放窗口
3.時(shí)間窗口觸發(fā)器
3.1 ProcessingTimeTrigger
onElement方法
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
先去注冊(cè)一個(gè)ProcessingTime定時(shí)器苦囱,觸發(fā)時(shí)間點(diǎn)就是當(dāng)前窗口的最大時(shí)間戳嗅绸;
觸發(fā)結(jié)果就是不做任何操作。
onProcessingTime和onEventTime方法
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
3.2 EventTimeTrigger
onElement方法
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
當(dāng)窗口的最大時(shí)間戳小于等于水位線立即觸發(fā)窗口計(jì)算撕彤;否則的話就去注冊(cè)EventTime定時(shí)器鱼鸠,結(jié)果就是不做任何操作。
onProcessingTime和onEventTime方法
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
onEventTime:如果時(shí)間等于窗口的最大時(shí)間戳羹铅,則觸發(fā)對(duì)窗口進(jìn)行計(jì)算蚀狰,否則不做任何操作
這里為什么需要有一層時(shí)間的判斷呢(time == window.maxTimestamp() )?
參考博客:Flink中EventTimeTrigger的理解
4.持續(xù)時(shí)間觸發(fā)器
筆者之前寫過(guò) 窗口實(shí)用觸發(fā)器
持續(xù)觸發(fā)职员,顧名思義麻蹋,在本次觸發(fā)之后需要更新并且保存下一次觸發(fā)的時(shí)間戳,因此在持續(xù)時(shí)間觸發(fā)器中引入了狀態(tài)保存機(jī)制:
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
使用的是ReducingState焊切,這里面調(diào)用的是Min函數(shù)扮授,選擇多個(gè)時(shí)間戳內(nèi)最小的。
ContinuousProcessingTimeTrigger
onElement方法:
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
//獲得存儲(chǔ)觸發(fā)時(shí)間戳的狀態(tài)容器
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
//獲取當(dāng)前處理時(shí)間
timestamp = ctx.getCurrentProcessingTime();
//如果狀態(tài)對(duì)象為空蛛蒙,則初始化糙箍;否則直接返回
if (fireTimestamp.get() == null) {
//計(jì)算起始時(shí)間
long start = timestamp - (timestamp % interval);
//下一次觸發(fā)時(shí)間戳為起始時(shí)間加上觸發(fā)間隔
long nextFireTimestamp = start + interval;
//以下一次觸發(fā)的時(shí)間戳注冊(cè)處理時(shí)間定時(shí)器
ctx.registerProcessingTimeTimer(nextFireTimestamp);
//將下一次觸發(fā)計(jì)算的時(shí)間戳加入狀態(tài)進(jìn)行保存
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
所以,ContinuousProcessingTimeTrigger的onElement方法主要是完成對(duì)存儲(chǔ)窗口觸發(fā)時(shí)間狀態(tài)對(duì)象的初始化牵祟,并注冊(cè)了第一次執(zhí)行的定時(shí)器。
ContinuousEventTimeTrigger的onElement方法實(shí)現(xiàn)跟ContinuousProcessingTimeTrigger除了獲取時(shí)間戳的方式不同抖格,基本與此類似诺苹。
基于時(shí)間的回調(diào)方法咕晋,onProcessingTime
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
//首先從狀態(tài)中查找觸發(fā)時(shí)間
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
//跟定時(shí)器的注冊(cè)時(shí)間進(jìn)行對(duì)比,只有兩者相等時(shí)才會(huì)觸發(fā)計(jì)算
if (fireTimestamp.get().equals(time)) {
//清空狀態(tài)并重新初始化值
fireTimestamp.clear();
fireTimestamp.add(time + interval);
//注冊(cè)下一次觸發(fā)的定時(shí)器
ctx.registerProcessingTimeTimer(time + interval);
//觸發(fā)窗口計(jì)算
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}