AQS是JUC鎖框架中最重要的類酥艳,通過它來實現(xiàn)獨占鎖和共享鎖的裁赠。本章是對AbstractQueuedSynchronizer源碼的完全解析辩恼,分為四個部分介紹:
- CLH隊列即同步隊列:儲存著所有等待鎖的線程
- 獨占鎖
- 共享鎖
- Condition條件
注: 還有一個AbstractQueuedLongSynchronizer類选侨,它與AQS功能和實現(xiàn)幾乎一樣锐想,唯一不同的是AQLS中代表鎖被獲取次數(shù)的成員變量state類型是long長整類型融蹂,而AQS中該成員變量是int類型旺订。
一. CLH隊列(線程同步隊列)
因為獲取鎖是有條件的弄企,沒有獲取鎖的線程就要阻塞等待,那么就要存儲這些等待的線程区拳。
在AQS中我們使用CLH隊列儲存這些等待的線程拘领,但它并不是直接儲存線程,而是儲存擁有線程的node節(jié)點樱调。所以先介紹重要內(nèi)部類Node约素。
1.1 內(nèi)部類Node
static final class Node {
// 共享模式的標(biāo)記
static final Node SHARED = new Node();
// 獨占模式的標(biāo)記
static final Node EXCLUSIVE = null;
// waitStatus變量的值,標(biāo)志著線程被取消
static final int CANCELLED = 1;
// waitStatus變量的值笆凌,標(biāo)志著后繼線程(即隊列中此節(jié)點之后的節(jié)點)需要被阻塞.(用于獨占鎖)
static final int SIGNAL = -1;
// waitStatus變量的值圣猎,標(biāo)志著線程在Condition條件上等待阻塞.(用于Condition的await等待)
static final int CONDITION = -2;
// waitStatus變量的值,標(biāo)志著下一個acquireShared方法線程應(yīng)該被允許乞而。(用于共享鎖)
static final int PROPAGATE = -3;
// 標(biāo)記著當(dāng)前節(jié)點的狀態(tài)送悔,默認(rèn)狀態(tài)是0, 小于0的狀態(tài)都是有特殊作用,大于0的狀態(tài)表示已取消
volatile int waitStatus;
// prev和next實現(xiàn)一個雙向鏈表
volatile Node prev;
volatile Node next;
// 該節(jié)點擁有的線程
volatile Thread thread;
// 可能有兩種作用:1. 表示下一個在Condition條件上等待的節(jié)點
// 2. 表示是共享模式或者獨占模式爪模,注意第一種情況節(jié)點一定是共享模式
Node nextWaiter;
// 是不是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前一個節(jié)點prev欠啤,如果為null,則拋出NullPointerException異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 用于創(chuàng)建鏈表頭head屋灌,或者共享模式SHARED
Node() {
}
// 使用在addWaiter方法中
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 使用在Condition條件中
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
重要的成員屬性:
- waitStatus: 表示當(dāng)前節(jié)點的狀態(tài)洁段,默認(rèn)狀態(tài)是0」补總共有五個值CANCELLED眉撵、SIGNAL、CONDITION落塑、PROPAGATE以及0纽疟。
- prev和next:記錄著當(dāng)前節(jié)點前一個節(jié)點和后一個節(jié)點的引用。
- thread:當(dāng)前節(jié)點擁有的線程憾赁。當(dāng)擁有鎖的線程釋放鎖的時候污朽,可能會調(diào)用LockSupport.unpark(thread),喚醒這個被阻塞的線程龙考。
- nextWaiter:如果是SHARED蟆肆,表示當(dāng)前節(jié)點是共享模式,如果是null晦款,當(dāng)前節(jié)點是獨占模式炎功,如果是其他值,當(dāng)前節(jié)點也是獨占模式缓溅,不過這個值也是Condition隊列的下一個節(jié)點蛇损。
注意:通過Node我們可以實現(xiàn)兩個隊列,一是通過prev和next實現(xiàn)CLH隊列(線程同步隊列,雙向隊列),二是nextWaiter實現(xiàn)Condition條件上的等待線程隊列(單向隊列)淤齐,后一個我們在Condition中介紹股囊。
1.2 操作CLH隊列
1.2.1 存儲CLH隊列
// CLH隊列頭
private transient volatile Node head;
// CLH隊列尾
private transient volatile Node tail;
1.2.2 設(shè)置CLH隊列頭head
/**
* 通過CAS函數(shù)設(shè)置head值,僅僅在enq方法中調(diào)用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
這個方法只在enq方法中調(diào)用更啄,通過CAS函數(shù)設(shè)置head值稚疹,保證多線程安全
// 重新設(shè)置隊列頭head,它只在acquire系列的方法中調(diào)用
private void setHead(Node node) {
head = node;
// 線程也沒有意義了祭务,因為該線程已經(jīng)獲取到鎖了
node.thread = null;
// 前一個節(jié)點已經(jīng)沒有意義了
node.prev = null;
}
這個方法只在acquire系列的方法中調(diào)用内狗,重新設(shè)置head,表示移除一些等待線程節(jié)點义锥。
1.2.3 設(shè)置CLH隊列尾tail
/**
* 通過CAS函數(shù)設(shè)置tail值柳沙,僅僅在enq方法中調(diào)用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
這個方法只在enq方法中調(diào)用,通過CAS函數(shù)設(shè)置tail值缨该,保證多線程安全
1.2.4 將一個節(jié)點插入到CLH隊列尾
// 向隊列尾插入新節(jié)點偎行,如果隊列沒有初始化川背,就先初始化贰拿。返回原先的隊列尾節(jié)點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// t為null,表示隊列為空熄云,先初始化隊列
if (t == null) {
// 采用CAS函數(shù)即原子操作方式膨更,設(shè)置隊列頭head值。
// 如果成功缴允,再將head值賦值給鏈表尾tail荚守。如果失敗,表示head值已經(jīng)被其他線程练般,那么就進入循環(huán)下一次
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 新添加的node節(jié)點的前一個節(jié)點prev指向原來的隊列尾tail
node.prev = t;
// 采用CAS函數(shù)即原子操作方式矗漾,設(shè)置新隊列尾tail值。
if (compareAndSetTail(t, node)) {
// 設(shè)置老的隊列尾tail的下一個節(jié)點next指向新添加的節(jié)點node
t.next = node;
return t;
}
}
}
}
這個方法向CLH隊列尾插入一個新節(jié)點薄料,如果隊列為空敞贡,就先創(chuàng)建隊列再插入新節(jié)點,返回老的隊列尾節(jié)點摄职。
1.2.5 將當(dāng)前線程添加到CLH隊列尾
// 通過給定的模式mode(獨占或者共享)為當(dāng)前線程創(chuàng)建新節(jié)點誊役,并插入隊列中
private Node addWaiter(Node mode) {
// 為當(dāng)前線程創(chuàng)建新的節(jié)點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果隊列已經(jīng)創(chuàng)建,就將新節(jié)點插入隊列尾谷市。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果隊列沒有創(chuàng)建蛔垢,通過enq方法創(chuàng)建隊列,并插入新的節(jié)點迫悠。
enq(node);
return node;
}
為當(dāng)前線程創(chuàng)建一個新節(jié)點鹏漆,再插入到CLH隊列尾,返回新創(chuàng)建的節(jié)點。
二. 獨占鎖
我們先想一想獨占鎖的功能是什么?
獨占鎖至少有兩個功能:
- 獲取鎖的功能甫男。 當(dāng)多個線程一起獲取鎖的時候且改,只有一個線程能獲取到鎖,其他線程必須在當(dāng)前位置阻塞等待板驳。
- 釋放鎖的功能又跛。獲取鎖的線程釋放鎖資源,而且還必須能喚醒正在等待鎖資源的一個線程若治。
帶著這些疑惑慨蓝,我們來看AQS中是怎么實現(xiàn)的。
2.1 獲取獨占鎖的方法
2.1.1 acquire方法
/**
* 獲取獨占鎖端幼。如果沒有獲取到礼烈,線程就會阻塞等待,直到獲取鎖婆跑。不會響應(yīng)中斷異常
* @param arg
*/
public final void acquire(int arg) {
// 1. 先調(diào)用tryAcquire方法此熬,嘗試獲取獨占鎖,返回true滑进,表示獲取到鎖犀忱,不需要執(zhí)行acquireQueued方法。
// 2. 調(diào)用acquireQueued方法扶关,先調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個節(jié)點node阴汇,并插入隊列中重挑,
// 然后調(diào)用acquireQueued方法去獲取鎖鸟缕,如果不成功,就會讓當(dāng)前線程阻塞泵督,當(dāng)鎖釋放時才會被喚醒铜异。
// acquireQueued方法返回值表示在線程等待過程中哥倔,是否有另一個線程調(diào)用該線程的interrupt方法,發(fā)起中斷揍庄。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
將這個方法一下格式咆蒿,大家就很好理解了
public final void acquire(int arg) {
// 1.先調(diào)用tryAcquire方法,嘗試獲取獨占鎖币绩,返回true則直接返回
if (tryAcquire(arg)) return;
// 2. 調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個節(jié)點node蜡秽,并插入隊列中
Node node = addWaiter(Node.EXCLUSIVE);
// 調(diào)用acquireQueued方法去獲取鎖,
// acquireQueued方法返回值表示在線程等待過程中缆镣,是否有另一個線程調(diào)用該線程的interrupt方法芽突,發(fā)起中斷。
boolean interrupted = acquireQueued(node, arg);
// 如果interrupted為true董瞻,則當(dāng)前線程要發(fā)起中斷請求
if (interrupted) {
selfInterrupt();
}
}
2.1.2 tryAcquire方法
// 嘗試去獲取獨占鎖寞蚌,立即返回田巴。如果返回true表示獲取鎖成功。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
如果子類想實現(xiàn)獨占鎖挟秤,則必須重寫這個方法壹哺,否則拋出異常。這個方法的作用是當(dāng)前線程嘗試獲取鎖艘刚,如果獲取到鎖管宵,就會返回true,并更改鎖資源攀甚。沒有獲取到鎖返回false箩朴。
注:這個方法是立即返回的,不會阻塞當(dāng)前線程
下面是ReentrantLock中FairSync的tryAcquire方法實現(xiàn)
// 嘗試獲取鎖秋度,與非公平鎖最大的不同就是調(diào)用hasQueuedPredecessors()方法
// hasQueuedPredecessors方法返回true炸庞,表示等待線程隊列中有一個線程在當(dāng)前線程之前,
// 根據(jù)公平鎖的規(guī)則荚斯,當(dāng)前線程不能獲取鎖埠居。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取鎖的記錄狀態(tài)
int c = getState();
// 如果c==0表示當(dāng)前鎖是空閑的
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷當(dāng)前線程是不是獨占鎖的線程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 更改鎖的記錄狀態(tài)
setState(nextc);
return true;
}
return false;
}
2.1.3 acquireQueued方法
addWaiter方法已經(jīng)在上面講解了。acquireQueued方法作用就是獲取鎖事期,如果沒有獲取到滥壕,就讓當(dāng)前線程阻塞等待。
/**
* 想要獲取鎖的 acquire系列方法刑赶,都會這個方法來獲取鎖
* 循環(huán)通過tryAcquire方法不斷去獲取鎖捏浊,如果沒有獲取成功懂衩,
* 就有可能調(diào)用parkAndCheckInterrupt方法撞叨,讓當(dāng)前線程阻塞
* @param node 想要獲取鎖的節(jié)點
* @param arg
* @return 返回true,表示在線程等待的過程中浊洞,線程被中斷了
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 表示線程在等待過程中牵敷,是否被中斷了
boolean interrupted = false;
// 通過死循環(huán),直到node節(jié)點的線程獲取到鎖法希,才返回
for (;;) {
// 獲取node的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是隊列頭head枷餐,并且嘗試獲取鎖成功
// 那么當(dāng)前線程就不需要阻塞等待,繼續(xù)執(zhí)行
if (p == head && tryAcquire(arg)) {
// 將節(jié)點node設(shè)置為新的隊列頭
setHead(node);
// help GC
p.next = null;
// 不需要調(diào)用cancelAcquire方法
failed = false;
return interrupted;
}
// 當(dāng)p節(jié)點的狀態(tài)是Node.SIGNAL時苫亦,就會調(diào)用parkAndCheckInterrupt方法毛肋,阻塞node線程
// node線程被阻塞,有兩種方式喚醒屋剑,
// 1.是在unparkSuccessor(Node node)方法润匙,會喚醒被阻塞的node線程,返回false
// 2.node線程被調(diào)用了interrupt方法唉匾,線程被喚醒孕讳,返回true
// 在這里只是簡單地將interrupted = true匠楚,沒有跳出for的死循環(huán),繼續(xù)嘗試獲取鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// failed為true厂财,表示發(fā)生異常芋簿,非正常退出
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消璃饱,不需要喚醒了与斤。
if (failed)
cancelAcquire(node);
}
}
主要流程:
- 通過for (;;)死循環(huán),直到node節(jié)點的線程獲取到鎖荚恶,才跳出循環(huán)幽告。
- 獲取node節(jié)點的前一個節(jié)點p。
- 當(dāng)前一個節(jié)點p時CLH隊列頭節(jié)點時裆甩,調(diào)用tryAcquire方法嘗試去獲取鎖冗锁,如果獲取成功,就將節(jié)點node設(shè)置成CLH隊列頭節(jié)點(相當(dāng)于移除節(jié)點node和之前的節(jié)點)然后return返回嗤栓。
注意:只有當(dāng)node節(jié)點的前一個節(jié)點是隊列頭節(jié)點時冻河,才會嘗試獲取鎖,所以獲取鎖是有順序的茉帅,按照添加到CLH隊列時的順序叨叙。- 調(diào)用shouldParkAfterFailedAcquire方法,來決定是否要阻塞當(dāng)前線程堪澎。
- 調(diào)用parkAndCheckInterrupt方法擂错,阻塞當(dāng)前線程。
- 如果當(dāng)前線程發(fā)生異常樱蛤,非正常退出钮呀,那么會在finally模塊中調(diào)用cancelAcquire(node)方法,取消當(dāng)前節(jié)點狀態(tài)昨凡。
注意:這里當(dāng)嘗試獲取鎖失敗時爽醋,并沒有立即阻塞當(dāng)前線程,但是因為在for (;;)死循環(huán)里便脊,會繼續(xù)循環(huán)蚂四,方法不會返回。
2.1.4 shouldParkAfterFailedAcquire方法
這個方法的返回值決定是否要阻塞當(dāng)前線程
/**
* 根據(jù)前一個節(jié)點pred的狀態(tài)哪痰,來判斷當(dāng)前線程是否應(yīng)該被阻塞
* @param pred : node節(jié)點的前一個節(jié)點
* @param node
* @return 返回true 表示當(dāng)前線程應(yīng)該被阻塞遂赠,之后應(yīng)該會調(diào)用parkAndCheckInterrupt方法來阻塞當(dāng)前線程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前一個pred的狀態(tài)是Node.SIGNAL,那么直接返回true晌杰,當(dāng)前線程應(yīng)該被阻塞
return true;
if (ws > 0) {
// 如果前一個節(jié)點狀態(tài)是Node.CANCELLED(大于0就是CANCELLED)跷睦,
// 表示前一個節(jié)點所在線程已經(jīng)被喚醒了,要從CLH隊列中移除CANCELLED的節(jié)點乎莉。
// 所以從pred節(jié)點一直向前查找直到找到不是CANCELLED狀態(tài)的節(jié)點送讲,并把它賦值給node.prev奸笤,
// 表示node節(jié)點的前一個節(jié)點已經(jīng)改變。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此時前一個節(jié)點pred的狀態(tài)只能是0或者PROPAGATE哼鬓,不可能是CONDITION狀態(tài)
// CONDITION(這個是特殊狀態(tài)监右,只在condition列表中節(jié)點中存在,CLH隊列中不存在這個狀態(tài)的節(jié)點)
// 將前一個節(jié)點pred的狀態(tài)設(shè)置成Node.SIGNAL异希,這樣在下一次循環(huán)時健盒,就是直接阻塞當(dāng)前線程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
我們發(fā)現(xiàn)是根據(jù)前一個節(jié)點的狀態(tài),來決定是否阻塞當(dāng)前線程称簿。而前一個節(jié)點狀態(tài)是在哪里改變的呢扣癣?驚奇地發(fā)現(xiàn)也是在這個方法中改變的。
- 如果前一個節(jié)點狀態(tài)是Node.SIGNAL憨降,那么直接返回true父虑,阻塞當(dāng)前線程
- 如果前一個節(jié)點狀態(tài)是Node.CANCELLED(大于0就是CANCELLED),表示前一個節(jié)點所在線程已經(jīng)被喚醒了授药,要從CLH隊列中移除CANCELLED的節(jié)點士嚎。所以從pred節(jié)點一直向前查找直到找到不是CANCELLED狀態(tài)的節(jié)點。
并把它賦值給node.prev悔叽,表示node節(jié)點的前一個節(jié)點已經(jīng)改變莱衩。在acquireQueued方法中進行下一次循環(huán)。- 不是前面兩種狀態(tài)娇澎,那么就將前一個節(jié)點狀態(tài)設(shè)置成Node.SIGNAL笨蚁,表示需要阻塞當(dāng)前線程,這樣再下一次循環(huán)時趟庄,就會直接阻塞當(dāng)前線程括细。
2.1.5 parkAndCheckInterrupt 方法
阻塞當(dāng)前線程,線程被喚醒后返回當(dāng)前線程中斷狀態(tài)
/**
* 阻塞當(dāng)前線程岔激,線程被喚醒后返回當(dāng)前線程中斷狀態(tài)
*/
private final boolean parkAndCheckInterrupt() {
// 通過LockSupport.park方法勒极,阻塞當(dāng)前線程
LockSupport.park(this);
// 當(dāng)前線程被喚醒后是掰,返回當(dāng)前線程中斷狀態(tài)
return Thread.interrupted();
}
通過LockSupport.park(this)阻塞當(dāng)前線程虑鼎。
2.1.6 cancelAcquire方法
將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消键痛,不需要喚醒了炫彩。
// 將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消絮短,不需要喚醒了江兢。
private void cancelAcquire(Node node) {
// 如果node為null,就直接返回
if (node == null)
return;
//
node.thread = null;
// 跳過那些已取消的節(jié)點丁频,在隊列中找到在node節(jié)點前面的第一次狀態(tài)不是已取消的節(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 記錄pred原來的下一個節(jié)點杉允,用于CAS函數(shù)更新時使用
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 將node節(jié)點狀態(tài)設(shè)置為已取消Node.CANCELLED;
node.waitStatus = Node.CANCELLED;
// 如果node節(jié)點是隊列尾節(jié)點邑贴,那么就將pred節(jié)點設(shè)置為新的隊列尾節(jié)點
if (node == tail && compareAndSetTail(node, pred)) {
// 并且設(shè)置pred節(jié)點的下一個節(jié)點next為null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
2.1.7 小結(jié)
- tryAcquire方法嘗試獲取獨占鎖,由子類實現(xiàn)
- acquireQueued方法叔磷,方法內(nèi)采用for死循環(huán)拢驾,先調(diào)用tryAcquire方法,嘗試獲取鎖改基,如果成功繁疤,則跳出循環(huán)方法返回。 如果失敗秕狰,就可能阻塞當(dāng)前線程稠腊。當(dāng)別的線程鎖釋放的時候,可能會喚醒這個線程鸣哀,然后再次進行循環(huán)判斷架忌,調(diào)用tryAcquire方法,嘗試獲取鎖我衬。
- 如果發(fā)生異常鳖昌,該線程被喚醒,所以要取消節(jié)點node的狀態(tài),因為節(jié)點node所在線程不是在阻塞狀態(tài)了畔勤。
注:還有其他獲取獨占鎖的方法将塑,例如doAcquireInterruptibly、doAcquireNanos都在文章最后的源碼解析中糕档,這里就不做解析了,具體原理都差不多拌喉。
2.2 釋放獨占鎖的方法
2.2.1 release方法
// 在獨占鎖模式下速那,釋放鎖的操作
public final boolean release(int arg) {
// 調(diào)用tryRelease方法,嘗試去釋放鎖尿背,由子類具體實現(xiàn)
if (tryRelease(arg)) {
Node h = head;
// 如果隊列頭節(jié)點的狀態(tài)不是0端仰,那么隊列中就可能存在需要喚醒的等待節(jié)點。
// 還記得我們在acquireQueued(final Node node, int arg)獲取鎖的方法中田藐,如果節(jié)點node沒有獲取到鎖荔烧,
// 那么我們會將節(jié)點node的前一個節(jié)點狀態(tài)設(shè)置為Node.SIGNAL,然后調(diào)用parkAndCheckInterrupt方法
// 將節(jié)點node所在線程阻塞汽久。
// 在這里就是通過unparkSuccessor方法鹤竭,進而調(diào)用LockSupport.unpark(s.thread)方法,喚醒被阻塞的線程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 調(diào)用tryRelease方法釋放鎖資源景醇,返回true表示鎖資源完全釋放了臀稚,返回false表示還持有鎖資源。
- 如果鎖資源完全被釋放了三痰,就要喚醒等待鎖資源的線程吧寺。調(diào)用unparkSuccessor方法喚醒一個等待線程
注:CLH隊列頭節(jié)點h為null窜管,表示隊列為空,沒有節(jié)點稚机。節(jié)點h的狀態(tài)是0微峰,表示沒有CLH隊列中沒有被阻塞的線程。
2.2.2 tryRelease方法
// 嘗試去釋放當(dāng)前線程持有的獨占鎖抒钱,立即返回蜓肆。如果返回true表示釋放鎖成功
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
如果子類想實現(xiàn)獨占鎖,則必須重寫這個方法谋币,否則拋出異常仗扬。作用是釋放當(dāng)前線程持有的鎖,返回true表示已經(jīng)完全釋放鎖資源蕾额,返回false早芭,表示還持有鎖資源。
注:對于獨占鎖來說诅蝶,同一時間只能有一個線程持有這個鎖退个,但是這個線程可以重復(fù)地獲取鎖,因為被鎖住的模塊调炬,再次進入另一個被這個鎖鎖住的模塊语盈,是允許的。這個就做可重入性缰泡,所以對于可重入的鎖釋放操作刀荒,也需要多次。
下面是ReentrantLock中Sync的tryRelease方法實現(xiàn)
protected final boolean tryRelease(int releases) {
// c表示新的鎖的記錄狀態(tài)
int c = getState() - releases;
// 如果當(dāng)前線程不是獨占鎖的線程棘钞,就拋出IllegalMonitorStateException異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 標(biāo)志是否可以釋放鎖
boolean free = false;
// 當(dāng)新的鎖的記錄狀態(tài)為0時缠借,表示可以釋放鎖
if (c == 0) {
free = true;
// 設(shè)置獨占鎖的線程為null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.2.3 unparkSuccessor方法
// 喚醒node節(jié)點的下一個非取消狀態(tài)的節(jié)點所在線程(即waitStatus<=0)
private void unparkSuccessor(Node node) {
// 獲取node節(jié)點的狀態(tài)
int ws = node.waitStatus;
// 如果小于0,就將狀態(tài)重新設(shè)置為0宜猜,表示這個node節(jié)點已經(jīng)完成了
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 下一個節(jié)點
Node s = node.next;
// 如果下一個節(jié)點為null泼返,或者狀態(tài)是已取消,那么就要尋找下一個非取消狀態(tài)的節(jié)點
if (s == null || s.waitStatus > 0) {
// 先將s設(shè)置為null姨拥,s不是非取消狀態(tài)的節(jié)點
s = null;
// 從隊列尾向前遍歷绅喉,直到遍歷到node節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
// 因為是從后向前遍歷,所以不斷覆蓋找到的值垫毙,這樣才能得到node節(jié)點后下一個非取消狀態(tài)的節(jié)點
if (t.waitStatus <= 0)
s = t;
}
// 如果s不為null霹疫,表示存在非取消狀態(tài)的節(jié)點。那么調(diào)用LockSupport.unpark方法综芥,喚醒這個節(jié)點的線程
if (s != null)
LockSupport.unpark(s.thread);
}
這個方法的作用是喚醒node節(jié)點的下一個非取消狀態(tài)的節(jié)點所在線程。
- 將node節(jié)點的狀態(tài)設(shè)置為0
- 尋找到下一個非取消狀態(tài)的節(jié)點s
- 如果節(jié)點s不為null猎拨,則調(diào)用LockSupport.unpark(s.thread)方法喚醒s所在線程膀藐。
注:喚醒線程也是有順序的屠阻,就是添加到CLH隊列線程的順序。
2.2.4 小結(jié)
- 調(diào)用tryRelease方法去釋放當(dāng)前持有的鎖資源额各。
- 如果完全釋放了鎖資源国觉,那么就調(diào)用unparkSuccessor方法,去喚醒一個等待鎖的線程虾啦。
三. 共享鎖
共享鎖與獨占鎖相比麻诀,共享鎖可能被多個線程共同持有
3.1 獲取共享鎖的方法
3.1.1 acquireShared方法
// 獲取共享鎖
public final void acquireShared(int arg) {
// 嘗試去獲取共享鎖,如果返回值小于0表示獲取共享鎖失敗
if (tryAcquireShared(arg) < 0)
// 調(diào)用doAcquireShared方法去獲取共享鎖
doAcquireShared(arg);
}
調(diào)用tryAcquireShared方法嘗試獲取共享鎖傲醉,如果返回值小于0表示獲取共享鎖失敗.則繼續(xù)調(diào)用doAcquireShared方法獲取共享鎖蝇闭。
3.1.2 tryAcquireShared方法
// 嘗試去獲取共享鎖,立即返回硬毕。返回值大于等于0呻引,表示獲取共享鎖成功
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
如果子類想實現(xiàn)共享鎖,則必須重寫這個方法吐咳,否則拋出異常逻悠。作用是嘗試獲取共享鎖,返回值大于等于0韭脊,表示獲取共享鎖成功童谒。
下面是ReentrantReadWriteLock中Sync的tryReleaseShared方法實現(xiàn),這個我們會在ReentrantReadWriteLock章節(jié)中重點介紹的。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 當(dāng)前線程是第一個獲取讀鎖(共享鎖)的線程
if (firstReader == current) {
// 將firstReaderHoldCount減一沪羔,如果就是1惠啄,那么表示該線程需要釋放讀鎖(共享鎖),
// 將firstReader設(shè)置為null
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// 獲取當(dāng)前線程的HoldCounter變量
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
// 將rh變量的count減一任内,
int count = rh.count;
if (count <= 1) {
readHolds.remove();
// count <= 0表示當(dāng)前線程就沒有獲取到讀鎖(共享鎖)撵渡,這里釋放就拋出異常。
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 因為讀鎖是利用高16位儲存的死嗦,低16位的數(shù)據(jù)是要屏蔽的趋距,
// 所以這里減去SHARED_UNIT(65536),相當(dāng)于減一
// 表示一個讀鎖已經(jīng)釋放
int nextc = c - SHARED_UNIT;
// 利用CAS函數(shù)重新設(shè)置state值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
3.1.3 doAcquireShared方法
/**
* 獲取共享鎖越除,獲取失敗节腐,則會阻塞當(dāng)前線程,直到獲取共享鎖返回
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 為當(dāng)前線程創(chuàng)建共享鎖節(jié)點node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果節(jié)點node前一個節(jié)點是同步隊列頭節(jié)點摘盆。就會調(diào)用tryAcquireShared方法嘗試獲取共享鎖
if (p == head) {
int r = tryAcquireShared(arg);
// 如果返回值大于0翼雀,表示獲取共享鎖成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果節(jié)點p的狀態(tài)是Node.SIGNAL,就是調(diào)用parkAndCheckInterrupt方法阻塞當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// failed為true孩擂,表示發(fā)生異常狼渊,
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消,不需要喚醒了
if (failed)
cancelAcquire(node);
}
}
這個方法與獨占鎖的acquireQueued方法相比較狈邑,不同的有三點:
- doAcquireShared方法城须,調(diào)用addWaiter(Node.SHARED)方法,為當(dāng)前線程創(chuàng)建一個共享模式的節(jié)點node米苹。而acquireQueued方法是由外部傳遞來的糕伐。
- doAcquireShared方法沒有返回值,acquireQueued方法會返回布爾類型的值蘸嘶,是當(dāng)前線程中斷標(biāo)志位值
- 最大的區(qū)別是重新設(shè)置CLH隊列頭的方法不一樣良瞧。doAcquireShared方法調(diào)用setHeadAndPropagate方法,而acquireQueued方法調(diào)用setHead方法训唱。
3.1.4 setHeadAndPropagate方法
// 重新設(shè)置CLH隊列頭褥蚯,如果CLH隊列頭的下一個節(jié)點為null或者共享模式,
// 那么就要喚醒共享鎖上等待的線程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 設(shè)置新的同步隊列頭head
setHead(node);
// 如果propagate大于0雪情,
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取新的CLH隊列頭的下一個節(jié)點s
Node s = node.next;
// 如果節(jié)點s是空或者共享模式節(jié)點遵岩,那么就要喚醒共享鎖上等待的線程
if (s == null || s.isShared())
doReleaseShared();
}
}
3.2 釋放共享鎖的方法
3.2.1 releaseShared方法
// 釋放共享鎖
public final boolean releaseShared(int arg) {
// 嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
// 喚醒等待共享鎖的線程
doReleaseShared();
return true;
}
return false;
}
3.2.2 tryReleaseShared方法
// 嘗試去釋放共享鎖
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
如果子類想實現(xiàn)共享鎖,則必須重寫這個方法巡通,否則拋出異常尘执。作用是釋放當(dāng)前線程持有的鎖,返回true表示已經(jīng)完全釋放鎖資源宴凉,返回false誊锭,表示還持有鎖資源。
3.2.3 doReleaseShared方法
// 會喚醒等待共享鎖的線程
private void doReleaseShared() {
for (;;) {
// 將同步隊列頭賦值給節(jié)點h
Node h = head;
// 如果節(jié)點h不為null弥锄,且不等于同步隊列尾
if (h != null && h != tail) {
// 得到節(jié)點h的狀態(tài)
int ws = h.waitStatus;
// 如果狀態(tài)是Node.SIGNAL丧靡,就要喚醒節(jié)點h后繼節(jié)點的線程
if (ws == Node.SIGNAL) {
// 將節(jié)點h的狀態(tài)設(shè)置成0,如果設(shè)置失敗籽暇,就繼續(xù)循環(huán)温治,再試一次。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒節(jié)點h后繼節(jié)點的線程
unparkSuccessor(h);
}
// 如果節(jié)點h的狀態(tài)是0戒悠,就設(shè)置ws的狀態(tài)是PROPAGATE熬荆。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果同步隊列頭head節(jié)點發(fā)生改變,繼續(xù)循環(huán)绸狐,
// 如果沒有改變卤恳,就跳出循環(huán)
if (h == head)
break;
}
}
四. Condition條件
Condition是為了實現(xiàn)線程之間相互等待的問題。注意Condition對象只能在獨占鎖中才能使用寒矿。
考慮一下情況突琳,有兩個線程,生產(chǎn)者線程符相,消費者線程拆融。當(dāng)消費者線程消費東西時,發(fā)現(xiàn)沒有東西,這時它就要等待冠息,讓生產(chǎn)者線程生產(chǎn)東西后挪凑,在通知它消費孕索。
因為操作的是同一個資源逛艰,所以要加鎖,防止多線程沖突搞旭。而鎖在同一時間只能有一個線程持有散怖,所以消費者在讓線程等待前,必須釋放鎖肄渗,且喚醒另一個等待鎖的線程镇眷。
那么在AQS中Condition條件又是如何實現(xiàn)的呢?
- 首先內(nèi)部存在一個Condition隊列翎嫡,存儲著所有在此Condition條件等待的線程欠动。
- await系列方法:讓當(dāng)前持有鎖的線程釋放鎖,并喚醒一個在CLH隊列上等待鎖的線程惑申,再為當(dāng)前線程創(chuàng)建一個node節(jié)點具伍,插入到Condition隊列(注意不是插入到CLH隊列中)
- signal系列方法:其實這里沒有喚醒任何線程,而是將Condition隊列上的等待節(jié)點插入到CLH隊列中圈驼,所以當(dāng)持有鎖的線程執(zhí)行完畢釋放鎖時人芽,就會喚醒CLH隊列中的一個線程,這個時候才會喚醒線程绩脆。
4.1 await系列方法
4.1.1 await方法
/**
* 讓當(dāng)前持有鎖的線程阻塞等待萤厅,并釋放鎖。如果有中斷請求靴迫,則拋出InterruptedException異常
* @throws InterruptedException
*/
public final void await() throws InterruptedException {
// 如果當(dāng)前線程中斷標(biāo)志位是true惕味,就拋出InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
// 為當(dāng)前線程創(chuàng)建新的Node節(jié)點,并且將這個節(jié)點插入到Condition隊列中了
Node node = addConditionWaiter();
// 釋放當(dāng)前線程占有的鎖玉锌,并喚醒CLH隊列一個等待線程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果節(jié)點node不在同步隊列中(注意不是Condition隊列)
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線程,那么怎么喚醒這個線程呢名挥?
// 首先我們必須調(diào)用signal或者signalAll將這個節(jié)點node加入到同步隊列。
// 只有這樣unparkSuccessor(Node node)方法芬沉,才有可能喚醒被阻塞的線程
LockSupport.park(this);
// 如果當(dāng)前線程產(chǎn)生中斷請求躺同,就跳出循環(huán)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果節(jié)點node已經(jīng)在同步隊列中了,獲取同步鎖丸逸,只有得到鎖才能繼續(xù)執(zhí)行蹋艺,否則線程繼續(xù)阻塞等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 是否要拋出異常,或者發(fā)出中斷請求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
方法流程:
- addConditionWaiter方法:為當(dāng)前線程創(chuàng)建新的Node節(jié)點黄刚,并且將這個節(jié)點插入到Condition隊列中了
- fullyRelease方法:釋放當(dāng)前線程占有的鎖瓜晤,并喚醒CLH隊列一個等待線程
- isOnSyncQueue 方法:如果返回false帅韧,表示節(jié)點node不在CLH隊列中庐杨,即沒有調(diào)用過 signal系列方法罪治,所以調(diào)用LockSupport.park(this)方法阻塞當(dāng)前線程。
- 如果跳出while循環(huán)分预,表示節(jié)點node已經(jīng)在CLH隊列中,那么調(diào)用acquireQueued方法去獲取鎖。
- 清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
4.1.2 addConditionWaiter方法
為當(dāng)前線程創(chuàng)建新的Node節(jié)點舒萎,并且將這個節(jié)點插入到Condition隊列中了
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果Condition隊列尾節(jié)點的狀態(tài)不是Node.CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除Condition隊列中,狀態(tài)不是Node.CONDITION的節(jié)點蹭沛,
// 并且可能會重新設(shè)置firstWaiter和lastWaiter
unlinkCancelledWaiters();
// 重新將Condition隊列尾賦值給t
t = lastWaiter;
}
// 為當(dāng)前線程創(chuàng)建一個狀態(tài)為Node.CONDITION的節(jié)點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果t為null臂寝,表示Condition隊列為空,將node節(jié)點賦值給鏈表頭
if (t == null)
firstWaiter = node;
else
// 將新節(jié)點node插入到Condition隊列尾
t.nextWaiter = node;
// 將新節(jié)點node設(shè)置為新的Condition隊列尾
lastWaiter = node;
return node;
}
4.1.3 fullyRelease方法
釋放當(dāng)前線程占有的鎖摊灭,并喚醒CLH隊列一個等待線程
/**
* 釋放當(dāng)前線程占有的鎖咆贬,并喚醒CLH隊列一個等待線程
* 如果失敗就拋出異常,設(shè)置node節(jié)點的狀態(tài)是Node.CANCELLED
* @return
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 釋放當(dāng)前線程占有的鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
4.1.4 isOnSyncQueue
節(jié)點node是不是在CLH隊列中
// 節(jié)點node是不是在CLH隊列中
final boolean isOnSyncQueue(Node node) {
// 如果node的狀態(tài)是Node.CONDITION帚呼,或者node沒有前一個節(jié)點prev掏缎,
// 那么返回false,節(jié)點node不在同步隊列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node有下一個節(jié)點next煤杀,那么它一定在同步隊列中
if (node.next != null) // If has successor, it must be on queue
return true;
// 從同步隊列中查找節(jié)點node
return findNodeFromTail(node);
}
// 在同步隊列中從后向前查找節(jié)點node眷蜈,如果找到返回true,否則返回false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
4.1.5 acquireQueued方法
獲取獨占鎖怜珍,這個在獨占鎖章節(jié)已經(jīng)說過
4.1.6 unlinkCancelledWaiters 方法
清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
private void unlinkCancelledWaiters() {
// condition隊列頭賦值給t
Node t = firstWaiter;
// 這個trail節(jié)點端蛆,只是起輔助作用
Node trail = null;
while (t != null) {
//得到下一個節(jié)點next。當(dāng)節(jié)點是condition時候酥泛,nextWaiter表示condition隊列的下一個節(jié)點
Node next = t.nextWaiter;
// 如果節(jié)點t的狀態(tài)不是CONDITION今豆,那么該節(jié)點就要從condition隊列中移除
if (t.waitStatus != Node.CONDITION) {
// 將節(jié)點t的nextWaiter設(shè)置為null
t.nextWaiter = null;
// 如果trail為null,表示原先的condition隊列頭節(jié)點實效柔袁,需要設(shè)置新的condition隊列頭
if (trail == null)
firstWaiter = next;
else
// 將節(jié)點t從condition隊列中移除呆躲,因為改變了引用的指向,從condition隊列中已經(jīng)找不到節(jié)點t了
trail.nextWaiter = next;
// 如果next為null捶索,表示原先的condition隊列尾節(jié)點也實效插掂,重新設(shè)置隊列尾節(jié)點
if (next == null)
lastWaiter = trail;
}
else
// 遍歷到的有效節(jié)點
trail = t;
// 將next賦值給t,遍歷完整個condition隊列
t = next;
}
}
4.1.7 reportInterruptAfterWait方法
/**
* 如果interruptMode是THROW_IE腥例,就拋出InterruptedException異常
* 如果interruptMode是REINTERRUPT辅甥,則當(dāng)前線程再發(fā)出中斷請求
* 否則就什么都不做
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
4.2 signal系列方法
4.2.1 signal方法
// 如果condition隊列不為空,將condition隊列頭節(jié)點插入到同步隊列中
public final void signal() {
// 如果當(dāng)前線程不是獨占鎖線程燎竖,就拋出IllegalMonitorStateException異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 將Condition隊列頭賦值給節(jié)點first
Node first = firstWaiter;
if (first != null)
// 將Condition隊列中的first節(jié)點插入到CLH隊列中
doSignal(first);
}
如果condition隊列不為空璃弄,就調(diào)用doSignal方法將condition隊列頭節(jié)點插入到CLH隊列中。
4.2.2 doSignal方法
// 將Condition隊列中的first節(jié)點插入到CLH隊列中
private void doSignal(Node first) {
do {
// 原先的Condition隊列頭節(jié)點取消构回,所以重新賦值Condition隊列頭節(jié)點
// 如果新的Condition隊列頭節(jié)點為null夏块,表示Condition隊列為空了
// 疏咐,所以也要設(shè)置Condition隊列尾lastWaiter為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 取消first節(jié)點nextWaiter引用
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
為什么使用while循環(huán),因為只有是Node.CONDITION狀態(tài)的節(jié)點才能插入CLH隊列脐供,如果不是這個狀態(tài)浑塞,那么循環(huán)Condition隊列下一個節(jié)點。
4.2.3 transferForSignal方法
// 返回true表示節(jié)點node插入到同步隊列中政己,返回false表示節(jié)點node沒有插入到同步隊列中
final boolean transferForSignal(Node node) {
// 如果節(jié)點node的狀態(tài)不是Node.CONDITION酌壕,或者更新狀態(tài)失敗,
// 說明該node節(jié)點已經(jīng)插入到同步隊列中匹颤,所以直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將節(jié)點node插入到同步隊列中仅孩,p是原先同步隊列尾節(jié)點托猩,也是node節(jié)點的前一個節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
// 如果前一個節(jié)點是已取消狀態(tài)印蓖,或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。
// 就說明節(jié)點p之后也不會發(fā)起喚醒下一個node節(jié)點線程的操作京腥,
// 所以這里直接調(diào)用 LockSupport.unpark(node.thread)方法赦肃,喚醒節(jié)點node所在線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 狀態(tài)不是 Node.CONDITION的節(jié)點,是不能從Condition隊列中插入到CLH隊列中公浪。直接返回false
- 調(diào)用enq方法他宛,將節(jié)點node插入到同步隊列中,p是原先同步隊列尾節(jié)點欠气,也是node節(jié)點的前一個節(jié)點
- 如果前一個節(jié)點是已取消狀態(tài)厅各,或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。那么就要LockSupport.unpark(node.thread)方法喚醒node節(jié)點所在線程预柒。
4.2.4 signalAll 方法
// 將condition隊列中所有的節(jié)點都插入到同步隊列中
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
4.2.5 doSignalAll方法
/**
* 將condition隊列中所有的節(jié)點都插入到同步隊列中
* @param first condition隊列頭節(jié)點
*/
private void doSignalAll(Node first) {
// 表示將condition隊列設(shè)置為空
lastWaiter = firstWaiter = null;
do {
// 得到condition隊列的下一個節(jié)點
Node next = first.nextWaiter;
first.nextWaiter = null;
// 將節(jié)點first插入到同步隊列中
transferForSignal(first);
first = next;
// 循環(huán)遍歷condition隊列中所有的節(jié)點
} while (first != null);
}
循環(huán)遍歷整個condition隊列队塘,調(diào)用transferForSignal方法,將節(jié)點插入到CLH隊列中宜鸯。
4.3 小結(jié)
Condition只能使用在獨占鎖中憔古。它內(nèi)部有一個Condition隊列記錄所有在Condition條件等待的線程(即就是調(diào)用await系列方法后等待的線程).
await系列方法:會讓當(dāng)前線程釋放持有的鎖,并喚醒在CLH隊列上的一個等待鎖的線程淋袖,再將當(dāng)前線程插入到Condition隊列中(注意不是CLH隊列)
signal系列方法:并不是喚醒線程鸿市,而是將Condition隊列中的節(jié)點插入到CLH隊列中。
總結(jié)
使用AQS類來實現(xiàn)獨占鎖和共享鎖:
- 內(nèi)部有一個CLH隊列即碗,用來記錄所有等待鎖的線程
- 通過 acquire系列方法用來獲取獨占鎖焰情,獲取失敗,則阻塞當(dāng)前線程
- 通過release方法用來釋放獨占鎖剥懒,釋放成功内舟,則會喚醒一個等待獨占鎖的線程。
- 通過acquireShared系列方法用來獲取共享鎖蕊肥。
- 通過releaseShared方法用來釋放共享鎖谒获。
- 通過Condition來實現(xiàn)線程之間相互等待的蛤肌。
示例
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
// 實現(xiàn)一個可重入的獨占鎖, 必須復(fù)寫tryAcquire和tryRelease方法
class Sync extends AbstractQueuedSynchronizer {
// 嘗試獲取獨占鎖批狱, 利用鎖的獲取次數(shù)state屬性
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取鎖的記錄狀態(tài)state
int c = getState();
// 如果c==0表示當(dāng)前鎖是空閑的
if (c == 0) {
// 通過CAS原子操作方式設(shè)置鎖的狀態(tài)裸准,如果為true,表示當(dāng)前線程獲取的鎖赔硫,
// 為false炒俱,鎖的狀態(tài)被其他線程更改,當(dāng)前線程獲取的鎖失敗
if (compareAndSetState(0, acquires)) {
// 設(shè)置當(dāng)前線程為獨占鎖的線程
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷當(dāng)前線程是不是獨占鎖的線程爪膊,因為是可重入鎖
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 更改鎖的記錄狀態(tài)
setState(nextc);
return true;
}
return false;
}
// 釋放持有的獨占鎖权悟, 因為是可重入鎖,所以只有當(dāng)c等于0的時候推盛,表示當(dāng)前持有鎖的完全釋放了鎖峦阁。
@Override
protected boolean tryRelease(int releases) {
// c表示新的鎖的記錄狀態(tài)
int c = getState() - releases;
// 如果當(dāng)前線程不是獨占鎖的線程,就拋出IllegalMonitorStateException異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 標(biāo)志是否可以釋放鎖
boolean free = false;
// 當(dāng)新的鎖的記錄狀態(tài)為0時耘成,表示可以釋放鎖
if (c == 0) {
free = true;
// 設(shè)置獨占鎖的線程為null
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
public class AQSTest {
public static void newThread(Sync sync, String name, int time) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+" 開始運行榔昔,準(zhǔn)備獲取鎖");
// 通過acquire方法,獲取鎖瘪菌,如果沒有獲取到撒会,就等待
sync.acquire(1);
try {
System.out.println("====線程"+Thread.currentThread().getName()+" 在run方法中獲取了鎖");
lockAgain();
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
System.out.println("----線程"+Thread.currentThread().getName()+" 在run方法中釋放了鎖");
sync.release(1);
}
}
private void lockAgain() {
sync.acquire(1);
try {
System.out.println("====線程"+Thread.currentThread().getName()+" 在lockAgain方法中再次獲取了鎖");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
System.out.println("----線程"+Thread.currentThread().getName()+" 在lockAgain方法中釋放了鎖");
sync.release(1);
}
}
},name).start();
}
public static void main(String[] args) {
Sync sync = new Sync();
newThread(sync, "t1111", 1000);
newThread(sync, "t2222", 1000);
newThread(sync, "t3333", 1000);
}
}
利用AbstractQueuedSynchronizer簡單實現(xiàn)一個可重入的獨占鎖。
- 要實現(xiàn)獨占鎖师妙,必須重寫tryAcquire和tryRelease方法诵肛。否則在獲取鎖和釋放鎖的時候,會拋出異常默穴。
- 直接的調(diào)用AQS類的acquire(1)和release(1)方法獲取鎖和釋放鎖怔檩。
輸出結(jié)果是
線程t1111 開始運行,準(zhǔn)備獲取鎖
====線程t1111 在run方法中獲取了鎖
====線程t1111 在lockAgain方法中再次獲取了鎖
線程t2222 開始運行壁顶,準(zhǔn)備獲取鎖
線程t3333 開始運行珠洗,準(zhǔn)備獲取鎖
----線程t1111 在lockAgain方法中釋放了鎖
----線程t1111 在run方法中釋放了鎖
====線程t2222 在run方法中獲取了鎖
====線程t2222 在lockAgain方法中再次獲取了鎖
----線程t2222 在lockAgain方法中釋放了鎖
----線程t2222 在run方法中釋放了鎖
====線程t3333 在run方法中獲取了鎖
====線程t3333 在lockAgain方法中再次獲取了鎖
----線程t3333 在lockAgain方法中釋放了鎖
----線程t3333 在run方法中釋放了鎖
附錄
package java.util.concurrent.locks;
import sun.misc.Unsafe;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
static final class Node {
// 共享模式的標(biāo)記
static final Node SHARED = new Node();
// 獨占模式的標(biāo)記
static final Node EXCLUSIVE = null;
// waitStatus變量的值,標(biāo)志著線程被取消
static final int CANCELLED = 1;
// waitStatus變量的值若专,標(biāo)志著后繼線程(即隊列中此節(jié)點之后的節(jié)點)需要被阻塞.(用于獨占鎖)
static final int SIGNAL = -1;
// waitStatus變量的值许蓖,標(biāo)志著線程在Condition條件上等待阻塞.(用于Condition的await等待)
static final int CONDITION = -2;
// waitStatus變量的值,標(biāo)志著下一個acquireShared方法線程應(yīng)該被允許调衰。(用于共享鎖)
static final int PROPAGATE = -3;
// 標(biāo)記著當(dāng)前節(jié)點的狀態(tài)膊爪,默認(rèn)狀態(tài)是0, 小于0的狀態(tài)都是有特殊作用,大于0的狀態(tài)表示已取消
volatile int waitStatus;
// prev和next實現(xiàn)一個雙向鏈表
volatile Node prev;
volatile Node next;
// 該節(jié)點擁有的線程
volatile Thread thread;
// 可能有兩種作用:1. 表示下一個在Condition條件上等待的節(jié)點
// 2. 表示是共享模式或者獨占模式嚎莉,注意第一種情況節(jié)點一定是共享模式
Node nextWaiter;
// 是不是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前一個節(jié)點prev米酬,如果為null,則拋出NullPointerException異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 用于創(chuàng)建鏈表頭head趋箩,或者共享模式SHARED
Node() {
}
// 使用在addWaiter方法中
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 使用在Condition條件中
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// CLH隊列頭
private transient volatile Node head;
// CLH隊列尾
private transient volatile Node tail;
// 用來記錄當(dāng)前鎖被獲取的次數(shù)赃额,當(dāng)state==0加派,表示還沒有被任何線程獲取
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 采用CAS函數(shù)。比較并交換函數(shù)跳芳,它是原子操作函數(shù)芍锦;即,通過CAS操作的數(shù)據(jù)都是以原子方式進行的
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// Queuing utilities
static final long spinForTimeoutThreshold = 1000L;
// 向隊列尾插入新節(jié)點飞盆,如果隊列沒有初始化娄琉,就先初始化。返回原先的隊列尾節(jié)點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// t為null吓歇,表示隊列為空孽水,先初始化隊列
if (t == null) {
// 采用CAS函數(shù)即原子操作方式,設(shè)置隊列頭head值城看。
// 如果成功女气,再將head值賦值給鏈表尾tail。如果失敗析命,表示head值已經(jīng)被其他線程主卫,那么就進入循環(huán)下一次
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 新添加的node節(jié)點的前一個節(jié)點prev指向原來的隊列尾tail
node.prev = t;
// 采用CAS函數(shù)即原子操作方式,設(shè)置新隊列尾tail值鹃愤。
if (compareAndSetTail(t, node)) {
// 設(shè)置老的隊列尾tail的下一個節(jié)點next指向新添加的節(jié)點node
t.next = node;
return t;
}
}
}
}
// 通過給定的模式mode(獨占或者共享)為當(dāng)前線程創(chuàng)建新節(jié)點,并插入隊列中
private Node addWaiter(Node mode) {
// 為當(dāng)前線程創(chuàng)建新的節(jié)點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果隊列已經(jīng)創(chuàng)建完域,就將新節(jié)點插入隊列尾软吐。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果隊列沒有創(chuàng)建,通過enq方法創(chuàng)建隊列吟税,并插入新的節(jié)點凹耙。
enq(node);
return node;
}
// 重新設(shè)置隊列頭head,它只在acquire系列的方法中調(diào)用
private void setHead(Node node) {
head = node;
// 線程也沒有意義了肠仪,因為該線程已經(jīng)獲取到鎖了
node.thread = null;
// 前一個節(jié)點已經(jīng)沒有意義了
node.prev = null;
}
// 喚醒node節(jié)點的下一個非取消狀態(tài)的節(jié)點所在線程(即waitStatus<=0)
private void unparkSuccessor(Node node) {
// 獲取node節(jié)點的狀態(tài)
int ws = node.waitStatus;
// 如果小于0肖抱,就將狀態(tài)重新設(shè)置為0,表示這個node節(jié)點已經(jīng)完成了
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 下一個節(jié)點
Node s = node.next;
// 如果下一個節(jié)點為null异旧,或者狀態(tài)是已取消意述,那么就要尋找下一個非取消狀態(tài)的節(jié)點
if (s == null || s.waitStatus > 0) {
// 先將s設(shè)置為null,s不是非取消狀態(tài)的節(jié)點
s = null;
// 從隊列尾向前遍歷吮蛹,直到遍歷到node節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
// 因為是從后向前遍歷荤崇,所以不斷覆蓋找到的值,這樣才能得到node節(jié)點后下一個非取消狀態(tài)的節(jié)點
if (t.waitStatus <= 0)
s = t;
}
// 如果s不為null潮针,表示存在非取消狀態(tài)的節(jié)點术荤。那么調(diào)用LockSupport.unpark方法,喚醒這個節(jié)點的線程
if (s != null)
LockSupport.unpark(s.thread);
}
// 會喚醒等待共享鎖的線程
private void doReleaseShared() {
for (;;) {
// 將同步隊列頭賦值給節(jié)點h
Node h = head;
// 如果節(jié)點h不為null每篷,且不等于同步隊列尾
if (h != null && h != tail) {
// 得到節(jié)點h的狀態(tài)
int ws = h.waitStatus;
// 如果狀態(tài)是Node.SIGNAL瓣戚,就要喚醒節(jié)點h后繼節(jié)點的線程
if (ws == Node.SIGNAL) {
// 將節(jié)點h的狀態(tài)設(shè)置成0端圈,如果設(shè)置失敗,就繼續(xù)循環(huán)子库,再試一次枫笛。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 喚醒節(jié)點h后繼節(jié)點的線程
unparkSuccessor(h);
}
// 如果節(jié)點h的狀態(tài)是0,就設(shè)置ws的狀態(tài)是PROPAGATE刚照。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果同步隊列頭head節(jié)點發(fā)生改變刑巧,繼續(xù)循環(huán),
// 如果沒有改變无畔,就跳出循環(huán)
if (h == head)
break;
}
}
// 重新設(shè)置CLH隊列頭啊楚,如果CLH隊列頭的下一個節(jié)點為null或者共享模式,
// 那么就要喚醒共享鎖上等待的線程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 設(shè)置新的同步隊列頭head
setHead(node);
// 如果propagate大于0浑彰,
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 獲取新的CLH隊列頭的下一個節(jié)點s
Node s = node.next;
// 如果節(jié)點s是空或者共享模式節(jié)點恭理,那么就要喚醒共享鎖上等待的線程
if (s == null || s.isShared())
doReleaseShared();
}
}
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消郭变,不需要喚醒了颜价。
private void cancelAcquire(Node node) {
// 如果node為null,就直接返回
if (node == null)
return;
//
node.thread = null;
// 跳過那些已取消的節(jié)點诉濒,在隊列中找到在node節(jié)點前面的第一次狀態(tài)不是已取消的節(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 記錄pred原來的下一個節(jié)點周伦,用于CAS函數(shù)更新時使用
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 將node節(jié)點狀態(tài)設(shè)置為已取消Node.CANCELLED;
node.waitStatus = Node.CANCELLED;
// 如果node節(jié)點是隊列尾節(jié)點,那么就將pred節(jié)點設(shè)置為新的隊列尾節(jié)點
if (node == tail && compareAndSetTail(node, pred)) {
// 并且設(shè)置pred節(jié)點的下一個節(jié)點next為null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**
* 根據(jù)前一個節(jié)點pred的狀態(tài)未荒,來判斷當(dāng)前線程是否應(yīng)該被阻塞
* @param pred : node節(jié)點的前一個節(jié)點
* @param node
* @return 返回true 表示當(dāng)前線程應(yīng)該被阻塞专挪,之后應(yīng)該會調(diào)用parkAndCheckInterrupt方法來阻塞當(dāng)前線程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前一個pred的狀態(tài)是Node.SIGNAL,那么直接返回true片排,當(dāng)前線程應(yīng)該被阻塞
return true;
if (ws > 0) {
// 如果前一個節(jié)點狀態(tài)是Node.CANCELLED(大于0就是CANCELLED)寨腔,
// 表示前一個節(jié)點所在線程已經(jīng)被喚醒了,要從CLH隊列中移除CANCELLED的節(jié)點率寡。
// 所以從pred節(jié)點一直向前查找直到找到不是CANCELLED狀態(tài)的節(jié)點迫卢,并把它賦值給node.prev,
// 表示node節(jié)點的前一個節(jié)點已經(jīng)改變冶共。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此時前一個節(jié)點pred的狀態(tài)只能是0或者PROPAGATE别惦,不可能是CONDITION狀態(tài)
// CONDITION(這個是特殊狀態(tài)遥皂,只在condition列表中節(jié)點中存在筷厘,CLH隊列中不存在這個狀態(tài)的節(jié)點)
// 將前一個節(jié)點pred的狀態(tài)設(shè)置成Node.SIGNAL驼修,這樣在下一次循環(huán)時,就是直接阻塞當(dāng)前線程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* 當(dāng)前線程發(fā)出中斷通知
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/**
* 阻塞當(dāng)前線程命咐,線程被喚醒后返回當(dāng)前線程中斷狀態(tài)
*/
private final boolean parkAndCheckInterrupt() {
// 通過LockSupport.park方法篡九,阻塞當(dāng)前線程
LockSupport.park(this);
// 當(dāng)前線程被喚醒后,返回當(dāng)前線程中斷狀態(tài)
return Thread.interrupted();
}
/**
* 想要獲取鎖的 acquire系列方法醋奠,都會這個方法來獲取鎖
* 循環(huán)通過tryAcquire方法不斷去獲取鎖榛臼,如果沒有獲取成功伊佃,就有可能調(diào)用parkAndCheckInterrupt方法,讓當(dāng)前線程阻塞
* @param node 想要獲取鎖的節(jié)點
* @param arg
* @return 返回true沛善,表示在線程等待的過程中航揉,線程被中斷了
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 表示線程在等待過程中,是否被中斷了
boolean interrupted = false;
// 通過死循環(huán)金刁,直到node節(jié)點的線程獲取到鎖帅涂,才返回
for (;;) {
// 獲取node的前一個節(jié)點
final Node p = node.predecessor();
// 如果前一個節(jié)點是隊列頭head,并且嘗試獲取鎖成功
// 那么當(dāng)前線程就不需要阻塞等待尤蛮,繼續(xù)執(zhí)行
if (p == head && tryAcquire(arg)) {
// 將節(jié)點node設(shè)置為新的隊列頭
setHead(node);
// help GC
p.next = null;
// 不需要調(diào)用cancelAcquire方法
failed = false;
return interrupted;
}
// 當(dāng)p節(jié)點的狀態(tài)是Node.SIGNAL時媳友,就會調(diào)用parkAndCheckInterrupt方法,阻塞node線程
// node線程被阻塞产捞,有兩種方式喚醒醇锚,
// 1.是在unparkSuccessor(Node node)方法,會喚醒被阻塞的node線程坯临,返回false
// 2.node線程被調(diào)用了interrupt方法焊唬,線程被喚醒,返回true
// 在這里只是簡單地將interrupted = true看靠,沒有跳出for的死循環(huán)赶促,繼續(xù)嘗試獲取鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// failed為true,表示發(fā)生異常衷笋,
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED芳杏,表示node節(jié)點所在線程已取消,不需要喚醒了辟宗。
if (failed)
cancelAcquire(node);
}
}
/**
* 這個方法與acquireQueued(final Node node, int arg)方法流程幾乎一樣
* 只不過當(dāng)parkAndCheckInterrupt返回true時,直接拋出異常吝秕。
* @param arg
* @throws InterruptedException
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 為當(dāng)前線程創(chuàng)建節(jié)點node泊脐,并插入到隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 通過死循環(huán),直到node節(jié)點的線程獲取到鎖烁峭,或者當(dāng)前線程有中斷請求會拋出中斷異常
for (;;) {
final Node p = node.predecessor();
// 如果前一個節(jié)點是隊列頭head容客,并且嘗試獲取鎖成功
// 將該節(jié)點node設(shè)置成隊列頭head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 當(dāng)p節(jié)點的狀態(tài)是Node.SIGNAL時,就會調(diào)用parkAndCheckInterrupt方法约郁,阻塞node線程
// node線程被阻塞缩挑,有兩種方式喚醒,
// 1.是在unparkSuccessor(Node node)方法鬓梅,會喚醒被阻塞的node線程供置,返回false
// 2.node線程被調(diào)用了interrupt方法,線程被喚醒绽快,返回true
// 在這里如果parkAndCheckInterrupt返回true芥丧,就會拋出InterruptedException異常
// 跳出死循環(huán)紧阔,方法返回
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 嘗試在一定的時間nanosTimeout內(nèi)獲取鎖,超時了就返回false
*
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算截止時間
final long deadline = System.nanoTime() + nanosTimeout;
// 為當(dāng)前線程創(chuàng)建節(jié)點node续担,并插入到隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果前一個節(jié)點是隊列頭head擅耽,并且嘗試獲取鎖成功
// 將該節(jié)點node設(shè)置成隊列頭head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 計算剩余時間
nanosTimeout = deadline - System.nanoTime();
// 剩余時間小于等于0,就直接返回false物遇,獲取鎖失敗
if (nanosTimeout <= 0L)
return false;
// 當(dāng)p節(jié)點的狀態(tài)是Node.SIGNAL時乖仇,調(diào)用LockSupport.parkNanos阻塞當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 當(dāng)前線程阻塞nanosTimeout時間
LockSupport.parkNanos(this, nanosTimeout);
// 如果當(dāng)前線程中斷標(biāo)志位是true,拋出InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// failed為true询兴,表示發(fā)生異常乃沙,
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED,表示node節(jié)點所在線程已取消蕉朵,不需要喚醒了
if (failed)
cancelAcquire(node);
}
}
/**
* 獲取共享鎖崔涂,獲取失敗,則會阻塞當(dāng)前線程始衅,直到獲取共享鎖返回
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 為當(dāng)前線程創(chuàng)建共享鎖節(jié)點node
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果節(jié)點node前一個節(jié)點是同步隊列頭節(jié)點冷蚂。就會調(diào)用tryAcquireShared方法嘗試獲取共享鎖
if (p == head) {
int r = tryAcquireShared(arg);
// 如果返回值大于0,表示獲取共享鎖成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果節(jié)點p的狀態(tài)是Node.SIGNAL汛闸,就是調(diào)用parkAndCheckInterrupt方法阻塞當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// failed為true蝙茶,表示發(fā)生異常,
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED诸老,表示node節(jié)點所在線程已取消隆夯,不需要喚醒了
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// failed為true,表示發(fā)生異常别伏,
// 則將node節(jié)點的狀態(tài)設(shè)置成CANCELLED蹄衷,表示node節(jié)點所在線程已取消,不需要喚醒了
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Main exported methods
// 嘗試去獲取獨占鎖厘肮,立即返回愧口。如果返回true表示獲取鎖成功。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 嘗試去釋放當(dāng)前線程持有的獨占鎖类茂,立即返回耍属。如果返回true表示釋放鎖成功
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 嘗試去獲取共享鎖,立即返回巩检。返回值大于等于0厚骗,表示獲取共享鎖成功
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 嘗試去釋放共享鎖
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* 獲取獨占鎖。如果沒有獲取到兢哭,線程就會阻塞等待领舰,直到獲取鎖。不會響應(yīng)中斷異常
* @param arg
*/
public final void acquire(int arg) {
// 1. 先調(diào)用tryAcquire方法,嘗試獲取獨占鎖提揍,返回true啤月,表示獲取到鎖,不需要執(zhí)行acquireQueued方法劳跃。
// 2. 調(diào)用acquireQueued方法谎仲,先調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個節(jié)點node,并插入隊列中刨仑,
// 然后調(diào)用acquireQueued方法去獲取鎖郑诺,如果不成功,就會讓當(dāng)前線程阻塞杉武,當(dāng)鎖釋放時才會被喚醒辙诞。
// acquireQueued方法返回值表示在線程等待過程中,是否有另一個線程調(diào)用該線程的interrupt方法轻抱,發(fā)起中斷飞涂。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// public final void acquire(int arg) {
// // 1.先調(diào)用tryAcquire方法,嘗試獲取獨占鎖祈搜,返回true則直接返回
// if (tryAcquire(arg)) return;
// // 2. 調(diào)用addWaiter方法為當(dāng)前線程創(chuàng)建一個節(jié)點node较店,并插入隊列中
// Node node = addWaiter(Node.EXCLUSIVE);
// // 調(diào)用acquireQueued方法去獲取鎖,
// // acquireQueued方法返回值表示在線程等待過程中容燕,是否有另一個線程調(diào)用該線程的interrupt方法梁呈,發(fā)起中斷。
// boolean interrupted = acquireQueued(node, arg);
// // 如果interrupted為true蘸秘,則當(dāng)前線程要發(fā)起中斷請求
// if (interrupted) {
// selfInterrupt();
// }
// }
/**
* 獲取獨占鎖官卡。如果沒有獲取到,線程就會阻塞等待醋虏,直到獲取鎖寻咒。如果有中斷請求就會產(chǎn)生中斷異常
* @param arg
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果當(dāng)前線程中斷標(biāo)志位是true,那么直接拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 先調(diào)用tryAcquire方法颈嚼,嘗試獲取獨占鎖仔涩,返回true,表示獲取到鎖粘舟,不需要執(zhí)行doAcquireInterruptibly方法。
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* 嘗試在一定時間內(nèi)獲取獨占鎖佩研。如果有中斷請求就會產(chǎn)生中斷異常柑肴。
* 返回true,表示獲取鎖成功
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果當(dāng)前線程中斷標(biāo)志位是true旬薯,那么直接拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 先調(diào)用tryAcquire方法晰骑,嘗試獲取獨占鎖,返回true,表示獲取到鎖硕舆。
// 否則調(diào)用doAcquireNanos方法獲取鎖
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 在獨占鎖模式下秽荞,釋放鎖的操作
public final boolean release(int arg) {
// 調(diào)用tryRelease方法,嘗試去釋放鎖抚官,由子類具體實現(xiàn)
if (tryRelease(arg)) {
Node h = head;
// 如果隊列頭節(jié)點的狀態(tài)不是0扬跋,那么隊列中就可能存在需要喚醒的等待節(jié)點。
// 還記得我們在acquireQueued(final Node node, int arg)獲取鎖的方法中凌节,如果節(jié)點node沒有獲取到鎖钦听,
// 那么我們會將節(jié)點node的前一個節(jié)點狀態(tài)設(shè)置為Node.SIGNAL,然后調(diào)用parkAndCheckInterrupt方法
// 將節(jié)點node所在線程阻塞倍奢。
// 在這里就是通過unparkSuccessor方法朴上,進而調(diào)用LockSupport.unpark(s.thread)方法,喚醒被阻塞的線程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 獲取共享鎖
public final void acquireShared(int arg) {
// 嘗試去獲取共享鎖卒煞,如果返回值小于0表示獲取共享鎖失敗
if (tryAcquireShared(arg) < 0)
// 調(diào)用doAcquireShared方法去獲取共享鎖
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
// 釋放共享鎖
public final boolean releaseShared(int arg) {
// 嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
// 喚醒等待共享鎖的線程
doReleaseShared();
return true;
}
return false;
}
// Queue inspection methods
public final boolean hasQueuedThreads() {
return head != tail;
}
public final boolean hasContended() {
return head != null;
}
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
/**
* Version of getFirstQueuedThread called when fastpath fails
*/
private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
// 返回false表示隊列為null痪宰,或者當(dāng)前線程節(jié)點在是隊列頭的下一個節(jié)點。
// 返回true表示有一個線程節(jié)點在當(dāng)前線程節(jié)點之前
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// Instrumentation and monitoring methods
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return super.toString() +
"[State = " + s + ", " + q + "empty queue]";
}
// Internal support methods for Conditions
// 節(jié)點node是不是在CLH隊列中
final boolean isOnSyncQueue(Node node) {
// 如果node的狀態(tài)是Node.CONDITION畔裕,或者node沒有前一個節(jié)點prev衣撬,
// 那么返回false,節(jié)點node不在同步隊列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果node有下一個節(jié)點next柴钻,那么它一定在同步隊列中
if (node.next != null) // If has successor, it must be on queue
return true;
// 從同步隊列中查找節(jié)點node
return findNodeFromTail(node);
}
// 在同步隊列中從后向前查找節(jié)點node淮韭,如果找到返回true,否則返回false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
// 返回true表示節(jié)點node插入到同步隊列中贴届,返回false表示節(jié)點node沒有插入到同步隊列中
final boolean transferForSignal(Node node) {
// 如果節(jié)點node的狀態(tài)不是Node.CONDITION靠粪,或者更新狀態(tài)失敗,
// 說明該node節(jié)點已經(jīng)插入到同步隊列中毫蚓,所以直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將節(jié)點node插入到同步隊列中占键,p是原先同步隊列尾節(jié)點,也是node節(jié)點的前一個節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
// 如果前一個節(jié)點是已取消狀態(tài)元潘,或者不能將它設(shè)置成Node.SIGNAL狀態(tài)畔乙。
// 就說明節(jié)點p之后也不會發(fā)起喚醒下一個node節(jié)點線程的操作,
// 所以這里直接調(diào)用 LockSupport.unpark(node.thread)方法翩概,喚醒節(jié)點node所在線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// 等待節(jié)點node存放到同步隊列中牲距,如果是當(dāng)前線程插入的,返回true钥庇,如果是另一個線程插入的牍鞠,返回false。
final boolean transferAfterCancelledWait(Node node) {
// 當(dāng)節(jié)點node狀態(tài)還是Node.CONDITION评姨,改變它的狀態(tài)是0难述,然后插入到同步隊列中。
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 如果節(jié)點node不在同步隊列中,當(dāng)前線程讓出執(zhí)行權(quán)
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
* 釋放當(dāng)前線程占有的鎖胁后,并喚醒CLH隊列一個等待線程
* 如果失敗就拋出異常店读,設(shè)置node節(jié)點的狀態(tài)是Node.CANCELLED
* @return
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 釋放當(dāng)前線程占有的鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// Instrumentation methods for conditions
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** Condition隊列頭. */
private transient Node firstWaiter;
/** Condition隊列尾. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* 為當(dāng)前線程創(chuàng)建新的Node節(jié)點,并且將這個節(jié)點插入到Condition隊列中了
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果Condition隊列尾節(jié)點的狀態(tài)不是Node.CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除Condition隊列中攀芯,狀態(tài)不是Node.CONDITION的節(jié)點屯断,
// 并且可能會重新設(shè)置firstWaiter和lastWaiter
unlinkCancelledWaiters();
// 重新將Condition隊列尾賦值給t
t = lastWaiter;
}
// 為當(dāng)前線程創(chuàng)建一個狀態(tài)為Node.CONDITION的節(jié)點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 如果t為null,表示Condition隊列為空敲才,將node節(jié)點賦值給鏈表頭
if (t == null)
firstWaiter = node;
else
// 將新節(jié)點node插入到Condition隊列尾
t.nextWaiter = node;
// 將新節(jié)點node設(shè)置為新的Condition隊列尾
lastWaiter = node;
return node;
}
// 將Condition隊列中的first節(jié)點插入到CLH隊列中
private void doSignal(Node first) {
do {
// 原先的Condition隊列頭節(jié)點取消裹纳,所以重新賦值Condition隊列頭節(jié)點
// 如果新的Condition隊列頭節(jié)點為null,表示Condition隊列為空了
// 紧武,所以也要設(shè)置Condition隊列尾lastWaiter為null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 取消first節(jié)點nextWaiter引用
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* 將condition隊列中所有的節(jié)點都插入到同步隊列中
* @param first condition隊列頭節(jié)點
*/
private void doSignalAll(Node first) {
// 表示將condition隊列設(shè)置為空
lastWaiter = firstWaiter = null;
do {
// 得到condition隊列的下一個節(jié)點
Node next = first.nextWaiter;
first.nextWaiter = null;
// 將節(jié)點first插入到同步隊列中
transferForSignal(first);
first = next;
// 循環(huán)遍歷condition隊列中所有的節(jié)點
} while (first != null);
}
// 清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
private void unlinkCancelledWaiters() {
// condition隊列頭賦值給t
Node t = firstWaiter;
// 這個trail節(jié)點剃氧,只是起輔助作用
Node trail = null;
while (t != null) {
//得到下一個節(jié)點next。當(dāng)節(jié)點是condition時候阻星,nextWaiter表示condition隊列的下一個節(jié)點
Node next = t.nextWaiter;
// 如果節(jié)點t的狀態(tài)不是CONDITION朋鞍,那么該節(jié)點就要從condition隊列中移除
if (t.waitStatus != Node.CONDITION) {
// 將節(jié)點t的nextWaiter設(shè)置為null
t.nextWaiter = null;
// 如果trail為null,表示原先的condition隊列頭節(jié)點實效妥箕,需要設(shè)置新的condition隊列頭
if (trail == null)
firstWaiter = next;
else
// 將節(jié)點t從condition隊列中移除滥酥,因為改變了引用的指向,從condition隊列中已經(jīng)找不到節(jié)點t了
trail.nextWaiter = next;
// 如果next為null畦幢,表示原先的condition隊列尾節(jié)點也實效坎吻,重新設(shè)置隊列尾節(jié)點
if (next == null)
lastWaiter = trail;
}
else
// 遍歷到的有效節(jié)點
trail = t;
// 將next賦值給t,遍歷完整個condition隊列
t = next;
}
}
// public methods
// 如果condition隊列不為空宇葱,將condition隊列頭節(jié)點插入到同步隊列中
public final void signal() {
// 如果當(dāng)前線程不是獨占鎖線程瘦真,就拋出IllegalMonitorStateException異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 將Condition隊列頭賦值給節(jié)點first
Node first = firstWaiter;
if (first != null)
// 將Condition隊列中的first節(jié)點插入到CLH隊列中
doSignal(first);
}
// 將condition隊列中所有的節(jié)點都插入到同步隊列中
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 讓當(dāng)前持有鎖的線程阻塞等待,并釋放鎖黍瞧。如果線程等待期間發(fā)出中斷請求诸尽,不會產(chǎn)生中斷異常
public final void awaitUninterruptibly() {
// 為當(dāng)前線程創(chuàng)建新的Node節(jié)點,并且將這個節(jié)點插入到Condition隊列中了
Node node = addConditionWaiter();
// 釋放當(dāng)前線程占有的鎖印颤,并喚醒其他線程
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果節(jié)點node不在同步隊列中(注意不是Condition隊列)您机,就阻塞當(dāng)前線程
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線程
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
// 如果節(jié)點node已經(jīng)在同步隊列中了,獲取同步鎖年局,只有得到鎖才能繼續(xù)執(zhí)行际看,否則線程繼續(xù)阻塞等待
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* 如果線程沒有發(fā)起了中斷請求,返回0.
* 如果線程發(fā)起了中斷請求矢否,且中斷請求在signalled(即調(diào)用signal或signalAll)之前返回THROW_IE
* 中斷請求在signalled之后返回REINTERRUPT
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* 如果interruptMode是THROW_IE仿村,就拋出InterruptedException異常
* 如果interruptMode是REINTERRUPT,則當(dāng)前線程再發(fā)出中斷請求
* 否則就什么都不做
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
/**
* 讓當(dāng)前持有鎖的線程阻塞等待兴喂,并釋放鎖。如果有中斷請求,則拋出InterruptedException異常
* @throws InterruptedException
*/
public final void await() throws InterruptedException {
// 如果當(dāng)前線程中斷標(biāo)志位是true衣迷,就拋出InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
// 為當(dāng)前線程創(chuàng)建新的Node節(jié)點畏鼓,并且將這個節(jié)點插入到Condition隊列中了
Node node = addConditionWaiter();
// 釋放當(dāng)前線程占有的鎖,并喚醒CLH隊列一個等待線程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果節(jié)點node不在同步隊列中(注意不是Condition隊列)
while (!isOnSyncQueue(node)) {
// 阻塞當(dāng)前線程,那么怎么喚醒這個線程呢壶谒?
// 首先我們必須調(diào)用signal或者signalAll將這個節(jié)點node加入到同步隊列云矫。
// 只有這樣unparkSuccessor(Node node)方法,才有可能喚醒被阻塞的線程
LockSupport.park(this);
// 如果當(dāng)前線程產(chǎn)生中斷請求汗菜,就跳出循環(huán)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果節(jié)點node已經(jīng)在同步隊列中了让禀,獲取同步鎖,只有得到鎖才能繼續(xù)執(zhí)行陨界,否則線程繼續(xù)阻塞等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 是否要拋出異常巡揍,或者發(fā)出中斷請求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 獲取
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除Condition隊列中狀態(tài)不是Node.CONDITION的節(jié)點
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 是否要拋出異常感帅,或者發(fā)出中斷請求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* 返回true:表示這個condition對象是由sync對象創(chuàng)建的宗兼。
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
/**
* 這個condition對象上是否有等待線程令蛉,即condition隊列不為空憨栽,且有一個節(jié)點的狀態(tài)是 Node.CONDITION
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 遍歷condition隊列
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
/**
* 返回condition對象上等待線程的個數(shù)聪轿,即遍歷condition隊列卿嘲,計算節(jié)點的狀態(tài)是Node.CONDITION的個數(shù)
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
// 遍歷condition隊列
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
// 計算節(jié)點的狀態(tài)是Node.CONDITION的個數(shù)
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
/**
* 返回condition對象上等待線程的集合捕传。
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
// 遍歷condition隊列
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
// 當(dāng)節(jié)點的狀態(tài)是Node.CONDITION鬼悠,將節(jié)點的線程添加到list集合中
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
// 支持compareAndSet操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* 通過CAS函數(shù)設(shè)置head值录淡,僅僅在enq方法中調(diào)用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* 通過CAS函數(shù)設(shè)置tail值捌木,僅僅在enq方法中調(diào)用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 通過CAS函數(shù)設(shè)置node對象的waitStatus值
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* 通過CAS函數(shù)設(shè)置node對象的next值
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}