Background
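At work I had a requirement to partition data into daily windows and trigger a window computation at a fixed interval, using ProcessingTime semantics. During testing I found a problem with ContinuousProcessingTimeTrigger: it does not fire when the window reaches its end time.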
Test
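For the local test I used synthetic records of the form (category, count, time). The job computes the total per minute and triggers a window computation every 10 seconds. After each firing, all data that has already been computed is evicted immediately, and the running total is kept in state.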
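import java.util.Random;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Note: dateTime.transferLongToDate(pattern, millis) is the author's own
// date-formatting helper; it is not shown in this post.
public class demo2 {
    private static class DataSource extends RichParallelSourceFunction<Tuple3<String, Integer, String>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple3<String, Integer, String>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // Subtask 0 emits every 8 s, subtask 1 every 16 s
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 8);
                // nextInt(1) is always 0, so every record has the key "CategoryA"
                String key = "Category" + (char) ('A' + random.nextInt(1));
                int value = random.nextInt(10) + 1;
                String dt = dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", System.currentTimeMillis());
                System.out.println(String.format("Emits\t(%s,%d,%s)", key, value, dt));
                ctx.collect(new Tuple3<>(key, value, dt));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<Tuple3<String, Integer, String>> ds = env.addSource(new DataSource());
        SingleOutputStreamOperator<String> res = ds
                .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) in -> in.f0)
                // 1-minute tumbling windows, fired every 10 s
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // After each firing, evict every element that was just processed (keep 0)
                .evictor(CountEvictor.of(0, true))
                .process(new ProcessWindowFunction<Tuple3<String, Integer, String>, String, String, TimeWindow>() {
                    private static final long serialVersionUID = 3091075666113786631L;
                    // Keyed state with the running total; reset in clear() when the window is cleaned up
                    private ValueState<Integer> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("value_state", Integer.class);
                        valueState = getRuntimeContext().getState(desc);
                        super.open(parameters);
                    }

                    @Override
                    public void process(String tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> collector) throws Exception {
                        // Test output: the time of each window firing
                        System.out.println("trigger:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", context.currentProcessingTime()));
                        int res = 0;
                        if (valueState.value() != null) {
                            res = valueState.value();
                        }
                        for (Tuple3<String, Integer, String> val : iterable) {
                            res += val.f1;
                        }
                        valueState.update(res);
                        String out = dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss", context.window().getStart())
                                + "," + tuple.toString() + ":" + valueState.value();
                        collector.collect(out);
                    }

                    @Override
                    public void clear(Context context) throws Exception {
                        // Test output: the time the state is cleaned up
                        System.out.println("Start Clear:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", System.currentTimeMillis()));
                        valueState.clear();
                        super.clear(context);
                    }
                });
        res.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String s, Context context, Collector<Object> collector) throws Exception {
                System.out.println(s);
            }
        });
        env.execute();
    }
}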
The output of the program is as follows:
As the output above shows, the window fired at the :30, :40 and :50 marks and emitted the correct cumulative totals, but no computation was triggered at the window's end time.
Locating the problem
Reading the source
- Field declarations
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    private final long interval;
    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
interval is the firing interval passed in by the caller. stateDesc is the descriptor of a ReducingState, and Min() is its ReduceFunction: of several timestamps, it keeps the smallest.
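To make the Min() semantics concrete, here is a minimal pure-Java stand-in for a ReducingState<Long> whose reduce function keeps the smaller value. The class MinReducingStateSketch is hypothetical and only mimics the get/add/clear behaviour that matters here; it is not Flink's state implementation. It also shows why the trigger calls clear() before add(): otherwise the older, smaller fire time would win.

```java
import java.util.function.BinaryOperator;

/** Hypothetical in-memory stand-in for a ReducingState<Long> reduced with min; not Flink's implementation. */
class MinReducingStateSketch {
    private final BinaryOperator<Long> reduceFunction = Long::min; // plays the role of Min()
    private Long value; // null: no fire time stored yet

    Long get() {
        return value;
    }

    void add(long timestamp) {
        // Like ReducingState.add: fold the new value into the stored one.
        value = (value == null) ? timestamp : reduceFunction.apply(value, timestamp);
    }

    void clear() {
        value = null;
    }

    public static void main(String[] args) {
        MinReducingStateSketch state = new MinReducingStateSketch();
        state.add(50_000L);
        state.add(30_000L); // e.g. fire times of two merged windows
        System.out.println(state.get()); // 30000
        state.add(40_000L);
        System.out.println(state.get()); // 30000: add() alone cannot move the fire time forward
        state.clear();
        state.add(40_000L);
        System.out.println(state.get()); // 40000
    }
}
```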
- The onElement method
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    timestamp = ctx.getCurrentProcessingTime();
    if (fireTimestamp.get() == null) {
        long start = timestamp - (timestamp % interval);
        long nextFireTimestamp = start + interval;
        ctx.registerProcessingTimeTimer(nextFireTimestamp);
        fireTimestamp.add(nextFireTimestamp);
        return TriggerResult.CONTINUE;
    }
    return TriggerResult.CONTINUE;
}
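onElement initializes the window's first fire time. When no fire time is stored yet, it rounds the current processing time (not the element's timestamp) down to a multiple of interval, registers a processing-time timer one interval later, and records that timestamp in state.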
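The alignment arithmetic in onElement can be checked in isolation. This sketch copies the two lines that compute the first fire time, with times in milliseconds since the epoch; the class name FirstFireTimeSketch is made up for illustration.

```java
/** Hypothetical helper reproducing onElement's first-fire-time arithmetic (times in ms). */
class FirstFireTimeSketch {
    static long firstFireTime(long now, long interval) {
        long start = now - (now % interval); // round down to the interval grid
        return start + interval;             // the grid point one interval after 'start'
    }

    public static void main(String[] args) {
        System.out.println(firstFireTime(23_456L, 10_000L)); // 30000
        System.out.println(firstFireTime(30_000L, 10_000L)); // 40000: an exact grid point still waits a full interval
    }
}
```

Because the grid is aligned to multiples of interval since the epoch, a 10-second interval always fires on whole 10-second marks, which is consistent with the :30/:40/:50 firings in the test.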
- The onProcessingTime method
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
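onProcessingTime is the processing-time timer callback. It reads the current fire time from state and compares it with the timer's timestamp. If they are equal, it clears the state, stores time + interval as the next fire time, registers a timer for that time, and returns FIRE to trigger the window computation.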
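Since onProcessingTime never compares time + interval with the window's end, I suspected that the last fireTimestamp, and the timer registered for it with ctx.registerProcessingTimeTimer, lies beyond the window's end time, so no final computation is triggered when the window ends.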
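The suspicion can be checked with plain arithmetic. The following sketch (hypothetical class TimerChainSketch, pure Java, no Flink dependency) replays the timer chain of the unmodified trigger for one window ending at windowEnd, using the fact that a TimeWindow's last valid timestamp, maxTimestamp(), is end - 1.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of the unmodified trigger's timer chain; pure Java, not Flink code. */
class TimerChainSketch {
    /**
     * Returns the fire times that go off while the window ending at windowEnd is still alive.
     * Each firing at t registers t + interval, with no check against the window end.
     */
    static List<Long> firingsInsideWindow(long firstElementTime, long windowEnd, long interval) {
        long maxTimestamp = windowEnd - 1; // TimeWindow.maxTimestamp() is end - 1
        List<Long> fired = new ArrayList<>();
        // onElement: first fire time on the interval grid
        long next = firstElementTime - (firstElementTime % interval) + interval;
        while (next <= maxTimestamp) {
            fired.add(next);
            next += interval; // onProcessingTime: register time + interval
        }
        // 'next' is now past maxTimestamp: the window is cleaned up before it could fire.
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firingsInsideWindow(23_456L, 60_000L, 10_000L)); // [30000, 40000, 50000]
    }
}
```

For a window [0, 60000) with the first element at 23456 ms, the chain fires at 30000, 40000 and 50000; the next timer, 60000, already lies past the window's max timestamp 59999.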
- Verifying with test code
I created a class MyContinuousProcessingTimeTrigger by copying the source of ContinuousProcessingTimeTrigger and modified its onProcessingTime method to print the next fire time:
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        System.out.println("nextFireTime:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", time + this.interval));
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
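Then I used MyContinuousProcessingTimeTrigger in the test code. The output is as follows:
The first two registered fire times, at the :40 and :50 marks, fire correctly, but the one at 17:00:00 does not, because by then the window has already closed (its end time is 16:59:59.999).
With the root cause confirmed, the next step is to fix it.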
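Solution
Fixing this again means reading the source. In the window operator's onProcessingTime method (the snippet below, which uses evictingWindowState, comes from EvictingWindowOperator, the operator used when an evictor is set) we find the following code: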
if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
    clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}

private void clearAllState(
        W window,
        ListState<StreamRecord<IN>> windowState,
        MergingWindowSet<W> mergingWindows) throws Exception {
    windowState.clear();
    triggerContext.clear();
    processContext.window = window;
    processContext.clear();
    if (mergingWindows != null) {
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }
}
So there is a cleanup time: when a timer reaches it, all of the window's state is cleared. Next, look at the isCleanupTime method:
/**
 * Returns {@code true} if the given time is the cleanup time for the given window.
 */
protected final boolean isCleanupTime(W window, long time) {
    return time == cleanupTime(window);
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}
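For non-event-time semantics, the cleanup time is simply the window's max timestamp, window.maxTimestamp() (the end time minus 1 ms). The window operator registers a processing-time timer at exactly this cleanup time, and when that timer fires the trigger's onProcessingTime is called before the cleanup check runs. So if the trigger returns FIRE when time equals window.maxTimestamp(), the window is evaluated one last time before its state is cleared. That gives us the fix: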
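The cleanup-time rule can be restated as a small standalone function. This sketch mirrors the cleanupTime logic quoted above, with the window reduced to its max timestamp; the class and parameter names are illustrative, and allowedLateness is assumed to be non-negative.

```java
/** Hypothetical standalone restatement of the cleanupTime rule quoted above. */
class CleanupTimeSketch {
    static long cleanupTime(long maxTimestamp, long allowedLateness, boolean isEventTime) {
        if (isEventTime) {
            long cleanupTime = maxTimestamp + allowedLateness;
            // If the addition overflowed (wrapped around), use Long.MAX_VALUE instead.
            return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
        }
        // Processing time: allowed lateness plays no role; cleanup is at the window's last millisecond.
        return maxTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(cleanupTime(59_999L, 5_000L, false));          // 59999
        System.out.println(cleanupTime(59_999L, 5_000L, true));           // 64999
        System.out.println(cleanupTime(Long.MAX_VALUE - 10, 100L, true)); // 9223372036854775807
    }
}
```

The first call is the case that matters for this article: with processing time, the cleanup timer sits on the window's last millisecond regardless of allowed lateness.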
Modify the onProcessingTime method in MyContinuousProcessingTimeTrigger:
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    // The window operator's cleanup timer fires at window.maxTimestamp():
    // fire one last time here, before the window state is cleared
    if (time == window.maxTimestamp()) {
        return TriggerResult.FIRE;
    }
    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
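To see the effect of this change without a Flink cluster, the following sketch simulates the patched onProcessingTime against the two kinds of timers a window sees: the trigger's own interval timers and the operator's cleanup timer at maxTimestamp. Everything here (PatchedTriggerSimulation, the Result enum standing in for TriggerResult, the single-field state) is a hypothetical simplification, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of the patched trigger; not Flink code. */
class PatchedTriggerSimulation {
    // Stand-in for Flink's TriggerResult (only the two values used here).
    enum Result { FIRE, CONTINUE }

    private final long interval;
    // Stand-in for the "fire-time" ReducingState: the next pending fire time.
    private Long fireTimestamp;

    PatchedTriggerSimulation(long interval) {
        this.interval = interval;
    }

    // Same arithmetic as onElement: first fire time on the interval grid.
    void onElement(long now) {
        if (fireTimestamp == null) {
            fireTimestamp = now - (now % interval) + interval;
        }
    }

    // Same branch order as the patched onProcessingTime.
    Result onProcessingTime(long time, long maxTimestamp) {
        if (time == maxTimestamp) {
            return Result.FIRE; // the operator's cleanup timer: fire one last time
        }
        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestamp = time + interval; // clear() + add() collapses to an overwrite here
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    /** Replays the timers of the window ending at windowEnd in time order; returns the firing times. */
    static List<Long> replay(long firstElementTime, long windowEnd, long interval) {
        long maxTimestamp = windowEnd - 1; // TimeWindow.maxTimestamp() is end - 1
        PatchedTriggerSimulation trigger = new PatchedTriggerSimulation(interval);
        trigger.onElement(firstElementTime);
        List<Long> fired = new ArrayList<>();
        while (true) {
            // Next timer: the trigger's own, or the cleanup timer if that comes first.
            long t = Math.min(trigger.fireTimestamp, maxTimestamp);
            if (trigger.onProcessingTime(t, maxTimestamp) == Result.FIRE) {
                fired.add(t);
            }
            if (t == maxTimestamp) {
                break; // the operator clears all window state here
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(replay(23_456L, 60_000L, 10_000L)); // [30000, 40000, 50000, 59999]
    }
}
```

The last firing happens at maxTimestamp (59999 ms here, the analogue of 16:59:59.999 in the test) rather than on the next 10-second mark, since that is the last moment the window's state still exists. The patch registers no extra timer; it relies on the cleanup timer the operator already registers.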
Test result:
The window now fires at its end time and emits the correct totals.