AQS負責(zé)管理同步器類中的狀態(tài)肋杖,它管理了一個整數(shù)狀態(tài)信息溉仑,可以通過getState,setState以及compareAndSetState等protected方法來操作状植。這個整數(shù)可以用來表示任意狀態(tài)浊竟。比如怨喘,ReentrantLock用它來表示所有者線程已經(jīng)重復(fù)獲取該鎖的次數(shù),Semaphore用它來表示剩余的許可數(shù)量振定,F(xiàn)utureTask用它來表示任務(wù)的狀態(tài)(尚未開始必怜、正在運行已完成以及已取消)。
如果某個同步器支持獨占的獲取操作后频,那么需要實現(xiàn)一些保護方法梳庆,包括tryAcquire、tryRelease和isHeldExclusively等徘郭,而對于支持共享獲取的同步器靠益,則應(yīng)該實現(xiàn)tryAcquireShared和tryReleaseShared等方法。AQS中的acquire残揉、acquireShared胧后、release和releaseShared等方法都將調(diào)用這些方法在子類中帶有前綴try的版本來判斷某個操作是否能執(zhí)行抱环。
AQS中獲取操作和釋放操作的標(biāo)準(zhǔn)形式:首先壳快,判斷當(dāng)前狀態(tài)是否允許獲得操作;其次镇草,更新同步器的狀態(tài)眶痰。
boolean acquire() throws InterruptedException
{
while(當(dāng)前狀態(tài)不允許獲取操作)
{
if(需要阻塞獲取請求)
{
如果當(dāng)前線程不在隊列中,則將其插入隊列
阻塞當(dāng)前線程
}
else
{
返回失敗
}
}
可能更新同步器的狀態(tài)
如果線程位于隊列中梯啤,則將其移除隊列
返回成功
}
void release()
{
更新同步器的狀態(tài)
if(新的狀態(tài)允許某個被阻塞的線程獲取成功)
{
解除隊列中一個或多個線程的阻塞狀態(tài)
}
}
示例一:使用AbstractQueuedSynchronizer實現(xiàn)的二元閉鎖:
public class OneShotLatch
{
private final Sync sync = new Sync();
public void signal()
{
sync.releaseShared(0);
}
public void await() throws InterruptedException
{
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer
{
protected int tryAcquireShared(int ignored)
{
//如果閉鎖是開的(state == 1)竖伯,這個操作成功,否則失敗
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored)
{
//打開閉鎖
setState(1);
//現(xiàn)在其他線程可以獲取該閉鎖
return true;
}
}
}
在OneShotLatch中因宇,AQS狀態(tài)用來表示閉鎖狀態(tài)七婴,關(guān)閉0,打開1察滑。await方法調(diào)用AQS的acquireSharedInterruptibly打厘,然后接著調(diào)用OneShotLatch中的tryAcquireShared方法。若之前已經(jīng)打開了閉鎖贺辰,則tryAcquireShared將返回成功并允許線程通過户盯,否則返回一個表示獲取操作失敗的值。acquireSharedInterruptibly方法在處理失敗的方式饲化,是把這個線程放入等待線程隊列中莽鸭。
java.util.concurrent中的所有同步器類都沒有直接擴展AQS,而是都將他們的相應(yīng)功能委托給私有的AQS子類來實現(xiàn)吃靠。如開篇的類圖所示蒋川。
ReentrantLock
ReentrantLock只支持獨占方式的獲取操作,因此它實現(xiàn)了tryAcquire撩笆、tryRelease和isHeldExclusively捺球。
源碼一:基于非公平的ReentrantLock實現(xiàn)tryAcquire
當(dāng)一個線程嘗試獲取鎖時缸浦,tryAcquire首先檢查鎖的狀態(tài)。若未被持有氮兵,它將嘗試更新鎖的狀態(tài)以表示鎖已經(jīng)被持有裂逐;
若鎖狀態(tài)表明它已經(jīng)被持有,并且如果當(dāng)前線程是鎖的所有者泣栈,則獲取計數(shù)遞增卜高,若不是當(dāng)前持有者,則獲取失敗南片。
final boolean nonfairTryAcquire(int acquires)
{
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0)
{
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread())
{
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
Semaphore與CountDownLatch
Semaphore將AQS的同步狀態(tài)用于保存當(dāng)前可用許可的數(shù)量掺涛。
CountDownLatch使用AQS的方式與Semaphore相似:在同步狀態(tài)中保存的是當(dāng)前的計數(shù)值,countDown方法調(diào)用release疼进,從而減少計數(shù)值薪缆,并且當(dāng)計數(shù)值為零時,解除所有等待線程的阻塞伞广。await調(diào)用acquire拣帽,當(dāng)計數(shù)器為零時,acquire將立即返回嚼锄,否則將阻塞减拭。
源碼二:Semaphore中的tryAcquireShared與tryReleaseShared:
final int tryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
FutureTask
Future.get的語義非常類似于閉鎖的語義---如果發(fā)生了某個事件(由FutureTask表示的任務(wù)執(zhí)行完成或被取消),那么線程就可以恢復(fù)執(zhí)行区丑,否則這些線程將停留在隊列中并直到該事件發(fā)生拧粪。
FutureTask中,AQS同步狀態(tài)被用來保存任務(wù)的狀態(tài)沧侥,例如正在運行可霎、已完成或已取消。此外正什,它還維護了一個引用啥纸,指向正在執(zhí)行計算任務(wù)的線程(如果它當(dāng)前處于運行狀態(tài))号杏,因此婴氮,如果任務(wù)取消,該線程就會中斷盾致。
ReentrantReadWriteLock
ReadWriteLock接口表示存在兩個鎖:一個讀鎖主经,一個寫鎖。讀鎖上的操作使用共享的獲取方法與釋放方法庭惜,寫鎖上的操作將使用獨占的獲取方法和釋放方法罩驻。
AQS內(nèi)部維護一個等待線程隊列,其中記錄了某個線程請求的是獨占訪問還是共享訪問护赊。在ReentrantReadWriteLock中惠遏,當(dāng)鎖可用時砾跃,如果位于隊列頭部的線程執(zhí)行寫入操作,那么線程會得到這個鎖节吮,如果位于隊列頭部的線程執(zhí)行讀取訪問抽高,那么隊列中在這個線程之前的所有線程都將獲得這個鎖。