ScheduledThreadPoolExecutor 的調(diào)度原理主要基于兩個(gè)內(nèi)部類乖坠,ScheduledFutureTask 和 DelayedWorkQueue:
- ScheduledFutureTask 是對(duì)任務(wù)的一層封裝,將我們提交的 Runnable 或 Callable 封裝成具有時(shí)間周期的任務(wù)雁芙;
- DelayedWorkQueue 實(shí)現(xiàn)了對(duì) ScheduledFutureTask 的延遲出隊(duì)管理;
ScheduledFutureTask
ScheduledFutureTask有以下幾種構(gòu)造方法:
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
super 中調(diào)用 FutureTask 的構(gòu)造方法骨坑,可以參考 FutureTask實(shí)現(xiàn)原理洪乍。ScheduledFutureTask 主要配置參數(shù)如下:
名稱 | 含義 |
---|---|
time | 任務(wù)能夠執(zhí)行的時(shí)間點(diǎn)(單位:nanoTime ) |
period | 正值表示固定時(shí)間周期執(zhí)行。 負(fù)值表示固定延遲周期執(zhí)行匾乓。 0表示非重復(fù)任務(wù)。 |
sequenceNumber | FIFO調(diào)度序列值(用 AtomicLong 實(shí)現(xiàn)) |
注意:period 大于 0 或 小于 0 時(shí)又谋,都是周期性執(zhí)行的拼缝,只是執(zhí)行時(shí)間規(guī)律不一樣。
ScheduledFutureTask 的主要調(diào)度輔助方法如下:
// 任務(wù)的延遲執(zhí)行時(shí)間
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//實(shí)現(xiàn)任務(wù)的排序彰亥,執(zhí)行時(shí)間越小越靠前咧七,相同則按照隊(duì)列FIFO順序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber) // 時(shí)間一樣時(shí),按照FIFO的順序
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 是否是周期性任務(wù)
public boolean isPeriodic() {
return period != 0;
}
// 設(shè)置下一次運(yùn)行時(shí)間
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p; // 按固定時(shí)間周期剩愧,下次執(zhí)行時(shí)間為上次執(zhí)行時(shí)間 + 周期時(shí)間
else
time = triggerTime(-p); // 按固定延時(shí)周期猪叙,下次執(zhí)行時(shí)間為當(dāng)前時(shí)間 + 延時(shí)時(shí)間
}
核心 run 方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic)) // 判斷是否可以運(yùn)行任務(wù)
cancel(false); // 取消任務(wù),移除隊(duì)列
else if (!periodic) // 非周期性任務(wù) 直接調(diào)用父類 FutureTask 的 run 方法
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 周期性任務(wù)仁卷,調(diào)用父類 runAndReset 方法穴翩,返回是否執(zhí)行成功
// 執(zhí)行成功后繼續(xù)設(shè)置下一次運(yùn)行時(shí)間
setNextRunTime();
// 重新執(zhí)行周期性任務(wù)(可能因?yàn)榫€程池運(yùn)行狀態(tài)的改變而被拒絕)
reExecutePeriodic(outerTask);
}
}
對(duì)于周期性任務(wù),在 run 方法中執(zhí)行成功后會(huì)繼續(xù)設(shè)置下一次執(zhí)行時(shí)間锦积,并把任務(wù)加入延時(shí)隊(duì)列芒帕。但需注意,如果任務(wù)執(zhí)行失敗丰介,將不會(huì)再被周期性調(diào)用背蟆。所以在可能執(zhí)行失敗的周期性任務(wù)中鉴分,必須做好異常處理。
DelayedWorkQueue
DelayedWorkQueue 是一個(gè)延時(shí)有序隊(duì)列带膀,內(nèi)部采用 數(shù)組 維護(hù)隊(duì)列元素志珍,采用 堆排序 的思想維護(hù)隊(duì)列順序,并在隊(duì)列元素(ScheduledFutureTask)建立索引垛叨,支持快速刪除伦糯。
注意:DelayedWorkQueue 的整個(gè)隊(duì)列不是完全有序的,只保證元素有序出隊(duì)嗽元。
下面詳細(xì)講解 DelayedWorkQueue 的實(shí)現(xiàn):
核心入隊(duì)方法:
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow(); // 隊(duì)列擴(kuò)容 類似 ArrayList 擴(kuò)容
size = i + 1;
if (i == 0) { // 隊(duì)列為空敛纲,直接加入
queue[0] = e;
setIndex(e, 0); // 設(shè)置元素在隊(duì)列的索引,即告訴元素自己在隊(duì)列的第幾位
} else {
siftUp(i, e); // 放入適當(dāng)?shù)奈恢? }
if (queue[0] == e) {
leader = null; // 等待隊(duì)列頭的線程
available.signal(); // 通知
}
} finally {
lock.unlock();
}
return true;
}
入隊(duì)方法中最重要的是 siftUp 方法剂癌, sift 在英文單詞中是 篩
的意思淤翔,這里可將 siftUp 理解為向前篩,找到合適的 堆排序點(diǎn) 加進(jìn)去佩谷。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1; // (k-1)/2
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp 主要思想是將新增的任務(wù)與前 (k-1)/2 的位置比較旁壮,如果任務(wù)執(zhí)行時(shí)間較近者替換位置 (k-1)/2。依次往前比較琳要,直到無(wú)替換發(fā)生寡具。每次新增元素調(diào)用 siftUp 僅能保證第一個(gè)元素是最小的秤茅。整個(gè)隊(duì)列不一定有序:
例將:5 10 9 3 依次入隊(duì),隊(duì)列變化如下
[5]
[5,10]
[5,9,10]
[3,5,10,9]
如果對(duì)上述的入隊(duì)方式不了解稚补,可用下面的排序代碼進(jìn)行斷點(diǎn)調(diào)試:
// DelayedWorkQueue 的入隊(duì)、出隊(duì)排序模擬
public class SortArray {
Integer[] queue = new Integer[16];
int size = 0;
public static void main(String[] args) {
SortArray array = new SortArray();
array.add(5);
array.add(9);
array.add(10);
array.add(3);
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
}
boolean add(Integer e) {
if (e == null)
throw new NullPointerException();
int i = size;
size = i + 1;
if (i == 0) {
queue[0] = e;
} else {
siftUp(i, e);
}
return true;
}
Integer take() {
Integer i = queue[0];
int s = --size;
Integer k = queue[s];
if (size != 0)
siftDown(0, k);
return i;
}
private void siftUp(int k, Integer key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Integer e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
private void siftDown(int k, Integer key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Integer c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
}
核心出隊(duì)方法:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 直接獲取隊(duì)首任務(wù)
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 空 則等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS); // 看任務(wù)是否可以執(zhí)行
if (delay <= 0)
return finishPoll(first); // 可執(zhí)行框喳,則進(jìn)行出隊(duì)操作
// 可不執(zhí)行课幕,還需等待,則往下走
first = null;
// 看是否有正在等待的leader線程
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 延時(shí)等待
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
代碼中的 available 是一個(gè)信號(hào)量五垮,會(huì)在隊(duì)列的頭部有新任務(wù)變?yōu)榭捎没蛘咝戮€程可能需要成為領(lǐng)導(dǎo)者時(shí)乍惊,發(fā)出信號(hào)。
private final Condition available = lock.newCondition();
take() 方法中重要的方法是 finishPoll(first)
思犁,主要進(jìn)行出隊(duì)時(shí)維護(hù)隊(duì)列順序:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
siftDown 跟前面的 siftUp 很像遂蛀,它也只能保證出隊(duì)后下一個(gè)仍為最近的任務(wù)拧咳。并不會(huì)移動(dòng)和清理整個(gè)隊(duì)列。
還是用上面列出的 SortArray 這個(gè)類為例:
public static void main(String[] args) {
SortArray array = new SortArray();
array.add(5);
array.add(9);
array.add(10);
array.add(3);
System.out.println(Arrays.toString(array.queue));
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(array.take());
System.out.println(Arrays.toString(array.queue));
array.add(20);
array.add(4);
System.out.println(Arrays.toString(array.queue));
}
我們先將5,9,10,3 依次入隊(duì)莉撇,然后全部出隊(duì),再入隊(duì) 20,4惶傻,我們看下最后的隊(duì)列里面的數(shù)據(jù)是什么樣子:
[3, 5, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
3
5
9
10
[10, 10, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
[4, 20, 10, 9, null, null, null, null, null, null, null, null, null, null, null, null]
看了這個(gè)結(jié)果你可能有點(diǎn)奇怪棍郎,已經(jīng)出隊(duì)了的元素居然還在隊(duì)列里面。這是一種 lazy 策略银室,DelayedWorkQueue 并不會(huì)真正直接清理掉隊(duì)列里出隊(duì)的元素涂佃,用 size 來(lái)控制隊(duì)列的邏輯大小励翼,并發(fā)物理實(shí)際大小,后來(lái)的元素會(huì)根據(jù)size來(lái)覆蓋原有的元素辜荠。
關(guān)于 DelayedWorkQueue 的出隊(duì)和入隊(duì)還有疑問(wèn)的汽抚,可以自己調(diào)試 SortArray 的代碼,看看不同的情況的不同處理結(jié)果伯病。DelayedWorkQueue 的 siftUp 殊橙、siftDown 這種排序策略非常高效,并非維護(hù)整個(gè)隊(duì)列實(shí)時(shí)有序狱从,只保證第一個(gè)出隊(duì)元素的正確性膨蛮。
元素刪除
上文有提到 ScheduledFutureTask 的索引,DelayedWorkQueue 運(yùn)用索引可以快速定位刪除元素:
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement); // 順序調(diào)整
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
// 使用索引獲取下標(biāo)
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex; // 索引
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
remove方法里面首先利用 indexOf
調(diào)用索引獲取下標(biāo)季研,然后使用 siftDown
敞葛,siftUp
來(lái)調(diào)整隊(duì)列順序。這里索引的使用能夠極大提高元素定位的效率与涡,尤其是在隊(duì)列比較長(zhǎng)的時(shí)候惹谐。
最后思考一個(gè)問(wèn)題:為什么 DelayedWorkQueue 使用數(shù)組而不是鏈表結(jié)構(gòu)?
個(gè)人認(rèn)為驼卖,因?yàn)槭褂脭?shù)據(jù)結(jié)構(gòu)氨肌,利用下標(biāo)快速訪問(wèn),可以發(fā)揮基于 siftDown
酌畜,siftUp
的高效排序算法怎囚,而鏈表的下標(biāo)訪問(wèn)效率低,因此選擇使用數(shù)組桥胞。
多線程系列目錄(不斷更新中):
線程啟動(dòng)原理
線程中斷機(jī)制
多線程實(shí)現(xiàn)方式
FutureTask實(shí)現(xiàn)原理
線程池之ThreadPoolExecutor概述
線程池之ThreadPoolExecutor使用
線程池之ThreadPoolExecutor狀態(tài)控制
線程池之ThreadPoolExecutor執(zhí)行原理
線程池之ScheduledThreadPoolExecutor概述
線程池之ScheduledThreadPoolExecutor調(diào)度原理
線程池的優(yōu)雅關(guān)閉實(shí)踐