BlockingQueue(阻塞隊列)

注意:該隨筆內(nèi)容完全引自http://wsmajunfeng.iteye.com/blog/1629354筹陵,寫的很好,非常感謝并思,復制過來算是個積累宋彼,怕以后找不到输涕。

一. 前言

在新增的Concurrent包中,BlockingQueue很好的解決了多線程中衣式,如何高效安全“傳輸”數(shù)據(jù)的問題碴卧。通過這些高效并且線程安全的隊列類住册,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利烫葬。本文詳細介紹了BlockingQueue家庭中的所有成員搭综,包括他們各自的功能以及常見使用場景兑巾。

二. 認識BlockingQueue

阻塞隊列蒋歌,顧名思義委煤,首先它是一個隊列堂油,而一個隊列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致如下圖所示:


image

從上圖我們可以很清楚看到,通過一個共享的隊列碧绞,可以使得數(shù)據(jù)由隊列的一端輸入府框,從另外一端輸出;

常用的隊列主要有以下兩種:(當然通過不同的實現(xiàn)方式讥邻,還可以延伸出很多不同類型的隊列迫靖,DelayQueue就是其中的一種)

先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似于排隊的功能系宜。從某種程度上來說這種隊列也體現(xiàn)了一種公平性。

后進先出(LIFO):后插入隊列的元素最先出隊列发魄,這種隊列優(yōu)先處理最近發(fā)生的事件盹牧。

多線程環(huán)境中,通過隊列可以很容易實現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費者”模型中汰寓,通過隊列可以很便利地實現(xiàn)兩者之間的數(shù)據(jù)共享口柳。假設(shè)我們有若干生產(chǎn)者線程,另外又有若干個消費者線程踩寇。如果生產(chǎn)者線程需要把準備好的數(shù)據(jù)共享給消費者線程啄清,利用隊列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題俺孙。但如果生產(chǎn)者和消費者在某個時間段內(nèi)辣卒,萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下睛榄,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費者消費的速度荣茫,并且當生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程)场靴,以便等待消費者線程把累積的數(shù)據(jù)處理完畢啡莉,反之亦然。然而旨剥,在concurrent包發(fā)布以前咧欣,在多線程環(huán)境下,我們每個程序員都必須去自己控制這些細節(jié)轨帜,尤其還要兼顧效率和線程安全魄咕,而這會給我們的程序帶來不小的復雜度。好在此時蚌父,強大的concurrent包橫空出世了哮兰,而他也給我們帶來了強大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞苟弛,在某些情況下會掛起線程(即阻塞)喝滞,一旦條件滿足,被掛起的線程又會自動被喚醒)膏秫,下面兩幅圖演示了BlockingQueue的兩個常見阻塞場景: *
image

       如上圖所示:當隊列中沒有數(shù)據(jù)的情況下右遭,消費者端的所有線程都會被自動阻塞(掛起),直到有數(shù)據(jù)放入隊列荔睹。**

image

***如上圖所示:當隊列中填滿數(shù)據(jù)的情況下狸演,生產(chǎn)者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置僻他,線程被自動喚醒宵距。****

這也是我們在多線程環(huán)境下,為什么需要BlockingQueue的原因吨拗。作為BlockingQueue的使用者满哪,我們再也不需要關(guān)心什么時候需要阻塞線程婿斥,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了哨鸭。既然BlockingQueue如此神通廣大民宿,讓我們一起來見識下它的常用方法:

三. BlockingQueue的核心方法

1.放入數(shù)據(jù)

(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當前執(zhí)行方法

的線程);      
 ∠窦Α(2)offer(E o, long timeout, TimeUnit unit):可以設(shè)定等待的時間活鹰,如果在指定的時間內(nèi),還不能往隊列中加入BlockingQueue只估,則返回失敗志群。

(3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù).

2. 獲取數(shù)據(jù)

(1)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數(shù)規(guī)定的時間,取不到時返回null;

