深入理解ThreadPoolExecutor

線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式肥隆,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險待笑。


Executors返回的線程池對象的弊端如下:
1)FixedThreadPoolSingleThreadPool:
??允許的請求隊列長度為Integer.MAX_VALUE絮记,可能會堆積大量的請求摔踱,從而導(dǎo)致OOM。
2)CachedThreadPoolScheduledThreadPool:
??允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE怨愤,可能會創(chuàng)建大量的線程派敷,從而導(dǎo)致OOM。

以前代碼里面 Executors使用的多一些撰洗,ThreadPoolExecutor也有使用篮愉,但是對其原理和代碼了解的卻不多。最近通過看源碼了赵,了解了一下。

ThreadPoolExecutor的創(chuàng)建

可以通過調(diào)用構(gòu)造函數(shù)來創(chuàng)建一個線程池甸赃。它有4個構(gòu)造函數(shù)柿汛,其中參數(shù)最全的是下面這個

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

具體解釋一下上述參數(shù):

  • corePoolSize 線程池核心線程數(shù)大小,初始化是核心線程數(shù)也是0埠对,除非先調(diào)用prestartCoreThread或者prestartAllCoreThreads先創(chuàng)建核心線程络断;在沒有設(shè)置allowCoreThreadTimeOut為true情況下,核心線程不會銷毀
  • maximumPoolSize 線程池線程數(shù)最大值项玛,達(dá)到最大值后線程池不會再增加線程執(zhí)行任務(wù)
  • keepAliveTime 線程池空閑時貌笨,線程存活的時間
  • TimeUnit 時間單位
  • ThreadFactory 線程工廠
  • BlockingQueue 任務(wù)隊列
  • RejectedExecutionHandler 任務(wù)拒絕策略;負(fù)責(zé)處理當(dāng)線程飽后襟沮、線程池正在關(guān)閉時的新提交任務(wù)锥惋;

ThreadPoolExecutor內(nèi)部有實現(xiàn)4個拒絕策略:

  • (1)、CallerRunsPolicy开伏,由調(diào)用execute方法提交任務(wù)的線程來執(zhí)行這個任務(wù)膀跌;
  • (2)、AbortPolicy固灵,拋出異常RejectedExecutionException拒絕提交任務(wù)捅伤;
  • (3)、DiscardPolicy巫玻,直接拋棄任務(wù)丛忆,不做任何處理;
  • (4)仍秤、DiscardOldestPolicy熄诡,去除任務(wù)隊列中的第一個任務(wù),重新提交诗力;

一個變量控制線程狀態(tài)和線程池容量

ThreadPoolExecutor有如下 5種狀態(tài):

    private static final int RUNNING    = -1 << COUNT_BITS;
    
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

其中

private static final int COUNT_BITS = Integer.SIZE - 3;  //Integer.SIZE=32

所以COUNT_BITS=29粮彤。
實際上線程池的5種狀態(tài)是使用Integer的高三位。其10機制數(shù)分別是

RUNNING=111

SHUTDOWN=000

STOP=001

TIDYING=010

TERMINATED=110

這樣線程池的狀態(tài)和線程數(shù)量就盡由一個變量存儲:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //使用AtomicInteger 當(dāng)然是為了保證多線程同步問題

ctl 可以理解為control(控制),初始值為線程數(shù)0导坟,狀態(tài)RUNNING屿良;

線程池的執(zhí)行

線程池執(zhí)行一個線程有兩種方法: execute 和submit。下面看一下execute惫周。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

通過代碼可以看出流程圖如下:


  • 1尘惧、調(diào)用ThreadPoolExecutor的execute提交線程,首先檢查CorePool递递,如果CorePool內(nèi)的線程小于CorePoolSize喷橙,新創(chuàng)建線程執(zhí)行任務(wù)。
  • 2登舞、如果當(dāng)前CorePool內(nèi)的線程大于等于CorePoolSize贰逾,那么將線程加入到BlockingQueue。
  • 3菠秒、如果不能加入BlockingQueue疙剑,在小于MaxPoolSize的情況下創(chuàng)建線程執(zhí)行任務(wù)。
  • 4践叠、如果線程數(shù)大于等于MaxPoolSize言缤,那么執(zhí)行拒絕策略。

