本文摘自《Java并發(fā)編程的藝術(shù)-方騰飛》
本節(jié)將介紹什么是阻塞隊列帽哑,以及Java中阻塞隊列的4種四種處理方式况凉,并介紹Java7(Java8相同)中提供的7種阻塞隊列进鸠,稍后分析阻塞隊列的一種實現(xiàn)方式。
- 更多相關(guān)文章見筆者博客
1. 什么是阻塞隊列
? 阻塞隊列(BlockingQueue
)是一個支持兩個附加操作的隊列碱屁。這兩個附加的操作支持阻塞的插入和移除方法
? 1. 支持阻塞的插入方法:意思是當(dāng)隊列滿時磷脯,隊列會阻塞插入元素的線程,直到隊列不滿
? 2. 支持阻塞的移除方法:意思是在隊列為空時娩脾,獲取元素的線程會等待隊列變?yōu)榉强?/em>
? 阻塞隊列常用于生產(chǎn)者和消費者的場景赵誓,生產(chǎn)者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程柿赊。阻塞隊列就是生產(chǎn)者用來存放元素俩功、消費者用來獲取元素的容器
? 在阻塞隊列不可用時,這兩個附加操作提供了4種處理方式碰声,如表1所示
- 插入和移除操作的 4 中處理方式
方法 / 處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove(e) | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
-
拋出異常:當(dāng)隊列滿時绑雄,如果再往隊列里插入元素,會拋出
IllegalStateException("Queue full")
異常奥邮。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException
異常 -
返回特殊值:當(dāng)往隊列插入元素時罗珍,會返回元素是否插入成功洽腺,成功返回 true。如果是移除方法覆旱,則是從隊列里取出一個元素蘸朋,如果沒有則返回
null
-
一直阻塞:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里
put
元素扣唱,隊列會一直阻塞生產(chǎn)者線程藕坯,直到隊列可用或者響應(yīng)中斷退出团南。當(dāng)隊列空時,如果消費者線程從隊列里take
元素炼彪,隊列會阻塞住消費者線程吐根,直到隊列不為空 - 超時退出:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里插入元素辐马,隊列會阻塞生產(chǎn)者線程一段時間拷橘,如果超過了指定的時間,生產(chǎn)者線程就會退出
這兩個附加操作的 4 種處理方式不方便記憶喜爷,所以我找了一下這幾個方法的規(guī)律冗疮。put
和 take
分別尾首含有字母 t
,offer
和 poll
都含有字母 o
注意:如果是無界阻塞隊列檩帐,隊列不可能會出現(xiàn)滿的情況术幔,所以使用
put
或offer
方法永遠(yuǎn)不會被阻塞,而且使用offer
方法時湃密,該方法永遠(yuǎn)返回true
2. Java里的阻塞隊列
JDK7提供了7個阻塞隊列诅挑,如下:
ArrayBlockingQueue
:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列LinkedBlockingQueue
:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列PriorityBlockingQueue
:一個支持優(yōu)先級排序的無界阻塞隊列DelayQueue
:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列SynchronousQueue
:一個不存儲元素的阻塞隊列LinkedTransferQueue
:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列-
LinkedBlockingDeque
:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
-
ArrayBlockingQueue
- ?
ArrayBlockingQueue
是一個用數(shù)組實現(xiàn)的有界阻塞隊列勾缭。此隊列按照先進先出(FIFO)的原則對元素進行排序 - 默認(rèn)情況下不保證線程公平的訪問隊列揍障,所謂公平訪問隊列是指阻塞的線程,可以按照阻塞的先后順序訪問隊列俩由,即先阻塞線程先訪問隊列毒嫡。非公平性是對先等待的線程是非公平的,當(dāng)隊列可用時幻梯,阻塞的線程都可以爭奪訪問隊列的資格兜畸,有可能先阻塞的線程最后才訪問隊列。為了保證公平性碘梢,通常會降低吞吐量咬摇。我們可以使用以下代碼創(chuàng)建一個公平的阻塞隊列
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue (1000. true) ;
- 訪問者的公平性是使用可重入鎖實現(xiàn)的,代碼如下:
public ArrayBlockingQueue (int capacity, boolean fair){ if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
- ?
-
LinkedBlockingQueue
-
LinkedBlockingQueue
是一個用鏈表實現(xiàn)的有界阻塞隊列煞躬。此隊列的默認(rèn)和最大長度為Integer.MAX VALUE
肛鹏。此隊列按照先進先出的原則對元素進行排序。
-
-
.PriorityBlockingQueue
-
PriorityBlockingQueue
是一個支持優(yōu)先級的無界阻塞隊列恩沛。默認(rèn)情況下元素采取自然順序升序排列在扰。也可以自定義類實現(xiàn)compareTo()
方法來指定元素排序規(guī)則,或者初始化PriorityBlockingQueue
時雷客,指定構(gòu)造參數(shù)Comparator
來對元素進行排序芒珠。需要注意的是不能保證同優(yōu)先級元素的順序。
-
-
DelayQueue
-
DelayQueue
是一個支持延時獲取元素的無界阻塞隊列搅裙。隊列使用PriorityQueue
來實現(xiàn)皱卓。隊列中的元素必須實現(xiàn)Delayed
接口裹芝,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。只有在延遲期滿時才能從隊列中提取元素 -
DelayQueue
非常有用娜汁,可以將DelayQueue
運用在以下應(yīng)用場景:- 緩存系統(tǒng)的設(shè)計:可以用
DelayQueue
保存緩存元素的有效期嫂易,使用一個線程循環(huán)查詢DelayQueue
,一旦能從DelayQueue
中獲取元素時存炮,表示緩存有效期到了 - 定時任務(wù)調(diào)度:使用
DelayQueue
保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間炬搭,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue
就是使用DelayQueue
實現(xiàn)的
- 緩存系統(tǒng)的設(shè)計:可以用
-
-
SynchronousQueue
-
SynchronousQueue
是一個不存儲元素的阻塞隊列穆桂。每一個put操作必須等待一個take操作宫盔,否則不能繼續(xù)添加元素 - 它支持公平訪問隊列。默認(rèn)情況下線程采用非公平性策略訪問隊列享完。使用以下構(gòu)造方法可以創(chuàng)建公平性訪問的
SynchronousQueue
灼芭,如果設(shè)置為true
,則等待的線程會采用先進先出的順序訪問隊列
public SynchronousQueue(boolean fair){ transferer = fair?new TransferQueue():new TransferStack(); }
-
SynchronousQueue
可以看成是一個傳球手般又,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程彼绷。隊列本身并不存儲任何元素,非常適合傳遞性場景茴迁。SynchronousQueue
的吞吐量高于LinkedBlockingQueue
和ArrayBlockingQueue
-
-
LinkedTransferQueue
-
LinkedTransferQueue
是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue
隊列寄悯。相對于其他阻塞隊列,LinkedTransferQueue
多了tryTransfer
和transfer
方法堕义。-
transfer
方法如果當(dāng)前有消費者正在等待接收元素(消費者使用
take0
方法或帶時間限制的poll0
方法時)猜旬,transfer
方法可以把生產(chǎn)者傳人的元素立刻transfer
(傳輸)給消費者。如果沒有消費者在等待接收元素倦卖,transfer
方法會將元素存放在隊列的tail
節(jié)點洒擦,并等到該元素被消費者消費了才返回。transfer
方法的關(guān)鍵代碼如下Node pred = tryAppend(S,haveData); return awaitMatch(s,pred,e,(how = TIMED),nanos);
第一行代碼是試圖把存放當(dāng)前元素的s節(jié)點作為tail節(jié)點怕膛。第二行代碼是讓CPU自旋等待消費者消費元素熟嫩。因為自旋會消耗CPU,所以自旋一定的次數(shù)后使用Thread.yield0方法來暫停當(dāng)前正在執(zhí)行的線程褐捻,并執(zhí)行其他線程
-
tryTransfer
方法-
tryTransfer
方法是用來試探生產(chǎn)者傳人的元素是否能直接傳給消費者掸茅。如果沒有消費者等待接收元素,則返回false
柠逞。和transfer
方法的區(qū)別是tryTransfer
方法無論消費者是否接收倦蚪,方法立即返回,而transfer
方法是必須等到消費者消費了才返回边苹。 - 對于帶有時間限制的
tryTransfer(E e,long timeout裁僧,TimeUnit unit)
方法慕购,試圖把生產(chǎn)者傳人的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回殿如,如果超時還沒消費元素爱致,則返回false
帮坚,如果在超時時間內(nèi)消費了元素,則返回true
。
-
-
-
-
LinkedBlockingDeque
LinkedBlockingDeque
是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。所謂雙向隊列指的是可以從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時人隊時,也就減少了一半的競爭。相比其他的阻塞隊列勺良,LinkedBlockingDeque
多了addFirst
链蕊、addLast
奏属、offerFirst
勇婴、offerLast
嘱腥、peekFirst
和peekLast
等方法耕渴,以First
單詞結(jié)尾的方法,表示插入齿兔、獲取( peek)或移除雙端隊列的第一個元素橱脸。以Last
單詞結(jié)尾的方法,表示插入分苇、獲取或移除雙端隊列的最后一個元素添诉。另外,插入方法add
等同于addLast
医寿,移除方法remove
等效于removeFirst
栏赴。但是take
方法卻等同于takeFirst
,不知道是不是JDK
的bug
靖秩,使用時還是用帶有First
和Last
后綴的方法更清楚须眷。在初始化
LinkedBlockingDeque
時可以設(shè)置容量防止其過度膨脹。另外沟突,雙向阻塞隊列可以運用在“工作竊取”模式中花颗。