簡介
JDK中提供了一些用于線程之間協(xié)同等待的工具類饵隙,CountDownLatch和CyclicBarrier就是最典型的兩個線程同步輔助類。
<b>CountDownLatch :</b>一個同步輔助類刚梭,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待
<b>CyclicBarrier :</b>一個同步輔助類,它允許一組線程互相等待梭域,直到到達(dá)某個公共屏障點 (common barrier point)酥夭。
二者功能實現(xiàn)的區(qū)別:
1.CountDownLatch是一次性的赐纱,CyclicBarrier可以重設(shè)置。
2.CountDownLatch強(qiáng)調(diào)一個線程等多個線程完成某件事情采郎。CyclicBarrier是多個線程互等千所,等大家都完成。
3.CyclicBarrier有g(shù)etNumberWaiting接口返回被阻塞的線程數(shù)
二者功能介紹:
<h4>使用CountDownLatch實現(xiàn):</h4>
1蒜埋、5個運動員相繼都準(zhǔn)備就緒
2淫痰、教練員響起發(fā)令槍
3、運動員起跑
流程圖:
<pre>
Paste_Image.png
</pre>
demo code:
<pre>
public class TestCountDownLatch {
private static final int RUNNER_NUMBER = 5; // 運動員個數(shù)
private static final Random RANDOM = new Random();
public static void main(String[] args) {
// 用于判斷發(fā)令之前運動員是否已經(jīng)完全進(jìn)入準(zhǔn)備狀態(tài)整份,需要等待5個運動員待错,所以參數(shù)為5
CountDownLatch readyLatch = new CountDownLatch(RUNNER_NUMBER);
// 用于判斷裁判是否已經(jīng)發(fā)令,只需要等待一個裁判烈评,所以參數(shù)為1
CountDownLatch startLatch = new CountDownLatch(1);
for (int i = 0; i < RUNNER_NUMBER; i++) {
Thread t = new Thread(new Runner((i + 1) + "號運動員", readyLatch, startLatch));
t.start();
}
try {
readyLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("裁判:所有運動員準(zhǔn)備完畢火俄,開始...");
startLatch.countDown();
}
static class Runner implements Runnable {
private CountDownLatch readyLatch;
private CountDownLatch startLatch;
private String name;
public Runner(String name, CountDownLatch readyLatch, CountDownLatch startLatch) {
this.name = name;
this.readyLatch = readyLatch;
this.startLatch = startLatch;
}
public void run() {
int readyTime = RANDOM.nextInt(1000);
try {
Thread.sleep(readyTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":我已經(jīng)準(zhǔn)備完畢.");
readyLatch.countDown();
try {
startLatch.await(); // 等待裁判發(fā)開始命令
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + ":開跑...");
}
}
}
---------------------------輸出----------------------------------------------------------
3號運動員:我已經(jīng)準(zhǔn)備完畢.
1號運動員:我已經(jīng)準(zhǔn)備完畢.
4號運動員:我已經(jīng)準(zhǔn)備完畢.
5號運動員:我已經(jīng)準(zhǔn)備完畢.
2號運動員:我已經(jīng)準(zhǔn)備完畢.
裁判:所有運動員準(zhǔn)備完畢,開始...
3號運動員:開跑...
4號運動員:開跑...
1號運動員:開跑...
2號運動員:開跑...
5號運動員:開跑...
</pre>
<h4>使用CyclicBarrier模擬實現(xiàn):</h4>
1讲冠、前端調(diào)用restful api,已知商品id以后,調(diào)用后端商品起價接口瓜客、商品圖片信息接口
2、調(diào)用后端商品起價接口竿开、商品圖片信息接口
3谱仪、在匯總模塊中將多個接口的值拼接組合返回給前端
流程圖:
<pre>
Paste_Image.png
</pre>
demo code:
<pre>
public class TestCyclicBarrier {
private static final int THREAD_NUMBER = 2;
private static final Random RANDOM = new Random();
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(THREAD_NUMBER, new Runnable() {
public void run() {
System.out.println("2個接口都調(diào)用完成進(jìn)行后續(xù)處理。否彩。疯攒。");
}
});
for (int i = 0; i < THREAD_NUMBER; i++) {
Thread t = new Thread(new Worker(barrier,i));
t.start();
}
}
static class Worker implements Runnable {
private CyclicBarrier barrier;
private int apiIndex;
public Worker(CyclicBarrier barrier,int apiIndex) {
this.barrier = barrier;
this.apiIndex = apiIndex;
}
public void run() {
int time = RANDOM.nextInt(1000);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接口" + apiIndex + "調(diào)用完成");
try {
barrier.await(); // 等待所有線程都調(diào)用過此函數(shù)才能進(jìn)行后續(xù)動作
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
---------------------------輸出----------------------------------------------------------
接口0調(diào)用完成
接口1調(diào)用完成
2個接口都調(diào)用完成進(jìn)行后續(xù)處理。列荔。敬尺。
</pre>
<h4>CountDownLatch源碼:</h4>
CountDownLatch.countDown:
<pre>
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //嘗試將初始化的state減運算
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed枚尼,只執(zhí)行這里,中間一大段都會被跳過
break;
}
}
</pre>
CountDownLatch.await:
<pre>
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //state狀態(tài)不為0.既
doAcquireSharedInterruptibly(arg);
}
Node節(jié)點的waitStatus取值:
static final int CANCELLED = 1; //節(jié)點因為超時或者中斷被取消砂吞。該狀態(tài)不會再發(fā)生變署恍,而且被取消節(jié)點對應(yīng)的線程不會再發(fā)生阻塞。
static final int SIGNAL = -1; //后繼節(jié)點將被或者已經(jīng)被阻塞呜舒,所以當(dāng)前節(jié)點在釋放或者取消時锭汛,需要unpark它的后繼節(jié)點。
static final int CONDITION = -2; //該狀態(tài)僅供在條件隊列中的節(jié)點使用袭蝗。當(dāng)該節(jié)點轉(zhuǎn)移到同步隊列中時唤殴,該狀態(tài)將被設(shè)置為0。
static final int PROPAGATE = -3; //僅在共享模式下使用到腥。在doReleaseShared()方法中朵逝,僅僅會設(shè)置頭節(jié)點的狀態(tài)為PROPAGATE。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //共享的形式
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); //前一個節(jié)點
if (p == head) {
int r = tryAcquireShared(arg); //判斷state值
if (r >= 0) {
setHeadAndPropagate(node, r); //設(shè)置頭結(jié)點為node,且最終設(shè)置頭結(jié)點的waitStatus為Node.PROPAGATE且喚醒node后面的狀態(tài)<0的某個結(jié)點(如果有的話)
p.next = null; // help GC
failed = false;
return; //結(jié)束阻塞
}
}
if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 判斷前一個結(jié)點的狀態(tài)來確定結(jié)點是否應(yīng)該被阻塞
parkAndCheckInterrupt()) //阻塞且判斷是否被中斷乡范,如果被中斷則拋異常中斷
throw new InterruptedException();
}
} finally {
if (failed) //異常的話acquire失敗配名,在阻塞隊列中取消node,如果node為頭結(jié)點的話,且喚醒node后面的狀態(tài)<0的某個結(jié)點(如果有的話)
cancelAcquire(node);
}
}
</pre>
<h4>CyclicBarrier源碼:</h4>
CyclicBarrier:因為這個類代碼量比較少全局分析一下:
<pre>
public class CyclicBarrier {
private static class Generation {
boolean broken = false; //當(dāng)前代被中止
}
private final ReentrantLock lock = new ReentrantLock(); //lock
private final Condition trip = lock.newCondition();
private final int parties; //屏障需要攔截的任務(wù)數(shù)
private final Runnable barrierCommand; //線程都執(zhí)行結(jié)束到這個屏障以后執(zhí)行執(zhí)行的任務(wù)
private Generation generation = new Generation(); //當(dāng)前代,主要是為了reset的時候預(yù)留
private int count; //用于統(tǒng)計剩余任務(wù)數(shù)晋辆,初始化的時候=parties渠脉。為0的時候結(jié)束釋放鎖,lock.unlock();
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //通知所有被阻塞的線程
// set up next generation
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true; //中斷
count = parties; //count被重置
trip.signalAll();
}
private int dowait(boolean timed, long nanos) //可以定義是否帶超時的等待
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken) //屏障被中斷
throw new BrokenBarrierException();
if (Thread.interrupted()) { //當(dāng)前線程被中斷
breakBarrier(); //喚醒所有
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //如果barrierCommand不為空的話執(zhí)行任務(wù)瓶佳,不是start重啟一個線程
ranAction = true;
nextGeneration(); //notice all 且更新?lián)Q代
return 0;
} finally {
if (!ranAction)
breakBarrier(); //如果執(zhí)行barrierCommand失敗的話芋膘,喚醒所有wait的線程
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;)
try {
if (!timed) //如果沒有設(shè)置wait時間話
trip.await(); //阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); //設(shè)置等待時間
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) { //未中斷
breakBarrier();//則重新中斷
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) { //初始化
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) { //只有線程數(shù),沒有task霸饲,表示到達(dá)barrier后什么也不做
this(parties, null);
}
public int getParties() {
return parties;
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit) //設(shè)置阻塞的最大時間
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public boolean isBroken() { //返回屏障的阻塞狀態(tài)为朋,感覺用ReentrantReadWriteLock 的readLock效率更高
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() { //重置
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;//返回被阻塞的線程數(shù)
} finally {
lock.unlock();
}
}
}
</pre>
<b>思考:</b>
1、CountDownLatch和CyclicBarrier都是通過設(shè)置的阻塞任務(wù)數(shù)減操作厚脉,而不是網(wǎng)上有些人說的CyclicBarrier就是通過count加知道任務(wù)數(shù)(JDK 1.7)
2习寸、CountDownLatch更多是直接結(jié)合AQS來做阻塞使用的是共享鎖,而CyclicBarrier是直接用ReentrantLock來實現(xiàn)使用的是排它鎖傻工,雖然ReentrantLock也是使用了AQS來實現(xiàn)霞溪,
3、CyclicBarrier的代碼結(jié)構(gòu)看起來更簡單清晰中捆,CountDownLatch用的很多基礎(chǔ)的AQS的方法
4威鹿、CountDownLatch偏向于計數(shù),CyclicBarrier可以指定柵欄結(jié)束以后的任務(wù)也方便重置當(dāng)前柵欄