ExecutorService|CompletionService的區(qū)別與選擇

這段時(shí)間對(duì)業(yè)務(wù)系統(tǒng)做了個(gè)性能測(cè)試速警,其中使用了較多線程池的技術(shù),故此做一個(gè)技術(shù)總結(jié)。

這次總結(jié)的內(nèi)容比較多忿墅,主要是四個(gè):

ExecutorService
CompletionService
Runnable
Callable
前兩個(gè)是線程池相關(guān)接口,后兩個(gè)是多線程相關(guān)接口沮峡。在最后疚脐,我會(huì)說(shuō)明什么情況下使用哪個(gè)接口,這兩類接口如何搭配使用邢疙。

Tips:個(gè)人拙見(jiàn)棍弄,如有不對(duì),請(qǐng)多多指正疟游。

一呼畸、ExecutorService
ExecutorService是一個(gè)接口,繼承自Executor颁虐。ExecutorService提供了一些常用操作和方法蛮原,但是ExecutorService是一個(gè)接口,無(wú)法實(shí)例化另绩。
不過(guò)儒陨,Java提供了一個(gè)幫助類Executors,可以快速獲取一個(gè)ExecutorService對(duì)象板熊,并使用ExecutorService接口的一些方法框全。

Executors幫助類提供了多個(gè)構(gòu)造線程池的方法,常用的分為兩類:

直接執(zhí)行的
newCachedThreadPool
newFixedThreadPool
newSingleThreadExecutor
延遲或定時(shí)執(zhí)行的
newScheduledThreadPool
newSingleThreadScheduledExecutor
Executors為每種方法提供了一個(gè)線程工廠重載干签。

(一)newCachedThreadPool
創(chuàng)建一個(gè)默認(rèn)的線程池對(duì)象津辩,里面的線程和重用,且在第一次使用的時(shí)候才創(chuàng)建〈兀可以理解為線程優(yōu)先模式闸度,來(lái)一個(gè)創(chuàng)一個(gè)線程,直到線程處理完成后蚜印,再處理其他的任務(wù)莺禁。
Code:

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyExecutorService {
    public static void main(String[] args) {
        // 1. 使用幫助類
//        ExecutorService executorService = Executors.newCachedThreadPool();

        // 2. 提交任務(wù)
/*        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }*/

        // 3. 重載方法測(cè)試
        test();
    }

    private static void test() {
        // 1. 使用幫助類
        ExecutorService executorService = Executors.newCachedThreadPool(
                new ThreadFactory() {
                    int n = 1;

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "線程正在執(zhí)行 --->" + n++);
                    }
                }
        );

        // 2. 提交任務(wù)
        for (int i = 0; i < 20; i++) {
            executorService.submit(new MyRunnable(i));
        }
    }
}

/**
 * 1. 線程類
 */
class MyRunnable implements Runnable {
    private int id;

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "正在執(zhí)行..." + "--->" + id);
    }
}

輸出:幾乎是一下子就執(zhí)行了,newCachedThreadPool會(huì)創(chuàng)建和任務(wù)數(shù)同等匹配的線程窄赋,直到處理完成任務(wù)的線程可以處理新增的任務(wù)哟冬。

(二)newFixedThreadPool
Code:創(chuàng)建一個(gè)可重用固定線程數(shù)量的線程池

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 * 創(chuàng)建一個(gè)可固定重用次數(shù)的線程池
 */
public class MyNewFixedThreadPool {
    public static void main(String[] args) {
/*        // nThreads:線程數(shù)量
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() {
        ExecutorService es = Executors.newFixedThreadPool(5, new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "線程" + n++);
            }
        });
        // 提交任務(wù)
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }
    }
}

(三)newSingleThreadExecutor
只有一個(gè)線程(線程安全)

package com.macro.boot.javaBuiltThreadPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyNewSingleThreadExecutor {
    public static void main(String[] args) throws InterruptedException {
/*        ExecutorService es = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            es.submit(new MyRunnable(i));
        }*/
        test();
    }

    private static void test() throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n = 1;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "線程" + n++);
            }
        });
        for (int i = 0; i < 10; i++) {
            Thread.sleep(100);
            es.submit(new MyRunnable(i));
        }
    }
}

(四)newScheduledThreadPool
怎么理解這個(gè)線程池的延遲時(shí)間?很簡(jiǎn)單忆绰,第一次執(zhí)行的開(kāi)始時(shí)間浩峡,加上延遲的時(shí)間,就是第二次執(zhí)行的時(shí)間错敢。

