一昭娩、框架圖:
從中可以看出:
(01) ReentrantReadWriteLock實(shí)現(xiàn)了ReadWriteLock接口荣茫。ReadWriteLock是一個(gè)讀寫(xiě)鎖的接口,提供了"獲取讀鎖的readLock()函數(shù)" 和 "獲取寫(xiě)鎖的writeLock()函數(shù)"挑宠。
(02) ReentrantReadWriteLock中包含:sync對(duì)象充包,讀鎖readerLock和寫(xiě)鎖writerLock。讀鎖ReadLock和寫(xiě)鎖WriteLock都實(shí)現(xiàn)了Lock接口。讀鎖ReadLock和寫(xiě)鎖WriteLock中也都分別包含了"Sync對(duì)象",它們的Sync對(duì)象和ReentrantReadWriteLock的Sync對(duì)象 是一樣的,就是通過(guò)sync帮匾,讀鎖和寫(xiě)鎖實(shí)現(xiàn)了對(duì)同一個(gè)對(duì)象的訪問(wèn)。
(03) 和"ReentrantLock"一樣凡傅,sync是Sync類(lèi)型辟狈;而且,Sync也是一個(gè)繼承于AQS的抽象類(lèi)夏跷。Sync也包括"公平鎖"FairSync和"非公平鎖"NonfairSync哼转。sync對(duì)象是"FairSync"和"NonfairSync"中的一個(gè),默認(rèn)是"NonfairSync"槽华。
二壹蔓、介紹
ReadWriteLock,顧名思義猫态,是讀寫(xiě)鎖佣蓉。它維護(hù)了一對(duì)相關(guān)的鎖 — — “讀取鎖”和“寫(xiě)入鎖”,一個(gè)用于讀取操作亲雪,另一個(gè)用于寫(xiě)入操作勇凭。
“讀取鎖”用于只讀操作,它是“共享鎖”义辕,能同時(shí)被多個(gè)線程獲取虾标。
“寫(xiě)入鎖”用于寫(xiě)入操作,它是“獨(dú)占鎖”灌砖,寫(xiě)入鎖只能被一個(gè)線程鎖獲取璧函。
注意:不能同時(shí)存在讀取鎖和寫(xiě)入鎖!
ReadWriteLock是一個(gè)接口基显。ReentrantReadWriteLock是它的實(shí)現(xiàn)類(lèi)蘸吓,ReentrantReadWriteLock包括子類(lèi)ReadLock和WriteLock。
三撩幽、特點(diǎn)
復(fù)用的state值
state表示持有鎖的數(shù)量库继,因?yàn)镽eentrantReadWriteLock分為“讀鎖”和“寫(xiě)鎖”兩把鎖,所以它把低4位用來(lái)表示“寫(xiě)鎖(獨(dú)占鎖)”的持有數(shù),其他位數(shù)表示“讀鎖(共享鎖)”的持有數(shù)宪萄。
代碼:
/*
*?Read?vs?write?count?extraction?constants?and?functions.
*?Lock?state?is?logically?divided?into?two?unsigned?shorts:
*?The?lower?one?representing?the?exclusive?(writer)?lock?hold?count,
*?and?the?upper?the?shared?(reader)?hold?count.
*/
static?final?intSHARED_SHIFT=16;
static?final?intSHARED_UNIT=?(1<
static?final?intMAX_COUNT=?(1<
static?final?intEXCLUSIVE_MASK=?(1<
/**?Returns?the?number?of?shared?holds?represented?in?count? */
static?intsharedCount(intc)? ? {returnc?>>>SHARED_SHIFT;}
/**?Returns?the?number?of?exclusive?holds?represented?in?count? */
static?intexclusiveCount(intc)?{returnc?&EXCLUSIVE_MASK;}
ThreadLocal變量舅桩,保持線程和獲取“共享鎖”的次數(shù)
代碼
static final classThreadLocalHoldCounter
extendsThreadLocal {
publicHoldCounterinitialValue() {
return newHoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transientThreadLocalHoldCounterreadHolds;
四、源碼分析
還是從lock()開(kāi)始分析:
ReadLock中的lock():
public?voidlock()?{
sync.acquireShared(1);
}
AQS中的acquireShared():
public?final?voidacquireShared(intarg)?{
if(tryAcquireShared(arg)?<0)
doAcquireShared(arg);
}
ReentrantReadWriteLock中的tryAcquireShared(arg)
protected?final?inttryAcquireShared(intunused)?{
/*
*?Walkthrough:
*?1.?If?write?lock?held?by?another?thread,?fail.
*?2.?Otherwise,?this?thread?is?eligible?for
*? ? lock?wrt?state,?so?ask?if?it?should?block
*? ? because?of?queue?policy.?If?not,?try
*? ? to?grant?by?CASing?state?and?updating?count.
*? ? Note?that?step?does?not?check?for?reentrant
*? ? acquires,?which?is?postponed?to?full?version
*? ? to?avoid?having?to?check?hold?count?in
*? ? the?more?typical?non-reentrant?case.
*?3.?If?step?2?fails?either?because?thread
*? ? apparently?not?eligible?or?CAS?fails?or?count
*? ? saturated,?chain?to?version?with?full?retry?loop.
*/
Thread?current?=?Thread.currentThread();
intc?=?getState();
if(exclusiveCount(c)?!=0&& ? ? ? //如果是獨(dú)占鎖雨膨,而且不是當(dāng)前線程,直接返回-1
getExclusiveOwnerThread()?!=?current)
return-1;
intr?=sharedCount(c);
if(!readerShouldBlock()?&& ? ? ? ?//如果不需要“阻塞等待”读串,而且“讀取鎖”的計(jì)數(shù)小于“MAX_COUNT”聊记,“讀取鎖”的狀態(tài)+1
r?
compareAndSetState(c,c?+SHARED_UNIT))?{
if(r?==0)?{
firstReader=?current;
firstReaderHoldCount=1;
}else?if(firstReader==?current)?{
firstReaderHoldCount++;
}else{
HoldCounter?rh?=cachedHoldCounter; ? ? ?//readHolds是一個(gè)ThreadLocal,記錄當(dāng)前線程的獲取“讀取鎖”的次數(shù)恢暖,cacheHoldCounter是當(dāng)前線程的緩存排监,避免每次都從ThreadLocal中拿。這個(gè)次數(shù)當(dāng)然要+1
if(rh?==null||?rh.tid!=getThreadId(current))
cachedHoldCounter=?rh?=readHolds.get();
else?if(rh.count==0)
readHolds.set(rh);
rh.count++;
}
return1;
}
returnfullTryAcquireShared(current);
}
說(shuō)明:tryAcquireShared()的作用是嘗試獲取“共享鎖”杰捂。
如果在嘗試獲取鎖時(shí)舆床,“不需要阻塞等待”并且“讀取鎖的共享計(jì)數(shù)小于MAX_COUNT”,則直接通過(guò)CAS函數(shù)更新“讀取鎖的共享計(jì)數(shù)”嫁佳,以及將“當(dāng)前線程獲取讀取鎖的次數(shù)+1”挨队。
否則,通過(guò)fullTryAcquireShared()獲取讀取鎖蒿往。
ReentrantReadWriteLock中的fullTryAcquireShared(thread)
final?intfullTryAcquireShared(Thread?current)?{
/*
*?This?code?is?in?part?redundant?with?that?in
*?tryAcquireShared?but?is?simpler?overall?by?not
*?complicating?tryAcquireShared?with?interactions?between
*?retries?and?lazily?reading?hold?counts.
*/
HoldCounter?rh?=null;
for(;;)?{
intc?=?getState();
if(exclusiveCount(c)?!=0)?{//如果是獨(dú)占鎖盛垦,且當(dāng)前線程不是“獨(dú)占鎖”的持有者,則什么都不干瓤漏,直接返回-1
if(getExclusiveOwnerThread()?!=?current)
return-1;
//如果“需要阻塞等待”腾夯。
//(01)?當(dāng)“需要阻塞等待”的線程是第1個(gè)獲取鎖的線程的話,則繼續(xù)往下執(zhí)行蔬充。
//(02)?當(dāng)“需要阻塞等待”的線程獲取鎖的次數(shù)=0時(shí)蝶俱,則返回-1。
}else?if(readerShouldBlock())?{
//?Make?sure?we're?not?acquiring?read?lock?reentrantly
if(firstReader==?current)?{
//?assert?firstReaderHoldCount?>?0;
}else{
if(rh?==null)?{
rh?=cachedHoldCounter;
if(rh?==null||?rh.tid!=getThreadId(current))?{
rh?=readHolds.get();
if(rh.count==0)
readHolds.remove();
}
}
if(rh.count==0)
return-1;
}
}
//如果共享統(tǒng)計(jì)數(shù)超過(guò)MAX_COUNT饥漫,則拋出異常榨呆。
if(sharedCount(c)?==MAX_COUNT)
throw?newError("Maximum?lock?count?exceeded");
if(compareAndSetState(c,c?+SHARED_UNIT))?{
if(sharedCount(c)?==0)?{//更新state值,如果還沒(méi)有線程獲得“讀鎖”趾浅,那么就把當(dāng)前線程更新為firstReader愕提,firstReaderHoldCounter值設(shè)為1
firstReader=?current;
firstReaderHoldCount=1;
}else?if(firstReader==?current)?{//如果當(dāng)前線程就是鎖的持有者,firstReadHoldCounter+1
firstReaderHoldCount++;
}else{//都不滿足的話皿哨,從緩存或者LocalHold中獲取當(dāng)前線程獲得“讀鎖”的次數(shù)浅侨,+1
if(rh?==null)
rh?=cachedHoldCounter;
if(rh?==null||?rh.tid!=getThreadId(current))
rh?=readHolds.get();
else?if(rh.count==0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter=?rh;//?cache?for?release
}
return1;
}
}
}
說(shuō)明:fullTryAcquireShared()會(huì)根據(jù)“是否需要阻塞等待”,“讀取鎖的共享計(jì)數(shù)是否超過(guò)限制”等等進(jìn)行處理证膨。如果不需要阻塞等待如输,并且鎖的共享計(jì)數(shù)沒(méi)有超過(guò)限制,則通過(guò)CAS嘗試獲取鎖,并返回1不见。
doAcquireShared()定義在AQS函數(shù)中
privatevoiddoAcquireShared(intarg) {//addWaiter(Node.SHARED)的作用是澳化,創(chuàng)建“當(dāng)前線程”對(duì)應(yīng)的節(jié)點(diǎn),并將該線程添加到CLH隊(duì)列中稳吮。finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {//獲取“node”的前一節(jié)點(diǎn)finalNode p =node.predecessor();//如果“當(dāng)前線程”是CLH隊(duì)列的表頭缎谷,則嘗試獲取共享鎖。if(p ==head) {intr =tryAcquireShared(arg);if(r >= 0) {
setHeadAndPropagate(node, r);
p.next=null;//help GCif(interrupted)
selfInterrupt();
failed=false;return;
}
}//如果“當(dāng)前線程”不是CLH隊(duì)列的表頭灶似,則通過(guò)shouldParkAfterFailedAcquire()判斷是否需要等待列林,//需要的話,則通過(guò)parkAndCheckInterrupt()進(jìn)行阻塞等待酪惭。若阻塞等待過(guò)程中希痴,線程被中斷過(guò),則設(shè)置interrupted為true春感。if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted=true;
}
}finally{if(failed)
cancelAcquire(node);
}
}
說(shuō)明:doAcquireShared()的作用是獲取共享鎖砌创。
它會(huì)首先創(chuàng)建線程對(duì)應(yīng)的CLH隊(duì)列的節(jié)點(diǎn),然后將該節(jié)點(diǎn)添加到CLH隊(duì)列中鲫懒。CLH隊(duì)列是管理獲取鎖的等待線程的隊(duì)列嫩实。
如果“當(dāng)前線程”是CLH隊(duì)列的表頭,則嘗試獲取共享鎖刀疙;否則舶赔,則需要通過(guò)shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話谦秧,則通過(guò)parkAndCheckInterrupt()進(jìn)行阻塞等待竟纳。
doAcquireShared()會(huì)通過(guò)for循環(huán),不斷的進(jìn)行上面的操作疚鲤;目的就是獲取共享鎖锥累。需要注意的是:doAcquireShared()在每一次嘗試獲取鎖時(shí),是通過(guò)tryAcquireShared()來(lái)執(zhí)行的集歇!