總所周知,java concurrent包的工具類是構建在AbstractQueuedSynchronizer類上的基礎上的减牺,而這個類是Doug Lea大神基于CHL隊列實現(xiàn)的同步器晚缩。這個強大的同步器是怎樣實現(xiàn)的呢浅役?我們來一探究竟。
因為AQS的代碼比較難以理解瞻凤,我們從concurrent包下的并發(fā)工具類著手開始研究憨攒。從最簡單的CountDownLatch開始,首先看它的源碼
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownlatch類定義了一個Sync類繼承自AQS阀参,實現(xiàn)的了AQS的tryAcquireShared和tryReleaseShared方法肝集,share顧名思義是共享鎖。首先從await方法入手:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await方法調用的AQS的acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
從這個方法看蛛壳,await方法是可中斷的杏瞻,如果當前線程被中斷,則直接向上拋InterruptedException衙荐。如果正常執(zhí)行捞挥,則會調用tryAcquireShared方法,這個是在之類中實現(xiàn)的∮且鳎現(xiàn)在回到CountDownLatch砌函,看tryAcquireShared的實現(xiàn):
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
很簡單,如果state為0則返回1溜族,否則讹俊,返回-1。state是構造函數(shù)里傳進來的煌抒。我們都知道使用CountDownlatch時傳進來的數(shù)字表示并發(fā)執(zhí)行的線程數(shù)仍劈,由此聯(lián)想state就是持有鎖的線程數(shù)。從acquireSharedInterruptibly方法可以看到摧玫,當前state!=0耳奕,即并發(fā)任務線程還沒執(zhí)行完時绑青,會進入doAcquireSharedInterruptibly:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先看addWaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
也就是說在pred為null的時候會初始化隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
從上面代碼看初始化之后的隊列是這樣的:
head只是指向一個空節(jié)點诬像,這一點對于理解后面的代碼很重要屋群,再回到doAcquireSharedInterruptibly,p的前繼節(jié)點就是head,所以會進入下面的if分支(至于為什么有這個if判斷后面再詳解),對于CountDownLatch坏挠,在并發(fā)任務還沒完成的時候芍躏,tryAcquireShared返回值為-1,所以就不會往下走降狠。直接進入shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
pred就是head对竣,初始化之后waitStatus=0,進入else分支榜配,故head的waitStatus被更新為SIGNAL否纬,再回到doAcquireSharedInterruptibly,這個時候如果線程沒有被中斷,那么會接著循環(huán)蛋褥,再次進入shouldParkAfterFailedAcquire临燃,這個是進入第一個if分支,返回true烙心,那么就是進入parkAndCheckInterrupt膜廊,將當前線程阻塞住,這就是CountDownlatch調用await后阻塞住的原因。
從上面的分析可以知道淫茵,對于CountDownlatch爪瓜,在并發(fā)任務還沒結束的時候,如果另外一個線程B再調用await方法匙瘪,那么當前線程會放到等待隊列的最后面铆铆。第一個節(jié)點park住的時候,它的waitStatus還是0丹喻,所以這次算灸,shouldParkAfterFailedAcquire會把第一個節(jié)點的waitStatus設置為SIGNAL,同時下次循環(huán)會park住線程B
AQS獲取鎖的過程已經(jīng)了解清楚了驻啤,下面來看看AQS釋放鎖的過程菲驴。還是從CountDownLatch的countdown()方法入手。countdown()是直接調用AQS的releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
從代碼看骑冗,tryReleaseShared是在子類中實現(xiàn)的:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
從tryReleaseShared方法代碼來看赊瞬,只有等所有并發(fā)任務執(zhí)行完,tryReleaseShared才會返回true贼涩,才會執(zhí)行doReleaseShared
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
如果head節(jié)點的waitStatus為SIGNAL巧涧,則先把head節(jié)點的status設置為0,然后進入unparkSuccessor
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
在通常情況下遥倦,s!=null并且s.waitStatus為SIGNAL谤绳,所以head節(jié)點的后繼節(jié)點會被喚醒占锯。就是說每次調用releaseShared只會喚醒等待隊列中head節(jié)點之后的線程。
分析到這里缩筛,試想這個使用CountDownLatch場景消略,線程A和線程B,都調用await方法等待線程B瞎抛、線程C完成任務艺演。那么在線程B、線>程C完成任務的時候桐臊,主線程調用releaseShared進入doReleaseShared喚醒head節(jié)點之后的節(jié)點線程胎撤。因為原來的線程是在doAcquireSharedInterruptibly里的for循環(huán)最后park住,現(xiàn)在仍然回到該處断凶,繼續(xù)下次循環(huán)伤提。這個時候會進入上面提到的if分支,進入setHeadAndPropagate认烁。
private void setHeadAndPropagate(Node node, long propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
從代碼來看肿男,setHeadAndPropagate就是把當前當前head節(jié)點remove掉,設置當前線程節(jié)點為head節(jié)點(也就是第二個節(jié)點)砚著。同時在共享鎖的模式下次伶,會調用doReleaseShared,喚醒當前節(jié)點的后繼節(jié)點,這就是propagate的概念稽穆。同理后續(xù)節(jié)點又會再喚醒它后面的節(jié)點冠王,直到整個隊列都被喚醒。
至此舌镶,已基本了解AQS的工作原理的柱彻,為了加深印象,我們來看下面的線程隊列的變化過程圖餐胀。
線程thread1調用acquireSharedInterruptibly之后哟楷,線程隊列如下圖,同時thread1被park住
另外一個線程thead2再次調用acquireSharedInterruptibly之后否灾,線程隊列如下圖卖擅,同時thread2被park住
這個時候,另一個線程觸發(fā)releaseShared墨技,線程隊列如下圖惩阶,同時thread1被unpark
thread1被unpark之后,會進入setHeadAndPropagate扣汪,setHead之后断楷,線程隊列如下圖
thread1調用doReleaseShared喚醒thread2后,線程隊列如下圖
thread2 進入setHeadAndPropagate崭别,setHead之后冬筒,線程隊列如下圖