前言
在 Java
中通過 鎖
來控制多個線程對共享資源的訪問现柠,使用 Java
編程語言開發(fā)的朋友都知道,可以通過 synchronized
關(guān)鍵字來實現(xiàn)鎖的功能拥诡,它可以隱式的獲取鎖择克,也就是說我們使用該關(guān)鍵字并不需要去關(guān)心鎖的獲取和釋放過程幻捏,但是在提供方便的同時也意味著其靈活性的下降。例如殿遂,有這樣的一個場景诈铛,先獲取鎖 A,然后再獲取鎖 B墨礁,當(dāng)鎖 B 獲取到之后幢竹,釋放鎖 A 同時獲取鎖 C,當(dāng)獲取鎖 C 后饵溅,再釋放鎖 B 同時獲取鎖 D妨退,依次類推,像這種比較復(fù)雜的場景,使用 synchronized
關(guān)鍵字就比較難實現(xiàn)了咬荷。
在 Java SE 5
之后冠句,新增加了 Lock
接口和一系列的實現(xiàn)類來提供和 synchronized
關(guān)鍵字一樣的功能,它需要我們顯示的進(jìn)行鎖的獲取和釋放幸乒,除此之外還提供了可響應(yīng)中斷的鎖獲取操作以及超時獲取鎖等同步特性懦底。JDK
中提供的 Lock
接口實現(xiàn)類大部分都是聚合一個同步器 AQS 的子類來實現(xiàn)多線程的訪問控制的,下面我們看看這個構(gòu)建鎖和其它同步組件的基礎(chǔ)框架——隊列同步器 AQS(AbstractQueuedSynchronizer)
罕扎。
AQS 基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)
同步隊列
隊列同步器 AQS
(下文簡稱為同步器)主要是依賴于內(nèi)部的一個 FIFO(first-in-first-out)雙向隊列來對同步狀態(tài)進(jìn)行管理的聚唐,當(dāng)線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程和當(dāng)前等待狀態(tài)等信息封裝成一個內(nèi)部定義的節(jié)點(diǎn) Node
腔召,然后將其加入隊列杆查,同時阻塞當(dāng)前線程;當(dāng)同步狀態(tài)釋放時臀蛛,會將同步隊列中首節(jié)點(diǎn)喚醒亲桦,讓其再次嘗試去獲取同步狀態(tài)。同步隊列的基本結(jié)構(gòu)如下:
隊列節(jié)點(diǎn) Node
同步隊列使用同步器中的靜態(tài)內(nèi)部類 Node
用來保存獲取同步狀態(tài)的線程的引用浊仆、線程的等待狀態(tài)客峭、前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)。
同步隊列中 Node
節(jié)點(diǎn)的屬性名稱和具體含義如下表所示:
屬性類型和名稱 | 描述 |
---|---|
volatile int waitStatus | 當(dāng)前節(jié)點(diǎn)在隊列中的等待狀態(tài) |
volatile Node prev | 前驅(qū)節(jié)點(diǎn)抡柿,當(dāng)節(jié)點(diǎn)加入同步隊列時被賦值(使用尾部添加方式) |
volatile Node next | 后繼節(jié)點(diǎn) |
volatile Thread thread | 獲取同步狀態(tài)的線程 |
Node nextWaiter | 等待隊列中的后繼節(jié)點(diǎn)舔琅,如果當(dāng)前節(jié)點(diǎn)是共享的,則該字段是一個 SHARED 常量 |
每個節(jié)點(diǎn)線程都有兩種鎖模式洲劣,分別為 SHARED
表示線程以共享的模式等待鎖备蚓,EXCLUSIVE
表示線程以獨(dú)占的方式等待鎖。同時每個節(jié)點(diǎn)的等待狀態(tài) waitStatus
只能取以下表中的枚舉值:
枚舉值 | 描述 |
---|---|
SIGNAL | 值為 -1闪檬,表示該節(jié)點(diǎn)的線程已經(jīng)準(zhǔn)備完畢星著,等待資源釋放 |
CANCELLED | 值為 1,表示該節(jié)點(diǎn)線程獲取鎖的請求已經(jīng)取消了 |
CONDITION | 值為 -2粗悯,表示該節(jié)點(diǎn)線程等待在 Condition 上虚循,等待被其它線程喚醒 |
PROPAGATE | 值為 -3,表示下一次共享同步狀態(tài)獲取會無限進(jìn)行下去样傍,只在 SHARED 情況下使用 |
0 | 值為 0横缔,初始狀態(tài),初始化的默認(rèn)值 |
同步狀態(tài) state
同步器內(nèi)部使用了一個名為 state
的 int
類型的變量表示同步狀態(tài)衫哥,同步器的主要使用方式是通過繼承茎刚,子類通過繼承并實現(xiàn)它的抽象方法來管理同步狀態(tài),同步器給我們提供了如下三個方法來對同步狀態(tài)進(jìn)行更改撤逢。
方法簽名 | 描述 |
---|---|
protected final int getState() | 獲取當(dāng)前同步狀態(tài) |
protected final void setState(int newState) | 設(shè)置當(dāng)前同步狀態(tài) |
protected final boolean compareAndSetState(int expect, int update) | 使用 CAS 設(shè)置當(dāng)前狀態(tài)膛锭,該方法能夠保證狀態(tài)設(shè)置的原子性 |
在獨(dú)享鎖中同步狀態(tài) state
這個值通常是 0 或者 1(如果是重入鎖的話 state
值就是重入的次數(shù))粮坞,在共享鎖中 state
就是持有鎖的數(shù)量。
獨(dú)占式同步狀態(tài)獲取與釋放
同步器中提供了 acquire(int arg)
方法來進(jìn)行獨(dú)占式同步狀態(tài)的獲取初狰,獲取到了同步狀態(tài)也就是獲取到了鎖莫杈,該方法源碼如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
方法首先會調(diào)用 tryAcquire
方法嘗試去獲取鎖,查看方法的源碼可以發(fā)現(xiàn)奢入,同步器并未對該方法進(jìn)行實現(xiàn)(只是拋出一個不支持操作異常 UnsupportedOperationException
)筝闹,這個方法是需要后續(xù)同步組件的開發(fā)人員自己去實現(xiàn)的,如果方法返回 true
則表示當(dāng)前線程成功獲取到鎖腥光,調(diào)用 selfInterrupt()
中斷當(dāng)前線程(PS:這里留給大家一個問題:為什么獲取了鎖以后還要中斷線程呢关顷?
),方法結(jié)束返回武福,如果方法返回 false
則表示當(dāng)前線程獲取鎖失敗议双,也就是說有其它線程先前已經(jīng)獲取到了鎖,此時就需要把當(dāng)前線程以及等待狀態(tài)等信息添加到同步隊列中艘儒,下面來看看同步器在線程未獲取到鎖時具體是如何實現(xiàn)聋伦。
通過源碼發(fā)現(xiàn),當(dāng)獲取鎖失敗時界睁,會執(zhí)行判斷條件與操作的后半部分 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
,首先指定鎖模式為 Node.EXCLUSIVE
調(diào)用 addWaiter
方法兵拢,該方法源碼如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
通過方法參數(shù)指定的鎖模式(共享鎖 or 獨(dú)占鎖)和當(dāng)前線程構(gòu)造出一個 Node
節(jié)點(diǎn)翻斟,如果同步隊列已經(jīng)初始化,那么首先會進(jìn)行一次從尾部加入隊列的嘗試说铃,使用 compareAndSetTail
方法保證原子性访惜,進(jìn)入該方法源碼可以發(fā)現(xiàn)是基于 sun.misc
包下提供的 Unsafe
類來實現(xiàn)的。如果首次嘗試加入同步隊列失敗腻扇,會再次調(diào)用 enq
方法進(jìn)行入隊操作债热,繼續(xù)跟進(jìn) enq
方法源碼如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通過其源碼可以發(fā)現(xiàn)和第一次嘗試加入隊列的代碼類似,只是該方法里面加了同步隊列初始化判斷幼苛,使用 compareAndSetHead
方法保證設(shè)置頭節(jié)點(diǎn)的原子性窒篱,同樣它底層也是基于 Unsafe
類,然后外層套了一個 for (;;)
死循環(huán)舶沿,循環(huán)唯一的退出條件是從隊尾入隊成功墙杯,也就是說如果從該方法成功返回了就表示已經(jīng)入隊成功了,至此括荡,addWaiter
執(zhí)行完畢返回當(dāng)前 Node
節(jié)點(diǎn)高镐。然后以該節(jié)點(diǎn)作為 acquireQueued
方法的入?yún)⒗^續(xù)進(jìn)行其它步驟,該方法如下所示:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到畸冲,該方法本質(zhì)上也是通過一個死循環(huán)(自旋)去獲取鎖并且支持中斷嫉髓,在循環(huán)體外面定義兩個標(biāo)記變量观腊,failed
標(biāo)記是否成功獲取到鎖,interrupted
標(biāo)記在等待的過程中是否被中斷過算行。方法首先通過 predecessor
獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)梧油,當(dāng)當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是 head
頭節(jié)點(diǎn)時就調(diào)用 tryAcquire
嘗試獲取鎖,也就是第二個節(jié)點(diǎn)則嘗試獲取鎖纱意,這里為什么要從第二個節(jié)點(diǎn)才嘗試獲取鎖呢婶溯?是因為同步隊列本質(zhì)上是一個雙向鏈表
,在雙向鏈表中偷霉,第一個節(jié)點(diǎn)并不存儲任何數(shù)據(jù)是虛節(jié)點(diǎn)迄委,只是起到一個占位的作用,真正存儲數(shù)據(jù)的節(jié)點(diǎn)是從第二個節(jié)點(diǎn)開始的类少。如果成功獲取鎖叙身,也就是 tryAcquire
方法返回 true
后,將 head
指向當(dāng)前節(jié)點(diǎn)并把之前找到的頭節(jié)點(diǎn) p
從隊列中移除硫狞,修改是否成功獲取到鎖標(biāo)記信轿,結(jié)束方法返回中斷標(biāo)記。
如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn) p
不是頭節(jié)點(diǎn)或者前驅(qū)節(jié)點(diǎn) p
是頭節(jié)點(diǎn)但是獲取鎖操作失敗残吩,那么會調(diào)用 shouldParkAfterFailedAcquire
方法判斷當(dāng)前 node
節(jié)點(diǎn)是否需要被阻塞财忽,這里的阻塞判斷主要是為了防止長時間自旋給 CPU
帶來非常大的執(zhí)行開銷,浪費(fèi)資源泣侮。該方法源碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
方法參數(shù)為當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)以及當(dāng)前節(jié)點(diǎn)即彪,主要是靠前驅(qū)節(jié)點(diǎn)來判斷是否需要進(jìn)行阻塞,首先獲取到前驅(qū)節(jié)點(diǎn)的等待狀態(tài) ws
活尊,如果節(jié)點(diǎn)狀態(tài) ws
為 SIGNAL
隶校,表示前驅(qū)節(jié)點(diǎn)的線程已經(jīng)準(zhǔn)備完畢,等待資源釋放蛹锰,方法返回 true
表示可以阻塞深胳,如果 ws > 0
,通過上文可以知道節(jié)點(diǎn)只有一個狀態(tài) CANCELLED(值為 1)
滿足該條件铜犬,表示該節(jié)點(diǎn)線程獲取鎖的請求已經(jīng)取消了舞终,會通過一個 do-while
循環(huán)向前查找 CANCELLED
狀態(tài)的節(jié)點(diǎn)并將其從同步隊列中移除,否則進(jìn)入 else
分支翎苫,使用 compareAndSetWaitStatus
原子操作將前驅(qū)節(jié)點(diǎn)的等待狀態(tài)修改為 SIGNAL
权埠,以上這兩種情況都不需要進(jìn)行阻塞方法返回 false
。
當(dāng)經(jīng)過判斷后需要阻塞的話煎谍,也就是 compareAndSetWaitStatus
方法返回 true
時攘蔽,會通過 parkAndCheckInterrupt
方法阻塞掛起當(dāng)前線程,并返回當(dāng)前線程的中斷標(biāo)識呐粘。方法如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
線程阻塞是通過 LockSupport
這個工具類實現(xiàn)的满俗,深入其源碼可以發(fā)現(xiàn)它底層也是基于 Unsafe
類實現(xiàn)的转捕。如果以上兩個方法都返回 true
的話就更新中斷標(biāo)記。這里還有一個問題就是什么時候會將一個節(jié)點(diǎn)的等待狀態(tài) waitStatus
修改為 CANCELLED
節(jié)點(diǎn)線程獲取鎖的請求取消狀態(tài)呢唆垃?細(xì)心的朋友可能已經(jīng)發(fā)現(xiàn)了五芝,在上文貼出的 acquireQueued
方法源碼中的 finally
塊中會根據(jù) failed
標(biāo)記來決定是否調(diào)用 cancelAcquire
方法,這個方法就是用來將節(jié)點(diǎn)狀態(tài)修改為 CANCELLED
的辕万,方法的具體實現(xiàn)留給大家去探索枢步。至此 AQS
獨(dú)占式同步狀態(tài)獲取鎖的流程就完成了,下面通過一個流程圖來看看整體流程:
下面再看看獨(dú)占式鎖釋放的過程渐尿,同步器使用 release
方法來讓我們進(jìn)行獨(dú)占式鎖的釋放醉途,其方法源碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先調(diào)用 tryRelease
方法嘗試進(jìn)行鎖釋放操作,繼續(xù)跟進(jìn)該方法發(fā)現(xiàn)同步器只是拋出了一個不支持操作異常 UnsupportedOperationException
砖茸,這里和上文獨(dú)占鎖獲取中 tryAcquire
方法是一樣的套路隘擎,需要開發(fā)者自己定義鎖釋放操作。
通過其 JavaDoc
可以得知凉夯,如果返回 false
货葬,則表示釋放鎖失敗,方法結(jié)束劲够。該方法如果返回 true
震桶,則表示當(dāng)前線程釋放鎖成功,需要通知隊列中等待獲取鎖的線程進(jìn)行鎖獲取操作征绎。首先獲取頭節(jié)點(diǎn) head
尼夺,如果當(dāng)前頭節(jié)點(diǎn)不為 null
,并且其等待狀態(tài)不是初始狀態(tài)(0)炒瘸,則解除線程阻塞掛起狀態(tài),通過 unparkSuccessor
方法實現(xiàn)寝衫,該方法源碼如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
首先獲取頭節(jié)點(diǎn)的等待狀態(tài) ws
顷扩,如果狀態(tài)值為負(fù)數(shù)(Node.SIGNAL or Node.PROPAGATE),則通過 CAS 操作將其改為初始狀態(tài)(0)慰毅,然后獲取頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)隘截,如果后繼節(jié)點(diǎn)為 null
或者后繼節(jié)點(diǎn)狀態(tài)為 CANCELLED
(獲取鎖請求已取消),就從隊列尾部開始尋找第一個狀態(tài)為非 CANCELLED
的節(jié)點(diǎn)汹胃,如果該節(jié)點(diǎn)不為空則使用 LockSupport
的 unpark
方法將其喚醒婶芭,該方法底層是通過 Unsafe
類的 unpark
實現(xiàn)的。這里需要從隊尾查找非 CANCELLED
狀態(tài)的節(jié)點(diǎn)的原因是着饥,在之前的獲取獨(dú)占鎖失敗時的入隊 addWaiter
方法實現(xiàn)中犀农,該方法如下:
假設(shè)一個線程執(zhí)行到了上圖中的 ① 處,② 處還沒有執(zhí)行宰掉,此時另一個線程恰好執(zhí)行了 unparkSuccessor
方法呵哨,那么就無法通過從前向后查找了赁濒,因為節(jié)點(diǎn)的后繼指針 next
還沒賦值呢,所以需要從后往前進(jìn)行查找孟害。至此拒炎,獨(dú)占式鎖釋放操作就結(jié)束了,同樣的挨务,最后我們也通過一個流程圖來看看整個鎖釋放的過程:
獨(dú)占式可中斷同步狀態(tài)獲取
同步器提供了 acquireInterruptibly
方法來進(jìn)行可響應(yīng)中斷的獲取鎖操作击你,方法實現(xiàn)源碼如下:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
方法首先檢查當(dāng)前線程的中斷狀態(tài),如果已中斷谎柄,則直接拋出中斷異常 InterruptedException
即響應(yīng)中斷丁侄,否則調(diào)用 tryAcquire
方法嘗試獲取鎖,如果獲取成功則方法結(jié)束返回谷誓,獲取失敗調(diào)用 doAcquireInterruptibly
方法绒障,跟進(jìn)該方法如下:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
仔細(xì)觀察可以發(fā)現(xiàn)該方法實現(xiàn)源碼和上文中 acquireQueued
方法的實現(xiàn)基本上類似,只是這里把入隊操作 addWaiter
放到了方法里面了捍歪,還有一個區(qū)別就是當(dāng)在循環(huán)體內(nèi)判斷需要進(jìn)行中斷時會直接拋出異常來響應(yīng)中斷户辱,兩個方法的對比如下:
其它步驟和獨(dú)占式鎖獲取一致,流程圖大體上和不響應(yīng)中斷的鎖獲取差不多糙臼,只是在最開始多了一步線程中斷狀態(tài)檢查和循環(huán)是會拋出中斷異常而已庐镐。
獨(dú)占式超時獲取同步狀態(tài)
同步器提供了 tryAcquireNanos
方法可以超時獲取同步狀態(tài)(也就是鎖
),該方法提供了之前 synchronized
關(guān)鍵字不支持的超時獲取的特性变逃,通過該方法我們可以在指定時間段 nanosTimeout
內(nèi)獲取鎖必逆,如果獲取到鎖則返回 true
,否則揽乱,返回 false
名眉。方法源碼如下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
首先會調(diào)用 tryAcquire
方法嘗試獲取一次鎖,如果獲取鎖成功則立即返回凰棉,否則調(diào)用 doAcquireNanos
方法進(jìn)入超時獲取鎖流程损拢。通過上文可以得知,同步器的 acquireInterruptibly
方法在等待獲取同步狀態(tài)時撒犀,如果當(dāng)前線程被中斷了福压,會拋出中斷異常 InterruptedException
并立刻返回。超時獲取鎖的流程其實是在響應(yīng)中斷的基礎(chǔ)上增加了超時獲取的特性或舞,doAcquireNanos
方法的源碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
由以上方法實現(xiàn)源碼可以看出荆姆,針對超時獲取這里主要實現(xiàn)思路是:先使用當(dāng)前時間加上參數(shù)傳入的超時時間間隔 deadline
計算出超時的時間點(diǎn),然后每次進(jìn)行循環(huán)的時候使用超時時間點(diǎn) deadline
減去當(dāng)前時間得到剩余的時間 nanosTimeout
映凳,如果剩余時間小于 0 則證明當(dāng)前獲取鎖操作已經(jīng)超時胆筒,方法結(jié)束返回 false
,反如果剩余時間大于 0魏宽。
可以看到在里面執(zhí)行自旋的時候和上面獨(dú)占式同步獲取鎖狀態(tài) acquireQueued
方法那里是一樣的套路腐泻,即當(dāng)當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)時調(diào)用 tryAcquire
嘗試獲取鎖决乎,如果獲取成功則返回。
除了超時時間計算那里不同外派桩,還有個不同的地方就是在超時獲取鎖失敗之后的操作构诚,如果當(dāng)前線程獲取鎖失敗,則判斷剩余超時時間 nanosTimeout
是否小于 0铆惑,如果小于 0 則表示已經(jīng)超時方法立即返回范嘱,反之則會判斷是否需要進(jìn)行阻塞掛起當(dāng)前線程,如果通過 shouldParkAfterFailedAcquire
方法判斷需要掛起阻塞當(dāng)前線程员魏,還要進(jìn)一步比較超時剩余時間 nanosTimeout
和 spinForTimeoutThreshold
的大小丑蛤,如果小于等于 spinForTimeoutThreshold
值(1000 納秒)的話,將不會使當(dāng)前線程進(jìn)行超時等待撕阎,而是再次進(jìn)行自旋過程受裹。
加后面這個判斷的主要原因在于,在非常短(小于 1000 納秒)的時間內(nèi)的等待無法做到十分精確虏束,如果這時還進(jìn)行超時等待的話棉饶,反而會讓我們指定 nanosTimeout
的超時從整體上給人感覺反而不太精確,因此镇匀,在剩余超時時間非常短的情況下照藻,同步器會再次自旋進(jìn)行超時獲取鎖的過程,獨(dú)占式超時獲取鎖整個過程如下所示:
共享式同步狀態(tài)獲取與釋放
共享鎖
顧名思義就是可以多個線程共用一個鎖汗侵,在同步器中使用 acquireShared
來獲取共享鎖(同步狀態(tài))幸缕,方法源碼如下:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
首先通過 tryAcquireShared
嘗試獲取共享鎖,該方法是一個模板方法在同步器中只是拋出一個不支持操作異常晰韵,需要開發(fā)人員自己去實現(xiàn)发乔,同時方法的返回值有三種不同的類型分別代表三種不同的狀態(tài),其含義如下:
- 小于 0 表示當(dāng)前線程獲取鎖失敗
- 等于 0 表示當(dāng)前線程獲取鎖成功雪猪,但是之后的線程在沒有鎖釋放的情況下獲取鎖將失敗列疗,也就是說這個鎖是共享模式下的最后一把鎖了
- 大于 0 表示當(dāng)前線程獲取鎖成功,并且還有剩余的鎖可以獲取
當(dāng)方法 tryAcquireShared
返回值小于 0 時浪蹂,也就是獲取鎖失敗,將會執(zhí)行方法 doAcquireShared
告材,繼續(xù)跟進(jìn)該方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
方法首先調(diào)用 addWaiter
方法封裝當(dāng)前線程和等待狀態(tài)為共享模塊的節(jié)點(diǎn)并將其添加到等待同步隊列中坤次,可以發(fā)現(xiàn)在共享模式下節(jié)點(diǎn)的 nextWaiter
屬性是固定值 Node.SHARED
。然后循環(huán)獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)斥赋,如果前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)的話就嘗試獲取共享鎖缰猴,如果返回值大于等于 0 表示獲取共享鎖成功,則調(diào)用 setHeadAndPropagate
方法疤剑,更新頭節(jié)點(diǎn)同時如果有可用資源滑绒,則向后傳播闷堡,喚醒后繼節(jié)點(diǎn),接下來會檢查一下中斷標(biāo)識疑故,如果已經(jīng)中斷則中斷當(dāng)前線程杠览,方法結(jié)束返回。如果返回值小于 0纵势,則表示獲取鎖失敗踱阿,需要掛起阻塞當(dāng)前線程或者繼續(xù)自旋獲取共享鎖。下面看看 setHeadAndPropagate
方法的具體實現(xiàn):
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
首先將當(dāng)前獲取到鎖的節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)钦铁,然后方法參數(shù) propagate > 0
時表示之前 tryAcquireShared
方法的返回值大于 0软舌,也就是說當(dāng)前還有剩余的共享鎖可以獲取,則獲取當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)并且后繼節(jié)點(diǎn)是共享節(jié)點(diǎn)時喚醒節(jié)點(diǎn)去嘗試獲取鎖牛曹,doReleaseShared
方法是同步器共享鎖釋放的主要邏輯佛点。
同步器提供了 releaseShared
方法來進(jìn)行共享鎖的釋放,方法源碼如下所示:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
首先調(diào)用 tryReleaseShared
方法嘗試釋放共享鎖黎比,方法返回 false
代表鎖釋放失敗超营,方法結(jié)束返回 false
,否則就表示成功釋放鎖焰手,然后執(zhí)行 doReleaseShared
方法糟描,進(jìn)行喚醒后繼節(jié)點(diǎn)并檢查它是否可以向后傳播等操作。繼續(xù)跟進(jìn)該方法如下:
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
可以看到和獨(dú)占式鎖釋放不同的是书妻,在共享模式下船响,狀態(tài)同步和釋放可以同時執(zhí)行,其原子性由 CAS
來保證躲履,如果頭節(jié)點(diǎn)改變了也會繼續(xù)循環(huán)见间。每次共享節(jié)點(diǎn)在共享模式下喚醒時,頭節(jié)點(diǎn)都會指向它工猜,這樣就可以保證可以獲取到共享鎖的所有后續(xù)節(jié)點(diǎn)都可以喚醒了米诉。
如何自定義同步組件
在 JDK
中基于同步器實現(xiàn)的一些類絕大部分都是聚合了一個或多個繼承了同步器的類,使用同步器提供的模板方法自定義內(nèi)部同步狀態(tài)的管理篷帅,然后通過這個內(nèi)部類去實現(xiàn)同步狀態(tài)管理
的功能史侣,其實這從某種程度上來說使用了 模板模式
。比如 JDK
中可重入鎖 ReentrantLock
魏身、讀寫鎖 ReentrantReadWriteLock
惊橱、信號量 Semaphore
以及同步工具類 CountDownLatch
等,其源碼部分截圖如下:
通過上文可以知道箭昵,我們基于同步器可以分別自定義獨(dú)占鎖同步組件和共享鎖同步組件税朴,下面以實現(xiàn)一個在同一個時刻最多只允許 3 個線程訪問,其它線程的訪問將被阻塞的同步工具 TripletsLock
為例,很顯然這個工具是共享鎖模式正林,主要思路就是去實現(xiàn)一個 JDk
中的 Lock
接口來提供面向使用者的方法泡一,比如,調(diào)用 lock
方法獲取鎖觅廓,使用 unlock
來對鎖進(jìn)行釋放等鼻忠,在 TripletsLock
類內(nèi)部有一個自定義同步器 Sync
繼承自同步器 AQS,用來對線程的訪問和同步狀態(tài)進(jìn)行控制哪亿,當(dāng)線程調(diào)用 lock
方法獲取鎖時粥烁,自定義同步器 Sync
先計算出獲取到鎖后的同步狀態(tài),然后使用 Unsafe
類操作來保證同步狀態(tài)更新的原子性蝇棉,由于同一時刻只能 3 個線程訪問讨阻,這里我們可以將同步狀態(tài) state
的初始值設(shè)置為 3,表示當(dāng)前可用的同步資源數(shù)量篡殷,當(dāng)有線程成功獲取到鎖時將同步狀態(tài) state
減 1钝吮,有線程成功釋放鎖時將同步狀態(tài)加 1
,同步狀態(tài)的取值范圍為 0板辽、1奇瘦、2、3劲弦,同步狀態(tài)為 0 時表示沒有可用同步資源耳标,這個時候如果有線程訪問將被阻塞。下面來看看這個自定義同步組件的實現(xiàn)代碼:
/**
* @author mghio
* @date: 2020-06-13
* @version: 1.0
* @description:
* @since JDK 1.8
*/
public class TripletsLock implements Lock {
private final Sync sync = new Sync(3);
private static final class Sync extends AbstractQueuedSynchronizer {
public Sync(int state) {
setState(state);
}
Condition newCondition() {
return new ConditionObject();
}
@Override
protected int tryAcquireShared(int reduceCount) {
for (; ;) {
int currentState = getState();
int newState = currentState - reduceCount;
if (newState < 0 || compareAndSetState(currentState, newState)) {
return newState;
}
}
}
@Override
protected boolean tryReleaseShared(int count) {
for (; ;) {
int currentState = getState();
int newState = currentState + count;
if (compareAndSetState(currentState, newState)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
下面啟動 20 個線程測試看看自定義同步同步工具類 TripletsLock
是否達(dá)到我們的預(yù)期邑跪。測試代碼如下:
/**
* @author mghio
* @date: 2020-06-13
* @version: 1.0
* @description:
* @since JDK 1.8
*/
public class TripletsLockTest {
private final Lock lock = new TripletsLock();
private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testTripletsLock() {
// 啟動 20 個線程
for (int i = 0; i < 20; i++) {
Thread worker = new Runner();
worker.setDaemon(true);
worker.start();
}
for (int i = 0; i < 20; i++) {
second(2);
System.out.println();
}
}
private class Runner extends Thread {
@Override
public void run() {
for (; ;) {
lock.lock();
try {
second(1);
System.out.println(dateFormat.format(new Date()) + " ----> " + Thread.currentThread().getName());
second(1);
} finally {
lock.unlock();
}
}
}
}
private static void second(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試結(jié)果如下:
從以上測試結(jié)果可以發(fā)現(xiàn)次坡,同一時刻只有三個線程可以獲取到鎖,符合預(yù)期画畅,這里需要明確的是這個鎖獲取過程是非公平的砸琅。
總結(jié)
本文主要是對同步器中的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)、獨(dú)占式與共享式同步狀態(tài)獲取與釋放過程做了簡要分析轴踱,由于水平有限如有錯誤之處還請留言討論症脂。隊列同步器 AbstractQueuedSynchronizer
是 JDK
中很多的一些多線程并發(fā)工具類的實現(xiàn)基礎(chǔ)框架,對其深入學(xué)習(xí)理解有助于我們更好的去使用其特性和相關(guān)工具類淫僻。
參考文章
Java并發(fā)編程的藝術(shù)
Java Synchronizer - AQS Learning
從 ReentrantLock 的實現(xiàn)看 AQS 的原理及應(yīng)用
The java.util.concurrent Synchronizer Framework