閱讀建議:雖然我這里會(huì)介紹一些 AQS 的知識(shí),不過(guò)如果你完全不了解 AQS驶兜,看本文就有點(diǎn)吃力了
目錄:
1.簡(jiǎn)介
2.一個(gè)緩存示例說(shuō)明讀寫(xiě)鎖的使用方式
3.讀寫(xiě)鎖的實(shí)現(xiàn)分析
3.1 讀寫(xiě)狀態(tài)的設(shè)計(jì)
3.2 ReentrantReadWriteLock 總覽
3.3寫(xiě)鎖的獲取
3.4寫(xiě)鎖的釋放
3.5讀鎖的獲取
3.6讀鎖的釋放
4.鎖降級(jí)
1.簡(jiǎn)介
ReentrantLock實(shí)現(xiàn)是排他鎖,這些鎖在同一時(shí)刻只允許一個(gè)線(xiàn)程進(jìn)行訪(fǎng)問(wèn)帘瞭,而讀寫(xiě)鎖在同一時(shí)刻可以允許多個(gè)讀線(xiàn)程訪(fǎng)問(wèn)忱辅,但是在寫(xiě)線(xiàn)程訪(fǎng)問(wèn)時(shí),所有的讀線(xiàn)程和其他寫(xiě)線(xiàn)程均被阻塞沃粗。讀寫(xiě)鎖維護(hù)了一對(duì)鎖,一個(gè)讀鎖和一個(gè)寫(xiě)鎖键畴,通過(guò)分離讀鎖和寫(xiě)鎖最盅,使得并發(fā)性相比一般的排他鎖有了很大提升突雪。
一般情況下,讀寫(xiě)鎖的性能都會(huì)比排它鎖好涡贱,因?yàn)榇蠖鄶?shù)場(chǎng)景讀是多于寫(xiě)的咏删。在讀多于寫(xiě)的情況下,讀寫(xiě)鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量问词。Java并發(fā)包提供讀寫(xiě)鎖的實(shí)現(xiàn)是ReentrantReadWriteLock
2.一個(gè)緩存示例說(shuō)明讀寫(xiě)鎖的使用方式
public class Cache {
private static final Map<String, Object> map = new HashMap<String, Object>();
private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private static final Lock r = rwl.readLock();
private static final Lock w = rwl.writeLock();
// 獲取一個(gè)key對(duì)應(yīng)的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 設(shè)置key對(duì)應(yīng)的value督函,并返回舊的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的內(nèi)容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
Cache組合一個(gè)非線(xiàn)程安全的HashMap作為緩存的實(shí)現(xiàn),同時(shí)使用讀寫(xiě)鎖的讀鎖和寫(xiě)鎖來(lái)保證Cache是線(xiàn)程安全的激挪。在讀操作get(String key)方法中辰狡,需要獲取讀鎖,這使得并發(fā)訪(fǎng)問(wèn)該方法時(shí)不會(huì)被阻塞垄分。寫(xiě)操作put(String key,Object value)方法和clear()方法宛篇,在更新HashMap時(shí)必須提前獲取寫(xiě)鎖,當(dāng)獲取寫(xiě)鎖后薄湿,其他線(xiàn)程對(duì)于讀鎖和寫(xiě)鎖的獲取均被阻塞叫倍,而只有寫(xiě)鎖被釋放之后,其他讀寫(xiě)操作才能繼續(xù)嘿般。Cache使用讀寫(xiě)鎖提升讀操作的并發(fā)性段标,也保證每次寫(xiě)操作對(duì)所有的讀寫(xiě)操作的可見(jiàn)性涯冠,同時(shí)簡(jiǎn)化了編程方式炉奴。
3.讀寫(xiě)鎖的實(shí)現(xiàn)分析
3.1 讀寫(xiě)狀態(tài)的設(shè)計(jì)
讀寫(xiě)鎖同樣依賴(lài)自定義同步器來(lái)實(shí)現(xiàn)同步功能,而讀寫(xiě)狀態(tài)就是其同步器的同步狀態(tài)蛇更≌案希回想ReentrantLock中自定義同步器的實(shí)現(xiàn),同步狀態(tài)表示鎖被一個(gè)線(xiàn)程重復(fù)獲取的次數(shù)派任,而讀寫(xiě)鎖的自定義同步器需要在同步狀態(tài)(一個(gè)整型變量)上維護(hù)多個(gè)讀線(xiàn)程和一個(gè)寫(xiě)線(xiàn)程的狀態(tài)砸逊,使得該狀態(tài)的設(shè)計(jì)成為讀寫(xiě)鎖實(shí)現(xiàn)的關(guān)鍵。
ReentrantReadWriteLock 使用一個(gè)整型變量上維護(hù)多種狀態(tài)掌逛,就一定需要“按位切割使用”這個(gè)變量师逸,讀寫(xiě)鎖將變量切分成了兩個(gè)部分,高16位表示讀豆混,低16位表示寫(xiě)
當(dāng)前同步狀態(tài)表示一個(gè)線(xiàn)程已經(jīng)獲取了寫(xiě)鎖篓像,且重進(jìn)入了兩次,同時(shí)也連續(xù)獲取了兩次讀鎖皿伺。讀寫(xiě)鎖是如何迅速確定讀和寫(xiě)各自的狀態(tài)呢员辩?答案是通過(guò)位運(yùn)算。假設(shè)當(dāng)前同步狀態(tài)值為S鸵鸥,寫(xiě)狀態(tài)等于S&0x0000FFFF(將高16位全部抹去)奠滑,讀狀態(tài)等于S>>>16(無(wú)符號(hào)補(bǔ)0右移16位)。當(dāng)寫(xiě)狀態(tài)增加1時(shí),等于S+1宋税,當(dāng)讀狀態(tài)增加1時(shí)摊崭,等于S+(1<<16),也就是S+0x00010000杰赛。
3.2 ReentrantReadWriteLock 總覽
ReadLock 和 WriteLock 的代碼提出來(lái)一起看爽室,清晰一些:
ReadLock 和 WriteLock 中的方法都是通過(guò) Sync 這個(gè)類(lèi)來(lái)實(shí)現(xiàn)的。Sync 是 AQS 的子類(lèi)淆攻,然后再派生了公平模式和不公平模式阔墩。
從它們調(diào)用的 Sync 方法,我們可以看到: ReadLock 使用了共享模式瓶珊,WriteLock 使用了獨(dú)占模式啸箫。
等等,同一個(gè) AQS 實(shí)例怎么可以同時(shí)使用共享模式和獨(dú)占模式伞芹?忘苛??
這里給大家回顧下 AQS唱较,我們橫向?qū)Ρ认?AQS 的共享模式和獨(dú)占模式:
3.3寫(xiě)鎖的獲取
寫(xiě)鎖是一個(gè)支持重進(jìn)入的排它鎖扎唾。如果當(dāng)前線(xiàn)程已經(jīng)獲取了寫(xiě)鎖,則增加寫(xiě)狀態(tài)南缓。如果當(dāng)前線(xiàn)程在獲取寫(xiě)鎖時(shí)胸遇,讀鎖已經(jīng)被獲取(讀狀態(tài)不為0)或者該線(xiàn)程不是已經(jīng)獲取寫(xiě)鎖的線(xiàn)程汉形,則當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)纸镊,獲取寫(xiě)鎖的代碼如下
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// 存在讀鎖或者當(dāng)前獲取線(xiàn)程不是已經(jīng)獲取寫(xiě)鎖的線(xiàn)程
//也就是說(shuō),只要有讀鎖或?qū)戞i被占用概疆,這次就不能獲取到寫(xiě)鎖
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 這里不需要 CAS逗威,能到這里的,只可能是寫(xiě)鎖重入岔冀,不然在上面的 if 就攔截了
setState(c + acquires);
return true;
}
// 如果寫(xiě)鎖獲取不需要 block凯旭,那么進(jìn)行 CAS,成功就代表獲取到了寫(xiě)鎖
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
該方法除了重入條件(當(dāng)前線(xiàn)程為獲取了寫(xiě)鎖的線(xiàn)程)之外使套,增加了一個(gè)讀鎖是否存在的判斷罐呼。如果存在讀鎖,則寫(xiě)鎖不能被獲取童漩,原因在于:讀寫(xiě)鎖要確保寫(xiě)鎖的操作對(duì)讀鎖可見(jiàn)弄贿,如果允許讀鎖在已被獲取的情況下對(duì)寫(xiě)鎖的獲取,那么正在運(yùn)行的其他讀線(xiàn)程就無(wú)法感知到當(dāng)前寫(xiě)線(xiàn)程的操作矫膨。因此差凹,只有等待其他讀線(xiàn)程都釋放了讀鎖期奔,寫(xiě)鎖才能被當(dāng)前線(xiàn)程獲取,而寫(xiě)鎖一旦被獲取危尿,則其他讀寫(xiě)線(xiàn)程的后續(xù)訪(fǎng)問(wèn)均被阻塞呐萌。
寫(xiě)鎖的釋放與ReentrantLock的釋放過(guò)程基本類(lèi)似,每次釋放均減少寫(xiě)狀態(tài)谊娇,當(dāng)寫(xiě)狀態(tài)為0時(shí)表示寫(xiě)鎖已被釋放肺孤,從而等待的讀寫(xiě)線(xiàn)程能夠繼續(xù)訪(fǎng)問(wèn)讀寫(xiě)鎖,同時(shí)前次寫(xiě)線(xiàn)程的修改對(duì)后續(xù)讀寫(xiě)線(xiàn)程可見(jiàn)济欢。
3.4寫(xiě)鎖的釋放
血鎖的釋放赠堵,是線(xiàn)程安全的,因?yàn)閷?xiě)鎖是獨(dú)占鎖法褥,具有排他性茫叭,所以寫(xiě)鎖的釋放與ReentrantLock的釋放過(guò)程基本類(lèi)似,每次釋放均減少寫(xiě)狀態(tài)半等,當(dāng)寫(xiě)狀態(tài)為0時(shí)表示寫(xiě)鎖已被釋放揍愁,從而等待的讀寫(xiě)線(xiàn)程能夠繼續(xù)訪(fǎng)問(wèn)讀寫(xiě)鎖,同時(shí)前次寫(xiě)線(xiàn)程的修改對(duì)后續(xù)讀寫(xiě)線(xiàn)程可見(jiàn)
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 如果 exclusiveCount(nextc) == 0杀饵,也就是說(shuō)包括重入的莽囤,所有的寫(xiě)鎖都釋放了
// 那么返回 true,這樣會(huì)進(jìn)行喚醒后繼節(jié)點(diǎn)的操作切距。
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
3.5讀鎖的獲取
讀鎖是一個(gè)支持重進(jìn)入的共享鎖朽缎,它能夠被多個(gè)線(xiàn)程同時(shí)獲取,在沒(méi)有其他寫(xiě)線(xiàn)程訪(fǎng)問(wèn)(或者寫(xiě)狀態(tài)為0)時(shí)蔚舀,讀鎖總會(huì)被成功地獲取饵沧,而所做的也只是(線(xiàn)程安全的)增加讀狀態(tài)。如果當(dāng)前線(xiàn)程已經(jīng)獲取了讀鎖赌躺,則增加讀狀態(tài)。如果當(dāng)前線(xiàn)程在獲取讀鎖時(shí)羡儿,寫(xiě)鎖已被其他線(xiàn)程獲取礼患,則進(jìn)入等待狀態(tài)。
獲取讀鎖的實(shí)現(xiàn)從Java 5到Java 6變得復(fù)雜許多掠归,主要原因是新增了一些功能缅叠,例如getReadHoldCount()方法,作用是返回當(dāng)前線(xiàn)程獲取讀鎖的次數(shù)虏冻。讀狀態(tài)是所有線(xiàn)程獲取讀鎖次數(shù)的總和肤粱,而每個(gè)線(xiàn)程各自獲取讀鎖的次數(shù)只能選擇保存在ThreadLocal中,由線(xiàn)程自身維護(hù)厨相,這使獲取讀鎖的實(shí)現(xiàn)變得復(fù)雜领曼。因此鸥鹉,這里將獲取讀鎖的代碼做了刪減,保留必要的部分
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//1.如果另一個(gè)線(xiàn)程持有寫(xiě)鎖庶骄,則失敗毁渗。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 讀鎖的獲取次數(shù)
int r = sharedCount(c);
// 讀鎖獲取是否需要被阻塞
if (!readerShouldBlock() &&
// 判斷是否會(huì)溢出 (2^16-1,沒(méi)那么容易溢出的)
r < MAX_COUNT &&
// 下面這行 CAS 是將 state 屬性的高 16 位加 1单刁,
//低 16 位不變灸异,如果成功就代表獲取到了讀鎖
compareAndSetState(c, c + SHARED_UNIT)) {
// r == 0 ->此線(xiàn)程是第一個(gè)獲取讀鎖的
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 第一個(gè)獲取 readLock 的是 current 線(xiàn)程, 直接計(jì)數(shù)器加 1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
// 先從 cachedHoldCounter拿數(shù)據(jù), 數(shù)據(jù)不對(duì)的話(huà), 再?gòu)膔eadHolds拿數(shù)據(jù)
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//代碼調(diào)用 fullTryAcquireShared
return fullTryAcquireShared(current);
}
fullTryAcquireShared 這個(gè)方法其實(shí)是 tryAcquireShared 的冗余(redundant)方法, 主要補(bǔ)足 readerShouldBlock 導(dǎo)致的獲取等待 和 CAS 修改 AQS 中 state 值失敗進(jìn)行的修補(bǔ)工作
3.6讀鎖的釋放
protected final boolean tryReleaseShared(int unused){
Thread current = Thread.currentThread();
//判斷現(xiàn)在進(jìn)行 release 的線(xiàn)程是否是 firstReader
if(firstReader == current){
// assert firstReaderHoldCount > 0
// 只獲取一次 readLock 直接置空 firstReader
if(firstReaderHoldCount == 1){
firstReader = null;
}else{
3. 將 firstReaderHoldCount 減 1
firstReaderHoldCount--;
}
}else{
// 先通過(guò) cachedHoldCounter 來(lái)取值
HoldCounter rh = cachedHoldCounter;
// cachedHoldCounter 代表的是上次獲取 readLock 的線(xiàn)程, 若這次進(jìn)行 release 的線(xiàn)程不是, 再通過(guò) readHolds 進(jìn)行 lookup 查找
if(rh == null || rh.tid != getThreadId(current)){
rh = readHolds.get();
}
int count = rh.count;
// count <= 1 時(shí)要進(jìn)行 ThreadLocal 的 remove , 不然容易內(nèi)存泄露
if(count <= 1){
readHolds.remove();
if(count <= 0){
// 并發(fā)多次釋放就有可能出現(xiàn)
throw unmatchedUnlockException();
}
}//HoldCounter.count 減 1
--rh.count;
}
for(;;){
// 這里是一個(gè) loop CAS 操作, 因?yàn)榭赡芷渌木€(xiàn)程此刻也在進(jìn)行 release操作
int c = getState();
int nextc = c - SHARED_UNIT;
// 這里是 readLock 的減 1, 也就是 aqs里面state的高 16 上進(jìn)行 減 1
//所以 減 SHARED_UNIT
if(compareAndSetState(c, nextc)){
/**
* Releasing the read lock has no effect on readers,
* but it may allow waiting writers to proceed if
* both read and write locks are now free
*/
return nextc == 0;
// 返回值是判斷 是否還有 readLock 沒(méi)有釋放完, 當(dāng)釋放完了會(huì)進(jìn)行后繼節(jié)點(diǎn)的
//喚醒( readLock 在進(jìn)行獲取成功時(shí)也進(jìn)行傳播式的喚醒后繼的 獲取 readLock 的節(jié)點(diǎn))
}
}
}
讀鎖釋放的過(guò)程還是比較簡(jiǎn)單的,主要就是將 hold count 減 1羔飞,如果減到 0 的話(huà)肺樟,還要將 ThreadLocal 中的 remove 掉。
然后是在 for 循環(huán)中將 state 的高 16 位減 1逻淌,如果發(fā)現(xiàn)讀鎖和寫(xiě)鎖都釋放光了儡嘶,那么喚醒后繼的獲取寫(xiě)鎖的線(xiàn)程。
4.鎖降級(jí)
鎖降級(jí)指的是寫(xiě)鎖降級(jí)成為讀鎖恍风。如果當(dāng)前線(xiàn)程擁有寫(xiě)鎖蹦狂,然后將其釋放,最后再獲取讀鎖朋贬,這種分段完成的過(guò)程不能稱(chēng)之為鎖降級(jí)凯楔。鎖降級(jí)是指把持住(當(dāng)前擁有的)寫(xiě)鎖锦募,再獲取到讀鎖摆屯,隨后釋放(先前擁有的)寫(xiě)鎖的過(guò)程。
接下來(lái)看一個(gè)鎖降級(jí)的示例糠亩。因?yàn)閿?shù)據(jù)不常變化虐骑,所以多個(gè)線(xiàn)程可以并發(fā)地進(jìn)行數(shù)據(jù)處理,當(dāng)數(shù)據(jù)變更后赎线,如果當(dāng)前線(xiàn)程感知到數(shù)據(jù)變化廷没,則進(jìn)行數(shù)據(jù)的準(zhǔn)備工作,同時(shí)其他處理線(xiàn)程被阻塞垂寥,直到當(dāng)前線(xiàn)程完成數(shù)據(jù)的準(zhǔn)備工作
public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級(jí)從寫(xiě)鎖獲取到開(kāi)始
writeLock.lock();
try {
if (!update) {
// 準(zhǔn)備數(shù)據(jù)的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級(jí)完成颠黎,寫(xiě)鎖降級(jí)為讀鎖
}try {
// 使用數(shù)據(jù)的流程(略)
} finally {
readLock.unlock();
}
}
鎖降級(jí)中讀鎖的獲取是否必要呢?答案是必要的滞项。主要是為了保證數(shù)據(jù)的可見(jiàn)性狭归,如果當(dāng)前線(xiàn)程不獲取讀鎖而是直接釋放寫(xiě)鎖,假設(shè)此刻另一個(gè)線(xiàn)程(記作線(xiàn)程T)獲取了寫(xiě)鎖并修改了數(shù)據(jù)文判,那么當(dāng)前線(xiàn)程無(wú)法感知線(xiàn)程T的數(shù)據(jù)更新过椎。如果當(dāng)前線(xiàn)程獲取讀鎖,即遵循鎖降級(jí)的步驟戏仓,則線(xiàn)程T將會(huì)被阻塞疚宇,直到當(dāng)前線(xiàn)程使用數(shù)據(jù)并釋放讀鎖之后亡鼠,線(xiàn)程T才能獲取寫(xiě)鎖進(jìn)行數(shù)據(jù)更新。
RentrantReadWriteLock不支持鎖升級(jí)(把持讀鎖灰嫉、獲取寫(xiě)鎖拆宛,最后釋放讀鎖的過(guò)程)。目的也是保證數(shù)據(jù)可見(jiàn)性讼撒,如果讀鎖已被多個(gè)線(xiàn)程獲取浑厚,其中任意線(xiàn)程成功獲取了寫(xiě)鎖并更新了數(shù)據(jù),則其更新對(duì)其他獲取到讀鎖的線(xiàn)程是不可見(jiàn)的根盒。
參考文獻(xiàn):
[1] ReentrantReadWriteLock 源碼分析(基于Java 8)
[2] Java 讀寫(xiě)鎖 ReentrantReadWriteLock 源碼分析
[3] Java的ReadWriteLock實(shí)現(xiàn)機(jī)制解析(一)
[4] 《Java并發(fā)編程藝術(shù)》