- CountDownLatch
- CyclicBarrier
- Semaphore
CountDownLatch
1. CountDownLatch 的使用
private void countDownTest() {
// 1. 首先我們聲明一個(gè)CountDownLatch實(shí)例胜宇,參數(shù)為我們需要同步的線程個(gè)數(shù)
final CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Log.i(TAG, "run: Thread A run");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 2. 在操作完畢后通知
countDownLatch.countDown();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
Log.i(TAG, "run: Thread B run");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 2. 操作完畢后通知
countDownLatch.countDown();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "run: Thread B run next");
}
}
});
try {
// 3. 在需要同步的線程進(jìn)行等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "countDownTest: main---- run");
executorService.shutdown();
}
我們看下輸出的日志情況
ph_MainActivity: run: Thread A run
ph_MainActivity: run: Thread B run
ph_MainActivity: countDownTest: main---- run
ph_MainActivity: run: Thread B run next
從使用的方法及結(jié)果我們可以看到颅夺,CountDownLatch 可以實(shí)現(xiàn)join 的功能契耿,但是比join更靈活奋早,可以結(jié)合線程池使用默责;并且可以在線程執(zhí)行的任何時(shí)刻進(jìn)行同步过牙,不是必須在任務(wù)結(jié)束時(shí)
2. CountDownLaunch 原理解析
從UML圖中我們得知其使用的AQS實(shí)現(xiàn)的莺丑。
- 構(gòu)造方法
// count 是線程在通過之前必須被調(diào)用的countDown的次數(shù)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
- await :當(dāng)線程調(diào)用await 方法后線程會(huì)被阻塞谴返,當(dāng)其他線程調(diào)用了相應(yīng)次數(shù)的countdown 方法,計(jì)數(shù)器的state 的值為0 時(shí)命贴;或者其他線程調(diào)用了本線程的intrrupt 方法后 會(huì)拋出異常放回
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
那不Sync 中沒有實(shí)現(xiàn)acquireSharedInterruptibly,我們?cè)贏QS中看下
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 如果獲取失敗則進(jìn)入阻塞隊(duì)列
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared 在Sync 中有實(shí)現(xiàn)
// 如果當(dāng)前同步器的狀態(tài) 為0 的話道宅,表示可獲得鎖,否則進(jìn)入阻塞隊(duì)列
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
- await(long timeout, TimeUnit unit) 與await 方法類似胸蛛,只不過當(dāng)超時(shí)會(huì)返回false 而結(jié)束等待
- countDown:調(diào)用該方法后計(jì)數(shù)器值會(huì)遞減污茵,遞減后如果計(jì)數(shù)器值為0則喚醒所有因調(diào)用await 方法二阻塞的線程。
public void countDown() {
// 委托Sync 調(diào)用AQS方法
sync.releaseShared(1);
}
// 共享模式下的釋放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 這個(gè)方法就是線程在獲得鎖時(shí)葬项,喚醒后續(xù)節(jié)點(diǎn)時(shí)調(diào)用的方法
doReleaseShared();
return true;
}
return false;
}
釋放鎖主要是在tryReleaseShared 中做的泞当,在Sync 中有實(shí)現(xiàn)
// 對(duì) state 進(jìn)行遞減,直到 state 變成 0民珍;
// state 遞減為 0 時(shí)襟士,返回 true盗飒,其余返回 false
protected boolean tryReleaseShared(int releases) {
// 自旋保證 CAS 一定可以成功
for (;;) {
int c = getState();
// state 已經(jīng)是 0 了,直接返回 false
if (c == 0)
return false;
// 對(duì) state 進(jìn)行遞減
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
我們可以看到CountDownLaunch 主要使用了AQS實(shí)現(xiàn)陋桂,主要通過重寫 tryAcquireShared 和 tryReleaseShared 方法進(jìn)行了控制箩兽。
CyclicBarrier
1. CyclicBarrier 使用
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: cyclicBarrier over!");
}
});
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Log.i(TAG, "run: A =====1 ");
cyclicBarrier.await();
Log.i(TAG, "run: A =====2 ");
cyclicBarrier.await();
Log.i(TAG, "run: A =====3 ");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Log.i(TAG, "run: B =====1 ");
cyclicBarrier.await();
Log.i(TAG, "run: B =====2 ");
cyclicBarrier.await();
Log.i(TAG, "run: B =====3 ");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
運(yùn)行結(jié)果:
ph_MainActivity: run: A =====1
ph_MainActivity: run: B =====1
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: B =====2
ph_MainActivity: run: A =====2
ph_MainActivity: run: cyclicBarrier over!
ph_MainActivity: run: A =====3
ph_MainActivity: run: B =====3
CyclicBarrier 使多個(gè)線程相互等待,假如計(jì)數(shù)器為n章喉,前n-1個(gè)線程都會(huì)因?yàn)榈竭_(dá)屏障而被阻塞,當(dāng)?shù)趎個(gè)線程調(diào)用await 后身坐,計(jì)數(shù)器的值為0了秸脱,這時(shí)候會(huì)發(fā)通知喚醒前n-1個(gè)線程。并且CyclicBarrier 是可以復(fù)用的部蛇,可以定制突破屏障后的操作
2. CyclicBarrier 實(shí)現(xiàn)
CyclicBarrier是基于獨(dú)占鎖實(shí)現(xiàn)的摊唇,底層還是基于AQS。
parties:用于記錄多少個(gè)線程調(diào)用await 才會(huì)沖破屏障的個(gè)數(shù)涯鲁,即我們初始化傳入的值
count:開始為parties的值巷查,當(dāng)調(diào)用一次await 后就-1,當(dāng)為0時(shí)到達(dá)屏障調(diào)用await的線程結(jié)束等待抹腿,隨后便會(huì)恢復(fù)為parties 的值用來復(fù)用岛请。
初始化方法:只是進(jìn)行簡單的賦值
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- await()方法:當(dāng)線程調(diào)用該方法后會(huì)進(jìn)行阻塞,直到滿足一下某個(gè)條件才會(huì)繼續(xù)執(zhí)行:parties 為0警绩,即都到了屏障點(diǎn)崇败;其他線程調(diào)用了本線程的interrupt方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 獲取鎖并上鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//如果屏障被打破則拋出BrokenBarrierException異常,在調(diào)用breakBarrier 方法時(shí)會(huì)被打破
if (g.broken)
throw new BrokenBarrierException();
// 如果線程被interrupt 則打破屏障并拋出異常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count 進(jìn)行減1操作
int index = --count;
// 如果為0肩祥,即所有的線程都到達(dá)了屏障點(diǎn)
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果設(shè)置的破除屏障點(diǎn)后需要執(zhí)行的任務(wù)不為空則執(zhí)行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 喚醒所有的線程并重置
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 循環(huán)進(jìn)行等待后室,直到被喚醒、打破混狠,或者超時(shí)岸霹?TODO 為什么使用循環(huán)?将饺?贡避?
for (;;) {
try {
// 如果沒有設(shè)置超時(shí),則調(diào)用await方法直接進(jìn)行等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 喚醒所有的線程
trip.signalAll();
// 重置屏障參數(shù)
count = parties;
generation = new Generation();
}
- await(timeout, unit):與await 類似予弧,只不過當(dāng)超時(shí)后會(huì)拋出TimeOutException 返回
Semaphore
1. Semaphore 使用方法
final Semaphore semaphore = new Semaphore(0);
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: A===");
semaphore.release();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: B===");
semaphore.release();
}
});
semaphore.acquire(2);
Log.i(TAG, "semaphT: 1=======end");
executorService.execute(new Runnable() {
@Override
public void run() {
Log.i(TAG, "run: C===");
semaphore.release();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.i(TAG, "run: D===");
semaphore.release();
}
});
semaphore.tryAcquire(2,1000,TimeUnit.MILLISECONDS);
Log.i(TAG, "semaphT: 2=======end");
輸出結(jié)果:
ph_MainActivity: run: B===
ph_MainActivity: run: A===
ph_MainActivity: semaphT: 1=======end
ph_MainActivity: run: C===
ph_MainActivity: semaphT: 2=======end
ph_MainActivity: run: D===
Semaphore 和CyclicBarrier 類似可以重復(fù)使用
2. SemaphoreUML 圖
Semaphore 的源碼我們就不再分析了如果感興趣可以去看一下