1. 前言
AbstractQueuedSynchronizer(AQS)基于FIFO等待隊(duì)列以及CAS操作實(shí)現(xiàn)了基礎(chǔ)了同步框架,JUC中包括ReentrantLock,ReentrantReadWriteLock迂卢,CountDownLatch,Condition都是基于AbstractQueuedSynchronizer提供的基礎(chǔ)同步操作來實(shí)現(xiàn)的。
2. AbstractQueuedSynchronizer實(shí)現(xiàn)原理
2. 1 CLH隊(duì)列
CLH(代表Craig, Landin, Hagersten三人)隊(duì)列是一個(gè)FIFO的隊(duì)列瘦穆,這個(gè)隊(duì)列用來對資源競爭者(不同的線程就是一個(gè)競爭者)進(jìn)行排隊(duì)棚点。
2.1.1 隊(duì)列節(jié)點(diǎn)Node
AQS實(shí)現(xiàn)中隊(duì)列中一個(gè)節(jié)點(diǎn)即一個(gè)競爭者该镣,用類Node祠墅,Node類有如下主要成員:
成員 | 含義 | 取值 |
---|---|---|
waitStatus | 當(dāng)前節(jié)點(diǎn)狀態(tài) | SIGNAL=-1 表示當(dāng)前節(jié)點(diǎn)釋放資源后需要喚醒他的后繼阻塞中的節(jié)點(diǎn); CANCELLED=1當(dāng)前節(jié)點(diǎn)已經(jīng)取消對資源的競爭; PROPAGATE=-3 用于競爭共享資源競爭侮穿,表示當(dāng)前節(jié)點(diǎn)獲取資源后,應(yīng)該喚醒其他排隊(duì)阻塞中獲取該共享資源的競爭者; CONDITION=-2專門用于Condition的實(shí)現(xiàn) |
Node prev,next | 當(dāng)前節(jié)點(diǎn)的前驅(qū)和后繼節(jié)點(diǎn) | -- |
Thread thread | 當(dāng)前排隊(duì)的線程毁嗦,也就是競爭資源的線程 | -- |
Node nextWaiter | 如果waitStatus=CONDITION則nextWaiter連接到下一個(gè)等待在同一個(gè)condition上的node亲茅;nextWaiter也可以是class NODE的靜態(tài)成員SHARED表示當(dāng)前節(jié)點(diǎn)競爭共享資源;等于null表示競爭排他性資源 |
1. 節(jié)點(diǎn)種類
根據(jù)上表中nextWaiter可以看出狗准,實(shí)際上node的種類有三種:
- exclusive node(表示當(dāng)前節(jié)點(diǎn)申請排他新資源)
- shared node(表示當(dāng)前節(jié)點(diǎn)申請的是共享資源)
- condition克锣, 當(dāng)前節(jié)點(diǎn)等待條件變量
不同的node種類獲取資源(對應(yīng)入隊(duì)操作)和釋放資源(對應(yīng)出隊(duì)操作)都有所不同
2.2.2 入隊(duì)操作
也就是去競爭資源。
這里只介紹一下exclusive和shared node的入隊(duì)操作
2.2.2.1 exclusive node入隊(duì)操作
使用AQS競爭排他性資源時(shí)腔长,需要調(diào)用如下兩個(gè)接口之一:
/*
調(diào)用acquire獲取資源袭祟,這是一個(gè)阻塞操作直到獲取到資源,且不可被中斷捞附。
*/
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
和acquire不同的是競爭者等待期間是可以被中斷
*/
public final void acquireInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
兩者都調(diào)用了tryAcquire(), 這個(gè)方法需要使用AQS的用戶繼承并實(shí)現(xiàn)它巾乳,并且這個(gè)調(diào)用不應(yīng)該阻塞,之所以提供tryAcquire操作是因?yàn)楫?dāng)前競爭者可能已經(jīng)獲取了資源鸟召,那么在tryAcquire可以做一下判斷胆绊,避免已經(jīng)獲取了資源后還去盲目的進(jìn)去CLH隊(duì)列排隊(duì)。
典型的就是重進(jìn)入鎖ReentrantLock的實(shí)現(xiàn)欧募,獲取鎖的線程可以再次獲取鎖压状,那么就可以在tryAcquire中做一下判斷。
關(guān)于參數(shù)arg跟继,可以理解成嘗試獲取資源數(shù)量吧种冬,實(shí)際上這個(gè)跟AQS的一個(gè)重要的成員state有關(guān),當(dāng)用戶使用AQS時(shí)舔糖,需要設(shè)置state這個(gè)整數(shù)值娱两,關(guān)于這個(gè)state,應(yīng)該可以當(dāng)成是對資源狀態(tài)的描述吧金吗。比如ReenctrantLock中可以根據(jù)state是否等于0判斷是否加鎖了谷婆,以及state的值大小判斷重進(jìn)入鎖加了幾次鎖。
上面代碼addWaiter()其實(shí)就是不停的使用CAS操作將當(dāng)前節(jié)點(diǎn)加入CLH隊(duì)列尾部辽聊。
2. acquireQueued()
下面是acquireQueued的代碼實(shí)現(xiàn):
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
// 線程中斷標(biāo)記
boolean interrupted = false;
for (;;) {
// 獲得當(dāng)前節(jié)點(diǎn)的前驅(qū)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)已經(jīng)是隊(duì)列頭,那就意味著看起來自己最優(yōu)先獲取到資源期贫,然后就調(diào)用tryAcquire嘗試獲取資源跟匆。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 當(dāng)前節(jié)點(diǎn)不是頭節(jié)點(diǎn)則排隊(duì)阻塞。
// shouldParkAfterFailedAcquire會(huì)將前驅(qū)節(jié)點(diǎn)的waitStatus標(biāo)記成SIGNAL表示告訴前驅(qū)節(jié)點(diǎn):你釋放資源后需要喚醒我通砍,我可能阻塞了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果排隊(duì)過程中被中斷玛臂,記下來烤蜕,不甩異常
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上可以看出acquireQueued其實(shí)就是在不停的做如下嘗試:
- 判斷當(dāng)前節(jié)點(diǎn)是不是最前面了,是則調(diào)用tryAcquire嘗試獲取資源迹冤,獲取成功就返回讽营,獲取失敗或者不是最前面節(jié)點(diǎn)則阻塞等待,然后到2
- 無限等待然后被喚醒泡徙,繼續(xù)1.
2. doAcquireInterruptibly()
和acquireQueued的過程是一樣的橱鹏,就是等待的過程中被中斷了立即甩異常
2.2.2.2 shared node入隊(duì)操作
和exelusive模式一樣都有可中斷和不可中斷兩個(gè),如下:
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared()需要使用AQS使用者繼承實(shí)現(xiàn)堪藐,不過和tryAcquire不一樣的是它返回一個(gè)整數(shù)莉兰,小于0表示嘗試失敗了。
1. doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 和exclusive模式下acquireQueued整體流程是一樣的礁竞,
// 不同的是在這里糖荒,成功獲取到競爭資源后,調(diào)用的這個(gè)方法模捂。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關(guān)于上面setHeadAndPropagate做了什么捶朵,先通過下圖描述一下為什么會(huì)有這個(gè)方法的調(diào)用吧.
假設(shè)現(xiàn)在排隊(duì)隊(duì)列如下:
----- ------ ----- -----
| E1 | ->| S1 | -> | S2 | ->| S3 |
----- ------ ----- ----
節(jié)點(diǎn)E1是頭節(jié)點(diǎn),exclusively持有資源狂男, S1, S2, S3都是在申請共享訪問資源综看,然后因?yàn)镋1已經(jīng)持有了,所以都阻塞了并淋。
然后E1釋放資源了寓搬,所以他會(huì)喚醒阻塞中的S1,S1拿到了資源县耽,但是由于S1是共享模式去拿資源句喷,S2,S3也都是去申請共享資源兔毙,所以S1應(yīng)該去同樣喚醒S2唾琼,S3一起去分享資源。 這也就是setHeadAndPropagate
干的事情--喚醒S2, S2醒了喚醒S3澎剥。
2.2.3 出隊(duì)操作
也就是釋放資源.
2.2.3.1 exclusive node釋放資源
釋放資源通過如下方法實(shí)現(xiàn):
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease和tryAcquire一樣锡溯,是需要使用者繼承并實(shí)現(xiàn)的,返回true表示資源完全被釋放(也就是需要喚醒后面等待的node)哑姚,tryRelease同樣需要結(jié)合資源狀態(tài)state去判斷祭饭,且應(yīng)該是一個(gè)非阻塞的調(diào)用。
下面unparkSuccessor喚醒后繼等待者的實(shí)現(xiàn):
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后繼節(jié)點(diǎn)等于null并不能代表沒有后繼節(jié)點(diǎn)叙量,原因跟入隊(duì)addWaiter的實(shí)現(xiàn)有關(guān).
// 調(diào)用addWaiter入隊(duì)newNode時(shí)先用CAS將tail設(shè)置成newNode倡蝙,然后再將之前的tail.next設(shè)置成newNode,
// 所以這中間的時(shí)間間隔中有tail但是之前的tail還沒有完成和newNode連接绞佩,也就是之前的tail的后繼是null
// 而s.waitStatus >0 則表示這個(gè)節(jié)點(diǎn)狀態(tài)為CACELLED取消競爭資源寺鸥,應(yīng)該再找它后面的節(jié)點(diǎn)
// for循環(huán)里從tail往前遍歷找到最前面的沒有CANCELLED的節(jié)點(diǎn)猪钮,然后喚醒它。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒找到的節(jié)點(diǎn)對應(yīng)的線程胆建。
LockSupport.unpark(s.thread);
}
2.2.3.2 shared node釋放資源
調(diào)用如下方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared需要使用AQS的繼承并實(shí)現(xiàn)它烤低,返回true表示成功釋放,意味著需要喚醒后續(xù)等待者笆载。
關(guān)于doReleaseShared就是喚醒后繼所有等待的節(jié)點(diǎn)扑馁,被喚醒的節(jié)點(diǎn)又會(huì)在前面介紹的各種doAcquireXXX中醒來去嘗試獲取資源,獲取不到的又會(huì)等待宰译。