1. 概述
StampedLock是JDK 8新增的讀寫鎖,與我們上篇所講的Phaser一樣胖缤,它也不是由AQS實(shí)現(xiàn)的尚镰。StampedLock是一個(gè)基于能力(capability-based)的同步鎖,提供了三種模式來(lái)控制 read/write 的獲取哪廓,并且內(nèi)部實(shí)現(xiàn)了自己的同步等待隊(duì)列狗唉。本篇我們來(lái)詳細(xì)分析在摒棄了AQS之后的StampedLock的實(shí)現(xiàn)。
-
StampedLock的狀態(tài)由一個(gè)版本和模式構(gòu)成涡真。鎖獲取方法返回一個(gè) long型的值-stamp分俯,用來(lái)表示并控制對(duì)鎖狀態(tài)的訪問(wèn);0表示鎖授權(quán)失敗哆料。鎖釋放和轉(zhuǎn)換方法需要用這個(gè)stamp作為參數(shù)缸剪,如果它與鎖狀態(tài)不匹配操作就會(huì)失敗。StampedLock提供了三種鎖模式來(lái)控制讀寫鎖的獲榷唷:
-
寫鎖:使用
writeLock
方法獲取杏节,當(dāng)鎖不可用時(shí)會(huì)阻塞,獲取成功后返回一個(gè)與這個(gè)寫鎖對(duì)應(yīng)的stamp典阵,在unlockWrite
方法中奋渔,需要通過(guò)這個(gè)stamp來(lái)釋放與之對(duì)應(yīng)的鎖。在tryWriteLock
同樣也會(huì)提供這個(gè)stamp壮啊。當(dāng)在write模式中獲取到寫鎖時(shí)嫉鲸,讀鎖不能被獲取,并且所有的樂(lè)觀讀鎖驗(yàn)證(validate
方法)都會(huì)失敗歹啼。 -
讀鎖:使用
readLock
方法獲取玄渗,當(dāng)超出可用資源時(shí)(類似AQS的state設(shè)計(jì))會(huì)阻塞座菠。同樣的,在獲取鎖成功后也會(huì)返回stamp藤树,作用與上述相同浴滴。tryReadLock同樣如此。 -
樂(lè)觀讀鎖:使用
tryOptimisticRead
方法獲取也榄,只有在寫鎖可用時(shí)才能成功獲取樂(lè)觀讀鎖巡莹,獲取成功后也會(huì)返回一個(gè)stamp。validate
方法可以根據(jù)這個(gè)stamp來(lái)判斷寫鎖是否被獲取甜紫。這種模式可以理解為一個(gè)弱化的讀鎖(weak version of a read-lock)降宅,它在任何時(shí)候都能被破壞。樂(lè)觀讀模式常被用在短的只讀的代碼段囚霸,用來(lái)減少爭(zhēng)用并提高吞吐量腰根。樂(lè)觀讀區(qū)域應(yīng)該只讀取字段,并將它們保存在本地變量中拓型,以便在驗(yàn)證(validate
方法)后使用额嘿。在樂(lè)觀讀模式中字段的讀取可能會(huì)不一致,所以可能需要反復(fù)調(diào)用validate()
來(lái)檢查一致性劣挫。例如册养,當(dāng)首次讀取一個(gè)對(duì)象或數(shù)組引用,然后訪問(wèn)其中一個(gè)的字段压固、元素或方法時(shí)球拦,這些步驟通常是必需的。
-
寫鎖:使用
StampedLock還支持在三種模式中提供有條件地轉(zhuǎn)換帐我。例如坎炼,
tryConvertToWriteLock
方法嘗試升級(jí)一個(gè)鎖模式,下面三種情況下可以升級(jí)模式并返回一個(gè)有效的write stamp:
(1) 已經(jīng)在writing模式中
(2) 在reading模式中并且已經(jīng)沒(méi)有其他讀線程
(3) 在樂(lè)觀讀模式中鎖可用
這些方法的表現(xiàn)形式旨在幫助減少由于基于重試(retry-based)設(shè)計(jì)造成的代碼膨脹拦键。StampedLock 被設(shè)計(jì)作為線程安全模型的內(nèi)部工具類谣光。它的使用依賴于對(duì)數(shù)據(jù)、對(duì)象和方法的內(nèi)部屬性有一定的了解芬为。StampedLock 是不可重入的萄金,所以在鎖的內(nèi)部不能調(diào)用其他嘗試重復(fù)獲取鎖的方法。一個(gè)stamp如果在很長(zhǎng)時(shí)間都沒(méi)有使用或驗(yàn)證媚朦,在很長(zhǎng)一段時(shí)間之后可能就會(huì)驗(yàn)證失敗捡絮。StampedLocks是可序列化的,但是反序列化后變?yōu)槌跏嫉姆擎i定狀態(tài)莲镣,所以在遠(yuǎn)程鎖定中是不安全的。
StampedLock 的調(diào)度策略不會(huì)始終偏向讀線程或?qū)懢€程涎拉,所有的"try"方法都是盡最大努力獲取瑞侮,并不一定遵循任何調(diào)度或公平策略的圆。從"try"方法獲取或轉(zhuǎn)換鎖失敗返回0時(shí),不會(huì)攜帶任何鎖的狀態(tài)信息半火。由于StampedLock支持跨多個(gè)鎖模式的協(xié)調(diào)使用越妈,它不會(huì)直接實(shí)現(xiàn)
Lock
或ReadWriteLock
接口。但是钮糖,如果應(yīng)用程序需要Lock的相關(guān)功能梅掠,它可以通過(guò)asReadLock()、asWriteLock()
和asReadWriteLock()
方法返回一個(gè)Lock視圖店归。
在介紹了 StampedLock 的特性之后阎抒,我們來(lái)看一下內(nèi)部的等待隊(duì)列的實(shí)現(xiàn):
相較于AQS,可以看到 StampedLock 的等待隊(duì)列多了一個(gè)cowait節(jié)點(diǎn)鏈消痛,這個(gè)節(jié)點(diǎn)用來(lái)存放等待讀的線程列表且叁。也就是說(shuō),等待寫的線程存放在鏈表的正常節(jié)點(diǎn)中秩伞,如果有讀線程等待獲取鎖逞带,就會(huì)把這個(gè)讀線程放到cowait節(jié)點(diǎn)鏈上。
1.1 核心參數(shù)
//獲取鎖失敗入隊(duì)之前的最大自旋次數(shù)(實(shí)際運(yùn)行時(shí)并不一定是這個(gè)數(shù))
private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;
//頭節(jié)點(diǎn)獲取鎖的最大自旋次數(shù)
private static final int HEAD_SPINS = (NCPU > 1) ? 1 << 10 : 0;
//頭節(jié)點(diǎn)再次阻塞前的最大自旋次數(shù)
private static final int MAX_HEAD_SPINS = (NCPU > 1) ? 1 << 16 : 0;
//等待自旋鎖溢出的周期數(shù)
private static final int OVERFLOW_YIELD_RATE = 7; // must be power 2 - 1
//在溢出之前讀線程計(jì)數(shù)用到的bit數(shù)
private static final int LG_READERS = 7;
// Values for lock state and stamp operations
private static final long RUNIT = 1L;//讀鎖單位
private static final long WBIT = 1L << LG_READERS;//寫狀態(tài)標(biāo)識(shí) 1000 0000
private static final long RBITS = WBIT - 1L;//讀狀態(tài)標(biāo)識(shí) 111 1111
private static final long RFULL = RBITS - 1L; //讀鎖最大資源數(shù) 111 1110
private static final long ABITS = RBITS | WBIT; //用于獲取鎖狀態(tài) 1111 1111
private static final long SBITS = ~RBITS; //note overlap with ABITS
//鎖狀態(tài)初始值
private static final long ORIGIN = WBIT << 1;
//中斷標(biāo)識(shí)
private static final long INTERRUPTED = 1L;
//節(jié)點(diǎn)狀態(tài) 等待/取消
private static final int WAITING = -1;
private static final int CANCELLED = 1;
//節(jié)點(diǎn)模型 讀/寫
private static final int RMODE = 0;
private static final int WMODE = 1;
狀態(tài)判斷:
state & ABITS == 0L
寫鎖可用
state & ABITS < RFULL
讀鎖可用
state & ABITS == WBIT
寫鎖已經(jīng)被其他線程獲取
state & ABITS == RFULL
讀鎖飽和纱新,可嘗試增加額外資源數(shù)
(stamp & SBITS) == (state & SBITS)
驗(yàn)證stamp是否為當(dāng)前已經(jīng)獲取的鎖stamp
(state & WBIT) != 0L
當(dāng)前線程已經(jīng)持有寫鎖
(state & RBITS) != 0L
當(dāng)前線程已經(jīng)持有讀鎖
s & RBITS
讀鎖已經(jīng)被獲取的數(shù)量
1.2 函數(shù)列表
//構(gòu)造函數(shù)
public StampedLock() {
state = ORIGIN;
}
//獲取寫鎖展氓,等待鎖可用
public long writeLock()
//獲取寫鎖,直接返回
public long tryWriteLock()
//獲取寫鎖脸爱,等待指定的時(shí)間
public long tryWriteLock(long time, TimeUnit unit)
//獲取寫鎖遇汞,響應(yīng)中斷
public long writeLockInterruptibly()
//獲取讀鎖,等待鎖可用
public long readLock()
//嘗試獲取讀鎖阅羹,直接返回
public long tryReadLock()
//獲取讀鎖勺疼,限制等待時(shí)間
public long tryReadLock(long time, TimeUnit unit)
//獲取讀鎖,響應(yīng)中斷
public long readLockInterruptibly()
//獲取樂(lè)觀讀鎖,如果寫鎖可用獲取成功捏鱼,不修改任何狀態(tài)值
public long tryOptimisticRead()
//驗(yàn)證stamp执庐,如果在鎖發(fā)出給定的stamp之后寫鎖沒(méi)有被獲取,或者給定stamp是當(dāng)前已經(jīng)獲取的鎖stamp导梆,則返回true轨淌。一般用在樂(lè)觀讀鎖中,用于判斷是否可繼續(xù)獲取讀鎖看尼。
public boolean validate(long stamp)
//釋放寫鎖
public void unlockWrite(long stamp)
//釋放讀鎖
public void unlockRead(long stamp)
//釋放給定stamp對(duì)應(yīng)的鎖
public void unlock(long stamp)
//嘗試升級(jí)給定stamp對(duì)應(yīng)的鎖為寫鎖
public long tryConvertToWriteLock(long stamp)
//嘗試降級(jí)給定stamp對(duì)應(yīng)的鎖為讀鎖
public long tryConvertToReadLock(long stamp)
//嘗試降級(jí)給定stamp對(duì)應(yīng)的鎖為樂(lè)觀讀鎖
public long tryConvertToOptimisticRead(long stamp)
//嘗試釋放寫鎖递鹉,一般用在異常復(fù)原
public boolean tryUnlockWrite()
//嘗試釋放讀鎖,一般用在異常復(fù)原
public boolean tryUnlockRead()
//寫鎖是否被持有
public boolean isWriteLocked()
//讀鎖是否被持有
public boolean isReadLocked()
//獲取讀鎖數(shù)
public int getReadLockCount()
//返回一個(gè)ReadLock
public Lock asReadLock()
//返回一個(gè)WriteLock
public Lock asWriteLock()
//返回一個(gè)ReadWriteLock
public ReadWriteLock asReadWriteLock()
2. 源碼解析
2.1 writeLock()
//獲取寫鎖藏斩,等待可用
public long writeLock() {
long s, next; // bypass acquireWrite in fully unlocked case only
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
//第一個(gè)自旋躏结,準(zhǔn)備入隊(duì)
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
if ((m = (s = state) & ABITS) == 0L) {//鎖可用
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//獲取鎖 CAS修改鎖狀態(tài)
return ns;
}
else if (spins < 0)
spins = (m == WBIT && wtail == whead) ? SPINS : 0;//自旋次數(shù)
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins; //隨機(jī)遞減
}
else if ((p = wtail) == null) { // initialize queue
WNode hd = new WNode(WMODE, null);//初始化寫鎖等待隊(duì)列
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(WMODE, p);//創(chuàng)建新的等待節(jié)點(diǎn)
else if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//更新tail節(jié)點(diǎn)
p.next = node;
break;
}
}
//第二個(gè)自旋,節(jié)點(diǎn)依次獲取鎖
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {//當(dāng)前節(jié)點(diǎn)是最后一個(gè)等待節(jié)點(diǎn)
if (spins < 0)
spins = HEAD_SPINS; //頭結(jié)點(diǎn)自旋次數(shù)
else if (spins < MAX_HEAD_SPINS)
spins <<= 1; // spins=spins/2
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {//鎖可用
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {//更新鎖狀態(tài)
//更新頭結(jié)點(diǎn)狰域,返回stamp
whead = node;
node.prev = null;
return ns;
}
}
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)//隨機(jī)遞減
break;
}
}
else if (h != null) { // help release stale waiters
WNode c; Thread w;
//依次喚醒頭節(jié)點(diǎn)的cowait節(jié)點(diǎn)線程
while ((c = h.cowait) != null) {//有等待讀的線程
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && //CAS更新頭結(jié)點(diǎn)的cowait
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
//檢查隊(duì)列穩(wěn)定性
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {//尾節(jié)點(diǎn)取消媳拴,更新尾節(jié)點(diǎn)的前繼節(jié)點(diǎn)為p.prev黄橘,繼續(xù)自旋
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);//超時(shí),取消等待
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
U.park(false, time); // emulate LockSupport.park
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);//中斷屈溉,取消等待
}
}
}
}
說(shuō)明:獲取寫鎖塞关,如果鎖可用((state & ABITS) == 0L
)則直接獲取寫鎖并返回stamp,否則調(diào)用acquireWrite
等待鎖可用子巾,acquireWrite
主要由兩個(gè)自旋組成帆赢,代碼雖然比較多,但是邏輯很簡(jiǎn)單线梗,函數(shù)大概執(zhí)行流程如下:
- 第一個(gè)自旋椰于,使當(dāng)前線程進(jìn)入等待隊(duì)列的尾節(jié)點(diǎn)。
- 第二個(gè)自旋缠导,節(jié)點(diǎn)依次獲取寫鎖廉羔,直到當(dāng)前線程所在節(jié)點(diǎn)的前繼節(jié)點(diǎn)(prev)為頭結(jié)點(diǎn)時(shí),如果鎖可用僻造,則說(shuō)明可以獲取鎖憋他,獲取成功返回stamp
- 如果在自旋中未能成功獲取到鎖,并且線程被中斷或者等待超時(shí)髓削,則調(diào)用
cancelWaiter
方法取消節(jié)點(diǎn)的等待竹挡,cancelWaiter
后面會(huì)分析。
2.2 readLock()
//獲取寫鎖立膛,等待鎖可用
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL && //還有可用資源
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
private long acquireRead(boolean interruptible, long deadline) {
WNode node = null, p;
//第一個(gè)自旋揪罕,入隊(duì)
for (int spins = -1;;) {
WNode h;
if ((h = whead) == (p = wtail)) {//等待隊(duì)列為空
for (long m, s, ns;;) {
if ((m = (s = state) & ABITS) < RFULL ? //有可用資源
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
return ns;
else if (m >= WBIT) {
if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
--spins;//隨機(jī)遞減自旋數(shù)
}
else {
if (spins == 0) { //自旋結(jié)束,準(zhǔn)備進(jìn)入等待隊(duì)列
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
spins = SPINS;
}
}
}
}
if (p == null) { // initialize queue
//初始化等待隊(duì)列
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
node = new WNode(RMODE, p);//創(chuàng)建新的節(jié)點(diǎn)
else if (h == p || p.mode != RMODE) {
//到這里說(shuō)明尾節(jié)點(diǎn)是寫線程
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//更新tail節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))//到這里說(shuō)明尾節(jié)點(diǎn)是等待讀的節(jié)點(diǎn)宝泵,CAS把當(dāng)前節(jié)點(diǎn)(node節(jié)點(diǎn))轉(zhuǎn)移到p節(jié)點(diǎn)的cowait上
node.cowait = null;
else {
//當(dāng)前節(jié)點(diǎn)進(jìn)入等待隊(duì)列成功后的邏輯(當(dāng)前節(jié)點(diǎn)已被轉(zhuǎn)移到尾節(jié)點(diǎn)的cowait上)
for (;;) {
WNode pp, c; Thread w;
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w); //喚醒頭節(jié)點(diǎn)等待讀線程
if (h == (pp = p.prev) || h == p || pp == null) {//沒(méi)有前繼節(jié)點(diǎn)芯侥,可以嘗試喚醒當(dāng)前節(jié)點(diǎn)等待的線程
long m, s, ns;
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) : //獲取鎖
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))//讀鎖飽和洛心,嘗試增加額外的讀鎖數(shù)量暗甥,只有在讀鎖數(shù)=RFULL時(shí)才可以增加
return ns; //返回stamp
} while (m < WBIT);
}
//超時(shí)及中斷判斷邏輯
if (whead == h && p.prev == pp) {//檢查隊(duì)列是否穩(wěn)定
long time;
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, p, false);//超時(shí)梗醇,取消等待
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
U.park(false, time);//阻塞等待
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);//被中斷,取消等待
}
}
}
}
//第二個(gè)自旋闯捎,節(jié)點(diǎn)依次獲取鎖
for (int spins = -1;;) {
WNode h, np, pp; int ps;
if ((h = whead) == p) {//當(dāng)前節(jié)點(diǎn)是最后一個(gè)等待節(jié)點(diǎn)
if (spins < 0)
spins = HEAD_SPINS;//初始化自旋數(shù)
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
for (int k = spins;;) { // spin at head
long m, s, ns;
if ((m = (s = state) & ABITS) < RFULL ? //有可用資源
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
//獲取讀鎖成功椰弊,更新頭節(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)
WNode c; Thread w;
whead = node;
node.prev = null;
//依次喚醒當(dāng)前節(jié)點(diǎn)的cowait節(jié)點(diǎn)線程
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
return ns;
}
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)//隨機(jī)遞減自旋次數(shù)
break;
}
}
else if (h != null) {
WNode c; Thread w;
while ((c = h.cowait) != null) {
//依次喚醒head節(jié)點(diǎn)的cowait節(jié)點(diǎn)線程
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
if (whead == h) {
//檢查隊(duì)列穩(wěn)定性
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {//尾節(jié)點(diǎn)取消,更新尾節(jié)點(diǎn)的前繼節(jié)點(diǎn)為p.prev瓤鼻,繼續(xù)自旋
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
//超時(shí)及中斷判斷邏輯
long time;
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
return cancelWaiter(node, node, false);//超時(shí)秉版,取消等待
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);//被中斷,取消等待
}
}
}
}
說(shuō)明:獲取讀鎖茬祷,代碼比較多清焕,但邏輯很簡(jiǎn)單,跟wirteLock()
差不多。詳細(xì)的流程這里就不細(xì)說(shuō)了秸妥,有興趣的同學(xué)可以參考筆者添加的注釋一步一步閱讀借卧。
如果有可用資源((state & ABITS) < RFULL
)則直接獲取讀鎖并返回stamp,否則調(diào)用acquireRead
等待鎖可用筛峭,acquireRead
函數(shù)執(zhí)行流程如下:
- 第一個(gè)自旋,使當(dāng)前線程進(jìn)入等待隊(duì)列的尾節(jié)點(diǎn)陪每。注意這里跟獲取寫鎖時(shí)的區(qū)別影晓,在獲取寫鎖時(shí),把當(dāng)前線程所在的節(jié)點(diǎn)直接放入隊(duì)尾檩禾;但是在獲取讀鎖時(shí)挂签,是把當(dāng)前線程所在的節(jié)點(diǎn)放入尾節(jié)點(diǎn)的cowait節(jié)點(diǎn)里。
- 第二個(gè)自旋盼产,節(jié)點(diǎn)依次獲取讀鎖饵婆。直到當(dāng)前線程所在節(jié)點(diǎn)的前繼節(jié)點(diǎn)(prev)為頭結(jié)點(diǎn)時(shí),如果有可用資源戏售,則說(shuō)明可以獲取鎖侨核,獲取成功返回stamp
- 如果在自旋中未能成功獲取到鎖,并且線程被中斷或者等待超時(shí)灌灾,則調(diào)用
cancelWaiter
方法取消節(jié)點(diǎn)的等待搓译。
2.3 tryOptimisticRead()
//獲取樂(lè)觀讀鎖
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
說(shuō)明:獲取樂(lè)觀讀鎖,取決于寫鎖的狀態(tài)锋喜,如果寫鎖空閑則獲取成功些己,并且不修改任何狀態(tài)值。函數(shù)比較簡(jiǎn)單嘿般,不多贅述段标。
2.4 cancelWaiter(WNode node, WNode group, boolean interrupted)
//取消給定節(jié)點(diǎn)
private long cancelWaiter(WNode node, WNode group, boolean interrupted) {
if (node != null && group != null) {
Thread w;
node.status = CANCELLED;//修改節(jié)點(diǎn)狀態(tài)
// unsplice cancelled nodes from group
//依次解除已經(jīng)取消的cowait節(jié)點(diǎn)的鏈接
for (WNode p = group, q; (q = p.cowait) != null;) {
if (q.status == CANCELLED) {
U.compareAndSwapObject(p, WCOWAIT, q, q.cowait);
p = group; // restart
}
else
p = q;
}
if (group == node) {
//依次喚醒節(jié)點(diǎn)上的未取消的cowait節(jié)點(diǎn)線程
for (WNode r = group.cowait; r != null; r = r.cowait) {
if ((w = r.thread) != null)
U.unpark(w); // wake up uncancelled co-waiters
}
//
for (WNode pred = node.prev; pred != null; ) { // unsplice
WNode succ, pp; // find valid successor
while ((succ = node.next) == null ||
succ.status == CANCELLED) { //后繼節(jié)點(diǎn)為空或者已經(jīng)取消,則去查找一個(gè)有效的后繼節(jié)點(diǎn)
WNode q = null; // find successor the slow way
//從尾節(jié)點(diǎn)開(kāi)始往前查找距離node節(jié)點(diǎn)最近的一個(gè)有效節(jié)點(diǎn)q
for (WNode t = wtail; t != null && t != node; t = t.prev)
if (t.status != CANCELLED)
q = t; // don't link if succ cancelled
if (succ == q || // ensure accurate successor
//運(yùn)行到這里說(shuō)明從node到“距離node最近的一個(gè)有效節(jié)點(diǎn)q”之間可能存在已經(jīng)取消的節(jié)點(diǎn)
// CAS替換node的后繼節(jié)點(diǎn)為“距離node最近的一個(gè)有效節(jié)點(diǎn)”炉奴,也就是說(shuō)解除了“所有已經(jīng)取消但是還存在在鏈表上的無(wú)效節(jié)點(diǎn)”的鏈接
U.compareAndSwapObject(node, WNEXT,
succ, succ = q)) {
if (succ == null && node == wtail) {
//運(yùn)行到這里說(shuō)明node為尾節(jié)點(diǎn)逼庞,
//利用CAS先修改尾節(jié)點(diǎn)為node的前繼有效節(jié)點(diǎn),后面再解除node的鏈接
U.compareAndSwapObject(this, WTAIL, node, pred);
}
break;
}
}
//解除node節(jié)點(diǎn)的鏈接
if (pred.next == node) // unsplice pred link
U.compareAndSwapObject(pred, WNEXT, node, succ);
//喚醒后繼節(jié)點(diǎn)的線程
if (succ != null && (w = succ.thread) != null) {
succ.thread = null;
U.unpark(w); // wake up succ to observe new pred
}
//如果前繼節(jié)點(diǎn)已經(jīng)取消盆佣,向前查找一個(gè)有效節(jié)點(diǎn)繼續(xù)循環(huán)往堡,如果這個(gè)節(jié)點(diǎn)為空則直接跳出循環(huán)
if (pred.status != CANCELLED || (pp = pred.prev) == null)
break;
node.prev = pp; // repeat if new pred wrong/cancelled
U.compareAndSwapObject(pp, WNEXT, pred, succ);
pred = pp;
}
}
}
//檢查是否可喚醒head節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程
WNode h; // Possibly release first waiter
while ((h = whead) != null) {
long s; WNode q; // similar to release() but check eligibility
if ((q = h.next) == null || q.status == CANCELLED) {
//從尾節(jié)點(diǎn)向前查找一個(gè)未取消的節(jié)點(diǎn),作為頭節(jié)點(diǎn)的next節(jié)點(diǎn)
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (h == whead) {
if (q != null && h.status == 0 &&
((s = state) & ABITS) != WBIT && // waiter is eligible
(s == 0L || q.mode == RMODE))//鎖可用共耍,或者后繼節(jié)點(diǎn)是讀線程
release(h);//可以喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程
break;
}
}
return (interrupted || Thread.interrupted()) ? INTERRUPTED : 0L;
}
說(shuō)明:如果節(jié)點(diǎn)線程被中斷或者等待超時(shí)虑灰,需要取消節(jié)點(diǎn)的鏈接。大概的操作就是首先修改節(jié)點(diǎn)為取消狀態(tài)痹兜,然后解除它在等待隊(duì)列中的鏈接穆咐,并且喚醒節(jié)點(diǎn)上所有等待讀的線程(也就是cowait節(jié)點(diǎn));最后如果鎖可用,幫助喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程对湃。其實(shí)也是AQS中取消獲取鎖方法的一種變體(詳見(jiàn)AQS篇)崖叫。
重點(diǎn)介紹一下cancelWaiter
的前兩個(gè)參數(shù)node
和group
:
- 如果
node!=group
,說(shuō)明node節(jié)點(diǎn)是group節(jié)點(diǎn)上的一個(gè)cowait節(jié)點(diǎn)(如果不明白請(qǐng)見(jiàn)上面代碼中對(duì)acquireRead
方法中的U.compareAndSwapObject(p, WCOWAIT,node.cowait = p.cowait, node)
這一行代碼的注釋)拍柒,這種情況下首先修改node節(jié)點(diǎn)的狀態(tài)(node.status = CANCELLED
)心傀,然后直接操作group節(jié)點(diǎn),依次解除group節(jié)點(diǎn)上已經(jīng)取消的cowait節(jié)點(diǎn)的鏈接拆讯。最后如果鎖可用脂男,幫助喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程。 - 如果
node==group
种呐,說(shuō)明在node節(jié)點(diǎn)之前的節(jié)點(diǎn)為寫線程節(jié)點(diǎn)宰翅,這時(shí)需要進(jìn)行以下操作:
a) 依次喚醒node節(jié)點(diǎn)上的未取消的cowait節(jié)點(diǎn)線程
b) 解除node節(jié)點(diǎn)和一段節(jié)點(diǎn)(node節(jié)點(diǎn)到“距離node最近的一個(gè)有效節(jié)點(diǎn)”)的鏈接
c) 最后如果鎖可用,幫助喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)的線程爽室。
小結(jié)
關(guān)于StampedLock的其他方法汁讼,由于實(shí)現(xiàn)方式與我們已經(jīng)講述的方法大同小異,而且也相對(duì)比較簡(jiǎn)單阔墩,這里就不在詳細(xì)介紹了嘿架,有興趣的同學(xué)可以自行下載源碼參照本篇的源碼解析進(jìn)行查閱。