process內使用KeyedProcessFunction
ListState可以設置檢查點，當程序在某時刻停止後再啟動，會從中斷處繼續(記錄偏移量)
keyBy("windowEnd").process(new KeyedProcessFunction<Tuple, ItemCount, String>() {
ListState<ItemCount> listState = null;
/**
 * 3. Timer callback: emits one report for the buffered elements, then releases the state.
 *
 * <p>Copies every {@code ItemCount} held in {@code listState} into a local list, sorts the
 * list by {@code count} in ascending order, formats the first element's {@code windowEnd}
 * followed by one line per item, and sends the resulting string to {@code out}.
 * If no elements are buffered, nothing is emitted.
 *
 * @param timestamp timestamp passed to the callback (not read here)
 * @param ctx       timer context passed to the callback (not read here)
 * @param out       collector that receives the formatted report
 * @throws Exception if reading or clearing the state fails
 */
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
    // Copy the buffered elements out of listState so they can be sorted locally.
    // The result of get() is null-checked because an empty state may be reported as null
    // instead of an empty Iterable, depending on the Flink version / state backend.
    ArrayList<ItemCount> itemCounts = new ArrayList<>();
    Iterable<ItemCount> buffered = listState.get();
    if (buffered != null) {
        for (ItemCount itemCount : buffered) {
            itemCounts.add(itemCount);
        }
    }

    // Nothing buffered: skip instead of failing on itemCounts.get(0) below.
    if (itemCounts.isEmpty()) {
        return;
    }

    // Sort ascending by count.
    // NOTE(review): if this is meant to be a "most clicked first" listing, the comparison
    // should be reversed (o2 vs o1) — confirm the intended order.
    Collections.sort(itemCounts, new Comparator<ItemCount>() {
        @Override
        public int compare(ItemCount o1, ItemCount o2) {
            return o1.count.compareTo(o2.count);
        }
    });

    // StringBuilder is sufficient: the buffer never leaves this method.
    // NOTE(review): the label literals below appear to contain stray pinyin annotations;
    // they are kept byte-for-byte here — confirm the intended output text.
    StringBuilder report = new StringBuilder("時(shí)間 :" + sdt.format(itemCounts.get(0).windowEnd));
    for (ItemCount itemCount : itemCounts) {
        report.append("商品ID :" + itemCount.itemID + " 點(diǎn)擊量 :" + itemCount.count + "\n");
    }

    // Send the report downstream.
    out.collect(report.toString());

    // Release the keyed state itself. Clearing only the local copy (as before) left every
    // element in listState, so state kept growing and a later timer for the same key would
    // re-emit the old elements.
    listState.clear();
}
/**
 * 1. Creates the ListStateDescriptor and obtains the list state used by the other callbacks.
 *
 * @param parameters configuration passed to the callback (not read here)
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
    // Type information for the elements stored in the state.
    TypeInformation<ItemCount> itemCountType = TypeInformation.of(new TypeHint<ItemCount>() {
    });
    // "jk" is the name given to the state descriptor.
    ListStateDescriptor<ItemCount> descriptor = new ListStateDescriptor<>("jk", itemCountType);
    // Obtain the list state from the runtime context and keep it in the field.
    listState = getRuntimeContext().getListState(descriptor);
}
/**
 * 2. Adds the incoming element to listState and registers an event-time timer for it.
 *
 * @param value the element to buffer; its {@code windowEnd} determines the timer timestamp
 * @param ctx   context used to reach the timer service
 * @param out   output collector (not used here)
 * @throws Exception if adding to the state fails
 */
@Override
public void processElement(ItemCount value, Context ctx, Collector<String> out) throws Exception {
    // Buffer the element in the list state.
    listState.add(value);
    // The timer is set to one past the element's windowEnd.
    final long fireAt = value.windowEnd + 1;
    ctx.timerService().registerEventTimeTimer(fireAt);
}
});