通過了解RejectedExecutionException來分析ThreadPoolExecutor源碼

觀看本文章之前遏考,最好看一下這篇文章熟悉下ThreadPoolExecutor基礎(chǔ)知識(shí)殴蓬。
1.關(guān)于Java多線程的一些常考知識(shí)點(diǎn)
2.看ThreadPoolExecutor源碼前的騷操作


講解本篇文章從下面一個(gè)例子開始,test1()和test2()方法都會(huì)拋出RejectedExecutionException異常趟章,ThreadPoolExecutor默認(rèn)的拒絕任務(wù)策略是AbortPolicy。test1()中線程池中corePoolSize和maximumPoolSize都為2慎王,阻塞隊(duì)列的長度是10蚓土,線程池最多能處理12個(gè)任務(wù)。當(dāng)超過12個(gè)任務(wù)時(shí)赖淤,就會(huì)拒絕新的任務(wù)蜀漆,拋出RejectedExecutionException。而test2()中的任務(wù)沒有超過線程池的閥值咱旱,但是在線程池調(diào)用shutdown()后嗜愈,線程池的狀態(tài)會(huì)變成shutdown,此時(shí)不接收新任務(wù)莽龟,但會(huì)處理正在運(yùn)行的任務(wù)和在阻塞隊(duì)列中等待處理的任務(wù)蠕嫁。所以我們?cè)趕hutdown()之后再調(diào)用submit(),會(huì)拋出RejectedExecutionException異常毯盈。有了這個(gè)例子的基礎(chǔ)剃毒,我們?cè)賮矸治鲈创a,會(huì)好過一點(diǎn)搂赋。

/**
 * @author cmazxiaoma
 * @version V1.0
 * @Description: 分析拋出RejectedExecutionException問題
 * @date 2018/8/16 14:35
 */
public class RejectedExecutionExceptionTest {

    public static void main(String[] args) {
//        test1();
        test2();
    }

    /**
     * 提交的任務(wù)數(shù)量超過其本身最大能處理的任務(wù)量
     */
    public static void test1() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 13; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("線程" + Thread.currentThread().getName()
                                + "正在執(zhí)行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");

            if (i == 12) {
                // throw RejectedExectionException
                customThreadPoolExecutor.submit(customTask);
            } else {
                customThreadPoolExecutor.submit(customTask);
            }
        }
        customThreadPoolExecutor.shutdown();
    }

    /**
     * 當(dāng)線程池shutdown()后赘阀,會(huì)中斷空閑線程。但是正在運(yùn)行的線程和處于阻塞隊(duì)列等待執(zhí)行的線程不會(huì)中斷脑奠。
     * shutdown(),不會(huì)接收新的線程基公。
     */
    public static void test2() {
        CustomThreadPoolExecutor customThreadPoolExecutor =
                new CustomThreadPoolExecutor(2, 2,
                        0L,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<Runnable>(10));

        for (int i = 0; i < 2; i++) {
            CustomThreadPoolExecutor.CustomTask customTask
                    = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(60 * 60);
                        System.out.println("線程" + Thread.currentThread().getName()
                                + "正在執(zhí)行...");
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }, "success");
            customThreadPoolExecutor.submit(customTask);
        }
        customThreadPoolExecutor.shutdown();

        CustomThreadPoolExecutor.CustomTask customTask
                = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(60 * 60);
                    System.out.println("線程" + Thread.currentThread().getName()
                            + "正在執(zhí)行...");
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            }
        }, "success");

        customThreadPoolExecutor.submit(customTask);
    }

}

源碼分析

線程池執(zhí)行過程

關(guān)于線程池執(zhí)行過程,我們看下面一幅圖宋欺,就能明白個(gè)大概轰豆。
1.當(dāng)線程池中的線程數(shù)量小于corePoolSize,就會(huì)創(chuàng)建新的線程來處理添加的任務(wù)直至線程數(shù)量等于corePoolSize胰伍。

2.當(dāng)線程池中的線程數(shù)量大于等于corePoolSize且阻塞隊(duì)列(workQueue)未滿,就會(huì)把新添加的任務(wù)放到阻塞隊(duì)列中酸休。

