如何實(shí)現(xiàn)一個(gè)鎖
實(shí)現(xiàn)一個(gè)鎖膛壹,主要需要考慮2個(gè)問(wèn)題
- 如何線程安全的修改鎖狀態(tài)位?
- 得不到鎖的線程唉堪,如何排隊(duì)模聋?
帶著這2個(gè)問(wèn)題,我們看一下JUC中的ReentrantLock是如何做的唠亚?
ReentrantLock鎖實(shí)現(xiàn)
ReentrantLock類的大部分邏輯链方,都是其均繼承自AQS的內(nèi)部類Sync實(shí)現(xiàn)的
如何線程安全的修改鎖狀態(tài)位?
鎖狀態(tài)位的修改主要通過(guò)灶搜,內(nèi)部類Sync實(shí)現(xiàn)的
public abstract class AbstractQueuedSynchronizer{
//鎖狀態(tài)標(biāo)志位:volatile變量(多線程間通過(guò)此變量判斷鎖的狀態(tài))
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
}
abstract static Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//volatile讀祟蚀,確保了鎖狀態(tài)位的內(nèi)存可見(jiàn)性
int c = getState();
//鎖還沒(méi)有被其他線程占用
if (c == 0) {
//此時(shí),如果多個(gè)線程同時(shí)進(jìn)入割卖,CAS操作會(huì)確保前酿,只有一個(gè)線程修改成功
if (compareAndSetState(0, acquires)) {
//設(shè)置當(dāng)前線程擁有獨(dú)占訪問(wèn)權(quán)
setExclusiveOwnerThread(current);
return true;
}
}
//當(dāng)前線程就是擁有獨(dú)占訪問(wèn)權(quán)的線程,即鎖重入
else if (current == getExclusiveOwnerThread()) {
//重入鎖計(jì)數(shù)+1
int nextc = c + acquires;
if (nextc < 0) //溢出
throw new Error("Maximum lock count exceeded");
//只有獲取鎖的線程鹏溯,才能進(jìn)入此段代碼罢维,因此只需要一個(gè)volatile寫操作,確保其內(nèi)存可見(jiàn)性即可
setState(nextc);
return true;
}
return false;
}
//只有獲取鎖的線程才會(huì)執(zhí)行此方法丙挽,因此只需要volatile讀寫確保內(nèi)存可見(jiàn)性即可
protected final boolean tryRelease(int releases) {
//鎖計(jì)數(shù)器-1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//鎖計(jì)數(shù)器為0肺孵,說(shuō)明鎖被釋放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
從代碼中匀借,我們可以發(fā)現(xiàn)線程安全的關(guān)鍵在于:<font color="#dd0000">volatile變量</font>和<font color="#dd0000">CAS原語(yǔ)</font>的配合使用
得不到鎖的線程,如何排隊(duì)平窘?
JUC中鎖的排隊(duì)策略吓肋,是基于CLH隊(duì)列的變種實(shí)現(xiàn)的。因此瑰艘,我們先看看啥是CLH隊(duì)列
CLH隊(duì)列
如上圖所示是鬼,獲取不到鎖的線程,會(huì)進(jìn)入隊(duì)尾紫新,然后自旋屑咳,直到其前驅(qū)線程釋放鎖。
這樣做的好處:假設(shè)有1000個(gè)線程等待獲取鎖弊琴,鎖釋放后,只會(huì)通知隊(duì)列中的第一個(gè)線程去競(jìng)爭(zhēng)鎖杖爽,減少了并發(fā)沖突敲董。(ZK的分布式鎖,為了避免驚群效應(yīng)慰安,也使用了類似的方式:獲取不到鎖的線程只監(jiān)聽(tīng)前一個(gè)節(jié)點(diǎn))
為什么說(shuō)JUC中的實(shí)現(xiàn)是基于CLH的“變種”腋寨,因?yàn)樵糃LH隊(duì)列,一般用于實(shí)現(xiàn)自旋鎖化焕。而JUC中的實(shí)現(xiàn)萄窜,獲取不到鎖的線程,一般會(huì)時(shí)而阻塞撒桨,時(shí)而喚醒查刻。
JUC中的CLH隊(duì)列實(shí)現(xiàn)
我們來(lái)看看AbstractQueuedSynchronizer類中的acquire方法實(shí)現(xiàn)
public final void acquire(int arg) {
//嘗試獲取鎖
if (!tryAcquire(arg) &&
//獲取不到,則進(jìn)入等待隊(duì)列,返回是否中斷
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//如果返回中斷凤类,則調(diào)用當(dāng)前線程的interrupt()方法
selfInterrupt();
}
入隊(duì)
如果線程調(diào)用tryAcquire(其最終實(shí)現(xiàn)是調(diào)用上面分析過(guò)的nonfairTryAcquire方法)獲取鎖失敗穗泵。則首先調(diào)用addWaiter(Node.EXCLUSIVE)方法,將自己加入CLH隊(duì)列的尾部谜疤。
private Node addWaiter(Node mode) {
//線程對(duì)應(yīng)的Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//尾節(jié)點(diǎn)不為空
if (pred != null) {
//當(dāng)前node的前驅(qū)指向尾節(jié)點(diǎn)
node.prev = pred;
//將當(dāng)前node設(shè)置為新的尾節(jié)點(diǎn)
//如果cas操作失敗佃延,說(shuō)明線程競(jìng)爭(zhēng)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//lockfree的方式插入隊(duì)尾
enq(node);
return node;
}
private Node enq(final Node node) {
//經(jīng)典的lockfree算法:循環(huán)+CAS
for (;;) {
Node t = tail;
//尾節(jié)點(diǎn)為空
if (t == null) { // Must initialize
//初始化頭節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
入隊(duì)過(guò)程,入下圖所示
1 T0持有鎖時(shí)夷磕,其CLH隊(duì)列的頭尾指針為NULL
2 線程T1履肃,此時(shí)請(qǐng)求鎖,由于鎖被T0占有坐桩。因此加入隊(duì)列尾部尺棋。具體過(guò)程如下所示:
(1) 初始化頭節(jié)點(diǎn)
(2) 初始化T1節(jié)點(diǎn),入隊(duì)撕攒,尾指針指向T1
3 此時(shí)如果有一個(gè)T10線程先于T1入隊(duì)陡鹃,則T1執(zhí)行compareAndSetTail(t, node)會(huì)失敗烘浦,然后回到for循環(huán)開(kāi)始處,重新入隊(duì)萍鲸。
由自旋到阻塞
入隊(duì)后闷叉,調(diào)用acquireQueued方法,時(shí)而自旋脊阴,時(shí)而阻塞握侧,直到獲取鎖(或被取消)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//其前驅(qū)是頭結(jié)點(diǎn)嘿期,并且再次調(diào)用tryAcquire成功獲取鎖
if (p == head && tryAcquire(arg)) {
//將自己設(shè)為頭結(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
//成功獲取鎖品擎,返回
return interrupted;
}
//沒(méi)有得到鎖時(shí):
//shouldParkAfterFailedAcquire方法:返回是否需要阻塞當(dāng)前線程
//parkAndCheckInterrupt方法:阻塞當(dāng)前線程,當(dāng)線程再次喚醒時(shí)备徐,返回是否被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//修改中斷標(biāo)志位
interrupted = true;
}
} finally {
if (failed)
//獲取鎖失敗萄传,則將此線程對(duì)應(yīng)的node的waitStatus改為CANCEL
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* 獲取鎖失敗時(shí),檢查并更新node的waitStatus蜜猾。
* 如果線程需要阻塞秀菱,返回true。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前驅(qū)節(jié)點(diǎn)的waitStatus是SIGNAL蹭睡。
if (ws == Node.SIGNAL)
/*
* SIGNAL狀態(tài)的節(jié)點(diǎn)衍菱,釋放鎖后,會(huì)喚醒其后繼節(jié)點(diǎn)肩豁。
* 因此脊串,此線程可以安全的阻塞(前驅(qū)節(jié)點(diǎn)釋放鎖時(shí),會(huì)喚醒此線程)清钥。
*/
return true;
//前驅(qū)節(jié)點(diǎn)對(duì)應(yīng)的線程被取消
if (ws > 0) {
do {
//跳過(guò)此前驅(qū)節(jié)點(diǎn)
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
此時(shí)琼锋,需要將前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為SIGNAL。
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//當(dāng)shouldParkAfterFailedAcquire方法返回true祟昭,則調(diào)用parkAndCheckInterrupt方法阻塞當(dāng)前線程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
自旋過(guò)程斩例,入下圖所示
然后線程T2,加入了請(qǐng)求鎖的隊(duì)列,尾指針后移从橘。
終上所述念赶,每個(gè)得不到鎖的線程,都會(huì)講自己封裝成Node恰力,加入隊(duì)尾叉谜,或自旋或阻塞,直到獲取鎖(為簡(jiǎn)化問(wèn)題踩萎,不考慮取消的情況)
鎖的釋放
前文提到停局,T1,T2在阻塞之前,都回去修改其前驅(qū)節(jié)點(diǎn)的waitStatus=-1。這是為什么董栽?
我們看下鎖釋放的代碼码倦,便一目了然
public final boolean release(int arg) {
//修改鎖計(jì)數(shù)器,如果計(jì)數(shù)器為0,說(shuō)明鎖被釋放
if (tryRelease(arg)) {
Node h = head;
//head節(jié)點(diǎn)的waitStatus不等于0锭碳,說(shuō)明head節(jié)點(diǎn)的后繼節(jié)點(diǎn)對(duì)應(yīng)的線程袁稽,正在阻塞,等待被喚醒
if (h != null && h.waitStatus != 0)
//喚醒后繼節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//后繼節(jié)點(diǎn)
Node s = node.next;
//如果s被取消擒抛,跳過(guò)被取消節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒后繼節(jié)點(diǎn)
LockSupport.unpark(s.thread);
}
如代碼所示推汽,waitStatus=-1的作用,主要是告訴釋放鎖的線程:后面還有排隊(duì)等待獲取鎖的線程歧沪,請(qǐng)喚醒他歹撒!
釋放鎖的過(guò)程,如圖所示:
總結(jié)
實(shí)現(xiàn)鎖的關(guān)鍵在于:
- 通過(guò)CAS操作與volatile變量互相配合诊胞,線程安全的修改鎖標(biāo)志位
- 基于CLH隊(duì)列暖夭,實(shí)現(xiàn)鎖的排隊(duì)策略
問(wèn)題討論
1 unparkSuccessor時(shí)為什么會(huì)出現(xiàn)s==null || s.waitStatus>0的情況
2 這種情況下,為什么要通過(guò)prev指針?lè)聪虿檎襍uccessor節(jié)點(diǎn)
//unparkSuccessor方法的相關(guān)代碼
//釋放鎖時(shí)撵孤,s就是head.next
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
原因如下:
當(dāng)釋放鎖的線程在執(zhí)行unparkSuccessor(head)時(shí)鳞尔,分情況進(jìn)行分析:
(1)s==0的情況:
觸發(fā)條件:head的successor節(jié)點(diǎn)獲取鎖成功后,執(zhí)行了head.next=null的操作后早直,解鎖線程讀取了head.next,因此s==null
反向遍歷的原因:next指針在head.next處斷開(kāi)了市框,只能通過(guò)prev指針遍歷
關(guān)鍵代碼霞扬,如圖所示:
(2)s.waitStatus > 0的情況
觸發(fā)條件:head的successor節(jié)點(diǎn)被取消(cancelAcquire)時(shí),執(zhí)行了如下操作:
successor.waitStatus=1 ;
successor.next = successor;
反向遍歷的原因:next指針在head節(jié)點(diǎn)的后繼節(jié)點(diǎn)處斷開(kāi)(head.next.next)枫振,因此只能通過(guò)prev指針遍歷
關(guān)鍵代碼喻圃,如圖所示: