Dubbo——時(shí)間輪(Time Wheel)算法應(yīng)用

定時(shí)任務(wù)

Netty、Quartz、Kafka 以及 Linux 都有定時(shí)任務(wù)功能。

JDK 自帶的 java.util.TimerDelayedQueue 可實(shí)現(xiàn)簡(jiǎn)單的定時(shí)任務(wù),底層用的是堆,存取復(fù)雜度都是 O(nlog(n))烹俗,但無法支撐海量定時(shí)任務(wù)爆侣。

在任務(wù)量大、性能要求高的場(chǎng)景幢妄,為了將任務(wù)存取及取消操作時(shí)間復(fù)雜度降為 O(1)兔仰,會(huì)采用時(shí)間輪算法。

什么是時(shí)間輪

  • 調(diào)度模型:時(shí)間輪是為解決高效調(diào)度任務(wù)而產(chǎn)生的調(diào)度模型蕉鸳。
  • 數(shù)據(jù)結(jié)構(gòu):通常由 hash table 和 鏈表 實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)乎赴。
  • 延時(shí)任務(wù)、周期性任務(wù):應(yīng)用場(chǎng)景主要在延遲大規(guī)模的延時(shí)任務(wù)潮尝、周期性的定時(shí)任務(wù)等榕吼。目前在 Kafka、caffeine勉失、netty 等各種任務(wù)調(diào)度功能中作為調(diào)度器使用羹蚣。

時(shí)間輪模型及其應(yīng)用

一種高效批量管理定時(shí)任務(wù)的調(diào)度模型。一般會(huì)實(shí)現(xiàn)成一個(gè)環(huán)形結(jié)構(gòu)乱凿,類似一個(gè)時(shí)鐘蝗砾,分為很多槽皱碘,一個(gè)槽代表一個(gè)時(shí)間間隔,每個(gè)槽使用雙向鏈表存儲(chǔ)定時(shí)任務(wù)席噩。

指針周期性跳動(dòng)癌幕,跳動(dòng)到一個(gè)槽位鹏漆,就執(zhí)行該槽位的定時(shí)任務(wù)抑钟。

Hashed Timing Wheel 結(jié)構(gòu)示意圖

時(shí)間輪應(yīng)用:

  • 故障恢復(fù)
  • 流量控制
  • 調(diào)度算法
  • 控制網(wǎng)絡(luò)中的數(shù)據(jù)包生命周期

計(jì)時(shí)器維護(hù)代價(jià)高德迹,如果

  • 處理器在每個(gè)時(shí)鐘滴答聲中都會(huì)中斷
  • 使用精細(xì)粒度計(jì)時(shí)器
  • 未完成的計(jì)時(shí)器很多

需要高效的定時(shí)器算法以減少總體中斷的開銷。
單層時(shí)間輪的容量和精度都是有限的寺枉,對(duì)于精度要求特別高抑淫、時(shí)間跨度特別大或是海量定時(shí)任務(wù)需要調(diào)度的場(chǎng)景,通常會(huì)使用多級(jí)時(shí)間輪以及持久化存儲(chǔ)與時(shí)間輪結(jié)合的方案型凳。

Dubbo的時(shí)間輪結(jié)構(gòu)

Dubbo 時(shí)間輪實(shí)現(xiàn)位于 dubbo-common 模塊的 org.apache.dubbo.common.timer 包丈冬,下面我們就來分析時(shí)間輪涉及的核心接口和實(shí)現(xiàn)嘱函。

TimerTask

在 Dubbo 中甘畅,所有定時(shí)任務(wù)都要實(shí)現(xiàn) TimerTask 接口。只定義了一個(gè) run() 方法往弓,入?yún)⑹且粋€(gè) Timeout 接口對(duì)象疏唾。

public interface TimerTask {

    void run(Timeout timeout) throws Exception;
}

Timeout

Timeout 對(duì)象與 TimerTask 對(duì)象一一對(duì)應(yīng),類似線程池返回的 Future 對(duì)象與提交到線程池中的任務(wù)對(duì)象之間的關(guān)系函似。
通過 Timeout 對(duì)象槐脏,不僅可以查看定時(shí)任務(wù)的狀態(tài),還可以操作定時(shí)任務(wù)(例如取消關(guān)聯(lián)的定時(shí)任務(wù))撇寞。

/**
 * Timeout 對(duì)象與 TimerTask 對(duì)象一一對(duì)應(yīng)顿天,兩者的關(guān)系類似于線程池返回的 Future 對(duì)象與提交到線程池中的任務(wù)對(duì)象之間的關(guān)系堂氯。
 * 通過 Timeout 對(duì)象,我們不僅可以查看定時(shí)任務(wù)的狀態(tài)牌废,還可以操作定時(shí)任務(wù)
 */
public interface Timeout {

    /**
     * 返回創(chuàng)建自己的定時(shí)器
     */
    Timer timer();

    /**
     * 返回關(guān)聯(lián)的定時(shí)任務(wù)
     */
    TimerTask task();

    /**
     * 返回定時(shí)任務(wù)是否到期
     */
    boolean isExpired();

    /**
     * 返回定時(shí)任務(wù)是否被取消
     */
    boolean isCancelled();

    /**
     * 嘗試取消定時(shí)任務(wù)咽白,如果任務(wù)已經(jīng)被執(zhí)行或已經(jīng)取消,方法正常返回.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}

Timer

Timer 接口定義了定時(shí)器的基本行為鸟缕,核心是 newTimeout() :提交一個(gè)定時(shí)任務(wù)(TimerTask)并返回關(guān)聯(lián)的 Timeout 對(duì)象晶框,類似于向線程池提交任務(wù)。

public interface Timer {

    /**  
     * 提交一個(gè)定時(shí)任務(wù)(TimerTask)懂从,類似于向線程池提交任務(wù)
     * @return 返回關(guān)聯(lián)的 Timeout 對(duì)象
     * @throws IllegalStateException      if this timer has been {@linkplain #stop() stopped} already
     * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
     *                                    can cause instability in the system.
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    /**
     * @return 方法返回被取消的任務(wù)對(duì)應(yīng)的Timeout集合
     */
    Set<Timeout> stop();

    /**
     * 判斷定時(shí)器是否停止
     *
     * @return true for stop
     */
    boolean isStop();
}

HashedWheelTimeout

HashedWheelTimeout 是 Timeout 接口的唯一實(shí)現(xiàn)授段,是 HashedWheelTimer 的內(nèi)部類。HashedWheelTimeout 扮演了兩個(gè)角色:

  • 1番甩、時(shí)間輪中雙向鏈表的節(jié)點(diǎn)侵贵,即定時(shí)任務(wù) TimerTask 在 HashedWheelTimer 中的容器。
  • 2对室、定時(shí)任務(wù) TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle)模燥,用于在時(shí)間輪外部查看和控制定時(shí)任務(wù)。

HashedWheelTimeout 中的核心字段如下:

  • prev掩宜、next(HashedWheelTimeout類型):分別對(duì)應(yīng)當(dāng)前定時(shí)任務(wù)在鏈表中的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)蔫骂。

  • task(TimerTask類型):指實(shí)際被調(diào)度的任務(wù)。

  • deadline(long類型):指定時(shí)任務(wù)執(zhí)行的時(shí)間牺汤。這個(gè)時(shí)間是在創(chuàng)建 HashedWheelTimeout 時(shí)指定的辽旋,計(jì)算公式是:currentTime(創(chuàng)建 HashedWheelTimeout 的時(shí)間) + delay(任務(wù)延遲時(shí)間) - startTime(HashedWheelTimer 的啟動(dòng)時(shí)間),時(shí)間單位為納秒檐迟。

  • state(volatile int類型):指定時(shí)任務(wù)當(dāng)前所處狀態(tài)补胚,可選的有三個(gè),分別是 INIT(0)追迟、CANCELLED(1)和 EXPIRED(2)溶其。另外,還有一個(gè) STATE_UPDATER 字段(AtomicIntegerFieldUpdater類型)實(shí)現(xiàn) state 狀態(tài)變更的原子性敦间。

  • remainingRounds(long類型):指當(dāng)前任務(wù)剩余的時(shí)鐘周期數(shù)瓶逃。時(shí)間輪所能表示的時(shí)間長(zhǎng)度是有限的,在任務(wù)到期時(shí)間與當(dāng)前時(shí)刻的時(shí)間差廓块,超過時(shí)間輪單圈能表示的時(shí)長(zhǎng)厢绝,就出現(xiàn)了套圈的情況,需要該字段值表示剩余的時(shí)鐘周期带猴。

HashedWheelTimeout 中的核心方法有:

  • isCancelled()昔汉、isExpired() 、state() 方法:主要用于檢查當(dāng)前 HashedWheelTimeout 狀態(tài)拴清。

  • cancel() 方法:將當(dāng)前 HashedWheelTimeout 的狀態(tài)設(shè)置為 CANCELLED靶病,并將當(dāng)前 HashedWheelTimeout 添加到 cancelledTimeouts 隊(duì)列中等待銷毀会通。

  • expire() 方法:當(dāng)任務(wù)到期時(shí),會(huì)調(diào)用該方法將當(dāng)前 HashedWheelTimeout 設(shè)置為 EXPIRED 狀態(tài)娄周,然后調(diào)用其中的 TimerTask 的 run() 方法執(zhí)行定時(shí)任務(wù)渴语。

  • remove() 方法:將當(dāng)前 HashedWheelTimeout 從時(shí)間輪中刪除。

/**
 * HashedWheelTimeout 是 Timeout 接口的唯一實(shí)現(xiàn)
 * 1. 時(shí)間輪中雙向鏈表的節(jié)點(diǎn)昆咽,即定時(shí)任務(wù) TimerTask 在 HashedWheelTimer 中的容器
 * 2. 定時(shí)任務(wù) TimerTask 提交到 HashedWheelTimer 之后返回的句柄驾凶,用于在時(shí)間輪外部查看和控制定時(shí)任務(wù)
 */
private static final class HashedWheelTimeout implements Timeout {

    /** 狀態(tài)機(jī) */
    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    
    /** 實(shí)現(xiàn) state 狀態(tài)變更的原子性 */
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

    // 所在的時(shí)間輪,定時(shí)器
    private final HashedWheelTimer timer;
    // 關(guān)聯(lián)的定時(shí)任務(wù)
    private final TimerTask task;
    private final long deadline;

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
    private volatile int state = ST_INIT;

    /**
     * 當(dāng)前任務(wù)剩余的時(shí)鐘周期數(shù)掷酗,由Worker計(jì)算.
     * 時(shí)間輪所能表示的時(shí)間長(zhǎng)度是有限的调违,在任務(wù)到期時(shí)間與當(dāng)前時(shí)刻的時(shí)間差,超過時(shí)間輪單圈能表示的時(shí)長(zhǎng)泻轰,就出現(xiàn)了套圈的情況技肩,需要該字段值表示剩余的時(shí)鐘周期
     * transferTimeoutsToBuckets() before the HashedWheelTimeout will be added to the correct HashedWheelBucket.
     */
    long remainingRounds;

    /**
     * 在 HashedWheelTimerBucket構(gòu)建雙向鏈表
     * 只要worker線程操作,不需要同步原語進(jìn)行同步
     */
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    /**
     * 所添加的桶
     */
    HashedWheelBucket bucket;

    /**
     * 構(gòu)造函數(shù)浮声,HashedWheelTimer的newTimeout方法中調(diào)用
     */
    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }
        // If a task should be canceled we put this to another queue which will be processed on each tick.
        // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
        // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
        timer.cancelledTimeouts.add(this);
        return true;
    }

    void remove() {
        HashedWheelBucket bucket = this.bucket;
        if (bucket != null) {
            bucket.remove(this);
        } else {
            timer.pendingTimeouts.decrementAndGet();
        }
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    /**
     * 設(shè)置到期虚婿,并調(diào)用TimeTask的run方法
     */
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }

    @Override
    public String toString() {
        final long currentTime = System.nanoTime();
        long remaining = deadline - currentTime + timer.startTime;
        String simpleClassName = ClassUtils.simpleClassName(this.getClass());

        StringBuilder buf = new StringBuilder(192)
                .append(simpleClassName)
                .append('(')
                .append("deadline: ");
        if (remaining > 0) {
            buf.append(remaining)
                    .append(" ns later");
        } else if (remaining < 0) {
            buf.append(-remaining)
                    .append(" ns ago");
        } else {
            buf.append("now");
        }

        if (isCancelled()) {
            buf.append(", cancelled");
        }

        return buf.append(", task: ")
                .append(task())
                .append(')')
                .toString();
    }
}

HashedWheelBucket

HashedWheelBucket 是時(shí)間輪中的一個(gè)槽,時(shí)間輪中的槽實(shí)際上就是一個(gè)用于緩存和管理雙向鏈表的容器泳挥,雙向鏈表中的每一個(gè)節(jié)點(diǎn)就是一個(gè) HashedWheelTimeout 對(duì)象然痊,也就關(guān)聯(lián)了一個(gè) TimerTask 定時(shí)任務(wù)。

HashedWheelBucket 持有雙向鏈表的首尾兩個(gè)節(jié)點(diǎn)屉符,分別是 head 和 tail 兩個(gè)字段剧浸,再加上每個(gè) HashedWheelTimeout 節(jié)點(diǎn)均持有前驅(qū)和后繼的引用,這樣就可以正向或是逆向遍歷整個(gè)雙向鏈表了矗钟。

HashedWheelBucket 中的核心方法:

  • addTimeout() 方法:新增 HashedWheelTimeout 到雙向鏈表的尾部。

  • pollTimeout() 方法:移除雙向鏈表中的頭結(jié)點(diǎn)吨艇,并將其返回躬它。

  • remove() 方法:從雙向鏈表中移除指定的 HashedWheelTimeout 節(jié)點(diǎn)。

  • clearTimeouts() 方法:循環(huán)調(diào)用 pollTimeout() 方法處理整個(gè)雙向鏈表东涡,并返回所有未超時(shí)或者未被取消的任務(wù)冯吓。

  • expireTimeouts() 方法:遍歷雙向鏈表中的全部 HashedWheelTimeout 節(jié)點(diǎn)。 在處理到期的定時(shí)任務(wù)時(shí)软啼,會(huì)通過 remove() 方法取出桑谍,并調(diào)用其 expire() 方法執(zhí)行延柠;對(duì)于已取消的任務(wù)祸挪,通過 remove() 方法取出后直接丟棄;對(duì)于未到期的任務(wù)贞间,會(huì)將 remainingRounds 字段(剩余時(shí)鐘周期數(shù))減一贿条。

/**
 * HashedWheelBucket 是時(shí)間輪中的一個(gè)桶
 * 時(shí)間輪中的桶實(shí)際上就是一個(gè)用于緩存和管理雙向鏈表的容器雹仿,
 * 雙向鏈表中的每一個(gè)節(jié)點(diǎn)就是一個(gè) HashedWheelTimeout 對(duì)象,也就關(guān)聯(lián)了一個(gè) TimerTask 定時(shí)任務(wù)整以。
 */
private static final class HashedWheelBucket {

    /** 雙向鏈表結(jié)構(gòu) */
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    /**
     * 尾插Timeout
     */
    void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    /**
     * 調(diào)用所有到期的HashedWheelTimeout的expire胧辽,移除cancel的Timeout
     */
    void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        // process all timeouts
        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            if (timeout.remainingRounds <= 0) {
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                // timeout是取消狀態(tài),直接從桶中干掉
                next = remove(timeout);
            } else {
                // 否則減1輪表盤
                timeout.remainingRounds--;
            }
            timeout = next;
        }
    }

    /**
     * 從桶中移除timeout公黑,并返回下一個(gè)
     * @param timeout
     * @return
     */
    public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        // remove timeout that was either processed or cancelled by updating the linked-list
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            // if timeout is also the tail we need to adjust the entry too
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            // if the timeout is the tail modify the tail to be the prev node.
            tail = timeout.prev;
        }
        // null out prev, next and bucket to allow for GC.
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
        timeout.timer.pendingTimeouts.decrementAndGet();
        return next;
    }

    /**
     * 清空桶邑商,并返回所有沒到期或沒取消的Timeouts.
     */
    void clearTimeouts(Set<Timeout> set) {
        for (; ; ) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    /**
     * 取出頭結(jié)點(diǎn)
     * @return
     */
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head = null;
        } else {
            this.head = next;
            next.prev = null;
        }

        // null out prev and next to allow for GC.
        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}

HashedWheelTimer

HashedWheelTimer 是 Timer 接口的實(shí)現(xiàn),它通過時(shí)間輪算法實(shí)現(xiàn)了一個(gè)定時(shí)器凡蚜。HashedWheelTimer 會(huì)根據(jù)當(dāng)前時(shí)間輪指針選定對(duì)應(yīng)的槽(HashedWheelBucket)人断,從雙向鏈表的頭部開始迭代,對(duì)每個(gè)定時(shí)任務(wù)(HashedWheelTimeout)進(jìn)行計(jì)算朝蜘,屬于當(dāng)前時(shí)鐘周期則取出運(yùn)行恶迈,不屬于則將其剩余的時(shí)鐘周期數(shù)減一操作。

