ExecutorService中定義了兩個(gè)批量執(zhí)行任務(wù)的方法,invokeAll()和invokeAny()莺债,在批量執(zhí)行或多選一的業(yè)務(wù)場(chǎng)景中非常方便。invokeAll()在所有任務(wù)都完成(包括成功/被中斷/超時(shí))后才會(huì)返回签夭,invokeAny()在任意一個(gè)任務(wù)成功(或ExecutorService被中斷/超時(shí))后就會(huì)返回齐邦。
AbstractExecutorService實(shí)現(xiàn)了這兩個(gè)方法,本文將先后分析invokeAll()和invokeAny()兩個(gè)方法的源碼實(shí)現(xiàn)第租。
JDK版本:oracle java 1.8.0_102
invokeAll()
invokeAll()在所有任務(wù)都完成(包括成功/被中斷/超時(shí))后才會(huì)返回措拇。有不限時(shí)和限時(shí)版本,從更簡(jiǎn)單的不限時(shí)版入手慎宾。
不限時(shí)版
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get(); // 無所謂先執(zhí)行哪個(gè)任務(wù)的get()方法
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
8-12行丐吓,先將所有任務(wù)都提交到線程池(當(dāng)然,任何ExecutorService均可)中趟据。
嚴(yán)格來說券犁,不是“提交”,而是“執(zhí)行”汹碱。執(zhí)行可能是同步或異步的粘衬,取決于線程池的策略。不過由于我們僅討論異步情況(同步同理)咳促,用“提交”一詞更容易理解稚新。下同。
13-22行跪腹,for循環(huán)的目的是阻塞調(diào)用invokeAll的線程褂删,直到所有任務(wù)都執(zhí)行完畢。當(dāng)然我們也可以使用其他方式實(shí)現(xiàn)阻塞冲茸,不過這種方式是最簡(jiǎn)單的:
- 15行如果f.isDone()返回true屯阀,則當(dāng)前任務(wù)已結(jié)束,繼續(xù)檢查下一個(gè)任務(wù)噪裕;否則蹲盘,調(diào)用f.get()讓線程阻塞,直到當(dāng)前任務(wù)結(jié)束膳音。
- 17行無所謂先執(zhí)行哪一個(gè)FutureTask實(shí)例的get()方法召衔。由于所有任務(wù)并發(fā)執(zhí)行,總體阻塞時(shí)間取決于于是耗時(shí)最長(zhǎng)的任務(wù)祭陷,從而實(shí)現(xiàn)了invodeAll的阻塞調(diào)用苍凛。
- 18-20行沒有捕獲InterruptedException。如果有任務(wù)被中斷兵志,主線程將拋出InterruptedException醇蝴,以響應(yīng)中斷。
最后想罕,為防止在全部任務(wù)結(jié)束之前過早退出悠栓,23行霉涨、25-29行相配合,如果done不為true(未執(zhí)行到40行就退出了)則取消全部任務(wù)惭适。
限時(shí)版
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L) // 及時(shí)檢查是否超時(shí)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L) // 及時(shí)檢查是否超時(shí)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
10-11行笙瑟,先將所有任務(wù)封裝為FutureTask,添加到futures列表中癞志。
18-23行往枷,每提交一個(gè)任務(wù),就立刻判斷是否超時(shí)凄杯。這樣的話错洁,如果在任務(wù)全部提交到線程池中之前,就已經(jīng)達(dá)到了超時(shí)時(shí)間戒突,則能夠盡快檢查出超時(shí)屯碴,結(jié)束提交并退出。
對(duì)于限時(shí)版膊存,將封裝任務(wù)與提交任務(wù)拆開是必要的窿锉。
28-29行,每次在調(diào)用限時(shí)版f.get()進(jìn)入阻塞狀態(tài)之前膝舅,先檢查是否超時(shí)嗡载。這里也是希望超時(shí)后,能夠盡快發(fā)現(xiàn)并退出仍稀。
其他同不限時(shí)版洼滚。
invokeAny()
invokeAny()在任意一個(gè)任務(wù)成功(或ExecutorService被中斷/超時(shí))后就會(huì)返回。也分為不限時(shí)和限時(shí)版本技潘,但為了進(jìn)一步保障性能遥巴,invokeAny()的實(shí)現(xiàn)思路與invokeAll()略有不同。
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
內(nèi)部調(diào)用了doInvokeAny()享幽。
學(xué)習(xí)5-8行的寫法铲掐,代碼自解釋。
doInvokeAny()
簡(jiǎn)化如下:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
...
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
...
try {
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (...) {
ee = ...;
}
}
}
...
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
要點(diǎn):
- ntasks維護(hù)未提交的任務(wù)數(shù)值桩,active維護(hù)已提交未結(jié)束的任務(wù)數(shù)摆霉。
- 內(nèi)部使用ExecutorCompletionService維護(hù)已完成的任務(wù)。
- 如果沒有任務(wù)成功結(jié)束奔坟,則返回捕獲的最后一個(gè)異常携栋。
- 第一個(gè)任務(wù)是必將被執(zhí)行的,其他任務(wù)按照迭代器順序增量提交咳秉。
增量提交有什么好處呢婉支?節(jié)省資源,如果在提交過程中就有任務(wù)完成了澜建,那么沒必要繼續(xù)提交任務(wù)耗費(fèi)時(shí)間和空間向挖;降低延遲蝌以,如果有任務(wù)完成,與全量提交相比何之,能更早被發(fā)現(xiàn)饼灿。
14行先向線程池提交一個(gè)任務(wù)(迭代器第一個(gè)),ntasks--帝美,active=1:
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
這里是真“提交”了,不是“執(zhí)行”晤硕。
然后18-45行循環(huán)檢查是否有任務(wù)成功結(jié)束悼潭。
首先,19行通過及時(shí)返回的poll()方法舞箍,嘗試取出一個(gè)已完成的任務(wù):
Future<T> f = ecs.poll();
根據(jù)f的結(jié)果舰褪,分成兩種情況討論。
ExecutorCompletionService默認(rèn)使用LinkedBlockingQueue作為任務(wù)隊(duì)列疏橄。對(duì)LinkedBlockingQueue不熟悉的可參照源碼|并發(fā)一枝花之BlockingQueue占拍。
case1:如果有任務(wù)完成
如果有任務(wù)完成,則f不為null捎迫,進(jìn)入40-49行晃酒,active--,并嘗試取出任務(wù)結(jié)果:
if (f != null) {
--active;
try {
return f.get();
} catch (...) {
ee = ...;
}
}
- 如果能夠成功取出窄绒,即當(dāng)前任務(wù)已成功結(jié)束贝次,直接返回。
- 如果拋出異常彰导,則當(dāng)前任務(wù)異常結(jié)束蛔翅,使用ee記錄異常。
顯然位谋,如果已完成的任務(wù)是異常結(jié)束的山析,invokeAny()不會(huì)退出,而是繼續(xù)查看其它任務(wù)。
FutureTask#get()的用法參照源碼|使用FutureTask的正確姿勢(shì)鹃栽。
case2:如果沒有任務(wù)完成
如果沒有任務(wù)完成炬丸,則f為null,進(jìn)入23-39行翩腐,判斷是繼續(xù)提交任務(wù)、退出還是等待任務(wù)結(jié)果:
if (f == null) {
if (ntasks > 0) { // check1
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0) // check2
break;
else if (timed) { // check3
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else // check4
f = ecs.take();
}
- check1:如果還有剩余任務(wù)(ntasks > 0)膏燃,那就繼續(xù)提交茂卦,同時(shí)ntasks--,active++组哩。
- check2:如果沒有剩余任務(wù)了等龙,且也沒有已提交未結(jié)束的任務(wù)(active == 0)处渣,則表示全部任務(wù)均已執(zhí)行結(jié)束,但沒有一個(gè)任務(wù)是成功的蛛砰,可以退出循環(huán)罐栈。退出循環(huán)后,將在47行拋出ee記錄的最后一個(gè)異常泥畅。
- check3:如果可以沒有剩余任務(wù)荠诬,但還有已提交未結(jié)束的任務(wù),且開啟了超時(shí)機(jī)制位仁,則嘗試使用超時(shí)版poll()等待任務(wù)完成柑贞。但是,如果這種情況下超時(shí)了聂抢,就表示整個(gè)invokeAny()方法超時(shí)了钧嘶,所以poll()返回null的時(shí)候,要主動(dòng)拋出TimeoutException琳疏。
- check4:如果可以沒有剩余任務(wù)有决,但還有已提交未結(jié)束的任務(wù),且未開啟超時(shí)機(jī)制空盼,則使用無限阻塞的take()方法书幕,等待任務(wù)完成。
這種一堆if-else的代碼很丑揽趾“粗洌可修改如下:
if (f == null) { // check1 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; continue; } if (active == 0) { // check2 assert ntasks == 0; // 防止自己改吧改吧把它這句判斷挪到了前面 break; } if (timed) { // check3 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) { throw new TimeoutException(); } nanos = deadline - System.nanoTime(); } else { // check4 f = ecs.take(); } }
修改依據(jù):
- check1、check2但骨、check3/check4沒有并列的判斷關(guān)系
- check3励七、check4有并列的判斷關(guān)系,非此即彼
- 結(jié)構(gòu)更清爽
總結(jié)
不會(huì)寫總結(jié)奔缠。掠抬。。
但是會(huì)寫吐槽靶0ァA讲ā!闷哆!
猴子現(xiàn)在每次寫博客都經(jīng)歷著從“臥槽似乎很簡(jiǎn)單啊腰奋,寫個(gè)毛”到“臥槽這跟想象的不一樣啊抱怔!臥槽巨帥劣坊!”的心態(tài)崩塌,各位巨巨寫的代碼是真好看屈留,性能還棒棒噠局冰,羨慕崇拜打雞血测蘑。哎,康二,碳胳,保持謙卑,并羨慕臉遙拜各位巨巨沫勿。
本文鏈接:源碼|批量執(zhí)行invokeAll()&&多選一invokeAny()
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識(shí)共享署名-相同方式共享 4.0 國(guó)際許可協(xié)議發(fā)布,歡迎轉(zhuǎn)載产雹,演繹或用于商業(yè)目的诫惭,但是必須保留本文的署名及鏈接。