BlockingQueue是我們在使用線程池的時(shí)候使用比較多的等待隊(duì)列,這里同時(shí)借助BlockingQueue分析下AQS中的ConditionObject誉碴。
ArrayBlockingQueue
構(gòu)造函數(shù) :
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
// 構(gòu)造函數(shù)中會new出一個(gè)新的ReentrantLock 方便后續(xù)使用
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition(); // 用于掛起生產(chǎn)節(jié)點(diǎn)
notFull = lock.newCondition(); // 用于掛起消費(fèi)節(jié)點(diǎn)
}
put方法:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 這里加鎖保證入隊(duì)的原子性
// 由于使用Interruptibly結(jié)尾的lock 所以會拋出中斷異常
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); // 隊(duì)列已滿 阻塞自己
enqueue(e); 入隊(duì)
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 喚醒一個(gè)消費(fèi)節(jié)點(diǎn)
}
poll:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果 隊(duì)列為空 直接返回空 否則取出一個(gè)
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; // 將takeIndex 重置為隊(duì)列頭部
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); // 喚醒一個(gè)生產(chǎn)節(jié)點(diǎn)
return x;
}
take:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); // 如果隊(duì)列為空 會阻塞自己
return dequeue();
} finally {
lock.unlock();
}
}
可以看到叛薯,在BlockingQueue中稀轨,使用Condition做了一些阻塞操作撮慨,下面來分析下:
首先newCondition方法會生成一個(gè)ConditionObject對象毛萌,該對象是AQS中的一個(gè)內(nèi)部類:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
在使用condition時(shí)航闺,AQS會維護(hù)一個(gè)ConditionObject隊(duì)列褪测,隊(duì)列中記錄了所有正在等待的節(jié)點(diǎn),并且這些節(jié)點(diǎn)不會去搶鎖潦刃。
然后來看下await方法侮措,該方法的作用是將當(dāng)前線程放入等待隊(duì)列,并從CLH隊(duì)列中取出(關(guān)于CLH隊(duì)列乖杠,其實(shí)就是AQS中維護(hù)的雙向鏈表分扎,用于等待獲取鎖):
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 新建condition狀態(tài)的節(jié)點(diǎn)并將其入隊(duì)
AbstractQueuedSynchronizer.Node node = addConditionWaiter();
// 釋放當(dāng)前節(jié)點(diǎn)的鎖
// 注意 這里會記錄下拿了幾個(gè)鎖 后面加鎖也需要同樣的數(shù)量
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 節(jié)點(diǎn)沒有在CLH隊(duì)列中里面
// //將線程進(jìn)行掛起,前面已經(jīng)釋放掉鎖了胧洒,并且已經(jīng)安全的添加到了condition隊(duì)列中
LockSupport.park(this);
// 執(zhí)行這里的條件: 被中斷 or 被前置節(jié)點(diǎn)喚醒
// 這里只要checkInterruptWhileWaiting 返回的是0 就繼續(xù)park
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 重新?lián)屾i 此時(shí)節(jié)點(diǎn)已經(jīng)在CLH隊(duì)列中了 獲取成功后判斷該線程是否發(fā)生錯誤
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT; // 退出時(shí)重新中斷
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除隊(duì)列中的無效節(jié)點(diǎn)
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); // 如果出現(xiàn)異常 拋出
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 清理無效節(jié)點(diǎn)
t = lastWaiter;
}
// 新建condition類型的節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) // 隊(duì)列為空 則新建的節(jié)點(diǎn)就是頭
firstWaiter = node;
else // 否則 將當(dāng)前節(jié)點(diǎn)加入到隊(duì)尾
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 注意 這里有個(gè)next 有個(gè) nextWaiter
// 一個(gè)用于CLH隊(duì)列 一個(gè)用于condition等待隊(duì)列 要區(qū)分開來
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
// 獲取隊(duì)列尾節(jié)點(diǎn)
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) { // 從尾部遍歷整個(gè)節(jié)點(diǎn) 看是否有當(dāng)前節(jié)點(diǎn)
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
釋放當(dāng)前節(jié)點(diǎn)的所有鎖:
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) { // 釋放當(dāng)前節(jié)點(diǎn)的所有鎖 并喚醒后置節(jié)點(diǎn)
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 釋放所有鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 喚醒后繼節(jié)點(diǎn)
return true;
}
return false;
}
清理隊(duì)列中的無效節(jié)點(diǎn):
private void unlinkCancelledWaiters() {
AbstractQueuedSynchronizer.Node t = firstWaiter;
AbstractQueuedSynchronizer.Node trail = null;
while (t != null) {
AbstractQueuedSynchronizer.Node next = t.nextWaiter; // 拿到下一個(gè)節(jié)點(diǎn)
// 若頭節(jié)點(diǎn)的狀態(tài)已經(jīng)不是CONDITION
if (t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
t.nextWaiter = null; // 剔除頭節(jié)點(diǎn)
if (trail == null)
firstWaiter = next; // 直接將 firstWaiter 記錄為 next
else
// trail已經(jīng)被記錄為CONDITION狀態(tài)的節(jié)點(diǎn)
// 將nextWaiter 記錄為next 即:
// CONDITION -> CANCELED -> UNKNOW
// 轉(zhuǎn)換為 CONDITION -> UNKNOW
trail.nextWaiter = next;
if (next == null) // 已經(jīng)遍歷到了隊(duì)尾
lastWaiter = trail;
}
else
trail = t; // 如果當(dāng)前節(jié)點(diǎn)還是CONDITION狀態(tài) 則使用trail記錄下
t = next;
}
}
該方法中有個(gè)疑點(diǎn)畏吓,CONDITION狀態(tài)是什么時(shí)候被重置掉的 ?
其實(shí)是在await方法中 :
// 這里 線程被喚醒后 會執(zhí)行checkInterruptWhileWaiting 方法
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
private int checkInterruptWhileWaiting(AbstractQueuedSynchronizer.Node node) {
// 判斷節(jié)點(diǎn)是否還在condition隊(duì)列里面卫漫,如果在菲饼,將狀態(tài)變成0,放到等待返回true列赎,拋異常宏悦。
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
// 也就是在這里 node 狀態(tài)會被重置為0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); // 將該節(jié)點(diǎn)入隊(duì)
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
// 可能有別的線程通過signal 喚醒了當(dāng)前節(jié)點(diǎn)
// 并且正在入隊(duì) 那么這時(shí) 自旋啥也不干
while (!isOnSyncQueue(node))
Thread.yield();
return false; // 修改node 狀態(tài)失敗 返回false
}
// 這個(gè)方法應(yīng)該很熟悉了 將一個(gè)節(jié)點(diǎn)放入CLH隊(duì)列中
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
接下來看下signal方法:
public final void signal() {
// getExclusiveOwnerThread() == Thread.currentThread();
// 持有鎖的線程是否是本線程,如果不是持有鎖的線程直接拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
AbstractQueuedSynchronizer.Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(AbstractQueuedSynchronizer.Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // 將頭節(jié)點(diǎn)后移一位
lastWaiter = null; // 隊(duì)列已經(jīng)空了
first.nextWaiter = null; // 斷開當(dāng)前節(jié)點(diǎn)對后繼節(jié)點(diǎn)的引用
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 隊(duì)列不為空 則繼續(xù)
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 將剝離出來的節(jié)點(diǎn)改為0狀態(tài)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; // 失敗的話 直接返回 操作等待隊(duì)列中下一個(gè)節(jié)點(diǎn)
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 將當(dāng)前節(jié)點(diǎn)入隊(duì) 注意 這里返回的是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
int ws = p.waitStatus;
// 如果前置節(jié)點(diǎn)狀態(tài)大于0(被取消)
// 或者更新狀態(tài)為SIGNAL 失敯摺(SIGNAL表示后繼節(jié)點(diǎn)可以被喚醒)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 則直接喚醒當(dāng)前線程
LockSupport.unpark(node.thread);
return true;
}
signalAll:
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null); // 主要這里不同 只要等待隊(duì)列還有節(jié)點(diǎn) 就繼續(xù)喚醒
}