隊(duì)列
隊(duì)列是先進(jìn)先出(FIFO)的線性表枝誊。在具體應(yīng)用中通常用鏈表或者數(shù)組來(lái)實(shí)現(xiàn)。隊(duì)列只允許在后端(稱為rear)進(jìn)行插入操作惜纸,在前端(稱為front)進(jìn)行刪除操作叶撒。隊(duì)列的操作方式和堆棧類似绝骚,唯一的區(qū)別在于隊(duì)列只允許新數(shù)據(jù)在后端進(jìn)行添加。
操作 | 拋出異常 | 有返回值 |
---|---|---|
Insert | add(e) | offer(e) |
Remove | remove() | poll() |
Examine | element() | peek() |
阻塞隊(duì)列
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列,這兩個(gè)附加操作支持阻塞的插入和移除方法.
- 支持阻塞的插入方法: 當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿為止.
- 支持阻塞的移除方法: 當(dāng)隊(duì)列為空時(shí),獲取元素的線程阻塞等待線程非空.
阻塞隊(duì)列通常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者就是向隊(duì)列里添加元素,而消費(fèi)者就是從隊(duì)列里取出元素. 阻塞隊(duì)列就是生產(chǎn)者存儲(chǔ)元素而消費(fèi)者用來(lái)獲取元素的容器.
操作方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
注意: 如果是無(wú)界阻塞隊(duì)列,隊(duì)列永遠(yuǎn)都不會(huì)出現(xiàn)滿的情況,所以使用put或者take方法永遠(yuǎn)都不會(huì)被阻塞,而且使用put方法時(shí),該方法永遠(yuǎn)返回為true.
JDK提供的阻塞隊(duì)列
從上面的UML圖可以看到,JKD7提供了7個(gè)阻塞隊(duì)列:
-
ArrayBlockingQueue
: 由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列 -
LinkedBlockingQueue
: 由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列 -
PriorityBlockingQueue
: 支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列 -
DelayQueue
: 使用優(yōu)先級(jí)隊(duì)列隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列 -
SynchronousQueue
: 不存儲(chǔ)元素的阻塞隊(duì)列 -
LinkedTransferQueue
: 由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列 -
LinkedBlockingDeque
: 由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
ArrayBlockingQueue
ArrayBlockingQueue
是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列,此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序.
默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列是指阻塞的線程,可以按照阻塞的先后順序訪問(wèn)隊(duì)列,即先阻塞線程先訪問(wèn)隊(duì)列.非公平性對(duì)先等待的線程是非公平的,當(dāng)隊(duì)列可用時(shí),阻塞的線程都可以爭(zhēng)奪訪問(wèn)隊(duì)列的資格,有可能先阻塞的線程最后才訪問(wèn)隊(duì)列.
為了保證公平性,通常會(huì)降低吞吐量,可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列.
ArrayBlockingQueue fairQueue= new ArrayBlockingQueue(1000,true);
訪問(wèn)者的公平性是使用可重入鎖實(shí)現(xiàn)的,代碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
LinkedBlockingQueue
LinkedBlockingQueue
是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,此隊(duì)列默認(rèn)最大長(zhǎng)度為Integer.MAX_VALUE,按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序
PriorityBlockingQueue
PriorityBlockingQueue
是一個(gè)支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列,默認(rèn)情況下元素采用自然排序升序排列,也可以自定義類實(shí)現(xiàn)compareTo()方法來(lái)指定元素排序規(guī)則,或者初始化PriorityBlockingQueue
時(shí),指定構(gòu)造參數(shù)Comparator來(lái)對(duì)元素進(jìn)行排序,需要注意的是不能保證同優(yōu)先級(jí)的元素排序.
DelayQueue
DelayQueue
是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列,隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn). 隊(duì)列中元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素.只有延遲期滿時(shí)才能從隊(duì)列中提出元素.
DelayQueue
非常有用,可以將DelayQueue
運(yùn)用在一下場(chǎng)景:
- 緩存系統(tǒng)的設(shè)計(jì): 可以送
DelayQueue
保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue
,一旦從DelayQueue
獲取元素,就表示緩存到期了. - 定時(shí)任務(wù)調(diào)度:使用
DelayQueue
保存當(dāng)前將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue
中獲取到任務(wù)就開(kāi)始執(zhí)行,比如TimeQueue就是使用DelayQueue
實(shí)現(xiàn)的.
SynchronousQueue
SynchronousQueue
是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素.
支持公平訪問(wèn)隊(duì)列,默認(rèn)情況下線程采用非公平性策略,使用帶boolean參數(shù)的構(gòu)造方法可以實(shí)現(xiàn)等待線程采用先進(jìn)先出(FIFO)的順序訪問(wèn)隊(duì)列.
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
LinkedTransferQueue
LinkedTransferQueue
是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列,相當(dāng)于其他阻塞隊(duì)列,LinkedTransferQueue多了一tryTransfer和transfer方法.
-
transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或者帶時(shí)間限制的poll方式時(shí))transfer()方法可以吧生產(chǎn)者傳入的元素立即transfer(傳輸)給消費(fèi)者,如果沒(méi)有消費(fèi)者在等待接收元素,transfer方法將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等待該元素被消費(fèi)者消費(fèi)了才返回.
-
tryTransfer方法
tryTransfer方法用來(lái)試探生產(chǎn)者傳入元素是否能夠直接傳遞給消費(fèi)者,如果沒(méi)有消費(fèi)者等待接收元素.則返回false, 和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收,方法立即返回,而transfer需要等待消費(fèi)者消費(fèi)了才返回.
LinkedBlockingDeque
LinkedBlockingDeque
是一由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列,所謂雙向隊(duì)列指的是可以從隊(duì)列兩端插入和移除元素,雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口,在多線程同時(shí)入隊(duì)時(shí),也就減少了一般競(jìng)爭(zhēng).相比其他阻塞隊(duì)列,LinkedBlockingDeque
多了addFirst, addLast,offerFirst,offerLast,peekFirst,peekLast等方法.
在初始化LinkedBlockingDeque
時(shí)可以設(shè)置容量防止其過(guò)渡膨脹, 另外,雙向阻塞隊(duì)列可以運(yùn)行在"工作竊取"模式中.
阻塞隊(duì)列實(shí)現(xiàn)的原理
通知模式實(shí)現(xiàn): 所謂通知模式,就是當(dāng)生產(chǎn)者從滿的隊(duì)列里添加元素時(shí)會(huì)阻塞生產(chǎn)者,而當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后,就會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用. ArrayBlockingQueue使用ReentrantLock和Condition實(shí)現(xiàn).
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
當(dāng)往隊(duì)列里插入一個(gè)元素時(shí),如果隊(duì)列不可用,那么阻塞生產(chǎn)者主要通過(guò)LockSupport.part(this)實(shí)現(xiàn):
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
然后看看LockSupport的源碼:發(fā)現(xiàn)調(diào)研setBlocker先保存一下將要阻塞的線程,然后代用unsafe.park阻塞當(dāng)前線程:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
park是個(gè)native方法,會(huì)阻塞當(dāng)前線程,只有以下四種情況中一種發(fā)生時(shí),該返回才會(huì)返回.
- 與park相對(duì)的unpark執(zhí)行或者已經(jīng)執(zhí)行. "已經(jīng)執(zhí)行"是指執(zhí)行unpark,再執(zhí)行park的情況
- 線程被中斷時(shí)
- 等待完time參數(shù)指定的毫秒數(shù)時(shí)
- 異踌艄唬現(xiàn)象發(fā)生時(shí),這個(gè)異逞雇簦現(xiàn)象沒(méi)有任何原因