Flink Timer(定時(shí)器)機(jī)制與其具體實(shí)現(xiàn)

Timer簡(jiǎn)介

Timer(定時(shí)器)是Flink Streaming API提供的用于感知并利用處理時(shí)間/事件時(shí)間變化的機(jī)制谣膳。Ververica blog上給出的描述如下:

Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.

對(duì)于普通用戶來說脆霎,最常見的顯式利用Timer的地方就是KeyedProcessFunction肾档。我們?cè)谄鋚rocessElement()方法中注冊(cè)Timer择诈,然后覆寫其onTimer()方法作為Timer觸發(fā)時(shí)的回調(diào)邏輯。根據(jù)時(shí)間特征的不同:

  • 處理時(shí)間——調(diào)用Context.timerService().registerProcessingTimeTimer()注冊(cè)晌缘;onTimer()在系統(tǒng)時(shí)間戳達(dá)到Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)长捧。
  • 事件時(shí)間——調(diào)用Context.timerService().registerEventTimeTimer()注冊(cè);onTimer()在Flink內(nèi)部水印達(dá)到或超過Timer設(shè)定的時(shí)間戳?xí)r觸發(fā)放祟。

舉個(gè)栗子鳍怨,按天實(shí)時(shí)統(tǒng)計(jì)指標(biāo)并存儲(chǔ)在狀態(tài)中,每天0點(diǎn)清除狀態(tài)重新統(tǒng)計(jì)跪妥,就可以在processElement()方法里注冊(cè)Timer鞋喇。

ctx.timerService().registerProcessingTimeTimer(
  tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
);

public static long tomorrowZeroTimestampMs(long now, int timeZone) {
  return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}

再在onTimer()方法里執(zhí)行state.clear()。so easy眉撵。

除了KeyedProcessFunction之外侦香,Timer在窗口機(jī)制中也有重要的地位。提起窗口自然就能想到Trigger纽疟,即觸發(fā)器罐韩。來看下Flink自帶的EventTimeTrigger的部分代碼,它是事件時(shí)間特征下的默認(rèn)觸發(fā)器污朽。

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

可見散吵,當(dāng)水印還沒有到達(dá)窗口右邊沿時(shí),就注冊(cè)以窗口右邊沿為時(shí)間戳的Timer。Timer到期后觸發(fā)onEventTime()方法矾睦,進(jìn)而觸發(fā)該窗口相關(guān)聯(lián)的Trigger晦款。

文章開頭引用的blog從用戶的角度給出了Flink Timer的4大特點(diǎn),如下圖所示枚冗。

經(jīng)由上面的介紹缓溅,我們有了兩個(gè)入手點(diǎn)(KeyedProcessFunction、Trigger)來分析Timer的細(xì)節(jié)赁温。接下來從前者入手肛宋,let's get our hands dirty。

TimerService束世、InternalTimerService

負(fù)責(zé)實(shí)際執(zhí)行KeyedProcessFunction的算子是KeyedProcessOperator酝陈,其中以內(nèi)部類的形式實(shí)現(xiàn)了KeyedProcessFunction需要的上下文類Context,如下所示毁涉。

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
        private final TimerService timerService;
        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);
            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        // 以下略...
    }

可見timerService()方法返回的是外部傳入的TimerService實(shí)例沉帮,那么我們就回到KeyedProcessOperator看一下它的實(shí)現(xiàn),順便放個(gè)類圖贫堰。

public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    private static final long serialVersionUID = 1L;
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;

    public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
        super(function);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

    // 以下略...
}

通過閱讀上述代碼喇勋,可以總結(jié)出:

  • TimerService接口的實(shí)現(xiàn)類為SimpleTimerService,它實(shí)際上又是InternalTimerService的非常簡(jiǎn)單的代理(真的很簡(jiǎn)單偎行,代碼略去)川背。
  • InternalTimerService的實(shí)例由getInternalTimerService()方法取得,該方法定義在所有算子的基類AbstractStreamOperator中蛤袒。它比較重要熄云,后面再提。
  • KeyedProcessOperator.processElement()方法調(diào)用用戶自定義函數(shù)的processElement()方法妙真,順便將上下文實(shí)例ContextImpl傳了進(jìn)去,所以用戶可以由它獲得TimerService來注冊(cè)Timer珍德。
  • Timer在代碼中叫做InternalTimer(是個(gè)接口)锈候。
  • 當(dāng)Timer觸發(fā)時(shí),實(shí)際上是根據(jù)時(shí)間特征調(diào)用onProcessingTime()/onEventTime()方法(這兩個(gè)方法來自Triggerable接口)晴及,進(jìn)而觸發(fā)用戶函數(shù)的onTimer()回調(diào)邏輯都办。后面還會(huì)見到它們嫡锌。

接下來就看看InternalTimerService是如何取得的虑稼。

    /**
     * Returns a {@link InternalTimerService} that can be used to query current processing time
     * and event time and to set timers. An operator can have several timer services, where
     * each has its own namespace serializer. Timer services are differentiated by the string
     * key that is given when requesting them, if you call this method with the same key
     * multiple times you will get the same timer service instance in subsequent requests.
     *
     * <p>Timers are always scoped to a key, the currently active key of a keyed stream operation.
     * When a timer fires, this key will also be set as the currently active key.
     *
     * <p>Each timer has attached metadata, the namespace. Different timer services
     * can have a different namespace type. If you don't need namespace differentiation you
     * can use {@link VoidNamespaceSerializer} as the namespace serializer.
     *
     * @param name                The name of the requested timer service. If no service exists under the given
     *                            name a new one will be created and returned.
     * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
     * @param triggerable         The {@link Triggerable} that should be invoked when timers fire
     * @param <N>                 The type of the timer namespace.
     */
    public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {
        checkTimerServiceInitialization();

        KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
        InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
    }

該方法的注釋描述非常清楚及皂,所以一起粘貼過來碍拆。簡(jiǎn)單來講:

  • 每個(gè)算子可以有一個(gè)或多個(gè)InternalTimerService。
  • InternalTimerService的四要素是:名稱、命名空間類型N(及其序列化器)、鍵類型K(及其序列化器),還有上文所述Triggerable接口的實(shí)現(xiàn)吉执。
  • InternalTimerService經(jīng)由InternalTimeServiceManager.getInternalTimerService()方法取得。

例如地来,上文KeyedProcessOperator初始化的InternalTimerService戳玫,名稱為"user-timers",命名空間類型為空(VoidNamespace)未斑,Triggerable實(shí)現(xiàn)類則是其本身咕宿。如果是WindowOperator的話,其InternalTimerService的名稱就是"window-timers",命名空間類型則是Window府阀。

InternalTimerService在代碼中仍然是一個(gè)接口缆镣,其代碼如下。方法的簽名除了多了命名空間之外(命名空間對(duì)用戶透明)试浙,其他都與TimerService提供的相同董瞻。

public interface InternalTimerService<N> {
    long currentProcessingTime();
    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);
    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);
    void deleteEventTimeTimer(N namespace, long time);

    // ...
}

下面更進(jìn)一步,看看InternalTimeServiceManager是如何實(shí)現(xiàn)的田巴。

InternalTimeServiceManager钠糊、TimerHeapInternalTimer

顧名思義,InternalTimeServiceManager用于管理各個(gè)InternalTimeService壹哺。部分代碼如下:

public class InternalTimeServiceManager<K> {
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;
    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
    private final boolean useLegacySynchronousSnapshots;

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {
        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);
        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {
            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
            timerServices.put(name, timerService);
        }
        return timerService;
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    // 以下略...
}

