時(shí)間輪在 Netty , Kafka 中的設(shè)計(jì)與實(shí)現(xiàn)(上)

本文基于 Netty 4.1.112.Final , Kafka 3.9.0 版本進(jìn)行討論

在業(yè)務(wù)開發(fā)的場景中,我們經(jīng)常會(huì)遇到很多定時(shí)任務(wù)的需求彤避。比如剂癌,生成業(yè)務(wù)報(bào)表境肾,周期性對(duì)賬永丝,同步數(shù)據(jù)锹漱,訂單支付超時(shí)處理等。針對(duì)業(yè)務(wù)場景中定時(shí)任務(wù)邏輯復(fù)雜慕嚷,執(zhí)行時(shí)間長的特點(diǎn)哥牍,市面上已經(jīng)有很多成熟的任務(wù)調(diào)度中間件可供我們選擇。比如:ElasticJob , XXL-JOB , PowerJob 等等喝检。

而在中間件的場景中嗅辣,同樣也存在很多定時(shí)任務(wù)的需求。比如挠说,網(wǎng)絡(luò)連接的心跳檢測辩诞,網(wǎng)絡(luò)請求超時(shí)或失敗的重試機(jī)制,網(wǎng)絡(luò)連接斷開之后的重連機(jī)制纺涤。和業(yè)務(wù)場景不同的是,這些中間件場景的定時(shí)任務(wù)特點(diǎn)是邏輯簡單抠忘,執(zhí)行時(shí)間非常短撩炊,而且對(duì)時(shí)間精度的要求比較低。比如崎脉,心跳檢測以及失敗重試這些定時(shí)任務(wù)拧咳,其實(shí)晚執(zhí)行個(gè)幾十毫秒或者 100 毫秒也無所謂。

于是針對(duì)中間件場景中的這些定時(shí)任務(wù)特點(diǎn):

  1. 海量任務(wù)
  2. 任務(wù)邏輯簡單
  3. 執(zhí)行時(shí)間短
  4. 對(duì)任務(wù)調(diào)度的及時(shí)性沒有那么高的要求

各大中間件設(shè)計(jì)了時(shí)間輪來調(diào)度具有上述 4 種特征的定時(shí)任務(wù)囚灼,而本文主要討論的就是時(shí)間輪的設(shè)計(jì)與實(shí)現(xiàn)骆膝。但在這之前我們需要搞清楚時(shí)間輪這個(gè)設(shè)計(jì)需求是怎么產(chǎn)生的祭衩,我們先從 JDK 中的任務(wù)調(diào)度組件開始聊起,看看 JDK 中的這些任務(wù)調(diào)度組件為什么不能滿足中間件的場景阅签。

本文概要.png

1. JDK 中的任務(wù)調(diào)度組件

說到定時(shí)任務(wù)掐暮,我們第一時(shí)間就能想到的調(diào)度組件就是 JDK 中的 Timer,為什么這么說呢政钟,因?yàn)楣P者剛參加工作時(shí)的第一個(gè)任務(wù)就是用 Timer 實(shí)現(xiàn)的路克,當(dāng)時(shí)對(duì) Java 一無所知,完全零基礎(chǔ)养交。主管交給我一個(gè)定時(shí)任務(wù)的需求精算,兩眼抹黑。于是帶著清澈而又稚嫩的眼神到網(wǎng)上一頓搜索碎连,找到了這個(gè) Timer灰羽,如獲至寶。

1.1 Timer

public class Timer {
    // 優(yōu)先隊(duì)列鱼辙,按照任務(wù)的 ExecutionTime廉嚼,由近到遠(yuǎn)組織
    private final TaskQueue queue = new TaskQueue();
    // 延時(shí)任務(wù)的調(diào)度線程
    private final TimerThread thread = new TimerThread(queue);
}

Timer 中有兩個(gè)核心組件,一個(gè)是用于調(diào)度延時(shí)任務(wù)的 TimerThread 座每,另一個(gè)是 TaskQueue前鹅,用于組織延時(shí)任務(wù)。它是一個(gè)優(yōu)先級(jí)隊(duì)列峭梳,其底層是一個(gè)數(shù)組實(shí)現(xiàn)的小根堆舰绘。

class TaskQueue {
    // 數(shù)組實(shí)現(xiàn)的小根堆
    private TimerTask[] queue = new TimerTask[128];

    // 向小根堆的堆底添加TimerTask 
    void add(TimerTask task) {
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        // 向上調(diào)整
        fixUp(size);
    }

    // 獲取堆頂任務(wù)
    TimerTask getMin() {
        return queue[1];
    }
    
    // 刪除堆頂任務(wù)
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;
        //向下調(diào)整堆
        fixDown(1);
    }
}

TaskQueue 會(huì)將所有延時(shí)任務(wù)按照它們的 ExecutionTime ,由近到遠(yuǎn)的組織在小根堆中葱椭,堆頂永遠(yuǎn)存放的是 ExecutionTime 最近的延時(shí)任務(wù)捂寿。

TimerThread 會(huì)不斷的從 TaskQueue 中獲取堆頂任務(wù),如果堆頂任務(wù)的 ExecutionTime 已經(jīng)達(dá)到 —— executionTime <= currentTime , 則執(zhí)行任務(wù)孵运。如果該任務(wù)是一個(gè)周期性任務(wù)秦陋,則將任務(wù)重新放入到 TaskQueue 中。

如果堆頂任務(wù)的 ExecutionTime 還沒有到達(dá)治笨,那么 TimerThread 就會(huì)等待 executionTime - currentTime 的時(shí)間驳概,一直到堆頂任務(wù)的執(zhí)行時(shí)間到達(dá),TimerThread 被重新喚醒執(zhí)行堆頂任務(wù)旷赖。

    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                // 堆頂任務(wù)的執(zhí)行時(shí)間是否到達(dá)
                boolean taskFired;
                synchronized(queue) {
                    long currentTime, executionTime;
                    // 獲取堆頂延時(shí)任務(wù)
                    task = queue.getMin();
                    synchronized(task.lock) {
                        // 當(dāng)前時(shí)間
                        currentTime = System.currentTimeMillis();
                        // 堆頂任務(wù)的執(zhí)行時(shí)間
                        executionTime = task.nextExecutionTime;
                        // 是否到達(dá)堆頂任務(wù)的執(zhí)行時(shí)間
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    // 如果堆頂任務(wù)的執(zhí)行時(shí)間還未到達(dá)顺又,那么 TimerThread 就會(huì)在這里等待
                    if (!taskFired)
                        queue.wait(executionTime - currentTime);
                }
                // 如果堆頂任務(wù)的執(zhí)行時(shí)間已經(jīng)到達(dá),則立即執(zhí)行
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }

根據(jù)以上 Timer 的核心實(shí)現(xiàn)等孵,我們可以總結(jié)出 Timer 在應(yīng)對(duì)中間件場景的延時(shí)任務(wù)時(shí)稚照,有以下四種不足:

  1. 首先用于組織延時(shí)任務(wù)的 TaskQueue 本質(zhì)上是一個(gè)小根堆。對(duì)于堆這種數(shù)據(jù)結(jié)構(gòu)來說,添加果录,刪除一個(gè)延時(shí)任務(wù)時(shí)上枕,堆都要向上,向下調(diào)整以便滿足小根堆的特性弱恒。單次操作的時(shí)間復(fù)雜度為 O(logn)辨萍。顯然在面對(duì)海量定時(shí)任務(wù)的添加,刪除時(shí)斤彼,性能上還是差點(diǎn)意思分瘦。

  2. Timer 調(diào)度框架中只有一個(gè) TimerThread 線程來負(fù)責(zé)延時(shí)任務(wù)的調(diào)度,執(zhí)行琉苇。在面對(duì)海量任務(wù)的時(shí)候嘲玫,通常會(huì)顯得力不從心。

  3. 另外一個(gè)嚴(yán)重問題是并扇,當(dāng)延時(shí)任務(wù)在執(zhí)行的過程中出現(xiàn)異常時(shí)去团, Timer 并不會(huì)捕獲,會(huì)導(dǎo)致 TimerThread 終止穷蛹。這樣一來土陪,TaskQueue 中的其他延時(shí)任務(wù)將永遠(yuǎn)不會(huì)得到執(zhí)行。

  4. Timer 依賴于系統(tǒng)的絕對(duì)時(shí)間肴熏,如果系統(tǒng)時(shí)間本身不準(zhǔn)確鬼雀,那么延時(shí)任務(wù)的調(diào)度就可能會(huì)出問題。

1.2 DelayQueue

DelayQueue 是 JDK 提供的一個(gè)延時(shí)隊(duì)列蛙吏,我們可以利用它來延時(shí)獲取隊(duì)列中的元素源哩,它的實(shí)現(xiàn)其實(shí)和 Timer 中的 TaskQueue 很類似,其底層都是一個(gè)優(yōu)先級(jí)隊(duì)列鸦做。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    
    //基于小根堆實(shí)現(xiàn)的優(yōu)先級(jí)隊(duì)列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

本質(zhì)上還是一個(gè)數(shù)組實(shí)現(xiàn)的小根堆励烦。添加,刪除任務(wù)的時(shí)間復(fù)雜度仍然是 O(logn)泼诱。

public class PriorityQueue<E> {
    // 數(shù)組實(shí)現(xiàn)的小根堆
    transient Object[] queue; 
}

DelayQueue 中的元素必須實(shí)現(xiàn) Delayed 接口中的 getDelay , compareTo 方法坛掠。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
   public int compareTo(T o);
}

其中 getDelay 方法用于獲取任務(wù)還有多久到期。返回值如果小于等于 0 治筒,則表示該任務(wù)已經(jīng)到期了屉栓。

compareTo 方法用于調(diào)整任務(wù)在 DelayQueue 中的位置,DelayQueue 是一個(gè)小根堆耸袜,每次向 DelayQueue 添加新的任務(wù)時(shí)系瓢,先是把任務(wù)放到 DelayQueue 的末尾,然后依次向上調(diào)整句灌,直到任務(wù)的過期時(shí)間大于等于其 parent 。 這樣就可以保證 DelayQueue 的小根堆特性 —— 堆頂元素永遠(yuǎn)是過期時(shí)間最近的任務(wù)。

我們可以通過 take() 方法從 DelayQueue 獲取到期的堆頂任務(wù)胰锌,如果堆頂任務(wù)還沒到期骗绕,那么就會(huì)在 DelayQueue 上阻塞等待,直到堆頂任務(wù)到期為止资昧。

    public E take() throws InterruptedException {
        try {
            for (;;) {
                // 獲取 DelayQueue 堆頂任務(wù)
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    // 獲取堆頂任務(wù)還有多久到期
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        // 堆頂任務(wù)到期酬土,則從 DelayQueue 中取出
                        return q.poll();
                    else {
                        try {
                            // 等待堆頂任務(wù)到期
                            available.awaitNanos(delay);
                        }
                    }
                }
            }
        } finally {
                    ...........
        }
    }

從 DelayQueue 的實(shí)現(xiàn)上可以看出,相比于 Timer 格带,DelayQueue 只是一個(gè)延時(shí)任務(wù)的管理隊(duì)列撤缴,而 Timer 卻是一個(gè)完整的任務(wù)調(diào)度組件。我們需要在 DelayQueue 的基礎(chǔ)之上叽唱,額外地實(shí)現(xiàn)任務(wù)調(diào)度功能屈呕。

但其底層的核心數(shù)據(jù)結(jié)構(gòu)仍然是一個(gè)小根堆。和 Timer 一樣棺亭,添加刪除延時(shí)任務(wù)的時(shí)間復(fù)雜度都是 O(logn)虎眨。同樣無法滿足海量延時(shí)任務(wù)的調(diào)度。

1.3 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 是多線程版本的 Timer 镶摘,它是在 DelayQueue 的基礎(chǔ)上增加了多線程調(diào)度延時(shí)任務(wù)的能力嗽桩。ScheduledThreadPoolExecutor 中負(fù)責(zé)組織管理延時(shí)任務(wù)的是 DelayedWorkQueue,它也是一個(gè)小根堆實(shí)現(xiàn)的優(yōu)先級(jí)隊(duì)列凄敢,延時(shí)任務(wù) ScheduledFutureTask 按照到期時(shí)間由近及遠(yuǎn)的組織在 DelayedWorkQueue 中碌冶。DelayedWorkQueue 的第一個(gè)元素是到期時(shí)間最近的 ScheduledFutureTask。

業(yè)務(wù)線程可以通過 schedule , scheduleAtFixedRate , scheduleWithFixedDelay 方法將延時(shí)任務(wù) ScheduledFutureTask 添加到 DelayedWorkQueue 中涝缝。

image.png

ScheduledThreadPoolExecutor 負(fù)責(zé)調(diào)度延時(shí)任務(wù)的是一個(gè)線程池扑庞,里邊包含了多個(gè) worker 調(diào)度線程,每個(gè) worker 線程負(fù)責(zé)從 DelayedWorkQueue 中獲取已經(jīng)到期的 ScheduledFutureTask俊卤,然后執(zhí)行嫩挤。如果 DelayedWorkQueue 中沒有任務(wù)到期,那么 worker 線程就會(huì)在 DelayedWorkQueue 上阻塞等待消恍,直到有到期的任務(wù)出現(xiàn)岂昭。

雖然 ScheduledThreadPoolExecutor 提供了多線程的調(diào)度能力,在一定程度上保證了延時(shí)任務(wù)調(diào)度的及時(shí)性狠怨,但是其底層仍然是依賴 DelayedWorkQueue 來管理延時(shí)任務(wù)约啊,在面對(duì)海量延時(shí)任務(wù)的添加,刪除時(shí)佣赖,時(shí)間復(fù)雜度依然還是 O(logn) 恰矩。那么有沒有一種數(shù)據(jù)結(jié)構(gòu)可以將這個(gè)時(shí)間復(fù)雜度降低為 O(1) 呢 ? 這就是本文我們要討論的重點(diǎn)內(nèi)容 —— 時(shí)間輪的設(shè)計(jì)與實(shí)現(xiàn)憎蛤。

2. Netty 時(shí)間輪的設(shè)計(jì)原理

時(shí)間輪的設(shè)計(jì)靈感來自于我們?nèi)粘I钪杏玫溺姳硗飧担姳碛忻脶樇退保轴槪瑫r(shí)針萎胰,共三個(gè)指針碾盟,60 個(gè)刻度。秒針每走一個(gè)刻度就是一秒技竟,秒針走完一個(gè)時(shí)鐘周期(60s)冰肴,分針走一個(gè)刻度就是一分鐘,當(dāng)分針走完一個(gè)時(shí)鐘周期(60m)榔组,時(shí)針走一個(gè)刻度就是一個(gè)小時(shí)熙尉。

image.png

比如我們要在今天的 10 點(diǎn) 10 分 0 秒這個(gè)時(shí)刻去開一個(gè)重要的會(huì)議,那么當(dāng)鐘表的秒針指向 0 這個(gè)刻度搓扯,分針指向 10 這個(gè)刻度检痰,時(shí)針指向 10 這個(gè)刻度的時(shí)候,鬧鐘就會(huì)響起擅编,提醒我們?nèi)?zhí)行開會(huì)這個(gè)延時(shí)任務(wù)攀细。

如果我們能把鐘表里的刻度抽象成一個(gè)數(shù)據(jù)結(jié)構(gòu),用這個(gè)數(shù)據(jù)結(jié)構(gòu)來存放對(duì)應(yīng)刻度的延時(shí)任務(wù)的話爱态,那么當(dāng)鐘表的時(shí)針谭贪,分針,秒針指向某個(gè)刻度的時(shí)候锦担,我們就去執(zhí)行這個(gè)刻度對(duì)應(yīng)的延時(shí)任務(wù)俭识,這樣一來,一種新的延時(shí)任務(wù)調(diào)度思路就出來了洞渔,這也是時(shí)間輪的設(shè)計(jì)理念套媚。

image.png

如上圖所示,Netty 將鐘表的刻度抽象成了一個(gè) HashedWheelBucket 的數(shù)據(jù)結(jié)構(gòu)磁椒,鐘表的表盤被抽象成一個(gè) HashedWheelBucket 類型的環(huán)形數(shù)組堤瘤,鐘表中有 60 個(gè)刻度,而 Netty 的時(shí)間輪 HashedWheelTimer 一共有 512 個(gè)刻度浆熔。

public class HashedWheelTimer implements Timer {
    // 數(shù)組大小默認(rèn)為 512
    private final HashedWheelBucket[] wheel;
    // HashedWheelTimer 的時(shí)鐘精度本辐,也就是時(shí)鐘間隔,多久轉(zhuǎn)動(dòng)一次医增,默認(rèn) 100ms, 最小值為 1ms
    private final long tickDuration;
}

鐘表中一共有三個(gè)指針慎皱,分別是秒針,分針叶骨,時(shí)針茫多。而 HashedWheelTimer 中只有一個(gè) tick 指針,tick 每隔 tickDuration (100ms) 走一個(gè)刻度忽刽,也就是說 Netty 時(shí)間輪的時(shí)鐘精度就是 100 ms , 定時(shí)任務(wù)的調(diào)度延時(shí)有時(shí)會(huì)在 100ms 左右天揖。如果你接受不了這么大的調(diào)度誤差夺欲,那么可以將 tickDuration 適當(dāng)調(diào)小一些,但最小不能低于 1ms 宝剖。

什么意思呢 洁闰?比如現(xiàn)在我們需要在 5ms 之后執(zhí)行一個(gè)延時(shí)任務(wù),那么時(shí)間輪可能在 8ms 之后才會(huì)調(diào)度這個(gè)任務(wù)万细,也可能在 65ms 之后調(diào)度,也有可能在 108ms 之后調(diào)度纸泄,這就使得定時(shí)任務(wù)的執(zhí)行有了大約 100ms 左右的延時(shí)赖钞。

具體延時(shí)多少,取決于我們在什么時(shí)刻將這個(gè)定時(shí)任務(wù)添加到時(shí)間輪中聘裁。關(guān)于這一點(diǎn)雪营,筆者后面會(huì)在介紹時(shí)間輪具體實(shí)現(xiàn)細(xì)節(jié)的時(shí)候詳細(xì)討論,這里點(diǎn)到為止衡便,本小節(jié)我們還是主要聚焦于時(shí)間輪的設(shè)計(jì)原理献起。

對(duì)于鐘表的秒針來說,它的 tickDuration 就是 1s , 走完一個(gè)時(shí)鐘周期就是 60s 镣陕。 對(duì)于分針來說谴餐,它的 tickDuration 就是 1m , 走完一個(gè)時(shí)鐘周期就是 60m。對(duì)于時(shí)針來說呆抑,它的 tickDuration 就是 1h , 走完一個(gè)時(shí)鐘周期就是 12h岂嗓。

由于 HashedWheelTimer 中的 tickDuration 是 100ms , 有 512 個(gè)刻度 (HashedWheelBucket) , 所以時(shí)間輪中的 tick 指針走完一個(gè)時(shí)鐘周期需要 51200ms 。

HashedWheelBucket 是一個(gè)具有頭尾指針的雙向鏈表鹊碍,鏈表中存儲(chǔ)的元素類型為 HashedWheelTimeout 用于封裝定時(shí)任務(wù)厌殉。HashedWheelBucket 中的 head 指向雙向鏈表中的第一個(gè) HashedWheelTimeout , tail 指向雙向鏈表中的最后一個(gè) HashedWheelTimeout侈咕。

    private static final class HashedWheelBucket {
        // Used for the linked-list datastructure
        private HashedWheelTimeout head;// 指向雙向鏈表中的第一個(gè) timeout
        private HashedWheelTimeout tail;// 指向雙向鏈表中的最后一個(gè) timeout
    }
image.png

HashedWheelTimeout 用于封裝時(shí)間輪中的延時(shí)任務(wù)公罕,提交到時(shí)間輪中的延時(shí)任務(wù)必須實(shí)現(xiàn) TimerTask 接口。

// 延時(shí)任務(wù)
public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

private static final class HashedWheelTimeout implements Timeout, Runnable {
        // 延時(shí)任務(wù)所屬的時(shí)間輪
        private final HashedWheelTimer timer;
        // 延時(shí)任務(wù)
        private final TimerTask task;
        // 延時(shí)任務(wù)的 deadline 耀销,該時(shí)間是一個(gè)絕對(duì)時(shí)間楼眷,以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
        private final long deadline;
        // 延時(shí)任務(wù)所屬的 bucket
        HashedWheelBucket bucket;
        // 指向其在 bucket 的下一個(gè)延時(shí)任務(wù)
        HashedWheelTimeout next;
        // 指向其在 bucket 的前一個(gè)延時(shí)任務(wù)
        HashedWheelTimeout prev;


        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }
}

HashedWheelTimeout 中有一個(gè)重要的屬性 deadline ,它規(guī)定了延時(shí)任務(wù) TimerTask 的到期時(shí)間树姨。deadline 是一個(gè)絕對(duì)時(shí)間值摩桶,它以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn),表示從 startTime 這個(gè)時(shí)間點(diǎn)開始帽揪,到 deadline 這個(gè)時(shí)間點(diǎn)到期硝清。

// 計(jì)算延時(shí)任務(wù)到期的絕對(duì)時(shí)間戳
// 時(shí)間輪中的時(shí)間戳均以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
image.png

Netty 時(shí)間輪中的時(shí)間坐標(biāo)系全部是以時(shí)間輪的啟動(dòng)時(shí)間點(diǎn) startTime 為基準(zhǔn)的,當(dāng)時(shí)間輪啟動(dòng)之后转晰,會(huì)將那一刻的時(shí)間戳設(shè)置到 startTime 中芦拿。

public class HashedWheelTimer implements Timer {
    // 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
    private volatile long startTime;
}

時(shí)間輪中的 tick 指針也是一個(gè)絕對(duì)值士飒,當(dāng)時(shí)間輪啟動(dòng)之后,tick 指向 0 蔗崎,每隔 100ms (tickDuration)酵幕,tick 向前轉(zhuǎn)動(dòng)一下。但需要注意的是 tick 的值是只增不減的缓苛,只要時(shí)間輪在運(yùn)行芳撒,那么 tick 的值就會(huì)一直遞增上去。比如未桥,當(dāng) tick 轉(zhuǎn)動(dòng)完一個(gè)時(shí)鐘周期(51200ms)之后笔刹,tick 的值是 512 而不是重新指向 0 。

tick 與 HashedWheelBucket 之間的映射關(guān)系通過 ticks & mask 計(jì)算得出冬耿。mask 為 HashedWheelBucket 的個(gè)數(shù)減 1 舌菜,所以這就要求時(shí)間輪中 HashedWheelBucket 的個(gè)數(shù)必須是 2 的次冪。

在時(shí)間輪中亦镶,屬于同一個(gè) HashedWheelBucket 中的延時(shí)任務(wù) HashedWheelTimeouts 日月,它們的到期時(shí)間 deadline 都在同一時(shí)間范圍內(nèi) —— [ tick , tick + 1) * tickDuration

比如缤骨,在時(shí)間輪剛剛啟動(dòng)之后爱咬,tick 指向 0 ,那么 wheel[0] 指向的 HashedWheelBucket 里存放的 HashedWheelTimeouts荷憋,它們的到期時(shí)間均在 [ 0 , 100) ms 之內(nèi)台颠。

假如我們在 tick = 0 這個(gè)時(shí)刻,向時(shí)間輪中添加了一個(gè)延時(shí) 5ms 執(zhí)行的 HashedWheelTimeout勒庄,那么它就會(huì)被放入 wheel[0] 中串前。如果添加的是一個(gè)延時(shí) 101ms 執(zhí)行的 HashedWheelTimeout,那么它就會(huì)被放入 wheel[1] 中实蔽。同樣的道理荡碾,如果添加的是一個(gè)延時(shí) 360ms 執(zhí)行的 HashedWheelTimeout,那么它就會(huì)被放入 wheel[3] 中局装。

image.png

當(dāng)時(shí)間過了 100ms 之后坛吁,Netty 就會(huì)將 HashedWheelBucket0 中的延時(shí)任務(wù)拉出來執(zhí)行,執(zhí)行完之后铐尚,tick 的值加 1 拨脉,從 0 轉(zhuǎn)動(dòng)到 1 。在經(jīng)過 100 ms 之后,Netty 就會(huì)將 HashedWheelBucket1 中的延時(shí)任務(wù)拉出來執(zhí)行,執(zhí)行完之后掀序,tick 的值加 1 舶担,從 1 轉(zhuǎn)動(dòng)到 2 帖旨,如此往復(fù)循環(huán)箕昭。這就是整個(gè)時(shí)間輪的運(yùn)轉(zhuǎn)邏輯。

但從這個(gè)過程中我們可以看出解阅,延時(shí)任務(wù)的調(diào)度存在 tickDuration(100ms)左右的延遲落竹。比如,在 tick = 0 這個(gè)時(shí)刻货抄,添加到 HashedWheelBucket0 中的延時(shí)任務(wù)述召,我們本來是期望這些延時(shí)任務(wù)分別在 5ms , 10ms , 95ms 之后執(zhí)行,但時(shí)間輪的真正調(diào)度時(shí)間卻在 100ms 之后蟹地。這就導(dǎo)致了任務(wù)調(diào)度產(chǎn)生了 100ms 左右的延遲桨武。

如果你接受不了 100ms 的延遲,那么可以在創(chuàng)建時(shí)間輪的時(shí)候锈津,將 tickDuration 的值調(diào)低,但不能低于 1ms 凉蜂。tickDuration 的值越小琼梆,時(shí)間輪的精度越高,性能開銷也就越大窿吩。tickDuration 的值越大茎杂,時(shí)間輪的精度也就越低,性能開銷越小纫雁。

    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        
    }

但在中間件的場景中煌往,往往對(duì)延時(shí)任務(wù)調(diào)度的及時(shí)性沒有那么高的要求,同時(shí)為了兼顧時(shí)間輪的精度與性能轧邪,tickDuration 默認(rèn)設(shè)置為100ms 是剛好合適的刽脖,通常不需要調(diào)整。

另外在默認(rèn)情況下忌愚,只有一個(gè)線程 workerThread 負(fù)責(zé)推動(dòng)時(shí)間輪的轉(zhuǎn)動(dòng)曲管,以及延時(shí)任務(wù)的執(zhí)行。

public class HashedWheelTimer implements Timer {
    // HashedWheelTimer 的 worker 線程硕糊,由它來驅(qū)動(dòng)時(shí)間輪的轉(zhuǎn)動(dòng)院水,延時(shí)任務(wù)的執(zhí)行
    private final Thread workerThread;
}

從上面的過程可以看出,只有當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)全部被執(zhí)行完畢的時(shí)候简十,tick 才會(huì)向前推動(dòng)檬某。所以為了保證任務(wù)調(diào)度的及時(shí)性,時(shí)間輪中的延時(shí)任務(wù)執(zhí)行時(shí)間不能太長螟蝙,只適合邏輯簡單恢恼,執(zhí)行時(shí)間短的延時(shí)任務(wù)。

但畢竟在默認(rèn)情況下就只有這一個(gè) workerThread胶逢,既負(fù)責(zé)延時(shí)任務(wù)的調(diào)度厅瞎,又負(fù)責(zé)延時(shí)任務(wù)的執(zhí)行饰潜,對(duì)于有海量并發(fā)延時(shí)任務(wù)的場景,還是顯得力不從心和簸。為了應(yīng)對(duì)這種情況彭雾,我們可以在創(chuàng)建時(shí)間輪的時(shí)候,指定一個(gè)專門用于執(zhí)行延時(shí)任務(wù)的 Executor锁保。

這樣一來薯酝,時(shí)間輪中的延時(shí)任務(wù)調(diào)度還是由單線程 workerThread 負(fù)責(zé),到期的延時(shí)任務(wù)由線程池 Executor 來負(fù)責(zé)執(zhí)行爽柒,近一步提升延時(shí)任務(wù)調(diào)度的及時(shí)性吴菠。但事實(shí)上,在大部分場景中浩村,有一個(gè) workerThread 就夠了做葵,并不需要額外的指定 Executor。大家可以根據(jù)實(shí)際情況心墅,自由裁定酿矢。

public class HashedWheelTimer implements Timer {
    // 負(fù)責(zé)執(zhí)行延時(shí)任務(wù),用于應(yīng)對(duì)大量的并發(fā)延時(shí)任務(wù)場景
    // 默認(rèn)為單線程 workerThread
    private final Executor taskExecutor;
    // 在構(gòu)造函數(shù)中可以設(shè)置 taskExecutor
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) 
}

另外還有一個(gè)問題就是怎燥,上圖時(shí)間輪中的延時(shí)任務(wù)瘫筐,它們的延時(shí)時(shí)間都在同一時(shí)鐘周期內(nèi)。Netty 時(shí)間輪中的一個(gè)時(shí)鐘周期是 51200ms 铐姚。

也就是說策肝,在 tick = 0 這個(gè)時(shí)刻,只要延時(shí)任務(wù)的延時(shí)時(shí)間在 51200ms 之內(nèi)隐绵,那么當(dāng) tick 轉(zhuǎn)動(dòng)完 512 個(gè)刻度之后(一個(gè)時(shí)鐘周期)之众,這 512 個(gè)刻度對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)全部會(huì)被執(zhí)行到。

如果我們在 tick = 0 這個(gè)時(shí)刻氢橙,添加一個(gè)延時(shí)任務(wù)酝枢,但它的延時(shí)時(shí)間超過了一個(gè)時(shí)鐘周期,比如在 51250ms 之后執(zhí)行悍手。 那么這個(gè)延時(shí)任務(wù)也會(huì)被添加到 HashedWheelBucket0 中帘睦。

image.png

當(dāng)時(shí)間過了 100ms 之后,workerThread 就會(huì)執(zhí)行 HashedWheelBucket0 中的延時(shí)任務(wù)坦康。但此時(shí)只能執(zhí)行延時(shí) 5ms , 10ms 的任務(wù)竣付,不能執(zhí)行延時(shí) 51250ms 的任務(wù),因?yàn)樗枰鹊较乱粋€(gè)時(shí)鐘周期才能執(zhí)行滞欠。

那么 workerThread 在執(zhí)行延時(shí)任務(wù)的時(shí)候如何才能知道古胆,哪些任務(wù)是本次時(shí)鐘周期內(nèi)可以執(zhí)行的,哪些任務(wù)是要等到下一次或者下下次時(shí)鐘周期才能執(zhí)行的呢 ?

在延時(shí)任務(wù)模型 HashedWheelTimeout 中有一個(gè)字段 —— remainingRounds逸绎,用于記錄延時(shí)任務(wù)還剩多少時(shí)鐘周期可以執(zhí)行惹恃。

private static final class HashedWheelTimeout implements Timeout, Runnable {
    // 執(zhí)行該延時(shí)任務(wù)需要經(jīng)過多少時(shí)鐘周期
    long remainingRounds;
}

本次時(shí)鐘周期內(nèi)可以執(zhí)行的延時(shí)任務(wù),它的 remainingRounds = 0 棺牧,workerThread 在遇到 remainingRounds = 0 的 HashedWheelTimeout 就會(huì)執(zhí)行巫糙。

下一個(gè)時(shí)鐘周期才能執(zhí)行的延時(shí)任務(wù),它的 remainingRounds = 1 颊乘,依次類推参淹。當(dāng) workerThread 遇到 remainingRounds > 0 的 HashedWheelTimeout 就會(huì)直接跳過,并將 remainingRounds 減 1 乏悄。

比如浙值,上圖中 HashedWheelBucket0 中的這幾個(gè)延時(shí)任務(wù),其中延時(shí) 5ms , 10ms 的 HashedWheelTimeout 它們的 remainingRounds = 0, 表示在本次時(shí)鐘周期內(nèi)就可以馬上執(zhí)行檩小。

image.png

延時(shí) 51250ms 的 HashedWheelTimeout 它的 remainingRounds = 1 开呐, 表示在下一個(gè)時(shí)鐘周期才能執(zhí)行。

好了规求,現(xiàn)在整個(gè)時(shí)間輪的設(shè)計(jì)原理筆者就為大家介紹完了负蚊,那么讓我們再次回到本小節(jié)開頭的問題,在面對(duì)海量延時(shí)任務(wù)的添加與取消時(shí)颓哮,時(shí)間輪如何將這個(gè)時(shí)間復(fù)雜度降低為 O(1) 呢 ?

首先鸵荠,時(shí)間輪的核心數(shù)據(jù)結(jié)構(gòu)就是一個(gè) HashedWheelBucket 類型的環(huán)形數(shù)組 wheel 冕茅, 數(shù)組長度默認(rèn)為 512 。wheel 數(shù)組用于組織管理時(shí)間輪中的所有延時(shí)任務(wù)蛹找。

    // 數(shù)組大小默認(rèn)為 512
    private final HashedWheelBucket[] wheel;

與之前介紹的延時(shí)隊(duì)列 DelayedWorkQueue 不同的是姨伤,環(huán)形數(shù)組 wheel 會(huì)按照延時(shí)時(shí)間的不同,將延時(shí)任務(wù)分散到 512 個(gè) HashedWheelBucket 中管理庸疾。每個(gè) HashedWheelBucket 負(fù)責(zé)管理到期時(shí)間范圍在 [ tick , tick + 1) * tickDuration 之間的任務(wù)乍楚。

而 DelayedWorkQueue 則是用一個(gè)優(yōu)先級(jí)隊(duì)列來管理所有的延時(shí)任務(wù),為了維護(hù)小根堆的特性届慈,每次在向 DelayedWorkQueue 添加或者刪除延時(shí)任務(wù)的時(shí)間復(fù)雜度為 O(logn)徒溪。

當(dāng)我們向時(shí)間輪添加一個(gè)延時(shí)任務(wù)時(shí),Netty 首先會(huì)根據(jù)延時(shí)任務(wù)的 deadline 以及 tickDuration 計(jì)算出該延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上金顿。注意臊泌,延時(shí)任務(wù)中的 deadline 是一個(gè)絕對(duì)值而不是相對(duì)值,是以時(shí)間輪啟動(dòng)時(shí)間 startTime 為基準(zhǔn)的一個(gè)絕對(duì)時(shí)間戳揍拆。tick 也是一個(gè)絕對(duì)值而不是相對(duì)值渠概,是以時(shí)間輪剛剛啟動(dòng)時(shí) tick = 0 為基準(zhǔn)的絕對(duì)值,只增不減嫂拴。

比如播揪,前面這個(gè)延時(shí) 51250ms 的任務(wù)贮喧,它最終會(huì)停留在 tick = 512 上。但由于時(shí)間輪是一個(gè)環(huán)形數(shù)組猪狈,所以 tick 512 與數(shù)組長度 512 取余得到所屬 HashedWheelBucket 在 wheel 數(shù)組中的 index = 0箱沦。

// 計(jì)算延時(shí)任務(wù),最終會(huì)停留在哪一個(gè) tick 上
long calculated = timeout.deadline / tickDuration;

// 獲取 calculated 對(duì)應(yīng)的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];

// 將延時(shí)任務(wù)添加到對(duì)應(yīng)的 HashedWheelBucket 中
bucket.addTimeout(timeout);

然后將延時(shí)任務(wù)添加到 HashedWheelBucket 的末尾罪裹,前面我們已經(jīng)提過饱普,HashedWheelBucket 是一個(gè)雙向鏈表,向鏈表末尾添加一個(gè)元素的時(shí)間復(fù)雜度為 O(1) 状共。

private static final class HashedWheelBucket {

        public void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            if (head == null) {
                head = tail = timeout;
            } else {
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }      
}

延時(shí)任務(wù)的取消邏輯也很簡單套耕,就是將這個(gè)延時(shí)任務(wù)從其所屬的 HashedWheelBucket 中刪除即可。從一個(gè)雙向鏈表中刪除某個(gè)指定的元素時(shí)間復(fù)雜度也是 O(1)峡继。

  public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            HashedWheelTimeout next = timeout.next;
            // timeout 不是第一個(gè)元素
            if (timeout.prev != null) { 
                timeout.prev.next = next;
            }
            // timeout 不是最后一個(gè)元素
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }

            if (timeout == head) {
                // Bucket 中只有一個(gè)任務(wù)冯袍,直接將頭尾指針置空
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    // 待刪除的任務(wù)是第一個(gè)任務(wù),head 指針向后移動(dòng)
                    head = next;
                }
            } else if (timeout == tail) {
                // 待刪除的任務(wù)是最后一個(gè)任務(wù)碾牌,tail 指針向前移動(dòng)
                tail = timeout.prev;
            }
            // null out prev, next and bucket to allow for GC.
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            return next;
        }

從以上過程我們可以看出康愤,時(shí)間輪在面對(duì)海量延時(shí)任務(wù)的添加,取消的時(shí)候舶吗,所需的時(shí)間復(fù)雜度都是 O(1)征冷,聊完了延時(shí)任務(wù)的管理,現(xiàn)在我們在來看一下延時(shí)任務(wù)的調(diào)度與執(zhí)行誓琼。

Netty 只靠一個(gè)單線程 workThread 來推動(dòng)時(shí)間輪一個(gè) tick 一個(gè) tick 地向前轉(zhuǎn)動(dòng)检激,當(dāng)時(shí)間輪轉(zhuǎn)動(dòng)到某一個(gè) tick 時(shí),workThread 會(huì)等待一個(gè) tickDuration (默認(rèn) 100ms)的時(shí)間腹侣,隨后 workThread 會(huì)將該 tick 對(duì)應(yīng)的 HashedWheelBucket 中 remainingRounds = 0 的延時(shí)任務(wù)全都拉取下來挨個(gè)執(zhí)行叔收。

當(dāng)執(zhí)行完 HashedWheelBucket 中的延時(shí)任務(wù)之后,tick 向前推進(jìn)一格(tick++)傲隶,workThread 繼續(xù)睡眠等待一個(gè) tickDuration饺律,然后重復(fù)上述過程。

這里我們可以看出跺株,延時(shí)任務(wù)的調(diào)度與執(zhí)行在默認(rèn)情況下全部都是由一個(gè)單線程 workThread 來執(zhí)行复濒。如果時(shí)間輪中的延時(shí)任務(wù)邏輯復(fù)雜,執(zhí)行時(shí)間長乒省,那么就會(huì)影響整個(gè)時(shí)間輪的調(diào)度芝薇,tick 的轉(zhuǎn)動(dòng)就會(huì)出現(xiàn)延時(shí),所以時(shí)間輪雖然可以處理海量的延時(shí)任務(wù)作儿,但是這些延時(shí)任務(wù)的邏輯必須要簡單洛二,執(zhí)行時(shí)間要短。當(dāng)然了,我們也可以在創(chuàng)建時(shí)間輪的時(shí)候指定一個(gè)專門執(zhí)行延時(shí)任務(wù)的線程池來加快任務(wù)的執(zhí)行晾嘶。

由于延時(shí)任務(wù)的調(diào)度通常會(huì)有一個(gè) tickDuration 左右的延時(shí)妓雾。比如,任務(wù)的調(diào)度可能會(huì)晚幾毫秒或者幾十毫秒垒迂,也有可能晚一個(gè) tickDuration 左右械姻。所以時(shí)間輪只能處理那些對(duì)任務(wù)調(diào)度的及時(shí)性要求沒那么高的場景

3. Netty 時(shí)間輪相關(guān)設(shè)計(jì)模型的實(shí)現(xiàn)

image.png

3.1 HashedWheelTimer

Netty 使用一個(gè)叫做 HashedWheelTimer 的結(jié)構(gòu)來描述時(shí)間輪机断,其中包含了第二小節(jié)中介紹的所有重要屬性以及核心結(jié)構(gòu)楷拳。其中最核心的就是 wheel 環(huán)形數(shù)組,它相當(dāng)于鐘表的表盤吏奸,表盤中的每一個(gè)刻度用 HashedWheelBucket 結(jié)構(gòu)描述欢揖。

private final HashedWheelBucket[] wheel;

時(shí)間輪中究竟包含多少個(gè)刻度,是由構(gòu)造參數(shù) ticksPerWheel 決定的奋蔚,默認(rèn)為 512 她混。Netty 會(huì)根據(jù)延時(shí)時(shí)間的不同將所有提交到時(shí)間輪的延時(shí)任務(wù)分散到 512 個(gè) HashedWheelBucket 中組織管理。定位延時(shí)任務(wù)所在的 HashedWheelBucket 以及向 HashedWheelBucket 中添加泊碑,取消延時(shí)任務(wù)的時(shí)間復(fù)雜度均為 O(1)坤按。

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) 

如果時(shí)間輪中需要調(diào)度的延時(shí)任務(wù)非常多,那么每個(gè) HashedWheelBucket 中就可能包含大量的延時(shí)任務(wù)馒过,這就導(dǎo)致時(shí)間輪的調(diào)度發(fā)生延遲臭脓。針對(duì)這種情況,我們可以適當(dāng)增加 ticksPerWheel 的個(gè)數(shù)腹忽,讓更多的 HashedWheelBucket 來分?jǐn)傃訒r(shí)任務(wù)谢鹊。但 ticksPerWheel 必須是 2 的次冪。

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        // ticksPerWheel 必須是 2 的次冪留凭,默認(rèn)為 512
        ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
        // 創(chuàng)建時(shí)間輪中的 hash 槽
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

這樣一來,當(dāng)我們向時(shí)間輪添加延時(shí)的任務(wù)的時(shí)候偎巢,就可以通過 & 運(yùn)算來代替 % 運(yùn)算去尋找延時(shí)任務(wù)對(duì)應(yīng)的 HashedWheelBucket蔼夜。

mask = wheel.length - 1;

第二小節(jié)我們已經(jīng)介紹過了,在向時(shí)間輪添加延時(shí)任務(wù)時(shí)压昼,我們需要首先定位到這個(gè)延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上求冷,時(shí)間輪中的 tick 是一個(gè)絕對(duì)值,它不會(huì)按照時(shí)鐘周期的結(jié)束而自動(dòng)歸 0 窍霞,而是一直會(huì)往上遞增匠题。

calculated 也是一個(gè)絕對(duì)值,表示延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上但金,隨后通過 calculated & mask 定位到對(duì)應(yīng)的 HashedWheelBucket韭山,時(shí)間復(fù)雜度為 O(1)

// 計(jì)算延時(shí)任務(wù),最終會(huì)停留在哪一個(gè) tick 上
long calculated = timeout.deadline / tickDuration;

// 獲取 calculated 對(duì)應(yīng)的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];

tickDuration 表示時(shí)間輪中的時(shí)鐘精度钱磅,也就是 tick 指針多久轉(zhuǎn)動(dòng)一次梦裂,默認(rèn)為 100ms,我們可以通過構(gòu)造參數(shù) tickDuration 進(jìn)行指定盖淡,但最小不能低于 1ms年柠。

private final long tickDuration;

tickDuration 的值越小,時(shí)間輪的精度越高褪迟,性能開銷也就越大冗恨。tickDuration 的值越大,時(shí)間輪的精度也就越低味赃,性能開銷越小掀抹。

現(xiàn)在時(shí)間輪的基本骨架就有了,而時(shí)間輪的運(yùn)轉(zhuǎn)靠的就是 workerThread 洁桌,由它來驅(qū)動(dòng)時(shí)鐘 tick 一下一下的轉(zhuǎn)動(dòng)渴丸,并執(zhí)行對(duì)應(yīng) HashedWheelBucket 中的延時(shí)任務(wù)。

private final Worker worker = new Worker();
private final Thread workerThread;

由于在默認(rèn)情況下另凌,Netty 時(shí)間輪中就只有這一個(gè)單線程 workerThread 來負(fù)責(zé)延時(shí)任務(wù)的調(diào)度與執(zhí)行谱轨,在面對(duì)海量并發(fā)任務(wù)的時(shí)候,難免顯得有點(diǎn)力不從心吠谢。執(zhí)行任務(wù)的時(shí)間過長土童,就會(huì)導(dǎo)致 tick 的轉(zhuǎn)動(dòng)產(chǎn)生很大的延時(shí)。于是 Netty 又在 4.1.69.Final 中引入了一個(gè) taskExecutor工坊,來專門負(fù)責(zé)執(zhí)行延時(shí)任務(wù)献汗。

 private final Executor taskExecutor;

我們可以通過構(gòu)造參數(shù) taskExecutor 來指定自定義的線程池,默認(rèn)情況下為 ImmediateExecutor 王污,其本質(zhì)還是由 workerThread 來執(zhí)行延時(shí)任務(wù)罢吃。

public final class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        ObjectUtil.checkNotNull(command, "command").run();
    }
}

workerThread 負(fù)責(zé)從對(duì)應(yīng) tick 的 HashedWheelBucket 中拉取延時(shí)任務(wù),然后將延時(shí)任務(wù)丟給 taskExecutor 來執(zhí)行昭齐。這在一定程度上提高了延時(shí)任務(wù)的消費(fèi)速度尿招,不至于拖慢 workerThread 從而影響到整個(gè)時(shí)間輪的運(yùn)轉(zhuǎn)。

時(shí)間輪中待執(zhí)行延時(shí)任務(wù)的最大個(gè)數(shù)受到參數(shù) maxPendingTimeouts 限制阱驾,默認(rèn)為 -1 就谜。當(dāng) maxPendingTimeouts 的值小于等于 0 的時(shí)候,表示 Netty 不會(huì)對(duì)時(shí)間輪中的延時(shí)任務(wù)個(gè)數(shù)進(jìn)行限制里覆。

// 時(shí)間輪中待執(zhí)行延時(shí)任務(wù)的最大個(gè)數(shù)
private final long maxPendingTimeouts;
// 時(shí)間輪當(dāng)前待執(zhí)行的延時(shí)任務(wù)個(gè)數(shù)
private final AtomicLong pendingTimeouts = new AtomicLong(0);

當(dāng)時(shí)間輪中的延時(shí)任務(wù)個(gè)數(shù)超過了 maxPendingTimeouts 的限制時(shí)丧荐,再向時(shí)間輪添加延時(shí)任務(wù)就會(huì)得到 RejectedExecutionException 異常。

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

        // 待執(zhí)行的延時(shí)任務(wù)計(jì)數(shù) + 1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // maxPendingTimeouts 默認(rèn)為 -1 喧枷。表示不對(duì)時(shí)間輪的延時(shí)任務(wù)個(gè)數(shù)進(jìn)行限制
        // 如果達(dá)到限制虹统,則不能繼續(xù)向時(shí)間輪添加任務(wù)
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
                    弓坞,,窟却,昼丑,,夸赫,菩帝,,
    }

另外時(shí)間輪 HashedWheelTimer 在 JVM 進(jìn)程中的實(shí)例個(gè)數(shù)會(huì)受到 INSTANCE_COUNT_LIMIT 的限制茬腿,默認(rèn)為 64 呼奢。

    // 系統(tǒng)當(dāng)前時(shí)間輪實(shí)例的個(gè)數(shù)
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    // 默認(rèn)允許的 HashedWheelTimer 最大實(shí)例個(gè)數(shù)
    private static final int INSTANCE_COUNT_LIMIT = 64;

如果當(dāng)前 JVM 進(jìn)程中的 HashedWheelTimer 實(shí)例個(gè)數(shù)超過了 64 ,那么 Netty 就會(huì)打印 Error 日志進(jìn)行警告切平。

    private static void reportTooManyInstances() {
        if (logger.isErrorEnabled()) {
            String resourceType = simpleClassName(HashedWheelTimer.class);
            logger.error("You are creating too many " + resourceType + " instances. " +
                    resourceType + " is a shared resource that must be reused across the JVM, " +
                    "so that only a few instances are created.");
        }
    }

從上面的警告信息我們可以看出握础,時(shí)間輪是一種共享的資源,既然是一種系統(tǒng)資源悴品,那么就和內(nèi)存資源一樣(ByteBuf)都存在資源泄露的風(fēng)險(xiǎn)禀综。當(dāng)我們使用完時(shí)間輪但忘記調(diào)用它的 stop 方法進(jìn)行關(guān)閉的時(shí)候,就發(fā)生了資源泄露苔严。

和 ByteBuf 一樣定枷,在 HashedWheelTimer 中也有一個(gè) ResourceLeakTracker 用于跟蹤探測資源泄露的發(fā)生,如果發(fā)生資源泄露届氢,Netty 就會(huì)以 Error 日志的形式打印出泄露的位置欠窒。

class SimpleLeakAwareByteBuf extends WrappedByteBuf {
   final ResourceLeakTracker<ByteBuf> leak;
}

public class HashedWheelTimer implements Timer {
    private final ResourceLeakTracker<HashedWheelTimer> leak;
}

關(guān)于 ResourceLeakTracker 的實(shí)現(xiàn)原理,感興趣的讀者可以回看下筆者之前的文章 《Netty 如何自動(dòng)探測內(nèi)存泄露的發(fā)生》

我們在創(chuàng)建 HashedWheelTimer 的時(shí)候可以通過構(gòu)造參數(shù) leakDetection 來開啟退子,關(guān)閉時(shí)間輪的資源泄露探測岖妄。leakDetection 默認(rèn)為 true , 表示無條件開啟資源泄露的探測。如果設(shè)置為 false , 那么只有當(dāng) workerThread 不是守護(hù)線程的時(shí)候才會(huì)開啟資源泄露探測寂祥。

workerThread 默認(rèn)情況下并不是一個(gè)守護(hù)線程荐虐。

leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

3.2 延時(shí)任務(wù)的添加

image.png

如上圖所示,當(dāng)我們向時(shí)間輪添加一個(gè)延時(shí)任務(wù)時(shí)丸凭,并不是大家想象的那樣福扬,直接將延時(shí)任務(wù)添加到時(shí)間輪中,而是首先添加到一個(gè)叫做 timeouts 的 MpscQueue 中贮乳。

    // 多線程在向 HashedWheelTimer 添加延時(shí)任務(wù)的時(shí)候,首先會(huì)將任務(wù)添加到 timeouts 中恬惯,而不是直接添加到時(shí)間輪里
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();

為什么會(huì)這么設(shè)計(jì)呢 向拆? 時(shí)間輪是一個(gè)單線程驅(qū)動(dòng)的模型,內(nèi)部只有一個(gè) workerThread 來推動(dòng) tick 的轉(zhuǎn)動(dòng)酪耳,并從對(duì)應(yīng) HashedWheelBucket 中拉取延時(shí)任務(wù)浓恳。所以時(shí)間輪采用的是無鎖化的設(shè)計(jì)刹缝,workerThread 在訪問內(nèi)部任何數(shù)據(jù)結(jié)構(gòu)的時(shí)候都不會(huì)加鎖。

而向時(shí)間輪添加延時(shí)任務(wù)的操作卻是多線程執(zhí)行的颈将,如果任務(wù)被直接添加到時(shí)間輪中梢夯,那么就破壞了無鎖化的設(shè)計(jì),workerThread 在訪問內(nèi)部相關(guān)數(shù)據(jù)結(jié)構(gòu)的時(shí)候就必須加鎖了晴圾。

所以為了避免加鎖的開銷颂砸,Netty 引入了一個(gè) MpscQueue 作為中轉(zhuǎn),業(yè)務(wù)多線程首先會(huì)將延時(shí)任務(wù)添加到 MpscQueue 中死姚。等到下一個(gè) tick , workerThread 調(diào)度延時(shí)任務(wù)的時(shí)候人乓,會(huì)統(tǒng)一將 MpscQueue 中的延時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中。保證了 workerThread 單線程的無鎖化運(yùn)行都毒。

另外 Netty 時(shí)間輪采用了懶啟動(dòng)的設(shè)計(jì)色罚,只有第一次向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候才會(huì)啟動(dòng)。因?yàn)闀r(shí)間輪一旦啟動(dòng)之后账劲,workerThread 就開始運(yùn)行戳护,推動(dòng) tick 一下一下的向前推進(jìn)。如果時(shí)間輪剛被創(chuàng)建出來就啟動(dòng)瀑焦,此時(shí)內(nèi)部又沒有任何延時(shí)任務(wù)腌且,這就導(dǎo)致了 tick 不必要的空轉(zhuǎn)。

當(dāng)時(shí)間輪啟動(dòng)之后蝠猬,就會(huì)根據(jù)延時(shí)任務(wù) TimerTask 的延時(shí)時(shí)間 delay 計(jì)算到期時(shí)間 deadline 切蟋, 然后將 TimerTask 封裝成 HashedWheelTimeout 添加到 MpscQueue 中。

private static final class HashedWheelTimeout implements Timeout, Runnable {
        // 延時(shí)任務(wù)所屬的時(shí)間輪
        private final HashedWheelTimer timer;
        // 延時(shí)任務(wù)
        private final TimerTask task;
        // 延時(shí)任務(wù)的 deadline 榆芦,該時(shí)間是一個(gè)絕對(duì)時(shí)間柄粹,以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
        private final long deadline;

        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }
}

之前我們提到過,HashedWheelTimeout 中最重要的一個(gè)屬性就是延時(shí)任務(wù)的到期時(shí)間 deadline 匆绣, deadline 是一個(gè)絕對(duì)時(shí)間戳驻右,Netty 時(shí)間輪中的時(shí)間坐標(biāo)系全部是以時(shí)間輪的啟動(dòng)時(shí)間點(diǎn) startTime 為基準(zhǔn)的,deadline 表示從 startTime 這個(gè)時(shí)間點(diǎn)開始崎淳,到 deadline 這個(gè)時(shí)間點(diǎn)到期堪夭。

image.png

為什么這么設(shè)計(jì)呢 ? 這是因?yàn)槲覀冃枰褧r(shí)間輪的啟動(dòng)時(shí)間也考慮進(jìn)延時(shí)時(shí)間的計(jì)算當(dāng)中拣凹。比如森爽,我們向時(shí)間輪中添加一個(gè)延時(shí) 5ms 執(zhí)行的任務(wù),其中時(shí)間輪啟動(dòng)花了 2ms , 那么這個(gè)延時(shí)任務(wù)就應(yīng)該在時(shí)間輪啟動(dòng)后 3ms 開始執(zhí)行嚣镜。所以在計(jì)算延時(shí)任務(wù)到期時(shí)間戳 deadline 的時(shí)候需要減去時(shí)間輪的啟動(dòng)時(shí)間爬迟。后續(xù)時(shí)間輪的時(shí)間坐標(biāo)軸均以 startTime 為基準(zhǔn)。

long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

下面是延時(shí)任務(wù)完整的添加邏輯菊匿,整個(gè)時(shí)間復(fù)雜度為 O(1)

public class HashedWheelTimer implements Timer {
    // 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
    private volatile long startTime;

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        // 懶啟動(dòng)時(shí)間輪付呕,worker 線程會(huì)等待 100ms 后執(zhí)行
        start();
        // 計(jì)算延時(shí)任務(wù)到期的時(shí)間戳
        // 時(shí)間戳的參考坐標(biāo)系均以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
}

3.3 延時(shí)任務(wù)的取消

延時(shí)任務(wù)的取消和添加一樣计福,它們都是在 workerThread 之外進(jìn)行操作的,所以當(dāng)業(yè)務(wù)線程取消一個(gè)延時(shí)任務(wù)時(shí)徽职,也是先將這個(gè)被取消的延時(shí)任務(wù)放到一個(gè) MpscQueue 中暫存象颖。

private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

等到下一個(gè) tick 到來的時(shí)候,workerThread 會(huì)統(tǒng)一處理 cancelledTimeouts 集合中已經(jīng)被取消的延時(shí)任務(wù)姆钉。

        private void processCancelledTasks() {
            for (;;) {
                // workerThread 不斷的從 cancelledTimeouts 中拉取被取消的延時(shí)任務(wù)
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    // 將延時(shí)任務(wù)從 HashedWheelBucket 中刪除说订,時(shí)間復(fù)雜度 O(1)
                    timeout.remove();
                } catch (Throwable t) {

                }
            }
        }

延時(shí)任務(wù) HashedWheelTimeout 的狀態(tài)一共有三個(gè),初始為 ST_INIT育韩,任務(wù)被取消之后會(huì)更新為 ST_CANCELLED克蚂,任務(wù)準(zhǔn)備執(zhí)行的時(shí)候會(huì)更新為 ST_EXPIRED。

private static final class HashedWheelTimeout implements Timeout, Runnable {
        private static final int ST_INIT = 0;
        private static final int ST_CANCELLED = 1;
        private static final int ST_EXPIRED = 2;

        private volatile int state = ST_INIT;
        // 延時(shí)任務(wù)所屬的時(shí)間輪
        private final HashedWheelTimer timer;

        @Override
        public boolean cancel() {
            // 更新任務(wù)狀態(tài)為 ST_CANCELLED
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }

            timer.cancelledTimeouts.add(this);
            return true;
        }
}

3.4 時(shí)間輪的啟動(dòng)

之前我們提過筋讨,Netty 時(shí)間輪采用了懶啟動(dòng)的設(shè)計(jì)埃叭,當(dāng)我們首次向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候才會(huì)啟動(dòng)。時(shí)間輪有三種狀態(tài)悉罕,剛被創(chuàng)建出來的時(shí)候狀態(tài)為 WORKER_STATE_INIT赤屋,啟動(dòng)之后狀態(tài)為 WORKER_STATE_STARTED,關(guān)閉之后狀態(tài)為 WORKER_STATE_SHUTDOWN壁袄。

public class HashedWheelTimer implements Timer {
    // 初始狀態(tài)
    public static final int WORKER_STATE_INIT = 0;
    // 啟動(dòng)狀態(tài)
    public static final int WORKER_STATE_STARTED = 1;
    // 關(guān)閉狀態(tài)
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // 時(shí)間輪的狀態(tài)类早,初始為 WORKER_STATE_INIT
    private volatile int workerState; 
    // 原子更新時(shí)間輪狀態(tài)的 Updater
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    // 監(jiān)聽時(shí)間輪的啟動(dòng)事件
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    // 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
    private volatile long startTime;

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 啟動(dòng) workerThread , 它是一個(gè)非守護(hù)線程
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // 等待 workerThread 的啟動(dòng)嗜逻,workerThread 啟動(dòng)之后會(huì)設(shè)置 startTime涩僻,并執(zhí)行 startTimeInitialized.countdown
        while (startTime == 0) {
            try {
                // 等待時(shí)間輪的啟動(dòng)
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
              
            }
        }
    }
}

時(shí)間輪中有一個(gè)重要的屬性 startTime,初始狀態(tài)下為 0 栈顷,啟動(dòng)之后逆日,workerThread 會(huì)將啟動(dòng)那一刻的時(shí)間戳設(shè)置到 startTime 中,這個(gè) startTime 非常重要萄凤,因?yàn)楹罄m(xù)時(shí)間輪中的時(shí)間坐標(biāo)系均是以 startTime 為基準(zhǔn)的室抽。時(shí)間輪啟動(dòng)的一項(xiàng)重要工作就是設(shè)置這個(gè) startTime。

private final class Worker implements Runnable {
       @Override
        public void run() {
              // 設(shè)置時(shí)間輪的啟動(dòng)時(shí)間戳
              startTime = System.nanoTime();
              // 通知正在等待時(shí)間輪啟動(dòng)的業(yè)務(wù)線程
              startTimeInitialized.countDown();

                        ..........
        }
}

3.5 時(shí)間輪的運(yùn)轉(zhuǎn)

image.png

時(shí)間輪會(huì)按照一定的到期 deadline 時(shí)間范圍將所有的延時(shí)任務(wù)分別打散到 512 個(gè) HashedWheelBucket 中靡努,比如坪圾,我們在 tick = 0 這個(gè)時(shí)刻向時(shí)間輪添加延時(shí)任務(wù),如果這個(gè)任務(wù)的 deadline 在 [ 0 , 100 )ms 內(nèi)惑朦,那么它將會(huì)被添加到 HashedWheelBucket0兽泄,中,如果 deadline 在 [ 100 , 200 )ms 內(nèi)漾月,那么就會(huì)被添加到 HashedWheelBucket1 中病梢。同樣的道理,如果 deadline 在 [ 200, 300 )ms 內(nèi)栅屏,它將會(huì)被添加到 HashedWheelBucket2 中飘千,以此類推。

假設(shè)當(dāng)前 tick = 2 , 那么就表示 HashedWheelBucket2 中的延時(shí)任務(wù)馬上要被 workerThread 調(diào)度執(zhí)行栈雳,那么具體在什么時(shí)間執(zhí)行呢 护奈?

時(shí)間輪中的時(shí)間紀(jì)元是 tick = 0 ,也就是從 0ms 開始哥纫, HashedWheelBucket2 中所有的延時(shí)任務(wù),它們的 deadline 都在 [ 200, 300 )ms 以內(nèi)。那么當(dāng) tick 從 0 轉(zhuǎn)動(dòng)到 2 的時(shí)候目锭,就表示時(shí)間已經(jīng)過去了 200ms荧呐。

但此時(shí)還不能馬上就開始執(zhí)行 HashedWheelBucket2 中的任務(wù),因?yàn)樗鼈兊难訒r(shí)時(shí)間可能是 210ms , 250ms 也可能是 299ms 擅憔,如果在 tick = 2 也就是 200ms 的這個(gè)時(shí)間點(diǎn)就馬上執(zhí)行鸵闪,那么這些任務(wù)就被提前執(zhí)行了。