HashedWheelTimer 的核心屬性:

  • workerState(volatile int類型):時(shí)間輪當(dāng)前所處狀態(tài)谱醇,可選值有 init暇仲、started、shutdown副渴。同時(shí)奈附,有相應(yīng)的 AtomicIntegerFieldUpdater 實(shí)現(xiàn) workerState 的原子修改。

  • startTime(long類型):當(dāng)前時(shí)間輪的啟動(dòng)時(shí)間煮剧,提交到該時(shí)間輪的定時(shí)任務(wù)的 deadline 字段值均以該時(shí)間戳為起點(diǎn)進(jìn)行計(jì)算桅狠。

  • wheel(HashedWheelBucket[]類型):該數(shù)組就是時(shí)間輪的環(huán)形隊(duì)列,每一個(gè)元素都是一個(gè)槽轿秧。當(dāng)指定時(shí)間輪槽數(shù)為 n 時(shí)中跌,實(shí)際上會(huì)取大于且最靠近 n 的 2 的冪次方值。

  • timeouts菇篡、cancelledTimeouts(LinkedBlockingQueue類型):timeouts 隊(duì)列用于緩沖外部提交時(shí)間輪中的定時(shí)任務(wù)漩符,cancelledTimeouts 隊(duì)列用于暫存取消的定時(shí)任務(wù)。HashedWheelTimer 會(huì)在處理 HashedWheelBucket 的雙向鏈表之前驱还,先處理這兩個(gè)隊(duì)列中的數(shù)據(jù)嗜暴。

  • tick(long類型):該字段在 HashedWheelTimer$Worker 中,是時(shí)間輪的指針议蟆,是一個(gè)步長(zhǎng)為 1 的單調(diào)遞增計(jì)數(shù)器闷沥。

  • mask(int類型):掩碼, mask = wheel.length - 1咐容,執(zhí)行 ticks & mask 便能定位到對(duì)應(yīng)的時(shí)鐘槽舆逃。

  • ticksDuration(long類型):時(shí)間指針每次加 1 所代表的實(shí)際時(shí)間,單位為納秒。

  • pendingTimeouts(AtomicLong類型):當(dāng)前時(shí)間輪剩余的定時(shí)任務(wù)總數(shù)路狮。

  • workerThread(Thread類型):時(shí)間輪內(nèi)部真正執(zhí)行定時(shí)任務(wù)的線程虫啥。

  • worker(Worker類型):真正執(zhí)行定時(shí)任務(wù)的邏輯封裝這個(gè) Runnable 對(duì)象中。

時(shí)間輪對(duì)外提供了一個(gè) newTimeout() 接口用于提交定時(shí)任務(wù)奄妨,在定時(shí)任務(wù)進(jìn)入到 timeouts 隊(duì)列之前會(huì)先調(diào)用 start() 方法啟動(dòng)時(shí)間輪涂籽,其中會(huì)完成下面兩個(gè)關(guān)鍵步驟:

  • 1、確定時(shí)間輪的 startTime 字段砸抛。
  • 2评雌、啟動(dòng) workerThread 線程,開始執(zhí)行 worker 任務(wù)直焙。

之后根據(jù) startTime 計(jì)算該定時(shí)任務(wù)的 deadline 字段柳骄,最后才能將定時(shí)任務(wù)封裝成 HashedWheelTimeout 并添加到 timeouts 隊(duì)列。

下面分析時(shí)間輪指針一次轉(zhuǎn)動(dòng)的全流程:
  • 1箕般、時(shí)間輪指針轉(zhuǎn)動(dòng)耐薯,時(shí)間輪周期開始。

  • 2丝里、清理用戶主動(dòng)取消的定時(shí)任務(wù)曲初,這些定時(shí)任務(wù)在用戶取消時(shí),會(huì)記錄到 cancelledTimeouts 隊(duì)列中杯聚。在每次指針轉(zhuǎn)動(dòng)的時(shí)候臼婆,時(shí)間輪都會(huì)清理該隊(duì)列。

  • 3幌绍、將緩存在 timeouts 隊(duì)列中的定時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中對(duì)應(yīng)的槽中颁褂。

  • 4、根據(jù)當(dāng)前指針定位對(duì)應(yīng)槽傀广,處理該槽位的雙向鏈表中的定時(shí)任務(wù)颁独。

  • 5、檢測(cè)時(shí)間輪的狀態(tài)伪冰。如果時(shí)間輪處于運(yùn)行狀態(tài)誓酒,則循環(huán)執(zhí)行上述步驟,不斷執(zhí)行定時(shí)任務(wù)贮聂。如果時(shí)間輪處于停止?fàn)顟B(tài)靠柑,則執(zhí)行下面的步驟獲取到未被執(zhí)行的定時(shí)任務(wù)并加入 unprocessedTimeouts 隊(duì)列:遍歷時(shí)間輪中每個(gè)槽位,并調(diào)用 clearTimeouts() 方法吓懈;對(duì) timeouts 隊(duì)列中未被加入槽中循環(huán)調(diào)用 poll()歼冰。

  • 5、最后再次清理 cancelledTimeouts 隊(duì)列中用戶主動(dòng)取消的定時(shí)任務(wù)耻警。

上述核心邏輯在 HashedWheelTimer$Worker.run() 方法中隔嫡,

