LinkedBlockingQueue基于鏈表實現(xiàn)先紫,未指定容量時默認容量為Integer.MAX_VALUE,即無界阻塞隊列筹煮,節(jié)點動態(tài)創(chuàng)建遮精,節(jié)點出隊后可被GC,伸縮性較好寺谤;如果消費者速度慢于生產(chǎn)者速度仑鸥,可能造成內(nèi)存空間不足,建議手動設置隊列大小变屁。
采用“two lock queue”算法變體眼俊,雙鎖(ReentrantLock):takeLock、putLock粟关,允許讀寫并行疮胖,remove(e)和迭代器iterators需要獲取2個鎖。
LinkedBlockingQueue同步機制:
ArrayBlockingQueue底層基于數(shù)組闷板,創(chuàng)建時必須指定隊列大小澎灸,節(jié)點數(shù)量一開始就固定,“有界”
ArrayBlockingQueue入隊和出隊使用同一個lock(但數(shù)據(jù)讀寫操作已非常簡潔)遮晚,讀取和寫入操作無法并行性昭。
ArrayBlockingQueue 同步機制:
LinkedBlockingQueue使用雙鎖可并行讀寫,其吞吐量更高县遣。
ArrayBlockingQueue在插入或刪除元素時直接放入數(shù)組指定位置(putIndex糜颠、takeIndex),不會產(chǎn)生或銷毀任何額外的對象實例萧求;而LinkedBlockingQueue則會生成一個額外的Node對象其兴,在高效并發(fā)處理大量數(shù)據(jù)時,對GC的影響存在一定的區(qū)別夸政。
在大部分并發(fā)場景下元旬,LinkedBlockingQueue的吞吐量比ArrayBlockingQueue更好。
// linkedblockingqueue.java
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 這里為什么是 -1 這就是個標識成功守问、失敗的標志而已匀归。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必須要獲取到 putLock 才可以進行插入操作
putLock.lockInterruptibly();
try {
// 如果隊列滿,等待 notFull 的條件滿足耗帕。
while (count.get() == capacity) {
notFull.await();
}
// 入隊
enqueue(node);
// count 原子加 1朋譬,c 還是加 1 前的值
c = count.getAndIncrement();
// 如果這個元素入隊后,還有至少一個槽可以使用兴垦,調(diào)用 notFull.signal() 喚醒等待線程徙赢。
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入隊后字柠,釋放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表隊列在這個元素入隊前是空的(不包括head空節(jié)點)狡赐,
// 那么所有的讀線程都在等待 notEmpty 這個條件窑业,等待喚醒,這里做一次喚醒操作
if (c == 0)
signalNotEmpty();
}
// linkedblockingqueue.java
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先枕屉,需要獲取到 takeLock 才能進行出隊操作
takeLock.lockInterruptibly();
try {
// 如果隊列為空常柄,等待 notEmpty 這個條件滿足再繼續(xù)執(zhí)行
while (count.get() == 0) {
notEmpty.await();
}
// 出隊
x = dequeue();
// count 進行原子減 1
c = count.getAndDecrement();
// 如果這次出隊后,隊列中至少還有一個元素搀擂,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程
if (c > 1)
notEmpty.signal();
} finally {
// 出隊后釋放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity西潘,那么說明在這個 take 方法發(fā)生的時候,隊列是滿的
// 既然出隊了一個哨颂,那么意味著隊列不滿了喷市,喚醒寫線程去寫
if (c == capacity)
signalNotFull();
return x;
}
兩種不同的生產(chǎn)者消費者協(xié)調(diào)策略:
在ArrayBlockingQueue中,速率的調(diào)控是通過生產(chǎn)者喚醒消費者威恼,消費者喚醒生產(chǎn)者互相作用來實現(xiàn)的調(diào)控品姓。
由于signal是要先獲取到鎖才能調(diào)用的,在put里是獲取不到take鎖的箫措,只能在自己的方法里去signal自己的condition隊列
linkedblockingqueue出于性能考慮用了兩個鎖腹备,盡量讓兩邊各自獨立,所以在LinkedBlockingQueue中斤蔓,是生產(chǎn)者在隊列未滿的情況下喚醒生產(chǎn)者植酥,
也就是finally之前的 if (c + 1 < capacity) notFull.signal();,消費者在隊列不為空的時候喚醒消費者弦牡,對應的是if (c > 1) notEmpty.signal(); 但是存在兩種特殊情況: 1 假設隊列滿了惧互,生產(chǎn)者可能全部處于await狀態(tài),那么此時就需要消費者出隊后喚醒生產(chǎn)者喇伯。也就是take操作return之前的signalNotFull()
假設隊列為空,消費者可能全部處于await狀態(tài)拨与,那么此時就需要生產(chǎn)者生產(chǎn)之后喚醒消費者稻据,也就是put操作return之前的signalNotEmpty()