CyclicBarrier 是什么剖效?
讓一組線程到達(dá)一個(gè)屏障后被阻塞寺董,直到最后一個(gè)線程到達(dá)屏障時(shí)抄淑,屏障才會(huì)“開門”,所有被屏障阻塞的線程繼續(xù)執(zhí)行馋记。
CyclicBarrier 構(gòu)造
CyclicBarrier(int parties)
parties表示屏障攔截的線程數(shù)量号坡,當(dāng)線程調(diào)用await()方法就是告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞梯醒。
CyclicBarrier(int parties,Runnable barrierAction)
用于在線程到達(dá)屏障時(shí)宽堆,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景茸习。
CyclicBarrier 方法
類型 | 方法 | 描述 |
---|---|---|
int | await() | 等待直到parties個(gè)線程都調(diào)用了await() |
int | await(long timeout, TimeUnit unit) | 等待直到parties個(gè)線程都調(diào)用了await() 或者過了超時(shí)時(shí)間 |
int | getNumberWaiting() | 獲取CyclicBarrier當(dāng)前在等待的線程數(shù)量 |
int | getParties() | 獲取CyclicBarrier攔截線程數(shù)量 |
boolean | isBroken() | 獲取阻塞的線程是否被中斷 |
int | reset() | 將屏障設(shè)置為初始狀態(tài) |
CyclicBarrier(int parties)例子
public class CyclicBarrierTest {
//定義一個(gè)屏障畜隶,設(shè)置攔截兩個(gè)線程
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//通知CyclicBarrier我已經(jīng)到達(dá)屏障,線程阻塞
cyclicBarrier.await();
System.out.println("1");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("2");
new Thread(new Runnable() {
@Override
public void run() {
try {
//通知CyclicBarrier我已經(jīng)到達(dá)屏障号胚,線程阻塞
cyclicBarrier.await();
System.out.println("3");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
輸出結(jié)果
2
3
1
因?yàn)槲覀兌x的CyclicBarrier是攔截兩個(gè)線程籽慢,所以第一個(gè)線程執(zhí)行了await()后開始阻塞,然后繼續(xù)執(zhí)行猫胁,輸出“2”箱亿,當(dāng)?shù)诙€(gè)線程執(zhí)行了await()方法后,第二個(gè)線程達(dá)到屏障弃秆,屏障打開“大門”届惋,兩個(gè)線程繼續(xù)執(zhí)行,輸出“3”和“1”菠赚,也有一種可能是先輸出“1”然后輸出“3”脑豹,這是因?yàn)橹骶€程和子線程的調(diào)度是由CPU決定的,都有可能先執(zhí)行衡查。
CyclicBarrier(int parties,Runnable barrierAction)例子
public class CyclicBarrierTest2 {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
System.out.println("1");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
System.out.println("2");
new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier.await();
System.out.println("3");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
static class A implements Runnable {
@Override
public void run() {
System.out.println("4");
}
}
}
結(jié)果
2
4
3
1
根據(jù)上面的例子稍微進(jìn)行改造瘩欺,當(dāng)?shù)诙€(gè)線程達(dá)到屏障后,優(yōu)先執(zhí)行了A峡捡,然后阻塞線程才繼續(xù)執(zhí)行击碗。
CyclicBarrier應(yīng)用場(chǎng)景
可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場(chǎng)景们拙。
例子
創(chuàng)建一個(gè)屏障稍途,設(shè)置攔截線程數(shù)為10
假設(shè)每個(gè)線程計(jì)算結(jié)果返回1
最終十個(gè)線程計(jì)算結(jié)果相加得到的結(jié)果應(yīng)該為10
package com.sy.thread.example;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* Description: thread
* @author songyu
*/
public class CyclicBarrierTest3 implements Runnable{
/**
* 創(chuàng)建屏障
*/
private CyclicBarrier cyclicBarrier = new CyclicBarrier(10,this::run);
/**
* 保存每個(gè)線程執(zhí)行的結(jié)果
*/
private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>();
private void calculationData() {
//使用線程不規(guī)范,實(shí)際使用時(shí)可以使用ThreadPoolExecutor
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
concurrentHashMap.put(Thread.currentThread().getName(),1);
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
@Override
public void run() {
int result = 0;
for (Map.Entry<String, Integer> map : concurrentHashMap.entrySet()) {
result = result + map.getValue();
}
System.out.println("最終計(jì)算結(jié)果:" + result);
}
public static void main(String[] args) {
CyclicBarrierTest3 c = new CyclicBarrierTest3();
c.calculationData();
}
}
結(jié)果
最終計(jì)算結(jié)果:10