什么是阻塞隊列钩乍?
- 阻塞隊列與我們平常接觸到的普通隊列(ArrayList)的最大不同點在于阻塞隊列的添加和刪除方法都是阻塞的
- 阻塞添加:當(dāng)阻塞隊列元素已滿時击儡,隊列會阻塞加入元素的線程,直到隊列元素不滿時才重新喚醒線程執(zhí)行元素加入操作
- 阻塞刪除:當(dāng)隊列元素為空時嗦枢,刪除隊列元素的線程將被阻塞致开,直到隊列不為空再執(zhí)行刪除操作
BlockingQueue
- BlockingQueue 能夠解決多線程中如何高效安全的傳輸數(shù)據(jù)的問題悼嫉,幫助我們快速搭建高質(zhì)量的多線程程序
- 通過隊列,可以使得數(shù)據(jù)由隊列的一端輸入回还,從另一端輸出
- 適用場景:生產(chǎn)者線程在一端生成裆泳,消費者線程在另一端消費
BlockingQueue
BlockingQueue 主要方法
-
插入方法:
- add(E e):
- 將元素插入到隊尾(如果立即可行且不會超過該隊列的容量)
- 成功返回 true,失敗拋 IllegalStateException 異常
- offer(E e,long timeout,TimeUnit unit):
- 將元素插入到隊尾(如果立即可行且不會超過該隊列的容量)
- 可設(shè)置超時時間柠硕,該方法可以中斷
- 成功返回 true工禾,如果隊列已滿,返回 false
- put(E e):
- 將元素插入到隊尾蝗柔,如果隊列已滿則一直等待(阻塞)
- add(E e):
-
刪除方法:
- remove(Object o):
- 移除指定元素闻葵,成功返回 true,失敗返回 false
- poll(long timeout, TimeUnit unit):
- 獲取并移除此隊列的頭元素癣丧,在指定等待的時間前一直等到獲取元素
- take():
- 獲取并移除隊列頭元素槽畔,若沒有元素則一直阻塞
- remove(Object o):
-
檢查方法:
- element():
- 獲取但不移除隊列的頭元素,沒有元素則拋異常
- peek():
- 獲取但不移除隊列的頭胁编,若隊列為空則返回 null
- element():
BlockingQueue 基本使用
- 該例子中使用 ArrayBlockingQueue厢钧,生產(chǎn)者(Producer)將字符串插入共享隊列中,消費者將它們?nèi)〕?/li>
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue queue = new ArrayBlockingQueue(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
- 生產(chǎn)者通過 put() 方法將元素插入共享隊列中
public class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
queue.put("1");
Thread.sleep(1000);
queue.put("2");
Thread.sleep(1000);
queue.put("3");
}catch (Exception e){
e.printStackTrace();
}
}
}
- 消費者通過 take() 方法從隊列中取出元素嬉橙,take() 方法會阻塞直到獲取元素為止
public class Consumer implements Runnable {
private BlockingQueue queue = null;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}catch (Exception e){
e.printStackTrace();
}
}
}
BlockingQueue 接口實現(xiàn)類與源碼分析
ArrayBlockingQueue
- 分析:
- 基于數(shù)組的阻塞隊列實現(xiàn)早直,內(nèi)部維護(hù)了一個數(shù)組用于緩存隊列中的數(shù)據(jù)對象
- 有兩個 Integer 類型的索引,指向添加市框、獲取下一個元素的位置的索引
- 并通過一個重入鎖 ReentrantLock 和兩個 Condition 條件來實現(xiàn)阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存儲數(shù)據(jù)的數(shù)組 */
final Object[] items;
/**獲取數(shù)據(jù)的索引霞扬,主要用于take,poll,peek祥得,remove方法 */
int takeIndex;
/**添加數(shù)據(jù)的索引兔沃,主要用于 put, offer, or add 方法*/
int putIndex;
/** 隊列元素的個數(shù) */
int count;
/** 控制并非訪問的鎖 */
final ReentrantLock lock;
/**notEmpty條件對象,用于通知take方法隊列已有元素级及,可執(zhí)行獲取操作 */
private final Condition notEmpty;
/**notFull條件對象乒疏,用于通知put方法隊列未滿,可執(zhí)行添加操作 */
private final Condition notFull;
/**
迭代器
*/
transient Itrs itrs = null;
}
- 分析:
- add 方法實際上是調(diào)用了 offer 方法
- enqueue(E x) 方法內(nèi)部通過 putIndex 索引直接將元素添加到數(shù)組 item 中饮焦,當(dāng) putIndex 索引大小等于數(shù)組長度時怕吴,需要將 putIndex 重新設(shè)置為 0,這是因為當(dāng)前隊列元素總是從隊頭獲取县踢,從隊尾添加
//add方法實現(xiàn)转绷,間接調(diào)用了offer(e)
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
//offer方法
public boolean offer(E e) {
checkNotNull(e);//檢查元素是否為null
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
if (count == items.length)//判斷隊列是否滿
return false;
else {
enqueue(e);//添加元素到隊列
return true;
}
} finally {
lock.unlock();
}
}
//入隊操作
private void enqueue(E x) {
//獲取當(dāng)前數(shù)組
final Object[] items = this.items;
//通過putIndex索引對數(shù)組進(jìn)行賦值
items[putIndex] = x;
//索引自增,如果已是最后一個位置硼啤,重新設(shè)置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//隊列中元素數(shù)量加1
//喚醒調(diào)用take()方法的線程议经,執(zhí)行元素獲取操作。
notEmpty.signal();
}
- 分析:
- put 方法是一個阻塞方法谴返,如果隊列元素已滿煞肾,那么當(dāng)前線程將會被 notFull 條件對象掛起加入到等待隊列中,直到有空擋才會喚醒執(zhí)行添加操作
//put方法嗓袱,阻塞時可中斷
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//該方法可中斷
try {
//當(dāng)隊列元素個數(shù)與數(shù)組長度相等時籍救,無法添加元素
while (count == items.length)
//將當(dāng)前調(diào)用線程掛起,添加到notFull條件隊列中等待喚醒
notFull.await();
enqueue(e);//如果隊列沒有滿直接添加渠抹。蝙昙。
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
- 基于鏈表的阻塞隊列,內(nèi)部維護(hù)著一個鏈表構(gòu)成的緩沖隊列梧却,用于緩存隊列中的數(shù)據(jù)對象
- 在正常情況下鏈表阻塞隊列的吞吐量要高于數(shù)組的阻塞隊列(ArrayBlockingQueue)奇颠,因為其內(nèi)部實現(xiàn)添加和刪除操作使用了兩個 ReentrantLock 來控制并發(fā)執(zhí)行(插入、獲取各有一個鎖)篮幢,而 ArrayBlockingQueue 內(nèi)部只使用一個 ReentrantLock 控制并發(fā)
- 它與 ArrayBlockingQueue 的 API 幾乎一致但內(nèi)部實現(xiàn)原理不太相同
- 當(dāng)創(chuàng)建一個 LinkedBlockingQueue 時大刊,默認(rèn)阻塞隊列中元素的數(shù)量大小為 Interger.MAX_VALUE
LinkedBlockingQueue 和 ArrayBlockingQueue 的區(qū)別
- 隊列大小有所不同:ArrayBlockingQueue 必須指定隊列大小,而 LinkedBlockingQueue 默認(rèn)為 Integer.MAX_VALUE(當(dāng)元素添加速度大于移除速度時三椿,需要注意一下缺菌,以免內(nèi)存溢出)
- 實現(xiàn)結(jié)構(gòu)不同:ArrayBlockingQueue 采用數(shù)組實現(xiàn)、而 LinkedBlockingQueue 采用鏈表實現(xiàn)
- 由于 ArrayBlockingQueue 采用數(shù)組存儲隊列元素搜锰,因此再插入伴郁、刪除元素時不會產(chǎn)生或銷毀任何額外的對象實例,而 LinkedBlockingQueue 每次插入都會生成一個新的結(jié)點(Node)對象蛋叼,這會影響日后 GC 垃圾回收
- ArrayBlockingQueue 中添加焊傅、刪除操作只使用一個鎖(ReentrantLock)剂陡,而 LinkedBlockingQueue 添加、刪除操作各使用一個鎖狐胎,因此 LinkedBlockingQueue 的并發(fā)吞吐量大于 ArrayBlockingQueue