一、添加元素
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
- put方法和add方法都會(huì)調(diào)用offer方法午磁,put方法沒有返回值莺戒,add返回是否添加成功
- 因?yàn)镈elayedWorkQueue可以擴(kuò)容,添加元素沒有阻塞爽蝴,所以帶時(shí)間的offer方法最終調(diào)用的還是不帶時(shí)間的offer方法
1.offer方法
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果超過(guò)了數(shù)組的容量沐批,執(zhí)行擴(kuò)容50%
if (i >= queue.length)
grow();
// 數(shù)組元素個(gè)數(shù)加1
size = i + 1;
// 如果是第一個(gè)元素,直接設(shè)置值
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
// 如果不是第一個(gè)元素蝎亚,需要向上比較并移動(dòng)
} else {
siftUp(i, e);
}
// 如果是第一個(gè)元素兴溜,說(shuō)明之前是空的糊啡,將leader置為空幽污,通知等待獲取隊(duì)列數(shù)據(jù)的線程
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
- 首先看是否需要擴(kuò)容伸辟,每次擴(kuò)容1.5倍
- 由于queue是小堆數(shù)據(jù)結(jié)構(gòu),如果是第一個(gè)元素梅惯,直接添加到數(shù)組中宪拥;如果不是第一個(gè)元素,需要與父結(jié)點(diǎn)比較并移動(dòng)
- 如果是第一個(gè)元素个唧,需通知從隊(duì)列獲取數(shù)據(jù)的線程
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 獲取父結(jié)點(diǎn)索引位置
int parent = (k - 1) >>> 1;
// 獲取父結(jié)點(diǎn)元素
RunnableScheduledFuture<?> e = queue[parent];
// 如果當(dāng)前插入的元素延遲時(shí)間或者序列號(hào)大于父結(jié)點(diǎn)江解,直接退出while循環(huán)
if (key.compareTo(e) >= 0)
break;
// 走到這,說(shuō)明需要將新元素上移徙歼,將父元素替換到新元素的位置
queue[k] = e;
setIndex(e, k);
// 將k的值更改為父元素的值犁河,進(jìn)行下一次循環(huán)
k = parent;
}
// 重新插入新的元素
queue[k] = key;
// 設(shè)置元素在數(shù)組的位置
setIndex(key, k);
}
- DelayedWorkQueue是小堆樹的數(shù)據(jù)結(jié)構(gòu)鳖枕,如果以0為下標(biāo)開始編號(hào),當(dāng)前結(jié)點(diǎn)的在數(shù)組中的位置為i桨螺,那么當(dāng)前結(jié)點(diǎn)的父結(jié)點(diǎn)在數(shù)組的(i-1)/2位置宾符,左結(jié)點(diǎn)是2i+1的位置,右結(jié)點(diǎn)是2i+2的位置灭翔;
- key是類ScheduledFutureTask魏烫,重寫了compareTo方法,比較的是time肝箱,sequenceNumber哄褒,NANOSECONDS大小
二、獲取元素
1. poll()
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取小堆樹的根結(jié)點(diǎn)
RunnableScheduledFuture<?> first = queue[0];
// 如果根結(jié)點(diǎn)為空或者時(shí)間還沒到煌张,則返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
// 返回根結(jié)點(diǎn)并重新平衡小堆樹
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 元素個(gè)數(shù)減1
int s = --size;
// 獲取最后一個(gè)元素
RunnableScheduledFuture<?> x = queue[s];
// 將最后一個(gè)元素置為空
queue[s] = null;
// 如果最后一個(gè)元素不是唯一的元素呐赡,那么放到根結(jié)點(diǎn),并再平衡小堆樹
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
// while循環(huán)骏融,k<half說(shuō)明還沒到葉子結(jié)點(diǎn)链嘀,所以一直要比較
while (k < half) {
// 左結(jié)點(diǎn)索引
int child = (k << 1) + 1;
// 獲取左結(jié)點(diǎn)元素
RunnableScheduledFuture<?> c = queue[child];
// 右結(jié)點(diǎn)索引
int right = child + 1;
// 因?yàn)榻Y(jié)點(diǎn)向下移動(dòng),所以要與子節(jié)點(diǎn)的較小值比較档玻,所以這里比較左結(jié)點(diǎn)元素與右結(jié)點(diǎn)元素大小
// 如果右結(jié)點(diǎn)較小怀泊,將c變更為右結(jié)點(diǎn)
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果當(dāng)前結(jié)點(diǎn)比子節(jié)點(diǎn)的較小值小,直接退出循環(huán)
if (key.compareTo(c) <= 0)
break;
// 與較小的子結(jié)點(diǎn)調(diào)換位置误趴,再循環(huán)
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
- 如果隊(duì)列為空霹琼,或者時(shí)間還沒到,則返回空
- 返回的元素是第一個(gè)元素冤留,將末尾元素放到一個(gè)位置碧囊,并向下再平衡
2. poll(long timeout, TimeUnit unit)
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 隊(duì)列為空
if (first == null) {
// 阻塞時(shí)間小于等于0树灶,返回空
if (nanos <= 0)
return null;
// 有等待時(shí)間纤怒,那么就等待一段時(shí)間
else
nanos = available.awaitNanos(nanos);
// 隊(duì)列不為空
} else {
long delay = first.getDelay(NANOSECONDS);
// 時(shí)間到了,返回第一個(gè)元素
if (delay <= 0)
return finishPoll(first);
// 時(shí)間沒到天通,如果等待時(shí)間小于等于0泊窘,返回空
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 如果等待時(shí)間小于延遲時(shí)間
// 或者
// leader不為空,說(shuō)明有l(wèi)eader線程
// 那么等待一段時(shí)間
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
// 等待時(shí)間大于延遲時(shí)間且leader為空
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 因?yàn)榈却龝r(shí)間大于延遲時(shí)間像寒,所以這里只能等待延遲時(shí)間
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
// leader線程結(jié)束
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果沒有l(wèi)eader線程且隊(duì)列不為空烘豹,喚醒獲取元素的線程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- 如果隊(duì)列為空,等待時(shí)間小于等于0诺祸,直接返回空携悯,如果有等待時(shí)間,那么就阻塞一段時(shí)間筷笨;
- 如果隊(duì)列不為空
- 任務(wù)時(shí)間到了憔鬼,返回隊(duì)列第一個(gè)位置的元素
- 任務(wù)時(shí)間沒到龟劲,但是等待的時(shí)間小于等于0,那么返回空
- 如果等待的時(shí)間小于任務(wù)執(zhí)行的時(shí)間點(diǎn)轴或,或者leader線程不為空昌跌,那么阻塞等待的時(shí)間
- 如果等待時(shí)間大于任務(wù)執(zhí)行的時(shí)間點(diǎn)且leader線程為空,那么阻塞任務(wù)執(zhí)行的時(shí)間點(diǎn)
- 最后如果leader線程不為空且隊(duì)列不為空照雁,那么通知獲取對(duì)列元素的線程
3. take()
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果隊(duì)列為空蚕愤,一直阻塞
if (first == null)
available.await();
// 隊(duì)列不為空
else {
// 獲取任務(wù)執(zhí)行的延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
// 沒有延遲時(shí)間,返回隊(duì)列的元素
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 如果leader線程不為空饺蚊,阻塞當(dāng)前線程
if (leader != null)
available.await();
// 如果leader線程為空萍诱,那么阻塞延遲的時(shí)間
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader線程為空且隊(duì)列不為空,通知等待獲取隊(duì)列元素的線程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
- 如果隊(duì)列為空污呼,一直阻塞砂沛,直至被喚醒
- 如果隊(duì)列不為空,到了任務(wù)的延遲時(shí)間曙求,那么從隊(duì)列里彈出任務(wù)碍庵;如果沒到任務(wù)的延遲時(shí)間,那么阻塞任務(wù)悟狱,阻塞時(shí)間為任務(wù)的延遲時(shí)間静浴,但是如果leader線程不為空,那么會(huì)一直阻塞挤渐,直至被喚醒
4. peek()
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
- 直接返回第一個(gè)元素苹享,不是彈出,不判斷是否到了延遲時(shí)間
5. peekExpired()
private RunnableScheduledFuture<?> peekExpired() {
// assert lock.isHeldByCurrentThread();
RunnableScheduledFuture<?> first = queue[0];
return (first == null || first.getDelay(NANOSECONDS) > 0) ?
null : first;
}
三浴麻、刪除元素
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 獲取刪除元素的索引
int i = indexOf(x);
if (i < 0)
return false;
// 將刪除的元素索引設(shè)為-1
setIndex(queue[i], -1);
// 元素個(gè)數(shù)減1
int s = --size;
// 用數(shù)組尾部元素替換刪除的結(jié)點(diǎn)
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 如果刪除不是尾部元素
if (s != i) {
// 先向下平衡
siftDown(i, replacement);
// 如果向下平衡失敗得问,也就是下面的都是大于當(dāng)前結(jié)點(diǎn),那么就想上平衡
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}