在使用synchronize關(guān)鍵字修飾方法后潦嘶,只允許一個線程進行訪問贰锁,這個雖然有利于保證數(shù)據(jù)安全巷屿,卻實際場景背道而馳的固以。實際中數(shù)據(jù)都是讀取多,寫入少攒庵,我們需要更粗細粒的并發(fā)鎖嘴纺。JVM concurrent.locks包給我們提供ReadWriteLock讀寫鎖,內(nèi)置兩把鎖浓冒,讀鎖栽渴、寫鎖,滿足多個線程并發(fā)讀取數(shù)據(jù)稳懒,寫入時互斥所有線程闲擦,既保證了數(shù)據(jù)安全,又提升了響應(yīng)量场梆。
概念
讀鎖: 可以理解成共享鎖墅冷,允許多個線程同時讀取
寫鎖: 獨占鎖,有且只允許一個線程訪問
讀寫互斥: 在獲取寫鎖時或油,必須等待所有讀鎖全部釋放寞忿,才能獲取成功,讀鎖會堵塞寫鎖顶岸,寫鎖會堵塞所有的線程腔彰。
鎖升級: 在使用讀鎖時,已經(jīng)獲取讀鎖線程在沒有釋放讀鎖的情況下辖佣,去獲取寫鎖這就是鎖升級霹抛。這是不被允許的,鎖升級會造成死鎖卷谈。
// 這個會造成死鎖
ReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.writeLock().lock();
鎖降級: 已經(jīng)獲取到寫鎖線程杯拐,被允許在沒有釋放鎖的情況下去獲取讀鎖的,值得注意讀鎖世蔗、寫鎖仍然需要單獨釋放端逼。
//并不會造成死鎖
ReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
使用官方例子演示ReentrantReadWriteLock 使用場景,每次獲取緩存時凸郑,先判斷緩存是否已經(jīng)失效了裳食,如果失效了使用寫鎖更新緩存。
class CachedData {
Object data;
volatile boolean cacheValid; //緩存失效標記
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 必須釋放了讀鎖才能去獲取寫鎖芙沥,這樣不會造成死鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 雙重檢查狀態(tài),因為在獲取鎖的可能被其他線程更新狀態(tài)了
// 獲取到寫鎖,更新緩存和狀態(tài)
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 通過在釋放寫鎖之前獲取讀鎖來降級
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
// Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
代碼很少而昨,但是非常有代表性救氯,非常適合緩存這種讀取多,更新少的場景歌憨。在每次讀取緩存時着憨,先開啟讀鎖,檢查緩存狀況务嫡,需要更新緩存時甲抖。先釋放讀鎖然后再去獲取寫鎖,在更新前先判斷緩存又沒被其他線程更新過了心铃,更新完數(shù)據(jù)后降級到讀鎖准谚,再釋放寫鎖,使用緩存釋放讀鎖去扣。
源碼解析
這里源碼分析只有簡單講解兩個鎖的獲取柱衔、釋放原理,看閱讀源碼之前愉棱,自備AQS的知識點唆铐。
ReentrantReadWriteLock是實現(xiàn)ReadWriteLock接口的實現(xiàn)類,內(nèi)部使用AQS的int state
來表示讀寫鎖的狀態(tài)
如上圖所示奔滑,兩個鎖的獲取艾岂、釋放都是同時使用
int state
來進行,使用低16位表示寫鎖獲取次數(shù)朋其、高16位表示讀鎖獲取次數(shù)王浴。使用內(nèi)部類Sync 單獨編寫共享鎖、獨占鎖的獲取釋放具體實現(xiàn)令宿,再使用ReadLock叼耙、WriteLock分別調(diào)用共享鎖、獨占鎖的方法粒没。源碼閱讀先從Sync內(nèi)部類開始筛婉。
內(nèi)部屬性
abstract static class Sync extends AbstractQueuedSynchronizer {
//讀 寫 鎖分界點
static final int SHARED_SHIFT = 16;
//讀鎖最小單位,剛好表示當前擁有一個讀鎖線程
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 支持最大讀取次數(shù)
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//寫鎖掩碼
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 計算當前獲取共享鎖數(shù)量 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 計算當前獲取獨占鎖數(shù)量 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
//主要用于保存每一個讀鎖線程的重入次數(shù)
static final class HoldCounter { //初始化對象癞松,就將當前線程id賦值給tid
int count = 0; //重入次數(shù)
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
* 保存HoldCounter到每一個線程私有棧禎
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() { //實現(xiàn)初始化接口爽撒,每一次調(diào)用get()時,沒有值就會調(diào)用初始化方法
return new HoldCounter();
}
}
/**
* 記錄讀鎖重入次數(shù)
*/
private transient ThreadLocalHoldCounter readHolds;
/**
* 這個是上一個讀鎖HoldCounter 緩存
*/
private transient HoldCounter cachedHoldCounter;
我這里認為讀鎖做一個共享鎖在重入次數(shù)上响蓉,state不能準確表達出每一個線程到底重入了多少次硕勿,所以需要用到HoldCounter來記錄每一個線程獲取鎖次數(shù),在釋放鎖的時候枫甲,會看下如何使用的源武。
共享鎖的獲取和釋放
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 當獨占鎖不等于0扼褪,這時只有獨占鎖是自身的情況下才能獲取到讀鎖
//兩個條件都滿足時,寫鎖獲取到讀鎖 鎖降級
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c); //持有共享鎖數(shù)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//這里只對高位進行累加,設(shè)置成功就相當于獲取鎖成功了
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { //首次加鎖 firstReader 必須是讀鎖線程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //重入
firstReaderHoldCount++;
} else { //當前線程不是首個讀鎖持有者粱栖,要使用HoldCounter 記錄重入
HoldCounter rh = cachedHoldCounter; //這是上一個線程緩存
if (rh == null || rh.tid != getThreadId(current))
//這里會返回當前線程初始化值 也就是數(shù)量為空0
//將當前線程重入對象賦值給緩存
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) //第一次進入
readHolds.set(rh);
rh.count++;
}
return 1;
}
// cas 競爭失敗话浇,完整版本共享鎖獲取
return fullTryAcquireShared(current);
}
readerShouldBlock 是一個隊列堵塞策略方法,用于區(qū)分公平鎖和非公平鎖的實現(xiàn)闹究,當返回true時幔崖,會堵塞所有獲取讀鎖線程。
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { //自旋獲取鎖渣淤,直到成功
int c = getState();
if (exclusiveCount(c) != 0) { //這時已經(jīng)是寫鎖狀態(tài)
if (getExclusiveOwnerThread() != current) //不是鎖降級就退出循環(huán)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { //當返回true赏寇,則說明已經(jīng)存在堵塞線程,這是要么自旋价认,要么失敗
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { //重入獲取讀鎖嗅定,這也是不行的
// assert firstReaderHoldCount > 0;
} else { //判斷線程棧禎是否還有重入,如果出現(xiàn)重入自旋等待
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0) //鎖已經(jīng)釋放了刻伊,不能算是重入了露戒,直接失敗了
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { //高16位運算,獲取共享鎖成功
if (sharedCount(c) == 0) { // 下面代碼跟上面基本一致捶箱,略過.....
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
總結(jié)下: 當獲取共享鎖時智什,只有檢測到獨占鎖時,獲取鎖方法會立即返回失敗丁屎。
看下共享鎖釋放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) //鎖已經(jīng)退出荠锭,歸還緩存
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) { //當前鎖沒有重入,直接刪除
readHolds.remove();
if (count <= 0) //多次釋放鎖
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { //自旋 鎖數(shù)量減一
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; //共享鎖數(shù)據(jù)為0
}
}
HoldCounter用于維護每一個線程釋放鎖數(shù)量晨川,保證釋放不會超過自身持有的數(shù)量证九。
獨占鎖獲取和釋放
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // c 不能等于0 ,當前仍然持有鎖共虑,有可能是獨占鎖或者是共享鎖
// 如果獨占鎖為空0愧怜,則說明當前仍然有線程沒有釋放讀鎖,這個不滿足寫鎖獲取妈拌,直接失敗
//w > 0 ,這是說明已經(jīng)有線程獲取獨占鎖了拥坛,這時必須是重入才會獲取成功
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//這里是重入了
setState(c + acquires);
return true;
}
//競爭獲取鎖
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
writerShouldBlock:當返回true會堵塞獲取鎖的線程,用于區(qū)分公平鎖和非公平鎖實現(xiàn)尘分。結(jié)合上面代碼猜惋,當返回true時,不會去獲取鎖培愁,直接失敗了著摔。
獨占鎖釋放
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) //持有獨占鎖線程不是當前線程
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free) //所有鎖都被釋放了,可以將獨占鎖線程致空
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
公平鎖和非公平鎖
ReentrantReadWriteLock內(nèi)部有兩個鎖可以選擇定续,公平鎖和非公平鎖谍咆。通過構(gòu)造參數(shù)進行選擇,默認使用非公平鎖禾锤。
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
非公平鎖: 在獲取讀鎖或者寫鎖時,獲取鎖的線程并不是順序的卧波,在堵塞隊列中的線程可能長期等待时肿,獲取不到鎖庇茫,而沒有在堵塞隊列中等待線程反而能快速獲取到鎖港粱,這個會造成線程饑餓,但是會比公平鎖有更高的吞吐量旦签。
公平鎖: 保證每一個等待最久線程最先獲取到線程執(zhí)行權(quán)查坪,線程都會按照AQS堵塞順序獲取鎖,這樣有利于避免線程饑餓的產(chǎn)生宁炫,但是在在獲取鎖需要判斷隊列有一定性能損耗偿曙,所以吞吐量不如非公平高。
公平鎖和非公平鎖區(qū)別在在于writerShouldBlock 羔巢、readerShouldBlock 方法實現(xiàn)不同而已望忆。
公平鎖實現(xiàn)
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
hasQueuedPredecessors: 返回true則說明AQS中存在堵塞線程,只有在出現(xiàn)寫鎖的時候竿秆,才會將獲取鎖線程放入隊列中启摄,所以readerShouldBlock在讀鎖獲取時,會永遠返回false幽钢。
非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
// 只有堵塞隊列第一個線程為非共享鎖時才會返回true
// 當隊列前面已經(jīng)出現(xiàn)寫鎖了歉备,所有共享鎖都不能和寫鎖競爭,放棄競爭
return apparentlyFirstQueuedIsExclusive();
}
}
從上面代碼知道匪燕,只有這兩個方法返回true蕾羊,都不能去競爭鎖,公平鎖的策略非常明顯帽驯,只有堵塞隊列有線程龟再,就會放棄鎖競爭。而非公平鎖則是在寫鎖時尼变,無論隊列有無線程都會嘗試競爭利凑,寫鎖時只有隊列最前面的線程為寫鎖時,才會放棄競爭享甸,總的來說公平鎖和非公平鎖邏輯和ReentrantLock 邏輯基本一樣截碴。
tryLock
在讀鎖、寫鎖的對象中蛉威,都存在tryLock 方法日丹,它跟lock方法有“億點點”不同,雖然他們都是調(diào)用了內(nèi)部Sync方法蚯嫌,但是在獲取鎖方法上哲虾,和上面分析tryAcquire丙躏、tryAcquireShared基本一致,唯獨缺少了readerShouldBlock束凑、writerShouldBlock使用晒旅。使用這個方法獲取鎖,無論公平鎖還非公平鎖汪诉,獲取鎖邏輯都一樣废恋。無論堵塞隊列是否有線程,會直接競爭獲取鎖扒寄,在非公平鎖中讀鎖會讓步隊列中第一個寫鎖鱼鼓,寫鎖優(yōu)先級會高于讀鎖。但tryLock不存在该编,所有鎖的競爭的公平的迄本,快速的,可以理解這個方法在獲取鎖上會有更高的優(yōu)先級(相比lock)课竣。