前言
在前面的文章ArrayBlockingQueue源碼分析中酥馍,已經(jīng)對JDK中的BlockingQueue中的做了一個回顧,同時對ArrayBlockingQueue中的核心方法作了說明蜓肆,而LinkedBlockingQueue作為JDK中BlockingQueue家族系列中一員,由于其作為固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊(duì)列辜妓,分析它的目的主要在于2點(diǎn):
(1) 與ArrayBlockingQueue進(jìn)行類比學(xué)習(xí)勺疼,加深各種數(shù)據(jù)結(jié)構(gòu)的理解
(2) 了解底層實(shí)現(xiàn),能夠更好地理解每一種阻塞隊(duì)列對線程池性能的影響页慷,做到真正的知其然憔足,且知其所以然
- 源碼分析LinkedBlockingQueue的實(shí)現(xiàn)
- 與ArrayBlockingQueue進(jìn)行比較
- 說明為什么選擇LinkedBlockingQueue作為固定大小的線程池的阻塞隊(duì)列
如發(fā)現(xiàn)有分析不對或不準(zhǔn)確的地方,請您及時糾正(在此謝過)
1.LinkedBlockingQueue深入分析
LinkedBlockingQueue酒繁,見名之意滓彰,它是由一個基于鏈表的阻塞隊(duì)列,首先看一下的核心組成:
// 所有的元素都通過Node這個靜態(tài)內(nèi)部類來進(jìn)行存儲州袒,這與LinkedList的處理方式完全一樣
static class Node<E> {
//使用item來保存元素本身
E item;
//保存當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node<E> next;
Node(E x) { item = x; }
}
/**
阻塞隊(duì)列所能存儲的最大容量
用戶可以在創(chuàng)建時手動指定最大容量,如果用戶沒有指定最大容量
那么最默認(rèn)的最大容量為Integer.MAX_VALUE.
*/
private final int capacity;
/**
當(dāng)前阻塞隊(duì)列中的元素?cái)?shù)量
PS:如果你看過ArrayBlockingQueue的源碼,你會發(fā)現(xiàn)
ArrayBlockingQueue底層保存元素?cái)?shù)量使用的是一個
普通的int類型變量揭绑。其原因是在ArrayBlockingQueue底層
對于元素的入隊(duì)列和出隊(duì)列使用的是同一個lock對象。而數(shù)
量的修改都是在處于線程獲取鎖的情況下進(jìn)行操作郎哭,因此不
會有線程安全問題他匪。
而LinkedBlockingQueue卻不是,它的入隊(duì)列和出隊(duì)列使用的是兩個
不同的lock對象,因此無論是在入隊(duì)列還是出隊(duì)列夸研,都會涉及對元素?cái)?shù)
量的并發(fā)修改邦蜜,(之后通過源碼可以更加清楚地看到)因此這里使用了一個原子操作類
來解決對同一個變量進(jìn)行并發(fā)修改的線程安全問題。
*/
private final AtomicInteger count = new AtomicInteger(0);
/**
* 鏈表的頭部
* LinkedBlockingQueue的頭部具有一個不變性:
* 頭部的元素總是為null陈惰,head.item==null
*/
private transient Node<E> head;
/**
* 鏈表的尾部
* LinkedBlockingQueue的尾部也具有一個不變性:
* 即last.next==null
*/
private transient Node<E> last;
/**
元素出隊(duì)列時線程所獲取的鎖
當(dāng)執(zhí)行take畦徘、poll等操作時線程需要獲取的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
當(dāng)隊(duì)列為空時毕籽,通過該Condition讓從隊(duì)列中獲取元素的線程處于等待狀態(tài)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
元素入隊(duì)列時線程所獲取的鎖
當(dāng)執(zhí)行add、put井辆、offer等操作時線程需要獲取鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
當(dāng)隊(duì)列的元素已經(jīng)達(dá)到capactiy关筒,通過該Condition讓元素入隊(duì)列的線程處于等待狀態(tài)
*/
private final Condition notFull = putLock.newCondition();
通過上面的分析,我們可以發(fā)現(xiàn)LinkedBlockingQueue在入隊(duì)列和出隊(duì)列時使用的不是同一個Lock杯缺,這也意味著它們之間的操作不會存在互斥操作蒸播。在多個CPU的情況下,它們可以做到真正的在同一時刻既消費(fèi)萍肆、又生產(chǎn)袍榆,能夠做到并行處理。
下面讓我們看下LinkedBlockingQueue的構(gòu)造方法:
/**
* 如果用戶沒有顯示指定capacity的值塘揣,默認(rèn)使用int的最大值
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
可以看到,當(dāng)隊(duì)列中沒有任何元素的時候,此時隊(duì)列的頭部就等于隊(duì)列的尾部,
指向的是同一個節(jié)點(diǎn),并且元素的內(nèi)容為null
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/*
在初始化LinkedBlockingQueue的時候包雀,還可以直接將一個集合
中的元素全部入隊(duì)列,此時隊(duì)列最大容量依然是int的最大值亲铡。
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//獲取鎖
putLock.lock(); // Never contended, but necessary for visibility
try {
//迭代集合中的每一個元素,讓其入隊(duì)列,并且更新一下當(dāng)前隊(duì)列中的元素?cái)?shù)量
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//參考下面的enqueue分析
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}
/**
* 我去才写,這代碼其實(shí)可讀性不怎么樣啊。
* 其實(shí)下面的代碼等價(jià)于如下內(nèi)容:
* last.next=node;
* last = node;
* 其實(shí)也沒有什么花樣:
就是讓新入隊(duì)列的元素成為原來的last的next奖蔓,讓進(jìn)入的元素稱為last
*
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
在分析完LinkedBlockingQueue的核心組成之后,下面讓我們再看下核心的幾個操作方法,首先分析一下元素入隊(duì)列的過程:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
/*注意上面這句話,約定所有的put/take操作都會預(yù)先設(shè)置本地變量,
可以看到下面有一個將putLock賦值給了一個局部變量的操作
*/
int c = -1;
Node<E> node = new Node(e);
/*
在這里首先獲取到putLock,以及當(dāng)前隊(duì)列的元素?cái)?shù)量
即上面所描述的預(yù)設(shè)置本地變量操作
*/
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
/*
執(zhí)行可中斷的鎖獲取操作,即意味著如果線程由于獲取
鎖而處于Blocked狀態(tài)時赞草,線程是可以被中斷而不再繼
續(xù)等待,這也是一種避免死鎖的一種方式吆鹤,不會因?yàn)? 發(fā)現(xiàn)到死鎖之后而由于無法中斷線程最終只能重啟應(yīng)用厨疙。
*/
putLock.lockInterruptibly();
try {
/*
當(dāng)隊(duì)列的容量到底最大容量時,此時線程將處于等待狀
態(tài),直到隊(duì)列有空閑的位置才繼續(xù)執(zhí)行疑务。使用while判
斷依舊是為了放置線程被"偽喚醒”而出現(xiàn)的情況,即當(dāng)
線程被喚醒時而隊(duì)列的大小依舊等于capacity時沾凄,線
程應(yīng)該繼續(xù)等待。
*/
while (count.get() == capacity) {
notFull.await();
}
//讓元素進(jìn)行隊(duì)列的末尾,enqueue代碼在上面分析過了
enqueue(node);
//首先獲取原先隊(duì)列中的元素個數(shù),然后再對隊(duì)列中的元素個數(shù)+1.
c = count.getAndIncrement();
/*注:c+1得到的結(jié)果是新元素入隊(duì)列之后隊(duì)列元素的總和暑始。
當(dāng)前隊(duì)列中的總元素個數(shù)小于最大容量時,此時喚醒其他執(zhí)行入隊(duì)列的線程
讓它們可以放入元素,如果新加入元素之后,隊(duì)列的大小等于capacity搭独,
那么就意味著此時隊(duì)列已經(jīng)滿了,也就沒有必須要喚醒其他正在等待入隊(duì)列的線程,因?yàn)閱拘阉鼈冎笥は鳎鼈円策€是繼續(xù)等待廊镜。
*/
if (c + 1 < capacity)
notFull.signal();
} finally {
//完成對鎖的釋放
putLock.unlock();
}
/*當(dāng)c=0時,即意味著之前的隊(duì)列是空隊(duì)列,出隊(duì)列的線程都處于等待狀態(tài)唉俗,
現(xiàn)在新添加了一個新的元素,即隊(duì)列不再為空,因此它會喚醒正在等待獲取元素的線程嗤朴。
*/
if (c == 0)
signalNotEmpty();
}
/*
喚醒正在等待獲取元素的線程,告訴它們現(xiàn)在隊(duì)列中有元素了
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//通過notEmpty喚醒獲取元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
看完put方法,下面再看看下offer是如何處理的方法:
/**
在BlockingQueue接口中除了定義put方法外(當(dāng)隊(duì)列元素滿了之后就會阻塞虫溜,
直到隊(duì)列有新的空間可以方法線程才會繼續(xù)執(zhí)行)雹姊,還定義一個offer方法,
該方法會返回一個boolean值衡楞,當(dāng)入隊(duì)列成功返回true,入隊(duì)列失敗返回false吱雏。
該方法與put方法基本操作基本一致,只是有細(xì)微的差異。
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
/*
當(dāng)隊(duì)列已經(jīng)滿了歧杏,它不會繼續(xù)等待,而是直接返回镰惦。
因此該方法是非阻塞的。
*/
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
/*
當(dāng)獲取到鎖時犬绒,需要進(jìn)行二次的檢查,因?yàn)榭赡墚?dāng)隊(duì)列的大小為capacity-1時旺入,
兩個線程同時去搶占鎖,而只有一個線程搶占成功凯力,那么此時
當(dāng)線程將元素入隊(duì)列后茵瘾,釋放鎖,后面的線程搶占鎖之后咐鹤,此時隊(duì)列
大小已經(jīng)達(dá)到capacity拗秘,所以將它無法讓元素入隊(duì)列。
下面的其余操作和put都一樣祈惶,此處不再詳述
*/
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
BlockingQueue還定義了一個限時等待插入操作聘殖,即在等待一定的時間內(nèi),如果隊(duì)列有空間可以插入行瑞,那么就將元素入隊(duì)列奸腺,然后返回true,如果在過完指定的時間后依舊沒有空間可以插入,那么就返回false血久,下面是限時等待操作的分析:
/**
通過timeout和TimeUnit來指定等待的時長
timeout為時間的長度,TimeUnit為時間的單位
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//將指定的時間長度轉(zhuǎn)換為毫秒來進(jìn)行處理
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//如果等待的剩余時間小于等于0突照,那么直接返回
if (nanos <= 0)
return false;
/*
通過condition來完成等待,此時當(dāng)前線程會完成鎖的氧吐,并且處于等待狀態(tài)
直到被其他線程喚醒該線程讹蘑、或者當(dāng)前線程被中斷、
等待的時間截至才會返回筑舅,該返回值為從方法調(diào)用到返回所經(jīng)歷的時長座慰。
注意:上面的代碼是condition的awitNanos()方法的通用寫法,
可以參看Condition.awaitNaos的API文檔翠拣。
下面的其余操作和put都一樣版仔,此處不再詳述
*/
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
通過上面的分析,我們應(yīng)該比較清楚地知道了LinkedBlockingQueue的入隊(duì)列的操作误墓,其主要是通過獲取到putLock鎖來完成蛮粮,當(dāng)隊(duì)列的數(shù)量達(dá)到最大值,此時會導(dǎo)致線程處于阻塞狀態(tài)或者返回false(根據(jù)具體的方法來看)谜慌;如果隊(duì)列還有剩余的空間然想,那么此時會新創(chuàng)建出一個Node對象,將其設(shè)置到隊(duì)列的尾部欣范,作為LinkedBlockingQueue的last元素变泄。
在分析完入隊(duì)列的過程之后令哟,我們接下來看看LinkedBlockingQueue出隊(duì)列的過程;由于BlockingQueue的方法都具有對稱性妨蛹,此處就只分析take方法的實(shí)現(xiàn)励饵,其余方法的實(shí)現(xiàn)都如出一轍:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//通過takeLock獲取鎖,并且支持線程中斷
takeLock.lockInterruptibly();
try {
//當(dāng)隊(duì)列為空時滑燃,則讓當(dāng)前線程處于等待
while (count.get() == 0) {
notEmpty.await();
}
//完成元素的出隊(duì)列
x = dequeue();
/*
隊(duì)列元素個數(shù)完成原子化操作-1,可以看到count元素會
在插入元素的線程和獲取元素的線程進(jìn)行并發(fā)修改操作役听。
*/
c = count.getAndDecrement();
/*
當(dāng)一個元素出隊(duì)列之后,隊(duì)列的大小依舊大于1時
當(dāng)前線程會喚醒其他執(zhí)行元素出隊(duì)列的線程,讓它們也
可以執(zhí)行元素的獲取
*/
if (c > 1)
notEmpty.signal();
} finally {
//完成鎖的釋放
takeLock.unlock();
}
/*
當(dāng)c==capaitcy時表窘,即在獲取當(dāng)前元素之前典予,
隊(duì)列已經(jīng)滿了,而此時獲取元素之后乐严,隊(duì)列就會
空出一個位置瘤袖,故當(dāng)前線程會喚醒執(zhí)行插入操作的線
程通知其他中的一個可以進(jìn)行插入操作。
*/
if (c == capacity)
signalNotFull();
return x;
}
/**
* 讓頭部元素出隊(duì)列的過程
* 其最終的目的是讓原來的head被GC回收昂验,讓其的next成為head
* 并且新的head的item為null.
* 因?yàn)長inkedBlockingQueue的頭部具有一致性:即元素為null捂敌。
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
對于LinkedBlockingQueue的源碼分析就到這里,下面讓我們將LinkedBlockingQueue與ArrayBlockingQueue進(jìn)行一個比較既琴。
2.LinkedBlockingQueue與ArrayBlockingQueue的比較
ArrayBlockingQueue由于其底層基于數(shù)組占婉,并且在創(chuàng)建時指定存儲的大小,在完成后就會立即在內(nèi)存分配固定大小容量的數(shù)組元素甫恩,因此其存儲通常有限逆济,故其是一個“有界“的阻塞隊(duì)列;而LinkedBlockingQueue可以由用戶指定最大存儲容量磺箕,也可以無需指定奖慌,如果不指定則最大存儲容量將是Integer.MAX_VALUE,即可以看作是一個“無界”的阻塞隊(duì)列松靡,由于其節(jié)點(diǎn)的創(chuàng)建都是動態(tài)創(chuàng)建简僧,并且在節(jié)點(diǎn)出隊(duì)列后可以被GC所回收,因此其具有靈活的伸縮性雕欺。但是由于ArrayBlockingQueue的有界性岛马,因此其能夠更好的對于性能進(jìn)行預(yù)測,而LinkedBlockingQueue由于沒有限制大小阅茶,當(dāng)任務(wù)非常多的時候蛛枚,不停地向隊(duì)列中存儲,就有可能導(dǎo)致內(nèi)存溢出的情況發(fā)生脸哀。
其次沮峡,ArrayBlockingQueue中在入隊(duì)列和出隊(duì)列操作過程中梢什,使用的是同一個lock密末,所以即使在多核CPU的情況下,其讀取和操作的都無法做到并行慌随,而LinkedBlockingQueue的讀取和插入操作所使用的鎖是兩個不同的lock,它們之間的操作互相不受干擾矗晃,因此兩種操作可以并行完成看疙,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。
3.選擇LinkedBlockingQueue的理由
/**
下面的代碼是Executors創(chuàng)建固定大小線程池的代碼浦旱,其使用了
LinkedBlockingQueue來作為任務(wù)隊(duì)列宇色。
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
JDK中選用LinkedBlockingQueue作為阻塞隊(duì)列的原因就在于其無界性。因?yàn)榫€程大小固定的線程池颁湖,其線程的數(shù)量是不具備伸縮性的宣蠕,當(dāng)任務(wù)非常繁忙的時候,就勢必會導(dǎo)致所有的線程都處于工作狀態(tài)甥捺,如果使用一個有界的阻塞隊(duì)列來進(jìn)行處理抢蚀,那么就非常有可能很快導(dǎo)致隊(duì)列滿的情況發(fā)生,從而導(dǎo)致任務(wù)無法提交而拋出RejectedExecutionException镰禾,而使用無界隊(duì)列由于其良好的存儲容量的伸縮性皿曲,可以很好的去緩沖任務(wù)繁忙情況下場景,即使任務(wù)非常多吴侦,也可以進(jìn)行動態(tài)擴(kuò)容屋休,當(dāng)任務(wù)被處理完成之后,隊(duì)列中的節(jié)點(diǎn)也會被隨之被GC回收备韧,非常靈活博投。
至此,LinkedBlockingQueue的分析就到這里盯蝴,如果您發(fā)現(xiàn)有任何編寫不對的地方毅哗,請指出(萬分感謝!)捧挺。