package com.macro.boot.ScheduledExecutorService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MyScheduledExecutor {
    public static void main(String[] args) {
        ScheduledExecutorService sec = Executors.newScheduledThreadPool(4);
        for (int i = 0; i < 10; i++) {
            sec.schedule(new MyRunnable(i), 1, TimeUnit.SECONDS);
        }
        System.out.println("開(kāi)始執(zhí)行翰灾。。稚茅。");
        sec.shutdown();
    }
}

class MyRunnable implements Runnable {
    private int id;

    @Override
    public String toString() {
        return "MyRunnable{" +
                "id=" + id +
                '}';
    }

    public MyRunnable(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "執(zhí)行了任務(wù)" + id);
    }
}

(五)newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutor和newScheduledThreadPool的區(qū)別是纸淮,newSingleThreadScheduledExecutor的第二次執(zhí)行時(shí)間,等于第一次開(kāi)始執(zhí)行的時(shí)間亚享,加上執(zhí)行線程所耗費(fèi)的時(shí)間咽块,再加上延遲時(shí)間,即等于第二次執(zhí)行的時(shí)間虹蒋。

二糜芳、CompletionService
CompletionService是一個(gè)接口飒货。
當(dāng)我們使用ExecutorService啟動(dòng)多個(gè)Callable時(shí)魄衅,每個(gè)Callable返回一個(gè)Future,而當(dāng)我們執(zhí)行Future的get方法獲取結(jié)果時(shí)塘辅,會(huì)阻塞線程直到獲取結(jié)果晃虫。
而CompletionService正是為了解決這個(gè)問(wèn)題,它是Java8的新增接口扣墩,它的實(shí)現(xiàn)類是ExecutorCompletionService哲银。CompletionService會(huì)根據(jù)線程池中Task的執(zhí)行結(jié)果按執(zhí)行完成的先后順序排序,任務(wù)先完成的可優(yōu)先獲取到呻惕。
Code:

package com.macro.boot.completions;

import java.util.concurrent.*;

public class CompletionBoot {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 實(shí)例化線程池
        ExecutorService es = Executors.newCachedThreadPool();
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(es);

        for (int i = 0, j = 3; i < 20; i++) {
            ecs.submit(new CallableExample(i, j));
        }
        for (int i = 0; i < 20; i++) {
            // take:阻塞方法荆责,從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)阻塞亚脆,直到有任務(wù)完成返回結(jié)果做院。
            Integer integer = ecs.take().get();
            // 從結(jié)果隊(duì)列中獲取并移除一個(gè)已經(jīng)執(zhí)行完成的任務(wù)的結(jié)果,如果沒(méi)有就會(huì)返回null,該方法不會(huì)阻塞键耕。
            // Integer integer = ecs.poll().get();
            System.out.println(integer);
        }
        // 不要忘記關(guān)閉線程池
        es.shutdown();
    }
}
class CallableExample implements Callable<Integer> {
    /**
     * 使用構(gòu)造方法獲取變量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

三寺滚、Runnable
Runnable和Callable兩者都是接口,但是也有區(qū)別:

實(shí)現(xiàn)Callable接口的任務(wù)線程能返回執(zhí)行結(jié)果屈雄;而實(shí)現(xiàn)Runnable接口的任務(wù)線程不能返回結(jié)果村视;(重點(diǎn))
Callable接口的call()方法允許拋出異常;而Runnable接口的run()方法的異常只能在內(nèi)部消化酒奶,不能繼續(xù)上拋蚁孔;
Code:

class MyRunnable02 implements Runnable {
    private int i;

    public MyRunnable02(int i) {
        this.i = i;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "執(zhí)行了... ---> " + i);
    }

    @Override
    public String toString() {
        return "MyRunnable{" +
                "i=" + i +
                '}';
    }
}

四、Callable
Code:

class CallableExample implements Callable<Integer> {
    /**
     * 使用構(gòu)造方法獲取變量
     * */
    private int a;
    private int b;

    public CallableExample(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public Integer call() throws Exception {
        return a + b;
    }

    @Override
    public String toString() {
        return "CallableExample{" +
                "a=" + a +
                ", b=" + b +
                '}';
    }
}

五惋嚎、Example
本次Demo:使用線程池勒虾,循環(huán)查詢數(shù)據(jù)庫(kù)500次。
在最開(kāi)始的時(shí)候瘸彤,是使用ExecutorServer + Future.get(因?yàn)椴樵償?shù)據(jù)庫(kù)肯定需要獲取結(jié)果修然,所以必須要用Callable,并且get到結(jié)果集)质况。但是get的阻塞操作愕宋,實(shí)在是太影響速度了,雖然考慮了兩種手段去解決结榄,但是都不了了之中贝。
Code:(只貼線程池的代碼,線程類和獲取連接的類就不放了)