核心方法addWorker

 /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代碼中首先進(jìn)行一次線程池狀態(tài)的檢測

 if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

這段代碼初看有點難懂禁灼。如果把代碼改成如下可能會好理解一些

    if (rs >= SHUTDOWN &&
                (rs == !SHUTDOWN ||
                   firstTask == !null ||
                    workQueue.isEmpty()))
                   {
                        return false;

                   }
                

大致邏輯是:

  • 如果rs>=SHUTDOWN,同時不等于SHUTDOWN管挟,即為SHUTDOWN以上的狀態(tài),那么不接受新線程弄捕。
  • 如果rs>=SHUTDOWN僻孝,同時等于SHUTDOWN(其實就是SHUTDOWN),同時firstTask守谓!=null皮璧,那么拒絕新線程;
  • 如果rs>=SHUTDOWN分飞,同時等于SHUTDOWN(其實就是SHUTDOWN)悴务,同時firstTask ==null,那么可能是新增加線程消耗Queue中的線程譬猫。但是同時還要檢測workQueue是否isEmpty()讯檐,如果為Empty,那么隊列已空染服,不需要增加消耗線程别洪,如果隊列沒有空那么運行增加first=null的Worker。

從這里是可以總結(jié)出線程池增加線程的策略:

首先柳刮,在rs>SHUTDOWN時挖垛,拒絕一切線程的增加痒钝,因為STOP是會終止所有的線程,同時移除Queue中所有的待執(zhí)行的線程的痢毒,所以也不需要增加first=null的Worker了;

其次送矩,在SHUTDOWN狀態(tài)時,是不能增加first哪替!=null的Worker的栋荸,同時即使first=null,但是此時Queue為Empty也是不允許增加Worker的凭舶,SHUTDOWN下增加的Worker主要用于消耗Queue中的任務(wù)晌块。
SHUTDOWN狀態(tài)時,是不允許向workQueue中增加線程的帅霜,isRunning(c) && workQueue.offer(command) 每次在offer之前都要做狀態(tài)檢測匆背,也就是線程池狀態(tài)變?yōu)?gt;=SHUTDOWN時不允許新線程進(jìn)入線程池了。

通過上面的分析身冀,對線程池的運行基本了解了钝尸。后續(xù)會補充一下線程狀態(tài)的切換。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末闽铐,一起剝皮案震驚了整個濱河市蝶怔,隨后出現(xiàn)的幾起案子奶浦,更是在濱河造成了極大的恐慌兄墅,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澳叉,死亡現(xiàn)場離奇詭異隙咸,居然都是意外死亡,警方通過查閱死者的電腦和手機成洗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門五督,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓶殃,你說我怎么就攤上這事充包。” “怎么了遥椿?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵基矮,是天一觀的道長。 經(jīng)常有香客問我冠场,道長家浇,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任碴裙,我火速辦了婚禮钢悲,結(jié)果婚禮上点额,老公的妹妹穿的比我還像新娘。我一直安慰自己莺琳,他們只是感情好还棱,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著芦昔,像睡著了一般诱贿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咕缎,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天珠十,我揣著相機與錄音,去河邊找鬼凭豪。 笑死焙蹭,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的嫂伞。 我是一名探鬼主播孔厉,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼帖努!你這毒婦竟也來了撰豺?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拼余,失蹤者是張志新(化名)和其女友劉穎污桦,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體匙监,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡凡橱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了亭姥。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稼钩。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖达罗,靈堂內(nèi)的尸體忽然破棺而出坝撑,到底是詐尸還是另有隱情,我是刑警寧澤粮揉,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布巡李,位于F島的核電站,受9級特大地震影響滔蝉,放射性物質(zhì)發(fā)生泄漏击儡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一蝠引、第九天 我趴在偏房一處隱蔽的房頂上張望阳谍。 院中可真熱鬧蛀柴,春花似錦、人聲如沸矫夯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽训貌。三九已至制肮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間递沪,已是汗流浹背豺鼻。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留款慨,地道東北人儒飒。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像檩奠,于是被迫代替她去往敵國和親桩了。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容