前面我們學(xué)習(xí)了AQS动壤,ReentrantLock等,現(xiàn)在來學(xué)習(xí)一下什么是讀寫鎖ReentrantReadWriteLock淮逻。
當(dāng)讀操作遠(yuǎn)遠(yuǎn)高于寫操作時(shí)琼懊,這時(shí)候可以使用【讀寫鎖】讓【讀-讀】可以并發(fā)阁簸,提高性能枣耀。
本文還是基于源碼的形式吆寨,希望同學(xué)們能夠以本文為思路,自己跟蹤源碼一步步的debug進(jìn)去托修,加深理解醉旦。
一饶米、初識(shí)ReentrantReadWriteLock
同樣的,先看下其類圖:
- 實(shí)現(xiàn)了讀寫鎖接口
ReadWriteLock
- 有5個(gè)內(nèi)部類车胡,與ReentrantLock相同的是
FairSync
檬输、NonfairSync
和Sync
,另外不同的是增加兩個(gè)內(nèi)部類吨拍,都實(shí)現(xiàn)了Lock接口:WriteLock
ReadLock
- Sync 增加了兩個(gè)內(nèi)部類 :
-
HoldCounter
:持有鎖的計(jì)數(shù)器 -
ThreadLocalHoldCounter
:維護(hù)HoldCounter的ThreadLocal
-
二褪猛、使用案例
通常會(huì)維護(hù)一個(gè)操作數(shù)據(jù)的容器類,內(nèi)部應(yīng)該封裝好數(shù)據(jù)的read和write方法羹饰,如下所示:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @description: 數(shù)據(jù)容器類
* @author:weirx
* @date:2022/1/13 15:29
* @version:3.0
*/
public class DataContainer {
/**
* 初始化讀鎖和寫鎖
*/
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
protected void read(){
readLock.lock();
try {
System.out.println("獲取讀鎖");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
System.out.println("釋放讀鎖");
}
}
protected void write(){
writeLock.lock();
try {
System.out.println("獲取寫鎖");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
System.out.println("釋放寫鎖");
}
}
}
簡單測試一下伊滋,分為讀讀、讀寫队秩、寫寫笑旺。
- 讀讀:
public static void main(String[] args) {
//初始化數(shù)據(jù)容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}
結(jié)果,讀讀不互斥馍资,同時(shí)獲取讀鎖筒主,同時(shí)釋放:
獲取讀鎖
獲取讀鎖
釋放讀鎖
釋放讀鎖
- 讀寫:
public static void main(String[] args) {
//初始化數(shù)據(jù)容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
結(jié)果,讀寫互斥,無論是先執(zhí)行read還是write方法鸟蟹,都會(huì)等到讀鎖或?qū)戞i被釋放之后乌妙,才會(huì)獲取下一把鎖:
獲取讀鎖 -- 第一個(gè)執(zhí)行
釋放讀鎖 -- 第二個(gè)執(zhí)行
獲取寫鎖 -- 第三個(gè)執(zhí)行
釋放寫鎖 -- 第四個(gè)執(zhí)行
- 寫寫:
public static void main(String[] args) {
//初始化數(shù)據(jù)容器
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.write();
}, "t1").start();
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}
結(jié)果,寫寫互斥建钥,只有第一把寫鎖釋放后藤韵,才能獲取下一把寫鎖:
獲取寫鎖
釋放寫鎖
獲取寫鎖
釋放寫鎖
注意:
- 鎖重入時(shí),持有讀鎖再去獲取寫鎖熊经,會(huì)導(dǎo)致寫鎖一直等待
結(jié)果:不會(huì)釋放protected void read(){ readLock.lock(); try { System.out.println("獲取讀鎖"); TimeUnit.SECONDS.sleep(1); System.out.println("獲取寫鎖"); writeLock.lock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { readLock.unlock(); System.out.println("釋放讀鎖"); } }
獲取讀鎖 獲取寫鎖
- 鎖重入時(shí)泽艘,持有寫鎖,可以再去獲取讀鎖镐依。
結(jié)果:protected void write(){ writeLock.lock(); try { System.out.println("獲取寫鎖"); TimeUnit.SECONDS.sleep(1); System.out.println("獲取讀鎖"); readLock.lock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { writeLock.unlock(); System.out.println("釋放寫鎖"); } }
獲取寫鎖 獲取讀鎖 釋放寫鎖
三匹涮、源碼分析
我們根據(jù)前面的例子,從讀鎖的獲取到釋放槐壳,從寫鎖的獲取到釋放然低,依次查看源碼。
先注意一個(gè)事情,讀寫鎖是以不同的位數(shù)來區(qū)分獨(dú)占鎖和共享鎖的狀態(tài)的:
/*
* 讀和寫分為上行下兩個(gè)部分脚翘,低16位是獨(dú)占鎖狀態(tài)灼卢,高16位是共享鎖狀態(tài)
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回以count表示的共享持有數(shù) */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回以count表示的互斥保持?jǐn)?shù) */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3.1 讀鎖分析
3.1.1 讀鎖獲取
從 readLock.lock(); 這里進(jìn)入分析過程:
/**
* 獲取讀鎖。
* 如果寫鎖沒有被另一個(gè)線程持有来农,則獲取讀鎖并立即返回鞋真。
* 如果寫鎖被另一個(gè)線程持有,那么當(dāng)前線程將被禁用以用于線程調(diào)度目的并處于休眠狀態(tài)沃于,直到獲得讀鎖為止
*/
public void lock() {
sync.acquireShared(1);
}
如上的lock方法涩咖,是ReentrantReadWriteLock子類ReadLock的方法,而acquireShared方法是在AQS的子類Syn當(dāng)中定義的繁莹,這個(gè)方法嘗試以共享的方式獲取讀鎖檩互,失敗則進(jìn)入等待隊(duì)列, 不斷重試咨演,直到獲取讀鎖為止闸昨。
public final void acquireShared(int arg) {
// 被其他線程持有的話,就走AQS的doAcquireShared
if (tryAcquireShared(arg) < 0)
// 獲取共享鎖薄风,失敗加入等待隊(duì)列饵较,不可中斷的獲取,直到獲取為止
doAcquireShared(arg);
}
tryAcquireShared是在ReentrantReadWriteLock當(dāng)中實(shí)現(xiàn)的遭赂,我們直接看代碼:
protected final int tryAcquireShared(int unused) {
// 獲取當(dāng)前線程
Thread current = Thread.currentThread();
// 獲取當(dāng)前鎖狀態(tài)
int c = getState();
// 獨(dú)占鎖統(tǒng)計(jì)不等于0 且 持有者不是當(dāng)前線程循诉,就返回 -1 ,換句話說撇他,被其他線程持有
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 共享鎖數(shù)量
int r = sharedCount(c);
// 返回fase才有資格獲取讀鎖
if (!readerShouldBlock() &&
// 持有數(shù)小于默認(rèn)值
r < MAX_COUNT &&
// CAS 設(shè)置鎖狀態(tài)
compareAndSetState(c, c + SHARED_UNIT)) {
// 持有共享鎖為0
if (r == 0) {
// 第一個(gè)持有者是當(dāng)前線程
firstReader = current;
// 持有總數(shù)是 1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 持有鎖的是當(dāng)前線程本身茄猫,就把技術(shù) + 1
firstReaderHoldCount++;
} else {
// 獲取緩存計(jì)數(shù)
HoldCounter rh = cachedHoldCounter;
// 如果是null 或者 持有線程的id不是當(dāng)前線程
if (rh == null || rh.tid != getThreadId(current))
// 賦值給緩存
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// rh不是null ,且是當(dāng)前線程困肩,就把讀鎖持有者設(shè)為緩存中的值
readHolds.set(rh);
// 將其 + 1
rh.count++;
}
return 1;
}
// 想要獲取讀鎖的線程應(yīng)該被阻塞划纽,保底工作,處理 CAS 未命中和在 tryAcquireShared 中未處理的重入讀取
return fullTryAcquireShared(current);
}
從上面的源碼我們可以看得出來锌畸,寫鎖和讀鎖之間是互斥的阿浓。
3.1.2 讀鎖釋放
直接看關(guān)鍵部分
/**
* 以共享模式釋放鎖,tryReleaseShared返回true蹋绽,則釋放
*/
public final boolean releaseShared(int arg) {
// 釋放鎖
if (tryReleaseShared(arg)) {
// 喚醒隊(duì)列的下一個(gè)線程
doReleaseShared();
return true;
}
return false;
}
看看讀寫鎖的tryReleaseShared實(shí)現(xiàn):
protected final boolean tryReleaseShared(int unused) {
//。筋蓖。卸耘。省略。粘咖。蚣抗。
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 讀鎖的計(jì)數(shù)不會(huì)影響其它獲取讀鎖線程, 但會(huì)影響其它獲取寫鎖線程
// 計(jì)數(shù)為 0 才是真正釋放
return nextc == 0;
}
}
如果上述方法釋放成功,則走下面AQS繼承來的方法:
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個(gè)節(jié)點(diǎn) unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它線程也在釋放讀鎖,那么需要將 waitStatus 先改為 0
// 防止 unparkSuccessor 被多次執(zhí)行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果已經(jīng)是 0 了翰铡,改為 -3钝域,用來解決傳播性
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
3.2 寫鎖分析
3.2.1 獲取鎖
public final void acquire(int arg) {
// 嘗試獲得寫鎖失敗
if (!tryAcquire(arg) &&
// 將當(dāng)前線程關(guān)聯(lián)到一個(gè) Node 對(duì)象上, 模式為獨(dú)占模式
// 進(jìn)入 AQS 隊(duì)列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) {
selfInterrupt();
}
}
讀寫鎖的上鎖方法:tryAcquire
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 獲得低 16 位, 代表寫鎖的 state 計(jì)數(shù)
int w = exclusiveCount(c);
if (c != 0) {
// 如果寫鎖是0 或者 當(dāng)前線程不等于獨(dú)占線程,獲取失敗
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 寫鎖計(jì)數(shù)超過低 16 位, 報(bào)異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 寫鎖重入, 獲得鎖成功
setState(c + acquires);
return true;
}
// 寫鎖應(yīng)該阻塞
if (writerShouldBlock() ||
// 更改計(jì)數(shù)失敗
!compareAndSetState(c, c + acquires))
// 獲取鎖失敗
return false;
// 設(shè)置當(dāng)前線程獨(dú)占鎖
setExclusiveOwnerThread(current);
return true;
}
3.2.2 釋放鎖
release:
public final boolean release(int arg) {
// 嘗試釋放寫鎖成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因?yàn)榭芍厝氲脑? 寫鎖計(jì)數(shù)為 0, 才算釋放成功
boolean free = exclusiveCount(nextc) == 0;
if (free) {
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}