所謂讀寫鎖,即允許讀線程之前同時(shí)訪問(共享鎖)锈拨,讀和寫砌庄,以及寫和寫線程之間不能同時(shí)訪問(排它鎖)。JDK提供了ReentrantReadWriteLock實(shí)現(xiàn)讀寫線程的控制奕枢,可重入娄昆。
private Map<String, Object> map = new HashMap<>();
private ReadWriteLock rwl = new ReentrantReadWriteLock();
private Lock r = rwl.readLock();
private Lock w = rwl.writeLock();
public Object get(String key) {
r.lock();
System.out.println(Thread.currentThread().getName() + " 讀操作在執(zhí)行..");
try {
return map.get(key);
} finally {
r.unlock();
System.out.println(Thread.currentThread().getName() + " 讀操執(zhí)行完畢..");
}
}
public void put(String key, Object value) {
w.lock();
System.out.println(Thread.currentThread().getName() + " 寫操作在執(zhí)行..");
try {
map.put(key, value);
} finally {
w.unlock();
System.out.println(Thread.currentThread().getName() + " 寫操作執(zhí)行完畢..");
}
}
那么,接下來缝彬,我們分析下ReentrantReadWriteLock 源代碼萌焰,背后其實(shí)也借助了AQS,
其中谷浅,NonfairSync和FairSync是Sync類的兩種實(shí)現(xiàn)扒俯,分別定義了非公平的sync和公平的sync,其功能和Reentrantlock的一樣一疯,只是判斷的方式不同撼玄。
另外,定義了ReadLock和WriteLock兩個(gè)內(nèi)部類墩邀,畢竟讀和寫的邏輯不同掌猛,用兩個(gè)不同的類分開實(shí)現(xiàn)功能,二者實(shí)現(xiàn)接口Lock眉睹,
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
但是本質(zhì)上荔茬,每個(gè)方法都是借助sync對(duì)象完成操作废膘,我們重點(diǎn)關(guān)注如下四個(gè)方法,
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
首先分析writelock的獲鎖和釋放鎖過程
寫鎖的acquire和release
和Reentrantlock的源碼過程一致慕蔚,調(diào)用sync的acquire和release方法丐黄,而這兩個(gè)方法,會(huì)調(diào)用sync實(shí)現(xiàn)者的tryAcquire和tryRelease方法坊萝,
寫鎖是一個(gè)互斥的可重入鎖孵稽,這點(diǎn)和Reentrantlock鎖一樣许起,因此十偶,實(shí)現(xiàn)方式相同,獲得鎖的時(shí)候园细,如果沒有g(shù)et到惦积,則進(jìn)入阻塞隊(duì)列。在tryAcquire中猛频,如果目前沒有鎖狮崩,需要去競爭,此時(shí)鹿寻,根據(jù)公平和非公平睦柴,對(duì)于寫鎖,有不同的判斷毡熏,通過NonfairSync類和FairSync類的定義可以看到坦敌,
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
對(duì)于,writerShouldBlock方法而言痢法,在NonfairSync 直接返回false狱窘,不需要阻塞寫線程,而在FairSync中财搁,需要根據(jù)hasQueuedPredecessors函數(shù)返回值來判斷是否阻塞寫線程蘸炸,hasQueuedPredecessors在之前的文章中介紹過,當(dāng)前線程不為空尖奔,并且阻塞的線程不是當(dāng)前線程搭儒,返回true,即需要阻塞提茁,反之不需要阻塞淹禾。
接下來,看下甘凭,tryAcquire方法(不能無限循環(huán)稀拐,一次性給出false還是true,阻塞線程由aqs的acquire方法進(jìn)行)
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread(); //獲得當(dāng)前線程
int c = getState(); //拿到aqs的int state
int w = exclusiveCount(c); //獲得排他鎖的占用個(gè)數(shù)丹弱,
if (c != 0) { //當(dāng)前有鎖存在德撬,無論是讀鎖铲咨,還是寫鎖
if (w == 0 || current != getExclusiveOwnerThread()) //沒有寫鎖,即蜓洪,有讀鎖纤勒,或者,不是當(dāng)前正在獲得鎖的線程隆檀,
//返回false摇天,阻塞當(dāng)前線程。
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires); //否則恐仑,有寫鎖并且當(dāng)前線程就是鎖的持有這泉坐,可重入,更新aqs的state
return true;
}
if (writerShouldBlock() || // 如果當(dāng)前沒有任何鎖裳仆,則需要公平鎖和非公平鎖來判斷是否阻塞線程
!compareAndSetState(c, c + acquires))
return false; //如果需要阻塞或者不需要阻塞但是cas加鎖失敗腕让,則返回false,有aqs加入到隊(duì)列并循環(huán)等待
setExclusiveOwnerThread(current); //如果不需要阻塞并且也cas成功加到鎖歧斟,則這里設(shè)置寫鎖持有線程纯丸。
return true;
}
這里,使用了一個(gè)int state静袖,來表示兩個(gè)不同的鎖觉鼻,對(duì)應(yīng)的加鎖次數(shù),怎么做到队橙?將int的字節(jié)分成高16和低16位坠陈,分別表示,讀鎖和寫鎖喘帚,那么針對(duì)這個(gè)state的獲取讀和寫鎖不同的部分畅姊,就是bit操作了。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } //從state中獲得讀鎖個(gè)數(shù)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//從state中獲得寫鎖個(gè)數(shù)
看下吹由,tryRelease方法(不能無限循環(huán)若未,一次性給出false還是true,喚醒其他線程由aqs的release進(jìn)行)倾鲫,
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) //非鎖持有線程不能做粗合,當(dāng)前寫鎖只有一個(gè)線程獲得(互斥鎖),沒有共享問題
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null); // 清空寫鎖的持有線程
setState(nextc);
return free;
}
讀鎖的acquire和release
從上一節(jié)的過程看到乌昔,讀鎖在判斷是否阻塞的時(shí)候隙疚,非公平的NofairSync,出現(xiàn)新的判斷機(jī)制磕道,參考方法供屉,apparentlyFirstQueuedIsExclusive()
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && // 有頭
(s = h.next) != null && // 且有尾,
!s.isShared() && //第一個(gè)節(jié)點(diǎn)不是共享線程,即伶丐,是寫線程悼做,即,有寫線程阻塞著呢
s.thread != null; //線程存在
}
言外之意哗魂,雖然是非公平鎖肛走,讀線程可以隨意加鎖不阻塞,但是呢录别,當(dāng)有寫線程正在阻塞的時(shí)候呢朽色,讀線程還是等等吧,先別讀组题,直接阻塞進(jìn)入隊(duì)列葫男,等待阻塞的寫線程執(zhí)行完畢(被喚醒后執(zhí)行),在被喚醒執(zhí)行往踢。
OK腾誉,接下來分析讀鎖的lock和unlock過程,對(duì)應(yīng)sync方法的acquireShared和releaseShared方法峻呕。這里,同aqs的acquire和release過程一樣趣效,具體細(xì)節(jié)如下瘦癌,
// aqs的acquireShared方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//調(diào)用實(shí)現(xiàn)者的tryAcquireShared方法,如果小于零跷敬,1.表示需要阻塞讀線程讯私,則調(diào)用doAcquireShared加入到阻塞隊(duì)列。
doAcquireShared(arg);
}
//和acquire基本相同
//2. 因?yàn)橐枞骺砸ㄟ^addWaiter加入到隊(duì)列的尾部
//3. 進(jìn)入循環(huán)斤寇,直接隊(duì)列中前一個(gè)節(jié)點(diǎn)是head,即拥褂,被中斷喚醒娘锁,需要執(zhí)行,調(diào)整隊(duì)列饺鹃,head后移
//4. 循環(huán)過程中莫秆,前一個(gè)節(jié)點(diǎn)不是head,則繼續(xù)阻塞悔详,同時(shí)镊屎,梳理下隊(duì)列。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 加入到隊(duì)列的尾部茄螃,注意shared狀態(tài)節(jié)點(diǎn)
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //不斷判斷前一個(gè)節(jié)點(diǎn)
if (p == head) { // 如果是head缝驳,當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程被喚醒,
int r = tryAcquireShared(arg); // 獲得一個(gè)讀鎖。
if (r >= 0) { // 如果獲得成功
setHeadAndPropagate(node, r); // 設(shè)置隊(duì)列的head用狱,同時(shí)萎庭,propagate隊(duì)列
p.next = null; // help GC
if (interrupted)
selfInterrupt(); //自我中斷
failed = false;
return;
}
}
//在shouldParkAfterFailedAcquire中繼續(xù)查看前一個(gè)節(jié)點(diǎn)的狀態(tài)
//shouldParkAfterFailedAcquire如果前一個(gè)節(jié)點(diǎn)不是signal且無效狀態(tài)的話清理返回false,如果有
//如果有效狀態(tài)齿拂,將其設(shè)置為signal等待下次循環(huán)中被調(diào)用驳规,如果前一個(gè)節(jié)點(diǎn)是signal,返回true
if (shouldParkAfterFailedAcquire(p, node) && // 返回true署海,則需要進(jìn)行阻塞
parkAndCheckInterrupt()) // 如果需要阻塞吗购,則調(diào)用LockSupport進(jìn)行阻塞(線程停止運(yùn)行,等待中斷后砸狞,再繼續(xù)循環(huán))
interrupted = true;
//被中斷喚醒捻勉, 雖然被喚醒,但是前一個(gè)節(jié)點(diǎn)是head的才能有機(jī)會(huì)獲得鎖刀森,
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//如果執(zhí)行到這踱启,說明喚醒了一個(gè)讀線程,此時(shí)研底,是因?yàn)橹坝袑懢€程阻塞埠偿,為了優(yōu)先寫線程執(zhí)行,讀線程
//被阻塞榜晦,待到寫線程被喚醒后執(zhí)行完畢冠蒋,這里的讀線程被喚醒,而該讀線程后面乾胶,還有更多等待的讀線程抖剿,
//都可以被喚醒,所以這叫做propagete傳播喚醒识窿。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// propagate也就是state的更新值大于0斩郎,代表可以繼續(xù)acquire
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 判斷后繼節(jié)點(diǎn)是否存在,如果存在 且是共享鎖喻频,即缩宜,讀線程。
// 然后進(jìn)行共享模式的釋放
if (s == null || s.isShared())
doReleaseShared();
}
}
/**
一直循環(huán)半抱,直到隊(duì)列為空脓恕,做一件事情,如果節(jié)點(diǎn)是signal窿侈,則cas設(shè)置狀態(tài)為0炼幔,成功后,喚醒節(jié)點(diǎn)對(duì)應(yīng)
線程史简,否則乃秀,如果節(jié)點(diǎn)狀態(tài)是0肛著,表示位置狀態(tài),cas設(shè)置為Propagate狀態(tài)(-3)跺讯,成功后枢贿,繼續(xù)下一次循環(huán)
**/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //可以喚醒,
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //設(shè)置狀態(tài)為0
continue;
unparkSuccessor(h); // 中斷阻塞的線程 (喚醒)
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 調(diào)整節(jié)點(diǎn)的狀態(tài)
continue;
}
if (h == head) // 隊(duì)列為空的時(shí)候退出循環(huán)刀脏,如果不為空局荚,則一直在propagation
break;
}
}
doReleaseShared的作用,就是找到隊(duì)列中狀態(tài)為signal的節(jié)點(diǎn)愈污,并將其喚醒耀态,不進(jìn)行隊(duì)列的調(diào)整,喚醒后暂雹,狀態(tài)變成0首装,之后又變成-3。所以經(jīng)過doReleaseShared之后杭跪,原本為signal的線程被喚醒仙逻,隊(duì)列中所有節(jié)點(diǎn)狀態(tài)變成-3,隊(duì)列長隊(duì)沒有變涧尿。
shouldParkAfterFailedAcquire在判斷當(dāng)前節(jié)點(diǎn)前節(jié)點(diǎn)的狀態(tài)系奉,如果不是signal且有效(0,或者-3)现斋,則設(shè)置為signal喜最,因此,即使doReleaseShared將節(jié)點(diǎn)設(shè)置了-3庄蹋,shouldParkAfterFailedAcquire也會(huì)將其設(shè)置為-1 signal。
隊(duì)列的長度發(fā)生變化迷雪,在于parkAndCheckInterrupt()函數(shù)被中斷喚醒后限书,此時(shí),多個(gè)線程可能被喚醒章咧,都在此函數(shù)后繼續(xù)執(zhí)行倦西,參考,acquireShared函數(shù)赁严,雖然喚醒了多個(gè)線程扰柠,但是,只有前置節(jié)點(diǎn)是head節(jié)點(diǎn)疼约,才能獲取鎖后卤档,調(diào)整隊(duì)列長度,其他線程可能又會(huì)進(jìn)入到阻塞程剥,不過沒關(guān)系劝枣,shouldParkAfterFailedAcquire進(jìn)會(huì)將前一個(gè)節(jié)點(diǎn)設(shè)置為signal,因此,doReleaseShared中舔腾,依舊會(huì)喚醒線程溪胶。
例如,四個(gè)阻塞節(jié)點(diǎn)
- -1稳诚,-1哗脖,-1,-1 (初始扳还,都阻塞)
- -3才避,-3,-3普办,-3 (doReleaseShared全部是喚醒工扎,且狀態(tài)都變成-3,也可以還有0衔蹲,)
- -1肢娘,-1,-1 (第一個(gè)節(jié)點(diǎn)成功獲得鎖舆驶,調(diào)整隊(duì)列橱健,其他線程循環(huán)后,shouldParkAfterFailedAcquire將-3變成了-1沙廉,后拘荡,阻塞自己)
- 回到第一步,如此反復(fù)執(zhí)行撬陵,直到隊(duì)列為空珊皿。
接下來,看下releaseShared方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以看到使用了doReleaseShared巨税,批量喚醒多個(gè)等待的讀線程(也可能有寫線程)蟋定。這里也會(huì)調(diào)用aqs的實(shí)現(xiàn)類的tryReleaseShared方法。
最后草添,統(tǒng)一看下aqs實(shí)現(xiàn)類讀鎖的tryAcquireShared和tryReleaseShared方法驶兜,
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current) //如果有寫鎖,并且寫鎖的線程持有者不是當(dāng)前線程
return -1; //則返回-1远寸,獲得鎖失敗
int r = sharedCount(c); //獲取讀鎖的個(gè)數(shù)
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
//如果不需要阻塞讀線程抄淑,且,成功設(shè)置了state中讀鎖的個(gè)數(shù)位驰后,表明獲得到了讀鎖肆资。
if (r == 0) { //單獨(dú)考慮第一個(gè)獲得讀鎖的線程,程序執(zhí)行效率優(yōu)化目的
firstReader = current;
firstReaderHoldCount = 1; // 效率優(yōu)化
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
//cachedHoldCounter效率優(yōu)化倡怎,緩存上一次的HoldCounter
if (rh == null || rh.tid != getThreadId(current))
//如果上次緩存的holder不是當(dāng)前線程迅耘,則沖LocalThread(readHolds封裝了LocalThread)中重新獲取贱枣。
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) //如果是自己的,當(dāng)時(shí)count=0颤专,則再次加入到LocalThread中纽哥,
readHolds.set(rh); //這里為何要重放一次?栖秕?春塌?。
rh.count++; //累加計(jì)數(shù)
}
return 1;
}
return fullTryAcquireShared(current);
}
tryAcquireShared只有一次if簇捍,對(duì)于阻塞或者獲取鎖失敗的讀線程如何處理只壳,參考fullTryAcquireShared。
/**
循環(huán)的在做tryAcquireShared相同的工作暑塑。
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1; //有寫鎖吼句,且當(dāng)前線程不是寫鎖的持有者
} else if (readerShouldBlock()) { //當(dāng)度處理,如果是需要阻塞線程怎么辦
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter; //效率優(yōu)化
if (rh == null || rh.tid != getThreadId(current)) { //不是自己的HoldCounter
rh = readHolds.get(); //拿出自己的事格,
if (rh.count == 0) //之前沒有過任何重入惕艳,則,刪除ThreadLocal中的記錄
readHolds.remove();
}
}
if (rh.count == 0) //返回阻塞
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//執(zhí)行到這驹愚,說明远搪,沒有寫線程,且當(dāng)前讀線程不能阻塞(之前)逢捺,要繼續(xù)競爭鎖谁鳍,
//tryAcquireShared中只進(jìn)行一次獲得鎖,只能一個(gè)線程成功劫瞳,其他線程在這里繼續(xù)競爭倘潜。
if (compareAndSetState(c, c + SHARED_UNIT)) {//獲得到讀鎖
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1; //效率優(yōu)化
} else if (firstReader == current) {
firstReaderHoldCount++; //效率優(yōu)化
} else {
if (rh == null)
rh = cachedHoldCounter; //效率優(yōu)化
if (rh == null || rh.tid != getThreadId(current)) // 不是自己的Holder
rh = readHolds.get(); //拿自己的
else if (rh.count == 0) //
readHolds.set(rh); //為何重放?難道是因?yàn)樯厦娴膔emove志于?
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
fullTryAcquireShared作用窍荧,其實(shí)就讓多個(gè)讀線程,cas更新state中讀鎖的個(gè)數(shù)恨憎。接下來,看下
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
//調(diào)整holdcounter郊楣,如果重入次數(shù)減為1憔恳,則刪除ThreadLocal中的記錄。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { //cas更新讀鎖的個(gè)數(shù)
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0; //當(dāng)最后一個(gè)讀線程釋放鎖净蚤,返回true钥组,其余返回false。
}
}