一、Semaphore
Semaphore是一種在多線程環(huán)境下使用的設(shè)施景殷,該設(shè)施負(fù)責(zé)協(xié)調(diào)各個(gè)線程隙赁,以保證它們能夠正確、合理的使用公共資源的設(shè)施披蕉,也是操作系統(tǒng)中用于控制進(jìn)程同步互斥的量颈畸。Semaphore是一種計(jì)數(shù)信號(hào)量,用于管理一組資源没讲,內(nèi)部是基于AQS的共享模式眯娱。它相當(dāng)于給線程規(guī)定一個(gè)量從而控制允許活動(dòng)的線程數(shù)。
Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量爬凑,它通過協(xié)調(diào)各個(gè)線程徙缴,以保證合理的使用公共資源。很多年以來嘁信,我都覺得從字面上很難理解Semaphore所表達(dá)的含義于样,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量潘靖,只允許同時(shí)有一百輛車在這條路上行使百宇,其他的都必須在路口等待,所以前一百輛車會(huì)看到綠燈秘豹,可以開進(jìn)這條馬路携御,后面的車會(huì)看到紅燈,不能駛?cè)隭X馬路既绕,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路啄刹,那么后面就允許有5輛車駛?cè)腭R路,這個(gè)例子里說的車就是線程凄贩,駛?cè)腭R路就表示線程在執(zhí)行誓军,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞疲扎,不能執(zhí)行昵时。
Semaphore 是 synchronized 的加強(qiáng)版捷雕,作用是控制線程的并發(fā)數(shù)量。就這一點(diǎn)而言壹甥,單純的synchronized 關(guān)鍵字是實(shí)現(xiàn)不了的救巷。
信號(hào)量通過一組許可證來控制對(duì)共享資源的訪問。
如果需要句柠,可以用acquire()方法獲取許可浦译,如果許可為0,那么會(huì)進(jìn)行阻塞溯职,通過使用release()方法釋放許可精盅,把許可歸還給Semaphore,歸還之后谜酒,阻塞的線程就會(huì)醒來嘗試獲取許可叹俏。
Semaphore提供給了若干個(gè)api對(duì)應(yīng)不同的功能:
- Semaphore(int permits):非公平模式創(chuàng)建;
- Semaphore(int permits, boolean fair):可以指定是否公平模式創(chuàng)建僻族;
- acquire():嘗試獲取1個(gè)許可她肯,如果沒有許可則阻塞,可以被中斷停止等待鹰贵;
- acquire(int permits):跟上一個(gè)方法類型晴氨,嘗試獲取permits個(gè)許可;
- acquireUninterruptibly():嘗試獲取一個(gè)許可碉输,不可中斷籽前;
- acquireUninterruptibly(int permits):嘗試獲取permits個(gè)許可,不可中斷敷钾;
- tryAcquire():嘗試獲取一個(gè)許可枝哄,獲取不到則直接返回失敗阻荒;
- tryAcquire(int permits):嘗試獲取permits個(gè)許可挠锥,獲取不到則直接返回失敗侨赡;
- tryAcquire(int permits, long timeout, TimeUnit unit):嘗試在timeout時(shí)間內(nèi)獲取permits個(gè)許可蓖租,超時(shí)則返回false,可被中斷羊壹;
- tryAcquire(long timeout, TimeUnit unit):嘗試在timeout時(shí)間內(nèi)獲取1個(gè)許可蓖宦,超時(shí)則返回false,可被中斷油猫;
- release():釋放一個(gè)許可稠茂;
- release(int permits):釋放n個(gè)許可;
下面演示基于公平鎖的Semaphore情妖,獲取鎖使用acquireUninterruptibly():
這里設(shè)置的許可為2睬关,可以發(fā)現(xiàn)诱担,同一時(shí)刻最多只能有兩個(gè)線程獲得許可。
二电爹、執(zhí)行原理
Semaphore的執(zhí)行原理相對(duì)來說比較簡單蔫仙。下面描述了可中斷非公平的信號(hào)量實(shí)現(xiàn)原理,ASQ中的state值就相當(dāng)于許可的數(shù)量:
- 執(zhí)行acquire的時(shí)候藐不,會(huì)嘗試讓state - acquires匀哄,如果發(fā)現(xiàn)許可足夠秦效,則進(jìn)行cas更新雏蛮,扣減許可,否則線程進(jìn)入等待隊(duì)列阱州;
- 執(zhí)行release的時(shí)候挑秉,state + releases,把許可加回去苔货。
三犀概、Semaphore用法
/**
* @Description: 演示Semaphore用法
*/
public class SemaphoreDemo {
public static Semaphore semaphore = new Semaphore(3,true);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(50);
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"拿到了許可證");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(Thread.currentThread().getName()+"釋放了許可證");
semaphore.release();
}
});
}
executorService.shutdown();
}
}
注意,如果使用的是tryAcquire失敗之后直接返回夜惭,線程不會(huì)進(jìn)入AQS等待隊(duì)列姻灶。
四、源碼
公平信號(hào)量 和 非公平信號(hào)量 的區(qū)別
"公平信號(hào)量"和"非公平信號(hào)量"的釋放信號(hào)量的機(jī)制是一樣的诈茧!
不同的是它們獲取信號(hào)量的機(jī)制:線程在嘗試獲取信號(hào)量許可時(shí)产喉,對(duì)于公平信號(hào)量而言,如果當(dāng)前線程不在CLH隊(duì)列的頭部敢会,則排隊(duì)等候曾沈;而對(duì)于非公平信號(hào)量而言,無論當(dāng)前線程是不是在CLH隊(duì)列的頭部鸥昏,它都會(huì)直接獲取信號(hào)量塞俱。該差異具體的體現(xiàn)在,它們的tryAcquireShared()函數(shù)的實(shí)現(xiàn)不同吏垮。
4.1 Semaphore構(gòu)造方法
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
1障涯、Semaphore 構(gòu)造器,permits 為傳入的許可證數(shù)膳汪,默認(rèn)非公平構(gòu)造器像樊;
2、Semaphore 構(gòu)造器旅敷,permits 為傳入的許可證數(shù)生棍,fair 是 boolean 型的,如果傳入 true媳谁,則公平涂滴,否則不公平友酱;
4.2 NonfairSync 和 FairSync源碼
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
兩者都繼承了 Sync 同步器,初始化時(shí)都調(diào)用了父類構(gòu)造器柔纵,同時(shí)都有一個(gè)獲取信號(hào)的方法缔杉,稍后再分析獲取信號(hào)的區(qū)別。
4.3 acquire(獲取信號(hào)量)
- 這個(gè)方法是從信號(hào)量獲取一個(gè)許可搁料,在獲取到許可或详,或線程中斷之前,當(dāng)前線程阻塞;獲取許可后立即返回并將許可數(shù)減一
public class Semaphore implements java.io.Serializable {
private final Sync sync;
/**
* 如果沒有許可可用郭计,則會(huì)休眠霸琴,直到發(fā)生以下兩種情況
* 1、其他調(diào)用release方法釋放許可昭伸,并且當(dāng)前線程獲取到許可
* 2梧乘、其他線程中斷了當(dāng)前線程
* 1)當(dāng)前線程在進(jìn)入這個(gè)方法時(shí)設(shè)置了中斷標(biāo)志位
* 2)等待許可時(shí)發(fā)生了中斷,則拋出中斷異常
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
- acquireSharedInterruptibly
這個(gè)方法是直接調(diào)用AQS的acquireSharedInterruptibly(int ard)方法庐杨;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 首先檢測(cè)是否中斷.中斷后拋出異常
* 嘗試獲取許可选调,成功退出;失敗則進(jìn)入AQS隊(duì)列,直至成功獲取或中斷
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 嘗試獲取鎖灵份,返回剩余共享鎖的數(shù)量仁堪;小于0則加入同步隊(duì)列,自旋
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
}
tryAcquireShared(arg)則會(huì)調(diào)用Semaphore中兩個(gè)同步器的tryAcquireShared實(shí)現(xiàn)方法填渠; 如果獲取失敗則加入隊(duì)列等待喚醒弦聂;
4.4 非公平模式的實(shí)現(xiàn)
非公平實(shí)現(xiàn)都是首先查看是否有可獲取的許可,如果有則獲取成功揭蜒,沒有則進(jìn)隊(duì)列等待横浑;利用此可以提高并發(fā)量
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
}
- 直接調(diào)用其父類Sync中非公平共享獲取
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 自旋直到無許可或者狀態(tài)位賦值成功
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果小于0則直接返回,否則利用CAS給AQS狀態(tài)位賦值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
通過自旋+CAS來一直嘗試獲取許可屉更,直到獲取成功或者沒有許可徙融,返回剩余的許可數(shù)
4.5 公平模式的實(shí)現(xiàn)
公平與非公平的區(qū)別在于始終按照AQS隊(duì)列FIFO的順序來的
public class Semaphore implements java.io.Serializable {
private final Sync sync;
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
//自旋 CAS 實(shí)現(xiàn)線程安全
for (;;) {
// 判斷是否有前置任務(wù)排隊(duì)
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
// 如果小于0則直接返回,否則利用CAS給AQS狀態(tài)位賦值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
如果等待隊(duì)列不為空瑰谜,則直接返回-1欺冀。 以上兩種模式獲取失敗后都會(huì)調(diào)用doAcquireSharedInterruptibly(int arg);自旋等待獲取鎖
- doAcquireSharedInterruptibly方法:會(huì)使得當(dāng)前線程一直等待萨脑,直到當(dāng)前線程獲取到鎖(或被中斷)才返回
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建“當(dāng)前線程”的Node節(jié)點(diǎn)隐轩,且node中記錄的鎖是“共享鎖”類型,并將節(jié)點(diǎn)添加到CLH隊(duì)列末尾渤早。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取前繼節(jié)點(diǎn)职车,如果前繼節(jié)點(diǎn)是等待鎖隊(duì)列的表頭,則嘗試獲取共享鎖
// 判斷新增的節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是否頭節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
// 是頭節(jié)點(diǎn),那么在此嘗試獲取共享鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取成功悴灵,把當(dāng)前節(jié)點(diǎn)變?yōu)樾碌膆ead節(jié)點(diǎn)扛芽,
//并且檢查后續(xù)節(jié)點(diǎn)是否可以在共享模式下等待,
//并且允許繼續(xù)傳播积瞒,則調(diào)用doReleaseShared繼續(xù)喚醒下一個(gè)節(jié)點(diǎn)嘗試獲取鎖
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前繼節(jié)點(diǎn)不是頭節(jié)點(diǎn)川尖,當(dāng)前線程一直等待,直到獲取到鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
- shouldParkAfterFailedAcquire方法:判斷當(dāng)前線程獲取鎖失敗之后是否需要掛起
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/*說明:4.shouldParkAfterFailedAcquire 返回當(dāng)前線程是否應(yīng)該阻塞
(01) 關(guān)于waitStatus請(qǐng)參考下表(中擴(kuò)號(hào)內(nèi)為waitStatus的值)
CANCELLED[1] -- 當(dāng)前線程已被取消
SIGNAL[-1] -- “當(dāng)前線程的后繼線程需要被unpark(喚醒)”茫孔。
一般發(fā)生情況是:當(dāng)前線程的后繼線程處于阻塞狀態(tài)叮喳,
而當(dāng)前線程被release或cancel掉,因此需要喚醒當(dāng)前線程的后繼線程缰贝。
CONDITION[-2] -- 當(dāng)前線程(處在Condition休眠狀態(tài))在等待Condition喚醒
PROPAGATE[-3] -- (共享鎖)其它線程獲取到“共享鎖”
[0] -- 當(dāng)前線程不屬于上面的任何一種狀態(tài)馍悟。
(02) shouldParkAfterFailedAcquire()通過以下規(guī)則,判斷“當(dāng)前線程”是否需要被阻塞揩瞪。
規(guī)則1:如果前繼節(jié)點(diǎn)狀態(tài)為SIGNAL赋朦,表明當(dāng)前節(jié)點(diǎn)需要被unpark(喚醒)篓冲,此時(shí)則返回true李破。
規(guī)則2:如果前繼節(jié)點(diǎn)狀態(tài)為CANCELLED(ws>0),說明前繼節(jié)點(diǎn)已經(jīng)被取消壹将,則通過先前回溯找到一個(gè)有效(非CANCELLED狀態(tài))的節(jié)點(diǎn)嗤攻,并返回false。
規(guī)則3:如果前繼節(jié)點(diǎn)狀態(tài)為非SIGNAL诽俯、非CANCELLED妇菱,則設(shè)置前繼的狀態(tài)為SIGNAL,并返回false暴区。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
// 如果前驅(qū)節(jié)點(diǎn)是SIGNAL狀態(tài)闯团,則意味著當(dāng)前線程需要unpark喚醒,此時(shí)返回true
if (ws == Node.SIGNAL)
return true;
// 如果前繼節(jié)點(diǎn)是取消的狀態(tài)即前驅(qū)節(jié)點(diǎn)狀態(tài)為CANCELLED
if (ws > 0) {
// 從隊(duì)尾向前尋找第一個(gè)狀態(tài)不為CANCELLED的節(jié)點(diǎn)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 將前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}
4.6 void release()
公平和非公平使用相同的釋放 釋放許可
public class Semaphore implements java.io.Serializable {
private final Sync sync;
public void release() {
sync.releaseShared(1);
}
}
- 調(diào)用AQS中的releaseShared(int arg)
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//目的是讓當(dāng)前線程釋放它所持有的共享鎖仙粱,它首先會(huì)通過tryReleaseShared()去嘗試釋放共享鎖房交。
//嘗試成功,則直接返回伐割;嘗試失敗候味,則通過doReleaseShared()去釋放共享鎖。
public final boolean releaseShared(int arg) {
//釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒所有共享節(jié)點(diǎn)線程
doReleaseShared();
return true;
}
return false;
}
}
- tryReleaseShared()在Semaphore.Sync中被重寫隔心,釋放共享鎖白群,將鎖計(jì)數(shù)器加回去
public class Semaphore implements java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取“鎖計(jì)數(shù)器”的狀態(tài)
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通過CAS函數(shù)進(jìn)行賦值。
if (compareAndSetState(current, next))
return true;
}
}
}
}
- 如果釋放許可成功硬霍,則調(diào)用AQS中的doReleaseShared()方法來喚醒AQS隊(duì)列中等待的線程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* 喚醒同步隊(duì)列中的一個(gè)線程
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//是否需要喚醒后繼節(jié)點(diǎn)
if (ws == Node.SIGNAL) {
//修改狀態(tài)為初始0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒h.nex節(jié)點(diǎn)線程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
1)獲取隊(duì)列的頭節(jié)點(diǎn)元素帜慢,如果不為null,并且不為尾節(jié)點(diǎn),說白了粱玲,就是不止一個(gè)人等待侍咱,進(jìn)入判斷。
2)如果線程節(jié)點(diǎn)是需要喚醒的線程密幔,則進(jìn)行喚醒楔脯,獲取資源使用。
3)失敗后重試胯甩。
4)如果沒有后繼需要喚醒的節(jié)點(diǎn)昧廷,則退出,就相當(dāng)于每人排隊(duì)上廁所了偎箫,讓出來資源就空著木柬。
Semaphore 總結(jié)
1、Semaphore 內(nèi)部維護(hù)一組信號(hào)量淹办,即一個(gè) volatile 的整型 state 變量眉枕。
2、Semaphore 分為公平或非公平兩種方式怜森,獲取信號(hào)量或釋放信號(hào)量的本質(zhì)是對(duì) state 進(jìn)行原子的減少或增加操作速挑。
3、獲取不到信號(hào)的線程放在等待隊(duì)列里面副硅,釋放信號(hào)的時(shí)候會(huì)喚醒后繼節(jié)點(diǎn)姥宝。
4、Semaphore 主要用于對(duì)線程數(shù)量恐疲、公共資源(比如數(shù)據(jù)庫連接池)等進(jìn)行數(shù)量控制腊满。
參考:
https://www.itzhai.com/articles/graphical-several-fun-concurrent-helper-classes.html
https://www.cnblogs.com/200911/p/6060359.html