(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內(nèi)蛔钙,隊列一旦有數(shù)據(jù)可取锌云,則立即返回隊列中的數(shù)據(jù)。否則知道時間

超時還沒有數(shù)據(jù)可取吁脱,返回失敗桑涎。

(3)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;

(4)drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對象(還可以指定獲取數(shù)據(jù)的個數(shù)),通過該方法兼贡,可以提升獲取數(shù)據(jù)效率攻冷;不需要多次分批加鎖或釋放鎖。

四. 常見BlockingQueue

在了解了BlockingQueue的基本功能后遍希,讓我們來看看BlockingQueue家庭大致有哪些成員讲衫?

image

1. ArrayBlockingQueue

基于數(shù)組的阻塞隊列實現(xiàn),在ArrayBlockingQueue內(nèi)部孵班,維護了一個定長數(shù)組,以便緩存隊列中的數(shù)據(jù)對象招驴,這是一個常用的阻塞隊列篙程,除了一個定長數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個整形變量别厘,分別標識著隊列的頭部和尾部在數(shù)組中的位置虱饿。

ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費者獲取數(shù)據(jù),都是共用同一個鎖對象触趴,由此也意味著兩者無法真正并行運行氮发,這點尤其不同于LinkedBlockingQueue;按照實現(xiàn)原理來分析冗懦,ArrayBlockingQueue完全可以采用分離鎖爽冕,從而實現(xiàn)生產(chǎn)者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做披蕉,也許是因為ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧颈畸,以至于引入獨立的鎖機制乌奇,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜眯娱。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于礁苗,前者在插入或刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象徙缴。這在長時間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中试伙,其對于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時于样,我們還可以控制對象的內(nèi)部鎖是否采用公平鎖疏叨,默認采用非公平鎖。

2.LinkedBlockingQueue

基于鏈表的阻塞隊列百宇,同ArrayListBlockingQueue類似考廉,其內(nèi)部也維持著一個數(shù)據(jù)緩沖隊列(該隊列由一個鏈表構(gòu)成),當生產(chǎn)者往隊列中放入一個數(shù)據(jù)時携御,隊列會從生產(chǎn)者手中獲取數(shù)據(jù)昌粤,并緩存在隊列內(nèi)部,而生產(chǎn)者立即返回啄刹;只有當隊列緩沖區(qū)達到最大值緩存容量時(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值)涮坐,才會阻塞生產(chǎn)者隊列,直到消費者從隊列中消費掉一份數(shù)據(jù)誓军,生產(chǎn)者線程會被喚醒袱讹,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù)昵时,還因為其對于生產(chǎn)者端和消費者端分別采用了獨立的鎖來控制數(shù)據(jù)同步捷雕,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能壹甥。

作為開發(fā)者救巷,我們需要注意的是,如果構(gòu)造一個LinkedBlockingQueue對象句柠,而沒有指定其容量大小浦译,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話溯职,如果生產(chǎn)者的速度一旦大于消費者的速度精盅,也許還沒有等到隊列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了谜酒。

ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列叹俏,一般情況下,在處理多線程間的生產(chǎn)者消費者問題甚带,使用這兩個類足以她肯。

下面的代碼演示了如何使用BlockingQueue:

(1) 測試類

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"> 1 import java.util.concurrent.BlockingQueue; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.LinkedBlockingQueue; 5
6 public class BlockingQueueTest { 7
8 public static void main(String[] args) throws InterruptedException { 9 // 聲明一個容量為10的緩存隊列
10 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10); 11
12 //new了三個生產(chǎn)者和一個消費者
13 Producer producer1 = new Producer(queue); 14 Producer producer2 = new Producer(queue); 15 Producer producer3 = new Producer(queue); 16 Consumer consumer = new Consumer(queue); 17
18 // 借助Executors
19 ExecutorService service = Executors.newCachedThreadPool(); 20 // 啟動線程
21 service.execute(producer1); 22 service.execute(producer2); 23 service.execute(producer3); 24 service.execute(consumer); 25
26 // 執(zhí)行10s
27 Thread.sleep(10 * 1000); 28 producer1.stop(); 29 producer2.stop(); 30 producer3.stop(); 31
32 Thread.sleep(2000); 33 // 退出Executor
34 service.shutdown(); 35 } 36 }</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

