[toc]
- Posted by 微博@Yangsc_o
- 原創(chuàng)文章,版權(quán)聲明:自由轉(zhuǎn)載-非商用-非衍生-保持署名 | Creative Commons BY-NC-ND 3.0
摘要
本文通過ReentrantLock來窺探AbstractQueuedSynchronizer(AQS)的實現(xiàn)原理拒名,在看此文之前噩咪。你需要了解一下park顾彰、unpark的功能,請移步至上一篇《深入剖析park胃碾、unpark》涨享;
AbstractQueuedSynchronizer實現(xiàn)一把鎖
根據(jù)AbstractQueuedSynchronizer的官方文檔,如果想實現(xiàn)一把鎖的仆百,需要繼承AbstractQueuedSynchronizer厕隧,并需要重寫tryAcquire、tryRelease、可選擇重寫isHeldExclusively提供locked state吁讨、因為支持序列化髓迎,所以需要重寫readObject以便反序列化時恢復(fù)原始值、newCondition提供條件建丧;官方提供的java代碼如下(官方文檔見參考連接)排龄;
public class MyLock implements Lock, java.io.Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
@Override
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
// Reports whether in locked state
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
/**
* The sync object does all the hard work. We just forward to it.
*/
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
private static volatile Integer value = 0;
public static void main(String[] args) {
MyLock myLock = new MyLock();
for (int i = 0; i < 1000; i++) {
new Thread(()->{
myLock.lock();
value ++;
myLock.unlock();
}).start();
}
System.out.println(value);
}
}
上面是一個不可重入的鎖,它實現(xiàn)了一個鎖基礎(chǔ)功能翎朱,目的是為了跟ReentrantLock的實現(xiàn)做對比橄维;
ReentrantLock
ReentrantLock的特點(diǎn)
ReentrantLock意思為可重入鎖,指的是一個線程能夠?qū)σ粋€臨界資源重復(fù)加鎖拴曲。ReentrantLock跟常用的Synchronized進(jìn)行比較争舞;
Synchronized的基礎(chǔ)用法
Synchronized的分析可以參考《深入剖析synchronized關(guān)鍵詞》,ReentrantLock可以創(chuàng)建公平鎖澈灼、也可以創(chuàng)建非公平鎖竞川,接下來看一下ReentrantLock的簡單用法,非公平鎖實現(xiàn)比較簡單叁熔,今天重點(diǎn)是公平鎖流译;
public class ReentrantLockTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock(true);
reentrantLock.lock();
try {
log.info("lock");
} catch (Exception e) {
log.error(e);
} finally {
reentrantLock.unlock();
log.info("unlock");
}
}
}
ReentrantLock與AQS的關(guān)聯(lián)
先看一下加鎖方法lock
-
非公平鎖lock方法
compareAndSetState很好理解,通過CAS加鎖者疤,如果加鎖失敗調(diào)用acquire;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 公平鎖lock方法
final void lock() {
acquire(1);
}
- AQS框架的處理流程
? 線程繼續(xù)等待叠赦,仍然保留獲取鎖的可能驹马,獲取鎖流程仍在繼續(xù),分析實現(xiàn)原理
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
總結(jié):公平鎖的上鎖是必須判斷自己是不是需要排隊除秀;而非公平鎖是直接進(jìn)行CAS修改計數(shù)器看能不能加鎖成功糯累;如果加鎖不成功則乖乖排隊(調(diào)用acquire);所以不管公平還是不公平册踩;只要進(jìn)到了AQS隊列當(dāng)中那么他就會排隊;
AQS架構(gòu)圖
美團(tuán)畫的AQS的架構(gòu)圖泳姐,很詳細(xì),當(dāng)有自定義同步器接入時暂吉,只需重寫第一層所需要的部分方法即可胖秒,不需要關(guān)注底層具體的實現(xiàn)流程。當(dāng)自定義同步器進(jìn)行加鎖或者解鎖操作時慕的,先經(jīng)過第一層的API進(jìn)入AQS內(nèi)部方法阎肝,然后經(jīng)過第二層進(jìn)行鎖的獲取,接著對于獲取鎖失敗的流程肮街,進(jìn)入第三層和第四層的等待隊列處理风题,而這些處理方式均依賴于第五層的基礎(chǔ)數(shù)據(jù)提供層。
AQS核心思想是,如果被請求的共享資源空閑沛硅,那么就將當(dāng)前請求資源的線程設(shè)置為有效的工作線程眼刃,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用摇肌,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配擂红。這個機(jī)制主要用的是CLH隊列的變體實現(xiàn)的,將暫時獲取不到鎖的線程加入到隊列中朦蕴。
CLH:Craig篮条、Landin and Hagersten隊列,是單向鏈表吩抓,AQS中的隊列是CLH變體的虛擬雙向隊列(FIFO)涉茧,AQS是通過將每條請求共享資源的線程封裝成一個節(jié)點(diǎn)來實現(xiàn)鎖的分配。
- 非公平鎖的加鎖流程
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/b8b53a70984668bc68653efe9531573e78636-20200604160346262-20200604160403594.png" alt="img" style="zoom:50%;" />
- 公平鎖的加鎖流程
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604172945029.png" alt="image-20200604172945029" style="zoom:50%;" />
- 解鎖公平鎖和非公平鎖邏輯一致
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200604173023011.png" alt="image-20200604173023011" style="zoom:50%;" />
加鎖:
- 通過ReentrantLock的加鎖方法Lock進(jìn)行加鎖操作疹娶。
- 會調(diào)用到內(nèi)部類Sync的Lock方法伴栓,由于Sync#lock是抽象方法,根據(jù)ReentrantLock初始化選擇的公平鎖和非公平鎖雨饺,執(zhí)行相關(guān)內(nèi)部類的Lock方法钳垮,本質(zhì)上都會執(zhí)行AQS的Acquire方法。
- AQS的Acquire方法會執(zhí)行tryAcquire方法额港,但是由于tryAcquire需要自定義同步器實現(xiàn)饺窿,因此執(zhí)行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通過公平鎖和非公平鎖內(nèi)部類實現(xiàn)的tryAcquire方法移斩,因此會根據(jù)鎖類型不同肚医,執(zhí)行不同的tryAcquire。
- tryAcquire是獲取鎖邏輯向瓷,獲取失敗后肠套,會執(zhí)行框架AQS的后續(xù)邏輯,跟ReentrantLock自定義同步器無關(guān)猖任。
- 流程:Lock -> acquire -> tryAcquire( or nonfairTryAcquire)
解鎖:
- 通過ReentrantLock的解鎖方法Unlock進(jìn)行解鎖你稚。
- Unlock會調(diào)用內(nèi)部類Sync的Release方法,該方法繼承于AQS朱躺。
- Release中會調(diào)用tryRelease方法刁赖,tryRelease需要自定義同步器實現(xiàn),tryRelease只在ReentrantLock中的Sync實現(xiàn)室琢,因此可以看出乾闰,釋放鎖的過程,并不區(qū)分是否為公平鎖盈滴。
- 釋放成功后涯肩,所有處理由AQS框架完成轿钠,與自定義同步器無關(guān)。
- 流程:unlock -> release -> tryRelease
acquire獲取鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
tryAcquire
acquire方法首先會調(diào)tryAcquire方法病苗,需要注意的是tryAcquire的結(jié)果做取反疗垛;根據(jù)前面分析,tryAcquire會調(diào)用子類的實現(xiàn)硫朦,ReentrantLock有兩個內(nèi)部類贷腕,F(xiàn)airSync,NonfairSync咬展,都繼承自Sync泽裳,Sync繼承AbstractQueuedSynchronizer;
實現(xiàn)方式差別在是否有hasQueuedPredecessors() 的判斷條件
- 公平鎖實現(xiàn)
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取lock對象的上鎖狀態(tài)破婆,如果鎖是自由狀態(tài)則=0涮总,如果被上鎖則為1,大于1表示重入
int c = getState();
if (c == 0) {
// hasQueuedPredecessors祷舀,判斷自己是否需要排隊
// 下面我會單獨(dú)介紹瀑梗,如果不需要排隊則進(jìn)行cas嘗試加鎖
// 如果加鎖成功則把當(dāng)前線程設(shè)置為擁有鎖的線程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果C不等于0,但是當(dāng)前線程等于擁有鎖的線程則表示這是一次重入裳扯,那么直接把狀態(tài)+1表示重入次數(shù)+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平鎖
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- Node
先來看下AQS中最基本的數(shù)據(jù)結(jié)構(gòu)——Node抛丽,Node即為上面CLH變體隊列中的節(jié)點(diǎn)。
static final class Node {
static final Node SHARED = new Node(); // 表示線程以共享的模式等待鎖
static final Node EXCLUSIVE = null; // 表示線程正在以獨(dú)占的方式等待鎖
static final int CANCELLED = 1; // 表示線程獲取鎖的請求已經(jīng)取消了
static final int SIGNAL = -1; // 表示線程已經(jīng)準(zhǔn)備好了饰豺,就等資源釋放了
static final int CONDITION = -2; // 表示節(jié)點(diǎn)在等待隊列中亿鲜,節(jié)點(diǎn)線程等待喚醒
static final int PROPAGATE = -3; // 當(dāng)前線程處在SHARED情況下,該字段才會使用
volatile int waitStatus; // 當(dāng)前節(jié)點(diǎn)在隊列中的狀態(tài)
volatile Node prev; // 前驅(qū)指針
volatile Node next; // 后繼指針
volatile Thread thread; // 表示處于該節(jié)點(diǎn)的線程
Node nextWaiter; // 指向下一個處于CONDITION狀態(tài)的節(jié)點(diǎn)
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前驅(qū)節(jié)點(diǎn)冤吨,沒有的話拋出npe
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
再看hasQueuedPredecessors狡门,整個方法如果最后返回false,則去加鎖锅很,如果返回true則不加鎖,因為這個方法被取反操作凤跑;hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節(jié)點(diǎn)的方法爆安。如果返回False,說明當(dāng)前線程可以爭取共享資源仔引;如果返回True扔仓,說明隊列中存在有效節(jié)點(diǎn),當(dāng)前線程必須加入到等待隊列中咖耘。
- h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
雙向鏈表中翘簇,第一個節(jié)點(diǎn)為虛節(jié)點(diǎn),其實并不存儲任何信息儿倒,只是占位版保。真正的第一個有數(shù)據(jù)的節(jié)點(diǎn)呜笑,是在第二個節(jié)點(diǎn)開始的。
- 當(dāng)h != t時: 如果(s = h.next) == null彻犁,等待隊列正在有線程進(jìn)行初始化叫胁,但只是進(jìn)行到了Tail指向Head,沒有將Head指向Tail汞幢,此時隊列中有元素驼鹅,需要返回True(這塊具體見下邊代碼分析)。
- 如果(s = h.next) != null森篷,說明此時隊列中至少有一個有效節(jié)點(diǎn)输钩。
- 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)中的線程與當(dāng)前線程相同仲智,那么當(dāng)前線程是可以獲取資源的买乃;
- 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)線程與當(dāng)前線程不同坎藐,當(dāng)前線程必須加入進(jìn)等待隊列为牍。
如果這上面沒有看懂,沒有關(guān)系岩馍,先來分析一下構(gòu)建整個隊列的過程碉咆;
- addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail為對尾,賦值給pred
Node pred = tail;
// 判斷pred是否為空蛀恩,其實就是判斷對尾是否有節(jié)點(diǎn)疫铜,其實只要隊列被初始化了對尾肯定不為空
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- enq
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
用一張圖來分析一下,整個隊列構(gòu)建過程双谆;
(1)通過Node(Thread thread, Node mode) 方法構(gòu)建一個node節(jié)點(diǎn)(node2)壳咕,此時的nextWaiter為空,線程不為空顽馋,是當(dāng)前線程谓厘;
(2)如果隊尾為空,則說明隊列未建立寸谜,調(diào)用enq構(gòu)建第一個虛擬節(jié)點(diǎn)(node1)竟稳,通過compareAndSetHead方法構(gòu)建一個頭節(jié)點(diǎn),需要注意的是該頭節(jié)點(diǎn)thread是null熊痴,后續(xù)很多都是用線程是否為null來判讀是否為第一個虛擬節(jié)點(diǎn)他爸;
(3)將node1 cas設(shè)置為head
(4)將頭節(jié)點(diǎn)賦值為tail = head
(5)進(jìn)入下一次for循環(huán)時,會走到else分支果善,會將傳入的node的指向頭部節(jié)點(diǎn)的next辆童,此時node2的prev指向node1(tail)
(6)將node2 cas設(shè)置為tail赛惩;
-
(7)將node2指向node1的next;
經(jīng)過上面的步驟焰枢,就構(gòu)建了一個長度為2的隊列;
添加第二個隊列時,走的是這段代碼,流程就簡單多了,代碼如下
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
再看一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());因為整個構(gòu)建過程并不是原子操作止喷,所以這個條件判斷,現(xiàn)在再是不是就看明白了混聊?
- 當(dāng)h != t時(3)步驟已經(jīng)完成: 如果(s = h.next) == null 此時步驟(4)未完成弹谁,等待隊列正在有線程進(jìn)行初始化,但只是進(jìn)行到了Tail指向Head句喜,沒有將Head指向Tail预愤,此時隊列中有元素,需要返回True
- 如果(s = h.next) != null咳胃,說明此時隊列中至少有一個有效節(jié)點(diǎn)植康。
- 如果此時s.thread == Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)中的線程與當(dāng)前線程相同展懈,那么當(dāng)前線程是可以獲取資源的销睁;
- 如果s.thread != Thread.currentThread(),說明等待隊列的第一個有效節(jié)點(diǎn)線程與當(dāng)前線程不同存崖,當(dāng)前線程必須加入進(jìn)等待隊列冻记。
acquireQueued
addWaiter方法其實就是把對應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊列里,返回的是一個包含該線程的Node来惧。而這個Node會作為參數(shù)冗栗,進(jìn)入到acquireQueued方法中。acquireQueued方法可以對排隊中的線程進(jìn)行“獲鎖”操作供搀∮缇樱總的來說,一個線程獲取鎖失敗了葛虐,被放入等待隊列胎源,acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲扔炱辍(中斷)乒融。
下面通過代碼從“何時出隊列?”和“如何出隊列摄悯?”兩個方向來分析一下acquireQueued源碼:
final boolean acquireQueued(final Node node, int arg) {
// 標(biāo)記是否成功拿到資源
boolean failed = true;
try {
// 標(biāo)記等待過程中是否中斷過
boolean interrupted = false;
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),有兩種情況愧捕;1奢驯、上一個節(jié)點(diǎn)為頭部;2上一個節(jié)點(diǎn)不為頭部
final Node p = node.predecessor();
// 如果p是頭結(jié)點(diǎn)次绘,說明當(dāng)前節(jié)點(diǎn)在真實數(shù)據(jù)隊列的首部瘪阁,就嘗試獲取鎖(頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
// 因為第一次tryAcquire判斷是否需要排隊撒遣,如果需要排隊,那么我就入隊管跺,此處再重試一次
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功义黎,頭指針移動到當(dāng)前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 說明p為頭節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點(diǎn),這個時候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為-1)豁跑,防止無限循環(huán)浪費(fèi)資源廉涕。具體兩個方法下面細(xì)細(xì)分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 成功拿到資源,準(zhǔn)備釋放
if (failed)
cancelAcquire(node);
}
}
setHead
設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)艇拍,并且將node.thread為空(剛才提到判斷是否為頭部虛擬節(jié)點(diǎn)的條件就是node.thread == null狐蜕。waitStatus狀態(tài)并為修改,等下我們再分析卸夕;
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire
接下來看shouldParkAfterFailedAcquire代碼层释,需要注意的是,每一個新創(chuàng)建Node的節(jié)點(diǎn)是被下一個排隊的node設(shè)置為等待狀態(tài)為SIGNAL快集, 這里比較難以理解為什么需要去改變上一個節(jié)點(diǎn)的park狀態(tài)贡羔?
每個node都有一個狀態(tài),默認(rèn)為0个初,表示無狀態(tài)乖寒,-1表示在park;當(dāng)時不能自己把自己改成-1狀態(tài)勃黍?因為你得確定你自己park了才是能改為-1宵统;所以只能先park;在改狀態(tài)覆获;但是問題你自己都park了马澈;完全釋放CPU資源了,故而沒有辦法執(zhí)行任何代碼了弄息,所以只能別人來改痊班;故而可以看到每次都是自己的后一個節(jié)點(diǎn)把自己改成-1狀態(tài);
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
// 說明頭結(jié)點(diǎn)處于喚醒狀態(tài)
if (ws == Node.SIGNAL)
return true;
// static final int CANCELLED = 1; // 表示線程獲取鎖的請求已經(jīng)取消了
// static final int SIGNAL = -1; // 表示線程已經(jīng)準(zhǔn)備好了摹量,就等資源釋放了
// static final int CONDITION = -2; // 表示節(jié)點(diǎn)在等待隊列中涤伐,節(jié)點(diǎn)線程等待喚醒
// static final int PROPAGATE = -3; // 當(dāng)前線程處在SHARED情況下,該字段才會使用
if (ws > 0) {
do {
// 把取消節(jié)點(diǎn)從隊列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設(shè)置前任節(jié)點(diǎn)等待狀態(tài)為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
調(diào)用LockSupport.park掛起當(dāng)前線程缨称,自己已經(jīng)park凝果,無法再修改狀態(tài)了!
private final boolean parkAndCheckInterrupt() {
// 調(diào)?用park()使線程進(jìn)?入waiting狀態(tài)
LockSupport.park(this);
// 如果被喚醒睦尽,查看?自?己是不不是被中斷的器净,這?里里先清除?下標(biāo)記位
return Thread.interrupted();
}
shouldParkAfterFailedAcquire的整個流程還是比較清晰的,如果不清楚当凡,可以參考美團(tuán)畫的流程圖山害;
cancelAcquire
通過上面的分析纠俭,當(dāng)failed為true時,也就意味著park結(jié)束浪慌,線程被喚醒了冤荆,for循環(huán)已經(jīng)跳出,開始執(zhí)行cancelAcquire权纤,通過cancelAcquire方法钓简,將Node的狀態(tài)標(biāo)記為CANCELLED;代碼如下:
private void cancelAcquire(Node node) {
// 將無效節(jié)點(diǎn)過濾
if (node == null)
return;
// 設(shè)置該節(jié)點(diǎn)不關(guān)聯(lián)任何線程妖碉,也就是虛節(jié)點(diǎn)(上面已經(jīng)提到涌庭,node.thread = null是判讀是否是頭節(jié)點(diǎn)的條件)
node.thread = null;
Node pred = node.prev;
// 通過前驅(qū)節(jié)點(diǎn),處理waitStatus > 0的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 把當(dāng)前node的狀態(tài)設(shè)置為CANCELLED欧宜,當(dāng)下一個node排隊結(jié)束時坐榆,自己就會被上一行代碼處理掉;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)冗茸,將從后往前的第一個非取消狀態(tài)的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)席镀,更新失敗的話,則進(jìn)入else夏漱,如果更新成功豪诲,將tail的后繼節(jié)點(diǎn)設(shè)置為null
if (node == tail && compareAndSetTail(node, pred)) {
// 把自己設(shè)置為null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當(dāng)前節(jié)點(diǎn)不是head的后繼節(jié)點(diǎn)
// 1:判斷當(dāng)前節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)的是否為SIGNAL
// 2:如果不是,則把前驅(qū)節(jié)點(diǎn)設(shè)置為SINGAL看是否成功
// 如果1和2中有一個為true挂绰,再判斷當(dāng)前節(jié)點(diǎn)的線程是否為null
// 如果上述條件都滿足屎篱,把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的后繼指針指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足葵蒂,那就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
當(dāng)前的流程:
獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)交播,如果前驅(qū)節(jié)點(diǎn)的狀態(tài)是CANCELLED,那就一直往前遍歷践付,找到第一個waitStatus <= 0的節(jié)點(diǎn)秦士,將找到的Pred節(jié)點(diǎn)和當(dāng)前Node關(guān)聯(lián),將當(dāng)前Node設(shè)置為CANCELLED永高。
-
根據(jù)當(dāng)前節(jié)點(diǎn)的位置隧土,考慮以下三種情況:
(1) 當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)。
(2) 當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)命爬。
(3) 當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn)曹傀,也不是尾節(jié)點(diǎn)。
(1)當(dāng)前節(jié)點(diǎn)時尾節(jié)點(diǎn)
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180254816.png" alt="image-20200607180254816" style="zoom:50%;" />
(2)當(dāng)前節(jié)點(diǎn)是Head的后繼節(jié)點(diǎn)饲宛。
這張圖描述的是這段代碼:unparkSuccessor
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180140169.png" alt="image-20200607180140169" style="zoom:50%;" />
(3)當(dāng)前節(jié)點(diǎn)不是Head的后繼節(jié)點(diǎn)皆愉,也不是尾節(jié)點(diǎn)。
這張圖描述的是這段代碼跟(2)一樣;
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180034112.png" alt="image-20200607180034112" style="zoom:50%;" />
通過上面的圖亥啦,你會發(fā)現(xiàn)所有的變化都是對Next指針進(jìn)行了操作,而沒有對Prev指針進(jìn)行操作练链,原因是執(zhí)行cancelAcquire的時候翔脱,當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)可能已經(jīng)從隊列中出去了(已經(jīng)執(zhí)行過Try代碼塊中的shouldParkAfterFailedAcquire方法了),也就是下圖中代碼1和代碼2直接的間隙就會出現(xiàn)這種情況媒鼓,此時修改Prev指針届吁,有可能會導(dǎo)致Prev指向另一個已經(jīng)移除隊列的Node,因此這塊變化Prev指針不安全绿鸣。
<img src="https://gitee.com/yangsanchao/images/raw/master/blogs/image-20200607180655529.png" alt="image-20200607180655529" />
unlock解鎖
解鎖時并不區(qū)分公平和不公平疚沐,因為ReentrantLock實現(xiàn)了鎖的可重入,可以進(jìn)一步的看一下時如何處理的潮模,上代碼:
public void unlock() {
sync.release(1);
}
release
public final boolean release(int arg) {
// 自定義的tryRelease如果返回true亮蛔,說明該鎖沒有被任何線程持有
if (tryRelease(arg)) {
// 獲取頭結(jié)點(diǎn)
Node h = head;
if (h != null && h.waitStatus != 0)
// 頭結(jié)點(diǎn)不為空并且頭結(jié)點(diǎn)的waitStatus不是初始化節(jié)點(diǎn)情況,解除線程掛起狀態(tài)
unparkSuccessor(h);
return true;
}
return false;
}
這里的判斷條件為什么是h != null && h.waitStatus != 0
- h == null Head還沒初始化擎厢。初始情況下究流,head == null,第一個節(jié)點(diǎn)入隊动遭,Head會被初始化一個虛擬節(jié)點(diǎn)芬探。如果還沒來得及入隊,就會出現(xiàn)head == null 的情況厘惦。
- h != null && waitStatus == 0 表明后繼節(jié)點(diǎn)對應(yīng)的線程仍在運(yùn)行中偷仿,不需要喚醒。
- h != null && waitStatus < 0 表明后繼節(jié)點(diǎn)可能被阻塞了宵蕉,需要喚醒酝静,(還記得一個node是在shouldParkAfterFailedAcquire方法中被設(shè)置為SIGNAL = -1的吧?不記得翻看一下上面吧)
tryRelease
protected final boolean tryRelease(int releases) {
// 減少可重入次數(shù)国裳,setState(c);
int c = getState() - releases;
// 當(dāng)前線程不是持有鎖的線程形入,拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果持有線程全部釋放,將當(dāng)前獨(dú)占鎖所有線程設(shè)置為null缝左,并更新state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
這個方法在cancelAcquire其實也用到了亿遂,簡單分析一下
// 如果當(dāng)前節(jié)點(diǎn)是head的后繼節(jié)點(diǎn),或者上述條件不滿足渺杉,就喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)unparkSuccessor(node);
private void unparkSuccessor(Node node) {
// 獲取結(jié)點(diǎn)waitStatus蛇数,CAS設(shè)置狀態(tài)state=0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn)
Node s = node.next;
// 如果下個節(jié)點(diǎn)是null或者下個節(jié)點(diǎn)被cancelled,就找到隊列最開始的非cancelled的節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) {
s = null;
// 就從尾部節(jié)點(diǎn)開始找是越,到隊首耳舅,找到隊列第一個waitStatus<0的節(jié)點(diǎn)。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果當(dāng)前節(jié)點(diǎn)的下個節(jié)點(diǎn)不為空,而且狀態(tài)<=0浦徊,就把當(dāng)前節(jié)點(diǎn)unpark
if (s != null)
LockSupport.unpark(s.thread);
}
為什么要從后往前找第一個非Cancelled的節(jié)點(diǎn)呢馏予?
原因1:addWaiter方法并非原子,構(gòu)建鏈表結(jié)構(gòu)時如下圖中 1盔性、2間隙執(zhí)行unparkSuccessor霞丧,此時鏈表是不完整的,沒辦法從前往后找了冕香;
原因2:還有一點(diǎn)原因蛹尝,在產(chǎn)生CANCELLED狀態(tài)節(jié)點(diǎn)的時候,先斷開的是Next指針悉尾,Prev指針并未斷開突那,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node;
中斷恢復(fù)
喚醒后构眯,會執(zhí)行return Thread.interrupted();愕难,這個函數(shù)返回的是當(dāng)前執(zhí)行線程的中斷狀態(tài),并清除鸵赖。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued代碼务漩,當(dāng)parkAndCheckInterrupt返回True或者False的時候,interrupted的值不同它褪,但都會執(zhí)行下次循環(huán)饵骨。如果這個時候獲取鎖成功,就會把當(dāng)前interrupted返回茫打。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果acquireQueued為True居触,就會執(zhí)行selfInterrupt方法。
該方法其實是為了中斷線程老赤。但為什么獲取了鎖以后還要中斷線程呢轮洋?這部分屬于Java提供的協(xié)作式中斷知識內(nèi)容,感興趣同學(xué)可以查閱一下抬旺。這里簡單介紹一下:
- 當(dāng)中斷線程被喚醒時弊予,并不知道被喚醒的原因,可能是當(dāng)前線程在等待中被中斷开财,也可能是釋放了鎖以后被喚醒汉柒。因此我們通過Thread.interrupted()方法檢查中斷標(biāo)記(該方法返回了當(dāng)前線程的中斷狀態(tài),并將當(dāng)前線程的中斷標(biāo)識設(shè)置為False)责鳍,并記錄下來碾褂,如果發(fā)現(xiàn)該線程被中斷過,就再中斷一次历葛。
- 線程在等待資源的過程中被喚醒正塌,喚醒后還是會不斷地去嘗試獲取鎖,直到搶到鎖為止。也就是說乓诽,在整個流程中帜羊,并不響應(yīng)中斷,只是記錄中斷記錄鸠天。最后搶到鎖返回了逮壁,那么如果被中斷過的話,就需要補(bǔ)充一次中斷粮宛。
這里的處理方式主要是運(yùn)用線程池中基本運(yùn)作單元Worder中的runWorker,通過Thread.interrupted()進(jìn)行額外的判斷處理卖宠,可以看下ThreadPoolExecutor源碼的判斷條件巍杈;
其它
AQS在JUC中有?比較?廣泛的使?用,以下是主要使?用的地?方:
- ReentrantLock:使?用AQS保存鎖重復(fù)持有的次數(shù)扛伍。當(dāng)?一個線程獲取鎖時筷畦, ReentrantLock記錄當(dāng)
前獲得鎖的線程標(biāo)識,?用于檢測是否重復(fù)獲取刺洒,以及錯誤線程試圖解鎖操作時異常情況的處理理鳖宾。 - Semaphore:使?用AQS同步狀態(tài)來保存信號量量的當(dāng)前計數(shù)。 tryRelease會增加計數(shù)逆航,
acquireShared會減少計數(shù)鼎文。 - CountDownLatch:使?用AQS同步狀態(tài)來表示計數(shù)。計數(shù)為0時因俐,所有的Acquire操作
(CountDownLatch的await?方法)才可以通過拇惋。 - ReentrantReadWriteLock:使?用AQS同步狀態(tài)中的16位保存寫鎖持有的次數(shù),剩下的16位?用于保
存讀鎖的持有次數(shù)抹剩。 - ThreadPoolExecutor: Worker利利?用AQS同步狀態(tài)實現(xiàn)對獨(dú)占線程變量量的設(shè)置(tryAcquire和
tryRelease)撑帖。
至此,通過ReentrantLock分析AQS的實現(xiàn)原理一家完畢澳眷,需要說明的是胡嘿,此文深度參考了美團(tuán)分析的ReentrantLock,是參考鏈接的第三個钳踊,有興趣可以對比差異衷敌,感謝!
參考
Java的LockSupport.park()實現(xiàn)分析
[從ReentrantLock的實現(xiàn)看AQS的原理及應(yīng)用