1.等待多線程完成的CountDownLatch
CountDownLatch允許一個(gè)或多個(gè)線程等待其他線程完成操作蝶糯。
假設(shè)有一個(gè)需求:需要解析一個(gè)Excel里多個(gè)sheet的數(shù)據(jù)费就,可以考慮使用多線程赐俗,每個(gè)線程解析一個(gè)sheet里的數(shù)據(jù)形纺,等到所有sheet解析完之后企软,程序需要提示解析完成拄丰。
使用join():
public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
Thread parser1 = new Thread(() -> System.out.println("parser1 finish"));
Thread parser2 = new Thread(() -> System.out.println("parser2 finish"));
parser1.start();
parser2.start();
parser1.join();
parser2.join();
System.out.println("all parser finish");
}
}
使用CountDownLatch:
public class CountDownLatchTest {
static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的構(gòu)造函數(shù)接收一個(gè)int類型的參數(shù)作為計(jì)數(shù)器现斋,如果你想等待N個(gè)點(diǎn)完成喜最,這里就傳入N。
當(dāng)調(diào)用CountDownLatch的countDown方法時(shí)庄蹋,N就會(huì)減1瞬内,CountDownLatch的await方法會(huì)阻塞當(dāng)前線程,直到N變成零限书。這里所說的N個(gè)點(diǎn)虫蝶,可以是N個(gè)線程,也可以是1個(gè)線程里的N個(gè)執(zhí)行步驟倦西。
注意:計(jì)數(shù)器必須大于等于0能真,只是等于0時(shí),調(diào)用await方法時(shí)不會(huì)阻塞當(dāng)前線程扰柠。CountDownLatch不能重新初始化或修改對(duì)象的內(nèi)部計(jì)數(shù)器的值粉铐。
2.同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是卤档,讓一組線程到達(dá)一個(gè)屏障時(shí)被阻塞蝙泼,知道最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門劝枣,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行汤踏。
①CyclicBarrier簡(jiǎn)介
默認(rèn)構(gòu)造方法CyclicBarrier(int parties),parties標(biāo)識(shí)屏障攔截的線程數(shù)量舔腾,每個(gè)線程調(diào)用await方法告訴CyclicBarrier它已經(jīng)到達(dá)了屏障溪胶,然后當(dāng)前線程被阻塞。
public class CynlicBarrierTest {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try{
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
輸出結(jié)果:
1
2
或者
2
1
如果把 new CyclicBarrier(2) 改成 new CyclicBarrier(3) 稳诚,則主線程和子線程會(huì)永遠(yuǎn)等待哗脖,因?yàn)闆]有第三個(gè)線程執(zhí)行await方法。
如果起了三個(gè)線程調(diào)用await,但是new CyclicBarrier(2)懒熙,依然會(huì)永遠(yuǎn)等待丘损,因?yàn)楫?dāng)CyclicBarrier的count為0時(shí),會(huì)重置為parties工扎。
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
另一個(gè)構(gòu)造器:CyclicBarrier(int parties, Runnable barrierAction)徘钥,用于在線程到達(dá)屏障時(shí),優(yōu)先執(zhí)行barrierAction肢娘,方便更負(fù)責(zé)的業(yè)務(wù)場(chǎng)景:
public class CynlicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try{
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
輸出結(jié)果:
3
2
1
或者
3
1
2
②CyclicBarrier的應(yīng)用場(chǎng)景
CyclicBarrier可用于多線程計(jì)算數(shù)據(jù)呈础,最后合并計(jì)算結(jié)果的場(chǎng)景。
如:用一個(gè)Excel保存了用戶所有銀行流水橱健,每個(gè)sheet薄脆一個(gè)賬戶近一年的每筆銀行流水而钞,現(xiàn)在需要統(tǒng)計(jì)用戶的日均銀行流水,最后再用barrierAction用這些線程的計(jì)算結(jié)果拘荡,計(jì)算出整個(gè)Excel的日均銀行流水臼节。
/**
* 銀行流水處理服務(wù)類
*/
public class BankWaterService implements Runnable {
/**
* 創(chuàng)建4個(gè)屏障,處理完之后執(zhí)行當(dāng)前類的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/**
* 假設(shè)只有4個(gè)sheet珊皿,所以只啟動(dòng)4個(gè)線程
*/
private Executor executor = Executors.newFixedThreadPool(4);
/**
* 保存每個(gè)sheet計(jì)算出的銀行流水結(jié)果
*/
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//計(jì)算當(dāng)前sheet的銀行流水?dāng)?shù)據(jù)网缝,計(jì)算代碼省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
//銀行流水計(jì)算完成插入一個(gè)屏障
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result = 0;
//匯總每個(gè)sheet計(jì)算出的結(jié)果
for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
//將結(jié)果輸出
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterCount = new BankWaterService();
bankWaterCount.count();
}
}
輸出結(jié)果
4
③CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器考科一使用reset方法重置蟋定。
CyclicBarrier還提供其他有用的方法粉臊,比如:
- getNumberWaiting():可以獲取CyclicBarrier阻塞的線程數(shù)。
- isBroken():用來了解阻塞的線程是否被中斷驶兜。
public class CynlicBarrierTest3 {
static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
}
});
thread.start();
thread.interrupt();
try{
c.await();
} catch (Exception e) {
System.out.println(c.isBroken());
}
}
}
輸出:
true
3.控制并發(fā)線程數(shù)的Semaphore
Semaphore(信號(hào)量)是用來控制同時(shí)訪問特定資源的線程數(shù)量扼仲,它通過協(xié)調(diào)各個(gè)線程,以保障合理的使用公共資源抄淑。
①應(yīng)用場(chǎng)景
Semaphore可以用于做流量控制屠凶,特別是共用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫連接肆资。
如:讀取幾萬個(gè)文件的數(shù)據(jù)矗愧,因?yàn)槎际荌O密集型人物,我們可以啟動(dòng)幾十個(gè)線程并發(fā)地讀取迅耘,但是如果讀到內(nèi)存后贱枣,還需要存儲(chǔ)到數(shù)據(jù)庫中监署,而數(shù)據(jù)庫的連接數(shù)只有10個(gè)颤专,這時(shí)我們必須控制只有10個(gè)線程同時(shí)獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫連接钠乏。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();//獲取一個(gè)許可證
System.out.println("save data");
s.release();//歸還許可證
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
代碼中栖秕,雖然有30個(gè)線程在執(zhí)行,但是只允許10個(gè)并發(fā)執(zhí)行晓避。
構(gòu)造方法:Semaphore(int permits)簇捍,permits表示可用的許可證數(shù)量只壳。
②其他方法
- boolean tryAcquire():嘗試獲取許可證。
- int availablePermits():返回此信號(hào)量中當(dāng)前可用的許可證數(shù)暑塑。
- int getQueueLength():返回正在等待獲取許可證的線程數(shù)吼句。
- boolean hasQueuedThreads():是否有線程正在等待獲取許可證。
- void reducePermits(int reduction):減少reduction個(gè)許可證事格,是個(gè)protected方法惕艳。
- Collection<Thread> getQueuedThreads():返回所有等待獲取許可證的線程集合,是個(gè)protected方法驹愚。
4.線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類远搪。用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn)逢捺,在這個(gè)同步點(diǎn)谁鳍,兩個(gè)線程可以交換彼此的數(shù)據(jù)。
這兩個(gè)線程通過exchange方法交換數(shù)據(jù)劫瞳,如果第一個(gè)線程先執(zhí)行exchange方法倘潜,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí)柠新,這兩個(gè)線程可以交換數(shù)據(jù)窍荧,將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對(duì)方。
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String a = "銀行流水A";//A錄入銀行流水?dāng)?shù)據(jù)
String exchange = exgr.exchange(a);
System.out.println(exchange);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String b = "銀行流水B";//B錄入銀行流水?dāng)?shù)據(jù)
String a = exgr.exchange(b);
System.out.println("A和B數(shù)據(jù)是否一致:" + a.equals(b) + "恨憎,A錄入的是:" + a + "蕊退,B錄入的是:" + b);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果擔(dān)心有特殊情況發(fā)生,避免一直等待憔恳,可以使用exchange(V x, long timeout, TimeUnit unit)設(shè)置最大等待時(shí)長瓤荔。