背景
在一些場(chǎng)景中邮绿,我們需要獲取多份數(shù)據(jù)阔挠,而這些數(shù)據(jù)獲取的先后順序是無(wú)關(guān)的,我們只需要把數(shù)據(jù)收集齊俘侠,然后再對(duì)這些數(shù)據(jù)統(tǒng)一處理。
比如:執(zhí)行兩個(gè)任務(wù) task1 和 task2蔬将,都執(zhí)行完畢后爷速,再把兩個(gè)任務(wù)的結(jié)果相加。
例如執(zhí)行 task1 需要 1s霞怀,執(zhí)行 task2 需要 2s惫东,順序執(zhí)行,那么總時(shí)間是 1s + 2s = 3s毙石。而如果我們用異步的方式廉沮,task1 和 task2 同時(shí)執(zhí)行,則能減少響應(yīng)的時(shí)間徐矩。
但并行的方式則必須注意滞时,需要等待所有任務(wù)執(zhí)行完畢之后,才能計(jì)算最終的結(jié)果滤灯,因此這就涉及多線程之間的等待坪稽。
方案一:Future.get() 獲取數(shù)據(jù)
先創(chuàng)建一個(gè)線程池,并使用 ExecutorService.submit()
方法提交兩個(gè) Callable
任務(wù)鳞骤。
提交任務(wù)后窒百,這倆任務(wù)將開(kāi)始異步并行執(zhí)行,并返回了 Future
類(lèi)型的對(duì)象弟孟,代表一個(gè)未來(lái)能獲取結(jié)果的對(duì)象贝咙。
當(dāng)我們調(diào)用 Future
對(duì)象的 get()
方法時(shí)样悟,如果提交的任務(wù)已經(jīng)完成拂募,我們就直接獲得結(jié)果。如果異步任務(wù)還沒(méi)有完成窟她,那么 get()
會(huì)阻塞當(dāng)前線程陈症,直到所有任務(wù)都完成后,才能獲得執(zhí)行的結(jié)果震糖。
注意:當(dāng) submit()
時(shí)录肯,任務(wù)就已經(jīng)開(kāi)始執(zhí)行,但不會(huì)阻塞線程吊说,當(dāng) get()
時(shí)论咏,如果還有未完成的任務(wù)优炬,才會(huì)阻塞當(dāng)前主線程的運(yùn)行,直到所有任務(wù)都完成后厅贪,才能獲得執(zhí)行的結(jié)果蠢护。
void executorServiceExample() throws Exception {
long time1 = System.currentTimeMillis();
// 任務(wù)1
Callable<Integer> task1 = () -> {
Thread.sleep(1000);
return 10;
};
// 任務(wù)2
Callable<Integer> task2 = () -> {
Thread.sleep(2000);
return 20;
};
// 創(chuàng)建線程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任務(wù)2個(gè)任務(wù)
Future<Integer> submit1 = service.submit(task1);
Future<Integer> submit2 = service.submit(task2);
// 獲取任務(wù)結(jié)果,由于任務(wù)還未執(zhí)行完畢养涮,因此下一行將阻塞當(dāng)前線程葵硕,直到所有任務(wù)都執(zhí)行完畢
Integer result1 = submit1.get();
Integer result2 = submit2.get();
System.out.println("總耗時(shí):" + (System.currentTimeMillis() - time1)); // 總耗時(shí):2012
System.out.println(result1 + result2); // 30
service.shutdown();
}
方案二:CountdownLatch
CountdownLatch 用來(lái)控制一個(gè)或者多個(gè)線程等待多個(gè)線程。
維護(hù)了一個(gè)計(jì)數(shù)器 cnt贯吓,每次調(diào)用 countDown()
方法會(huì)讓計(jì)數(shù)器 cnt 的值減 1懈凹,當(dāng) cnt 減到 0 的時(shí)候,那些因?yàn)檎{(diào)用 await()
方法而在等待的線程就會(huì)被喚醒悄谐。
void countdownLatchExample() throws InterruptedException {
long time = System.currentTimeMillis();
// 創(chuàng)建 CountDownLatch介评,計(jì)數(shù)為2
CountDownLatch countDownLatch = new CountDownLatch(2);
// 創(chuàng)建線程池
ExecutorService executorService = Executors.newCachedThreadPool(2);
// 任務(wù)1
Runnable task1 = () -> {
System.out.println("begin task 1");
Thread.sleep(1000); // 省略 try-catch
System.out.println("end task 1");
countDownLatch.countDown(); // 計(jì)數(shù)減1
};
// 任務(wù)2
Runnable task2 = () -> {
System.out.println("begin task 2");
Thread.sleep(2000); // 省略 try-catch
System.out.println("end task 2");
countDownLatch.countDown(); // 計(jì)數(shù)減1
};
// 并行執(zhí)行任務(wù)
executorService.execute(task1);
executorService.execute(task2);
// 等待任務(wù)執(zhí)行完畢,直到countDownLatch計(jì)數(shù)器為0爬舰,才往下執(zhí)行
countDownLatch.await();
System.out.println("total time: " + (System.currentTimeMillis() - time));
executorService.shutdown();
}
方案三:CyclicBarrier
CyclicBarrier 用來(lái)控制多個(gè)線程互相等待威沫,只有當(dāng)多個(gè)線程都到達(dá)時(shí),這些線程才會(huì)繼續(xù)執(zhí)行洼专。
與 CountdownLatch 相似棒掠,都是通過(guò)維護(hù)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的。線程執(zhí)行 await()
方法之后計(jì)數(shù)器會(huì)減 1屁商,并進(jìn)行等待烟很,直到計(jì)數(shù)器為 0,所有調(diào)用 await()
方法而在等待的線程才能繼續(xù)執(zhí)行蜡镶。
CyclicBarrier 和 CountdownLatch 的一個(gè)區(qū)別是:
1雾袱、CyclicBarrier 的計(jì)數(shù)器通過(guò)調(diào)用 reset()
方法可以循環(huán)使用,所以它才叫做循環(huán)屏障官还。
2芹橡、countDownLatch.countDown()
只會(huì)將計(jì)數(shù)減 1,不會(huì)阻塞當(dāng)前線程望伦,countDownLatch.await()
只會(huì)阻塞當(dāng)前線程林说,不會(huì)減少計(jì)數(shù);而 cyclicBarrier.await()
既會(huì)阻塞當(dāng)前線程屯伞,也會(huì)計(jì)數(shù)減 1腿箩。
CyclicBarrier 有兩個(gè)構(gòu)造函數(shù),其中 parties
指示計(jì)數(shù)器的初始值劣摇,barrierAction
當(dāng)計(jì)數(shù)為 0 時(shí)珠移,將回調(diào)該方法。
public CyclicBarrier(int parties);
public CyclicBarrier(int parties, Runnable barrierAction);
具體使用如下:
void cyclicBarrierExample() {
long time = System.currentTimeMillis();
// 創(chuàng)建CyclicBarrier,計(jì)數(shù)為2钧惧,當(dāng)計(jì)數(shù)為0時(shí)會(huì)回調(diào)該方法
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("all task end");
System.out.println("total time: " + (System.currentTimeMillis() - time));
});
// 創(chuàng)建線程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 任務(wù)1
Runnable task1 = () -> {
System.out.println("begin task 1");
Thread.sleep(1000); // 省略 try-catch
System.out.println("end task 1");
cyclicBarrier.await(); // 計(jì)數(shù)減1并阻塞當(dāng)前線程暇韧,省略了try-catch
};
// 任務(wù)2
Runnable task2 = () -> {
System.out.println("begin task 2");
Thread.sleep(2000); // 省略 try-catch
System.out.println("end task 2");
cyclicBarrier.await(); // 計(jì)數(shù)減1并阻塞當(dāng)前線程,省略了try-catch
};
executorService.execute(task1);
executorService.execute(task2);
executorService.shutdown();
}