所以我們需要等到 300ms 也就是 tick = 3 這個(gè)時(shí)刻才能執(zhí)行 HashedWheelBucket2 中的延時(shí)任務(wù)暑诸,注意這里 tick = 3 指的是具體真實(shí)的時(shí)間已經(jīng)到了 300ms 這個(gè)時(shí)間點(diǎn)蚌讼,而時(shí)間輪中的 tick 還是指向 2 ,并沒有向前推進(jìn)个榕。

也就是說篡石,延時(shí) 210ms , 250ms , 299ms 的任務(wù),需要等到 300ms 之后才能得到執(zhí)行西采,這里我們也可以看出凰萨,時(shí)間輪的精度是 tickDuration (默認(rèn) 100ms),延時(shí)任務(wù)的調(diào)度通常會(huì)晚一個(gè) 100ms 左右械馆。

這里提到 "100ms 左右" 的意思是胖眷,時(shí)間輪中的延時(shí)任務(wù)可能會(huì)被晚調(diào)度 5ms ,也可能晚調(diào)度 9ms ,也可能是幾十毫秒,也有可能是 105ms 狱杰, 108ms , 111ms 瘦材。具體這個(gè)調(diào)度延遲的不確定性是如何產(chǎn)生的,我們放在下一個(gè)小節(jié)在討論仿畸,這里大家有這個(gè)概念就可以了食棕。

因此 workerThread 在調(diào)度延時(shí)任務(wù)的時(shí)候,通常會(huì)首先等到 next tick 的時(shí)間點(diǎn)來臨才會(huì)開始執(zhí)行當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket错沽。

        private long waitForNextTick() {
            // 獲取 tick + 1 對(duì)應(yīng)的時(shí)間戳 deadline簿晓,后續(xù) workerThread 會(huì) sleep 直到 deadline 到達(dá)
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                // 時(shí)間輪時(shí)間軸中的當(dāng)前時(shí)間戳
                final long currentTime = System.nanoTime() - startTime;
                // 這里需要保證 workerThread 至少要 sleep 1ms ,防止其被頻繁的喚醒
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                // 如果 deadline 已過期千埃,那么直接返回 currentTime
                // tick bucket 中延時(shí)任務(wù)的 deadline 小于等于 currentTime 的就會(huì)被執(zhí)行
                if (sleepTimeMs <= 0) {
                    // currentTime 溢出
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                try {
                    // sleep 等待到 deadline 時(shí)間點(diǎn)憔儿,然后執(zhí)行當(dāng)前 tick bucket 中的延時(shí)任務(wù)(timeout.deadline <= deadline)
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    // 時(shí)間輪被其他線程關(guān)閉,中斷 worker 線程
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

時(shí)間輪的精度是由 tickDuration 決定的放可,這個(gè)值我們可以調(diào)節(jié)谒臼,默認(rèn)為 100ms , 但最小不能低于 1ms 朝刊。tickDuration 越小,時(shí)間輪的精度越高蜈缤,同時(shí) workerThread 的繁忙程度也越高 拾氓。如果 tickDuration 設(shè)置的過小,那么 workerThread 在這里就會(huì)被頻繁的喚醒底哥。

所以為了防止 workerThread 被頻繁的喚醒咙鞍,我們需要保證至少要 sleep 1ms 。

long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

如果 sleepTimeMs <= 0 則說明當(dāng)前時(shí)間點(diǎn) currentTime 已經(jīng)過了 tick + 1 對(duì)應(yīng)的時(shí)間戳 deadline 趾徽, 這樣就不用在這里等待了续滋,直接返回 currentTime。

只要當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)到期時(shí)間小于等于 currentTime (延時(shí)任務(wù)以過期)孵奶,workerThread 會(huì)將會(huì)執(zhí)行它們疲酌。

如果 sleepTimeMs > 0 則說明當(dāng)前時(shí)間還沒有到達(dá) tick + 1 這個(gè)時(shí)間點(diǎn),那么 workerThread 就會(huì)在這里睡眠等待了袁。

當(dāng)時(shí)間到達(dá) tick + 1 這個(gè)時(shí)間點(diǎn)之后徐勃,workerThread 就會(huì)從這里喚醒,轉(zhuǎn)去執(zhí)行當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 里的延時(shí)任務(wù)早像。

int idx = (int) (tick & mask);
HashedWheelBucket bucket = wheel[idx];

但 HashedWheelBucket 里面此時(shí)可能還是空的僻肖,沒有任何延時(shí)任務(wù)。因?yàn)楫?dāng)業(yè)務(wù)線程在向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候卢鹦,首先是要將任務(wù)添加到一個(gè)叫做 timeouts 的 MpscQueue 中臀脏。也就是說延時(shí)任務(wù)首先會(huì)在 timeouts 中緩存,并不會(huì)直接添加到對(duì)應(yīng)的 HashedWheelBucket 中冀自,

那么 workerThread 在被喚醒之后揉稚,首先要做的就是從 timeouts 中將延時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪對(duì)應(yīng)的 HashedWheelBucket 中。

        private void transferTimeoutsToBuckets() {
            // 每個(gè) tick 最多只能從 timeouts 中轉(zhuǎn)移 10 萬個(gè)延時(shí)任務(wù)到時(shí)間輪中
            // 防止極端情況下 worker 線程在這里不停地拉取任務(wù)熬粗,執(zhí)行任務(wù)
            // 剩下的任務(wù)等到下一個(gè) tick 在進(jìn)行轉(zhuǎn)移
            for (int i = 0; i < 100000; i++) {
                // 拉取待執(zhí)行的延時(shí)任務(wù)
                HashedWheelTimeout timeout = timeouts.poll();
                // 跳過已被取消的任務(wù)
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    continue;
                }
                // 計(jì)算延時(shí)任務(wù)最終會(huì)停留在哪個(gè) tick 上
                // 這里的 tick 和 calculated 是一個(gè)絕對(duì)值搀玖,從 0 開始增加,只增不減
                long calculated = timeout.deadline / tickDuration;
                // 時(shí)間輪從當(dāng)前 tick 開始轉(zhuǎn)動(dòng)到 calculated 需要經(jīng)過多少個(gè)時(shí)鐘周期
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                // calculated < 當(dāng)前 tick , 則表示延時(shí)任務(wù) timeout 已經(jīng)過期了
                // 那么就將過期的 timeout 放在當(dāng)前 tick 中執(zhí)行
                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

當(dāng)任務(wù)轉(zhuǎn)移完成之后驻呐,workerThread 開始處理當(dāng)前 tick 對(duì)應(yīng)的HashedWheelBucket灌诅,將 HashedWheelBucket 中的延時(shí)任務(wù)挨個(gè)拉取出來執(zhí)行。當(dāng)所有到期的延時(shí)任務(wù)被執(zhí)行完之后含末,tick 向前推進(jìn)一格猜拾,開啟新一輪的循環(huán)。

image.png
private static final class HashedWheelBucket {

       public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;
            // process all timeouts
            while (timeout != null) { // bucket 不為空
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) { // 屬于當(dāng)前時(shí)鐘周期
                    next = remove(timeout); // 從 bucket 中刪除
                    if (timeout.deadline <= deadline) {
                        // 執(zhí)行延時(shí)任務(wù)
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }
}

時(shí)間輪中的延時(shí)任務(wù)默認(rèn)情況下是由 workerThread 執(zhí)行的佣盒,但如果我們在創(chuàng)建時(shí)間輪的時(shí)候指定了專門的 taskExecutor 挎袜, 那么延時(shí)任務(wù)就會(huì)由這個(gè) taskExecutor 負(fù)責(zé)執(zhí)行,workerThread 只負(fù)責(zé)調(diào)度,大大減輕了 workerThread 的負(fù)荷盯仪。

private static final class HashedWheelTimeout implements Timeout, Runnable { 
        // 時(shí)間輪
        private final HashedWheelTimer timer;

        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // 時(shí)間輪的 taskExecutor 負(fù)責(zé)執(zhí)行延時(shí)任務(wù)紊搪,默認(rèn)為 workerThread
                timer.taskExecutor.execute(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
                            + " for execution.", t);
                }
            }
        }
}