3.當(dāng)線程池中的線程數(shù)量大于等于corePoolSize且阻塞隊(duì)列滿了骂租,就會(huì)創(chuàng)建線程來處理添加的任務(wù)直到線程數(shù)量等于maximumPoolSize

4.如果線程池的數(shù)量大于maximumPoolSize,會(huì)根據(jù)RejectedExecutionHandler策略來拒絕任務(wù)。AbortPolicy就是其中的一種拒絕任務(wù)策略斑司。


線程池執(zhí)行過程(圖來自于網(wǎng)絡(luò)).png

submit

submit()相比于execute()而言渗饮,多了RunnableFuture<Void> ftask = newTaskFor(task, null);這一步,把task包裝成RunnableFuture類型的ftask宿刮。所以submit()有返回值互站,返回值類型是Future<?>,可以通過get()獲取線程執(zhí)行完畢后返回的值。還可以通過isDone()僵缺、isCancelled()云茸、cancel(boolean mayInterruptIfRunning)這些方法進(jìn)行某些操作。比如判斷線程是否執(zhí)行完畢谤饭、判斷線程是否被取消标捺,顯式取消啟動(dòng)的線程的操作。

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

execute

線程池去處理被提交的任務(wù)揉抵,很明顯通過execute()方法提交的任務(wù)必須要實(shí)現(xiàn)Runnable接口亡容。

我們來仔細(xì)看下execute()注釋,發(fā)現(xiàn)它說到:如果任務(wù)不能被成功提交得到執(zhí)行冤今,因?yàn)榫€程池已經(jīng)處于shutdown狀態(tài)或者是任務(wù)數(shù)量已經(jīng)達(dá)到容器上限闺兢,任務(wù)會(huì)被RejectedExecutionHandler處理進(jìn)行拒絕操作。很明顯戏罢,注釋已經(jīng)告訴上文拋出RejectedExecutionException異常的答案了屋谭。有時(shí)候真的要仔細(xì)看注釋!9旮狻桐磁!多看注釋,事半功倍讲岁。

我們來看execute()中做了什么操作我擂。

1.獲取線程池的狀態(tài),如果線程池中的線程數(shù)量小于corePoolSize缓艳,調(diào)用addWorker(command, true)創(chuàng)建新的線程去處理command任務(wù)校摩。如果addWorker()返回失敗,我們?cè)俅潍@取線程池的狀態(tài)阶淘。因?yàn)閍ddWorker()失敗的原因可能有:線程池已經(jīng)處于shutdown狀態(tài)不接收新的任務(wù)或者是存在并發(fā)衙吩,在workerCountOf(c) < corePoolSize這塊代碼后,有其他的線程創(chuàng)建了worker線程溪窒,導(dǎo)致worker線程的數(shù)量大于等于corePoolSize

2.如果線程池的數(shù)量大于等于corePoolSize坤塞,且線程池的狀態(tài)處于RUNNING狀態(tài)冯勉,我們將任務(wù)放到阻塞隊(duì)列中。當(dāng)任務(wù)成功放入阻塞隊(duì)列中尺锚,我們?nèi)匀恍枰粋€(gè)雙重校驗(yàn)的機(jī)制去判斷是否應(yīng)該創(chuàng)建新的線程去處理任務(wù)珠闰。

因?yàn)闀?huì)存在這些情況:有些線程在我們上次校驗(yàn)后已經(jīng)死掉惜浅、線程池在上次校驗(yàn)后突然關(guān)閉處于shutdown狀態(tài)瘫辩。考慮到這些原因坛悉,我們必須再次校驗(yàn)線程池的狀態(tài)伐厌。如果線程池的狀態(tài)不處于RUNNING狀態(tài),那么就行回滾操作裸影,把剛才入隊(duì)的任務(wù)移除掉挣轨,后續(xù)通過reject(command)執(zhí)行拒絕任務(wù)策略。

如果線程池處于RUNNING狀態(tài)且線程池中線程數(shù)量等于0或者從阻塞隊(duì)列中刪除任務(wù)失敗(意味著:這個(gè)任務(wù)已經(jīng)被其他線程處理掉了)且線程池中線程數(shù)量等于0轩猩,那么調(diào)用addWorker(null, false)新建一個(gè)worker線程卷扮,去消費(fèi)workQueue中里面的任務(wù)

3.如果線程池不處于RUNNING狀態(tài)或者任務(wù)無法成功入隊(duì)(此時(shí)阻塞隊(duì)列已經(jīng)滿了),此時(shí)需要?jiǎng)?chuàng)建新的線程擴(kuò)容至maximumPoolSize均践。如果addWorker(command, false)返回false晤锹,那么通過reject(command)執(zhí)行拒絕任務(wù)策略。

這里再嘮叨幾句彤委,調(diào)用addWorker()有這4種傳參的方式鞭铆,適用于不同場(chǎng)景。

1.addWorker(command, true)當(dāng)線程池中的線程數(shù)量少于corePoolSize焦影,會(huì)把command包裝成worker并且放入到workers集合中车遂。如果線程池中的線程數(shù)量超過了corePoolSize,會(huì)返回false斯辰。

2.addWorker(command, false)當(dāng)阻塞隊(duì)列滿了舶担,同樣會(huì)把command包裝成worker并且放入到worker集合中。如果線程池中的線程數(shù)量超過了maximumPoolSize,會(huì)返回false彬呻。

3.addWorker(null, false)說明firstTask是個(gè)空任務(wù)柄沮,同樣把它包裝成worker并且放入到worker集合中。如果線程池中的數(shù)量超過了maximumPoolSize,會(huì)返回false废岂。這樣firstTask為空的worker在線程執(zhí)行的時(shí)候祖搓,也可以從阻塞隊(duì)列中獲取任務(wù)去處理。

4.addWorker(null, true):和上面一樣湖苞,只是線程池的線程數(shù)量限制在corePoolSize拯欧,超過也是返回false。使用它的有prestartAllCoreThreads()prestartCoreThread()這2個(gè)方法财骨,其使用目的是預(yù)加載線程池中的核心線程镐作。

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

addWorker

addWorker()主要是創(chuàng)建新的線程藏姐,然后執(zhí)行任務(wù)。

1.首先判斷線程池的狀態(tài)是否滿足創(chuàng)建worker線程的要求该贾。

如果線程池的狀態(tài)大于SHUTDOWN狀態(tài)羔杨,那么此時(shí)處于STOP、TIDYING杨蛋、TERMINATE狀態(tài)兜材,不能創(chuàng)建worker線程,返回false。

如果線程池處于shutdown狀態(tài)且firstTask不等于null,此時(shí)也無法創(chuàng)建worker線程杯矩。因?yàn)樘幱趕hutdown狀態(tài)的線程池不會(huì)去接收新的任務(wù)雨让。

如果線程池處于shutdown狀態(tài)且firstTask等于null且workQueue阻塞隊(duì)列為空,此時(shí)就更沒有必要?jiǎng)?chuàng)建worker線程了。因?yàn)閒irstTask為null,就是為了創(chuàng)建一個(gè)沒有任務(wù)的worker線程去阻塞隊(duì)列里面獲取任務(wù)。而阻塞隊(duì)列都已經(jīng)為空户侥,那么再創(chuàng)建一個(gè)firstTask為null的worker線程顯然沒有什么意思,返回false即可峦嗤。

  1. 判斷線程池中的線程數(shù)量是否超過最大值蕊唐。當(dāng)core為true,最大值為corePoolSize。當(dāng)core為false寻仗,最大值為maximumPoolSize刃泌。如果超過最大值,也無法創(chuàng)建worker線程署尤,直接返回false即可耙替。如果沒有超過最大值,通過CAS操作讓當(dāng)前線程數(shù)加1,然后通過標(biāo)簽跳轉(zhuǎn)跳出循環(huán)體至retry:位置曹体。如果CAS操作失敗俗扇,說明workerCount被其他線程修改過。我們?cè)俅潍@取ctl箕别,判斷當(dāng)前線程池狀態(tài)和之前的狀態(tài)是否匹配铜幽。如果不匹配,說明線程池狀態(tài)發(fā)生變更串稀,繼續(xù)循環(huán)操作除抛。

3.通過傳入來的firstTask創(chuàng)建worker線程。Worker的構(gòu)造方法中通過setState(-1)設(shè)置state(同步狀態(tài))為-1母截。Worker繼承了AbstractQueuedSynchronizer到忽,其本身是一把不可重入鎖。getThreadFactory().newThread(this)創(chuàng)建新線程清寇,因?yàn)閃orker實(shí)現(xiàn)了Runnable接口喘漏,其本身也是一個(gè)可執(zhí)行的任務(wù)护蝶。

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

