前言
并發(fā) JUC 包提供了很多工具類阎肝,比如之前說的 CountDownLatch香罐,CyclicBarrier ,今天說說這個 Semaphore——信號量逸绎,關(guān)于他的使用請查看往期文章并發(fā)編程之 線程協(xié)作工具類,今天的任務(wù)就是從源碼層面分析一下他的原理夭谤。
源碼分析
如果先不看源碼棺牧,根據(jù)以往我們看過的 CountDownLatch CyclicBarrier 的源碼經(jīng)驗來看,Semaphore 會怎么設(shè)計呢朗儒?
首先颊乘,他要實現(xiàn)多個線程線程同時訪問一個資源,類似于共享鎖醉锄,并且乏悄,要控制進入資源的線程的數(shù)量。
如果根據(jù) JDK 現(xiàn)有的資源榆鼠,我們是否可以使用 AQS 的 state 變量來控制呢纲爸?類似 CountDownLatch 一樣,有幾個線程我們就為這個 state 變量設(shè)置為幾妆够,當(dāng) state 達到了閾值识啦,其他線程就不能獲取鎖了,就需要等待神妹。當(dāng) Semaphore 調(diào)用 release 方法的時候颓哮,就釋放鎖,將 state 減一鸵荠,并喚醒 AQS 上的線程冕茅。
以上,就是我們的猜想蛹找,那我們看看 JDK 是不是和我們想的一樣姨伤。
首先看看 Semaphore 的 UML 結(jié)構(gòu):
內(nèi)部有 3 個類,繼承了 AQS庸疾。一個公平鎖乍楚,一個非公平鎖,這點和 ReentrantLock 一摸一樣届慈。
看看他的構(gòu)造器:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
兩個構(gòu)造器徒溪,兩個參數(shù),一個是許可線程數(shù)量金顿,一個是是否公平鎖臊泌,默認非公平。
而 Semaphore 有 2 個重要的方法揍拆,也是我們經(jīng)常使用的 2 個方法:
semaphore.acquire();
// doSomeing.....
semaphore.release();
acquire 和 release 方法渠概,我們今天重點看這兩個方法的源碼,一窺 Semaphore 的全貌礁凡。
acquire 方法源碼分析
代碼如下:
public void acquire() throws InterruptedException {
// 嘗試獲取一個鎖
sync.acquireSharedInterruptibly(1);
}
// 這是抽象類 AQS 的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果小于0高氮,就獲取鎖失敗了慧妄。加入到AQS 等待隊列中顷牌。
// 如果大于0剪芍,就直接執(zhí)行下面的邏輯了。不用進行阻塞等待窟蓝。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 這是抽象父類 Sync 的方法罪裹,默認是非公平的
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// 非公平鎖的釋放鎖的方法
final int nonfairTryAcquireShared(int acquires) {
// 死循環(huán)
for (;;) {
// 獲取鎖的狀態(tài)
int available = getState();
int remaining = available - acquires;
// state 變量是否還足夠當(dāng)前獲取的
// 如果小于 0,獲取鎖就失敗了运挫。
// 如果大于 0状共,就循環(huán)嘗試使用 CAS 將 state 變量更新成減去輸入?yún)?shù)之后的。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
這里的釋放就是對 state 變量減一(或者更多)的谁帕。
返回了剩余的 state 大小峡继。
當(dāng)返回值小于 0 的時候,說明獲取鎖失敗了匈挖,那么就需要進入 AQS 的等待隊列了碾牌。代碼入下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 添加一個節(jié)點 AQS 隊列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 死循環(huán)
for (;;) {
// 找到新節(jié)點的上一個節(jié)點
final Node p = node.predecessor();
// 如果這個節(jié)點是 head,就嘗試獲取鎖
if (p == head) {
// 繼續(xù)嘗試獲取鎖儡循,這個方法是子類實現(xiàn)的
int r = tryAcquireShared(arg);
// 如果大于0舶吗,說明拿到鎖了。
if (r >= 0) {
// 將 node 設(shè)置為 head 節(jié)點
// 如果大于0择膝,就說明還有機會獲取鎖誓琼,那就喚醒后面的線程,稱之為傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 如果他的上一個節(jié)點不是 head肴捉,就不能獲取鎖
// 對節(jié)點進行檢查和更新狀態(tài)腹侣,如果線程應(yīng)該阻塞,返回 true齿穗。
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞 park傲隶,并返回是否中斷,中斷則拋出異常
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 取消節(jié)點
cancelAcquire(node);
}
}
總的邏輯就是:
創(chuàng)建一個分享類型的 node 節(jié)點包裝當(dāng)前線程追加到 AQS 隊列的尾部缤灵。
如果這個節(jié)點的上一個節(jié)點是 head 伦籍,就是嘗試獲取鎖,獲取鎖的方法就是子類重寫的方法腮出。如果獲取成功了帖鸦,就將剛剛的那個節(jié)點設(shè)置成 head。
如果沒搶到鎖胚嘲,就阻塞等待作儿。
release 方法源碼分析
該方法用于釋放鎖,代碼如下:
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 死循環(huán)釋放成功
if (tryReleaseShared(arg)) {
// 喚醒 AQS 等待對列中的節(jié)點馋劈,從 head 開始
doReleaseShared();
return true;
}
return false;
}
// Sync extends AbstractQueuedSynchronizer
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 對 state 變量 + 1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
這里釋放鎖的邏輯寫在了抽象類 Sync 中攻锰。邏輯簡單晾嘶,就是對 state 變量做加法。
在加法成功后娶吞,執(zhí)行 doReleaseShared
方法垒迂,這個方法是 AQS 的。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 設(shè)置 head 的等待狀態(tài)為 0 妒蛇,并喚醒 head 上的線程
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 成功設(shè)置成 0 之后机断,將 head 狀態(tài)設(shè)置成傳播狀態(tài)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
該方法的主要作用就是從 AQS 的 head 節(jié)點開始喚醒線程,注意绣夺,這里喚醒是 head 節(jié)點的下一個節(jié)點吏奸,需要和 doAcquireSharedInterruptibly
方法對應(yīng),因為 doAcquireSharedInterruptibly
方法喚醒的當(dāng)前節(jié)點的上一個節(jié)點陶耍,也就是 head 節(jié)點奋蔚。
至此,釋放 state 變量烈钞,喚醒 AQS 頭節(jié)點結(jié)束泊碑。
總結(jié)
總結(jié)一下 Semaphore 的原理吧。
總的來說棵磷,Semaphore 就是一個共享鎖蛾狗,通過設(shè)置 state 變量來實現(xiàn)對這個變量的共享。當(dāng)調(diào)用 acquire 方法的時候仪媒,state 變量就減去一沉桌,當(dāng)調(diào)用 release 方法的時候,state 變量就加一算吩。當(dāng) state 變量為 0 的時候留凭,別的線程就不能進入代碼塊了,就會在 AQS 中阻塞等待偎巢。