一、概述
CyclicBarrier吐葵,也是位于java.util.concurrent包下的一個并發(fā)同步工具類,光從字面上理解的意思是可循環(huán)使用(Cyclic)的屏障(Barrier)桥氏。
主要的作用是:可以讓一組線程到達一個屏障(也可以叫同步點)時被阻塞温峭,直到這組線程中的最后一個線程到達屏障時,屏障才會被移除字支,然后就可以使得所有被屏障攔截的線程能夠繼續(xù)往下干活凤藏。我們暫且把這個狀態(tài)就叫做barrier奸忽,當(dāng)調(diào)用await()方法之后,線程就處于barrier了揖庄。
二栗菜、應(yīng)用場景
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的應(yīng)用場景蹄梢。比如我們用一個Excel文件保存了用戶的所有銀行流水疙筹,然后每個sheet工作簿保存一個賬戶最近一年的每筆流水記錄,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水禁炒,那么就可以使用多線程處理每個sheet工作簿里的銀行流水而咆,全部執(zhí)行完之后,得到每個sheet的日均銀行流水幕袱,最后翘盖,再一起統(tǒng)計這些線程的計算結(jié)果,從而最終計算出整個Excel的日均銀行流水凹蜂。
三馍驯、使用方法
3.1、默認構(gòu)造器
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
其中玛痊,parties這個參數(shù)表示屏障攔截的線程數(shù)量绍绘,每個線程調(diào)用await()方法告訴CyclicBarrier我已經(jīng)到達了屏障,然后當(dāng)前線程被阻塞牡借,而barrierAction這個參數(shù)的作用則是:當(dāng)這些線程都到達了屏障時酵使,可以額外做某些事情的,其本身是傳入了一個Runnable類型的參數(shù)对省,然后可以在run()方法中可以額外做一些事情蝗拿。
3.2、重要方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
通常比較常用蒿涎,可以用于掛起一組線程哀托,直至所有的線程都到達屏障了再同時執(zhí)行后續(xù)任務(wù)。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
可以讓這些那些先到達屏障狀態(tài)的線程等待一定的時間劳秋,如果此時還有線程沒有到達屏障則直接讓已經(jīng)到達屏障的線程繼續(xù)執(zhí)行后續(xù)任務(wù)仓手。
四、代碼實例
- 假若有若干個線程都要進行寫數(shù)據(jù)操作玻淑,并且只有所有線程都完成寫數(shù)據(jù)操作之后嗽冒,這些線程才能繼續(xù)做后面的事情,此時就可以利用CyclicBarrier了补履。
package com.feizi.java.concurrency.tool;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by feizi on 2018/6/4.
*/
public class TestCyclicBarrier {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++){
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
try {
//模擬寫入耗時操作
Thread.sleep(5000);
System.out.println("線程" + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢添坊,等待其他線程寫入完畢...");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程寫入完畢,繼續(xù)處理其他任務(wù)...");
}
}
}
控制臺輸出結(jié)果:
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢箫锤,等待其他線程寫入完畢...
線程Thread-2寫入數(shù)據(jù)完畢贬蛙,等待其他線程寫入完畢...
線程Thread-3寫入數(shù)據(jù)完畢雨女,等待其他線程寫入完畢...
線程Thread-0寫入數(shù)據(jù)完畢,等待其他線程寫入完畢...
所有線程寫入完畢速客,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢戚篙,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢溺职,繼續(xù)處理其他任務(wù)...
Process finished with exit code 0
從上面輸出結(jié)果可以看到岔擂,每個線程執(zhí)行完畢之后,都在等待其他線程執(zhí)行完畢浪耘,當(dāng)所有線程都執(zhí)行完畢之后乱灵,則他們就可以繼續(xù)進行后續(xù)的任務(wù)了。
- 如果想在所有線程都到達屏障后七冲,還需要進行一些額外的操作痛倚,那么這個時候可以使用上面說的第二個構(gòu)造函數(shù)了,傳入一個Runnable參數(shù)進行處理澜躺。
package com.feizi.java.concurrency.tool;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by feizi on 2018/6/5.
*/
public class TestCyclicBarrierDemo {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("當(dāng)前線程" + Thread.currentThread().getName() + "被翻牌了...");
}
});
for (int i = 0; i < N; i++){
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
try {
//模擬耗時操作
Thread.sleep(5000);
System.out.println("線程" + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢...");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有線程寫入完畢蝉稳,繼續(xù)處理其他任務(wù)...");
}
}
}
控制臺輸出結(jié)果:
線程Thread-0正在寫入數(shù)據(jù)...
線程Thread-1正在寫入數(shù)據(jù)...
線程Thread-2正在寫入數(shù)據(jù)...
線程Thread-3正在寫入數(shù)據(jù)...
線程Thread-1寫入數(shù)據(jù)完畢...
線程Thread-2寫入數(shù)據(jù)完畢...
線程Thread-3寫入數(shù)據(jù)完畢...
線程Thread-0寫入數(shù)據(jù)完畢...
當(dāng)前線程Thread-0被翻牌了...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢掘鄙,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢耘戚,繼續(xù)處理其他任務(wù)...
所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Process finished with exit code 0
從上面的輸出結(jié)果我們可以看到操漠,當(dāng)4個線程都到達屏障后收津,CyclicBarrier會從這組線程中選擇一個線程(隨機選擇,跟CPU調(diào)度有關(guān)浊伙,誰被分配了時間片撞秋,就誰去執(zhí)行)去執(zhí)行Runnable中的內(nèi)容,也就是額外的操作了嚣鄙。我們看到上面選擇的是Thread-0
這個線程吻贿。
- 如果想讓那些先到達屏障狀態(tài)的線程等待一定的時間,然后如果此時還有線程沒有到達屏障則直接讓已經(jīng)到達屏障的線程繼續(xù)執(zhí)行后續(xù)任務(wù)拗慨。那么我們可以使用上面介紹的這個
public int await(long timeout, TimeUnit unit)
方法廓八。
package com.feizi.java.concurrency.tool;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Created by feizi on 2018/6/5.
*/
public class TestCyclicBarrierDemo1 {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++){
if(i < N -1){
new Writer(barrier).start();
}else {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Writer(barrier).start();
}
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("線程 " + Thread.currentThread().getName() + "正在寫入數(shù)據(jù)...");
try {
//模擬耗時操作
Thread.sleep(1000);
System.out.println("線程 " + Thread.currentThread().getName() + "寫入數(shù)據(jù)完畢, 等待其他線程寫入完畢...");
try {
barrier.await(5000, TimeUnit.MILLISECONDS);
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ", 所有線程寫入完畢赵抢,繼續(xù)處理其他任務(wù)...");
}
}
}
控制臺輸出結(jié)果:
線程 Thread-0正在寫入數(shù)據(jù)...
線程 Thread-2正在寫入數(shù)據(jù)...
線程 Thread-1正在寫入數(shù)據(jù)...
線程 Thread-0寫入數(shù)據(jù)完畢, 等待其他線程寫入完畢...
線程 Thread-2寫入數(shù)據(jù)完畢声功, 等待其他線程寫入完畢...
線程 Thread-1寫入數(shù)據(jù)完畢烦却, 等待其他線程寫入完畢...
線程 Thread-3正在寫入數(shù)據(jù)...
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.feizi.java.concurrency.tool.TestCyclicBarrierDemo1$Writer.run(TestCyclicBarrierDemo1.java:49)
java.util.concurrent.TimeoutException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:257)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.feizi.java.concurrency.tool.TestCyclicBarrierDemo1$Writer.run(TestCyclicBarrierDemo1.java:49)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:250)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.feizi.java.concurrency.tool.TestCyclicBarrierDemo1$Writer.run(TestCyclicBarrierDemo1.java:49)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:207)
at java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:435)
at com.feizi.java.concurrency.tool.TestCyclicBarrierDemo1$Writer.run(TestCyclicBarrierDemo1.java:49)
線程 Thread-3寫入數(shù)據(jù)完畢, 等待其他線程寫入完畢...
Thread-2, 所有線程寫入完畢先巴,繼續(xù)處理其他任務(wù)...
Thread-0, 所有線程寫入完畢其爵,繼續(xù)處理其他任務(wù)...
Thread-1, 所有線程寫入完畢冒冬,繼續(xù)處理其他任務(wù)...
Thread-3, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Process finished with exit code 0
從上面輸出結(jié)果摩渺,我們可以看到简烤,因為在main方法測試的時候,我們特意指定了讓最后一個線程延遲啟動摇幻,由于在前面三個線程都到達了屏障之后横侦,在等待了指定的時間之后發(fā)現(xiàn)第4個線程還沒有到達屏障,然后就會拋出異常而去繼續(xù)執(zhí)行后面的任務(wù)了绰姻。
五枉侧、CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能使用一次。而CyclicBarrier的計數(shù)器可以使用reset() 方法重置狂芋。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景榨馁,比如如果計算發(fā)生錯誤,可以重置計數(shù)器帜矾,并讓線程們重新執(zhí)行一次翼虫。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量屡萤。isBroken方法用來知道阻塞的線程是否被中斷珍剑。
除此之外,CyclicBarrier還可以進行重用灭衷,即重復(fù)使用:
package com.feizi.java.concurrency.tool;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* Created by feizi on 2018/6/5.
*/
public class TestCyclicBarrierDemo2 {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++){
new Writer(barrier).start();
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrierz重用1...");
for (int i = 0; i < N; i++){
new Writer(barrier).start();
}
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrierz重用2...");
for (int i = 0; i < N; i++){
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier barrier;
public Writer(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("線程 " + Thread.currentThread().getName() + " 正在寫入數(shù)據(jù)...");
try {
Thread.sleep(5000);
System.out.println("線程 " + Thread.currentThread().getName() + " 寫入數(shù)據(jù)完畢次慢,等待其他線程寫入完畢...");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ", 所有線程寫入完畢翔曲,繼續(xù)處理其他任務(wù)...");
}
}
}
控制臺輸出結(jié)果:
線程 Thread-0 正在寫入數(shù)據(jù)...
線程 Thread-1 正在寫入數(shù)據(jù)...
線程 Thread-2 正在寫入數(shù)據(jù)...
線程 Thread-3 正在寫入數(shù)據(jù)...
線程 Thread-0 寫入數(shù)據(jù)完畢迫像,等待其他線程寫入完畢...
線程 Thread-3 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢...
線程 Thread-1 寫入數(shù)據(jù)完畢瞳遍,等待其他線程寫入完畢...
線程 Thread-2 寫入數(shù)據(jù)完畢闻妓,等待其他線程寫入完畢...
Thread-3, 所有線程寫入完畢掠械,繼續(xù)處理其他任務(wù)...
Thread-0由缆, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-2猾蒂, 所有線程寫入完畢均唉,繼續(xù)處理其他任務(wù)...
Thread-1, 所有線程寫入完畢肚菠,繼續(xù)處理其他任務(wù)...
CyclicBarrierz重用1...
線程 Thread-4 正在寫入數(shù)據(jù)...
線程 Thread-5 正在寫入數(shù)據(jù)...
線程 Thread-6 正在寫入數(shù)據(jù)...
線程 Thread-7 正在寫入數(shù)據(jù)...
線程 Thread-5 寫入數(shù)據(jù)完畢舔箭,等待其他線程寫入完畢...
線程 Thread-4 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢...
線程 Thread-6 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢...
線程 Thread-7 寫入數(shù)據(jù)完畢层扶,等待其他線程寫入完畢...
Thread-5箫章, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-6镜会, 所有線程寫入完畢檬寂,繼續(xù)處理其他任務(wù)...
Thread-4, 所有線程寫入完畢戳表,繼續(xù)處理其他任務(wù)...
Thread-7桶至, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
CyclicBarrierz重用2...
線程 Thread-8 正在寫入數(shù)據(jù)...
線程 Thread-9 正在寫入數(shù)據(jù)...
線程 Thread-10 正在寫入數(shù)據(jù)...
線程 Thread-11 正在寫入數(shù)據(jù)...
線程 Thread-9 寫入數(shù)據(jù)完畢扒袖,等待其他線程寫入完畢...
線程 Thread-10 寫入數(shù)據(jù)完畢塞茅,等待其他線程寫入完畢...
線程 Thread-8 寫入數(shù)據(jù)完畢,等待其他線程寫入完畢...
線程 Thread-11 寫入數(shù)據(jù)完畢季率,等待其他線程寫入完畢...
Thread-11野瘦, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Thread-8飒泻, 所有線程寫入完畢鞭光,繼續(xù)處理其他任務(wù)...
Thread-10, 所有線程寫入完畢泞遗,繼續(xù)處理其他任務(wù)...
Thread-9惰许, 所有線程寫入完畢,繼續(xù)處理其他任務(wù)...
Process finished with exit code 0
從上面輸出結(jié)果史辙,我們可以看到汹买,在首次的4個線程穿過屏障之后,又可以繼續(xù)重新一輪的使用聊倔,而CountDownLatch則無法做到重復(fù)使用晦毙。