多線程協(xié)作除了上一篇中講到的簡單的生產(chǎn)者消費者模型的幾種實現(xiàn)愉择,jdk還提供了一些其他api劫乱,實現(xiàn)線程間協(xié)作的模型:CountDownLatch用于倒計數(shù)柵欄模型,一個線程等待其他多個線程就緒后再繼續(xù)執(zhí)行锥涕;CyclicBarrier用于循環(huán)路障模型衷戈,可重置循環(huán)使用的倒計數(shù)柵欄;Semaphore用于信號量模型层坠,釋放一定總量的信號量殖妇,每個線程需要獲取一個信號量才能執(zhí)行,從而進行限流
CountDownLatch:
倒計數(shù)柵欄模型的使用場景
- 場景一:計數(shù)設(shè)為1破花,啟動多個線程await谦趣,等待主線程發(fā)送countdown信號后同步開始執(zhí)行任務(wù),場景類似跑步比賽座每,運動員都就緒等待發(fā)令槍響前鹅。可以用于簡單的并發(fā)測試
- 場景二:計數(shù)設(shè)為n尺栖,將一個任務(wù)劃分為n個線程進行同時處理嫡纠,統(tǒng)計線程await等待所有n個線程執(zhí)行完畢countdown,計數(shù)歸零后完成統(tǒng)計工作延赌。
- 場景三:結(jié)合前兩者除盏,設(shè)置兩個CountDownLatch,一個計數(shù)為1的啟動信號的latch挫以,一個計數(shù)為n的完成信號的latch者蠕,一個大任務(wù),用n個線程執(zhí)行掐松,每個線程都等待統(tǒng)籌線程將啟動信號計數(shù)為1的latch計數(shù)countdown歸零后執(zhí)行任務(wù)踱侣。執(zhí)行完成后每個線程countdown,將完成信號的n個計數(shù)的lantch點計數(shù)歸零后統(tǒng)籌線程進行統(tǒng)計大磺。
- 下面的代碼示例是演示場景二中CountDownLatch的使用
@Slf4j
public class CountDownLatchDemo {
public static void main(String[] args) throws Exception{
// 定義需要等待的倒計時個數(shù)
CountDownLatch latch=new CountDownLatch(10);
Random random=new Random();
// 推薦的線程池的定義方法
ExecutorService executor=new ThreadPoolExecutor(10, 15, 60L, TimeUnit.SECONDS,
// 定義你的等待隊列大小
new LinkedBlockingDeque<>(50),
// 定義你的線程生成方法
new ThreadFactory() {
private AtomicInteger threadNum=new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"custom work thread "+threadNum.addAndGet(1));
}
},
(r,e)->{
// 定義你的拒絕策略
String message="Task " + r.toString() + " rejected from " + e.toString();
log.error(message);
throw new RejectedExecutionException(message);
});
for(int i=0;i<10;i++){
int j=i;
executor.execute(()->{
try {
// 這里用于自己的業(yè)務(wù)實現(xiàn)抡句,如果需要使用共享變量,注意使用線程安全的api或者同步鎖
Thread.sleep(random.nextInt(2000));
log.debug("the {} is ready,i is {}",Thread.currentThread().getName(),j);
// 執(zhí)行完即完成一個倒數(shù)計時
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.execute(()->{
try {
long t1=System.currentTimeMillis();
log.debug("the thread {} is awaiting !",Thread.currentThread().getName());
latch.await();
log.debug("the thread {} is running ,waiting time is {} ms !",Thread.currentThread().getName(),System.currentTimeMillis()-t1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long t1=System.currentTimeMillis();
log.debug("main thread is waiting !");
// 等齊了或者倒計時到后繼續(xù)
latch.await(2000L,TimeUnit.SECONDS);
log.debug("main thread is end ,waiting time is {} ms !",System.currentTimeMillis()-t1);
executor.shutdown();
}
}
debug信息如下:
10:59:17.952 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is waiting !
10:59:18.091 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 3 is ready,i is 2
10:59:18.094 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is awaiting !
10:59:18.179 [custom work thread 8] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 8 is ready,i is 7
10:59:18.282 [custom work thread 1] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 1 is ready,i is 0
10:59:18.599 [custom work thread 5] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 5 is ready,i is 4
10:59:19.096 [custom work thread 9] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 9 is ready,i is 8
10:59:19.246 [custom work thread 6] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 6 is ready,i is 5
10:59:19.540 [custom work thread 10] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 10 is ready,i is 9
10:59:19.574 [custom work thread 2] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 2 is ready,i is 1
10:59:19.670 [custom work thread 4] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 4 is ready,i is 3
10:59:19.941 [custom work thread 7] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the custom work thread 7 is ready,i is 6
10:59:19.941 [main] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - main thread is end ,waiting time is 1990 ms !
10:59:19.941 [custom work thread 3] DEBUG com.dz.demo.multiThread.CountDownLatchDemo - the thread custom work thread 3 is running ,waiting time is 1847 ms !
CyclicBarrier:
循環(huán)路障模型的使用場景
- 計數(shù)設(shè)為n的可循環(huán)使用的路障杠愧,用于多個線程調(diào)用await待榔,n個線程都達到await之后路障計數(shù)歸零,線程一起同步繼續(xù),同時該路障被重置為n锐锣,可以繼續(xù)循環(huán)使用腌闯。路障還可以設(shè)置一個執(zhí)行器,在路障計數(shù)歸零時被觸發(fā)雕憔。用于比如姿骏,將一個大任務(wù)化為m個子任務(wù),用n個線程執(zhí)行斤彼,當(dāng)每執(zhí)行完n個子任務(wù)后分瘦,觸發(fā)一次統(tǒng)計任務(wù),同時開啟下一批n個子任務(wù)的執(zhí)行畅卓;
- 下面的代碼示例即同時執(zhí)行兩個任務(wù)擅腰,每次兩個都執(zhí)行完再啟動下一輪兩個任務(wù)。
@Slf4j
public class CyclicBarrierDemo {
public static void main(String[] args) {
AtomicInteger num=new AtomicInteger(0);
CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
num.addAndGet(1);
log.debug("cyclicBarrier arrived,both ready to run,time is {}",num.get());
});
// 演示demo可以用簡單的新線程池
ExecutorService executorService= Executors.newFixedThreadPool(2);
executorService.execute(()->{
int i=0;
while (i<4) {
try {
log.debug("job 1 begin to start!");
cyclicBarrier.await();
log.debug("cyclicBarrier arrived! job 1 is running to start!");
i++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
executorService.execute(()->{
int i=0;
while (i<4) {
try {
log.debug("job 2 begin to start!");
cyclicBarrier.await();
log.debug("cyclicBarrier arrived! job 2 is running to start!");
i++;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
}
debug信息如下:
11:56:29.438 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.438 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.440 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 1
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 2
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 3
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 2 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - job 1 begin to start!
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived,both ready to run,time is 4
11:56:29.441 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 1 is running to start!
11:56:29.441 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.CyclicBarrierDemo - cyclicBarrier arrived! job 2 is running to start!
Semaphore
信號量模型的使用場景
- 計數(shù)為n的Semaphore翁潘,即持有n個許可趁冈,線程可應(yīng)通過該信號量的acquire或者tryAcquire來獲取許可,獲取許可后才能使用或者獲取資源拜马,使用完后通過release釋放許可渗勘。該模型的設(shè)計即是為了實現(xiàn)對物理或者邏輯資源的獲取進行限流。同時最多n個線程持有該資源的許可俩莽。當(dāng)計數(shù)設(shè)置為1旺坠,即有互斥鎖的效果,相比其他鎖扮超,優(yōu)勢是可以在其他線程進行release取刃,從而處理死鎖。
- 下面的代碼示例對于一個限定的資源出刷,使用Semaphore頒發(fā)許可璧疗,進行限流
@Slf4j
public class SemaphoreDemo {
public static void main(String[] args) {
ResourcePool resourcePool=new ResourcePool(5);
ExecutorService executorService= Executors.newFixedThreadPool(20);
for(int i=0;i<15;i++){
executorService.execute(()->{
String s=null;
try {
s=resourcePool.getResource();
log.debug("{} 線程 獲取資源 {}",Thread.currentThread().getName(),s);
Thread.sleep(1000l);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 釋放資源確保在finnaly中
if(StringUtils.hasText(s)){
resourcePool.releaseResource(s);
}
}
});
}
executorService.shutdown();
}
}
@Slf4j
@Data
class ResourcePool{
private Semaphore semaphore;
private Queue<String> sourceQueue;
public ResourcePool(Integer n){
semaphore=new Semaphore(n);
sourceQueue=new LinkedList<>();
for(int i=0;i<n;i++){
sourceQueue.add(String.valueOf(i));
}
}
String getResource() throws Exception{
if(semaphore.tryAcquire(10L, TimeUnit.SECONDS)){
log.debug("成功獲取信號量");
return sourceQueue.poll();
}else {
return null;
}
}
void releaseResource(String s){
log.debug("釋放資源 {}",s);
semaphore.release();
sourceQueue.add(s);
}
}
打印如下:
13:20:20.970 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:20.970 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:20.970 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:20.970 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:20.970 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:20.972 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-1 線程 獲取資源 0
13:20:20.972 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-4 線程 獲取資源 3
13:20:20.972 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-3 線程 獲取資源 4
13:20:20.972 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-5 線程 獲取資源 2
13:20:20.972 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-2 線程 獲取資源 1
13:20:21.976 [pool-1-thread-4] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 3
13:20:21.976 [pool-1-thread-3] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 4
13:20:21.976 [pool-1-thread-2] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 1
13:20:21.976 [pool-1-thread-1] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 0
13:20:21.976 [pool-1-thread-5] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 2
13:20:21.976 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:21.976 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:21.977 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-6 線程 獲取資源 4
13:20:21.976 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:21.977 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-7 線程 獲取資源 1
13:20:21.977 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-8 線程 獲取資源 0
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:21.977 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-9 線程 獲取資源 2
13:20:21.977 [pool-1-thread-10] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-10 線程 獲取資源 null
13:20:22.979 [pool-1-thread-6] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 4
13:20:22.979 [pool-1-thread-9] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 2
13:20:22.979 [pool-1-thread-8] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 0
13:20:22.979 [pool-1-thread-7] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 1
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:22.979 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-11 線程 獲取資源 4
13:20:22.979 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:22.979 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:22.979 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:22.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-13 線程 獲取資源 2
13:20:22.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-12 線程 獲取資源 0
13:20:22.980 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-14 線程 獲取資源 1
13:20:23.980 [pool-1-thread-11] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 4
13:20:23.980 [pool-1-thread-13] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 2
13:20:23.980 [pool-1-thread-12] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 0
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 成功獲取信號量
13:20:23.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.SemaphoreDemo - pool-1-thread-15 線程 獲取資源 4
13:20:23.985 [pool-1-thread-14] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 1
13:20:24.981 [pool-1-thread-15] DEBUG com.dz.demo.multiThread.ResourcePool - 釋放資源 4
本篇講到了多線程協(xié)作的CountDownLatch、CyclicBarrier馁龟、Semaphore三個api的使用場景和使用用例崩侠,其他工具比如信號量的api在guava的RateLimiter也可以實現(xiàn)限流,使用guava的令牌桶思想坷檩,也可以用redis實現(xiàn)在分布式環(huán)境下的限流却音。本輪多線程篇將暫時寫到這,將來有人咨詢其他問題也可以繼續(xù)更新多線程篇矢炼。下一篇將是關(guān)于redis的搭建系瓢、使用、集群和在分布式情況下的應(yīng)用場景句灌。