(2)生產(chǎn)者類

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"> 1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.atomic.AtomicInteger; 5
6 /**
7 * 生產(chǎn)者線程
8 *
9 * @author jackyuj 10 */
11 public class Producer implements Runnable { 12
13 private volatile boolean isRunning = true;//是否在運行標志
14 private BlockingQueue queue;//阻塞隊列
15 private static AtomicInteger count = new AtomicInteger();//自動更新的值
16 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 17
18 //構(gòu)造函數(shù)
19 public Producer(BlockingQueue queue) { 20 this.queue = queue; 21 } 22
23 public void run() { 24 String data = null; 25 Random r = new Random(); 26
27 System.out.println("啟動生產(chǎn)者線程佳头!"); 28 try { 29 while (isRunning) { 30 System.out.println("正在生產(chǎn)數(shù)據(jù)..."); 31 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));//取0~DEFAULT_RANGE_FOR_SLEEP值的一個隨機數(shù)
32
33 data = "data:" + count.incrementAndGet();//以原子方式將count當前值加1
34 System.out.println("將數(shù)據(jù):" + data + "放入隊列..."); 35 if (!queue.offer(data, 2, TimeUnit.SECONDS)) {//設(shè)定的等待時間為2s,如果超過2s還沒加進去返回true
36 System.out.println("放入數(shù)據(jù)失斍绨薄:" + data); 37 } 38 } 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 Thread.currentThread().interrupt(); 42 } finally { 43 System.out.println("退出生產(chǎn)者線程康嘉!"); 44 } 45 } 46
47 public void stop() { 48 isRunning = false; 49 }
50 }</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

(3)消費者類

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"> 1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4
5 /**
6 * 消費者線程
7 *
8 * @author jackyuj 9 */
10 public class Consumer implements Runnable { 11
12 private BlockingQueue<String> queue; 13 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 14
15 //構(gòu)造函數(shù)
16 public Consumer(BlockingQueue<String> queue) { 17 this.queue = queue; 18 } 19
20 public void run() { 21 System.out.println("啟動消費者線程!"); 22 Random r = new Random(); 23 boolean isRunning = true; 24 try { 25 while (isRunning) { 26 System.out.println("正從隊列獲取數(shù)據(jù)..."); 27 String data = queue.poll(2, TimeUnit.SECONDS);//有數(shù)據(jù)時直接從隊列的隊首取走籽前,無數(shù)據(jù)時阻塞亭珍,在2s內(nèi)有數(shù)據(jù),取走枝哄,超過2s還沒數(shù)據(jù)肄梨,返回失敗
28 if (null != data) { 29 System.out.println("拿到數(shù)據(jù):" + data); 30 System.out.println("正在消費數(shù)據(jù):" + data); 31 Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); 32 } else { 33 // 超過2s還沒數(shù)據(jù),認為所有生產(chǎn)線程都已經(jīng)退出挠锥,自動退出消費線程众羡。
34 isRunning = false; 35 } 36 } 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 Thread.currentThread().interrupt(); 40 } finally { 41 System.out.println("退出消費者線程!"); 42 } 43 } 44
45
46 }</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

3. DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了蓖租,才能夠從隊列中獲取到該元素粱侣。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠不會被阻塞蓖宦,而只有獲取數(shù)據(jù)的操作(消費者)才會被阻塞齐婴。

使用場景:

DelayQueue使用場景較少,但都相當巧妙稠茂,常見的例子比如使用一個DelayQueue來管理一個超時未響應(yīng)的連接隊列柠偶。

4. PriorityBlockingQueue

