如下代碼沮翔,當我們在使用 ReentrantLock 進行加鎖和解鎖時赴肚,底層到底是如何幫助我們進行控制的啦蛙酪?
static Lock lock = new ReentrantLock();
public static void main(String[] args) {
// 使用兩個線程模擬多線程執(zhí)行并發(fā)
new Thread(() -> doBusiness(), "Thread-1").start();
new Thread(() -> doBusiness(), "Thread-2").start();
}
private static void doBusiness() {
try {
lock.lock();
System.out.println("需要加鎖的業(yè)務處理代碼避除,防止并發(fā)異常");
} finally {
lock.unlock();
}
}
帶著這樣的疑問怎披,我們先后跟進 lock()和unlock() 源碼一探究竟
說明:
1、在進行查看 ReentrantLock 進行 lock() 加鎖和 unlock() 解鎖源碼時瓶摆,需要知道 LockSupport 類凉逛、了解自旋鎖以及鏈表相關(guān)知識。
2群井、在分析過程中状飞,假設第一個線程獲取到鎖的時候執(zhí)行代碼需要很長時間才釋放鎖,及在第二個第三個線程來獲取鎖的時候,第一個線程并沒有執(zhí)行完成诬辈,沒有釋放鎖資源酵使。
3、在分析過程中自晰,我們假設第一個線程就是最先進來獲取鎖的線程凝化,那么第二個第三個線程也是依次進入的稍坯,不會存在第三個線程先于第二個線程(即第三個線程如果先于第二個線程發(fā)生酬荞,那么第三個線程就是我們下面描述的第二個線程)
一、 lock() 方法
1瞧哟、查看lock()方法源碼
public void lock() {
sync.lock();
}
從上面可以看出 ReentrantLock 的 lock() 方法調(diào)用的是 sync 這個對象的 lock() 方法混巧,而 Sync 就是一個實現(xiàn)了抽象類AQS(AbstractQueuedSynchronizer) 抽象隊列同步器的一個子類 ,繼續(xù)跟進代碼(說明:ReentrantLock 分為公平鎖和非公平鎖勤揩,如果無參構(gòu)造器創(chuàng)建鎖默認是非公平鎖咧党,我們按照非公平鎖的代碼來講解)
1.1 關(guān)于Sync子類的源碼
abstract static class Sync extends AbstractQueuedSynchronizer {
// 此處省略具體實現(xiàn)AbstractQueuedSynchronizer 類的多個方法
}
這里需要說明的是 AbstractQueuedSynchronizer 抽象隊列同步器底層是一個通過Node實現(xiàn)的雙向鏈表,該抽象同步器有三個屬性 head 頭節(jié)點 陨亡, tail 尾節(jié)點 和 state 狀態(tài)值傍衡。
屬性1:head——注釋英文翻譯:等待隊列的頭部,懶加載负蠕,用于初始化蛙埂,當調(diào)用 setHead() 方法的時候會對 head 進行修改。注:如果 head 節(jié)點存在遮糖,則 head 節(jié)點的 waitStatus 狀態(tài)值用于保證其不變成 CANCELLED(取消绣的,值為1) 狀態(tài)
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
屬性2: tail——tail節(jié)點是等待隊列的尾部,懶加載欲账,在調(diào)用 enq() 方法添加一個新的 node 到等待隊列的時候會修改 tail 節(jié)點屡江。
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
屬性3:state——用于同步的狀態(tài)碼。 如果 state 該值為0赛不,則表示沒有其他線程獲取到鎖惩嘉,如果該值大于1則表示已經(jīng)被某線程獲取到了鎖,該值可以是2踢故、3文黎、4,用該值來處理重入鎖(遞歸鎖)的邏輯畴椰。
/**
* The synchronization state.
*/
private volatile int state;
1.2 上面 Sync 類使用 Node來作為雙向隊列的具體保存值和狀態(tài)的載體臊诊,Node 的具體結(jié)構(gòu)如下
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node(); // 共享鎖模式(主要用于讀寫鎖中的讀鎖)
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null; // 排他鎖模式(也叫互斥鎖)
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1; // 線程等待取消,不再參與鎖競爭
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1; // 表明線程需要被喚醒斜脂,可以競爭鎖
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; // 表明線程在某種情況下等待
/** waitStatus value to indicate the next acquireShared should unconditionally propagate */
static final int PROPAGATE = -3;
volatile int waitStatus; // 默認等待狀態(tài)為0
volatile Node prev;
volatile Node next;
/** The thread that enqueued this node. Initialized on construction and nulled out after use.*/
volatile Thread thread; // 當前線程和節(jié)點進行綁定抓艳,通過構(gòu)造器初始化Thread,在使用的時候?qū)斍熬€程替換原有的null值
// 省略部分代碼
}
說明: Sync 通過Node節(jié)點構(gòu)建隊列,Node節(jié)點使用prev和next節(jié)點來行程雙向隊列帚戳,使用prev來關(guān)聯(lián)上一個節(jié)點玷或,使用next來關(guān)聯(lián)下一個節(jié)點儡首,每一個node節(jié)點和一個thread線程進行綁定,用來表示當前線程在阻塞隊列中的具體位置和狀態(tài) waitStatus 偏友。
2蔬胯、上面的 sync.lock() 繼續(xù)跟進源碼(非公平鎖):
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
說明:上面代碼說明,如果 compareAndSetState(0, 1) 為 true 位他,則執(zhí)行 setExclusiveOwnerThread(Thread.currentThread()) 氛濒,否則執(zhí)行 acquire(1);
2.1 compareAndSetState(0, 1) 底層使用unsafe類完成CAS操作 鹅髓,意思就是判斷當前state狀態(tài)是否為0舞竿,如果為零則將該值修改為1,并返回true窿冯;state不為0骗奖,則無法將該值修改為1,返回false醒串。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2.2 假如 第1個線程 進來的時候 compareAndSetState(0, 1) 肯定執(zhí)行成功执桌,state 狀態(tài)會從0變成1,同時返回true芜赌,執(zhí)行 setExclusiveOwnerThread(Thread.currentThread()) 方法:
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
setExclusiveOwnerThread(Thread.currentThread()) 表示將當前 Sync 對象和當前線程綁定仰挣,意思是表明:當前對內(nèi)同步器執(zhí)行的線程為 thread,該 thread 獲取了鎖正在執(zhí)行较鼓。
2.3 假如 進來的線程為第2個 椎木,并且第一個線程還在執(zhí)行沒有釋放鎖,那么第2個線程就會執(zhí)行 acquire(1) 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
進入到該方法中發(fā)現(xiàn)博烂,需要通過 !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 兩個方法判斷是否需要執(zhí)行 selfInterrupt();
(1)先執(zhí)行 tryAcquire(arg) 這個方法進行判斷
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取state狀態(tài)香椎,因為第一個線程進來的時候只要還沒有執(zhí)行完就已經(jīng)將state設置為1了(即:2.1步)
int c = getState();
// 再次判斷之前獲取鎖的線程是否已經(jīng)釋放鎖了
if (c == 0) {
// 如果之前的線程已經(jīng)釋放鎖,那么當前線程進來就將狀態(tài)改為1禽篱,并且設置當前占用鎖的線程為自身
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷當前占用鎖的線程是不是就是我自身畜伐,如果是我自身,這將State在原值的基礎上進行加1躺率,來處理重入鎖邏輯
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
// 判斷重入鎖次數(shù)是不是超過限制玛界,超過限制則直接報錯
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
從上面的方法看, 如果第二個線程進來悼吱,且第一個線程還未釋放鎖的情況下慎框,該方法 tryAcquire(arg) 直接放回false,那么 !tryAcquire(arg) 就為true后添,需要判斷第二個方法 acquireQueued( addWaiter(Node.EXCLUSIVE) , arg)笨枯,第二個方法先執(zhí)行addWaiter(Node.EXCLUSIVE),及添加等待線程進入隊列
(2)添加等待線程到同步阻塞隊列中
private Node addWaiter(Node mode) {
// 將當前線程和node節(jié)點進行綁定,設置模式為排他鎖模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;// 第二個線程也就是第一次進來該方法的線程馅精,tail肯定是null
if (pred != null) { // 如果tail尾節(jié)點不為空严嗜,表示第3、4洲敢、5次進來的線程
node.prev = pred; // 那么就將當前進來的線程節(jié)點的 prev 節(jié)點指向之前的尾節(jié)點
if (compareAndSetTail(pred, node)) { // 通過比較并交換漫玄,如果當前尾節(jié)點在設置過程中沒有被其他線程搶先操作,那么就將當前節(jié)點設置為tail尾節(jié)點
pred.next = node; // 將以前尾節(jié)點的下一個節(jié)點指向當前節(jié)點(新的尾節(jié)點)
return node;
}
}
enq(node); // 如果為第二個線程進來压彭,就是上面的 pred != null 成立沒有執(zhí)行睦优,直接執(zhí)行enq()方法
return node;
}
private Node enq(final Node node) {
for (;;) { // 一直循環(huán)檢查,相當于自旋鎖
Node t = tail;
if (t == null) { // Must initialize
// 第二個線程的第一次進來肯定先循環(huán)進入該方法哮塞,這時設置頭結(jié)點刨秆,該頭結(jié)點一般被稱為哨兵節(jié)點凳谦,并且頭和尾都指向該節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 1忆畅、第二個線程在第二次循環(huán)時將進入else 方法中,將該節(jié)點掛在哨兵節(jié)點(頭結(jié)點)后尸执,并且尾節(jié)點指向該節(jié)點家凯,并且將該節(jié)點返回(該節(jié)點有prev信息)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如上在執(zhí)行 enq(final Node node) 結(jié)束,并且返回添加了第二個線程node節(jié)點的時候如失, addWaiter(Node mode) 方法會繼續(xù)向上返回
或者 : 如果是添加第3绊诲、4個線程直接走 addWaiter(Node mode) 方法中的 if 流程直接添加返回 都將,到了 2.3 步褪贵,執(zhí)行 acquireQueued(final Node node, int arg) 掂之,再次貼源碼
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(3)即下一步就會執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法:
注: 上面的流程是將后面的線程加入到了同步阻塞隊列中,下面的方法第一個 if (p == head && tryAcquire(arg))則是看同步阻塞隊列的第一條阻塞線程是否可以獲取到鎖脆丁,如果能夠獲取到鎖就修改相應鏈表結(jié)構(gòu)世舰,第二個if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 即將發(fā)生線程阻塞
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 自旋鎖,如果為第二個線程槽卫,那么 p 就是 head 哨兵節(jié)點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 上面的 if 表明如果當前線程為同步阻塞隊列中的第一個線程跟压,那么就再次試圖獲取鎖 tryAcquire(),如果獲取成功,則修改同步阻塞隊列
setHead(node); // 將head頭結(jié)點(哨兵節(jié)點)設置為已經(jīng)獲取鎖的線程node歼培,并將該node的Theread 設置為空
p.next = null; // help GC 取消和之前哨兵節(jié)點的關(guān)聯(lián)震蒋,便于垃圾回收器對之前數(shù)據(jù)的回收
failed = false;
return interrupted;
}
// 如果第二個線程沒有獲取到鎖(同步阻塞隊列中的第一個線程),那么就需要執(zhí)行下面兩個方法躲庄,注標藍的方法會讓當前未獲取到鎖的線程阻塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
// 將哨兵節(jié)點往后移查剖,并且將 thread 設置為空,取消和以前哨兵節(jié)點的關(guān)聯(lián)噪窘,并于垃圾回收器回收
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire(p, node)這個方法將哨兵隊列的狀態(tài)設置為待喚醒狀態(tài)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// pred為哨兵節(jié)點笋庄,ws為哨兵節(jié)點的狀態(tài)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 將頭結(jié)點(哨兵節(jié)點)設置成待喚醒狀態(tài),第一次進來的時候
}
return false;
}
parkAndCheckInterrupt()這個方法會讓當前線程阻塞
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // LockSupport.park()會導致當前線程阻塞,直到某個線程調(diào)用unpark()方法
return Thread.interrupted();
}
那么在lock()方法執(zhí)行時无切,只要第一個線程沒有unlock()釋放鎖荡短,其他所有線程都會加入同步阻塞隊列中,該隊列中記錄了阻塞線程的順序哆键,在加入同步阻塞隊列前有多次機會可以搶先執(zhí)行(非公平鎖)掘托,如果沒有被執(zhí)行到,那么加入同步阻塞隊列后籍嘹,就只有頭部節(jié)點(哨兵節(jié)點)后的阻塞線程有機會獲取到鎖進行邏輯處理闪盔。再次查看該方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// if 表明只有頭部節(jié)點(哨兵節(jié)點)后的節(jié)點在放入同步阻塞隊列前可以獲取鎖
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 所有線程都被阻塞在這個方法處
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
二、unlock()方法
1辱士、unlock源碼
public void unlock() {
sync.release(1);
}
同樣是調(diào)用的同步阻塞隊列的方法 sync.release(1)泪掀,跟進查看源碼:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
2、查看tryRelease()方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果不是自身鎖對象調(diào)用unlock()方法的話颂碘,就報異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 如果標志位已經(jīng)為0异赫,表示重入鎖已經(jīng)全部釋放,這將當前獲取鎖的線程設置為null,以便其他線程進行加鎖
setExclusiveOwnerThread(null);
}
// 更新重入鎖解鎖到達的次數(shù)头岔,如果C不為0,表示還有重入鎖unlock()沒有調(diào)用完
setState(c);
return free;
}
3塔拳、如果tryRelease()方法成功執(zhí)行,表示之前獲取鎖的線程已經(jīng)執(zhí)行完所有需要同步的代碼(重入鎖也完全退出)峡竣,那么就需要喚醒同步阻塞隊列中的第一個等待的線程(也是等待最久的線程)靠抑,執(zhí)行unparkSuccessor(h)方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 先獲取頭結(jié)點(哨兵節(jié)點)的waitStatus狀態(tài),如果小于0,則可以獲取鎖适掰,并將waitStatus的狀態(tài)設置為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果哨兵節(jié)點的下一個節(jié)點為null,或者狀態(tài)為1表示已經(jīng)取消颂碧,則依次循環(huán)尋找后面節(jié)點,直至找到一個waitStatus<0的節(jié)點类浪,并將該節(jié)點設置為需要獲取鎖的節(jié)點
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 將該node節(jié)點的線程解鎖载城,允許它去獲取鎖,然后執(zhí)行業(yè)務邏輯
LockSupport.unpark(s.thread);
}
三戚宦、unlock()方法調(diào)用后个曙,會到lock()方法阻塞的地方,完成喚醒工作
1受楼、在上面方法 unparkSuccessor(Node node) 中執(zhí)行完 LockSupport.unpark(s.thread) 后在同步阻塞隊列后的第一個 node 關(guān)聯(lián)的線程將被喚醒垦搬,即unlock()方法代碼執(zhí)行完,將會到lock() 源碼解析的 2.3 步里艳汽,第三次貼該處源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
2猴贰、在上面放大的紅色方法中,之前上面lock()源碼講了當中所有線程都被阻塞了河狐,如下面源碼紅色標記的地方:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
3米绕、所有未獲取到鎖的線程都在 parkAndCheckInterrupt() 方法處阻塞著瑟捣,所以我們即將喚醒的哨兵節(jié)點后的第一個阻塞線程也是在該處阻塞著,在執(zhí)行完 unlock() 源碼步驟第3步 unparkSuccessor(Node node) 中的方法栅干,則將 返回到 之前阻塞線程的這個方法 parkAndCheckInterrupt()的這行代碼 LockSupport.park( this ) 的下一步執(zhí)行 Thread.interrupted()迈套,喚醒當前線程。最后一步一步跟源碼往上返回 true ,就又到了這個方法acquire(int arg)碱鳞,第四次貼源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 上面紅色的兩個方法都為 true,執(zhí)行自我喚醒
selfInterrupt();
}
4桑李、最終到了喚醒方法 selfInterrupt()
static void selfInterrupt() {
// 最終底層會調(diào)用 c++的喚醒方法
Thread.currentThread().interrupt();
}
總結(jié):
在第一個 A 線程 lock() 獲取到鎖后,第一個線程會在底層的同步阻塞隊列中設置鎖狀態(tài) state 為1(如果重入鎖多次獲取 state 每次加1)窿给,并設置擁有當前鎖的線程為自身A線程贵白,其他線程 B/C/D 來獲取鎖的時候就會比較鎖狀態(tài)是否為0,如果不為0崩泡,表示已經(jīng)被獲取了鎖禁荒,再次比較獲取鎖的線程是否為自身,如果為自身則對 state 加1(滿足重入鎖的規(guī)則)角撞,否則這將當前未獲取到鎖的線程放入同步阻塞隊列中呛伴,在放入的過程中,需要設置 head 哨兵節(jié)點和 tail 尾節(jié)點靴寂,以及相應的 waitStatus 狀態(tài)磷蜀,并且在放入過程中需要設置當前節(jié)點以及先關(guān)節(jié)點的 prev 和 next 節(jié)點,從而達到雙向隊列的效果百炬,存放到阻塞隊列后,線程會被阻塞到這樣一個方法中 parkAndCheckInterrupt() 污它,等待被喚醒剖踊。
在第一個 A 線程執(zhí)行完畢,調(diào)用 unlock() 解鎖后衫贬,unlock() 方法會從同步阻塞隊列的哨兵節(jié)點后的第一個節(jié)點獲取等待解鎖的線程B德澈,并將其解鎖,然后就會到B阻塞的方法 parkAndCheckInterrupt() 來繼續(xù)執(zhí)行固惯,調(diào)用 selfInterrupt()方法梆造,最終調(diào)用底層的 c語言的喚醒方法,使得 B 線程完成 lock() 方法獲取到鎖葬毫,然后執(zhí)行業(yè)務邏輯镇辉。其他線程以此類推,依次發(fā)生阻塞和喚醒贴捡。