一震蒋、寫在前面
上篇給大家聊了獨(dú)占式的源碼十艾,具體參見《J.U.C|AQS獨(dú)占式源碼分析》
這一章我們繼續(xù)在AQS的源碼世界中遨游,解讀共享式同步狀態(tài)的獲取和釋放抢肛。
二、什么是共享式
共享式與獨(dú)占式唯一的區(qū)別是在于同一時刻可以有多個線程獲取到同步狀態(tài)碳柱。
我們以讀寫鎖為例來看兩者,一個線程在對一個資源文件進(jìn)行讀操作時熬芜,那么這一時刻對于文件的寫操作均被阻塞莲镣,而其它線程的讀操作可以同時進(jìn)行。
當(dāng)寫操作要求對資源獨(dú)占操作涎拉,而讀操作可以是共享的瑞侮,兩種不同的操作對同一資源進(jìn)行操作會是什么樣的?看下圖
共享式訪問資源鼓拧,其他共享時均被允許,而獨(dú)占式被阻塞。
獨(dú)占式訪問資源時跌前,其它訪問均被阻塞雀瓢。
通過讀寫鎖給大家一起溫故下獨(dú)占式和共享式概念,上一節(jié)我們已經(jīng)聊過獨(dú)占式酌住,本章我們主要聊共享式店归。
主要講解方法
- protected int tryAcquireShared(int arg);共享式獲取同步狀態(tài)酪我,返回值 >= 0 表示獲取成功消痛,反之則失敗。
- protected boolean tryReleaseShared(int arg): 共享式釋放同步狀態(tài)都哭。
三秩伞、核心方法分析
3.1 同步狀態(tài)的獲取
public final void acquireShared(int arg)
共享式獲取同步狀態(tài)的頂級入口,如果當(dāng)前線程為獲取到同步狀態(tài)欺矫,將會加入到同步隊列中等待纱新,與獨(dú)占式唯一的區(qū)別是在于同一時刻可以有多個線程獲取到同步狀態(tài)
方法源碼
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
方法函數(shù)解析
- tryAcquireShared(arg):獲取同步狀態(tài),返回值大于等于0表示獲取成功汇陆,否則失敗怒炸。
- doAcquireShared(arg):共享式獲取共享狀態(tài),包含構(gòu)建節(jié)點(diǎn)毡代,加入隊列等待阅羹,喚醒節(jié)點(diǎn)等操作勺疼。
源碼分析
同步器的 acquireShared 和 doAcquireShared 方法
//請求共享鎖的入口
public final void acquireShared(int arg) {
// 當(dāng)state != 0 并且tryAcquireShared(arg) < 0 時才去才獲取資源
if (tryAcquireShared(arg) < 0)
// 獲取鎖
doAcquireShared(arg);
}
// 以共享不可中斷模式獲取鎖
private void doAcquireShared(int arg) {
// 將當(dāng)前線程一共享方式構(gòu)建成 node 節(jié)點(diǎn)并將其加入到同步隊列的尾部。這里addWaiter(Node.SHARED)操作和獨(dú)占式基本一樣捏鱼,
final Node node = addWaiter(Node.SHARED);
// 是否成功標(biāo)記
boolean failed = true;
try {
// 等待過程是否被中斷標(biāo)記
boolean interrupted = false;
自旋
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 判斷前驅(qū)節(jié)點(diǎn)是否是head節(jié)點(diǎn)执庐,也就是看自己是不是老二節(jié)點(diǎn)
if (p == head) {
// 如果自己是老二節(jié)點(diǎn),嘗試獲取資源鎖,返回三種狀態(tài)
// state < 0 : 表示獲取資源失敗
// state = 0: 表示當(dāng)前正好線程獲取到資源导梆, 此時不需要進(jìn)行向后繼節(jié)點(diǎn)傳播轨淌。
// state > 0: 表示當(dāng)前線程獲取資源鎖后,還有多余的資源看尼,需要向后繼節(jié)點(diǎn)繼續(xù)傳播递鹉,獲取資源。
int r = tryAcquireShared(arg);
// 獲取資源成功
if (r >= 0) {
// 當(dāng)前節(jié)點(diǎn)線程獲取資源成功后藏斩,對后繼節(jié)點(diǎn)進(jìn)行邏輯操作
setHeadAndPropagate(node, r);
// setHeadAndPropagate(node, r) 已經(jīng)對node.prev = null,在這有對p.next = null; 等待GC進(jìn)行垃圾收集躏结。
p.next = null; // help GC
// 如果等待過程被中斷了, 將中斷給補(bǔ)上狰域。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判斷狀態(tài)媳拴,尋找安全點(diǎn),進(jìn)入waiting狀態(tài)兆览,等著被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared(int arg)方法中屈溉,同步器調(diào)用tryAcquireShared(arg)方法獲取同步狀態(tài),返回同步狀態(tài)有兩種抬探。
當(dāng)同步狀態(tài)大于等于0時: 表示可以獲取到同步狀態(tài)子巾,退出自旋,在doAcquireShared(int arg)方法中可以看到節(jié)點(diǎn)獲取資源退出自旋的條件就是大于等于0
小于0會加入同步隊列中等待被喚醒驶睦。
addWaiter和enq方法
// 創(chuàng)建節(jié)點(diǎn)砰左,并將節(jié)點(diǎn)加入到同步隊列尾部中。
private Node addWaiter(Node mode) {
// 以共享方式為線程構(gòu)建Node節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), mode);
// 嘗試快速加入到隊列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS保證原子操作场航,將node節(jié)點(diǎn)加入到隊列尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速加入失敗缠导,走 enq(node)方法
enq(node);
return node;
}
//以自旋的方式,將node節(jié)點(diǎn)加入到隊列的尾部
private Node enq(final Node node) {
// 自旋
for (;;) {
// 獲取尾部節(jié)點(diǎn)
Node t = tail;
// 如果tail節(jié)點(diǎn)為空溉痢, 說明同步隊列還沒初始化僻造,必須先進(jìn)行初始化
if (t == null) { // Must initialize
// CAS保證原子操作, 新建一個空 node 節(jié)點(diǎn)并將其設(shè)置為head節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
// 設(shè)置成功并將tail也指向該節(jié)點(diǎn)
tail = head;
} else {
// 將node節(jié)點(diǎn)加入到隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這兩個方法和獨(dú)占式的基本相同孩饼,注釋中都表明了髓削,在這就不多做解釋了。
獲取資源成功后對后繼節(jié)點(diǎn)的操作setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
// 記錄老的head節(jié)點(diǎn)镀娶,以便核對
Node h = head; // Record old head for check below
// 將node 設(shè)置成head節(jié)點(diǎn)
setHead(node);
// 這里表示: 如果資源足夠(propagate > 0)或者舊頭節(jié)點(diǎn)為空(h == null)或者舊節(jié)點(diǎn)的waitStatus為 SIGNAL(-1) 或者 PROPAGATE(-3)(h.waitStatus < 0)
// 或者當(dāng)前head節(jié)點(diǎn)不為空或者waitStatus為SIGNAL(-1) 或者 PROPAGATE(-3)立膛,此時需要繼續(xù)喚醒后繼節(jié)點(diǎn)來嘗試獲取資源。
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 當(dāng)前node節(jié)點(diǎn)的后繼節(jié)點(diǎn)
Node s = node.next;
//如果后節(jié)點(diǎn)為空或者屬于共享節(jié)點(diǎn)
if (s == null || s.isShared())
// 繼續(xù)嘗試獲取資源
doReleaseShared();
}
}
首先將當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn) setHead(node), 其次根據(jù)條件看是否對后繼節(jié)點(diǎn)繼續(xù)喚醒宝泵。
獲取資源失敗進(jìn)行阻塞等待unpark
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)節(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
// 如果等待狀態(tài)已經(jīng)為SIGNAL(表示當(dāng)前當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)處于等待狀態(tài)好啰,如果當(dāng)前節(jié)點(diǎn)釋放了同步狀態(tài)或者被中斷, 則會喚醒后繼節(jié)點(diǎn))
if (ws == Node.SIGNAL)
// 直接返回儿奶,表示可以安心的去休息了
return true;
// 如果前驅(qū)的節(jié)點(diǎn)的狀態(tài) ws > 0(表示該節(jié)點(diǎn)已經(jīng)被取消或者中斷框往,也就是成無效節(jié)點(diǎn),需要從同步隊列中取消的)
if (ws > 0) {
// 循環(huán)往前需尋找闯捎,知道尋找到一個有效的安全點(diǎn)(一個等待狀態(tài)<= 0 的節(jié)點(diǎn)椰弊,排在它后面)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 注意這一波操作后,獲獎取消的節(jié)點(diǎn)全部變成GC可回收的廢棄鏈瓤鼻。
pred.next = node;
} else {
//如果前驅(qū)正常秉版,那就把前驅(qū)的狀態(tài)設(shè)置成SIGNAL,告訴它獲取資源后通知自己一下茬祷。有可能失敗沐飘,人家說不定剛剛釋放完呢!
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 調(diào)用park方法使當(dāng)前節(jié)點(diǎn)的線程進(jìn)入waiting
LockSupport.park(this);
//返回線程中斷狀態(tài)
return Thread.interrupted();
}
這兩個方法和獨(dú)占式基本相同牲迫。
接著看doReleaseShared 這個比較復(fù)雜
private void doReleaseShared() {
//注意,這里的頭結(jié)點(diǎn)已經(jīng)是上面新設(shè)定的頭結(jié)點(diǎn)了,從這里可以看出,如果propagate=0,
//不會進(jìn)入doReleaseShared方法里面,那就有共享式變成了獨(dú)占式
for (;;) { // 死循環(huán)以防在執(zhí)行此操作時添加新節(jié)點(diǎn):退出條件 h == head
Node h = head;
// 前提條件,當(dāng)前的頭節(jié)點(diǎn)不為空借卧,并且不是尾節(jié)點(diǎn)
if (h != null && h != tail) {
// 當(dāng)前頭節(jié)點(diǎn)的等待狀態(tài)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果當(dāng)前節(jié)點(diǎn)的狀態(tài)為SIGNAL盹憎,則利用CAS將其狀態(tài)設(shè)置為0(也就是初始狀態(tài))
//這里不直接設(shè)為Node.PROPAGATE,是因為unparkSuccessor(h)中,如果ws < 0會設(shè)置為0铐刘,所以ws先設(shè)置為0陪每,再設(shè)置為PROPAGATE
//這里需要控制并發(fā),因為入口有setHeadAndPropagate跟release兩個镰吵,避免兩次unpark
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases 設(shè)置失敗檩禾,重新循環(huán)
// 喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
}
// 如果等待狀態(tài)不為0 則利用CAS將其狀態(tài)設(shè)置為PROPAGATE ,以確保在釋放資源時能夠繼續(xù)通知后繼節(jié)點(diǎn)疤祭。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed 如果head 期間發(fā)生了改變盼产,則需要從新循壞
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 在此再次判斷當(dāng)前頭節(jié)點(diǎn)的的狀態(tài),如果小于0 將設(shè)置為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取后繼節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果后繼節(jié)點(diǎn)為空或者等待狀態(tài)大于0 直接放棄勺馆。
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 循環(huán)從尾部往前尋找下一個等待狀態(tài)不大于0的節(jié)點(diǎn)
if (t.waitStatus <= 0)
s = t;
}
// 喚醒該節(jié)點(diǎn)的線程
if (s != null)
LockSupport.unpark(s.thread);
}
最后一步釋放資源就比較簡單了戏售。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
總結(jié)
在獲取同步狀態(tài)時,同步器維護(hù)一個同步隊列草穆,獲取狀態(tài)失敗的線程會加入到隊列中并進(jìn)行自旋灌灾,出列的(或者停止自旋)的條件時前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)并且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時悲柱,調(diào)用Release方法釋放同步狀態(tài)锋喜,然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
共享式方式在喚醒后繼節(jié)點(diǎn)獲得資源后會判斷當(dāng)前資源是否還有多余的豌鸡,如果有會繼續(xù)喚醒下一個節(jié)點(diǎn)嘿般。