Java多線程:線程池

1. new Thread的弊端

  • 每次都使用new Thread()性能很差。
  • 線程缺乏統(tǒng)一管理浙炼。如線程數(shù)的管理弯屈。

2. 線程池

一種線程使用模式资厉。線程池維護著多個線程湘捎,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)消痛。這避免了在處理短時間任務(wù)時創(chuàng)建與銷毀線程的代價秩伞。

一般來說纱新,對于有N個CPU的主機(或N個核心)脸爱,線程池大小應(yīng)如下設(shè)置:

  • 如果是CPU密集型應(yīng)用,線程池大小為N+1族檬。
  • 如果是IO密集型應(yīng)用单料,線程池大小為2N+1扫尖。

3. 線程池的優(yōu)勢

  • 重用存在的線程,省去線程的創(chuàng)建銷毀過程沉颂,性能佳。
  • 有效控制最大并發(fā)線程數(shù)塞关。提高了使用率并避免了競爭帆赢。
  • 定時執(zhí)行怠益,定期執(zhí)行蜻牢,單線程抢呆,并發(fā)控制等功能。

4. Executors

Java通過Executors類提供四種線程池恳邀。創(chuàng)建方法為靜態(tài)方式創(chuàng)建谣沸。

4.1. ExecutorService

繼承了Executor類,在其基礎(chǔ)上進行具體的擴展秉版。

4.2. ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService類的子樹上的類,是ExecutorService類提供的四個主要線程池方法的實現(xiàn)類滚停,其完整構(gòu)造器包括以下參數(shù):

  • corePoolSize:線程池中核心線程數(shù)的最大數(shù)值。核心線程:線程池新建線程的時候,如果當(dāng)前線程總數(shù)小于corePoolSize问词,則新建的是核心線程激挪,如果超過corePoolSize,則新建的是非核心線程锋喜。核心線程默認情況下會一直存活在線程池中嘿般,即使這個核心線程閑置炉奴。如果指定ThreadPoolExecutor的allowCoreThreadTimeOut這個屬性為true,那么核心線程如果不干活(閑置狀態(tài))的話砸逊,超過一定時間(時長下面參數(shù)決定)师逸,就會被銷毀掉。
  • maximumPoolSize:池中允許的最大線程總數(shù)员辩〉旎縣城總數(shù)=核心線程+非核心線程。非核心線程在閑置時會被銷毀弃甥。
  • keepAliveTime:非核心線程閑置超時時長淆攻,若allowCoreThreadTimeOut這個屬性為true,核心線程也會被影響伞芹。
  • unit:keepAliveTime參數(shù)的時間單位唱较。
  • workQueue:執(zhí)行前用于保持任務(wù)的隊列南缓。此隊列僅保持由 execute方法提交的 Runnable任務(wù)收捣,可在Java容器:Stack童漩,Queue侧馅,PriorityQueue和BlockingQueue一文中查詢济欢。
  • threadFactory:執(zhí)行程序創(chuàng)建新線程時使用的工廠,可用于定義創(chuàng)建現(xiàn)成的方式,一般無用蔚舀。
  • handler:由于超出線程范圍和隊列容量而使執(zhí)行被阻塞時所使用的處理程序缅叠,存在默認方案鸥鹉。

(繼承關(guān)系Executor-ExecutorService-AbstractExecutorService-ThreadPoolExecutor)

而接下來介紹的幾種方法,其實即是預(yù)定義的ThreadPoolExecutor喇聊。

4.3. CachedThreadPool

創(chuàng)建一個可緩存線程池邻遏,線程池長度超過處理需要時垂寥,可靈活回收空閑線程律杠,若無可回收線程則新建線程讼撒。

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int index = i;
    try {
        Thread.sleep(index * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    cachedThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println(index);
        }
    });
}

