StampedLock是Java 8新增的一個(gè)讀寫(xiě)鎖贿肩,它是對(duì)ReentrantReadWriteLock的改進(jìn)。StampedLock的同步狀態(tài)包含了一個(gè)版本和模式龄寞,獲取鎖的方法返回一個(gè)stamp表示這個(gè)鎖的狀態(tài)汰规;而這些方法的 "try" 版本返回一個(gè)特殊值0表示獲取鎖失敗。鎖釋放和轉(zhuǎn)換的方法需要stamp作為參數(shù)物邑,如果stamp不符合鎖的同步狀態(tài)就會(huì)失敗溜哮。StampedLock提供了三種模式的控制:
獨(dú)占寫(xiě)模式。
writeLock
方法可能會(huì)在獲取共享狀態(tài)時(shí)阻塞色解,如果成功獲取鎖茂嗓,返回一個(gè)stamp,它可以作為參數(shù)被用在unlockWrite
方法中以釋放寫(xiě)鎖科阎。tryWriteLock
的超時(shí)與非超時(shí)版本都被提供使用述吸。當(dāng)寫(xiě)鎖被獲取,那么沒(méi)有讀鎖能夠被獲取并且所有的樂(lè)觀讀鎖驗(yàn)證都會(huì)失敗锣笨。悲觀讀模式蝌矛。
readLock
方法可能會(huì)在獲取共享狀態(tài)時(shí)阻塞,如果成功獲取鎖错英,返回一個(gè)stamp入撒,它可以作為參數(shù)被用在unlockRead
方法中以釋放讀鎖。tryReadLock
的超時(shí)與非超時(shí)版本都被提供使用椭岩。樂(lè)觀讀模式茅逮。
tryOptimisticRead
方法只有當(dāng)寫(xiě)鎖沒(méi)有被獲取時(shí)會(huì)返回一個(gè)非0的stamp。在獲取這個(gè)stamp后直到調(diào)用validate
方法這段時(shí)間判哥,如果寫(xiě)鎖沒(méi)有被獲取献雅,那么validate
方法將會(huì)返回true。這個(gè)模式可以被認(rèn)為是讀鎖的一個(gè)弱化版本塌计,因?yàn)樗臓顟B(tài)可能隨時(shí)被寫(xiě)鎖破壞挺身。這個(gè)樂(lè)觀模式的主要是為一些很短的只讀代碼塊的使用設(shè)計(jì),它可以降低競(jìng)爭(zhēng)并且提高吞吐量夺荒。但是瞒渠,它的使用本質(zhì)上是很脆弱的。樂(lè)觀讀的代碼區(qū)域應(yīng)當(dāng)只讀取共享數(shù)據(jù)并將它們儲(chǔ)存在局部變量中以待后來(lái)使用技扼,當(dāng)然在使用前要先驗(yàn)證這些數(shù)據(jù)是否過(guò)期伍玖,這可以使用前面提到的validate
方法。在樂(lè)觀讀模式下的數(shù)據(jù)讀取可能是非常不一致的過(guò)程剿吻,因此只有當(dāng)你對(duì)數(shù)據(jù)的表示很熟悉并且重復(fù)調(diào)用validate
方法來(lái)檢查數(shù)據(jù)的一致性時(shí)使用此模式窍箍。例如,當(dāng)先讀取一個(gè)對(duì)象或者數(shù)組引用,然后訪問(wèn)它的字段椰棘、元素或者方法之一時(shí)上面的步驟都是需要的纺棺。
這個(gè)類還提供了在三種模式之間轉(zhuǎn)換的輔助方法。例如邪狞,tryConvertToWriteLock
方法嘗試"提升"一個(gè)模式祷蝌,如果已經(jīng)獲取了讀鎖并且此時(shí)沒(méi)有其他線程獲取讀鎖,那么這個(gè)方法返回一個(gè)合法的寫(xiě)stamp帆卓。這些方法被設(shè)計(jì)來(lái)幫助減少以“重試為主”設(shè)計(jì)時(shí)發(fā)生的代碼代碼膨脹巨朦。
示例
下面的類中描述了一些StampedLock的常用用法,它主要操作一個(gè)簡(jiǎn)單的二維點(diǎn)剑令。這個(gè)示例在沒(méi)有異常會(huì)拋出的情況下依然沿用使用try-catch
塊的慣例糊啡。
class Point {
// 成員變量
private double x, y;
// 鎖實(shí)例
private final StampedLock sl = new StampedLock();
// 排它鎖-寫(xiě)鎖(writeLock)
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
// 一個(gè)只讀方法
// 其中存在樂(lè)觀讀鎖到悲觀讀鎖的轉(zhuǎn)換
double distanceFromOrigin() {
// 嘗試獲取樂(lè)觀讀鎖
long stamp = sl.tryOptimisticRead();
// 將全部變量拷貝到方法體棧內(nèi)
double currentX = x, currentY = y;
// 檢查在獲取到讀鎖stamp后,鎖有沒(méi)被其他寫(xiě)線程搶占
if (!sl.validate(stamp)) {
// 如果被搶占則獲取一個(gè)共享讀鎖(悲觀獲扔踅颉)
stamp = sl.readLock();
try {
// 將全部變量拷貝到方法體棧內(nèi)
currentX = x;
currentY = y;
} finally {
// 釋放共享讀鎖
sl.unlockRead(stamp);
}
}
// 返回計(jì)算結(jié)果
return Math.sqrt(currentX * currentX + currentY * currentY);
}
// 獲取讀鎖棚蓄,并嘗試轉(zhuǎn)換為寫(xiě)鎖
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.tryOptimisticRead();
try {
// 如果當(dāng)前點(diǎn)在原點(diǎn)則移動(dòng)
while (x == 0.0 && y == 0.0) {
// 嘗試將獲取的讀鎖升級(jí)為寫(xiě)鎖
long ws = sl.tryConvertToWriteLock(stamp);
// 升級(jí)成功,則更新stamp碍脏,并設(shè)置坐標(biāo)值梭依,然后退出循環(huán)
if (ws != 0L) {
stamp = ws;
x = newX;
y = newY;
break;
} else {
// 讀鎖升級(jí)寫(xiě)鎖失敗則釋放讀鎖,顯示獲取獨(dú)占寫(xiě)鎖潮酒,然后循環(huán)重試
sl.unlockRead(stamp);
stamp = sl.writeLock();
}
}
} finally {
sl.unlock(stamp);
}
}
}
介紹了StampedLock的基本用法后睛挚,下面開(kāi)始進(jìn)行源碼分析。
同步節(jié)點(diǎn)
StampedLock使用 long
作為同步狀態(tài)的類型急黎,它使用一個(gè)小的有限數(shù)作為讀鎖被獲取的2進(jìn)制位數(shù)(目前為7),所以當(dāng)reader的數(shù)量到達(dá)上限時(shí)侧到,使用一個(gè)額外的溢出字來(lái)表示溢出勃教。我們通過(guò)將最大的reader數(shù)量(RBITS)視作一個(gè)自旋鎖來(lái)保護(hù)同步狀態(tài)的溢出更新。
在StampedLock中使用的同步節(jié)點(diǎn)與AQS的同步節(jié)點(diǎn)有一點(diǎn)不同匠抗,下面先看它的常量代表的意義故源。
> line: 352
/** 處理器的數(shù)量,控制自旋次數(shù) */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 被增加到同步隊(duì)列前最大的重試次數(shù)汞贸,至少為1 */
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 1;
/** 在頭節(jié)點(diǎn)處被阻塞前的最大重試次數(shù) */
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 1;
/** 在再次被阻塞前的最大重試次數(shù) */
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 1;
/** The period for yielding when waiting for overflow spinlock */
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
/** 讀鎖被獲取的次數(shù)的二進(jìn)制位數(shù) */
private static final int LG_READERS = 7;
// Values for lock state and stamp operations
private static final long RUNIT = 1L; // 類似讀寫(xiě)鎖的RUNIT绳军,意思是每次獲取讀鎖時(shí)
// 同步狀態(tài)應(yīng)當(dāng)增加1
private static final long WBIT = 1L << LG_READERS; // 寫(xiě)狀態(tài) 10000000
private static final long RBITS = WBIT - 1L; // 溢出保護(hù) 01111111
private static final long RFULL = RBITS - 1L; // 最大reader 01111110
private static final long ABITS = RBITS | WBIT; // 掩碼 11111111
private static final long SBITS = ~RBITS; // 掩碼 24(1)10000000
/*
* 3種模式可以通過(guò)檢查區(qū)分 (m = stamp & ABITS):
* 寫(xiě)模式: m == WBIT
* 樂(lè)觀讀模式: m == 0L (即使讀鎖已經(jīng)被持有)
* 悲觀讀模式: m > 0L && m <= RFULL (同步狀態(tài)的拷貝,,但是stamp中的
* read hold count除了用來(lái)決定是哪個(gè)模式以外不會(huì)被使用)
*
* This differs slightly from the encoding of state:
* (state & ABITS) == 0L 表示鎖沒(méi)有被獲取
* (state & ABITS) == RBITS 這是一個(gè)特殊值矢腻,表示操作讀者bit位的自旋鎖溢出
*/
/** 鎖狀態(tài)的初始值 */
private static final long ORIGIN = WBIT << 1; // 1 00000000
// Special value from cancelled acquire methods so caller can throw IE
private static final long INTERRUPTED = 1L;
// 節(jié)點(diǎn)狀態(tài)值; order matters
private static final int WAITING = -1;
private static final int CANCELLED = 1;
// 節(jié)點(diǎn)模式 (使用int而不是boolean以允許運(yùn)算)
private static final int RMODE = 0;
private static final int WMODE = 1;
其中ABITS和SBITS是作為掩碼使用的门驾,來(lái)快速檢查當(dāng)前鎖的狀態(tài),在后面讀寫(xiě)鎖的獲取中可以看到它們的使用多柑。使用ORIGIN作為初始值也是與此相關(guān)奶是,我們?cè)诤竺嬗懻摗6x狀態(tài)正常最多只可以被獲取126(RFULL)次,如果超出這個(gè)上限聂沙,那么其他讀線程獲取鎖時(shí)需要在readreaderOverflow
記錄秆麸。因?yàn)?code>readreaderOverflow不是個(gè)原子變量,所以為了保證它的同步性及汉,需要進(jìn)行同步處理沮趣。
了解了常量值的含義后,開(kāi)始對(duì)同步節(jié)點(diǎn)的分析:
> line: 406
static final class WNode {
volatile WNode prev; // 前驅(qū)節(jié)點(diǎn)
volatile WNode next; // 后繼節(jié)點(diǎn)
volatile WNode cowait; // 讀線程鏈表
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** 隊(duì)列頭節(jié)點(diǎn) */
private transient volatile WNode whead;
/** 隊(duì)列尾節(jié)點(diǎn) */
private transient volatile WNode wtail;
StampedLock中的同步節(jié)點(diǎn)和AQS的幾乎一樣坷随,只多加了一個(gè)cowait
字段兔毒,同時(shí)狀態(tài)略有不同,還多了個(gè)判定是讀還是寫(xiě)的mode
字段甸箱。關(guān)于狀態(tài)只有WAITING
和CANCELLED
兩種育叁,閱讀過(guò)AQS相信對(duì)此不會(huì)有疑惑,而cowait
的的出現(xiàn)是對(duì)AQS的優(yōu)化芍殖。在StampedLock中豪嗽,讀節(jié)點(diǎn)不像AQS那樣每個(gè)讀線程都會(huì)構(gòu)造一個(gè)自己的節(jié)點(diǎn)并加入到同步隊(duì)列中,而是將許多連續(xù)的讀節(jié)點(diǎn)掛載在一個(gè)讀節(jié)點(diǎn)上豌骏,此時(shí)同步隊(duì)列中就不會(huì)出現(xiàn)多個(gè)連續(xù)的讀節(jié)點(diǎn)龟梦,當(dāng)此讀節(jié)點(diǎn)獲取到鎖時(shí),會(huì)喚醒在其上掛載的所有讀線程窃躲,此時(shí)其他需要增加到同步隊(duì)列中的線程無(wú)論讀寫(xiě)都會(huì)幫助頭節(jié)點(diǎn)喚醒计贰,如此就大大加快了讀線程的喚醒速度,具體實(shí)現(xiàn)會(huì)在后面進(jìn)行講解蒂窒。
寫(xiě)鎖的獲取與釋放
> line: 459
public long writeLock() {
long next;
// 當(dāng)沒(méi)有悲觀讀鎖或者寫(xiě)鎖已經(jīng)被獲取時(shí)躁倒,能夠獲取到寫(xiě)鎖
return ((next = tryWriteLock()) != 0L) ? next : acquireWrite(false, 0L);
}
> line: 471
public long tryWriteLock() {
long s;
// 進(jìn)行掩碼運(yùn)算
return (((s = state) & ABITS) == 0L) ? tryWriteLock(s) : 0L;
}
> line: 442
private long tryWriteLock(long s) {
// assert (s & ABITS) == 0L;
long next;
if (casState(s, next = s | WBIT)) {
VarHandle.storeStoreFence();
return next;
}
return 0L;
}
我們先明確一件事,同步狀態(tài)的初始值為100000000
(二進(jìn)制)洒琢,如果獲取到悲觀讀鎖秧秉,那么同步狀態(tài)會(huì)加一,ABITS
的值為11111111
衰抑,所以如果 state & ABITS
不為0象迎,就表示有線程獲取了悲觀讀鎖或者由線程已經(jīng)獲取了寫(xiě)鎖,而如果state & ABITS
為0呛踊,則只可能有線程獲取到了樂(lè)觀讀鎖砾淌,此時(shí)線程可以無(wú)視樂(lè)觀讀鎖然后獲取寫(xiě)鎖。
下面是獲取讀鎖的超時(shí)版本:
> line: 489
public long tryWriteLock(long time, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(time);
// 如果在嘗試獲取鎖前線程已經(jīng)被中斷谭网,那么直接拋出異常
if (!Thread.interrupted()) {
long next, deadline;
// 成功獲取到寫(xiě)鎖汪厨,直接返回一個(gè) stamp
if ((next = tryWriteLock()) != 0L)
return next;
// 指定超時(shí)時(shí)間 <=0,直接返回
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireWrite(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
> line: 1231
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 如果當(dāng)前沒(méi)有線程獲取了悲觀讀鎖或?qū)戞i蜻底,那么嘗試獲取寫(xiě)鎖
if ((m = (s = state) & ABITS) == 0L) {
if ((ns = tryWriteLock(s)) != 0L)
return ns;
}
else if (spins < 0)
// 如果有線程已經(jīng)獲取了寫(xiě)鎖并且隊(duì)列還未被初始化或者為空骄崩,設(shè)置自旋次數(shù)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
// 進(jìn)行忙等待
else if (spins > 0) {
--spins;
Thread.onSpinWait();
}
// 如果自旋結(jié)束還未獲取到鎖
else if ((p = wtail) == null) { // 初始化隊(duì)列
// 構(gòu)建一個(gè)節(jié)點(diǎn)聘鳞,并設(shè)為頭節(jié)點(diǎn)以及尾節(jié)點(diǎn)
WNode hd = new WNode(WMODE, null);
if (WHEAD.weakCompareAndSet(this, null, hd))
wtail = hd;
}
// 如果隊(duì)列已經(jīng)被初始化,將自己增加到同步隊(duì)列中
else if (node == null)
node = new WNode(WMODE, p);
// 入隊(duì)過(guò)程中如果有其他線程已經(jīng)將自己設(shè)置為尾節(jié)點(diǎn)要拂,則重新插入到尾端
else if (node.prev != p)
node.prev = p;
// 嘗試將自己設(shè)置為隊(duì)列尾節(jié)點(diǎn)抠璃,成功則跳出循環(huán)
else if (WTAIL.weakCompareAndSet(this, p, node)) {
p.next = node;
break;
}
}
boolean wasInterrupted = false;
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)
if ((h = whead) == p) {
// 設(shè)置自旋次數(shù)
if (spins < 0)
spins = HEAD_SPINS;
// 如果小于最大頭節(jié)點(diǎn)自旋次數(shù),則將自選次數(shù)擴(kuò)大為兩倍
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins; k > 0; --k) { // 在隊(duì)列頭部一直自旋
long s, ns;
// 檢查當(dāng)前是否有線程獲取了寫(xiě)鎖或者悲觀讀鎖
if (((s = state) & ABITS) == 0L) {
// 如果沒(méi)有脱惰,則嘗試獲取寫(xiě)鎖
if ((ns = tryWriteLock(s)) != 0L) {
// 獲取成功搏嗡,將自己設(shè)為頭節(jié)點(diǎn)
whead = node;
node.prev = null;
if (wasInterrupted)
Thread.currentThread().interrupt();
return ns;
}
}
// 如果當(dāng)前鎖已被占有,那么就自旋
else
Thread.onSpinWait();
}
}
// 如果頭節(jié)點(diǎn)不為空且不是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
else if (h != null) {
WNode c; Thread w;
// 如果頭節(jié)點(diǎn)的cowait!=null拉一,即頭節(jié)點(diǎn)是一個(gè)讀節(jié)點(diǎn)采盒,那么便幫助它釋放在此處
// 積聚的讀線程
while ((c = h.cowait) != null) {
if (WCOWAIT.weakCompareAndSet(h, c, c.cowait) &&
(w = c.thread) != null)
LockSupport.unpark(w);
}
}
// 如果在執(zhí)行上面操作時(shí)頭節(jié)點(diǎn)沒(méi)有被改變
if (whead == h) {
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
// 將前驅(qū)節(jié)點(diǎn)設(shè)置為WAITING
else if ((ps = p.status) == 0)
WSTATUS.compareAndSet(p, 0, WAITING);
// 如果前驅(qū)節(jié)點(diǎn)被取消了,則將此節(jié)點(diǎn)移除同步隊(duì)列蔚润,與更前面的節(jié)點(diǎn)建立聯(lián)系
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
// 如果超時(shí)時(shí)間已經(jīng)到達(dá)磅氨,取消此節(jié)點(diǎn)
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
node.thread = wt;
// 阻塞當(dāng)前線程
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p) {
if (time == 0L)
LockSupport.park(this);
else
LockSupport.parkNanos(this, time);
}
node.thread = null;
if (Thread.interrupted()) {
if (interruptible)
return cancelWaiter(node, node, true);
wasInterrupted = true;
}
}
}
}
}
上面獲取寫(xiě)狀態(tài)的方法很復(fù)雜,我們將其逐個(gè)塊分析嫡纠。首先此方法包含了兩個(gè)死循環(huán)烦租,第一個(gè)循環(huán)主要的操作就是將自己加入到同步隊(duì)列中,而第二個(gè)循環(huán)的主要目的是阻塞自己除盏。
下面先說(shuō)第一個(gè)循環(huán)叉橱,在這個(gè)循環(huán)中執(zhí)行時(shí),一直都在伴隨著自旋者蠕,在將自己加入到同步隊(duì)列時(shí)窃祝,也在嘗試能否獲取到同步狀態(tài)。
- 第一個(gè)if分支
if ((m = (s = state) & ABITS) == 0L)
踱侣。 每次都會(huì)檢查同步狀態(tài)以查看是否有資格去獲取寫(xiě)鎖粪小,而失敗的原因只會(huì)是與其他線程競(jìng)爭(zhēng)寫(xiě)鎖時(shí)失敗了,自此以后它就沒(méi)有資格去獲取了泻仙,直到鎖再次為空糕再,這就是第一個(gè)if分支的用處。 - 第二個(gè)if分支
spins < 0
玉转。 如果第一個(gè)if判斷失敗,那么證明鎖已經(jīng)被其他線程持有了殴蹄,不管是讀鎖還是寫(xiě)鎖究抓。此時(shí)設(shè)置自旋次數(shù),準(zhǔn)備自旋獲取鎖袭灯,這也是StampedLock相對(duì)于過(guò)去的ReentrantLock做出的改變刺下,因?yàn)楹芏鄷r(shí)候在自旋的這段時(shí)間內(nèi)其他線程會(huì)釋放鎖,所以此時(shí)就能更快的獲取到鎖稽荧,而非通過(guò)等待/通知機(jī)制等待喚醒橘茉。當(dāng)然,如果是其他線程獲取了悲觀讀鎖,那么就將自旋次數(shù)設(shè)置為0畅卓,以讓它們能夠有時(shí)間讀取共享數(shù)擅腰;或者當(dāng)前同步隊(duì)列中已經(jīng)有不止一個(gè)節(jié)點(diǎn)了,那么根據(jù)FIFO原則翁潘,需要讓前面的線程先獲取寫(xiě)鎖趁冈,所以自旋次數(shù)也為0. - 第三個(gè)if分支
spins > 0
。 在第二個(gè)if分支設(shè)置了自旋次數(shù)后拜马,線程便可以開(kāi)始自旋了渗勘,每自旋一次spins減1。經(jīng)過(guò)這兩步之后俩莽,主要的自旋過(guò)程已經(jīng)完成旺坠,后面在加入到同步隊(duì)列中時(shí),只會(huì)每次循環(huán)時(shí)再嘗試一次扮超,相當(dāng)于多幾次嘗試的機(jī)會(huì)取刃。 - 第四個(gè)if分支
(p = wtail) == null
。 當(dāng)自旋次數(shù)用完后線程還沒(méi)有獲取到寫(xiě)鎖瞒津,那么就嘗試將自己增加到同步隊(duì)列中蝉衣。如果隊(duì)列還沒(méi)有被初始化,就構(gòu)造一個(gè)節(jié)點(diǎn)并將其設(shè)置為頭節(jié)點(diǎn)以及尾節(jié)點(diǎn)巷蚪。 - 第五個(gè)if分支
node == null
病毡。 如果同步隊(duì)列已經(jīng)被初始化了,那么就構(gòu)造一個(gè)寫(xiě)節(jié)點(diǎn)屁柏,前驅(qū)節(jié)點(diǎn)指向當(dāng)前的尾節(jié)點(diǎn)啦膜。 - 第六個(gè)if分支
node.prev != p
。在第五步后淌喻,這個(gè)節(jié)點(diǎn)并沒(méi)有被真正增加到同步隊(duì)列中僧家,因?yàn)榍膀?qū)節(jié)點(diǎn)的next字段還沒(méi)有指向它。在此期間裸删,可能有其他節(jié)點(diǎn)在它之前成功插入到了同步隊(duì)列中八拱,此時(shí)隊(duì)列的尾節(jié)點(diǎn)已經(jīng)被改變,而不是在線程堆棧中保存的尾節(jié)點(diǎn)p涯塔,所以將前驅(qū)節(jié)點(diǎn)指向新的尾節(jié)點(diǎn)肌稻,避免隊(duì)列混亂。 - 第七個(gè)分支
WTAIL.weakCompareAndSet(this, p, node)
匕荸。 如果尾節(jié)點(diǎn)并未發(fā)生改變爹谭,那么便嘗試CAS將自己設(shè)置為隊(duì)列的尾節(jié)點(diǎn)并使前驅(qū)節(jié)點(diǎn)的next字段指向自己,如果成功了榛搔,就跳出此循環(huán)诺凡,否則就重新設(shè)置前驅(qū)節(jié)點(diǎn)然后再次嘗試东揣。至此,第一個(gè)循環(huán)結(jié)束腹泌。
第一個(gè)循環(huán)完成之后嘶卧,當(dāng)前線程已經(jīng)成功插入到了同步隊(duì)列中,第二個(gè)循環(huán)可能依然會(huì)自旋獲取同步狀態(tài)真屯,如果不就將自己阻塞脸候。
- 第一個(gè)if分支
(h = whead) == p
。 此時(shí)p保存的還是在第一個(gè)循環(huán)中記錄的上一個(gè)尾節(jié)點(diǎn)绑蔫,即當(dāng)前節(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)运沦。如果前驅(qū)結(jié)點(diǎn)為頭節(jié)點(diǎn),那么便可以再次嘗試獲取同步狀態(tài)配深,因?yàn)榇藭r(shí)有很大幾率能夠成功携添。下面的if分支建立在此if分支的基礎(chǔ)上。
- 第一個(gè)if分支
spins < 0
篓叶。 首先設(shè)置自旋次數(shù)烈掠,因?yàn)榇藭r(shí)很有可能成功獲取同步狀態(tài),所以相比第一個(gè)循環(huán)1<<6 == 64
的次數(shù)缸托,現(xiàn)在將自旋次數(shù)設(shè)置的更多左敌,為1<<10 == 1024
次。 - 第二個(gè)分支
spins < MAX_HEAD_SPINS
俐镐。 如果在上一個(gè)大循環(huán)中矫限,在給定自旋次數(shù)內(nèi)仍未能夠成功獲取同步狀態(tài),那么就再次擴(kuò)大自旋次數(shù)到1<<16 == 65536
次佩抹。
在設(shè)置了自旋次數(shù)后叼风,開(kāi)始嘗試獲取同步狀態(tài)。此處的for循環(huán)的唯一功能就是獲取同步狀態(tài)棍苹,如果鎖目前被其他線程占有无宿,那么進(jìn)入忙等待,否則執(zhí)行tryWriteLock
方法進(jìn)行CAS競(jìng)爭(zhēng)枢里。
- 第二個(gè)if分支
h != null
孽鸡。 如果前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),并且頭節(jié)點(diǎn)是讀節(jié)點(diǎn)栏豺,那么就幫助喚醒積聚的等待線程梭灿。注意,如果讀線程獲取鎖失敗冰悠,當(dāng)同步隊(duì)列的尾節(jié)點(diǎn)是讀節(jié)點(diǎn)時(shí),它便不會(huì)將自己插入到同步隊(duì)列中配乱,而是直接掛載在那個(gè)尾節(jié)點(diǎn)溉卓,當(dāng)那個(gè)節(jié)點(diǎn)成為頭節(jié)點(diǎn)時(shí)皮迟,就會(huì)喚醒在其上掛載的所有讀線程,此處就是幫助讀節(jié)點(diǎn)更快喚醒那些線程桑寨。
在完成了上面兩個(gè)分支的工作后(不管做的是哪一個(gè))伏尼,如果頭節(jié)點(diǎn)沒(méi)有發(fā)生變化片拍,那么就執(zhí)行下面的操作重抖,否則進(jìn)入下一個(gè)循環(huán)重新開(kāi)始梗搅。
- 第一個(gè)if分支
(np = node.prev) != p
铜犬。 如果前驅(qū)節(jié)點(diǎn)發(fā)生了改變栗弟,那么使當(dāng)前的前驅(qū)節(jié)點(diǎn)的next字段指向自己宴胧。 - 第二個(gè)if分支
(ps = p.status) == 0
蒂破。如果前驅(qū)節(jié)點(diǎn)未發(fā)生改變猿妈,且它的狀態(tài)為初始狀態(tài)肢藐,那么就嘗試將它的狀態(tài)設(shè)置為WATING故河,因?yàn)榇藭r(shí)前驅(qū)節(jié)點(diǎn)并不是頭節(jié)點(diǎn)并且已經(jīng)將自己阻塞或者取消了。 - 第三個(gè)if分支
ps == CANCELLED
吆豹。 如果前驅(qū)節(jié)點(diǎn)被取消了鱼的,那么就將此節(jié)點(diǎn)從同步隊(duì)列中移除出去,然后設(shè)置更前面的節(jié)點(diǎn)的next字段指向自己痘煤。 - 第四個(gè)if分支凑阶。 根據(jù)是否超時(shí)阻塞當(dāng)前線程,如果未使用超時(shí)衷快,那么就一直阻塞直到其他線程喚醒宙橱,否則就進(jìn)行超時(shí)處理。
雖然此方法看起來(lái)比較復(fù)雜烦磁,但是邏輯清晰养匈,還是比較容易理解。其復(fù)雜性主要就是自旋優(yōu)化導(dǎo)致的都伪。
下面是此方法的流程圖:
下面是幾種寫(xiě)節(jié)點(diǎn)加入同步隊(duì)列的常見(jiàn)情況:
下面是獲取鎖的響應(yīng)中斷版本:
> line: 517
public long writeLockInterruptibly() throws InterruptedException {
long next;
if (!Thread.interrupted() &&
(next = acquireWrite(true, 0L)) != INTERRUPTED)
return next;
throw new InterruptedException();
}
整體上一樣呕乎,只是多了個(gè)響應(yīng)中斷。
鎖釋放時(shí)需要將加鎖時(shí)返回的stamp作為參數(shù)陨晶。
> line: 678
public void unlockWrite(long stamp) {
// 如果同步狀態(tài)和stamp不相符猬仁,拋出異常
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
unlockWriteInternal(stamp);
}
> line: 661
private long unlockWriteInternal(long s) {
long next; WNode h;
// 更改同步狀態(tài)
STATE.setVolatile(this, next = unlockWriteState(s));
// 如果同步隊(duì)列中有節(jié)點(diǎn)等待,并且自旋獲取同步狀態(tài)失敗被阻塞(status!=0)先誉,釋放它
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
> line: 657
// 返回一個(gè)未加鎖狀態(tài)湿刽,增加一個(gè)版本并避免同步狀態(tài)為 0
private static long unlockWriteState(long s) {
return ((s += WBIT) == 0L) ? ORIGIN : s;
}
> line: 1208
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
WSTATUS.compareAndSet(h, WAITING, 0);
// 如果第一個(gè)等待節(jié)點(diǎn)為null或者被取消了,那么從隊(duì)尾開(kāi)始逆向?qū)ふ? if ((q = h.next) == null || q.status == CANCELLED) {
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
LockSupport.unpark(w);
}
}
從寫(xiě)鎖釋放的源碼中可以看出褐耳,寫(xiě)鎖釋放的時(shí)候诈闺,同步狀態(tài)會(huì)增加WBIT
,相當(dāng)于記錄寫(xiě)鎖總共被獲取的次數(shù)铃芦,當(dāng)其增加到上限溢出時(shí)雅镊,重置同步狀態(tài)為ORIGIN
襟雷。
讀鎖的獲取與釋放
> line: 532
public long readLock() {
long s, next;
// 如果同步隊(duì)列為空并且沒(méi)有線程獲取了寫(xiě)鎖并且CAS成功,返回stamp
return (whead == wtail
&& ((s = state) & ABITS) < RFULL
&& casState(s, next = s + RUNIT))
? next
: acquireRead(false, 0L);
}
> line: 1339
private long acquireRead(boolean interruptible, long deadline) {
boolean wasInterrupted = false;
WNode node = null, p;
for (int spins = -1;;) {
WNode h;
// 如果同步隊(duì)列為空或者只有一個(gè)頭節(jié)點(diǎn)
if ((h = whead) == (p = wtail)) {
for (long m, s, ns;;) {
// 如果沒(méi)有線程獲得寫(xiě)鎖并且讀鎖數(shù)量未達(dá)到上限仁烹,進(jìn)行CAS競(jìng)爭(zhēng)
// 如果讀鎖數(shù)量達(dá)到上限耸弄,嘗試增加讀鎖數(shù)量溢出,如果成功卓缰,返回
if ((m = (s = state) & ABITS) < RFULL ?
casState(s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
if (wasInterrupted)
Thread.currentThread().interrupt();
return ns;
}
// 如果有線程獲取了寫(xiě)鎖
else if (m >= WBIT) {
// 如果自旋次數(shù)大于0计呈,進(jìn)行忙等待
if (spins > 0) {
--spins;
Thread.onSpinWait();
}
else {
// 如果自旋次數(shù)用完,并且同步隊(duì)列未發(fā)生變化征唬,跳出循環(huán)
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
// 設(shè)置自旋次數(shù)
spins = SPINS;
}
}
}
}
if (p == null) { // 初始化隊(duì)列
WNode hd = new WNode(WMODE, null);
if (WHEAD.weakCompareAndSet(this, null, hd))
wtail = hd;
}
// 如果隊(duì)列已經(jīng)初始化捌显,創(chuàng)建一個(gè)讀節(jié)點(diǎn),插入到隊(duì)列尾部
else if (node == null)
node = new WNode(RMODE, p);
// 如果隊(duì)列只有一個(gè)節(jié)點(diǎn)或者尾節(jié)點(diǎn)為寫(xiě)節(jié)點(diǎn)
else if (h == p || p.mode != RMODE) {
// 如果在自旋期間有其他節(jié)點(diǎn)成功插入到隊(duì)列尾部鳍鸵,將prev字段設(shè)置為新的尾節(jié)點(diǎn)
if (node.prev != p)
node.prev = p;
// 否則苇瓣,設(shè)置當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn),并使前驅(qū)節(jié)點(diǎn)的next字段指向自己偿乖,跳出循環(huán)
else if (WTAIL.weakCompareAndSet(this, p, node)) {
p.next = node;
break;
}
}
// 如果尾節(jié)點(diǎn)為讀節(jié)點(diǎn)击罪,將其cowait設(shè)置為當(dāng)前節(jié)點(diǎn),注意這是一個(gè)類似鏈表插入的操作
else if (!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node))
node.cowait = null;
// 如果上一步CAS成功
else {
for (;;) {
WNode pp, c; Thread w;
// 如果頭節(jié)點(diǎn)為讀節(jié)點(diǎn)贪薪,嘗試釋放此節(jié)點(diǎn)上掛載的所有節(jié)點(diǎn)
if ((h = whead) != null && (c = h.cowait) != null &&
WCOWAIT.compareAndSet(h, c, c.cowait) &&
(w = c.thread) != null) // help release
LockSupport.unpark(w);
// 響應(yīng)中斷
if (Thread.interrupted()) {
if (interruptible)
return cancelWaiter(node, p, true);
wasInterrupted = true;
}
// 如果掛載節(jié)點(diǎn)的前驅(qū)是頭節(jié)點(diǎn)或者掛載節(jié)點(diǎn)為頭節(jié)點(diǎn)媳禁,即現(xiàn)在可以嘗試獲取讀鎖
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
casState(s, ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L)) {
if (wasInterrupted)
Thread.currentThread().interrupt();
return ns;
}
} while (m < WBIT);
}
// 如果在前面執(zhí)行完隊(duì)列頭節(jié)點(diǎn)未發(fā)生變化
if (whead == h && p.prev == pp) {
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
// 計(jì)算超時(shí)時(shí)間,并阻塞自己
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L) {
if (wasInterrupted)
Thread.currentThread().interrupt();
return cancelWaiter(node, p, false);
}
Thread wt = Thread.currentThread();
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp) {
if (time == 0L)
LockSupport.park(this);
else
LockSupport.parkNanos(this, time);
}
node.thread = null;
}
}
}
}
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)
if ((h = whead) == p) {
// 設(shè)置自旋數(shù)
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // 在頭部自旋画切,嘗試獲取鎖
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ?
casState(s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
WNode c; Thread w;
whead = node;
node.prev = null;
while ((c = node.cowait) != null) {
if (WCOWAIT.compareAndSet(node, c, c.cowait) &&
(w = c.thread) != null)
LockSupport.unpark(w);
}
if (wasInterrupted)
Thread.currentThread().interrupt();
return ns;
}
else if (m >= WBIT && --k <= 0)
break;
else
Thread.onSpinWait();
}
}
// 如果頭節(jié)點(diǎn)不為空并且前驅(qū)節(jié)點(diǎn)不為頭節(jié)點(diǎn)竣稽,如果頭節(jié)點(diǎn)是讀節(jié)點(diǎn)
// 幫助喚醒掛載的讀線程
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (WCOWAIT.compareAndSet(h, c, c.cowait) &&
(w = c.thread) != null)
LockSupport.unpark(w);
}
}
// 如果頭節(jié)點(diǎn)未發(fā)生改變
if (whead == h) {
// 如果前驅(qū)節(jié)點(diǎn)發(fā)生了改變,更新節(jié)點(diǎn)引用
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
// 如果前驅(qū)節(jié)點(diǎn)狀態(tài)是初始狀態(tài)霍弹,將其設(shè)置為WAITING
else if ((ps = p.status) == 0)
WSTATUS.compareAndSet(p, 0, WAITING);
// 如果前驅(qū)節(jié)點(diǎn)被取消毫别,移除此節(jié)點(diǎn)并更新節(jié)點(diǎn)引用
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
// 否則,阻塞自己
else {
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p) {
if (time == 0L)
LockSupport.park(this);
else
LockSupport.parkNanos(this, time);
}
node.thread = null;
if (Thread.interrupted()) {
if (interruptible)
return cancelWaiter(node, node, true);
wasInterrupted = true;
}
}
}
}
}
> line: 1157
private long tryIncReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
// 如果剛好到達(dá)上限典格,則將同步狀態(tài)的最后七位設(shè)置為RBITS岛宦,用作屏蔽其他讀線程以原子更新readOverflow
if ((s & ABITS) == RFULL) {
if (casState(s, s | RBITS)) {
// readerOverflow記錄當(dāng)讀線程數(shù)量飽和時(shí)的溢出數(shù)
++readerOverflow;
// 更新完readerOverflow后將state重置回RFULL
// 此時(shí)可以讓下一個(gè)線程來(lái)更新readerOverflow
STATE.setVolatile(this, s);
return s;
}
}
// 否則,如果隨機(jī)數(shù)&111==0耍缴,調(diào)度線程
else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
// 否則進(jìn)行忙等待
else
Thread.onSpinWait();
return 0L;
}
讀狀態(tài)的獲取相比寫(xiě)狀態(tài)更加復(fù)雜砾肺,主要原因便是StampedLock的讀節(jié)點(diǎn)不像ReentrantReadWriteLock那樣每個(gè)線程構(gòu)造一個(gè)節(jié)點(diǎn),而是如果當(dāng)前尾節(jié)點(diǎn)是寫(xiě)節(jié)點(diǎn)防嗡,那么就構(gòu)造一個(gè)讀節(jié)點(diǎn)加入隊(duì)列尾端变汪,如果尾節(jié)點(diǎn)是讀節(jié)點(diǎn),就不會(huì)再將新構(gòu)造的讀節(jié)點(diǎn)加入到隊(duì)列尾部了蚁趁,而是直接掛載在前面那個(gè)讀節(jié)點(diǎn)上裙盾,當(dāng)那個(gè)節(jié)點(diǎn)成為頭節(jié)點(diǎn)時(shí),喚醒掛載在上面的所有讀線程。
整個(gè)方法依然被分為兩個(gè)大循環(huán)闷煤,不過(guò)這兩個(gè)循環(huán)的功能交錯(cuò)童芹,下面一個(gè)一個(gè)進(jìn)行分析,首先說(shuō)第一個(gè)大循環(huán)鲤拿。
- 第一個(gè)if
(h = whead) == (p = wtail)
。 如果隊(duì)列還沒(méi)有被初始化或者只有一個(gè)頭節(jié)點(diǎn)署咽,那么嘗試獲取讀狀態(tài)近顷。因?yàn)榇藭r(shí)很有可能獲取到讀鎖,所以即使現(xiàn)在讀線程數(shù)量已經(jīng)到達(dá)上限宁否,依然繼續(xù)嘗試窒升。但如果已經(jīng)有線程獲取了寫(xiě)鎖,那么就自旋1<<6 == 64
次進(jìn)行嘗試慕匠。如果自旋結(jié)束還沒(méi)有獲取到鎖饱须,那么退出獲取鎖的循環(huán)。 - 第一個(gè)if分支
p == null
台谊。 如果當(dāng)前隊(duì)列沒(méi)有被初始化蓉媳,那么就初始化隊(duì)列,此步寫(xiě)鎖獲取中已經(jīng)說(shuō)過(guò)锅铅,不再贅述酪呻,后面與寫(xiě)鎖獲取類似的功能都不在贅述。 - 第二個(gè)if分支
node == null
盐须。 構(gòu)造一個(gè)讀節(jié)點(diǎn)玩荠,設(shè)置prev字段為隊(duì)尾。 - 第三個(gè)if分支
h == p || p.mode != RMODE
贼邓。 如果前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)或者前驅(qū)節(jié)點(diǎn)的寫(xiě)模式阶冈,如果當(dāng)前節(jié)點(diǎn)的prev字段被改變,那么就重新設(shè)置prev字段塑径,否則將此節(jié)點(diǎn)設(shè)置為隊(duì)尾女坑,并跳出第一個(gè)大循環(huán)。 - 第四個(gè)if分支
!WCOWAIT.compareAndSet(p, node.cowait = p.cowait, node)
晓勇。 如果前驅(qū)結(jié)點(diǎn)不是頭節(jié)點(diǎn)并且是一個(gè)讀節(jié)點(diǎn)堂飞,那么嘗試將自己掛載到此節(jié)點(diǎn)上。 - 第五個(gè)if分支绑咱。 如果第五步CAS成功绰筛,則進(jìn)入最后一個(gè)分支,否則循環(huán)CAS設(shè)置描融。這個(gè)if分支由多個(gè)if語(yǔ)句組成铝噩,下面逐個(gè)分析。
- 第一個(gè)if
(h = whead) != null && (c = h.cowait) != null && WCOWAIT.compareAndSet(h, c, c.cowait) && (w = c.thread) != null
窿克。 如果頭節(jié)點(diǎn)是一個(gè)讀節(jié)點(diǎn)骏庸,那么幫助它喚醒掛載在其上等待的讀線程毛甲。 - 第二個(gè)if
Thread.interrupted()
。 如果響應(yīng)中斷具被,取消自己玻募,否則只記錄被中斷了。 - 第三個(gè)if
h == (pp = p.prev) || h == p || pp == null
一姿。 如果節(jié)點(diǎn)的前驅(qū)(當(dāng)前節(jié)點(diǎn)掛載在一個(gè)讀節(jié)點(diǎn)上七咧,相當(dāng)于是一個(gè)節(jié)點(diǎn))是頭節(jié)點(diǎn)或者節(jié)點(diǎn)是頭節(jié)點(diǎn),再次嘗試獲取鎖直到有線程獲取了寫(xiě)鎖叮叹。 - 第四個(gè)if
whead == h && p.prev == pp
艾栋。 如果節(jié)點(diǎn)引用未發(fā)生變化,那么嘗試阻塞自己蛉顽,因?yàn)橐呀?jīng)給予了足夠的機(jī)會(huì)嘗試獲取寫(xiě)鎖蝗砾,或者離能夠獲取到鎖還很遙遠(yuǎn),阻塞防止浪費(fèi)cpu資源携冤。如果被掛載的節(jié)點(diǎn)被取消了悼粮,那么丟棄自己,重新開(kāi)始噪叙,否則掛載節(jié)點(diǎn)將會(huì)一直阻塞下去矮锈。
第二個(gè)大循環(huán):
-
(h = whead) == p
。 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)睁蕾,那么再次自旋嘗試獲取同步狀態(tài)苞笨。如果獲取成功,將自己設(shè)置為頭節(jié)點(diǎn)子眶,并喚醒掛載節(jié)點(diǎn)瀑凝。 -
h != null
。 如果頭節(jié)點(diǎn)不為空且是讀節(jié)點(diǎn)臭杰,幫助喚醒線程粤咪。 -
whead == h
。如果經(jīng)過(guò)上面的步驟后頭節(jié)點(diǎn)未發(fā)生變化渴杆,那么就將自己阻塞寥枝。
下面是此方法的流程圖:
下面展示一下讀線程插入到同步隊(duì)列的幾種常見(jiàn)情況,注意讀節(jié)點(diǎn)與寫(xiě)節(jié)點(diǎn)的內(nèi)部構(gòu)造是一樣的磁奖,此處省略寫(xiě)節(jié)點(diǎn)的cowait字段囊拜。
關(guān)于讀鎖獲取的其他版本大體差不多,不再贅述比搭,此處只貼出源碼冠跷。
> line: 549
public long tryReadLock() {
long s, m, next;
while ((m = (s = state) & ABITS) != WBIT) {
if (m < RFULL) {
if (casState(s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
return 0L;
}
> line: 576
public long tryReadLock(long time, TimeUnit unit)
throws InterruptedException {
long s, m, next, deadline;
long nanos = unit.toNanos(time);
if (!Thread.interrupted()) {
if ((m = (s = state) & ABITS) != WBIT) {
if (m < RFULL) {
if (casState(s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
if (nanos <= 0L)
return 0L;
if ((deadline = System.nanoTime() + nanos) == 0L)
deadline = 1L;
if ((next = acquireRead(true, deadline)) != INTERRUPTED)
return next;
}
throw new InterruptedException();
}
> line: 610
public long readLockInterruptibly() throws InterruptedException {
long s, next;
if (!Thread.interrupted()
// bypass acquireRead on common uncontended case
&& ((whead == wtail
&& ((s = state) & ABITS) < RFULL
&& casState(s, next = s + RUNIT))
||
(next = acquireRead(true, 0L)) != INTERRUPTED))
return next;
throw new InterruptedException();
}
讀鎖釋放代碼如下:
> line: 693
public void unlockRead(long stamp) {
long s, m; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)
&& (stamp & RBITS) > 0L
&& ((m = s & RBITS) > 0L)) {
if (m < RFULL) {
if (casState(s, s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return;
}
}
else if (tryDecReaderOverflow(s) != 0L)
return;
}
throw new IllegalMonitorStateException();
}
> line: 1179
private long tryDecReaderOverflow(long s) {
// assert (s & ABITS) >= RFULL;
if ((s & ABITS) == RFULL) {
if (casState(s, s | RBITS)) {
int r; long next;
// 優(yōu)先減少溢出記錄
if ((r = readerOverflow) > 0) {
readerOverflow = r - 1;
next = s;
}
else
next = s - RUNIT;
STATE.setVolatile(this, next);
return next;
}
}
else if ((LockSupport.nextSecondarySeed() & OVERFLOW_YIELD_RATE) == 0)
Thread.yield();
else
Thread.onSpinWait();
return 0L;
}
樂(lè)觀讀鎖的獲取
樂(lè)觀讀鎖只需要獲取,不需要釋放。只要沒(méi)有線程獲取寫(xiě)鎖蜜托,那么就能獲取到樂(lè)觀讀鎖抄囚。當(dāng)然,當(dāng)使用由樂(lè)觀讀鎖讀取的共享數(shù)據(jù)時(shí)橄务,需要先調(diào)用validate
方法驗(yàn)證數(shù)據(jù)是否過(guò)期幔托,只需要判斷從獲取數(shù)據(jù)到現(xiàn)在這段時(shí)間內(nèi)是否有線程獲取讀鎖即可。從此可以看出仪糖,樂(lè)觀讀鎖可以很好的提高吞吐量柑司,但是它的使用也非常不穩(wěn)定,如果對(duì)數(shù)據(jù)的了解與控制不清晰锅劝,那么很容易出現(xiàn)臟讀問(wèn)題。
> line: 629
public long tryOptimisticRead() {
long s;
// 只要沒(méi)有線程獲取寫(xiě)鎖蟆湖,那么就能成功獲取樂(lè)觀讀鎖
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
> line: 646
public boolean validate(long stamp) {
VarHandle.acquireFence();
// 驗(yàn)證是否有線程獲取到寫(xiě)鎖
return (stamp & SBITS) == (state & SBITS);
}
讀寫(xiě)鎖之間的轉(zhuǎn)換
除了提供了樂(lè)觀讀鎖外故爵,StampedLock相比ReentrantReadWriteLock還提供了讀寫(xiě)鎖之間的轉(zhuǎn)換,而ReentrantReadWriteLock只提供了寫(xiě)鎖到讀鎖的鎖降級(jí)特性隅津。
- 嘗試轉(zhuǎn)換為寫(xiě)鎖
此時(shí)有三種情況诬垂,樂(lè)觀讀鎖轉(zhuǎn)換為讀鎖,悲觀讀鎖轉(zhuǎn)換為讀鎖伦仍,寫(xiě)鎖轉(zhuǎn)換為寫(xiě)鎖结窘。
- 樂(lè)觀讀鎖的
stamp & ABITS
一定是0,可通過(guò)樂(lè)觀讀鎖的獲取方法發(fā)現(xiàn)這個(gè)特點(diǎn)充蓝。當(dāng)嘗試轉(zhuǎn)換時(shí)隧枫,會(huì)通過(guò)tryConvertToWriteLock
方法的第一個(gè)if語(yǔ)句進(jìn)行判斷能否成功轉(zhuǎn)換。其中的(a!=0)
的作用是:現(xiàn)在state&ABITS == 0
谓苟,即沒(méi)有線程獲取了寫(xiě)鎖或悲觀讀鎖官脓,但是提供的stamp不符合這個(gè)結(jié)論,所以stamp檢驗(yàn)失敗涝焙。 - 雖然不會(huì)有人想要在獲取寫(xiě)鎖時(shí)依然調(diào)用此方法卑笨,但是為了預(yù)防這種情況,必須要是當(dāng)前獲取了寫(xiě)鎖的線程調(diào)用此方法才能成功仑撞。
- 當(dāng)悲觀讀鎖想要轉(zhuǎn)換為寫(xiě)鎖時(shí)赤兴,必須此時(shí)只有自己一個(gè)線程獲取了悲觀讀鎖才可以,否則此線程在修改數(shù)據(jù)時(shí)隧哮,其他的讀線程是無(wú)法感知到數(shù)據(jù)的更新的桶良。
>line: 739
public long tryConvertToWriteLock(long stamp) {
long a = stamp & ABITS, m, s, next;
// 驗(yàn)證stamp
while (((s = state) & SBITS) == (stamp & SBITS)) {
// 當(dāng)前沒(méi)有悲觀讀鎖或?qū)戞i
if ((m = s & ABITS) == 0L) {
// 但是提供的stamp記錄不相符,失敗
if (a != 0L)
break;
// 嘗試獲取寫(xiě)鎖近迁,成功則返回
if ((next = tryWriteLock(s)) != 0L)
return next;
}
// 如果當(dāng)前線程是已經(jīng)獲取了寫(xiě)鎖的線程
else if (m == WBIT) {
if (a != m)
break;
return stamp;
}
// 如果此時(shí)只有自己一個(gè)讀線程獲取了悲觀讀鎖
else if (m == RUNIT && a != 0L) {
if (casState(s, next = s - RUNIT + WBIT)) {
VarHandle.storeStoreFence();
return next;
}
}
else
break;
}
return 0L;
}
- 嘗試轉(zhuǎn)換為悲觀讀鎖
嘗試轉(zhuǎn)換為悲觀讀鎖依然有三種情況艺普。
- 寫(xiě)鎖轉(zhuǎn)換為悲觀讀鎖。寫(xiě)鎖有資格轉(zhuǎn)換為讀鎖,這并不會(huì)引發(fā)問(wèn)題歧譬,只要提供的stamp符合當(dāng)前的寫(xiě)鎖state即可(注意:寫(xiě)鎖每次釋放時(shí)都會(huì)更新版本岸浑,所以每個(gè)獲取寫(xiě)鎖的state都不會(huì)相同)。
- 樂(lè)觀讀鎖轉(zhuǎn)換為悲觀讀鎖瑰步。只要當(dāng)前沒(méi)有其他線程獲取了寫(xiě)鎖矢洲,那么就能安全轉(zhuǎn)換。
- 悲觀讀鎖轉(zhuǎn)換為悲觀讀鎖缩焦。此種情況和上面的寫(xiě)鎖轉(zhuǎn)寫(xiě)鎖一樣读虏,已經(jīng)是讀鎖了那就無(wú)需轉(zhuǎn)換,只需驗(yàn)證stamp合法性然后直接返回即可袁滥。
> line: 776
public long tryConvertToReadLock(long stamp) {
long a, s, next; WNode h;
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((a = stamp & ABITS) >= WBIT) {
// write stamp
if (s != stamp)
break;
STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT);
if ((h = whead) != null && h.status != 0)
release(h);
return next;
}
else if (a == 0L) {
// optimistic read stamp
if ((s & ABITS) < RFULL) {
if (casState(s, next = s + RUNIT))
return next;
}
else if ((next = tryIncReaderOverflow(s)) != 0L)
return next;
}
else {
// already a read stamp
if ((s & ABITS) == 0L)
break;
return stamp;
}
}
return 0L;
}
- 嘗試轉(zhuǎn)換為樂(lè)觀讀鎖盖桥。
- 寫(xiě)鎖轉(zhuǎn)換為樂(lè)觀讀鎖。只需要驗(yàn)證stamp合法性即可成功轉(zhuǎn)換题翻。
- 樂(lè)觀讀鎖轉(zhuǎn)換為樂(lè)觀讀鎖揩徊。直接返回。
- 悲觀讀鎖轉(zhuǎn)換為樂(lè)觀讀鎖嵌赠。第一個(gè)if語(yǔ)句
(m = s & ABITS) == 0L
驗(yàn)證stamp的合法性塑荒,此時(shí)stamp只會(huì)是悲觀讀鎖返回的,但是s & ABITS==0
表示當(dāng)前沒(méi)有線程獲取了悲觀讀鎖姜挺,所以stamp不合法齿税。后兩個(gè)if語(yǔ)句針對(duì)當(dāng)前reader是否溢出作出不同的舉措。
> line: 817
public long tryConvertToOptimisticRead(long stamp) {
long a, m, s, next; WNode h;
VarHandle.acquireFence();
while (((s = state) & SBITS) == (stamp & SBITS)) {
if ((a = stamp & ABITS) >= WBIT) {
// write stamp
if (s != stamp)
break;
return unlockWriteInternal(s);
}
else if (a == 0L)
// already an optimistic read stamp
return stamp;
else if ((m = s & ABITS) == 0L) // invalid read stamp
break;
else if (m < RFULL) {
if (casState(s, next = s - RUNIT)) {
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
return next & SBITS;
}
}
else if ((next = tryDecReaderOverflow(s)) != 0L)
return next & SBITS;
}
return 0L;
}