4.我們往workers添加worker線程時(shí),通過ReentrantLock保證線程安全翩迈。只有在當(dāng)前線程池處于RUNNING狀態(tài)或者是處于SHUTDOWN狀態(tài)且firstTask等于null的情況下持灰,才可以添加worker線程。如果worker線程已經(jīng)處于啟動(dòng)且未死亡的狀態(tài)负饲,會(huì)拋出IllegalThreadStateException異常堤魁。

添加完畢后,啟動(dòng)worker線程绽族。如果worker線程啟動(dòng)成功返回true姨涡,啟動(dòng)失敗調(diào)用addWorkerFailed()進(jìn)行回滾操作衩藤。

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker

我們來看下ThreadPoolExecutor的內(nèi)部類Worker吧慢,上文已經(jīng)說到Worker繼承了AbstractQueuedSynchronizer類且實(shí)現(xiàn)了Runnable接口。所以說是一個(gè)可執(zhí)行的任務(wù)赏表,也是一把不可重入鎖检诗,具有排他性。

1.我們創(chuàng)建Worker對(duì)象時(shí)瓢剿,默認(rèn)的state為-1逢慌。我們中斷的時(shí)候,要獲取worker對(duì)象的鎖(state從0 CAS到1)间狂。獲取鎖成功后攻泼,才能進(jìn)行中斷。這說明了在初始化worker對(duì)象階段鉴象,不允許中斷忙菠。只有調(diào)用了runWorker()之后,將state置為0纺弊,才能中斷牛欢。

2.shutdown()中調(diào)用interruptIdleWorkers()中斷空閑線程和shutdownNow()中調(diào)用interruptWorkers()中斷所有線程。

interruptIdleWorkers()中中斷空閑線程的前提是要獲取worker對(duì)象的鎖淆游。

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

interruptWorkers()中中斷所有線程時(shí)傍睹,不用調(diào)用tryLock()獲取worker對(duì)象的鎖,最終是通過worker中的interruptIfStarted()來中斷線程犹菱。在這個(gè)方法中只有state大于等于0且線程不等于null且線程沒有被中斷過拾稳,才能進(jìn)行中斷操作。說明只有經(jīng)過了runworker()階段才能進(jìn)行中斷操作腊脱。

這也是Worker為什么要設(shè)計(jì)成不可重入的原因访得,就是為了防止中斷在運(yùn)行中的任務(wù),只會(huì)中斷在等待從workQueue中通過getTask()獲取任務(wù)的線程(因?yàn)樗麄儧]有上鎖虑椎,此時(shí)state為0)震鹉。

以下這5種方法都會(huì)調(diào)用到interruptIdleWorkers()去中斷空閑線程俱笛。

setCorePoolSize()
setKeepAliveTime(long time, TimeUnit unit)
setMaximumPoolSize(int maximumPoolSize)
shutdown()
allowCoreThreadTimeOut(boolean value)

還有一點(diǎn)必須強(qiáng)調(diào)。Task沒有真正的被執(zhí)行传趾,執(zhí)行的是Work線程迎膜。Work線程中只是調(diào)用到了Task中的run()方法。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

runWorker

1.work線程啟動(dòng)后浆兰,會(huì)調(diào)用其run()方法磕仅。run()方法再去調(diào)用runWorker(this)方法。

2.執(zhí)行任務(wù)之前簸呈,獲取work線程中的task榕订,然后釋放worker的鎖。讓state狀態(tài)從-1 CAS到0蜕便。當(dāng)state為0劫恒,說明可以去中斷此線程。

3.以輪詢的方式通過getTask()從阻塞隊(duì)列中獲取task轿腺,當(dāng)task為null两嘴,跳出輪詢。

4.開始執(zhí)行任務(wù)的時(shí)候族壳,通過lock()獲取鎖憔辫,將state從0 CAS到1。任務(wù)執(zhí)行完畢時(shí)仿荆,通過unlock()釋放鎖贰您。

5.如果線程池處于STOP、TIDYING拢操、TERMINATE狀態(tài)锦亦,要中斷worker線程。

