一.總體框架
AQS是指AbstractQueuedSynchronizer尸红。它是一個抽象類犀变,java并發(fā)包里的ReentrantLock、CountDownLatch和Semaphroe等重要的工具類都是基于AQS來實(shí)現(xiàn)的。
總體來說,AQS維護(hù)了一個volatile的state變量代表共享資源锈锤,還有一個FIFO的等待隊(duì)列院崇,在多線程爭奪資源被阻塞時會進(jìn)入此隊(duì)列了肆氓。等待隊(duì)列是個雙向鏈表記錄則沒有獲取的執(zhí)行許可的線程。等待隊(duì)列中的結(jié)點(diǎn)元素是AQS自定義的static的內(nèi)部類Node底瓣。AQS支持共享和獨(dú)占兩種模式谢揪。ReentrantLock就是獨(dú)占型的,只有一個線程可以獲得到鎖并執(zhí)行捐凭。CountDownLatch和Semaphore就是共享型拨扶,允許多個線程同時執(zhí)行。
AQS是一個抽象類茁肠,并不能被直接實(shí)例化使用患民。它的作用是提供等待隊(duì)列的管理,包括如何入隊(duì)何時喚醒等垦梆。而具體的資源如何獲取和釋放等由具體的自定義同步器來實(shí)現(xiàn)匹颤。也就是說ReentrantLock等類自定義了資源(state)的獲取和釋放仅孩,而使用AQS的來管理阻塞隊(duì)列。不同的自定義資源獲取方式實(shí)現(xiàn)了CountDownLatch和Semaphore等類惋嚎。
自定義同步方法需要實(shí)現(xiàn)的方法有:
- isHeldExclusively() //返回該線程是否正在獨(dú)占資源杠氢,只有用的condition才需要去實(shí)現(xiàn)它
- tryAcquire(int); //獨(dú)占方式,嘗試獲取資源另伍,成功返回true鼻百,失敗返回false
- tryRelease(int); //獨(dú)占方式,嘗試釋放資源摆尝,成功返回true温艇,失敗返回false
- tryAcquireShared(int); //共享方法,嘗試獲取資源堕汞。返回負(fù)數(shù)表示失敗勺爱,0表示成功,但沒有可用資源了讯检,正數(shù)表示成功且有剩余資源
- tryReleaseShared(int);//共享方式琐鲁。嘗試釋放資源。如果釋放后運(yùn)行喚醒后續(xù)結(jié)點(diǎn)返回true人灼,否則返回false
這其中tryAcquire和tryRelease是一組围段,用于實(shí)現(xiàn)獨(dú)占資源的情況,如ReentrantLock投放;tryAcquireShared和tryReleaseShared是一組用于實(shí)現(xiàn)共享資源的情況奈泪,如CountDownLatch。
二.源碼分析
2.1 acquire方法源碼詳解
在AQS中一個重要的方法是acquire(int)灸芳,這個方法實(shí)現(xiàn)請求資源和阻塞線程的功能涝桅。下面先貼一下它的源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個方法里只有一個if語句。首先執(zhí)行tryAcquire(int)方法烙样,前面說了這個方法需要子類來自定義冯遂,這里先看一下它的代碼:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
可以看到在AQS中tryAcquire方法直接拋出了異常。因?yàn)榫唧w的獲取資源細(xì)節(jié)需要子類根據(jù)自己要實(shí)現(xiàn)的功能來寫谒获,AQS只負(fù)責(zé)阻塞隊(duì)列的管理等工作债蜜。同時注意到這個方法并不是一個抽象的方法。其實(shí)前面說的需要子類實(shí)現(xiàn)的5個方法都不是抽象的究反,因?yàn)樽宇惒⒉灰欢ㄐ枰獙?shí)現(xiàn)所有這些方法,這提供了一定的靈活性儒洛。
2.1.1 addWaiter方法詳解
先忙接著看acquire方法精耐。在if語句里,如果tryAcquire返回true琅锻,那么acquire就返回了卦停,說明成功獲取到了資源向胡。如果tryAcquire返回false,if語句的前半句判斷就成立了惊完,需要繼續(xù)執(zhí)行&&右邊的acquireQueued方法僵芹,執(zhí)行它之前先執(zhí)行了addWaiter。先看一下addWaiter它的代碼:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
可以看到這個方法有一個Node參數(shù)小槐。Node類便是aqs維護(hù)的FIFO隊(duì)列中的元素的類型拇派。回顧一下acquire()方法的代碼凿跳,是將Node.EXCLUSIVE作為參數(shù)傳入了addWaiter件豌。查看Node源碼發(fā)現(xiàn)有這么一句:
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
原來這是一個null值,用來表示獨(dú)占性線程控嗜。不管如何茧彤,先繼續(xù)看addWaiter的源碼吧。
第一句代碼:Node node = new Node(Thread.currentThread,mode);新建了一個表示當(dāng)前線程的結(jié)點(diǎn)疆栏。剛才傳入的null作為模式傳給構(gòu)造方法曾掂。進(jìn)入對象構(gòu)造方法查看:
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
繼續(xù)看addWaiter的后續(xù)代碼,發(fā)現(xiàn)是獲取了當(dāng)前隊(duì)列的尾節(jié)點(diǎn)壁顶,并將新建結(jié)點(diǎn)的prev指針執(zhí)行尾節(jié)點(diǎn)珠洗,再使用cas嘗試替換尾節(jié)點(diǎn),如果成功博助,那么當(dāng)前結(jié)點(diǎn)就成為新的尾節(jié)點(diǎn)险污,返回。
如果cas失敗或者當(dāng)前tail為null富岳,調(diào)用eng方法處理蛔糯。下面看一下eng的代碼:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
熟悉AtomicInteger的朋友看到這段代碼一定會感動非常熟悉。這里就是使用了循環(huán)嘗試的方式來進(jìn)行cas操作窖式,指導(dǎo)成功為止蚁飒。另外當(dāng)tail==null時,先新建head結(jié)點(diǎn)再進(jìn)行操作萝喘,當(dāng)前這里給head變量反之也是使用了cas操作淮逻。
綜上,addWaiter()進(jìn)行的操作就是安全地更新隊(duì)列的tail指針阁簸。
2.1.2 acquireQueue方法詳解
下面繼續(xù)看acquire()方法爬早。再把代碼貼一次。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
給acqureQueued方法傳入的第一個參數(shù)是addWaiter方法的返回值启妹,回想一下剛才的addWaiter方法筛严,發(fā)現(xiàn)它的返回值是新創(chuàng)建的表示當(dāng)前線程的Node結(jié)點(diǎn)。acquireQueued方法的另一個參數(shù)是acquire的形參arg饶米,這個一般是獲取資源的個數(shù)桨啃,像ReentrantLock的lock方法就是調(diào)用了acquire(1)车胡。下面看一下acquireQueued方法的源碼吧:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個方法的主體是一個死循環(huán),不斷測試兩件事:1.是否是頭結(jié)點(diǎn)的下一個節(jié)點(diǎn)照瘾,說明該輪到自己獲取資源了匈棘。2:是否可以休息了。判斷1成功后就用tryAcquire獲取資源析命,成功后設(shè)置當(dāng)前結(jié)點(diǎn)為頭結(jié)點(diǎn)主卫,返回。如果1判斷不成功則執(zhí)行shouldParkAfterFailedAcquire方法碳却,先貼一下代碼:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這個方法的工作是找到當(dāng)前結(jié)點(diǎn)之前的一個未取消的結(jié)點(diǎn)队秩,將其waitStatue改為SIGNAL(-1)。這樣在該結(jié)點(diǎn)釋放資源時就會喚醒當(dāng)前結(jié)點(diǎn)昼浦。
當(dāng)shouldParkAfterFailedAcquire返回true之后馍资,當(dāng)前線程就可以去休息了——調(diào)用parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這個方法使用了LockSupport的park方法,使線程進(jìn)入waiting狀態(tài)关噪。當(dāng)其它線程調(diào)用unPark方法鸟蟹,或此線程被中斷后才會返回。
2.1.3小結(jié)
下面來總結(jié)一下acquire方法使兔。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先嘗試獲取資源建钥,獲取到的情況直接返回。獲取不到將線程加入隊(duì)列:首先將tail指向表示當(dāng)前線程的結(jié)點(diǎn)虐沥,使用CAS操作更新tail熊经。之后執(zhí)行acquireQueued方法,如果是當(dāng)前隊(duì)列的第二個則再次嘗試獲取tryAcquire欲险,成功后將自己設(shè)置為head(head表示已經(jīng)獲取到的資源的結(jié)點(diǎn))镐依。不能獲取資源時判斷是否可以park(),判斷依據(jù)是其prev的結(jié)點(diǎn)的waitState是否是signal天试,即是否會在釋放資源時通知它槐壳。之后當(dāng)前線程調(diào)用park進(jìn)入waiting狀態(tài)。waitting結(jié)束時返回是否中斷標(biāo)志喜每,并重置標(biāo)志务唐。回到acquire带兜,如果waitting期間中斷過枫笛,則調(diào)用selfInterrupt響應(yīng)中斷。
2.2 release(int)
此方法是獨(dú)占模式下釋放資源的頂層方法刚照。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
這里可以看出釋放資源成功時刑巧,獲取到head結(jié)點(diǎn)(因?yàn)閔ead結(jié)點(diǎn)表示的線程就是當(dāng)前獲取到資源的線程),執(zhí)行unparkSuccessor()操作。這里便和shouldParkAfterFailedAcquire中‘休息’的代碼相呼應(yīng)海诲。如果那里設(shè)置了waitStatus為signal就會使用LockSupport.unpark方法來喚醒等待的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后繼結(jié)點(diǎn)為null或等待狀態(tài)>0(當(dāng)前結(jié)點(diǎn)被取消)檩互,則從后往前找到正在應(yīng)該被喚醒的結(jié)點(diǎn)
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
對了特幔,tryRelease方法也是具體的同步器來實(shí)現(xiàn)的。
2.3 其它方法
acquireShared(int)和releaseShared()方法是共享模式下獲取資源和釋放資源的方法闸昨。這里不再詳細(xì)展開了蚯斯,請看參考資料里的文章。
參考資料:1.Java并發(fā)之AQS詳解
本文是Java并發(fā)專題(歡迎大家關(guān)注)的一篇饵较。
以下是完整的目錄:
Java并發(fā)之基礎(chǔ)知識
Java并發(fā)之volatile關(guān)鍵字
Java并發(fā)之synchronized關(guān)鍵字
Java并發(fā)之原子類
Java并發(fā)之線程池
Java并發(fā)之并發(fā)工具類
Java并發(fā)之AQS原理
Java并發(fā)之ThreadLocal使用和源碼分析