基本用法
class Main {
public static void main(String[] args) {
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(1);
poolExecutor.scheduleAtFixedRate(new task(), 5, 10, TimeUnit.SECONDS);
}
public static class task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
帶著問題看源碼
實戰(zhàn)案例
之前寫過的一段原生Java定期執(zhí)行任務(wù)的代碼 設(shè)想效果是每天早上八點跑一次任務(wù)
寫上面那段代碼時碰到兩個問題
一迹缀、ScheduledThreadPool好像還挺復雜 比起直接用while(true) sleep 用 ScheduledThreadPool這種方式有什么性能優(yōu)勢嗎
二贮勃、因為數(shù)據(jù)量很大可能耗時很久 當設(shè)置每天8點執(zhí)行一次 如果到第二天8點 上一次的任務(wù)還沒執(zhí)行完成 會發(fā)生什么
類結(jié)構(gòu)
實現(xiàn)了ScheduledExecutorService接口并繼承了ThreadPoolExecutor
同樣是出自熟悉的狗哥之手
先拖到最后看下本次目標多少 也不多 加上注釋以及最后一行留白一共1284行
提交任務(wù)
提交任務(wù)的三種方式
- pool.schedule 提交一次性任務(wù)
- pool.scheduleWithFixedDelay 提交周期任務(wù)(固定時間間隔)
- pool.scheduleAtFixedRate 提交周期任務(wù)(固定執(zhí)行時間)
提交任務(wù)方式一
提交任務(wù)方式二
與上一個方法基本一致 兩處不同
1)多了一步outerTask賦值
2)多傳了一個參數(shù) delay
執(zhí)行延遲時間
提交任務(wù)方式三
與方式二代碼幾乎一模一樣 (找不同 重要伏筆)
一個想法:這里是不是可以用個方法重載減少代碼重復呢
接下來看三種提交方式都有的核心方法:delayedExcute
( ) 將任務(wù)加入父類隊列
任務(wù)最終被加到了父類ThreadPoolExecutor
中的workQueue
工作隊列中
提交階段總結(jié):提交任務(wù)方法完成了兩件事:
1)把用戶傳入的runnable任務(wù)封裝成一個ScheduledFutureTask
內(nèi)部類對象
2)將該ScheduledFutureTask添加到父類ThreadPoolExecutor
的workQueue
工作隊列中
執(zhí)行任務(wù)
用戶提交的任務(wù)被封裝成ScheduledFutureTask
該內(nèi)部類中有一個run方法用于執(zhí)行任務(wù)
內(nèi)部類ScheduledFutureTask源碼
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//任務(wù)添加到ScheduledThreadPoolExecutor中被分配的唯一序列號
private final long sequenceNumber;
//該任務(wù)的執(zhí)行時間
private long time;
//該任務(wù)的循環(huán)周期(延遲時間)
private final long period;
//重新入隊的實際任務(wù) 實際指向當前對象本身
RunnableScheduledFuture<V> outerTask = this;
//當前任務(wù)在延遲隊列中的索引
//能夠更加方便的取消當前任務(wù)
int heapIndex;
//構(gòu)造方法1
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//構(gòu)造方法2
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
//構(gòu)造方法3
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//該任務(wù)還有多久執(zhí)行
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//比較兩個任務(wù)誰先執(zhí)行
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//如果下次執(zhí)行任務(wù)的時間相同,則會比較任務(wù)的sequenceNumber值,
//sequenceNumber值小的任務(wù)會排在前面叠洗。
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//是否周期性任務(wù)
public boolean isPeriodic() {
return period != 0;
}
// 設(shè)置下一次的執(zhí)行時間
// time 為上一次任務(wù)的開始執(zhí)行時間
// p > 0 說明是提交方式二 固定頻率執(zhí)行 time += p 在上一次開始時間的基礎(chǔ)上計算下一次執(zhí)行時間
// p < 0 說明是提交方式三 固定延遲執(zhí)行 轉(zhuǎn)回正數(shù)之后調(diào)用一次triggerTime 上次任務(wù)執(zhí)行完畢now() + 執(zhí)行周期p 而不是time+p
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
//overflowFree方法 防止溢出 任務(wù)運行足夠久這個long是有可能溢出
long triggerTime(long delay) {
// 上次任務(wù)執(zhí)行完畢now() + delay
return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();
//檢查當前狀態(tài)是否能運行 不能運行則取消任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果能運行 則再判斷是否周期性任務(wù)
else if (!periodic)
//不是周期性任務(wù)則直接調(diào)用run方法執(zhí)行一次就行了
ScheduledFutureTask.super.run();
//如果是需要周期性執(zhí)行的任務(wù) 則調(diào)用runAndReset方法
//runAndReset返回true代表執(zhí)行成功了 然后計算并設(shè)置出下一次的執(zhí)行時間
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置下一次執(zhí)行時間
setNextRunTime();
// 將該任務(wù)重新放入隊列
reExecutePeriodic(outerTask);
}
}
}
核心方法:FutureTask.runAndReset()
// 執(zhí)行任務(wù)并忽略任務(wù)返回值愧膀,執(zhí)行完畢后將該任務(wù)重置為初始狀態(tài),
// 如果計算遇到異潮肷迹或被取消毅往,則無法重置回初始狀態(tài)
// 該方法用于需要重復執(zhí)行的場景
// 如果成功運行并重置,則返回true
protected boolean runAndReset() {
//如果狀態(tài)為NEW則接著會通過unsafe類把任務(wù)執(zhí)行線程引用CAS的保存在runner字段中 如果保存失敗派近,則直接返回攀唯;
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
//當成員變量callable不為null且狀態(tài)為NEW則可以執(zhí)行
if (c != null && s == NEW) {
try {
c.call(); // don't set result
//任務(wù)正常執(zhí)行完畢
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
//執(zhí)行完畢之后將runner執(zhí)行線程置為null
runner = null;
s = state;
//當執(zhí)行狀態(tài)為中斷 處理中斷
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
是否是周期性任務(wù)的判斷依據(jù)是看period成員變量是否為0 只有提交方式一沒傳這個參數(shù) 默認值0
計算下一次的執(zhí)行時間時就用到了period的正負符號了
到這里開頭的第二個問題似乎能解決了 由于我使用的是方式二scheduleAtFixedRate
p > 0 走的是 time += p (上一次的開始時間+delay )假如開始時間是前一天早上8點
那么算出的下一次time確實是第二天8點
但是實際上第二天9點上一個任務(wù)才執(zhí)行完畢
所以第二天8點 上一個任務(wù)還沒完成
此時會重新啟動一個任務(wù) 兩個任務(wù)一起跑榕栏?
還是說只有前一次執(zhí)行完畢才會處理下一次任務(wù) 然后拿到time發(fā)現(xiàn)已經(jīng)錯過了第二天8點 然后略過第二次任務(wù) 等第三天8點再執(zhí)行噪窘?
想要回答這個問題 關(guān)鍵還得看看從延遲隊列中取出任務(wù)并執(zhí)行的那段邏輯是怎么寫的
等不及了 直接劇透 實際測試一把
測試結(jié)果顯示 問題二場景 最后結(jié)果既不是錯過第二天8點直接第三天8點執(zhí)行 也不是第二天8點兩個任務(wù)并行執(zhí)行 而是的等到第一個任務(wù)9點執(zhí)行完畢之后立即執(zhí)行第二次任務(wù)
執(zhí)行階段總結(jié):當定時任務(wù)屬于周期性任務(wù) 那么執(zhí)行任務(wù)的run方法會在任務(wù)執(zhí)行完畢后 計算出下一次的執(zhí)行時間 并將該任務(wù)重新放回DelayWorkQueue
延遲隊列
任務(wù)調(diào)度
上面我們看了任務(wù)的run方法 那么是誰去調(diào)用這個run方法 又是什么時候去調(diào)用呢
延遲阻塞隊列DelayedWorkQueue
中放的元素是ScheduledFutureTask
,提交的任務(wù)被包裝成ScheduledFutureTask
放進工作隊列询张,Woker
工作線程消費工作隊列中的任務(wù),即調(diào)用ScheduledFutureTask.run()
戒幔,ScheduledFutureTask
又調(diào)用任務(wù)的run()
吠谢,這點和ThreadPoolExecutor
差不多,而ScheduledThreadPoolExecutor
是如何實現(xiàn)按時間調(diào)度的呢诗茎?
猜測可能是有一個專門負責調(diào)度的額外線程在監(jiān)聽隊列中存儲的任務(wù)的執(zhí)行時間是否到了 到了的話就取出任務(wù)調(diào)用run方法(后面發(fā)現(xiàn)猜錯了 狗哥使用了一種更加優(yōu)秀的設(shè)計)
DelayedWorkQueue
源碼
DelayedWorkQueue
實現(xiàn)自BlockingQueue
是高度定制化的阻塞隊列+優(yōu)先隊列 其核心數(shù)據(jù)結(jié)構(gòu)是二叉最小堆的優(yōu)先隊列工坊,隊列滿時會自動擴容,所以offer操作永遠不會阻塞敢订,maximumPoolSize也就用不上了王污,所以線程池中永遠會保持至多有corePoolSize個工作線程正在運行。
類結(jié)構(gòu)
//位于ScheduledThreadPoolExecutor中的一個內(nèi)部類
//一種定制化的延遲隊列+優(yōu)先隊列 只支持放入RunnableScheduledFutures
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
//DelayedWorkQueue 基于基于堆的數(shù)據(jù)結(jié)構(gòu)
//類似于 DelayQueue 和 PriorityQueue 中的數(shù)據(jù)結(jié)構(gòu)枢析,
//除了每個 ScheduledFutureTask 還將其索引記錄到堆數(shù)組中玉掸。
//這消除了在取消時查找任務(wù)的需要,大大加快了刪除速度(從 O(n)下降到 O(log n))醒叁,
//并減少了垃圾保留司浪,否則會因等待元素上升到頂部而發(fā)生清除前。
//但是因為隊列也可能包含 RunnableScheduledFutures 不是 ScheduledFutureTasks把沼,
//我們不能保證有這樣的索引可用啊易,在這種情況下我們回退到線性搜索。
//(我們希望大多數(shù)任務(wù)不會被修飾饮睬,并且速度更快的情況會更常見租谈。)
//所有堆操作都必須記錄索引更改——主要是在 siftUp 和 siftDown 中。
//刪除后捆愁,任務(wù)的heapIndex 設(shè)置為 -1割去。請注意,ScheduledFutureTasks
//最多可以在隊列中出現(xiàn)一次(對于其他類型的任務(wù)或工作隊列不需要如此)昼丑,
//因此由 heapIndex 唯一標識呻逆。
// 初始容量
private static final int INITIAL_CAPACITY = 16;
// 隊列
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 可重入鎖 用于隊列元素操作時加鎖
private final ReentrantLock lock = new ReentrantLock();
//隊列容量
private int size = 0;
//leader線程
private Thread leader = null;
//線程狀態(tài)控制器
private final Condition available = lock.newCondition();
private void siftUp(int k, RunnableScheduledFuture<?> key) {...}
private void siftDown(int k, RunnableScheduledFuture<?> key) {...}
public boolean offer(Runnable x) {...}
public RunnableScheduledFuture<?> take() throws InterruptedException {...}
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit){...}
...
}
核心成員變量:Condition available
關(guān)鍵字synchronize可以與wait()和nitify()方法相結(jié)合實現(xiàn)實現(xiàn)等待/通知模式 類ReentrantLock也可以實現(xiàn)同樣的功能,但需要借助condition對象 condition類是在JDK5中出現(xiàn)的技術(shù)菩帝,使用他有更好的靈活性咖城,比如可以實現(xiàn)多路通知功能
,也就是在一個Lock對象里可以創(chuàng)建多個condition實例呼奢,線程對象可以注冊在指定的condition中從而選擇性的進行線程通知宜雀,在調(diào)度線程上更加靈活。
- 調(diào)用Condition的await()和signal()方法握础,都必須在lock保護之內(nèi)辐董,就是說必須在lock.lock()和lock.unlock之間才可以使用
- Conditon中的await()對應(yīng)Object的wait();
- Condition中的signal()對應(yīng)Object的notify()禀综;
核心成員變量:Thread leader
指定等待隊列頭部任務(wù)的線程郎哭。 Leader-Follower 模式的變體他匪。用于減少不必要的定時等待。當一個線程成為領(lǐng)導者時夸研,它只等待下一個任務(wù)的延遲時間是否到達,而其他線程則無限期地等待依鸥。領(lǐng)導線程必須在從 take() 或 poll(...) 返回之前向某個其他線程發(fā)出信號亥至,除非某個其他線程在此期間成為領(lǐng)導。 每當隊列的頭部被具有較早到期時間的任務(wù)替換時贱迟,leader 字段將通過重置為空而無效姐扮,并且一些等待線程(但不一定是當前的leader)被發(fā)出信號。因此衣吠,等待線程必須準備好在等待期間獲得和失去領(lǐng)導權(quán)
核心方法:offer添加元素
ScheduledThreadPoolExecutor
提交任務(wù)時調(diào)用的是DelayedWorkQueue.add
茶敏,而add
、put
等一些對外提供的添加元素的方法都調(diào)用了offer
缚俏,其基本流程如下:
- 其作為生產(chǎn)者的入口惊搏,首先獲取鎖。
- 判斷隊列是否要滿了(
size >= queue.length
)忧换,滿了就擴容grow()
恬惯。 - 隊列未滿,size+1亚茬。
- 判斷添加的元素是否是第一個酪耳,是則不需要堆化。
- 添加的元素不是第一個刹缝,則需要堆化
siftUp
碗暗。 - 如果堆頂元素剛好是此時被添加的元素,則喚醒take線程消費梢夯。
- 最終釋放鎖言疗。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//擴容
grow();
size = i + 1;
if (i == 0) {
//如果是入的是第一個元素,不需要堆化
queue[0] = e;
setIndex(e, 0);
} else {
//堆化
siftUp(i, e);
}
if (queue[0] == e) {
//如果堆頂元素剛好是入隊列的元素厨疙,則喚醒take
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
核心方法:siftUp向上堆化
新添加的元素先會加到堆底洲守,然后一步步和上面的父親節(jié)點比較,若小于父親節(jié)點則和父親節(jié)點互換位置沾凄,循環(huán)比較直至大于父親節(jié)點才結(jié)束循環(huán)梗醇。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//找到父親節(jié)點
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
// 添加的元素 大于父親節(jié)點,則結(jié)束循環(huán)
break;
//添加的元素小于父親節(jié)點撒蟀,則位置互換
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
核心方法:take消費元素
Worker工作線程啟動后就會調(diào)用take方法循環(huán)消費工作隊列中的元素
take基本流程如下:
- 首先獲取可中斷鎖叙谨,判斷堆頂元素是否是空,空的則阻塞等待
available.await()
保屯。 - 堆頂元素不為空手负,則獲取其延遲執(zhí)行時間
delay
涤垫,delay <= 0
說明到了執(zhí)行時間,出隊列finishPoll
竟终。 -
delay > 0
還沒到執(zhí)行時間蝠猬,判斷leader
線程是否為空,不為空則說明有其他take線程也在等待统捶,當前take將無限期阻塞等待榆芦。 -
leader
線程為空,當前take線程設(shè)置為leader
喘鸟,并阻塞等待delay
時長匆绣。 - 當前l(fā)eader線程等待delay時長自動喚醒護著被其他take線程喚醒,則最終將
leader
設(shè)置為null
什黑。 - 再循環(huán)一次判斷
delay <= 0
出隊列崎淳。 - 跳出循環(huán)后判斷l(xiāng)eader為空并且堆頂元素不為空,則喚醒其他take線程愕把,最后是否鎖拣凹。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0]; //取出堆頂元素
if (first == null)
//堆為空,等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//到了執(zhí)行時間礼华,出隊列
return finishPoll(first);
first = null; // don't retain ref while waiting
//還沒到執(zhí)行時間
if (leader != null)
//此時若有其他take線程在等待咐鹤,當前take將無限期等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
核心方法:finishPoll出隊列
堆頂元素delay<=0,執(zhí)行時間到圣絮,出隊列就是一個向下堆化的過程siftDown
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
核心方法:siftDown向下堆化
由于堆頂元素出隊列后祈惶,就破壞了堆的結(jié)構(gòu),需要組織整理下扮匠,將堆尾元素移到堆頂捧请,然后向下堆化:
- 從堆頂開始,父親節(jié)點與左右子節(jié)點中較小的孩子節(jié)點比較(左孩子不一定小于右孩子)棒搜。
- 父親節(jié)點小于等于較小孩子節(jié)點疹蛉,則結(jié)束循環(huán),不需要交換位置力麸。
- 若父親節(jié)點大于較小孩子節(jié)點可款,則交換位置。
- 繼續(xù)向下循環(huán)判斷父親節(jié)點和孩子節(jié)點的關(guān)系克蚂,直到父親節(jié)點小于等于較小孩子節(jié)點才結(jié)束循環(huán)闺鲸。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
//k = 0, key = queue[size-1]
//無符號右移,相當于size/2
int half = size >>> 1;
while (k < half) {
//只需要比較一半
//找到左孩子節(jié)點
// child = 2k + 1
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
//右孩子節(jié)點
int right = child + 1;
//比較左右孩子大小
if (right < size && c.compareTo(queue[right]) > 0)
//c左孩子大于右孩子埃叭,則將有孩子賦值給左孩子
c = queue[child = right];
//比較key和孩子c
if (key.compareTo(c) <= 0)
//key小于等于c摸恍,則結(jié)束循環(huán)
break;
//key大于孩子c,則key與孩子交換位置
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
核心方法:compareTo比較器
延遲隊列是也是個優(yōu)先隊列 每次元素出隊和入隊也都需要經(jīng)過堆化調(diào)整順序
根據(jù)任務(wù)到期時間從小到大排序 通過調(diào)用compareTo
方法完成大小判斷
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
ScheduledThreadPool
中的Leader-Follower
模式
定時調(diào)度線程池與一般線程池的一個重要不同:提交的任務(wù)是延遲而非立即執(zhí)行的,因此worker線程調(diào)用隊列的take以取出執(zhí)行任務(wù)必定是要阻塞的立镶”诎溃考慮到延遲任務(wù)必然有一個先后順序 因此只需要一個線程使用timed waiting等待隊首任務(wù)(堆頂)即可
ScheduledThreadPool
采用了Leader-Follower
模式,等待第一個線程的任務(wù)也稱為leader媚媒,其調(diào)用available.awaitNanos待當前隊列頭部任務(wù)到達調(diào)度時間時喚醒嗜逻。其余線程作為follower只需調(diào)用await方法無限阻塞等待,直至被leader喚醒缭召,并重新完成搶鎖->嘗試執(zhí)行隊列首元素->搶leader->等待的循環(huán)变泄。
舉個例子來理解Leader-Follower
模式,拿飯店員工來對照理解恼琼。
- 單Reactor多線程模式 飯店的員工一般都是分角色的,比如接待員屏富、服務(wù)員晴竞、廚師等等。
- 假如有一個叫做A的人狠半,固定他作為飯店接待員噩死,來客人了就分給客人一個座位號,然后交給其他服務(wù)員神年,比如B進行后續(xù)處理已维。
- B會根據(jù)座位號為客人引路,為客人點菜等等已日。
- 如果把A垛耳、B比作兩個線程,客人比作任務(wù)飘千,任務(wù)由A處理堂鲜,到交接給B處理,有一次線程上下文切換护奈。
- Leader-Follower模式 這次飯店不分角色了缔莲,每個人都是接待員和服務(wù)員,統(tǒng)稱為員工霉旗。
- 每次只能有一個員工在門口等待痴奏,比如A先在門口等待,其他員工在屋里歇著厌秒。
- 來客人了的話读拆,A會叫一個其他員工,比如B來門口接替自己简僧。
- 然后A開始為客人服務(wù)建椰,比如分配座位號,引路岛马,點菜等全流程服務(wù)棉姐。
- 拿線程來說的話屠列,就是接受任務(wù),處理任務(wù)都是由線程A負責伞矩,沒有線程上下文切換笛洛。
- DelayQueue的Leader-Follower模式 這次飯店也不分角色,都是員工乃坤,但是改變了經(jīng)營策略苛让,每個客人必須預(yù)約吃飯時間,預(yù)約采用APP預(yù)約湿诊。因為加入了延時狱杰,邏輯變得復雜了一些。
- 每次還是只能有一個員工在門口等待厅须,比如A先在門口等待仿畸,A看了眼預(yù)約登記表,發(fā)現(xiàn)離預(yù)約最早到店的時間還有30分鐘朗和,A就什么都不干了错沽,先休息30分鐘。
- 其他員工還是先在屋里歇著眶拉,但是因為采用APP預(yù)約千埃,客人約幾點都有可能,如果此時有客人約的是10分鐘后到店忆植,因為A要30分鐘后才能醒來干活放可,所以如果這位客人來了,門口就沒有人接待了唱逢。
- 對于這個問題吴侦,飯店的軟件系統(tǒng)在監(jiān)聽到最早到店時間變了的話,會再叫一個員工來門口等待坞古,此員工可能是新員工B备韧,也可能是叫醒了之前在門口休息的員工A。我們叫這位新員工X痪枫。
- 如果新員工X發(fā)現(xiàn)最早到店時間是現(xiàn)在织堂,或者客人已經(jīng)來了,就會叫一個員工C來門口接替自己奶陈,并立即開始為客人提供全流程服務(wù)易阳。
- 如果新員工X發(fā)現(xiàn)最早到店時間是10分鐘后,新員工X就像A之前一樣吃粒,什么都不干了潦俺,先休息10分鐘。
- 如果最早到店時間沒有變化,還是30分鐘后事示,軟件系統(tǒng)不會叫人早像,其他員工看到A在門口等待,自己可以安心的在屋里歇著肖爵,等著A叫人替換他卢鹦。
- 員工A在30分鐘后醒來,客人也到了劝堪,A會叫一個同事比如B接替自己冀自,而A為客人提供全流程服務(wù)。
問題解決
所以問題二到這里應(yīng)該能解釋了 DelayedWorkQueue
延遲隊列只能有一個Leader線程 當Leader不為null 即有其他任務(wù)在等待執(zhí)行的時候 下一個take線程無法從堆頂獲取并啟動任務(wù) (這不廢話嗎 隊首的任務(wù)都等著呢 后面的任務(wù)肯定沒到時間懊肜病)所以其實不是這個原因
問題二之所以隔天9點才能開啟第二次任務(wù)是因為同一個任務(wù)執(zhí)行完畢之后才會把自己重新放回隊列 雖然算出來的執(zhí)行時間是8點 但是9點才被放回任務(wù)隊列 一放進去立馬被取出來了判斷一下delay時間 這個delay時間≤0 (小于0就屬于上一個任務(wù)超時了導致下一個任務(wù)延遲了)都統(tǒng)一算作時間到了 立即執(zhí)行
一個疑問:為什么不設(shè)計成支持多任務(wù)并列執(zhí)行呢 答:其實是支持多任務(wù)并列執(zhí)行的 只不過是不支持同一個任務(wù)同時并列執(zhí)行而已
同時問題一也可以解釋了 其實DelayWorkQueue是一個基于堆的優(yōu)先隊列 優(yōu)化了多個定時任務(wù)的執(zhí)行調(diào)度(避免無效監(jiān)聽等待和避免線程上下文切換)
但是如果你只有一個定時任務(wù) 直接while true sleep也是可以的
總結(jié)
- 任務(wù)執(zhí)行方法位于
ScheduledFutureTask
的父類FutureTask.run()
該方法內(nèi)部捕獲異常且未打印異常信息熬粗,難以排查問題,同時周期性執(zhí)行任務(wù)會因為任務(wù)代碼拋異常而不再設(shè)置下次執(zhí)行時間和把自己放回延遲隊列的操作余境,即不會再周期性執(zhí)行 因此任務(wù)代碼一定要自行try-catch
做異常處理而不要直接拋出 -
ScheduledFutureTask
通過一個變量就區(qū)分了延遲和周期性執(zhí)行荐糜,period=0
延遲執(zhí)行,即只執(zhí)行一次葛超;period>0
固定頻率周期執(zhí)行;period<0
固定延遲時間周期執(zhí)行延塑,兩次任務(wù)開始執(zhí)行時間間隔受任務(wù)執(zhí)行耗時影響绣张。 - 如果周期性任務(wù)的執(zhí)行時長大于
period
,且看重執(zhí)行等間隔关带,建議使用scheduleWithFixedDelay()
侥涵。 - 若周期性任務(wù)的執(zhí)行時長遠小于
period
,則可以使用scheduleAtFixedRate()
宋雏。 - 如果周期性任務(wù)的執(zhí)行時長大于
period
且使用的是scheduleAtFixedRate
則并不能實現(xiàn)固定頻率執(zhí)行 - 當線程池處于關(guān)閉狀態(tài)(
shutdown
)芜飘,周期性任務(wù)會被取消和阻止執(zhí)行,非周期性任務(wù)會順利執(zhí)行完成不會被阻止磨总。 - 同一個周期性定時任務(wù)出隊執(zhí)行完之后才會重新入隊
-
DelayedWorkQueue
通過全局可重入鎖來實現(xiàn)同步 -
DelayedWorkQueue
常用于定時任務(wù) -
DelayedWorkQueue
內(nèi)部使用優(yōu)先級隊列來存儲 -
DelayedWorkQueue
添加元素滿了之后會自動擴容原來容量的1/2嗦明,即永遠不會阻塞,最大擴容可達Integer.MAX_VALUE
蚪燕,所以線程池中至多有corePoolSize
個工作線程正在運行娶牌。。 -
DelayedWorkQueue
消費元素take馆纳,在堆頂元素為空和delay >0 時诗良,阻塞等待。 -
DelayedWorkQueue
是一個生產(chǎn)永遠不會阻塞鲁驶,消費可以阻塞的生產(chǎn)者消費者模式鉴裹。 -
DelayedWorkQueue
有一個leader線程的變量,是Leader-Follower
模式的變種。當一個take
線程變成leader
線程時径荔,只需要等待下一次的延遲時間督禽,不是leader
線程的其他take
線程則需要等leader
線程出隊列了才喚醒其他take
線程 減少了不必要的定時等待。同時一個線程完成監(jiān)聽和任務(wù)執(zhí)行 也避免了調(diào)度線程和任務(wù)執(zhí)行線程的上下文切換成本
ScheduledThreadPoolExecutor與Timer的區(qū)別
JDK1.5開始提供ScheduledThreadPoolExecutor類猖凛,ScheduledThreadPoolExecutor類繼承ThreadPoolExecutor類重用線程池實現(xiàn)了任務(wù)的周期性調(diào)度功能赂蠢。在IDK1.5之前,實現(xiàn)任務(wù)的周期性調(diào)度主要使用的是Timer類和TimerTask類辨泳。
1.1線程角度
Timer是單線程模式虱岂,如果某個TimerTask任務(wù)的執(zhí)行時間比較久,會影響到其他任務(wù)的調(diào)度執(zhí)行菠红。ScheduledThreadPoolExecutor是多線程模式第岖,并且重用線程池,某個ScheduledFutureTask任務(wù)執(zhí)行的時間比較久试溯,不會影響到其他任務(wù)的調(diào)度執(zhí)行蔑滓。
1.2系統(tǒng)時間敏感度
Timer調(diào)度是基于操作系統(tǒng)的絕對時間的,對操作系統(tǒng)的時間敏感遇绞,一旦操作系統(tǒng)的時間改變键袱,則Timer的調(diào)度不再精確。ScheduledThreadPoolExecutor調(diào)度是基于相對時間的摹闽,不受操作系統(tǒng)時間改變的影響蹄咖。
1.3是否捕獲異常
Timer不會捕獲TimerTask拋出的異常,加上Timer又是單線程的付鹿。一旦某個調(diào)度任務(wù)出現(xiàn)異常則整個線程就會終止澜汤,其他需要調(diào)度的任務(wù)也不再執(zhí)行。ScheduledThreadPoolExecutor基于線程池來實現(xiàn)調(diào)度功能舵匾,某個任務(wù)拋出異常后俊抵,其他任務(wù)仍能正常執(zhí)行。
1.4任務(wù)是否具備優(yōu)先級
Timer中執(zhí)行的TimerTask任務(wù)整體上沒有優(yōu)先級的概念坐梯,只是按照系統(tǒng)的絕對時間來執(zhí)行任務(wù)徽诲。ScheduledThreadPoolExecutor中執(zhí)行的ScheduledFutureTask類實現(xiàn)了iavalang.Comparable接口和java.utilconcurrentDelayed接口,這也就說明了ScheduledFutureTask類中實現(xiàn)了兩個非常重要的方法吵血,一個是javalangComparable接口的compareTo方法馏段,一個是java.util.concurrentDelayed接口的getDelay方法。在ScheduledFutureTask類中compareTo方法方法實現(xiàn)了任務(wù)的比較践瓷,距離下次執(zhí)行的時間間隔短的任務(wù)會排在前面院喜,也就是說,距離下次執(zhí)行的時間間隔短的任務(wù)的優(yōu)先級比較高晕翠。而getDelay方法則能夠返回距離下次任務(wù)執(zhí)行的時間間隔喷舀。
1.5是否支持對任務(wù)排序
Timer不支持對任務(wù)的排序砍濒。ScheduledThreadPoolExecutor類中定義了一個靜態(tài)內(nèi)部類DelayedWorkQueue,DelayedWorkQueue類本質(zhì)上是一個有序隊列,為需要調(diào)度的每個任務(wù)按照距離下次執(zhí)行時間間隔的大小來排序
1.6能否獲取返回的結(jié)果
Timer中執(zhí)行的TimerTask類只是實現(xiàn)了iavaangRunnable接口硫麻,無法從TimerTask中獲取返回的結(jié)果爸邢。ScheduledThreadPoolExecutor中執(zhí)行的ScheduledFutureTask類繼承了FutureTask類,能夠通過遍Future來獲取返回的結(jié)果拿愧。通過以上對ScheduledThreadPoolExecutor類和Timer類的對比杠河,相信在JDK1.5之后,就沒有使用Timer來實現(xiàn)定時任務(wù)調(diào)度的必要了浇辜。