獲取獨(dú)占式同步狀態(tài)
acquire
/**
* 獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功則直接返回绅这,
* 如果獲取失敗則線程阻塞柑司,并插入同步隊(duì)列進(jìn)行.等待調(diào)用release
* 釋放同步狀態(tài)時(shí),重新嘗試獲取同步狀態(tài)宏侍。成功則返回,失敗則阻塞等待下次release
*/
public final void acquire(int arg) {
/**
*子類實(shí)現(xiàn)tryAcquire能否獲取的獨(dú)占式同步狀態(tài)
*如果返回true則獲取同步狀態(tài)成功方法直接返回
*如果返回false則獲取同步狀態(tài)失敗進(jìn)入if語(yǔ)句
*/
if (!tryAcquire(arg) &&
/** addWaiter創(chuàng)建一個(gè)獨(dú)占式節(jié)點(diǎn)node,添加到同步隊(duì)列尾部. */
/** acquireQueued自旋,同步隊(duì)列頭部后置第一個(gè)節(jié)點(diǎn)線程嘗試獲取同步狀態(tài),成功則設(shè)置其為head節(jié)點(diǎn).失敗則阻塞 */
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter
創(chuàng)建節(jié)點(diǎn)node(類型是獨(dú)占式),如果同步隊(duì)列初始化則將當(dāng)前節(jié)點(diǎn)添加到同步隊(duì)列尾部,如果同步隊(duì)列沒(méi)有初始化則需要調(diào)用子函數(shù)enq創(chuàng)建一個(gè)同步隊(duì)列并將當(dāng)前節(jié)點(diǎn)添加到同步隊(duì)列尾部.
/** 獲取同步狀態(tài)失敗,添加節(jié)點(diǎn)到同步隊(duì)列尾部 **/
private Node addWaiter(Node mode) {
// 1. 將當(dāng)前線程構(gòu)建成Node
Node node = new Node(Thread.currentThread(), mode);
// 2. 判斷尾節(jié)點(diǎn)是否為null,如果為null說(shuō)明同步隊(duì)列未初始化
Node pred = tail;
if (pred != null) {
// 2.2 將當(dāng)前節(jié)點(diǎn)插入同步隊(duì)列尾部
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 2.1. 當(dāng)前同步隊(duì)列尾節(jié)點(diǎn)為null骨杂,說(shuō)明CLH同步隊(duì)列未初始化,調(diào)用enq
enq(node);
return node;
}
pred != null 邏輯圖如下天通,將當(dāng)前節(jié)點(diǎn)插入同步隊(duì)列尾部(Thread 3表示當(dāng)前節(jié)點(diǎn))
pred == null 邏輯圖如下,進(jìn)入enq
enq
初始化開(kāi)始時(shí)一個(gè)自旋,首先會(huì)獲取尾部node,并判斷是否為null,如果為null說(shuō)明同步隊(duì)列需要初始化,進(jìn)入if語(yǔ)句創(chuàng)建一個(gè)空node節(jié)點(diǎn),讓首部和尾部節(jié)點(diǎn)都指向這個(gè)空節(jié)點(diǎn).完成后重新進(jìn)入自旋.此時(shí)按照之前判斷尾部節(jié)點(diǎn)存在.我們會(huì)進(jìn)入else語(yǔ)句將當(dāng)前添加節(jié)點(diǎn)設(shè)置為tail節(jié)點(diǎn).
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//1. 構(gòu)造頭節(jié)點(diǎn)尾節(jié)點(diǎn)指向一個(gè)空節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2. node節(jié)點(diǎn)插入同步隊(duì)列尾逼争,CAS操作失敗自旋嘗試
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一次循環(huán)進(jìn)入if
第二次循環(huán)進(jìn)入else,并return環(huán)跳出了這個(gè)自懸循環(huán)體系
acquireQueued
進(jìn)入自旋,找到CLH頭部后置第一個(gè)節(jié)點(diǎn)优床,嘗試獲取同步狀態(tài),成功則設(shè)置其為新head節(jié)點(diǎn).失敗則阻塞.
總結(jié)流程:
- 1 進(jìn)入自旋
- 2 判斷當(dāng)前節(jié)點(diǎn)前置節(jié)點(diǎn)是否為head節(jié)點(diǎn),如果是嘗試調(diào)用tryAcquire獲取同步狀態(tài)
- 3 獲取同步狀態(tài)成功誓焦,設(shè)置當(dāng)前節(jié)點(diǎn)為head節(jié)點(diǎn),并釋放當(dāng)前節(jié)點(diǎn)前置節(jié)點(diǎn)的指針,retun跳出自旋.
- 4 獲取同步狀態(tài)失敗,獲取同步狀態(tài)失敗,就將當(dāng)前節(jié)點(diǎn)和前驅(qū)節(jié)點(diǎn)作為參數(shù)交給shouldParkAfterFailedAcquire調(diào)用胆敞,shouldParkAfterFailedAcquire設(shè)置要當(dāng)前節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)node等待狀態(tài)到-1返回false,自旋第二次進(jìn)入時(shí)返回true,到parkAndCheckInterrupt()阻塞當(dāng)前線程杂伟。
- 如果節(jié)點(diǎn)線程被從阻塞中喚醒(可能是線程中斷移层,也可能是當(dāng)前節(jié)點(diǎn)是同步隊(duì)列后第一個(gè)節(jié)點(diǎn)。鎖被前置節(jié)點(diǎn)釋放同步狀態(tài)而喚醒)赫粥,重新進(jìn)入自旋步驟1观话。
/**
* 自旋,找到同步隊(duì)列頭部后置第一個(gè)節(jié)點(diǎn),嘗試獲取同步狀態(tài),成功則設(shè)置其為新head節(jié)點(diǎn).失敗則阻塞.
*/
final boolean acquireQueued(final Node node, int arg) {
/** 執(zhí)行是否失敗 **/
boolean failed = true;
try {
/** 標(biāo)識(shí)是否被中斷 **/
boolean interrupted = false;
/** 進(jìn)入自旋 **/
for (;;) {
/** 1. 獲得當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn) **/
final Node p = node.predecessor();
/** 如果當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取同步狀態(tài)越平,即可以獲得獨(dú)占式鎖 **/
if (p == head && tryAcquire(arg)) {
/** 并將當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn) **/
setHead(node);
/** 釋放當(dāng)前節(jié)前驅(qū)節(jié)點(diǎn)指針(這里前驅(qū)節(jié)點(diǎn)也相當(dāng)于原始的head節(jié)點(diǎn))等待GC回收 **/
p.next = null; // help GC
failed = false;
return interrupted;
}
/** 2.2 獲取鎖失敗频蛔,線程阻塞(可響應(yīng)線程被中斷), 如果是中斷響應(yīng)設(shè)置interrupted = true;*
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/** 發(fā)生異常失敗 **/
if (failed)
/** 設(shè)置當(dāng)前節(jié)點(diǎn)狀態(tài)為消息 **/
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
參數(shù)
- 當(dāng)前節(jié)點(diǎn)和前驅(qū)節(jié)點(diǎn)
返回
- true 調(diào)用parkAndCheckInterrupt()阻塞當(dāng)前線程
- false 則重新進(jìn)入acquireQueued自旋
邏輯
1 如果發(fā)現(xiàn)前置節(jié)點(diǎn)等待狀態(tài)waitStatus==1則說(shuō)明此客戶不在等待剔除出隊(duì)列灵迫,返回false,在外層自懸循環(huán) 中從node重新向前開(kāi)始查找。
2 如果發(fā)現(xiàn)前置節(jié)點(diǎn)等待狀態(tài)waitStatus==-1 返回true 調(diào)用parkAndCheckInterrupt()阻塞當(dāng)前線程
3 如果發(fā)現(xiàn)前置節(jié)點(diǎn)等待狀態(tài)waitStatu==0設(shè)置為waitStatus=-1晦溪,返回false,則重新進(jìn)入acquireQueued自旋
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前置節(jié)點(diǎn)狀態(tài)為-1 返回true 準(zhǔn)備直塞當(dāng)前節(jié)點(diǎn)線程
if (ws == Node.SIGNAL)
return true;
//前置節(jié)點(diǎn)狀態(tài)為 1瀑粥,剔除出隊(duì)列,在外層自懸循環(huán) 中從新開(kāi)始查找 返回false
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
//前置節(jié)點(diǎn)狀態(tài)為 0尼变,設(shè)置為 -1 在外層自懸循環(huán) 中從新開(kāi)始查找 返回false
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
第一次循環(huán)
是第3種情況利凑,并且這個(gè)方法運(yùn)行后返回false。會(huì)當(dāng)前節(jié)點(diǎn)(一般式尾節(jié)點(diǎn))的前置節(jié)點(diǎn)等待狀態(tài)由0變成了-1
如果添加節(jié)點(diǎn)并不是第一個(gè)等待的節(jié)點(diǎn)(thread2)
如果添加節(jié)點(diǎn)并是第一個(gè)等待的節(jié)點(diǎn)(thread3)
第二次循環(huán)
是第2種情況 嫌术,返回true開(kāi)始阻塞線程哀澈,并在方法返回告知是否是中斷導(dǎo)致線程喚醒
parkAndCheckInterrupt
阻塞當(dāng)前線程(可響應(yīng)中斷),返回true 表示中斷導(dǎo)致線程阻塞被喚醒
/**
* 阻塞當(dāng)前線程(可響應(yīng)中斷)度气,
* 返回true 表示中斷導(dǎo)致線程阻塞被喚醒
*/
private final boolean parkAndCheckInterrupt() {
/** 阻塞當(dāng)前線程(可響應(yīng)中斷)**/
LockSupport.park(this);
/** 如果線程是中斷從阻塞喚醒返回true **/
return Thread.interrupted();
}
整體流程圖
釋放獨(dú)占式同步狀態(tài)
入口函數(shù) release
/**
* 釋放獨(dú)占式同步入口函數(shù)割按,
* 參數(shù)arg傳遞給模板方法用來(lái)判斷釋放同步狀態(tài)
*
* 釋放同步狀態(tài)會(huì)釋放Head節(jié)點(diǎn)后置節(jié)點(diǎn)中線程從阻塞狀態(tài)中喚醒
*/
public final boolean release(int arg) {
/**
*子類實(shí)現(xiàn)能否釋放的獨(dú)占式同步狀態(tài)
*如果返回true則表示釋放同步狀態(tài)準(zhǔn)入條件成功進(jìn)入if語(yǔ)句
*如果返回false則表示釋放同步狀態(tài)失敗返回false
*/
if (tryRelease(arg)) {
/** 判斷同步隊(duì)列是否存在等待節(jié)點(diǎn) **/
Node h = head;
if (h != null && h.waitStatus != 0)
/**
* 更新CLH同步隊(duì)列Head節(jié)點(diǎn)的等待狀態(tài),將Head節(jié)點(diǎn)后置節(jié)點(diǎn)中線程從阻塞狀態(tài)中喚醒
*/
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor
更新CLH同步隊(duì)列Head節(jié)點(diǎn)的等待狀態(tài)磷籍,將Head節(jié)點(diǎn)后置節(jié)點(diǎn)中線程從阻塞狀態(tài)中喚醒
/**
* 更新CLH同步隊(duì)列Head節(jié)點(diǎn)的等待狀態(tài)适荣,將Head節(jié)點(diǎn)后置節(jié)點(diǎn)中線程從阻塞狀態(tài)中喚醒
*/
private void unparkSuccessor(Node node) {
/**
* 將Head節(jié)點(diǎn)的等待狀態(tài)設(shè)置為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/** 默認(rèn)情況下釋放的節(jié)點(diǎn)為head節(jié)點(diǎn)后置節(jié)點(diǎn)s. **/
/**
* 如果head節(jié)點(diǎn)后置節(jié)點(diǎn)等待狀態(tài)為1(取消),從尾節(jié)點(diǎn)開(kāi)始遍歷尋找最接近head節(jié)點(diǎn)等待狀態(tài)為-1的節(jié)點(diǎn)作為釋放節(jié)點(diǎn)s
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
/** 喚醒s節(jié)點(diǎn)中線程阻塞,被喚醒節(jié)點(diǎn)線程重新進(jìn)入acquireQueued自旋嘗試獲取同步狀態(tài) **/
if (s != null)
LockSupport.unpark(s.thread);
}
第一步:獲取head節(jié)點(diǎn)的waitStatus院领,如果小于0弛矛,就通過(guò)CAS操作將head節(jié)點(diǎn)的waitStatus修改為0,現(xiàn)在是:
第二步:尋找head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)比然,如果這個(gè)節(jié)點(diǎn)的waitStatus小于0丈氓,就喚醒這個(gè)節(jié)點(diǎn),否則遍歷下去强法,找到第一個(gè)waitStatus<=0的節(jié)點(diǎn)万俗,并喚醒。
第三步: 被喚醒節(jié)點(diǎn)線程重新進(jìn)入acquireQueued自旋嘗試獲取同步狀態(tài),如果成功則設(shè)置當(dāng)前節(jié)點(diǎn)為head節(jié)點(diǎn).原始head節(jié)點(diǎn)出隊(duì).
整體圖
總結(jié)
獨(dú)占式同步可以看作是一把獨(dú)占鎖饮怯,每次僅僅只能由一個(gè)程序獲的鎖闰歪。
總體流程如下
嘗試鎖失敗 --> 進(jìn)入等待隊(duì)列排隊(duì) --> 阻塞當(dāng)前線程 --> 當(dāng)?shù)却?duì)列排到自己被喚醒 --> 嘗試獲取鎖(可能被其他線程插隊(duì)而導(dǎo)致獲取鎖失敗,失敗在次阻塞蓖墅,等待下次排到自己) --> 執(zhí)行自己的業(yè)務(wù)邏輯 --> 嘗試釋放鎖--> 成功通知等待隊(duì)列前面共享節(jié)點(diǎn)線程從阻塞中喚醒库倘。
子類可以擴(kuò)展如何獲取鎖,如何釋放鎖置媳,但整體流程不變于樟。
白話
我們舉一個(gè)生活中例子來(lái)看獨(dú)占式同步,我們都去餐廳吃過(guò)飯拇囊,如果去餐廳吃飯就時(shí)我們要執(zhí)行業(yè)務(wù)迂曲,要進(jìn)入餐廳吃飯的前提時(shí)要獲取同步狀態(tài),如果餐廳只有一個(gè)位置寥袭,那么就獨(dú)占式同步路捧,因?yàn)橥瑫r(shí)只能一個(gè)進(jìn)入餐廳吃飯关霸。如果餐廳由多個(gè)位置,那么就是共享式同步杰扫,因?yàn)橥蕉鄠€(gè)人進(jìn)入餐廳吃飯队寇。當(dāng)然能不能進(jìn)入邏輯時(shí)餐廳這個(gè)AQS子類去定制的。
我們來(lái)看獨(dú)占式例子:
客人A進(jìn)入餐廳吃飯章姓,餐廳此時(shí)沒(méi)有人佳遣,他可以順利進(jìn)入餐廳吃飯。
客戶B也進(jìn)入餐廳吃飯凡伊,餐廳此時(shí)被A占著零渐,嘗試進(jìn)入餐廳(子類實(shí)現(xiàn)邏輯判斷),進(jìn)入餐廳失敗
餐廳給B指定一個(gè)排隊(duì)編號(hào)系忙,此時(shí)B想插隊(duì)诵盼,在次嘗試進(jìn)入餐廳(子類實(shí)現(xiàn)邏輯判斷),A還沒(méi)有出來(lái)银还,進(jìn)入餐廳失敗
客戶C也進(jìn)入餐廳吃飯风宁,餐廳此時(shí)被A占著嘗試進(jìn)入餐廳(子類實(shí)現(xiàn)邏輯判斷),進(jìn)入餐廳失敗蛹疯,
餐廳給C指定一個(gè)排隊(duì)編號(hào)(加入同步隊(duì)列)戒财,此時(shí)C也想插隊(duì),在次嘗試進(jìn)入餐廳(子類實(shí)現(xiàn)邏輯判斷)捺弦,A剛好出來(lái)固翰,餐廳還沒(méi)有來(lái)得及通知B,C 進(jìn)入餐廳吃飯
C 吃飯完畢離開(kāi)餐廳羹呵,嘗試離開(kāi)餐廳(子類實(shí)現(xiàn)邏輯判斷),成功餐廳通知B(喚醒B線程)疗琉,B嘗試進(jìn)入餐廳(可能還會(huì)被插隊(duì))冈欢,成功進(jìn)入,失敗則在次等著(在次阻塞)
實(shí)現(xiàn)
實(shí)現(xiàn)一個(gè)AQS獨(dú)占同步盈简,需要繼承AbstractQueuedSynchronizer凑耻,并重寫(xiě)tryAcquire,tryRelease柠贤,最常見(jiàn)的就是ReentrantLock香浩,ReentrantLock內(nèi)部定義了一個(gè)AbstractQueuedSynchronizer實(shí)現(xiàn)AQS內(nèi)部類,通過(guò)同步狀態(tài)來(lái)控制獲取獨(dú)占鎖臼勉,其中0表示未占用邻吭,1表示已占用,當(dāng)tryAcquire會(huì)使用CAS將同步狀態(tài)設(shè)置為1宴霸,tryRelease會(huì)使用CAS將同步狀態(tài)設(shè)置0.保證同時(shí)只能一個(gè)線程占用鎖囱晴。