Worker

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    
    /** cnt滴答數(shù) */
    private long tick;

    /**
     * 時(shí)間輪指針一次轉(zhuǎn)動(dòng)的全流程甸怕。
     *
     * 1. 時(shí)間輪指針轉(zhuǎn)動(dòng),時(shí)間輪周期開始畔勤。
     * 2. 清理用戶主動(dòng)取消的定時(shí)任務(wù),這些定時(shí)任務(wù)在用戶取消時(shí)扒磁,會(huì)記錄到 cancelledTimeouts 隊(duì)列中庆揪。
     *      在每次指針轉(zhuǎn)動(dòng)的時(shí)候,時(shí)間輪都會(huì)清理該隊(duì)列妨托。
     * 3. 將緩存在 timeouts 隊(duì)列中的定時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中對(duì)應(yīng)的槽中缸榛。
     * 4. 根據(jù)當(dāng)前指針定位對(duì)應(yīng)槽,處理該槽位的雙向鏈表中的定時(shí)任務(wù)兰伤。
     * 5. 檢測(cè)時(shí)間輪的狀態(tài)内颗。如果時(shí)間輪處于運(yùn)行狀態(tài),則循環(huán)執(zhí)行上述步驟敦腔,不斷執(zhí)行定時(shí)任務(wù)均澳。
     *      如果時(shí)間輪處于停止?fàn)顟B(tài),則執(zhí)行下面的步驟獲取到未被執(zhí)行的定時(shí)任務(wù)并加入 unprocessedTimeouts 隊(duì)列:
     *      遍歷時(shí)間輪中每個(gè)槽位符衔,并調(diào)用 clearTimeouts() 方法找前;對(duì) timeouts 隊(duì)列中未被加入槽中循環(huán)調(diào)用 poll()。
     * 6. 最后再次清理 cancelledTimeouts 隊(duì)列中用戶主動(dòng)取消的定時(shí)任務(wù)判族。
     */
    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Notify the other threads waiting for the initialization at start().
        startTimeInitialized.countDown();

        // Worker啟動(dòng)時(shí)循環(huán)執(zhí)行躺盛,相當(dāng)于時(shí)間輪不停地轉(zhuǎn)動(dòng)
        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 計(jì)算指針指向的桶
                int idx = (int) (tick & mask);
                // 先處理被取消的任務(wù)
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                // 遍歷 timeouts 隊(duì)列中的定時(shí)任務(wù)添加到桶中
                transferTimeoutsToBuckets();
                // 調(diào)用所有到期的HashedWheelTimeout的expire,移除cancel的Timeout
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // 上面的循環(huán)結(jié)束后形帮,遍歷清空所有的桶槽惫,將未過期或未取消的任務(wù)保存至unprocessedTimeouts集合,便于返回給stop()方法
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        
        // 將timeouts緩沖隊(duì)列中未取消的任務(wù)也添加到unprocessedTimeouts中
        for (; ; ) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        
        // remove所有cancelledTimeouts隊(duì)列中的任務(wù)
        processCancelledTasks();
    }

    private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            // 計(jì)算落入的桶index辩撑,確保不會(huì)是過去時(shí)間
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }

    private void processCancelledTasks() {
        for (; ; ) {
            HashedWheelTimeout timeout = cancelledTimeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            try {
                timeout.remove();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * 根據(jù)startTime和tick數(shù)計(jì)算目標(biāo)ns界斜,然后等待到目標(biāo)ns
     *
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    private long waitForNextTick() {
        // 下一次滴答時(shí)間
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}

HashedWheelTimer

/**
 * 每個(gè)滴答耗時(shí)  通常 100ms
 * 時(shí)間輪大小 通常512
 * 保持單例模式使用
 * 
 */
public class HashedWheelTimer implements Timer {

    /**
     * may be in spi?
     */
    public static final String NAME = "hased";

    private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);

    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
    private static final int INSTANCE_COUNT_LIMIT = 64;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

    /** 真正執(zhí)行定時(shí)任務(wù)的邏輯封裝這個(gè) Runnable 對(duì)象中 */
    private final Worker worker = new Worker();
    /** 時(shí)間輪內(nèi)部真正執(zhí)行定時(shí)任務(wù)的線程 */
    private final Thread workerThread;

    /** worker狀態(tài)機(jī) */
    private static final int WORKER_STATE_INIT = 0;
    private static final int WORKER_STATE_STARTED = 1;
    private static final int WORKER_STATE_SHUTDOWN = 2;

    /**
     * 0 - init, 1 - started, 2 - shut down
     */
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int workerState;

    // 每個(gè)tick的時(shí)間,時(shí)間輪精度
    private final long tickDuration;
    
    // 時(shí)間輪桶
    private final HashedWheelBucket[] wheel;
    
    // 掩碼合冀, mask = wheel.length - 1锄蹂,執(zhí)行 ticks & mask 便能定位到對(duì)應(yīng)的時(shí)鐘槽
    private final int mask;
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    
    /** timeouts 隊(duì)列用于緩沖外部提交時(shí)間輪中的定時(shí)任務(wù) */
    private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
    
    /** cancelledTimeouts 隊(duì)列用于暫存取消的定時(shí)任務(wù) */
    private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
    
    // 統(tǒng)計(jì)待定的Timeouts數(shù)量
    private final AtomicLong pendingTimeouts = new AtomicLong(0);
    private final long maxPendingTimeouts;

    /** 當(dāng)前時(shí)間輪的啟動(dòng)時(shí)間,提交到該時(shí)間輪的定時(shí)任務(wù)的 deadline 字段值均以該時(shí)間戳為起點(diǎn)進(jìn)行計(jì)算 */
    private volatile long startTime;

    public HashedWheelTimer() {
        this(Executors.defaultThreadFactory());
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        this(Executors.defaultThreadFactory(), tickDuration, unit);
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
    }

    public HashedWheelTimer(ThreadFactory threadFactory) {
        this(threadFactory, 100, TimeUnit.MILLISECONDS);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
        this(threadFactory, tickDuration, unit, 512);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // wheel大小處理為2的指數(shù)水慨,并創(chuàng)建時(shí)間輪——桶數(shù)組
        wheel = createWheel(ticksPerWheel);
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
            // we have not yet shutdown then we want to make sure we decrement the active instance count.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
            }
        }
    }

    /**
     * 創(chuàng)建時(shí)間輪——桶數(shù)組
     * @param ticksPerWheel
     * @return
     */
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

    private static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalizedTicksPerWheel = ticksPerWheel - 1;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
        return normalizedTicksPerWheel + 1;
    }

    /**
     * 顯式啟動(dòng)后臺(tái)線程
     * 即使未調(diào)用此方法得糜,后臺(tái)線程也將根據(jù)需要自動(dòng)啟動(dòng)。
     * 1. 確定時(shí)間輪的 startTime 字段晰洒;
     * 2. 啟動(dòng) workerThread 線程朝抖,開始執(zhí)行 worker 任務(wù)
     *
     * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
     */
    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

    @Override
    public Set<Timeout> stop() {
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(
                    HashedWheelTimer.class.getSimpleName() +
                            ".stop() cannot be called from " +
                            TimerTask.class.getSimpleName());
        }

        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // workerState can be 0 or 2 at this moment - let it always be 2.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
            }

            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                workerThread.interrupt();
                try {
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            INSTANCE_COUNTER.decrementAndGet();
        }
        
        // 返回未處理的Timeouts
        return worker.unprocessedTimeouts();
    }

    @Override
    public boolean isStop() {
        return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
    }

    /**
     * 新建Timeout,并加入緩沖隊(duì)列
     */
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        // 超過maxPendingTimeouts谍珊,直接拒絕
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // 將該timeout添加至timeouts隊(duì)列中治宣,下一個(gè)tick會(huì)進(jìn)行處理
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // 防止溢出
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

    /**
     * Returns the number of pending timeouts of this {@link Timer}.
     */
    public long pendingTimeouts() {
        return pendingTimeouts.get();
    }

    private static void reportTooManyInstances() {
        String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
        logger.error("You are creating too many " + resourceType + " instances. " +
                resourceType + " is a shared resource that must be reused across the JVM," +
                "so that only a few instances are created.");
    }
}

