作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-03】
更新日志
日期 | 更新內(nèi)容 | 備注 |
---|---|---|
2017-11-03 | 添加轉(zhuǎn)載標(biāo)志 | 持續(xù)更新 |
導(dǎo)入
Semaphore實現(xiàn)為一種基于計數(shù)的信號量绊起,Semaphore管理著一組虛擬的許可集合,這種許可可以作為某種憑證,來管理資源,在一些資源有限的場景下很有實用性惠豺,比如數(shù)據(jù)庫連接,應(yīng)用可初始化一組數(shù)據(jù)庫連接风宁,然后通過使用Semaphore來管理獲取連接的許可洁墙,任何線程想要獲得一個連接必須首先獲得一個許可,然后再憑這個許可獲得一個連接戒财,這個許可將持續(xù)到這個線程歸還了連接热监。在使用上,任何一個線程都需要通過acquire來獲得一個Semaphore許可饮寞,這個操作可能會阻塞線程直到成功獲得一個許可孝扛,因為資源是有限的,所以許可也是有限的幽崩,沒有獲得資源就需要阻塞等待其他線程歸還Semaphore苦始,而歸還Semaphore操作通過release方法來進(jìn)行,release會喚醒一個等待在Semaphore上的一個線程來嘗試獲得許可慌申。如果想要達(dá)到一種互斥的效果陌选,比如任何時刻只能有一個線程獲得許可,那么就可以初始化Semaphore的數(shù)量為1,一個線程獲得這個Semaphore之后咨油,任何到來的通過acquire來嘗試獲得許可的線程都會被阻塞直到這個持有Semaphore的線程調(diào)用了release方法來釋放Semaphore您炉。
在實現(xiàn)上役电,Semaphore借助了線程同步框架AQS赚爵,AQS的分析可以參考文章Java同步框架AbstractQueuedSynchronizer,同樣借助了AQS來實現(xiàn)的是java中的可重入鎖的實現(xiàn)法瑟,同樣你可以在文章Java可重入鎖詳解中找到j(luò)ava中可重入鎖的分析總結(jié)文檔冀膝。在這些文章中已經(jīng)分析過如何通過AQS來實現(xiàn)鎖的語義,本文將繼續(xù)分析AQS的應(yīng)用實例瓢谢,Semaphore作為一種線程間同步機制是非常輕量級的方案畸写,所以學(xué)習(xí)和掌握Semaphore是有必要的驮瞧。
信號量Semaphore
Semaphore的實現(xiàn)借助了同步框架AQS氓扛,下面的圖片展示了Semaphore的代碼結(jié)構(gòu),Semaphore使用一個內(nèi)部類Sync來實現(xiàn)论笔,而Sync繼承了AQS來實現(xiàn)采郎,Sync有兩個子類,分別對應(yīng)著公平模式和非公平模式的Semaphore狂魔,下文中會分析兩者的實現(xiàn)細(xì)節(jié)蒜埋。
首先來看一下Sync的構(gòu)造函數(shù):
Sync(int permits) {
setState(permits);
}
參數(shù)即為需要管理的許可數(shù)量,Sync使用AQS提供的setState方法來初始化共享變量state最楷,后續(xù)通過acquire和release來獲取和規(guī)劃許可整份。下面首先分析的是公平模式下的獲取許可方法:
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
hasQueuedPredecessors方法表示的是是否有線程在等待許可,如果已經(jīng)有線程在等待了籽孙,那么直接返回-1代表獲取許可失敗烈评,否則再去獲取,獲取許可就是通過compareAndSetState方法來更新state的值犯建,下面來看一下非公平模式下的獲取許可的方法:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
非公平模式和公平模式的區(qū)別在于公平模式會考慮是否已經(jīng)有線程在等待讲冠,而非公平模式會快速去競爭,不會考慮是否有線程在前面等待挪哄,關(guān)于多個線程是如何去競爭共享變量而獲得鎖語義的內(nèi)容需要參考文章Java同步框架AbstractQueuedSynchronizer撇眯。下面來分析一下上面分析的兩個方法是如何被調(diào)用的捎琐,上文中提到,我們是通過使用acquire方法來獲得一個許可的否彩,下面是Semaphore的acquire方法:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
acquire方法調(diào)用了AQS基類的acquireSharedInterruptibly方法,而acquireSharedInterruptibly方法調(diào)用了其子類的tryAcquireShared方法嗦随,對應(yīng)了公平模式和非公平模式下的tryAcquireShared方法胳搞。上面分析了獲取許可的方法acquire,下面再來分析一下歸還許可的方法release:
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
獲取許可是通過減少state來完成的,而歸還許可則是通過增加state來完成的肌毅,AQS通過維護(hù)一個共享變量state來實現(xiàn)多線程同步筷转。為了更好的理解Semaphore的工作原理,下面展示一個使用示例悬而,先來看代碼:
class Resource {
/**
* The resource
* @return r
*/
public Object getResource() {
return new Object();
}
}
/**
* The Thread Pool
*/
class Pool {
private static final int MAX_AVAILABLE = 100;
private final int availableSemaphore;
private Semaphore available;
public Pool() {
this(MAX_AVAILABLE);
}
public Pool(int available) {
this(available, false);
}
public Pool(int available, boolean fairMode) {
this.availableSemaphore = available;
this.available = new Semaphore(available, fairMode);
items = new Resource[availableSemaphore];
for (int i = 0; i < availableSemaphore; i ++) {
items[i] = new Resource();
}
used = new boolean[availableSemaphore];
}
public int availableSemaphore() {
return this.availableSemaphore;
}
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items;
protected boolean[] used;
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
Pool類使用了Semaphore來管理一組許可呜舒,獲得許可的線程可以獲得一個Resource,而Resource是什么可以自定義笨奠,比如是數(shù)據(jù)庫連接池袭蝗。調(diào)用Pool類的getItem可以獲得一個許可,而調(diào)用putItem將許可歸還給Pool般婆,下面展示了Pool的簡單使用示例:
/**
* Created by hujian06 on 2017/10/21.
*
* Semaphore demo
*/
public class SemaphoreDemo {
public static void main(String ... args) {
Pool pool = new Pool();
Object resource = null;
try {
Resource r = (Resource) pool.getItem();
resource = r.getResource();
//do your biz
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
pool.putItem(resource); //release here
}
}
}
需要注意的是到腥,任何線程在獲得許可之后,使用共享資源完畢都需要執(zhí)行歸還操作蔚袍,否則會有線程一直在等待乡范。本文的內(nèi)容到此也就結(jié)束了,Semaphore只是使用AQS的一種簡單例子啤咽,AQS的強大之處就在于晋辆,你僅僅需要繼承他,然后使用它提供的api就可以實現(xiàn)任意復(fù)雜的線程同步方案宇整,AQS為我們做了大部分的同步工作瓶佳,所以本文可以當(dāng)成是對使用AQS的一種簡單介紹,你應(yīng)當(dāng)去分析一下AQS的實現(xiàn)細(xì)節(jié)鳞青,并且加以總結(jié)霸饲,可以這么說,理解了AQS臂拓,就理解了java中線程同步是如何實現(xiàn)的厚脉,線程同步是并發(fā)的核心內(nèi)容,如何保證多個線程可以安全高效的訪問共享數(shù)據(jù)埃儿,是并發(fā)需要首要考慮的問題器仗,而AQS解決了這些問題,未來還會對AQS進(jìn)行深入分析總結(jié)童番。