總體介紹
基于隊(duì)列的抽象同步器报慕,它是jdk中所有顯示的線程同步工具的基礎(chǔ)深浮,像ReentrantLock/DelayQueue/CountdownLatch等等,都是借助AQS實(shí)現(xiàn)的眠冈。Java中已經(jīng)有了synchronized關(guān)鍵字飞苇,那么為什么還需要來(lái)這么一出呢?因?yàn)锳QS能實(shí)現(xiàn)更多維度蜗顽,更多場(chǎng)景的鎖機(jī)制布卡,例如共享鎖(讀鎖)/基于條件的線程阻塞/可以實(shí)現(xiàn)公平和非公平的鎖策略/可以實(shí)現(xiàn)鎖等待的中斷,而synchronized關(guān)鍵字由JVM實(shí)現(xiàn)雇盖,在代碼使用層面來(lái)說(shuō)忿等,如果僅僅是使用獨(dú)占鎖,那synchronized關(guān)鍵字比其它的鎖實(shí)現(xiàn)用起來(lái)方便崔挖。
下面步入正題贸街,來(lái)看看AQS都提供了哪些能力。
1虚汛、主體結(jié)構(gòu)
先來(lái)看看內(nèi)部的主體結(jié)構(gòu):
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
//內(nèi)部類(lèi)Node模捂,代表每一個(gè)獲取或者釋放鎖的線程
static final class Node{}
//head 和 tail構(gòu)成了同步隊(duì)列鏈表的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)舅逸。
private transient volatile Node head;
private transient volatile Node tail;
//同步狀態(tài)值桶蛔,所有的同步行為都是通過(guò)state這個(gè)共享資源來(lái)實(shí)現(xiàn)的桶错。
private volatile int state;
//條件對(duì)象,用于同步在當(dāng)前鎖對(duì)象上的線程
public class ConditionObject implements Condition, java.io.Serializable{}
/******
一系列的內(nèi)部方法:
包括嘗試獲取鎖權(quán)力
嘗試獲取鎖失敗后構(gòu)造節(jié)點(diǎn)放入等待隊(duì)列
各種入隊(duì)出隊(duì)的操作
嘗試釋放鎖等
********/
//一系列的Unsafe操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
}
所以總結(jié)起來(lái)AQS就是通過(guò)一系列的Unsafe方法去操作一個(gè)鏈表隊(duì)列将谊,而鏈表隊(duì)列中每個(gè)節(jié)點(diǎn)需要操作的共享資源就是一個(gè)int state字段冷溶,如果要用到條件等待,則需要了解ConditionObject尊浓。
2逞频、節(jié)點(diǎn)類(lèi)的構(gòu)造
再來(lái)看看Node節(jié)點(diǎn)的內(nèi)部結(jié)構(gòu):
static final class Node {
//常量,見(jiàn)nextWaiter屬性
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//waitStatus 常量值栋齿。表示等待獲取鎖的線程已經(jīng)被取消(線程中斷/等待超時(shí))
static final int CANCELLED = 1;
//waitStatus 常量值苗胀。表示后繼線程需要unpark
static final int SIGNAL = -1;
//waitStatus 常量值襟诸。表示線程正在Condition隊(duì)列中
static final int CONDITION = -2;
//waitStatus 常量值。表示線程的acquireShared行為需要無(wú)條件的向隊(duì)列的下一個(gè)節(jié)點(diǎn)傳遞基协。用在共享鎖的場(chǎng)景歌亲。
static final int PROPAGATE = -3;
//注意,waitStatus除了以上常量值以為澜驮,由于是int類(lèi)型陷揪,則默認(rèn)是0
volatile int waitStatus;
//前繼節(jié)點(diǎn)
volatile Node prev;
//后繼節(jié)點(diǎn)
volatile Node next;
//節(jié)點(diǎn)所代表的線程
volatile Thread thread;
//如果節(jié)點(diǎn)再Condition等待隊(duì)列中,則該字段指向下一個(gè)節(jié)點(diǎn)杂穷,如果節(jié)點(diǎn)在同步隊(duì)列中悍缠,則為一個(gè)標(biāo)志位,其值為 SHARED或者null
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
所以在同步隊(duì)列中Node應(yīng)該長(zhǎng)下面這個(gè)樣子:
在Condition等待隊(duì)列中應(yīng)該長(zhǎng)如下這個(gè)樣子:
Node節(jié)點(diǎn)一共有3個(gè)構(gòu)造方法他們分別在同步隊(duì)列初始化時(shí)/加入同步隊(duì)列/加入Condition隊(duì)列時(shí)使用耐量,結(jié)合構(gòu)造方法以及AQS和Condition類(lèi)飞蚓,我們看下最終隊(duì)列會(huì)長(zhǎng)什么樣子。
首先看一下無(wú)參的構(gòu)造方法在什么地方使用:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //用當(dāng)前線程和mode類(lèi)型構(gòu)造節(jié)點(diǎn)廊蜒,此時(shí)waitStatus默認(rèn)為0
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter方法是將線程放入隊(duì)列的方法玷坠,開(kāi)始構(gòu)造new Node(Thread.currentThread(), mode)這樣的節(jié)點(diǎn),然后嘗試入隊(duì)列劲藐,如果入隊(duì)列失敗則進(jìn)入enq方法,在enq中樟凄,如果隊(duì)列為空(當(dāng)t==null的時(shí)候聘芜,因?yàn)槿腙?duì)操作是從隊(duì)尾進(jìn)行的),此時(shí)構(gòu)造了一個(gè)空的Node節(jié)點(diǎn)缝龄,然后將head和tail指向它汰现。我們稱這個(gè)節(jié)點(diǎn)為哨兵節(jié)點(diǎn),為什么需要這樣一個(gè)節(jié)點(diǎn)叔壤,后面會(huì)說(shuō)明瞎饲,哨兵節(jié)點(diǎn)入隊(duì)列后我們通過(guò)當(dāng)前線程構(gòu)造的的節(jié)點(diǎn)new Node(Thread.currentThread(), mode)再入隊(duì)列。由于構(gòu)造方法沒(méi)有傳入waitStatus值炼绘,所以此時(shí)waitStatus默認(rèn)為0嗅战。
所以綜合AQS來(lái)看,最終的同步隊(duì)列應(yīng)該長(zhǎng)如下這個(gè)樣子:
那么在Condition等待隊(duì)列中呢? Condition隊(duì)列中有firstWaiter和lastWaiter分別指向頭節(jié)點(diǎn)和尾節(jié)點(diǎn)俺亮。
先看下加入Condition隊(duì)列的代碼:
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//構(gòu)造waitStatus等于CONDITION的NODE節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) //如果隊(duì)列為空驮捍,則新節(jié)點(diǎn)入隊(duì)列,頭節(jié)點(diǎn)指向新節(jié)點(diǎn)
firstWaiter = node;
else
t.nextWaiter = node; //如果隊(duì)列不為空脚曾,則新節(jié)點(diǎn)加入到隊(duì)列末尾
lastWaiter = node; //尾節(jié)點(diǎn)指向新節(jié)點(diǎn)
return node;
}
所以东且,最終Condition隊(duì)列中的節(jié)點(diǎn)如下:
Condition隊(duì)列中的節(jié)點(diǎn)是單向的,并且沒(méi)有哨兵節(jié)點(diǎn)本讥,在Condition隊(duì)列中珊泳,nextWaiter指向下一個(gè)節(jié)點(diǎn)鲁冯,而不是像在同步隊(duì)列中那樣指向模式(SHARED, null),其中各個(gè)節(jié)點(diǎn)的waitStatus等于CONDITION(-2)色查。
3薯演、獲取鎖
3.1 首先看寫(xiě)鎖lock的實(shí)現(xiàn)
public void lock() {
sync.acquire(1); //調(diào)用同步器的acquire方法
}
這里同步器由ReentrantReadWriteLock來(lái)繼承AQS后具體實(shí)現(xiàn),不同的鎖對(duì)AQS的實(shí)現(xiàn)是不一樣的综慎。
public final void acquire(int arg) {
if (!tryAcquire(arg) && //嘗試獲取鎖失敗
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //并且節(jié)點(diǎn)入隊(duì)列成功
selfInterrupt(); //線程中斷
}
整個(gè)acquire方法其實(shí)就是3部曲涣仿,①嘗試獲取鎖 ②如果獲取鎖沒(méi)有成功,則構(gòu)造節(jié)點(diǎn)加入等待隊(duì)列中 ③如果節(jié)點(diǎn)入隊(duì)列成功示惊,則線程自我中斷讓出資源好港。當(dāng)線程進(jìn)入同步隊(duì)列中后就實(shí)現(xiàn)了獲取鎖過(guò)程中的線程阻塞功能了,因?yàn)檫@個(gè)時(shí)候線程是自我中斷狀態(tài)米罚。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
我們找到tryAcquire方法發(fā)現(xiàn)钧汹,它直接拋出了一個(gè)異常。這里其實(shí)是AQS用了模板模式录择,將tryAcquire的具體實(shí)現(xiàn)預(yù)留給各種鎖來(lái)實(shí)現(xiàn)拔莱,例如單純的寫(xiě)鎖只用判斷state是否為0,而讀寫(xiě)鎖的實(shí)現(xiàn)是要先取state的高16或者低16位來(lái)判斷隘竭。所以不同的鎖對(duì)tryAcquire的實(shí)現(xiàn)不同塘秦,但是不管如何實(shí)現(xiàn),由于state是共享變量动看,所以各個(gè)實(shí)現(xiàn)一定會(huì)保證state的線程安全尊剔。
3.1.1 線程加入同步隊(duì)列
當(dāng)獲取鎖失敗時(shí),就開(kāi)始構(gòu)造節(jié)點(diǎn)入隊(duì)列了菱皆。
//在寫(xiě)鎖的情況下须误,這里mode為Node.EXCLUSIVE,其實(shí)就是null
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//在enq之前的這段代碼是一個(gè)快讀入隊(duì)列的實(shí)現(xiàn)仇轻,如果入隊(duì)不成功則交由enq來(lái)實(shí)現(xiàn)
Node pred = tail; //緩存tail節(jié)點(diǎn)京痢,因?yàn)閺年?duì)尾加入節(jié)點(diǎn),所以理論上tail就是前繼節(jié)點(diǎn)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { //當(dāng)隊(duì)尾不等于空篷店,則用CAS方法修改tail節(jié)點(diǎn)祭椰,讓其指向本次的構(gòu)造的新節(jié)點(diǎn)。
pred.next = node; //如果CAS成功則說(shuō)明入隊(duì)成功船庇,然后將前繼節(jié)點(diǎn)的next指向新節(jié)點(diǎn)吭产。
return node;
}
}
//如果隊(duì)列為空,或者新節(jié)點(diǎn)入隊(duì)列失敗鸭轮,則交給enq處理臣淤。
enq(node);
return node;
}
從上面的代碼我們能看出AQS中一個(gè)很重要的邏輯:入隊(duì)列,一定是優(yōu)先保證tail的重新指向窃爷,然后才是前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)邑蒋。如果拿分布式系統(tǒng)數(shù)據(jù)一致性舉例的話就是:tail是數(shù)據(jù)強(qiáng)一致性姓蜂,而next是最終一致,這一點(diǎn)在后面的邏輯判斷中很重要医吊。
//自旋的入隊(duì)列方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize //隊(duì)列為空
if (compareAndSetHead(new Node())) //構(gòu)造空節(jié)點(diǎn)钱慢,CAS改變頭節(jié)點(diǎn)
tail = head; //將tail指向新節(jié)點(diǎn)
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //隊(duì)列不為空,則將tail指向新節(jié)點(diǎn)
t.next = node; //前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)
return t;
}
}
}
}
這里隊(duì)列不為空的情況與addWaiter的快速入隊(duì)邏輯是一樣的沒(méi)什么好說(shuō)的卿堂,這里的重點(diǎn)是隊(duì)列等于空的時(shí)候束莫,我們可以看到代碼構(gòu)造了一個(gè)不包含任何線程的“空”節(jié)點(diǎn),然后將head指向它草描,這里有兩個(gè)問(wèn)題: 1.為什么要構(gòu)造這個(gè)空節(jié)點(diǎn)览绿,直接用新節(jié)點(diǎn)作為頭節(jié)點(diǎn)不好么? 2.為什么隊(duì)列為空的時(shí)候是先修改head而不是像隊(duì)列不為空的時(shí)候那樣直接修改tail呢穗慕。
這里先保留這兩個(gè)問(wèn)題饿敲,后續(xù)再來(lái)解答。
這一節(jié)主要記住的一個(gè)核心點(diǎn)就是:隊(duì)列是否為空通過(guò)tail判斷逛绵,一個(gè)節(jié)點(diǎn)加入隊(duì)列分為兩步怀各,第一步是強(qiáng)一致性的修改tail(節(jié)點(diǎn)的pre已經(jīng)指向的前節(jié)點(diǎn)),下一步再是修改前節(jié)點(diǎn)的next术浪。
3.1.2瓢对、進(jìn)入同步隊(duì)列后繼續(xù)嘗試獲取鎖或者睡眠
線程被構(gòu)造為節(jié)點(diǎn)進(jìn)入隊(duì)列后,接下來(lái)就是對(duì)隊(duì)列中的節(jié)點(diǎn)進(jìn)行獲取鎖的處理:
//獨(dú)占模式并且不可中斷的為同步隊(duì)列中的線程獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; //中斷標(biāo)志
for (;;) {
final Node p = node.predecessor(); //節(jié)點(diǎn)的前繼節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) { //如果前節(jié)點(diǎn)等于頭節(jié)點(diǎn)胰苏,則說(shuō)明當(dāng)前節(jié)點(diǎn)應(yīng)該獲取鎖
setHead(node); //如果獲取鎖成功沥曹,則將頭節(jié)點(diǎn)指向獲取鎖成功的節(jié)點(diǎn),并清空該節(jié)點(diǎn)的thread和pre碟联,讓該節(jié)點(diǎn)變成新的哨兵
p.next = null; // help GC
failed = false;
return interrupted; //返回中斷狀態(tài)
}
//如果獲取鎖失敗,或者當(dāng)前節(jié)點(diǎn)不是最靠前的非哨兵節(jié)點(diǎn)僵腺,則嘗試將線程park
if (shouldParkAfterFailedAcquire(p, node) && //判斷當(dāng)前節(jié)點(diǎn)是否可以park鲤孵,如果可以則嘗試park
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //如果獲取鎖失敗,取消競(jìng)爭(zhēng)鎖
}
}
//判斷當(dāng)前節(jié)點(diǎn)是否可以睡眠(park)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //如果前繼節(jié)點(diǎn)的waitStatus等于Node.SIGNAL辰如,則前節(jié)點(diǎn)會(huì)負(fù)責(zé)喚醒當(dāng)前節(jié)點(diǎn)普监, 當(dāng)前節(jié)點(diǎn)可以park
return true;
if (ws > 0) { //如果前節(jié)點(diǎn)已經(jīng)取消(被中斷或超時(shí))
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //直到遍歷到非取消的前節(jié)點(diǎn)為止
pred.next = node;//將遍歷到的前節(jié)點(diǎn)next指向node節(jié)點(diǎn)
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//將前節(jié)點(diǎn)waitStatus更新為Node.SIGNAL,表示后繼節(jié)點(diǎn)需要unpark
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//park當(dāng)前線程琉兜,并返回當(dāng)前線程的中斷狀態(tài)
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
看到這里凯正,我們來(lái)說(shuō)一下剛才遺留的那個(gè)問(wèn)題:為什么要增加一個(gè)哨兵節(jié)點(diǎn)。如果沒(méi)有哨兵豌蟋,如上圖廊散,這個(gè)時(shí)候該節(jié)點(diǎn)獲取到鎖,這個(gè)時(shí)候需要做兩件事情梧疲,先判斷head是否等于tail允睹,然后需要將head指向當(dāng)前的next运准,其實(shí)就是指向null,tail也需要指向null缭受,這個(gè)時(shí)候胁澳,如果有另外一個(gè)線程正在入隊(duì)列,需要將head或者tail指向這個(gè)入隊(duì)列新節(jié)點(diǎn)米者,這個(gè)時(shí)候如果入隊(duì)節(jié)點(diǎn)剛完成tail以及pre的指向韭畸,還沒(méi)來(lái)及更改前節(jié)點(diǎn)的next,這時(shí)出隊(duì)列這邊則會(huì)將head指向null蔓搞,tail也會(huì)被指向null胰丁,當(dāng)然這里如果在將tail指向null之前進(jìn)行一次當(dāng)前tail與老tail的對(duì)比, 如果一致則更新為null败明,不一致則不更新能避免tail指向null隘马,但是head這邊就顯得比較麻煩了,每次獲取鎖成功要重新設(shè)置head都需要從隊(duì)尾向?qū)︻^遍歷妻顶,以防止有新的節(jié)點(diǎn)放入酸员,直到遍歷到節(jié)點(diǎn)中thead為空的(獲取鎖已出隊(duì)列)節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)為止。 這樣的處理顯得相當(dāng)?shù)穆闊?/p>
第二個(gè)問(wèn)題是為什么要先修改head而不能直接修改tail讳嘱,先看有哨兵的情況幔嗦,我改寫(xiě)了一下enq的方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize //隊(duì)列為空
Node n = new Node();
if (compareAndSetTail(n)) //構(gòu)造空節(jié)點(diǎn),CAS改變頭節(jié)點(diǎn)
head = n;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //隊(duì)列不為空沥潭,則將tail指向新節(jié)點(diǎn)
t.next = node; //前節(jié)點(diǎn)的next指向新節(jié)點(diǎn)
return t;
}
}
}
}
這樣的處理方式我認(rèn)為它是線程安全的邀泉。在沒(méi)有哨兵的情況下就是把n換成node,邏輯是一樣的钝鸽。
3.1.3 獲取鎖或者睡眠過(guò)程中線程被中斷(取消獲取鎖)
我們?cè)賮?lái)看看取消獲取鎖做了什么事情:
//取消獲取鎖
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null) //空判斷
return;
node.thread = null; //取消節(jié)點(diǎn)對(duì)線程的引用
//將node節(jié)點(diǎn)往前繼節(jié)點(diǎn)方向所有連續(xù)的取消狀態(tài)的節(jié)點(diǎn)出隊(duì)列
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED; //修改node狀態(tài)
// If we are the tail, remove ourselves.
//如果當(dāng)前節(jié)點(diǎn)是隊(duì)尾汇恤,則將當(dāng)前節(jié)點(diǎn)移除隊(duì)列,這個(gè)時(shí)候就不用設(shè)置前節(jié)點(diǎn)狀態(tài)
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); //將前節(jié)點(diǎn)(這個(gè)時(shí)候已經(jīng)是隊(duì)尾節(jié)點(diǎn))的next指向null拔恰,這里不保證它一定會(huì)成功因谎,因?yàn)榭赡苡衅渌鹿?jié)點(diǎn)加入,用CAS方式避免將覆蓋其它線程的操作颜懊。
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && //如果當(dāng)前節(jié)點(diǎn)不是隊(duì)尾财岔,也不是最靠前的節(jié)點(diǎn)
((ws = pred.waitStatus) == Node.SIGNAL || //前節(jié)點(diǎn)狀態(tài)已經(jīng)為Node.SIGNAL 或者 將前節(jié)點(diǎn)狀態(tài)更改為Node.SIGNAL成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) { //并且前節(jié)點(diǎn)還未出隊(duì)列
Node next = node.next;
if (next != null && next.waitStatus <= 0) //當(dāng)前節(jié)點(diǎn)的next節(jié)點(diǎn)存在并且沒(méi)有取消
compareAndSetNext(pred, predNext, next);//則將前節(jié)點(diǎn)的next指向node節(jié)點(diǎn)的next
} else {
unparkSuccessor(node); //喚醒后繼節(jié)點(diǎn) //node為頭節(jié)點(diǎn),或者更新前節(jié)點(diǎn)狀態(tài)為SIGNAL失敽拥(前節(jié)點(diǎn)就沒(méi)辦法自動(dòng)喚醒后節(jié)點(diǎn)了)匠璧,或者前節(jié)點(diǎn)thead==null(前節(jié)點(diǎn)已取消)
}
node.next = node; // help GC 后繼節(jié)點(diǎn)指向自己,去除引用咸这,幫助GC
}
}
其主要邏輯如下圖:
尾節(jié)點(diǎn)直接出隊(duì)列夷恍,tail指向pred節(jié)點(diǎn)。
這兩個(gè)圖相同的邏輯就在于將當(dāng)前節(jié)點(diǎn)狀態(tài)修改為取消媳维,然后thead置空裁厅,最后出隊(duì)列冰沙,但是這里沒(méi)有重新設(shè)置當(dāng)前節(jié)點(diǎn)的prev指針,如果從tail向前遍歷执虹,還是能遍歷到node節(jié)點(diǎn)拓挥。
3.1.4 需要喚醒后繼節(jié)點(diǎn)的情況
我們?cè)賮?lái)看下,當(dāng)前節(jié)點(diǎn)狀態(tài)更新為SIGNAL失敶(這是哪種情況)侥啤,或者當(dāng)前節(jié)點(diǎn)就是首節(jié)點(diǎn)時(shí),喚醒后繼節(jié)點(diǎn)是如何操作的:
//喚醒node節(jié)點(diǎn)的后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//在喚醒后繼之前茬故,先將node節(jié)點(diǎn)狀態(tài)改為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//如果node的后繼節(jié)點(diǎn)被取消盖灸,則從隊(duì)尾向node節(jié)點(diǎn)遍歷,找到距離node節(jié)點(diǎn)最近的waitStatus<=0的節(jié)點(diǎn)磺芭,然后喚醒s節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里有一個(gè)問(wèn)題就是取消當(dāng)前節(jié)點(diǎn)獲取鎖赁炎,然后要喚醒后繼節(jié)點(diǎn)的時(shí)機(jī),為什么需要喚醒后繼節(jié)點(diǎn)钾腺?
我們?cè)倏聪逻@段代碼:
if (pred != head && //如果當(dāng)前節(jié)點(diǎn)不是隊(duì)尾徙垫,也不是最靠前的節(jié)點(diǎn)
((ws = pred.waitStatus) == Node.SIGNAL || //前節(jié)點(diǎn)狀態(tài)已經(jīng)為Node.SIGNAL 或者 將前節(jié)點(diǎn)狀態(tài)更改為Node.SIGNAL成功
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
……..
} else {
unparkSuccessor(node);
}
其中if判斷為false的情況有:
pred == head : 前節(jié)點(diǎn)為head節(jié)點(diǎn),說(shuō)明當(dāng)前節(jié)點(diǎn)為第一個(gè)有效節(jié)點(diǎn)放棒,如果當(dāng)前節(jié)點(diǎn)被中斷了姻报,head節(jié)點(diǎn)在喚醒后繼節(jié)點(diǎn)時(shí)可能會(huì)找到老節(jié)點(diǎn)(當(dāng)前移除的節(jié)點(diǎn)),所以需要手動(dòng)喚醒node的后繼節(jié)點(diǎn)间螟。
pred != head的情況吴旋,pred.waitStatus != Node.SIGNAL 并且ws<=0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL) 為false :即node的前繼節(jié)點(diǎn)非head的情況,前繼節(jié)點(diǎn)狀態(tài)不等于SIGNAL (ws>0即等于CANDLED厢破,或者ws=[0, CONDITION, PROPAGATE])但是更新?tīng)顟B(tài)為SIGNAL失敗荣瑟,這里ws>0是讓后繼節(jié)點(diǎn)醒來(lái),在acquireQueued方法中重新判斷是否獲取鎖還是睡眠摩泪。其它情況將狀態(tài)更新為SIGNAL失敗褂傀,則說(shuō)明前節(jié)點(diǎn)剛好完成鎖釋放并執(zhí)行了unparkSuccessor,在該方法中加勤,需要將當(dāng)前釋放鎖節(jié)點(diǎn)的狀態(tài)更新為0
當(dāng)pred!=head,前繼節(jié)點(diǎn)狀態(tài)也等于SIGNAL的時(shí)候,pred.thread == null, 這種情況唯有前繼節(jié)點(diǎn)為取消狀態(tài)線程才會(huì)為空同波,如果前繼節(jié)點(diǎn)已經(jīng)取消鳄梅,則應(yīng)該喚醒后繼節(jié)點(diǎn),重新處理acquireQueued未檩。
3.2 釋放寫(xiě)鎖
這里還是從ReentrantReadWriteLock可重入讀寫(xiě)鎖入手戴尸,查看它的釋放寫(xiě)鎖的實(shí)現(xiàn):
public void unlock() {
sync.release(1);
}
可以看到,通過(guò)同步器調(diào)用release方法來(lái)達(dá)到釋放鎖的目的冤狡。
重點(diǎn)看一下AQS中release方法的實(shí)現(xiàn):
public final boolean release(int arg) {
if (tryRelease(arg)) {//嘗試釋放鎖(共享變量)
Node h = head;
if (h != null && h.waitStatus != 0) //釋放鎖成功后孙蒙,喚醒同步隊(duì)列中下一個(gè)需要獲取鎖的節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
這里tryRelease由具體的鎖來(lái)實(shí)現(xiàn)项棠。
4、獲取讀鎖(共享鎖)
還是以ReentrantReadWriteLock鎖的實(shí)現(xiàn)為例挎峦,首先看下獲取讀鎖的代碼入口:
public void lock() {
sync.acquireShared(1);
}
很明顯也是調(diào)用同步器來(lái)實(shí)現(xiàn)的香追,這里主要關(guān)注acquireShared。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
整體邏輯與獲取寫(xiě)鎖類(lèi)似坦胶,首先tryAcquireShared透典,該方法的具體實(shí)現(xiàn)邏輯有鎖實(shí)現(xiàn)來(lái)決定,在tryAcquireShared中顿苇,鎖的實(shí)現(xiàn)會(huì)根據(jù)state來(lái)判斷峭咒,如果當(dāng)前已經(jīng)加了寫(xiě)鎖,則不能加讀鎖纪岁,如果當(dāng)前有其它線程獲取的讀鎖凑队,則本次同樣能加讀鎖,并且會(huì)判斷是否同一個(gè)線程幔翰,如果是漩氨,則重入次數(shù)加1。 如果獲取鎖不成功导匣,則進(jìn)入doAcquireShared方法才菠。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); //構(gòu)造同步隊(duì)列節(jié)點(diǎn)并加入隊(duì)列
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg); //如果前節(jié)點(diǎn)已經(jīng)是head節(jié)點(diǎn),則繼續(xù)嘗試tryAcquireShared
if (r >= 0) { //獲取讀鎖成功
setHeadAndPropagate(node, r); //獲取鎖成功贡定,并且喚醒后續(xù)需要獲取共享鎖的節(jié)點(diǎn)
p.next = null; // help GC
if (interrupted) //線程unpark后赋访,會(huì)判斷線程的中斷狀態(tài),如果線程已經(jīng)被中斷缓待,這里繼承中斷狀態(tài)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //獲取共享鎖過(guò)程中的任何異常都需要取消該節(jié)點(diǎn)繼續(xù)獲取鎖蚓耽,詳情建寫(xiě)鎖的取消邏輯。
}
}
共享鎖的核心邏輯在于setHeadAndPropagate方法旋炒,該方法中實(shí)現(xiàn)了共享鎖獲取鎖的冒泡步悠。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); //把當(dāng)前節(jié)點(diǎn)變成頭節(jié)點(diǎn), 并且將thread引用置空瘫镇,prev置空
if (propagate > 0 ||
h == null ||
h.waitStatus < 0 ||
(h = head) == null ||
h.waitStatus < 0) { //喚醒后繼節(jié)點(diǎn)中所有的共享模式的等待者
Node s = node.next;
if (s == null || s.isShared()) //后繼節(jié)點(diǎn)有可能正在加入隊(duì)列(在addWaiter中是cas保證tail成功鼎兽,然后再設(shè)置的next引用),或者后繼節(jié)點(diǎn)是共享模式铣除,都需要嘗試喚醒后繼節(jié)點(diǎn)谚咬。
doReleaseShared(); //這里不是釋放鎖, 是喚醒后繼節(jié)點(diǎn)尚粘。
//對(duì)于共享模式而言择卦,前者獲取到共享鎖后,需要喚醒后繼的共享鎖等待者;這與前繼節(jié)點(diǎn)釋放鎖后需要喚醒后繼節(jié)點(diǎn)邏輯一致秉继,所以作者把通通的共享模式下喚醒后繼節(jié)點(diǎn)的行為封裝為了一個(gè)方法祈噪。
}
}
從上面代碼可以看出,判斷是否需要繼續(xù)往后傳遞獲取鎖的行為取決于緊鄰的后繼節(jié)點(diǎn)的模式是否為Shared尚辑。如果為Shared模式辑鲤,則需要向后傳遞獲取鎖的行為,具體邏輯看doReleaseShared的實(shí)現(xiàn)腌巾。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 有至少一個(gè)后繼節(jié)點(diǎn)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //后繼節(jié)點(diǎn)需要喚醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //這里為什么一定需要CAS成功才能喚醒后繼節(jié)點(diǎn)遂填,因?yàn)樵诠蚕砟J较拢罄^節(jié)點(diǎn)可能被喚醒后很快釋放鎖澈蝙,如果前節(jié)點(diǎn)狀態(tài)還是為SIGNAL,則等它釋放鎖時(shí)需要去喚醒后繼節(jié)點(diǎn)吓坚,此時(shí)它的后繼已經(jīng)釋放鎖了,這里就會(huì)有問(wèn)題灯荧,其實(shí)就是并發(fā)的問(wèn)題礁击。
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //如果節(jié)點(diǎn)狀態(tài)已經(jīng)為0,則說(shuō)明要么節(jié)點(diǎn)已經(jīng)執(zhí)行了unparkSuccessor逗载,或者沒(méi)有后繼節(jié)點(diǎn)了哆窿。
continue; // loop on failed CAS
}
//如果在執(zhí)行以上動(dòng)作中,有新節(jié)點(diǎn)獲取到鎖(誰(shuí)獲取到鎖厉斟,head指向誰(shuí))挚躯,從新嘗試喚醒新節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
if (h == head) // loop if head changed
break;
}
}
doReleaseShared方法是一個(gè)自旋方法擦秽,首先判斷是否還有后繼節(jié)點(diǎn)if (h != null && h != tail)码荔,如果有后繼節(jié)點(diǎn),拿到當(dāng)前頭節(jié)點(diǎn)狀態(tài)感挥,如果頭節(jié)點(diǎn)狀態(tài)為SIGNAL缩搅,則需要喚醒后繼節(jié)點(diǎn),這里后繼節(jié)點(diǎn)可能是共享節(jié)點(diǎn)触幼,也可能是一個(gè)獨(dú)占節(jié)點(diǎn)(才掛上來(lái)的)硼瓣,我們注意到代碼在執(zhí)行喚醒后繼節(jié)點(diǎn)unparkSuccessor之前,先執(zhí)行了if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))置谦,這里的邏輯顯然于獨(dú)占模式釋放鎖喚醒后繼節(jié)點(diǎn)或者共享模式釋放鎖的邏輯不太一樣堂鲤,這里是強(qiáng)制優(yōu)先講頭節(jié)點(diǎn)狀態(tài)設(shè)置0,只有設(shè)置為0成功的線程才能執(zhí)行喚醒下一個(gè)節(jié)點(diǎn)的操作媒峡。這里之所以要這樣設(shè)計(jì)瘟栖,是因?yàn)楣蚕砟J较拢淮吾尫沛i的線程可能有多個(gè)丝蹭,甚至存在有的共享節(jié)點(diǎn)在釋放鎖,有的共享節(jié)點(diǎn)卻在獲取鎖,但是不管怎么樣奔穿,他們都會(huì)并發(fā)的調(diào)用doReleaseShared方法镜沽,如果這里不加以控制,則會(huì)重復(fù)的調(diào)用unparkSuccessor贱田,最后會(huì)重復(fù)的調(diào)用unpark方法(這里我還沒(méi)明白如果重復(fù)調(diào)用unpark會(huì)有什么問(wèn)題)缅茉。
那么剛才沒(méi)有compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功的節(jié)點(diǎn)會(huì)再次循環(huán),再次進(jìn)來(lái)時(shí)發(fā)現(xiàn)此時(shí)頭節(jié)點(diǎn)狀態(tài)為0(因?yàn)槠渌€程執(zhí)行了喚醒操作)男摧,則講狀態(tài)更新為PROPAGATE蔬墩,這里重點(diǎn)說(shuō)下這個(gè)邏輯,我翻遍了所有的代碼都沒(méi)有發(fā)現(xiàn)哪里有指定將PROPAGATE變更為其它狀態(tài)的邏輯耗拓,所以我不指定作者這里設(shè)計(jì)一個(gè)這個(gè)狀態(tài)有何用拇颅,我認(rèn)為更不需要compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這段邏輯,當(dāng)其線程再次進(jìn)入判斷發(fā)現(xiàn)頭節(jié)點(diǎn)狀態(tài)不為SIGNAL時(shí)乔询,直接結(jié)束樟插。這種情況下再來(lái)看剛才的喚醒操作,當(dāng)喚醒后的節(jié)點(diǎn)重新嘗試獲取鎖失敗時(shí)竿刁,它又會(huì)執(zhí)行shouldParkAfterFailedAcquire方法黄锤,然后將前節(jié)點(diǎn)更改為SIGNAL,如果喚醒后的節(jié)點(diǎn)獲取鎖成功食拜,則head節(jié)點(diǎn)指向該節(jié)點(diǎn)鸵熟,后續(xù)的喚醒操作只與該節(jié)點(diǎn)有關(guān)。所以真心不明白為什么要有段設(shè)置為PROPAGATE的邏輯负甸,主要是代碼中任何地方看不到使用了PROPAGATE的這個(gè)邏輯流强。
這里再說(shuō)下自旋,退出自旋的條件是if (h == head)惑惶,即在處理過(guò)程中沒(méi)有新的節(jié)點(diǎn)獲取到鎖煮盼,用反證法,假設(shè)有新的節(jié)點(diǎn)獲取到鎖带污,這種情況下如果退出了方法會(huì)有問(wèn)題么 僵控? 我認(rèn)為沒(méi)有問(wèn)題,因?yàn)楂@取到鎖的節(jié)點(diǎn)最終會(huì)釋放鎖鱼冀,釋放鎖的動(dòng)作又會(huì)喚起后繼節(jié)點(diǎn)报破,所以為什么要自旋呢?
這一段如果有看明白作者意圖的還請(qǐng)指點(diǎn)一二千绪。
這里我們可以來(lái)看下整個(gè)node節(jié)點(diǎn)的狀態(tài)變化:
通過(guò)上圖觀察充易,所有從0變到PROPAGATE的,最后都會(huì)因?yàn)樾录尤牍?jié)點(diǎn)或者節(jié)點(diǎn)取消而變稱SIGNAL荸型,而SIGNAL最終又都會(huì)因?yàn)樾枰獑酒鸷罄^節(jié)點(diǎn)而將當(dāng)前節(jié)點(diǎn)更新為0盹靴,這形成了一個(gè)完整的閉環(huán),那為什么不是直接0變?yōu)镾IGNAL,不知道PROPAGATE在中間起到什么作用稿静。
5梭冠、釋放讀鎖
相較于獲取讀鎖,釋放的過(guò)程就比較簡(jiǎn)單了
public void unlock() {
sync.releaseShared(1); //調(diào)用同步器的releaseShared
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以看到改备, 在嘗試獲取鎖成功后控漠,直接調(diào)用doReleaseShared喚醒后繼節(jié)點(diǎn)。這里的邏輯就與共享鎖獲取鎖后需要繼續(xù)喚醒緊鄰的后繼共享節(jié)點(diǎn)一樣悬钳。
還有從這里就可以看出盐捷,doReleaseShared方法是被多個(gè)線程同時(shí)調(diào)用的,所以在代碼里面unparkSuccessor處需要進(jìn)行并發(fā)的控制默勾。
6碉渡、帶中斷的獲取鎖
不管是讀鎖還是寫(xiě)鎖,在獲取鎖的過(guò)程中還有一類(lèi)獲取方法是在等待鎖的過(guò)程中允許線程被中斷的灾测,方法會(huì)拋出InterruptedException異常爆价。
這里用共享鎖的lockInterruptibly方法舉例:
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1); //調(diào)用同步器的帶中斷的獲取共享鎖方法
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) //首先檢查線程的中斷狀態(tài),如果線程已經(jīng)被中斷媳搪,則拋出異常铭段,終止程序
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
接下來(lái)看下doAcquireSharedInterruptibly的實(shí)現(xiàn):
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //線程在等待過(guò)程中被中斷,當(dāng)線程醒來(lái)后判斷到中斷狀態(tài)秦爆,直接拋出中斷異常終止程序
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly與doAcquireShared方法相比序愚,唯一的區(qū)別就在于線程被unpark后如何處理中斷的,在doAcquireShared中等限,是將中斷狀態(tài)設(shè)置為一個(gè)boolean標(biāo)志返回給調(diào)用者爸吮,讓調(diào)用者自行決定該如何處理,而在doAcquireSharedInterruptibly中望门,如果發(fā)現(xiàn)線程被中斷形娇,直接拋出中斷異常終止程序。
7筹误、公平/非公平同步器
公平與非公平同步器其實(shí)與AQS本身無(wú)關(guān)桐早,這都是AQS具體的實(shí)現(xiàn),不同的鎖有不同的實(shí)現(xiàn)方式厨剪,但是總體的規(guī)則是一致的哄酝。因?yàn)槭茿QS的實(shí)現(xiàn),我想有必要這里說(shuō)一下祷膳,具體的實(shí)現(xiàn)請(qǐng)參考各個(gè)鎖的實(shí)現(xiàn)陶衅。
公平同步器:
所謂公平,即遵循先到先得的原則直晨,誰(shuí)先到達(dá)隊(duì)列搀军,則誰(shuí)就優(yōu)先獲取鎖膨俐,不得在剛嘗試鎖的時(shí)候不管隊(duì)列中是否有等待中的節(jié)點(diǎn)直接競(jìng)爭(zhēng)鎖。
而所謂不公平其實(shí)就是當(dāng)一個(gè)線程嘗試獲取鎖的時(shí)候罩句,不會(huì)主動(dòng)的去判斷同步隊(duì)列中是否有等待的節(jié)點(diǎn)吟策,直接就去競(jìng)爭(zhēng)鎖,如果競(jìng)爭(zhēng)失敗則進(jìn)入同步隊(duì)列進(jìn)行等待的止。
8、ConditionObject條件等待
在AQS中還有一個(gè)比較重要的類(lèi):ConditionObject着撩,這個(gè)類(lèi)實(shí)現(xiàn)了在基于AQS鎖的情況下對(duì)獲取到鎖的線程進(jìn)行有條件的等待和喚醒诅福,其主要的方法是await和signal以及它們的變種。有很多博客都拿它與Object對(duì)象的wait和notify作比較拖叙,說(shuō)ConditionObject的await和signal運(yùn)用的地方要比wait和notify廣氓润,其實(shí)并不然。它們使用的場(chǎng)景是截然不同的薯鳍,不然的話Doug Lea也不會(huì)費(fèi)這力氣重新造一個(gè)輪子咖气。
我認(rèn)為他們的區(qū)別主要在于,Object的wait和notify其實(shí)是在任何情況下都是可以調(diào)用的挖滤,而ConditionObject的await和signal必須要在基于AQS的鎖環(huán)境下才能調(diào)用崩溪,不然就會(huì)拋出異常(這也是我認(rèn)為它們之間最大的差異);其次斩松,ConditionObject是專(zhuān)門(mén)為AQS服務(wù)的伶唯,它的節(jié)點(diǎn)的構(gòu)造,狀態(tài)的標(biāo)志等都與AQS有關(guān)惧盹,在wait操作和notify操作時(shí)都需要去操作AQS的同步隊(duì)列乳幸。
所以綜上所述,ConditionObject是專(zhuān)門(mén)為AQS服務(wù)的钧椰,而不像有的博客寫(xiě)的它的用途要比Object的實(shí)現(xiàn)要廣粹断。
接下來(lái)看看ConditionObject的具體實(shí)現(xiàn)。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
/***
methods
***/
}
它的核心結(jié)構(gòu)就是有一個(gè)Node的頭節(jié)點(diǎn)和尾節(jié)點(diǎn)嫡霞,然后Node中通過(guò)Node nextWaiter進(jìn)行關(guān)聯(lián)形成了一個(gè)Condition鏈表隊(duì)列瓶埋,后續(xù)所有的操作都是圍繞這個(gè)隊(duì)列和同步隊(duì)列來(lái)進(jìn)行的。
8.1 await方法
public final void await() throws InterruptedException {
if (Thread.interrupted()) //判斷當(dāng)前線程是否被中斷
throw new InterruptedException();
Node node = addConditionWaiter(); //①將當(dāng)前線程構(gòu)造為Condition等待節(jié)點(diǎn)并加入隊(duì)列秒际,詳情見(jiàn)addConditionWaiter說(shuō)明
int savedState = fullyRelease(node); //②釋放鎖現(xiàn)在全部的狀態(tài)悬赏,鎖可能有重入,所以這里不是直接調(diào)用AQS的release方法娄徊,詳情見(jiàn)fullyRelease說(shuō)明
int interruptMode = 0; //標(biāo)記線程在await過(guò)程中的中斷狀態(tài)闽颇,0表示未中斷
while (!isOnSyncQueue(node)) { //判斷node節(jié)點(diǎn)是否在同步隊(duì)列中,只有node節(jié)點(diǎn)進(jìn)入了同步隊(duì)列循環(huán)才會(huì)結(jié)束(即寄锐,被signal了)
LockSupport.park(this);//如果不在同步隊(duì)列中兵多, 則park當(dāng)前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //③線程被喚醒或者中斷后尖啡,判斷線程的中斷狀態(tài)
break; //如果線程被中斷過(guò),則退出循環(huán)
}
//④線程被喚醒后重新獲取鎖奈懒,鎖狀態(tài)恢復(fù)到savedState
//不管線程是:未中斷,還是signal中斷,singnal后中斷,前面的代碼 都會(huì)保證node節(jié)點(diǎn)進(jìn)入同步隊(duì)列。
//acquireQueued 方法獲取到鎖茂嗓,并且在獲取鎖park的過(guò)程中有被中斷,并且之前在await過(guò)程中入撒,不是被signal之前就中斷的情況,則標(biāo)記后續(xù)處理中斷的情況為interruptMode。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//⑤重新獲取到鎖墙贱,把節(jié)點(diǎn)從condition隊(duì)列中去除窍箍,同時(shí)也會(huì)清除被取消的節(jié)點(diǎn)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//⑥線程被中斷祷蝌,根據(jù)中斷條件選擇拋出異澈龋或者重新中斷傳遞狀態(tài)
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//向Condition隊(duì)列添加等待者
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) { //如果隊(duì)尾節(jié)點(diǎn)已經(jīng)取消,則先清空隊(duì)列中的取消節(jié)點(diǎn)
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); //加入到Condition隊(duì)列中的節(jié)點(diǎn)狀態(tài)都為CONDITION
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
從以上代碼可以看出:condition隊(duì)列中钾埂,節(jié)點(diǎn)是通過(guò)nextWaiter來(lái)形成鏈表的汞贸,隊(duì)列中所有節(jié)點(diǎn)的狀態(tài)為CONDITION
//釋放節(jié)點(diǎn)
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); //獲取當(dāng)前鎖的狀態(tài)
if (release(savedState)) { //全部釋放掉當(dāng)前狀態(tài)
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException(); //釋放失敗拋出異常
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
從fullyRelease的實(shí)現(xiàn)可以看出:release方法調(diào)用了tryRelease楣责,如果tryRelease成功,則喚醒后繼節(jié)點(diǎn)秆麸,這里要注意的是如果release不成功初嘹,則會(huì)拋出IllegalMonitorStateException異常,其實(shí)我認(rèn)為這里的else里面拋出異常是多余的沮趣,因?yàn)榫蛅ryRelease的實(shí)現(xiàn)來(lái)看屯烦,如果不是本線程去釋放自己獲得的鎖,tryRelease本身就會(huì)拋出IllegalMonitorStateException異常的房铭,而如果是本線程在釋放鎖驻龟,那一定是在持有鎖的情況下來(lái)釋放鎖的,這種情況一定會(huì)成功的缸匪,所以根本不會(huì)release失敗迅脐,所以代碼怎么都進(jìn)不到else中去。但是可以總結(jié)出的是豪嗽,await等方法一定是要在線程獲取AQS鎖的情況下調(diào)用谴蔑,否則就會(huì)拋出異常。另外龟梦,如果在釋放過(guò)程中線程中斷隐锭,則將節(jié)點(diǎn)設(shè)置為CANCELLED。
③
//檢查Condition隊(duì)列中節(jié)點(diǎn)在等待過(guò)程中的中斷狀態(tài)
//THROW_IE:表示在signal之前被中斷喚醒
// REINTERRUPT:表示在signal之后有中斷计贰,在singnal之后被通斷钦睡,需要保證singnal的行為最終完成,所以中斷只用延續(xù)狀態(tài)狀態(tài)REINTERRUPT躁倒,不用拋出異常荞怒。
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? //當(dāng)前線程中斷狀態(tài)
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//如果節(jié)點(diǎn)是被signal喚醒洒琢,則狀態(tài)會(huì)被更新為0,然后入同步隊(duì)列褐桌,最后才是被unpark衰抑,所以這里如果能CAS成功,則說(shuō)明節(jié)點(diǎn)沒(méi)有被signal荧嵌,所以線程是在await過(guò)程中被中斷的呛踊。所以在這里需要將節(jié)點(diǎn)入隊(duì)列。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node); //節(jié)點(diǎn)入同步隊(duì)列
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
//如果節(jié)點(diǎn)是被signal喚醒的啦撮,則節(jié)點(diǎn)應(yīng)該會(huì)在同步隊(duì)列中谭网,什么情況下被signal喚醒但是node節(jié)點(diǎn)不在同步隊(duì)列中,而等待一會(huì)兒就在同步隊(duì)列中了赃春,這點(diǎn)確實(shí)沒(méi)想明白愉择。
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
8.2 signal
signal的用途是喚醒調(diào)用await方法后進(jìn)入park的線程。主要代碼如下:
public final void signal() {
if (!isHeldExclusively()) //判斷當(dāng)前線程是否是鎖的持有者织中,如果不是則拋出異常
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null) //頭節(jié)點(diǎn)不為空則鏈表不為空
doSignal(first);
}
下面詳細(xì)看下doSignal的實(shí)現(xiàn):
//喚醒在該條件上等待時(shí)間最長(zhǎng)的且狀態(tài)正常的節(jié)點(diǎn)
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //循環(huán)從first嘗試將節(jié)點(diǎn)轉(zhuǎn)換為同步隊(duì)列節(jié)點(diǎn)锥涕,直到轉(zhuǎn)換成功或者遍歷完鏈表。
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//如果CAS失敗抠璃,則說(shuō)明節(jié)點(diǎn)狀態(tài)不為CCONDITION,則返回false繼續(xù)嘗試下一個(gè)節(jié)點(diǎn)脱惰。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //將node節(jié)點(diǎn)加入同步隊(duì)列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果隊(duì)列中前節(jié)點(diǎn)被取消或者CAS前節(jié)點(diǎn)狀態(tài)為SIGNAL失敗搏嗡,手動(dòng)喚醒已經(jīng)進(jìn)入同步隊(duì)列的node節(jié)點(diǎn)
LockSupport.unpark(node.thread);
return true;
}
寫(xiě)在最后:看AQS的源碼真的是花了不少的時(shí)間,但是其中任然有一些我認(rèn)為不應(yīng)該這樣寫(xiě)的邏輯拉一,我知道大部分情況是我沒(méi)想到作者所思考的情況采盒,所以還請(qǐng)各位看客輕拍,如果覺(jué)得對(duì)大家有幫助麻煩點(diǎn)個(gè)贊。