其實現(xiàn)代碼如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可見,該方法中所有線程均由SynchronousQueue管理,且不設(shè)置線程數(shù)量上限担映。對于SynchronousQueue朋魔,每個插入線程必須等待另一線程的對應(yīng)移除操作。(即該隊列沒有容量哨苛,僅試圖取得元素時元素才存在)因而祝懂,該方法實現(xiàn)了隔躲,如果有線程空閑组力,則使用空閑線程進行操作家夺,否則就會創(chuàng)建新線程日川。

4.4. FixedThreadPool

創(chuàng)建一個定長線程池傀蓉,可以控制最大并發(fā)數(shù),超出的線程會在隊列中等待。

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
    final int index = i;
    fixedThreadPool.execute(new Runnable() {

        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

其實現(xiàn)代碼如下:

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

可見該方法讓keepAliveTime為0,即限制了線程數(shù)必須小于等于corePoolSize。而多出的線程則會被無界隊列所存儲,在其中排隊。

4.5. ScheduledThreadPool

創(chuàng)建一個定長線程池窗看,相對于FixedThreadPool,它支持周期性執(zhí)行和延期執(zhí)行。

延期3秒執(zhí)行

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {

    @Override
    public void run() {
        System.out.println("delay 3 seconds");
    }
}, 3, TimeUnit.SECONDS);

每三秒隔一秒執(zhí)行

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        System.out.println("delay 1 seconds, and excute every 3 seconds");
    }
}, 1, 3, TimeUnit.SECONDS);

和FixedThreadPool的最大不同是页滚,它采用一個DelayedWorkQueue去控制線程,該隊列僅有到期時才能取出元素。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

4.6. SingleThreadExecuter

創(chuàng)建一個單線程線程池讥脐,只會用唯一的工作線程執(zhí)行任務(wù)枪蘑,保證所有任務(wù)按FIFO赦役,LIFO的優(yōu)先級執(zhí)行。

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {

        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

在實現(xiàn)上,其相當(dāng)于一個線程數(shù)為1的FixedThreadPool

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4.7. ForkJoinPool

ForkJoinPool是JDK7中引用的特殊的新的線程池,其核心思想類似于MapReduce误窖,將大任務(wù)拆分成多個小任務(wù)(fork)丙唧,再將多個小任務(wù)匯集到結(jié)果上(join)北苟,同時妆档,他通過繼承了AbstractExecutorService獲得了基礎(chǔ)的線程池功能,可以像普通線程池一樣配置。其工作模式如圖:

image

普通的線程池中每個任務(wù)都由單獨的線程處理,如果出現(xiàn)一個耗時比較大的任務(wù)糜俗,可能出現(xiàn)線程池中只有一個線程在進行這個任務(wù)啤挎,其他線程卻空閑著伙判,所謂“一核有難,八核圍觀”,造成了CPU負載不均衡邮利。ForkJoinPool為解決這種問題提出方庭,在ForkJoinPool中龄减,引入了工作竊取算法希停,其核心思想為:

  • 每個線程有自己的工作隊列(WorkQueue)宠能,該隊列是一個雙向鏈表(Java的WorkQueue結(jié)構(gòu)中用ArrayList實現(xiàn))
  • 隊列所有者線程可以調(diào)用雙鏈表的push/pop(取頭取尾根據(jù)模式?jīng)Q定)方法,其他線程可以調(diào)用該隊列poll(取尾取頭根據(jù)模式?jīng)Q定)方法羞延,push/poll/pop均引入CAS伴箩,為原子操作。
  • 劃分的子任務(wù)調(diào)用fork時彤恶,會把任務(wù)push到自己的隊列中。
  • 默認情況,工作線程從自己的隊列pop任務(wù)并執(zhí)行虹菲。
  • 自己隊列為空损合,線程隨機從另一線程poll任務(wù)并執(zhí)行胳嘲。

隊列結(jié)構(gòu)

/**
 * Queues supporting work-stealing as well as external task
 * submission. See above for descriptions and algorithms.
 */
public class WorkQueue {
    volatile int source;       // source queue id, or sentinel
    int id;                    // pool index, mode, tag
    int base;                  // index of next slot for poll
    int top;                   // index of next slot for push
    volatile int phase;        // versioned, negative: queued, 1: locked
    int stackPred;             // pool stack (ctl) predecessor link
    int nsteals;               // number of steals
    ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
    final ForkJoinPool pool;   // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    //......
}

