Timer簡(jiǎn)介
Timer(定時(shí)器)是Flink Streaming API提供的用于感知并利用處理時(shí)間/事件時(shí)間變化的機(jī)制谣膳。Ververica blog上給出的描述如下:
Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.
對(duì)于普通用戶來說脆霎,最常見的顯式利用Timer的地方就是KeyedProcessFunction肾档。我們?cè)谄鋚rocessElement()方法中注冊(cè)Timer择诈,然后覆寫其onTimer()方法作為Timer觸發(fā)時(shí)的回調(diào)邏輯。根據(jù)時(shí)間特征的不同:
- 處理時(shí)間——調(diào)用Context.timerService().registerProcessingTimeTimer()注冊(cè)晌缘;onTimer()在系統(tǒng)時(shí)間戳達(dá)到Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)长捧。
- 事件時(shí)間——調(diào)用Context.timerService().registerEventTimeTimer()注冊(cè);onTimer()在Flink內(nèi)部水印達(dá)到或超過Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)放祟。
舉個(gè)栗子鳍怨,按天實(shí)時(shí)統(tǒng)計(jì)指標(biāo)并存儲(chǔ)在狀態(tài)中,每天0點(diǎn)清除狀態(tài)重新統(tǒng)計(jì)跪妥,就可以在processElement()方法里注冊(cè)Timer鞋喇。
ctx.timerService().registerProcessingTimeTimer(
tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
);
public static long tomorrowZeroTimestampMs(long now, int timeZone) {
return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}
再在onTimer()方法里執(zhí)行state.clear()
。so easy眉撵。
除了KeyedProcessFunction之外侦香,Timer在窗口機(jī)制中也有重要的地位。提起窗口自然就能想到Trigger纽疟,即觸發(fā)器罐韩。來看下Flink自帶的EventTimeTrigger的部分代碼,它是事件時(shí)間特征下的默認(rèn)觸發(fā)器污朽。
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
可見散吵,當(dāng)水印還沒有到達(dá)窗口右邊沿時(shí),就注冊(cè)以窗口右邊沿為時(shí)間戳的Timer。Timer到期后觸發(fā)onEventTime()方法矾睦,進(jìn)而觸發(fā)該窗口相關(guān)聯(lián)的Trigger晦款。
文章開頭引用的blog從用戶的角度給出了Flink Timer的4大特點(diǎn),如下圖所示枚冗。
經(jīng)由上面的介紹缓溅,我們有了兩個(gè)入手點(diǎn)(KeyedProcessFunction、Trigger)來分析Timer的細(xì)節(jié)赁温。接下來從前者入手肛宋,let's get our hands dirty。
TimerService束世、InternalTimerService
負(fù)責(zé)實(shí)際執(zhí)行KeyedProcessFunction的算子是KeyedProcessOperator酝陈,其中以內(nèi)部類的形式實(shí)現(xiàn)了KeyedProcessFunction需要的上下文類Context,如下所示毁涉。
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
private final TimerService timerService;
private StreamRecord<IN> element;
ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
@Override
public Long timestamp() {
checkState(element != null);
if (element.hasTimestamp()) {
return element.getTimestamp();
} else {
return null;
}
}
@Override
public TimerService timerService() {
return timerService;
}
// 以下略...
}
可見timerService()方法返回的是外部傳入的TimerService實(shí)例沉帮,那么我們就回到KeyedProcessOperator看一下它的實(shí)現(xiàn),順便放個(gè)類圖贫堰。
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
private transient ContextImpl context;
private transient OnTimerContextImpl onTimerContext;
public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
TimerService timerService = new SimpleTimerService(internalTimerService);
context = new ContextImpl(userFunction, timerService);
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
// 以下略...
}
通過閱讀上述代碼喇勋,可以總結(jié)出:
- TimerService接口的實(shí)現(xiàn)類為SimpleTimerService,它實(shí)際上又是InternalTimerService的非常簡(jiǎn)單的代理(真的很簡(jiǎn)單偎行,代碼略去)川背。
- InternalTimerService的實(shí)例由getInternalTimerService()方法取得,該方法定義在所有算子的基類AbstractStreamOperator中蛤袒。它比較重要熄云,后面再提。
- KeyedProcessOperator.processElement()方法調(diào)用用戶自定義函數(shù)的processElement()方法妙真,順便將上下文實(shí)例ContextImpl傳了進(jìn)去,所以用戶可以由它獲得TimerService來注冊(cè)Timer珍德。
- Timer在代碼中叫做InternalTimer(是個(gè)接口)锈候。
- 當(dāng)Timer觸發(fā)時(shí),實(shí)際上是根據(jù)時(shí)間特征調(diào)用onProcessingTime()/onEventTime()方法(這兩個(gè)方法來自Triggerable接口)晴及,進(jìn)而觸發(fā)用戶函數(shù)的onTimer()回調(diào)邏輯都办。后面還會(huì)見到它們嫡锌。
接下來就看看InternalTimerService是如何取得的虑稼。
/**
* Returns a {@link InternalTimerService} that can be used to query current processing time
* and event time and to set timers. An operator can have several timer services, where
* each has its own namespace serializer. Timer services are differentiated by the string
* key that is given when requesting them, if you call this method with the same key
* multiple times you will get the same timer service instance in subsequent requests.
*
* <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
* When a timer fires, this key will also be set as the currently active key.
*
* <p>Each timer has attached metadata, the namespace. Different timer services
* can have a different namespace type. If you don't need namespace differentiation you
* can use {@link VoidNamespaceSerializer} as the namespace serializer.
*
* @param name The name of the requested timer service. If no service exists under the given
* name a new one will be created and returned.
* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
* @param triggerable The {@link Triggerable} that should be invoked when timers fire
* @param <N> The type of the timer namespace.
*/
public <K, N> InternalTimerService<N> getInternalTimerService(
String name,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable) {
checkTimerServiceInitialization();
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
}
該方法的注釋描述非常清楚及皂,所以一起粘貼過來碍拆。簡(jiǎn)單來講:
- 每個(gè)算子可以有一個(gè)或多個(gè)InternalTimerService。
- InternalTimerService的四要素是:名稱、命名空間類型N(及其序列化器)、鍵類型K(及其序列化器),還有上文所述Triggerable接口的實(shí)現(xiàn)吉执。
- InternalTimerService經(jīng)由InternalTimeServiceManager.getInternalTimerService()方法取得。
例如地来,上文KeyedProcessOperator初始化的InternalTimerService戳玫,名稱為"user-timers",命名空間類型為空(VoidNamespace)未斑,Triggerable實(shí)現(xiàn)類則是其本身咕宿。如果是WindowOperator的話,其InternalTimerService的名稱就是"window-timers",命名空間類型則是Window府阀。
InternalTimerService在代碼中仍然是一個(gè)接口缆镣,其代碼如下。方法的簽名除了多了命名空間之外(命名空間對(duì)用戶透明)试浙,其他都與TimerService提供的相同董瞻。
public interface InternalTimerService<N> {
long currentProcessingTime();
long currentWatermark();
void registerProcessingTimeTimer(N namespace, long time);
void deleteProcessingTimeTimer(N namespace, long time);
void registerEventTimeTimer(N namespace, long time);
void deleteEventTimeTimer(N namespace, long time);
// ...
}
下面更進(jìn)一步,看看InternalTimeServiceManager是如何實(shí)現(xiàn)的田巴。
InternalTimeServiceManager钠糊、TimerHeapInternalTimer
顧名思義,InternalTimeServiceManager用于管理各個(gè)InternalTimeService壹哺。部分代碼如下:
public class InternalTimeServiceManager<K> {
@VisibleForTesting
static final String TIMER_STATE_PREFIX = "_timer_state";
@VisibleForTesting
static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
@VisibleForTesting
static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;
private final PriorityQueueSetFactory priorityQueueSetFactory;
private final ProcessingTimeService processingTimeService;
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
private final boolean useLegacySynchronousSnapshots;
@SuppressWarnings("unchecked")
public <N> InternalTimerService<N> getInternalTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
Triggerable<K, N> triggerable) {
InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);
return timerService;
}
@SuppressWarnings("unchecked")
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService = new InternalTimerServiceImpl<>(
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
timerServices.put(name, timerService);
}
return timerService;
}
private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
String name,
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
timerSerializer);
}
// 以下略...
}
從上面的代碼可以得知:
- Flink中InternalTimerService的最終實(shí)現(xiàn)實(shí)際上是InternalTimerServiceImpl類抄伍,而InternalTimer的最終實(shí)現(xiàn)是TimerHeapInternalTimer類。
- InternalTimeServiceManager會(huì)用HashMap維護(hù)一個(gè)特定鍵類型K下所有InternalTimerService的名稱與實(shí)例映射管宵。如果名稱已經(jīng)存在截珍,就會(huì)直接返回,不會(huì)重新創(chuàng)建啄糙。
- 初始化InternalTimerServiceImpl時(shí)笛臣,會(huì)同時(shí)創(chuàng)建兩個(gè)包含TimerHeapInternalTimer的優(yōu)先隊(duì)列(該優(yōu)先隊(duì)列是Flink自己實(shí)現(xiàn)的),分別用于維護(hù)事件時(shí)間和處理時(shí)間的Timer隧饼。
說了這么多沈堡,最需要注意的是,Timer是維護(hù)在JVM堆內(nèi)存中的燕雁,如果頻繁注冊(cè)大量Timer诞丽,或者同時(shí)觸發(fā)大量Timer,也是一筆不小的開銷拐格。
TimerHeapInternalTimer的實(shí)現(xiàn)比較簡(jiǎn)單僧免,主要就是4個(gè)字段和1個(gè)方法。為了少打點(diǎn)字捏浊,把注釋也弄過來懂衩。
/**
* The key for which the timer is scoped.
*/
@Nonnull
private final K key;
/**
* The namespace for which the timer is scoped.
*/
@Nonnull
private final N namespace;
/**
* The expiration timestamp.
*/
private final long timestamp;
/**
* This field holds the current physical index of this timer when it is managed by a timer heap so that we can
* support fast deletes.
*/
private transient int timerHeapIndex;
@Override
public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
return Long.compare(timestamp, other.getTimestamp());
}
}
可見,Timer的scope有兩個(gè)金踪,一是數(shù)據(jù)的key浊洞,二是命名空間。但是用戶不會(huì)感知到命名空間的存在胡岔,所以我們可以簡(jiǎn)單地認(rèn)為Timer是以key級(jí)別注冊(cè)的(Timer四大特點(diǎn)之1)法希。正確估計(jì)key的量可以幫助我們控制Timer的量。
timerHeapIndex是這個(gè)Timer在優(yōu)先隊(duì)列里存儲(chǔ)的下標(biāo)靶瘸。優(yōu)先隊(duì)列通常用二叉堆實(shí)現(xiàn)苫亦,而二叉堆可以直接用數(shù)組存儲(chǔ)(科普文見這里)毛肋,所以讓Timer持有其對(duì)應(yīng)的下標(biāo)可以較快地從隊(duì)列里刪除它。
comparePriorityTo()方法則用于確定Timer的優(yōu)先級(jí)屋剑,顯然Timer的優(yōu)先隊(duì)列是一個(gè)按Timer時(shí)間戳為關(guān)鍵字排序的最小堆润匙。下面粗略看看該最小堆的實(shí)現(xiàn)。
HeapPriorityQueueSet
上面代碼中PriorityQueueSetFactory.create()方法創(chuàng)建的優(yōu)先隊(duì)列實(shí)際上的類型是HeapPriorityQueueSet饼丘。它的基本思路與Java自帶的PriorityQueue相同趁桃,但是在其基礎(chǔ)上加入了按key去重的邏輯(Timer四大特點(diǎn)之2)辽话。不妨列出它的部分代碼肄鸽。
private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
private final KeyGroupRange keyGroupRange;
@Override
@Nullable
public T poll() {
final T toRemove = super.poll();
return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
}
@Override
public boolean add(@Nonnull T element) {
return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
}
@Override
public boolean remove(@Nonnull T toRemove) {
T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
return storedElement != null && super.remove(storedElement);
}
private HashMap<T, T> getDedupMapForKeyGroup(
@Nonnegative int keyGroupId) {
return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
}
private HashMap<T, T> getDedupMapForElement(T element) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
keyExtractor.extractKeyFromElement(element),
totalNumberOfKeyGroups);
return getDedupMapForKeyGroup(keyGroup);
}
private int globalKeyGroupToLocalIndex(int keyGroup) {
checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
return keyGroup - keyGroupRange.getStartKeyGroup();
}
要搞懂它,必須解釋一下KeyGroup和KeyGroupRange油啤。KeyGroup是Flink內(nèi)部KeyedState的原子單位典徘,亦即一些key的組合。一個(gè)Flink App的KeyGroup數(shù)量與最大并行度相同益咬,將key分配到KeyGroup的操作則是經(jīng)典的取hashCode+取模逮诲。而KeyGroupRange則是一些連續(xù)KeyGroup的范圍,每個(gè)Flink sub-task都只包含一個(gè)KeyGroupRange幽告。也就是說梅鹦,KeyGroupRange可以看做當(dāng)前sub-task在本地維護(hù)的所有key。
解釋完畢冗锁。容易得知齐唆,上述代碼中的那個(gè)HashMap<T, T>[]
數(shù)組就是在KeyGroup級(jí)別對(duì)key進(jìn)行去重的容器,數(shù)組中每個(gè)元素對(duì)應(yīng)一個(gè)KeyGroup冻河。以插入一個(gè)Timer的流程為例:
- 從Timer中取出key箍邮,計(jì)算該key屬于哪一個(gè)KeyGroup;
- 計(jì)算出該KeyGroup在整個(gè)KeyGroupRange中的偏移量叨叙,按該偏移量定位到
HashMap<T, T>[]
數(shù)組的下標(biāo)锭弊; - 根據(jù)putIfAbsent()方法的語義,只有當(dāng)對(duì)應(yīng)HashMap不存在該Timer的key時(shí)擂错,才將Timer插入最小堆中味滞。
接下來回到主流程,InternalTimerServiceImpl钮呀。
InternalTimerServiceImpl
在這里剑鞍,我們終于可以看到注冊(cè)和移除Timer方法的最底層實(shí)現(xiàn)了。注意ProcessingTimeService是Flink內(nèi)部產(chǎn)生處理時(shí)間的時(shí)間戳的服務(wù)行楞。
private final ProcessingTimeService processingTimeService;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
private ScheduledFuture<?> nextTimer;
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
由此可見攒暇,注冊(cè)Timer實(shí)際上就是為它們賦予對(duì)應(yīng)的時(shí)間戳、key和命名空間子房,并將它們加入對(duì)應(yīng)的優(yōu)先隊(duì)列形用。特別地就轧,當(dāng)注冊(cè)基于處理時(shí)間的Timer時(shí),會(huì)先檢查要注冊(cè)的Timer時(shí)間戳與當(dāng)前在最小堆堆頂?shù)腡imer的時(shí)間戳的大小關(guān)系田度。如果前者比后者要早妒御,就會(huì)用前者替代掉后者,因?yàn)樘幚頃r(shí)間是永遠(yuǎn)線性增長(zhǎng)的镇饺。
Timer注冊(cè)好了之后是如何觸發(fā)的呢乎莉?先來看處理時(shí)間的情況。
InternalTimerServiceImpl類繼承了ProcessingTimeCallback接口奸笤,表示它可以觸發(fā)處理時(shí)間的回調(diào)惋啃。該接口只要求實(shí)現(xiàn)一個(gè)方法,如下监右。
@Override
private Triggerable<K, N> triggerTarget;
public void onProcessingTime(long time) throws Exception {
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
}
可見边灭,當(dāng)onProcessingTime()方法被觸發(fā)回調(diào)時(shí),就會(huì)按順序從隊(duì)列中獲取到比時(shí)間戳time小的所有Timer健盒,并挨個(gè)執(zhí)行Triggerable.onProcessingTime()方法绒瘦,也就是在上文KeyedProcessOperator的同名方法,用戶自定義的onTimer()邏輯也就被執(zhí)行了扣癣。
最后來到ProcessingTimeService的實(shí)現(xiàn)類SystemProcessingTimeService惰帽,它是用調(diào)度線程池實(shí)現(xiàn)回調(diào)的。相關(guān)的代碼如下父虑。
private final ScheduledThreadPoolExecutor timerService;
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
try {
return timerService.schedule(
new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
} else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
} else {
throw e;
}
}
}
// 注意:這個(gè)是TriggerTask線程的run()方法
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(timestamp);
}
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
}
}
}
可見该酗,onProcessingTime()在TriggerTask線程中被回調(diào),而TriggerTask線程按照Timer的時(shí)間戳來調(diào)度频轿。到這里垂涯,處理時(shí)間Timer的情況就講述完畢了。
再來看事件時(shí)間的情況航邢。事件時(shí)間與內(nèi)部時(shí)間戳無關(guān)耕赘,而與水印有關(guān)。以下是InternalTimerServiceImpl.advanceWatermark()方法的代碼膳殷。
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
該邏輯與處理時(shí)間相似操骡,只不過從回調(diào)onProcessingTime()變成了回調(diào)onEventTime()而已。然后追蹤它的調(diào)用鏈赚窃,回到InternalTimeServiceManager的同名方法册招。
public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
}
繼續(xù)向上追溯,到達(dá)終點(diǎn):算子基類AbstractStreamOperator中處理水印的方法processWatermark()勒极。當(dāng)水印到來時(shí)是掰,就會(huì)按著上述調(diào)用鏈流轉(zhuǎn)到InternalTimerServiceImpl中,并觸發(fā)所有早于水印時(shí)間戳的Timer了辱匿。
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
至此键痛,我們算是基本打通了Flink Timer機(jī)制的實(shí)現(xiàn)細(xì)節(jié)炫彩,well done。
The End
寫完才發(fā)現(xiàn)沒有提Timer的checkpoint邏輯(四大特點(diǎn)之3)絮短。但是本文已經(jīng)相當(dāng)長(zhǎng)了江兢,剩下的內(nèi)容等以后有機(jī)會(huì)寫checkpoint的時(shí)候再說吧。
民那晚安~