解讀Java同步器相關源碼
關于解讀Java同步器相關源碼的文章已經(jīng)數(shù)不勝數(shù), 但是經(jīng)典的東西總能經(jīng)的起反復的解讀和學習. 最好每個人都能親自翻著源碼看一看, 肯定會有收獲. 如果因為這篇文章促成你看源碼, 那么這篇文章遠超過了它內容的價值.
文章包含五部分內容: AQS源碼解讀;ReentrantLock源碼解讀药薯;ReentrantReadWriteLock源碼解讀您朽;CountDownLatch源碼解讀狂丝;拜神儀式. 源碼部分并不是面面俱到, 只是大致分析了AQS如何在每個類中發(fā)揮作用.
AQS部分
AbstractQueuedSynchronizer是基于自旋和先進先出隊列的同步器. Java并發(fā)包中的可重入鎖(ReentrantLock), 讀寫鎖(ReentrantReadWriteLock), 信號量(Semaphore), 閉鎖(CountDownLatch)都基于AQS構建.
- AQS內部維護了一個先進先出的隊列, 組成隊列的節(jié)點由Node表示, Node大致結構如下:
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 指向前后節(jié)點
volatile Node prev;
volatile Node next;
// 指向當前線程(或者正在獲取鎖, 或者正在申請信號量...)
volatile Thread thread;
}
其中waitStatus表示當前Node的狀態(tài). 舉個例子, 假如一個競爭非常激烈的鎖, 某個線程一段時間內未競爭到鎖, 而取消了. 這是Node會被置為取消的狀態(tài). Node初始化時的值為0.
CANCELLED: Thread已取消
SIGNAL: Thread正等待被unpark(獲取鎖未成功進入隊列等待的線程, 會被標記成SIGNAL, 線程也會通過LockSupport.park被掛起)
CONDITION: Thread正在等待Condition, 在condition queue中
PROPAGATE: 只可能頭節(jié)點被設置該狀態(tài), 在releaseShared時需要被傳播給后續(xù)節(jié)點.
在AQS中還有一個非常重要的狀態(tài)屬性:
private volatile int state
. 這個屬性可以被子類擴展成不同的用途. 在ReentrantLock中state表示獲取鎖的數(shù)量(最多2147483647個), state=0表示鎖未被獲取過. ReentrantReadWriteLock, 更是巧妙的把state分成兩段來用, 高16位表示讀鎖數(shù)量, 低16位表示寫鎖數(shù)量.因此ReentrantReadWriteLock限制最大的鎖數(shù)量是65535. Semphore表示剩余的許可數(shù)量. CountDownLatch表示閉鎖數(shù)量.AQS作為一個基類出現(xiàn), 如果基于它來構建同步工具需要重新定義以下方法:
tryAcquire // 獨占模式下獲取鎖
tryRelease // 獨占模式下釋放所
tryAcquireShared // 共享模式下獲取鎖
tryReleaseShared // 共享模式下釋放鎖
isHeldExclusively // 判斷鎖是不是被當前線程獨占
獨占鎖只能由一條線程正在使用鎖, 共享鎖多個線程可以同時使用.
ReentrantLock 部分
這一部分來分析ReentrantLock的非公平模式下獲取和釋放鎖的過程. 先來說獲取鎖的過程, 獲取鎖可以大致的分為兩個階段: 搶占式獲取, 如果獲取成功則立即返回;排隊等待. 下邊來看源碼:
// 使用lock獲取鎖的時候, 如果鎖處于空閑狀態(tài), 則獲取成功立即返回.
// 如果所被當前線程持有, 則嵌套的獲取可沖入鎖.
// 如果鎖被其他線程持有, 則當前線程被切換出去, 直到獲取了鎖.
final void lock() {
// 如果當前鎖處于空閑狀態(tài), 則直接獲取鎖. 是不用排隊的.
if (compareAndSetState(0, 1))
// 設置當前線程為獨占線程
setExclusiveOwnerThread(Thread.currentThread());
// 如果當前不能獲取, 則和公平鎖一樣進入隊列
else
acquire(1);
}
// AQS
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
先嘗試立即獲取鎖, 如果剛好搶上了, 則立即返回. 這時可能有其它線程等待, 也可能沒有. 獲取鎖通過CAS修改AQS中state的狀態(tài). 如果修改成功, 表示獲取鎖成功, 設置當前線程為owner.
非公平鎖有著更好的性能, 想象一種情況, A線程釋放鎖, B線程正在等待被喚醒, 而這時C快速的獲取鎖并釋放, 這一切都趕在B被喚醒之前. 而公平鎖則無法達到這種共贏的局面.
接下看插隊失敗后如何處理:
public final void acquire(int arg) { // arg = 1
// 如果獲取鎖失敗 && 并且當前線程被中斷當前線程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 調用線程的interupt方法.
selfInterrupt();
}
上邊我們說過tryAcquire需要實現(xiàn)者重寫, 我們拿到ReentrantLock.NonfaitLock的實現(xiàn):
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
這個方法調用的是父類Sync的nonfairTryAcquire方法. Sync繼承了AQS, 而NonfaitLock繼承自Sync.
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
// 空閑狀態(tài)可以直接獲取
if (c == 0) {
// 獲取成功, set 線程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當前線程已經(jīng)獲取了鎖, 則可重入獲取
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新當前鎖被獲取的數(shù)量
setState(nextc);
return true;
}
return false; // 獲取失敗
}
首先, 獲取當前AQS的state, 上邊說過Reentrant用它來記錄鎖的獲取情況. 如果state == 0, 嘗試CAS修改狀態(tài), 成功則獲取鎖成功. 如果state不是0, 則表示當前鎖已經(jīng)被占有, 因為是可重入鎖, 所以, 如果當前線程和持有鎖的線程是同一個線程, 更新AQS狀態(tài), 獲取鎖成功. 這是第二次插隊的過程.
如果tryAcquire失敗了呢? 這次真的要排隊了.
// 把當前獲取鎖的節(jié)點放入等待隊列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 先嘗試直接入隊列
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果失敗, 則執(zhí)行自旋set
enq(node);
return node;
}
先嘗試看能不能直接一次進隊, 如果不行調用enq方法, 執(zhí)行自旋邏輯去.
最后一次不排隊獲取鎖的機會:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取該tail的前一個節(jié)點
final Node p = node.predecessor();
// 如果當前節(jié)點的上一個節(jié)點==頭節(jié)點, 并且獲取鎖成功.
// 意味著當前的head節(jié)點釋放了鎖. 所以當前線程獲取鎖成功, 并且把當前節(jié)點置為head節(jié)點
if (p == head && tryAcquire(arg)) {
setHead(node); 則把node設置為頭節(jié)點
p.next = null; // help GC
failed = false;
return interrupted;
}
// 檢查當前線程是否應該被掛起 && 調用LockSupport.park成功則當前線程可被中斷.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看for循環(huán)中的第一個if, 如果node的前一個節(jié)點是頭節(jié)點(即正在占有鎖), 并且tryAcquire獲取鎖成功, 說明當前線程成功的拿到了鎖, 不需要被interrupt.
接著看shouldParkAfterFailedAcquire和parkAndCheckInterrupt兩個方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 如果前一個節(jié)點的state是SIGNAL, 則當前線程應該interrupt
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // 刪除cancelled的node
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
如果前一個節(jié)點的waitStatus是SIGNAL, 表示這個節(jié)點也在等待被喚醒, 當前線程需要被interrupt, 老實排隊. 如果waitStatus > 0(這個節(jié)點被取消了), 移出隊列, 并檢查是否有其他節(jié)點需要被移除. 否則的話設置前一個節(jié)點的waitStatus為SIGNAL. 因為這個方法調用是在acquireQueued方法中的for循環(huán)中, 會不停的被調用. 最終期望的結果是, 如果當前線程沒有機會獲取鎖, 則掛起該線程, 并到隊列排隊.
parkAndCheckInterrupt方法會調用LockSupport.park(this)
掛起當前線程. 因為acquireQueued在一個for循環(huán)里邊, 所以當線程被unpark的時候仍會接著執(zhí)行重新獲取鎖的邏輯.
所以回頭再看acquire方法就可以用一句話概括: 如果獲取成功則立即返回, 如果獲取鎖失敗, 加入等待隊列, 然后后調用selfInterrupt方法. 到此, 加鎖的過程分析完成. 下邊來看unlock的過程.
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
if (tryRelease(arg)) { // 如果當前線程完全釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock的unlock方法調用了AQS的release方法. tryRelease我們可以在ReentrantLock.Sync中找到其實現(xiàn):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 因為是可重入鎖, 所以c可能大于1, 所以只有c == 0時, 鎖才釋放完成.
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 設置state
return free;
}
首先, 判斷當前線程獲取鎖的剩余數(shù)量, 如果數(shù)量為0, 則釋放成功, 把AQS中的Thread置空. 如果數(shù)量 != 0, 則把當前的數(shù)量set到AQS的state中. 可重入獲取鎖, 所以可能不會一次性釋放完成. 當只有當c為0, 才真正釋放, 返回true.
如果tryRelease返回true, 判斷當前head的狀態(tài), 然后執(zhí)行unparkSuccessor方法.
private void unparkSuccessor(Node node) { // node == head
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 設置waitStatus為0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // 如果node的下個節(jié)點為空, 或者s的waitStatus 是cancelled
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從隊尾開始找可被unpark的Node
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如果當前釋放鎖的節(jié)點waitStatus < 0, 則重置其狀態(tài)為0(初始狀態(tài)). 然后拿到當前節(jié)點的下一個節(jié)點, 如果這個節(jié)點為null或者被cancel了, 則沿著隊尾(可能等待隊列斷鏈了)找到最后一個等待節(jié)點并把這個節(jié)點包含的線程喚醒. 釋放鎖包含釋放鎖, 喚醒下一個等待的線程.
ReentrantReadWriteLock部分
ReentrantReadWriteLock也分公平和非公平模式, 我們主要分析非公平鎖的邏輯.
先看讀鎖lock的過程:
public void lock() {
sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//AQS
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果當前已有獨占鎖(即寫鎖), 而且獲取鎖的線程不是當前的線程, 返回-1. 外邊執(zhí)行doAcquireShared, 走排隊邏輯
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 獲取讀鎖的獲取數(shù)
int r = sharedCount(c);
// 如果條件符合則直接cas獲取讀鎖
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 如果上述沒有獲取成功, 則spin等待隊列獲取鎖.
return fullTryAcquireShared(current);
}
// 檢查當前排隊的隊頭是否是寫鎖, 如果是的話給寫鎖優(yōu)先權.
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
通過exclusiveCount方法可以獲取當前獨占鎖(即寫鎖)的數(shù)量. 如果當前已經(jīng)有寫鎖, 而且鎖定的線程不是當前線程, 直接返回失敗. 對于讀寫鎖, 如果當前有線程占有寫鎖, 則讀鎖不能被獲取.
跳到下一個if, 這有三個判斷條件:
- 這個方法判斷當前的獲取讀鎖是否應該被block. 啥時候需要被block? 如果當前等待隊列的頭節(jié)點將要獲取寫鎖, 那么這種情況下讀鎖需要把優(yōu)先權讓給它(只有這一種情況會, 比如寫鎖在隊列中則會照常按序等待).
- 讀鎖的數(shù)量是否超了最大鎖數(shù)限制65535.
- CAS state是否成功
如果三個條件同時為真, 則讀鎖獲取所成功. 否則調用fullTryAcquireShared, 通過自旋來獲取鎖.
如果仍然沒有獲取成功(比如, 寫鎖一直被占有, 或者等待隊列頭節(jié)點正在等待獲取寫鎖), 則會執(zhí)行doAcquireShared方法.
//AQS
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這段代碼我們比較熟悉, 首先把當前線程加到等待隊列中. 如果當前節(jié)點的前一個節(jié)點是head, 則嘗試CAS獲取鎖. 和重入鎖不同的是獲取成功之后執(zhí)行的操作: set head 且 把等待的讀鎖都喚醒.
unlock的過程, 還有寫鎖幾乎和ReentrantLock一致的, 所以這里就不再次分析了.
CountDownLatch 部分
CountDownLatch也就是我們常說的閉鎖, 它可以實現(xiàn)這樣的功能: 等待所有的線程到達. 比如我們把任務分配給了多個線程去執(zhí)行, 等待所有的線程執(zhí)行完匯總結果. 類似這種通知機制我們可以用閉鎖實現(xiàn). 實現(xiàn)閉鎖的時候, 我們需要指定等待數(shù), 也可以指定所有線程到達時觸發(fā)的回調函數(shù). 閉鎖的兩個核心方法是:
- countDown: 通知線程已就緒.
- await: 等待所有的線程到達.
接下來我們分析下閉鎖的實現(xiàn). 首先從它的構造函數(shù)看起(說是看它, 其實為了看Sync):
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch在構造的時候新建了一個Sync實例, 構造函數(shù)指定的閉鎖數(shù)量最終通過setState賦值給了AQS的state. 接著我們分析countDown方法():
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
邏輯比較簡單, countDown一次, state減少一次. 當state=0(所有線程到達), 執(zhí)行doReleaseShared方法. 我們再來看下await方法:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
當state > 0時, 主線程執(zhí)行doAcquireSharedInterruptibly方法(會檢查線程是否中斷), 首先將當前線程添加到等待隊列中, 然后線程被中斷. 當最后一個線程執(zhí)行到countDown時, 通過doReleaseShared方法把所有等待線程喚醒.
至此, AQS及其實現(xiàn)類都分析完畢. 文章中只是選擇性的分析了一些場景, 并沒有面面俱到. 所以, 如果想要更深入全面的了解, 還需自己去看代碼, 以及Doug Lea大神的論文, 幸運的是ifeve已經(jīng)有了中譯版本.
拜神
上述我們所看的源碼, 都出自Doug Lea之手, 針對AQS的設計還有一片論文, 大家可以找來看看.