基于Java 8進行源碼分析
1.ReentrantLock
1.1. lock()
// 非公平鎖
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 公平鎖
final void lock() {
acquire(1);
}
1.2. acquire(int)
這個方法是一個綜合操作,目的是為了讓線程獲取到鎖方库。大致邏輯:
1.嘗試讓當前線程獲取鎖南捂,如果成功直接返回腺晾,如果失敗就執(zhí)行下一步居暖。
2.將沒有獲取到鎖的線程封裝為節(jié)點存儲在阻塞隊列中米诉,新來的都呆在隊列的后面币狠。
3.如果當前線程位于head.next
的位置游两,嘗試獲取鎖,獲取到就直接返回漩绵,如果獲取不到就和隊列中其他位置的線程一樣進入阻塞狀態(tài)贱案。
4.當阻塞的線程被喚醒時(例如unlock()
被調(diào)用),然后再次重復上一步渐行。(unlock()
只會讓一個線程退出阻塞狀態(tài)轰坊,不會導致驚群)其中還有一些對中斷邏輯的處理
*一般情況下,如果線程進入阻塞狀態(tài)前祟印,中斷標志就已經(jīng)為true肴沫,那么中斷的邏輯在進入阻塞狀態(tài)后就會被觸發(fā)(因為只有進入Safepoint,線程才能感知到中斷標志的變化)蕴忆。
*那么在lock()方法的調(diào)用過程中颤芬,如果對隊列中沒能獲取到獲取到鎖的線程進行中斷的話,在它進入阻塞狀態(tài)后,AQS會感知到這個操作并暫時清除中斷標志站蝠,在目標線程成功獲取到鎖后汰具,AQS會恢復中斷標志為true,方便用戶自己處理中斷的邏輯菱魔。
*AQS暫時清除中斷標志的原因是為了能讓AQS內(nèi)的自選邏輯繼續(xù)執(zhí)行留荔,這是lock()
的執(zhí)行邏輯;如果使用的是lockInterruptibly()
的話澜倦,在這個暫時清除中斷標志
的邏輯中聚蝶,會直接拋出中斷的異常。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // addWaiter里藻治,設(shè)置的默認waitStatus為null
selfInterrupt();
}
1.2.1. tryAcquire(int)
tryAcquire(int)
方法的邏輯比較簡單碘勉,主要行為是嘗試一次獲取鎖,公平鎖和非公平鎖的區(qū)別在于:
公平鎖在獲取鎖之前會看看有沒有線程比它等待時間久桩卵;非公平鎖則是直接嘗試獲取验靡。
// 非公平鎖的實現(xiàn)
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { // 差別點
setExclusiveOwnerThread(current);
return true;
}
}
// 這是對重入的操作
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公平鎖的實現(xiàn)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) { // 差別點
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1.2.2. addWaiter(Node.EXCLUSIVE)
這個方法的目的是:對于沒/不能插隊的lock操作,將它對應的線程添加到AQS的阻塞隊列中雏节,等待合適的執(zhí)行時機胜嗓。
其中一共有兩個行為: (
阻塞隊列
是一個雙向鏈表)
1.初始阻塞隊列
這里會創(chuàng)建一個既是head又是tail的空內(nèi)容節(jié)點
2.向阻塞隊列尾部鏈接新節(jié)點
// AbstractQueuedSynchronizer#acquire(int) 中的代碼片段
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
// AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 新建一個節(jié)點,當前發(fā)起lock操作的線程作為新節(jié)點
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 如果尾部節(jié)點不為null矾屯,把新建的節(jié)點鏈接到隊列當前尾部節(jié)點后面兼蕊。
if (pred != null) {
node.prev = pred;
// 嘗試完成雙向鏈接行為。
// 通過CAS操作保證并發(fā)時的結(jié)果正確件蚕,如果CAS操作失敗會進入下面的enq(node)方法孙技。
// 代表節(jié)點由于并發(fā)問題,沒能成功添加到隊列中排作。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 通過自旋牵啦,把當前節(jié)點添加到阻塞隊列的尾部
enq(node);
return node;
}
// AbstractQueuedSynchronizer
private Node enq(final Node node) {
for (;;) {
Node t = tail; // tail是被volatile修飾的
if (t == null) { // Must initialize
// 初始化隊列,把一個空內(nèi)容的Node作為隊列的head和tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 再次嘗試將當前新建的節(jié)點鏈接到隊列當前尾部節(jié)點后面妄痪。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t; // 這里返回的是當前節(jié)點的前一個節(jié)點
}
}
}
}
1.2.3. acquireQueued(Node, int)
1.讓隊列中位于head.next的線程嘗試獲取鎖哈雏。
2.確認當前節(jié)點是不是應該進入阻塞狀態(tài)
如果在第1步中的節(jié)點獲取到了鎖,那這個節(jié)點就不用走第2步衫生。
對于獲取到鎖的線程裳瘪,會把這個線程對應的節(jié)點作為哨兵,變相相當于把它清除出阻塞隊列了罪针。
Thread 的
interrupt
盒蟆、interrupted
棒仍、isinterrupted
interrupt
: 設(shè)置當前線程的中斷標志為true徘钥,不會立即執(zhí)行中斷,只有在Safepoint時線程才會知道中斷標志的值是什么还最。
interrupted
: 檢測當前線程的中斷標志,同時把值設(shè)置為false毡惜。 (這個是Thread的靜態(tài)方法)
isinterrupted
: 檢測當前線程的中斷標志拓轻。
LockSupport API文檔
LockSupport.park(this)
的作用是讓當前線程進入阻塞狀態(tài),但不會釋放當前線程占有的鎖資源经伙,如果檢測到中斷扶叉,會立即退出阻塞狀態(tài)。
LockSupport.unpark(Thread)
可以讓指定線程退出阻塞狀態(tài)橱乱。
// AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) { // 在上文中可以看出arg的值為1
boolean failed = true; // 是否沒有獲取到鎖辜梳,true代表沒獲取到
try {
boolean interrupted = false;
// 下面這些行為是對每個線程都有效的粱甫,相當于節(jié)點的自旋自檢
for (;;) {
final Node p = node.predecessor(); // 嘗試獲取當前節(jié)點的上一個節(jié)點泳叠,如果為null會導致拋出異常
// 1. 如果當前節(jié)點的上一個節(jié)點是head
// 2. 當前嘗試一次獲取鎖 (獲取到會返回true,失敗會返回false)
// 同時滿足上面兩點才會進入if代碼塊茶宵,這也意味著只有位于這個head后的節(jié)點才會嘗試去獲取鎖危纫,
// 如果if判斷成立,則代表當前線程成功獲取到了鎖乌庶。
if (p == head && tryAcquire(arg)) {
setHead(node); // 把當前獲取到鎖的節(jié)點作為head 并 清除它的prev和thread屬性值种蝶,變相相當于清除出隊列了
p.next = null; // help GC
failed = false;
return interrupted;
}
// 確定當前線程是否應該被阻塞
// 如果當前線程對應的等待狀態(tài)為0,那下次獲取鎖失敗后才會被阻塞瞒大,并且等待狀態(tài)會被設(shè)置為-1螃征。
// 如果當前線程對應的等待狀態(tài)為-1,那線程直接阻塞透敌。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果進入這里盯滚,代表當前線程的中斷標志為true
interrupted = true;
}
} finally {
if (failed) // 在獲取鎖的過程中出現(xiàn)異常,導致當前線程沒有獲取到鎖酗电,但自旋中斷了
cancelAcquire(node); // 綜合邏輯中意味著將當前節(jié)點移出阻塞隊列
}
}
// AbstractQueuedSynchronizer
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//上一個節(jié)點的等待狀態(tài)魄藕,這個waitStatus和Condition的操作有關(guān)。
int ws = pred.waitStatus;
// 經(jīng)過下面的處理撵术,可以看出來這樣的一個邏輯背率,上一個節(jié)點的等待狀態(tài)決定了當前節(jié)點是否需要被阻塞。
// 如果是SIGNAL嫩与,也就是-1的話寝姿,當前線程進入阻塞狀態(tài),類似sleep()划滋,都不會釋放線程所占有的鎖資源
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果上一個節(jié)點的waitStatus大于0饵筑,就不斷向前尋找,直至找到一個等待狀態(tài)小于等于0的節(jié)點古毛,
// 然后在這個節(jié)點和當前節(jié)點之間建立雙向鏈接翻翩,整個過程意味著把兩者之間waitStatus > 0的節(jié)點清除出阻塞隊列了都许。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 多線程操作下也不會出現(xiàn)問題,因為每個節(jié)點都只會找自己之前的第一個 waitStatus 小于等于0的數(shù)據(jù)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 額外情況(例如為null)嫂冻,就把上一個節(jié)點的等待狀態(tài)設(shè)置為SIGNAL胶征,也就是-1
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
// 阻塞當前線程,但不會釋放當前線程占有的鎖資源
LockSupport.park(this);
// 檢測當前線程的中斷標志桨仿,并清除該標志睛低,如果不清空會導致下次自旋時無法進入阻塞狀態(tài)。
// 如果線程在進入阻塞狀態(tài)前就已經(jīng)被標記為中斷狀態(tài)的話服傍,阻塞行為會立即中止執(zhí)行這里的return操作钱雷。
return Thread.interrupted();
}
waitStatus | 值 | 代表含義 |
---|---|---|
PROPAGATE | -3 | 當前節(jié)點后續(xù)的共享鎖可以獲取 |
CONDITION | -2 | 當前節(jié)點在condition隊列中 |
SIGNAL | -1 | 當前節(jié)點的next需要unpark |
未命名 | 0 | 當前節(jié)點剛進入阻塞隊列,還沒有進入阻塞狀態(tài) |
CANCELLED | 1 | 當前的節(jié)點被取消 |
1.3. unlock()
讓當前線程的下一位的線程退出阻塞狀態(tài)吹零,并不會引起驚群罩抗。
public void unlock() {
sync.release(1); // 釋放1層鎖,也就是讓state減1
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 嘗試釋放指定層鎖灿椅,只有當state為0是才會進入if代碼塊
Node h = head;
if (h != null && h.waitStatus != 0) // 在本文中展示的調(diào)用鏈里套蒂,進這里的只有-1
unparkSuccessor(h); // 對head.next位置上的線程進行unpark
return true;
}
return false;
}
2. Condition
這東西和
Object#wait()
、Object#notify
的用法差不多
// 使用示例
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
condition.await();
condition.signal();
condition.signalAll();
}
await()
或者wait()
要確保在獲取到鎖的代碼塊內(nèi)編寫茫蛹,否則會報錯操刀。
2.1. await()
簡單的過一遍代碼,可以發(fā)現(xiàn)婴洼,
await()
會把當前線程放到條件隊列中骨坑,同時會釋放持有的鎖資源,然后把線程阻塞住柬采。
// AbstractQueuedSynchronizer
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 對中斷的先行檢測
throw new InterruptedException();
// 把當前線程加入到條件隊列中欢唾,其中waitStatus默認為-2,之前在說lock()的邏輯時警没,那里面存的是null
// 條件隊列不像阻塞隊列匈辱,它里面是沒有哨兵的。
Node node = addConditionWaiter();
// 釋放當前線程掌握的鎖資源
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 如果不在阻塞隊列上
LockSupport.park(this); // 阻塞當前線程
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// ... 省略部分代碼
}
2.2. signal() & signalAll()
可以看出來這兩個方法的邏輯是相似的杀迹,都是在調(diào)用時把條件隊列中的節(jié)點從前到后的移動到阻塞隊列中亡脸,只不過前者只移一個,后者全部移過去树酪。
在常規(guī)使用中浅碾,這兩個方法在調(diào)用過程中并不會觸發(fā)unpark
方法,也就是說不會讓線程退出阻塞狀態(tài)续语,想要讓線程退出阻塞狀態(tài)垂谢,還得靠unlock()
方法。
// signal()
// AbstractQueuedSynchronizer
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 獲取條件隊列上的第一個元素
if (first != null)
doSignal(first);
}
// AbstractQueuedSynchronizer
private void doSignal(Node first) {
// 代碼塊里面的邏輯: 把條件隊列中的第一個節(jié)點從條件隊列中移除疮茄,并添加到阻塞隊列中滥朱,
// 并且把自己的waitStatus從-2改為0根暑,把在阻塞隊列中前一位的waitStatus改為-1
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// AbstractQueuedSynchronizer
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 把waitStatus從-2變?yōu)?
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); // 把當前的節(jié)點放入到阻塞隊列中,返回值是原來的tail徙邻,不是當前節(jié)點排嫌,別搞錯
int ws = p.waitStatus;
// 嘗試把前一個節(jié)點的waitStatus修改為-1,普遍都能執(zhí)行成功
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 一般情況下,不會執(zhí)行if里面的代碼
LockSupport.unpark(node.thread);
return true;
}
// signalAll()
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
// 代碼塊里面的邏輯: 把條件隊列中的全部節(jié)點從條件隊列中移除缰犁,并添加到阻塞隊列中淳地,
// 并且把自己的waitStatus從-2改為0,把在阻塞隊列中前一位的waitStatus改為-1
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
公平鎖 & 非公平鎖
非公平鎖在調(diào)用
lock()
方法時帅容,會優(yōu)先進行CAS操作颇象;公平鎖只有在隊列中沒有比它等得久的線程時才會進行CAS操作嘗試獲取一次鎖。
一旦線程進入到阻塞隊列并徘,那兩者就沒什么區(qū)別遣钳。
非公平鎖形象點描述就是:你剛來的時候可以插隊,但如果沒插隊成功饮亏,到了等待隊伍里耍贾,那就乖乖的排隊就行了。
可中斷鎖
lockInterruptibly()
和lock()
的邏輯很相似路幸,起碼核心功能代碼是很相似的,多了對中斷異常的拋出付翁。
見AbstractQueuedSynchronizer#doAcquireInterruptibly(int)
見AbstractQueuedSynchronizer#acquireInterruptibly(int)
太簡單了简肴,這里就不貼代碼了
ReentrantLock 條件
條件隊列內(nèi)的節(jié)點是有序的,在喚醒的操作中百侧,條件隊列內(nèi)的數(shù)據(jù)也是有序的轉(zhuǎn)移到阻塞隊列砰识,所以
signal
或者signal
能夠按向條件隊列內(nèi)添加的順序執(zhí)行。