源碼|批量執(zhí)行invokeAll()&&多選一invokeAny()

ExecutorService中定義了兩個(gè)批量執(zhí)行任務(wù)的方法,invokeAll()和invokeAny()莺债,在批量執(zhí)行或多選一的業(yè)務(wù)場(chǎng)景中非常方便。invokeAll()在所有任務(wù)都完成(包括成功/被中斷/超時(shí))后才會(huì)返回签夭,invokeAny()在任意一個(gè)任務(wù)成功(或ExecutorService被中斷/超時(shí))后就會(huì)返回齐邦。

AbstractExecutorService實(shí)現(xiàn)了這兩個(gè)方法,本文將先后分析invokeAll()和invokeAny()兩個(gè)方法的源碼實(shí)現(xiàn)第租。

JDK版本:oracle java 1.8.0_102

invokeAll()

invokeAll()在所有任務(wù)都完成(包括成功/被中斷/超時(shí))后才會(huì)返回措拇。有不限時(shí)和限時(shí)版本,從更簡(jiǎn)單的不限時(shí)版入手慎宾。

不限時(shí)版

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get(); // 無所謂先執(zhí)行哪個(gè)任務(wù)的get()方法
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

8-12行丐吓,先將所有任務(wù)都提交到線程池(當(dāng)然,任何ExecutorService均可)中趟据。

嚴(yán)格來說券犁,不是“提交”,而是“執(zhí)行”汹碱。執(zhí)行可能是同步或異步的粘衬,取決于線程池的策略。不過由于我們僅討論異步情況(同步同理)咳促,用“提交”一詞更容易理解稚新。下同。

13-22行跪腹,for循環(huán)的目的是阻塞調(diào)用invokeAll的線程褂删,直到所有任務(wù)都執(zhí)行完畢。當(dāng)然我們也可以使用其他方式實(shí)現(xiàn)阻塞冲茸,不過這種方式是最簡(jiǎn)單的:

  • 15行如果f.isDone()返回true屯阀,則當(dāng)前任務(wù)已結(jié)束,繼續(xù)檢查下一個(gè)任務(wù)噪裕;否則蹲盘,調(diào)用f.get()讓線程阻塞,直到當(dāng)前任務(wù)結(jié)束膳音。
  • 17行無所謂先執(zhí)行哪一個(gè)FutureTask實(shí)例的get()方法召衔。由于所有任務(wù)并發(fā)執(zhí)行,總體阻塞時(shí)間取決于于是耗時(shí)最長(zhǎng)的任務(wù)祭陷,從而實(shí)現(xiàn)了invodeAll的阻塞調(diào)用苍凛。
  • 18-20行沒有捕獲InterruptedException。如果有任務(wù)被中斷兵志,主線程將拋出InterruptedException醇蝴,以響應(yīng)中斷。

最后想罕,為防止在全部任務(wù)結(jié)束之前過早退出悠栓,23行霉涨、25-29行相配合,如果done不為true(未執(zhí)行到40行就退出了)則取消全部任務(wù)惭适。

限時(shí)版

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        // Interleave time checks and calls to execute in case
        // executor doesn't have any/much parallelism.
        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) // 及時(shí)檢查是否超時(shí)
                return futures;
        }

        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                if (nanos <= 0L) // 及時(shí)檢查是否超時(shí)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

10-11行笙瑟,先將所有任務(wù)封裝為FutureTask,添加到futures列表中癞志。

18-23行往枷,每提交一個(gè)任務(wù),就立刻判斷是否超時(shí)凄杯。這樣的話错洁,如果在任務(wù)全部提交到線程池中之前,就已經(jīng)達(dá)到了超時(shí)時(shí)間戒突,則能夠盡快檢查出超時(shí)屯碴,結(jié)束提交并退出。

對(duì)于限時(shí)版膊存,將封裝任務(wù)與提交任務(wù)拆開是必要的窿锉。

28-29行,每次在調(diào)用限時(shí)版f.get()進(jìn)入阻塞狀態(tài)之前膝舅,先檢查是否超時(shí)嗡载。這里也是希望超時(shí)后,能夠盡快發(fā)現(xiàn)并退出仍稀。

其他同不限時(shí)版洼滚。

invokeAny()

invokeAny()在任意一個(gè)任務(wù)成功(或ExecutorService被中斷/超時(shí))后就會(huì)返回。也分為不限時(shí)和限時(shí)版本技潘,但為了進(jìn)一步保障性能遥巴,invokeAny()的實(shí)現(xiàn)思路與invokeAll()略有不同。

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

內(nèi)部調(diào)用了doInvokeAny()享幽。

學(xué)習(xí)5-8行的寫法铲掐,代碼自解釋。

