前言
本文的主要詳細(xì)分析ArrayBlockingQueue的實(shí)現(xiàn)原理府蔗,由于該并發(fā)集合其底層是使用了java.util.ReentrantLock和java.util.Condition來完成并發(fā)控制的夜焦,我們可以通過JDK的源代碼更好的學(xué)習(xí)這些并發(fā)控制類的使用脂男,同時(shí)該類也是所有并發(fā)集合中最簡(jiǎn)單的一個(gè)累盗,分析該類的源碼也是為之后分析其他并發(fā)集合做好基礎(chǔ)厨姚。
1.Queue接口和BlockingQueue接口回顧
1.1 Queue接口回顧
在Queue接口中兢榨,除了繼承Collection接口中定義的方法外绰垂,它還分別額外地定義插入室奏、刪除、查詢這3個(gè)操作劲装,其中每一個(gè)操作都以兩種不同的形式存在胧沫,每一種形式都對(duì)應(yīng)著一個(gè)方法。
方法說明:
操作 | 拋出異常 | 返回特殊值 |
---|---|---|
Insert | add(e) | offer(e) |
Remove | remove() | poll() |
Examine | element() | peek() |
- add方法在將一個(gè)元素插入到隊(duì)列的尾部時(shí)占业,如果出現(xiàn)隊(duì)列已經(jīng)滿了绒怨,那么就會(huì)拋出IllegalStateException,而使用offer方法時(shí),如果隊(duì)列滿了纺酸,則添加失敗,返回false,但并不會(huì)引發(fā)異常窖逗。
- remove方法是獲取隊(duì)列的頭部元素并且刪除,如果當(dāng)隊(duì)列為空時(shí)餐蔬,那么就會(huì)拋出NoSuchElementException碎紊。而poll在隊(duì)列為空時(shí),則返回一個(gè)null樊诺。
- element方法是從隊(duì)列中獲取到隊(duì)列的第一個(gè)元素仗考,但不會(huì)刪除,但是如果隊(duì)列為空時(shí)词爬,那么它就會(huì)拋出NoSuchElementException秃嗜。peek方法與之類似,只是不會(huì)拋出異常,而是返回false锅锨。
后面我們?cè)诜治鯝rrayBlockingQueue的方法時(shí)叽赊,主要也是圍繞著這幾個(gè)方法來進(jìn)行分析。
1.2 BlockingQueue接口回顧
BlockingQueue是JDK1.5出現(xiàn)的接口必搞,它在原來的Queue接口基礎(chǔ)上提供了更多的額外功能:當(dāng)獲取隊(duì)列中的頭部元素時(shí)必指,如果隊(duì)列為空,那么它將會(huì)使執(zhí)行線程處于等待狀態(tài)恕洲;當(dāng)添加一個(gè)元素到隊(duì)列的尾部時(shí)塔橡,如果隊(duì)列已經(jīng)滿了,那么它同樣會(huì)使執(zhí)行的線程處于等待狀態(tài)霜第。
前面我們?cè)谡fQueue接口時(shí)提到過葛家,它針對(duì)于相同的操作提供了2種不同的形式,而BlockingQueue更夸張泌类,針對(duì)于相同的操作提供了4種不同的形式癞谒。
該四種形式分別為:
- 拋出異常
- 返回一個(gè)特殊值(可能是null或者是false,取決于具體的操作)
- 阻塞當(dāng)前執(zhí)行直到其可以繼續(xù)
- 當(dāng)線程被掛起后末誓,等待最大的時(shí)間扯俱,如果一旦超時(shí),即使該操作依舊無法繼續(xù)執(zhí)行喇澡,線程也不會(huì)再繼續(xù)等待下去。
對(duì)應(yīng)的方法說明:
操作 | 拋出異常 | 返回特殊值 | 阻塞 | 超時(shí) |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | 無 | 無 |
BlockingQueue雖然比起Queue在操作上提供了更多的支持殊校,但是它在使用的使用也應(yīng)該如下的幾點(diǎn):
- BlockingQueue中是不允許添加null的晴玖,該接受在聲明的時(shí)候就要求所有的實(shí)現(xiàn)類在接收到一個(gè)null的時(shí)候,都應(yīng)該拋出NullPointerException为流。
- BlockingQueue是線程安全的呕屎,因此它的所有和隊(duì)列相關(guān)的方法都具有原子性。但是對(duì)于那么從Collection接口中繼承而來的批量操作方法敬察,比如addAll(Collection e)等方法秀睛,BlockingQueue的實(shí)現(xiàn)通常沒有保證其具有原子性,因此我們?cè)谑褂玫腂lockingQueue莲祸,應(yīng)該盡可能地不去使用這些方法蹂安。
- BlockingQueue主要應(yīng)用于生產(chǎn)者與消費(fèi)者的模型中,其元素的添加和獲取都是極具規(guī)律性的锐帜。但是對(duì)于remove(Object o)這樣的方法田盈,雖然BlockingQueue可以保證元素正確的刪除,但是這樣的操作會(huì)非常響應(yīng)性能缴阎,因此我們?cè)跊]有特殊的情況下允瞧,也應(yīng)該避免使用這類方法。
2. ArrayBlockingQueue深入分析
有了上面的鋪墊,下面我們就可以真正開始分析ArrayBlockingQueue了述暂。在分析之前痹升,首先讓我們看看API對(duì)其的描述。
注意:這里使用的JDK版本為1.7畦韭,不同的JDK版本在實(shí)現(xiàn)上存在不同
首先讓我們看下ArrayBlockingQueue的核心組成:
/** 底層維護(hù)隊(duì)列元素的數(shù)組 */
final Object[] items;
/** 當(dāng)讀取元素時(shí)數(shù)組的下標(biāo)(這里稱為讀下標(biāo)) */
int takeIndex;
/** 添加元素時(shí)數(shù)組的下標(biāo) (這里稱為寫小標(biāo))*/
int putIndex;
/** 隊(duì)列中的元素個(gè)數(shù) */
int count;
/**用于并發(fā)控制的工具類**/
final ReentrantLock lock;
/** 控制take操作時(shí)是否讓線程等待 */
private final Condition notEmpty;
/** 控制put操作時(shí)是否讓線程等待 */
private final Condition notFull;
take方法分析(369-379行):
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
/*
嘗試獲取鎖视卢,如果此時(shí)鎖被其他線程鎖占用,那么當(dāng)前線程就處于Waiting的狀態(tài)廊驼。
注意:當(dāng)方法是支持線程中斷響應(yīng)的如果其他線程此時(shí)中斷當(dāng)前線程据过,
那么當(dāng)前線程就會(huì)拋出InterruptedException
*/
lock.lockInterruptibly();
try {
/*
如果此時(shí)隊(duì)列中的元素個(gè)數(shù)為0,那么就讓當(dāng)前線程wait,并且釋放鎖。
注意:這里使用了while進(jìn)行重復(fù)檢查妒挎,是為了防止當(dāng)前線程可能由于 其他未知的原因被喚醒绳锅。
(通常這種情況被稱為"spurious wakeup")
*/
while (count == 0)
notEmpty.await();
//如果隊(duì)列不為空,則從隊(duì)列的頭部取元素
return extract();
} finally {
//完成鎖的釋放
lock.unlock();
}
}
extract方法分析(163-171):
/*
根據(jù)takeIndex來獲取當(dāng)前的元素,然后通知其他等待的線程酝掩。
Call only when holding lock.(只有當(dāng)前線程已經(jīng)持有了鎖之后鳞芙,它才能調(diào)用該方法)
*/
private E extract() {
final Object[] items = this.items;
//根據(jù)takeIndex獲取元素,因?yàn)樵厥且粋€(gè)Object類型的數(shù)組,因此它通過cast方法將其轉(zhuǎn)換成泛型。
E x = this.<E>cast(items[takeIndex]);
//將當(dāng)前位置的元素設(shè)置為null
items[takeIndex] = null;
//并且將takeIndex++,注意:這里因?yàn)橐呀?jīng)使用了鎖期虾,因此inc方法中沒有使用到原子操作
takeIndex = inc(takeIndex);
//將隊(duì)列中的總的元素減1
--count;
//喚醒其他等待的線程
notFull.signal();
return x;
}
put方法分析(318-239)
public void put(E e) throws InterruptedException {
//首先檢查元素是否為空原朝,否則拋出NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
//進(jìn)行鎖的搶占
lock.lockInterruptibly();
try {
/*當(dāng)隊(duì)列的長(zhǎng)度等于數(shù)組的長(zhǎng)度,此時(shí)說明隊(duì)列已經(jīng)滿了,這里同樣
使用了while來方式當(dāng)前線程被"偽喚醒"。*/
while (count == items.length)
//則讓當(dāng)前線程處于等待狀態(tài)
notFull.await();
//一旦獲取到鎖并且隊(duì)列還未滿時(shí)镶苞,則執(zhí)行insert操作喳坠。
insert(e);
} finally {
//完成鎖的釋放
lock.unlock();
}
}
//檢查元素是否為空
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
//該方法的邏輯非常簡(jiǎn)單
private void insert(E x) {
//將當(dāng)前元素設(shè)置到putIndex位置
items[putIndex] = x;
//讓putIndex++
putIndex = inc(putIndex);
//將隊(duì)列的大小加1
++count;
//喚醒其他正在處于等待狀態(tài)的線程
notEmpty.signal();
}
注:ArrayBlockingQueue其實(shí)是一個(gè)循環(huán)隊(duì)列
我們使用一個(gè)圖來簡(jiǎn)單說明一下:
黃色表示數(shù)組中有元素
當(dāng)再一次執(zhí)行put的時(shí)候,其結(jié)果為:
此時(shí)放入的元素會(huì)從頭開始置,我們通過其incr方法更加清晰的看出其底層的操作:
/**
* Circularly increment i.
*/
final int inc(int i) {
//當(dāng)takeIndex的值等于數(shù)組的長(zhǎng)度時(shí),就會(huì)重新置為0茂蚓,這個(gè)一個(gè)循環(huán)遞增的過程
return (++i == items.length) ? 0 : i;
}
至此壕鹉,ArrayBlockingQueue的核心部分就分析完了,其余的隊(duì)列操作基本上都是換湯不換藥的聋涨,此處不再一一列舉晾浴。