一雹锣、前言
前面介紹了使用CAS實現(xiàn)的非阻塞隊列ConcurrentLinkedQueue,下面就來介紹下使用獨占鎖實現(xiàn)的阻塞隊列LinkedBlockingQueue的實現(xiàn)
阿里巴巴長期招聘Java研發(fā)工程師p6,p7,p8等上不封頂級別初茶,有意向的可以發(fā)簡歷給我,注明想去的部門和工作地點:1064454834@qq.com
二、 LinkedBlockingQueue類圖結(jié)構(gòu)
如圖LinkedBlockingQueue中也有兩個Node分別用來存放首尾節(jié)點浊闪,并且里面有個初始值為0的原子變量count用來記錄隊列元素個數(shù)恼布,另外里面有兩個ReentrantLock的獨占鎖,分別用來控制元素入隊和出隊加鎖搁宾,其中takeLock用來控制同時只有一個線程可以從隊列獲取元素折汞,其他線程必須等待,putLock控制同時只能有一個線程可以獲取鎖去添加元素盖腿,其他線程必須等待爽待。另外notEmpty和notFull用來實現(xiàn)入隊和出隊的同步。 另外由于出入隊是兩個非公平獨占鎖翩腐,所以可以同時又一個線程入隊和一個線程出隊鸟款,其實這個是個生產(chǎn)者-消費者模型。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾節(jié)點
last = head = new Node<E>(null);
}
如圖默認隊列容量為0x7fffffff;用戶也可以自己指定容量茂卦。
三何什、必備基礎(chǔ)
3.1 ReentrantLock
可以參考 https://www.atatech.org/articles/80539?flag_data_from=active
3.2 條件變量(Condition)
條件變量這里使用的是takeLock.newCondition()獲取也就是說調(diào)用ReentrantLock的方法獲取的,那么可預見Condition使用了ReentrantLock的state等龙。上面的參考沒有提到所以這里串串講下
- 首先看下類圖結(jié)構(gòu)
如圖ConditionObject中兩個node分別用來存放條件隊列的首尾節(jié)點处渣,條件隊列就是調(diào)用條件變量的await方法被阻塞后的節(jié)點組成的單向鏈表伶贰。另外ConditionObject還要依賴AQS的state,ConditionObject是AQS類的一個內(nèi)部類霍比。
- awaitNanos操作
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
//如果中斷標志被設(shè)置了幕袱,則拋異常
if (Thread.interrupted())
throw new InterruptedException();
//添加當前線程節(jié)點到條件隊列,
Node node = addConditionWaiter();
//當前線程釋放獨占鎖
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//掛起當前線程直到超時
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
//unpark后悠瞬,當前線程重新獲取鎖们豌,有可能獲取不到被放到AQS的隊列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//釋放鎖,如果失敗則拋異常
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先如果當前線程中斷標志被設(shè)置了浅妆,直接拋出異常望迎。添加當前線程節(jié)點(狀態(tài)為:-2)到條件隊列凌外。
然后嘗試釋放當前線程擁有的鎖并保存當前計數(shù)辩尊,可知如果當前線程調(diào)用awaitNano前沒有使用當前條件變量所在的Reetenlock變量調(diào)用lock或者lockInterruptibly獲取到鎖,會拋出IllegalMonitorStateException異常。
然后調(diào)用park掛起當前線程直到超時或者其他線程調(diào)用了當前線程的unpark方法康辑,或者調(diào)用了當前線程的interupt方法(這時候會拋異常)摄欲。
如果超時或者其他線程調(diào)用了當前線程的unpark方法,則當前線程從掛起變?yōu)榧せ畲保@取cpu資源后會繼續(xù)執(zhí)行胸墙,會重新獲取鎖。
- signal操作
public final void signal() {
//如果當前線程沒有持有鎖按咒,拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//從條件隊列找第一個狀態(tài)為CONDITION的迟隅,然后把狀態(tài)變?yōu)?
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//狀態(tài)為CONDITION的,然后把狀態(tài)變?yōu)?
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//把條件隊列的上面狀態(tài)為0的節(jié)點放入AQS阻塞隊列
Node p = enq(node);
int ws = p.waitStatus;
//調(diào)用unpark激活掛起的線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先看調(diào)用signal的線程是不是持有了獨占鎖励七,沒有則拋出異常智袭。
然后獲取在條件隊列里面待的時間最長的node,把它移動到線程持有的鎖所在的AQS隊列。
其中enq方法就是把當前節(jié)點放入了AQS隊列掠抬,但是這時候該節(jié)點還是在條件隊列里面那吼野,那么什么時候從條件隊列移除那?其實在await里面的unlinkCancelledWaiters方法两波。
總結(jié):
無論是條件變量的await和singal都是需要先獲取獨占鎖才能調(diào)用箫锤,因為條件變量使用的就是獨占鎖里面的state管理狀態(tài),否者會報異常雨女。
四 、帶超時時間的offer操作-生產(chǎn)者
在隊尾添加元素阳准,如果隊列滿了氛堕,那么等待timeout時候,如果時間超時則返回false野蝇,如果在超時前隊列有空余空間讼稚,則插入后返回true括儒。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//空元素拋空指針異常
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//獲取可被中斷鎖,只有一個線程克獲取
putLock.lockInterruptibly();
try {
//如果隊列滿則進入循環(huán)
while (count.get() == capacity) {
//nanos<=0直接返回
if (nanos <= 0)
return false;
//否者調(diào)用await進行等待锐想,超時則返回<=0(1)
nanos = notFull.awaitNanos(nanos);
}
//await在超時時間內(nèi)返回則添加元素(2)
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//隊列不滿則激活其他等待入隊線程(3)
if (c + 1 < capacity)
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
//c==0說明隊列里面有一個元素帮寻,這時候喚醒出隊線程(4)
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
如果獲取鎖前面有線程調(diào)用了putLock. interrupt(),并且后面沒有調(diào)用interrupted()重置中斷標志,調(diào)用lockInterruptibly時候會拋出InterruptedException異常赠摇。
隊列滿的時候調(diào)用notFull.awaitNanos阻塞當前線程固逗,當前線程會釋放獲取的鎖,然后等待超時或者其他線程調(diào)用了notFull.signal()才會返回并重新獲取鎖藕帜,或者其他線程調(diào)用了該線程的interrupt方法設(shè)置了中斷標志烫罩,這時候也會返回但是會拋出InterruptedException異常。
如果超時則直接返回false洽故,如果超時前調(diào)用了notFull.signal()則會退出循環(huán)贝攒,執(zhí)行(2)添加元素到隊列,然后執(zhí)行(3)时甚,(3)的目的是為了激活其他入隊等待線程隘弊。(4)的話c==0說明隊列里面已經(jīng)有一個元素了,這時候就可以激活等待出隊線程了荒适。
另外signalNotEmpty函數(shù)是先獲取獨占鎖梨熙,然后在調(diào)用的signal這也證明了3.2節(jié)的結(jié)論。
五吻贿、 帶超時時間的poll操作-消費者
獲取并移除隊首元素串结,在指定的時間內(nèi)去輪詢隊列看有沒有首元素有則返回,否者超時后返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//出隊線程獲取獨占鎖
takeLock.lockInterruptibly();
try {
//循環(huán)直到隊列不為空
while (count.get() == 0) {
//超時直接返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//出隊舅列,計數(shù)器減一
x = dequeue();
c = count.getAndDecrement();
//如果出隊前隊列不為空則發(fā)送信號肌割,激活其他阻塞的出隊線程
if (c > 1)
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
//當前隊列容量為最大值-1則激活入隊線程。
if (c == capacity)
signalNotFull();
return x;
}
首先獲取獨占鎖帐要,然后進入循環(huán)當當前隊列有元素才會退出循環(huán)把敞,或者超時了,直接返回null榨惠。
超時前退出循環(huán)后奋早,就從隊列移除元素,然后計數(shù)器減去一赠橙,如果減去1前隊列元素大于1則說明當前移除后隊列還有元素耽装,那么就發(fā)信號激活其他可能阻塞到當前條件信號的線程。
最后如果減去1前隊列元素個數(shù)=最大值期揪,那么移除一個后會騰出一個空間來掉奄,這時候可以激活可能存在的入隊阻塞線程。
六凤薛、put操作-生產(chǎn)者
與帶超時時間的poll類似不同在于put時候如果當前隊列滿了它會一直等待其他線程調(diào)用notFull.signal才會被喚醒姓建。
七诞仓、 take操作-消費者
與帶超時時間的poll類似不同在于take時候如果當前隊列空了它會一直等待其他線程調(diào)用notEmpty.signal()才會被喚醒。
八速兔、 size操作
當前隊列元素個數(shù)墅拭,如代碼直接使用原子變量count獲取。
public int size() {
return count.get();
}
九涣狗、peek操作
獲取但是不移除當前隊列的頭元素谍婉,沒有則返回null
public E peek() {
//隊列空,則返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
十屑柔、 remove操作
刪除隊列里面的一個元素屡萤,有則刪除返回true,沒有則返回false掸宛,在刪除操作時候由于要遍歷隊列所以加了雙重鎖死陆,也就是在刪除過程中不允許入隊也不允許出隊操作
public boolean remove(Object o) {
if (o == null) return false;
//雙重加鎖
fullyLock();
try {
//遍歷隊列找則刪除返回true
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
//找不到返回false
return false;
} finally {
//解鎖
fullyUnlock();
}
}
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
void unlink(Node<E> p, Node<E> trail) {
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
//如果當前隊列滿,刪除后唧瘾,也不忘記最快的喚醒等待的線程
if (count.getAndDecrement() == capacity)
notFull.signal();
}
十一措译、開源框架中使用
tomcat中任務(wù)隊列TaskQueue
11.1 類圖結(jié)構(gòu)
可知TaskQueue繼承了LinkedBlockingQueue并且泛化類型固定了為Runnalbe.重寫了offer,poll,take方法饰序。
11.2 TaskQueue
tomcat中有個線程池ThreadPoolExecutor领虹,在NIOEndPoint中當acceptor線程接受到請求后,會把任務(wù)放入隊列求豫,然后poller 線程從隊列里面獲取任務(wù)塌衰,然后就吧任務(wù)放入線程池執(zhí)行。這個ThreadPoolExecutor中的的一個參數(shù)就是TaskQueue蝠嘉。
先看看ThreadPoolExecutor的參數(shù)如果是普通LinkedBlockingQueue是怎么樣的執(zhí)行邏輯:
當調(diào)用線程池方法 execute() 方法添加一個任務(wù)時:
- 如果當前運行的線程數(shù)量小于 corePoolSize最疆,則創(chuàng)建新線程運行該任務(wù)
- 如果當前運行的線程數(shù)量大于或等于 corePoolSize,則將這個任務(wù)放入阻塞隊列蚤告。
- 如果當前隊列滿了努酸,并且當前運行的線程數(shù)量小于 maximumPoolSize,則創(chuàng)建新線程運行該任務(wù)杜恰;
- 如果當前隊列滿了获诈,并且當前運行的線程數(shù)量大于或等于 maximumPoolSize,那么線程池將會拋出RejectedExecutionException異常心褐。
如果線程執(zhí)行完了當前任務(wù)舔涎,那么會去隊列里面獲取一個任務(wù)來執(zhí)行,如果任務(wù)執(zhí)行完了逗爹,并且當前線程數(shù)大于corePoolSize亡嫌,那么會根據(jù)線程空閑時間keepAliveTime回收一些線程保持線程池corePoolSize個線程。
首先看下線程池中exectue添加任務(wù)時候的邏輯:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//當前工作線程個數(shù)小于core個數(shù)則開新線程執(zhí)行(1)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//放入隊列(2)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果隊列滿了則開新線程,但是個數(shù)要不超過最大值昼伴,超過則返回false
//然后執(zhí)行reject handler(3)
else if (!addWorker(command, false))
reject(command);
}
可知當當前工作線程個數(shù)為corePoolSize后,如果在來任務(wù)會把任務(wù)添加到隊列镣屹,隊列滿了或者入隊失敗了則開啟新線程圃郊。
然后看看TaskQueue中重寫的offer方法的邏輯:
public boolean offer(Runnable o) {
// 如果parent為null則直接調(diào)用父類方法
if (parent==null) return super.offer(o);
//如果當前線程池中線程個數(shù)達到最大,則無條件調(diào)用父類方法
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//如果當前提交的任務(wù)小于當前線程池線程數(shù)女蜈,說明線程用不完持舆,沒必要重新開線程
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//如果當前線程池線程個數(shù)>core個數(shù)但是小于最大個數(shù),則開新線程代替放入隊列
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//到了這里伪窖,無條件調(diào)用父類
return super.offer(o);
}
可知parent.getPoolSize()<parent.getMaximumPoolSize()普通隊列會把當前任務(wù)放入隊列逸寓,TAskQueue則是返回false,因為這會開啟新線程執(zhí)行任務(wù)覆山,當然前提是當前線程個數(shù)沒有達到最大值竹伸。
然后看下Worker線程中如果從隊列里面獲取任務(wù)執(zhí)行的:
final void runWorker(Worker w) {
...
try {
while (task != null || (task = getTask()) != null) {
...
}
completedAbruptly = false;
} finally {
...
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...
int wc = workerCountOf(c);
...
try {
//根據(jù)timed決定調(diào)用poll還是take
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
十二、總結(jié)
12.1 并發(fā)安全總結(jié)
仔細思考下阻塞隊列是如何實現(xiàn)并發(fā)安全的維護隊列鏈表的簇宽,先分析下簡單的情況就是當隊列里面有多個元素時候勋篓,由于同時只有一個線程(通過獨占鎖putLock實現(xiàn))入隊元素并且是操作last節(jié)點(,而同時只有一個出隊線程(通過獨占鎖takeLock實現(xiàn))操作head節(jié)點魏割,所以不存在并發(fā)安全問題譬嚣。
- 考慮當隊列為空的時候隊列狀態(tài)為:
這時候假如一個線程調(diào)用了take方法,由于隊列為空,所以count.get()==0所以當前線程會調(diào)用notEmpty.await()把自己掛起钞它,并且放入notEmpty的條件隊列拜银,并且釋放當前條件變量關(guān)聯(lián)的通過takeLock.lockInterruptibly()獲取的獨占鎖。由于釋放了鎖遭垛,所以這時候其他線程調(diào)用take時候就會通過takeLock.lockInterruptibly()獲取獨占鎖尼桶,然后同樣阻塞到notEmpty.await(),同樣會被放入notEmpty的條件隊列耻卡,也就說在隊列為空的情況下可能會有多個線程因為調(diào)用take被放入了notEmpty的條件隊列疯汁。
這時候如果有一個線程調(diào)用了put方法,那么就會調(diào)用enqueue操作卵酪,該操作會在last節(jié)點后面添加新元素并且設(shè)置last為新節(jié)點幌蚊。然后count.getAndIncrement()先獲取當前隊列元個數(shù)為0保存到c,然后自增count為1溃卡,由于c==0所以調(diào)用signalNotEmpty激活notEmpty的條件隊列里面的阻塞時間最長的線程溢豆,這時候take中調(diào)用notEmpty.await()的線程會被激活await內(nèi)部會重新去獲取獨占鎖獲取成功則返回,否者被放入AQS的阻塞隊列瘸羡,如果獲取成功漩仙,那么count.get() >0因為可能多個線程put了,所以調(diào)用dequeue從隊列獲取元素(這時候一定可以獲取到),然后調(diào)用c = count.getAndDecrement() 把當前計數(shù)返回后并減去1队他,如果c>1 說明當前隊列還有其他元素卷仑,那么就調(diào)用 notEmpty.signal()去激活 notEmpty的條件隊列里面的其他阻塞線程。
- 考慮當隊列滿的時候:
當隊列滿的時候調(diào)用put方法時候麸折,會由于notFull.await()當前線程被阻塞放入notFull管理的條件隊列里面锡凝,同理可能會有多個調(diào)用put方法的線程都放到了notFull的條件隊列里面。
這時候如果有一個線程調(diào)用了take方法,調(diào)用dequeue()出隊一個元素垢啼,c = count.getAndDecrement()窜锯;count值減一;c==capacity;現(xiàn)在隊列有一個空的位置芭析,所以調(diào)用signalNotFull()激活notFull條件隊列里面等待最久的一個線程锚扎。
12.2簡單對比
LinkedBlockingQueue與ConcurrentLinkedQueue相比前者前者是阻塞隊列使用可重入獨占的非公平鎖來實現(xiàn)通過使用put鎖和take鎖使得入隊和出隊解耦可以同時進行處理,但是同時只有一個線程可以入隊或者出隊馁启,其他線程必須等待驾孔,另外引入了條件變量來進行入隊和出隊的同步,每個條件變量維護一個條件隊列用來存放阻塞的線程进统,要注意這個隊列和AQS的隊列不是一個東東助币。LinkedBlockingQueue的size操作通過使用原子變量count獲取能夠比較精確的獲取當前隊列的元素個數(shù),另外remove方法使用雙鎖保證刪除時候隊列元素保持不變螟碎,另外其實這個是個生產(chǎn)者-消費者模型眉菱。
而ConcurrentLinkedQueue則使用CAS非阻塞算法來實現(xiàn),使用CAS原子操作保證鏈表構(gòu)建的安全性掉分,當多個線程并發(fā)時候CAS失敗的線程不會被阻塞俭缓,而是使用cpu資源去輪詢CAS直到成功,size方法先比LinkedBlockingQueue的獲取的個數(shù)是不精確的酥郭,因為獲取size的時候是通過遍歷隊列進行的华坦,而遍歷過程中可能進行增加刪除操作,remove方法操作時候也沒有對整個隊列加鎖,remove時候可能進行增加刪除操作不从,這就可能刪除了一個剛剛新增的元素惜姐,而不是刪除的想要位置的。
歡迎關(guān)注微信公眾號:‘技術(shù)原始積累’ 獲取更多技術(shù)干貨__