在Java中勘伺,BlockingQueue是一個接口,它的實現(xiàn)類有 ArrayBlockingQueue、DelayQueue盹愚、 LinkedBlockingDeque、 LinkedBlockingQueue站故、PriorityBlockingQueue皆怕、 SynchronousQueue等毅舆,它們的區(qū)別主要體現(xiàn)在存儲結(jié)構(gòu)上或?qū)υ?操作上的不同,但是對于take與put操作的原理愈腾,卻是類似的憋活。
阻塞與非阻塞
-
入隊
offer(E e):如果隊列沒滿,立即返回true虱黄;如果隊列滿了悦即,立即返回 false --> 不阻塞
put(E e):如果隊列滿了,一直阻塞橱乱,直到隊列不滿了或者線程被中 斷 --> 阻塞
-
出隊
poll():如果沒有元素辜梳,直接返回null;如果有元素泳叠,出隊 --> 不阻塞
take():如果隊列空了作瞄,一直阻塞,直到隊列不為空或者線程被中斷 --> 阻塞
一危纫、LinkedBlockingQueue
LinkedBlockingQueue可以指定容量宗挥,如果在初始化時沒有指定容量,那么默認使用int的最大值作為隊列容量种蝶。內(nèi)部維持一個隊列属韧,所以有一個頭節(jié)點head和一個尾節(jié)點last,內(nèi)部維持兩把鎖蛤吓,一個用于入隊宵喂,一個用于出隊,還有鎖關(guān)聯(lián)的Condition對象会傲。
1锅棕、底層數(shù)據(jù)結(jié)構(gòu)
LinkedBlockingQueue內(nèi)部是使用鏈表實現(xiàn)一個隊列的,但是有別于一般的隊列淌山,在于該隊列至少是有一個節(jié)點的裸燎,頭節(jié)點不含有元素。如果隊列為空時泼疑,頭節(jié)點的next參數(shù)為null德绿。尾節(jié)點的next參數(shù)也為null。
2退渗、主要變量
// 容量限制移稳,如果沒有指定,則為 Integer.MAX_VALUE
private final int capacity;
// 當前隊列中的元素數(shù)量
private final AtomicInteger count = new AtomicInteger();
// 隊列頭節(jié)點会油,始終滿足head.item == null
transient Node<E> head;
// 隊列的尾節(jié)點个粱,始終滿足last.next == null
private transient Node<E> last;
// 由 take、poll 等持有的鎖
private final ReentrantLock takeLock = new ReentrantLock();
// 當隊列為空時翻翩,保存執(zhí)行出隊的線程
private final Condition notEmpty = takeLock.newCondition();
// 由 put都许、offer 等持有的鎖
private final ReentrantLock putLock = new ReentrantLock();
// 當隊列滿時稻薇,保存執(zhí)行入隊的線程
private final Condition notFull = putLock.newCondition();
3、put(E e)方法
put(E e)方法用于將一個元素插入到隊列的尾部胶征。在隊列滿時會阻塞塞椎,直到隊列中有元素被取出。
// 在此隊列的尾部插入指定元素睛低,如有必要忱屑,等待空間可用。
public void put(E e) throws InterruptedException {
// 不允許元素為null
if (e == null) throw new NullPointerException();
int c = -1;
// 以當前元素新建一個節(jié)點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 獲得入隊的鎖
putLock.lockInterruptibly();
try {
// 如果隊列已滿暇昂,那么將該線程加入到Condition的等待隊列中
while (count.get() == capacity) {
notFull.await();
}
// 當隊列未滿莺戒,然后有出隊線程取出導(dǎo)致,將節(jié)點入隊
enqueue(node);
// 得到插入之前隊列的元素個數(shù)急波。getAndIncrement返回的是 +1 前的值
c = count.getAndIncrement();
// 如果還可以插入元素从铲,那么釋放等待的入隊線程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 釋放入隊的鎖
putLock.unlock();
}
// 如果插入隊列之前元素個數(shù)為0,插入后就通知出隊線程隊列非空
if (c == 0)
signalNotEmpty();
}
put方法總結(jié):
1澄暮、LinkedBlockingQueue不允許插入的元素為null名段;
2、同一時刻只有一個線程可以進行入隊操作泣懊,putLock在將元素插入隊列尾部時加鎖了伸辟;
3、如果隊列滿了馍刮,則會調(diào)用notFull.await(),將該線程加入到Condition等待隊列中信夫。await方法會釋放線程占有的鎖,這將導(dǎo)致之前由于被阻塞的入隊線程將會獲取到鎖卡啰,執(zhí)行到while循環(huán)處静稻,不過可能因為隊列仍舊是滿的,也被進入到條件隊列中匈辱;
4振湾、一旦有出隊線程取走元素,就會通知到入隊等待隊列釋放線程亡脸。那么第一個加入到Condition隊列中的將會被釋放押搪,那么該線程將會重新獲得put鎖,繼而執(zhí)行enqueue()方法浅碾,將節(jié)點插入到隊列的尾部大州;
5、然后得到插入隊列前元素的個數(shù)及穗,如果插入后隊列中還可以繼續(xù)插入元素摧茴,那么就通知notFull條件的等待隊列中的線程绵载;
6埂陆、如果插入隊列前個數(shù)為0苛白,那現(xiàn)在插入后,就為1了焚虱,那就可以通知因為隊列為空而導(dǎo)致阻塞的出隊線程去取元素了购裙。
4、E take()方法
take()方法用于得到隊頭的元素鹃栽,在隊列為空時會阻塞躏率,直到隊列中有元素可取。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 獲取takeLock鎖
takeLock.lockInterruptibly();
try {
// 如果隊列中元素數(shù)量為0民鼓,則將出隊線程加入到notEmpty隊列中進行等待
while (count.get() == 0) {
notEmpty.await();
}
// 得到到隊頭的元素
x = dequeue();
// 得到出隊列前元素的個數(shù)薇芝。getAndDecrement返回的是 -1 前的值
c = count.getAndDecrement();
// 如果出隊列前的元素數(shù)量大于1,那說明還可以繼續(xù)取丰嘉,那就釋放在notEmpty隊列的第一個線程
if (c > 1)
notEmpty.signal();
} finally {
// 釋放出隊鎖
takeLock.unlock();
}
// 如果出隊前隊列是滿的夯到,那現(xiàn)在取走一個元素了,隊列就不滿了饮亏,就可以去通知等待中的入隊線程了耍贾。
if (c == capacity)
signalNotFull();
return x;
}
take方法總結(jié):
1、同一時刻只有一個線程可以進行出隊操作路幸,takeLock在出隊之前加鎖了荐开;
2、如果隊列中元素為空简肴,那就進入notEmpty隊列中進行等待晃听。直到隊列不為空時,得到隊列中的第一個元素砰识。當發(fā)現(xiàn)取完發(fā)現(xiàn)還有元素可取時杂伟,再通知一下notEmpty隊列中等待的其他線程。最后判斷自己取元素前的是不是滿的仍翰,如果是滿的赫粥,那自己取完,就不滿了予借,就可以通知在notFull隊列中等待插入的線程進行put了越平。
5、remove()方法
remove()方法用于刪除隊列中一個元素灵迫,如果隊列中不含有該元素秦叛,那么返回 false ;有的話則刪除并返回true瀑粥。入隊和出隊都是只獲取一個鎖挣跋,而 remove()方法需要同時獲得兩把鎖,
public boolean remove(Object o) {
// 因為隊列不包含null元素狞换,返回false
if (o == null) return false;
// 獲取兩把鎖
fullyLock();
try {
// 從頭的下一個節(jié)點開始遍歷
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果匹配避咆,那么將節(jié)點從隊列中移除舟肉,trail表示需要刪除節(jié)點的前一節(jié)點
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 釋放兩把鎖
fullyUnlock();
}
}
/**
* 鎖定以防止 put 和 take.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* 解鎖以允許 put 和 take.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
6、總結(jié)
LinkedBlockingQueue允許兩個線程同時在兩端進行入隊和出隊操作查库,但一端同時只能有一個線程進行操作路媚,是通過兩個鎖進行區(qū)分的。
為了維護底部數(shù)據(jù)的統(tǒng)一樊销,引入了AtomicInteger的一個count變量整慎,表示隊列中元素的個數(shù)。count只能在兩個地方變化围苫,一個是入隊的方法(進行+1操作)裤园,另一個是出隊的方法(進行-1操作),而AtomicInteger是原子安全的剂府,所以也就確保了底層隊列的數(shù)據(jù)同步比然。
二、ArrayBlockingQueue
// 構(gòu)造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 創(chuàng)建具有給定(固定)容量和指定訪問策略的ArrayBlockingQueue 周循。
// 參數(shù):capacity —— 這個隊列的容量
fair —— 如果為true 强法,則在插入或刪除時阻塞的線程的隊列訪問將按 FIFO 順序處理;如果為false 湾笛,則未指定訪問順序饮怯。
public ArrayBlockingQueue(int capacity, boolean fair) {
// 如果capacity < 1,拋出異常
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
1嚎研、底層數(shù)據(jù)結(jié)構(gòu)
ArrayBlockingQueue內(nèi)部是使用數(shù)組實現(xiàn)一個隊列的蓖墅,并且在構(gòu)造方法中就需要指定容量,也就意味著底層數(shù)組一旦創(chuàng)建了临扮,容量就不能改變了论矾,因此ArrayBlockingQueue是一個容量限制的阻塞隊列。因此在隊列滿的時候執(zhí)行入隊會阻塞杆勇,在隊列為空時出隊也會阻塞贪壳。
2、主要變量
// 元素數(shù)組蚜退,其長度在構(gòu)造方法中指定
final Object[] items;
// 隊列中實際的元素數(shù)量
int count;
// 保護所有通道的主鎖
final ReentrantLock lock;
// 等待take條件的隊列
private final Condition notEmpty;
// 等待put條件的隊列
private final Condition notFull;
3闰靴、put(E e)
在此隊列的尾部插入指定元素,如果隊列已滿钻注,則等待空間可用
public void put(E e) throws InterruptedException {
// 檢查元素是否為null蚂且,如果是,拋出NullPointerException
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
// 當隊里中的元素數(shù)量等于數(shù)組長度幅恋,則隊列已滿杏死,阻塞,等待隊列成為不滿狀態(tài)
while (count == items.length)
notFull.await();
// 將元素入隊
enqueue(e);
} finally {
// 釋放鎖
lock.unlock();
}
}
put方法總結(jié):
1、ArrayBlockingQueue不允許添加null元素淑翼;
2腐巢、ArrayBlockingQueue在隊列已滿的時候,會調(diào)用notFull.await()窒舟,釋放鎖并處于阻塞狀態(tài)系忙;
3诵盼、一旦ArrayBlockingQueue在隊列不滿的時候惠豺,就立即入隊。
4风宁、E take()
取出此隊列的頭部元素洁墙,如果隊列空,則阻塞戒财,等待元素可取热监。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lockInterruptibly();
try {
// 當隊列中元素數(shù)量為0時,則進入阻塞狀態(tài)
while (count == 0)
notEmpty.await();
// 隊列不為空是饮寞,調(diào)用dequeue()出隊
return dequeue();
} finally {
// 釋放鎖
lock.unlock();
}
}
take方法總結(jié):
1孝扛、取元素時,一旦獲得鎖幽崩,隊列為空苦始, 則會阻塞,直至不為空慌申,調(diào)用dequeue()出隊陌选。
5、總結(jié)
ArrayBlockingQueue是一個底層結(jié)構(gòu)是數(shù)組的阻塞隊列蹄溉,是通過 ReentrantLock 和 Condition 來實現(xiàn)的咨油。不可插入為null的元素,入隊和出隊使用的是同一個鎖柒爵。意味著同一時刻只能有一個線程能進行入隊或者出隊操作役电。入隊時,隊列已滿則會調(diào)用notFull.await()棉胀,進入阻塞狀態(tài)宴霸。直到隊列不滿時,再進行入隊操作膏蚓。當出隊時瓢谢,隊列為空,則調(diào)用notEmpty.await()驮瞧,進入阻塞狀態(tài)氓扛,直到隊列不為空時,則出隊。
三采郎、LinkedBlockingQueue和ArrayBlockingQueue的區(qū)別
1千所、底層實現(xiàn)不同
LinkedBlockingQueue底層實現(xiàn)是鏈表,ArrayBlockingQueue底層實現(xiàn)是數(shù)組
2蒜埋、隊列容量
LinkedBlockingQueue默認的隊列長度是Integer.Max淫痰,但是可以指定容量。在入隊與出隊都高并發(fā)的情況下整份,性能比ArrayBlockingQueue高很多待错;
ArrayBlockingQueue必須在構(gòu)造方法中指定隊列長度,不可變烈评。在只有入隊高并發(fā)或出隊高并發(fā)的情況下火俄,因為操作數(shù)組,且不需要擴容讲冠,性能很高瓜客。
3、鎖的數(shù)量
LinkedBlockingQueue有兩把鎖竿开,可以有兩個線程同時進行入隊和出隊操作谱仪,但同時只能有一個線程進行入隊或出隊操作。
ArrayBlockingQueue只有一把鎖否彩,同時只能有一個線程進行入隊和出隊操作疯攒。