這里粘出的源碼基于JDK1.8
部分方法的源碼不是全部源碼偏形,刪除掉了部分注釋以及對(duì)主要邏輯沒(méi)有影響的內(nèi)容。
貼出源碼目的主要是為了根據(jù)源碼講清邏輯塑崖。
主要內(nèi)容包括:
- AQS簡(jiǎn)介
- CLH同步隊(duì)列
- 同步狀態(tài)的獲取與釋放
AQS簡(jiǎn)介
Java的內(nèi)置鎖(synchronized
關(guān)鍵字)一直都是備受爭(zhēng)議的苟跪,在1.6之前這個(gè)重量級(jí)鎖性能較為低下,雖然在1.6之后在基礎(chǔ)層面進(jìn)行了大量?jī)?yōu)化套耕,但與Lock相比還是存在一些缺陷:
synch缺少獲取鎖與釋放鎖的可操作性,可中斷峡继,超時(shí)獲取鎖冯袍,且它為獨(dú)占式的
為此Java提供了 Lock 組件,解決了synch關(guān)鍵字存在的問(wèn)題碾牌,引入了更靈活的鎖操作方式康愤。
但是在Lock底層(以及絕大多數(shù)JUC同步組件的底層)是由一個(gè)非常關(guān)鍵由非常重要的組件支持的,這個(gè)組件就是AQS舶吗。
AQS
AQS AbstractQueuedSynchronizer
征冷,即隊(duì)列同步器,它是JUC并發(fā)包的核心組件誓琼。
它是構(gòu)建JUC同步組件的基礎(chǔ)框架检激,JUC并發(fā)包的作者(Dong Lea)期望它可以為實(shí)現(xiàn)大部分同步需求提供同步器。
AQS解決了實(shí)現(xiàn)同步器時(shí)涉及的大量細(xì)節(jié)實(shí)現(xiàn)問(wèn)題腹侣,例如:獲取同步狀態(tài)呵扛,F(xiàn)IFO隊(duì)列】鸫基于AQS來(lái)構(gòu)建同步器可以帶來(lái)很多好處。不僅可以極大地減少實(shí)現(xiàn)工作缤灵,而且也不必處理在多個(gè)位置上發(fā)生的競(jìng)爭(zhēng)問(wèn)題伦籍。
AQS的主要使用方式是繼承蓝晒,子類(lèi)通過(guò)繼承AQS并實(shí)現(xiàn)它的抽象方法,來(lái)管理同步狀態(tài)帖鸦。
AQS核心變量
1芝薇、volatile int state
AQS使用一個(gè) volatile 修飾的 int 變量來(lái)表示同步狀態(tài),當(dāng) state>0 時(shí)作儿,表示已經(jīng)獲取到了鎖洛二,當(dāng) state=0 時(shí),表示釋放了鎖攻锰。
它提供了三個(gè)方法:
getState()
seState(int newState)
compareAndSetState(int expect, int update)
這三個(gè)方法用于對(duì)同步狀態(tài)state進(jìn)行操作晾嘶,當(dāng)然,AQS可以確保對(duì)state操作的安全性娶吞。
2垒迂、FIFO同步隊(duì)列
AQS通過(guò)內(nèi)置的FIFO同步隊(duì)列,來(lái)完成資源獲取線(xiàn)程的排隊(duì)工作妒蛇。
如果當(dāng)前線(xiàn)程獲取同步狀態(tài)失敗時(shí)机断,AQS會(huì)將當(dāng)前線(xiàn)程以及等待狀態(tài)等信息,構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列
同時(shí)绣夺,會(huì)阻塞當(dāng)前線(xiàn)程吏奸,當(dāng)同步狀態(tài)釋放時(shí),則會(huì)把節(jié)點(diǎn)中的線(xiàn)程喚醒陶耍,使其再次嘗試獲取同步狀態(tài)奋蔚。
AQS提供的方法
getState()
:返回同步狀態(tài)的當(dāng)前值
setState(int newState)
:設(shè)置當(dāng)前同步狀態(tài)
compareAndSetState(int expect, int update)
:使用CAS的方式設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原子性物臂。
tryAcquire(int arg)
:獨(dú)占式獲取同步狀態(tài)旺拉,獲取同步狀態(tài)成功后,其他線(xiàn)程需要等待該線(xiàn)程釋放同步狀態(tài)之后棵磷,才能獲取同步狀態(tài)蛾狗。
tryRelease(int arg)
:獨(dú)占式釋放同步狀態(tài)
tryAcquireShared(int arg)
:共享式獲取同步狀態(tài),返回值大于0表示獲取成功仪媒,否則獲取失敗沉桌。
tryReleaseShared()
:共享式釋放to怒狀態(tài)
isHeldExclusively()
:當(dāng)前同步器是否在獨(dú)占模式下被線(xiàn)程占用,一般該方法表示是否被當(dāng)前線(xiàn)程獨(dú)占
acquire(int arg)
:在方法內(nèi)部調(diào)用可重寫(xiě)的tryAcquire(int arg)方法算吩,獨(dú)占式獲取同步轉(zhuǎn)臺(tái)留凭,如果當(dāng)前線(xiàn)程獲取同步狀態(tài)成功,則由該方法返回偎巢,否則將會(huì)進(jìn)入同步隊(duì)列等待蔼夜。
acquireInterruptibly(int arg)
:與 acquire(int arg)相同,但是該方法響應(yīng)中斷压昼。當(dāng)前線(xiàn)程未獲取到同步狀態(tài)而進(jìn)入到同步隊(duì)列中求冷,如果當(dāng)前線(xiàn)程被中斷瘤运,則該方法會(huì)拋出 InterruptedException 異常并返回。
tryAcquireNanos(int arg, long nanos)
:可超時(shí)的獲取同步狀態(tài)匠题,如果當(dāng)前線(xiàn)程在 nanos 時(shí)間內(nèi)沒(méi)獲取到同步狀態(tài)拯坟,那么將會(huì)返回false,如果獲取到了將返回true韭山。
acquireShared(int arg)
:共享式獲取同步狀態(tài)郁季,如果當(dāng)前線(xiàn)程未獲取到同步狀態(tài),將會(huì)進(jìn)入同步隊(duì)列等待钱磅。與獨(dú)占式的主要區(qū)別是在同一時(shí)刻可以有多個(gè)下城獲取到同步狀態(tài)梦裂。
acquireSharedInturruptibly(int arg)
:共享式獲取同步狀態(tài),響應(yīng)中斷续搀。
tryAcquireSharedNanos(int arg, long nanosTimeout)
:共享式獲取同步狀態(tài)塞琼,增加超時(shí)限制。
release(int arg)
:獨(dú)占式釋放同步狀態(tài)禁舷,該方法會(huì)在釋放同步狀態(tài)之后彪杉,將同步隊(duì)列中第一個(gè)節(jié)點(diǎn)包含的線(xiàn)程喚醒。
releaseShared(int arg)
:共享式釋放同步狀態(tài)
CLH同步隊(duì)列
關(guān)于CLH鎖的相關(guān)原理牵咙,可以看文末的參考資料派近,這里就不詳細(xì)說(shuō)了。
在上面講到洁桌,AQS維護(hù)著一個(gè)FIFO隊(duì)列渴丸,這個(gè)隊(duì)列就是CLH隊(duì)列。
CLH同步隊(duì)列是一個(gè)FIFO雙向隊(duì)列另凌,AQS依賴(lài)它谱轨,來(lái)完成同步狀態(tài)的管理。
- 當(dāng)線(xiàn)程獲取同步狀態(tài)失敗時(shí)吠谢,AQS會(huì)將當(dāng)前線(xiàn)程以及等待狀態(tài)信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入到CLH同步隊(duì)列土童,同時(shí)會(huì)阻塞該線(xiàn)程。
- 當(dāng)同步狀態(tài)釋放時(shí)工坊,會(huì)吧節(jié)點(diǎn)喚醒(公平鎖)献汗,使其再次嘗試獲取同步狀態(tài)
CLH Node
CLH等待隊(duì)列,是一個(gè)以CLH鎖為變量的隊(duì)列王污。CLH鎖通常用于自旋鎖罢吃,這里用來(lái)阻塞同步器,同時(shí)在這個(gè)Node里保存了一個(gè)線(xiàn)程的相關(guān)控制信息昭齐。
在CLH同步隊(duì)列中尿招,一個(gè)節(jié)點(diǎn)表示一個(gè)線(xiàn)程,保存信息包括:
- 線(xiàn)程的引用(thread)
- 狀態(tài)(waitStatus)
- 前驅(qū)節(jié)點(diǎn)(prev)
- 后繼節(jié)點(diǎn)(next)
定義如下:
static final class Node {
/** 標(biāo)記一個(gè)Node節(jié)點(diǎn)處于共享模式 */
static final Node SHARED = new Node();
/** 標(biāo)記一個(gè)Node節(jié)點(diǎn)處于獨(dú)占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
/** 因?yàn)槌瑫r(shí)或者中斷,節(jié)點(diǎn)會(huì)被設(shè)置為取消狀態(tài)泊业,被取消的節(jié)點(diǎn)不會(huì)參與到競(jìng)爭(zhēng)中把沼,該節(jié)點(diǎn)會(huì)一直保持取消不會(huì)轉(zhuǎn)變?yōu)槠渌麪顟B(tài) */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/** 后繼節(jié)點(diǎn)的線(xiàn)程處于等待狀態(tài),而當(dāng)前節(jié)點(diǎn)的線(xiàn)程如果釋放了同步狀態(tài)或被取消吁伺,將通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)得以運(yùn)行 */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
/** 節(jié)點(diǎn)在等待隊(duì)列中租谈,節(jié)點(diǎn)線(xiàn)程等待在Condition上篮奄,當(dāng)其他線(xiàn)程對(duì)Condition調(diào)用了singal()后,該節(jié)點(diǎn)將會(huì)從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中割去,加入到同步狀態(tài)的獲取中 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
* 表示下一次共享式同步狀態(tài)將會(huì)無(wú)條件的傳播下去
*/
static final int PROPAGATE = -3;
/**
* 當(dāng)前Node的等待狀態(tài)窟却。初始值為0,表示不屬于以上任何狀態(tài)
*/
volatile int waitStatus;
/**
* 前驅(qū)節(jié)點(diǎn)
*/
volatile Node prev;
/**
* 后繼節(jié)點(diǎn)
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
* 獲取同步狀態(tài)的線(xiàn)程
*/
volatile Thread thread;
/**
* 指向下一個(gè)等待中Condition的Node節(jié)點(diǎn)呻逆,或者 waitStatus為SHARED的節(jié)點(diǎn)
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驅(qū)節(jié)點(diǎn)
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
CLH同步隊(duì)列
入列
addWaiter方法:
先快速?lài)L試設(shè)置尾節(jié)點(diǎn)夸赫,如果失敗,則調(diào)用enq(Node node)方法設(shè)置尾節(jié)點(diǎn)咖城。
在源碼中茬腿,兩個(gè)方法都是通過(guò)一個(gè) CAS 方法 cmopareAndSetTail(Node expect, Node update)
來(lái)設(shè)置尾節(jié)點(diǎn),該方法可以確保節(jié)點(diǎn)是線(xiàn)程安全添加的宜雀。
在 enq(Node node)
方法中切平,AQS通過(guò) 自旋 的方式來(lái)保證節(jié)點(diǎn)可以正確添加,只有成功添加后辐董,當(dāng)前線(xiàn)程才會(huì)從改方法返回悴品,否則一直自旋重試。
過(guò)程如下:
源碼如下::
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
// 新建node
Node node = new Node(Thread.currentThread(), mode);
// 快速?lài)L試添加尾節(jié)點(diǎn)简烘,失敗的話(huà)苔严,調(diào)用enq來(lái)添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// 死循環(huán)自旋
for (;;) {
Node t = tail;
// 如果tail 不在,則設(shè)置為首節(jié)點(diǎn)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
出列
CLH同步隊(duì)列遵循FIFO孤澎,首節(jié)點(diǎn)的線(xiàn)程釋放同步狀態(tài)后届氢,將會(huì)喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)亥至,將自己設(shè)置為首節(jié)點(diǎn)悼沈。
這個(gè)過(guò)程非常簡(jiǎn)單,head執(zhí)行該節(jié)點(diǎn)并斷開(kāi)原來(lái)首節(jié)點(diǎn)的next 和 當(dāng)前節(jié)點(diǎn)的 prev 即可姐扮,注意在這個(gè)過(guò)程是不需要CAS來(lái)保證的絮供,因?yàn)橹挥幸粋€(gè)線(xiàn)程能夠成功獲取到同步狀態(tài)。
過(guò)程圖如下:
同步狀態(tài)的獲取與釋放
自定義的子類(lèi)使用AQS提供的模板方法可以實(shí)現(xiàn)自己的同步語(yǔ)義茶敏。
AQS提供了大量的模板方法來(lái)實(shí)現(xiàn)同步壤靶,主要分為三類(lèi):
1、獨(dú)占式獲取和釋放同步狀態(tài)
2惊搏、共享式獲取和釋放同步狀態(tài)
3贮乳、查詢(xún)同步隊(duì)列中的等待線(xiàn)程情況
獨(dú)占式
同一時(shí)刻僅有一個(gè)線(xiàn)程持有同步狀態(tài)
獨(dú)占式同步狀態(tài)獲取
aquire(int arg)
方法為AQS中提供的模板方法忧换,該方法為獨(dú)占式獲取同步狀態(tài),會(huì)忽略中斷向拆,也就是說(shuō)亚茬,由于線(xiàn)程獲取同步狀態(tài)失敗加入到CLH同步隊(duì)列中,后續(xù)對(duì)線(xiàn)程進(jìn)行中斷操作時(shí)浓恳,線(xiàn)程不會(huì)從同步隊(duì)列中移除刹缝。
各個(gè)方法定義如下:
1、tryAcquire:嘗試獲取鎖颈将,獲取成功則設(shè)置鎖狀態(tài)并且返回true梢夯,否則返回false。該方法由自定義同步組件繼承AQS后實(shí)現(xiàn)晴圾,該方法必須要保證線(xiàn)程安全的獲取同步狀態(tài)颂砸。
2、addWaiter:如果tryAcquire返回false(也就是獲取同步鎖失斔酪Α)人乓,則調(diào)用該方法,將當(dāng)前線(xiàn)程加入到CLH同步隊(duì)列的隊(duì)尾知允。
3撒蟀、acquiredQueued:當(dāng)前線(xiàn)程會(huì)根據(jù)公平性原則來(lái)進(jìn)行阻塞等待(自旋),直到獲取鎖成功為止温鸽。并且返回當(dāng)前線(xiàn)程在等待過(guò)程中有沒(méi)有中斷過(guò)保屯。
4、selfInterrupt:自己產(chǎn)生一個(gè)中斷涤垫。
acquireQueued
方法為一個(gè)自旋的過(guò)程姑尺,也就是說(shuō),當(dāng)前線(xiàn)程節(jié)點(diǎn)對(duì)象(Node)進(jìn)入同步隊(duì)列后蝠猬,就會(huì)進(jìn)入下一個(gè)自旋的工程切蟋,每個(gè)節(jié)點(diǎn)都會(huì)自省地觀(guān)察,當(dāng)條件滿(mǎn)足榆芦,獲取到同步狀態(tài)后柄粹,就可以從這個(gè)自旋中退出,否則一直執(zhí)行下去匆绣。
從代碼中可以看到驻右,當(dāng)前線(xiàn)程會(huì)一直嘗試獲取同步狀態(tài),當(dāng)然前提是只有其前驅(qū)節(jié)點(diǎn)為頭結(jié)點(diǎn)時(shí)崎淳,才可以嘗試獲取同步狀態(tài)堪夭。理由:
1、保持FIFO同步隊(duì)列原則。
2森爽、頭結(jié)點(diǎn)釋放同步狀態(tài)后恨豁,將會(huì)喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)被喚醒后于需要檢查自己是否為頭節(jié)點(diǎn)爬迟。
acquire方法流程圖:
源碼如下:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中斷標(biāo)記位
boolean interrupted = false;
// 死循環(huán)自旋
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)
final Node p = node.predecessor();
// 如果前驅(qū)是頭節(jié)點(diǎn)橘蜜,并且tryAcquire成功,則設(shè)置后返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果失敗雕旨,則判斷線(xiàn)程是否需要等待扮匠,并且根據(jù)結(jié)果判斷是否進(jìn)入等待。
// 判斷有中斷后凡涩,設(shè)置interruped為true,并會(huì)在方法結(jié)束返回
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
獨(dú)占式獲取響應(yīng)中斷
AQS提供的 acquire(int arg)
方法以獨(dú)占方式獲取同步狀態(tài)疹蛉,但是會(huì)忽略中斷活箕,對(duì)線(xiàn)程進(jìn)行中斷操作后,線(xiàn)程依然會(huì)位于CLH同步隊(duì)列中等待獲取同步狀態(tài)可款。
為此育韩,AQS提供了 acquireInterruptibly(int arg)
方法,該方法等待獲取同步狀態(tài)時(shí)闺鲸,如果當(dāng)前線(xiàn)程被中斷了筋讨,會(huì)立刻響應(yīng)中斷拋出異常 IntteruptedException
方法內(nèi)部會(huì)首先校驗(yàn)該線(xiàn)程是否已經(jīng)被中斷,如果是摸恍,則直接拋出中斷異常悉罕,否則,執(zhí)行 tryAcquire(int arg)
方法嘗試獲取同步狀態(tài)立镶。
如果成功壁袄,則直接返回,否則執(zhí)行 doAcquireInterruptibly(int arg)
媚媒。
doAcquireInterruptibly(int arg)
方法 與 acquire(int arg)
僅有兩處差別:
1嗜逻、方法聲明會(huì)拋出 InterruptException
異常。
2缭召、在中斷方法處不再使用 interrupted 標(biāo)志栈顷,而是直接拋出 InterruptedException
異常。
源碼如下:
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// checkInterrupted 后嵌巷,直接拋出異常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
TODO
(如果有什么錯(cuò)誤或者建議萄凤,歡迎留言指出)
(本文內(nèi)容是對(duì)各個(gè)知識(shí)點(diǎn)的轉(zhuǎn)載整理,用于個(gè)人技術(shù)沉淀晴竞,以及大家學(xué)習(xí)交流用)
參考資料:
【死磕Java并發(fā)】—–J.U.C之AQS:AQS簡(jiǎn)介
【死磕Java并發(fā)】—–J.U.C之AQS:CLH同步隊(duì)列
【死磕Java并發(fā)】—–J.U.C之AQS:同步狀態(tài)的獲取與釋放
【死磕Java并發(fā)】—–J.U.C之AQS:阻塞和喚醒線(xiàn)程
源碼:ThreadPoolExecutor(JDK1.8)