1. 結(jié)構(gòu)
uml依賴關系
其核心還是隊列同步器Sync,它被ReadLock和WriteLock所共享撵颊。state為鎖標記笛谦, 其中高16位為ReadLock總計數(shù)標記糕殉,低16位為WriteLock(獨占模式)的重入計數(shù)探熔。
2. 讀鎖
2.1.讀鎖獲取
public void lock() {
sync.acquireShared(1);
}
/*
* 1. 若寫鎖被其他線程持有驹针, 則失敗
* 2. 若當前線程可以鎖定寫狀態(tài),先判斷是否應當阻塞(排隊策略)诀艰, 不需阻塞的話柬甥,嘗試CAS地修改狀態(tài)位。
* 3. 若2修改失敗了涡驮,可能是因為其他線程此時已經(jīng)持有了寫鎖暗甥,也可能是 CAS失敗, 還有可能是其他未處理的可重入讀(具體可以看代碼注釋),那么應該走fullyTryAcquireShared流程,不斷重試
*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 1.若寫鎖被其他線程持有捉捅, 則失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 2.若當前線程可以鎖定寫狀態(tài),先判斷是否應當阻塞(排隊策略)虽风, 不需阻塞的話棒口,嘗試CAS地修改狀態(tài)位。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 修改成功辜膝,若當前線程是第一個讀線程(第一次進入或者重入)
if (r == 0) { // 第一次進入
firstReader = current;
firstReaderHoldCount = 1; // 記錄重入次數(shù)
} else if (firstReader == current) { // 它就是第一個讀線程无牵,發(fā)生重入, 將其重入次數(shù)+1
firstReaderHoldCount++;
} else { // 當前線程并非第一個讀厂抖,修改讀線程計數(shù)
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/*
* step2失敗茎毁,可能原因是:
* CAS嘗試失敗,
* rh!= null && rh.tid != currentThreadId && rh.count > 0, 這意味著除了firstReader外忱辅,還有其他reader將rh狀態(tài)修改了
*/
return fullTryAcquireShared(current);
}
如果tryAcquireShared()
中第2步失敗了七蜘,那么進入fullTryAcquireShared()
不斷重試。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
2.3 讀鎖釋放
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 若讀鎖被徹底釋放墙懂,那么需要去喚醒隊列中排隊獲取X鎖的線程
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/*
1. 如果當前線程就是firstReader, 那么將其重入計數(shù)減1橡卤, 減到0后將firstReader置為null
2. 否則,獲取當前線程的重入計數(shù)损搬, 將其減1碧库, 減到0將記錄當前線程重入次數(shù)的readHold移除
3. 死循環(huán)內(nèi)柜与,CAS地修改讀鎖計數(shù)(state高16位)
返回: 讀鎖獲取計數(shù)是否為0
*/
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3. 寫鎖
3.1 寫鎖結(jié)構(gòu)
寫鎖與讀鎖共享同一個隊列同步器, 本質(zhì)上寫鎖就是一把X鎖嵌灰。
3.2 寫鎖獲取
public void lock() {
sync.acquire(1);
}
/*
* 先嘗試獲取弄匕,嘗試失敗,則進入阻塞隊列沽瞭,等待獲取
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 排隊等待獲取鎖
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 補償中斷標記粘茄,其原因是在acquireQueued中,每次unpark后都要清除中斷標記
selfInterrupt();
}
/*
* 1. 如果當前有其他線程獲取了讀鎖或?qū)戞i秕脓, 那么本次獲取失敗
* 2. c 柒瓣!= 0 && w != 0, 表示當前線程要重入,如果獨占鎖重入計數(shù)過大吠架,那么返回失斳狡丁;否則將重入計數(shù)增加
* 3. 當前線程有資格去修改state傍药, 但具體本次能不能改取決于排隊策略: 比如fair mode下磺平,有其他線程在排隊獲取X鎖,那么就不能直接獲取
*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// state 高16位為shared標記位拐辽, 低16位為exclusive lock 重入計數(shù)拣挪。
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
addWaiter()
和acquireQueued()
實質(zhì)上就是AQS的方法,與ReentrantLock一樣俱诸,在這里不啰嗦了菠劝。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.3 寫鎖釋放
public void unlock() {
sync.release(1);
}
release()
中unparkSuccessor(Node node)
為AQS方法,同ReentrantLock,不做贅述睁搭。
public final boolean release(int arg) {
// 如果tryRelease返回true赶诊, 則意味著重入計數(shù)為0了,那么應該考慮喚醒等待隊列中的線程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 減掉重入計數(shù)园骆,返回是否完全釋放(free)
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}