在jdk自帶的庫中,有兩種技術(shù)可以實現(xiàn)定時任務(wù)。一種是使用Timer,另外一個則是ScheduledThreadPoolExecutor。下面為大家分析一下這兩個技術(shù)的底層實現(xiàn)原理以及各自的優(yōu)缺點荡陷。
一侯谁、Timer
1. Timer的使用
class MyTask extends TimerTask{
@Override
public void run() {
System.out.println("hello world");
}
}
public class TimerDemo {
public static void main(String[] args) {
//創(chuàng)建定時器對象
Timer t=new Timer();
//在3秒后執(zhí)行MyTask類中的run方法,后面每10秒跑一次
t.schedule(new MyTask(), 3000,10000);
}
}
通過往Timer提交一個TimerTask的任務(wù)妥曲,同時指定多久后開始執(zhí)行以及執(zhí)行周期垦写,我們可以開啟一個定時任務(wù)。
2. 源碼解析
首先我們先來看一下Timer這個類
//存放定時任務(wù)的隊列
//這個TaskQueue 也是Timer內(nèi)部自定義的一個隊列,這個隊列通過最小堆來維護(hù)隊列
//下一次執(zhí)行時間距離現(xiàn)在最小的會被放在堆頂碑幅,到時執(zhí)行線程直接獲取堆頂任務(wù)并判斷是否執(zhí)行即可
private final TaskQueue queue = new TaskQueue();
//負(fù)責(zé)執(zhí)行定時任務(wù)的線程
private final TimerThread thread = new TimerThread(queue);
public Timer() {
this("Timer-" + serialNumber());
}
public Timer(String name) {
//設(shè)置線程的名字,并且啟動這個線程
thread.setName(name);
thread.start();
}
再來看一下TimerThread 這個類戴陡,這個類也是定義在Timer.class中的一個類,它繼承了Thread類,所以可以直接拿來當(dāng)線程使用。
我們直接來看他的構(gòu)造方法以及run方法
//在Timer中初始化的時候會將Timer的Queue賦值進(jìn)來
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
//進(jìn)入自旋,開始不斷的從任務(wù)隊列中獲取定時任務(wù)來執(zhí)行
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
//加同步
synchronized(queue) {
//如果任務(wù)隊列為空,并且newTasksMayBeScheduled為true,就休眠等待,直到有任務(wù)進(jìn)來就會喚醒這個線程
//如果有人調(diào)用timer的cancel方法沟涨,newTasksMayBeScheduled會變成false
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break;
// 獲取當(dāng)前時間和下次任務(wù)執(zhí)行時間
long currentTime, executionTime;
//獲取隊列中最早要執(zhí)行的任務(wù)
task = queue.getMin();
synchronized(task.lock) {
//如果這個任務(wù)已經(jīng)被結(jié)束了猜欺,就從隊列中移除
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
//獲取當(dāng)前時間和下次任務(wù)執(zhí)行時間
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
//判斷任務(wù)執(zhí)行時間是否小于當(dāng)前時間,表示小于,就說明可以執(zhí)行了
if (taskFired = (executionTime<=currentTime)) {
//如果任務(wù)的執(zhí)行周期是0,說明只要執(zhí)行一次就好了,就從隊列中移除它,這樣下一次就不會獲取到該任務(wù)了
if (task.period == 0) {
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else {
//重新設(shè)置該任務(wù)下一次的執(zhí)行時間
//如果之前設(shè)置的period小于0,就用當(dāng)前時間-period,等于就是當(dāng)前時間加上周期值
//這里的下次執(zhí)行時間就是當(dāng)前的執(zhí)行時間加上周期值
//這里涉及到是否以固定頻率調(diào)用任務(wù)的問題,下面再詳細(xì)講解
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
//如果任務(wù)的執(zhí)行時間還沒到拷窜,就計算出還有多久才到達(dá)執(zhí)行時間,然后線程進(jìn)入休眠
if (!taskFired)
queue.wait(executionTime - currentTime);
}
//如果任務(wù)的執(zhí)行時間到了,就執(zhí)行這個任務(wù)
if (taskFired)
task.run();
} catch(InterruptedException e) {
}
}
}
通過上面的代碼,我們大概了解了Timer是怎么工作的了。下面來看一下schedule()方法的相關(guān)代碼
//Timer.java
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
//調(diào)用內(nèi)部的一個方法
sched(task, System.currentTimeMillis()+delay, -period);
}
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");
// 如果設(shè)定的定時任務(wù)周期太長,就將其除以2
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;
//加鎖同步
synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");
//設(shè)置任務(wù)的各個屬性
synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}
//將任務(wù)加入到隊列中
queue.add(task);
//如果任務(wù)加入隊列后排在堆頂篮昧,說明該任務(wù)可能馬上可以執(zhí)行了,那就喚醒執(zhí)行線程
if (queue.getMin() == task)
queue.notify();
}
}
3. 總結(jié)
Timer的原理比較簡單,當(dāng)我們初始化Timer的時候,timer內(nèi)部會啟動一個線程赋荆,并且初始化一個優(yōu)先級隊列,該優(yōu)先級隊列使用了最小堆的技術(shù)來將最早執(zhí)行時間的任務(wù)放在堆頂懊昨。
當(dāng)我們調(diào)用schedule方法的時候,其實就是生成一個任務(wù)然后插入到該優(yōu)先級隊列中窄潭。最后,timer內(nèi)部的線程會從優(yōu)先級隊列的堆頂獲取任務(wù)酵颁,獲取到任務(wù)后嫉你,先判斷執(zhí)行時間是否到了,如果到了先設(shè)置下一次的執(zhí)行時間并調(diào)整堆躏惋,然后執(zhí)行任務(wù)幽污。如果沒到執(zhí)行時間那線程就休眠一段時間。
關(guān)于計算下次任務(wù)執(zhí)行時間的策略:
這里設(shè)置下一次執(zhí)行時間的算法會根據(jù)傳入peroid的值來判斷使用哪種策略:
- 如果peroid是負(fù)數(shù),那下一次的執(zhí)行時間就是當(dāng)前時間+peroid的值
- 如果peroid是正數(shù)簿姨,那下一次執(zhí)行時間就是該任務(wù)這次的執(zhí)行時間+peroid的值距误。
這兩個策略的不同點在于,如果計算下次執(zhí)行時間是以當(dāng)前時間為基數(shù)扁位,那它就不是以固定頻率來執(zhí)行任務(wù)的准潭。因為Timer是單線程執(zhí)行任務(wù)的,如果A任務(wù)執(zhí)行周期是10秒,但是有個B任務(wù)執(zhí)行了20幾秒域仇,那么下一次A任務(wù)的執(zhí)行時間就要等B執(zhí)行完后輪到自己時刑然,再過10秒才會執(zhí)行下一次。
如果策略是這次任務(wù)的執(zhí)行時間+peroid的值就是按固定頻率不斷執(zhí)行任務(wù)了暇务。讀者可以自行模擬一下
二泼掠、ScheduledThreadPoolExecutor
1. 使用
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("hello world");
}
}, 1, 3, TimeUnit.SECONDS);
2. 實現(xiàn)原理+源碼解析
由于ScheduledThreadPoolExecutor是基于線程池實現(xiàn)的。所以了解它的原理之前讀者有必要先了解一下Java線程池的實現(xiàn)般卑。關(guān)于Java線程池的實現(xiàn)原理,可以看我的另外一篇博客:Java線程池實現(xiàn)原理詳解
我們直接來看一下的源碼
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//將任務(wù)進(jìn)行一層封裝,最后得到一個ScheduledFutureTask對象
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
//進(jìn)行一些裝飾,其實就是返回sft這個對象
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//提交給線程池執(zhí)行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池已經(jīng)關(guān)閉,就拒絕這個任務(wù)
if (isShutdown())
reject(task);
else {
//將當(dāng)前任務(wù)加入到任務(wù)隊列中去
super.getQueue().add(task);
//判斷線程池是否關(guān)閉了,然后判斷是否需要移除這個任務(wù)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//因為這里的定時任務(wù)是直接放到任務(wù)隊列中,所以需要保證已經(jīng)有worker啟動了
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
//如果worker的數(shù)量小于corePoolSize,那就啟動一個worker,用來消費(fèi)任務(wù)隊列的任務(wù)
if (wc < corePoolSize)
addWorker(null, true);
//worker的數(shù)量為0也直接啟動一個worker
else if (wc == 0)
addWorker(null, false);
}
到這里,我們可以看到我們提交的任務(wù)被封裝成一個ScheduledFutureTask然后提交給任務(wù)隊列武鲁,同時如果發(fā)現(xiàn)worker的數(shù)量少于設(shè)置的corePoolSize,我們還會啟動一個worker線程蝠检。
但是沐鼠,我們怎么保證worker不會馬上就從任務(wù)隊列中獲取任務(wù)然后直接執(zhí)行呢(這樣我們設(shè)定的延遲執(zhí)行就沒有效果了)?
另外叹谁,怎么保證任務(wù)執(zhí)行完下一次在一定周期后還會再執(zhí)行呢饲梭,也就是怎么保證任務(wù)的延遲執(zhí)行和周期執(zhí)行?
我們先來看一下任務(wù)的延遲執(zhí)行的解決方案焰檩。其實就是修改任務(wù)隊列的實現(xiàn)憔涉,通過將任務(wù)隊列變成延遲隊列,worker不會馬上獲取到任務(wù)隊列中的任務(wù)了析苫。只有任務(wù)的時間到了兜叨,worker線程才能從延遲隊列中獲取到任務(wù)并執(zhí)行穿扳。
在ScheduledThreadPoolExecutor中,定義了DelayedWorkQueue類來實現(xiàn)延遲隊列国旷。DelayedWorkQueue內(nèi)部使用了最小堆的數(shù)據(jù)結(jié)構(gòu)矛物,當(dāng)任務(wù)插入到隊列中時,會根據(jù)執(zhí)行的時間自動調(diào)整在堆中的位置跪但,最后執(zhí)行時間最近的那個會放在堆頂痘绎。
當(dāng)worker要去隊列獲取任務(wù)時饱岸,如果堆頂?shù)膱?zhí)行時間還沒到饮亏,那么worker就會阻塞一定時間后才能獲取到那個任務(wù)慨绳,這樣就實現(xiàn)了任務(wù)的延遲執(zhí)行。
由于篇幅問題被环,DelayedWorkQueue的源碼就不作解析了,有興趣的朋友可以去ScheduledThreadPoolExecutor類中查閱糙及。
解決了任務(wù)的延遲執(zhí)行問題,接下來就是任務(wù)的周期執(zhí)行的解決方案了蛤售。周期執(zhí)行和前面封裝的ScheduledFutureTask有關(guān)丁鹉。我們直接來看一下ScheduledFutureTask的run方法就知道了
public void run() {
//先判斷任務(wù)是否周期執(zhí)行
boolean periodic = isPeriodic();
//判斷是否能執(zhí)行任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
//判斷是否周期性任務(wù)
else if (!periodic)
//不是的話執(zhí)行執(zhí)行run方法
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//如果是周期性任務(wù),那就設(shè)置下一次的執(zhí)行時間
setNextRunTime();
//重新將任務(wù)放到隊列中,然后等待下一次執(zhí)行
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
//根據(jù)peroid的正負(fù)來判斷下一次執(zhí)行時間的計算策略
//和timer的下一次執(zhí)行時間計算策略有點像
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//先判斷是否可以在當(dāng)前狀態(tài)下執(zhí)行
if (canRunInCurrentRunState(true)) {
//重新加任務(wù)放到任務(wù)隊列中
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
從源碼可以看出,當(dāng)任務(wù)執(zhí)行完后,如果該任務(wù)時周期性任務(wù)悴能,那么會重新計算下一次執(zhí)行時間揣钦,然后重新放到任務(wù)隊列中等待下一次執(zhí)行。
3. 總結(jié)
ScheduledThreadPoolExecutor的實現(xiàn)是基于java線程池漠酿。通過對任務(wù)進(jìn)行一層封裝來實現(xiàn)任務(wù)的周期執(zhí)行冯凹,以及將任務(wù)隊列改成延遲隊列來實現(xiàn)任務(wù)的延遲執(zhí)行。
我們將任務(wù)放入任務(wù)隊列的同時炒嘲,會嘗試開啟一個worker來執(zhí)行這個任務(wù)(如果當(dāng)前worker的數(shù)量小于corePoolSize)宇姚。由于這個任務(wù)隊列時一個延遲隊列,只有任務(wù)執(zhí)行時間達(dá)到才能獲取到任務(wù)夫凸,因此worker只能阻塞等到有隊列中有任務(wù)到達(dá)才能獲取到任務(wù)執(zhí)行浑劳。
當(dāng)任務(wù)執(zhí)行完后,會檢查自己是否是一個周期性執(zhí)行的任務(wù)夭拌。如果是的話魔熏,就會重新計算下一次執(zhí)行的時間,然后重新將自己放入任務(wù)隊列中鸽扁。
關(guān)于下一次任務(wù)的執(zhí)行時間的計算規(guī)則蒜绽,和Timer差不多,這里就不多做介紹桶现。
三躲雅、Timer和ScheduledThreadPoolExecutor的區(qū)別
由于Timer是單線程的,如果一次執(zhí)行多個定時任務(wù),會導(dǎo)致某些任務(wù)被其他任務(wù)所阻塞骡和。比如A任務(wù)每秒執(zhí)行一次相赁,B任務(wù)10秒執(zhí)行一次相寇,但是一次執(zhí)行5秒,就會導(dǎo)致A任務(wù)在長達(dá)5秒都不會得到執(zhí)行機(jī)會噪生。而ScheduledThreadPoolExecutor是基于線程池的裆赵,可以動態(tài)的調(diào)整線程的數(shù)量,所以不會有這個問題
如果執(zhí)行多個任務(wù)跺嗽,在Timer中一個任務(wù)的崩潰會導(dǎo)致所有任務(wù)崩潰,從而所有任務(wù)都停止執(zhí)行页藻。而ScheduledThreadPoolExecutor則不會桨嫁。
Timer的執(zhí)行周期時間依賴于系統(tǒng)時間,timer中份帐,獲取到堆頂任務(wù)執(zhí)行時間后璃吧,如果執(zhí)行時間還沒到,會計算出需要休眠的時間=(執(zhí)行時間-系統(tǒng)時間),如果系統(tǒng)時間被調(diào)整废境,就會導(dǎo)致休眠時間無限拉長畜挨,后面就算改回來了任務(wù)也因為在休眠中而得不到執(zhí)行的機(jī)會。ScheduledThreadPoolExecutor由于用是了nanoTime來計算執(zhí)行周期的,所以和系統(tǒng)時間是無關(guān)的,無論系統(tǒng)時間怎么調(diào)整都不會影響到任務(wù)調(diào)度噩凹。
注意的是,nanoTime和系統(tǒng)時間是完全無關(guān)的(之前一直以為只是時間戳的納秒級粒度),關(guān)于nanoTime的介紹如下:
返回最準(zhǔn)確的可用系統(tǒng)計時器的當(dāng)前值巴元,以毫微秒為單位。
此方法只能用于測量已過的時間驮宴,與系統(tǒng)或鐘表時間的其他任何時間概念無關(guān)逮刨。返回值表示從某一固定但任意的時間算起的毫微秒數(shù)(或許從以后算起,所以該值可能為負(fù))堵泽。此方法提供毫微秒的精度修己,但不是必要的毫微秒的準(zhǔn)確度。它對于值的更改頻率沒有作出保證迎罗。在取值范圍大于約 292 年(263 毫微秒)的連續(xù)調(diào)用的不同點在于:由于數(shù)字溢出睬愤,將無法準(zhǔn)確計算已過的時間。
總體來說,Timer除了在版本兼容性上面略勝一籌以外(Timer是jdk1.3就支持的纹安,而ScheduledThreadPoolExecutor在jdk1.5才出現(xiàn))尤辱,其余全部被ScheduledThreadPoolExecutor碾壓。所以日常技術(shù)選型中钻蔑,也推薦使用ScheduledThreadPoolExecutor來實現(xiàn)定時任務(wù)啥刻。
最后,如果哪里有寫的不對或者有疑惑的地方咪笑,歡迎評論或者郵件我可帽。對Java各種技術(shù)有興趣的也可以加我互相交流。