一油航、前言
抽象隊(duì)列同步器蒙畴,是一個(gè)抽象類受楼,它采用了模板方法的設(shè)計(jì)模式《Java設(shè)計(jì)模式二:模板模式》转唉。同時(shí)命迈,它也內(nèi)置了 CLH 隊(duì)列《小白十:CLH》贩绕,讓加入的線程自旋,當(dāng)該線程(節(jié)點(diǎn))的前驅(qū)節(jié)點(diǎn)拿到鎖時(shí)壶愤,該線程就進(jìn)入阻塞淑倾,等待前驅(qū)節(jié)點(diǎn)的喚醒,而線程的喚醒方式則是通過線程中斷的方式來實(shí)現(xiàn)的征椒,關(guān)于線程中斷娇哆,大家可以詳看:《小白十一:線程中斷》。
既然 AQS 是抽象類勃救,所以碍讨,本文只分析核心流程,遇到需要子類實(shí)現(xiàn)的方法時(shí)蒙秒,我們先跳過勃黍,到時(shí)候再結(jié)合具體的子類我們進(jìn)行整體分析。
二晕讲、獨(dú)占與共享
我們?cè)趯W(xué)習(xí) CLH 的時(shí)候覆获,就講到了一個(gè)節(jié)點(diǎn)可能是獨(dú)占模式,也可能是共享模式瓢省,但對(duì)于 AQS 來說弄息,但凡沒有獲取到鎖的線程,都要被加入到 AQS 的 CLH 隊(duì)列中勤婚,因此摹量,AQS 并不是特別關(guān)心獲取鎖的線程是哪種模式,我們?cè)谑褂媚0逶O(shè)計(jì)模式時(shí)蛔六,也需要有這種思想荆永。
雖然 AQS 不關(guān)心,但它給出了一組可選的方法需要子類來重載:
- 如果子類是獨(dú)占模式国章,則只需要重載并實(shí)現(xiàn)獨(dú)占模式相關(guān)的方法(比如:寫鎖)具钥;
- 如果子類 是共享模式,則只需要重載并實(shí)現(xiàn)共享模式相關(guān)的方法(比如:讀鎖)液兽;
- 那如果都實(shí)現(xiàn)了呢骂删?這種鎖就是上面兩種的結(jié)合(讀寫鎖)掌动;
這段注釋說明中,我們可以了解到幾點(diǎn):
- 獲取狀態(tài): getState宁玫;
- 修改狀態(tài):setState / compareAndSetState粗恢;
- 另外5個(gè)方法需要我們根據(jù)實(shí)際情況下選擇性重載;
- 可選性的方法實(shí)現(xiàn)時(shí)欧瘪,需要保證內(nèi)部線程安全眷射,且快速以及不被阻塞;
- AQS 中的其它方法不允許被重載佛掖,因?yàn)槭?final 的妖碉;
因此,AQS 提供了幾個(gè)可選的方法需要實(shí)現(xiàn):
private static final class LockSync extends AbstractQueuedSynchronizer {
protected LockSync() {
super();
}
@Override
protected boolean isHeldExclusively() {
return super.isHeldExclusively();
}
@Override
protected boolean tryAcquire(int arg) {
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
return super.tryRelease(arg);
}
@Override
protected int tryAcquireShared(int arg) {
return super.tryAcquireShared(arg);
}
@Override
protected boolean tryReleaseShared(int arg) {
return super.tryReleaseShared(arg);
}
}
我們來分析一下這幾個(gè)方法的含義:
- isHeldExclusively:該線程是否正在獨(dú)占資源芥被。只有用到Condition才需要去實(shí)現(xiàn)它欧宜;
- tryAcquire:獨(dú)占方式。arg為獲取鎖的次數(shù)拴魄,嘗試獲取資源冗茸,成功則返回True,失敗則返回False匹中;
- tryRelease:獨(dú)占方式夏漱。arg為釋放鎖的次數(shù),嘗試釋放資源顶捷,成功則返回True麻蹋,失敗則返回False;
- tryAcquireShared:共享方式焊切。arg為獲取鎖的次數(shù),嘗試獲取資源芳室;
- 負(fù)數(shù)表示失斪ǚ尽;
- 0表示成功堪侯,但沒有剩余可用資源嚎尤;
- 正數(shù)表示成功,且有剩余資源伍宦;
- tryReleaseShared:共享方式芽死。arg為釋放鎖的次數(shù),嘗試釋放資源次洼,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回True关贵,否則返回False;
根據(jù)上面這些卖毁,我們之后會(huì)分析到的(提前劇透):
- ReentrantLock:是獨(dú)占鎖揖曾,所以實(shí)現(xiàn)了 tryAcquire + tryRelease + isHeldExclusively;
- ReentrantReadWriteLock:是共享鎖,所以都實(shí)現(xiàn)了炭剪;
三练链、AQS 核心流程
我們的 AQS 內(nèi)部采用 CLH 隊(duì)列來管理所有需要獲取鎖而獲取失敗,需要加入到隊(duì)列中奴拦,本地自旋來等待鎖的釋放媒鼓。
因此,流程的起點(diǎn)是:
- 阻塞的線程從『嘗試獲取鎖』開始错妖;
- 占用中的線程在使用完后绿鸣,從『嘗試釋放鎖』開始;
這也是符合『鎖』的流程:加鎖 or 解鎖站玄!
我們來看個(gè)獨(dú)占鎖的請(qǐng)求鎖和釋放鎖的流程:
// 獲取鎖流程
Acquire:
while (!tryAcquire(arg)) {
// enqueue thread if it is not already queued;
// possibly block current thread;
}
// 釋放鎖流程
Release:
if (tryRelease(arg)) {
// unblock the first queued thread;
}
(共享模式與上面的類似枚驻,只是調(diào)用的方法不同;且因?yàn)槭枪蚕砟J街昕酰?dāng)一個(gè)鎖釋放時(shí)再登,會(huì)引起一堆線程的競(jìng)爭(zhēng))
3.1、再談 CLH 的公平與非公平性
不知道大家是否還記得我在《小白十》中分析 CLH 時(shí)晾剖,通過文字的描述方式和大家說過了锉矢,CLH 本身是公平的(嚴(yán)格的FIFO),但是齿尽,之所以存在不公平的競(jìng)爭(zhēng)沽损,原因在于:新來的線程是先入隊(duì)再請(qǐng)求鎖,還是先請(qǐng)求鎖且失敗后再入隊(duì)列循头。JDK 源碼中注釋也說了這一點(diǎn):
原注釋也說了:
因?yàn)樵谌腙?duì)之前可以先嘗試一下獲取鎖绵估,因此,新來的線程就有機(jī)會(huì)與隊(duì)列中被阻塞的首元素競(jìng)爭(zhēng)卡骂。當(dāng)然国裳,我們也可以在 tryAcquire 或 tryAcquireShared 方法中來禁止這種不公平的競(jìng)爭(zhēng)行為。一般大多數(shù)公平同步器會(huì)根據(jù) AQS 中的 hasQueuedPredecessors 這個(gè)方法的返回來返回結(jié)果全跨。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
該方法會(huì)在之后詳細(xì)展開分析缝左,現(xiàn)簡(jiǎn)單分析返回結(jié)果;返回 true 表明隊(duì)列中有正在排隊(duì)或正在競(jìng)爭(zhēng)的線程浓若;返回 false 表明隊(duì)列可能為空(或者當(dāng)前競(jìng)爭(zhēng)的線程就是自己)渺杉。
所以,tryAcquire 實(shí)現(xiàn)公平性挪钓,僅依賴 hasQueuedPredecessors 的返回結(jié)果即可是越,簡(jiǎn)單來說,代碼可如下實(shí)現(xiàn):
protected boolean tryAcquire(int arg) {
return !hasQueuedPredecessors();
}
即:新來的線程想要獲取鎖時(shí)碌上,先獲取 hasQueuedPredecessors 的結(jié)果英妓,如果 hasQueuedPredecessors 返回 true挽放,表明隊(duì)列有排隊(duì)的元素,那么 tryAcquire 就返回 false 蔓纠,表明獲取鎖失敿琛!
3.2腿倚、獲取鎖流程
3.2.1纯出、AQS.acquire
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
該方法是獨(dú)占模式調(diào)用的:
- tryAcquire:先嘗試獲取鎖;(由子類來決定是否公平)
- addWaiter:如果不成功敷燎,則封裝成 CLH 的一個(gè)節(jié)點(diǎn)暂筝,然后加入隊(duì)列;
- acquireQueued:等待線程原地自旋硬贯,自旋一段時(shí)間后進(jìn)入阻塞焕襟;
- selfInterrupt:線程中斷來喚醒線程(park后被中斷是沒作用的,因此饭豹,unpark后再中斷一下)鸵赖;
3.2.2、AQS.addWaiter(重點(diǎn):入隊(duì))
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Creates and enqueues node for current thread and given mode.
*/
private Node addWaiter(Node mode) {
// 封裝為一個(gè)獨(dú)占模式的 Node
Node node = new Node(Thread.currentThread(), mode);
// 獲取 tail 尾節(jié)點(diǎn)
Node pred = tail;
// tail != null拄衰,pred 就是 tail 節(jié)點(diǎn)
if (pred != null) {
// 新節(jié)點(diǎn).prev = pred (tail節(jié)點(diǎn) )
node.prev = pred;
// CAS它褪,直接設(shè)置 tail = node
if (compareAndSetTail(pred, node)) {
// pred (tail節(jié)點(diǎn)).next = node
pred.next = node;
return node;
}
}
// tail = null,表明隊(duì)列還未初始化
// 需要先初始化 head & tail
enq(node);
// 返回新的節(jié)點(diǎn)
return node;
}
}
- 先封裝為一個(gè)獨(dú)占模式的 Node翘悉;
- 快速獲取 tail 尾節(jié)點(diǎn)茫打;
- 如果尾節(jié)點(diǎn)為 null,則直接調(diào)用 enq 入隊(duì)妖混;
- 如果尾節(jié)點(diǎn)不為 null:
- 則將新的節(jié)點(diǎn) prev 指向 tail 節(jié)點(diǎn)老赤;
- 然后 CAS 直接設(shè)置新的節(jié)點(diǎn)為 tail 節(jié)點(diǎn);
- 原 tail 節(jié)點(diǎn)的 next 指向新 tail 節(jié)點(diǎn)制市;
3.2.3诗越、AQS.enq
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
}
該方法主要是用來初始化 CLH 隊(duì)列頭結(jié)點(diǎn),然后再插入新的節(jié)點(diǎn)息堂。
我們來分析下:
- 該方法是一個(gè)無限循環(huán);
- 獲取 tail 尾節(jié)點(diǎn)块促;
- 如果尾節(jié)點(diǎn)為 null荣堰,則 head = tail = null;
- 構(gòu)造 head 結(jié)點(diǎn)竭翠,且 CAS 設(shè)置 head = tail = new Node()振坚;
- tail 不為空,則執(zhí)行和 addWaiter 中加入節(jié)點(diǎn)一樣的過程斋扰;
- 加入成功則返回新節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)渡八;
3.2.4啃洋、AQS.acquireQueued(重點(diǎn):自旋 -> park阻塞 -> 拿到鎖返回)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 重點(diǎn):自旋
for (;;) {
// 獲取 node 的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),且獲取到鎖屎鳍,才會(huì)退出該循環(huán)
// 因此宏娄,這里我們也要注意一點(diǎn):
// 即便前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),此時(shí)該線程也只是嘗試競(jìng)爭(zhēng)獲取鎖逮壁,而
// 在非公平模式下孵坚,還是有可能失敗使其繼續(xù)自旋!
if (p == head && tryAcquire(arg)) {
// 將該節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 該線程也不能一直自旋消耗CPU是吧窥淆?所以卖宠,自旋到了一定時(shí)候,就要進(jìn)入阻塞狀態(tài)忧饭,
// 等待被喚醒扛伍,如果下面的兩個(gè)條件成立,則表示線程進(jìn)入阻塞狀態(tài)词裤,待之后前驅(qū)節(jié)點(diǎn)
// 釋放鎖后刺洒,通過中斷來喚醒該線程
if (shouldParkAfterFailedAcquire(p, node) && // 判斷前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為 Node.SIGNAL
// 若 shouldParkAfterFailedAcquire = true 才會(huì)走到該方法
// 該方法就是 park 線程進(jìn)入阻塞狀態(tài),一旦park成功亚斋,就被掛起作媚,
// 掛起中的線程是無法響應(yīng)線程中斷的,因此在 unpark 后檢查一下
// 線程是否曾經(jīng)被中斷過帅刊,返回線程中斷的狀態(tài)
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待過程中沒有成功獲取資源(如timeout纸泡,或者可中斷的情況下被中斷了),
// 那么取消結(jié)點(diǎn)在隊(duì)列中的等待
if (failed)
cancelAcquire(node);
}
}
}
parkAndCheckInterrupt 線程掛起后就不會(huì)再繼續(xù)執(zhí)行后續(xù)代碼赖瞒!直到被前驅(qū)節(jié)點(diǎn)釋放鎖時(shí) unpark 才行女揭!并且,在掛起過程中栏饮,線程中斷也無法響應(yīng)吧兔!
3.2.5、AQS.shouldParkAfterFailedAcquire
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅(qū)節(jié)點(diǎn)已經(jīng)被設(shè)置為該狀態(tài):即該節(jié)點(diǎn)線程釋放鎖時(shí)會(huì)通知下一個(gè)節(jié)點(diǎn)
*/
return true;
if (ws > 0) {
/*
* 該線程取消了
*/
do {
// 不斷向前查找節(jié)點(diǎn)袍嬉,直到節(jié)點(diǎn)的狀態(tài) <= 0
// 這行代碼分解為兩行:
// pred = pred.prev; // 獲取前驅(qū)節(jié)點(diǎn)境蔼;
// node.prev = pred; // 新節(jié)點(diǎn)的prev指向新的前驅(qū)節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 新的前驅(qū)節(jié)點(diǎn)的next = 新的節(jié)點(diǎn)
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
重點(diǎn)是 compareAndSetWaitStatus,一旦前驅(qū)節(jié)點(diǎn)的狀態(tài)被設(shè)置為 SIGNAL伺通,那么 node 的下一次自旋就會(huì)進(jìn)入阻塞狀態(tài)箍土,因?yàn)椋乱淮窝h(huán)進(jìn)入該方法時(shí)罐监,前驅(qū)節(jié)點(diǎn)的 waitStatus 狀態(tài)為 SIGNAL吴藻。
3.2.6、AQS.parkAndCheckInterrupt(重點(diǎn):掛起)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private final boolean parkAndCheckInterrupt() {
// 調(diào)用 park 使線程進(jìn)入 waiting 狀態(tài)
LockSupport.park(this); // -----------------掛起弓柱!unpark后才會(huì)繼續(xù)執(zhí)行下一行代碼
// 返回線程是否被中斷過沟堡,并清除中斷狀態(tài)侧但!
return Thread.interrupted();
}
}
該方法很簡(jiǎn)單,直接讓線程進(jìn)入阻塞狀態(tài)航罗,然后禀横,返回是否曾經(jīng)被中斷過。
至此伤哺,整個(gè)獲取鎖的流程就分析完了燕侠,我們來小結(jié)下:
- tryAcquire嘗試獲取鎖;
- 獲取失敗立莉,addWaiter入隊(duì)绢彤;
- acquireQueued 自旋、檢查or設(shè)置前驅(qū)節(jié)點(diǎn) waitStatus = SIGNAL蜓耻;
- 然后 park 掛起茫舶,掛起中不響應(yīng)中斷;
- 等待前驅(qū)節(jié)點(diǎn)釋放時(shí)來 unpark (下面要分析的)刹淌;
- unpark 后的下一次循環(huán)開始進(jìn)入鎖的競(jìng)爭(zhēng)(因?yàn)?unpark 后饶氏,其前驅(qū)是 head 節(jié)點(diǎn));
- 如果失敗有勾,再次掛起疹启,重復(fù) 5,6步驟蔼卡;
- 如果成功喊崖,則把自己設(shè)置為 head 節(jié)點(diǎn),返回是否曾經(jīng)被中斷過雇逞;
我們可以看到 acquire 是整個(gè)獲取鎖的入口荤懂,一旦沒有成功,該方法也會(huì)被阻塞塘砸,直接該調(diào)用者(即線程)拿到鎖节仿,或者超時(shí)、取消等其它原因才會(huì)繼續(xù)執(zhí)行完掉蔬;整個(gè)過程其實(shí)非常的簡(jiǎn)單廊宪,只是有些細(xì)節(jié),大家需要注意女轿,凡是我標(biāo)注的重點(diǎn)箭启,大家都應(yīng)該仔細(xì)看看其中的注釋,細(xì)細(xì)品味各種可能性谈喳。
3.3、釋放鎖流程
我們分析完了獲取鎖的流程戈泼,并且也提到了一點(diǎn)點(diǎn) unpark 婿禽,所以赏僧,我們的釋放鎖的流程會(huì)非常的容易理解,流程也非常的短小扭倾。獲取鎖的入口是『acquire』淀零,而釋放鎖的入口則是『release』;對(duì)于共享模式膛壹,釋放鎖的模式則是『releaseShared』驾中。
3.3.1、AQS.release
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
該方法很簡(jiǎn)單模聋,調(diào)用子類『tryRelease』肩民,具體由子類實(shí)現(xiàn),我們直接跳過链方,只研究成功的情況持痰。當(dāng) tryRelease 成功釋放鎖,則拿到當(dāng)前 head 節(jié)點(diǎn)祟蚀,如果 head.waitStatus != 0 工窍,則去 unpark;那 unpark 誰呢前酿?當(dāng)然是 unpark 下一個(gè)節(jié)點(diǎn)嘛患雏。
3.3.2、unparkSuccessor
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
private void unparkSuccessor(Node node) {
// 先嘗試將自己的 waitStatus 置為 0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取下一個(gè)節(jié)點(diǎn)罢维,并嘗試去喚醒它淹仑!
Node s = node.next;
// 如果下一個(gè)節(jié)點(diǎn)不存在,或者已被 cancelled
if (s == null || s.waitStatus > 0) {
s = null; // 置該節(jié)點(diǎn)為null
// 從后向前遍歷言津,直到 t = null or t == node 時(shí)結(jié)束
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// unpark攻人!取消線程的掛起
LockSupport.unpark(s.thread);
}
}
整個(gè)解鎖流程就分析完了,對(duì)悬槽,就這么簡(jiǎn)單怀吻!
四、小結(jié)
AQS 咋一看初婆,方法很多蓬坡,但我們只關(guān)注主流程,其實(shí)就幾個(gè)函數(shù)磅叛,也非常簡(jiǎn)單屑咳;希望大家通過學(xué)習(xí)并掌握主流程之后,再去了解其它方法弊琴,就不會(huì)有任何壓力了兆龙。AQS 分析完了,接下來敲董,我將會(huì)帶領(lǐng)大家開始分析我們常用的一些子類紫皇,如:ReentrantLock慰安、ReadWriteLock等。