下面是時(shí)間輪運(yùn)轉(zhuǎn)的完整邏輯流程:

    private final class Worker implements Runnable {
        @Override
        public void run() {
          
            do {
                // workerThread 這里會(huì)等待到下一個(gè) tick 的時(shí)間點(diǎn)
                final long deadline = waitForNextTick();
                // deadline < 0 表示 currentTime 溢出
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    // 將 cancelledTimeouts 中已經(jīng)取消的 task 從對(duì)應(yīng) bucket 中刪除
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    // 將 timeouts 中收集的延時(shí)任務(wù)添加到時(shí)間輪中
                    transferTimeoutsToBuckets();
                    // 執(zhí)行當(dāng)前 tick 對(duì)應(yīng) bucket 中的所有延時(shí)任務(wù)
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

                      ..............

    }

3.6 調(diào)度延遲的不確定性是如何產(chǎn)生的

前面我們提到過,時(shí)間輪只適合那種對(duì)延時(shí)任務(wù)的調(diào)度及時(shí)性要求沒那么高的場景全景,Netty 時(shí)間輪的精度為一個(gè) tickDuration嗦明,默認(rèn)為 100ms 。延時(shí)任務(wù)的調(diào)度通常會(huì)晚 100ms 左右蚪燕。

比如,現(xiàn)在我們向時(shí)間輪添加一個(gè)延時(shí) 5ms 之后執(zhí)行的任務(wù)奔浅,那么這個(gè)延時(shí)任務(wù)可能會(huì)在 8ms 之后執(zhí)行馆纳,也可能是 65ms 之后執(zhí)行,也有可能在 108ms 之后執(zhí)行汹桦÷呈唬總之,時(shí)間輪調(diào)度的延遲范圍會(huì)在 100ms 左右舞骆。那為什么會(huì)出現(xiàn)這種不確定性呢 钥弯?

這其中主要有兩個(gè)原因,首先第一個(gè)原因就是時(shí)間輪的延時(shí)任務(wù)太多督禽,延時(shí)任務(wù)的邏輯比較復(fù)雜脆霎,執(zhí)行時(shí)間略長,導(dǎo)致了 workerThread 的阻塞狈惫,從而造成了任務(wù)調(diào)度的延遲睛蛛。減緩這種情況的一個(gè)措施就是在創(chuàng)建時(shí)間輪的時(shí)候,我們可以指定一個(gè)自定義的 taskExecutor 來專門負(fù)責(zé)延時(shí)任務(wù)的執(zhí)行胧谈,減輕 workerThread 的負(fù)荷忆肾。或者增大 HashedWheelBucket 的個(gè)數(shù)菱肖,盡量的分散延時(shí)任務(wù)客冈,不要讓它們集中在某一個(gè) HashedWheelBucket 中。

第二個(gè)原因是要看我們究竟在哪一個(gè)時(shí)間點(diǎn)向時(shí)間輪添加延時(shí)任務(wù)稳强。不同的添加時(shí)機(jī)场仲,也會(huì)造成調(diào)度的不確定性。這可能有點(diǎn)難以理解退疫,我們來看一個(gè)具體的例子燎窘。

image.png

比如,我們在下圖時(shí)間軸中 1ms 這個(gè)時(shí)刻向時(shí)間輪添加一個(gè)延時(shí) 5ms 執(zhí)行的任務(wù)蹄咖。當(dāng)前時(shí)間輪如上圖所示褐健,tick 指向 0 。

image.png

延時(shí) 5ms 的任務(wù)會(huì)被添加到 HashedWheelBucket0 中,此時(shí) workerThread 還在 sleep 等待 next tick 也就是 100ms 這個(gè)時(shí)間點(diǎn)的到來蚜迅。

我們在 1ms 這個(gè)時(shí)刻添加的這個(gè)延時(shí)任務(wù)本來應(yīng)該在時(shí)間軸中的 6ms 這個(gè)時(shí)間點(diǎn)執(zhí)行舵匾,但是現(xiàn)在 workerThread 還在睡眠,需要等到 100ms 這個(gè)時(shí)間點(diǎn)才能被喚醒去執(zhí)行 HashedWheelBucket0 中的延時(shí)任務(wù)谁不。這就產(chǎn)生了 90 ms 的調(diào)度延時(shí)坐梯。

但如果我們在時(shí)間軸的 94ms 位置處添加這個(gè) 5ms 的延時(shí)任務(wù),那么這個(gè)延時(shí)任務(wù)本應(yīng)該在時(shí)間軸的 99ms 這個(gè)時(shí)間點(diǎn)被執(zhí)行刹帕,但由于 workerThread 在 100ms 這個(gè)時(shí)間點(diǎn)才會(huì)被喚醒吵血,所以產(chǎn)生了 1ms 的調(diào)度延時(shí)。

如果非常不幸偷溺,我們恰好卡在了時(shí)間軸 95ms 這個(gè)時(shí)間點(diǎn)添加這個(gè) 5ms 的延時(shí)任務(wù)蹋辅,此時(shí)要注意,這個(gè)延時(shí)任務(wù)會(huì)被放在 HashedWheelBucket1 中而不是 HashedWheelBucket0挫掏。

而 HashedWheelBucket1 中的延時(shí)任務(wù)侦另,workerThread 需要等到時(shí)間軸 200ms 這個(gè)時(shí)間點(diǎn)才會(huì)去執(zhí)行,這樣一來尉共,本應(yīng)該在 100ms 這個(gè)時(shí)間點(diǎn)執(zhí)行的延時(shí)任務(wù)褒傅,時(shí)間輪卻在 200ms 這個(gè)時(shí)間點(diǎn)來調(diào)度,這就產(chǎn)生了 100ms 的調(diào)度延時(shí)袄友。如果在算上 CPU 調(diào)度 workerThread 的時(shí)間殿托,那么這個(gè)延遲可能就在 105ms 或者 108ms 左右。這里大家可以對(duì)照上小節(jié)的內(nèi)容剧蚣,仔細(xì)想想是不是這么回事碌尔。

3.7 時(shí)間輪的關(guān)閉

時(shí)間輪定義了三種狀態(tài),在剛被創(chuàng)建出來的時(shí)候狀態(tài)為 WORKER_STATE_INIT券敌,啟動(dòng)之后狀態(tài)為 WORKER_STATE_STARTED唾戚。

public class HashedWheelTimer implements Timer {
    // 初始狀態(tài)
    public static final int WORKER_STATE_INIT = 0;
    // 啟動(dòng)狀態(tài)
    public static final int WORKER_STATE_STARTED = 1;
    // 關(guān)閉狀態(tài)
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // 時(shí)間輪的狀態(tài),初始為 WORKER_STATE_INIT
    private volatile int workerState; 
}

當(dāng)時(shí)間輪要關(guān)閉的時(shí)候待诅,我們就需要將 workerState 更新為 WORKER_STATE_SHUTDOWN叹坦。當(dāng) workerThread 檢測到時(shí)間輪的狀態(tài)不是 WORKER_STATE_STARTED 的時(shí)候,就會(huì)退出 do ... while 循環(huán)卑雁,停止時(shí)間輪的轉(zhuǎn)動(dòng)募书。

隨后 workerThread 會(huì)將當(dāng)前時(shí)間輪中所有 HashedWheelBuckets 中遺留的(未來得及執(zhí)行,未取消)的延時(shí)任務(wù)以及 timeouts 隊(duì)列中緩存的待執(zhí)行延時(shí)任務(wù)(未取消)統(tǒng)統(tǒng)轉(zhuǎn)移到 unprocessedTimeouts 集合中测蹲。并將已經(jīng)取消的延時(shí)任務(wù)從對(duì)應(yīng) HashedWheelBucket 中刪除莹捡。

private final class Worker implements Runnable {

        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        @Override
        public void run() {
            do {

                      ........ 時(shí)間輪運(yùn)轉(zhuǎn)邏輯 .......

            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);


            for (HashedWheelBucket bucket: wheel) {
                // 將 bucket 中還沒來得及執(zhí)行并且沒有被取消的任務(wù)轉(zhuǎn)移到 unprocessedTimeouts
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            // 將 timeouts 中緩存的待執(zhí)行任務(wù)(沒有被取消)轉(zhuǎn)移到 unprocessedTimeouts
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            // 將 cancelledTimeouts 中的延時(shí)任務(wù)從對(duì)應(yīng) bucket 中刪除
            processCancelledTasks();
        }
}

最后 Netty 會(huì)將 unprocessedTimeouts 集合中收集到的那些還未來得及執(zhí)行的延時(shí)任務(wù)全部取消,然后將這些延時(shí)任務(wù)返回給業(yè)務(wù)線程扣甲,由業(yè)務(wù)線程自行處理篮赢。

時(shí)間輪關(guān)閉的完整流程總結(jié)如下:

  1. 在延時(shí)任務(wù)中不能執(zhí)行時(shí)間輪關(guān)閉的操作齿椅。

  2. 原子更新時(shí)間輪的狀態(tài)為 WORKER_STATE_SHUTDOWN

  3. 中斷 workerThread,并等待 workerThread 結(jié)束启泣。

  4. 取消所有還未來得及調(diào)度的延時(shí)任務(wù)涣脚,并返回給業(yè)務(wù)線程。

public class HashedWheelTimer implements Timer {
    @Override
    public Set<Timeout> stop() {
        // 在延時(shí)任務(wù)中不能執(zhí)行停止時(shí)間輪的操作
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(
                    HashedWheelTimer.class.getSimpleName() +
                            ".stop() cannot be called from " +
                            TimerTask.class.getSimpleName());
        }
        // 停止時(shí)間輪
        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // cas 更新狀態(tài)失敗寥茫,這里時(shí)間輪的狀態(tài)可能會(huì)是兩種:
            // 1. WORKER_STATE_INIT 還未啟動(dòng)
            // 2. WORKER_STATE_SHUTDOWN 已經(jīng)停止
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                // 如果時(shí)間輪還未啟動(dòng)遣蚀,那么直接停止就好了
                if (leak != null) {
                    boolean closed = leak.close(this);
                    assert closed;
                }
            }
            // 時(shí)間輪沒有啟動(dòng),那么肯定也就沒有延時(shí)任務(wù)纱耻,這里直接返回一個(gè)空集合就行
            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                // 中斷 workerThread芭梯,使其從 sleep 中喚醒,執(zhí)行時(shí)間輪關(guān)閉的邏輯
                // 如果 workerThread 在運(yùn)行弄喘,那么此時(shí)時(shí)間輪已經(jīng)是 WORKER_STATE_SHUTDOWN 狀態(tài)
                // workerThread 會(huì)退出 do while 循環(huán)轉(zhuǎn)去執(zhí)行時(shí)間輪的關(guān)閉邏輯
                workerThread.interrupt();
                try {
                    // 等待 workerThread 的結(jié)束
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    // 當(dāng)前線程被中斷
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            if (leak != null) {
                // 關(guān)閉資源泄露探測
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        // 獲取還未來得及處理的延時(shí)任務(wù)
        Set<Timeout> unprocessed = worker.unprocessedTimeouts();
        Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
        // 將還未來得及處理的任務(wù)全部取消玖喘,然后返回
        for (Timeout timeout : unprocessed) {
            if (timeout.cancel()) {
                cancelled.add(timeout);
            }
        }
        return cancelled;
    }
}

《時(shí)間輪在 Netty , Kafka 中的設(shè)計(jì)與實(shí)現(xiàn)(下)》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市限次,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌柴灯,老刑警劉巖卖漫,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異赠群,居然都是意外死亡羊始,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門查描,熙熙樓的掌柜王于貴愁眉苦臉地迎上來突委,“玉大人,你說我怎么就攤上這事冬三≡扔停” “怎么了?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵勾笆,是天一觀的道長敌蚜。 經(jīng)常有香客問我,道長窝爪,這世上最難降的妖魔是什么弛车? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮蒲每,結(jié)果婚禮上纷跛,老公的妹妹穿的比我還像新娘。我一直安慰自己邀杏,他們只是感情好贫奠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般叮阅。 火紅的嫁衣襯著肌膚如雪刁品。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天浩姥,我揣著相機(jī)與錄音挑随,去河邊找鬼。 笑死勒叠,一個(gè)胖子當(dāng)著我的面吹牛兜挨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播眯分,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼拌汇,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了弊决?” 一聲冷哼從身側(cè)響起噪舀,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎飘诗,沒想到半個(gè)月后与倡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡昆稿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年纺座,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片溉潭。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡净响,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出喳瓣,到底是詐尸還是另有隱情馋贤,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布畏陕,位于F島的核電站掸掸,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蹭秋。R本人自食惡果不足惜扰付,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仁讨。 院中可真熱鬧羽莺,春花似錦、人聲如沸洞豁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至刁卜,卻和暖如春志电,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蛔趴。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國打工挑辆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人孝情。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓鱼蝉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親箫荡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子魁亦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355