從上面的代碼可以得知:

  • Flink中InternalTimerService的最終實(shí)現(xiàn)實(shí)際上是InternalTimerServiceImpl類抄伍,而InternalTimer的最終實(shí)現(xiàn)是TimerHeapInternalTimer類。
  • InternalTimeServiceManager會(huì)用HashMap維護(hù)一個(gè)特定鍵類型K下所有InternalTimerService的名稱與實(shí)例映射管宵。如果名稱已經(jīng)存在截珍,就會(huì)直接返回,不會(huì)重新創(chuàng)建啄糙。
  • 初始化InternalTimerServiceImpl時(shí)笛臣,會(huì)同時(shí)創(chuàng)建兩個(gè)包含TimerHeapInternalTimer的優(yōu)先隊(duì)列(該優(yōu)先隊(duì)列是Flink自己實(shí)現(xiàn)的),分別用于維護(hù)事件時(shí)間和處理時(shí)間的Timer隧饼。

說了這么多沈堡,最需要注意的是,Timer是維護(hù)在JVM堆內(nèi)存中的燕雁,如果頻繁注冊(cè)大量Timer诞丽,或者同時(shí)觸發(fā)大量Timer,也是一筆不小的開銷拐格。

TimerHeapInternalTimer的實(shí)現(xiàn)比較簡(jiǎn)單僧免,主要就是4個(gè)字段和1個(gè)方法。為了少打點(diǎn)字捏浊,把注釋也弄過來懂衩。

    /**
     * The key for which the timer is scoped.
     */
    @Nonnull
    private final K key;
    /**
     * The namespace for which the timer is scoped.
     */
    @Nonnull
    private final N namespace;
    /**
     * The expiration timestamp.
     */
    private final long timestamp;
    /**
     * This field holds the current physical index of this timer when it is managed by a timer heap so that we can
     * support fast deletes.
     */
    private transient int timerHeapIndex;

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}

可見,Timer的scope有兩個(gè)金踪,一是數(shù)據(jù)的key浊洞,二是命名空間。但是用戶不會(huì)感知到命名空間的存在胡岔,所以我們可以簡(jiǎn)單地認(rèn)為Timer是以key級(jí)別注冊(cè)的(Timer四大特點(diǎn)之1)法希。正確估計(jì)key的量可以幫助我們控制Timer的量。

timerHeapIndex是這個(gè)Timer在優(yōu)先隊(duì)列里存儲(chǔ)的下標(biāo)靶瘸。優(yōu)先隊(duì)列通常用二叉堆實(shí)現(xiàn)苫亦,而二叉堆可以直接用數(shù)組存儲(chǔ)(科普文見這里)毛肋,所以讓Timer持有其對(duì)應(yīng)的下標(biāo)可以較快地從隊(duì)列里刪除它。

comparePriorityTo()方法則用于確定Timer的優(yōu)先級(jí)屋剑,顯然Timer的優(yōu)先隊(duì)列是一個(gè)按Timer時(shí)間戳為關(guān)鍵字排序的最小堆润匙。下面粗略看看該最小堆的實(shí)現(xiàn)。

HeapPriorityQueueSet

上面代碼中PriorityQueueSetFactory.create()方法創(chuàng)建的優(yōu)先隊(duì)列實(shí)際上的類型是HeapPriorityQueueSet饼丘。它的基本思路與Java自帶的PriorityQueue相同趁桃,但是在其基礎(chǔ)上加入了按key去重的邏輯(Timer四大特點(diǎn)之2)辽话。不妨列出它的部分代碼肄鸽。

    private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
    private final KeyGroupRange keyGroupRange;

    @Override
    @Nullable
    public T poll() {
        final T toRemove = super.poll();
        return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
    }

    @Override
    public boolean add(@Nonnull T element) {
        return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
    }

    @Override
    public boolean remove(@Nonnull T toRemove) {
        T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
        return storedElement != null && super.remove(storedElement);
    }

    private HashMap<T, T> getDedupMapForKeyGroup(
        @Nonnegative int keyGroupId) {
        return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
    }

    private HashMap<T, T> getDedupMapForElement(T element) {
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
            keyExtractor.extractKeyFromElement(element),
            totalNumberOfKeyGroups);
        return getDedupMapForKeyGroup(keyGroup);
    }

    private int globalKeyGroupToLocalIndex(int keyGroup) {
        checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
        return keyGroup - keyGroupRange.getStartKeyGroup();
    }

