ReentrantLock 是上文提到的 AQS 其中的一個(gè)實(shí)現(xiàn)類豁护,是一個(gè)可重入的互斥鎖奏篙,和 synchronized 有相同的基本行為和語義,但是具有擴(kuò)展功能握童。它由上一次成功鎖定并且尚未解鎖的線程擁有。
ReentrantLock 源碼初探 (JDK11)
- ReentrantLock 之構(gòu)造器
// 創(chuàng)建公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
// 創(chuàng)建公平鎖(true) or 非公平鎖(false)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
- ReentrantLock 之內(nèi)部類 Sync
/** AQS 的實(shí)現(xiàn)類淘捡, 重寫 AQS 中的 protected 方法
有兩個(gè)實(shí)現(xiàn)類 1藕各、NonfairSync 非公平鎖,2焦除、FairSync公平鎖 */
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/** 嘗試獲取非公平鎖 */
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) { //
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 判斷state狀態(tài)是否為0,不為0直接加鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); //獨(dú)占狀態(tài)鎖持有者指向當(dāng)前線程
return true;
}
} // state狀態(tài)不為0 但是鎖被當(dāng)前線程持有 則state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //加鎖失敗
}
/** 釋放鎖作彤,公平鎖和非公平鎖的釋放操作是相同的 */
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/** 判斷持有獨(dú)占鎖的線程是否是當(dāng)前線程 */
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 獲取鎖重入次數(shù)
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
- ReentrantLock 之內(nèi)部類 NonfairSync膘魄,F(xiàn)airSync
非公平鎖加鎖邏輯由內(nèi)部類Sync實(shí)現(xiàn),一上來就會(huì)嘗試獲取鎖資源竭讳;公平鎖加鎖邏輯由本身重寫實(shí)現(xiàn)创葡,在獲取鎖資源之前會(huì)判斷隊(duì)列中是否有正在等待的節(jié)點(diǎn),如果沒有才會(huì)嘗試獲取鎖資源绢慢。他們之間的重入邏輯則是相同的灿渴,如果是當(dāng)前節(jié)點(diǎn)的線程再次請(qǐng)求鎖資源則會(huì)對(duì)該節(jié)點(diǎn)的 state + 1,表示重入的次數(shù)胰舆。
/** 非公平鎖 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 重寫AQS的方法邏輯骚露,由AQS的 acquire(int arg) 方法調(diào)用
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 這個(gè)方法由Sync實(shí)現(xiàn)
}
}
/** 公平鎖 */
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 分配鎖資源邏輯(這里只有分配成功的操作,對(duì)于分配失敗的邏輯缚窿,在AQS中實(shí)現(xiàn))
if (c == 0) {
// 隊(duì)列中沒有等待節(jié)點(diǎn)棘幸,則對(duì)當(dāng)前線程分配鎖資源
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入邏輯 (對(duì)當(dāng)前state的值 +1 )
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
// 判斷隊(duì)列中是否有等待節(jié)點(diǎn)
public final boolean hasQueuedPredecessors() {
Node h, s;
if ((h = head) != null) {
// 當(dāng) head 的下一個(gè)節(jié)點(diǎn)為 null 或者 是等待被剔除狀態(tài)的時(shí)候
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
// 從 tail 節(jié)點(diǎn)往前遍歷,獲取最接近 head 的等待節(jié)點(diǎn)
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0) // 過濾掉隊(duì)列中撤銷等待的節(jié)點(diǎn)
s = p;
}
}
// 該等待節(jié)點(diǎn)可以被喚醒(當(dāng)前訪問線程不是該等待節(jié)點(diǎn)的線程)
if (s != null && s.thread != Thread.currentThread())
return true;
}
// 隊(duì)列中沒有等待節(jié)點(diǎn)
return false;
}
加鎖解鎖過程
寫到這里倦零,對(duì) ReentrantLock 所實(shí)現(xiàn)的邏輯有了一個(gè)大概的了解误续,但是可以發(fā)現(xiàn),上面只有對(duì)加鎖扫茅,解鎖進(jìn)行了操作蹋嵌,但是我們一直提及的節(jié)點(diǎn),隊(duì)列在上面并沒有體現(xiàn)葫隙,那么我們加鎖失敗是怎么操作的呢栽烂,傳說中的入隊(duì)出隊(duì)又是一個(gè)什么情況呢,接下來就對(duì) ReentrantLock 的 lock() , unloc() 操作對(duì)整個(gè)加鎖解鎖邏輯串聯(lián)進(jìn)行梳理停蕉。
這里我們看一下非公平鎖的操作吧愕鼓。
- 入列操作
- java.util.concurrent.locks.ReentrantLock.lock
public void lock() {
// 這里的 acquire 方法由 AQS 實(shí)現(xiàn)
sync.acquire(1);
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {
/** !tryAcquire(arg) 代表獲取鎖的狀態(tài)(修改stat,設(shè)置線程),獲取成功返回true取反則跳出 accquire慧起;
獲取失敗返回false取反則進(jìn)行后續(xù) acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 獲取鎖失敗會(huì)進(jìn)行addWaiter操作菇晃;將當(dāng)前線程對(duì)應(yīng)的節(jié)點(diǎn)添加到隊(duì)列中,指向tail
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 自旋將當(dāng)前獲取鎖失敗的線程對(duì)應(yīng)的節(jié)點(diǎn)添加到隊(duì)列中蚓挤,并指向tail磺送。
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
// 返回的是當(dāng)前請(qǐng)求線程對(duì)應(yīng)的節(jié)點(diǎn)
return node;
}
} else {
// 初始化同步隊(duì)列的操作驻子,初始化結(jié)束后 head 和 tail 指向的是同一個(gè)節(jié)點(diǎn)
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
// 初始化獨(dú)占模式的節(jié)點(diǎn)
/** java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
.Node(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node) */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
// 這里的 node 為當(dāng)前線程在隊(duì)列中對(duì)應(yīng)的節(jié)點(diǎn),arg為獲取鎖時(shí)的標(biāo)記狀態(tài)(1)
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false; // 線程中斷標(biāo)記
try {
for (;;) {
final Node p = node.predecessor();
/** 如果node的前一個(gè)節(jié)點(diǎn)為 head 則嘗試獲取鎖估灿,
這里是下面中斷的線程被喚醒后崇呵,重新進(jìn)入循環(huán),會(huì)執(zhí)行這一段代碼
這里并沒有使用CAS來設(shè)置頭結(jié)點(diǎn)馅袁,因?yàn)?tryAcquire 里面的CAS操作域慷,
只能有一個(gè)線程進(jìn)入到if里面的代碼塊*/
if (p == head && tryAcquire(arg)) {
// 加鎖成功,將當(dāng)前節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)汗销,代表當(dāng)前節(jié)點(diǎn)出隊(duì)犹褒。
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 判斷是否可以掛起(通過當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus屬性判斷是否可以被喚醒)
// 如果該線程被喚醒,則繼續(xù)循環(huán)執(zhí)行上面的加鎖操作弛针,喚醒后將該節(jié)點(diǎn)剔除隊(duì)列
if (shouldParkAfterFailedAcquire(p, node))
// 執(zhí)行掛起操作
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 以上操作拋出異常的時(shí)候叠骑,撤銷本次請(qǐng)求
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire
// 這里傳入的 node 為當(dāng)前訪問線程對(duì)應(yīng)的節(jié)點(diǎn),
// 確保前節(jié)點(diǎn)狀態(tài)是可以被喚醒
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前一個(gè)節(jié)點(diǎn)是等待狀態(tài)削茁,則返回true執(zhí)行掛起操作
return true;
if (ws > 0) {
/** 頭節(jié)點(diǎn)的 waitStatus 值是0宙枷,這里是從當(dāng)前節(jié)點(diǎn)(tail)往前遍歷,
取最近的一個(gè)等待狀態(tài)的節(jié)點(diǎn) or head節(jié)點(diǎn)*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/** waitStatus必須為0或PROPAGATE茧跋。CAS設(shè)置屬性之后重新進(jìn)入判斷*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt
// 對(duì)當(dāng)前線程執(zhí)行掛起操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire
// 在執(zhí)行掛起操作發(fā)生異常時(shí)慰丛,取消正在獲取資源的請(qǐng)求
// 這里的node是當(dāng)前請(qǐng)求線程對(duì)應(yīng)的節(jié)點(diǎn)
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 從當(dāng)前節(jié)點(diǎn)(也就是尾節(jié)點(diǎn))向前遍歷,找到最近的一個(gè)等待的節(jié)點(diǎn)退出循環(huán)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 這里的 predNext 是距離尾節(jié)點(diǎn)最近的等待節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(不一定是當(dāng)前節(jié)點(diǎn))
Node predNext = pred.next;
// 標(biāo)記當(dāng)前節(jié)點(diǎn)為取消狀態(tài)厌衔,其余節(jié)點(diǎn)則可以跳過該節(jié)點(diǎn)
node.waitStatus = Node.CANCELLED;
// 如果當(dāng)前節(jié)點(diǎn)是尾節(jié)點(diǎn)璧帝,則移除自己
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
// 這里是將前一個(gè)等待節(jié)點(diǎn) next 設(shè)置尾當(dāng)前節(jié)點(diǎn)的 next,將當(dāng)前 node 剔除富寿;
// 假如中間有撤銷節(jié)點(diǎn)的話睬隶,這樣操作也會(huì)將其過濾掉;
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
// 當(dāng)前節(jié)點(diǎn)是head節(jié)點(diǎn) or 當(dāng)前節(jié)點(diǎn) waitStatus 為 PROPAGATE 時(shí)進(jìn)入當(dāng)前邏輯
// 喚醒當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 出列操作
同步隊(duì)列(CLH)遵循FIFO页徐,首節(jié)點(diǎn)是獲取同步狀態(tài)的節(jié)點(diǎn)苏潜,首節(jié)點(diǎn)的線程釋放同步狀態(tài)后,將會(huì)喚醒它的后繼節(jié)點(diǎn)(next)变勇,而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)恤左。設(shè)置首節(jié)點(diǎn)是通過獲取同步狀態(tài)成功的線程來完成的(獲取同步狀態(tài)是通過CAS來完成),只能有一個(gè)線程能夠獲取到同步狀態(tài)搀绣,因此設(shè)置頭節(jié)點(diǎn)的操作并不需要CAS來保證飞袋,只需要將首節(jié)點(diǎn)設(shè)置為其原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的next(等待GC回收)應(yīng)用即可。
- java.util.concurrent.locks.AbstractQueuedSynchronizer#release
// tryRelease 修改實(shí)現(xiàn)類的state修改和節(jié)點(diǎn)線程解綁链患,成功返回true,從頭節(jié)點(diǎn)開始出隊(duì)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
// 這里是判斷下一個(gè)節(jié)點(diǎn)是否撤銷等待巧鸭,
// 如果撤銷的話,找下一個(gè)等待節(jié)點(diǎn)(從tail開始往前找最遠(yuǎn)的一個(gè)等待節(jié)點(diǎn))
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 對(duì)其執(zhí)行喚醒操作
if (s != null)
LockSupport.unpark(s.thread);
}
總結(jié):AQS將獲取鎖麻捻,釋放鎖的操作交由子類去實(shí)現(xiàn)纲仍,這里是由ReentrantLock的公平鎖呀袱,非公平鎖來實(shí)現(xiàn);其本身實(shí)現(xiàn)了共享模式和獨(dú)占模式的入列出列操作郑叠,這里的代碼看的是獨(dú)占模式的入隊(duì)出隊(duì)操作夜赵。獲取鎖失敗則將當(dāng)前線程的對(duì)應(yīng)的節(jié)點(diǎn)添加到tail,添加時(shí)要保證它的前一節(jié)點(diǎn)是可以被喚醒的乡革,添加成功后將當(dāng)前線程掛起寇僧;喚醒操作是對(duì)head的后繼節(jié)點(diǎn)進(jìn)行喚醒,喚醒后會(huì)重復(fù)執(zhí)行入隊(duì)操作中的頭節(jié)點(diǎn)獲取鎖的邏輯沸版,獲取成功婉宰,即可跳出循環(huán)。