本文基于 Netty 4.1.112.Final , Kafka 3.9.0 版本進(jìn)行討論
在業(yè)務(wù)開發(fā)的場景中,我們經(jīng)常會(huì)遇到很多定時(shí)任務(wù)的需求彤避。比如剂癌,生成業(yè)務(wù)報(bào)表境肾,周期性對(duì)賬永丝,同步數(shù)據(jù)锹漱,訂單支付超時(shí)處理等。針對(duì)業(yè)務(wù)場景中定時(shí)任務(wù)邏輯復(fù)雜慕嚷,執(zhí)行時(shí)間長的特點(diǎn)哥牍,市面上已經(jīng)有很多成熟的任務(wù)調(diào)度中間件可供我們選擇。比如:ElasticJob , XXL-JOB , PowerJob 等等喝检。
而在中間件的場景中嗅辣,同樣也存在很多定時(shí)任務(wù)的需求。比如挠说,網(wǎng)絡(luò)連接的心跳檢測辩诞,網(wǎng)絡(luò)請求超時(shí)或失敗的重試機(jī)制,網(wǎng)絡(luò)連接斷開之后的重連機(jī)制纺涤。和業(yè)務(wù)場景不同的是,這些中間件場景的定時(shí)任務(wù)特點(diǎn)是邏輯簡單抠忘,執(zhí)行時(shí)間非常短撩炊,而且對(duì)時(shí)間精度的要求比較低。比如崎脉,心跳檢測以及失敗重試這些定時(shí)任務(wù)拧咳,其實(shí)晚執(zhí)行個(gè)幾十毫秒或者 100 毫秒也無所謂。
于是針對(duì)中間件場景中的這些定時(shí)任務(wù)特點(diǎn):
- 海量任務(wù)
- 任務(wù)邏輯簡單
- 執(zhí)行時(shí)間短
- 對(duì)任務(wù)調(diào)度的及時(shí)性沒有那么高的要求
各大中間件設(shè)計(jì)了時(shí)間輪來調(diào)度具有上述 4 種特征的定時(shí)任務(wù)囚灼,而本文主要討論的就是時(shí)間輪的設(shè)計(jì)與實(shí)現(xiàn)骆膝。但在這之前我們需要搞清楚時(shí)間輪這個(gè)設(shè)計(jì)需求是怎么產(chǎn)生的祭衩,我們先從 JDK 中的任務(wù)調(diào)度組件開始聊起,看看 JDK 中的這些任務(wù)調(diào)度組件為什么不能滿足中間件的場景阅签。
1. JDK 中的任務(wù)調(diào)度組件
說到定時(shí)任務(wù)掐暮,我們第一時(shí)間就能想到的調(diào)度組件就是 JDK 中的 Timer,為什么這么說呢政钟,因?yàn)楣P者剛參加工作時(shí)的第一個(gè)任務(wù)就是用 Timer 實(shí)現(xiàn)的路克,當(dāng)時(shí)對(duì) Java 一無所知,完全零基礎(chǔ)养交。主管交給我一個(gè)定時(shí)任務(wù)的需求精算,兩眼抹黑。于是帶著清澈而又稚嫩的眼神到網(wǎng)上一頓搜索碎连,找到了這個(gè) Timer灰羽,如獲至寶。
1.1 Timer
public class Timer {
// 優(yōu)先隊(duì)列鱼辙,按照任務(wù)的 ExecutionTime廉嚼,由近到遠(yuǎn)組織
private final TaskQueue queue = new TaskQueue();
// 延時(shí)任務(wù)的調(diào)度線程
private final TimerThread thread = new TimerThread(queue);
}
Timer 中有兩個(gè)核心組件,一個(gè)是用于調(diào)度延時(shí)任務(wù)的 TimerThread 座每,另一個(gè)是 TaskQueue前鹅,用于組織延時(shí)任務(wù)。它是一個(gè)優(yōu)先級(jí)隊(duì)列峭梳,其底層是一個(gè)數(shù)組實(shí)現(xiàn)的小根堆舰绘。
class TaskQueue {
// 數(shù)組實(shí)現(xiàn)的小根堆
private TimerTask[] queue = new TimerTask[128];
// 向小根堆的堆底添加TimerTask
void add(TimerTask task) {
if (size + 1 == queue.length)
queue = Arrays.copyOf(queue, 2*queue.length);
queue[++size] = task;
// 向上調(diào)整
fixUp(size);
}
// 獲取堆頂任務(wù)
TimerTask getMin() {
return queue[1];
}
// 刪除堆頂任務(wù)
void removeMin() {
queue[1] = queue[size];
queue[size--] = null;
//向下調(diào)整堆
fixDown(1);
}
}
TaskQueue 會(huì)將所有延時(shí)任務(wù)按照它們的 ExecutionTime ,由近到遠(yuǎn)的組織在小根堆中葱椭,堆頂永遠(yuǎn)存放的是 ExecutionTime 最近的延時(shí)任務(wù)捂寿。
TimerThread 會(huì)不斷的從 TaskQueue 中獲取堆頂任務(wù),如果堆頂任務(wù)的 ExecutionTime 已經(jīng)達(dá)到 —— executionTime <= currentTime
, 則執(zhí)行任務(wù)孵运。如果該任務(wù)是一個(gè)周期性任務(wù)秦陋,則將任務(wù)重新放入到 TaskQueue 中。
如果堆頂任務(wù)的 ExecutionTime 還沒有到達(dá)治笨,那么 TimerThread 就會(huì)等待 executionTime - currentTime
的時(shí)間驳概,一直到堆頂任務(wù)的執(zhí)行時(shí)間到達(dá),TimerThread 被重新喚醒執(zhí)行堆頂任務(wù)旷赖。
private void mainLoop() {
while (true) {
try {
TimerTask task;
// 堆頂任務(wù)的執(zhí)行時(shí)間是否到達(dá)
boolean taskFired;
synchronized(queue) {
long currentTime, executionTime;
// 獲取堆頂延時(shí)任務(wù)
task = queue.getMin();
synchronized(task.lock) {
// 當(dāng)前時(shí)間
currentTime = System.currentTimeMillis();
// 堆頂任務(wù)的執(zhí)行時(shí)間
executionTime = task.nextExecutionTime;
// 是否到達(dá)堆頂任務(wù)的執(zhí)行時(shí)間
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// 如果堆頂任務(wù)的執(zhí)行時(shí)間還未到達(dá)顺又,那么 TimerThread 就會(huì)在這里等待
if (!taskFired)
queue.wait(executionTime - currentTime);
}
// 如果堆頂任務(wù)的執(zhí)行時(shí)間已經(jīng)到達(dá),則立即執(zhí)行
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
根據(jù)以上 Timer 的核心實(shí)現(xiàn)等孵,我們可以總結(jié)出 Timer 在應(yīng)對(duì)中間件場景的延時(shí)任務(wù)時(shí)稚照,有以下四種不足:
首先用于組織延時(shí)任務(wù)的 TaskQueue 本質(zhì)上是一個(gè)小根堆。對(duì)于堆這種數(shù)據(jù)結(jié)構(gòu)來說,添加果录,刪除一個(gè)延時(shí)任務(wù)時(shí)上枕,堆都要向上,向下調(diào)整以便滿足小根堆的特性弱恒。單次操作的時(shí)間復(fù)雜度為
O(logn)
辨萍。顯然在面對(duì)海量定時(shí)任務(wù)的添加,刪除時(shí)斤彼,性能上還是差點(diǎn)意思分瘦。Timer 調(diào)度框架中只有一個(gè) TimerThread 線程來負(fù)責(zé)延時(shí)任務(wù)的調(diào)度,執(zhí)行琉苇。在面對(duì)海量任務(wù)的時(shí)候嘲玫,通常會(huì)顯得力不從心。
另外一個(gè)嚴(yán)重問題是并扇,當(dāng)延時(shí)任務(wù)在執(zhí)行的過程中出現(xiàn)異常時(shí)去团, Timer 并不會(huì)捕獲,會(huì)導(dǎo)致 TimerThread 終止穷蛹。這樣一來土陪,TaskQueue 中的其他延時(shí)任務(wù)將永遠(yuǎn)不會(huì)得到執(zhí)行。
Timer 依賴于系統(tǒng)的絕對(duì)時(shí)間肴熏,如果系統(tǒng)時(shí)間本身不準(zhǔn)確鬼雀,那么延時(shí)任務(wù)的調(diào)度就可能會(huì)出問題。
1.2 DelayQueue
DelayQueue 是 JDK 提供的一個(gè)延時(shí)隊(duì)列蛙吏,我們可以利用它來延時(shí)獲取隊(duì)列中的元素源哩,它的實(shí)現(xiàn)其實(shí)和 Timer 中的 TaskQueue 很類似,其底層都是一個(gè)優(yōu)先級(jí)隊(duì)列鸦做。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//基于小根堆實(shí)現(xiàn)的優(yōu)先級(jí)隊(duì)列
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
本質(zhì)上還是一個(gè)數(shù)組實(shí)現(xiàn)的小根堆励烦。添加,刪除任務(wù)的時(shí)間復(fù)雜度仍然是 O(logn)
泼诱。
public class PriorityQueue<E> {
// 數(shù)組實(shí)現(xiàn)的小根堆
transient Object[] queue;
}
DelayQueue 中的元素必須實(shí)現(xiàn) Delayed
接口中的 getDelay
, compareTo
方法坛掠。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public interface Comparable<T> {
public int compareTo(T o);
}
其中 getDelay
方法用于獲取任務(wù)還有多久到期。返回值如果小于等于 0 治筒,則表示該任務(wù)已經(jīng)到期了屉栓。
compareTo
方法用于調(diào)整任務(wù)在 DelayQueue 中的位置,DelayQueue 是一個(gè)小根堆耸袜,每次向 DelayQueue 添加新的任務(wù)時(shí)系瓢,先是把任務(wù)放到 DelayQueue 的末尾,然后依次向上調(diào)整句灌,直到任務(wù)的過期時(shí)間大于等于其 parent 。 這樣就可以保證 DelayQueue 的小根堆特性 —— 堆頂元素永遠(yuǎn)是過期時(shí)間最近的任務(wù)。
我們可以通過 take()
方法從 DelayQueue 獲取到期的堆頂任務(wù)胰锌,如果堆頂任務(wù)還沒到期骗绕,那么就會(huì)在 DelayQueue 上阻塞等待,直到堆頂任務(wù)到期為止资昧。
public E take() throws InterruptedException {
try {
for (;;) {
// 獲取 DelayQueue 堆頂任務(wù)
E first = q.peek();
if (first == null)
available.await();
else {
// 獲取堆頂任務(wù)還有多久到期
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0L)
// 堆頂任務(wù)到期酬土,則從 DelayQueue 中取出
return q.poll();
else {
try {
// 等待堆頂任務(wù)到期
available.awaitNanos(delay);
}
}
}
}
} finally {
...........
}
}
從 DelayQueue 的實(shí)現(xiàn)上可以看出,相比于 Timer 格带,DelayQueue 只是一個(gè)延時(shí)任務(wù)的管理隊(duì)列撤缴,而 Timer 卻是一個(gè)完整的任務(wù)調(diào)度組件。我們需要在 DelayQueue 的基礎(chǔ)之上叽唱,額外地實(shí)現(xiàn)任務(wù)調(diào)度功能屈呕。
但其底層的核心數(shù)據(jù)結(jié)構(gòu)仍然是一個(gè)小根堆。和 Timer 一樣棺亭,添加刪除延時(shí)任務(wù)的時(shí)間復(fù)雜度都是 O(logn)
虎眨。同樣無法滿足海量延時(shí)任務(wù)的調(diào)度。
1.3 ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 是多線程版本的 Timer 镶摘,它是在 DelayQueue 的基礎(chǔ)上增加了多線程調(diào)度延時(shí)任務(wù)的能力嗽桩。ScheduledThreadPoolExecutor 中負(fù)責(zé)組織管理延時(shí)任務(wù)的是 DelayedWorkQueue,它也是一個(gè)小根堆實(shí)現(xiàn)的優(yōu)先級(jí)隊(duì)列凄敢,延時(shí)任務(wù) ScheduledFutureTask 按照到期時(shí)間由近及遠(yuǎn)的組織在 DelayedWorkQueue 中碌冶。DelayedWorkQueue 的第一個(gè)元素是到期時(shí)間最近的 ScheduledFutureTask。
業(yè)務(wù)線程可以通過 schedule , scheduleAtFixedRate , scheduleWithFixedDelay 方法將延時(shí)任務(wù) ScheduledFutureTask 添加到 DelayedWorkQueue 中涝缝。
ScheduledThreadPoolExecutor 負(fù)責(zé)調(diào)度延時(shí)任務(wù)的是一個(gè)線程池扑庞,里邊包含了多個(gè) worker 調(diào)度線程,每個(gè) worker 線程負(fù)責(zé)從 DelayedWorkQueue 中獲取已經(jīng)到期的 ScheduledFutureTask俊卤,然后執(zhí)行嫩挤。如果 DelayedWorkQueue 中沒有任務(wù)到期,那么 worker 線程就會(huì)在 DelayedWorkQueue 上阻塞等待消恍,直到有到期的任務(wù)出現(xiàn)岂昭。
雖然 ScheduledThreadPoolExecutor 提供了多線程的調(diào)度能力,在一定程度上保證了延時(shí)任務(wù)調(diào)度的及時(shí)性狠怨,但是其底層仍然是依賴 DelayedWorkQueue 來管理延時(shí)任務(wù)约啊,在面對(duì)海量延時(shí)任務(wù)的添加,刪除時(shí)佣赖,時(shí)間復(fù)雜度依然還是 O(logn)
恰矩。那么有沒有一種數(shù)據(jù)結(jié)構(gòu)可以將這個(gè)時(shí)間復(fù)雜度降低為 O(1)
呢 ? 這就是本文我們要討論的重點(diǎn)內(nèi)容 —— 時(shí)間輪的設(shè)計(jì)與實(shí)現(xiàn)憎蛤。
2. Netty 時(shí)間輪的設(shè)計(jì)原理
時(shí)間輪的設(shè)計(jì)靈感來自于我們?nèi)粘I钪杏玫溺姳硗飧担姳碛忻脶樇退保轴槪瑫r(shí)針萎胰,共三個(gè)指針碾盟,60 個(gè)刻度。秒針每走一個(gè)刻度就是一秒技竟,秒針走完一個(gè)時(shí)鐘周期(60s)冰肴,分針走一個(gè)刻度就是一分鐘,當(dāng)分針走完一個(gè)時(shí)鐘周期(60m)榔组,時(shí)針走一個(gè)刻度就是一個(gè)小時(shí)熙尉。
比如我們要在今天的 10 點(diǎn) 10 分 0 秒這個(gè)時(shí)刻去開一個(gè)重要的會(huì)議,那么當(dāng)鐘表的秒針指向 0 這個(gè)刻度搓扯,分針指向 10 這個(gè)刻度检痰,時(shí)針指向 10 這個(gè)刻度的時(shí)候,鬧鐘就會(huì)響起擅编,提醒我們?nèi)?zhí)行開會(huì)這個(gè)延時(shí)任務(wù)攀细。
如果我們能把鐘表里的刻度抽象成一個(gè)數(shù)據(jù)結(jié)構(gòu),用這個(gè)數(shù)據(jù)結(jié)構(gòu)來存放對(duì)應(yīng)刻度的延時(shí)任務(wù)的話爱态,那么當(dāng)鐘表的時(shí)針谭贪,分針,秒針指向某個(gè)刻度的時(shí)候锦担,我們就去執(zhí)行這個(gè)刻度對(duì)應(yīng)的延時(shí)任務(wù)俭识,這樣一來,一種新的延時(shí)任務(wù)調(diào)度思路就出來了洞渔,這也是時(shí)間輪的設(shè)計(jì)理念套媚。
如上圖所示,Netty 將鐘表的刻度抽象成了一個(gè) HashedWheelBucket 的數(shù)據(jù)結(jié)構(gòu)磁椒,鐘表的表盤被抽象成一個(gè) HashedWheelBucket 類型的環(huán)形數(shù)組堤瘤,鐘表中有 60 個(gè)刻度,而 Netty 的時(shí)間輪 HashedWheelTimer 一共有 512 個(gè)刻度浆熔。
public class HashedWheelTimer implements Timer {
// 數(shù)組大小默認(rèn)為 512
private final HashedWheelBucket[] wheel;
// HashedWheelTimer 的時(shí)鐘精度本辐,也就是時(shí)鐘間隔,多久轉(zhuǎn)動(dòng)一次医增,默認(rèn) 100ms, 最小值為 1ms
private final long tickDuration;
}
鐘表中一共有三個(gè)指針慎皱,分別是秒針,分針叶骨,時(shí)針茫多。而 HashedWheelTimer 中只有一個(gè) tick 指針,tick 每隔 tickDuration (100ms) 走一個(gè)刻度忽刽,也就是說 Netty 時(shí)間輪的時(shí)鐘精度就是 100 ms , 定時(shí)任務(wù)的調(diào)度延時(shí)有時(shí)會(huì)在 100ms 左右天揖。如果你接受不了這么大的調(diào)度誤差夺欲,那么可以將 tickDuration 適當(dāng)調(diào)小一些,但最小不能低于 1ms 宝剖。
什么意思呢 洁闰?比如現(xiàn)在我們需要在 5ms 之后執(zhí)行一個(gè)延時(shí)任務(wù),那么時(shí)間輪可能在 8ms 之后才會(huì)調(diào)度這個(gè)任務(wù)万细,也可能在 65ms 之后調(diào)度,也有可能在 108ms 之后調(diào)度纸泄,這就使得定時(shí)任務(wù)的執(zhí)行有了大約 100ms 左右的延時(shí)赖钞。
具體延時(shí)多少,取決于我們在什么時(shí)刻將這個(gè)定時(shí)任務(wù)添加到時(shí)間輪中聘裁。關(guān)于這一點(diǎn)雪营,筆者后面會(huì)在介紹時(shí)間輪具體實(shí)現(xiàn)細(xì)節(jié)的時(shí)候詳細(xì)討論,這里點(diǎn)到為止衡便,本小節(jié)我們還是主要聚焦于時(shí)間輪的設(shè)計(jì)原理献起。
對(duì)于鐘表的秒針來說,它的 tickDuration 就是 1s , 走完一個(gè)時(shí)鐘周期就是 60s 镣陕。 對(duì)于分針來說谴餐,它的 tickDuration 就是 1m , 走完一個(gè)時(shí)鐘周期就是 60m。對(duì)于時(shí)針來說呆抑,它的 tickDuration 就是 1h , 走完一個(gè)時(shí)鐘周期就是 12h岂嗓。
由于 HashedWheelTimer 中的 tickDuration 是 100ms , 有 512 個(gè)刻度 (HashedWheelBucket) , 所以時(shí)間輪中的 tick 指針走完一個(gè)時(shí)鐘周期需要 51200ms 。
HashedWheelBucket 是一個(gè)具有頭尾指針的雙向鏈表鹊碍,鏈表中存儲(chǔ)的元素類型為 HashedWheelTimeout 用于封裝定時(shí)任務(wù)厌殉。HashedWheelBucket 中的 head 指向雙向鏈表中的第一個(gè) HashedWheelTimeout , tail 指向雙向鏈表中的最后一個(gè) HashedWheelTimeout侈咕。
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;// 指向雙向鏈表中的第一個(gè) timeout
private HashedWheelTimeout tail;// 指向雙向鏈表中的最后一個(gè) timeout
}
HashedWheelTimeout 用于封裝時(shí)間輪中的延時(shí)任務(wù)公罕,提交到時(shí)間輪中的延時(shí)任務(wù)必須實(shí)現(xiàn) TimerTask 接口。
// 延時(shí)任務(wù)
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}
private static final class HashedWheelTimeout implements Timeout, Runnable {
// 延時(shí)任務(wù)所屬的時(shí)間輪
private final HashedWheelTimer timer;
// 延時(shí)任務(wù)
private final TimerTask task;
// 延時(shí)任務(wù)的 deadline 耀销,該時(shí)間是一個(gè)絕對(duì)時(shí)間楼眷,以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
private final long deadline;
// 延時(shí)任務(wù)所屬的 bucket
HashedWheelBucket bucket;
// 指向其在 bucket 的下一個(gè)延時(shí)任務(wù)
HashedWheelTimeout next;
// 指向其在 bucket 的前一個(gè)延時(shí)任務(wù)
HashedWheelTimeout prev;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
}
HashedWheelTimeout 中有一個(gè)重要的屬性 deadline ,它規(guī)定了延時(shí)任務(wù) TimerTask 的到期時(shí)間树姨。deadline 是一個(gè)絕對(duì)時(shí)間值摩桶,它以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn),表示從 startTime 這個(gè)時(shí)間點(diǎn)開始帽揪,到 deadline 這個(gè)時(shí)間點(diǎn)到期硝清。
// 計(jì)算延時(shí)任務(wù)到期的絕對(duì)時(shí)間戳
// 時(shí)間輪中的時(shí)間戳均以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
Netty 時(shí)間輪中的時(shí)間坐標(biāo)系全部是以時(shí)間輪的啟動(dòng)時(shí)間點(diǎn) startTime 為基準(zhǔn)的,當(dāng)時(shí)間輪啟動(dòng)之后转晰,會(huì)將那一刻的時(shí)間戳設(shè)置到 startTime 中芦拿。
public class HashedWheelTimer implements Timer {
// 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
private volatile long startTime;
}
時(shí)間輪中的 tick 指針也是一個(gè)絕對(duì)值士飒,當(dāng)時(shí)間輪啟動(dòng)之后,tick 指向 0 蔗崎,每隔 100ms (tickDuration)酵幕,tick 向前轉(zhuǎn)動(dòng)一下。但需要注意的是 tick 的值是只增不減的缓苛,只要時(shí)間輪在運(yùn)行芳撒,那么 tick 的值就會(huì)一直遞增上去。比如未桥,當(dāng) tick 轉(zhuǎn)動(dòng)完一個(gè)時(shí)鐘周期(51200ms)之后笔刹,tick 的值是 512 而不是重新指向 0 。
tick 與 HashedWheelBucket 之間的映射關(guān)系通過 ticks & mask
計(jì)算得出冬耿。mask 為 HashedWheelBucket 的個(gè)數(shù)減 1 舌菜,所以這就要求時(shí)間輪中 HashedWheelBucket 的個(gè)數(shù)必須是 2 的次冪。
在時(shí)間輪中亦镶,屬于同一個(gè) HashedWheelBucket 中的延時(shí)任務(wù) HashedWheelTimeouts 日月,它們的到期時(shí)間 deadline 都在同一時(shí)間范圍內(nèi) —— [ tick , tick + 1) * tickDuration
。
比如缤骨,在時(shí)間輪剛剛啟動(dòng)之后爱咬,tick 指向 0 ,那么 wheel[0] 指向的 HashedWheelBucket 里存放的 HashedWheelTimeouts荷憋,它們的到期時(shí)間均在 [ 0 , 100) ms
之內(nèi)台颠。
假如我們在 tick = 0
這個(gè)時(shí)刻,向時(shí)間輪中添加了一個(gè)延時(shí) 5ms
執(zhí)行的 HashedWheelTimeout勒庄,那么它就會(huì)被放入 wheel[0] 中串前。如果添加的是一個(gè)延時(shí) 101ms
執(zhí)行的 HashedWheelTimeout,那么它就會(huì)被放入 wheel[1] 中实蔽。同樣的道理荡碾,如果添加的是一個(gè)延時(shí) 360ms
執(zhí)行的 HashedWheelTimeout,那么它就會(huì)被放入 wheel[3] 中局装。
當(dāng)時(shí)間過了 100ms 之后坛吁,Netty 就會(huì)將 HashedWheelBucket0
中的延時(shí)任務(wù)拉出來執(zhí)行,執(zhí)行完之后铐尚,tick 的值加 1 拨脉,從 0 轉(zhuǎn)動(dòng)到 1 。在經(jīng)過 100 ms 之后,Netty 就會(huì)將 HashedWheelBucket1
中的延時(shí)任務(wù)拉出來執(zhí)行,執(zhí)行完之后掀序,tick 的值加 1 舶担,從 1 轉(zhuǎn)動(dòng)到 2 帖旨,如此往復(fù)循環(huán)箕昭。這就是整個(gè)時(shí)間輪的運(yùn)轉(zhuǎn)邏輯。
但從這個(gè)過程中我們可以看出解阅,延時(shí)任務(wù)的調(diào)度存在 tickDuration(100ms)左右的延遲落竹。比如,在 tick = 0
這個(gè)時(shí)刻货抄,添加到 HashedWheelBucket0
中的延時(shí)任務(wù)述召,我們本來是期望這些延時(shí)任務(wù)分別在 5ms , 10ms , 95ms 之后執(zhí)行,但時(shí)間輪的真正調(diào)度時(shí)間卻在 100ms 之后蟹地。這就導(dǎo)致了任務(wù)調(diào)度產(chǎn)生了 100ms 左右的延遲桨武。
如果你接受不了 100ms 的延遲,那么可以在創(chuàng)建時(shí)間輪的時(shí)候锈津,將 tickDuration 的值調(diào)低,但不能低于 1ms 凉蜂。tickDuration 的值越小琼梆,時(shí)間輪的精度越高,性能開銷也就越大窿吩。tickDuration 的值越大茎杂,時(shí)間輪的精度也就越低,性能開銷越小纫雁。
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
}
但在中間件的場景中煌往,往往對(duì)延時(shí)任務(wù)調(diào)度的及時(shí)性沒有那么高的要求,同時(shí)為了兼顧時(shí)間輪的精度與性能轧邪,tickDuration 默認(rèn)設(shè)置為100ms 是剛好合適的刽脖,通常不需要調(diào)整。
另外在默認(rèn)情況下忌愚,只有一個(gè)線程 workerThread 負(fù)責(zé)推動(dòng)時(shí)間輪的轉(zhuǎn)動(dòng)曲管,以及延時(shí)任務(wù)的執(zhí)行。
public class HashedWheelTimer implements Timer {
// HashedWheelTimer 的 worker 線程硕糊,由它來驅(qū)動(dòng)時(shí)間輪的轉(zhuǎn)動(dòng)院水,延時(shí)任務(wù)的執(zhí)行
private final Thread workerThread;
}
從上面的過程可以看出,只有當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)全部被執(zhí)行完畢的時(shí)候简十,tick 才會(huì)向前推動(dòng)檬某。所以為了保證任務(wù)調(diào)度的及時(shí)性,時(shí)間輪中的延時(shí)任務(wù)執(zhí)行時(shí)間不能太長螟蝙,只適合邏輯簡單恢恼,執(zhí)行時(shí)間短的延時(shí)任務(wù)。
但畢竟在默認(rèn)情況下就只有這一個(gè) workerThread胶逢,既負(fù)責(zé)延時(shí)任務(wù)的調(diào)度厅瞎,又負(fù)責(zé)延時(shí)任務(wù)的執(zhí)行饰潜,對(duì)于有海量并發(fā)延時(shí)任務(wù)的場景,還是顯得力不從心和簸。為了應(yīng)對(duì)這種情況彭雾,我們可以在創(chuàng)建時(shí)間輪的時(shí)候,指定一個(gè)專門用于執(zhí)行延時(shí)任務(wù)的 Executor锁保。
這樣一來薯酝,時(shí)間輪中的延時(shí)任務(wù)調(diào)度還是由單線程 workerThread 負(fù)責(zé),到期的延時(shí)任務(wù)由線程池 Executor 來負(fù)責(zé)執(zhí)行爽柒,近一步提升延時(shí)任務(wù)調(diào)度的及時(shí)性吴菠。但事實(shí)上,在大部分場景中浩村,有一個(gè) workerThread 就夠了做葵,并不需要額外的指定 Executor。大家可以根據(jù)實(shí)際情況心墅,自由裁定酿矢。
public class HashedWheelTimer implements Timer {
// 負(fù)責(zé)執(zhí)行延時(shí)任務(wù),用于應(yīng)對(duì)大量的并發(fā)延時(shí)任務(wù)場景
// 默認(rèn)為單線程 workerThread
private final Executor taskExecutor;
// 在構(gòu)造函數(shù)中可以設(shè)置 taskExecutor
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor)
}
另外還有一個(gè)問題就是怎燥,上圖時(shí)間輪中的延時(shí)任務(wù)瘫筐,它們的延時(shí)時(shí)間都在同一時(shí)鐘周期內(nèi)。Netty 時(shí)間輪中的一個(gè)時(shí)鐘周期是 51200ms 铐姚。
也就是說策肝,在 tick = 0
這個(gè)時(shí)刻,只要延時(shí)任務(wù)的延時(shí)時(shí)間在 51200ms 之內(nèi)隐绵,那么當(dāng) tick 轉(zhuǎn)動(dòng)完 512 個(gè)刻度之后(一個(gè)時(shí)鐘周期)之众,這 512 個(gè)刻度對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)全部會(huì)被執(zhí)行到。
如果我們在 tick = 0
這個(gè)時(shí)刻氢橙,添加一個(gè)延時(shí)任務(wù)酝枢,但它的延時(shí)時(shí)間超過了一個(gè)時(shí)鐘周期,比如在 51250ms
之后執(zhí)行悍手。 那么這個(gè)延時(shí)任務(wù)也會(huì)被添加到 HashedWheelBucket0
中帘睦。
當(dāng)時(shí)間過了 100ms 之后,workerThread 就會(huì)執(zhí)行 HashedWheelBucket0
中的延時(shí)任務(wù)坦康。但此時(shí)只能執(zhí)行延時(shí) 5ms , 10ms 的任務(wù)竣付,不能執(zhí)行延時(shí) 51250ms
的任務(wù),因?yàn)樗枰鹊较乱粋€(gè)時(shí)鐘周期才能執(zhí)行滞欠。
那么 workerThread 在執(zhí)行延時(shí)任務(wù)的時(shí)候如何才能知道古胆,哪些任務(wù)是本次時(shí)鐘周期內(nèi)可以執(zhí)行的,哪些任務(wù)是要等到下一次或者下下次時(shí)鐘周期才能執(zhí)行的呢 ?
在延時(shí)任務(wù)模型 HashedWheelTimeout 中有一個(gè)字段 —— remainingRounds逸绎,用于記錄延時(shí)任務(wù)還剩多少時(shí)鐘周期可以執(zhí)行惹恃。
private static final class HashedWheelTimeout implements Timeout, Runnable {
// 執(zhí)行該延時(shí)任務(wù)需要經(jīng)過多少時(shí)鐘周期
long remainingRounds;
}
本次時(shí)鐘周期內(nèi)可以執(zhí)行的延時(shí)任務(wù),它的 remainingRounds = 0 棺牧,workerThread 在遇到 remainingRounds = 0
的 HashedWheelTimeout 就會(huì)執(zhí)行巫糙。
下一個(gè)時(shí)鐘周期才能執(zhí)行的延時(shí)任務(wù),它的 remainingRounds = 1 颊乘,依次類推参淹。當(dāng) workerThread 遇到 remainingRounds > 0
的 HashedWheelTimeout 就會(huì)直接跳過,并將 remainingRounds 減 1 乏悄。
比如浙值,上圖中 HashedWheelBucket0 中的這幾個(gè)延時(shí)任務(wù),其中延時(shí) 5ms , 10ms 的 HashedWheelTimeout 它們的 remainingRounds = 0
, 表示在本次時(shí)鐘周期內(nèi)就可以馬上執(zhí)行檩小。
延時(shí) 51250ms 的 HashedWheelTimeout 它的 remainingRounds = 1
开呐, 表示在下一個(gè)時(shí)鐘周期才能執(zhí)行。
好了规求,現(xiàn)在整個(gè)時(shí)間輪的設(shè)計(jì)原理筆者就為大家介紹完了负蚊,那么讓我們再次回到本小節(jié)開頭的問題,在面對(duì)海量延時(shí)任務(wù)的添加與取消時(shí)颓哮,時(shí)間輪如何將這個(gè)時(shí)間復(fù)雜度降低為 O(1)
呢 ?
首先鸵荠,時(shí)間輪的核心數(shù)據(jù)結(jié)構(gòu)就是一個(gè) HashedWheelBucket 類型的環(huán)形數(shù)組 wheel 冕茅, 數(shù)組長度默認(rèn)為 512 。wheel 數(shù)組用于組織管理時(shí)間輪中的所有延時(shí)任務(wù)蛹找。
// 數(shù)組大小默認(rèn)為 512
private final HashedWheelBucket[] wheel;
與之前介紹的延時(shí)隊(duì)列 DelayedWorkQueue 不同的是姨伤,環(huán)形數(shù)組 wheel 會(huì)按照延時(shí)時(shí)間的不同,將延時(shí)任務(wù)分散到 512 個(gè) HashedWheelBucket 中管理庸疾。每個(gè) HashedWheelBucket 負(fù)責(zé)管理到期時(shí)間范圍在 [ tick , tick + 1) * tickDuration
之間的任務(wù)乍楚。
而 DelayedWorkQueue 則是用一個(gè)優(yōu)先級(jí)隊(duì)列來管理所有的延時(shí)任務(wù),為了維護(hù)小根堆的特性届慈,每次在向 DelayedWorkQueue 添加或者刪除延時(shí)任務(wù)的時(shí)間復(fù)雜度為 O(logn)
徒溪。
當(dāng)我們向時(shí)間輪添加一個(gè)延時(shí)任務(wù)時(shí),Netty 首先會(huì)根據(jù)延時(shí)任務(wù)的 deadline 以及 tickDuration 計(jì)算出該延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上金顿。注意臊泌,延時(shí)任務(wù)中的 deadline 是一個(gè)絕對(duì)值而不是相對(duì)值,是以時(shí)間輪啟動(dòng)時(shí)間 startTime 為基準(zhǔn)的一個(gè)絕對(duì)時(shí)間戳揍拆。tick 也是一個(gè)絕對(duì)值而不是相對(duì)值渠概,是以時(shí)間輪剛剛啟動(dòng)時(shí) tick = 0
為基準(zhǔn)的絕對(duì)值,只增不減嫂拴。
比如播揪,前面這個(gè)延時(shí) 51250ms 的任務(wù)贮喧,它最終會(huì)停留在 tick = 512
上。但由于時(shí)間輪是一個(gè)環(huán)形數(shù)組猪狈,所以 tick 512 與數(shù)組長度 512 取余得到所屬 HashedWheelBucket 在 wheel 數(shù)組中的 index = 0箱沦。
// 計(jì)算延時(shí)任務(wù),最終會(huì)停留在哪一個(gè) tick 上
long calculated = timeout.deadline / tickDuration;
// 獲取 calculated 對(duì)應(yīng)的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 將延時(shí)任務(wù)添加到對(duì)應(yīng)的 HashedWheelBucket 中
bucket.addTimeout(timeout);
然后將延時(shí)任務(wù)添加到 HashedWheelBucket 的末尾罪裹,前面我們已經(jīng)提過饱普,HashedWheelBucket 是一個(gè)雙向鏈表,向鏈表末尾添加一個(gè)元素的時(shí)間復(fù)雜度為 O(1)
状共。
private static final class HashedWheelBucket {
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
}
延時(shí)任務(wù)的取消邏輯也很簡單套耕,就是將這個(gè)延時(shí)任務(wù)從其所屬的 HashedWheelBucket 中刪除即可。從一個(gè)雙向鏈表中刪除某個(gè)指定的元素時(shí)間復(fù)雜度也是 O(1)
峡继。
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// timeout 不是第一個(gè)元素
if (timeout.prev != null) {
timeout.prev.next = next;
}
// timeout 不是最后一個(gè)元素
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// Bucket 中只有一個(gè)任務(wù)冯袍,直接將頭尾指針置空
if (timeout == tail) {
tail = null;
head = null;
} else {
// 待刪除的任務(wù)是第一個(gè)任務(wù),head 指針向后移動(dòng)
head = next;
}
} else if (timeout == tail) {
// 待刪除的任務(wù)是最后一個(gè)任務(wù)碾牌,tail 指針向前移動(dòng)
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
return next;
}
從以上過程我們可以看出康愤,時(shí)間輪在面對(duì)海量延時(shí)任務(wù)的添加,取消的時(shí)候舶吗,所需的時(shí)間復(fù)雜度都是 O(1)
征冷,聊完了延時(shí)任務(wù)的管理,現(xiàn)在我們在來看一下延時(shí)任務(wù)的調(diào)度與執(zhí)行誓琼。
Netty 只靠一個(gè)單線程 workThread 來推動(dòng)時(shí)間輪一個(gè) tick 一個(gè) tick 地向前轉(zhuǎn)動(dòng)检激,當(dāng)時(shí)間輪轉(zhuǎn)動(dòng)到某一個(gè) tick 時(shí),workThread 會(huì)等待一個(gè) tickDuration (默認(rèn) 100ms)的時(shí)間腹侣,隨后 workThread 會(huì)將該 tick 對(duì)應(yīng)的 HashedWheelBucket 中 remainingRounds = 0
的延時(shí)任務(wù)全都拉取下來挨個(gè)執(zhí)行叔收。
當(dāng)執(zhí)行完 HashedWheelBucket 中的延時(shí)任務(wù)之后,tick 向前推進(jìn)一格(tick++)傲隶,workThread 繼續(xù)睡眠等待一個(gè) tickDuration饺律,然后重復(fù)上述過程。
這里我們可以看出跺株,延時(shí)任務(wù)的調(diào)度與執(zhí)行在默認(rèn)情況下全部都是由一個(gè)單線程 workThread 來執(zhí)行复濒。如果時(shí)間輪中的延時(shí)任務(wù)邏輯復(fù)雜,執(zhí)行時(shí)間長乒省,那么就會(huì)影響整個(gè)時(shí)間輪的調(diào)度芝薇,tick 的轉(zhuǎn)動(dòng)就會(huì)出現(xiàn)延時(shí),所以時(shí)間輪雖然可以處理海量的延時(shí)任務(wù)作儿,但是這些延時(shí)任務(wù)的邏輯必須要簡單洛二,執(zhí)行時(shí)間要短。當(dāng)然了,我們也可以在創(chuàng)建時(shí)間輪的時(shí)候指定一個(gè)專門執(zhí)行延時(shí)任務(wù)的線程池來加快任務(wù)的執(zhí)行晾嘶。
由于延時(shí)任務(wù)的調(diào)度通常會(huì)有一個(gè) tickDuration 左右的延時(shí)妓雾。比如,任務(wù)的調(diào)度可能會(huì)晚幾毫秒或者幾十毫秒垒迂,也有可能晚一個(gè) tickDuration 左右械姻。所以時(shí)間輪只能處理那些對(duì)任務(wù)調(diào)度的及時(shí)性要求沒那么高的場景。
3. Netty 時(shí)間輪相關(guān)設(shè)計(jì)模型的實(shí)現(xiàn)
3.1 HashedWheelTimer
Netty 使用一個(gè)叫做 HashedWheelTimer 的結(jié)構(gòu)來描述時(shí)間輪机断,其中包含了第二小節(jié)中介紹的所有重要屬性以及核心結(jié)構(gòu)楷拳。其中最核心的就是 wheel 環(huán)形數(shù)組,它相當(dāng)于鐘表的表盤吏奸,表盤中的每一個(gè)刻度用 HashedWheelBucket 結(jié)構(gòu)描述欢揖。
private final HashedWheelBucket[] wheel;
時(shí)間輪中究竟包含多少個(gè)刻度,是由構(gòu)造參數(shù) ticksPerWheel
決定的奋蔚,默認(rèn)為 512 她混。Netty 會(huì)根據(jù)延時(shí)時(shí)間的不同將所有提交到時(shí)間輪的延時(shí)任務(wù)分散到 512 個(gè) HashedWheelBucket 中組織管理。定位延時(shí)任務(wù)所在的 HashedWheelBucket 以及向 HashedWheelBucket 中添加泊碑,取消延時(shí)任務(wù)的時(shí)間復(fù)雜度均為 O(1)
坤按。
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts, Executor taskExecutor)
如果時(shí)間輪中需要調(diào)度的延時(shí)任務(wù)非常多,那么每個(gè) HashedWheelBucket 中就可能包含大量的延時(shí)任務(wù)馒过,這就導(dǎo)致時(shí)間輪的調(diào)度發(fā)生延遲臭脓。針對(duì)這種情況,我們可以適當(dāng)增加 ticksPerWheel 的個(gè)數(shù)腹忽,讓更多的 HashedWheelBucket 來分?jǐn)傃訒r(shí)任務(wù)谢鹊。但 ticksPerWheel 必須是 2 的次冪。
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
// ticksPerWheel 必須是 2 的次冪留凭,默認(rèn)為 512
ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
// 創(chuàng)建時(shí)間輪中的 hash 槽
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
這樣一來,當(dāng)我們向時(shí)間輪添加延時(shí)的任務(wù)的時(shí)候偎巢,就可以通過 &
運(yùn)算來代替 %
運(yùn)算去尋找延時(shí)任務(wù)對(duì)應(yīng)的 HashedWheelBucket蔼夜。
mask = wheel.length - 1;
第二小節(jié)我們已經(jīng)介紹過了,在向時(shí)間輪添加延時(shí)任務(wù)時(shí)压昼,我們需要首先定位到這個(gè)延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上求冷,時(shí)間輪中的 tick 是一個(gè)絕對(duì)值,它不會(huì)按照時(shí)鐘周期的結(jié)束而自動(dòng)歸 0 窍霞,而是一直會(huì)往上遞增匠题。
calculated 也是一個(gè)絕對(duì)值,表示延時(shí)任務(wù)最終會(huì)停留在哪一個(gè) tick 上但金,隨后通過 calculated & mask
定位到對(duì)應(yīng)的 HashedWheelBucket韭山,時(shí)間復(fù)雜度為 O(1)
。
// 計(jì)算延時(shí)任務(wù),最終會(huì)停留在哪一個(gè) tick 上
long calculated = timeout.deadline / tickDuration;
// 獲取 calculated 對(duì)應(yīng)的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];
tickDuration
表示時(shí)間輪中的時(shí)鐘精度钱磅,也就是 tick 指針多久轉(zhuǎn)動(dòng)一次梦裂,默認(rèn)為 100ms,我們可以通過構(gòu)造參數(shù) tickDuration 進(jìn)行指定盖淡,但最小不能低于 1ms年柠。
private final long tickDuration;
tickDuration 的值越小,時(shí)間輪的精度越高褪迟,性能開銷也就越大冗恨。tickDuration 的值越大,時(shí)間輪的精度也就越低味赃,性能開銷越小掀抹。
現(xiàn)在時(shí)間輪的基本骨架就有了,而時(shí)間輪的運(yùn)轉(zhuǎn)靠的就是 workerThread 洁桌,由它來驅(qū)動(dòng)時(shí)鐘 tick 一下一下的轉(zhuǎn)動(dòng)渴丸,并執(zhí)行對(duì)應(yīng) HashedWheelBucket 中的延時(shí)任務(wù)。
private final Worker worker = new Worker();
private final Thread workerThread;
由于在默認(rèn)情況下另凌,Netty 時(shí)間輪中就只有這一個(gè)單線程 workerThread 來負(fù)責(zé)延時(shí)任務(wù)的調(diào)度與執(zhí)行谱轨,在面對(duì)海量并發(fā)任務(wù)的時(shí)候,難免顯得有點(diǎn)力不從心吠谢。執(zhí)行任務(wù)的時(shí)間過長土童,就會(huì)導(dǎo)致 tick 的轉(zhuǎn)動(dòng)產(chǎn)生很大的延時(shí)。于是 Netty 又在 4.1.69.Final 中引入了一個(gè) taskExecutor工坊,來專門負(fù)責(zé)執(zhí)行延時(shí)任務(wù)献汗。
private final Executor taskExecutor;
我們可以通過構(gòu)造參數(shù) taskExecutor
來指定自定義的線程池,默認(rèn)情況下為 ImmediateExecutor 王污,其本質(zhì)還是由 workerThread 來執(zhí)行延時(shí)任務(wù)罢吃。
public final class ImmediateExecutor implements Executor {
@Override
public void execute(Runnable command) {
ObjectUtil.checkNotNull(command, "command").run();
}
}
workerThread 負(fù)責(zé)從對(duì)應(yīng) tick 的 HashedWheelBucket 中拉取延時(shí)任務(wù),然后將延時(shí)任務(wù)丟給 taskExecutor 來執(zhí)行昭齐。這在一定程度上提高了延時(shí)任務(wù)的消費(fèi)速度尿招,不至于拖慢 workerThread 從而影響到整個(gè)時(shí)間輪的運(yùn)轉(zhuǎn)。
時(shí)間輪中待執(zhí)行延時(shí)任務(wù)的最大個(gè)數(shù)受到參數(shù) maxPendingTimeouts
限制阱驾,默認(rèn)為 -1 就谜。當(dāng) maxPendingTimeouts 的值小于等于 0 的時(shí)候,表示 Netty 不會(huì)對(duì)時(shí)間輪中的延時(shí)任務(wù)個(gè)數(shù)進(jìn)行限制里覆。
// 時(shí)間輪中待執(zhí)行延時(shí)任務(wù)的最大個(gè)數(shù)
private final long maxPendingTimeouts;
// 時(shí)間輪當(dāng)前待執(zhí)行的延時(shí)任務(wù)個(gè)數(shù)
private final AtomicLong pendingTimeouts = new AtomicLong(0);
當(dāng)時(shí)間輪中的延時(shí)任務(wù)個(gè)數(shù)超過了 maxPendingTimeouts 的限制時(shí)丧荐,再向時(shí)間輪添加延時(shí)任務(wù)就會(huì)得到 RejectedExecutionException
異常。
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 待執(zhí)行的延時(shí)任務(wù)計(jì)數(shù) + 1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// maxPendingTimeouts 默認(rèn)為 -1 喧枷。表示不對(duì)時(shí)間輪的延時(shí)任務(wù)個(gè)數(shù)進(jìn)行限制
// 如果達(dá)到限制虹统,則不能繼續(xù)向時(shí)間輪添加任務(wù)
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
弓坞,,窟却,昼丑,,夸赫,菩帝,,
}
另外時(shí)間輪 HashedWheelTimer 在 JVM 進(jìn)程中的實(shí)例個(gè)數(shù)會(huì)受到 INSTANCE_COUNT_LIMIT
的限制茬腿,默認(rèn)為 64 呼奢。
// 系統(tǒng)當(dāng)前時(shí)間輪實(shí)例的個(gè)數(shù)
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
// 默認(rèn)允許的 HashedWheelTimer 最大實(shí)例個(gè)數(shù)
private static final int INSTANCE_COUNT_LIMIT = 64;
如果當(dāng)前 JVM 進(jìn)程中的 HashedWheelTimer 實(shí)例個(gè)數(shù)超過了 64 ,那么 Netty 就會(huì)打印 Error
日志進(jìn)行警告切平。
private static void reportTooManyInstances() {
if (logger.isErrorEnabled()) {
String resourceType = simpleClassName(HashedWheelTimer.class);
logger.error("You are creating too many " + resourceType + " instances. " +
resourceType + " is a shared resource that must be reused across the JVM, " +
"so that only a few instances are created.");
}
}
從上面的警告信息我們可以看出握础,時(shí)間輪是一種共享的資源,既然是一種系統(tǒng)資源悴品,那么就和內(nèi)存資源一樣(ByteBuf)都存在資源泄露的風(fēng)險(xiǎn)禀综。當(dāng)我們使用完時(shí)間輪但忘記調(diào)用它的 stop
方法進(jìn)行關(guān)閉的時(shí)候,就發(fā)生了資源泄露苔严。
和 ByteBuf 一樣定枷,在 HashedWheelTimer 中也有一個(gè) ResourceLeakTracker
用于跟蹤探測資源泄露的發(fā)生,如果發(fā)生資源泄露届氢,Netty 就會(huì)以 Error
日志的形式打印出泄露的位置欠窒。
class SimpleLeakAwareByteBuf extends WrappedByteBuf {
final ResourceLeakTracker<ByteBuf> leak;
}
public class HashedWheelTimer implements Timer {
private final ResourceLeakTracker<HashedWheelTimer> leak;
}
關(guān)于 ResourceLeakTracker 的實(shí)現(xiàn)原理,感興趣的讀者可以回看下筆者之前的文章 《Netty 如何自動(dòng)探測內(nèi)存泄露的發(fā)生》
我們在創(chuàng)建 HashedWheelTimer 的時(shí)候可以通過構(gòu)造參數(shù) leakDetection
來開啟退子,關(guān)閉時(shí)間輪的資源泄露探測岖妄。leakDetection 默認(rèn)為 true , 表示無條件開啟資源泄露的探測。如果設(shè)置為 false , 那么只有當(dāng) workerThread 不是守護(hù)線程的時(shí)候才會(huì)開啟資源泄露探測寂祥。
workerThread 默認(rèn)情況下并不是一個(gè)守護(hù)線程荐虐。
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
3.2 延時(shí)任務(wù)的添加
如上圖所示,當(dāng)我們向時(shí)間輪添加一個(gè)延時(shí)任務(wù)時(shí)丸凭,并不是大家想象的那樣福扬,直接將延時(shí)任務(wù)添加到時(shí)間輪中,而是首先添加到一個(gè)叫做 timeouts 的 MpscQueue 中贮乳。
// 多線程在向 HashedWheelTimer 添加延時(shí)任務(wù)的時(shí)候,首先會(huì)將任務(wù)添加到 timeouts 中恬惯,而不是直接添加到時(shí)間輪里
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
為什么會(huì)這么設(shè)計(jì)呢 向拆? 時(shí)間輪是一個(gè)單線程驅(qū)動(dòng)的模型,內(nèi)部只有一個(gè) workerThread 來推動(dòng) tick 的轉(zhuǎn)動(dòng)酪耳,并從對(duì)應(yīng) HashedWheelBucket 中拉取延時(shí)任務(wù)浓恳。所以時(shí)間輪采用的是無鎖化的設(shè)計(jì)刹缝,workerThread 在訪問內(nèi)部任何數(shù)據(jù)結(jié)構(gòu)的時(shí)候都不會(huì)加鎖。
而向時(shí)間輪添加延時(shí)任務(wù)的操作卻是多線程執(zhí)行的颈将,如果任務(wù)被直接添加到時(shí)間輪中梢夯,那么就破壞了無鎖化的設(shè)計(jì),workerThread 在訪問內(nèi)部相關(guān)數(shù)據(jù)結(jié)構(gòu)的時(shí)候就必須加鎖了晴圾。
所以為了避免加鎖的開銷颂砸,Netty 引入了一個(gè) MpscQueue 作為中轉(zhuǎn),業(yè)務(wù)多線程首先會(huì)將延時(shí)任務(wù)添加到 MpscQueue 中死姚。等到下一個(gè) tick , workerThread 調(diào)度延時(shí)任務(wù)的時(shí)候人乓,會(huì)統(tǒng)一將 MpscQueue 中的延時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中。保證了 workerThread 單線程的無鎖化運(yùn)行都毒。
另外 Netty 時(shí)間輪采用了懶啟動(dòng)的設(shè)計(jì)色罚,只有第一次向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候才會(huì)啟動(dòng)。因?yàn)闀r(shí)間輪一旦啟動(dòng)之后账劲,workerThread 就開始運(yùn)行戳护,推動(dòng) tick 一下一下的向前推進(jìn)。如果時(shí)間輪剛被創(chuàng)建出來就啟動(dòng)瀑焦,此時(shí)內(nèi)部又沒有任何延時(shí)任務(wù)腌且,這就導(dǎo)致了 tick 不必要的空轉(zhuǎn)。
當(dāng)時(shí)間輪啟動(dòng)之后蝠猬,就會(huì)根據(jù)延時(shí)任務(wù) TimerTask 的延時(shí)時(shí)間 delay 計(jì)算到期時(shí)間 deadline 切蟋, 然后將 TimerTask 封裝成 HashedWheelTimeout 添加到 MpscQueue 中。
private static final class HashedWheelTimeout implements Timeout, Runnable {
// 延時(shí)任務(wù)所屬的時(shí)間輪
private final HashedWheelTimer timer;
// 延時(shí)任務(wù)
private final TimerTask task;
// 延時(shí)任務(wù)的 deadline 榆芦,該時(shí)間是一個(gè)絕對(duì)時(shí)間柄粹,以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
private final long deadline;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
}
之前我們提到過,HashedWheelTimeout 中最重要的一個(gè)屬性就是延時(shí)任務(wù)的到期時(shí)間 deadline 匆绣, deadline 是一個(gè)絕對(duì)時(shí)間戳驻右,Netty 時(shí)間輪中的時(shí)間坐標(biāo)系全部是以時(shí)間輪的啟動(dòng)時(shí)間點(diǎn) startTime 為基準(zhǔn)的,deadline 表示從 startTime 這個(gè)時(shí)間點(diǎn)開始崎淳,到 deadline 這個(gè)時(shí)間點(diǎn)到期堪夭。
為什么這么設(shè)計(jì)呢 ? 這是因?yàn)槲覀冃枰褧r(shí)間輪的啟動(dòng)時(shí)間也考慮進(jìn)延時(shí)時(shí)間的計(jì)算當(dāng)中拣凹。比如森爽,我們向時(shí)間輪中添加一個(gè)延時(shí) 5ms 執(zhí)行的任務(wù),其中時(shí)間輪啟動(dòng)花了 2ms , 那么這個(gè)延時(shí)任務(wù)就應(yīng)該在時(shí)間輪啟動(dòng)后 3ms 開始執(zhí)行嚣镜。所以在計(jì)算延時(shí)任務(wù)到期時(shí)間戳 deadline 的時(shí)候需要減去時(shí)間輪的啟動(dòng)時(shí)間爬迟。后續(xù)時(shí)間輪的時(shí)間坐標(biāo)軸均以 startTime 為基準(zhǔn)。
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
下面是延時(shí)任務(wù)完整的添加邏輯菊匿,整個(gè)時(shí)間復(fù)雜度為 O(1)
:
public class HashedWheelTimer implements Timer {
// 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
private volatile long startTime;
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 懶啟動(dòng)時(shí)間輪付呕,worker 線程會(huì)等待 100ms 后執(zhí)行
start();
// 計(jì)算延時(shí)任務(wù)到期的時(shí)間戳
// 時(shí)間戳的參考坐標(biāo)系均以時(shí)間輪的啟動(dòng)時(shí)間 startTime 為起點(diǎn)
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
}
3.3 延時(shí)任務(wù)的取消
延時(shí)任務(wù)的取消和添加一樣计福,它們都是在 workerThread 之外進(jìn)行操作的,所以當(dāng)業(yè)務(wù)線程取消一個(gè)延時(shí)任務(wù)時(shí)徽职,也是先將這個(gè)被取消的延時(shí)任務(wù)放到一個(gè) MpscQueue 中暫存象颖。
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
等到下一個(gè) tick 到來的時(shí)候,workerThread 會(huì)統(tǒng)一處理 cancelledTimeouts 集合中已經(jīng)被取消的延時(shí)任務(wù)姆钉。
private void processCancelledTasks() {
for (;;) {
// workerThread 不斷的從 cancelledTimeouts 中拉取被取消的延時(shí)任務(wù)
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
// 將延時(shí)任務(wù)從 HashedWheelBucket 中刪除说订,時(shí)間復(fù)雜度 O(1)
timeout.remove();
} catch (Throwable t) {
}
}
}
延時(shí)任務(wù) HashedWheelTimeout 的狀態(tài)一共有三個(gè),初始為 ST_INIT育韩,任務(wù)被取消之后會(huì)更新為 ST_CANCELLED克蚂,任務(wù)準(zhǔn)備執(zhí)行的時(shí)候會(huì)更新為 ST_EXPIRED。
private static final class HashedWheelTimeout implements Timeout, Runnable {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private volatile int state = ST_INIT;
// 延時(shí)任務(wù)所屬的時(shí)間輪
private final HashedWheelTimer timer;
@Override
public boolean cancel() {
// 更新任務(wù)狀態(tài)為 ST_CANCELLED
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
timer.cancelledTimeouts.add(this);
return true;
}
}
3.4 時(shí)間輪的啟動(dòng)
之前我們提過筋讨,Netty 時(shí)間輪采用了懶啟動(dòng)的設(shè)計(jì)埃叭,當(dāng)我們首次向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候才會(huì)啟動(dòng)。時(shí)間輪有三種狀態(tài)悉罕,剛被創(chuàng)建出來的時(shí)候狀態(tài)為 WORKER_STATE_INIT赤屋,啟動(dòng)之后狀態(tài)為 WORKER_STATE_STARTED,關(guān)閉之后狀態(tài)為 WORKER_STATE_SHUTDOWN壁袄。
public class HashedWheelTimer implements Timer {
// 初始狀態(tài)
public static final int WORKER_STATE_INIT = 0;
// 啟動(dòng)狀態(tài)
public static final int WORKER_STATE_STARTED = 1;
// 關(guān)閉狀態(tài)
public static final int WORKER_STATE_SHUTDOWN = 2;
// 時(shí)間輪的狀態(tài)类早,初始為 WORKER_STATE_INIT
private volatile int workerState;
// 原子更新時(shí)間輪狀態(tài)的 Updater
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
// 監(jiān)聽時(shí)間輪的啟動(dòng)事件
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
// 時(shí)間輪的啟動(dòng)時(shí)間戳(納秒)
private volatile long startTime;
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
// 啟動(dòng) workerThread , 它是一個(gè)非守護(hù)線程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待 workerThread 的啟動(dòng)嗜逻,workerThread 啟動(dòng)之后會(huì)設(shè)置 startTime涩僻,并執(zhí)行 startTimeInitialized.countdown
while (startTime == 0) {
try {
// 等待時(shí)間輪的啟動(dòng)
startTimeInitialized.await();
} catch (InterruptedException ignore) {
}
}
}
}
時(shí)間輪中有一個(gè)重要的屬性 startTime,初始狀態(tài)下為 0 栈顷,啟動(dòng)之后逆日,workerThread 會(huì)將啟動(dòng)那一刻的時(shí)間戳設(shè)置到 startTime 中,這個(gè) startTime 非常重要萄凤,因?yàn)楹罄m(xù)時(shí)間輪中的時(shí)間坐標(biāo)系均是以 startTime 為基準(zhǔn)的室抽。時(shí)間輪啟動(dòng)的一項(xiàng)重要工作就是設(shè)置這個(gè) startTime。
private final class Worker implements Runnable {
@Override
public void run() {
// 設(shè)置時(shí)間輪的啟動(dòng)時(shí)間戳
startTime = System.nanoTime();
// 通知正在等待時(shí)間輪啟動(dòng)的業(yè)務(wù)線程
startTimeInitialized.countDown();
..........
}
}
3.5 時(shí)間輪的運(yùn)轉(zhuǎn)
時(shí)間輪會(huì)按照一定的到期 deadline 時(shí)間范圍將所有的延時(shí)任務(wù)分別打散到 512 個(gè) HashedWheelBucket 中靡努,比如坪圾,我們在 tick = 0
這個(gè)時(shí)刻向時(shí)間輪添加延時(shí)任務(wù),如果這個(gè)任務(wù)的 deadline 在 [ 0 , 100 )ms
內(nèi)惑朦,那么它將會(huì)被添加到 HashedWheelBucket0兽泄,中,如果 deadline 在 [ 100 , 200 )ms
內(nèi)漾月,那么就會(huì)被添加到 HashedWheelBucket1 中病梢。同樣的道理,如果 deadline 在 [ 200, 300 )ms
內(nèi)栅屏,它將會(huì)被添加到 HashedWheelBucket2 中飘千,以此類推。
假設(shè)當(dāng)前 tick = 2
, 那么就表示 HashedWheelBucket2 中的延時(shí)任務(wù)馬上要被 workerThread 調(diào)度執(zhí)行栈雳,那么具體在什么時(shí)間執(zhí)行呢 护奈?
時(shí)間輪中的時(shí)間紀(jì)元是 tick = 0
,也就是從 0ms 開始哥纫, HashedWheelBucket2 中所有的延時(shí)任務(wù),它們的 deadline 都在 [ 200, 300 )ms
以內(nèi)。那么當(dāng) tick 從 0 轉(zhuǎn)動(dòng)到 2 的時(shí)候目锭,就表示時(shí)間已經(jīng)過去了 200ms荧呐。
但此時(shí)還不能馬上就開始執(zhí)行 HashedWheelBucket2 中的任務(wù),因?yàn)樗鼈兊难訒r(shí)時(shí)間可能是 210ms , 250ms 也可能是 299ms 擅憔,如果在 tick = 2 也就是 200ms 的這個(gè)時(shí)間點(diǎn)就馬上執(zhí)行鸵闪,那么這些任務(wù)就被提前執(zhí)行了。
所以我們需要等到 300ms 也就是 tick = 3 這個(gè)時(shí)刻才能執(zhí)行 HashedWheelBucket2 中的延時(shí)任務(wù)暑诸,注意這里 tick = 3
指的是具體真實(shí)的時(shí)間已經(jīng)到了 300ms 這個(gè)時(shí)間點(diǎn)蚌讼,而時(shí)間輪中的 tick 還是指向 2 ,并沒有向前推進(jìn)个榕。
也就是說篡石,延時(shí) 210ms , 250ms , 299ms 的任務(wù),需要等到 300ms 之后才能得到執(zhí)行西采,這里我們也可以看出凰萨,時(shí)間輪的精度是 tickDuration (默認(rèn) 100ms),延時(shí)任務(wù)的調(diào)度通常會(huì)晚一個(gè) 100ms 左右械馆。
這里提到 "100ms 左右" 的意思是胖眷,時(shí)間輪中的延時(shí)任務(wù)可能會(huì)被晚調(diào)度 5ms ,也可能晚調(diào)度 9ms ,也可能是幾十毫秒,也有可能是 105ms 狱杰, 108ms , 111ms 瘦材。具體這個(gè)調(diào)度延遲的不確定性是如何產(chǎn)生的,我們放在下一個(gè)小節(jié)在討論仿畸,這里大家有這個(gè)概念就可以了食棕。
因此 workerThread 在調(diào)度延時(shí)任務(wù)的時(shí)候,通常會(huì)首先等到 next tick 的時(shí)間點(diǎn)來臨才會(huì)開始執(zhí)行當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket错沽。
private long waitForNextTick() {
// 獲取 tick + 1 對(duì)應(yīng)的時(shí)間戳 deadline簿晓,后續(xù) workerThread 會(huì) sleep 直到 deadline 到達(dá)
long deadline = tickDuration * (tick + 1);
for (;;) {
// 時(shí)間輪時(shí)間軸中的當(dāng)前時(shí)間戳
final long currentTime = System.nanoTime() - startTime;
// 這里需要保證 workerThread 至少要 sleep 1ms ,防止其被頻繁的喚醒
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// 如果 deadline 已過期千埃,那么直接返回 currentTime
// tick bucket 中延時(shí)任務(wù)的 deadline 小于等于 currentTime 的就會(huì)被執(zhí)行
if (sleepTimeMs <= 0) {
// currentTime 溢出
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
try {
// sleep 等待到 deadline 時(shí)間點(diǎn)憔儿,然后執(zhí)行當(dāng)前 tick bucket 中的延時(shí)任務(wù)(timeout.deadline <= deadline)
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
// 時(shí)間輪被其他線程關(guān)閉,中斷 worker 線程
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
時(shí)間輪的精度是由 tickDuration 決定的放可,這個(gè)值我們可以調(diào)節(jié)谒臼,默認(rèn)為 100ms , 但最小不能低于 1ms 朝刊。tickDuration 越小,時(shí)間輪的精度越高蜈缤,同時(shí) workerThread 的繁忙程度也越高 拾氓。如果 tickDuration 設(shè)置的過小,那么 workerThread 在這里就會(huì)被頻繁的喚醒底哥。
所以為了防止 workerThread 被頻繁的喚醒咙鞍,我們需要保證至少要 sleep 1ms 。
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
如果 sleepTimeMs <= 0
則說明當(dāng)前時(shí)間點(diǎn) currentTime 已經(jīng)過了 tick + 1 對(duì)應(yīng)的時(shí)間戳 deadline 趾徽, 這樣就不用在這里等待了续滋,直接返回 currentTime。
只要當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 中的延時(shí)任務(wù)到期時(shí)間小于等于 currentTime (延時(shí)任務(wù)以過期)孵奶,workerThread 會(huì)將會(huì)執(zhí)行它們疲酌。
如果 sleepTimeMs > 0
則說明當(dāng)前時(shí)間還沒有到達(dá) tick + 1 這個(gè)時(shí)間點(diǎn),那么 workerThread 就會(huì)在這里睡眠等待了袁。
當(dāng)時(shí)間到達(dá) tick + 1 這個(gè)時(shí)間點(diǎn)之后徐勃,workerThread 就會(huì)從這里喚醒,轉(zhuǎn)去執(zhí)行當(dāng)前 tick 對(duì)應(yīng)的 HashedWheelBucket 里的延時(shí)任務(wù)早像。
int idx = (int) (tick & mask);
HashedWheelBucket bucket = wheel[idx];
但 HashedWheelBucket 里面此時(shí)可能還是空的僻肖,沒有任何延時(shí)任務(wù)。因?yàn)楫?dāng)業(yè)務(wù)線程在向時(shí)間輪添加延時(shí)任務(wù)的時(shí)候卢鹦,首先是要將任務(wù)添加到一個(gè)叫做 timeouts 的 MpscQueue 中臀脏。也就是說延時(shí)任務(wù)首先會(huì)在 timeouts 中緩存,并不會(huì)直接添加到對(duì)應(yīng)的 HashedWheelBucket 中冀自,
那么 workerThread 在被喚醒之后揉稚,首先要做的就是從 timeouts 中將延時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪對(duì)應(yīng)的 HashedWheelBucket 中。
private void transferTimeoutsToBuckets() {
// 每個(gè) tick 最多只能從 timeouts 中轉(zhuǎn)移 10 萬個(gè)延時(shí)任務(wù)到時(shí)間輪中
// 防止極端情況下 worker 線程在這里不停地拉取任務(wù)熬粗,執(zhí)行任務(wù)
// 剩下的任務(wù)等到下一個(gè) tick 在進(jìn)行轉(zhuǎn)移
for (int i = 0; i < 100000; i++) {
// 拉取待執(zhí)行的延時(shí)任務(wù)
HashedWheelTimeout timeout = timeouts.poll();
// 跳過已被取消的任務(wù)
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
continue;
}
// 計(jì)算延時(shí)任務(wù)最終會(huì)停留在哪個(gè) tick 上
// 這里的 tick 和 calculated 是一個(gè)絕對(duì)值搀玖,從 0 開始增加,只增不減
long calculated = timeout.deadline / tickDuration;
// 時(shí)間輪從當(dāng)前 tick 開始轉(zhuǎn)動(dòng)到 calculated 需要經(jīng)過多少個(gè)時(shí)鐘周期
timeout.remainingRounds = (calculated - tick) / wheel.length;
// calculated < 當(dāng)前 tick , 則表示延時(shí)任務(wù) timeout 已經(jīng)過期了
// 那么就將過期的 timeout 放在當(dāng)前 tick 中執(zhí)行
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
當(dāng)任務(wù)轉(zhuǎn)移完成之后驻呐,workerThread 開始處理當(dāng)前 tick 對(duì)應(yīng)的HashedWheelBucket灌诅,將 HashedWheelBucket 中的延時(shí)任務(wù)挨個(gè)拉取出來執(zhí)行。當(dāng)所有到期的延時(shí)任務(wù)被執(zhí)行完之后含末,tick 向前推進(jìn)一格猜拾,開啟新一輪的循環(huán)。
private static final class HashedWheelBucket {
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) { // bucket 不為空
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) { // 屬于當(dāng)前時(shí)鐘周期
next = remove(timeout); // 從 bucket 中刪除
if (timeout.deadline <= deadline) {
// 執(zhí)行延時(shí)任務(wù)
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --;
}
timeout = next;
}
}
}
時(shí)間輪中的延時(shí)任務(wù)默認(rèn)情況下是由 workerThread 執(zhí)行的佣盒,但如果我們在創(chuàng)建時(shí)間輪的時(shí)候指定了專門的 taskExecutor 挎袜, 那么延時(shí)任務(wù)就會(huì)由這個(gè) taskExecutor 負(fù)責(zé)執(zhí)行,workerThread 只負(fù)責(zé)調(diào)度,大大減輕了 workerThread 的負(fù)荷盯仪。
private static final class HashedWheelTimeout implements Timeout, Runnable {
// 時(shí)間輪
private final HashedWheelTimer timer;
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
// 時(shí)間輪的 taskExecutor 負(fù)責(zé)執(zhí)行延時(shí)任務(wù)紊搪,默認(rèn)為 workerThread
timer.taskExecutor.execute(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
+ " for execution.", t);
}
}
}
}
下面是時(shí)間輪運(yùn)轉(zhuǎn)的完整邏輯流程:
private final class Worker implements Runnable {
@Override
public void run() {
do {
// workerThread 這里會(huì)等待到下一個(gè) tick 的時(shí)間點(diǎn)
final long deadline = waitForNextTick();
// deadline < 0 表示 currentTime 溢出
if (deadline > 0) {
int idx = (int) (tick & mask);
// 將 cancelledTimeouts 中已經(jīng)取消的 task 從對(duì)應(yīng) bucket 中刪除
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
// 將 timeouts 中收集的延時(shí)任務(wù)添加到時(shí)間輪中
transferTimeoutsToBuckets();
// 執(zhí)行當(dāng)前 tick 對(duì)應(yīng) bucket 中的所有延時(shí)任務(wù)
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
..............
}
3.6 調(diào)度延遲的不確定性是如何產(chǎn)生的
前面我們提到過,時(shí)間輪只適合那種對(duì)延時(shí)任務(wù)的調(diào)度及時(shí)性要求沒那么高的場景全景,Netty 時(shí)間輪的精度為一個(gè) tickDuration嗦明,默認(rèn)為 100ms 。延時(shí)任務(wù)的調(diào)度通常會(huì)晚 100ms 左右蚪燕。
比如,現(xiàn)在我們向時(shí)間輪添加一個(gè)延時(shí) 5ms 之后執(zhí)行的任務(wù)奔浅,那么這個(gè)延時(shí)任務(wù)可能會(huì)在 8ms 之后執(zhí)行馆纳,也可能是 65ms 之后執(zhí)行,也有可能在 108ms 之后執(zhí)行汹桦÷呈唬總之,時(shí)間輪調(diào)度的延遲范圍會(huì)在 100ms 左右舞骆。那為什么會(huì)出現(xiàn)這種不確定性呢 钥弯?
這其中主要有兩個(gè)原因,首先第一個(gè)原因就是時(shí)間輪的延時(shí)任務(wù)太多督禽,延時(shí)任務(wù)的邏輯比較復(fù)雜脆霎,執(zhí)行時(shí)間略長,導(dǎo)致了 workerThread 的阻塞狈惫,從而造成了任務(wù)調(diào)度的延遲睛蛛。減緩這種情況的一個(gè)措施就是在創(chuàng)建時(shí)間輪的時(shí)候,我們可以指定一個(gè)自定義的 taskExecutor 來專門負(fù)責(zé)延時(shí)任務(wù)的執(zhí)行胧谈,減輕 workerThread 的負(fù)荷忆肾。或者增大 HashedWheelBucket 的個(gè)數(shù)菱肖,盡量的分散延時(shí)任務(wù)客冈,不要讓它們集中在某一個(gè) HashedWheelBucket 中。
第二個(gè)原因是要看我們究竟在哪一個(gè)時(shí)間點(diǎn)向時(shí)間輪添加延時(shí)任務(wù)稳强。不同的添加時(shí)機(jī)场仲,也會(huì)造成調(diào)度的不確定性。這可能有點(diǎn)難以理解退疫,我們來看一個(gè)具體的例子燎窘。
比如,我們在下圖時(shí)間軸中 1ms 這個(gè)時(shí)刻向時(shí)間輪添加一個(gè)延時(shí) 5ms 執(zhí)行的任務(wù)蹄咖。當(dāng)前時(shí)間輪如上圖所示褐健,tick 指向 0 。
延時(shí) 5ms 的任務(wù)會(huì)被添加到 HashedWheelBucket0 中,此時(shí) workerThread 還在 sleep 等待 next tick 也就是 100ms 這個(gè)時(shí)間點(diǎn)的到來蚜迅。
我們在 1ms 這個(gè)時(shí)刻添加的這個(gè)延時(shí)任務(wù)本來應(yīng)該在時(shí)間軸中的 6ms 這個(gè)時(shí)間點(diǎn)執(zhí)行舵匾,但是現(xiàn)在 workerThread 還在睡眠,需要等到 100ms 這個(gè)時(shí)間點(diǎn)才能被喚醒去執(zhí)行 HashedWheelBucket0 中的延時(shí)任務(wù)谁不。這就產(chǎn)生了 90 ms 的調(diào)度延時(shí)坐梯。
但如果我們在時(shí)間軸的 94ms 位置處添加這個(gè) 5ms 的延時(shí)任務(wù),那么這個(gè)延時(shí)任務(wù)本應(yīng)該在時(shí)間軸的 99ms 這個(gè)時(shí)間點(diǎn)被執(zhí)行刹帕,但由于 workerThread 在 100ms 這個(gè)時(shí)間點(diǎn)才會(huì)被喚醒吵血,所以產(chǎn)生了 1ms 的調(diào)度延時(shí)。
如果非常不幸偷溺,我們恰好卡在了時(shí)間軸 95ms 這個(gè)時(shí)間點(diǎn)添加這個(gè) 5ms 的延時(shí)任務(wù)蹋辅,此時(shí)要注意,這個(gè)延時(shí)任務(wù)會(huì)被放在 HashedWheelBucket1 中而不是 HashedWheelBucket0挫掏。
而 HashedWheelBucket1 中的延時(shí)任務(wù)侦另,workerThread 需要等到時(shí)間軸 200ms 這個(gè)時(shí)間點(diǎn)才會(huì)去執(zhí)行,這樣一來尉共,本應(yīng)該在 100ms 這個(gè)時(shí)間點(diǎn)執(zhí)行的延時(shí)任務(wù)褒傅,時(shí)間輪卻在 200ms 這個(gè)時(shí)間點(diǎn)來調(diào)度,這就產(chǎn)生了 100ms 的調(diào)度延時(shí)袄友。如果在算上 CPU 調(diào)度 workerThread 的時(shí)間殿托,那么這個(gè)延遲可能就在 105ms 或者 108ms 左右。這里大家可以對(duì)照上小節(jié)的內(nèi)容剧蚣,仔細(xì)想想是不是這么回事碌尔。
3.7 時(shí)間輪的關(guān)閉
時(shí)間輪定義了三種狀態(tài),在剛被創(chuàng)建出來的時(shí)候狀態(tài)為 WORKER_STATE_INIT券敌,啟動(dòng)之后狀態(tài)為 WORKER_STATE_STARTED唾戚。
public class HashedWheelTimer implements Timer {
// 初始狀態(tài)
public static final int WORKER_STATE_INIT = 0;
// 啟動(dòng)狀態(tài)
public static final int WORKER_STATE_STARTED = 1;
// 關(guān)閉狀態(tài)
public static final int WORKER_STATE_SHUTDOWN = 2;
// 時(shí)間輪的狀態(tài),初始為 WORKER_STATE_INIT
private volatile int workerState;
}
當(dāng)時(shí)間輪要關(guān)閉的時(shí)候待诅,我們就需要將 workerState 更新為 WORKER_STATE_SHUTDOWN叹坦。當(dāng) workerThread 檢測到時(shí)間輪的狀態(tài)不是 WORKER_STATE_STARTED 的時(shí)候,就會(huì)退出 do ... while
循環(huán)卑雁,停止時(shí)間輪的轉(zhuǎn)動(dòng)募书。
隨后 workerThread 會(huì)將當(dāng)前時(shí)間輪中所有 HashedWheelBuckets 中遺留的(未來得及執(zhí)行,未取消)的延時(shí)任務(wù)以及 timeouts 隊(duì)列中緩存的待執(zhí)行延時(shí)任務(wù)(未取消)統(tǒng)統(tǒng)轉(zhuǎn)移到 unprocessedTimeouts 集合中测蹲。并將已經(jīng)取消的延時(shí)任務(wù)從對(duì)應(yīng) HashedWheelBucket 中刪除莹捡。
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
@Override
public void run() {
do {
........ 時(shí)間輪運(yùn)轉(zhuǎn)邏輯 .......
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
for (HashedWheelBucket bucket: wheel) {
// 將 bucket 中還沒來得及執(zhí)行并且沒有被取消的任務(wù)轉(zhuǎn)移到 unprocessedTimeouts
bucket.clearTimeouts(unprocessedTimeouts);
}
// 將 timeouts 中緩存的待執(zhí)行任務(wù)(沒有被取消)轉(zhuǎn)移到 unprocessedTimeouts
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// 將 cancelledTimeouts 中的延時(shí)任務(wù)從對(duì)應(yīng) bucket 中刪除
processCancelledTasks();
}
}
最后 Netty 會(huì)將 unprocessedTimeouts 集合中收集到的那些還未來得及執(zhí)行的延時(shí)任務(wù)全部取消,然后將這些延時(shí)任務(wù)返回給業(yè)務(wù)線程扣甲,由業(yè)務(wù)線程自行處理篮赢。
時(shí)間輪關(guān)閉的完整流程總結(jié)如下:
在延時(shí)任務(wù)中不能執(zhí)行時(shí)間輪關(guān)閉的操作齿椅。
原子更新時(shí)間輪的狀態(tài)為 WORKER_STATE_SHUTDOWN
中斷 workerThread,并等待 workerThread 結(jié)束启泣。
取消所有還未來得及調(diào)度的延時(shí)任務(wù)涣脚,并返回給業(yè)務(wù)線程。
public class HashedWheelTimer implements Timer {
@Override
public Set<Timeout> stop() {
// 在延時(shí)任務(wù)中不能執(zhí)行停止時(shí)間輪的操作
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
// 停止時(shí)間輪
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// cas 更新狀態(tài)失敗寥茫,這里時(shí)間輪的狀態(tài)可能會(huì)是兩種:
// 1. WORKER_STATE_INIT 還未啟動(dòng)
// 2. WORKER_STATE_SHUTDOWN 已經(jīng)停止
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
// 如果時(shí)間輪還未啟動(dòng)遣蚀,那么直接停止就好了
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}
// 時(shí)間輪沒有啟動(dòng),那么肯定也就沒有延時(shí)任務(wù)纱耻,這里直接返回一個(gè)空集合就行
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
// 中斷 workerThread芭梯,使其從 sleep 中喚醒,執(zhí)行時(shí)間輪關(guān)閉的邏輯
// 如果 workerThread 在運(yùn)行弄喘,那么此時(shí)時(shí)間輪已經(jīng)是 WORKER_STATE_SHUTDOWN 狀態(tài)
// workerThread 會(huì)退出 do while 循環(huán)轉(zhuǎn)去執(zhí)行時(shí)間輪的關(guān)閉邏輯
workerThread.interrupt();
try {
// 等待 workerThread 的結(jié)束
workerThread.join(100);
} catch (InterruptedException ignored) {
// 當(dāng)前線程被中斷
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
if (leak != null) {
// 關(guān)閉資源泄露探測
boolean closed = leak.close(this);
assert closed;
}
}
// 獲取還未來得及處理的延時(shí)任務(wù)
Set<Timeout> unprocessed = worker.unprocessedTimeouts();
Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
// 將還未來得及處理的任務(wù)全部取消玖喘,然后返回
for (Timeout timeout : unprocessed) {
if (timeout.cancel()) {
cancelled.add(timeout);
}
}
return cancelled;
}
}