前言
Semaphore一種通常用于維持?jǐn)?shù)量的信號(hào)量工具莉恼,在Java 并發(fā)中最常見的使用就是維持并發(fā)線程的個(gè)數(shù),正如之前提到的,并不是并發(fā)場(chǎng)景線程越多越好,應(yīng)該是跟對(duì)應(yīng)的業(yè)務(wù)場(chǎng)景及CPU數(shù)量共同決定诗茎。而需要維持?jǐn)?shù)量時(shí),我們大致就需要一個(gè)JUC里面Semaphore這樣一個(gè)信號(hào)量工具献汗。Semaphore 同樣依賴于AQS實(shí)現(xiàn)(shared)敢订,內(nèi)部持有一個(gè)Sync對(duì)象,同樣存在公平實(shí)現(xiàn)罢吃、非公平實(shí)現(xiàn)兩種模式(可以在構(gòu)造時(shí)指定fair參數(shù))枢析。
原理
每次線程執(zhí)行前夕都會(huì)從這里默認(rèn)通過一下許可,然后執(zhí)行完畢時(shí)釋放一下許可刃麸,根據(jù)維系的數(shù)量來決定當(dāng)前線程是該等待還是執(zhí)行。
簡(jiǎn)單看一下源碼司浪,實(shí)現(xiàn)蠻簡(jiǎn)單的
首先是持有AQS對(duì)象
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
取許可及釋放操作
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
公平非公平兩種實(shí)現(xiàn)版本:
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}