定時(shí)任務(wù)
Netty、Quartz、Kafka 以及 Linux 都有定時(shí)任務(wù)功能。
JDK 自帶的 java.util.Timer
和 DelayedQueue
可實(shí)現(xiàn)簡(jiǎn)單的定時(shí)任務(wù),底層用的是堆,存取復(fù)雜度都是 O(nlog(n))烹俗,但無法支撐海量定時(shí)任務(wù)爆侣。
在任務(wù)量大、性能要求高的場(chǎng)景幢妄,為了將任務(wù)存取及取消操作時(shí)間復(fù)雜度降為 O(1)兔仰,會(huì)采用時(shí)間輪算法。
什么是時(shí)間輪
- 調(diào)度模型:時(shí)間輪是為解決高效調(diào)度任務(wù)而產(chǎn)生的調(diào)度模型蕉鸳。
- 數(shù)據(jù)結(jié)構(gòu):通常由 hash table 和 鏈表 實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)乎赴。
- 延時(shí)任務(wù)、周期性任務(wù):應(yīng)用場(chǎng)景主要在延遲大規(guī)模的延時(shí)任務(wù)潮尝、周期性的定時(shí)任務(wù)等榕吼。目前在 Kafka、caffeine勉失、netty 等各種任務(wù)調(diào)度功能中作為調(diào)度器使用羹蚣。
時(shí)間輪模型及其應(yīng)用
一種高效批量管理定時(shí)任務(wù)的調(diào)度模型。一般會(huì)實(shí)現(xiàn)成一個(gè)環(huán)形結(jié)構(gòu)乱凿,類似一個(gè)時(shí)鐘蝗砾,分為很多槽皱碘,一個(gè)槽代表一個(gè)時(shí)間間隔,每個(gè)槽使用雙向鏈表存儲(chǔ)定時(shí)任務(wù)席噩。
指針周期性跳動(dòng)癌幕,跳動(dòng)到一個(gè)槽位鹏漆,就執(zhí)行該槽位的定時(shí)任務(wù)抑钟。
時(shí)間輪應(yīng)用:
- 故障恢復(fù)
- 流量控制
- 調(diào)度算法
- 控制網(wǎng)絡(luò)中的數(shù)據(jù)包生命周期
計(jì)時(shí)器維護(hù)代價(jià)高德迹,如果
- 處理器在每個(gè)時(shí)鐘滴答聲中都會(huì)中斷
- 使用精細(xì)粒度計(jì)時(shí)器
- 未完成的計(jì)時(shí)器很多
需要高效的定時(shí)器算法以減少總體中斷的開銷。
單層時(shí)間輪的容量和精度都是有限的寺枉,對(duì)于精度要求特別高抑淫、時(shí)間跨度特別大或是海量定時(shí)任務(wù)需要調(diào)度的場(chǎng)景,通常會(huì)使用多級(jí)時(shí)間輪以及持久化存儲(chǔ)與時(shí)間輪結(jié)合的方案型凳。
Dubbo的時(shí)間輪結(jié)構(gòu)
Dubbo 時(shí)間輪實(shí)現(xiàn)位于 dubbo-common 模塊的 org.apache.dubbo.common.timer 包丈冬,下面我們就來分析時(shí)間輪涉及的核心接口和實(shí)現(xiàn)嘱函。
TimerTask
在 Dubbo 中甘畅,所有定時(shí)任務(wù)都要實(shí)現(xiàn) TimerTask 接口。只定義了一個(gè) run() 方法往弓,入?yún)⑹且粋€(gè) Timeout 接口對(duì)象疏唾。
public interface TimerTask {
void run(Timeout timeout) throws Exception;
}
Timeout
Timeout 對(duì)象與 TimerTask 對(duì)象一一對(duì)應(yīng),類似線程池返回的 Future 對(duì)象與提交到線程池中的任務(wù)對(duì)象之間的關(guān)系函似。
通過 Timeout 對(duì)象槐脏,不僅可以查看定時(shí)任務(wù)的狀態(tài),還可以操作定時(shí)任務(wù)(例如取消關(guān)聯(lián)的定時(shí)任務(wù))撇寞。
/**
* Timeout 對(duì)象與 TimerTask 對(duì)象一一對(duì)應(yīng)顿天,兩者的關(guān)系類似于線程池返回的 Future 對(duì)象與提交到線程池中的任務(wù)對(duì)象之間的關(guān)系堂氯。
* 通過 Timeout 對(duì)象,我們不僅可以查看定時(shí)任務(wù)的狀態(tài)牌废,還可以操作定時(shí)任務(wù)
*/
public interface Timeout {
/**
* 返回創(chuàng)建自己的定時(shí)器
*/
Timer timer();
/**
* 返回關(guān)聯(lián)的定時(shí)任務(wù)
*/
TimerTask task();
/**
* 返回定時(shí)任務(wù)是否到期
*/
boolean isExpired();
/**
* 返回定時(shí)任務(wù)是否被取消
*/
boolean isCancelled();
/**
* 嘗試取消定時(shí)任務(wù)咽白,如果任務(wù)已經(jīng)被執(zhí)行或已經(jīng)取消,方法正常返回.
*
* @return True if the cancellation completed successfully, otherwise false
*/
boolean cancel();
}
Timer
Timer 接口定義了定時(shí)器的基本行為鸟缕,核心是 newTimeout() :提交一個(gè)定時(shí)任務(wù)(TimerTask)并返回關(guān)聯(lián)的 Timeout 對(duì)象晶框,類似于向線程池提交任務(wù)。
public interface Timer {
/**
* 提交一個(gè)定時(shí)任務(wù)(TimerTask)懂从,類似于向線程池提交任務(wù)
* @return 返回關(guān)聯(lián)的 Timeout 對(duì)象
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
* @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
* can cause instability in the system.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* @return 方法返回被取消的任務(wù)對(duì)應(yīng)的Timeout集合
*/
Set<Timeout> stop();
/**
* 判斷定時(shí)器是否停止
*
* @return true for stop
*/
boolean isStop();
}
HashedWheelTimeout
HashedWheelTimeout 是 Timeout 接口的唯一實(shí)現(xiàn)授段,是 HashedWheelTimer 的內(nèi)部類。HashedWheelTimeout 扮演了兩個(gè)角色:
- 1番甩、時(shí)間輪中雙向鏈表的節(jié)點(diǎn)侵贵,即定時(shí)任務(wù) TimerTask 在 HashedWheelTimer 中的容器。
- 2对室、定時(shí)任務(wù) TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle)模燥,用于在時(shí)間輪外部查看和控制定時(shí)任務(wù)。
HashedWheelTimeout 中的核心字段如下:
prev掩宜、next(HashedWheelTimeout類型):分別對(duì)應(yīng)當(dāng)前定時(shí)任務(wù)在鏈表中的前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)蔫骂。
task(TimerTask類型):指實(shí)際被調(diào)度的任務(wù)。
deadline(long類型):指定時(shí)任務(wù)執(zhí)行的時(shí)間牺汤。這個(gè)時(shí)間是在創(chuàng)建 HashedWheelTimeout 時(shí)指定的辽旋,計(jì)算公式是:currentTime(創(chuàng)建 HashedWheelTimeout 的時(shí)間) + delay(任務(wù)延遲時(shí)間) - startTime(HashedWheelTimer 的啟動(dòng)時(shí)間),時(shí)間單位為納秒檐迟。
state(volatile int類型):指定時(shí)任務(wù)當(dāng)前所處狀態(tài)补胚,可選的有三個(gè),分別是 INIT(0)追迟、CANCELLED(1)和 EXPIRED(2)溶其。另外,還有一個(gè) STATE_UPDATER 字段(AtomicIntegerFieldUpdater類型)實(shí)現(xiàn) state 狀態(tài)變更的原子性敦间。
remainingRounds(long類型):指當(dāng)前任務(wù)剩余的時(shí)鐘周期數(shù)瓶逃。時(shí)間輪所能表示的時(shí)間長(zhǎng)度是有限的,在任務(wù)到期時(shí)間與當(dāng)前時(shí)刻的時(shí)間差廓块,超過時(shí)間輪單圈能表示的時(shí)長(zhǎng)厢绝,就出現(xiàn)了套圈的情況,需要該字段值表示剩余的時(shí)鐘周期带猴。
HashedWheelTimeout 中的核心方法有:
isCancelled()昔汉、isExpired() 、state() 方法:主要用于檢查當(dāng)前 HashedWheelTimeout 狀態(tài)拴清。
cancel() 方法:將當(dāng)前 HashedWheelTimeout 的狀態(tài)設(shè)置為 CANCELLED靶病,并將當(dāng)前 HashedWheelTimeout 添加到 cancelledTimeouts 隊(duì)列中等待銷毀会通。
expire() 方法:當(dāng)任務(wù)到期時(shí),會(huì)調(diào)用該方法將當(dāng)前 HashedWheelTimeout 設(shè)置為 EXPIRED 狀態(tài)娄周,然后調(diào)用其中的 TimerTask 的 run() 方法執(zhí)行定時(shí)任務(wù)渴语。
remove() 方法:將當(dāng)前 HashedWheelTimeout 從時(shí)間輪中刪除。
/**
* HashedWheelTimeout 是 Timeout 接口的唯一實(shí)現(xiàn)
* 1. 時(shí)間輪中雙向鏈表的節(jié)點(diǎn)昆咽,即定時(shí)任務(wù) TimerTask 在 HashedWheelTimer 中的容器
* 2. 定時(shí)任務(wù) TimerTask 提交到 HashedWheelTimer 之后返回的句柄驾凶,用于在時(shí)間輪外部查看和控制定時(shí)任務(wù)
*/
private static final class HashedWheelTimeout implements Timeout {
/** 狀態(tài)機(jī) */
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
/** 實(shí)現(xiàn) state 狀態(tài)變更的原子性 */
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
// 所在的時(shí)間輪,定時(shí)器
private final HashedWheelTimer timer;
// 關(guān)聯(lián)的定時(shí)任務(wù)
private final TimerTask task;
private final long deadline;
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
/**
* 當(dāng)前任務(wù)剩余的時(shí)鐘周期數(shù)掷酗,由Worker計(jì)算.
* 時(shí)間輪所能表示的時(shí)間長(zhǎng)度是有限的调违,在任務(wù)到期時(shí)間與當(dāng)前時(shí)刻的時(shí)間差,超過時(shí)間輪單圈能表示的時(shí)長(zhǎng)泻轰,就出現(xiàn)了套圈的情況技肩,需要該字段值表示剩余的時(shí)鐘周期
* transferTimeoutsToBuckets() before the HashedWheelTimeout will be added to the correct HashedWheelBucket.
*/
long remainingRounds;
/**
* 在 HashedWheelTimerBucket構(gòu)建雙向鏈表
* 只要worker線程操作,不需要同步原語進(jìn)行同步
*/
HashedWheelTimeout next;
HashedWheelTimeout prev;
/**
* 所添加的桶
*/
HashedWheelBucket bucket;
/**
* 構(gòu)造函數(shù)浮声,HashedWheelTimer的newTimeout方法中調(diào)用
*/
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
@Override
public Timer timer() {
return timer;
}
@Override
public TimerTask task() {
return task;
}
@Override
public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
// So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
// we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}
public boolean compareAndSetState(int expected, int state) {
return STATE_UPDATER.compareAndSet(this, expected, state);
}
public int state() {
return state;
}
@Override
public boolean isCancelled() {
return state() == ST_CANCELLED;
}
@Override
public boolean isExpired() {
return state() == ST_EXPIRED;
}
/**
* 設(shè)置到期虚婿,并調(diào)用TimeTask的run方法
*/
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
@Override
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
String simpleClassName = ClassUtils.simpleClassName(this.getClass());
StringBuilder buf = new StringBuilder(192)
.append(simpleClassName)
.append('(')
.append("deadline: ");
if (remaining > 0) {
buf.append(remaining)
.append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
.append(" ns ago");
} else {
buf.append("now");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(", task: ")
.append(task())
.append(')')
.toString();
}
}
HashedWheelBucket
HashedWheelBucket 是時(shí)間輪中的一個(gè)槽,時(shí)間輪中的槽實(shí)際上就是一個(gè)用于緩存和管理雙向鏈表的容器泳挥,雙向鏈表中的每一個(gè)節(jié)點(diǎn)就是一個(gè) HashedWheelTimeout 對(duì)象然痊,也就關(guān)聯(lián)了一個(gè) TimerTask 定時(shí)任務(wù)。
HashedWheelBucket 持有雙向鏈表的首尾兩個(gè)節(jié)點(diǎn)屉符,分別是 head 和 tail 兩個(gè)字段剧浸,再加上每個(gè) HashedWheelTimeout 節(jié)點(diǎn)均持有前驅(qū)和后繼的引用,這樣就可以正向或是逆向遍歷整個(gè)雙向鏈表了矗钟。
HashedWheelBucket 中的核心方法:
addTimeout() 方法:新增 HashedWheelTimeout 到雙向鏈表的尾部。
pollTimeout() 方法:移除雙向鏈表中的頭結(jié)點(diǎn)吨艇,并將其返回躬它。
remove() 方法:從雙向鏈表中移除指定的 HashedWheelTimeout 節(jié)點(diǎn)。
clearTimeouts() 方法:循環(huán)調(diào)用 pollTimeout() 方法處理整個(gè)雙向鏈表东涡,并返回所有未超時(shí)或者未被取消的任務(wù)冯吓。
expireTimeouts() 方法:遍歷雙向鏈表中的全部 HashedWheelTimeout 節(jié)點(diǎn)。 在處理到期的定時(shí)任務(wù)時(shí)软啼,會(huì)通過 remove() 方法取出桑谍,并調(diào)用其 expire() 方法執(zhí)行延柠;對(duì)于已取消的任務(wù)祸挪,通過 remove() 方法取出后直接丟棄;對(duì)于未到期的任務(wù)贞间,會(huì)將 remainingRounds 字段(剩余時(shí)鐘周期數(shù))減一贿条。
/**
* HashedWheelBucket 是時(shí)間輪中的一個(gè)桶
* 時(shí)間輪中的桶實(shí)際上就是一個(gè)用于緩存和管理雙向鏈表的容器雹仿,
* 雙向鏈表中的每一個(gè)節(jié)點(diǎn)就是一個(gè) HashedWheelTimeout 對(duì)象,也就關(guān)聯(lián)了一個(gè) TimerTask 定時(shí)任務(wù)整以。
*/
private static final class HashedWheelBucket {
/** 雙向鏈表結(jié)構(gòu) */
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* 尾插Timeout
*/
void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* 調(diào)用所有到期的HashedWheelTimeout的expire胧辽,移除cancel的Timeout
*/
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// timeout是取消狀態(tài),直接從桶中干掉
next = remove(timeout);
} else {
// 否則減1輪表盤
timeout.remainingRounds--;
}
timeout = next;
}
}
/**
* 從桶中移除timeout公黑,并返回下一個(gè)
* @param timeout
* @return
*/
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}
if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
/**
* 清空桶邑商,并返回所有沒到期或沒取消的Timeouts.
*/
void clearTimeouts(Set<Timeout> set) {
for (; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
/**
* 取出頭結(jié)點(diǎn)
* @return
*/
private HashedWheelTimeout pollTimeout() {
HashedWheelTimeout head = this.head;
if (head == null) {
return null;
}
HashedWheelTimeout next = head.next;
if (next == null) {
tail = this.head = null;
} else {
this.head = next;
next.prev = null;
}
// null out prev and next to allow for GC.
head.next = null;
head.prev = null;
head.bucket = null;
return head;
}
}
HashedWheelTimer
HashedWheelTimer 是 Timer 接口的實(shí)現(xiàn),它通過時(shí)間輪算法實(shí)現(xiàn)了一個(gè)定時(shí)器凡蚜。HashedWheelTimer 會(huì)根據(jù)當(dāng)前時(shí)間輪指針選定對(duì)應(yīng)的槽(HashedWheelBucket)人断,從雙向鏈表的頭部開始迭代,對(duì)每個(gè)定時(shí)任務(wù)(HashedWheelTimeout)進(jìn)行計(jì)算朝蜘,屬于當(dāng)前時(shí)鐘周期則取出運(yùn)行恶迈,不屬于則將其剩余的時(shí)鐘周期數(shù)減一操作。
HashedWheelTimer 的核心屬性:
workerState(volatile int類型):時(shí)間輪當(dāng)前所處狀態(tài)谱醇,可選值有 init暇仲、started、shutdown副渴。同時(shí)奈附,有相應(yīng)的 AtomicIntegerFieldUpdater 實(shí)現(xiàn) workerState 的原子修改。
startTime(long類型):當(dāng)前時(shí)間輪的啟動(dòng)時(shí)間煮剧,提交到該時(shí)間輪的定時(shí)任務(wù)的 deadline 字段值均以該時(shí)間戳為起點(diǎn)進(jìn)行計(jì)算桅狠。
wheel(HashedWheelBucket[]類型):該數(shù)組就是時(shí)間輪的環(huán)形隊(duì)列,每一個(gè)元素都是一個(gè)槽轿秧。當(dāng)指定時(shí)間輪槽數(shù)為 n 時(shí)中跌,實(shí)際上會(huì)取大于且最靠近 n 的 2 的冪次方值。
timeouts菇篡、cancelledTimeouts(LinkedBlockingQueue類型):timeouts 隊(duì)列用于緩沖外部提交時(shí)間輪中的定時(shí)任務(wù)漩符,cancelledTimeouts 隊(duì)列用于暫存取消的定時(shí)任務(wù)。HashedWheelTimer 會(huì)在處理 HashedWheelBucket 的雙向鏈表之前驱还,先處理這兩個(gè)隊(duì)列中的數(shù)據(jù)嗜暴。
tick(long類型):該字段在 HashedWheelTimer$Worker 中,是時(shí)間輪的指針议蟆,是一個(gè)步長(zhǎng)為 1 的單調(diào)遞增計(jì)數(shù)器闷沥。
mask(int類型):掩碼, mask = wheel.length - 1咐容,執(zhí)行 ticks & mask 便能定位到對(duì)應(yīng)的時(shí)鐘槽舆逃。
ticksDuration(long類型):時(shí)間指針每次加 1 所代表的實(shí)際時(shí)間,單位為納秒。
pendingTimeouts(AtomicLong類型):當(dāng)前時(shí)間輪剩余的定時(shí)任務(wù)總數(shù)路狮。
workerThread(Thread類型):時(shí)間輪內(nèi)部真正執(zhí)行定時(shí)任務(wù)的線程虫啥。
worker(Worker類型):真正執(zhí)行定時(shí)任務(wù)的邏輯封裝這個(gè) Runnable 對(duì)象中。
時(shí)間輪對(duì)外提供了一個(gè) newTimeout() 接口用于提交定時(shí)任務(wù)奄妨,在定時(shí)任務(wù)進(jìn)入到 timeouts 隊(duì)列之前會(huì)先調(diào)用 start() 方法啟動(dòng)時(shí)間輪涂籽,其中會(huì)完成下面兩個(gè)關(guān)鍵步驟:
- 1、確定時(shí)間輪的 startTime 字段砸抛。
- 2评雌、啟動(dòng) workerThread 線程,開始執(zhí)行 worker 任務(wù)直焙。
之后根據(jù) startTime 計(jì)算該定時(shí)任務(wù)的 deadline 字段柳骄,最后才能將定時(shí)任務(wù)封裝成 HashedWheelTimeout 并添加到 timeouts 隊(duì)列。
下面分析時(shí)間輪指針一次轉(zhuǎn)動(dòng)的全流程:
1箕般、時(shí)間輪指針轉(zhuǎn)動(dòng)耐薯,時(shí)間輪周期開始。
2丝里、清理用戶主動(dòng)取消的定時(shí)任務(wù)曲初,這些定時(shí)任務(wù)在用戶取消時(shí),會(huì)記錄到 cancelledTimeouts 隊(duì)列中杯聚。在每次指針轉(zhuǎn)動(dòng)的時(shí)候臼婆,時(shí)間輪都會(huì)清理該隊(duì)列。
3幌绍、將緩存在 timeouts 隊(duì)列中的定時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中對(duì)應(yīng)的槽中颁褂。
4、根據(jù)當(dāng)前指針定位對(duì)應(yīng)槽傀广,處理該槽位的雙向鏈表中的定時(shí)任務(wù)颁独。
5、檢測(cè)時(shí)間輪的狀態(tài)伪冰。如果時(shí)間輪處于運(yùn)行狀態(tài)誓酒,則循環(huán)執(zhí)行上述步驟,不斷執(zhí)行定時(shí)任務(wù)贮聂。如果時(shí)間輪處于停止?fàn)顟B(tài)靠柑,則執(zhí)行下面的步驟獲取到未被執(zhí)行的定時(shí)任務(wù)并加入 unprocessedTimeouts 隊(duì)列:遍歷時(shí)間輪中每個(gè)槽位,并調(diào)用 clearTimeouts() 方法吓懈;對(duì) timeouts 隊(duì)列中未被加入槽中循環(huán)調(diào)用 poll()歼冰。
5、最后再次清理 cancelledTimeouts 隊(duì)列中用戶主動(dòng)取消的定時(shí)任務(wù)耻警。
上述核心邏輯在 HashedWheelTimer$Worker.run() 方法中隔嫡,
Worker
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
/** cnt滴答數(shù) */
private long tick;
/**
* 時(shí)間輪指針一次轉(zhuǎn)動(dòng)的全流程甸怕。
*
* 1. 時(shí)間輪指針轉(zhuǎn)動(dòng),時(shí)間輪周期開始畔勤。
* 2. 清理用戶主動(dòng)取消的定時(shí)任務(wù),這些定時(shí)任務(wù)在用戶取消時(shí)扒磁,會(huì)記錄到 cancelledTimeouts 隊(duì)列中庆揪。
* 在每次指針轉(zhuǎn)動(dòng)的時(shí)候,時(shí)間輪都會(huì)清理該隊(duì)列妨托。
* 3. 將緩存在 timeouts 隊(duì)列中的定時(shí)任務(wù)轉(zhuǎn)移到時(shí)間輪中對(duì)應(yīng)的槽中缸榛。
* 4. 根據(jù)當(dāng)前指針定位對(duì)應(yīng)槽,處理該槽位的雙向鏈表中的定時(shí)任務(wù)兰伤。
* 5. 檢測(cè)時(shí)間輪的狀態(tài)内颗。如果時(shí)間輪處于運(yùn)行狀態(tài),則循環(huán)執(zhí)行上述步驟敦腔,不斷執(zhí)行定時(shí)任務(wù)均澳。
* 如果時(shí)間輪處于停止?fàn)顟B(tài),則執(zhí)行下面的步驟獲取到未被執(zhí)行的定時(shí)任務(wù)并加入 unprocessedTimeouts 隊(duì)列:
* 遍歷時(shí)間輪中每個(gè)槽位符衔,并調(diào)用 clearTimeouts() 方法找前;對(duì) timeouts 隊(duì)列中未被加入槽中循環(huán)調(diào)用 poll()。
* 6. 最后再次清理 cancelledTimeouts 隊(duì)列中用戶主動(dòng)取消的定時(shí)任務(wù)判族。
*/
@Override
public void run() {
// Initialize the startTime.
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
// Worker啟動(dòng)時(shí)循環(huán)執(zhí)行躺盛,相當(dāng)于時(shí)間輪不停地轉(zhuǎn)動(dòng)
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
// 計(jì)算指針指向的桶
int idx = (int) (tick & mask);
// 先處理被取消的任務(wù)
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 遍歷 timeouts 隊(duì)列中的定時(shí)任務(wù)添加到桶中
transferTimeoutsToBuckets();
// 調(diào)用所有到期的HashedWheelTimeout的expire,移除cancel的Timeout
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 上面的循環(huán)結(jié)束后形帮,遍歷清空所有的桶槽惫,將未過期或未取消的任務(wù)保存至unprocessedTimeouts集合,便于返回給stop()方法
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
// 將timeouts緩沖隊(duì)列中未取消的任務(wù)也添加到unprocessedTimeouts中
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
// remove所有cancelledTimeouts隊(duì)列中的任務(wù)
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 計(jì)算落入的桶index辩撑,確保不會(huì)是過去時(shí)間
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
private void processCancelledTasks() {
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
/**
* 根據(jù)startTime和tick數(shù)計(jì)算目標(biāo)ns界斜,然后等待到目標(biāo)ns
*
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() {
// 下一次滴答時(shí)間
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
Set<Timeout> unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
HashedWheelTimer
/**
* 每個(gè)滴答耗時(shí) 通常 100ms
* 時(shí)間輪大小 通常512
* 保持單例模式使用
*
*/
public class HashedWheelTimer implements Timer {
/**
* may be in spi?
*/
public static final String NAME = "hased";
private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
/** 真正執(zhí)行定時(shí)任務(wù)的邏輯封裝這個(gè) Runnable 對(duì)象中 */
private final Worker worker = new Worker();
/** 時(shí)間輪內(nèi)部真正執(zhí)行定時(shí)任務(wù)的線程 */
private final Thread workerThread;
/** worker狀態(tài)機(jī) */
private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;
/**
* 0 - init, 1 - started, 2 - shut down
*/
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;
// 每個(gè)tick的時(shí)間,時(shí)間輪精度
private final long tickDuration;
// 時(shí)間輪桶
private final HashedWheelBucket[] wheel;
// 掩碼合冀, mask = wheel.length - 1锄蹂,執(zhí)行 ticks & mask 便能定位到對(duì)應(yīng)的時(shí)鐘槽
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
/** timeouts 隊(duì)列用于緩沖外部提交時(shí)間輪中的定時(shí)任務(wù) */
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
/** cancelledTimeouts 隊(duì)列用于暫存取消的定時(shí)任務(wù) */
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
// 統(tǒng)計(jì)待定的Timeouts數(shù)量
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
/** 當(dāng)前時(shí)間輪的啟動(dòng)時(shí)間,提交到該時(shí)間輪的定時(shí)任務(wù)的 deadline 字段值均以該時(shí)間戳為起點(diǎn)進(jìn)行計(jì)算 */
private volatile long startTime;
public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}
public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}
public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}
public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// wheel大小處理為2的指數(shù)水慨,并創(chuàng)建時(shí)間輪——桶數(shù)組
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);
// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@Override
protected void finalize() throws Throwable {
try {
super.finalize();
} finally {
// This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
// we have not yet shutdown then we want to make sure we decrement the active instance count.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
}
}
/**
* 創(chuàng)建時(shí)間輪——桶數(shù)組
* @param ticksPerWheel
* @return
*/
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = ticksPerWheel - 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
return normalizedTicksPerWheel + 1;
}
/**
* 顯式啟動(dòng)后臺(tái)線程
* 即使未調(diào)用此方法得糜,后臺(tái)線程也將根據(jù)需要自動(dòng)啟動(dòng)。
* 1. 確定時(shí)間輪的 startTime 字段晰洒;
* 2. 啟動(dòng) workerThread 線程朝抖,開始執(zhí)行 worker 任務(wù)
*
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
}
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
}
// 返回未處理的Timeouts
return worker.unprocessedTimeouts();
}
@Override
public boolean isStop() {
return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
}
/**
* 新建Timeout,并加入緩沖隊(duì)列
*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 超過maxPendingTimeouts谍珊,直接拒絕
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
start();
// 將該timeout添加至timeouts隊(duì)列中治宣,下一個(gè)tick會(huì)進(jìn)行處理
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止溢出
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
/**
* Returns the number of pending timeouts of this {@link Timer}.
*/
public long pendingTimeouts() {
return pendingTimeouts.get();
}
private static void reportTooManyInstances() {
String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
logger.error("You are creating too many " + resourceType + " instances. " +
resourceType + " is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
}
}
Dubbo 中如何使用定時(shí)任務(wù)
在 Dubbo 中,時(shí)間輪并不直接用于周期性操作,而是只向時(shí)間輪提交執(zhí)行單次的定時(shí)任務(wù)侮邀,在上一次任務(wù)執(zhí)行完成的時(shí)候坏怪,調(diào)用 newTimeout() 方法再次提交當(dāng)前任務(wù),這樣就會(huì)在下個(gè)周期執(zhí)行該任務(wù)绊茧。即使在任務(wù)執(zhí)行過程中出現(xiàn)了 GC铝宵、I/O 阻塞等情況,導(dǎo)致任務(wù)延遲或卡住华畏,也不會(huì)有同樣的任務(wù)源源不斷地提交進(jìn)來鹏秋,導(dǎo)致任務(wù)堆積。
Dubbo 中對(duì)時(shí)間輪的應(yīng)用主要體現(xiàn)在如下兩個(gè)方面:
失敗重試:例如亡笑,Provider 向注冊(cè)中心進(jìn)行注冊(cè)失敗時(shí)的重試操作侣夷,或是 Consumer 向注冊(cè)中心訂閱時(shí)的失敗重試等。
周期性定時(shí)任務(wù):例如仑乌,定期發(fā)送心跳請(qǐng)求百拓,請(qǐng)求超時(shí)的處理,或是網(wǎng)絡(luò)連接斷開后的重連機(jī)制晰甚。
測(cè)試
@RunWith(SpringRunner.class)
@SpringBootTest
public class HashedWheelTimerTest {
private class PrintTask implements TimerTask {
@Override
public void run(Timeout timeout) {
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
System.out.println("task :" + LocalDateTime.now().format(formatter));
}
}
@Test
public void newTimeout() throws InterruptedException {
final Timer timer = newTimer();
// 每隔1s向時(shí)間輪添加任務(wù)耐版。定時(shí)任務(wù)也是1s
for (int i = 0; i < 10; i++) {
timer.newTimeout(new PrintTask(), 3, TimeUnit.SECONDS);
System.out.println("task" + i + "added into the timer");
Thread.sleep(1000);
}
Thread.sleep(5000);
}
@Test
public void stop() throws InterruptedException {
final Timer timer = newTimer();
for (int i = 0; i < 10; i++) {
timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
Thread.sleep(100);
}
//stop timer
timer.stop();
try {
//this will throw a exception
timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
private Timer newTimer() {
// 100ms間隔的時(shí)間輪
return new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
100,
TimeUnit.MILLISECONDS);
}
}
參考:
https://developer.51cto.com/art/202010/628734.htm?mobile
https://blog.csdn.net/weixin_38308374/article/details/105862201
https://wangguoping.blog.csdn.net/article/details/108293948
https://blog.csdn.net/weixin_42588665/article/details/81865156