Semaphore
Semaphore
是 Java 并發(fā)包中提供的一個工具類早直,翻譯過來為“信號量”寥假,作用是控制并發(fā)線程的數(shù)量。
類的結(jié)構(gòu)
先來看一下 Semaphore
的結(jié)構(gòu):
Semaphore
中有三個個內(nèi)部類:
- 類
Sync
繼承了AbstractQueuedSynchronizer
霞扬,重寫了tryReleaseShared
方法糕韧,還有一些在Semaphore
中用到的輔助方法,都是對線程進(jìn)行控制的 - 類
NonfairSync
繼承了Sync
喻圃,通過重寫tryAcquireShared
方法實(shí)現(xiàn)了非公平的線程競爭機(jī)制萤彩,這個方法內(nèi)部是調(diào)用了Sync
的nonfairTryAcquireShared
方法 - 類
FairSync
繼承了Sync
,也是通過重寫tryAcquireShared
方法實(shí)現(xiàn)了公平的線程競爭機(jī)制
由此可以看出斧拍,Semaphore
實(shí)際上是通過 AQS 的共享鎖來控制線程的雀扶。
構(gòu)造方法
Semaphore
的構(gòu)造函數(shù)對變量 sync 進(jìn)行了初始化,默認(rèn)是非公平競爭的肆汹,也可以通過指定參數(shù)設(shè)置為公平競爭愚墓,其他所有的方法內(nèi)部都是調(diào)用了 sync 變量的方法。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire
acquire
用來獲取信號量昂勉,Semaphore
提供了重載方法浪册,也可以獲取多個信號量。
public void acquire() throws InterruptedException { // 獲取 1 個信號量
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException(); // 參數(shù)判斷
sync.acquireSharedInterruptibly(permits); // 獲取指定個數(shù)的信號量
}
acquire
方法內(nèi)部調(diào)用了 sync
的 acquireSharedInterruptibly
方法岗照,這里并沒有對這個方法進(jìn)行重寫村象,所以調(diào)用的還是 AbstractQueuedSynchronizer
中的方法,這里就不貼代碼了攒至,但是 acquireSharedInterruptibly
方法內(nèi)部又調(diào)用了 tryAcquireShared
方法煞肾,由于 Semaphore
類提供了公平和非公平兩種競爭機(jī)制,所以 tryAcquireShared
也有兩種不同的實(shí)現(xiàn)嗓袱。
來看一下兩種獲取鎖的方法籍救。
tryAcquireShared
先來看一下非公平的 tryAcquireShared
,這個方法是內(nèi)部類 NonfairSync
中的:
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
繼續(xù)看 nonfairTryAcquireShared
渠抹,這個方法是 Sync
提供的:
final int nonfairTryAcquireShared(int acquires) {
for (;;) { // 循環(huán)直到獲取成功
int available = getState(); // 獲取當(dāng)前 state蝙昙,在這里就是信號量
int remaining = available - acquires; // 減去獲取的信號量后剩余的信號量
// 如果信號量小于 0(獲取失斏撂选) 或者更新信號量成功,返回剩余信號量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
再看公平的 tryAcquireShared
奇颠,這個方法是內(nèi)部類 FairSync
中的:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 先判斷在當(dāng)前線程之前是否有線程正在 acquire败去,如果有返回 -1 表示獲取失敗
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
相對于非公平的 nonfairTryAcquireShared
,公平的 tryAcquireShared
先判斷在當(dāng)前線程之前是否有線程正在 acquire烈拒,如果有就直接返回 -1 表示 tryAcquire 失敗了圆裕,這就是公平的體現(xiàn)。
acquire
還提供了忽略中斷的 acquireUninterruptibly
荆几,這里就不展開來說了吓妆。
release
release
方法用來釋放信號量,同樣的吨铸,Semaphore
提供了重載方法行拢,可以釋放多個信號量。
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
和 acquire
一樣诞吱,release
方法調(diào)用了 AbstractQueuedSynchronizer
中的 releaseShared
方法舟奠,releaseShared
方法內(nèi)部又調(diào)用了 tryReleaseShared
方法,這個方法由子類 Sync
重寫:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow 溢出
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
tryReleaseShared
的邏輯比較簡單房维,將信號量歸還沼瘫,CAS 更新 state 即可。
Semaphore
還提供了 tryAcquire
方法以及一些輔助方法咙俩,這里不再贅述晕鹊。
總結(jié)
Semaphore
提供了線程的控制方案,對線程的競爭提供了公平和非公平的方式暴浦。
應(yīng)用
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private static final int numOfThreads = 5; // 線程數(shù)
private static final int sleepTime = 3000; // 睡眠時(shí)間
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(numOfThreads);
System.out.println("停車場一共有 " + numOfThreads + " 個停車位");
for (int i = 0; i < 2 * numOfThreads; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 停車");
Thread.sleep(sleepTime);
System.out.println(Thread.currentThread().getName() + " 開走了");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
輸出結(jié)果為:
停車場一共有 5 個停車位
Thread-2 停車
Thread-0 停車
Thread-1 停車
Thread-4 停車
Thread-3 停車
Thread-0 開走了
Thread-2 開走了
Thread-8 停車
Thread-7 停車
Thread-1 開走了
Thread-4 開走了
Thread-3 開走了
Thread-9 停車
Thread-6 停車
Thread-5 停車
Thread-7 開走了
Thread-8 開走了
Thread-9 開走了
Thread-6 開走了
Thread-5 開走了
CountDownLatch溅话、CyclicBarrier 和 Semaphore
-
CountDownLatch
由一類線程控制另一類線程,CyclicBarrier
是一類線程都執(zhí)行到了await
方法后再繼續(xù)執(zhí)行歌焦,Semaphore
則是控制同時(shí)執(zhí)行的線程的數(shù)量 -
CountDownLatch
主要通過await
方法和countDown
方法控制飞几,CyclicBarrier
只通過await
方法,Semaphore
通過acquire
方法和release
方法