Java8中在Executors里也加入了新增ForkJoinPool的方法径玖,讓它像普通線程池一樣工作匿垄,創(chuàng)建的ForJoinPool任務(wù)是FIFO的滴劲。

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

該方法也可以帶參數(shù),決定parallelism。

此外贾虽,還可以使用ForkJoinPool內(nèi)部已經(jīng)初始化好的commonPool:

        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

當(dāng)然蟆技,你可以直接調(diào)用構(gòu)造方法來創(chuàng)建ForkJoinPool谐丢,其完整參數(shù)如下右蕊,解釋見注釋贪磺。

    public ForkJoinPool(int parallelism,//并行化層數(shù)制妄,默認為可用CPU處理器數(shù)振愿。
                        ForkJoinWorkerThreadFactory factory,//threadFactory,前文提過毁兆,無視薇组。
                        UncaughtExceptionHandler handler,//handler,前文提過菩暗,無視。
                        boolean asyncMode,//控制workQueue工作模式佑稠,若為true讶坯,則任務(wù)FIFO從base取任務(wù),默認為false婉烟,任務(wù)LIFO,從top取任務(wù)昙衅。
                        int corePoolSize,//核心線程數(shù)而涉,通常和parallelism數(shù)量一致材原。設(shè)置較大可以降低動態(tài)開銷余蟹,如果任務(wù)中經(jīng)常有阻塞,建議設(shè)置為小值兼搏,比如默認值0裳朋。
                        int maximumPoolSize,//最大線程數(shù)
                        int minimumRunnable,//允許的最小的不被join操作阻塞的線程數(shù)鲤嫡。默認值為1惕耕。
                        Predicate<? super ForkJoinPool> saturate,//若不為空則可能創(chuàng)建超過最大線程數(shù)的線程數(shù)。
                        long keepAliveTime,//非核心線程閑置時長挤安。
                        TimeUnit unit)//keepAliveTime的單位。

由于構(gòu)造器重載围肥,多個參數(shù)可缺省怨愤。

根據(jù)工作模式不同撰洗,WorkQueue取元素模型如下:

image

當(dāng)不把ForkJoinPool作為簡單線程池使用時猪勇,使用ForkJoinPool泣刹,需要構(gòu)建ForkJoinTask對象到ForJoinPool中外冀,F(xiàn)orkJoinTask有三個核心方法:

  • fork():用于任務(wù)分治,調(diào)用子任務(wù)fork()可以將任務(wù)放到線程池異步調(diào)用脑沿。
  • join():調(diào)用子任務(wù)的join()方法等待返回的結(jié)果,不受中斷機制影響措近。join()會拋出異常,若不需要可以使用quietlyJoin()并用getExecption()或getRawResult()自己處理異常和結(jié)果凰浮。
  • invoke():在當(dāng)前線程同步執(zhí)行該任務(wù)菜拓,該方法不受中斷機制影響。

ForkJoinTask實現(xiàn)了Future接口贱鄙,內(nèi)部維護四個狀態(tài)并提供查詢API

  • isCancelled() => CANCELLED
  • isCompletedAbnormally => status < NORMAL => CANCELLED || EXCEPTIONAL
  • isCompletedNormally => NORMAL
  • isDone() => status<0 => NORMAL || CANCELLED || EXCEPTIONAL

通常情況我們使用ForkJoinTask的兩個子類

  • RecursiveAction:沒有返回值的任務(wù)
  • RecursiveTask:有返回值的任務(wù)

兩個例子,轉(zhuǎn)載自ForkJoinPool入門篇

使用RecursiveAction

public class RecursiveActionTest {
    static class Sorter extends RecursiveAction {
        public static void sort(long[] array) {
            ForkJoinPool.commonPool().invoke(new Sorter(array, 0, array.length));
        }

        private final long[] array;
        private final int lo, hi;

