一、 簡(jiǎn)介
上篇我們介紹了ArrayBlockingQueue的相關(guān)方法的原理步清,這一篇我們來(lái)學(xué)習(xí)一下ArrayBlockingQueue
的“親戚” LinkedBlockingQueue
旅东。在集合框架里,想必大家都用過(guò)ArrayList和LinkedList继控,也經(jīng)常在面試中問(wèn)到他們之間的區(qū)別斟薇。ArrayList和ArrayBlockingQueue一樣停巷,內(nèi)部基于數(shù)組來(lái)存放元素耍攘,而LinkedBlockingQueue則和LinkedList一樣,內(nèi)部基于鏈表來(lái)存放元素畔勤。
LinkedBlockingQueue實(shí)現(xiàn)了BlockingQueue接口蕾各,這里放一張類(lèi)的繼承關(guān)系圖(圖片來(lái)自之前的文章:說(shuō)說(shuō)隊(duì)列Queue)
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量庆揪,默認(rèn)為Integer.MAX_VALUE式曲,也就是無(wú)界隊(duì)列。所以為了避免隊(duì)列過(guò)大造成機(jī)器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn)缸榛,我們?cè)谑褂玫臅r(shí)候建議手動(dòng)傳一個(gè)隊(duì)列的大小吝羞。
二、源碼分析
2.1内颗、屬性
/**
* 節(jié)點(diǎn)類(lèi)钧排,用于存儲(chǔ)數(shù)據(jù)
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞隊(duì)列的大小,默認(rèn)為Integer.MAX_VALUE */
private final int capacity;
/** 當(dāng)前阻塞隊(duì)列中的元素個(gè)數(shù) */
private final AtomicInteger count = new AtomicInteger();
/**
* 阻塞隊(duì)列的頭結(jié)點(diǎn)
*/
transient Node<E> head;
/**
* 阻塞隊(duì)列的尾節(jié)點(diǎn)
*/
private transient Node<E> last;
/** 獲取并移除元素時(shí)使用的鎖均澳,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty條件對(duì)象恨溜,當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí)用于掛起執(zhí)行刪除的線程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素時(shí)使用的鎖如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull條件對(duì)象,當(dāng)隊(duì)列數(shù)據(jù)已滿時(shí)用于掛起執(zhí)行添加的線程 */
private final Condition notFull = putLock.newCondition();
從上面的屬性我們知道找前,每個(gè)添加到LinkedBlockingQueue隊(duì)列中的數(shù)據(jù)都將被封裝成Node節(jié)點(diǎn)糟袁,添加的鏈表隊(duì)列中,其中head和last分別指向隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn)躺盛。與ArrayBlockingQueue不同的是项戴,LinkedBlockingQueue內(nèi)部分別使用了takeLock 和 putLock 對(duì)并發(fā)進(jìn)行控制,也就是說(shuō)颗品,添加和刪除操作并不是互斥操作肯尺,可以同時(shí)進(jìn)行,這樣也就可以大大提高吞吐量躯枢。
這里如果不指定隊(duì)列的容量大小则吟,也就是使用默認(rèn)的Integer.MAX_VALUE,如果存在添加速度大于刪除速度時(shí)候锄蹂,有可能會(huì)內(nèi)存溢出氓仲,這點(diǎn)在使用前希望慎重考慮。
另外得糜,LinkedBlockingQueue對(duì)每一個(gè)lock鎖都提供了一個(gè)Condition用來(lái)掛起和喚醒其他線程敬扛。
2.2、構(gòu)造函數(shù)
public LinkedBlockingQueue() {
// 默認(rèn)大小為Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
默認(rèn)的構(gòu)造函數(shù)和最后一個(gè)構(gòu)造函數(shù)創(chuàng)建的隊(duì)列大小都為Integer.MAX_VALUE朝抖,只有第二個(gè)構(gòu)造函數(shù)用戶可以指定隊(duì)列的大小啥箭。第二個(gè)構(gòu)造函數(shù)最后初始化了last和head節(jié)點(diǎn),讓它們都指向了一個(gè)元素為null的節(jié)點(diǎn)治宣。
最后一個(gè)構(gòu)造函數(shù)使用了putLock來(lái)進(jìn)行加鎖急侥,但是這里并不是為了多線程的競(jìng)爭(zhēng)而加鎖,只是為了放入的元素能立即對(duì)其他線程可見(jiàn)侮邀。
2.3坏怪、方法
同樣,LinkedBlockingQueue也有著和ArrayBlockingQueue一樣的方法绊茧,我們先來(lái)看看入隊(duì)列的方法铝宵。
2.3.1、入隊(duì)方法
LinkedBlockingQueue提供了多種入隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求华畏,入隊(duì)操作有如下幾種:
- void put(E e)鹏秋;
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)亡笑。
put(E e)
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 獲取鎖中斷
putLock.lockInterruptibly();
try {
//判斷隊(duì)列是否已滿拼岳,如果已滿阻塞等待
while (count.get() == capacity) {
notFull.await();
}
// 把node放入隊(duì)列中
enqueue(node);
c = count.getAndIncrement();
// 再次判斷隊(duì)列是否有可用空間,如果有喚醒下一個(gè)線程進(jìn)行添加操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果隊(duì)列中有一條數(shù)據(jù)况芒,喚醒消費(fèi)線程進(jìn)行消費(fèi)
if (c == 0)
signalNotEmpty();
}
小結(jié)put方法來(lái)看惜纸,它總共做了以下情況的考慮:
- 隊(duì)列已滿,阻塞等待绝骚。
- 隊(duì)列未滿耐版,創(chuàng)建一個(gè)node節(jié)點(diǎn)放入隊(duì)列中,如果放完以后隊(duì)列還有剩余空間压汪,繼續(xù)喚醒下一個(gè)添加線程進(jìn)行添加粪牲。如果放之前隊(duì)列中沒(méi)有元素,放完以后要喚醒消費(fèi)線程進(jìn)行消費(fèi)止剖。
很清晰明了是不是腺阳?
我們來(lái)看看該方法中用到的幾個(gè)其他方法落君,先來(lái)看看enqueue(Node node)方法:
private void enqueue(Node<E> node) {
last = last.next = node;
}
該方法可能有些同學(xué)看不太懂,我們用一張圖來(lái)看看往隊(duì)列里依次放入元素A和元素B亭引,畢竟無(wú)圖無(wú)真相:
接下來(lái)我們看看signalNotEmpty绎速,順帶著看signalNotFull方法。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
為什么要這么寫(xiě)焙蚓?因?yàn)閟ignal的時(shí)候要獲取到該signal對(duì)應(yīng)的Condition對(duì)象的鎖才行纹冤。
offer(E e)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 隊(duì)列有可用空間,放入node節(jié)點(diǎn)购公,判斷放入元素后是否還有可用空間萌京,
// 如果有,喚醒下一個(gè)添加線程進(jìn)行添加操作宏浩。
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
可以看到offer僅僅對(duì)put方法改動(dòng)了一點(diǎn)點(diǎn)知残,當(dāng)隊(duì)列沒(méi)有可用元素的時(shí)候,不同于put方法的阻塞等待比庄,offer方法直接方法false橡庞。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 等待超時(shí)時(shí)間nanos,超時(shí)時(shí)間到了返回false
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
該方法只是對(duì)offer方法進(jìn)行了阻塞超時(shí)處理印蔗,使用了Condition的awaitNanos來(lái)進(jìn)行超時(shí)等待扒最,這里為什么要用while循環(huán)?因?yàn)閍waitNanos方法是可中斷的华嘹,為了防止在等待過(guò)程中線程被中斷吧趣,這里使用while循環(huán)進(jìn)行等待過(guò)程中中斷的處理,繼續(xù)等待剩下需等待的時(shí)間耙厚。
2.3.2强挫、出隊(duì)方法
入隊(duì)列的方法說(shuō)完后,我們來(lái)說(shuō)說(shuō)出隊(duì)列的方法薛躬。LinkedBlockingQueue提供了多種出隊(duì)操作的實(shí)現(xiàn)來(lái)滿足不同情況下的需求俯渤,如下:
- E take();
- E poll();
- E poll(long timeout, TimeUnit unit);
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 隊(duì)列為空,阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
// 隊(duì)列中還有元素型宝,喚醒下一個(gè)消費(fèi)線程進(jìn)行消費(fèi)
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 移除元素之前隊(duì)列是滿的八匠,喚醒生產(chǎn)線程進(jìn)行添加元素
if (c == capacity)
signalNotFull();
return x;
}
take方法看起來(lái)就是put方法的逆向操作,它總共做了以下情況的考慮:
- 隊(duì)列為空趴酣,阻塞等待梨树。
- 隊(duì)列不為空,從隊(duì)首獲取并移除一個(gè)元素岖寞,如果消費(fèi)后還有元素在隊(duì)列中抡四,繼續(xù)喚醒下一個(gè)消費(fèi)線程進(jìn)行元素移除。如果放之前隊(duì)列是滿元素的情況,移除完后要喚醒生產(chǎn)線程進(jìn)行添加元素指巡。
我們來(lái)看看dequeue方法
private E dequeue() {
// 獲取到head節(jié)點(diǎn)
Node<E> h = head;
// 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn)
Node<E> first = h.next;
// head節(jié)點(diǎn)原來(lái)指向的節(jié)點(diǎn)的next指向自己淑履,等待下次gc回收
h.next = h; // help GC
// head節(jié)點(diǎn)指向新的節(jié)點(diǎn)
head = first;
// 獲取到新的head節(jié)點(diǎn)的item值
E x = first.item;
// 新head節(jié)點(diǎn)的item值設(shè)置為null
first.item = null;
return x;
}
可能有些童鞋鏈表算法不是很熟悉,我們可以結(jié)合注釋和圖來(lái)看就清晰很多了藻雪。
其實(shí)這個(gè)寫(xiě)法看起來(lái)很繞秘噪,我們其實(shí)也可以這么寫(xiě):
private E dequeue() {
// 獲取到head節(jié)點(diǎn)
Node<E> h = head;
// 獲取到head節(jié)點(diǎn)指向的下一個(gè)節(jié)點(diǎn),也就是節(jié)點(diǎn)A
Node<E> first = h.next;
// 獲取到下下個(gè)節(jié)點(diǎn)阔涉,也就是節(jié)點(diǎn)B
Node<E> next = first.next;
// head的next指向下下個(gè)節(jié)點(diǎn),也就是圖中的B節(jié)點(diǎn)
h.next = next;
// 得到節(jié)點(diǎn)A的值
E x = first.item;
first.item = null; // help GC
first.next = first; // help GC
return x;
}
poll()
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
poll方法去除了take方法中元素為空后阻塞等待這一步驟捷绒,這里也就不詳細(xì)說(shuō)了瑰排。同理,poll(long timeout, TimeUnit unit)也和offer(E e, long timeout, TimeUnit unit)一樣暖侨,利用了Condition的awaitNanos方法來(lái)進(jìn)行阻塞等待直至超時(shí)椭住。這里就不列出來(lái)說(shuō)了。
2.3.3字逗、獲取元素方法
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
加鎖后京郑,獲取到head節(jié)點(diǎn)的next節(jié)點(diǎn),如果為空返回null葫掉,如果不為空些举,返回next節(jié)點(diǎn)的item值。
2.3.4俭厚、刪除元素方法
public boolean remove(Object o) {
if (o == null) return false;
// 兩個(gè)lock全部上鎖
fullyLock();
try {
// 從head開(kāi)始遍歷元素户魏,直到最后一個(gè)元素
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
// 如果找到相等的元素,調(diào)用unlink方法刪除元素
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
// 兩個(gè)lock全部解鎖
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
因?yàn)閞emove方法使用兩個(gè)鎖全部上鎖挪挤,所以其他操作都需要等待它完成叼丑,而該方法需要從head節(jié)點(diǎn)遍歷到尾節(jié)點(diǎn),所以時(shí)間復(fù)雜度為O(n)扛门。我們來(lái)看看unlink方法鸠信。
void unlink(Node<E> p, Node<E> trail) {
// p的元素置為null
p.item = null;
// p的前一個(gè)節(jié)點(diǎn)的next指向p的next,也就是把p從鏈表中去除了
trail.next = p.next;
// 如果last指向p论寨,刪除p后讓last指向trail
if (last == p)
last = trail;
// 如果刪除之前元素是滿的星立,刪除之后就有空間了,喚醒生產(chǎn)線程放入元素
if (count.getAndDecrement() == capacity)
notFull.signal();
}
3葬凳、問(wèn)題
看源碼的時(shí)候贞铣,我給自己拋出了一個(gè)問(wèn)題。
- 為什么dequeue里的h.next不指向null沮明,而指向h辕坝?
- 為什么unlink里沒(méi)有p.next = null或者p.next = p這樣的操作?
這個(gè)疑問(wèn)一直困擾著我荐健,直到我看了迭代器的部分源碼后才豁然開(kāi)朗酱畅,下面放出部分迭代器的源碼:
private Node<E> current;
private Node<E> lastRet;
private E currentElement;
Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}
private Node<E> nextNode(Node<E> p) {
for (;;) {
// 解決了問(wèn)題1
Node<E> s = p.next;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}
迭代器的遍歷分為兩步琳袄,第一步加雙鎖把元素放入臨時(shí)變量中,第二部遍歷臨時(shí)變量的元素纺酸。也就是說(shuō)remove可能和迭代元素同時(shí)進(jìn)行窖逗,很有可能remove的時(shí)候,有線程在進(jìn)行迭代操作餐蔬,而如果unlink中改變了p的next碎紊,很有可能在迭代的時(shí)候會(huì)造成錯(cuò)誤,造成不一致問(wèn)題樊诺。這個(gè)解決了問(wèn)題2仗考。
而問(wèn)題1其實(shí)在nextNode方法中也能找到,為了正確遍歷词爬,nextNode使用了 s == p的判斷秃嗜,當(dāng)下一個(gè)元素是自己本身時(shí),返回head的下一個(gè)節(jié)點(diǎn)顿膨。
4锅锨、總結(jié)
LinkedBlockingQueue是一個(gè)阻塞隊(duì)列,內(nèi)部由兩個(gè)ReentrantLock來(lái)實(shí)現(xiàn)出入隊(duì)列的線程安全恋沃,由各自的Condition對(duì)象的await和signal來(lái)實(shí)現(xiàn)等待和喚醒功能必搞。它和ArrayBlockingQueue的不同點(diǎn)在于:
- 隊(duì)列大小有所不同,ArrayBlockingQueue是有界的初始化必須指定大小囊咏,而LinkedBlockingQueue可以是有界的也可以是無(wú)界的(Integer.MAX_VALUE)顾画,對(duì)于后者而言,當(dāng)添加速度大于移除速度時(shí)匆笤,在無(wú)界的情況下研侣,可能會(huì)造成內(nèi)存溢出等問(wèn)題。
- 數(shù)據(jù)存儲(chǔ)容器不同炮捧,ArrayBlockingQueue采用的是數(shù)組作為數(shù)據(jù)存儲(chǔ)容器庶诡,而LinkedBlockingQueue采用的則是以Node節(jié)點(diǎn)作為連接對(duì)象的鏈表。
- 由于ArrayBlockingQueue采用的是數(shù)組的存儲(chǔ)容器咆课,因此在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷(xiāo)毀任何額外的對(duì)象實(shí)例末誓,而LinkedBlockingQueue則會(huì)生成一個(gè)額外的Node對(duì)象。這可能在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的時(shí)书蚪,對(duì)于GC可能存在較大影響喇澡。
- 兩者的實(shí)現(xiàn)隊(duì)列添加或移除的鎖不一樣,ArrayBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是沒(méi)有分離的殊校,即添加操作和移除操作采用的同一個(gè)ReenterLock鎖晴玖,而LinkedBlockingQueue實(shí)現(xiàn)的隊(duì)列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock呕屎,這樣能大大提高隊(duì)列的吞吐量让簿,也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能秀睛。
三尔当、隊(duì)列各個(gè)方法區(qū)別:
方法 | 作用 | 區(qū)別 |
---|---|---|
add | 增加一個(gè)元素 | 如果隊(duì)列已滿,則拋出一個(gè)IIIegaISlabEepeplian異常 |
offer | 添加一個(gè)元素并返回true | 如果隊(duì)列已滿蹂安,則返回false椭迎。 如果 e 元素為 null 則拋出 NullPointerException 異常。該方法不阻塞 |
put | 添加一個(gè)元素并返回true | 如果隊(duì)列已滿則阻塞當(dāng)前線程直到隊(duì)列有空閑插入成功后返回 true田盈,如果在阻塞的時(shí)候被其它線程設(shè)置了中斷標(biāo)志畜号,則被阻塞線程會(huì)拋出 InterruptedException異常而返回,另外如果 e 元素為 null 則拋出 NullPointerException 異常 |
remove | 刪除隊(duì)列里面指定元素 | 有則刪除返回 true缠黍,沒(méi)有則返回 false |
poll | 移除并返問(wèn)隊(duì)列頭部的元素 | 如果隊(duì)列為空弄兜,則返回null药蜻。該方法是不阻塞的 |
take | 移除并返回隊(duì)列頭部的元素 | 如果隊(duì)列為空則阻塞調(diào)用線程瓷式。如果隊(duì)列為空則阻塞當(dāng)前線程直到隊(duì)列不為空然后返回元素,如果在阻塞的時(shí)候被其它線程設(shè)置了中斷標(biāo)志语泽,則被阻塞線程會(huì)拋出 InterruptedException 異常而返回贸典。 |
element | 返回隊(duì)列頭部的元素 | 如果隊(duì)列為空,則拋出一個(gè)NoSuchElementException異常 |
peek | 返回隊(duì)列頭部的元素 | 如果隊(duì)列為空踱卵,則返回null廊驼。該方法是不阻塞的 |
參考:https://blog.csdn.net/tonywu1992/article/details/83419448?spm=1001.2014.3001.5502