目錄
[TOC]
Java并發(fā)包提供了三個類迁杨,處理并發(fā)線程:CountdownLatch、Semaphore凄硼、CyclicBarrier铅协。
1. CountDownLatch
1.1 描述
中文含義"
門栓
",當(dāng)指定數(shù)量的線程全部執(zhí)行完畢后摊沉,主線程才繼續(xù)向后執(zhí)行
狐史。countDownLatch是一個一次性計數(shù)器,初始化需設(shè)置一個計數(shù)器的值说墨。
調(diào)用
await
方法可阻塞當(dāng)前線程骏全。調(diào)用一次
countDown
方法,計數(shù)器減1婉刀,當(dāng)計數(shù)器減到0后吟温,await方法阻塞的線程,繼續(xù)向下執(zhí)行突颊。
1.2 業(yè)務(wù)場景
某個業(yè)務(wù)場景鲁豪,程序需要分成很多線程一起執(zhí)行,當(dāng)全部子任務(wù)執(zhí)行完成后(每個任務(wù)執(zhí)行完律秃,執(zhí)行一次countDown爬橡,讓計數(shù)器減1),再繼續(xù)執(zhí)行主程序后續(xù)的操作棒动。
如并行計算糙申,處理的數(shù)據(jù)量很大,將任務(wù)拆分成多個子任務(wù)船惨,等所有子任務(wù)都完成后柜裸,父任務(wù)再拿到所有自認為的運算結(jié)果進行匯總。
1.3 代碼
public static void main(String[] args) throws InterruptedException {
AtomicInteger result=new AtomicInteger(0);
int count=5;
CountDownLatch countDownLatch=new CountDownLatch(5);
for(int i=0;i<count;i++){
new Thread(() -> {
try {
//業(yè)務(wù)邏輯...
result.addAndGet(10);
System.out.println(Thread.currentThread());
}catch (Exception e){
e.printStackTrace();
}finally {
//保證計數(shù)器能順利減1粱锐,否則父線程將一直阻塞
countDownLatch.countDown();
}
}).start();
}
//阻塞疙挺,等待計數(shù)器減為0
countDownLatch.await();
System.out.println("執(zhí)行結(jié)果:"+result);
}
執(zhí)行結(jié)果:
Thread[Thread-0,5,main]
Thread[Thread-1,5,main]
Thread[Thread-2,5,main]
Thread[Thread-3,5,main]
Thread[Thread-4,5,main]
執(zhí)行結(jié)果:50
2. CyclicBarrier
2.1 說明
- 中文含義"
柵欄
",多個線程互相等待怜浅,只要有一個線程沒完成(沒到屏障點)铐然,所有線程都等待
。 - 調(diào)用await方法的線程告訴CyclicBarrier自己已經(jīng)到達同步點恶座,然后當(dāng)前線程被阻塞搀暑。直到parties個參與線程調(diào)用了await方法,則全部阻塞的線程一起執(zhí)行阻塞后面的代碼跨琳。
- 父線程不會阻塞自点。
核心方法:
- 等待:await()
2.2 應(yīng)用
2.3 代碼
public class CyclicBarrierTest {
/**
* 執(zhí)行數(shù)量5,配置執(zhí)行后回調(diào)線程
*/
public static CyclicBarrier cyclicBarrier=new CyclicBarrier(5,() -> {
System.out.println("----------滿足要求 call back thread----------");
});
public static void main(String[] args) throws Exception{
for(int i=0;i<5;i++){
new Thread(()->{
try {
System.out.println("進入線程:"+Thread.currentThread());
Thread.sleep(Long.valueOf(new Random().nextInt(3000)));
System.out.println("線程前置方法執(zhí)行結(jié)束:"+Thread.currentThread());
//await會導(dǎo)致當(dāng)前線程阻塞脉让,當(dāng)調(diào)用次數(shù)達到設(shè)置的值時桂敛,所有阻塞的線程將全部執(zhí)行
cyclicBarrier.await();
System.out.println("滿足條件冈绊,離開線程:"+Thread.currentThread());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
//父線程并不會阻塞
System.out.println("主線程結(jié)束");
}
}
執(zhí)行結(jié)果:
主線程結(jié)束
進入線程:Thread[Thread-3,5,main]
進入線程:Thread[Thread-0,5,main]
進入線程:Thread[Thread-2,5,main]
進入線程:Thread[Thread-1,5,main]
進入線程:Thread[Thread-4,5,main]
線程前置方法執(zhí)行結(jié)束:Thread[Thread-1,5,main]
線程前置方法執(zhí)行結(jié)束:Thread[Thread-4,5,main]
線程前置方法執(zhí)行結(jié)束:Thread[Thread-0,5,main]
線程前置方法執(zhí)行結(jié)束:Thread[Thread-3,5,main]
線程前置方法執(zhí)行結(jié)束:Thread[Thread-2,5,main]
----------滿足要求 call back thread----------
滿足條件,離開線程:Thread[Thread-2,5,main]
滿足條件埠啃,離開線程:Thread[Thread-1,5,main]
滿足條件,離開線程:Thread[Thread-4,5,main]
滿足條件伟恶,離開線程:Thread[Thread-0,5,main]
滿足條件碴开,離開線程:Thread[Thread-3,5,main]
3. Semaphore
3.1 說明
中文含義”
信號量
“,保證同一批并發(fā)線程博秫,最多同時執(zhí)行的線程數(shù)潦牛,防止同時執(zhí)行的線程過多
。限制
最多
可以有幾個線程進來挡育,超過最大線程數(shù)巴碗,新進來的線程可以選擇等待
,或者離開即寒,如執(zhí)行中的線程有結(jié)束的橡淆,則等待中的線程會自動
加入到執(zhí)行隊列中,執(zhí)行后續(xù)操作母赵。可一次獲取多個許可
核心方法:
獲取權(quán)限:acquire()
釋放權(quán)限:release()
3.2 應(yīng)用
特別適用于要求一個并發(fā)任務(wù)最多有幾個線程
執(zhí)行的情況逸爵。
3.3 代碼
3.1.1 獲取一個許可
public static void main(String[] args) {
//最多允許2個線程執(zhí)行
Semaphore semaphore=new Semaphore(2);
for(int i=0;i<3;i++){
new Thread(()->{
try {
//獲取許可
semaphore.acquire();
//執(zhí)行業(yè)務(wù)操作...
System.out.println(Thread.currentThread()+"----拿到許可"+LocalTime.now());
Thread.sleep(Long.valueOf(new Random().nextInt(3000)));
//釋放許可
semaphore.release();
System.out.println(Thread.currentThread()+"----釋放許可"+LocalTime.now());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
System.out.println("執(zhí)行結(jié)束");
}
執(zhí)行結(jié)果:
執(zhí)行結(jié)束
Thread[Thread-0,5,main]----拿到許可13:12:49.252
Thread[Thread-1,5,main]----拿到許可13:12:49.252
Thread[Thread-1,5,main]----釋放許可13:12:49.906
Thread[Thread-2,5,main]----拿到許可13:12:49.906
Thread[Thread-2,5,main]----釋放許可13:12:49.980
Thread[Thread-0,5,main]----釋放許可13:12:51.535
3.1.2 獲取多個許可
public static void main(String[] args) {
//最多允許4個線程執(zhí)行
Semaphore semaphore=new Semaphore(4);
for(int i=0;i<3;i++){
new Thread(()->{
try {
//獲取兩個許可
semaphore.acquire(2);
//執(zhí)行業(yè)務(wù)操作...
System.out.println(Thread.currentThread()+"----拿到許可"+LocalTime.now());
Thread.sleep(Long.valueOf(new Random().nextInt(3000)));
//釋放兩個許可
semaphore.release(2);
System.out.println(Thread.currentThread()+"----釋放許可"+LocalTime.now());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
System.out.println("執(zhí)行結(jié)束");
}
執(zhí)行結(jié)果:
執(zhí)行結(jié)束
Thread[Thread-1,5,main]----拿到許可13:10:32.917
Thread[Thread-0,5,main]----拿到許可13:10:32.917
Thread[Thread-1,5,main]----釋放許可13:10:33.208
Thread[Thread-2,5,main]----拿到許可13:10:33.208
Thread[Thread-2,5,main]----釋放許可13:10:35.031
Thread[Thread-0,5,main]----釋放許可13:10:35.491
3.1.3 嘗試獲取許可
public static void main(String[] args) {
//最多允許2個線程執(zhí)行
Semaphore semaphore=new Semaphore(2);
for(int i=0;i<5;i++){
new Thread(()->{
try {
//獲取許可
if(semaphore.tryAcquire()){
//執(zhí)行業(yè)務(wù)操作...
System.out.println(Thread.currentThread()+"----拿到許可"+LocalTime.now());
Thread.sleep(100);
//釋放許可
semaphore.release();
System.out.println(Thread.currentThread()+"----釋放許可"+LocalTime.now());
}else{
System.out.println(Thread.currentThread()+"----"+"未獲取許可,放棄"+LocalTime.now());
}
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
System.out.println("執(zhí)行結(jié)束");
}
執(zhí)行結(jié)果:
執(zhí)行結(jié)束
Thread[Thread-4,5,main]----未獲取許可凹嘲,放棄13:22:01.626
Thread[Thread-2,5,main]----未獲取許可师倔,放棄13:22:01.626
Thread[Thread-1,5,main]----拿到許可13:22:01.626
Thread[Thread-3,5,main]----未獲取許可,放棄13:22:01.626
Thread[Thread-0,5,main]----拿到許可13:22:01.626
Thread[Thread-0,5,main]----釋放許可13:22:01.727
Thread[Thread-1,5,main]----釋放許可13:22:01.727
3.1.4 指定時間內(nèi)嘗試獲取許可
public static void main(String[] args) {
//最多允許2個線程執(zhí)行
Semaphore semaphore=new Semaphore(2);
for(int i=0;i<5;i++){
new Thread(()->{
try {
//獲取許可
if(semaphore.tryAcquire(1, TimeUnit.SECONDS)){
//執(zhí)行業(yè)務(wù)操作...
System.out.println(Thread.currentThread()+"----"+LocalTime.now()+"----拿到許可");
Thread.sleep(Long.valueOf(new Random().nextInt(3000)));
//釋放許可
semaphore.release();
System.out.println(Thread.currentThread()+"----"+LocalTime.now()+"----釋放許可");
}else{
System.out.println(Thread.currentThread()+"----"+LocalTime.now()+"----1秒內(nèi)未獲取許可周蹭,放棄");
}
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
System.out.println("執(zhí)行結(jié)束");
}
執(zhí)行結(jié)果:
執(zhí)行結(jié)束
Thread[Thread-3,5,main]----13:24:29.752----拿到許可
Thread[Thread-2,5,main]----13:24:29.752----拿到許可
Thread[Thread-2,5,main]----13:24:30.088----釋放許可
Thread[Thread-0,5,main]----13:24:30.088----拿到許可
Thread[Thread-4,5,main]----13:24:30.732----1秒內(nèi)未獲取許可趋艘,放棄
Thread[Thread-1,5,main]----13:24:30.733----1秒內(nèi)未獲取許可,放棄
Thread[Thread-3,5,main]----13:24:31.929----釋放許可
Thread[Thread-0,5,main]----13:24:32.340----釋放許可
4. 補充(引用其他網(wǎng)友)
-
CountDownLatch和CyclicBarrier區(qū)別:
CountDownLatch和CyclicBarrier都有讓多個線程等待同步然后再開始下一步動作的意思凶朗,但是CountDownLatch的下一步的動作實施者是主線程瓷胧,具有不可重復(fù)性;
而CyclicBarrier的下一步動作實施者還是“其他線程”本身