概述
ArrayBlockingQueue
顧名思義存炮,使用數(shù)組實現(xiàn)的阻塞隊列。今天我們就來詳細講述下他的代碼實現(xiàn)
阻塞隊列
什么是阻塞隊列?
阻塞隊列是一種特殊的隊列,使用場景為并發(fā)環(huán)境下。在某種情況下(當線程無法獲取鎖的時候)線程會被掛起并且在隊列中等待,如果條件具備(鎖被釋放)那么就會喚醒掛起的線程泞边。
通俗點來講的話勺卢,阻塞隊列類似于理發(fā)店的等待區(qū),當沒有理發(fā)師空閑的時候尽狠,客人會在等待區(qū)等待榴鼎,一旦有了空閑,就會有人自動遞補晚唇。
類的繼承關系
ArrayBlockingQueue
繼承了抽象隊列巫财,并且實現(xiàn)了阻塞隊列,因此它具備隊列的所有基本特性哩陕。
基本實現(xiàn)原理
ArrayBlockingQueue
的實現(xiàn)是基于ReentrantLock
以及AQS
內部實現(xiàn)的鎖機制以及Condition
機制平项。
ArrayBlockingQueue
內部聲明了兩個Condition
變量,一個叫notEmpty
悍及,一個叫notFull
闽瓢,當有數(shù)據(jù)加入隊列時嘗試喚醒notEmpty
,當有數(shù)據(jù)移除隊列時則喚醒notFull
心赶,從而實現(xiàn)一個類似于生產(chǎn)者消費者模型的機制扣讼。
源碼分析
類成員變量
// 隊列的存儲對象數(shù)組
final Object[] items;
// 下一個取出的序號
int takeIndex;
// 下一個放入隊列的序號
int putIndex;
// 隊列中的元素數(shù)目
int count;
// 鎖以及用來控制隊列的兩個條件變量
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null;
構造函數(shù)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 通用的構造函數(shù),以容量和是否公平鎖為參數(shù)缨叫,余下兩個構造函數(shù)均調用此函數(shù)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
// 調用構造函數(shù)
this(capacity, fair);
// 為阻塞隊列初始化數(shù)據(jù)(此操作需要上鎖)
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = 0;
try {
// 將集合中的數(shù)據(jù)存放到數(shù)組中并且進行判空操作
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
// 修改count和putIndex的值
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
這里有一點疑問椭符,這里明明是構造函數(shù)荔燎,是類初始化的地方,照理來說不會產(chǎn)生競爭销钝,為什么要進行加鎖操作呢有咨?此處原本有一句原版的注釋 Lock only for visibility, not mutual exclusion
鎖是為了可見性而不是互斥。這句話怎么理解呢蒸健?我們仔細觀察代碼座享,發(fā)現(xiàn)當我們把集合中的數(shù)據(jù)全部插入隊列中之后,我們會修改相應的count
以及putIndex
的數(shù)值似忧,但是如果我們沒有加鎖渣叛,那么在集合插入完成前count
以及putIndex
沒有完成初始化操作的時候如果有其他線程進行了插入等操作的話,會造成數(shù)據(jù)同步問題從而使得數(shù)據(jù)不準確盯捌,因此這里的鎖是必要的淳衙。
隊列操作
基礎隊列操作enqueue和dequeue
// 隊列的插入操作
private void enqueue(E x) {
// 本地聲明一個item數(shù)組的引用
final Object[] items = this.items;
// 將元素放入數(shù)組中
items[putIndex] = x;
// 如果此時已經(jīng)到了數(shù)組的末尾了,將putIndex重置為0
if (++putIndex == items.length)
putIndex = 0;
// 元素數(shù)目加1
count++;
// 發(fā)出通知告訴所有取數(shù)據(jù)的線程可以取數(shù)據(jù)
notEmpty.signal();
}
// 隊列的移除操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 找到要移除的數(shù)據(jù)置空
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 如果此時已經(jīng)到了數(shù)組的末尾了挽唉,將takeIndex重置為0
if (++takeIndex == items.length)
takeIndex = 0;
// 元素數(shù)目減1
count--;
// 迭代器操作滤祖,這個之后再說
if (itrs != null)
itrs.elementDequeued();
// 發(fā)出通知告知插入線程可以工作
notFull.signal();
return x;
}
這兩個方法是隊列操作的基本方法,基本上就是常規(guī)的數(shù)組數(shù)據(jù)插入移除瓶籽,只是有一點很讓人困惑 final Object[] items = this.items;
這段代碼實現(xiàn)將類成員對象在本地創(chuàng)建了一個引用匠童,然后在本地使用引用進行操作,為什么要多此一舉呢塑顺?除此之外汤求,代碼中大量用到了這種手法,例如: final ReentrantLock lock = this.lock;
這又是為了什么呢严拒?對此筆者猜測可能是和優(yōu)化相關扬绪,因為jdk7中的實現(xiàn)與之不同,是使用的類變量直接操作裤唠。在進行了資料查閱后挤牛,筆者找到了一個相對靠譜的解釋:
這是ArrayBlockingQueue的作者Doug Lea的習慣,他認為這種書寫習慣是對機器更加友好的書寫
當然也有一些大神有一些其他的解釋:
final本身是不可變的种蘸,但是由于反射以及序列化操作的存在墓赴,final的不可變性就變得捉摸不定,除此之外一些編譯器層面上在final上優(yōu)化的不夠好航瞭,導致會在使用到數(shù)據(jù)的時候反復重載導致緩存失效
希望大家可以自己認真思考下诫硕,然后嘗試下,得到自己的結論刊侯。
阻塞隊列的插入操作
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果阻塞隊列已滿章办,那么插入失敗
if (count == items.length)
return false;
else {
// 否則插入成功
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
阻塞隊列插入操作大致就以上幾種,這幾種的區(qū)別在代碼中也體現(xiàn)得比較清楚了:
-
offer
返回的是布爾值,插入成功返回true
否則(隊列已滿)返回false
-
put
沒有返回值藕届,假如隊列是滿的挪蹭,他會一直阻塞直到隊列為空的時候執(zhí)行插入操作 -
add
實際上調用的就是offer
,只是他在加入失敗后會拋出異常
阻塞隊列的移除操作
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
-
poll
執(zhí)行成功會返回隊列元素翰舌,如果隊列為空則直接返回null -
take
執(zhí)行成功會返回隊列元素嚣潜,但是如果隊列為空他不會返回而是等待有數(shù)據(jù)插入冬骚,然后取出 -
peek
則是直接獲取隊列元素椅贱,并且執(zhí)行后不會將元素從隊列中刪除
迭代器實現(xiàn)
由于迭代器和內部隊列共享數(shù)據(jù),再加上阻塞隊列的特性只冻,導致為了實現(xiàn)迭代器功能庇麦,需要新增一些很復雜的代碼實現(xiàn)。
內部聲明了兩個類來實現(xiàn)迭代器喜德,一個是Itr
繼承Iterator<E>
山橄,一個則是Itrs
。
Itrs
Itrs
是用來管理迭代器的舍悯。由于阻塞隊列內部可能會有多個迭代器在同時工作航棱,在迭代器內部發(fā)生刪除或者是一些不常見的操作時可能會產(chǎn)生一些問題,比如他們會丟失自己的數(shù)據(jù)之類的萌衬。所以Itrs
內部會維護一個變量用于記錄循環(huán)的圈數(shù)饮醇,并且在刪除操作removeAt
的時候會通知所有的迭代器。
class Itrs {
// 創(chuàng)建一個Node類作為單向鏈表(節(jié)點是弱引用)來管理迭代器
private class Node extends WeakReference<Itr> {
Node next;
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
// 循環(huán)圈數(shù)
int cycles = 0;
// 鏈表頭
private Node head;
// 清理相關的變量
private Node sweeper = null;
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
Itrs(Itr initial) {
register(initial);
}
// 清理無效的迭代器(如果sweeper為空秕豫,則從頭開始朴艰,否則從sweeper記錄的節(jié)點開始)
void doSomeSweeping(boolean tryHarder) {
}
// 新增加一個迭代器
void register(Itr itr) {
head = new Node(itr, head);
}
// 當takeIndex為0時調用此方法
void takeIndexWrapped() {
// cycle數(shù)+1,內部實現(xiàn)通知所有迭代器并進行清理(鏈表遍歷)
}
// 有移除操作的時候調用此方法混移,并通知所有迭代器進行清理
void removedAt(int removedIndex) {
// 簡單的鏈表遍歷祠墅,內部調用Itr的removedAt方法
}
// 當發(fā)現(xiàn)隊列為空的時候調用此方法,清理迭代器內的弱引用
void queueIsEmpty() {
}
// 有元素被取時是調用
void elementDequeued() {
// 如果數(shù)組為空調用queueIsEmpty進行清理
if (count == 0)
queueIsEmpty();
// 如果takeIndex為0歌径,調用takeIndexWrapped毁嗦,來進行循環(huán)+1操作
else if (takeIndex == 0)
takeIndexWrapped();
}
}
Itr
Itrs
是管理迭代器的,Itr
則是迭代器的具體實現(xiàn)
private class Itr implements Iterator<E> {
// 游標回铛,用于尋找下一個元素
private int cursor;
// 下一個元素
private E nextItem;
// 下一個元素的下標
private int nextIndex;
// 上一個元素
private E lastItem;
// 上一個元素的下標
private int lastRet;
// 上一個take的下標
private int prevTakeIndex;
// 上一個循環(huán)
private int prevCycles;
// 標記為空
private static final int NONE = -1;
// 刪除標記
private static final int REMOVED = -2;
// DETACH標記專用于prevTakeIndex
private static final int DETACHED = -3;
Itr() {
// 這是構造函數(shù)狗准,內部實現(xiàn)主要是初始化為主,
// 并且在Itrs不為空的時候進行一波清理操作
}
boolean isDetached() {
return prevTakeIndex < 0;
}
private int incCursor(int index) {
// 游標+1勺届,并重新計算值(判斷是否走完一個循環(huán)驶俊,是否等于putIndex)
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
// 判斷給的刪除數(shù)是否是有效值
private boolean invalidated(int index, int prevTakeIndex,
long dequeues, int length) {
}
// 計算在迭代器的上一次操作后所有的刪除(出隊)操作
private void incorporateDequeues() {
// 主要方法為通過當前圈數(shù)和之前的圈數(shù)以及偏移量計算
// 真實的刪除數(shù),并且和prevTakeIndex以及index的偏移量進行比較
}
// 進行detach操作并進行清理
private void detach() {
}
// 判斷是否有下一個節(jié)點
public boolean hasNext() {
}
// 沒有下一個節(jié)點(沒有detach的節(jié)點將會被執(zhí)行detach操作)
private void noNext() {
}
// 找到下個節(jié)點
public E next() {
// 實現(xiàn)不復雜免姿,主要是需要判斷節(jié)點是否是detach模式
}
// 刪除節(jié)點
public void remove() {
}
// 當隊列為空或者后續(xù)很難找到下個節(jié)點的時候通知迭代器
void shutdown() {
}
// 輔助計算游標和prevTakeIndex之間的距離
private int distance(int index, int prevTakeIndex, int length) {
}
// 刪除節(jié)點
boolean removedAt(int removedIndex) {
}
// 當takeIndex歸0時調用
boolean takeIndexWrapped() {
}
}
總結
ArrayBlockingQueue
的實現(xiàn)可以說是比較的簡單清晰饼酿,主要是利用了ReentrantLock
內部的Condition
,通過設置兩個條件來巧妙地完成阻塞隊列的實現(xiàn),只要能夠理解這兩個條件的工作原理故俐,源碼的理解就沒有太大的難度想鹰。ArrayBlockingQueue
較難理解的反而是它內部的迭代器,由于阻塞隊列的特性药版,他的迭代器可能會有丟失當前數(shù)據(jù)的風險辑舷,因此,作者創(chuàng)作的時候加入了許多復雜的方法來保證可靠性槽片,但是在這里由于篇幅限制何缓,以及迭代器在阻塞隊列中的地位和重要性并不高,所以簡單講述还栓,如果有興趣可以自己找一份源碼閱讀碌廓。