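Flink windows fall into two groups: time-based windows and count-based windows. Time-based windows are further split into ProcessingTime and EventTime. Time-based windows cover most use cases, for example counting the records received per minute or handling out-of-order data.
I recently ran into the following scenario: read data from Kafka, assemble it into SQL, and write it to MySQL in batches. Because MySQL limits the size of a single request packet (max_allowed_packet), I decided to use a count window to cap the length of each insert statement. In practice this exposed another problem: if the window never reaches the configured count, it never fires, so the buffered data is written to the database late.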
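To make the stall concrete, here is a minimal plain-Java sketch of a count-only batcher. It is not Flink code: the class `CountOnlyBatcher` and its methods are hypothetical names used only for illustration, and the batch size of 3 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink API) of batching purely by element count. */
class CountOnlyBatcher {
    private final int maxCount;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();

    CountOnlyBatcher(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Buffers one record and emits a batch only when maxCount is reached. */
    void add(String record) {
        buffer.add(record);
        if (buffer.size() >= maxCount) {
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Batches that have been emitted so far. */
    List<List<String>> emitted() {
        return emitted;
    }

    /** Records that are still waiting for the batch to fill up. */
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        CountOnlyBatcher batcher = new CountOnlyBatcher(3);
        for (int i = 1; i <= 7; i++) {
            batcher.add("row-" + i);
        }
        // Seven records with a batch size of three: two full batches, one record left over.
        System.out.println("emitted batches: " + batcher.emitted().size());
        System.out.println("stuck records: " + batcher.pending());
    }
}
```

The leftover record stays in the buffer until more data arrives, which is exactly the delay described above.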
The count window is implemented in the class org.apache.flink.streaming.api.datastream.KeyedStream:
/**
 * Windows this {@code KeyedStream} into tumbling count windows.
 *
 * @param size The size of the windows in number of elements.
 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
countWindow creates a global window that buffers the data. A trigger later fires, and the window function then processes the data held in that window. So how does CountTrigger enforce the maximum count? Here is the code of CountTrigger:
/**
 * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "CountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTrigger<W> of(long maxCount) {
        return new CountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
Flink calls onElement for every element that arrives. The class keeps a ReducingState that counts the incoming elements; when the count reaches maxCount, it clears the count and fires the window function. TriggerResult.FIRE is the result that triggers evaluation. The definition of TriggerResult is shown below:
/**
 * No action is taken on the window.
 */
CONTINUE,

/**
 * {@code FIRE_AND_PURGE} evaluates the window function and emits the window
 * result.
 */
FIRE_AND_PURGE,

/**
 * On {@code FIRE}, the window is evaluated and results are emitted.
 * The window is not purged, though, all elements are retained.
 */
FIRE,

/**
 * All elements in the window are cleared and the window is discarded,
 * without evaluating the window function or emitting any elements.
 */
PURGE;
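CONTINUE: do nothing;
FIRE_AND_PURGE: evaluate the window, then clear the data accumulated in it;
FIRE: evaluate the window without clearing the accumulated data;
PURGE: clear the data in the window and discard the window, without evaluating it.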
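The four results can be modelled on a plain list to see which ones emit data and which ones empty the buffer. This is a simplified sketch under stated assumptions, not Flink's implementation: `TriggerResultDemo`, its local `Result` enum and the `apply` helper are hypothetical, and the model ignores window disposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (not Flink API) of how each trigger result affects a window's buffer. */
class TriggerResultDemo {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Applies a result to the buffer and returns the emitted elements (empty if nothing fires). */
    static List<Integer> apply(Result result, List<Integer> buffer) {
        List<Integer> out = new ArrayList<>();
        if (result == Result.FIRE || result == Result.FIRE_AND_PURGE) {
            out.addAll(buffer); // the window function sees the current contents
        }
        if (result == Result.PURGE || result == Result.FIRE_AND_PURGE) {
            buffer.clear(); // the buffered elements are dropped
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(apply(Result.CONTINUE, buffer) + " kept=" + buffer);
        System.out.println(apply(Result.FIRE, buffer) + " kept=" + buffer);
        System.out.println(apply(Result.FIRE_AND_PURGE, buffer) + " kept=" + buffer);
    }
}
```

In this model a plain FIRE leaves the elements in place, so a later fire would emit them again. That is why countWindow wraps CountTrigger in PurgingTrigger on a global window.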
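For CountTrigger to honor a maximum waiting time, its onProcessingTime method, which currently always returns CONTINUE, has to return FIRE instead. Why not return FIRE_AND_PURGE directly? This follows the style of the Flink source: when FIRE_AND_PURGE is needed, the trigger is wrapped in PurgingTrigger.of from the outside, which is more flexible.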
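The wrapping idea can be sketched as a small decorator in plain Java. This is only a model of the design choice, not the Flink classes: `SimpleTrigger`, `everyN` and `purging` are hypothetical names, and the single-method interface is a deliberate simplification of a real trigger.

```java
/** Plain-Java model (not Flink API) of wrapping a trigger so that every FIRE also purges. */
class PurgingWrapperDemo {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Hypothetical single-method stand-in for a trigger. */
    interface SimpleTrigger {
        Result onElement(Object element);
    }

    /** Fires on every n-th element, like a count trigger. */
    static SimpleTrigger everyN(int n) {
        int[] count = {0};
        return element -> {
            if (++count[0] >= n) {
                count[0] = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        };
    }

    /** Upgrades FIRE to FIRE_AND_PURGE and passes every other result through unchanged. */
    static SimpleTrigger purging(SimpleTrigger inner) {
        return element -> {
            Result result = inner.onElement(element);
            return result == Result.FIRE ? Result.FIRE_AND_PURGE : result;
        };
    }

    public static void main(String[] args) {
        SimpleTrigger trigger = purging(everyN(2));
        System.out.println(trigger.onElement("a")); // CONTINUE
        System.out.println(trigger.onElement("b")); // FIRE_AND_PURGE
    }
}
```

Because the inner trigger only ever returns FIRE, the same trigger can be used with or without purging, depending on whether the caller adds the wrapper.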
The code of the new Trigger class is as follows:
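import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A trigger that fires when a maximum element count is reached or a maximum waiting time has elapsed.
 *
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 * @author mingbozhang
 */
public class CountWithMaxTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final long maxWaitingTime;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new CountWithMaxTimeTrigger.Sum(), LongSerializer.INSTANCE);

    /**
     * When merging we take the lowest of all fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> fireTimeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new CountWithMaxTimeTrigger.Min(), LongSerializer.INSTANCE);

    private CountWithMaxTimeTrigger(long maxCount, long maxWaitingTime) {
        this.maxCount = maxCount;
        this.maxWaitingTime = maxWaitingTime;
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count,
     * or once the maximum waiting time has elapsed since the first buffered element.
     *
     * @param maxCount The count of elements at which to fire.
     * @param maxWaitingTime The maximum waiting time in milliseconds.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountWithMaxTimeTrigger<W> of(long maxCount, long maxWaitingTime) {
        return new CountWithMaxTimeTrigger<>(maxCount, maxWaitingTime);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            // Cancel the pending timeout timer so it does not linger after a count-based fire.
            Long pendingTimer = fireTimestamp.get();
            if (pendingTimer != null) {
                ctx.deleteProcessingTimeTimer(pendingTimer);
            }
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        if (fireTimestamp.get() == null) {
            // First element of a new batch: start the timeout, using Flink's processing-time clock.
            long triggerTime = ctx.getCurrentProcessingTime() + maxWaitingTime;
            ctx.registerProcessingTimeTimer(triggerTime);
            fireTimestamp.add(triggerTime);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        // Fire only for the timer that belongs to the current batch.
        if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
            ctx.getPartitionedState(stateDesc).clear();
            fireTimestamp.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
        // Also drop the pending timer and its bookkeeping so nothing outlives the window.
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        Long pendingTimer = fireTimestamp.get();
        if (pendingTimer != null) {
            ctx.deleteProcessingTimeTimer(pendingTimer);
        }
        fireTimestamp.clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        // Merge the fire timestamps too (Min keeps the earliest) and re-register its timer.
        ctx.mergePartitionedState(fireTimeStateDesc);
        Long nextFireTimestamp = ctx.getPartitionedState(fireTimeStateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "CountWithMaxTimeTrigger(" + maxCount + ", " + maxWaitingTime + ")";
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}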
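The timer is registered in onElement:
if (fireTimestamp.get() == null) {
    // First element of a new batch: start the timeout, using Flink's processing-time clock.
    long triggerTime = ctx.getCurrentProcessingTime() + maxWaitingTime;
    ctx.registerProcessingTimeTimer(triggerTime);
    fireTimestamp.add(triggerTime);
}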
onProcessingTime fires the window and clears the element counter:
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
    if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) {
        ctx.getPartitionedState(stateDesc).clear();
        fireTimestamp.clear();
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
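The interplay between the counter and the timer is easier to follow with a manual clock. The sketch below is a plain-Java model of the decision logic only, not the Flink trigger itself: `CountOrTimeoutModel` is a hypothetical class, time is passed in explicitly, and the values 3 and 1000 ms are arbitrary assumptions.

```java
/** Plain-Java model (not Flink API) of the count-or-timeout decision logic. */
class CountOrTimeoutModel {
    private final long maxCount;
    private final long maxWaitingTime;
    private long count = 0;
    private Long fireTimestamp = null; // deadline of the pending timer, if any

    CountOrTimeoutModel(long maxCount, long maxWaitingTime) {
        this.maxCount = maxCount;
        this.maxWaitingTime = maxWaitingTime;
    }

    /** Called per element; now is the current processing time in ms. Returns true to fire. */
    boolean onElement(long now) {
        count++;
        if (count >= maxCount) {
            count = 0;
            fireTimestamp = null;
            return true;
        }
        if (fireTimestamp == null) {
            fireTimestamp = now + maxWaitingTime; // first element of a batch starts the timeout
        }
        return false;
    }

    /** Called when a timer goes off at the given time. Returns true to fire. */
    boolean onProcessingTime(long time) {
        if (fireTimestamp != null && fireTimestamp == time) {
            count = 0;
            fireTimestamp = null;
            return true;
        }
        return false; // a timer left over from an earlier batch is ignored
    }

    Long pendingDeadline() {
        return fireTimestamp;
    }

    public static void main(String[] args) {
        CountOrTimeoutModel model = new CountOrTimeoutModel(3, 1000);
        System.out.println(model.onElement(0));           // false, deadline set to 1000
        System.out.println(model.onElement(10));          // false
        System.out.println(model.onProcessingTime(1000)); // true, timeout flushes two elements
        System.out.println(model.onElement(2000));        // false, deadline set to 3000
        System.out.println(model.onElement(2001));        // false
        System.out.println(model.onElement(2002));        // true, count reached
        System.out.println(model.onProcessingTime(3000)); // false, leftover timer is ignored
    }
}
```

In a Flink job the trigger would presumably be wired the same way countWindow wires CountTrigger, for example `window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountWithMaxTimeTrigger.of(maxCount, maxWaitingTime)))`, so that each fire also clears the buffered elements.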