隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構(gòu)建鎖或者其他同步組件的基礎框架嘶朱,它使用了一個int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作移必,并發(fā)包的作者(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎掖蛤。
顧名思義,AQS不是一個實際的類色鸳,它是一個抽象類社痛,需要繼承該類并且實現(xiàn)抽象方法來管理同步狀態(tài)见转。而管理同步狀態(tài)時不免要對同步狀態(tài)進行更改命雀,這就需要使用到以下三個方法:
-
getState()
獲取當前同步狀態(tài)。 -
setState(int newState)
設置當前同步狀態(tài)斩箫。 -
compareAndSetState(int expect,int update)
使用CAS設置當前狀態(tài)吏砂,該方法能夠保證狀態(tài) 設置的原子性。
子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部 類乘客,同步器自身沒有實現(xiàn)任何同步接口狐血,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來 供自定義同步組件使用,同步器既可以支持獨占式地獲取同步狀態(tài)易核,也可以支持共享式地獲 取同步狀態(tài)匈织,這樣就可以方便實現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)
同步器是實現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵牡直,在鎖的實現(xiàn)中聚合同步器缀匕,利用同步 器實現(xiàn)鎖的語義∨鲆荩可以這樣理解二者之間的關(guān)系:
- 鎖是面向使用者的乡小,它定義了使用者與鎖交 互的接口(比如可以允許兩個線程并行訪問),隱藏了實現(xiàn)細節(jié)饵史;
- 同步器面向的是鎖的實現(xiàn)者满钟, 它簡化了鎖的實現(xiàn)方式胜榔,屏蔽了同步狀態(tài)管理、線程的排隊湃番、等待與喚醒等底層操作夭织。
鎖和同 步器很好地隔離了使用者和實現(xiàn)者所需關(guān)注的領域。
同步器接口實例:
同步器可重寫的方法:
方法名稱 | 方法描述 |
---|---|
boolean tryAcquire(int arg) |
獨占式獲取同步狀態(tài)吠撮,實現(xiàn)該方法需要查詢當前 狀態(tài)并判斷同步狀態(tài)是否符合預期摔癣,然后再進行 CAS設置同步狀態(tài) |
boolean tryRelease(int arg) |
獨占式釋放同步狀態(tài),等待獲取同步狀態(tài)的 線程將有機會獲取同步狀態(tài) |
int tryAcquireShared(int arg) |
共享式獲取同步狀態(tài)纬向,返回大于等于0的值择浊, 表示獲取成功,反之逾条,獲取失敗 |
boolean tryReleaseShared(int arg) |
共享式釋放同步狀態(tài) |
boolean isHeldExclusively() |
當前同步器是否在獨占模式下被線程占用琢岩, 一般該方法表示是否被當前線程所獨占 |
實現(xiàn)自定義同步組件時,將會調(diào)用同步器提供的模板方法师脂,這些(部分)模板方法與描述如下表所示
方法名稱 | 方法描述 |
---|---|
void acquire(int arg) |
獨占式獲取同步狀態(tài)担孔,如果當前線程獲取同 步狀態(tài)成功,則由該方法返回 否則,將會進 入同步隊列等待吃警,該方法將會調(diào)用重寫的 tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) |
與 acquire(int arg)相同,但是該方法響應中斷 糕篇,當前線程未獲取到 同步狀態(tài)而進入同步 隊列中,如果當前線程被中斷酌心,則該方法會 拋出Interruptedexception并返回 |
tryAcquireNanos(int arg, long nanosTimeout) |
在 acquireinterruptibly(int arg)基礎上增加 了超時限制拌消,如果當前線程在超時時間內(nèi)沒 有獲取到同步狀態(tài),那么將會返回 false安券, 如果獲取到了返回true |
void acquireShared(int arg) |
共享式的獲取同步狀態(tài),如果當前線程未獲 取到同步狀態(tài)墩崩,將會進入同步隊列等待,與 獨占式獲取的主要區(qū)別是在同一時刻可以 有多個線程獲取到同步狀態(tài) |
void acquireSharedInterruptibly(int arg) |
與 acquireShared(int arg)相同,該方法響應中斷 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
在 acquireSharedinterruptibly(int arg) 基礎上增加了超時限制 |
boolean release(int arg) |
獨占式的釋放同步狀態(tài)侯勉,該方法會在釋放同 步狀態(tài)之后鹦筹,將同步隊列中第一個節(jié)點包含的線程喚醒 |
boolean releaseShared(int arg) |
共享式的釋放同步狀態(tài) |
Collection<Thread> getQueuedThreads() |
獲取等待在同步隊列上的線程集合 |
同步器提供的模板方法基本上分為3類:
- 獨占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放
- 同步狀態(tài)和查詢同步隊列中的等待線程情況
通過上述模板方法,我們可以將鎖大致分為獨占式鎖與共享式鎖址貌。
獨占式鎖
顧名思義铐拐,獨占鎖就是在同一時刻只能有一個線程獲取到鎖,而其他獲取鎖的線程只能 處于同步隊列中等待练对,只有獲取鎖的線程釋放了鎖遍蟋,后繼的線程才能夠獲取鎖。下面我們通過一段代碼來演示一下:
public class Mutex implements Lock {
// 靜態(tài)內(nèi)部類锹淌,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否處于占用狀態(tài)
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 當狀態(tài)為0的時候獲取鎖
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放鎖匿值,將狀態(tài)設置為0
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一個Condition,每個condition都包含了一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
上述示例中赂摆,獨占鎖Mutex是一個自定義同步組件挟憔,它在同一時刻只允許一個線程占有鎖钟些。Mutex中定義了一個靜態(tài)內(nèi)部類,該內(nèi)部類繼承了同步器并實現(xiàn)了獨占式獲取和釋放同步狀態(tài)绊谭。在tryAcquire(int acquires)方法中政恍,如果經(jīng)過CAS設置成功(同步狀態(tài)設置為1),則代表獲取了同步狀態(tài)达传,而在tryRelease(int releases)方法中只是將同步狀態(tài)重置為0篙耗。用戶使用Mutex時并不會直接和內(nèi)部同步器的實現(xiàn)打交道,而是調(diào)用Mutex提供的方法宪赶,在Mutex的實現(xiàn)中宗弯,以獲取鎖的lock()方法為例,只需要在方法實現(xiàn)中調(diào)用同步器的模板方法acquire(int args)即可搂妻,當前線程調(diào)用該方法獲取同步狀態(tài)失敗后會被加入到同步隊列中等待蒙保,這樣就大大降低了實現(xiàn)一個可靠自定義同步組件的門檻。
共享式鎖
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)欲主。以文件的讀寫為例邓厕,如果一個程序在對文件進行讀操作,那么這一時刻對于該文件的寫操作均被阻塞扁瓢,而讀操作能夠同時進行详恼。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問引几,兩種不同的訪問模式在同一時刻對文件或資源的訪問情況如下圖所示:
上圖中左半部分昧互,共享式訪問資源時,其他共享式的訪問均被允許她紫,而獨占式訪問被阻塞硅堆,右半部分是獨占式訪問資源時屿储,同一時刻其他訪問均被阻塞贿讹。
下面我們通過一段代碼來實際演示共享式同步鎖的使用:
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (; ; ) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
// 其他接口方法略
}
在上述示例中,TwinsLock實現(xiàn)了Lock接口够掠,提供了面向使用者的接口民褂,使用者調(diào)用lock()方法獲取鎖,隨后調(diào)用unlock()方法釋放鎖疯潭,而同一時刻只能有兩個線程同時獲取到鎖赊堪。TwinsLock同時包含了一個自定義同步器Sync,而該同步器面向線程訪問和同步狀態(tài)控制竖哩。以共享式獲取同步狀態(tài)為例:同步器會先計算出獲取后的同步狀態(tài)哭廉,然后通過CAS確保狀態(tài)的正確設置,當tryAcquireShared(int reduceCount)方法返回值大于等于0時相叁,當前線程才獲取同步狀態(tài)遵绰,對于上層的TwinsLock而言辽幌,則表示當前線程獲得了鎖。
下面編寫一個測試來驗證TwinsLock是否能按照預期工作椿访。在測試用例中乌企,定義了工作者線程Worker,該線程在執(zhí)行過程中獲取鎖成玫,當獲取鎖之后使當前線程睡眠0.5秒(并不釋放鎖)加酵,隨后打印當前線程名稱,最后再次睡眠1秒并釋放鎖哭当,測試用例如下:
@Test
public void twinsLockTest() throws InterruptedException {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
// System.out.println("before - " + Thread.currentThread().getName());
lock.lock();
try {
Thread.sleep(1000);
System.err.println("got - " + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
// System.out.println("after - " + Thread.currentThread().getName());
}
}
}
// 啟動10個線程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.start();
}
// 每隔1秒換行
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println();
}
}
運行該測試用例猪腕,可以看到線程名稱成對輸出,也就是在同一時刻只有兩個線程能夠獲 取到鎖钦勘,這表明TwinsLock可以按照預期正確工作码撰。
啟動運行之后會看到基本獲取到共享資源的線程都是固定的那兩個,這是因為在沒獲取到共享資源時該線程會被加入AQS的等待隊列个盆,在釋放資源之后再被喚醒重新競爭資源脖岛,而由于之前等待的線程需要被喚醒才能重新競爭共享資源,而釋放資源的線程由于不需要喚醒颊亮,所以大概率會比其他線程優(yōu)先再次獲取到鎖柴梆,可以將before和after打印語句取消注釋在運行,這樣可以解決這個問題终惑。
對于AQS隊列同步器的實現(xiàn)分析我們放到下一節(jié)去分析绍在,本節(jié)主要簡單的講解和舉例AQS對于共享式和獨占式鎖的實現(xiàn)