前言
線程并發(fā)系列文章:
Java 線程基礎(chǔ)
Java 線程狀態(tài)
Java “優(yōu)雅”地中斷線程-實(shí)踐篇
Java “優(yōu)雅”地中斷線程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有誤
Java Unsafe/CAS/LockSupport 應(yīng)用與原理
Java 并發(fā)"鎖"的本質(zhì)(一步步實(shí)現(xiàn)鎖)
Java Synchronized實(shí)現(xiàn)互斥之應(yīng)用與源碼初探
Java 對(duì)象頭分析與使用(Synchronized相關(guān))
Java Synchronized 偏向鎖/輕量級(jí)鎖/重量級(jí)鎖的演變過(guò)程
Java Synchronized 重量級(jí)鎖原理深入剖析上(互斥篇)
Java Synchronized 重量級(jí)鎖原理深入剖析下(同步篇)
Java并發(fā)之 AQS 深入解析(上)
Java并發(fā)之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 詳解
Java 并發(fā)之 ReentrantLock 深入分析(與Synchronized區(qū)別)
Java 并發(fā)之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(應(yīng)用篇)
最詳細(xì)的圖文解析Java各種鎖(終極篇)
線程池必懂系列
前面分析了基于AQS的獨(dú)占鎖ReentrantLock慰于、共享鎖/獨(dú)占鎖ReentrantReadWriteLock滓窍,它們內(nèi)部都實(shí)現(xiàn)了Lock 接口珊豹。而AQS還有其它常用的子類封裝器样悟,它們雖然沒(méi)有實(shí)現(xiàn)Lock接口锣夹,但可以用來(lái)做線程間的同步舰绘,接下來(lái)將要來(lái)深入了解它們墅垮。
通過(guò)本篇文章民假,你將了解到:
1可免、Semaphore 原理分析
2抓于、CountDownLatch 原理分析
3、CyclicBarrier 原理分析
1浇借、Semaphore 原理分析
場(chǎng)景引入
ReentrantReadWriteLock 里有讀鎖和寫鎖捉撮,其中讀鎖是共享鎖,其核心是對(duì)AQS里的"state"變量進(jìn)行操作妇垢,每獲取一次鎖巾遭,將state加1,釋放鎖將state減1闯估。從這里可以看出灼舍,將state作為共享資源能夠?qū)崿F(xiàn)線程間的協(xié)作。
現(xiàn)在有個(gè)需求:資源是共享的涨薪,但是數(shù)量有限骑素,因此沒(méi)拿到資源的需要等待別人釋放資源。
將state作為標(biāo)記共享資源的數(shù)量尤辱,那么就有:
1砂豌、線程占有資源后將state減1,線程釋放資源后將state加1光督。
2阳距、若線程沒(méi)拿到資源(資源都被其它線程占有了),那么掛起等待结借。
3筐摘、線程釋放資源后,喚醒其它等待該資源的線程。
這樣子咖熟,不用synchronized+wait/notify與ReentrantLock+await/signal圃酵,也依然能夠?qū)崿F(xiàn)線程間同步。
具體到現(xiàn)實(shí)場(chǎng)景:
如停車場(chǎng)只能容納一定數(shù)量的車子馍管,當(dāng)停車場(chǎng)停滿了車(入場(chǎng)許可發(fā)放完了)郭赐,其它想進(jìn)來(lái)的車子必須等待有其它車從停車場(chǎng)開(kāi)出(釋放入場(chǎng)許可)。
Semaphore 構(gòu)造
指定初始的許可個(gè)數(shù):
#Semaphore.java
public Semaphore(int permits) {
//默認(rèn)是非公平
sync = new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
可以看出許可的個(gè)數(shù)就是state的值确沸。
Semaphore 占有許可
通過(guò)調(diào)用acquire(xx)占有許可:
#Semaphore.java
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
//交給AQS處理捌锭,可中斷
sync.acquireSharedInterruptibly(permits);
}
#AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//發(fā)生了中斷,直接返回
if (Thread.interrupted())
throw new InterruptedException();
//嘗試修改state(減)
if (tryAcquireShared(arg) < 0)
//修改失敗罗捎,則掛起等待
doAcquireSharedInterruptibly(arg);
}
每次可以占有多個(gè)許可观谦,若占有成功則直接返回,否則掛起等待桨菜。
具體的操作state在tryAcquireShared(xx)里實(shí)現(xiàn)豁状,此處以非公平模式說(shuō)明:
#Semaphore.java
final int nonfairTryAcquireShared(int acquires) {
//死循環(huán)確保修改state成功,或者state已經(jīng)獲取完了
for (;;) {
//獲取state
int available = getState();
//減少state
int remaining = available - acquires;
if (remaining < 0 ||
//CAS 操作
compareAndSetState(available, remaining))
return remaining;
}
}
Semaphore 釋放許可
占有許可做了相應(yīng)的任務(wù)后倒得,就可以釋放許可了泻红。
通過(guò)調(diào)用release(xx)釋放許可:
#Semaphore.java
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//AQS 實(shí)現(xiàn)
sync.releaseShared(permits);
}
#AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
//嘗試修改state(加)
if (tryReleaseShared(arg)) {
//成功修改state,喚醒后繼節(jié)點(diǎn)
doReleaseShared();
return true;
}
//修改失敗
return false;
}
具體的操作state在tryReleaseShared(xx)里實(shí)現(xiàn):
#Semaphore.java
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取state
int current = getState();
//增加
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//修改
if (compareAndSetState(current, next))
return true;
}
}
可以看出:
釋放許可屎暇,增加state承桥,占有許可驻粟,減少state根悼。
另外,Semaphore 占有許可可分為公平與非公平模式蜀撑,占有許可過(guò)程可中斷/不可中斷挤巡。
Semaphore 與Lock 區(qū)別
與ReentrantLock、ReentrantReadWriteLock 區(qū)別在于從不同的角度看待state:
1酷麦、ReentrantLock矿卑、ReentrantReadWriteLock 獲取鎖的過(guò)程是將state值增大,而Semaphore 占有許可是將state值減小沃饶。
2母廷、ReentrantLock、ReentrantReadWriteLock 釋放鎖的過(guò)程是將state值減小糊肤,而Semaphore 釋放許可是將state值增大琴昆。
3、這也是AQS的靈活之處馆揉,將具體的"state"鎖代表的意義由子類實(shí)現(xiàn)业舍,可實(shí)現(xiàn)不同場(chǎng)景的應(yīng)用。
2、CountDownLatch 原理分析
場(chǎng)景引入
A舷暮、B态罪、C三個(gè)線程協(xié)作:
A 等待B、C完成任務(wù)后再進(jìn)行下一步操作下面。
這場(chǎng)景我們可能會(huì)想到用Thread.join()复颈,A調(diào)用B.join(),C.join()沥割,A阻塞等待券膀,當(dāng)B、C線程執(zhí)行結(jié)束后喚醒A驯遇。這種方式雖然能夠解決問(wèn)題芹彬,但是有些不盡人意的地方:比如說(shuō)A不一定要等待B、C執(zhí)行完成叉庐,而是B舒帮、C中途完成某個(gè)任務(wù)后通知A;又比如陡叠,B玩郊、C線程不止執(zhí)行一次任務(wù),而是一定的次數(shù)后才會(huì)喚醒A枉阵,這個(gè)時(shí)候使用Thread.join() 就無(wú)法解決問(wèn)題了译红。
而CountDownLatch 可以很好地解決這問(wèn)題。
CountDownLatch 構(gòu)造
#CountDownLatch.java
//初始化次數(shù)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
//設(shè)置state
setState(count);
}
可以看出兴溜,count的值最終反饋到state上侦厚。
CountDownLatch 等待
通過(guò)await(xx)等待state變?yōu)?:
#CountDownLatch.java
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
//超時(shí)返回
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
#AbstractQueuedSynchronizer.java
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
//該方法響應(yīng)中斷
if (Thread.interrupted())
throw new InterruptedException();
//主要工作在tryAcquireShared(xx)里
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
又是AQS的套路,具體的操作state在tryAcquireShared(xx)里實(shí)現(xiàn):
#CountDownLatch.java
protected int tryAcquireShared(int acquires) {
//若state == 0拙徽,則返回1刨沦,否則-1
//外層判斷>=0,說(shuō)明當(dāng)前state還有數(shù)量膘怕,則需要阻塞等待想诅,否則不阻塞
return (getState() == 0) ? 1 : -1;
}
與其它子類實(shí)現(xiàn)的tryAcquireShared(xx)方法不同的是,CountDownLatch里的Sync并沒(méi)有修改state的值岛心,僅僅只是判斷state?=0進(jìn)而做具體的操作而已来破。
由此可知:CountDownLatch 是基于AQS的共享模式。
CountDownLatch 倒數(shù)計(jì)數(shù)
既然調(diào)用await(xx)可能會(huì)使得線程阻塞等待忘古,那么勢(shì)必有其它線程喚醒它徘禁,調(diào)用的方法即是countDown():
#CountDownLatch.java
public void countDown() {
sync.releaseShared(1);
}
#AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
//子類實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
//AQS里實(shí)現(xiàn),喚醒阻塞的線程
doReleaseShared();
return true;
}
return false;
}
同樣的存皂,具體的操作state在tryReleaseShared(xx)里實(shí)現(xiàn):
#CountDownLatch.java
protected boolean tryReleaseShared(int releases) {
for (;;) {
//獲取state
int c = getState();
//若當(dāng)前state==0晌坤,說(shuō)明已經(jīng)沒(méi)有可以釋放的了
if (c == 0)
return false;
int nextc = c-1;
//CAS修改
if (compareAndSetState(c, nextc))
//說(shuō)明可以喚醒其它線程了
return nextc == 0;
}
}
也即是說(shuō)逢艘,當(dāng)線程調(diào)用await(xx)阻塞后,其它線程通過(guò)countDown()修改state值骤菠,若是發(fā)現(xiàn)state最終變?yōu)?了它改,那么喚醒阻塞的線程。
用圖表示CountDownLatch主要結(jié)構(gòu)如下:
CountDownLatch 與其它AQS子類封裝器的區(qū)別
前面已經(jīng)分析了基于AQS的封裝器:ReentrantLock商乎、ReentrantReadWriteLock央拖、Semaphore,它們對(duì)state值的修改包括增加與減少鹉戚,而CountDownLatch 只是減小state的值鲜戒,用以實(shí)現(xiàn)倒數(shù)計(jì)數(shù)的功能。
可類比場(chǎng)景如下:
1抹凳、田徑運(yùn)動(dòng)場(chǎng)開(kāi)始百米賽跑遏餐。
2、運(yùn)動(dòng)員在跑道上各就各位(多個(gè)線程調(diào)用await 阻塞等待)赢底。
3失都、裁判喊倒數(shù)3、2幸冻、1(線程調(diào)用countDown)粹庞。
4、等待倒數(shù)結(jié)束洽损,發(fā)令槍響庞溜,運(yùn)動(dòng)員就開(kāi)始跑(線程被喚醒,繼續(xù)做事)碑定。
可以看出流码,運(yùn)動(dòng)員不會(huì)去干涉裁判的倒數(shù)(修改state值)。
3不傅、CyclicBarrier 原理分析
場(chǎng)景引入
在CountDownLatch 場(chǎng)景里說(shuō)到運(yùn)動(dòng)員需要裁判旅掂,想想可以不需要裁判嗎赏胚?運(yùn)動(dòng)員之間自發(fā)倒數(shù)访娶,倒數(shù)結(jié)束就一起跑。
更普遍的場(chǎng)景是:
1觉阅、幾個(gè)驢友想去某個(gè)景點(diǎn)旅游崖疤,約定了在某個(gè)地方集合后再一起出發(fā)。
2典勇、每個(gè)驢友到達(dá)集合點(diǎn)時(shí)打卡并看人都到齊了沒(méi)劫哼,沒(méi)到齊則等待。
3割笙、若最后一個(gè)參與者過(guò)來(lái)后發(fā)現(xiàn)人到齊了权烧,于是告訴大家不用等了眯亦,出發(fā)吧。
CyclicBarrier 可滿足該場(chǎng)景的需求般码。
CyclicBarrier 構(gòu)造
#CyclicBarrier.java
public CyclicBarrier(int parties, Runnable barrierAction) {
//必須要有參與者
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
//臨時(shí)變量count
this.count = parties;
//參與者都到達(dá)了后執(zhí)行的動(dòng)作
this.barrierCommand = barrierAction;
}
可以看出妻率,此處并沒(méi)有AQS介入,也就是沒(méi)有直接修改state板祝。
CyclicBarrier是通過(guò)ReentrantLock + Condition 來(lái)實(shí)現(xiàn)線程間同步的:
#CyclicBarrier.java
//獨(dú)占鎖宫静,為了互斥修改count
private final ReentrantLock lock = new ReentrantLock();
//線程等待條件
private final Condition trip = lock.newCondition();
//修改的共享變量
private int count;
CyclicBarrier 等待參與者
接著來(lái)分析,如何實(shí)現(xiàn)線程間的同步的券时。
#CyclicBarrier.java
public int await() throws InterruptedException, BrokenBarrierException {
try {
//實(shí)際調(diào)用doWait()孤里,此處是不限時(shí)等待
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//先鎖住
lock.lock();
try {
final Generation g = generation;
//等待過(guò)程被中斷
if (g.broken)
throw new BrokenBarrierException();
//中斷了線程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//等待個(gè)數(shù)-1
int index = --count;
if (index == 0) {
//都到齊了,無(wú)需等待了
boolean ranAction = false;
try {
//執(zhí)行既定的方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//開(kāi)始下一輪
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//走到這橘洞,說(shuō)明還需要等待
for (;;) {
try {
if (!timed)
//不限時(shí)等待
trip.await();
else if (nanos > 0L)
//限時(shí)等待捌袜,時(shí)間到了就返回
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//等待過(guò)程被中斷,則拋出異常
if (g == generation && ! g.broken) {
//不等了炸枣,喚醒其它線程
breakBarrier();
throw ie;
} else {
...
Thread.currentThread().interrupt();
}
}
//醒來(lái)后發(fā)現(xiàn)已經(jīng)被中斷了琢蛤,則直接拋出異常
if (g.broken)
throw new BrokenBarrierException();
//已經(jīng)開(kāi)啟了下一輪,說(shuō)明前面一輪都到齊了結(jié)束了
if (g != generation)
return index;
if (timed && nanos <= 0L) {
//超時(shí)了還是沒(méi)到齊抛虏,不等了博其,喚醒其它線程
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
線程先獲取獨(dú)占鎖,然后修改count值迂猴,若發(fā)現(xiàn)修改后count !=0慕淡,那么還需要等待,等待借助的是Condition.await(xx)方法沸毁。
有等待峰髓,自然有喚醒的地方:
#CyclicBarrier.java
private void breakBarrier() {
//置為true,表示已經(jīng)結(jié)束等待了
generation.broken = true;
//重置count息尺,復(fù)用的關(guān)鍵
count = parties;
//喚醒其它在等待的線程
trip.signalAll();
}
用圖表示携兵,等待/喚醒過(guò)程如下:
來(lái)看看CyclicBarrier 主要方法:
CyclicBarrier與CountDownLatch 區(qū)別
看到這,你也許已經(jīng)發(fā)現(xiàn)了CyclicBarrier 和CountDownLatch 實(shí)現(xiàn)的功能很相似搂誉,都是等待某個(gè)條件滿足后再進(jìn)行下一步的動(dòng)作徐紧,兩者不同之處在于:
1、CountDownLatch 參與的線程分為兩類:一個(gè)是等待者炭懊,另一個(gè)是計(jì)數(shù)者并级;CyclicBarrier 參與的線程既是等待者,也是計(jì)數(shù)者侮腹。
2嘲碧、CountDownLatch 完成一次完整的協(xié)作過(guò)程后不能再?gòu)?fù)用,CountDownLatch 可以復(fù)用(不用重新新建CountDownLatch 對(duì)象)父阻。
3愈涩、CountDownLatch 的計(jì)數(shù)值與線程個(gè)數(shù)沒(méi)有必然聯(lián)系望抽,CyclicBarrier 的初始計(jì)數(shù)值與線程個(gè)數(shù)一致。
4履婉、CountDownLatch 基于AQS實(shí)現(xiàn)糠聪,CyclicBarrier 基于ReentrantLock&Condition實(shí)現(xiàn)(內(nèi)部也是基于AQS)。
你可能還有疑惑谐鼎,在下一篇應(yīng)用篇將會(huì)重點(diǎn)體現(xiàn)兩者區(qū)別舰蟆。
下篇文章重點(diǎn)分析:Semaphore/CountDownLatch/CyclicBarrier 實(shí)際應(yīng)用。
本文基于jdk1.8狸棍。