我們知道線程池運(yùn)行時(shí)膳犹,會(huì)不斷從任務(wù)隊(duì)列中獲取任務(wù)晤碘,然后執(zhí)行任務(wù)。如果我們想實(shí)現(xiàn)延時(shí)或者定時(shí)執(zhí)行任務(wù)都办,重要一點(diǎn)就是任務(wù)隊(duì)列會(huì)根據(jù)任務(wù)延時(shí)時(shí)間的不同進(jìn)行排序烙荷,延時(shí)時(shí)間越短地就排在隊(duì)列的前面镜会,先被獲取執(zhí)行。
隊(duì)列是先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)终抽,就是先進(jìn)入隊(duì)列的數(shù)據(jù)戳表,先被獲取。但是有一種特殊的隊(duì)列叫做優(yōu)先級(jí)隊(duì)列昼伴,它會(huì)對(duì)插入的數(shù)據(jù)進(jìn)行優(yōu)先級(jí)排序匾旭,保證優(yōu)先級(jí)越高的數(shù)據(jù)首先被獲取,與數(shù)據(jù)的插入順序無(wú)關(guān)圃郊。
實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列高效常用的一種方式就是使用堆季率。
一. 用堆實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列
在常用排序算法總結(jié)這篇文章中,我們?cè)敿?xì)地講解了堆排序的實(shí)現(xiàn)描沟。這里我們回顧一下飒泻。
1.1 什么是堆
- 它是一個(gè)完全二叉樹,即除了最后一層節(jié)點(diǎn)不是滿的吏廉,其他層節(jié)點(diǎn)都是滿的泞遗,即左右節(jié)點(diǎn)都有。
- 它不是二叉搜索樹席覆,即左節(jié)點(diǎn)的值都比父節(jié)點(diǎn)值小史辙,右節(jié)點(diǎn)的值都不比父節(jié)點(diǎn)值小,這樣查找的時(shí)候佩伤,就可以通過(guò)二分的方式聊倔,效率是(log N)。
- 它是特殊的二叉樹生巡,它要求父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)的值耙蔑。這樣保證大的值在上面,小的值在下面孤荣。所以堆遍歷和查找都是低效的甸陌,因?yàn)槲覀冎恢?br> 從根節(jié)點(diǎn)到子葉節(jié)點(diǎn)的每條路徑都是降序的,但是各個(gè)路徑之間都是沒(méi)有聯(lián)系的盐股,查找一個(gè)值時(shí)钱豁,你不知道應(yīng)該從左節(jié)點(diǎn)查找還是從右節(jié)點(diǎn)開始查找。
- 它可以實(shí)現(xiàn)快速的插入和刪除疯汁,效率都在(log N)左右牲尺。所以它可以實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列。
堆是一個(gè)二叉樹幌蚊,但是它最簡(jiǎn)單的方式是通過(guò)數(shù)組去實(shí)現(xiàn)二叉樹谤碳,而且因?yàn)槎咽且粋€(gè)完全二叉樹凛澎,就不存在數(shù)組空間的浪費(fèi)。怎么使用數(shù)組來(lái)存儲(chǔ)二叉樹呢估蹄?
就是用數(shù)組的下標(biāo)來(lái)模擬二叉樹的各個(gè)節(jié)點(diǎn),比如說(shuō)根節(jié)點(diǎn)就是0塑煎,第一層的左節(jié)點(diǎn)是1,右節(jié)點(diǎn)是2臭蚁。由此我們可以得出下列公式:
// 對(duì)于n位置的節(jié)點(diǎn)來(lái)說(shuō):
int left = 2 * n + 1; // 左子節(jié)點(diǎn)
int right = 2 * n + 2; // 右子節(jié)點(diǎn)
int parent = (n - 1) / 2; // 父節(jié)點(diǎn)最铁,當(dāng)然n要大于0,根節(jié)點(diǎn)是沒(méi)有父節(jié)點(diǎn)的
對(duì)于堆來(lái)說(shuō)垮兑,只有兩個(gè)操作冷尉,插入insert和刪除remove,不管插入還是刪除保證堆的成立條件系枪,1.是完全二叉樹雀哨,2.父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)的值。
public void insert(int value) {
// 第一步將插入的值私爷,直接放在最后一個(gè)位置雾棺。并將長(zhǎng)度加一
store[size++] = value;
// 得到新插入值所在位置。
int index = size - 1;
while(index > 0) {
// 它的父節(jié)點(diǎn)位置坐標(biāo)
int parentIndex = (index - 1) / 2;
// 如果父節(jié)點(diǎn)的值小于子節(jié)點(diǎn)的值衬浑,你不滿足堆的條件捌浩,那么就交換值
if (store[index] > store[parentIndex]) {
swap(store, index, parentIndex);
index = parentIndex;
} else {
// 否則表示這條路徑上的值已經(jīng)滿足降序,跳出循環(huán)
break;
}
}
}
主要步驟:
- 直接將value插入到size位置工秩,并將size自增尸饺,這樣store數(shù)組中插入一個(gè)值了。
- 要保證從這個(gè)葉節(jié)點(diǎn)到根節(jié)點(diǎn)這條路徑上的節(jié)點(diǎn)助币,滿足父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)浪听。
- 通過(guò)int parentIndex = (index - 1) / 2得到父節(jié)點(diǎn),如果比父節(jié)點(diǎn)值大眉菱,那么兩者位置的值交換迹栓,然后再拿這個(gè)父節(jié)點(diǎn)和它的父父節(jié)點(diǎn)比較。
直到這個(gè)節(jié)點(diǎn)值比父節(jié)點(diǎn)值小倍谜,或者這個(gè)節(jié)點(diǎn)已經(jīng)是根節(jié)點(diǎn)就退出循環(huán)迈螟。
因?yàn)槲覀兠看沃徊迦胍粋€(gè)值,所以只需要保證新插入位置的葉節(jié)點(diǎn)到根節(jié)點(diǎn)路徑滿足堆的條件尔崔,因?yàn)槠渌窂經(jīng)]做操作,肯定是滿足條件的褥民。第二因?yàn)槭侵苯釉趕ize位置插入值季春,所以肯定滿足是完全二叉樹這個(gè)條件。因?yàn)槊看窝h(huán)index都是除以2這種倍數(shù)遞減的方式消返,所以它最多循環(huán)次數(shù)是(log N)次载弄。
public int remove() {
// 將根的值記錄耘拇,最后返回
int result = store[0];
// 將最后位置的值放到根節(jié)點(diǎn)位置
store[0] = store[--size];
int index = 0;
// 通過(guò)循環(huán),保證父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)宇攻。
while(true) {
int leftIndex = 2 * index + 1; // 左子節(jié)點(diǎn)
int rightIndex = 2 * index + 2; // 右子節(jié)點(diǎn)
// leftIndex >= size 表示這個(gè)子節(jié)點(diǎn)還沒(méi)有值惫叛。
if (leftIndex >= size) break;
int maxIndex = leftIndex;
if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
if (store[index] < store[maxIndex]) {
swap(store, index, maxIndex);
index = maxIndex;
} else {
break;
}
}
return result;
}
在堆中最大值就在根節(jié)點(diǎn),所以操作步驟:
- 將根節(jié)點(diǎn)的值保存到result中逞刷。
- 將最后節(jié)點(diǎn)的值移動(dòng)到根節(jié)點(diǎn)嘉涌,再將長(zhǎng)度減一,這樣滿足堆成立第一個(gè)條件夸浅,堆是一個(gè)完全二叉樹仑最。
- 使用循環(huán),來(lái)滿足堆成立的第二個(gè)條件帆喇,父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)的值警医。
- 最后返回result。
那么怎么樣滿足堆的第二個(gè)條件呢坯钦?
因?yàn)楦c(diǎn)的值現(xiàn)在是新值预皇,那么就有可能比它的子節(jié)點(diǎn)小,所以就有可能要進(jìn)行交換婉刀。
- 我們要找出左子節(jié)點(diǎn)和右子節(jié)點(diǎn)那個(gè)值更大深啤,因?yàn)檫@個(gè)值可能要和父節(jié)點(diǎn)值進(jìn)行交換,如果它不是較大值的話路星,它和父節(jié)點(diǎn)進(jìn)行交換之后溯街,就會(huì)出現(xiàn)父節(jié)點(diǎn)的值小于子節(jié)點(diǎn)。
- 將找到的較大子節(jié)點(diǎn)值和父節(jié)點(diǎn)值進(jìn)行比較洋丐。
- 如果父節(jié)點(diǎn)的值小于它呈昔,那么將父節(jié)點(diǎn)和較大子節(jié)點(diǎn)值進(jìn)行交換,然后再比較較大子節(jié)點(diǎn)和它的子節(jié)點(diǎn)友绝。
- 如果父節(jié)點(diǎn)的值不小于子節(jié)點(diǎn)較大值堤尾,或者沒(méi)有子節(jié)點(diǎn)(即這個(gè)節(jié)點(diǎn)已經(jīng)是葉節(jié)點(diǎn)了),就跳出循環(huán)迁客。
- 每次循環(huán)我們都是以2的倍數(shù)遞增郭宝,所以它也是最多循環(huán)次數(shù)是(log N)次。
所以通過(guò)堆這種方式可以快速實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列掷漱,它的插入和刪除操作的效率都是O(log N)粘室。
二. DelayedWorkQueue類
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
從定義中看出DelayedWorkQueue是一個(gè)阻塞隊(duì)列。
2.1 重要成員屬性
// 初始時(shí)卜范,數(shù)組長(zhǎng)度大小衔统。
private static final int INITIAL_CAPACITY = 16;
// 使用數(shù)組來(lái)儲(chǔ)存隊(duì)列中的元素。
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 使用lock來(lái)保證多線程并發(fā)安全問(wèn)題。
private final ReentrantLock lock = new ReentrantLock();
// 隊(duì)列中儲(chǔ)存元素的大小
private int size = 0;
//特指隊(duì)列頭任務(wù)所在線程
private Thread leader = null;
// 當(dāng)隊(duì)列頭的任務(wù)延時(shí)時(shí)間到了锦爵,或者有新的任務(wù)變成隊(duì)列頭時(shí)舱殿,用來(lái)喚醒等待線程
private final Condition available = lock.newCondition();
DelayedWorkQueue是用數(shù)組來(lái)儲(chǔ)存隊(duì)列中的元素,那么我們看看它是怎么實(shí)現(xiàn)優(yōu)先級(jí)隊(duì)列的险掀。
2.2 插入元素排序siftUp方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 當(dāng)k==0時(shí)沪袭,就到了堆二叉樹的根節(jié)點(diǎn)了,跳出循環(huán)
while (k > 0) {
// 父節(jié)點(diǎn)位置坐標(biāo), 相當(dāng)于(k - 1) / 2
int parent = (k - 1) >>> 1;
// 獲取父節(jié)點(diǎn)位置元素
RunnableScheduledFuture<?> e = queue[parent];
// 如果key元素大于父節(jié)點(diǎn)位置元素樟氢,滿足條件冈绊,那么跳出循環(huán)
// 因?yàn)槭菑男〉酱笈判虻摹? if (key.compareTo(e) >= 0)
break;
// 否則就將父節(jié)點(diǎn)元素存放到k位置
queue[k] = e;
// 這個(gè)只有當(dāng)元素是ScheduledFutureTask對(duì)象實(shí)例才有用,用來(lái)快速取消任務(wù)嗡害。
setIndex(e, k);
// 重新賦值k焚碌,尋找元素key應(yīng)該插入到堆二叉樹的那個(gè)節(jié)點(diǎn)
k = parent;
}
// 循環(huán)結(jié)束,k就是元素key應(yīng)該插入的節(jié)點(diǎn)位置
queue[k] = key;
setIndex(key, k);
}
通過(guò)循環(huán)霸妹,來(lái)查找元素key應(yīng)該插入在堆二叉樹那個(gè)節(jié)點(diǎn)位置十电,并交互父節(jié)點(diǎn)的位置。具體流程在前面已經(jīng)介紹過(guò)了叹螟。
2.3 移除元素排序siftDown方法
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
// 通過(guò)循環(huán)鹃骂,保證父節(jié)點(diǎn)的值不能大于子節(jié)點(diǎn)。
while (k < half) {
// 左子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 1
int child = (k << 1) + 1;
// 左子節(jié)點(diǎn)位置元素
RunnableScheduledFuture<?> c = queue[child];
// 右子節(jié)點(diǎn), 相當(dāng)于 (k * 2) + 2
int right = child + 1;
// 如果左子節(jié)點(diǎn)元素值大于右子節(jié)點(diǎn)元素值罢绽,那么右子節(jié)點(diǎn)才是較小值的子節(jié)點(diǎn)畏线。
// 就要將c與child值重新賦值
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果父節(jié)點(diǎn)元素值小于較小的子節(jié)點(diǎn)元素值,那么就跳出循環(huán)
if (key.compareTo(c) <= 0)
break;
// 否則良价,父節(jié)點(diǎn)元素就要和子節(jié)點(diǎn)進(jìn)行交換
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
通過(guò)循環(huán)寝殴,保證父節(jié)點(diǎn)的值不能小于子節(jié)點(diǎn)。
2.4 插入元素方法
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
我們發(fā)現(xiàn)與普通阻塞隊(duì)列相比明垢,這三個(gè)添加方法都是調(diào)用offer方法蚣常。那是因?yàn)樗鼪](méi)有隊(duì)列已滿的條件,也就是說(shuō)可以不斷地向DelayedWorkQueue添加元素,當(dāng)元素個(gè)數(shù)超過(guò)數(shù)組長(zhǎng)度時(shí)痊银,會(huì)進(jìn)行數(shù)組擴(kuò)容抵蚊。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 使用lock保證并發(fā)操作安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果要超過(guò)數(shù)組長(zhǎng)度,就要進(jìn)行數(shù)組擴(kuò)容
if (i >= queue.length)
// 數(shù)組擴(kuò)容
grow();
// 將隊(duì)列中元素個(gè)數(shù)加一
size = i + 1;
// 如果是第一個(gè)元素溯革,那么就不需要排序贞绳,直接賦值就行了
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 調(diào)用siftUp方法,使插入的元素變得有序致稀。
siftUp(i, e);
}
// 表示新插入的元素是隊(duì)列頭冈闭,更換了隊(duì)列頭,
// 那么就要喚醒正在等待獲取任務(wù)的線程豺裆。
if (queue[0] == e) {
leader = null;
// 喚醒正在等待等待獲取任務(wù)的線程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
主要是三步:
- 元素個(gè)數(shù)超過(guò)數(shù)組長(zhǎng)度拒秘,就會(huì)調(diào)用grow()方法号显,進(jìn)行數(shù)組擴(kuò)容臭猜。
- 將新元素e添加到優(yōu)先級(jí)隊(duì)列中對(duì)應(yīng)的位置躺酒,通過(guò)siftUp方法,保證按照元素的優(yōu)先級(jí)排序蔑歌。
- 如果新插入的元素是隊(duì)列頭羹应,即更換了隊(duì)列頭,那么就要喚醒正在等待獲取任務(wù)的線程次屠。這些線程可能是因?yàn)樵?duì)列頭元素的延時(shí)時(shí)間沒(méi)到园匹,而等待的。
數(shù)組擴(kuò)容方法:
private void grow() {
int oldCapacity = queue.length;
// 每次擴(kuò)容增加原來(lái)數(shù)組的一半數(shù)量劫灶。
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
// 使用Arrays.copyOf來(lái)復(fù)制一個(gè)新數(shù)組
queue = Arrays.copyOf(queue, newCapacity);
}
2.5 獲取隊(duì)列頭元素
2.5.1 立即獲取隊(duì)列頭元素
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
// 隊(duì)列頭任務(wù)是null裸违,或者任務(wù)延時(shí)時(shí)間沒(méi)有到,都返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
// 移除隊(duì)列頭元素
return finishPoll(first);
} finally {
lock.unlock();
}
}
當(dāng)隊(duì)列頭任務(wù)是null本昏,或者任務(wù)延時(shí)時(shí)間沒(méi)有到供汛,表示這個(gè)任務(wù)還不能返回,因此直接返回null涌穆。否則調(diào)用finishPoll方法怔昨,移除隊(duì)列頭元素并返回。
// 移除隊(duì)列頭元素
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 將隊(duì)列中元素個(gè)數(shù)減一
int s = --size;
// 獲取隊(duì)列末尾元素x
RunnableScheduledFuture<?> x = queue[s];
// 原隊(duì)列末尾元素設(shè)置為null
queue[s] = null;
if (s != 0)
// 因?yàn)橐瞥岁?duì)列頭元素宿稀,所以進(jìn)行重新排序趁舀。
siftDown(0, x);
setIndex(f, -1);
return f;
}
這個(gè)方法與我們?cè)诘谝还?jié)中,介紹堆的刪除方法一樣祝沸。
- 先將隊(duì)列中元素個(gè)數(shù)減一矮烹。
- 將原隊(duì)列末尾元素設(shè)置成隊(duì)列頭元素,再將隊(duì)列末尾元素設(shè)置為null罩锐。
- 調(diào)用siftDown(0, x)方法奉狈,保證按照元素的優(yōu)先級(jí)排序。
2.5.2 等待獲取隊(duì)列頭元素
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果沒(méi)有任務(wù)唯欣,就讓線程在available條件下等待嘹吨。
if (first == null)
available.await();
else {
// 獲取任務(wù)的剩余延時(shí)時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 如果延時(shí)時(shí)間到了,就返回這個(gè)任務(wù)境氢,用來(lái)執(zhí)行蟀拷。
if (delay <= 0)
return finishPoll(first);
// 將first設(shè)置為null,當(dāng)線程等待時(shí)萍聊,不持有first的引用
first = null; // don't retain ref while waiting
// 如果還是原來(lái)那個(gè)等待隊(duì)列頭任務(wù)的線程问芬,
// 說(shuō)明隊(duì)列頭任務(wù)的延時(shí)時(shí)間還沒(méi)有到,繼續(xù)等待寿桨。
if (leader != null)
available.await();
else {
// 記錄一下當(dāng)前等待隊(duì)列頭任務(wù)的線程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí)此衅,能夠自動(dòng)超時(shí)喚醒强戴。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 喚醒等待任務(wù)的線程
available.signal();
lock.unlock();
}
}
如果隊(duì)列中沒(méi)有任務(wù),那么就讓當(dāng)前線程在available條件下等待挡鞍。如果隊(duì)列頭任務(wù)的剩余延時(shí)時(shí)間delay大于0骑歹,那么就讓當(dāng)前線程在available條件下等待delay時(shí)間。
如果隊(duì)列插入了新的隊(duì)列頭墨微,它的剩余延時(shí)時(shí)間肯定小于原來(lái)隊(duì)列頭的時(shí)間道媚,這個(gè)時(shí)候就要喚醒等待線程,看看它是否能獲取任務(wù)翘县。
2.5.3 超時(shí)等待獲取隊(duì)列頭元素
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果沒(méi)有任務(wù)最域。
if (first == null) {
// 超時(shí)時(shí)間已到,那么就直接返回null
if (nanos <= 0)
return null;
else
// 否則就讓線程在available條件下等待nanos時(shí)間
nanos = available.awaitNanos(nanos);
} else {
// 獲取任務(wù)的剩余延時(shí)時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 如果延時(shí)時(shí)間到了锈麸,就返回這個(gè)任務(wù)镀脂,用來(lái)執(zhí)行。
if (delay <= 0)
return finishPoll(first);
// 如果超時(shí)時(shí)間已到忘伞,那么就直接返回null
if (nanos <= 0)
return null;
// 將first設(shè)置為null薄翅,當(dāng)線程等待時(shí),不持有first的引用
first = null; // don't retain ref while waiting
// 如果超時(shí)時(shí)間小于任務(wù)的剩余延時(shí)時(shí)間虑省,那么就有可能獲取不到任務(wù)匿刮。
// 在這里讓線程等待超時(shí)時(shí)間nanos
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 當(dāng)任務(wù)的延時(shí)時(shí)間到了時(shí),能夠自動(dòng)超時(shí)喚醒探颈。
long timeLeft = available.awaitNanos(delay);
// 計(jì)算剩余的超時(shí)時(shí)間
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 喚醒等待任務(wù)的線程
available.signal();
lock.unlock();
}
}
與take方法相比較熟丸,就要考慮設(shè)置的超時(shí)時(shí)間,如果超時(shí)時(shí)間到了伪节,還沒(méi)有獲取到有用任務(wù)光羞,那么就返回null。其他的與take方法中邏輯一樣怀大。
三. 總結(jié)
使用優(yōu)先級(jí)隊(duì)列DelayedWorkQueue纱兑,保證添加到隊(duì)列中的任務(wù),會(huì)按照任務(wù)的延時(shí)時(shí)間進(jìn)行排序化借,延時(shí)時(shí)間少的任務(wù)首先被獲取潜慎。