此篇博客所有源碼均來自JDK 1.8
信號量Semaphore是一個控制訪問多個共享資源的計數(shù)器,和CountDownLatch一樣中燥,其本質(zhì)上是一個“共享鎖”塘偎。
Semaphore,在API是這么介紹的:
一個計數(shù)信號量咱扣。從概念上講峰尝,信號量維護(hù)了一個許可集。如有必要祭往,在許可可用前會阻塞每一個 acquire()火窒,然后再獲取該許可。每個 release() 添加一個許可已骇,從而可能釋放一個正在阻塞的獲取者票编。但是,不使用實際的許可對象鲤竹,Semaphore 只對可用許可的號碼進(jìn)行計數(shù)昔榴,并采取相應(yīng)的行動。
Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目吱肌。
下面我們就一個停車場的簡單例子來闡述Semaphore:
為了簡單起見我們假設(shè)停車場僅有5個停車位仰禽,一開始停車場沒有車輛所有車位全部空著纺蛆,然后先后到來三輛車规揪,停車場車位夠,安排進(jìn)去停車识颊,然后又來三輛奕坟,這個時候由于只有兩個停車位,所有只能停兩輛刃跛,其余一輛必須在外面候著苛萎,直到停車場有空車位,當(dāng)然以后每來一輛都需要在外面候著蛙酪。當(dāng)停車場有車開出去翘盖,里面有空位了,則安排一輛車進(jìn)去(至于是哪輛 要看選擇的機(jī)制是公平還是非公平)阁危。
從程序角度看汰瘫,停車場就相當(dāng)于信號量Semaphore,其中許可數(shù)為5趴乡,車輛就相對線程剑逃。當(dāng)來一輛車時官辽,許可數(shù)就會減 1 同仆,當(dāng)停車場沒有車位了(許可書 == 0 ),其他來的車輛需要在外面等候著。如果有一輛車開出停車場市怎,許可數(shù) + 1辛慰,然后放進(jìn)來一輛車。
號量Semaphore是一個非負(fù)整數(shù)(>=1)驰弄。當(dāng)一個線程想要訪問某個共享資源時速客,它必須要先獲取Semaphore,當(dāng)Semaphore >0時岔擂,獲取該資源并使Semaphore – 1浪耘。如果Semaphore值 = 0七冲,則表示全部的共享資源已經(jīng)被其他線程全部占用,線程必須要等待其他線程釋放資源癞埠。當(dāng)線程釋放資源時苗踪,Semaphore則+1
實現(xiàn)分析
Semaphore結(jié)構(gòu)如下:
[圖片上傳中。通铲。颅夺。(1)]
從上圖可以看出Semaphore內(nèi)部包含公平鎖(FairSync)和非公平鎖(NonfairSync),繼承內(nèi)部類Sync部服,其中Sync繼承AQS(再一次闡述AQS的重要性)拗慨。
Semaphore提供了兩個構(gòu)造函數(shù):
- Semaphore(int permits) :創(chuàng)建具有給定的許可數(shù)和非公平的公平設(shè)置的 Semaphore奉芦。 2. Semaphore(int permits, boolean fair) :創(chuàng)建具有給定的許可數(shù)和給定的公平設(shè)置的 Semaphore剧蹂。
實現(xiàn)如下:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
Semaphore默認(rèn)選擇非公平鎖宠叼。
當(dāng)信號量Semaphore = 1 時,它可以當(dāng)作互斥鎖使用筹裕。其中0窄驹、1就相當(dāng)于它的狀態(tài),當(dāng)=1時表示其他線程可以獲取抗斤,當(dāng)=0時丈咐,排他,即其他線程必須要等待伤疙。
信號量獲取
Semaphore提供了acquire()方法來獲取一個許可辆影。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
內(nèi)部調(diào)用AQS的acquireSharedInterruptibly(int arg),該方法以共享模式獲取同步狀態(tài):
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在acquireSharedInterruptibly(int arg)中锯蛀,tryAcquireShared(int arg)由子類來實現(xiàn)旁涤,對于Semaphore而言迫像,如果我們選擇非公平模式,則調(diào)用NonfairSync的tryAcquireShared(int arg)方法菌羽,否則調(diào)用FairSync的tryAcquireShared(int arg)方法纷闺。
公平
protected int tryAcquireShared(int acquires) {
for (;;) {
//判斷該線程是否位于CLH隊列的列頭
if (hasQueuedPredecessors())
return -1;
//獲取當(dāng)前的信號量許可
int available = getState();
//設(shè)置“獲得acquires個信號量許可之后,剩余的信號量許可數(shù)”
int remaining = available - acquires;
//CAS設(shè)置信號量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平
對于非公平而言氓轰,因為它不需要判斷當(dāng)前線程是否位于CLH同步隊列列頭浸卦,所以相對而言會簡單些限嫌。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
信號量釋放
獲取了許可,當(dāng)用完之后就需要釋放炉抒,Semaphore提供release()來釋放許可稚叹。
public void release() {
sync.releaseShared(1);
}
內(nèi)部調(diào)用AQS的releaseShared(int arg):
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared(int arg)調(diào)用Semaphore內(nèi)部類Sync的tryReleaseShared(int arg):
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//信號量的許可數(shù) = 當(dāng)前信號許可數(shù) + 待釋放的信號許可數(shù)
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//設(shè)置可獲取的信號許可數(shù)為next
if (compareAndSetState(current, next))
return true;
}
}
對于信號量的獲取釋放詳細(xì)過程扒袖,請參考如下博客:
- 【死磕Java并發(fā)】-----J.U.C之AQS:CLH同步隊列
- 【死磕Java并發(fā)】-----J.U.C之AQS:同步狀態(tài)的獲取與釋放
- 【死磕Java并發(fā)】-----J.U.C之AQS:阻塞和喚醒線程
- 【死磕Java并發(fā)】-----J.U.C之重入鎖:ReentrantLock
應(yīng)用示例
我們已停車為示例:
public class SemaphoreTest {
static class Parking{
//信號量
private Semaphore semaphore;
Parking(int count){
semaphore = new Semaphore(count);
}
public void park(){
try {
//獲取信號量
semaphore.acquire();
long time = (long) (Math.random() * 10);
System.out.println(Thread.currentThread().getName() + "進(jìn)入停車場季率,停車" + time + "秒..." );
Thread.sleep(time);
System.out.println(Thread.currentThread().getName() + "開出停車場...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}
static class Car extends Thread {
Parking parking ;
Car(Parking parking){
this.parking = parking;
}
@Override
public void run() {
parking.park(); //進(jìn)入停車場
}
}
public static void main(String[] args){
Parking parking = new Parking(3);
for(int i = 0 ; i < 5 ; i++){
new Car(parking).start();
}
}
}
運(yùn)行結(jié)果如下: