java并發(fā)包已經存在Reentrant鎖和條件鎖,已經可以滿足很多并發(fā)場景下線程安全的需求政钟。但是在大量讀少量寫的場景下幔荒,并不是最優(yōu)的選擇权烧。與傳統鎖不同的是讀寫鎖的規(guī)則是可以共享讀,但只能一個寫辉词。很多博客中總結寫到讀讀不互斥摸航,讀寫互斥,寫寫互斥骆莹。就讀寫這個場景下來說,如果一個線程獲取了寫鎖担猛,然后再獲取讀鎖(同一個線程)也是可以的幕垦。鎖降級就是這種情況丢氢。但是如果也是同一個線程,先獲取讀鎖先改,再獲取寫鎖是獲取不到的(發(fā)生死鎖)疚察。所以嚴謹一點情況如下:
項目 | 非同一個線程 | 同一個線程 |
---|---|---|
讀讀 | 不互斥 | 不互斥 |
讀寫 | 互斥 | 鎖升級(不支持),發(fā)生死鎖 |
寫讀 | 互斥 | 鎖降級(支持)仇奶,不互斥 |
寫寫 | 互斥 | 不互斥 |
讀寫鎖的主要特性:
- 公平性:支持公平性和非公平性貌嫡。
- 重入性:支持重入。讀寫鎖最多支持 65535 個遞歸寫入鎖和 65535 個遞歸讀取鎖猜嘱。
- 鎖降級:遵循獲取寫鎖衅枫,再獲取讀鎖,最后釋放寫鎖的次序朗伶,如此寫鎖能夠降級成為讀鎖弦撩。
ReentrantReadWriteLock
java.util.concurrent.locks.ReentrantReadWriteLock ,實現 ReadWriteLock 接口论皆,可重入的讀寫鎖實現類益楼。在它內部,維護了一對相關的鎖点晴,一個用于只讀操作(共享鎖)感凤,另一個用于寫入操作(排它鎖)。
ReentrantReadWriteLock 類的大體結構如下:
/** 內部類 讀鎖 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 內部類 寫鎖 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 使用默認(非公平)的排序屬性創(chuàng)建一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {
this(false);
}
/** 使用給定的公平策略創(chuàng)建一個新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于寫入操作的鎖 */
@Override
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 返回用于讀取操作的鎖 */
@Override
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 省略其余源代碼
*/
}
public static class WriteLock implements Lock, java.io.Serializable {
/**
* 省略其余源代碼
*/
}
public static class ReadLock implements Lock, java.io.Serializable {
/**
* 省略其余源代碼
*/
}
- ReentrantReadWriteLock 與 ReentrantLock一樣粒督,其鎖主體也是 Sync陪竿,它的讀鎖、寫鎖都是通過 Sync 來實現的屠橄。所以 ReentrantReadWriteLock 實際上只有一個鎖族跛,只是在獲取讀取鎖和寫入鎖的方式上不一樣。
- 它的讀寫鎖對應兩個類:ReadLock 和 WriteLock 锐墙。這兩個類都是 Lock 的子類實現礁哄。
在 ReentrantLock 中,使用 Sync ( 實際是 AQS )的 int 類型的 state 來表示同步狀態(tài)溪北,表示鎖被一個線程重復獲取的次數桐绒。但是,讀寫鎖 ReentrantReadWriteLock 內部維護著一對讀寫鎖之拨,如果要用一個變量維護多種狀態(tài)茉继,需要采用“按位切割使用”的方式來維護這個變量,將其切分為兩部分:高16為表示讀蚀乔,低16為表示寫烁竭。
分割之后,讀寫鎖是如何迅速確定讀鎖和寫鎖的狀態(tài)呢乙墙?通過位運算颖变。假如當前同步狀態(tài)為S,那么:
- 寫狀態(tài)听想,等于 S & 0x0000FFFF(將高 16 位全部抹去)
-
讀狀態(tài)腥刹,等于 S >>> 16 (無符號補 0 右移 16 位)。
image.png
關鍵屬性
讀寫鎖由于要讀讀是共享的汉买,所以實現比ReentrantLock復雜的多衔峰。在ReentrantReadWriteLock類中有幾個成員屬性,先單獨提出來蛙粘,有助于理解下面的源碼分析垫卤。
//每個線程讀鎖持有數量
static final class HoldCounter {
int count = 0;
// 這個getThreadId用到了UNSAFE,why出牧?Thread本身是有getId可以獲取線程id的穴肘,這里為什么要用UNSAFE是為了防止重寫
final long tid = getThreadId(Thread.currentThread());
}
// ThreadLocal類型,這樣HoldCounter 就可以與線程進行綁定舔痕。獲取當前線程的HoldCounter
private transient ThreadLocalHoldCounter readHolds;
//緩存的上一個讀取線程的HoldCounter
private transient HoldCounter cachedHoldCounter;
// 第一個獲得讀鎖的線程(最后一個把共享鎖計算從0變到1的線程)
private transient Thread firstReader = null;
// firstReader 持有讀鎖的數量
private transient int firstReaderHoldCount;
讀鎖獲取
獲取讀鎖調用ReentrantReadWriteLock#readLock#lock就可以评抚,來看實現
public void lock() {
//acquireShared獲取共享鎖,實現在AQS的類中
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
//獲取讀鎖
if (tryAcquireShared(arg) < 0)
//讀鎖獲取失敗伯复,阻塞等待
doAcquireShared(arg);
}
關鍵的代碼在tryAcquireShared方法中
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//如果已經存在了寫鎖慨代,并且獲取寫鎖的不是當前線程,那么返回啸如,讀鎖獲取失敗侍匙。注意獲取寫鎖的是當前線程,是可以重入的(鎖降級)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//獲取讀鎖數量(讀鎖是共享的叮雳,所以這個數量是所有線程讀鎖的累加和)想暗,位運算
int r = sharedCount(c);
//判斷讀鎖是否需要阻塞,注意公平鎖和非公平不一樣
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//如果下面的CAS設置成功了债鸡,那么讀鎖也就獲取成功了
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 設置firstReader
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//同一個線程重入
firstReaderHoldCount++;
} else {
// 設置cachedHoldCounter江滨、readHolds
HoldCounter rh = cachedHoldCounter;// 最后一個成功獲取讀鎖的線程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)// 為什么rh.count會等于0?當一個線程獲取讀鎖釋放厌均,再次獲取讀鎖唬滑,就是這種情況
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
readerShouldBlock的公平性和非公平性
- 非公平性的readerShouldBlock
對于非公平性的讀鎖,為了防止寫鎖餓死棺弊,需要判斷AQS隊列的第一個等待鎖的節(jié)點是不是寫鎖晶密,如果是寫鎖,那么讀鎖讓步模她;如果不是寫鎖稻艰,那么可以競爭。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
- 公平性的readerShouldBlock
ReentrantLock中已經分析過這個方法侈净,不是AQS隊列中節(jié)點是讀鎖還是寫鎖都加到隊列末尾等待尊勿,完全公平僧凤。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
fullTryAcquireShared的代碼和tryAcquireShared存在一定程度冗余,我個人覺得不要tryAcquireShared那一段代碼也是可以的元扔,這個如果哪位同學有更深的理解可以交流下躯保。總之澎语,fullTryAcquireShared是tryAcquireShared自旋重試的版本, 不再贅述
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
讀鎖釋放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared比較簡單途事,主要就是修改firstReader、readHolds擅羞、state的值
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//釋放firstReader
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
喚醒后繼節(jié)點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}