基于優(yōu)先級的阻塞隊列(優(yōu)先級的判斷通過構(gòu)造函數(shù)傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞數(shù)據(jù)生產(chǎn)者睬关,而只會在沒有可消費的數(shù)據(jù)時诱担,阻塞數(shù)據(jù)的消費者。因此使用的時候要特別注意电爹,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對不能快于消費者消費數(shù)據(jù)的速度该肴,否則時間一長,會最終耗盡所有的可用堆內(nèi)存空間藐不。在實現(xiàn)PriorityBlockingQueue時,內(nèi)部控制線程同步的鎖采用的是公平鎖秦效。

5. SynchronousQueue

一種無緩沖的等待隊列雏蛮,類似于無中介的直接交易,有點像原始社會中的生產(chǎn)者和消費者阱州,生產(chǎn)者拿著產(chǎn)品去集市銷售給產(chǎn)品的最終消費者挑秉,而消費者必須親自去集市找到所要商品的直接生產(chǎn)者,如果一方?jīng)]有找到合適的目標苔货,那么對不起犀概,大家都在集市等待立哑。相對于有緩沖的BlockingQueue來說,少了一個中間經(jīng)銷商的環(huán)節(jié)(緩沖區(qū))姻灶,如果有經(jīng)銷商铛绰,生產(chǎn)者直接把產(chǎn)品批發(fā)給經(jīng)銷商,而無需在意經(jīng)銷商最終會將這些產(chǎn)品賣給那些消費者产喉,由于經(jīng)銷商可以庫存一部分商品捂掰,因此相對于直接交易模式,總體來說采用中間經(jīng)銷商的模式會吞吐量高一些(可以批量買賣)曾沈;但另一方面这嚣,又因為經(jīng)銷商的引入,使得產(chǎn)品從生產(chǎn)者到消費者中間增加了額外的交易環(huán)節(jié)塞俱,單個產(chǎn)品的及時響應(yīng)性能可能會降低姐帚。

聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為障涯。公平模式和非公平模式的區(qū)別:

如果采用公平模式:SynchronousQueue會采用公平鎖罐旗,并配合一個FIFO隊列來阻塞多余的生產(chǎn)者和消費者,從而體系整體的公平策略像樊;

但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖尤莺,同時配合一個LIFO隊列來管理多余的生產(chǎn)者和消費者,而后一種模式生棍,如果生產(chǎn)者和消費者的處理速度有差距颤霎,則很容易出現(xiàn)饑渴的情況,即可能有某些生產(chǎn)者或者是消費者的數(shù)據(jù)永遠都得不到處理涂滴。

五. 小結(jié)

BlockingQueue不光實現(xiàn)了一個完整隊列所具有的基本功能友酱,同時在多線程環(huán)境下,他還自動管理了多線間的自動等待于喚醒功能柔纵,從而使得程序員可以忽略這些細節(jié)缔杉,關(guān)注更高級的功能。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末搁料,一起剝皮案震驚了整個濱河市或详,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌郭计,老刑警劉巖霸琴,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異昭伸,居然都是意外死亡梧乘,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來选调,“玉大人夹供,你說我怎么就攤上這事∪士埃” “怎么了哮洽?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長枝笨。 經(jīng)常有香客問我袁铐,道長,這世上最難降的妖魔是什么横浑? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任剔桨,我火速辦了婚禮,結(jié)果婚禮上徙融,老公的妹妹穿的比我還像新娘洒缀。我一直安慰自己,他們只是感情好欺冀,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布树绩。 她就那樣靜靜地躺著,像睡著了一般隐轩。 火紅的嫁衣襯著肌膚如雪饺饭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天职车,我揣著相機與錄音瘫俊,去河邊找鬼。 笑死悴灵,一個胖子當著我的面吹牛扛芽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播积瞒,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼川尖,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了茫孔?” 一聲冷哼從身側(cè)響起叮喳,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缰贝,沒想到半個月后嘲更,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡揩瞪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了篓冲。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片李破。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡宠哄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出嗤攻,到底是詐尸還是另有隱情毛嫉,我是刑警寧澤,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布妇菱,位于F島的核電站承粤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏闯团。R本人自食惡果不足惜辛臊,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望房交。 院中可真熱鬧彻舰,春花似錦、人聲如沸候味。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽白群。三九已至尚胞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間帜慢,已是汗流浹背笼裳。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留崖堤,地道東北人侍咱。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像密幔,于是被迫代替她去往敵國和親楔脯。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容