        private Sorter(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        private static final int THRESHOLD = 1000;

        protected void compute() {
            // 數(shù)組長度小于1000直接排序
            if (hi - lo < THRESHOLD)
                Arrays.sort(array, lo, hi);
            else {
                int mid = (lo + hi) >>> 1;
                // 數(shù)組長度大于1000梦湘,將數(shù)組平分為兩份
                // 由兩個子任務(wù)進行排序
                Sorter left = new Sorter(array, lo, mid);
                Sorter right = new Sorter(array, mid, hi);
                invokeAll(left, right);
                // 排序完成后合并排序結(jié)果
                merge(lo, mid, hi);
            }
        }

        private void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++) {
                if (k == hi || buf[i] < array[k]) {
                    array[j] = buf[i++];
                } else {
                    array[j] = array[k++];
                }
            }
        }
    }

    public static void main(String[] args) {
        long[] array = new Random().longs(100_0000).toArray();
        Sorter.sort(array);
        System.out.println(Arrays.toString(array));
    }
}

使用RecurisiveTask

public class RecursiveTaskTest {
    static class Sum extends RecursiveTask<Long> {
        public static long sum(int[] array) {
            return ForkJoinPool.commonPool().invoke(new Sum(array, 0, array.length));
        }

        private final int[] array;
        private final int lo, hi;

        private Sum(int[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        private static final int THRESHOLD = 600;

        @Override
        protected Long compute() {
            if (hi - lo < THRESHOLD) {
                return sumSequentially();
            } else {
                int middle = (lo + hi) >>> 1;
                Sum left = new Sum(array, lo, middle);
                Sum right = new Sum(array, middle, hi);
                right.fork();
                long leftAns = left.compute();
                long rightAns = right.join();
                // 注意subTask2.fork要在subTask1.compute之前
                // 因為這里的subTask1.compute實際上是同步計算的
                return leftAns + rightAns;
            }
        }

        private long sumSequentially() {
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += array[i];
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        int[] array = IntStream.rangeClosed(1, 100_0000).toArray();
        Long sum = Sum.sum(array);
        System.out.println(sum);
    }
}

5. 參考文章

Java(Android)線程池
Java線程池使用說明
線程池倦逐,這一篇或許就夠了
ForkJoinPool入門篇

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末守谓,一起剝皮案震驚了整個濱河市斋荞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蜈彼,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件俺驶,死亡現(xiàn)場離奇詭異幸逆,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門还绘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拍顷,你說我怎么就攤上這事抚太。” “怎么了昔案?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵尿贫,是天一觀的道長胶哲。 經(jīng)常有香客問我锥咸,道長,這世上最難降的妖魔是什么蒜危? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任呼伸,我火速辦了婚禮身冀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘括享。我一直安慰自己搂根,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布铃辖。 她就那樣靜靜地躺著剩愧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪娇斩。 梳的紋絲不亂的頭發(fā)上仁卷,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天,我揣著相機與錄音犬第,去河邊找鬼锦积。 笑死,一個胖子當(dāng)著我的面吹牛歉嗓,可吹牛的內(nèi)容都是我干的丰介。 我是一名探鬼主播,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼鉴分,長吁一口氣:“原來是場噩夢啊……” “哼哮幢!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起志珍,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤橙垢,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后伦糯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柜某,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡嗽元,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了莺琳。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片还棱。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖惭等,靈堂內(nèi)的尸體忽然破棺而出珍手,到底是詐尸還是另有隱情,我是刑警寧澤辞做,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布琳要,位于F島的核電站,受9級特大地震影響秤茅,放射性物質(zhì)發(fā)生泄漏稚补。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一框喳、第九天 我趴在偏房一處隱蔽的房頂上張望课幕。 院中可真熱鬧,春花似錦五垮、人聲如沸乍惊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽润绎。三九已至,卻和暖如春诞挨,著一層夾襖步出監(jiān)牢的瞬間莉撇,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工惶傻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留棍郎,地道東北人。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓达罗,卻偏偏與公主長得像坝撑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子粮揉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容