Dubbo 中如何使用定時(shí)任務(wù)

在 Dubbo 中,時(shí)間輪并不直接用于周期性操作,而是只向時(shí)間輪提交執(zhí)行單次的定時(shí)任務(wù)侮邀,在上一次任務(wù)執(zhí)行完成的時(shí)候坏怪,調(diào)用 newTimeout() 方法再次提交當(dāng)前任務(wù),這樣就會(huì)在下個(gè)周期執(zhí)行該任務(wù)绊茧。即使在任務(wù)執(zhí)行過程中出現(xiàn)了 GC铝宵、I/O 阻塞等情況,導(dǎo)致任務(wù)延遲或卡住华畏,也不會(huì)有同樣的任務(wù)源源不斷地提交進(jìn)來鹏秋,導(dǎo)致任務(wù)堆積。

Dubbo 中對(duì)時(shí)間輪的應(yīng)用主要體現(xiàn)在如下兩個(gè)方面:

  • 失敗重試:例如亡笑,Provider 向注冊(cè)中心進(jìn)行注冊(cè)失敗時(shí)的重試操作侣夷,或是 Consumer 向注冊(cè)中心訂閱時(shí)的失敗重試等。

  • 周期性定時(shí)任務(wù):例如仑乌,定期發(fā)送心跳請(qǐng)求百拓,請(qǐng)求超時(shí)的處理,或是網(wǎng)絡(luò)連接斷開后的重連機(jī)制晰甚。

測(cè)試

@RunWith(SpringRunner.class)
@SpringBootTest
public class HashedWheelTimerTest {

    private class PrintTask implements TimerTask {

