1 什么是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列剖膳。
- 1)支持阻塞的插入方法:意思是當(dāng)隊列滿時而咆,隊列會阻塞插入元素的線程,直到隊列不滿蔑赘。
- 2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡?/li>
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程叼架,消費者是從隊列里取元素的線程畔裕。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器乖订。
在阻塞隊列不可用時扮饶,這兩個附加操作提供了4種處理方式
- 拋出異常:當(dāng)隊列滿時,如果再往隊列里插入元素乍构,會拋出IllegalStateException("Queue full")異常甜无。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常哥遮。
- 返回特殊值:當(dāng)往隊列插入元素時岂丘,會返回元素是否插入成功,成功返回true眠饮。如果是移除方法元潘,則是從隊列里取出一個元素,如果沒有則返回null君仆。
- 一直阻塞:當(dāng)阻塞隊列滿時翩概,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程返咱,直到隊列可用或者響應(yīng)中斷退出钥庇。當(dāng)隊列空時,如果消費者線程從隊列里take元素咖摹,隊列會阻塞住消費者線程评姨,直到隊列不為空。
- 超時退出:當(dāng)阻塞隊列滿時萤晴,如果生產(chǎn)者線程往隊列里插入元素吐句,隊列會阻塞生產(chǎn)者線程一段時間,如果超過了指定的時間店读,生產(chǎn)者線程就會退出嗦枢。
注意 如果是無界阻塞隊列,隊列不可能會出現(xiàn)滿的情況屯断,所以使用put或offer方法永遠(yuǎn)不會被阻塞文虏,而且使用offer方法時,該方法永遠(yuǎn)返回true殖演。
2 Java里的阻塞隊列
JDK 7提供了7個阻塞隊列氧秘,如下
- ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列趴久。
- PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列丸相。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列彼棍。
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列灭忠。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列膳算。
2.1 ArrayBlockingQueue
- ArrayBlockingQueue是一個用數(shù)組實現(xiàn)的有界阻塞隊列。
- 此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序更舞。
- 默認(rèn)情況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程坎吻,可以按照阻塞的先后順序訪問隊列缆蝉,即先阻塞線程先訪問隊列。
- 非公平性是對先等待的線程是非公平的瘦真,當(dāng)隊列可用時刊头,阻塞的線程都可以爭奪訪問隊列的資格,有可能先阻塞的線程最后才訪問隊列诸尽。
- 為了保證公平性原杂,通常會降低吞吐量。我們可以使用以下代碼創(chuàng)建一個公平的阻塞隊列您机。
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問者的公平性是使用可重入鎖實現(xiàn)的穿肄,代碼如下。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.2 LinkedBlockingQueue
LinkedBlockingQueue是一個用鏈表實現(xiàn)的有界阻塞隊列际看。此隊列的默認(rèn)和最大長度Integer.MAX_VALUE咸产。此隊列按照先進(jìn)先出的原則對元素進(jìn)行排序。
2.3 PriorityBlockingQueue
- PriorityBlockingQueue是一個支持優(yōu)先級的無界阻塞隊列仲闽。
- 默認(rèn)情況下元素采取自然順序升序排列脑溢。也可以自定義類實現(xiàn)compareTo()方法來指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時赖欣,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序屑彻。需要注意的是不能保證同優(yōu)先級元素的順序。
- 默認(rèn)情況下元素采取自然順序升序排列顶吮。也可以自定義類實現(xiàn)compareTo()方法來指定元素排序規(guī)則社牲,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序悴了。需要注意的是不能保證同優(yōu)先級元素的順序膳沽。
2.4 DelayQueue
- DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現(xiàn)让禀。
- 隊列中的元素必須實現(xiàn)Delayed接口验残,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。只有在延遲期滿時才能從隊列中提取元素盈咳。
DelayQueue非常有用油够,可以將DelayQueue運用在以下應(yīng)用場景。
- 緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期腮敌,使用一個線程循環(huán)查詢DelayQueue阱当,一旦能從DelayQueue中獲取元素時俏扩,表示緩存有效期到了。
- 定時任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間弊添,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行录淡,比如TimerQueue就是使用DelayQueue實現(xiàn)的。
2.4.1 如何實現(xiàn)Delayed接口
DelayQueue隊列的元素必須實現(xiàn)Delayed接口油坝。我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類的實現(xiàn)嫉戚,一共有三步。
第一步:在對象創(chuàng)建的時候澈圈,初始化基本數(shù)據(jù)彬檀。使用time記錄當(dāng)前對象延遲到什么時候可以使用,使用sequenceNumber來標(biāo)識元素在隊列中的先后順序瞬女。
private static final AtomicLong sequencer = new AtomicLong(0);
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
第二步:實現(xiàn)getDelay方法窍帝,該方法返回當(dāng)前元素還需要延時多長時間,單位是納秒
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
通過構(gòu)造函數(shù)可以看出延遲時間參數(shù)ns的單位是納秒诽偷,自己設(shè)計的時候最好使用納秒坤学,因為實現(xiàn)getDelay()方法時可以指定任意單位,一旦以秒或分作為單位报慕,而延時時間又精確不到納秒就麻煩了拥峦。使用時請注意當(dāng)time小于當(dāng)前時間時,getDelay會返回負(fù)數(shù)卖子。
第三步:實現(xiàn)compareTo方法來指定元素的順序略号。
例如,讓延時時間最長的放在隊列的末尾洋闽。
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<> x = (ScheduledFutureTask<>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) 0 : ((d < 0) -1 : 1);
}
2.4.2 如何實現(xiàn)延時阻塞隊列
延時阻塞隊列的實現(xiàn)很簡單玄柠,當(dāng)消費者從隊列里獲取元素時,如果元素沒有達(dá)到延時時間诫舅,就阻塞當(dāng)前線程羽利。
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
代碼中的變量leader是一個等待獲取隊列頭部元素的線程。如果leader不等于空刊懈,表示已經(jīng)有線程在等待獲取隊列的頭元素这弧。所以,使用await()方法讓當(dāng)前線程等待信號虚汛。如果leader等于空匾浪,則把當(dāng)前線程設(shè)置成leader,并使用awaitNanos()方法讓當(dāng)前線程等待接收信號或等待delay時間卷哩。
2.5 SynchronousQueue
- SynchronousQueue是一個不存儲元素的阻塞隊列蛋辈。每一個put操作必須等待一個take操作,否則不能繼續(xù)添加元素。
- 它支持公平訪問隊列冷溶。默認(rèn)情況下線程采用非公平性策略訪問隊列渐白。
- 使用以下構(gòu)造方法可以創(chuàng)建公平性訪問的SynchronousQueue,如果設(shè)置為true逞频,則等待的線程會采用先進(jìn)先出的順序訪問隊列纯衍。
public SynchronousQueue(boolean fair) {
transferer = fair new TransferQueue() : new TransferStack();
}
SynchronousQueue可以看成是一個傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程苗胀。隊列本身并不存儲任何元素襟诸,非常適合傳遞性場景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue柒巫。
2.6 LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列励堡。
相對于其他阻塞隊列谷丸,LinkedTransferQueue多了tryTransfer和transfer方法堡掏。
2.6.1 transfer方法
- 如果當(dāng)前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費者刨疼。
- 如果沒有消費者在等待接收元素泉唁,transfer方法會將元素存放在隊列的tail節(jié)點,并等到該元素被消費者消費了才返回揩慕。
transfer方法的關(guān)鍵代碼如下亭畜。
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
- 第一行代碼是試圖把存放當(dāng)前元素的s節(jié)點作為tail節(jié)點。
- 第二行代碼是讓CPU自旋等待消費者消費元素迎卤。因為自旋會消耗CPU拴鸵,所以自旋一定的次數(shù)后使用Thread.yield()方法來暫停當(dāng)前正在執(zhí)行的線程,并執(zhí)行其他線程蜗搔。
2.6.2 tryTransfer方法
- tryTransfer方法是用來試探生產(chǎn)者傳入的元素是否能直接傳給消費者劲藐。
- 如果沒有消費者等待接收元素,則返回false樟凄。
- 和transfer方法的區(qū)別是tryTransfer方法無論消費者是否接收聘芜,方法立即返回,而transfer方法是必須等到
消費者消費了才返回缝龄。 - 對于帶有時間限制的tryTransfer(E e汰现,long timeout,TimeUnit unit)方法叔壤,試圖把生產(chǎn)者傳入的元素直接傳給消費者瞎饲,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素炼绘,則返回false企软,如果在超時時間內(nèi)消費了元素,則返回true饭望。
2.7 LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列仗哨。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素形庭。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時厌漂,也就減少了一半的競爭萨醒。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst苇倡、addLast富纸、offerFirst、offerLast旨椒、peekFirst和peekLast等方法晓褪,以First單詞結(jié)尾的方法,表示插入综慎、獲然练隆(peek)或移除雙端隊列的第一個元素。以Last單詞結(jié)尾的方法示惊,表示插入好港、獲取或移除雙端隊列的最后一個元素。另外米罚,插入方法add等同于addLast钧汹,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst录择,不知道是不是JDK的bug拔莱,使用時還是用帶有First和Last后綴的方法更清楚。 在初始化LinkedBlockingDeque時可以設(shè)置容量防止其過度膨脹隘竭。另外塘秦,雙向阻塞隊列可以運用在“工作竊取”模式中。
3 阻塞隊列的實現(xiàn)原理
- 如果隊列是空的货裹,消費者會一直等待嗤形,當(dāng)生產(chǎn)者添加元素時,消費者是如何知道當(dāng)前隊列有元素的呢弧圆?如果讓你來設(shè)計阻塞隊列你會如何設(shè)計赋兵,如何讓生產(chǎn)者和消費者進(jìn)行高效率的通信呢?
- 讓我們先來看看JDK是如何實現(xiàn)的搔预。 使用通知模式實現(xiàn)霹期。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者拯田,當(dāng)消費者消費了一個隊列中的元素后历造,會通知生產(chǎn)者當(dāng)前隊列可用。
通過查看JDK源碼發(fā)現(xiàn)ArrayBlockingQueue使用了Condition來實現(xiàn),代碼如下吭产。
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
當(dāng)往隊列里插入一個元素時侣监,如果隊列不可用,那么阻塞生產(chǎn)者主要通過LockSupport.park(this)來實現(xiàn)臣淤。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
繼續(xù)進(jìn)入源碼橄霉,發(fā)現(xiàn)調(diào)用setBlocker先保存一下將要阻塞的線程,然后調(diào)用unsafe.park阻塞當(dāng)前線程邑蒋。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
unsafe.park是個native方法姓蜂,代碼如下。
public native void park(boolean isAbsolute, long time);
park這個方法會阻塞當(dāng)前線程医吊,只有以下4種情況中的一種發(fā)生時钱慢,該方法才會返回。
- 與park對應(yīng)的unpark執(zhí)行或已經(jīng)執(zhí)行時卿堂∈“已經(jīng)執(zhí)行”是指unpark先執(zhí)行,然后再執(zhí)行park的情況御吞。
- 線程被中斷時麦箍。
- 等待完time參數(shù)指定的毫秒數(shù)時漓藕。
- 異程罩椋現(xiàn)象發(fā)生時,這個異诚沓現(xiàn)象沒有任何原因揍诽。
繼續(xù)看一下JVM是如何實現(xiàn)park方法:park在不同的操作系統(tǒng)中使用不同的方式實現(xiàn),在Linux下使用的是系統(tǒng)方法pthread_cond_wait實現(xiàn)栗竖。
現(xiàn)代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的os::PlatformEvent::park方法暑脆,代碼如下。
void os::PlatformEvent::park() {
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
// In theory we could move the ST of 0 into _Event past the unlock(),
// but then we'd need a MEMBAR after the ST.
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
}
guarantee (_Event >= 0, "invariant") ;
}
}
pthread_cond_wait是一個多線程的條件變量函數(shù)狐肢,cond是condition的縮寫添吗,字面意思可以理解為線程在等待一個條件發(fā)生,這個條件是一個全局變量份名。
這個方法接收兩個參數(shù):一個共享變量_cond碟联,一個互斥量_mutex。而unpark方法在Linux下是使用pthread_cond_signal實現(xiàn)的僵腺。
park方法在Windows下則是使用WaitForSingleObject實現(xiàn)的鲤孵。想知道pthread_cond_wait是如何實現(xiàn)的,可以參考glibc-2.5的nptl/sysdeps/pthread/pthread_cond_wait.c辰如。
當(dāng)線程被阻塞隊列阻塞時普监,線程會進(jìn)入WAITING(parking)狀態(tài)。我們可以使用jstack dump阻塞的生產(chǎn)者線程看到這點,如下凯正。
"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.
AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.
await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
參考
《java并發(fā)編程的藝術(shù)》