- 認(rèn)識(shí)BlockingQueue
阻塞隊(duì)列,顧名思義袍嬉,首先它是一個(gè)隊(duì)列境蔼,而一個(gè)隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過一個(gè)共享的隊(duì)列伺通,可以使得數(shù)據(jù)由隊(duì)列的一端輸入箍土,從另外一端輸出;
常用的隊(duì)列主要有以下兩種:(當(dāng)然通過不同的實(shí)現(xiàn)方式罐监,還可以延伸出很多不同類型的隊(duì)列吴藻,DelayQueue就是其中的一種)
先進(jìn)先出(FIFO):先插入的隊(duì)列的元素也最先出隊(duì)列,類似于排隊(duì)的功能弓柱。從某種程度上來說這種隊(duì)列也體現(xiàn)了一種公平性沟堡。
后進(jìn)先出(LIFO):后插入隊(duì)列的元素最先出隊(duì)列,這種隊(duì)列優(yōu)先處理最近發(fā)生的事件吆你。
多線程環(huán)境中弦叶,通過隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)共享俊犯,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中妇多,通過隊(duì)列可以很便利地實(shí)現(xiàn)兩者之間的數(shù)據(jù)共享。假設(shè)我們有若干生產(chǎn)者線程燕侠,另外又有若干個(gè)消費(fèi)者線程者祖。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)共享給消費(fèi)者線程,利用隊(duì)列的方式來傳遞數(shù)據(jù)绢彤,就可以很方便地解決他們之間的數(shù)據(jù)共享問題七问。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi),萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢茫舶?理想情況下械巡,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度,并且當(dāng)生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時(shí)候饶氏,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程)讥耗,以便等待消費(fèi)者線程把累積的數(shù)據(jù)處理完畢,反之亦然疹启。然而古程,在concurrent包發(fā)布以前,在多線程環(huán)境下喊崖,我們每個(gè)程序員都必須去自己控制這些細(xì)節(jié)挣磨,尤其還要兼顧效率和線程安全雇逞,而這會(huì)給我們的程序帶來不小的復(fù)雜度。好在此時(shí)茁裙,強(qiáng)大的concurrent包橫空出世了塘砸,而他也給我們帶來了強(qiáng)大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞晤锥,在某些情況下會(huì)掛起線程(即阻塞)谣蠢,一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒)
下面兩幅圖演示了BlockingQueue的兩個(gè)常見阻塞場(chǎng)景:
如上圖所示:當(dāng)隊(duì)列中沒有數(shù)據(jù)的情況下查近,消費(fèi)者端的所有線程都會(huì)被自動(dòng)阻塞(掛起)眉踱,直到有數(shù)據(jù)放入隊(duì)列。
如上圖所示:當(dāng)隊(duì)列中填滿數(shù)據(jù)的情況下霜威,生產(chǎn)者端的所有線程都會(huì)被自動(dòng)阻塞(掛起)谈喳,直到隊(duì)列中有空的位置,線程被自動(dòng)喚醒戈泼。
這也是我們?cè)诙嗑€程環(huán)境下婿禽,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者大猛,我們?cè)僖膊恍枰P(guān)心什么時(shí)候需要阻塞線程扭倾,什么時(shí)候需要喚醒線程,因?yàn)檫@一切BlockingQueue都給你一手包辦了挽绩。既然BlockingQueue如此神通廣大膛壹,讓我們一起來見識(shí)下它的常用方法:
BlockingQueue的核心方法
放入數(shù)據(jù):
offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
則返回true,否則返回false.(本方法不阻塞當(dāng)前執(zhí)行方法的線程)
則返回true,否則返回false.(本方法不阻塞當(dāng)前執(zhí)行方法的線程)
offer(E o, long timeout, TimeUnit unit),可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間內(nèi)唉堪,還不能往隊(duì)列中加入BlockingQueue模聋,則返回失敗。
put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒有空間,則調(diào)用此方法的線程被阻斷,直到BlockingQueue里面有空間再繼續(xù).
獲取數(shù)據(jù):
poll(time):取走BlockingQueue里排在首位的對(duì)象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,
取不到時(shí)返回null;
poll(long timeout, TimeUnit unit):從BlockingQueue取出一個(gè)隊(duì)首的對(duì)象唠亚,如果在指定時(shí)間內(nèi)链方,隊(duì)列一旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)灶搜。否則知道時(shí)間超時(shí)還沒有數(shù)據(jù)可取祟蚀,返回失敗。
take():取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對(duì)象(還可以指定獲取數(shù)據(jù)的個(gè)數(shù))割卖,
通過該方法前酿,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖究珊。
- 常見BlockingQueue
在了解了BlockingQueue的基本功能后薪者,讓我們來看看BlockingQueue家庭大致有哪些成員?
BlockingQueue成員詳細(xì)介紹
- ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn)剿涮,在ArrayBlockingQueue內(nèi)部言津,維護(hù)了一個(gè)定長數(shù)組攻人,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,這是一個(gè)常用的阻塞隊(duì)列悬槽,除了一個(gè)定長數(shù)組外怀吻,ArrayBlockingQueue內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置初婆。
ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù)蓬坡,都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無法真正并行運(yùn)行磅叛,這點(diǎn)尤其不同于LinkedBlockingQueue屑咳;按照實(shí)現(xiàn)原理來分析,ArrayBlockingQueue完全可以采用分離鎖弊琴,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行兆龙。Doug Lea之所以沒這樣去做,也許是因?yàn)锳rrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧敲董,以至于引入獨(dú)立的鎖機(jī)制紫皇,除了給代碼帶來額外的復(fù)雜性外,其在性能上完全占不到任何便宜腋寨。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個(gè)明顯的不同之處在于聪铺,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象萄窜。這在長時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中铃剔,其對(duì)于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時(shí)脂倦,我們還可以控制對(duì)象的內(nèi)部鎖是否采用公平鎖番宁,默認(rèn)采用非公平鎖。
2.LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列赖阻,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成)踱蠢,當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí)火欧,隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部茎截,而生產(chǎn)者立即返回苇侵;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列企锌,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù)榆浓,生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理撕攒。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù)陡鹃,還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來控制數(shù)據(jù)同步烘浦,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能萍鲸。
作為開發(fā)者闷叉,我們需要注意的是,如果構(gòu)造一個(gè)LinkedBlockingQueue對(duì)象脊阴,而沒有指定其容量大小握侧,LinkedBlockingQueue會(huì)默認(rèn)一個(gè)類似無限大小的容量(Integer.MAX_VALUE),這樣的話嘿期,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度品擎,也許還沒有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了备徐。
ArrayBlockingQueue和LinkedBlockingQueue是兩個(gè)最普通也是最常用的阻塞隊(duì)列孽查,一般情況下,在處理多線程間的生產(chǎn)者消費(fèi)者問題坦喘,使用這兩個(gè)類足以盲再。
下面的代碼演示了如何使用BlockingQueue:
/** 生產(chǎn)者線程 **/
public class Producer implements Runnable{
private BlockingQueue queue;
private volatile boolean isRunning = true;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
public void run() {
String data = null;
System.out.println("啟動(dòng)生產(chǎn)者線程!");
Random random = new Random();
try {
while (isRunning) {
System.out.println("正在生產(chǎn)數(shù)據(jù)...");
Thread.sleep(random.nextInt(DEFAULT_RANGE_FOR_SLEEP));
data = "data:" + count.incrementAndGet();
System.out.println("將隊(duì)列" + data + "放入隊(duì)列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入數(shù)據(jù)失敗:" + data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出生產(chǎn)者線程瓣铣!");
}
}
public void stop() {
isRunning = false;
}
}
/** 消費(fèi)者線程 **/
public class Consumer implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
System.out.println("啟動(dòng)消費(fèi)者線程答朋!");
Random random = new Random();
boolean isRunning = true;
try {
while (isRunning) {
System.out.println("正在從隊(duì)列里獲取數(shù)據(jù)...");
String data = queue.poll(2, TimeUnit.SECONDS);
if (null != data) {
System.out.println("拿到數(shù)據(jù):" + data);
System.out.println("正在消費(fèi)數(shù)據(jù):" + data);
Thread.sleep(random.nextInt(DEFAULT_RANGE_FOR_SLEEP));
} else {
// 超過2s還沒數(shù)據(jù),認(rèn)為所有生產(chǎn)線程都已經(jīng)退出棠笑,自動(dòng)退出消費(fèi)線程梦碗。
isRunning = false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
} finally {
System.out.println("退出消費(fèi)者線程!");
}
}
}
測(cè)試:
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
// 聲明一個(gè)容量為10的緩存隊(duì)列
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 啟動(dòng)線程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
// 執(zhí)行10s
Thread.sleep(10*1000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
下面是運(yùn)行結(jié)果:
啟動(dòng)生產(chǎn)者線程蓖救!
啟動(dòng)消費(fèi)者線程洪规!
正在從隊(duì)列里獲取數(shù)據(jù)…
啟動(dòng)生產(chǎn)者線程!
啟動(dòng)生產(chǎn)者線程循捺!
正在生產(chǎn)數(shù)據(jù)…
正在生產(chǎn)數(shù)據(jù)…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:1放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
拿到數(shù)據(jù):data:1
正在消費(fèi)數(shù)據(jù):data:1
將隊(duì)列data:2放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:3放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:4放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:5放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
正在從隊(duì)列里獲取數(shù)據(jù)…
拿到數(shù)據(jù):data:2
正在消費(fèi)數(shù)據(jù):data:2
將隊(duì)列data:6放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:7放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
正在從隊(duì)列里獲取數(shù)據(jù)…
拿到數(shù)據(jù):data:3
正在消費(fèi)數(shù)據(jù):data:3
將隊(duì)列data:8放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:9放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:10放入隊(duì)列…
正在生產(chǎn)數(shù)據(jù)…
將隊(duì)列data:11放入隊(duì)列…
...
DelayQueue
DelayQueue中的元素只有當(dāng)其指定的延遲時(shí)間到了斩例,才能夠從隊(duì)列中獲取到該元素。DelayQueue是一個(gè)沒有大小限制的隊(duì)列从橘,因此往隊(duì)列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠(yuǎn)不會(huì)被阻塞念赶,而只有獲取數(shù)據(jù)的操作(消費(fèi)者)才會(huì)被阻塞。
使用場(chǎng)景:
DelayQueue使用場(chǎng)景較少恰力,但都相當(dāng)巧妙叉谜,常見的例子比如使用一個(gè)DelayQueue來管理一個(gè)超時(shí)未響應(yīng)的連接隊(duì)列。PriorityBlockingQueue
基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過構(gòu)造函數(shù)傳入的Compator對(duì)象來決定)踩萎,但需要注意的是PriorityBlockingQueue并不會(huì)阻塞數(shù)據(jù)生產(chǎn)者停局,而只會(huì)在沒有可消費(fèi)的數(shù)據(jù)時(shí),阻塞數(shù)據(jù)的消費(fèi)者。因此使用的時(shí)候要特別注意董栽,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度码倦,否則時(shí)間一長,會(huì)最終耗盡所有的可用堆內(nèi)存空間裆泳。在實(shí)現(xiàn)PriorityBlockingQueue時(shí)叹洲,內(nèi)部控制線程同步的鎖采用的是公平鎖。SynchronousQueue
一種無緩沖的等待隊(duì)列工禾,類似于無中介的直接交易运提,有點(diǎn)像原始社會(huì)中的生產(chǎn)者和消費(fèi)者,生產(chǎn)者拿著產(chǎn)品去集市銷售給產(chǎn)品的最終消費(fèi)者闻葵,而消費(fèi)者必須親自去集市找到所要商品的直接生產(chǎn)者民泵,如果一方?jīng)]有找到合適的目標(biāo),那么對(duì)不起槽畔,大家都在集市等待栈妆。相對(duì)于有緩沖的BlockingQueue來說,少了一個(gè)中間經(jīng)銷商的環(huán)節(jié)(緩沖區(qū))厢钧,如果有經(jīng)銷商鳞尔,生產(chǎn)者直接把產(chǎn)品批發(fā)給經(jīng)銷商,而無需在意經(jīng)銷商最終會(huì)將這些產(chǎn)品賣給那些消費(fèi)者早直,由于經(jīng)銷商可以庫存一部分商品寥假,因此相對(duì)于直接交易模式,總體來說采用中間經(jīng)銷商的模式會(huì)吞吐量高一些(可以批量買賣)霞扬;但另一方面糕韧,又因?yàn)榻?jīng)銷商的引入,使得產(chǎn)品從生產(chǎn)者到消費(fèi)者中間增加了額外的交易環(huán)節(jié)喻圃,單個(gè)產(chǎn)品的及時(shí)響應(yīng)性能可能會(huì)降低萤彩。
聲明一個(gè)SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為斧拍。公平模式和非公平模式的區(qū)別:
如果采用公平模式:SynchronousQueue會(huì)采用公平鎖雀扶,并配合一個(gè)FIFO隊(duì)列來阻塞多余的生產(chǎn)者和消費(fèi)者,從而體系整體的公平策略饮焦;
但如果是非公平模式(SynchronousQueue默認(rèn)):SynchronousQueue采用非公平鎖怕吴,同時(shí)配合一個(gè)LIFO隊(duì)列來管理多余的生產(chǎn)者和消費(fèi)者,而后一種模式县踢,如果生產(chǎn)者和消費(fèi)者的處理速度有差距,則很容易出現(xiàn)饑渴的情況伟件,即可能有某些生產(chǎn)者或者是消費(fèi)者的數(shù)據(jù)永遠(yuǎn)都得不到處理硼啤。