先來看看 AQS 有哪些屬性
// 頭結(jié)點(diǎn)吼和,你直接把它當(dāng)做 當(dāng)前持有鎖的線程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾節(jié)點(diǎn),每個(gè)新的節(jié)點(diǎn)進(jìn)來,都插入到最后,也就形成了一個(gè)鏈表
private transient volatile Node tail;
// 這個(gè)是最重要的,代表當(dāng)前鎖的狀態(tài)泽腮,0代表沒有被占用,大于 0 代表有線程持有當(dāng)前鎖
// 這個(gè)值可以大于 1,是因?yàn)殒i可以重入遵馆,每次重入都加上 1
private volatile int state;
// 代表當(dāng)前持有獨(dú)占鎖的線程,舉個(gè)最重要的使用例子戈二,因?yàn)殒i可以重入
// reentrantLock.lock()可以嵌套調(diào)用多次,所以每次用這個(gè)來判斷當(dāng)前線程是否已經(jīng)擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;
等待隊(duì)列中每個(gè)線程被包裝成一個(gè) Node 實(shí)例绒北,數(shù)據(jù)結(jié)構(gòu)是鏈表峻汉,一起看看源碼:
static final class Node {
// 標(biāo)識節(jié)點(diǎn)當(dāng)前在共享模式下
static final Node SHARED = new Node();
// 標(biāo)識節(jié)點(diǎn)當(dāng)前在獨(dú)占模式下
static final Node EXCLUSIVE = null;
// ======== 下面的幾個(gè)int常量是給waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 代碼此線程取消了爭搶這個(gè)鎖
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是瘤礁,其表示當(dāng)前node的后繼節(jié)點(diǎn)對應(yīng)的線程需要被喚醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// =====================================================
// 取值為上面的1、-1钝腺、-2拍屑、-3,或者0
// 這么理解喷斋,暫時(shí)只需要知道如果這個(gè)值 大于0 代表此線程取消了等待蒜茴,
// ps: 半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的漓摩。腿椎。。
volatile int waitStatus;
// 前驅(qū)節(jié)點(diǎn)的引用
volatile Node prev;
// 后繼節(jié)點(diǎn)的引用
volatile Node next;
// 這個(gè)就是線程本尊
volatile Thread thread;
}
Node 的數(shù)據(jù)結(jié)構(gòu)其實(shí)也挺簡單的夭咬,就是 thread + waitStatus + pre + next 四個(gè)屬性而已.
上面是一些基本的數(shù)據(jù)結(jié)構(gòu)啃炸,下面,我們開始說 ReentrantLock 的公平鎖卓舵。
ReentrantLock 在內(nèi)部用了內(nèi)部類 Sync 來管理鎖南用,所以真正的獲取鎖和釋放鎖是由 Sync 的實(shí)現(xiàn)類來控制的。
abstract static class Sync extends AbstractQueuedSynchronizer {
}
Sync 有兩個(gè)實(shí)現(xiàn)边器,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖)训枢,我們看 FairSync 部分。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 爭鎖
final void lock() {
acquire(1);
}
// 來自父類AQS忘巧,
// 我們看到恒界,這個(gè)方法,如果tryAcquire(arg) 返回true, 也就結(jié)束了砚嘴。
// 否則十酣,acquireQueued方法會將線程壓到隊(duì)列中
public final void acquire(int arg) { // 此時(shí) arg == 1
// 首先調(diào)用tryAcquire(1)一下,名字上就知道际长,這個(gè)只是試一試
// 因?yàn)橛锌赡苤苯泳统晒α四厮什桑簿筒恍枰M(jìn)隊(duì)列排隊(duì)了,
if (!tryAcquire(arg) &&
// tryAcquire(arg)沒有成功工育,這個(gè)時(shí)候需要把當(dāng)前線程掛起虾宇,放到阻塞隊(duì)列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 嘗試直接獲取鎖如绸,返回值是boolean嘱朽,代表是否獲取到鎖
// 返回true:1.沒有線程在等待鎖;2.重入鎖怔接,線程本來就持有鎖搪泳,也就可以理所當(dāng)然可以直接獲取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此時(shí)此刻沒有線程持有鎖
if (c == 0) {
// 雖然此時(shí)此刻鎖是可以用的,但是這是公平鎖扼脐,既然是公平岸军,就得講究先來后到,
// 看看有沒有別人在隊(duì)列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果沒有線程在等待,那就用CAS嘗試一下艰赞,成功了就獲取到鎖了佣谐,
// 不成功的話,只能說明一個(gè)問題方妖,就在剛剛幾乎同一時(shí)刻有個(gè)線程搶先了 =_=
// 因?yàn)閯倓傔€沒人的台谍,我判斷過了
compareAndSetState(0, acquires)) {
// 到這里就是獲取到鎖了,標(biāo)記一下吁断,告訴大家,現(xiàn)在是我占用了鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 會進(jìn)入這個(gè)else if分支坞生,說明是重入了仔役,需要操作:state=state+1
// 這里不存在并發(fā)問題
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果到這里,說明前面的if和else if都沒有返回true是己,說明沒有獲取到鎖
// 回到上面一個(gè)外層調(diào)用方法繼續(xù)看:
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}
// 假設(shè)tryAcquire(arg) 返回false又兵,那么代碼將執(zhí)行:
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
// 這個(gè)方法卒废,首先需要執(zhí)行:addWaiter(Node.EXCLUSIVE)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 此方法的作用是把線程包裝成node沛厨,同時(shí)進(jìn)入到隊(duì)列中
// 參數(shù)mode此時(shí)是Node.EXCLUSIVE,代表獨(dú)占模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 以下幾行代碼想把當(dāng)前node加到鏈表的最后面去摔认,也就是進(jìn)到阻塞隊(duì)列的最后
Node pred = tail;
// tail!=null => 隊(duì)列不為空(tail==head的時(shí)候逆皮,其實(shí)隊(duì)列是空的,不過不管這個(gè)吧)
if (pred != null) {
// 將當(dāng)前的隊(duì)尾節(jié)點(diǎn)参袱,設(shè)置為自己的前驅(qū)
node.prev = pred;
// 用CAS把自己設(shè)置為隊(duì)尾, 如果成功后电谣,tail == node 了,這個(gè)節(jié)點(diǎn)成為阻塞隊(duì)列新的尾巴
if (compareAndSetTail(pred, node)) {
// 進(jìn)到這里說明設(shè)置成功抹蚀,當(dāng)前node==tail, 將自己與之前的隊(duì)尾相連剿牺,
// 上面已經(jīng)有 node.prev = pred,加上下面這句环壤,也就實(shí)現(xiàn)了和之前的尾節(jié)點(diǎn)雙向連接了
pred.next = node;
// 線程入隊(duì)了晒来,可以返回了
return node;
}
}
// 仔細(xì)看看上面的代碼,如果會到這里郑现,
// 說明 pred==null(隊(duì)列是空的) 或者 CAS失敗(有線程在競爭入隊(duì))
// 讀者一定要跟上思路湃崩,如果沒有跟上,建議先不要往下讀了懂酱,往回仔細(xì)看竹习,否則會浪費(fèi)時(shí)間的
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 采用自旋的方式入隊(duì)
// 之前說過,到這個(gè)方法只有兩種可能:等待隊(duì)列為空列牺,或者有線程競爭入隊(duì)整陌,
// 自旋在這邊的語義是:CAS設(shè)置tail過程中,競爭一次競爭不到,我就多次競爭泌辫,總會排到的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 之前說過随夸,隊(duì)列為空也會進(jìn)來這里
if (t == null) { // Must initialize
// 初始化head節(jié)點(diǎn)
// 細(xì)心的讀者會知道原來 head 和 tail 初始化的時(shí)候都是 null 的
// 還是一步CAS,你懂的震放,現(xiàn)在可能是很多線程同時(shí)進(jìn)來呢
if (compareAndSetHead(new Node()))
// 給后面用:這個(gè)時(shí)候head節(jié)點(diǎn)的waitStatus==0, 看new Node()構(gòu)造方法就知道了
// 這個(gè)時(shí)候有了head宾毒,但是tail還是null,設(shè)置一下殿遂,
// 把tail指向head诈铛,放心,馬上就有線程要來了墨礁,到時(shí)候tail就要被搶了
// 注意:這里只是設(shè)置了tail=head幢竹,這里可沒return哦,沒有return恩静,沒有return
// 所以焕毫,設(shè)置完了以后,繼續(xù)for循環(huán)驶乾,下次就到下面的else分支了
tail = head;
} else {
// 下面幾行邑飒,和上一個(gè)方法 addWaiter 是一樣的,
// 只是這個(gè)套在無限循環(huán)里级乐,反正就是將當(dāng)前線程排到隊(duì)尾疙咸,有線程競爭的話排不上重復(fù)排
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 現(xiàn)在,又回到這段代碼了
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
// 下面這個(gè)方法风科,參數(shù)node罕扎,經(jīng)過addWaiter(Node.EXCLUSIVE),此時(shí)已經(jīng)進(jìn)入阻塞隊(duì)列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話丐重,
// 意味著上面這段代碼將進(jìn)入selfInterrupt()腔召,所以正常情況下,下面應(yīng)該返回false
// 這個(gè)方法非常重要扮惦,應(yīng)該說真正的線程掛起臀蛛,然后被喚醒后去獲取鎖,都在這個(gè)方法里了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 說明當(dāng)前節(jié)點(diǎn)雖然進(jìn)到了阻塞隊(duì)列崖蜜,但是是阻塞隊(duì)列的第一個(gè)浊仆,因?yàn)樗那膀?qū)是head
// 注意,阻塞隊(duì)列不包含head節(jié)點(diǎn)豫领,head一般指的是占有鎖的線程抡柿,head后面的才稱為阻塞隊(duì)列
// 所以當(dāng)前節(jié)點(diǎn)可以去試搶一下鎖
// 這里我們說一下,為什么可以去試試:
// 首先等恐,它是隊(duì)頭洲劣,這個(gè)是第一個(gè)條件备蚓,其次,當(dāng)前的head有可能是剛剛初始化的node囱稽,
// enq(node) 方法里面有提到郊尝,head是延時(shí)初始化的,而且new Node()的時(shí)候沒有設(shè)置任何線程
// 也就是說战惊,當(dāng)前的head不屬于任何一個(gè)線程流昏,所以作為隊(duì)頭,可以去試一試吞获,
// tryAcquire已經(jīng)分析過了, 忘記了請往前看一下况凉,就是簡單用CAS試操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到這里,說明上面的if分支沒有成功各拷,要么當(dāng)前node本來就不是隊(duì)頭茎刚,
// 要么就是tryAcquire(arg)沒有搶贏別人,繼續(xù)往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 什么時(shí)候 failed 會為 true???
// tryAcquire() 方法拋異常的情況
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
// 剛剛說過撤逢,會到這里就是沒有搶到鎖唄,這個(gè)方法說的是:"當(dāng)前線程沒有搶到鎖粮坞,是否需要掛起當(dāng)前線程蚊荣?"
// 第一個(gè)參數(shù)是前驅(qū)節(jié)點(diǎn),第二個(gè)參數(shù)才是代表當(dāng)前線程的節(jié)點(diǎn)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驅(qū)節(jié)點(diǎn)的 waitStatus == -1 莫杈,說明前驅(qū)節(jié)點(diǎn)狀態(tài)正常互例,當(dāng)前線程需要掛起,直接可以返回true
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驅(qū)節(jié)點(diǎn) waitStatus大于0 筝闹,之前說過媳叨,大于0 說明前驅(qū)節(jié)點(diǎn)取消了排隊(duì)。
// 這里需要知道這點(diǎn):進(jìn)入阻塞隊(duì)列排隊(duì)的線程會被掛起糊秆,而喚醒的操作是由前驅(qū)節(jié)點(diǎn)完成的平痰。
// 所以下面這塊代碼說的是將當(dāng)前節(jié)點(diǎn)的prev指向waitStatus<=0的節(jié)點(diǎn)赔蒲,
// 簡單說债热,就是為了找個(gè)好爹舶沿,因?yàn)槟氵€得依賴它來喚醒呢高镐,如果前驅(qū)節(jié)點(diǎn)取消了排隊(duì)刨仑,
// 找前驅(qū)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)做爹量淌,往前遍歷總能找到一個(gè)好爹的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔細(xì)想想硫狞,如果進(jìn)入到這個(gè)分支意味著什么
// 前驅(qū)節(jié)點(diǎn)的waitStatus不等于-1和1残吩,那也就是只可能是0泣侮,-2紧唱,-3
// 在我們前面的源碼中活尊,都沒有看到有設(shè)置waitStatus的深胳,所以每個(gè)新的node入隊(duì)時(shí)癣猾,waitStatu都是0
// 正常情況下夸盟,前驅(qū)節(jié)點(diǎn)是之前的 tail,那么它的 waitStatus 應(yīng)該是 0
// 用CAS將前驅(qū)節(jié)點(diǎn)的waitStatus設(shè)置為Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 這個(gè)方法返回 false像捶,那么會再走一次 for 循序上陕,
// 然后再次進(jìn)來此方法,此時(shí)會從第一個(gè)分支返回 true
return false;
}
// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 這個(gè)方法結(jié)束根據(jù)返回值我們簡單分析下:
// 如果返回true, 說明前驅(qū)節(jié)點(diǎn)的waitStatus==-1拓春,是正常情況释簿,那么當(dāng)前線程需要被掛起,等待以后被喚醒
// 我們也說過痘儡,以后是被前驅(qū)節(jié)點(diǎn)喚醒,就等著前驅(qū)節(jié)點(diǎn)拿到鎖枢步,然后釋放鎖的時(shí)候叫你好了
// 如果返回false, 說明當(dāng)前不需要被掛起沉删,為什么呢?往后看
// 跳回到前面是這個(gè)方法
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// interrupted = true;
// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true醉途,
// 那么需要執(zhí)行parkAndCheckInterrupt():
// 這個(gè)方法很簡單矾瑰,因?yàn)榍懊娣祷豻rue,所以需要掛起線程隘擎,這個(gè)方法就是負(fù)責(zé)掛起線程的
// 這里用了LockSupport.park(this)來掛起線程殴穴,然后就停在這里了,等待被喚醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況
// 仔細(xì)看shouldParkAfterFailedAcquire(p, node)货葬,我們可以發(fā)現(xiàn)采幌,其實(shí)第一次進(jìn)來的時(shí)候,一般都不會返回true的震桶,原因很簡單休傍,前驅(qū)節(jié)點(diǎn)的waitStatus=-1是依賴于后繼節(jié)點(diǎn)設(shè)置的。也就是說蹲姐,我都還沒給前驅(qū)設(shè)置-1呢磨取,怎么可能是true呢人柿,但是要看到,這個(gè)方法是套在循環(huán)里的忙厌,所以第二次進(jìn)來的時(shí)候狀態(tài)就是-1了凫岖。
// 解釋下為什么shouldParkAfterFailedAcquire(p, node)返回false的時(shí)候不直接掛起線程:
// => 是為了應(yīng)對在經(jīng)過這個(gè)方法后,node已經(jīng)是head的直接后繼節(jié)點(diǎn)了逢净。 為了減少線程陷入park哥放,線程阻塞的開銷切換比較大
}
解鎖操作
如果線程沒獲取到鎖,線程會被 LockSupport.park(this); 掛起停止汹胃,等待被喚醒婶芭。
// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了着饥,下面都不需要看就知道怎么回事了
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 往后看吧
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全釋放鎖
boolean free = false;
// 其實(shí)就是重入的問題犀农,如果c==0,也就是說沒有嵌套鎖了宰掉,可以釋放了呵哨,否則還不能釋放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// 喚醒后繼節(jié)點(diǎn)
// 從上面調(diào)用處知道,參數(shù)node是head頭結(jié)點(diǎn)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果head節(jié)點(diǎn)當(dāng)前waitStatus<0, 將其修改為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下面的代碼就是喚醒后繼節(jié)點(diǎn)轨奄,但是有可能后繼節(jié)點(diǎn)取消了等待(waitStatus==1)
// 從隊(duì)尾往前找孟害,找到waitStatus<=0的所有節(jié)點(diǎn)中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從后往前找,仔細(xì)看代碼挪拟,不必?fù)?dān)心中間有節(jié)點(diǎn)取消(waitStatus==1)的情況
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒線程
LockSupport.unpark(s.thread);
}
喚醒線程以后挨务,被喚醒的線程將從以下代碼中繼續(xù)往前走:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 剛剛線程被掛起在這里了
return Thread.interrupted();
}
// 又回到這個(gè)方法了:acquireQueued(final Node node, int arg),這個(gè)時(shí)候玉组,node的前驅(qū)是head了
unparkSuccessor 方法中for循環(huán)從tail開始而不是head
上面AQS的代碼谎柄,我不明白的是這個(gè)for循環(huán),思路是找到后繼能夠有效的節(jié)點(diǎn)惯雳,為什么不從head開始朝巫,然后找到就break而非要從尾巴開始呢?石景?劈猿??
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
看插入的地方就會明白了潮孽。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//看這里
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
新節(jié)點(diǎn)pre指向tail揪荣,tail指向新節(jié)點(diǎn),這里后繼指向前驅(qū)的指針是由CAS操作保證線程安全的往史。而cas操作之后t.next=node之前变逃,可能會有其他線程進(jìn)來。所以出現(xiàn)了問題怠堪,從尾部向前遍歷是一定能遍歷到所有的節(jié)點(diǎn)揽乱。(cas和t.next=node不是原子性的名眉,導(dǎo)致在利用next指針遍歷節(jié)點(diǎn)時(shí),可能會出現(xiàn)凰棉,節(jié)點(diǎn)已經(jīng)插入即tail已經(jīng)更新损拢,而pre的next指針依然為null的情況,這將無法完整遍歷所有節(jié)點(diǎn))撒犀。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // ①
if (compareAndSetTail(t, node)) { // ②
t.next = node; // ③
return t;
}
}
}
}
① 處將新結(jié)點(diǎn) node 的 prev 引用指向當(dāng)前的 t, 即 tail 結(jié)點(diǎn). 然而, 由于 ①, ② 這兩行代碼的合在一起并非原子性的, 所以很有可能在設(shè)置 tail 時(shí)存在著競爭, 也即 tail 被其它線程更新過了. 所以要自旋操作, 即在死循環(huán)中操作, 直到成功為止. 自旋地 CAS volatile 變量是很經(jīng)典的用法. 如果設(shè)置成功了, 那么 從 node.prev 執(zhí)行完畢到正在用 CAS 設(shè)置 tail 時(shí), tail 變量是沒有被修改的, 所以如果 CAS成功, 那么 node.prev = t 一定是指向上一個(gè) tail 的. 同樣的, ②, ③ 合在一起也并非原子操作, 更重要的是, next field 的設(shè)置發(fā)生在 CAS 操作之后, 所以可能會存在 tail 已經(jīng)更新, 但是 last tail 的 next field 還未設(shè)置完畢, 即它的 lastTail.next 為 null 這種情況. 因此如果此時(shí)訪問該結(jié)點(diǎn)的 next 引用可能就會得到它在隊(duì)尾, 不存在后繼結(jié)點(diǎn)的"錯(cuò)覺". 而我們總是能夠通過從 tail 開始反向查找, 借助可靠的 prev 引用來定位到指定的結(jié)點(diǎn). 簡單總結(jié)一下, prev 引用的設(shè)置發(fā)生在 CAS之前, 因此如果 CAS 設(shè)置 tail 成功, 那么 prev 一定是正確地指向 last tail, 而 next 引用的設(shè)置發(fā)生在其后, 因而會存在一個(gè) tail 更新成功, 但是 last tail 的 next 引用還未設(shè)置的尷尬時(shí)期. 所以我們說 prev 是可靠的, 而 next 有時(shí)會為 null, 但并不一定真的就沒有后繼結(jié)點(diǎn).
在有文檔的情況下福压,不太建議生看代碼反推實(shí)現(xiàn)原理,效率很低也容易出現(xiàn)理解偏差或舞。
在并發(fā)環(huán)境下荆姆,加鎖和解鎖需要以下三個(gè)部件的協(xié)調(diào):
鎖狀態(tài)。我們要知道鎖是不是被別的線程占有了映凳,這個(gè)就是 state 的作用胆筒,它為 0 的時(shí)候代表沒有線程占有鎖,可以去爭搶這個(gè)鎖诈豌,用 CAS 將 state 設(shè)為 1仆救,如果 CAS 成功,說明搶到了鎖矫渔,這樣其他線程就搶不到了彤蔽,如果鎖重入的話,state進(jìn)行 +1 就可以庙洼,解鎖就是減 1顿痪,直到 state 又變?yōu)?0,代表釋放鎖油够,所以 lock() 和 unlock() 必須要配對啊蚁袭。然后喚醒等待隊(duì)列中的第一個(gè)線程,讓其來占有鎖叠聋。
線程的阻塞和解除阻塞撕阎。AQS 中采用了 LockSupport.park(thread) 來掛起線程受裹,用 unpark 來喚醒線程
阻塞隊(duì)列碌补。因?yàn)闋帗屾i的線程可能很多,但是只能有一個(gè)線程拿到鎖棉饶,其他的線程都必須等待厦章,這個(gè)時(shí)候就需要一個(gè) queue 來管理這些線程,AQS 用的是一個(gè) FIFO 的隊(duì)列照藻,就是一個(gè)鏈表袜啃,每個(gè) node 都持有后繼節(jié)點(diǎn)的引用
注意細(xì)節(jié)
現(xiàn)假設(shè)有線程1和線程2,線程1獲取了鎖幸缕,線程 1 沒有調(diào)用 unlock() 之前群发,線程 2 調(diào)用了 lock()晰韵,會發(fā)生什么?
線程 2 會初始化 head【new Node()】熟妓,同時(shí)線程 2 也會插入到阻塞隊(duì)列并掛起 (注意看這里是一個(gè) for 循環(huán)雪猪,而且設(shè)置 head 和 tail 的部分是不 return 的,只有入隊(duì)成功才會跳出循環(huán))
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先起愈,是線程 2 初始化 head 節(jié)點(diǎn)只恨,此時(shí) head==tail, waitStatus==0
然后線程 2 入隊(duì):
同時(shí)我們也要看此時(shí)節(jié)點(diǎn)的 waitStatus,我們知道 head 節(jié)點(diǎn)是線程 2 初始化的抬虽,此時(shí)的 waitStatus 沒有設(shè)置官觅, java 默認(rèn)會設(shè)置為 0,但是到 shouldParkAfterFailedAcquire 這個(gè)方法的時(shí)候阐污,線程 2 會把前驅(qū)節(jié)點(diǎn)休涤,也就是 head 的waitStatus設(shè)置為 -1。
那線程 2 節(jié)點(diǎn)此時(shí)的 waitStatus 是多少呢疤剑,由于沒有設(shè)置滑绒,所以是 0;
如果線程 3 此時(shí)再進(jìn)來隘膘,直接插到線程 2 的后面就可以了疑故,此時(shí)線程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的時(shí)候把前驅(qū)節(jié)點(diǎn)線程 2 的 waitStatus 設(shè)置為 -1弯菊。
這里可以簡單說下 waitStatus 中 SIGNAL(-1) 狀態(tài)的意思纵势,Doug Lea 注釋的是:代表后繼節(jié)點(diǎn)需要被喚醒。也就是說這個(gè) waitStatus 其實(shí)代表的不是自己的狀態(tài)管钳,而是后繼節(jié)點(diǎn)的狀態(tài)钦铁,我們知道,每個(gè) node 在入隊(duì)的時(shí)候才漆,都會把前驅(qū)節(jié)點(diǎn)的狀態(tài)改為 SIGNAL牛曹,然后阻塞,等待被前驅(qū)喚醒醇滥。
在acquireQueued方法里面黎比,第一次調(diào)用shouldParkAfterFailedAcquire(p, node)的時(shí)候,把前驅(qū)節(jié)點(diǎn)waitStatus從0改為-1鸳玩,然后返回false阅虫,回到acquireQueued方法,再嘗試拿一次鎖不跟,然后第二次調(diào)用shouldParkAfterFailedAcquire返回true颓帝,調(diào)用parkAndCheckInterrupt()掛起線程。
那么,如果在某線程B還沒有掛起之前购城,前驅(qū)節(jié)點(diǎn)的線程A發(fā)現(xiàn)自己waitStatus為-1直接unpark吕座,然后剛剛的線程B才掛起。那不就沒人能喚醒它了嗎瘪板?它是怎么保證被喚醒的米诉?
1、如果一個(gè)線程 park 了篷帅,那么調(diào)用 unpark(thread) 這個(gè)線程會被喚醒史侣;
2、如果一個(gè)線程先被調(diào)用了 unpark魏身,那么下一個(gè) park(thread) 操作不會掛起線程惊橱。
unparkSuccessor(Node node)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
首先,第一行代碼先檢測 head 的后繼節(jié)點(diǎn)箭昵,只有當(dāng)此時(shí)的后繼節(jié)點(diǎn)不存在或者這個(gè)后繼節(jié)點(diǎn)取消了才開始從后往前找税朴,所以大部分情況下,其實(shí)不會發(fā)生從后往前遍歷整個(gè)隊(duì)列的情況家制。(后繼節(jié)點(diǎn)取消很正常正林,但是某節(jié)點(diǎn)在入隊(duì)的時(shí)候,如果發(fā)現(xiàn)前驅(qū)是取消狀態(tài)颤殴,前驅(qū)節(jié)點(diǎn)是會被請出隊(duì)列的)
這里為啥倒序遍歷觅廓?
之所以要倒序是因?yàn)閏ancelAcquire方法的最后一行node.next = node; 假如按照正序遍歷,剛好遍歷到node, 由于node線程異常(acquireQueued方法的for循環(huán))進(jìn)入cancelAcquire方法,執(zhí)行了最后一行后, 那么正序遍歷就會陷入死循環(huán)!(貌似不會進(jìn)入cancelAcquire這個(gè)方法?涵但?)
公平鎖和非公平鎖
ReentrantLock 默認(rèn)采用非公平鎖杈绸,除非你在構(gòu)造方法中傳入?yún)?shù) true 。
public ReentrantLock() {
// 默認(rèn)非公平鎖
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平鎖的 lock 方法:
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平鎖相比矮瘟,這里多了一個(gè)判斷:是否有線程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平鎖的 lock 方法:
static final class NonfairSync extends Sync {
final void lock() {
// 2. 和公平鎖相比瞳脓,這里會直接先進(jìn)行一次CAS,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// AbstractQueuedSynchronizer.acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 這里沒有對阻塞隊(duì)列進(jìn)行判斷
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平鎖和非公平鎖只有兩處不同:
非公平鎖在調(diào)用 lock 后澈侠,首先就會調(diào)用 CAS 進(jìn)行一次搶鎖劫侧,如果這個(gè)時(shí)候恰巧鎖沒有被占用,那么直接就獲取到鎖返回了哨啃。
非公平鎖在 CAS 失敗后烧栋,和公平鎖一樣都會進(jìn)入到 tryAcquire 方法,在 tryAcquire 方法中棘催,如果發(fā)現(xiàn)鎖這個(gè)時(shí)候被釋放了(state == 0)劲弦,非公平鎖會直接 CAS 搶鎖耳标,但是公平鎖會判斷等待隊(duì)列是否有線程處于等待狀態(tài)醇坝,如果有則不去搶鎖,乖乖排到后面.
公平鎖和非公平鎖就這兩點(diǎn)區(qū)別,如果這兩次 CAS 都不成功呼猪,那么后面非公平鎖和公平鎖是一樣的画畅,都要進(jìn)入到阻塞隊(duì)列等待喚醒。相對來說宋距,非公平鎖會有更好的性能轴踱,因?yàn)樗耐掏铝勘容^大。當(dāng)然谚赎,非公平鎖讓獲取鎖的時(shí)間變得更加不確定淫僻,可能會導(dǎo)致在阻塞隊(duì)列中的線程長期處于饑餓狀態(tài)。
Condition
我們先來看看 Condition 的使用場景壶唤,Condition 經(jīng)出椋可以用在生產(chǎn)者-消費(fèi)者的場景中,請看 Doug Lea 給出的這個(gè)例子:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依賴于 lock 來產(chǎn)生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生產(chǎn)
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 隊(duì)列已滿闸盔,等待悯辙,直到 not full 才能繼續(xù)生產(chǎn)
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生產(chǎn)成功,隊(duì)列已經(jīng) not empty 了迎吵,發(fā)個(gè)通知出去
} finally {
lock.unlock();
}
}
// 消費(fèi)
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 隊(duì)列為空躲撰,等待,直到隊(duì)列 not empty击费,才能繼續(xù)消費(fèi)
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消費(fèi)掉一個(gè)拢蛋,隊(duì)列 not full 了,發(fā)個(gè)通知出去
return x;
} finally {
lock.unlock();
}
}
}
我們可以看到蔫巩,在使用 condition 時(shí)瓤狐,必須先持有相應(yīng)的鎖。這個(gè)和 Object 類中的方法有相似的語義批幌,需要先持有某個(gè)對象的監(jiān)視器鎖才可以執(zhí)行 wait(), notify() 或 notifyAll() 方法础锐。
ArrayBlockingQueue 采用這種方式實(shí)現(xiàn)了生產(chǎn)者-消費(fèi)者,所以請只把這個(gè)例子當(dāng)做學(xué)習(xí)例子荧缘,實(shí)際生產(chǎn)中可以直接使用 ArrayBlockingQueue皆警。
我們常用 obj.wait(),obj.notify() 或 obj.notifyAll() 來實(shí)現(xiàn)相似的功能截粗,但是信姓,它們是基于對象的監(jiān)視器鎖的. 這里說的 Condition 是基于 ReentrantLock 實(shí)現(xiàn)的,而 ReentrantLock 是依賴于 AbstractQueuedSynchronizer 實(shí)現(xiàn)的绸罗。
每個(gè) ReentrantLock 實(shí)例可以通過調(diào)用多次 newCondition 產(chǎn)生多個(gè) ConditionObject 的實(shí)例:
final ConditionObject newCondition() {
// 實(shí)例化一個(gè) ConditionObject
return new ConditionObject();
}
我們首先來看下我們關(guān)注的 Condition 的實(shí)現(xiàn)類 AbstractQueuedSynchronizer 類中的 ConditionObject意推。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 條件隊(duì)列的第一個(gè)節(jié)點(diǎn)
// 不要管這里的關(guān)鍵字 transient,是不參與序列化的意思
private transient Node firstWaiter;
// 條件隊(duì)列的最后一個(gè)節(jié)點(diǎn)
private transient Node lastWaiter;
......
在介紹 AQS 的時(shí)候珊蟀,我們有一個(gè)阻塞隊(duì)列(或者叫同步隊(duì)列sync queue)菊值,用于保存等待獲取鎖的線程的隊(duì)列,這里我們引入另一個(gè)概念,叫條件隊(duì)列(condition queue)腻窒,我畫了一張簡單的圖用來說明這個(gè)昵宇。
這里,我們簡單回顧下 Node 的屬性:
volatile int waitStatus; // 可取值 0儿子、CANCELLED(1)瓦哎、SIGNAL(-1)、CONDITION(-2)柔逼、PROPAGATE(-3)
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
prev 和 next 用于實(shí)現(xiàn)阻塞隊(duì)列的雙向鏈表蒋譬,這里的 nextWaiter 用于實(shí)現(xiàn)條件隊(duì)列的單向鏈表。
條件隊(duì)列和阻塞隊(duì)列的節(jié)點(diǎn)愉适,都是 Node 的實(shí)例羡铲,因?yàn)闂l件隊(duì)列的節(jié)點(diǎn)是需要轉(zhuǎn)移到阻塞隊(duì)列中去的;
我們知道一個(gè) ReentrantLock 實(shí)例可以通過多次調(diào)用 newCondition() 來產(chǎn)生多個(gè) Condition 實(shí)例儡毕,這里對應(yīng) condition1 和 condition2也切。注意,ConditionObject 只有兩個(gè)屬性 firstWaiter 和 lastWaiter
每個(gè) condition 有一個(gè)關(guān)聯(lián)的條件隊(duì)列腰湾,如線程 1 調(diào)用 condition1.await() 方法即可將當(dāng)前線程 1 包裝成 Node 后加入到條件隊(duì)列中雷恃,然后阻塞在這里,不繼續(xù)往下執(zhí)行费坊,條件隊(duì)列是一個(gè)單向鏈表倒槐;
調(diào)用condition1.signal() 觸發(fā)一次喚醒,此時(shí)喚醒的是隊(duì)頭附井,會將condition1 對應(yīng)的條件隊(duì)列的 firstWaiter(隊(duì)頭) 移到阻塞隊(duì)列的隊(duì)尾讨越,等待獲取鎖,獲取鎖后 await 方法才能返回永毅,繼續(xù)往下執(zhí)行把跨。
我們先來看看 wait 方法:
// 首先,這個(gè)方法是可被中斷的沼死,不可被中斷的是另一個(gè)方法 awaitUninterruptibly()
// 這個(gè)方法會阻塞着逐,直到調(diào)用 signal 方法(指 signal() 和 signalAll(),下同)意蛀,或被中斷
public final void await() throws InterruptedException {
// 老規(guī)矩耸别,既然該方法要響應(yīng)中斷,那么在最開始就判斷中斷狀態(tài)
if (Thread.interrupted())
throw new InterruptedException();
// 添加到 condition 的條件隊(duì)列中
Node node = addConditionWaiter();
// 釋放鎖县钥,返回值是釋放鎖之前的 state 值
// await() 之前秀姐,當(dāng)前線程是必須持有鎖的,這里肯定要釋放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
// 這里退出循環(huán)有兩種情況若贮,之后再仔細(xì)分析
// 1. isOnSyncQueue(node) 返回 true省有,即當(dāng)前 node 已經(jīng)轉(zhuǎn)移到阻塞隊(duì)列了
// 2. checkInterruptWhileWaiting(node) != 0 會到 break痒留,然后退出循環(huán),代表的是線程中斷
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒后锥咸,將進(jìn)入阻塞隊(duì)列,等待獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
具體細(xì)節(jié):
- 將節(jié)點(diǎn)加入到條件隊(duì)列
addConditionWaiter() 是將當(dāng)前節(jié)點(diǎn)加入到條件隊(duì)列
// 將當(dāng)前線程對應(yīng)的節(jié)點(diǎn)入隊(duì)细移,插入隊(duì)尾
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果條件隊(duì)列的最后一個(gè)節(jié)點(diǎn)取消了搏予,將其清除出去
// 為什么這里把 waitStatus 不等于 Node.CONDITION,就判定為該節(jié)點(diǎn)發(fā)生了取消排隊(duì)弧轧?
if (t != null && t.waitStatus != Node.CONDITION) {
// 這個(gè)方法會遍歷整個(gè)條件隊(duì)列雪侥,然后會將已取消的所有節(jié)點(diǎn)清除出隊(duì)列
unlinkCancelledWaiters();
t = lastWaiter;
}
// node 在初始化的時(shí)候,指定 waitStatus 為 Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// t 此時(shí)是 lastWaiter精绎,隊(duì)尾
// 如果隊(duì)列為空
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
上面的這塊代碼很簡單速缨,就是將當(dāng)前線程進(jìn)入到條件隊(duì)列的隊(duì)尾。
在addWaiter 方法中代乃,有一個(gè) unlinkCancelledWaiters() 方法旬牲,該方法用于清除隊(duì)列中已經(jīng)取消等待的節(jié)點(diǎn)。
/ 等待隊(duì)列是一個(gè)單向鏈表搁吓,遍歷鏈表將已經(jīng)取消等待的節(jié)點(diǎn)清除出去
// 純屬鏈表操作原茅,很好理解,看不懂多看幾遍就可以了
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果節(jié)點(diǎn)的狀態(tài)不是 Node.CONDITION 的話堕仔,這個(gè)節(jié)點(diǎn)就是被取消的
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
完全釋放獨(dú)占鎖
回到 wait 方法擂橘,節(jié)點(diǎn)入隊(duì)了以后,會調(diào)用 int savedState = fullyRelease(node); 方法釋放鎖摩骨,注意通贞,這里是完全釋放獨(dú)占鎖(fully release),因?yàn)?ReentrantLock 是可以重入的恼五。
考慮一下這里的 savedState昌罩。如果在 condition1.await() 之前,假設(shè)線程先執(zhí)行了 2 次 lock() 操作灾馒,那么 state 為 2峡迷,我們理解為該線程持有 2 把鎖,這里 await() 方法必須將 state 設(shè)置為 0你虹,然后再進(jìn)入掛起狀態(tài)绘搞,這樣其他線程才能持有鎖。當(dāng)它被喚醒的時(shí)候傅物,它需要重新持有 2 把鎖夯辖,才能繼續(xù)下去。
// 首先董饰,我們要先觀察到返回值 savedState 代表 release 之前的 state 值
// 對于最簡單的操作:先 lock.lock()蒿褂,然后 condition1.await()圆米。
// 那么 state 經(jīng)過這個(gè)方法由 1 變?yōu)?0,鎖釋放啄栓,此方法返回 1
// 相應(yīng)的娄帖,如果 lock 重入了 n 次,savedState == n
// 如果這個(gè)方法失敗昙楚,會將節(jié)點(diǎn)設(shè)置為"取消"狀態(tài)近速,并拋出異常 IllegalMonitorStateException
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 這里使用了當(dāng)前的 state 作為 release 的參數(shù),也就是完全釋放掉鎖堪旧,將 state 置為 0
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
如果一個(gè)線程在不持有 lock 的基礎(chǔ)上削葱,就去調(diào)用 condition1.await() 方法,它能進(jìn)入條件隊(duì)列淳梦,但是在上面的這個(gè)方法中析砸,由于它不持有鎖,release(savedState) 這個(gè)方法肯定要返回 false爆袍,進(jìn)入到異常分支首繁,然后進(jìn)入 finally 塊設(shè)置 node.waitStatus = Node.CANCELLED,這個(gè)已經(jīng)入隊(duì)的節(jié)點(diǎn)之后會被后繼的節(jié)點(diǎn)”請出去“陨囊。
等待進(jìn)入阻塞隊(duì)列
釋放掉鎖以后蛮瞄,接下來是這段,這邊會自旋谆扎,如果發(fā)現(xiàn)自己還沒到阻塞隊(duì)列挂捅,那么掛起,等待被轉(zhuǎn)移到阻塞隊(duì)列堂湖。
int interruptMode = 0;
// 如果不在阻塞隊(duì)列中闲先,注意了,是阻塞隊(duì)列
while (!isOnSyncQueue(node)) {
// 線程掛起
LockSupport.park(this);
// 這里可以先不用看了无蜂,等看到它什么時(shí)候被 unpark 再說
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue(Node node) 用于判斷節(jié)點(diǎn)是否已經(jīng)轉(zhuǎn)移到阻塞隊(duì)列了:
// 在節(jié)點(diǎn)入條件隊(duì)列的時(shí)候伺糠,初始化時(shí)設(shè)置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的時(shí)候需要將節(jié)點(diǎn)從條件隊(duì)列移到阻塞隊(duì)列斥季,
// 這個(gè)方法就是判斷 node 是否已經(jīng)移動到阻塞隊(duì)列了
final boolean isOnSyncQueue(Node node) {
// 移動過去的時(shí)候训桶,node 的 waitStatus 會置為 0,這個(gè)之后在說 signal 方法的時(shí)候會說到
// 如果 waitStatus 還是 Node.CONDITION酣倾,也就是 -2舵揭,那肯定就是還在條件隊(duì)列中
// 如果 node 的前驅(qū) prev 指向還是 null,說明肯定沒有在 阻塞隊(duì)列(prev是阻塞隊(duì)列鏈表中使用的)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 node 已經(jīng)有后繼節(jié)點(diǎn) next 的時(shí)候躁锡,那肯定是在阻塞隊(duì)列了
if (node.next != null)
return true;
// 下面這個(gè)方法從阻塞隊(duì)列的隊(duì)尾開始從后往前遍歷找午绳,如果找到相等的,說明在阻塞隊(duì)列映之,否則就是不在阻塞隊(duì)列
// 可以通過判斷 node.prev() != null 來推斷出 node 在阻塞隊(duì)列嗎拦焚?答案是:不能蜡坊。
// AQS 的入隊(duì)方法,首先設(shè)置的是 node.prev 指向 tail赎败,
// 然后是 CAS 操作將自己設(shè)置為新的 tail秕衙,可是這次的 CAS 是可能失敗的。
return findNodeFromTail(node);
}
// 從阻塞隊(duì)列的隊(duì)尾往前遍歷僵刮,如果找到据忘,返回 true
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
回到前面的循環(huán),isOnSyncQueue(node) 返回 false 的話妓笙,那么進(jìn)到 LockSupport.park(this); 這里線程掛起若河。
signal 喚醒線程能岩,轉(zhuǎn)移到阻塞隊(duì)列
喚醒操作通常由另一個(gè)線程來操作寞宫,就像生產(chǎn)者-消費(fèi)者模式中,如果線程因?yàn)榈却M(fèi)而掛起拉鹃,那么當(dāng)生產(chǎn)者生產(chǎn)了一個(gè)東西后辈赋,會調(diào)用 signal 喚醒正在等待的線程來消費(fèi)。
// 喚醒等待了最久的線程
// 其實(shí)就是膏燕,將這個(gè)線程對應(yīng)的 node 從條件隊(duì)列轉(zhuǎn)移到阻塞隊(duì)列
public final void signal() {
// 調(diào)用 signal 方法的線程必須持有當(dāng)前的獨(dú)占鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 從條件隊(duì)列隊(duì)頭往后遍歷钥屈,找出第一個(gè)需要轉(zhuǎn)移的 node
// 因?yàn)榍懊嫖覀冋f過,有些線程會取消排隊(duì)坝辫,但是可能還在隊(duì)列中
private void doSignal(Node first) {
do {
// 將 firstWaiter 指向 first 節(jié)點(diǎn)后面的第一個(gè)篷就,因?yàn)?first 節(jié)點(diǎn)馬上要離開了
// 如果將 first 移除后,后面沒有節(jié)點(diǎn)在等待了近忙,那么需要將 lastWaiter 置為 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 因?yàn)?first 馬上要被移到阻塞隊(duì)列了竭业,和條件隊(duì)列的鏈接關(guān)系在這里斷掉
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
// 這里 while 循環(huán),如果 first 轉(zhuǎn)移不成功及舍,那么選擇 first 后面的第一個(gè)節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移未辆,依此類推
}
// 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)移到阻塞隊(duì)列
// true 代表成功轉(zhuǎn)移
// false 代表在 signal 之前,節(jié)點(diǎn)已經(jīng)取消了
final boolean transferForSignal(Node node) {
// CAS 如果失敗锯玛,說明此 node 的 waitStatus 已不是 Node.CONDITION妙色,說明節(jié)點(diǎn)已經(jīng)取消屋灌,
// 既然已經(jīng)取消,也就不需要轉(zhuǎn)移了,方法返回吏饿,轉(zhuǎn)移后面一個(gè)節(jié)點(diǎn)
// 否則,將 waitStatus 置為 0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq(node): 自旋進(jìn)入阻塞隊(duì)列的隊(duì)尾
// 注意荐捻,這里的返回值 p 是 node 在阻塞隊(duì)列的前驅(qū)節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// ws > 0 說明 node 在阻塞隊(duì)列中的前驅(qū)節(jié)點(diǎn)取消了等待鎖堡僻,直接喚醒 node 對應(yīng)的線程。喚醒之后會怎么樣实撒,后面再解釋
// 如果 ws <= 0, 那么 compareAndSetWaitStatus 將會被調(diào)用姊途,節(jié)點(diǎn)入隊(duì)后涉瘾,需要把前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)為 Node.SIGNAL(-1)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前驅(qū)節(jié)點(diǎn)取消或者 CAS 失敗,會進(jìn)到這里喚醒線程捷兰,之后的操作看下一節(jié)
LockSupport.unpark(node.thread);
return true;
}
正常情況下立叛,ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 這句中,ws <= 0贡茅,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 會返回 true秘蛇,所以一般也不會進(jìn)去 if 語句塊中喚醒 node 對應(yīng)的線程。然后這個(gè)方法返回 true顶考,也就意味著 signal 方法結(jié)束了赁还,節(jié)點(diǎn)進(jìn)入了阻塞隊(duì)列。
假設(shè)發(fā)生了阻塞隊(duì)列中的前驅(qū)節(jié)點(diǎn)取消等待驹沿,或者 CAS 失敗艘策,只要喚醒線程,讓其進(jìn)到下一步即可渊季。
喚醒后檢查中斷狀態(tài)
上一步 signal 之后朋蔫,我們的線程由條件隊(duì)列轉(zhuǎn)移到了阻塞隊(duì)列,之后就準(zhǔn)備獲取鎖了却汉。只要重新獲取到鎖了以后驯妄,繼續(xù)往下執(zhí)行。
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 線程掛起
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
先解釋下 interruptMode合砂。interruptMode 可以取值為 REINTERRUPT(1)青扔,THROW_IE(-1),0
- REINTERRUPT: 代表 await 返回的時(shí)候翩伪,需要重新設(shè)置中斷狀態(tài)
- THROW_IE: 代表 await 返回的時(shí)候微猖,需要拋出 InterruptedException 異常
- 0 :說明在 await 期間,沒有發(fā)生中斷
有以下三種情況會讓 LockSupport.park(this); 這句返回繼續(xù)往下執(zhí)行:
- 常規(guī)路徑幻工。signal -> 轉(zhuǎn)移節(jié)點(diǎn)到阻塞隊(duì)列 -> 獲取了鎖(unpark)
- 線程中斷励两。在 park 的時(shí)候,另外一個(gè)線程對這個(gè)線程進(jìn)行了中斷
- signal 的時(shí)候我們說過囊颅,轉(zhuǎn)移以后的前驅(qū)節(jié)點(diǎn)取消了当悔,或者對前驅(qū)節(jié)點(diǎn)的CAS操作失敗了
線程喚醒后第一步是調(diào)用 checkInterruptWhileWaiting(node) 這個(gè)方法,此方法用于判斷是否在線程掛起期間發(fā)生了中斷踢代,如果發(fā)生了中斷盲憎,是 signal 調(diào)用之前中斷的,還是 signal 之后發(fā)生的中斷胳挎。
// 1. 如果在 signal 之前已經(jīng)中斷饼疙,返回 THROW_IE
// 2. 如果是 signal 之后中斷,返回 REINTERRUPT
// 3. 沒有發(fā)生中斷慕爬,返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
Thread.interrupted():如果當(dāng)前線程已經(jīng)處于中斷狀態(tài)窑眯,那么該方法返回 true屏积,同時(shí)將中斷狀態(tài)重置為 false,所以磅甩,才有后續(xù)的 重新中斷(REINTERRUPT) 的使用炊林。
看看怎么判斷是 signal 之前還是之后發(fā)生的中斷:
// 只有線程處于中斷狀態(tài),才會調(diào)用此方法
// 如果需要的話卷要,將這個(gè)已經(jīng)取消等待的節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列
// 返回 true:如果此線程在 signal 之前被取消渣聚,
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 將節(jié)點(diǎn)狀態(tài)設(shè)置為 0
// 如果這步 CAS 成功,說明是 signal 方法之前發(fā)生的中斷僧叉,因?yàn)槿绻?signal 先發(fā)生的話奕枝,signal 中會將 waitStatus 設(shè)置為 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 將節(jié)點(diǎn)放入阻塞隊(duì)列
// 這里我們看到,即使中斷了瓶堕,依然會轉(zhuǎn)移到阻塞隊(duì)列
enq(node);
return true;
}
// 到這里是因?yàn)?CAS 失敗隘道,肯定是因?yàn)?signal 方法已經(jīng)將 waitStatus 設(shè)置為了 0
// signal 方法會將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列,但是可能還沒完成捞烟,這邊自旋等待其完成
// 當(dāng)然薄声,這種事情還是比較少的吧:signal 調(diào)用之后当船,沒完成轉(zhuǎn)移之前题画,發(fā)生了中斷
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
即使發(fā)生了中斷,節(jié)點(diǎn)依然會轉(zhuǎn)移到阻塞隊(duì)列德频。
這里描繪了一個(gè)場景,本來有個(gè)線程苍息,它是排在條件隊(duì)列的后面的,但是因?yàn)樗恢袛嗔艘贾茫敲此鼤粏拘丫核迹缓笏l(fā)現(xiàn)自己不是被 signal 的那個(gè),但是它會自己主動去進(jìn)入到阻塞隊(duì)列钞护。
獲取獨(dú)占鎖
while 循環(huán)出來以后盖喷,下面是這段代碼:
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
由于 while 出來后,我們確定節(jié)點(diǎn)已經(jīng)進(jìn)入了阻塞隊(duì)列难咕,準(zhǔn)備獲取鎖课梳。
這里的 acquireQueued(node, savedState) 的第一個(gè)參數(shù) node 之前已經(jīng)經(jīng)過 enq(node) 進(jìn)入了隊(duì)列,參數(shù) savedState 是之前釋放鎖前的 state余佃,這個(gè)方法返回的時(shí)候暮刃,代表當(dāng)前線程獲取了鎖,而且 state == savedState了爆土。
前面我們說過椭懊,不管有沒有發(fā)生中斷,都會進(jìn)入到阻塞隊(duì)列步势,而 acquireQueued(node, savedState) 的返回值就是代表線程是否被中斷氧猬。如果返回 true背犯,說明被中斷了,而且 interruptMode != THROW_IE盅抚,說明在 signal 之前就發(fā)生中斷了媳板,這里將 interruptMode 設(shè)置為 REINTERRUPT,用于待會重新中斷泉哈。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
這邊說說 node.nextWaiter != null 怎么滿足蛉幸。我前面也說了 signal 的時(shí)候會將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列,有一步是 node.nextWaiter = null丛晦,將斷開節(jié)點(diǎn)和條件隊(duì)列的聯(lián)系奕纫。可是烫沙,在判斷發(fā)生中斷的情況下匹层,是 signal 之前還是之后發(fā)生的? 這部分的時(shí)候锌蓄,我也介紹了升筏,如果 signal 之前就中斷了,也需要將節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移到阻塞隊(duì)列瘸爽,這部分轉(zhuǎn)移的時(shí)候,是沒有設(shè)置 node.nextWaiter = null 的柑潦。之前我們說過享言,如果有節(jié)點(diǎn)取消,也會調(diào)用 unlinkCancelledWaiters 這個(gè)方法渗鬼,就是這里了览露。
處理中斷狀態(tài)
- 0:什么都不做差牛,沒有被中斷過;
- THROW_IE:await 方法拋出 InterruptedException 異常银择,因?yàn)樗碓?await() 期間發(fā)生了中斷夹孔;
- REINTERRUPT:重新中斷當(dāng)前線程,因?yàn)樗?await() 期間沒有被中斷,而是 signal() 以后發(fā)生的中斷
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
帶超時(shí)機(jī)制的 await
經(jīng)過前面的 7 步搭伤,整個(gè) ConditionObject 類基本上都分析完了只怎,接下來簡單分析下帶超時(shí)機(jī)制的 await 方法。
public final long awaitNanos(long nanosTimeout)
throws InterruptedException
public final boolean awaitUntil(Date deadline)
throws InterruptedException
public final boolean await(long time, TimeUnit unit)
throws InterruptedException
這三個(gè)方法都差不多怜俐,我們就挑一個(gè)出來看看吧:
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
// 等待這么多納秒
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
// 當(dāng)前時(shí)間 + 等待時(shí)長 = 過期時(shí)間
final long deadline = System.nanoTime() + nanosTimeout;
// 用于返回 await 是否超時(shí)
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 時(shí)間到啦
if (nanosTimeout <= 0L) {
// 這里因?yàn)橐?break 取消等待了身堡。取消等待的話一定要調(diào)用 transferAfterCancelledWait(node) 這個(gè)方法
// 如果這個(gè)方法返回 true,在這個(gè)方法內(nèi)拍鲤,將節(jié)點(diǎn)轉(zhuǎn)移到阻塞隊(duì)列成功
// 返回 false 的話贴谎,說明 signal 已經(jīng)發(fā)生,signal 方法將節(jié)點(diǎn)轉(zhuǎn)移了季稳。也就是說沒有超時(shí)嘛
timedout = transferAfterCancelledWait(node);
break;
}
// spinForTimeoutThreshold 的值是 1000 納秒擅这,也就是 1 毫秒
// 也就是說,如果不到 1 毫秒了景鼠,那就不要選擇 parkNanos 了仲翎,自旋的性能反而更好
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
// 得到剩余時(shí)間
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
AbstractQueuedSynchronizer 獨(dú)占鎖的取消排隊(duì)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果我們要取消一個(gè)線程的排隊(duì),我們需要在另外一個(gè)線程中對其進(jìn)行中斷铛漓。比如某線程調(diào)用 lock() 老久不返回溯香,我想中斷它。一旦對其進(jìn)行中斷浓恶,此線程會從 LockSupport.park(this); 中喚醒玫坛,然后 Thread.interrupted(); 返回 true。
我們發(fā)現(xiàn)一個(gè)問題问顷,即使是中斷喚醒了這個(gè)線程昂秃,也就只是設(shè)置了 interrupted = true 然后繼續(xù)下一次循環(huán)禀梳。而且杜窄,由于 Thread.interrupted(); 會清除中斷狀態(tài),第二次進(jìn) parkAndCheckInterrupt 的時(shí)候算途,返回會是 false塞耕。所以,我們要看到嘴瓤,在這個(gè)方法中扫外,interrupted 只是用來記錄是否發(fā)生了中斷,然后用于方法返回值廓脆,其他沒有做任何相關(guān)事情筛谚。
所以,我們看外層方法怎么處理 acquireQueued 返回 false 的情況停忿。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
所以說驾讲,lock() 方法處理中斷的方法就是,你中斷歸中斷,我搶鎖還是照樣搶鎖吮铭,幾乎沒關(guān)系时迫,只是我搶到鎖了以后,設(shè)置線程的中斷狀態(tài)而已谓晌,也不拋出任何異常出來掠拳。調(diào)用者獲取鎖后,可以去檢查是否發(fā)生過中斷纸肉,也可以不理會溺欧。
我們來看 ReentrantLock 的另一個(gè) lock 方法:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 就是這里了,一旦異常柏肪,馬上結(jié)束這個(gè)方法胧奔,拋出異常。
// 這里不再只是標(biāo)記這個(gè)方法的返回值代表中斷狀態(tài)
// 而是直接拋出異常预吆,而且外層也不捕獲龙填,一直往外拋到 lockInterruptibly
throw new InterruptedException();
}
} finally {
// 如果通過 InterruptedException 異常出去,那么 failed 就是 true 了
if (failed)
cancelAcquire(node);
}
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 就是這里了拐叉,一旦異常岩遗,馬上結(jié)束這個(gè)方法,拋出異常凤瘦。
// 這里不再只是標(biāo)記這個(gè)方法的返回值代表中斷狀態(tài)
// 而是直接拋出異常宿礁,而且外層也不捕獲,一直往外拋到 lockInterruptibly
throw new InterruptedException();
}
} finally {
// 如果通過 InterruptedException 異常出去蔬芥,那么 failed 就是 true 了
if (failed)
cancelAcquire(node);
}
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 找一個(gè)合適的前驅(qū)梆靖。其實(shí)就是將它前面的隊(duì)列中已經(jīng)取消的節(jié)點(diǎn)都”請出去“
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
線程中斷
首先,我們要明白笔诵,中斷不是類似 linux 里面的命令 kill -9 pid返吻,不是說我們中斷某個(gè)線程,這個(gè)線程就停止運(yùn)行了乎婿。中斷代表線程狀態(tài)测僵,每個(gè)線程都關(guān)聯(lián)了一個(gè)中斷狀態(tài),是一個(gè) true 或 false 的 boolean 值谢翎,初始值為 false捍靠。
// Thread 類中的實(shí)例方法,持有線程實(shí)例引用即可檢測線程中斷狀態(tài)
public boolean isInterrupted() {}
// Thread 中的靜態(tài)方法森逮,檢測調(diào)用這個(gè)方法的線程是否已經(jīng)中斷
// 注意:這個(gè)方法返回中斷狀態(tài)的同時(shí)榨婆,會將此線程的中斷狀態(tài)重置為 false
// 所以,如果我們連續(xù)調(diào)用兩次這個(gè)方法的話褒侧,第二次的返回值肯定就是 false 了
public static boolean interrupted() {}
// Thread 類中的實(shí)例方法良风,用于設(shè)置一個(gè)線程的中斷狀態(tài)為 true
public void interrupt() {}
我們說中斷一個(gè)線程颜武,其實(shí)就是設(shè)置了線程的 interrupted status 為 true,至于說被中斷的線程怎么處理這個(gè)狀態(tài)拖吼,那是那個(gè)線程自己的事鳞上。
如果線程處于以下三種情況,那么當(dāng)線程被中斷的時(shí)候吊档,能自動感知到:
來自 Object 類的 wait()篙议、wait(long)、wait(long, int)怠硼,
來自 Thread 類的 join()鬼贱、join(long)、join(long, int)香璃、sleep(long)这难、sleep(long, int) 這幾個(gè)方法的相同之處是,方法上都有: throws InterruptedException,如果線程阻塞在這些方法上(我們知道葡秒,這些方法會讓當(dāng)前線程阻塞)姻乓,這個(gè)時(shí)候如果其他線程對這個(gè)線程進(jìn)行了中斷,那么這個(gè)線程會從這些方法中立即返回眯牧,拋出 InterruptedException 異常蹋岩,同時(shí)重置中斷狀態(tài)為 false。
- Selector 中的 select 方法 一旦中斷学少,方法立即返回
因?yàn)樗麄兡茏詣痈兄街袛啵ㄟ@里說自動剪个,當(dāng)然也是基于底層實(shí)現(xiàn)),并且在做出相應(yīng)的操作后都會重置中斷狀態(tài)為 false版确。
那是不是只有以上 幾種方法能自動感知到中斷呢扣囊?不是的,如果線程阻塞在 LockSupport.park(Object obj) 方法绒疗,也叫掛起侵歇,這個(gè)時(shí)候的中斷也會導(dǎo)致線程喚醒,但是喚醒后不會重置中斷狀態(tài)忌堂,所以喚醒后去檢測中斷狀態(tài)將是 true盒至。
InterruptedException
它是一個(gè)特殊的異常,不是說 JVM 對其有特殊的處理士修,而是它的使用場景比較特殊。通常樱衷,我們可以看到棋嘲,像 Object 中的 wait() 方法,ReentrantLock 中的 lockInterruptibly() 方法矩桂,Thread 中的 sleep() 方法等等沸移,這些方法都帶有 throws InterruptedException,我們通常稱這些方法為阻塞方法(blocking method)。
阻塞方法一個(gè)很明顯的特征是雹锣,它們需要花費(fèi)比較長的時(shí)間(不是絕對的网沾,只是說明時(shí)間不可控),還有它們的方法結(jié)束返回往往依賴于外部條件蕊爵,如 wait 方法依賴于其他線程的 notify辉哥,lock 方法依賴于其他線程的 unlock等等。
當(dāng)我們看到方法上帶有 throws InterruptedException 時(shí)攒射,我們就要知道醋旦,這個(gè)方法應(yīng)該是阻塞方法,我們?nèi)绻M茉琰c(diǎn)返回的話会放,我們往往可以通過中斷來實(shí)現(xiàn)饲齐。
除了幾個(gè)特殊類(如 Object,Thread等)外咧最,感知中斷并提前返回是通過輪詢中斷狀態(tài)來實(shí)現(xiàn)的捂人。我們自己需要寫可中斷的方法的時(shí)候,就是通過在合適的時(shí)機(jī)(通常在循環(huán)的開始處)去判斷線程的中斷狀態(tài)矢沿,然后做相應(yīng)的操作(通常是方法直接返回或者拋出異常)先慷。當(dāng)然,我們也要看到咨察,如果我們一次循環(huán)花的時(shí)間比較長的話论熙,那么就需要比較長的時(shí)間才能感知到線程中斷了。
處理中斷
一旦中斷發(fā)生摄狱,我們接收到了這個(gè)信息脓诡,然后怎么去處理中斷呢?
我們經(jīng)常會這么寫代碼:
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// ignore
}
// go on
當(dāng) sleep 結(jié)束繼續(xù)往下執(zhí)行的時(shí)候媒役,我們往往都不知道這塊代碼是真的 sleep 了 10 秒祝谚,還是只休眠了 1 秒就被中斷了。這個(gè)代碼的問題在于酣衷,我們將這個(gè)異常信息吞掉了交惯。(對于 sleep 方法,我相信大部分情況下穿仪,我們都不在意是否是中斷了席爽,這里是舉例)
AQS 的做法很值得我們借鑒,我們知道 ReentrantLock 有兩種 lock 方法:
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
前面我們提到過啊片,lock() 方法不響應(yīng)中斷只锻。如果 thread1 調(diào)用了 lock() 方法,過了很久還沒搶到鎖紫谷,這個(gè)時(shí)候 thread2 對其進(jìn)行了中斷齐饮,thread1 是不響應(yīng)這個(gè)請求的捐寥,它會繼續(xù)搶鎖,當(dāng)然它不會把“被中斷”這個(gè)信息扔掉祖驱。我們可以看以下代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 我們看到握恳,這里也沒做任何特殊處理,就是記錄下來中斷狀態(tài)捺僻。
// 這樣乡洼,如果外層方法需要去檢測的時(shí)候,至少我們沒有把這個(gè)信息丟了
selfInterrupt();// Thread.currentThread().interrupt();
}
而對于 lockInterruptibly() 方法陵像,因?yàn)槠浞椒ㄉ厦嬗?throws InterruptedException 就珠,這個(gè)信號告訴我們,如果我們要取消線程搶鎖醒颖,直接中斷這個(gè)線程即可妻怎,它會立即返回,拋出 InterruptedException 異常泞歉。
在并發(fā)包中逼侦,有非常多的這種處理中斷的例子,提供兩個(gè)方法腰耙,分別為響應(yīng)中斷和不響應(yīng)中斷榛丢,對于不響應(yīng)中斷的方法,記錄中斷而不是丟失這個(gè)信息挺庞。如 Condition 中的兩個(gè)方法就是這樣的:
void await() throws InterruptedException;
void awaitUninterruptibly();
通常晰赞,如果方法會拋出 InterruptedException 異常,往往方法體的第一句就是:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
......
}