線程池- AbstractExecutorService

前言

AbstractExecutorService實(shí)現(xiàn)了ExecutorService和Executor接口的基本方法秽梅,ThreadPoolExecute和ForkJoinPool繼承AbstractExecutorService就可以減少實(shí)現(xiàn)的復(fù)雜度,接口適配器模式
類繼承結(jié)構(gòu)
  1. Executor接口

Executor的存在用來實(shí)現(xiàn)異步框架(將任務(wù)和任務(wù)的執(zhí)行分開,不同于Thread將任務(wù)和執(zhí)行綁定在一起),即將任務(wù)提交和任務(wù)如何執(zhí)行分開,Executor正是用來提交任務(wù)的。 void execute(Runnable command)用于提交沒有返回值的任務(wù)

public interface Executor {
    void execute(Runnable command);
}
  1. ExecutorService接口

ExecutorService接口繼承自Executor 接口,并且提供了對任務(wù)執(zhí)行過程的管理操作孩灯,為了Executor提供各種管理服務(wù)而存在的,拓展了提交有返回值的任務(wù)的submit()方法

public interface ExecutorService extends Executor {
// 請求關(guān)閉逾滥、發(fā)生超時(shí)或者當(dāng)前線程中斷峰档,無論哪一個(gè)首先發(fā)生之后败匹,都將導(dǎo)致阻塞,直到所有任務(wù)完成執(zhí)行讥巡。
boolean awaitTermination(long timeout, TimeUnit unit);
// 執(zhí)行給定的任務(wù)掀亩,當(dāng)所有任務(wù)完成時(shí),返回保持任務(wù)狀態(tài)和結(jié)果的 Future 列表欢顷。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 執(zhí)行給定的任務(wù)槽棍,當(dāng)所有任務(wù)完成或超時(shí)期滿時(shí)(無論哪個(gè)首先發(fā)生),返回保持任務(wù)狀態(tài)和結(jié)果的 Future 列表抬驴。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 執(zhí)行給定的任務(wù)炼七,如果某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果布持。
<T> T invokeAny(Collection<? extends Callable<T>> tasks);
// 執(zhí)行給定的任務(wù)豌拙,如果在給定的超時(shí)期滿前某個(gè)任務(wù)已成功完成(也就是未拋出異常),則返回其結(jié)果题暖。
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 如果此執(zhí)行程序已關(guān)閉按傅,則返回 true。
boolean isShutdown();
// 如果關(guān)閉后所有任務(wù)都已完成胧卤,則返回 true唯绍。
boolean isTerminated();
// 啟動(dòng)一次順序關(guān)閉,執(zhí)行以前提交的任務(wù)灌侣,但不接受新任務(wù)。
void shutdown();
// 試圖停止所有正在執(zhí)行的活動(dòng)任務(wù)裂问,暫停處理正在等待的任務(wù)侧啼,并返回等待執(zhí)行的任務(wù)列表。
List<Runnable> shutdownNow();
// 提交一個(gè)返回值的任務(wù)用于執(zhí)行堪簿,返回一個(gè)表示任務(wù)的未決結(jié)果的 Future痊乾。
<T> Future<T> submit(Callable<T> task);
// 提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future椭更。
Future<?> submit(Runnable task);
// 提交一個(gè) Runnable 任務(wù)用于執(zhí)行哪审,并返回一個(gè)表示該任務(wù)的 Future。
<T> Future<T> submit(Runnable task, T result);
}
  1. AbstractExecutorService是對ExecutorService接口的默認(rèn)實(shí)現(xiàn)

AbstractExecutorService 源碼詳解

