概述
AQS是java concurrent包的基礎(chǔ)曲管,像Lock簇爆、CountDownLatch剔猿、Semaphore等都是基于它實(shí)現(xiàn)的视译;
成員變量
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
static final long spinForTimeoutThreshold = 1000L;
- head:等待隊(duì)列頭部,延遲初始化归敬,直到調(diào)用enq才真正初始化酷含;
- tail:等待隊(duì)列尾部,延遲初始化弄慰,直到調(diào)用enq才真正初始化第美;
- state:AQS狀態(tài)位蝶锋,通過(guò)try*方法維護(hù)陆爽;
- spinForTimeoutThreshold:自旋鎖超時(shí)閥值;
實(shí)際上head是個(gè)空節(jié)點(diǎn),其thread和prev屬性都為null;
Node內(nèi)部類(lèi)
AQS會(huì)將等待線程封裝成Node扳缕,下面看看Node類(lèi)的結(jié)構(gòu):
static final class Node {
static final Node SHARED = new Node();//標(biāo)識(shí)等待節(jié)點(diǎn)處于共享模式
static final Node EXCLUSIVE = null;//標(biāo)識(shí)等待節(jié)點(diǎn)處于獨(dú)占模式
static final int CANCELLED = 1;//由于超時(shí)或中斷慌闭,節(jié)點(diǎn)已被取消
static final int SIGNAL = -1;//表示下一個(gè)節(jié)點(diǎn)是通過(guò)park堵塞的,需要通過(guò)unpark喚醒
static final int CONDITION = -2;//表示線程在等待條件變量(先獲取鎖躯舔,加入到條件等待隊(duì)列驴剔,然后釋放鎖,等待條件變量滿(mǎn)足條件粥庄;只有重新獲取鎖之后才能返回)
static final int PROPAGATE = -3;//表示后續(xù)結(jié)點(diǎn)會(huì)傳播喚醒的操作丧失,共享模式下起作用
//等待狀態(tài):對(duì)于condition節(jié)點(diǎn),初始化為CONDITION惜互;其它情況布讹,默認(rèn)為0,通過(guò)CAS操作原子更新
volatile int waitStatus;
//前節(jié)點(diǎn)
volatile Node prev;
//后節(jié)點(diǎn)
volatile Node next;
//線程對(duì)象
volatile Thread thread;
//對(duì)于Condtion表示下一個(gè)等待條件變量的節(jié)點(diǎn)训堆;其它情況下用于區(qū)分共享模式和獨(dú)占模式描验;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;//判斷是否共享模式
}
//獲取前節(jié)點(diǎn),如果為null坑鱼,拋出異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { //addWaiter方法使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { //Condition使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
可以看到Node中記錄了等待的線程對(duì)象甲葬、節(jié)點(diǎn)狀態(tài)和前后節(jié)點(diǎn),并且通過(guò)nextWaiter判斷是獨(dú)占還是共享模式:
- 獨(dú)占模式:每次只能有一個(gè)線程能持有資源登馒;
- 共享模式:允許多個(gè)線程同時(shí)持有資源;
例如:
- CountDownLatch的await方法可以在多個(gè)線程中調(diào)用,當(dāng)CountDownLatch的計(jì)數(shù)器為0后溉卓,調(diào)用await的方法都會(huì)依次返回。 也就是說(shuō)多個(gè)線程可以同時(shí)等待await方法返回策肝,因此它適合被設(shè)計(jì)成共享模式键思,因?yàn)樗@取的是一個(gè)共享資源,資源在所有調(diào)用await方法的線程間共享阐枣;
- ReentrantLock提供了lock和unlock方法马靠,只允許一個(gè)線程獲得鎖奄抽,因此它適合被設(shè)計(jì)成獨(dú)占模式,因?yàn)樗@取的是一個(gè)獨(dú)占資源甩鳄,資源不能在調(diào)用lock方法的線程間共享;
- Semaphore維護(hù)了一組許可逞度,acquire方法獲取許可,如果有可用的許可妙啃,方法返回档泽,否則block;可用看到,acquire獲取到也是一個(gè)共享資源揖赴,只不過(guò)資源的數(shù)量有限制馆匿,因此它適合被設(shè)計(jì)成共享模式;
- ReentrantReadWriteLock提供了讀寫(xiě)鎖燥滑,寫(xiě)操作是獨(dú)占的渐北,讀操作是可以彼此共享的,因此它同時(shí)使用了獨(dú)占和共享模式铭拧;
抽象方法說(shuō)明
AbstractQueuedSynchronizer是個(gè)抽象類(lèi)赃蛛,部分方法并未實(shí)現(xiàn),子類(lèi)可以根據(jù)實(shí)際情況實(shí)現(xiàn)全部或部分方法:
//非堵塞獲取獨(dú)占資源,true表示成功
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
//非堵塞釋放獨(dú)占資源,true表示成功
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//非堵塞獲取共享資源搀菩,負(fù)數(shù)表示失敗呕臂,0表示成功但不需要向后傳播,大于0表示成功且可以向后傳播
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//非堵塞釋放共享資源,true表示成功
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//在排它模式下,狀態(tài)是否被占用
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
可以看到這些方法主要包括兩組:獨(dú)占方法和共享方法肪跋;一般而言歧蒋,子類(lèi)只需要實(shí)現(xiàn)其中一個(gè)模式即可,因此AQS并沒(méi)有將這些方法定義為抽象的州既;
獨(dú)占模式
acquire
獨(dú)占模式獲取資源;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 調(diào)用tryAcquire谜洽,如果返回false,表示獲取資源失敗,要進(jìn)行排隊(duì)獲纫滋摇褥琐;
- 調(diào)用addWaiter,創(chuàng)建獨(dú)占模式Node,并加入到等待隊(duì)列的尾部;
- 調(diào)用acquireQueued方法,按照線程加入隊(duì)列的順序獲取資源;
- 如果acquireQueued返回true,表示發(fā)生中斷晤郑,因此通過(guò)selfInterrupt中斷當(dāng)前線程敌呈;
注意:acquire方法會(huì)忽略中斷,當(dāng)中斷發(fā)生時(shí)造寝,并不會(huì)馬上退出磕洪;
上面的第2步調(diào)用了addWaiter方法,方法實(shí)現(xiàn)如下:
private Node addWaiter(Node mode) {
//根據(jù)傳入的模式(獨(dú)占o(jì)r共享)創(chuàng)建Node對(duì)象诫龙;
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果pred不為空析显,說(shuō)明有線程在等待
//嘗試使用CAS入列,如果入列失敗签赃,則調(diào)用enq采用自旋的方式入列
//該邏輯在無(wú)競(jìng)爭(zhēng)的情況下才會(huì)成功谷异,快速入列
if (pred != null) {
//所謂的入列分尸,就是將節(jié)點(diǎn)設(shè)置為新的tail節(jié)點(diǎn)
//注意:有可能設(shè)置node的前節(jié)點(diǎn)成功,但是CAS更新失敶踵凇箩绍;
//這種情況下,由于無(wú)法從head或tail找到節(jié)點(diǎn)尺上,問(wèn)題不大材蛛;
//但是對(duì)于isOnSyncQueue這種方法,則會(huì)造成影響怎抛,需要特殊處理
node.prev = pred;
if (compareAndSetTail(pred, node)) {//通過(guò)CAS更新tail節(jié)點(diǎn),關(guān)于CAS卑吭,后面會(huì)專(zhuān)門(mén)寫(xiě)篇文章介紹
//將原tail節(jié)點(diǎn)的后節(jié)點(diǎn)設(shè)置為新tail節(jié)點(diǎn)
//由于CAS和設(shè)置next不是原子操作,因此可能出現(xiàn)更新tail節(jié)點(diǎn)成功马绝,但是未執(zhí)行pred.next = node豆赏,導(dǎo)致無(wú)法從head遍歷節(jié)點(diǎn);
//但是由于前面已經(jīng)設(shè)置了prev屬性迹淌,因此可以從尾部遍歷;
//像getSharedQueuedThreads河绽、getExclusiveQueuedThreads都是從尾部開(kāi)始遍歷
pred.next = node;
return node;
}
}
enq(node);//通過(guò)自旋入列
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;//記錄尾節(jié)點(diǎn)
if (t == null) { //由于采用lazy initialize,當(dāng)隊(duì)列為空時(shí),需要進(jìn)行初始化
//通過(guò)CAS設(shè)置head和tail節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;//將node的前節(jié)點(diǎn)設(shè)置為原tail節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {//CAS更新tail節(jié)點(diǎn)唉窃,更新成功則將原tail節(jié)點(diǎn)的后節(jié)點(diǎn)設(shè)置為node,返回原tail節(jié)點(diǎn)纹笼,入列成功纹份;
t.next = node;
return t;
}
}
}
}
可以看到即使存在多線程競(jìng)爭(zhēng),例如線程1通過(guò)compareAndSetHead初始化了head和tail節(jié)點(diǎn)廷痘,線程2此時(shí)運(yùn)行到**if (t == null) **蔓涧,發(fā)現(xiàn)判斷成立,通過(guò)CAS更新head節(jié)點(diǎn)笋额,此時(shí)會(huì)更新失敗元暴,繼續(xù)下一循環(huán);直到線程1執(zhí)行完tail=head兄猩,線程2才會(huì)進(jìn)入else邏輯茉盏,節(jié)點(diǎn)入列;可以看到:
- head節(jié)點(diǎn)實(shí)際上是個(gè)空節(jié)點(diǎn);
- head節(jié)點(diǎn)是通過(guò)new Node()創(chuàng)建枢冤,因此waitStatus==0;
- 新入列的節(jié)點(diǎn)是通過(guò)Node(Thread thread, Node mode)創(chuàng)建鸠姨,waitStatus==0;
下面繼續(xù)看acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;//未發(fā)生中斷
//仍然通過(guò)自旋,根據(jù)前面的邏輯淹真,此處傳入的為新入列的節(jié)點(diǎn)
for (;;) {
final Node p = node.predecessor();//獲取前節(jié)點(diǎn)讶迁,即prev指向節(jié)點(diǎn)
//如果node的前一節(jié)點(diǎn)為head節(jié)點(diǎn),而head節(jié)點(diǎn)為空節(jié)點(diǎn),說(shuō)明node是等待隊(duì)列里排在最前面的節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {
//獲取資源成功核蘸,將node設(shè)置為頭節(jié)點(diǎn),setHead清空節(jié)點(diǎn)屬性thread,prev
setHead(node);
p.next = null; // 將原頭節(jié)點(diǎn)的next設(shè)為null,幫助GC
failed = false;
return interrupted;//返回是否發(fā)生中斷
}
//如果acquire失敗巍糯,是否要park,如果是則調(diào)用LockSupport.park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//發(fā)生中斷
}
} finally {
if (failed)//只有循環(huán)中出現(xiàn)異常啸驯,才會(huì)進(jìn)入該邏輯
cancelAcquire(node);
}
}
acquireQueued調(diào)用了shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,這兩個(gè)方法分別是干什么的呢祟峦?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//如果acquireQueued第一次調(diào)用該方法坯汤,ws==0
int ws = pred.waitStatus;
//已經(jīng)設(shè)置了狀態(tài),由于SIGNAL表示要通過(guò)unpark喚醒后一節(jié)點(diǎn)搀愧,因此當(dāng)獲取失敗時(shí)惰聂,是要調(diào)用park堵塞的,返回true
if (ws == Node.SIGNAL)
return true;
//如果前一節(jié)點(diǎn)已取消咱筛,則往前找搓幌,直到找到一個(gè)狀態(tài)正常的節(jié)點(diǎn),其實(shí)就是從隊(duì)列刪除取消狀態(tài)的節(jié)點(diǎn)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;//更新next指針迅箩,去掉中間取消狀態(tài)的節(jié)點(diǎn)
} else {//更新pred節(jié)點(diǎn)的waitStatus為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;//返回false,表示不需要調(diào)用park
}
private final boolean parkAndCheckInterrupt() {
//將當(dāng)前線程的parkBlocker變量指向this溉愁,調(diào)用unsafe.park堵塞當(dāng)前線程
//簡(jiǎn)單來(lái)說(shuō)park是申請(qǐng)?jiān)S可,如果存在許可饲趋,馬上返回拐揭,否則一直等待獲得許可;unpark是將許可數(shù)量設(shè)為1,會(huì)喚醒park返回;
//LockSupport提供了unpark(Thread thread)方法奕塑,可以為指定線程頒發(fā)許可
//如果想更多了解堂污,請(qǐng)閱讀《Java如何實(shí)現(xiàn)線程堵塞》這篇文章
LockSupport.park(this);
return Thread.interrupted();//注意:該方法會(huì)清除線程的中斷狀態(tài)
}
可以看到對(duì)于等待隊(duì)列中的節(jié)點(diǎn),shouldParkAfterFailedAcquire會(huì)將前節(jié)點(diǎn)的狀態(tài)改為Node.SIGNAL;接著在下一次循環(huán)中調(diào)用parkAndCheckInterrupt堵塞線程
最后看看cancelAcquire方法:
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
//獲取node的前向節(jié)點(diǎn)
Node pred = node.prev;
//如果發(fā)現(xiàn)前向節(jié)點(diǎn)狀態(tài)為CANCELLED,則繼續(xù)向前找龄砰,直到找到狀態(tài)正常的節(jié)點(diǎn)
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;//節(jié)點(diǎn)狀態(tài)設(shè)為CANCELLED
//如果node為tail節(jié)點(diǎn)盟猖,則將pred更新為tail節(jié)點(diǎn)
if (node == tail && compareAndSetTail(node, pred)) {
//由于pred為新的尾節(jié)點(diǎn),因此將其next設(shè)為null
compareAndSetNext(pred, predNext, null);
} else {//如果node不是尾節(jié)點(diǎn)
int ws;
//當(dāng)滿(mǎn)足下面三個(gè)條件换棚,將pred的next指向node的下一節(jié)點(diǎn):
//1.pred不是head節(jié)點(diǎn):如果pred為頭節(jié)點(diǎn)式镐,而node又被cancel,則node.next為等待隊(duì)列中的第一個(gè)節(jié)點(diǎn),需要unpark喚醒
//2.pred節(jié)點(diǎn)狀態(tài)為SIGNAL或能更新為SIGNAL
//3.pred的thread變量不能為null
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
//更新pred的next,指向node的下一節(jié)點(diǎn)
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);//如果pred為頭節(jié)點(diǎn)固蚤,則喚醒node的后節(jié)點(diǎn)
}
node.next = node; // help GC
}
}
unparkSuccessor方法喚醒下一節(jié)點(diǎn):
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//如果節(jié)點(diǎn)為空或者被取消了娘汞,則從隊(duì)列尾部開(kāi)始查找,找到離node最近的非null且狀態(tài)正常的節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//取出找到節(jié)點(diǎn)的線程對(duì)象夕玩,通過(guò)unpark你弦,頒發(fā)許可;
if (s != null)
LockSupport.unpark(s.thread);
}
acquireInterruptibly
該方法和acquire類(lèi)似,只不過(guò)發(fā)生中斷時(shí)风秤,會(huì)拋出InterruptedException鳖目;
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//unpark喚醒第一個(gè)等待節(jié)點(diǎn)
return true;
}
return false;
}
可以看到邏輯很簡(jiǎn)單,從等待隊(duì)列中取出第一個(gè)等待的節(jié)點(diǎn)缤弦,通過(guò)unparkSuccessor調(diào)用unpark釋放資源领迈;
共享模式方法
acquireShared
acquireShared用于共享模式下獲取資源,該方法會(huì)忽略中斷:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared方法AQS未實(shí)現(xiàn),留待子類(lèi)實(shí)現(xiàn)狸捅;主要看看doAcquireShared的邏輯:
private void doAcquireShared(int arg) {
//創(chuàng)建共享節(jié)點(diǎn)并加入到等待隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//如果是第一個(gè)等待節(jié)點(diǎn)衷蜓,則調(diào)用tryAcquireShared
//負(fù)數(shù)表示失敗,0表示成功當(dāng)無(wú)法傳播,1表示成功且可傳播
int r = tryAcquireShared(arg);
if (r >= 0) {
//設(shè)置head節(jié)點(diǎn)尘喝,檢查下一個(gè)等待節(jié)點(diǎn)是否是共享模式,如果是磁浇,向下傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();//恢復(fù)中斷狀態(tài)
failed = false;
return;
}
}
//前面已經(jīng)有介紹
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到acquire調(diào)用的是setHead,而acquireShared調(diào)用的是** setHeadAndPropagate**:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);//前見(jiàn)面的分析
//1.傳入propagate>0
//2.head為null:什么情況會(huì)變?yōu)閚ull?
//3. 之前操作已經(jīng)設(shè)置了后續(xù)節(jié)點(diǎn)需要喚醒
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {//如果等待隊(duì)列中有等待線程
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//需要unpark后續(xù)節(jié)點(diǎn)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
從上面的分析可以知道,獨(dú)占模式和共享模式的最大區(qū)別在于獨(dú)占模式只允許一個(gè)線程持有資源朽褪,而共享模式下置吓,當(dāng)調(diào)用doAcquireShared時(shí),會(huì)看后續(xù)的節(jié)點(diǎn)是否是共享模式缔赠,如果是衍锚,會(huì)通過(guò)unpark喚醒后續(xù)節(jié)點(diǎn);
從前面的分析可以知道嗤堰,被喚醒的節(jié)點(diǎn)是被堵塞在doAcquireShared的parkAndCheckInterrupt方法戴质,因此喚醒之后,會(huì)再次調(diào)用setHeadAndPropagate,從而將等待共享鎖的線程都喚醒踢匣,也就是說(shuō)會(huì)將喚醒傳播下去告匠;
- 加入同步隊(duì)列并阻塞的節(jié)點(diǎn),它的前驅(qū)節(jié)點(diǎn)只會(huì)是SIGNAL离唬,表示前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)后专,后繼節(jié)點(diǎn)會(huì)被喚醒。shouldParkAfterFailedAcquire()方法保證了這點(diǎn)男娄,如果前驅(qū)節(jié)點(diǎn)不是SIGNAL,它會(huì)把它修改成SIGNAL行贪。
- 造成前驅(qū)節(jié)點(diǎn)是PROPAGATE的情況是前驅(qū)節(jié)點(diǎn)獲得鎖時(shí),會(huì)喚醒一次后繼節(jié)點(diǎn)模闲,但這時(shí)候后繼節(jié)點(diǎn)還沒(méi)有加入到同步隊(duì)列,所以暫時(shí)把節(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE,當(dāng)后繼節(jié)點(diǎn)加入同步隊(duì)列后崭捍,會(huì)把PROPAGATE設(shè)置為SIGNAL,這樣前驅(qū)節(jié)點(diǎn)釋放鎖時(shí)會(huì)再次doReleaseShared尸折,這時(shí)候它的狀態(tài)已經(jīng)是SIGNAL了,就可以喚醒后續(xù)節(jié)點(diǎn)了殷蛇。
舉例說(shuō)明:例如讀寫(xiě)鎖实夹,寫(xiě)讀操作和寫(xiě)寫(xiě)操作互斥,讀讀之間不互斥粒梦;當(dāng)調(diào)用acquireShared獲取讀鎖時(shí)亮航,會(huì)檢查后續(xù)節(jié)點(diǎn)是否是獲取讀鎖,如果是匀们,則同樣釋放缴淋;
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared會(huì)喚醒第一個(gè)等待節(jié)點(diǎn), 根據(jù)前面acquireShared的邏輯,被喚醒的線程會(huì)通過(guò)setHeadAndPropagate繼續(xù)喚醒后續(xù)等待的線程重抖。