6.通過beforeExecute(wt, task)和afterExecute(task, thrown)對(duì)task進(jìn)行前置和后置處理庐冯。

7.在task.run()孽亲、beforeExecute(wt, task)、afterExecute(task, thrown)發(fā)生異常時(shí)都會(huì)導(dǎo)致worker線程終止展父。通過調(diào)用processWorkerExit(w, completedAbruptly)來進(jìn)行worker退出操作返劲。

8.在getTask()獲取阻塞隊(duì)列中的任務(wù),如果隊(duì)列中沒有任務(wù)或者是獲取任務(wù)超時(shí)栖茉,都會(huì)調(diào)用processWorkerExit(w, completedAbruptly)來進(jìn)行worker退出操作篮绿。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask

上文已經(jīng)提起過getTask()方法,主要是從阻塞隊(duì)列獲取task的吕漂。那么條件下task會(huì)返回null呢亲配?我們可以通過注釋得到一些信息。

  • 超過了maximumPoolSize設(shè)置的線程數(shù)量,因?yàn)檎{(diào)用了setMaximumPoolSize()方法吼虎。
  • 線程池處于stop狀態(tài)犬钢。
  • 線程池處于shutdown狀態(tài)且workQueue為空.
  • 獲取任務(wù)等待超時(shí)。

1.首先獲取線程池運(yùn)行狀態(tài)思灰,如果線程池的狀態(tài)處于shutdown狀態(tài)且workQueue為空玷犹,或者處于stop狀態(tài)。然后調(diào)用decrementWorkerCount()遞減workerCount洒疚,最后返回null歹颓。

     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

2.allowCoreThreadTimeOut默認(rèn)為false。為false的時(shí)候油湖,核心線程即時(shí)在空閑時(shí)也會(huì)保持活躍巍扛。為true的時(shí)候,核心線程在keepAliveTime時(shí)間范圍內(nèi)等待工作乏德。如果線程池的數(shù)量超過maximumPoolSize或者等待任務(wù)超時(shí)或者workQueue為空撤奸,那么直接通過CAS減少workerCount數(shù)量,返回null鹅经。

3.如果timed為true寂呛,通過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)獲取task怎诫,等待時(shí)間超過了keepAliveTime還沒獲取到task瘾晃,直接返回null。如果timed為false幻妓,通過workQueue.take()獲取task蹦误。如果沒有獲取到task,會(huì)一直阻塞當(dāng)前線程直到獲取到task(當(dāng)阻塞隊(duì)列中加入了新的任務(wù)肉津,會(huì)喚醒當(dāng)前線程)為止强胰。

4.如果獲取task成功,就直接返回妹沙。如果獲取task超時(shí)偶洋,timedOut會(huì)置為true,會(huì)在下一次循環(huán)中以返回null告終距糖。

再強(qiáng)調(diào)一點(diǎn)玄窝,只有當(dāng)線程池中的線程數(shù)量大于corePoolSize才會(huì)進(jìn)行獲取任務(wù)超時(shí)檢查,這也體現(xiàn)線程池中的一種策略:當(dāng)線程池中線程數(shù)量達(dá)到maximumPoolSize大小后悍引,如果一直沒有任務(wù)進(jìn)來恩脂,會(huì)逐漸減少workerCount直到線程數(shù)量等于corePoolSize。

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit

1.completedAbruptly為true趣斤,說明worker線程時(shí)突然終止俩块,說明執(zhí)行task.run()發(fā)生了異常,所以要通過CAS減少workerCount的數(shù)量。
2.completedAbruptly為false玉凯,說明worker線程是正常終止势腮,不需要對(duì)workerCount進(jìn)行減少的操作。因?yàn)樵趃etTask()中已經(jīng)做了此操作漫仆。

3.對(duì)worker完成的任務(wù)數(shù)進(jìn)行統(tǒng)計(jì)嫉鲸,并且從workers集合中移出。