        @Override
        public void run(Timeout timeout) {
            final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            System.out.println("task :" + LocalDateTime.now().format(formatter));
        }
    }

    @Test
    public void newTimeout() throws InterruptedException {
        final Timer timer = newTimer();
        // 每隔1s向時(shí)間輪添加任務(wù)耐版。定時(shí)任務(wù)也是1s
        for (int i = 0; i < 10; i++) {
            timer.newTimeout(new PrintTask(), 3, TimeUnit.SECONDS);
            System.out.println("task" + i + "added into the timer");
            Thread.sleep(1000);
        }
        Thread.sleep(5000);
    }

    @Test
    public void stop() throws InterruptedException {
        final Timer timer = newTimer();
        for (int i = 0; i < 10; i++) {
            timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
            Thread.sleep(100);
        }
        //stop timer
        timer.stop();

        try {
            //this will throw a exception
            timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Timer newTimer() {
        // 100ms間隔的時(shí)間輪
        return new HashedWheelTimer(
                new NamedThreadFactory("dubbo-future-timeout", true),
                100,
                TimeUnit.MILLISECONDS);
    }
}

參考:
https://developer.51cto.com/art/202010/628734.htm?mobile

https://blog.csdn.net/weixin_38308374/article/details/105862201

https://wangguoping.blog.csdn.net/article/details/108293948

https://blog.csdn.net/weixin_42588665/article/details/81865156

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市压汪,隨后出現(xiàn)的幾起案子粪牲,更是在濱河造成了極大的恐慌,老刑警劉巖止剖,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腺阳,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡穿香,警方通過查閱死者的電腦和手機(jī)亭引,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來皮获,“玉大人焙蚓,你說我怎么就攤上這事∪鞅Γ” “怎么了购公?”我有些...
    開封第一講書人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)雁歌。 經(jīng)常有香客問我宏浩,道長(zhǎng),這世上最難降的妖魔是什么靠瞎? 我笑而不...
    開封第一講書人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任比庄,我火速辦了婚禮求妹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘佳窑。我一直安慰自己制恍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開白布神凑。 她就那樣靜靜地躺著净神,像睡著了一般。 火紅的嫁衣襯著肌膚如雪耙厚。 梳的紋絲不亂的頭發(fā)上强挫,一...
    開封第一講書人閱讀 51,165評(píng)論 1 299
  • 那天岔霸,我揣著相機(jī)與錄音薛躬,去河邊找鬼。 笑死呆细,一個(gè)胖子當(dāng)著我的面吹牛型宝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播絮爷,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼趴酣,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了坑夯?” 一聲冷哼從身側(cè)響起岖寞,我...
    開封第一講書人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎柜蜈,沒想到半個(gè)月后仗谆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡淑履,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年隶垮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秘噪。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡狸吞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出指煎,到底是詐尸還是另有隱情蹋偏,我是刑警寧澤,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布至壤,位于F島的核電站暖侨,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏崇渗。R本人自食惡果不足惜字逗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一京郑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧葫掉,春花似錦些举、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至挪挤,卻和暖如春叼丑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背扛门。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來泰國打工鸠信, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人论寨。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓星立,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親葬凳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子绰垂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • [TOC]在kafka中,有許多請(qǐng)求并不是立即返回火焰,而且處理完一些異步操作或者等待某些條件達(dá)成后才返回劲装,這些請(qǐng)求一...
    tracy_668閱讀 2,207評(píng)論 0 1
  • 零、時(shí)間輪定義 簡(jiǎn)單說說時(shí)間輪吧昌简,它是一個(gè)高效的延時(shí)隊(duì)列占业,或者說定時(shí)器。實(shí)際上現(xiàn)在網(wǎng)上對(duì)于時(shí)間輪算法的解釋很多江场,定...
    Java大生閱讀 1,950評(píng)論 1 0
  • 時(shí)間輪:高效延時(shí)隊(duì)列(定時(shí)器)纺酸。 Kafka中時(shí)間輪(TimingWheel)存儲(chǔ)定時(shí)任務(wù)環(huán)形隊(duì)列,底層數(shù)組實(shí)現(xiàn)址否,...
    hedgehog1112閱讀 1,219評(píng)論 0 8
  • 時(shí)間輪由來已久餐蔬,Linux內(nèi)核里有它,大大小小的應(yīng)用里也用它; Kafka里主要用它來作大量的定時(shí)任務(wù)佑附,超時(shí)判斷等...
    掃帚的影子閱讀 4,394評(píng)論 0 3
  • 在Redisson分布式鎖的實(shí)現(xiàn)一文中樊诺,我們說到Redisson會(huì)調(diào)用scheduleExpirationRene...
    jackcooper閱讀 4,659評(píng)論 0 3