private void executorServerStart() throws SQLException, ClassNotFoundException, ExecutionException, InterruptedException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        // for each
        int count = 500;
        for (int i = 0; i < count; i++) {
            Future<ResultSet> submit = es.submit(new MyThread(i, con, sql));
            ResultSet resultSet = submit.get();
            // print
            while (resultSet.next()) {
                System.out.printf("輸出:時(shí)間:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }
        es.shutdown();

        // close resources
        tdConUtils.close(con, statement);
    }

運(yùn)行時(shí)間:8000ms +
改CompletionService:
Code:

private void completionServerStart() throws SQLException, ClassNotFoundException, InterruptedException, ExecutionException {
        // get con
        TDConUtils tdConUtils = new TDConUtils();
        Connection con = tdConUtils.getCon();
        Statement statement = con.createStatement();

        // SQL
        String sql = "select last_row(value_double) from db1.tb1;";

        // ThreadPool
        ExecutorService es = Executors.newCachedThreadPool();

        //構(gòu)建ExecutorCompletionService,與線程池關(guān)聯(lián)
        ExecutorCompletionService<ResultSet> ecs = new ExecutorCompletionService<ResultSet>(es);
        // for each
        int count = 500;

        for (int i = 0; i < count; i++) {
            ecs.submit(new MyThread(i, con, sql));
        }
        for (int i = 0; i < count; i++) {
            // 通過(guò)take獲取Future結(jié)果,此方法會(huì)阻塞
            ResultSet resultSet = ecs.take().get();
            while (resultSet.next()) {
                System.out.printf("輸出:時(shí)間:%s,值:%f \n", resultSet.getTimestamp(1)
                        , resultSet.getDouble(2));
            }
        }

        es.shutdown();
        tdConUtils.close(con, statement);
    }

運(yùn)行時(shí)間:300+ms

六臼朗、使用小結(jié)
分情況邻寿。
如果需要獲取結(jié)果:線程使用Callable;
如果需要異步獲取結(jié)果:線程池使用CompletionService视哑。
如果不需要獲取結(jié)果:線程使用Runnable绣否;
如果需要阻塞獲取結(jié)果:線程池使用ExecutorService。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末挡毅,一起剝皮案震驚了整個(gè)濱河市蒜撮,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌跪呈,老刑警劉巖段磨,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異耗绿,居然都是意外死亡苹支,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門误阻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)债蜜,“玉大人琉用,你說(shuō)我怎么就攤上這事〔哂祝” “怎么了邑时?”我有些...
    開(kāi)封第一講書人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)特姐。 經(jīng)常有香客問(wèn)我晶丘,道長(zhǎng),這世上最難降的妖魔是什么唐含? 我笑而不...
    開(kāi)封第一講書人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任浅浮,我火速辦了婚禮,結(jié)果婚禮上捷枯,老公的妹妹穿的比我還像新娘滚秩。我一直安慰自己,他們只是感情好淮捆,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布郁油。 她就那樣靜靜地躺著,像睡著了一般攀痊。 火紅的嫁衣襯著肌膚如雪桐腌。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,007評(píng)論 1 284
  • 那天苟径,我揣著相機(jī)與錄音案站,去河邊找鬼。 笑死棘街,一個(gè)胖子當(dāng)著我的面吹牛蟆盐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播遭殉,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼石挂,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了恩沽?” 一聲冷哼從身側(cè)響起誊稚,我...
    開(kāi)封第一講書人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤翔始,失蹤者是張志新(化名)和其女友劉穎罗心,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體城瞎,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡渤闷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了脖镀。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片飒箭。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弦蹂,到底是詐尸還是另有隱情肩碟,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布凸椿,位于F島的核電站削祈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏脑漫。R本人自食惡果不足惜髓抑,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望优幸。 院中可真熱鬧吨拍,春花似錦、人聲如沸网杆。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)碳却。三九已至严里,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間追城,已是汗流浹背刹碾。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留座柱,地道東北人迷帜。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像色洞,于是被迫代替她去往敵國(guó)和親戏锹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容