大家好后室,我是小黑,一個在互聯(lián)網(wǎng)茍且偷生的農(nóng)民工独撇。
在Java并發(fā)編程中屑墨,經(jīng)常會用到鎖,除了Synchronized這個JDK關(guān)鍵字以外纷铣,還有Lock接口下面的各種鎖實現(xiàn)卵史,如重入鎖ReentrantLock,還有讀寫鎖ReadWriteLock等关炼,他們在實現(xiàn)鎖的過程中都是依賴與AQS來完成核心的加解鎖邏輯的。那么AQS具體是什么呢匣吊?
提供一個框架儒拂,用于實現(xiàn)依賴先進先出(FIFO)等待隊列的阻塞鎖和相關(guān)同步器(信號量,事件等)色鸳。 該類被設(shè)計為大多數(shù)類型的同步器的有用依據(jù)社痛,這些同步器依賴于單個原子int值來表示狀態(tài)。 子類必須定義改變此狀態(tài)的受保護方法命雀,以及根據(jù)該對象被獲取或釋放來定義該狀態(tài)的含義蒜哀。 給定這些,這個類中的其他方法執(zhí)行所有排隊和阻塞機制吏砂。 子類可以保持其他狀態(tài)字段撵儿,但只以原子方式更新int使用方法操縱值getState() 乘客, setState(int)和compareAndSetState(int, int)被跟蹤相對于同步。
上述內(nèi)容來自JDK官方文檔淀歇。
簡單來說易核,AQS是一個先進先出(FIFO)的等待隊列,主要用在一些線程同步場景浪默,需要通過一個int類型的值來表示同步狀態(tài)牡直。提供了排隊和阻塞機制。
類圖結(jié)構(gòu)
從類圖可以看出纳决,在ReentrantLock中定義了AQS的子類Sync碰逸,可以通過Sync實現(xiàn)對于可重入鎖的加鎖,解鎖阔加。
AQS通過int類型的狀態(tài)state來表示同步狀態(tài)饵史。
AQS中主要提供的方法:
acquire(int) 獨占方式獲取鎖
acquireShared(int) 共享方式獲取鎖
release(int) 獨占方式釋放鎖
releaseShared(int) 共享方式釋放鎖
獨占鎖和共享鎖
關(guān)于獨占鎖和共享鎖先給大家普及一下這個概念。
獨占鎖指該鎖只能同時被一個線程持有掸哑;
共享鎖指該鎖可以被多個線程同時持有约急。
舉個生活中的例子,比如我們使用打車軟件打車苗分,獨占鎖就好比我們打快車或者專車厌蔽,一輛車只能讓一個客戶打到,不能兩個客戶同時打到一輛車摔癣;共享鎖就好比打拼車奴饮,可以有多個客戶一起打到同一輛車。
AQS內(nèi)部結(jié)構(gòu)
我們簡單通過一張圖先來了解下AQS的內(nèi)部結(jié)構(gòu)择浊。其實就是有一個隊列戴卜,這個隊列的頭結(jié)點head代表當(dāng)前正在持有鎖的線程,后續(xù)的其他節(jié)點代表當(dāng)前正在等待的線程琢岩。
接下來我們通過源碼來看看AQS的加鎖和解鎖過程投剥。先來看看獨占鎖是如何進行加解鎖的。
獨占鎖加鎖過程
ReentrantLock lock = new ReentrantLock();
lock.lock();
public void lock() {
// 調(diào)用sync的lock方法
sync.lock();
}
可以看到在ReentrantLock的lock方法中担孔,直接調(diào)用了sync這個AQS子類的lock方法江锨。
final void lock() {
// 獲取鎖
acquire(1);
}
public final void acquire(int arg) {
// 1.先嘗試獲取,如果獲取成功糕篇,則直接返回啄育,代表加鎖成功
if (!tryAcquire(arg) &&
// 2.如果獲取失敗,則調(diào)用addWaiter在等待隊列中增加一個節(jié)點
// 3. 調(diào)用acquireQueued告訴前一個節(jié)點拌消,在解鎖之后喚醒自己,然后線程進入等待狀態(tài)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果在等待過程中被中斷挑豌,則當(dāng)前線程中斷
selfInterrupt();
}
在獲取鎖時,基本可以分為3步:
- 嘗試獲取,如果成功則返回氓英,如果失敗侯勉,執(zhí)行下一步;
- 將當(dāng)前線程放入等待隊列尾部债蓝;
- 標(biāo)記前面等待的線程執(zhí)行完之后喚醒當(dāng)前線程壳鹤。
/**
* 嘗試獲取鎖(公平鎖實現(xiàn))
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取state,初始值為0饰迹,每次加鎖成功會+1芳誓,解鎖成功-1
int c = getState();
// 當(dāng)前沒有線程占用
if (c == 0) {
// 判斷是否有其他線程排隊在本線程之前
if (!hasQueuedPredecessors() &&
// 如果沒有,通過CAS進行加鎖
compareAndSetState(0, acquires)) {
// 將當(dāng)前線程設(shè)置為AQS的獨占線程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當(dāng)前線程是正在獨占的線程(已持有鎖啊鸭,重入)
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// state+1
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) {
// 創(chuàng)建一個當(dāng)前線程的Node節(jié)點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果等待隊列的尾節(jié)點!=null
if (pred != null) {
// 將本線程對應(yīng)節(jié)點的前置節(jié)點設(shè)置為原來的尾節(jié)點
node.prev = pred;
// 通過CAS將本線程節(jié)點設(shè)置為尾節(jié)點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾節(jié)點為空锹淌,或者在CAS時失敗,則通過enq方法重新加入到尾部赠制。(本方法內(nèi)部采用自旋)
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 尾節(jié)點為空赂摆,代表等待隊列還沒有被初始化過
if (t == null) {
// 創(chuàng)建一個空的Node對象,通過CAS賦值給Head節(jié)點钟些,如果失敗烟号,則重新自旋一次,如果成功,將Head節(jié)點賦值給尾節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾節(jié)點不為空的情況,說明等待隊列已經(jīng)被初始化過蟆豫,將當(dāng)前節(jié)點的前置節(jié)點指向尾節(jié)點
node.prev = t;
// 將當(dāng)前節(jié)點CAS賦值給尾節(jié)點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
// 標(biāo)識是否加鎖失敗
boolean failed = true;
try {
// 是否被中斷
boolean interrupted = false;
for (;;) {
// 取出來當(dāng)前節(jié)點的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是head節(jié)點,那么自己就是老二迫筑,這個時候再嘗試獲取一次鎖
if (p == head && tryAcquire(arg)) {
// 如果獲取成功,把當(dāng)前節(jié)點設(shè)置為head節(jié)點
setHead(node);
p.next = null; // help GC
failed = false; // 標(biāo)識加鎖成功
return interrupted;
}
// shouldParkAfterFailedAcquire 檢查并更新前置節(jié)點p的狀態(tài)宗弯,如果node節(jié)點應(yīng)該阻塞就返回true
// 如果返回false脯燃,則自旋一次。
if (shouldParkAfterFailedAcquire(p, node) &&
// 當(dāng)前線程阻塞蒙保,在阻塞被喚醒時辕棚,判斷是否被中斷
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed) // 如果加鎖成功,則取消獲取鎖
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // ws == -1
/*
* 這個節(jié)點已經(jīng)設(shè)置了請求釋放的狀態(tài)邓厕,所以它可以在這里安全park.
*/
return true;
if (ws > 0) {
/*
* 前置節(jié)點被取消了逝嚎,跳過前置節(jié)點重試
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 將前置節(jié)點的狀態(tài)設(shè)置為請求釋放
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
在整個加鎖過程可以通過下圖更清晰的理解。
獨占鎖解鎖過程
public void unlock() {
sync.release(1);
}
同樣解鎖時也是直接調(diào)用AQS子類sync的release方法邑狸。
public final boolean release(int arg) {
// 嘗試解鎖
if (tryRelease(arg)) {
Node h = head;
// 解鎖成功懈糯,如果head!=null并且head.ws不等0涤妒,代表有其他線程排隊
if (h != null && h.waitStatus != 0)
// 喚醒后續(xù)等待的節(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
解鎖過程如下:
- 先嘗試解鎖单雾,解鎖失敗則直接返回false。(理論上不會解鎖失敗,因為正在執(zhí)行解鎖的線程一定是持有鎖的線程)
- 解鎖成功之后硅堆,如果有head節(jié)點并且狀態(tài)不是0屿储,代表有線程被阻塞等待,則喚醒下一個等待的線程渐逃。
protected final boolean tryRelease(int releases) {
// state - 1
int c = getState() - releases;
// 如果當(dāng)前線程不是獨占AQS的線程够掠,但是這時候又來解鎖,這種情況肯定是非法的茄菊。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果狀態(tài)歸零疯潭,代表鎖釋放了,將獨占線程設(shè)置為null
free = true;
setExclusiveOwnerThread(null);
}
// 將減1之后的狀態(tài)設(shè)置為state
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* 如果節(jié)點的ws小于0面殖,將ws設(shè)置為0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 從等待隊列的尾部往前找竖哩,直到第二個節(jié)點,ws<=0的節(jié)點脊僚。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果存在符合條件的節(jié)點相叁,unpark喚醒這個節(jié)點的線程。
if (s != null)
LockSupport.unpark(s.thread);
}
共享鎖加鎖過程
為了實現(xiàn)共享鎖辽幌,AQS中專門有一套和排他鎖不同的實現(xiàn)增淹,我們來看一下源碼具體是怎么做的。
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// tryAcquireShared 嘗試獲取共享鎖許可乌企,如果返回負(fù)數(shù)標(biāo)識獲取失敗
// 返回0表示成功虑润,但是已經(jīng)沒有多余的許可可用,后續(xù)不能再成功逛犹,返回正數(shù)表示后續(xù)請求也可以成功
if (tryAcquireShared(arg) < 0)
// 申請失敗端辱,則加入到共享等待隊列
doAcquireShared(arg);
}
tryAcquireShared嘗試獲取共享許可,本方法需要在子類中進行實現(xiàn)虽画。不同的實現(xiàn)類實現(xiàn)方式不一樣舞蔽。
下面的代碼是ReentrentReadWriteLock中的實現(xiàn)。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 當(dāng)前有獨占線程正在持有許可码撰,并且獨占線程不是當(dāng)前線程渗柿,則返回失敗(-1)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 沒有獨占線程脖岛,或者獨占線程是當(dāng)前線程朵栖。
// 獲取已使用讀鎖的個數(shù)
int r = sharedCount(c);
// 判斷當(dāng)前讀鎖是否應(yīng)該阻塞
if (!readerShouldBlock() &&
// 已使用讀鎖小于最大數(shù)量
r < MAX_COUNT &&
// CAS設(shè)置state,每次加SHARED_UNIT標(biāo)識共享鎖+1
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { // 標(biāo)識第一次加讀鎖
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 重入加讀鎖
firstReaderHoldCount++;
} else {
// 并發(fā)加讀鎖柴梆,記錄當(dāng)前線程的讀的次數(shù)陨溅,HoldCounter中是一個ThreadLocal。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 否則自旋嘗試獲取共享鎖
return fullTryAcquireShared(current);
}
本方法可以總結(jié)為三步:
- 如果有寫線程獨占绍在,則失敗门扇,返回-1
- 沒有寫線程或者當(dāng)前線程就是寫線程重入雹有,則判斷是否讀線程阻塞,如果不用阻塞則CAS將已使用讀鎖個數(shù)+1
- 如果第2步失敗臼寄,失敗原因可能是讀線程應(yīng)該阻塞霸奕,或者讀鎖達(dá)到上限,或者CAS失敗吉拳,則調(diào)用fullTryAcquireShared方法质帅。
private void doAcquireShared(int arg) {
// 加入同步等待隊列,指定是SHARED類型
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 取到當(dāng)前節(jié)點的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是頭節(jié)點留攒,則當(dāng)前節(jié)點是第二個節(jié)點煤惩。
if (p == head) {
// 因為是FIFO隊列,所以當(dāng)前節(jié)點這時可以再嘗試獲取一次。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取成功炼邀,把當(dāng)前節(jié)點設(shè)置為頭節(jié)點盟庞。并且判斷是否需要喚醒后面的等待節(jié)點。
// 如果條件允許汤善,就會喚醒后面的節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果前置節(jié)點不是頭結(jié)點什猖,說明當(dāng)前節(jié)點線程需要阻塞等待,并告知前一個節(jié)點喚醒
// 檢查并更新前置節(jié)點p的狀態(tài)红淡,如果node節(jié)點應(yīng)該阻塞就返回true
// 當(dāng)前線程被喚醒之后不狮,會從parkAndCheckInterrupt()執(zhí)行
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享鎖釋放過程
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//tryReleaseShared()嘗試釋放許可,這個方法在AQS中默認(rèn)拋出一個異常在旱,需要在子類中實現(xiàn)
if (tryReleaseShared(arg)) {
// 喚醒線程摇零,設(shè)置傳播狀態(tài) WS
doReleaseShared();
return true;
}
return false;
}
AQS是很多并發(fā)場景下同步控制的基石,其中的實現(xiàn)相對要復(fù)雜很多桶蝎,還需要多看多琢磨才能完全理解驻仅。本文也是和大家做一個初探,給大家展示了核心的代碼邏輯登渣,希望能有所幫助噪服。
好的,本期內(nèi)容就到這里胜茧,我們下期見粘优。