doInvokeAny()

簡(jiǎn)化如下:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
...
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);
...
    try {
        ExecutionException ee = null;
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        futures.add(ecs.submit(it.next()));
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)
                    break;
                else if (timed) {
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();
            }
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (...) {
                    ee = ...;
                }
            }
        }
...
        throw ee;
    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

要點(diǎn):

  • ntasks維護(hù)未提交的任務(wù)數(shù)值桩,active維護(hù)已提交未結(jié)束的任務(wù)數(shù)摆霉。
  • 內(nèi)部使用ExecutorCompletionService維護(hù)已完成的任務(wù)。
  • 如果沒有任務(wù)成功結(jié)束奔坟,則返回捕獲的最后一個(gè)異常携栋。
  • 第一個(gè)任務(wù)是必將被執(zhí)行的,其他任務(wù)按照迭代器順序增量提交咳秉。

增量提交有什么好處呢婉支?節(jié)省資源,如果在提交過程中就有任務(wù)完成了澜建,那么沒必要繼續(xù)提交任務(wù)耗費(fèi)時(shí)間和空間向挖;降低延遲蝌以,如果有任務(wù)完成,與全量提交相比何之,能更早被發(fā)現(xiàn)饼灿。

14行先向線程池提交一個(gè)任務(wù)(迭代器第一個(gè)),ntasks--帝美,active=1:

futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

這里是真“提交”了,不是“執(zhí)行”晤硕。

然后18-45行循環(huán)檢查是否有任務(wù)成功結(jié)束悼潭。

首先,19行通過及時(shí)返回的poll()方法舞箍,嘗試取出一個(gè)已完成的任務(wù):

Future<T> f = ecs.poll();

根據(jù)f的結(jié)果舰褪,分成兩種情況討論。

ExecutorCompletionService默認(rèn)使用LinkedBlockingQueue作為任務(wù)隊(duì)列疏橄。對(duì)LinkedBlockingQueue不熟悉的可參照源碼|并發(fā)一枝花之BlockingQueue占拍。

case1:如果有任務(wù)完成

如果有任務(wù)完成,則f不為null捎迫,進(jìn)入40-49行晃酒,active--,并嘗試取出任務(wù)結(jié)果:

if (f != null) {
    --active;
    try {
        return f.get();
    } catch (...) {
        ee = ...;
    }
}
  • 如果能夠成功取出窄绒,即當(dāng)前任務(wù)已成功結(jié)束贝次,直接返回。
  • 如果拋出異常彰导,則當(dāng)前任務(wù)異常結(jié)束蛔翅,使用ee記錄異常。

顯然位谋,如果已完成的任務(wù)是異常結(jié)束的山析,invokeAny()不會(huì)退出,而是繼續(xù)查看其它任務(wù)。

FutureTask#get()的用法參照源碼|使用FutureTask的正確姿勢(shì)鹃栽。

case2:如果沒有任務(wù)完成

如果沒有任務(wù)完成炬丸,則f為null,進(jìn)入23-39行翩腐,判斷是繼續(xù)提交任務(wù)、退出還是等待任務(wù)結(jié)果:

if (f == null) {
    if (ntasks > 0) { // check1
        --ntasks;
        futures.add(ecs.submit(it.next()));
        ++active;
    }
    else if (active == 0) // check2
        break;
    else if (timed) { // check3
        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
        if (f == null)
            throw new TimeoutException();
        nanos = deadline - System.nanoTime();
    }
    else // check4
        f = ecs.take();
}
  • check1:如果還有剩余任務(wù)(ntasks > 0)膏燃,那就繼續(xù)提交茂卦,同時(shí)ntasks--,active++组哩。
  • check2:如果沒有剩余任務(wù)了等龙,且也沒有已提交未結(jié)束的任務(wù)(active == 0)处渣,則表示全部任務(wù)均已執(zhí)行結(jié)束,但沒有一個(gè)任務(wù)是成功的蛛砰,可以退出循環(huán)罐栈。退出循環(huán)后,將在47行拋出ee記錄的最后一個(gè)異常泥畅。
  • check3:如果可以沒有剩余任務(wù)荠诬,但還有已提交未結(jié)束的任務(wù),且開啟了超時(shí)機(jī)制位仁,則嘗試使用超時(shí)版poll()等待任務(wù)完成柑贞。但是,如果這種情況下超時(shí)了聂抢,就表示整個(gè)invokeAny()方法超時(shí)了钧嘶,所以poll()返回null的時(shí)候,要主動(dòng)拋出TimeoutException琳疏。
  • check4:如果可以沒有剩余任務(wù)有决,但還有已提交未結(jié)束的任務(wù),且未開啟超時(shí)機(jī)制空盼,則使用無限阻塞的take()方法书幕,等待任務(wù)完成。

