AbstractQueuedSynchronizer
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements Serializable
概念
為實現(xiàn)依賴于先進先出 (FIFO) 等待隊列的阻塞鎖和相關(guān)同步器(信號量捣染、事件等等)提供一個框架。此類的設(shè)計目標(biāo)是成為依靠單個原子 int
值來表示狀態(tài)的大多數(shù)同步器的一個有用基礎(chǔ)停巷。子類必須定義更改此狀態(tài)的受保護方法液斜,并定義哪種狀態(tài)對于此對象意味著被獲取或被釋放。假定這些條件之后叠穆,此類中的其他方法就可以實現(xiàn)所有排隊和阻塞機制蕴潦。子類可以維護其他狀態(tài)字段匣缘,但只是為了獲得同步而只追蹤使用 getState()
、setState(int)
和 compareAndSetState(int, int)
方法來操作以原子方式更新的 int
值。
應(yīng)該將子類定義為非公共內(nèi)部幫助器類途茫,可用它們來實現(xiàn)其封閉類的同步屬性翼岁。類 AbstractQueuedSynchronizer
沒有實現(xiàn)任何同步接口。而是定義了諸如 acquireInterruptibly(int)
之類的一些方法,在適當(dāng)?shù)臅r候可以通過具體的鎖和相關(guān)同步器來調(diào)用它們始鱼,以實現(xiàn)其公共方法。
此類支持默認(rèn)的獨占模式和共享模式之一脆贵,或者二者都支持医清。處于獨占模式下時,其他線程試圖獲取該鎖將無法取得成功卖氨。在共享模式下会烙,多個線程獲取某個鎖可能(但不是一定)會獲得成功。此類并不"了解"這些不同筒捺,除了機械地意識到當(dāng)在共享模式下成功獲取某一鎖時柏腻,下一個等待線程(如果存在)也必須確定自己是否可以成功獲取該鎖。處于不同模式下的等待線程可以共享相同的 FIFO 隊列系吭。通常五嫂,實現(xiàn)子類只支持其中一種模式,但兩種模式都可以在(例如)ReadWriteLock
中發(fā)揮作用肯尺。只支持獨占模式或者只支持共享模式的子類不必定義支持未使用模式的方法沃缘。
此類通過支持獨占模式的子類定義了一個嵌套的 AbstractQueuedSynchronizer.ConditionObject
類,可以將這個類用作 Condition
實現(xiàn)则吟。其中:
-
isHeldExclusively()
方法將報告同步對于當(dāng)前線程是否是獨占的 - 使用當(dāng)前
getState()
值調(diào)用release(int)
方法則可以完全釋放此對象 - 如果給定保存的狀態(tài)值槐臀,那么
acquire(int)
方法可以將此對象最終恢復(fù)為它以前獲取的狀態(tài)
沒有別的 AbstractQueuedSynchronizer
方法創(chuàng)建這樣的條件,因此逾滥,如果無法滿足此約束峰档,則不要使用它败匹。AbstractQueuedSynchronizer.ConditionObject
的行為當(dāng)然取決于其同步器實現(xiàn)的語義寨昙。
此類為內(nèi)部隊列提供了檢查、檢測和監(jiān)視方法掀亩,還為 condition 對象提供了類似方法舔哪。可以根據(jù)需要使用用于其同步機制的 AbstractQueuedSynchronizer
將這些方法導(dǎo)出到類中槽棍。
此類的序列化只存儲維護狀態(tài)的基礎(chǔ)原子整數(shù)捉蚤,因此已序列化的對象擁有空的線程隊列。需要可序列化的典型子類將定義一個 readObject
方法炼七,該方法在反序列化時將此對象恢復(fù)到某個已知初始狀態(tài)缆巧。
使用
為了將此類用作同步器的基礎(chǔ),需要適當(dāng)?shù)刂匦露x以下方法豌拙,這是通過使用 getState()
陕悬、setState(int)
和 compareAndSetState(int, int)
方法來檢查和修改同步狀態(tài)來實現(xiàn)的:
-
tryAcquire(int)
試圖在獨占模式下獲取對象狀態(tài)。此方法應(yīng)該查詢是否允許它在獨占模式下獲取對象狀態(tài)按傅,如果允許捉超,則獲取它胧卤。 此方法總是由執(zhí)行 acquire 的線程來調(diào)用。
其定義了獲取鎖的一種方式拼岳,具體的邏輯判斷等枝誊,由
acquire(int)
方法調(diào)用并做調(diào)度,如下:@Override protected boolean tryAcquire(int arg) { Thread thread = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(c, arg)) { setExclusiveOwnerThread(Thread.currentThread()); } return true; } else if (getExclusiveOwnerThread() == thread) { int nextC = c + arg; if (nextC < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextC); return true; } return false; }
-
tryRelease(int)
試圖設(shè)置狀態(tài)來反映獨占模式下的一個釋放惜纸。此方法總是由正在執(zhí)行釋放的線程調(diào)用叶撒。
其定義了釋放鎖的一種方式,具體的邏輯判斷等堪簿,由
release(int)
方法調(diào)用并做調(diào)度痊乾,如下:@Override protected boolean tryRelease(int arg) { if (getExclusiveOwnerThread() != Thread.currentThread()) { throw new IllegalMonitorStateException(); } int c = getState() - arg; boolean free = false; if (c == 0) { setExclusiveOwnerThread(null); free = true; } setState(c); return free; }
-
tryAcquireShared(int)
試圖在共享模式下獲取對象狀態(tài)。此方法應(yīng)該查詢是否允許它在共享模式下獲取對象狀態(tài)椭更,如果允許哪审,則獲取它。 此方法總是由執(zhí)行 acquire 線程來調(diào)用虑瀑。如果此方法報告失敗湿滓,則 acquire 方法可以將線程加入隊列(如果還沒有將它加入隊列),直到其獲得了其他某個線程釋放了該線程的信號舌狗。
-
tryReleaseShared(int)
試圖設(shè)置狀態(tài)來反映共享模式下的一個釋放叽奥。 此方法總是由正在執(zhí)行釋放的線程調(diào)用。
-
isHeldExclusively()
如果對于當(dāng)前(正調(diào)用的)線程痛侍,同步是以獨占方式進行的朝氓,則返回 true。
@Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); }
-
AbstractOwnableSynchronizer.setExclusiveOwnerThread(Thread)
設(shè)置當(dāng)前擁有獨占訪問的線程主届。
默認(rèn)情況下,每個方法都拋出UnsupportedOperationException
赵哲。這些方法的實現(xiàn)在內(nèi)部必須是線程安全的,通常應(yīng)該很短并且不被阻塞君丁。定義這些方法是使用此類的唯一受支持的方式枫夺。其他所有方法都被聲明final,因為它們無法是各不相同的绘闷。
您也可以查找從 AbstractOwnableSynchronizer
繼承的方法橡庞,用于跟蹤擁有獨占同步器的線程。鼓勵使用這些方法印蔗,這允許監(jiān)控和診斷工具來幫助用戶確定哪個線程保持鎖扒最。
即使此類基于內(nèi)部的某個 FIFO 隊列,它也無法強行實施 FIFO 獲取策略华嘹。獨占同步的核心采用以下形式:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
因為要在加入隊列之前檢查線程的獲取狀況吧趣,所以新獲取的線程可能闖入其他被阻塞的和已加入隊列的線程之前。不過如果需要,可以內(nèi)部調(diào)用一個或多個檢查方法再菊,通過定義 tryAcquire
和 tryAcquireShared
來禁用闖入爪喘。特別是 getFirstQueuedThread()
沒有返回當(dāng)前線程的時候,嚴(yán)格的 FIFO
鎖定可以定義 tryAcquire
立即返回 false
纠拔。只有 hasQueuedThreads()
返回 true
并且 getFirstQueuedThread
不是當(dāng)前線程時秉剑,更好的非嚴(yán)格公平的版本才可能會立即返回 false
;如果 getFirstQueuedThread
不為 null
并且不是當(dāng)前線程稠诲,則產(chǎn)生的結(jié)果相同侦鹏。出現(xiàn)進一步的變體也是有可能的。
對于默認(rèn)闖入(也稱為 greedy臀叙、renouncement 和 convoy-avoidance)策略略水,吞吐量和可伸縮性通常是最高的。盡管無法保證這是公平的或是無偏向的劝萤,但允許更早加入隊列的線程先于更遲加入隊列的線程再次爭用資源渊涝,并且相對于傳入的線程,每個參與再爭用的線程都有平等的成功機會床嫌。此外跨释,盡管從一般意義上說,獲取并非“自旋”厌处,它們可以在阻塞之前對用其他計算所使用的 tryAcquire
執(zhí)行多次調(diào)用鳖谈。在只保持獨占同步時,這為自旋提供了最大的好處阔涉,但不是這種情況時缆娃,也不會帶來最大的負(fù)擔(dān)。如果需要這樣做瑰排,那么可以使用“快速路徑”檢查來先行調(diào)用 acquire 方法贯要,以這種方式擴充這一點,如果可能不需要爭用同步器凶伙,則只能通過預(yù)先檢查 hasContended()
來確認(rèn)這一點郭毕。
通過特殊化其同步器的使用范圍它碎,此類為部分同步化提供了一個有效且可伸縮的基礎(chǔ)函荣,同步器可以依賴于 int
型的 state、acquire 和 release 參數(shù)扳肛,以及一個內(nèi)部的 FIFO 等待隊列傻挂。這些還不夠的時候,可以使用 atomic
類挖息、自己的定制 Queue
類和 LockSupport
阻塞支持金拒,從更低級別構(gòu)建同步器。
實現(xiàn)公平可重入鎖
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* 自定義可重入公平鎖.
*/
public class CustomFairLock {
private static final class Sync extends AbstractQueuedSynchronizer {
/**
* tryAcquire定義了獲取鎖的方式
*
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
Thread thread = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors()
&& compareAndSetState(0, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
}
return true;
} else if (getExclusiveOwnerThread() == thread) {
int nextC = c + arg;
if (nextC < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextC);
return true;
}
return false;
}
/**
* 定義了釋放鎖的方式
*
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
int c = getState() - arg;
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
setExclusiveOwnerThread(null);
free = true;
}
setState(c);
return free;
}
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
}
private Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void tryLock(long timeout, TimeUnit unit) throws InterruptedException {
sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public boolean unlock() {
return sync.release(1);
}
}
ReentrantLock
對于ReentrantLock
其實現(xiàn)公平鎖和非公平鎖是通過以下代碼來區(qū)分的:
非公平鎖tryAcquire()
:
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
公平鎖tryAcquire()
:
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//唯一的區(qū)別在這里,公平鎖優(yōu)先判斷隊列中有無以等待的線程,而非公平的則不去判斷,直接compareAndSetState狀態(tài)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;