4.調(diào)用tryTerminate()方法歹啼,嘗試終止線程池玄渗。如果狀態(tài)滿足的話,線程池還存在線程狸眼,會(huì)調(diào)用interruptIdleWorkers(ONLY_ONE)進(jìn)行中斷處理藤树,使其進(jìn)入退出流程。如果線程池中的線程數(shù)量等于0的話拓萌,通過CAS把線程池的狀態(tài)更新到TIDYING岁钓。然后通過terminated()進(jìn)行一些結(jié)束的處理,最后通過CAS把線程池狀態(tài)更新到TERMINATED微王。最后的最后屡限,調(diào)用termination.signalAll()喚醒等待的線程,通知它們線程池已經(jīng)終止炕倘。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

5.獲取線程池的狀態(tài)钧大。如果線程池的狀態(tài)還處于RUNNING、SHUTDOWN罩旋,說明tryTerminate()沒有成功啊央。如果worker線程是突然終止的話,通過addWorker(null, false)再創(chuàng)建一個(gè)沒有task的worker線程去處理任務(wù)涨醋。

6.如果worker線程是正常終止的話瓜饥,且當(dāng)前線程池中的線程數(shù)量小于需要維護(hù)的數(shù)量,我們也會(huì)通過addWorker(null, false)再創(chuàng)建一個(gè)沒有task的worker線程去處理任務(wù)浴骂。

7.默認(rèn)情況下allowCoreThreadTimeOut為false乓土,那么min就等于corePoolSize。那么線程池需要維護(hù)的線程數(shù)量就是corePoolSize個(gè)溯警。如果allowCoreThreadTimeOut為true趣苏,min就等于0。在workQueue不等于空的情況愧膀,min會(huì)被賦值成1拦键。此時(shí)線程池需要維護(hù)的線程池?cái)?shù)量是1。

如果線程池處于shutdown狀態(tài)檩淋,在workQueue不為空的情況下芬为,線程池始終會(huì)維護(hù)corePoolSize個(gè)線程萄金。當(dāng)workQueue為空的話,線程池會(huì)逐漸銷毀這corePoolSize個(gè)線程媚朦。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

尾言

大家好氧敢,我是cmazxiaoma(寓意是沉夢(mèng)昂志的小馬),感謝各位閱讀本文章询张。
小弟不才孙乖。
如果您對(duì)這篇文章有什么意見或者錯(cuò)誤需要改進(jìn)的地方,歡迎與我討論。
如果您覺得還不錯(cuò)的話,希望你們可以點(diǎn)個(gè)贊份氧。
希望我的文章對(duì)你能有所幫助唯袄。
有什么意見、見解或疑惑蜗帜,歡迎留言討論恋拷。

最后送上:心之所向,素履以往厅缺。生如逆旅蔬顾,一葦以航。


saoqi.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末湘捎,一起剝皮案震驚了整個(gè)濱河市诀豁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌窥妇,老刑警劉巖舷胜,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異秩伞,居然都是意外死亡逞带,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門纱新,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人穆趴,你說我怎么就攤上這事脸爱。” “怎么了未妹?”我有些...
    開封第一講書人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵簿废,是天一觀的道長。 經(jīng)常有香客問我络它,道長族檬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任化戳,我火速辦了婚禮单料,結(jié)果婚禮上埋凯,老公的妹妹穿的比我還像新娘。我一直安慰自己扫尖,他們只是感情好白对,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著换怖,像睡著了一般甩恼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沉颂,一...
    開封第一講書人閱讀 52,255評(píng)論 1 308
  • 那天条摸,我揣著相機(jī)與錄音,去河邊找鬼铸屉。 笑死屈溉,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的抬探。 我是一名探鬼主播子巾,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼小压!你這毒婦竟也來了线梗?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤怠益,失蹤者是張志新(化名)和其女友劉穎仪搔,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蜻牢,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡烤咧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抢呆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片煮嫌。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖抱虐,靈堂內(nèi)的尸體忽然破棺而出昌阿,到底是詐尸還是另有隱情,我是刑警寧澤恳邀,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布懦冰,位于F島的核電站,受9級(jí)特大地震影響谣沸,放射性物質(zhì)發(fā)生泄漏刷钢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一乳附、第九天 我趴在偏房一處隱蔽的房頂上張望内地。 院中可真熱鬧伴澄,春花似錦、人聲如沸瓤鼻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茬祷。三九已至清焕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祭犯,已是汗流浹背秸妥。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留沃粗,地道東北人粥惧。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像最盅,于是被迫代替她去往敵國和親突雪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359