前言
上一篇中已經(jīng)分析了ReentrantLock,下面我們來(lái)看一下讀寫(xiě)鎖ReentrantReadWriteLock趋厉。
在這之前,先看一下其結(jié)構(gòu)圖:
ReadLock/WriteLock
- ReadLock
//使用ReentrantReadWriteLock的Sync對(duì)象
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
//不支持條件變量
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
- WriteLock
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
WriteLock的代碼胶坠,類似ReadLock的代碼君账,差別在于獨(dú)占式獲取同步狀態(tài)。
Sync抽象類
Sync是ReentrantReadWriteLock的靜態(tài)內(nèi)部類沈善,繼承自AbstractQueuedSynchronizer的抽象類乡数。它使用AQS的state字來(lái)表示當(dāng)前鎖的持有數(shù)量,其中state的高16位表示讀狀態(tài)闻牡,即獲取該讀鎖的次數(shù)净赴,低16位表示獲取到寫(xiě)鎖的線程的可重入次數(shù)。
static final int SHARED_SHIFT = 16; // 位數(shù)
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 每個(gè)鎖的最大重入次數(shù)罩润,65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//寫(xiě)鎖的標(biāo)記玖翅,用來(lái)計(jì)算寫(xiě)鎖的重入次數(shù)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//第一個(gè)獲得讀鎖的線程
private transient Thread firstReader = null;
//firstReader的重入次數(shù)
private transient int firstReaderHoldCount;
//當(dāng)前線程持有的可重入數(shù)量
private transient ThreadLocalHoldCounter readHolds;
//成功獲取讀鎖的的最后一個(gè)線程的計(jì)數(shù)器
private transient HoldCounter cachedHoldCounter;
/**
* 計(jì)數(shù)器,主要存儲(chǔ)線程id和重入次數(shù)
**/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
// 避免 HoldCounter 和 ThreadLocal 互相綁定而導(dǎo)致 GC 難以釋放它們
final long tid = getThreadId(Thread.currentThread());
}
static final long getThreadId(Thread thread) {
return UNSAFE.getLongVolatile(thread, TID_OFFSET);
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
Sync() {
//ThreadLocal子類割以,表示當(dāng)前線程持有的可重入讀鎖的數(shù)量
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
//寫(xiě)鎖釋放
protected final boolean tryRelease(int releases) {
//1. 如果釋放的線程不為鎖的持有者金度,直接拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//2. 寫(xiě)鎖的重入次數(shù),如果為0严沥,釋放持有的線程
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
//寫(xiě)鎖獲取猜极。針對(duì)寫(xiě)鎖,因?yàn)橐_保寫(xiě)鎖的操作對(duì)讀鎖是可見(jiàn)的消玄。
//如果在存在讀鎖的情況下允許獲取寫(xiě)鎖跟伏,那么那些已經(jīng)獲取讀鎖的其他線程可能就無(wú)法感知當(dāng)前寫(xiě)線程的操作丢胚。
//因此只有等讀鎖完全釋放后,寫(xiě)鎖才能夠被當(dāng)前線程所獲取受扳,一旦寫(xiě)鎖獲取了携龟,所有其他讀、寫(xiě)線程均會(huì)被阻塞勘高。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
//1. 計(jì)算寫(xiě)鎖的重入次數(shù)
int c = getState();
int w = exclusiveCount(c);
//2. 如果存在鎖
if (c != 0) {
//w == 0 表示僅存在讀鎖不存在寫(xiě)鎖
//2.1 如果只存在讀鎖或者存在寫(xiě)鎖但是當(dāng)前線程不是已經(jīng)獲取寫(xiě)鎖的線程骨宠,直接返回失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//2.2. 如果寫(xiě)鎖可重入次數(shù)超出最大范圍,拋出異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//2.3. 設(shè)置狀態(tài)相满,獲取成功
setState(c + acquires);
return true;
}
//3. 使用CAS設(shè)置狀態(tài)层亿,如果失敗返回失敗(#writerShouldBlock()在非公平鎖下永遠(yuǎn)是false,
//在公平鎖下需要判斷是否有前驅(qū)節(jié)點(diǎn)立美,上一篇中提到過(guò) )
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
//讀鎖釋放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//1. 如果想要釋放鎖的線程為第一個(gè)獲取讀鎖的線程
if (firstReader == current) {
// 將計(jì)數(shù)器至空或者減1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//2. 判斷最后一次獲取到讀鎖的線程是否是當(dāng)前線程匿又,如果不是從readholds中獲取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//3. 自旋,設(shè)置狀態(tài)
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
//讀鎖獲取
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//1. 如果存在寫(xiě)鎖建蹄,且鎖的持有者不是當(dāng)前線程碌更,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//2. 獲取讀鎖重入次數(shù)
int r = sharedCount(c);
/*
* readerShouldBlock():讀鎖是否需要等待(公平鎖原則)
* r < MAX_COUNT:持有線程小于最大數(shù)(65535)
* compareAndSetState(c, c + SHARED_UNIT):設(shè)置讀取鎖狀態(tài)
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//3. 如果重入次數(shù)為0,firstReader = current洞慎;如果firstReader == current痛单, firstReaderHoldCount++;
//否則從緩存中獲取
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
//獲取讀鎖的完整版本,處理CAS未命中和tryAcquireShared中未處理的重入讀取
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
//有寫(xiě)鎖的前提下劲腿,線程持有者不是當(dāng)前線程
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 讀鎖需要阻塞旭绒,判斷是否當(dāng)前線程已經(jīng)獲取到讀鎖
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//讀鎖超出最大范圍
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//CAS設(shè)置讀鎖成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//嘗試獲取寫(xiě)鎖
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
//寫(xiě)鎖的數(shù)量
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
//嘗試獲取讀鎖
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
final int getReadLockCount() {
return sharedCount(getState());
}
final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
//自定義序列化邏輯
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}
final int getCount() { return getState(); }
Sync的實(shí)現(xiàn)類
- NofairSync實(shí)現(xiàn)類
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
//頭節(jié)點(diǎn)不為空且下一個(gè)節(jié)點(diǎn)也不為空且是獨(dú)占鎖且下一個(gè)節(jié)點(diǎn)的線程也不為空
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
由于寫(xiě)鎖是獨(dú)占排它鎖,所以在非公平鎖的情況下焦人,需要調(diào)用 AQS 的 #apparentlyFirstQueuedIsExclusive()方法挥吵,判斷是否當(dāng)前寫(xiě)鎖已經(jīng)被獲取。
- FairSync實(shí)現(xiàn)類
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
ReentrantReadWriteLock
- 構(gòu)造方法
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReentrantReadWriteLock的默認(rèn)構(gòu)造方法初始化了非公平鎖的實(shí)現(xiàn)花椭,同時(shí)也初始化了readerLock忽匈,writerLock。
總結(jié)
1. Java中tid指的是什么矿辽?
線程的tid丹允,這里不直接使用#Thread.getId()是因?yàn)?getId()方法不是final,容易被子類覆蓋袋倔。
2. 什么是鎖降級(jí)雕蔽?
鎖降級(jí)就意味著寫(xiě)鎖是可以降級(jí)為讀鎖的,但是需要遵循先獲取寫(xiě)鎖奕污、獲取讀鎖再釋放寫(xiě)鎖的次序萎羔。注意如果當(dāng)前線程先獲取寫(xiě)鎖液走,然后釋放寫(xiě)鎖碳默,再獲取讀鎖這個(gè)過(guò)程不能稱之為鎖降級(jí)贾陷,鎖降級(jí)一定要遵循那個(gè)次序。
鎖降級(jí)中讀鎖的獲取釋放為必要嘱根?
肯定是必要的髓废。假如當(dāng)前線程 A 不獲取讀鎖而是直接釋放了寫(xiě)鎖,這個(gè)時(shí)候另外一個(gè)線程 B 獲取了寫(xiě)鎖该抒,那么這個(gè)線程 B 對(duì)數(shù)據(jù)的修改是不會(huì)對(duì)當(dāng)前線程 A 可見(jiàn)的慌洪。如果獲取了讀鎖,則線程B在獲取寫(xiě)鎖過(guò)程中判斷如果有讀鎖還沒(méi)有釋放則會(huì)被阻塞凑保,只有當(dāng)前線程 A 釋放讀鎖后冈爹,線程 B 才會(huì)獲取寫(xiě)鎖成功。