1.AQS概覽
AbstractQueuedSynchronizer
簡(jiǎn)稱AQS
滨彻,是JUC中實(shí)現(xiàn)并發(fā)的基礎(chǔ)亭饵,ReentrantLock
辜羊、CountDownLatch
八秃、Semaphore
喜德、ReentrantReadWriteLock
底層都是基于AQS
實(shí)現(xiàn)并發(fā)控制的。根據(jù)AQS
字面含義萌衬,其本質(zhì)上是一個(gè)同步隊(duì)列秕豫,主要保存在鎖競(jìng)爭(zhēng)中失敗的線程混移,并在適當(dāng)?shù)臅r(shí)機(jī)喚醒它們歌径,AQS
設(shè)計(jì)成模板方法回铛,獲取鎖的邏輯則交給子類來(lái)實(shí)現(xiàn)腔长。大體流程如下:
本文所討論的獨(dú)占模式與共享模式下鎖的獲取以及釋放過(guò)程是指:獲取鎖失敗在同步隊(duì)列中被阻塞,以及持有鎖的線程在釋放鎖后鸟召,同步隊(duì)列中的阻塞線程被喚醒搶奪鎖的過(guò)程药版,獲取與釋放鎖的實(shí)現(xiàn)都是由子類自己實(shí)現(xiàn)槽片,而同步隊(duì)列只是負(fù)責(zé)保存因搶奪鎖失敗而阻塞的線程和被喚醒后成功搶奪鎖的線程的出隊(duì)操作。
2.AQS重要變量
我們把一個(gè)AQS
對(duì)象叫做synchronization(同步器)剩盒,子類會(huì)在構(gòu)造函數(shù)中初始化出一個(gè)AQS
對(duì)象辽聊。AQS
分為兩種模式:獨(dú)占模式與共享模式跟匆。
- exclusiveOwnerThread:The current owner of exclusive mode synchronization(在獨(dú)占模式中,當(dāng)一個(gè)線程獲得鎖時(shí)迹冤,會(huì)將該線程設(shè)置給該變量保存泡徙,會(huì)在子類實(shí)現(xiàn)獲取鎖時(shí)用到)
- state:The synchronization state(同步器狀態(tài)锋勺,0代表未獲取鎖,1代表獲取到鎖)
進(jìn)入到AQS
中的線程苏章,都會(huì)被分配一個(gè)Node
節(jié)點(diǎn)枫绅,該節(jié)點(diǎn)會(huì)記錄線程thread
并淋,等待狀態(tài)waitStatus
、前驅(qū)節(jié)點(diǎn)prev
以及后繼節(jié)點(diǎn)next
兔毙,等待狀態(tài)會(huì)在喚醒節(jié)點(diǎn)搶奪鎖的邏輯中使用澎剥。在AQS
中Node
是載體,可以直觀地理解為線程就是Node
叙量。
static final class Node {
/*標(biāo)識(shí)共享模式*/
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/*標(biāo)識(shí)獨(dú)占模式*/
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/*線程處于取消狀態(tài)*/
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/*后繼節(jié)點(diǎn)可以被喚醒*/
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/*node節(jié)點(diǎn)狀態(tài)*/
volatile int waitStatus;
/*前驅(qū)節(jié)點(diǎn)*/
volatile Node prev;
/*后繼節(jié)點(diǎn)*/
volatile Node next;
/*當(dāng)前線程*/
volatile Thread thread;
/*標(biāo)識(shí)獨(dú)占模式or共享模式*/
Node nextWaiter;
}
3.獨(dú)占模式下獲取鎖的過(guò)程
我們以ReentrantLock
的非公平鎖實(shí)現(xiàn)為例分析該過(guò)程宛乃,ReentrantLock
調(diào)用lock
方法的時(shí)候,首先會(huì)使用AQS
提供的CAS操作更新狀態(tài)蒸辆,如果更新成功征炼,則將當(dāng)前線程設(shè)置成synchronization
的owner;更新失敗則調(diào)用acquire
操作進(jìn)行入隊(duì)操作躬贡。
final void lock() {
/*cas操作更新synchronization狀態(tài)并*/
if (compareAndSetState(0, 1))
/*如果更新成功則設(shè)置當(dāng)前線程為synchronization的owner*/
setExclusiveOwnerThread(Thread.currentThread());
else
/*更新失敗則調(diào)用該方法進(jìn)行后續(xù)的入隊(duì)操作谆奥,重入鎖邏輯也在此實(shí)現(xiàn)*/
acquire(1);
}
3.1 調(diào)用acquire
方法
acquire
中主要有三個(gè)邏輯
- 調(diào)用子類實(shí)現(xiàn)的
tryAcquire
方法獲取鎖拂玻,獲取鎖失敗則進(jìn)入下面的操作 - 調(diào)用
addWaiter
方法對(duì)當(dāng)前獲取鎖失敗的線程進(jìn)行入隊(duì)操作 - 調(diào)用
acquireQueued
方法對(duì)當(dāng)前線程進(jìn)行阻塞
流程圖如下:
public final void acquire(int arg) {
/*tryAcquire獲取鎖酸些,該邏輯由子類實(shí)現(xiàn)宰译,addWaiter方法將當(dāng)前線程進(jìn)行入隊(duì)操作,acquireQueued將阻塞該線程*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
3.2 調(diào)用addWaiter
方法
addWaiter
方法的主要作用是針對(duì)當(dāng)前線程創(chuàng)建節(jié)點(diǎn)Node
并把節(jié)點(diǎn)插入到隊(duì)尾魄懂,分為以下幾步:
- 1.創(chuàng)建新的node存放當(dāng)前線程沿侈,將新創(chuàng)建節(jié)點(diǎn)的前驅(qū)設(shè)置為tail節(jié)點(diǎn)
- 2.cas操作更新尾節(jié)點(diǎn)
- 3.如果cas操作更新成功則將原來(lái)的尾部節(jié)點(diǎn)的后繼節(jié)點(diǎn)設(shè)置為node,失敗則調(diào)用
enq
方法
需要注意的是addWaiter
方法處于多線程環(huán)境中市栗,只有當(dāng)step2執(zhí)行成功缀拭,才算成功,整體流程圖如下:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
/*創(chuàng)建新的node存放當(dāng)前線程*/
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
/*將新創(chuàng)建節(jié)點(diǎn)的前驅(qū)設(shè)置為tail節(jié)點(diǎn)*/
node.prev = pred;
/*cas操作更新尾節(jié)點(diǎn)*/
if (compareAndSetTail(pred, node)) {
/*原來(lái)的尾部節(jié)點(diǎn)的后繼節(jié)點(diǎn)設(shè)置為node*/
pred.next = node;
return node;
}
}
/*cas操作失敗則進(jìn)入enq方法*/
enq(node);
return node;
}
3.3 調(diào)用enq
方法進(jìn)行入隊(duì)操作
enq
方法依然處于并發(fā)環(huán)境中填帽,也是依靠cas操作來(lái)達(dá)成目的蛛淋,有兩種情況會(huì)進(jìn)入enq
方法,第一種是CLH隊(duì)列中沒有任何節(jié)點(diǎn)篡腌,第一個(gè)進(jìn)入到該隊(duì)列中的線程褐荷,因?yàn)?code>tail==null進(jìn)入到該方法,在enq
中會(huì)創(chuàng)建一個(gè)空的頭節(jié)點(diǎn)嘹悼,并設(shè)置尾節(jié)點(diǎn)與頭節(jié)點(diǎn)相同叛甫,CLH中的頭節(jié)點(diǎn)是不存放任何信息的,只是方便用于遍歷使用绘迁;第二種是入隊(duì)操作失敗的線程合溺,會(huì)在該方法中反復(fù)重試,直到入隊(duì)缀台。
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
/*第一次入隊(duì)棠赛,head和tail還沒有被初始化*/
if (t == null) { // Must initialize
/*cas操作設(shè)置dummyhead,頭節(jié)點(diǎn)是不放置任何信息的*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*與addWaiter方法中入隊(duì)邏輯一致*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.4 調(diào)用acquireQueued
方法完成線程的阻塞
調(diào)用addWaiter
方法后會(huì)返回成功入隊(duì)的節(jié)點(diǎn)作為入?yún)魅?code>acquireQueued膛腐,進(jìn)入該方法會(huì)判斷該節(jié)點(diǎn)是否為位于隊(duì)首的元素即該節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是否為head
節(jié)點(diǎn)睛约,如果位于隊(duì)首則嘗試獲取鎖,獲取成功則設(shè)置新的頭節(jié)點(diǎn)哲身,失敗就進(jìn)入shouldParkAfterFailedAcquire
方法判斷是否調(diào)用LockSupport.park(this)
掛起辩涝,被掛起后,當(dāng)其他節(jié)點(diǎn)喚醒該線程的時(shí)候勘天,又會(huì)重復(fù)此過(guò)程怔揩,喚醒后的操作會(huì)在釋放鎖的章節(jié)詳細(xì)介紹,主要流程圖如下:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
/*獲取入隊(duì)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)*/
final Node p = node.predecessor();
/**
*如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)脯丝,說(shuō)明該節(jié)點(diǎn)位于隊(duì)首商膊,繼續(xù)搶奪鎖,
*如果成功則將該節(jié)點(diǎn)信息清空宠进,設(shè)置成頭節(jié)點(diǎn)
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
/*將前驅(qū)節(jié)點(diǎn)置空晕拆,有助于GC*/
p.next = null; // help GC
failed = false;
return interrupted;
}
/*滿足park條件則調(diào)用LockSupport.park()方法阻塞該線程,等待喚醒*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下來(lái)重點(diǎn)分析下shouldParkAfterFailedAcquire
方法材蹬,該方法的作用是設(shè)置節(jié)點(diǎn)狀態(tài)实幕,設(shè)置的是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)的狀態(tài)吝镣,該邏輯主要是在喚醒的時(shí)候使用,當(dāng)前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且狀態(tài)是SIGNAL
昆庇,則會(huì)通知喚醒當(dāng)前節(jié)點(diǎn)末贾,此處的實(shí)現(xiàn)類比于節(jié)點(diǎn)在入隊(duì)時(shí)會(huì)告知排在自己前面的節(jié)點(diǎn),自己已經(jīng)做好要被喚醒的準(zhǔn)備凰锡,在特定的時(shí)刻喚醒自己未舟,如果在之后出現(xiàn)其他狀態(tài)的變更圈暗,比如更新成為CANCELLED
狀態(tài)掂为,則被取消,不需要再被喚醒员串。
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/*獲取前驅(qū)節(jié)點(diǎn)的waitStatus*/
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/**
*前驅(qū)節(jié)點(diǎn)waitStatus狀態(tài)設(shè)置為SIGNAL后勇哗,當(dāng)前節(jié)點(diǎn)就能安全的被park,
*因?yàn)樵谇膀?qū)節(jié)點(diǎn)釋放鎖的時(shí)候就能保證會(huì)喚醒該節(jié)點(diǎn)
*/
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/*如果前驅(qū)節(jié)點(diǎn)是cancelled狀態(tài)寸齐,則跳過(guò)欲诺,將該節(jié)點(diǎn)剔除隊(duì)列,直到找到非cancel狀態(tài)的節(jié)點(diǎn)*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/*獨(dú)占模式中渺鹦,這里的waitStatus只會(huì)有0這種狀態(tài)扰法,將前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
當(dāng)shouldParkAfterFailedAcquire
返回true
之后,代表該節(jié)點(diǎn)下的線程就能被安全的park毅厚,因此調(diào)用LockSupport.park
方法塞颁。至此,整個(gè)阻塞的過(guò)程就算完成了吸耿。
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
4.獨(dú)占模式下釋放鎖的過(guò)程
以ReentrantLock
中非公平鎖釋放的邏輯為例祠锣,本質(zhì)上是調(diào)用AQS
提供的release
方法
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
4.1 release
方法調(diào)用
release
中釋放鎖的邏輯核心就是unparkSuccessor
,最終喚醒線程的邏輯都是放在unparkSuccessor
中來(lái)實(shí)現(xiàn)的咽安。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
/*tryRelease釋放鎖邏輯由子類自己實(shí)現(xiàn)*/
if (tryRelease(arg)) {
Node h = head;
/**
*確保頭節(jié)點(diǎn)已經(jīng)被初始化并且有節(jié)點(diǎn)做過(guò)入隊(duì)后已經(jīng)設(shè)置過(guò)waitStatus狀態(tài)
*因?yàn)橹挥姓{(diào)用過(guò)shouldParkAfterFailedAcquire之后waitStatus才會(huì)!=0
*h.waitStatus != 0的邏輯主要是排除頭節(jié)點(diǎn)已經(jīng)初始化伴网,但是還沒有
*調(diào)用shouldParkAfterFailedAcquire設(shè)置waitStatus的情況,這樣調(diào)
*用unparkSuccessor時(shí)就沒有滿足條件可以被喚醒的節(jié)點(diǎn)妆棒,因此不再調(diào)用澡腾。
*/
if (h != null && h.waitStatus != 0)
/*調(diào)用unpark方法喚醒后繼節(jié)點(diǎn)*/
unparkSuccessor(h);
return true;
}
return false;
}
4.2 unparkSuccessor
方法調(diào)用
該方法中會(huì)以head節(jié)點(diǎn)作為入?yún)ⅲ紫雀鶕?jù)head節(jié)點(diǎn)獲取到隊(duì)首的元素(很好實(shí)現(xiàn)糕珊,只需要調(diào)用head節(jié)點(diǎn)的next方法)动分,當(dāng)隊(duì)首元素為空或者waitStatus
大于0(即CANCELLED
狀態(tài))的時(shí)候,會(huì)執(zhí)行在隊(duì)列中從后往前遍歷的操作放接,找到距離頭節(jié)點(diǎn)最近且符合喚醒條件(waitStatus == SIGNAL
)的節(jié)點(diǎn)并調(diào)用unpark
方法喚醒該節(jié)點(diǎn)中的線程刺啦。該處有兩個(gè)比較特殊的情況,第一種是隊(duì)列中只有一個(gè)節(jié)點(diǎn)(除頭節(jié)點(diǎn)外)纠脾,因?yàn)闆]有后續(xù)節(jié)點(diǎn)玛瘸,所以waitStatus
值不會(huì)被它的后繼節(jié)點(diǎn)設(shè)置蜕青,因此等于初始值為0,不滿足s == null || s.waitStatus > 0
的條件糊渊,在最后直接被LockSupport.unpark(s.thread)
調(diào)用右核,喚醒;第二種情況渺绒,雖然隊(duì)首沒有節(jié)點(diǎn)(除頭節(jié)點(diǎn)外)贺喝,但還是從隊(duì)尾往前遍歷尋找節(jié)點(diǎn),理想情況是隊(duì)首為空則代表隊(duì)列為空宗兼,沒有遍歷隊(duì)列的必要躏鱼,但是整個(gè)環(huán)境是在多線程的情況下,有可能在判空之后殷绍,又有新的線程進(jìn)入到隊(duì)列之中了染苛。
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
/*先獲取頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),即隊(duì)首節(jié)點(diǎn)*/
Node s = node.next;
/*如果沒有節(jié)點(diǎn)或者該節(jié)點(diǎn)狀態(tài)為cancel狀態(tài)主到,則從隊(duì)尾往前遍歷茶行,找出最前面的符合條件的節(jié)點(diǎn)喚醒*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor
代碼中還有一個(gè)很巧妙的邏輯是遍歷隊(duì)列的時(shí)候是從隊(duì)尾往前遍歷,即使是在判空之后登钥,隊(duì)列中又插入了新的元素畔师,似乎無(wú)論是從前往后還是從后到前遍歷是沒有區(qū)別的,顯然這塊的邏輯是和其他原因有關(guān)系的牧牢,問(wèn)題是出在入隊(duì)的時(shí)候看锉,下面的這段代碼是addWaiter
和enq
中都有的邏輯,目的是向隊(duì)尾中插入新的節(jié)點(diǎn)结执。這個(gè)操作一共由三步完成度陆,它是在并發(fā)環(huán)境中并且不是原子操作,所以會(huì)出現(xiàn)在cas操作成功之后献幔,t.next = node
還沒有完成懂傀,即部分節(jié)點(diǎn)的后繼還沒有被更新,因此雖然節(jié)點(diǎn)已經(jīng)入隊(duì)蜡感,但是從前往后遍歷還是會(huì)出現(xiàn)無(wú)法找到后繼節(jié)點(diǎn)的情況蹬蚁。
//1.更新前驅(qū)節(jié)點(diǎn)
node.prev = t;
//2.cas操作更新尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
//3.更新尾節(jié)點(diǎn)成功后,更新后繼節(jié)點(diǎn)
t.next = node;
return t;
}
4.3 線程喚醒后的操作
當(dāng)在unparkSuccessor
中調(diào)用LockSupport.unpark(s.thread)
后郑兴,之前被阻塞的線程就會(huì)被喚醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
喚醒后首先調(diào)用Thread.interrupted()
判斷該線程是否被中斷過(guò)(因?yàn)樵诰€程被中斷的時(shí)候也會(huì)立馬被喚醒)犀斋,如果被中斷過(guò),acquireQueued
方法中的interrupted
值就為true情连,因此在acquire
方法中叽粹,就會(huì)調(diào)用selfInterrupt
方法對(duì)線程進(jìn)行中斷,Thread.interrupted()
方法的調(diào)用會(huì)導(dǎo)致中斷標(biāo)記被清除,所以需要調(diào)用selfInterrupt
再次中斷該線程虫几。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
這也是為什么ReentrantLock
的lock
方法無(wú)法響應(yīng)中斷的原因锤灿,即使線程因?yàn)橹袛啾粏拘眩^續(xù)從LockSupport.park(this);
處的代碼開始執(zhí)行辆脸,但是如果無(wú)法獲取到鎖但校,依然會(huì)被再次阻塞。
5 共享模式下鎖的獲取
分析共享模式之前啡氢,先展示下兩種模式下方法的對(duì)應(yīng)關(guān)系:
獨(dú)占模式 | 共享模式 |
---|---|
tryAcquire(int arg) | tryAcquireShared(int arg) |
acquire(int arg) | acquireShared(int arg) |
acquireQueued(final Node node, int arg) | doAcquireShared(int arg) |
tryRelease(int arg) | tryReleaseShared(int arg) |
release(int arg) | releaseShared(int arg) |
在共享模式下状囱,鎖可以被多個(gè)線程同時(shí)獲取(Semaphore
的設(shè)計(jì)與實(shí)現(xiàn)就是基于共享模式下的AQS)倘是,因此在獲取鎖以及釋放鎖的時(shí)候都會(huì)喚醒線程去搶奪鎖亭枷。在Node
類中通過(guò)nextWaiter
來(lái)標(biāo)識(shí)共享模式(SHARED
)與獨(dú)占模式(EXCLUSIVE
)下的節(jié)點(diǎn)。acquireShared
方法用于在共享模式下獲取鎖辨绊,其中tryAcquireShared
由子類自己實(shí)現(xiàn)奶栖,當(dāng)tryAcquireShared
失敗時(shí),即沒有成功獲取鎖的時(shí)候门坷,會(huì)調(diào)用doAcquireShared
方法。
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
5.1 調(diào)用doAcquireShared
獲取鎖
該方法的實(shí)現(xiàn)和acquireQueued
非常像袍镀,具體可以參照上文的分析對(duì)比來(lái)看默蚌,doAcquireShared
創(chuàng)建新節(jié)點(diǎn)的時(shí)候指定為Node.SHARED
共享模式,獲取鎖之后調(diào)用setHeadAndPropagate
苇羡,該方法主要設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)并且喚醒同步隊(duì)列中處于共享模式下的節(jié)點(diǎn)绸吸。其中setHeadAndPropagate
會(huì)調(diào)用doReleaseShared
喚醒同步隊(duì)列中的線程,釋放鎖的時(shí)候也會(huì)調(diào)用doReleaseShared
设江,doReleaseShared
統(tǒng)一放在后文分析锦茁。
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
/*設(shè)置節(jié)點(diǎn)為共享模式*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
/*獲取前驅(qū)節(jié)點(diǎn),如果為頭節(jié)點(diǎn)的話叉存,則嘗試獲取鎖*/
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
/*r>=0代表獲取鎖成功码俩,線程被喚醒*/
if (r >= 0) {
/*設(shè)置新的頭節(jié)點(diǎn)并且喚醒同步隊(duì)列中的線程*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
/*檢測(cè)線程是否被中斷過(guò),如果中斷過(guò)則再次調(diào)用中斷邏輯*/
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
6 共享模式下鎖的釋放
6.1 releaseShared
釋放共享模式下的鎖
tryReleaseShared
邏輯由子類實(shí)現(xiàn)歼捏,釋放鎖成功后調(diào)用doReleaseShared
喚醒同步隊(duì)列中的線程稿存。
public final boolean releaseShared(int arg) {
/*tryReleaseShared釋放鎖,由子類實(shí)現(xiàn)*/
if (tryReleaseShared(arg)) {
/*喚醒同步隊(duì)列中的線程*/
doReleaseShared();
return true;
}
return false;
}
6.2 doReleaseShared
喚醒同步隊(duì)列中的線程
成功獲取加鎖和釋放鎖的時(shí)候都會(huì)調(diào)用該方法喚醒處于同步隊(duì)列中隊(duì)首的線程瞳秽,這里其實(shí)是AQS
本身的一種優(yōu)化瓣履,加速喚醒同步隊(duì)列中的元素。對(duì)于一次喚醒操作练俐,可以分解為以下幾步:
- 用戶代碼層面調(diào)用子類釋放鎖代碼
- 釋放鎖成功之后調(diào)用
doReleaseShared
喚醒同步隊(duì)列中的頭節(jié)點(diǎn)袖迎,如果頭節(jié)點(diǎn)無(wú)變化該線程退出,否則繼續(xù)進(jìn)行for循環(huán) - 隊(duì)首節(jié)點(diǎn)被喚醒,如果成功獲取鎖則設(shè)置新的頭節(jié)點(diǎn)(
setHeadAndPropagate
)燕锥,調(diào)用doReleaseShared
方法重復(fù)上述邏輯浴韭,如果未成功獲取到鎖則再次被park
unparkSuccessor
方法在上文中已經(jīng)詳細(xì)分析過(guò)了,重點(diǎn)分析下該方法中加速喚醒線程的邏輯脯宿,主要靠h == head
這段邏輯實(shí)現(xiàn)念颈,例舉一種doReleaseShared
中h == head
不成立的場(chǎng)景剖膳,用戶層面的線程a釋放鎖之后梢为,位于隊(duì)首的線程t1被喚醒,t1調(diào)用setHeadAndPropagate
方法設(shè)置頭節(jié)點(diǎn)為t1裳朋,但還未調(diào)用doReleaseShared
中的unparkSuccessor
方法跺撼,這時(shí)用戶層面的線程b釋放鎖窟感,喚醒位于隊(duì)首的線程t2,t2調(diào)用setHeadAndPropagate
設(shè)置新的頭節(jié)點(diǎn)為t2歉井,這個(gè)時(shí)候t1繼續(xù)執(zhí)行柿祈,最后發(fā)現(xiàn)隊(duì)首元素已經(jīng)變化,繼續(xù)for循環(huán)調(diào)用unparkSuccessor
方法喚醒隊(duì)首元素哩至。流程如下所示:
doReleaseShared
中h == head
不成立時(shí)進(jìn)入for循環(huán)持續(xù)喚醒同步隊(duì)列中線程的邏輯躏嚎,主要是一種加速喚醒的優(yōu)化邏輯,當(dāng)頭節(jié)點(diǎn)發(fā)生變化時(shí)菩貌,說(shuō)明此時(shí)有不止一個(gè)線程釋放鎖卢佣,而在共享模式下,鎖是能夠被不止一個(gè)線程所持有的箭阶,因此應(yīng)該趨向于喚醒更多同步隊(duì)列中的線程來(lái)獲取鎖虚茶。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
/*喚醒同步隊(duì)列隊(duì)首節(jié)點(diǎn)的具體邏輯*/
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
/*如果喚醒其他線程的過(guò)程中,頭節(jié)點(diǎn)未被修改過(guò)仇参,則終止*/
if (h == head) // loop if head changed
break;
}
}
7 總結(jié)
AQS
本質(zhì)上來(lái)說(shuō)就是實(shí)現(xiàn)一個(gè)隊(duì)列的功能嘹叫,因?yàn)閾寠Z鎖失敗的線程都會(huì)被記錄到該隊(duì)列中且被阻塞,在特定的時(shí)候出隊(duì)然后被喚醒诈乒,整個(gè)流程是遵守FIFO
先進(jìn)先出的規(guī)則罩扇。共享模式與獨(dú)占模式的區(qū)別主要是在共享模式下(使用共享模式的子類)是支持多個(gè)線程獲取鎖的,圍繞此目的抓谴,兩者的設(shè)計(jì)會(huì)有所不同暮蹂。