阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列苗膝。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí)束凑,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强辗斩椤.?dāng)隊(duì)列滿時(shí)枪芒,存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景挺庞,生產(chǎn)者是往隊(duì)列里添加元素的線程晰赞,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器选侨,而消費(fèi)者也只從容器里拿元素掖鱼。
阻塞隊(duì)列提供了四種處理方法:
拋出異常:是指當(dāng)阻塞隊(duì)列滿時(shí)候,再往隊(duì)列里插入元素援制,會(huì)拋出IllegalStateException(“Queue full”)異常戏挡。當(dāng)隊(duì)列為空時(shí),從隊(duì)列里獲取元素時(shí)會(huì)拋出NoSuchElementException異常 晨仑。
返回特殊值:插入方法會(huì)返回是否成功褐墅,成功則返回true。移除方法洪己,則是從隊(duì)列里拿出一個(gè)元素妥凳,如果沒(méi)有則返回null
一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素答捕,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程逝钥,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出拱镐。當(dāng)隊(duì)列空時(shí)艘款,消費(fèi)者線程試圖從隊(duì)列里take元素持际,隊(duì)列也會(huì)阻塞消費(fèi)者線程,直到隊(duì)列可用哗咆。
超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí)蜘欲,隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)一定的時(shí)間岳枷,生產(chǎn)者線程就會(huì)退出芒填。
Java里的阻塞隊(duì)列
JDK7提供了7個(gè)阻塞隊(duì)列。分別是
ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列空繁。
LinkedBlockingQueue :一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列。
PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列朱庆。
DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界阻塞隊(duì)列盛泡。
SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。
LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞隊(duì)列娱颊。
LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列傲诵。
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序箱硕。默認(rèn)情況下不保證訪問(wèn)者公平的訪問(wèn)隊(duì)列拴竹,所謂公平訪問(wèn)隊(duì)列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)隊(duì)列可用時(shí)剧罩,可以按照阻塞的先后順序訪問(wèn)隊(duì)列栓拜,即先阻塞的生產(chǎn)者線程,可以先往隊(duì)列里插入元素惠昔,先阻塞的消費(fèi)者線程幕与,可以先從隊(duì)列里獲取元素。通常情況下為了保證公平性會(huì)降低吞吐量镇防。我們可以使用以下代碼創(chuàng)建一個(gè)公平的阻塞隊(duì)列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問(wèn)者的公平性是使用可重入鎖實(shí)現(xiàn)的啦鸣,代碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
LinkedBlockingQueue
LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE来氧。此隊(duì)列按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序诫给。
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)支持優(yōu)先級(jí)的無(wú)界隊(duì)列。默認(rèn)情況下元素采取自然順序排列啦扬,也可以通過(guò)比較器comparator來(lái)指定元素的排序規(guī)則中狂。元素按照升序排列。
DelayQueue
DelayQueue是一個(gè)支持延時(shí)獲取元素的無(wú)界阻塞隊(duì)列考传。隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)吃型。隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素僚楞。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素勤晚。我們可以將DelayQueue運(yùn)用在以下應(yīng)用場(chǎng)景:
緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期枉层,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí)赐写,表示緩存有效期到了鸟蜡。
-
定時(shí)任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間挺邀,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行揉忘,從比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
隊(duì)列中的Delayed必須實(shí)現(xiàn)compareTo來(lái)指定元素的順序端铛。比如讓延時(shí)時(shí)間最長(zhǎng)的放在隊(duì)列的末尾泣矛。
實(shí)現(xiàn)代碼如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask x = (ScheduledFutureTask)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
如何實(shí)現(xiàn)Delayed接口
我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類。這個(gè)類實(shí)現(xiàn)了Delayed接口禾蚕。首先:在對(duì)象創(chuàng)建的時(shí)候您朽,使用time記錄前對(duì)象什么時(shí)候可以使用,代碼如下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
然后使用getDelay可以查詢當(dāng)前元素還需要延時(shí)多久换淆,代碼如下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
通過(guò)構(gòu)造函數(shù)可以看出延遲時(shí)間參數(shù)ns的單位是納秒哗总,自己設(shè)計(jì)的時(shí)候最好使用納秒,因?yàn)間etDelay時(shí)可以指定任意單位倍试,一旦以納秒作為單位讯屈,而延時(shí)的時(shí)間又精確不到納秒就麻煩了。使用時(shí)請(qǐng)注意當(dāng)time小于當(dāng)前時(shí)間時(shí)县习,getDelay會(huì)返回負(fù)數(shù)涮母。
如何實(shí)現(xiàn)延時(shí)隊(duì)列
延時(shí)隊(duì)列的實(shí)現(xiàn)很簡(jiǎn)單,當(dāng)消費(fèi)者從隊(duì)列里獲取元素時(shí)准颓,如果元素沒(méi)有達(dá)到延時(shí)時(shí)間哈蝇,就阻塞當(dāng)前線程。
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
SynchronousQueue
SynchronousQueue是一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列攘已。每一個(gè)put操作必須等待一個(gè)take操作炮赦,否則不能繼續(xù)添加元素。SynchronousQueue可以看成是一個(gè)傳球手样勃,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程举畸。隊(duì)列本身并不存儲(chǔ)任何元素蓉驹,非常適合于傳遞性場(chǎng)景,比如在一個(gè)線程中使用的數(shù)據(jù)慨蓝,傳遞給另外一個(gè)線程使用京革,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
LinkedTransferQueue
LinkedTransferQueue是一個(gè)由鏈表結(jié)構(gòu)組成的無(wú)界阻塞TransferQueue隊(duì)列辫樱。相對(duì)于其他阻塞隊(duì)列峭拘,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法
如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時(shí)間限制的poll()方法時(shí)),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者鸡挠。如果沒(méi)有消費(fèi)者在等待接收元素辉饱,transfer方法會(huì)將元素存放在隊(duì)列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回拣展。transfer方法的關(guān)鍵代碼如下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當(dāng)前元素的s節(jié)點(diǎn)作為tail節(jié)點(diǎn)彭沼。第二行代碼是讓CPU自旋等待消費(fèi)者消費(fèi)元素。因?yàn)樽孕龝?huì)消耗CPU备埃,所以自旋一定的次數(shù)后使用Thread.yield()方法來(lái)暫停當(dāng)前正在執(zhí)行的線程姓惑,并執(zhí)行其他線程。
tryTransfer方法
tryTransfer方法則是用來(lái)試探下生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者按脚。如果沒(méi)有消費(fèi)者等待接收元素于毙,則返回false。和transfer方法的區(qū)別是tryTransfer方法無(wú)論消費(fèi)者是否接收辅搬,方法立即返回望众。而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
對(duì)于帶有時(shí)間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法伞辛,則是試圖把生產(chǎn)者傳入的元素直接傳給消費(fèi)者,但是如果沒(méi)有消費(fèi)者消費(fèi)該元素則等待指定的時(shí)間再返回夯缺,如果超時(shí)還沒(méi)消費(fèi)元素蚤氏,則返回false,如果在超時(shí)時(shí)間內(nèi)消費(fèi)了元素踊兜,則返回true竿滨。
LinkedBlockingDeque
LinkedBlockingDeque是一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。所謂雙向隊(duì)列指的你可以從隊(duì)列的兩端插入和移出元素捏境。雙端隊(duì)列因?yàn)槎嗔艘粋€(gè)操作隊(duì)列的入口于游,在多線程同時(shí)入隊(duì)時(shí),也就減少了一半的競(jìng)爭(zhēng)垫言。相比其他的阻塞隊(duì)列贰剥,LinkedBlockingDeque多了addFirst,addLast筷频,offerFirst蚌成,offerLast,peekFirst凛捏,peekLast等方法担忧,以First單詞結(jié)尾的方法,表示插入坯癣,獲绕渴ⅰ(peek)或移除雙端隊(duì)列的第一個(gè)元素。以Last單詞結(jié)尾的方法,表示插入惩猫,獲取或移除雙端隊(duì)列的最后一個(gè)元素芝硬。另外插入方法add等同于addLast,移除方法remove等效于removeFirst帆锋。但是take方法卻等同于takeFirst吵取,不知道是不是Jdk的bug,使用時(shí)還是用帶有First和Last后綴的方法更清楚锯厢。
在初始化LinkedBlockingDeque時(shí)可以設(shè)置容量防止其過(guò)渡膨脹皮官。另外雙向阻塞隊(duì)列可以運(yùn)用在“工作竊取”模式中。
阻塞隊(duì)列的實(shí)現(xiàn)原理
如果隊(duì)列是空的实辑,消費(fèi)者會(huì)一直等待捺氢,當(dāng)生產(chǎn)者添加元素時(shí)候,消費(fèi)者是如何知道當(dāng)前隊(duì)列有元素的呢剪撬?如果讓你來(lái)設(shè)計(jì)阻塞隊(duì)列你會(huì)如何設(shè)計(jì)摄乒,讓生產(chǎn)者和消費(fèi)者能夠高效率的進(jìn)行通訊呢?讓我們先來(lái)看看JDK是如何實(shí)現(xiàn)的残黑。
使用通知模式實(shí)現(xiàn)馍佑。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊(duì)列里添加元素時(shí)會(huì)阻塞住生產(chǎn)者梨水,當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后拭荤,會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用。通過(guò)查看JDK源碼發(fā)現(xiàn)ArrayBlockingQueue使用了Condition來(lái)實(shí)現(xiàn)疫诽,代碼如下:
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
當(dāng)我們往隊(duì)列里插入一個(gè)元素時(shí)舅世,如果隊(duì)列不可用,阻塞生產(chǎn)者主要通過(guò)LockSupport.park(this)來(lái)實(shí)現(xiàn)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
繼續(xù)進(jìn)入源碼奇徒,發(fā)現(xiàn)調(diào)用setBlocker先保存下將要阻塞的線程雏亚,然后調(diào)用unsafe.park阻塞當(dāng)前線程。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
unsafe.park是個(gè)native方法摩钙,代碼如下:
public native void park(boolean isAbsolute, long time);
park這個(gè)方法會(huì)阻塞當(dāng)前線程罢低,只有以下四種情況中的一種發(fā)生時(shí),該方法才會(huì)返回腺律。
與park對(duì)應(yīng)的unpark執(zhí)行或已經(jīng)執(zhí)行時(shí)奕短。注意:已經(jīng)執(zhí)行是指unpark先執(zhí)行,然后再執(zhí)行的park匀钧。
線程被中斷時(shí)翎碑。
如果參數(shù)中的time不是零,等待了指定的毫秒數(shù)時(shí)之斯。
發(fā)生異橙砧荆現(xiàn)象時(shí)。這些異常事先無(wú)法確定。
我們繼續(xù)看一下JVM是如何實(shí)現(xiàn)park方法的莉擒,park在不同的操作系統(tǒng)使用不同的方式實(shí)現(xiàn)酿炸,在linux下是使用的是系統(tǒng)方法pthread_cond_wait實(shí)現(xiàn)。實(shí)現(xiàn)代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的 os::platformEvent::park方法涨冀,代碼如下:
void os::PlatformEvent::park() {
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
// In theory we could move the ST of 0 into _Event past the unlock(),
// but then we'd need a MEMBAR after the ST.
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
}
guarantee (_Event >= 0, "invariant") ;
}
}
pthread_cond_wait是一個(gè)多線程的條件變量函數(shù)填硕,cond是condition的縮寫,字面意思可以理解為線程在等待一個(gè)條件發(fā)生鹿鳖,這個(gè)條件是一個(gè)全局變量扁眯。這個(gè)方法接收兩個(gè)參數(shù),一個(gè)共享變量_cond翅帜,一個(gè)互斥量_mutex姻檀。而unpark方法在linux下是使用pthread_cond_signal實(shí)現(xiàn)的。park 在windows下則是使用WaitForSingleObject實(shí)現(xiàn)的涝滴。
當(dāng)隊(duì)列滿時(shí)绣版,生產(chǎn)者往阻塞隊(duì)列里插入一個(gè)元素,生產(chǎn)者線程會(huì)進(jìn)入WAITING (parking)狀態(tài)歼疮。我們可以使用jstack dump阻塞的生產(chǎn)者線程看到這點(diǎn):
"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)