要搞懂它,必須解釋一下KeyGroup和KeyGroupRange油啤。KeyGroup是Flink內(nèi)部KeyedState的原子單位典徘,亦即一些key的組合。一個(gè)Flink App的KeyGroup數(shù)量與最大并行度相同益咬,將key分配到KeyGroup的操作則是經(jīng)典的取hashCode+取模逮诲。而KeyGroupRange則是一些連續(xù)KeyGroup的范圍,每個(gè)Flink sub-task都只包含一個(gè)KeyGroupRange幽告。也就是說梅鹦,KeyGroupRange可以看做當(dāng)前sub-task在本地維護(hù)的所有key。

解釋完畢冗锁。容易得知齐唆,上述代碼中的那個(gè)HashMap<T, T>[]數(shù)組就是在KeyGroup級(jí)別對(duì)key進(jìn)行去重的容器,數(shù)組中每個(gè)元素對(duì)應(yīng)一個(gè)KeyGroup冻河。以插入一個(gè)Timer的流程為例:

  • 從Timer中取出key箍邮,計(jì)算該key屬于哪一個(gè)KeyGroup;
  • 計(jì)算出該KeyGroup在整個(gè)KeyGroupRange中的偏移量叨叙,按該偏移量定位到HashMap<T, T>[]數(shù)組的下標(biāo)锭弊;
  • 根據(jù)putIfAbsent()方法的語義,只有當(dāng)對(duì)應(yīng)HashMap不存在該Timer的key時(shí)擂错,才將Timer插入最小堆中味滞。

接下來回到主流程,InternalTimerServiceImpl钮呀。

InternalTimerServiceImpl

在這里剑鞍,我們終于可以看到注冊(cè)和移除Timer方法的最底層實(shí)現(xiàn)了。注意ProcessingTimeService是Flink內(nèi)部產(chǎn)生處理時(shí)間的時(shí)間戳的服務(wù)行楞。

    private final ProcessingTimeService processingTimeService;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
    private ScheduledFuture<?> nextTimer;

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

由此可見攒暇,注冊(cè)Timer實(shí)際上就是為它們賦予對(duì)應(yīng)的時(shí)間戳、key和命名空間子房,并將它們加入對(duì)應(yīng)的優(yōu)先隊(duì)列形用。特別地就轧,當(dāng)注冊(cè)基于處理時(shí)間的Timer時(shí),會(huì)先檢查要注冊(cè)的Timer時(shí)間戳與當(dāng)前在最小堆堆頂?shù)腡imer的時(shí)間戳的大小關(guān)系田度。如果前者比后者要早妒御,就會(huì)用前者替代掉后者,因?yàn)樘幚頃r(shí)間是永遠(yuǎn)線性增長(zhǎng)的镇饺。

Timer注冊(cè)好了之后是如何觸發(fā)的呢乎莉?先來看處理時(shí)間的情況。

InternalTimerServiceImpl類繼承了ProcessingTimeCallback接口奸笤,表示它可以觸發(fā)處理時(shí)間的回調(diào)惋啃。該接口只要求實(shí)現(xiàn)一個(gè)方法,如下监右。

    @Override
    private Triggerable<K, N> triggerTarget;

    public void onProcessingTime(long time) throws Exception {
        nextTimer = null;
        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

可見边灭,當(dāng)onProcessingTime()方法被觸發(fā)回調(diào)時(shí),就會(huì)按順序從隊(duì)列中獲取到比時(shí)間戳time小的所有Timer健盒,并挨個(gè)執(zhí)行Triggerable.onProcessingTime()方法绒瘦,也就是在上文KeyedProcessOperator的同名方法,用戶自定義的onTimer()邏輯也就被執(zhí)行了扣癣。

最后來到ProcessingTimeService的實(shí)現(xiàn)類SystemProcessingTimeService惰帽,它是用調(diào)度線程池實(shí)現(xiàn)回調(diào)的。相關(guān)的代碼如下父虑。

    private final ScheduledThreadPoolExecutor timerService;

    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        try {
            return timerService.schedule(
                new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            } else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            } else {
                throw e;
            }
        }
    }

    // 注意:這個(gè)是TriggerTask線程的run()方法
    @Override
    public void run() {
        synchronized (lock) {
            try {
                if (serviceStatus.get() == STATUS_ALIVE) {
                    target.onProcessingTime(timestamp);
                }
            } catch (Throwable t) {
                TimerException asyncException = new TimerException(t);
                exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
            }
        }
    }

