最近得閑,看了一下阻塞隊列的相關(guān)實現(xiàn)疾渴。在此分享一下我的一些體會。
BlockingQueue
ArrayBlockingQueue實現(xiàn)了接口BlockingQueue。顧名思義跨蟹,BlockingQueue就是阻塞隊列的意思。這包含兩層意思:一橘沥、隊列為空窗轩,可以?阻塞讀請求直到隊列中有元素再返回。二座咆、隊列為滿痢艺,可以?阻塞寫請求直到隊列中有空位可以存放元素仓洼。請注意,這里只是可以堤舒,而不是一定色建。
BlockingQueue對讀取、插入植酥、刪除提供了四種不同的方法:1镀岛、拋出異常;2友驮、返回特殊值漂羊;3、阻塞直到操作成功卸留;4走越、阻塞直到成功或者超時。參見下圖:
其中Special value(特殊值)是指null或者true/false耻瑟。注意旨指,阻塞隊列是不允許存入null的,因為null已經(jīng)在設(shè)計時被定為返回的特殊值喳整。
BlockingQueue的實現(xiàn)是線程安全的谆构,但是并不代表它所有的方法(包括繼承的方法)都是線程安全的,比如addAll框都、containsAll搬素、removeAll等方法。這些方法并一定不是原子操作(參見AbstractQueue)魏保。
說了這么多熬尺,BlockingQueue最適用的場景是什么呢?生產(chǎn)者消費(fèi)者模型谓罗。
ArrayBlockingQueue
ArrayBlockingQueue是BlockingQueue的實現(xiàn)之一粱哼,底層采用數(shù)組來實現(xiàn)。
從下面的代碼中檩咱,我們可以看到有一個ReentrantLock揭措,兩個Condition。一個notFull的Condition税手,一個notEmpty的Condition蜂筹,分別表示隊列存在空位的情況(可寫)和非空的情況(可讀)。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
如果你對AQS的細(xì)節(jié)了解的話芦倒,這里的代碼也就好解釋了。有一個讀線程隊列不翩,有一個寫線程隊列兵扬,還有一個獲取獨占鎖的阻塞隊列麻裳。當(dāng)線程讀取到隊列中沒有元素時,會進(jìn)入讀線程隊列器钟;當(dāng)線程寫入滿隊列時津坑,會進(jìn)入寫線程隊列。只有等到各自的Condition發(fā)出signal時傲霸,才會轉(zhuǎn)移到阻塞隊列疆瑰,搶奪獨占鎖。
ArrayBlockingQueue有三個構(gòu)造函數(shù):
1. 指定隊列容量昙啄,限制隊列的最大長度穆役。
2. 指定獨占鎖是公平鎖還是非公平鎖。
3. 使用集合來初始化隊列梳凛。
第三個構(gòu)造函數(shù)有一個特別的地方:
在lock.lock()處加了一處注釋耿币。只是為了可見性加鎖,而不是為了互斥加鎖韧拒。這里的可見性是指初始化數(shù)組用的集合參數(shù)的可見性淹接。需要把整個集合添加到數(shù)組之后才能釋放鎖。
我們在看ArrayBlockingQueue的成員變量時發(fā)現(xiàn)有putIndex和takeIndex叛溢。這兩個標(biāo)志位分別表示插入元素時存放的位置和取出元素時的位置塑悼。ArrayBlockingQueue采用了數(shù)組循環(huán)存儲的方式來存儲數(shù)據(jù)。數(shù)組的最后一個位置被占據(jù)之后楷掉,會回到起點(index=0)存放新的數(shù)據(jù)厢蒜。這里的putIndex就負(fù)責(zé)指向下一個存放的位置。
暫時寫到這里靖诗,等后續(xù)有時間了再補(bǔ)上郭怪。