Semaphore和CountDownLatch都是典型的共享鎖腿箩,內(nèi)部使用AbstractQueuedSynchronizer來實(shí)現(xiàn)共享鎖的功能璧瞬。
我們先回顧一下AQS中關(guān)于共享鎖的重要方法男应。
一. AQS共享鎖的重要方法
1.1 acquireShared系列獲取共享鎖的方法
// 獲取共享鎖
public final void acquireShared(int arg) {
// 嘗試去獲取共享鎖察署,如果返回值小于0表示獲取共享鎖失敗
if (tryAcquireShared(arg) < 0)
// 調(diào)用doAcquireShared方法去獲取共享鎖
doAcquireShared(arg);
}
方法分析:
- tryAcquireShared方法:嘗試去獲取共享鎖,返回值小于0表示獲取共享鎖失敗拜姿,就要調(diào)用doAcquireShared方法烙样。返回值大于等于0,獲取成功,直接返回蕊肥。這個(gè)方法由子類實(shí)現(xiàn)谒获。
- doAcquireShared方法: 獲取共享鎖。如果獲取失敗壁却,當(dāng)前線程就會(huì)處于WAITING等待狀態(tài)批狱。如果獲取成功,因?yàn)槭枪蚕礞i展东,所以當(dāng)前線程也會(huì)去喚醒一個(gè)等待線程赔硫。
1.2 releaseShared 釋放共享鎖的方法
// 釋放共享鎖
public final boolean releaseShared(int arg) {
// 嘗試釋放共享鎖
if (tryReleaseShared(arg)) {
// 喚醒等待共享鎖的線程
doReleaseShared();
return true;
}
return false;
}
方法分析:
- tryReleaseShared方法,嘗試去釋放共享鎖資源盐肃。如果返回true爪膊,表示該共享鎖可以被獲取了,那么就要調(diào)用doReleaseShared方法砸王,去喚醒所有等待共享鎖的線程推盛。如果返回false,則什么都不做谦铃。
- doReleaseShared方法耘成,喚醒所有等待共享鎖的線程。
1.3 小結(jié)
所以要實(shí)現(xiàn)一個(gè)共享鎖,我們只要繼承AQS類瘪菌,并實(shí)現(xiàn)tryAcquireShared和tryReleaseShared方法就行了撒会。
二. Semaphore類
假如有這樣一個(gè)需求,現(xiàn)在有許多線程要執(zhí)行师妙,我們要控制同時(shí)執(zhí)行的線程數(shù)量诵肛,當(dāng)達(dá)到這個(gè)數(shù)量時(shí),其他線程必須等待疆栏,只有當(dāng)有線程執(zhí)行完畢時(shí)曾掂,才會(huì)喚醒等待線程惫谤,讓它執(zhí)行壁顶。
形象地解釋就是,有5個(gè)廁所溜歪,但是有10個(gè)人準(zhǔn)備上廁所若专。所以只有前面5個(gè)人才能上廁所,后面的人必須等待蝴猪。
2.1 Semaphore類介紹
在java中用Semaphore類就可以很方便地實(shí)現(xiàn)這個(gè)功能调衰,Semaphore類是一個(gè)共享鎖,所以允許多個(gè)線程獲取共享鎖自阱,它有一個(gè)permits許可數(shù)變量嚎莉,只有當(dāng)permits許可數(shù)大于0時(shí),線程才可以獲取共享鎖沛豌,否則線程就要阻塞等待趋箩。
2.2 重要屬性和方法
/** AQS的子類,使用實(shí)現(xiàn)共享鎖 */
private final Sync sync;
// 獲取共享鎖加派,如果獲取不到叫确,就一直等待。不響應(yīng)中斷請(qǐng)求
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 釋放共享鎖芍锦,當(dāng)前線程添加一個(gè)許可竹勉,喚醒等待許可的線程
public void release() {
sync.releaseShared(1);
}
可以看出Semaphore類獲取鎖以及釋放鎖的方法,都是通過sync的方法實(shí)現(xiàn)的娄琉。Sync是AQS的子類次乓,實(shí)現(xiàn)了tryAcquireShared和tryReleaseShared方法。
2.3 Sync內(nèi)部類
/**
* 非公平方式獲取 Semaphore共享鎖
* 每當(dāng)獲取Semaphore共享鎖孽水,都會(huì)消耗一定數(shù)量的許可票腰,也就是會(huì)減少state的值。
* 當(dāng)state減去要獲取的許可數(shù)的剩余值小于0時(shí)匈棘,表示不能提供想要的許可數(shù)丧慈,
* 獲取共享鎖失敗,當(dāng)前線程必須等待。
*/
final int nonfairTryAcquireShared(int acquires) {
// 采用CAS方式的樂觀鎖逃默,來實(shí)現(xiàn)修改state狀態(tài)時(shí)的多線程安全鹃愤,沒有阻塞線程
for (;;) {
// 得到當(dāng)前的許可數(shù)
int available = getState();
// 計(jì)算減去想要獲取許可數(shù)acquires之后的剩余值remaining
int remaining = available - acquires;
// 1. 如果remaining小于0,表示不能提供想要的許可數(shù)完域,獲取共享鎖失敗软吐,返回remaining值
// 2. 如果remaining大于等于0,則調(diào)用compareAndSetState方法吟税。
// 如果CAS調(diào)用失敗凹耙,表示有別的線程修改了鎖的狀態(tài)state,那么就循環(huán)重試肠仪,
// 如果成功表示獲取共享鎖成功肖抱,返回remaining值(肯定大于等于0)
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
/**
* 嘗試去釋放共享鎖。返回true表示釋放共享鎖成功
* 每次釋放Semaphore共享鎖异旧,都會(huì)添加一定數(shù)量的許可意述,也就是會(huì)增加state的值。
* (所以Semaphore擁有的許可總數(shù)是可以比初始傳遞的許可數(shù)要多的吮蛹。)
* 而且只要修改state狀態(tài)成功荤崇,就表示添加了一些許可,這個(gè)時(shí)候就要喚醒那些等待許可被阻塞的線程潮针。
*/
protected final boolean tryReleaseShared(int releases) {
// 采用CAS方式的樂觀鎖术荤,來實(shí)現(xiàn)修改state狀態(tài)時(shí)的多線程安全,沒有阻塞線程
for (;;) {
// 得到當(dāng)前的許可數(shù)
int current = getState();
// 計(jì)算新的許可數(shù)next
int next = current + releases;
// 表示許可數(shù)超出了int類型的上限每篷,拋出Error
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS返回false表示別的線程修改了鎖的state值瓣戚,那么循環(huán)重試
// 返回true表示添加了新的許可,就要喚醒那些等待許可被阻塞的線程雳攘。
if (compareAndSetState(current, next))
return true;
}
}
Sync類實(shí)現(xiàn)了非公平方法獲取鎖的方法带兜,以及釋放鎖的方法。
通過tryReleaseShared方法吨灭,我們發(fā)現(xiàn)permits許可數(shù)是允許超過初始化中賦予的值刚照。也就是說廁所是有可能變多的。
2.4 NonfairSync 和 FairSync內(nèi)部類
/**
* 非公平鎖
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
// 初始化擁有的許可數(shù)
NonfairSync(int permits) {
super(permits);
}
// 非公平方式獲取共享鎖
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平鎖
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
// 初始化擁有的許可數(shù)
FairSync(int permits) {
super(permits);
}
/**
*
* @param acquires
*/
protected int tryAcquireShared(int acquires) {
// 采用CAS方式的樂觀鎖喧兄,來實(shí)現(xiàn)修改state狀態(tài)時(shí)的多線程安全无畔,沒有阻塞線程
for (;;) {
// hasQueuedPredecessors返回true,表示在等待鎖的線程池中吠冤,有別的線程在當(dāng)前線程之前浑彰,
// 根據(jù)公平鎖的原則,當(dāng)前線程不能獲取鎖拯辙。
if (hasQueuedPredecessors())
return -1;
// 得到當(dāng)前的許可數(shù)
int available = getState();
// 計(jì)算減去想要獲取許可數(shù)acquires之后的剩余值remaining
int remaining = available - acquires;
// 1. 如果remaining小于0郭变,表示不能提供想要的許可數(shù)颜价,獲取共享鎖失敗,返回remaining值
// 2. 如果remaining大于等于0诉濒,則調(diào)用compareAndSetState方法周伦。
// 如果CAS調(diào)用失敗,表示有別的線程修改了鎖的狀態(tài)state未荒,那么就循環(huán)重試专挪,
// 如果成功表示獲取共享鎖成功,返回remaining值(肯定大于等于0)
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
公平鎖和非公平鎖最大的區(qū)別就是片排,獲取鎖的時(shí)候寨腔,是否要判斷當(dāng)前線程是不是等待線程隊(duì)列中的第一個(gè)。
2.5 Semaphore類使用示例
import java.util.concurrent.Semaphore;
public class CountDownLatchTest1 {
public static void newThread(Semaphore semaphore, String name, int time) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+" 開始");
try {
semaphore.acquire();
System.out.println("線程"+Thread.currentThread().getName()+" 獲取了許可");
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
System.out.println("線程"+Thread.currentThread().getName()+" 結(jié)束");
}
}, name);
t.start();
}
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
newThread(semaphore,"t"+i, 100);
}
}
}
執(zhí)行結(jié)果:
線程t1 開始
線程t1 獲取了許可
線程t2 開始
線程t2 獲取了許可
線程t3 開始
線程t3 獲取了許可
線程t4 開始
線程t5 開始
線程t6 開始
線程t1 結(jié)束
線程t4 獲取了許可
線程t2 結(jié)束
線程t6 獲取了許可
線程t3 結(jié)束
線程t5 獲取了許可
線程t4 結(jié)束
線程t5 結(jié)束
線程t6 結(jié)束
三. CountDownLatch
我們經(jīng)常碰到這樣一個(gè)需求率寡,想讓當(dāng)前線程等待另一個(gè)線程完成之后在操作迫卢,這個(gè)很簡(jiǎn)單,在當(dāng)前線程中調(diào)用另一個(gè)線程的join方法勇劣。
但是如果是想讓當(dāng)前線程等待多個(gè)線程完成之后在操作靖避,那么怎么辦呢潭枣?
有人說這個(gè)也簡(jiǎn)單比默,在當(dāng)前線程中分別調(diào)用線程的join方法,這樣做是可以的盆犁,但是顯得繁瑣命咐。
那么如果讓多個(gè)線程等待多個(gè)線程完成之后在操作,該怎么辦呢谐岁?在使用join方法就有點(diǎn)不切實(shí)際了醋奠。而這些需求都可以使用CountDownLatch類來實(shí)現(xiàn)。
3.1 CountDownLatch類介紹
CountDownLatch類也是一個(gè)共享鎖伊佃,它有個(gè)變量count窜司,當(dāng)獲取共享鎖時(shí),只有count等于0航揉,才能獲取成功塞祈,否則都是獲取失敗,當(dāng)前線程阻塞等待帅涂。
而變量count是通過CountDownLatch類的countDown方法來減少的议薪,當(dāng)count減少到0時(shí),相當(dāng)于釋放共享鎖媳友,要喚醒所有等待阻塞的線程斯议。
3.2 CountDownLatch類重要屬性和方法
/** AQS的子類,使用實(shí)現(xiàn)共享鎖 */
private final Sync sync;
// 獲取鎖醇锚,如果獲取不到哼御,就一直等待。如果有中斷請(qǐng)求就拋出異常
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 嘗試釋放共享鎖, 將count值減一
public void countDown() {
sync.releaseShared(1);
}
可以看出CountDownLatch類獲取鎖以及釋放鎖的方法恋昼,也都是通過sync的方法實(shí)現(xiàn)的尿扯。Sync是AQS的子類,實(shí)現(xiàn)了tryAcquireShared和tryReleaseShared方法焰雕。
3.3 Sync內(nèi)部類
/**
* 嘗試獲取共享鎖衷笋。根據(jù)共享鎖的規(guī)則,當(dāng)返回值小于0表示獲取失敗矩屁,當(dāng)前線程需要等待辟宗。
* 我們發(fā)現(xiàn)當(dāng)前只有當(dāng)state值等于0的時(shí)候,才能獲取共享鎖吝秕。
* 而state值在兩個(gè)地方發(fā)生改變泊脐,1. Sync(int count)初始化的時(shí)候
* 2. tryReleaseShared嘗試釋放共享鎖的時(shí)候。
* 注意:tryAcquireShared方法并沒有改變state的值烁峭。
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
/**
* 嘗試去釋放CountDownLatch共享鎖容客,返回true,表示釋放成功约郁。
* 當(dāng)state大于0時(shí)缩挑,每次調(diào)用tryReleaseShared方法,都會(huì)將state數(shù)量減一鬓梅。
* 只有當(dāng)state等于0的時(shí)候供置,返回true,成功釋放鎖绽快,就會(huì)喚醒等待鎖的線程芥丧。
*/
protected boolean tryReleaseShared(int releases) {
// 采用CAS方式的樂觀鎖,來實(shí)現(xiàn)修改state狀態(tài)時(shí)的多線程安全坊罢,沒有阻塞線程
for (;;) {
//
int c = getState();
//等于0续担,說明共享鎖已經(jīng)被釋放,喚醒等待鎖的線程操作也已經(jīng)發(fā)出活孩,所以這個(gè)返回false物遇。
if (c == 0)
return false;
// 將state數(shù)量減一。
int nextc = c-1;
// 返回false诱鞠,表示state被別的線程更改了挎挖,所以循環(huán)重試。
if (compareAndSetState(c, nextc))
// 等于0航夺,表示完全釋放CountDownLatch共享鎖了蕉朵。
return nextc == 0;
}
}
注意:CountDownLatch類中Sync沒有公平鎖和非公平鎖的子類,因?yàn)閷?duì)于CountDownLatch類共享鎖來說阳掐,獲取鎖的成功條件只有一個(gè)就是count等于0.
3.4 CountDownLatch的使用示例
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void newThread(CountDownLatch latch, String name, int time) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+" 開始");
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 嘗試釋放CountDownLatch鎖
latch.countDown();
System.out.println("線程"+Thread.currentThread().getName()+" 結(jié)束");
}
}, name);
t.start();
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(4);
newThread(latch,"t1", 10);
newThread(latch,"t2", 100);
newThread(latch,"t3", 200);
newThread(latch,"t4", 500);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("線程"+Thread.currentThread().getName()+" 開始");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程"+Thread.currentThread().getName()+" 結(jié)束");
}
}, "thread000000000").start();
latch.await();
System.out.println("主線程結(jié)束");
}
}
執(zhí)行結(jié)果:
線程t1 開始
線程t2 開始
線程t3 開始
線程t4 開始
線程thread000000000 開始
線程t1 結(jié)束
線程t2 結(jié)束
線程t3 結(jié)束
線程t4 結(jié)束
主線程結(jié)束
線程thread000000000 結(jié)束
主線程和thread000000000線程都會(huì)等待t1始衅、t2冷蚂、t3、t4線程都執(zhí)行完畢之后汛闸,才繼續(xù)執(zhí)行蝙茶。