可見该酗,onProcessingTime()在TriggerTask線程中被回調(diào),而TriggerTask線程按照Timer的時(shí)間戳來調(diào)度频轿。到這里垂涯,處理時(shí)間Timer的情況就講述完畢了。

再來看事件時(shí)間的情況航邢。事件時(shí)間與內(nèi)部時(shí)間戳無關(guān)耕赘,而與水印有關(guān)。以下是InternalTimerServiceImpl.advanceWatermark()方法的代碼膳殷。

    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;
        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

該邏輯與處理時(shí)間相似操骡,只不過從回調(diào)onProcessingTime()變成了回調(diào)onEventTime()而已。然后追蹤它的調(diào)用鏈赚窃,回到InternalTimeServiceManager的同名方法册招。

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

繼續(xù)向上追溯,到達(dá)終點(diǎn):算子基類AbstractStreamOperator中處理水印的方法processWatermark()勒极。當(dāng)水印到來時(shí)是掰,就會(huì)按著上述調(diào)用鏈流轉(zhuǎn)到InternalTimerServiceImpl中,并觸發(fā)所有早于水印時(shí)間戳的Timer了辱匿。

    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

至此键痛,我們算是基本打通了Flink Timer機(jī)制的實(shí)現(xiàn)細(xì)節(jié)炫彩,well done。

The End

寫完才發(fā)現(xiàn)沒有提Timer的checkpoint邏輯(四大特點(diǎn)之3)絮短。但是本文已經(jīng)相當(dāng)長(zhǎng)了江兢,剩下的內(nèi)容等以后有機(jī)會(huì)寫checkpoint的時(shí)候再說吧。

民那晚安~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末丁频,一起剝皮案震驚了整個(gè)濱河市杉允,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌席里,老刑警劉巖叔磷,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異胁勺,居然都是意外死亡世澜,警方通過查閱死者的電腦和手機(jī)独旷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門署穗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人嵌洼,你說我怎么就攤上這事案疲。” “怎么了麻养?”我有些...
    開封第一講書人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵褐啡,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我鳖昌,道長(zhǎng)备畦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任许昨,我火速辦了婚禮懂盐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘糕档。我一直安慰自己莉恼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開白布速那。 她就那樣靜靜地躺著俐银,像睡著了一般。 火紅的嫁衣襯著肌膚如雪端仰。 梳的紋絲不亂的頭發(fā)上捶惜,一...
    開封第一講書人閱讀 51,521評(píng)論 1 304
  • 那天,我揣著相機(jī)與錄音荔烧,去河邊找鬼吱七。 笑死坞淮,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的陪捷。 我是一名探鬼主播回窘,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼市袖!你這毒婦竟也來了啡直?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤苍碟,失蹤者是張志新(化名)和其女友劉穎酒觅,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體微峰,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡舷丹,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜓肆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片颜凯。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖仗扬,靈堂內(nèi)的尸體忽然破棺而出症概,到底是詐尸還是另有隱情,我是刑警寧澤早芭,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布彼城,位于F島的核電站,受9級(jí)特大地震影響退个,放射性物質(zhì)發(fā)生泄漏募壕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一语盈、第九天 我趴在偏房一處隱蔽的房頂上張望舱馅。 院中可真熱鬧,春花似錦黎烈、人聲如沸习柠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽资溃。三九已至,卻和暖如春烈炭,著一層夾襖步出監(jiān)牢的瞬間溶锭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來泰國打工符隙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留趴捅,地道東北人垫毙。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像拱绑,于是被迫代替她去往敵國和親综芥。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容