在Java多線程應(yīng)用中三痰,隊列的使用率很高吧寺,多數(shù)生產(chǎn)消費模型的首選數(shù)據(jù)結(jié)構(gòu)就是隊列窜管。Java提供的線程安全的Queue可以分為阻塞隊列和非阻塞隊列,其中阻塞隊列的典型例子是BlockingQueue稚机,非阻塞隊列的典型例子是ConcurrentLinkedQueue幕帆,在實際應(yīng)用中要根據(jù)實際需要選用阻塞隊列或者非阻塞隊列。
注:什么叫線程安全赖条?這個首先要明確失乾。線程安全的類 ,指的是類內(nèi)共享的全局變量的訪問必須保證是不受多線程形式影響的纬乍。如果由于多線程的訪問(比如修改碱茁、遍歷、查看)而使這些變量結(jié)構(gòu)被破壞或者針對這些變量操作的原子性被破壞仿贬,則這個類就不是線程安全的纽竣。
BlockingQueue
BlockingQueue,顧名思義茧泪,“阻塞隊列”:可以提供阻塞功能的隊列蜓氨。
首先,看看BlockingQueue提供的常用方法:
---- | 可能報異常 | 返回布爾值 | 可能阻塞 | 設(shè)定等待時間 |
---|---|---|---|---|
入隊 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
出隊 | remove() | poll() | take() | poll(timeout, unit) |
查看 | element() | peek() | 無 | 無 |
強調(diào)
- add(e) remove() element() 方法不會阻塞線程队伟。當(dāng)不滿足約束條件時穴吹,會拋出IllegalStateException 異常。例如:當(dāng)隊列被元素填滿后缰泡,再調(diào)用add(e)刀荒,則會拋出異常。
- offer(e) poll() peek() 方法即不會阻塞線程棘钞,也不會拋出異常缠借。例如:當(dāng)隊列被元素填滿后,再調(diào)用offer(e)宜猜,則不會插入元素泼返,函數(shù)返回false。
- 要想要實現(xiàn)阻塞功能姨拥,需要調(diào)用put(e) take() 方法绅喉。當(dāng)不滿足約束條件時,會阻塞線程叫乌。
以ArrayBlockingQueue類為例:
- 第一類方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");//隊列已滿柴罐,拋異常
}
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();//隊列為空,拋異常
}
- 第二類方法
public boolean offer(E e) {
if (e == null)throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)//隊列已滿憨奸,返回false
return false;
else {
insert(e);//insert方法中發(fā)出了notEmpty.signal();
return true;
}
} finally {
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == 0)//隊列為空革屠,返回false
return null;
E x = extract();//extract方法中發(fā)出了notFull.signal();
return x;
} finally {
lock.unlock();
}
}
- 第三類方法(這里面涉及到Condition類,簡要提一下)
await方法指:造成當(dāng)前線程在接到信號或被中斷之前一直處于等待狀態(tài)。
signal方法指:喚醒一個等待線程似芝。
public void put(E e)throws InterruptedException {
if (e == null)throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == items.length)//如果隊列已滿那婉,等待notFull這個條件,這時當(dāng)前線程被阻塞
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); //喚醒受notFull阻塞的當(dāng)前線程
throw ie;
}
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0)//如果隊列為空党瓮,等待notEmpty這個條件详炬,這時當(dāng)前線程被阻塞
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal();//喚醒受notEmpty阻塞的當(dāng)前線程
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
- 第四類方法就是指在有必要時等待指定時間,就不詳細說了寞奸。
BlockingQueue接口的具體實現(xiàn)類
- ArrayBlockingQueue呛谜,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小
- LinkedBlockingQueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù)蝇闭,生成的BlockingQueue有大小限制呻率,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定
- PriorityBlockingQueue呻引,其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)的Comparator決定的順序
上面是用ArrayBlockingQueue舉得例子礼仗,下面看看LinkedBlockingQueue:
首先,既然是鏈表逻悠,就應(yīng)該有Node節(jié)點元践,它是一個內(nèi)部靜態(tài)類:
static class Node<E> {
/** The item, volatile to ensure barrier separating write and read */
volatile E item;
Node<E> next;
Node(E x) { item = x; }
}
然后,對于鏈表來說童谒,肯定需要兩個變量來標示頭和尾:
/** 頭指針 */
private transient Node<E> head;//head.next是隊列的頭元素
/** 尾指針 */
private transient Node<E> last;//last.next是null
那么单旁,對于入隊和出隊就很自然能理解了:
private void enqueue(E x) {
last = last.next = new Node<E>(x);//入隊是為last再找個下家
}
private E dequeue() {
Node<E> first = head.next; //出隊是把head.next取出來,然后將head向后移一位
head = first;
E x = first.item;
first.item = null;
return x;
}
另外饥伊,LinkedBlockingQueue相對于ArrayBlockingQueue還有不同是象浑,有兩個ReentrantLock,且隊列現(xiàn)有元素的大小由一個AtomicInteger對象標示琅豆。
- 注:AtomicInteger類是以原子的方式操作整型變量愉豺。
private final AtomicInteger count =new AtomicInteger(0);
/** 用于讀取的獨占鎖*/
private final ReentrantLock takeLock =new ReentrantLock();
/** 隊列是否為空的條件 */
private final Condition notEmpty = takeLock.newCondition();
/** 用于寫入的獨占鎖 */
private final ReentrantLock putLock =new ReentrantLock();
/** 隊列是否已滿的條件 */
private final Condition notFull = putLock.newCondition();
有兩個Condition很好理解,在ArrayBlockingQueue也是這樣做的茫因。但是為什么需要兩個ReentrantLock呢蚪拦?下面會慢慢道來。
讓我們來看看offer和poll方法的代碼:
public boolean offer(E e) {
if (e == null)throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
final ReentrantLock putLock =this.putLock;//入隊當(dāng)然用putLock
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(e); //入隊
c = count.getAndIncrement(); //隊長度+1
if (c + 1 < capacity)
notFull.signal(); //隊列沒滿冻押,當(dāng)然可以解鎖了
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();//這個方法里發(fā)出了notEmpty.signal();
return c >= 0;
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock =this.takeLock;出隊當(dāng)然用takeLock
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();//出隊
c = count.getAndDecrement();//隊長度-1
if (c > 1)
notEmpty.signal();//隊列沒空驰贷,解鎖
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();//這個方法里發(fā)出了notFull.signal();
return x;
}
看看源代碼發(fā)現(xiàn)和上面ArrayBlockingQueue的很類似,關(guān)鍵的問題在于:為什么要用兩個ReentrantLockputLock和takeLock洛巢?
我們仔細想一下括袒,入隊操作其實操作的只有隊尾引用last,并且沒有牽涉到head稿茉。而出隊操作其實只針對head箱熬,和last沒有關(guān)系类垦。那么就是說入隊和出隊的操作完全不需要公用一把鎖,所以就設(shè)計了兩個鎖城须,這樣就實現(xiàn)了多個不同任務(wù)的線程入隊的同時可以進行出隊的操作,另一方面由于兩個操作所共同使用的count是AtomicInteger類型的米苹,所以完全不用考慮計數(shù)器遞增遞減的問題糕伐。
另外,還有一點需要說明一下:await()和singal()這兩個方法執(zhí)行時都會檢查當(dāng)前線程是否是獨占鎖的當(dāng)前線程蘸嘶,如果不是則拋出java.lang.IllegalMonitorStateException異常良瞧。所以可以看到在源碼中這兩個方法都出現(xiàn)在Lock的保護塊中。
=============分隔================
下面再來說說ConcurrentLinkedQueue训唱,它是一個無鎖的并發(fā)線程安全的隊列褥蚯。
對比鎖機制的實現(xiàn),使用無鎖機制的難點在于要充分考慮線程間的協(xié)調(diào)况增。簡單的說就是多個線程對內(nèi)部數(shù)據(jù)結(jié)構(gòu)進行訪問時赞庶,如果其中一個線程執(zhí)行的中途因為一些原因出現(xiàn)故障,其他的線程能夠檢測并幫助完成剩下的操作澳骤。這就需要把對數(shù)據(jù)結(jié)構(gòu)的操作過程精細的劃分成多個狀態(tài)或階段歧强,考慮每個階段或狀態(tài)多線程訪問會出現(xiàn)的情況。
ConcurrentLinkedQueue有兩個volatile的線程共享變量:head为肮,tail摊册。要保證這個隊列的線程安全就是保證對這兩個Node的引用的訪問(更新,查看)的原子性和可見性颊艳,由于volatile本身能夠保證可見性茅特,所以就是對其修改的原子性要被保證。
下面通過offer方法的實現(xiàn)來看看在無鎖情況下如何保證原子性:
public boolean offer(E e) {
if (e == null)throw new NullPointerException();
Node<E> n = new Node<E>(e, null);
for (;;) {
Node<E> t = tail;
Node<E> s = t.getNext();
if (t == tail) { //------------------------------a
if (s == null) {//---------------------------b
if (t.casNext(s, n)) { //-------------------c
casTail(t, n); //------------------------d
return true;
}
} else {
casTail(t, s); //----------------------------e
}
}
}
}
此方法的循環(huán)內(nèi)首先獲得尾指針和其next指向的對象棋枕,由于tail和Node的next均是volatile的白修,所以保證了獲得的分別都是最新的值。
代碼a:t==tail是最上層的協(xié)調(diào)戒悠,如果其他線程改變了tail的引用熬荆,則說明現(xiàn)在獲得不是最新的尾指針需要重新循環(huán)獲得最新的值。
代碼b:s==null的判斷绸狐。靜止狀態(tài)下tail的next一定是指向null的卤恳,但是多線程下的另一個狀態(tài)就是中間態(tài):tail的指向沒有改變,但是其next已經(jīng)指向新的結(jié)點寒矿,即完成tail引用改變前的狀態(tài)突琳,這時候s!=null。這里就是協(xié)調(diào)的典型應(yīng)用符相,直接進入代碼e去協(xié)調(diào)參與中間態(tài)的線程去完成最后的更新拆融,然后重新循環(huán)獲得新的tail開始自己的新一次的入隊嘗試蠢琳。另外值得注意的是a,b之間,其他的線程可能會改變tail的指向镜豹,使得協(xié)調(diào)的操作失敗傲须。從這個步驟可以看到無鎖實現(xiàn)的復(fù)雜性。
代碼c:t.casNext(s, n)是入隊的第一步趟脂,因為入隊需要兩步:更新Node的next泰讽,改變tail的指向。代碼c之前可能發(fā)生tail引用指向的改變或者進入更新的中間態(tài)昔期,這兩種情況均會使得t指向的元素的next屬性被原子的改變已卸,不再指向null。這時代碼c操作失敗硼一,重新進入循環(huán)累澡。
代碼d:這是完成更新的最后一步了,就是更新tail的指向般贼,最有意思的協(xié)調(diào)在這兒又有了體現(xiàn)愧哟。從代碼看casTail(t, n)不管是否成功都會接著返回true標志著更新的成功。首先如果成功則表明本線程完成了兩步的更新具伍,返回true是理所當(dāng)然的翅雏;如果 casTail(t, n)不成功呢?要清楚的是完成代碼c則代表著更新進入了中間態(tài)人芽,代碼d不成功則是tail的指向被其他線程改變望几。意味著對于其他的線程而言:它們得到的是中間態(tài)的更新,s!=null萤厅,進入代碼e幫助本線程執(zhí)行最后一步并且先于本線程成功橄抹。這樣本線程雖然代碼d失敗了,但是是由于別的線程的協(xié)助先完成了惕味,所以返回true也就理所當(dāng)然了楼誓。
通過分析這個入隊的操作,可以清晰的看到無鎖實現(xiàn)的每個步驟和狀態(tài)下多線程之間的協(xié)調(diào)和工作名挥。
注:上面這大段文字看起來很累疟羹,先能看懂多少看懂多少,現(xiàn)在看不懂先不急禀倔,下面還會提到這個算法榄融,并且用示意圖說明,就易懂很多了救湖。
在使用ConcurrentLinkedQueue時要注意愧杯,如果直接使用它提供的函數(shù),比如add或者poll方法鞋既,這樣我們自己不需要做任何同步力九。
但如果是非原子操作耍铜,比如:
if(!queue.isEmpty()) {
queue.poll(obj);
}
我們很難保證,在調(diào)用了isEmpty()之后跌前,poll()之前棕兼,這個queue沒有被其他線程修改。所以對于這種情況舒萎,我們還是需要自己同步:
synchronized(queue) {
if(!queue.isEmpty()) {
queue.poll(obj);
}
}
- 注:這種需要進行自己同步的情況要視情況而定程储,不是任何情況下都需要這樣做。
另外還說一下臂寝,ConcurrentLinkedQueue的size()是要遍歷一遍集合的,所以盡量要避免用size而改用isEmpty()摊灭,以免性能過慢咆贬。
[文章轉(zhuǎn)載自]madun大神