目錄
CountDownLatch原理剖析
日常開發(fā)中經(jīng)常遇到一個線程需要等待一些線程都結(jié)束后才能繼續(xù)向下運行的場景,在CountDownLatch出現(xiàn)之前通常使用join方法來實現(xiàn)缚窿,但join方法不夠靈活,所以開發(fā)了CountDownLatch在张。
示例
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 添加任務(wù)
executorService.execute(new Runnable() {
@Override
public void run() {
try {
// 模擬運行時間
Thread.sleep(1000);
System.out.println("thread one over...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 遞減計數(shù)器
countDownLatch.countDown();
}
}
});
// 同上
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("thread two over...");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
});
System.out.println("wait all child thread over!");
// 阻塞直到被interrupt或計數(shù)器遞減至0
countDownLatch.await();
System.out.println("all child thread over!");
executorService.shutdown();
}
輸出為:
wait all child thread over!
thread one over...
thread two over...
all child thread over!
CountDownLatch相對于join方法的優(yōu)點大致有兩點:
- 調(diào)用一個子線程的join方法后斤蔓,該線程會一直阻塞直到子線程運行完畢订框,而CountDownLatch允許子線程運行完畢或在運行過程中遞減計數(shù)器,也就是說await方法不一定要等到子線程運行結(jié)束才返回坯台。
- 使用線程池來管理線程一般都是直接添加Runnable到線程池聂沙,這時就沒有辦法再調(diào)用線程的join方法了秆麸,而仍可在子線程中遞減計數(shù)器,也就是說CountDownLatch相比join方法可以更靈活地控制線程的同步及汉。
類圖結(jié)構(gòu)
由圖可知蛔屹,CountDownLatch是基于AQS實現(xiàn)的。
由下面的代碼可知豁生,CountDownLatch的計數(shù)器值就是AQS的state值。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
源碼解析
void await()
當(dāng)線程調(diào)用CountDownLatch的await方法后漫贞,當(dāng)前線程會被阻塞甸箱,直到CountDownLatch的計數(shù)器值遞減至0或者其他線程調(diào)用了當(dāng)前線程的interrupt方法。
public void await() throws InterruptedException {
// 允許中斷(中斷時拋出異常)
sync.acquireSharedInterruptibly(1);
}
// AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// state=0時tryAcquireShared方法返回1迅脐,直接返回
// 否則執(zhí)行doAcquireSharedInterruptibly方法
if (tryAcquireShared(arg) < 0)
// state不為0芍殖,調(diào)用該方法使await方法阻塞
doAcquireSharedInterruptibly(arg);
}
// Sync的方法(重寫了AQS中的該方法)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// AQS的方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 獲取state值,state=0時r=1谴蔑,直接返回豌骏,不再阻塞
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 若state不為0則阻塞調(diào)用await方法的線程
// 等到其他線程執(zhí)行countDown方法使計數(shù)器遞減至0
// (state變?yōu)?)或該線程被interrupt時
// 該線程才能繼續(xù)向下運行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean await(long timeout, TimeUnit unit)
相較于上面的await方法,調(diào)用此方法后調(diào)用線程最多被阻塞timeout時間(單位由unit指定)隐锭,即使計數(shù)器沒有遞減至0或調(diào)用線程沒有被interrupt窃躲,調(diào)用線程也會繼續(xù)向下運行。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
void countDown()
遞減計數(shù)器钦睡,當(dāng)計數(shù)器的值為0(即state=0)時會喚醒所有因調(diào)用await方法而被阻塞的線程蒂窒。
public void countDown() {
// 將計數(shù)器減1
sync.releaseShared(1);
}
// AQS的方法
public final boolean releaseShared(int arg) {
// 當(dāng)state被遞減至0時tryReleaseShared返回true
// 會執(zhí)行doReleaseShared方法喚醒因調(diào)用await方法阻塞的線程
// 否則如果state不是0的話什么也不做
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync重寫的AQS中的方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
// 如果state已經(jīng)為0,沒有遞減必要,直接返回
// 否則會使state變成負(fù)數(shù)
if (c == 0)
return false;
int nextc = c-1;
// 通過CAS遞減state的值
if (compareAndSetState(c, nextc))
// 如果state被遞減至0洒琢,返回true以進(jìn)行后續(xù)喚醒工作
return nextc == 0;
}
}
CyclicBarrier原理探究
CountDownLatch的計數(shù)器時一次性的秧秉,也就是說當(dāng)計數(shù)器至變?yōu)?后,再調(diào)用await和countDown方法會直接返回衰抑。而CyclicBarrier則解決了此問題象迎。CyclicBarrier是回環(huán)屏障的意思,它可以使一組線程全部達(dá)到一個狀態(tài)后再全部同時執(zhí)行呛踊,然后重置自身狀態(tài)又可用于下一次的狀態(tài)同步砾淌。
示例
假設(shè)一個任務(wù)由階段1、階段2恋技、階段3組成拇舀,每個線程要串行地執(zhí)行階段1、階段2蜻底、階段3骄崩,當(dāng)多個線程執(zhí)行該任務(wù)時,必須要保證所有線程的階段1都執(zhí)行完畢后才能進(jìn)入階段2薄辅,當(dāng)所有線程的階段2都執(zhí)行完畢后才能進(jìn)入階段3要拂,可用下面的代碼實現(xiàn):
public static void main(String[] args) {
// 等待兩個線程同步
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 運行兩個子線程,當(dāng)兩個子線程的step1都執(zhí)行完畢后才會執(zhí)行step2
// 當(dāng)兩個子線程的step2都執(zhí)行完畢后才會執(zhí)行step3
for(int i = 0; i < 2; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try{
System.out.println(Thread.currentThread() + " step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + " step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread() + " step3");
}catch (Exception e){
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
輸出如下:
Thread[pool-1-thread-1,5,main] step1
Thread[pool-1-thread-2,5,main] step1
Thread[pool-1-thread-1,5,main] step2
Thread[pool-1-thread-2,5,main] step2
Thread[pool-1-thread-2,5,main] step3
Thread[pool-1-thread-1,5,main] step3
類圖結(jié)構(gòu)
CyclicBarrier基于ReentrantLock實現(xiàn)站楚,本質(zhì)上還是基于AQS的脱惰。parties用于記錄線程個數(shù),表示多少個線程調(diào)用await方法后窿春,所有線程才會沖破屏障往下運行拉一。count一開始等于parties,當(dāng)由線程調(diào)用await方法時會遞減1旧乞,當(dāng)count變成0時到達(dá)屏障點蔚润,所有調(diào)用await的線程會一起往下執(zhí)行,此時要重置CyclicBarrier尺栖,再次令count=parties嫡纠。
lock用于保證更新計數(shù)器count的原子性。lock的條件變量trip用于支持線程間使用await和signalAll進(jìn)行通信延赌。
以下是CyclicBarrier的構(gòu)造函數(shù):
public CyclicBarrier(int parties) {
this(parties, null);
}
// barrierAction為達(dá)到屏障點(parties個線程調(diào)用了await方法)時執(zhí)行的任務(wù)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
Generation的定義如下:
private static class Generation {
// 記錄當(dāng)前屏障是否可以被打破
boolean broken = false;
}
源碼分析
int await()
當(dāng)前線程調(diào)用該方法時會阻塞除盏,直到滿足以下條件之一才會返回:
- parties個線程調(diào)用了await方法,也就是到達(dá)屏障點
- 其他線程調(diào)用了當(dāng)前線程的interrupt方法
- Generation對象的broken標(biāo)志被設(shè)置為true挫以,拋出BrokenBarrierExecption
public int await() throws InterruptedException, BrokenBarrierException {
try {
// false表示不設(shè)置超時時間者蠕,此時后面參數(shù)無意義
// dowait稍后具體分析
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
boolean await(long timeout, TimeUnit unit)
相比于await(),等待超時會返回false掐松。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
// 設(shè)置了超時時間
// dowait稍后分析
return dowait(true, unit.toNanos(timeout));
}
int dowait(boolean timed, long nanos)
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
// 屏障已被打破則拋出異常
if (g.broken)
throw new BrokenBarrierException();
// 線程中斷則拋出異常
if (Thread.interrupted()) {
// 打破屏障
// 會做三件事
// 1. 設(shè)置generation的broken為true
// 2. 重置count為parites
// 3. 調(diào)用signalAll激活所有等待線程
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 到達(dá)了屏障點
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 執(zhí)行每一次到達(dá)屏障點所需要執(zhí)行的任務(wù)
command.run();
ranAction = true;
// 重置狀態(tài)蠢棱,進(jìn)入下一次屏障
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 如果index不為0
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 執(zhí)行此處時锌杀,有可能其他線程已經(jīng)調(diào)用了nextGeneration方法
// 此時應(yīng)該使當(dāng)前線程正常執(zhí)行下去
// 否則打破屏障
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// 如果此次屏障已經(jīng)結(jié)束,則正常返回
if (g != generation)
return index;
// 如果是因為超時泻仙,則打破屏障并拋出異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 打破屏障
private void breakBarrier() {
// 設(shè)置打破標(biāo)志
generation.broken = true;
// 重置count
count = parties;
// 喚醒所有等待的線程
trip.signalAll();
}
private void nextGeneration() {
// 喚醒當(dāng)前屏障下所有被阻塞的線程
trip.signalAll();
// 重置狀態(tài)糕再,進(jìn)入下一次屏障
count = parties;
generation = new Generation();
}
Semaphore原理探究
Semaphore信號量也是一個同步器,與CountDownLatch和CyclicBarrier不同的是玉转,它內(nèi)部的計數(shù)器是遞增的突想,并且在初始化時可以指定計數(shù)器的初始值(通常為0),但不必知道需要同步的線程個數(shù)究抓,而是在需要同步的地方調(diào)用acquire方法時指定需要同步的線程個數(shù)猾担。
示例
public static void main(String[] args) throws InterruptedException {
final int THREAD_COUNT = 2;
// 初始信號量為0
Semaphore semaphore = new Semaphore(0);
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++){
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread() + " over");
// 信號量+1
semaphore.release();
}
});
}
// 當(dāng)信號量達(dá)到2時才停止阻塞
semaphore.acquire(2);
System.out.println("all child thread over!");
executorService.shutdown();
}
類圖結(jié)構(gòu)
由圖可知,Semaphore還是使用AQS實現(xiàn)的刺下,并且可以選取公平性策略(默認(rèn)為非公平性的)绑嘹。
源碼解析
void acquire()
表示當(dāng)前線程希望獲取一個信號量資源,如果當(dāng)前信號量大于0橘茉,則當(dāng)前信號量的計數(shù)減1工腋,然后該方法直接返回。否則如果當(dāng)前信號量等于0畅卓,則被阻塞擅腰。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 可以被中斷
if (Thread.interrupted())
throw new InterruptedException();
// 調(diào)用Sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)決定公平策略
if (tryAcquireShared(arg) < 0)
// 將當(dāng)前線程放入阻塞隊列翁潘,然后再次嘗試
// 如果失敗則掛起當(dāng)前線程
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared由Sync的子類實現(xiàn)以根據(jù)公平性采取相應(yīng)的行為趁冈。
以下是非公平策略NofairSync的實現(xiàn):
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果剩余信號量小于0直接返回
// 否則如果更新信號量成功則返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
假設(shè)線程A調(diào)用了acquire方法嘗試獲取信號量但因信號量不足被阻塞,這時線程B通過release增加了信號量拜马,此時線程C完全可以調(diào)用acquire方法成功獲取到信號量(如果信號量足夠的話)渗勘,這就是非公平性的體現(xiàn)。
下面是公平性的實現(xiàn):
protected int tryAcquireShared(int acquires) {
for (;;) {
// 關(guān)鍵在于先判斷AQS隊列中是否已經(jīng)有元素要獲取信號量
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
hasQueuedPredecessors方法(可參看第6章 Java并發(fā)包中鎖原理剖析)用于判斷當(dāng)前線程的前驅(qū)節(jié)點是否也在等待獲取該資源俩莽,如果是則自己放棄獲取的權(quán)限呀邢,然后當(dāng)前線程會被放入AQS中,否則嘗試去獲取豹绪。
void acquire(int permits)
可獲取多個信號量。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
void acquireUninterruptibly()
不對中斷進(jìn)行響應(yīng)申眼。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
void acquireUninterruptibly(int permits)
不對中斷進(jìn)行相應(yīng)并且可獲取多個信號量瞒津。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
void release()
使信號量加1,如果當(dāng)前有線程因為調(diào)用acquire方法被阻塞而被放入AQS中的話括尸,會根據(jù)公平性策略選擇一個信號量個數(shù)能被滿足的線程進(jìn)行激活巷蚪。
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 嘗試釋放資源(增加信號量)
if (tryReleaseShared(arg)) {
// 釋放資源成功則根據(jù)公平性策略喚醒AQS中阻塞的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
void release(int permits)
可增加多個信號量。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
更多
相關(guān)筆記:《Java并發(fā)編程之美》閱讀筆記