這種一堆if-else的代碼很丑揽趾“粗洌可修改如下:

if (f == null) { // check1
   if (ntasks > 0) {
        --ntasks;
        futures.add(ecs.submit(it.next()));
        ++active;
        continue;
    }
    if (active == 0) { // check2
        assert ntasks == 0; // 防止自己改吧改吧把它這句判斷挪到了前面
        break;
    }
   if (timed) { // check3
       f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
       if (f == null) {
           throw new TimeoutException();
       }
       nanos = deadline - System.nanoTime();
   } else { // check4
       f = ecs.take();
   }
}

修改依據(jù):

  • check1、check2但骨、check3/check4沒有并列的判斷關(guān)系
  • check3励七、check4有并列的判斷關(guān)系,非此即彼
  • 結(jié)構(gòu)更清爽

總結(jié)

不會(huì)寫總結(jié)奔缠。掠抬。。

但是會(huì)寫吐槽靶0ァA讲ā!闷哆!

猴子現(xiàn)在每次寫博客都經(jīng)歷著從“臥槽似乎很簡(jiǎn)單啊腰奋,寫個(gè)毛”到“臥槽這跟想象的不一樣啊抱怔!臥槽巨帥劣坊!”的心態(tài)崩塌,各位巨巨寫的代碼是真好看屈留,性能還棒棒噠局冰,羨慕崇拜打雞血测蘑。哎,康二,碳胳,保持謙卑,并羨慕臉遙拜各位巨巨沫勿。


本文鏈接:源碼|批量執(zhí)行invokeAll()&&多選一invokeAny()
作者:猴子007
出處:https://monkeysayhi.github.io
本文基于 知識(shí)共享署名-相同方式共享 4.0 國(guó)際許可協(xié)議發(fā)布,歡迎轉(zhuǎn)載产雹,演繹或用于商業(yè)目的诫惭,但是必須保留本文的署名及鏈接。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末洽故,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子盗誊,更是在濱河造成了極大的恐慌时甚,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哈踱,死亡現(xiàn)場(chǎng)離奇詭異荒适,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)开镣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門刀诬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人邪财,你說我怎么就攤上這事陕壹。” “怎么了树埠?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵糠馆,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我怎憋,道長(zhǎng)又碌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任绊袋,我火速辦了婚禮毕匀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘癌别。我一直安慰自己皂岔,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布展姐。 她就那樣靜靜地躺著凤薛,像睡著了一般姓建。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缤苫,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天速兔,我揣著相機(jī)與錄音,去河邊找鬼活玲。 笑死涣狗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的舒憾。 我是一名探鬼主播镀钓,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼镀迂!你這毒婦竟也來了丁溅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤探遵,失蹤者是張志新(化名)和其女友劉穎窟赏,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體箱季,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡涯穷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了藏雏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拷况。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖掘殴,靈堂內(nèi)的尸體忽然破棺而出赚瘦,到底是詐尸還是另有隱情,我是刑警寧澤奏寨,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布蚤告,位于F島的核電站,受9級(jí)特大地震影響服爷,放射性物質(zhì)發(fā)生泄漏杜恰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一仍源、第九天 我趴在偏房一處隱蔽的房頂上張望心褐。 院中可真熱鬧,春花似錦笼踩、人聲如沸逗爹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽掘而。三九已至挟冠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間袍睡,已是汗流浹背知染。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留斑胜,地道東北人控淡。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像止潘,于是被迫代替她去往敵國(guó)和親掺炭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • ¥開啟¥ 【iAPP實(shí)現(xiàn)進(jìn)入界面執(zhí)行逐一顯】 〖2017-08-25 15:22:14〗 《//首先開一個(gè)線程凭戴,因...
    小菜c閱讀 6,419評(píng)論 0 17
  • 國(guó)家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報(bào)批稿:20170802 前言: 排版 ...
    庭說閱讀 10,970評(píng)論 6 13
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理涧狮,服務(wù)發(fā)現(xiàn),斷路器么夫,智...
    卡卡羅2017閱讀 134,657評(píng)論 18 139
  • 3月2日 晚上21點(diǎn)左右 永澄老師在支付寶群組進(jìn)行了“共讀如何閱讀一本書”活動(dòng)的第七次閱讀分享者冤。 同時(shí)我們找到了第...
    若曉沐閱讀 741評(píng)論 1 3
  • 《三只小豬》中企圖摧毀小豬房子最后反掉進(jìn)熱鍋燙死的狼,《小紅帽》中假扮外婆吞下小紅帽的惡狼魏割,還有披著羊皮的狼...
    陽光下的逗逗閱讀 365評(píng)論 0 1