invokeAny方法實(shí)現(xiàn)
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        // 含有結(jié)果的Future隊(duì)列
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        // 將本對象作為Executor創(chuàng)建ExecutorCompletionService對象
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        try {
            // 記錄可能拋出的執(zhí)行異常
            ExecutionException ee = null;
            // 初始化超時(shí)時(shí)間
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // 確定在主循環(huán)之前開始一個(gè)任務(wù)
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1; // 記錄正在執(zhí)行的任務(wù)數(shù)量

            for (;;) {
                // 獲取并移除下一個(gè)將要完成的任務(wù)的結(jié)果表示虑瀑,如果沒有任何表示則返回null
                Future<T> f = ecs.poll();// 底層調(diào)用隊(duì)列的poll方法(非阻塞)
                if (f == null) { // 沒有結(jié)果表示
                    if (ntasks > 0) { //如果還有剩余的任務(wù)湿滓,則提交下一個(gè)任務(wù)
                        --ntasks;
                        futures.add(ecs.submit(it.next())); 
                        ++active;
                    }
                    // 出現(xiàn)這種情況說明,已經(jīng)有任務(wù)完成舌狗,并返回結(jié)果表示叽奥,但是
                    // 捕獲到了異常,則跳出主循環(huán)痛侍,進(jìn)行異常的拋出
                    else if (active == 0) 
                        break;
                    else if (timed) { // 超時(shí)獲取結(jié)果表示
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else // 阻塞獲取結(jié)果表示
                        f = ecs.take();
                }
                if (f != null) { //含有結(jié)果表示
                    --active;
                    try {
                        return f.get(); // 返回結(jié)果
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            // 
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally { // 最后取消所有提交的任務(wù)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }
    // 對doInvokeAny的封裝朝氓,實(shí)現(xiàn)無超時(shí)等待的版本
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }
     // 對doInvokeAny的封裝,實(shí)現(xiàn)超時(shí)等待的版本
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

通過對私有內(nèi)部實(shí)現(xiàn)doInvokeAny的封裝,實(shí)現(xiàn)對外的無超時(shí)等待的版本和超時(shí)等待的兩個(gè)版本赵哲,通過ExecutorCompletionService類來實(shí)現(xiàn)對所有提交的任務(wù)執(zhí)行完成時(shí)返回結(jié)果的存儲(chǔ)和獲取待德。

  1. invokeAny方法沒有對task進(jìn)行顯示地包裝,但是通過ExecutorCompletionService的submit()方法提交任務(wù)時(shí)枫夺,實(shí)際上是調(diào)用newTaskFor()方法對任務(wù)進(jìn)行了包裝為RunnableFuture對象将宪,然后調(diào)用了本對象的execute()方法提交任務(wù),并返回異步計(jì)算結(jié)果對象
// ExecutorCompletionService 的submit方法
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
  1. 使用了ExecutorCompletionService對象對任務(wù)執(zhí)行完成時(shí)結(jié)果的存取筷屡,隱含了對任務(wù)是否完成的判斷涧偷;所以對返回結(jié)果就不用通過isDone()方法判斷是否任務(wù)已經(jīng)完成了
    ExecutorCompletionService部分源碼如下:
    // 阻塞隊(duì)列:用來存儲(chǔ)已經(jīng)完成的任務(wù)
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * 拓展了 FutureTask 在完成時(shí)將任務(wù)入隊(duì)功能
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        // 此方法在FutureTask任務(wù)run方法完成時(shí)調(diào)用,這里是將完成的任務(wù)入隊(duì)
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
invokeAll方法
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false; // 所有任務(wù)是否完成的標(biāo)志
        try {
            // 對所有任務(wù)進(jìn)行包裝毙死,并提交任務(wù)燎潮,并將返回的結(jié)果添加到futures集合中
            for (Callable<T> t : tasks) { 
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            // 對所有結(jié)果進(jìn)行判斷或者阻塞等待結(jié)果返回
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 如果任務(wù)沒有完成
                    try {
                        f.get(); // 則阻塞等待結(jié)果返回,并壓制異常
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // 當(dāng)所有任務(wù)已經(jīng)完成了(不管是正常完成還是異常完成扼倘,
            // 如發(fā)生CancellationException确封、ExecutionException ),
            // 則將完成標(biāo)志設(shè)為true再菊,并返回結(jié)果集合
            done = true; 
            return futures;
        } finally {
            if (!done) // 如果發(fā)生中斷異常InterruptedException 則取消已經(jīng)提交的任務(wù)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false; 
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                // 在添加執(zhí)行任務(wù)時(shí)超時(shí)判斷爪喘,如果超時(shí)則立刻返回futures集合
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }
            // 每次對結(jié)果進(jìn)行判斷時(shí)都進(jìn)行超時(shí)判斷
            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) { // 判斷超時(shí)
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime(); // 更新剩余時(shí)間
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

invokeAll方法也實(shí)現(xiàn)了無超時(shí)和超時(shí)兩個(gè)版本

  1. 無超時(shí)版本首先將所有任務(wù)包裝后提交給本對象的執(zhí)行器(調(diào)用execute)執(zhí)行,并將返回的結(jié)果添加到futures集合中纠拔,然后對futures集合進(jìn)行遍歷判斷秉剑,是否已經(jīng)完成,如果沒有完成則使用get方法阻塞稠诲,等待結(jié)果的返回侦鹏,并壓制了一些異常;并在finally模塊對標(biāo)志進(jìn)行檢查臀叙,取消已經(jīng)提交的任務(wù)略水。
  2. 超時(shí)版本和無超時(shí)版本基本一致,但是加了超時(shí)邏輯劝萤。在2個(gè)地方增加了超時(shí)判斷1) 在添加執(zhí)行任務(wù)時(shí)超時(shí)判斷渊涝,如果超時(shí)則立刻返回futures集合;** 2) **每次對結(jié)果進(jìn)行判斷時(shí)都進(jìn)行超時(shí)判斷。
方法總覽
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末床嫌,一起剝皮案震驚了整個(gè)濱河市跨释,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌厌处,老刑警劉巖煤傍,帶你破解...
    沈念sama閱讀 222,681評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異嘱蛋,居然都是意外死亡蚯姆,警方通過查閱死者的電腦和手機(jī)五续,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來龄恋,“玉大人疙驾,你說我怎么就攤上這事」希” “怎么了它碎?”我有些...
    開封第一講書人閱讀 169,421評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長显押。 經(jīng)常有香客問我扳肛,道長,這世上最難降的妖魔是什么乘碑? 我笑而不...
    開封第一講書人閱讀 60,114評(píng)論 1 300
  • 正文 為了忘掉前任挖息,我火速辦了婚禮,結(jié)果婚禮上兽肤,老公的妹妹穿的比我還像新娘套腹。我一直安慰自己,他們只是感情好资铡,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,116評(píng)論 6 398
  • 文/花漫 我一把揭開白布电禀。 她就那樣靜靜地躺著,像睡著了一般笤休。 火紅的嫁衣襯著肌膚如雪尖飞。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,713評(píng)論 1 312
  • 那天店雅,我揣著相機(jī)與錄音政基,去河邊找鬼。 笑死底洗,一個(gè)胖子當(dāng)著我的面吹牛腋么,可吹牛的內(nèi)容都是我干的咕娄。 我是一名探鬼主播亥揖,決...
    沈念sama閱讀 41,170評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼圣勒!你這毒婦竟也來了费变?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,116評(píng)論 0 277
  • 序言:老撾萬榮一對情侶失蹤圣贸,失蹤者是張志新(化名)和其女友劉穎挚歧,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吁峻,經(jīng)...
    沈念sama閱讀 46,651評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滑负,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,714評(píng)論 3 342
  • 正文 我和宋清朗相戀三年在张,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片矮慕。...
    茶點(diǎn)故事閱讀 40,865評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡帮匾,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出痴鳄,到底是詐尸還是另有隱情瘟斜,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布痪寻,位于F島的核電站螺句,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏橡类。R本人自食惡果不足惜蛇尚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,211評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望猫态。 院中可真熱鬧佣蓉,春花似錦、人聲如沸亲雪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽义辕。三九已至虾标,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間灌砖,已是汗流浹背璧函。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留基显,地道東北人蘸吓。 一個(gè)月前我還...
    沈念sama閱讀 49,299評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像撩幽,于是被迫代替她去往敵國和親库继。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,870評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容