Java并發(fā) 之 線程池系列 (2) 使用ThreadPoolExecutor構(gòu)造線程池

threadpoolexecutor-example.jpg

Executors的“罪與罰”

在上一篇文章Java并發(fā) 之 線程池系列 (1) 讓多線程不再坑爹的線程池中务甥,我們介紹了使用JDK concurrent包下的工廠和工具類Executors來(lái)創(chuàng)建線程池常用的幾種方法:

//創(chuàng)建固定線程數(shù)量的線程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

//創(chuàng)建一個(gè)線程池,該線程池會(huì)根據(jù)需要?jiǎng)?chuàng)建新的線程,但如果之前創(chuàng)建的線程可以使用储耐,會(huì)重用之前創(chuàng)建的線程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

//創(chuàng)建一個(gè)只有一個(gè)線程的線程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

誠(chéng)然础浮,這種創(chuàng)建線程池的方法非常簡(jiǎn)單和方便氧映。但仔細(xì)閱讀源碼柜蜈,卻把我嚇了一條: 這是要老子的命澳ピ琛约谈!

我們前面講過(guò)笔宿,如果有新的請(qǐng)求過(guò)來(lái)犁钟,在線程池中會(huì)創(chuàng)建新的線程處理這些任務(wù),一直創(chuàng)建到線程池的最大容量(Max Size)為止泼橘;超出線程池的最大容量的Tasks涝动,會(huì)被放入阻塞隊(duì)列(Blocking
Queue)進(jìn)行等待,知道有線程資源釋放出來(lái)為止炬灭;要知道的是醋粟,阻塞隊(duì)列也是有最大容量的,多余隊(duì)列最大容量的請(qǐng)求不光沒(méi)有獲得執(zhí)行的機(jī)會(huì)重归,連排隊(duì)的資格都沒(méi)有米愿!

那這些連排隊(duì)的資格都沒(méi)有的Tasks怎么處理呢?不要急鼻吮,后面在介紹ThreadPoolExecutor的拒絕處理策略(Handler Policies for Rejected Task)的時(shí)候會(huì)詳細(xì)介紹育苟。

說(shuō)到這里你也許有寫疑惑了,上面這些東西椎木,我通常使用Executors的時(shí)候沒(méi)有指定過(guò)啊违柏。是的,因?yàn)?code>Executors很“聰明”地幫我們做了這些事情香椎。

Executors的源碼

我們看下ExecutorsnewFixedThreadPoolnewSingleThreadExecutor方法的源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(
         1, 1,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>()));
 }

其實(shí)它們底層還是通過(guò)ThreadPoolExecutor來(lái)創(chuàng)建ExecutorService的漱竖,這里對(duì)妻子的參數(shù)先不作介紹,下面會(huì)詳細(xì)講畜伐,這里只說(shuō)一下new LinkedBlockingQueue<Runnable>()這個(gè)參數(shù)闲孤。

LinkedBlockingQueue就是當(dāng)任務(wù)數(shù)大于線程池的線程數(shù)的時(shí)候的阻塞隊(duì)列,這里使用的是無(wú)參構(gòu)造烤礁,我們?cè)倏匆幌聵?gòu)造函數(shù):

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

我們看到阻塞隊(duì)列的默認(rèn)大小竟然是Integer.MAX_VALUE讼积!

如果不做控制,拼命地往阻塞隊(duì)列里放Task脚仔,分分鐘“Out of Memory”扒谥凇!

還有更絕的鲤脏,newCachedThreadPool方法:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
    0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
}

最大線程數(shù)默認(rèn)也是Integer.MAX_VALUE们颜,也就是說(shuō),如果之前的任務(wù)沒(méi)有執(zhí)行完就有新的任務(wù)進(jìn)來(lái)了猎醇,就會(huì)繼續(xù)創(chuàng)建新的線程窥突,指導(dǎo)創(chuàng)建到Integer.MAX_VALUE為止。

讓你的JVM OutOfMemoryError

下面提供一個(gè)使用newCachedThreadPool創(chuàng)建大量線程處理Tasks硫嘶,最終OutOfMemoryError的例子阻问。

友情提醒:場(chǎng)面過(guò)于血腥,請(qǐng)勿在生產(chǎn)環(huán)境使用沦疾。

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample2 {

    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * 600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void newCachedThreadPoolTesterBadly() {
        System.out.println("begin............");
        for (int i = 0; i <= Integer.MAX_VALUE; i++) {
            executorService.execute(new Task());
        }
        System.out.println("end.");
    }

    public static void main(String[] args) {
        newCachedThreadPoolTesterBadly();
    }

}

當(dāng)main方法啟動(dòng)以后称近,打開控制面板第队,看到CPU和內(nèi)存幾乎已經(jīng)全部耗盡:

[圖片上傳失敗...(image-240e8d-1554126159952)]

很快控制臺(tái)就拋出了java.lang.OutOfMemoryError

begin............
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:717)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
    at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
    at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)

阿里巴巴Java開發(fā)手冊(cè)

下面我們?cè)诳碕ava開發(fā)手冊(cè)這條規(guī)定,應(yīng)該就明白作者的良苦用心了吧刨秆。

【強(qiáng)制】線程池不允許使用Executors去創(chuàng)建凳谦,而是通過(guò)ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則衡未,規(guī)避資源耗盡的風(fēng)險(xiǎn)尸执。
說(shuō)明:
Executors返回的線程池對(duì)象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求缓醋,從而導(dǎo)致OOM如失。
2)CachedThreadPool和ScheduledThreadPool:允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程改衩,從而導(dǎo)致OOM岖常。

主角出場(chǎng)

解鈴還須系鈴人驯镊,其實(shí)避免這個(gè)OutOfMemoryError風(fēng)險(xiǎn)的鑰匙就藏在Executors的源碼里葫督,那就是自己直接使用ThreadPoolExecutor

ThreadPoolExecutor的構(gòu)造

構(gòu)造一個(gè)ThreadPoolExecutor需要蠻多參數(shù)的板惑。下面是ThreadPoolExecutor的構(gòu)造函數(shù)橄镜。

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面就一一介紹一下這些參數(shù)的具體含義。

ThreadPoolExecutor構(gòu)造參數(shù)說(shuō)明

其實(shí)從源碼中的JavaDoc已經(jīng)可以很清晰地明白這些參數(shù)的含義了冯乘,下面照顧懶得看英文的同學(xué)洽胶,再解釋一下:

  • corePoolSize

線程池核心線程數(shù)。

默認(rèn)情況下核心線程會(huì)一直存活裆馒,即使處于閑置狀態(tài)也不會(huì)受存keepAliveTime限制姊氓,除非將allowCoreThreadTimeOut設(shè)置為true

  • maximumPoolSize

線程池所能容納的最大線程數(shù)喷好。超過(guò)maximumPoolSize的線程將被阻塞翔横。

最大線程數(shù)maximumPoolSize不能小于corePoolSize

  • keepAliveTime

非核心線程的閑置超時(shí)時(shí)間。

超過(guò)這個(gè)時(shí)間非核心線程就會(huì)被回收梗搅。

  • TimeUnit

keepAliveTime的時(shí)間單位禾唁,如TimeUnit.SECONDS。

當(dāng)將allowCoreThreadTimeOut為true時(shí)无切,對(duì)corePoolSize生效荡短。

  • workQueue

線程池中的任務(wù)隊(duì)列。

沒(méi)有獲得線程資源的任務(wù)將會(huì)被放入workQueue哆键,等待線程資源被釋放掘托。如果放入workQueue的任務(wù)數(shù)大于workQueue的容量,將由RejectedExecutionHandler的拒絕策略進(jìn)行處理籍嘹。

常用的有三種隊(duì)列:
SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue烫映。

  • threadFactory

提供創(chuàng)建新線程功能的線程工廠沼本。

ThreadFactory是一個(gè)接口,只有一個(gè)newThread方法:

Thread newThread(Runnable r);
  • rejectedExecutionHandler

無(wú)法被線程池處理的任務(wù)的處理器锭沟。

一般是因?yàn)槿蝿?wù)數(shù)超出了workQueue的容量抽兆。

當(dāng)一個(gè)任務(wù)被加入線程池時(shí)

總結(jié)一下,當(dāng)一個(gè)任務(wù)通過(guò)execute(Runnable)方法添加到線程池時(shí):

  1. 如果此時(shí)線程池中線程的數(shù)量小于corePoolSize族淮,即使線程池中的線程都處于空閑狀態(tài)辫红,也要?jiǎng)?chuàng)建新的線程來(lái)處理被添加的任務(wù)。

  2. 如果此時(shí)線程池中的數(shù)量等于corePoolSize祝辣,但是緩沖隊(duì)列workQueue未滿贴妻,那么任務(wù)被放入緩沖隊(duì)列。

  3. 如果此時(shí)線程池中的數(shù)量大于corePoolSize蝙斜,緩沖隊(duì)列workQueue滿名惩,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來(lái)處理被添加的任務(wù)孕荠。

  4. 如果此時(shí)線程池中的數(shù)量大于corePoolSize娩鹉,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize稚伍,那么通過(guò) handler所指定的拒絕策略來(lái)處理此任務(wù)弯予。

處理任務(wù)的優(yōu)先級(jí)為:核心線程數(shù)(corePoolSize) > 任務(wù)隊(duì)列容量(workQueue) > 最大線程數(shù)(maximumPoolSize);如果三者都滿了,使用rejectedExecutionHandler處理被拒絕的任務(wù)个曙。

Thread_pool.png

ThreadPoolExecutor的使用

下面就通過(guò)一個(gè)簡(jiǎn)單的例子锈嫩,使用ThreadPoolExecutor構(gòu)造的線程池執(zhí)行任務(wù)。

ThreadPoolExample3

package net.ijiangtao.tech.concurrent.jsd.threadpool;

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author ijiangtao.net
 */
public class ThreadPoolExample3 {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.currentThread().sleep(2000);
                System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    private static class MyThreadFactory implements ThreadFactory {

        private final String namePrefix;

        public MyThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }

        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
        }

    }

    private static final ExecutorService executorService = new ThreadPoolExecutor(
            10,
            20, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(50),
            new MyThreadFactory("MyThreadFromPool"),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        // creates five tasks
        Task r1 = new Task();
        Task r2 = new Task();
        Task r3 = new Task();
        Task r4 = new Task();
        Task r5 = new Task();

        // submit方法有返回值
        Future future = executorService.submit(r1);
        System.out.println("r1 isDone ? " + future.isDone());

        // execute方法沒(méi)有返回值
        executorService.execute(r2);
        executorService.execute(r3);
        executorService.execute(r4);
        executorService.execute(r5);

        //關(guān)閉線程池
        executorService.shutdown();

    }

}

執(zhí)行結(jié)果

r1 isDone ? false
MyThreadFromPool-2-21:04:03.215
MyThreadFromPool-5-21:04:03.215
MyThreadFromPool-4-21:04:03.215
MyThreadFromPool-3-21:04:03.215
MyThreadFromPool-1-21:04:03.215

從結(jié)果看垦搬,從線程池取出了5個(gè)線程呼寸,并發(fā)執(zhí)行了5個(gè)任務(wù)。

總結(jié)

這一章我們介紹了一種更安全猴贰、更定制化的線程池構(gòu)建方式:ThreadPoolExecutor对雪。相信你以后不敢輕易使用Executors來(lái)構(gòu)造線程池了。

后面我們會(huì)介紹線程池的更多實(shí)現(xiàn)方式(例如使用Google核心庫(kù)Guava)糟趾,以及關(guān)于線程池的更多知識(shí)和實(shí)戰(zhàn)慌植。

Links

作者資源

相關(guān)資源

作者:濤哥 ( ijiangtao.net )
公眾號(hào):西召 ( westcall )
歡迎 評(píng)論、關(guān)注义郑、打賞蝶柿,轉(zhuǎn)發(fā)和點(diǎn)贊,你的鼓勵(lì)是我持續(xù)創(chuàng)作的動(dòng)力非驮。
濤哥這里交汤,干貨和濕貨,一應(yīng)俱全!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末芙扎,一起剝皮案震驚了整個(gè)濱河市星岗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌戒洼,老刑警劉巖俏橘,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異圈浇,居然都是意外死亡寥掐,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門磷蜀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)召耘,“玉大人,你說(shuō)我怎么就攤上這事褐隆∥鬯” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵庶弃,是天一觀的道長(zhǎng)衫贬。 經(jīng)常有香客問(wèn)我,道長(zhǎng)虫埂,這世上最難降的妖魔是什么祥山? 我笑而不...
    開封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任圃验,我火速辦了婚禮掉伏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘澳窑。我一直安慰自己斧散,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開白布摊聋。 她就那樣靜靜地躺著鸡捐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪麻裁。 梳的紋絲不亂的頭發(fā)上箍镜,一...
    開封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音煎源,去河邊找鬼色迂。 笑死,一個(gè)胖子當(dāng)著我的面吹牛手销,可吹牛的內(nèi)容都是我干的歇僧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼锋拖,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼诈悍!你這毒婦竟也來(lái)了祸轮?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤侥钳,失蹤者是張志新(化名)和其女友劉穎适袜,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體舷夺,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡痪蝇,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了冕房。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片躏啰。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖耙册,靈堂內(nèi)的尸體忽然破棺而出给僵,到底是詐尸還是另有隱情,我是刑警寧澤详拙,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布帝际,位于F島的核電站,受9級(jí)特大地震影響饶辙,放射性物質(zhì)發(fā)生泄漏蹲诀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一弃揽、第九天 我趴在偏房一處隱蔽的房頂上張望脯爪。 院中可真熱鬧,春花似錦矿微、人聲如沸痕慢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)掖举。三九已至,卻和暖如春娜庇,著一層夾襖步出監(jiān)牢的瞬間塔次,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工名秀, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留励负,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓泰偿,卻偏偏與公主長(zhǎng)得像熄守,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容