經過了前兩篇AbstractQueuedSynchronizer源碼分析- ReentrantLock搶鎖解鎖, AbstractQueuedSynchronizer源碼分析- CountDownLatch分析,我么對AQS已經學習了差不多了, 但是還有一些細節(jié)我們沒有進行分析, 如ReentrantLock 公平鎖和非公平鎖的區(qū)別, AQS如何綁定Condition,實現(xiàn)條件喚醒, 線程中斷的細節(jié), AQS無處不在的InterruptedException.下面我們主要圍繞以下幾點進行分析
1. 深入理解 ReentrantLock 公平鎖和非公平鎖的區(qū)別
2. 深入分析 AbstractQueuedSynchronizer 中的 Condition
3. 了解 Java 線程中斷
4. 了解InterruptedException 異常
公平鎖與非公平鎖
眾所周知synchronizer與ReentrantLock有一個很大的區(qū)別就是synchronizer關鍵字是非公平鎖, 而ReentrantLock可實現(xiàn)公平鎖與非公平鎖,根據(jù)前兩篇文章的學習, 我們不妨先不進行源碼分析, 先想一下,如果是你設置ReentrantLock公平鎖與非公平鎖, 你會怎樣設計.
創(chuàng)建
ReentrantLock 默認采用非公平鎖茂缚,除非你在構造方法中傳入參數(shù) true 网持。
public ReentrantLock() {
// 默認非公平鎖
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
加鎖
// 公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 1. 和非公平鎖相比,這里多了一個判斷:是否有線程在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 非公平鎖
final void lock() {
// 2. 和公平鎖相比,這里會直接先進行一次CAS夕土,成功就返回了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
分析
- 非公平鎖搶鎖,先CAS 搶鎖,成功返回true, 失敗才進行acquire(1)操作.
- 非公平鎖在 CAS 失敗后,進入tryAcquire(1), 如果鎖這個時候被釋放了(state == 0)阶淘,非公平鎖會直接 CAS 搶鎖,但是公平鎖會判斷隊列是否有線程處于等待狀態(tài)互妓,如果有則不去搶鎖溪窒,進行入隊操作.
- 非公平鎖的優(yōu)點是性能比公平鎖好, 吞吐量增加, 缺點是如果一直有新的線程進行搶鎖可能會導致隊列中的線程一直處于別喚醒狀態(tài).
Condition代碼演示
// 資源類, 綁定Condition, 實現(xiàn)條件喚醒
public class ConditionResource {
private int number = 1;
ReentrantLock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
public void printA() {
try {
lock.lock();
while (number != 1) {
condition1.await();
}
System.out.println(Thread.currentThread().getName() + "工作, 下一個B");
number = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
try {
lock.lock();
while (number != 2) {
condition2.await();
}
System.out.println(Thread.currentThread().getName() + "工作, 下一個C");
number = 3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
try {
lock.lock();
while (number != 3) {
condition3.await();
}
System.out.println(Thread.currentThread().getName() + "工作, 下一個A");
number = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
// 測試類, 創(chuàng)建三個線程, 對上面資源類各打印5次
@Test
public void testCondition() throws InterruptedException {
ConditionResource resource = new ConditionResource();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
resource.printC();
}
}, "C").start();
}
Condition分析之前, 我們先引入一個新的概念, 條件隊列. Condition創(chuàng)建就是創(chuàng)建一個條件隊列
Condition創(chuàng)建
// lock.newCondition() 實際上是創(chuàng)建ConditionObject對象
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject屬性
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 條件隊列的第一個節(jié)點
private transient Node firstWaiter;
// 條件隊列的最后一個節(jié)點
private transient Node lastWaiter;
......
}
await操作
public final void await() throws InterruptedExczeption {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 添加到 condition 的條件隊列中
Node node = addConditionWaiter();
// 2. 完全釋放獨占鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 3. 將node轉移到阻塞隊列, 返回true, 轉移成功, 返回false
while (!isOnSyncQueue(node)) {
// 線程掛起
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 被喚醒后,將進入阻塞隊列冯勉,等待獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//1, 返回值為lastWaiter
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 如果lastWaiter被取消澈蚌,請清理,
// (Node.CONDITION = -2)取消排隊
if (t != null && t.waitStatus != Node.CONDITION) {
// 1.1 清除取消排隊的WaitStatus
unlinkCancelledWaiters();
t = lastWaiter;
}
// 創(chuàng)建node,waitStatus指定為Node.CONDITION = -2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 條件隊列的最后一個節(jié)點為null, 標示隊列為空
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 1.1
// 從條件隊列隊頭開始遍歷鏈表,若 t.waitStatus = -2, t的上一個節(jié)點的下一個節(jié)點指向t的下一個節(jié)點
//t.waitStatus != -2, t上一個節(jié)點的下一個節(jié)點指向t節(jié)點, 若t為null, t上一個節(jié)點設值為條件隊列最后一個節(jié)點
private void unlinkCancelledWaiters() {
// 條件隊列的第一個節(jié)點
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 2. 完全釋放獨占鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 2.1 喚醒頭節(jié)點的下一個
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 若執(zhí)行到這里, 標示喚醒下一個節(jié)點失敗, 或者異常, 將這個節(jié)點的waitStatus 設置1,取消排隊
node.waitStatus = Node.CANCELLED;
}
}
// 2.1
public final boolean release(int arg) {
// 2.1.1
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 2.1.2 喚醒阻塞隊列中頭節(jié)點的下一個節(jié)點線程
unparkSuccessor(h);
return true;
}
return false;
}
//2.1.1 設置當前線程為null, 將state設置為0, 返回true
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);// 設置state為0, 完全釋放獨占鎖
return true;
}
// 3.判斷是否已經轉移到阻塞隊列
final boolean isOnSyncQueue(Node node) {
// 3.1 移動過去的時候,node 的 waitStatus 會置為 0灼狰,這個之后在說 signal 方法的時候會說到
// 如果 waitStatus 還是 Node.CONDITION宛瞄,也就是 -2,那肯定就是還在條件隊列中
// 如果 node 的前驅 prev 指向還是 null交胚,說明肯定沒有在阻塞隊列(prev是阻塞隊列鏈表中使用的)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 3.2 如果 node 已經有后繼節(jié)點 next 的時候份汗,那肯定是在阻塞隊列了
if (node.next != null)
return true;
// 3.3 從阻塞隊列的隊尾往前遍歷,如果找到蝴簇,返回 true
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
signal操作
public final void signal() {
// 1,如果state == 0 拋異常, 調用 signal 方法的線程必須持有當前的獨占鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 2, 執(zhí)行喚醒
doSignal(first);
}
// 1
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 2 從條件隊列隊頭往后遍歷裸影,找出第一個需要轉移的 node
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 將 waitStatus 置為 0
// CAS 如果失敗,說明此 node 的 waitStatus 已不是 Node.CONDITION军熏,說明節(jié)點已經取消,轉移后面一個節(jié)點
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 自旋進入阻塞隊列的隊尾, 之前有分析
// p是node的前驅節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 前驅節(jié)點ws > 0, 取消排隊, 或者設置ws為-1失敗喚醒node節(jié)點線程
LockSupport.unpark(node.thread);
return true;
}
signal操作是喚醒線程, 轉移到阻塞隊列, 線程轉移到阻塞隊列之后, 當前線程執(zhí)行完畢, 釋放鎖之后, 會喚醒調用signal的線程.開始執(zhí)行await操作中LockSupport.park(this);后面的程序.
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 接著這里往下執(zhí)行
// 1 等待時檢查中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
// 2. 這個方法返回的時候卷扮,代表當前線程獲取了鎖荡澎,而且 state == savedState了 && interruptMode != THROW_IE,說明在 signal 之前就發(fā)生中斷了晤锹,這里將 interruptMode 設置為 REINTERRUPT摩幔,用于待會重新中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 3. 因為可能存在signal前發(fā)生的中斷, 需要清理數(shù)據(jù)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 4.
// 0:什么都不做,沒有被中斷過鞭铆;
// THROW_IE:await 方法拋出 InterruptedException 異常或衡,因為它代表在 await() 期間發(fā)生了中斷焦影;
// REINTERRUPT:重新中斷當前線程,因為它代表 await() 期間沒有被中斷封断,而是 signal() 以后發(fā)生的中斷
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
// 等待時檢查中斷,
private int checkInterruptWhileWaiting(Node node) {
// 1.1 Thread.interrupted() 如果當前線程已經處于中斷狀態(tài)斯辰,那么該方法返回 true,同時將中斷狀態(tài)重置為 false
// REINTERRUPT: 代表 await 返回的時候坡疼,需要重新設置中斷狀態(tài)
//THROW_IE: 代表 await 返回的時候彬呻,需要拋出 InterruptedException 異常
return Thread.interrupted() ?
// 1.2 結果為true, 線程在掛起的時候被中斷過, 執(zhí)行transferAfterCancelledWait(node)
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 1.2
// 在signa前發(fā)生的中斷, 返回true, 此時interruptMode = THROW_IE
// 在signa后發(fā)生的中斷, 返回false, 此時interruptMode = REINTERRUPT
final boolean transferAfterCancelledWait(Node node) {
// 用 CAS 將節(jié)點狀態(tài)設置為 0
// 如果這步 CAS 成功,說明是 signal 前發(fā)生的中斷柄瑰,因為如果 signal 先發(fā)生的話闸氮,signal 中會將 waitStatus 設置為 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 設置成功, 轉移到阻塞隊列
enq(node);
return true;
}
// 到這里是因為 signal 方法已經將 waitStatus 設置為了 0
// signal 方法會將節(jié)點轉移到阻塞隊列,但是可能還沒完成教沾,這邊自旋等待其完成
// 當然蒲跨,這種事情還是比較少的吧:signal 調用之后,沒完成轉移之前授翻,發(fā)生了中斷
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
// 4.
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
看到這里, 就將Condition看完了, Condition中還有一下帶超時機制的 await, 不拋異常的await, signalAll這些方法, 但原理與await, signal相差不大, 就不一一分析了.
Java 線程中斷
Thread類中線程中斷相關的幾個方法
// Thread 類中的實例方法或悲,持有線程實例引用即可檢測線程中斷狀態(tài)
public boolean isInterrupted() {}
// Thread 中的靜態(tài)方法,檢測調用這個方法的線程是否已經中斷
// 如果當前線程已經處于中斷狀態(tài)藏姐,那么該方法返回 true隆箩,同時將中斷狀態(tài)重置為 false
public static boolean interrupted() {}
// Thread 類中的實例方法,用于設置一個線程的中斷狀態(tài)為 true
public void interrupt() {}
InterruptedException 異常
當阻塞方法收到中斷請求的時候就會拋出InterruptedException異常