Android線(xiàn)程池學(xué)習(xí)筆記(二)

ThreadPoolExecutor

線(xiàn)程池的實(shí)現(xiàn)類(lèi)。

構(gòu)造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize——最大核心線(xiàn)程數(shù)。核心線(xiàn)程即使空閑也不會(huì)被銷(xiāo)毀寓搬,除非調(diào)用allowCoreThreadTimeOut(true)兄春。
  • maximumPoolSize——線(xiàn)程池最多可運(yùn)行的線(xiàn)程數(shù)甜癞。該值不小于corePoolSize。
  • keepAliveTime——非核心線(xiàn)程在空閑狀態(tài)的存活時(shí)間宰译。
  • unit——keepAliveTime的時(shí)間單位。
  • workQueue——存放等待執(zhí)行的任務(wù)的隊(duì)列敷扫,只有通過(guò)execute(Runnable)提交的任務(wù)才可能進(jìn)入該隊(duì)列。
  • threadFactory——線(xiàn)程池創(chuàng)建通過(guò)該工廠(chǎng)創(chuàng)建線(xiàn)程哲身。
  • handler——在線(xiàn)程池滿(mǎn)載的情況下脯丝,提交的任務(wù)交由handler處理堤器。

以上各參數(shù)均有響應(yīng)的setter方法裕膀。

ThreadPoolExecutor提供了四種handler:

  1. CallerRunsPolicy——在線(xiàn)程池未關(guān)閉的情況毅厚,直接在調(diào)用execute(Runnable)方法的線(xiàn)程執(zhí)行任務(wù)妆棒。
  2. AbortPolicy——拋出一個(gè)RejectedExecutionException。
  3. DiscardPolicy——丟棄
  4. DiscardOldestPolicy——丟棄等待隊(duì)列中最早提交的那個(gè)任務(wù),然后重新提交新的任務(wù)菱鸥。

ThreadPoolExecutor默認(rèn)使用AbortPolicy躯概。

線(xiàn)程池狀態(tài)和線(xiàn)程數(shù)量的指示字段

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一個(gè)32位原子整型献幔,高3位表示線(xiàn)程狀態(tài)(runState),剩余位代碼線(xiàn)程數(shù)量(workerCount)。所以線(xiàn)程池可容納的最大線(xiàn)程數(shù)是(2^29)-1术裸。
wokerCount未必和存活的線(xiàn)程數(shù)一致冻晤,比如使用ThreadFactory創(chuàng)建線(xiàn)程失敗時(shí)歼捏,或者線(xiàn)程終結(jié)前仍然進(jìn)行著某些工作時(shí)。

線(xiàn)程池狀態(tài)有以下幾種:

  • RUNNING(-1)——可接收新的任務(wù),可執(zhí)行隊(duì)列中的任務(wù)歉井。
  • SHUTDOWN(0)——不再接收新的任務(wù)仇参,仍可執(zhí)行隊(duì)列中的任務(wù)。
  • STOP(1)——不再接收新的任務(wù),不再執(zhí)行隊(duì)列中的任務(wù)贱除,中斷正在執(zhí)行的任務(wù)难衰。
  • TIDYING(2)——過(guò)渡狀態(tài)。所有任務(wù)都已終結(jié)凭峡,workerCount等于0,terminated()方法執(zhí)行之前的狀態(tài)≡侣澹可以覆寫(xiě)terminated()做一些清理工作钓株。
  • TERMINATED(3)——terminated()執(zhí)行之后的狀態(tài)闰渔。

狀態(tài)的更改是遞增的席函,可能的狀態(tài)改變?nèi)缦拢?/p>

  • RUNNING -> SHUTDOWN——調(diào)用了shutdown()方法,可能是通過(guò)finalize()隱式調(diào)用的冈涧。
  • (RUNNING or SHUTDOWN) -> STOP——調(diào)用了shutdownNow()茂附。
  • SHUTDOWN -> TIDYING——線(xiàn)程池和隊(duì)列都為空。
  • STOP -> TIDYING——線(xiàn)程池為空督弓。
  • TIDYING -> TERMINATED——terminated()方法執(zhí)行完畢何之。

awaitTermination()方法在狀態(tài)為T(mén)ERMINATED時(shí)返回。

public void execute(Runnable command)

提交任務(wù)有幾種情況:

  1. 工作線(xiàn)程數(shù) < 核心線(xiàn)程數(shù)——?jiǎng)?chuàng)建新的核心線(xiàn)程咽筋,并處理該任務(wù)溶推。
  2. 工作線(xiàn)程數(shù) >= 核心線(xiàn)程數(shù),隊(duì)列未滿(mǎn)——添加到隊(duì)列奸攻。
  3. 隊(duì)列滿(mǎn)蒜危,工作線(xiàn)程數(shù) < 最大線(xiàn)程數(shù)——?jiǎng)?chuàng)建非核心線(xiàn)程處理任務(wù)。
  4. 交由handler處理睹耐。

public boolean prestartCoreThread()

手動(dòng)啟動(dòng)一個(gè)核心線(xiàn)程辐赞,這樣新來(lái)的任務(wù)就可以直接運(yùn)行,從而減少線(xiàn)程啟動(dòng)的時(shí)間硝训。如果所有核心線(xiàn)程都已啟動(dòng)响委,返回false新思。

public int prestartAllCoreThreads()

手動(dòng)啟動(dòng)所有核心線(xiàn)程。返回啟動(dòng)的核心線(xiàn)程數(shù)赘风。

public void allowCoreThreadTimeOut(boolean value)

設(shè)置空閑時(shí)夹囚,核心線(xiàn)程是否允許超時(shí)關(guān)閉。存活時(shí)間同非核心線(xiàn)程邀窃。

public boolean allowsCoreThreadTimeOut()

查詢(xún)核心線(xiàn)程在空閑時(shí)荸哟,是否允許超時(shí)關(guān)閉。

public BlockingQueue<Runnable> getQueue()

獲取等待隊(duì)列瞬捕。

public boolean remove(Runnable task)

從等待隊(duì)列中刪除task鞍历。

public void purge()

將等待隊(duì)列中所有已經(jīng)取消的Future任務(wù)立即移除。

public int getActiveCount()

返回正在執(zhí)行任務(wù)的線(xiàn)程數(shù)肪虎。

public int getLargestPoolSize()

返回線(xiàn)程池中出現(xiàn)過(guò)的最大的線(xiàn)程數(shù)量劣砍,不大于最大線(xiàn)程數(shù)。

public long getTaskCount()

返回線(xiàn)程池總共執(zhí)行過(guò)的以及正在執(zhí)行的任務(wù)數(shù)扇救,該值是個(gè)大概值刑枝。

public long getCompletedTaskCount()

返回線(xiàn)程池總共執(zhí)行過(guò)的任務(wù)數(shù),該值是個(gè)大概值爵政。


以上是線(xiàn)程池的基本理解和使用,不想深究的話(huà)陶缺,到這里也就可以了钾挟。

下面是需要注意的點(diǎn)。

關(guān)于新建線(xiàn)程

線(xiàn)程池使用ThreadFactory來(lái)創(chuàng)建線(xiàn)程饱岸,如果沒(méi)有顯示提供ThreadFactory掺出,則使用默認(rèn)的Executors#DefaultThreadFactory來(lái)創(chuàng)建線(xiàn)程:

    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

從以上代碼可以得到如下信息:

  1. 線(xiàn)程名稱(chēng)均是“pool-XX-thread-XX”形式。
  2. 均是非daemon線(xiàn)程苫费。
  3. 優(yōu)先級(jí)均是Thread.NORM_PRIORITY汤锨。

關(guān)于隊(duì)列選擇

常用的隊(duì)列選擇策略有以下幾種:

  1. 直接傳遞——使用SynchronousQueue同步隊(duì)列實(shí)現(xiàn)。該隊(duì)列不保存任務(wù)百框,可以理解為一個(gè)單純的管道闲礼。其“放入”和“取出”都是阻塞的,每個(gè)“放入”操作都要等待一個(gè)“取出”操作铐维,反之亦然柬泽。直接傳遞,通常要求最大線(xiàn)程數(shù)量不受限制嫁蛇,以避免新提交的任務(wù)交由handler處理锨并;但這就會(huì)導(dǎo)致線(xiàn)程數(shù)量不可控。該策略在任務(wù)間有內(nèi)部依賴(lài)時(shí)睬棚,可避免鎖住第煮。
  2. 無(wú)限隊(duì)列——使用一個(gè)無(wú)容量限制的隊(duì)列解幼,比如LinkedBlockingQueue(FIFO隊(duì)列)或者PriorityBlockingQueue(可自定義Comparator)。這樣在核心線(xiàn)程都在工作時(shí)包警,新的任務(wù)會(huì)被添加到隊(duì)列中撵摆,最大線(xiàn)程數(shù)不會(huì)超過(guò)核心線(xiàn)程數(shù),也就是說(shuō)maximumPoolSize這個(gè)參數(shù)將不起作用揽趾。該策略適合任務(wù)間完全獨(dú)立台汇,相互不影響的情況。所謂無(wú)限篱瞎,并非是真的無(wú)限苟呐,只是容量非常大而已,可能是Integer.MAX_VALUE俐筋,也可能是其他值牵素。
  3. 有限隊(duì)列——比如ArrayBlockingQueue〕握撸可以避免資源的過(guò)度消耗笆呆,但控制起來(lái)比較復(fù)雜,需要權(quán)衡隊(duì)列容量和最大線(xiàn)程數(shù)的關(guān)系:使用大隊(duì)列和小線(xiàn)程數(shù)量粱挡,會(huì)減少CPU的使用赠幕,以及操作系統(tǒng)資源的占用,但會(huì)降低吞吐率询筏;使用小隊(duì)列和大線(xiàn)程數(shù)榕堰,可以充分利用CPU資源,但可能會(huì)加大調(diào)度開(kāi)支嫌套,同樣降低吞吐率逆屡。

關(guān)于RejectedExecutionHandler

上面介紹了四種默認(rèn)的RejectedExecutionHandler,同樣也可以自己定義踱讨。需要注意的是魏蔗,RejectedExecutionHandler的選擇需要參照線(xiàn)程數(shù)量和隊(duì)列選擇策略。比如無(wú)限隊(duì)列的情況痹筛,可以隨意設(shè)置莺治。

關(guān)于覆寫(xiě)ThreadPoolExecutor

ThreadPoolExecutor提供了3個(gè)protected的hook方法。beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)分別在每個(gè)任務(wù)的執(zhí)行前/后調(diào)用帚稠,terminated()方法在線(xiàn)程池終結(jié)時(shí)調(diào)用产雹。

以下是一個(gè)覆寫(xiě)的例子,添加了pause|resume方法:

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PausableThreadPoolExecutor(...) { 
        super(...);
     }

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
            while (isPaused) 
                unpaused.await();
        } catch (InterruptedException ie) {
            t.interrupt();
        } finally {
            pauseLock.unlock();
        }
    }

    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        } finally {
            pauseLock.unlock();
        }
    }

    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

下面學(xué)習(xí)線(xiàn)程池的實(shí)現(xiàn)原理翁锡。

線(xiàn)程在哪里

線(xiàn)程池的線(xiàn)程由內(nèi)部類(lèi)Worker持有蔓挖。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker類(lèi)繼承自AbstractQueuedSynchronizer,該父類(lèi)這里不深究馆衔,只需要知道它是一個(gè)鎖的實(shí)現(xiàn)即可瘟判。Worker類(lèi)還實(shí)現(xiàn)了Runnable接口怨绣。
由構(gòu)造方法可知,Worker在創(chuàng)建時(shí)會(huì)通過(guò)ThreadFactory.newThread方法創(chuàng)建一個(gè)線(xiàn)程拷获,并將自身作為Runnable對(duì)象傳遞給該線(xiàn)程篮撑。
protected修飾的方法是覆寫(xiě)的父類(lèi)方法,暫且不用管匆瓜。lock赢笨、tryLock、unlock驮吱、isLocked是自身鎖的調(diào)用方法茧妒。

Worker的run方法中調(diào)用了runWorker方法。如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();//獲取Worker類(lèi)的thread
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//getTask為阻塞方法
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||//當(dāng)前線(xiàn)程池狀態(tài)至少為STOP左冬,不考慮offset桐筏,其值為1。則滿(mǎn)足的狀態(tài)為STOP拇砰、TIDYING梅忌、TERMINATED。
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//hook方法除破,可重寫(xiě)
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//hook方法牧氮,可重寫(xiě)
                    }
                } finally {
                    task = null;
                    w.completedTasks++;//已完成的任務(wù)數(shù)加1
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//Worker退出時(shí)的清理操作
        }
    }

由源碼可知,該方法不斷從隊(duì)列中取出任務(wù)瑰枫,交由該Worker所在的線(xiàn)程執(zhí)行踱葛,是執(zhí)行任務(wù)的最終場(chǎng)所。
順騰摸瓜躁垛,這里涉及到getTaskprocessWorkerExit兩個(gè)方法剖毯。

先看processWorkerExit

    /**
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();//由異常引起的圾笨,需要ctl變量中存儲(chǔ)的工作線(xiàn)程數(shù)量減1教馆。

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//workers是一個(gè)Set,存儲(chǔ)了所有活動(dòng)的Worker擂达。
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//該方法留意一下土铺,下面會(huì)介紹

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//線(xiàn)程池的狀態(tài)低于STOP,包括RUNNING板鬓、SHUTDOWN
            if (!completedAbruptly) {//非異常引起的死亡悲敷,比如空閑狀態(tài)超過(guò)了存活時(shí)間。
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);//重新添加一個(gè)Worker俭令。
        }
    }

該方法為死掉的Worker做一些清理工作后德。首先將死亡的Worker已經(jīng)完成的任務(wù)數(shù)添加到線(xiàn)程池已完成的任務(wù)數(shù)中,緊接著從線(xiàn)程池的Worker集合中刪除該Worker抄腔,然后調(diào)用tryTerminate瓢湃,最后根據(jù)線(xiàn)程池的狀態(tài)等理张,判斷是否需要重新添加Worker到Worker集合中。

getTask方法從隊(duì)列中取出一個(gè)任務(wù)并返回绵患。在Worker由于一些原因退出時(shí)雾叭,返回null。這些原因包括:

  1. 線(xiàn)程數(shù)量超過(guò)maximumPoolSize(比如通過(guò)setMaximumPoolSize修改了最大線(xiàn)程數(shù)量)落蝙。
  2. 線(xiàn)程池被stop织狐。
  3. 線(xiàn)程池被關(guān)閉,并且隊(duì)列為空筏勒。
  4. 等待取出任務(wù)超時(shí)移迫,而超時(shí)的Worker需要終結(jié)。
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//線(xiàn)程池狀態(tài)

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//對(duì)應(yīng)情況2奏寨、3
            //該條件等價(jià)于if (rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty()))
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);//當(dāng)前線(xiàn)程數(shù)量

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//當(dāng)前Worker是否允許超時(shí)關(guān)閉

            //對(duì)應(yīng)情況1起意、4
            if ((wc > maximumPoolSize || (timed && timedOut))//線(xiàn)程數(shù)量大于maximumPoolSize或者達(dá)到超時(shí)條件
                && (wc > 1 || workQueue.isEmpty())) {//并且此時(shí)隊(duì)列為空,或者線(xiàn)程數(shù)大于1(在隊(duì)列非空時(shí)病瞳,至少需要保留一個(gè)Worker來(lái)執(zhí)行任務(wù))
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://允許超時(shí)關(guān)閉則調(diào)用poll超時(shí)阻塞方法
                    workQueue.take();//否則調(diào)用take揽咕,一直阻塞下去,直到新任務(wù)到來(lái)
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

在processWorkerExit方法中套菜,我們接觸到了addWorker方法亲善。
該方法根據(jù)線(xiàn)程池的狀態(tài)的最大線(xiàn)程數(shù)量,決定是否向線(xiàn)程池添加新的Worker逗柴。遇到以下條件之一時(shí)蛹头,添加失敗,返回false:

  1. 線(xiàn)程池已經(jīng)stop或者達(dá)到shutdown的條件戏溺。
  2. 當(dāng)前線(xiàn)程數(shù)達(dá)到上限渣蜗。
  3. ThreadFactory創(chuàng)建線(xiàn)程失敗。

Worker創(chuàng)建失敗時(shí)旷祸,會(huì)回滾一些數(shù)據(jù)耕拷。

    /**
     * @param firstTask 新Worker需要執(zhí)行的第一個(gè)任務(wù),可為null
     * @param core 是否是核心線(xiàn)程
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry://添加一個(gè)標(biāo)簽
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//對(duì)應(yīng)情況1
            //等價(jià)于if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
            //之所以有firstTask != null是因?yàn)镾HUTDOWN后不再接收新任務(wù)托享。
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||//CAPACITY為線(xiàn)程池所能容納的最大線(xiàn)程數(shù)量2^29-1
                    wc >= (core ? corePoolSize : maximumPoolSize))//對(duì)應(yīng)情況2
                    return false;
                if (compareAndIncrementWorkerCount(c))//ctl字段中的Worker數(shù)量加1骚烧,此時(shí)Worker還沒(méi)有被實(shí)際創(chuàng)建,
                //也證實(shí)了講ctl字段時(shí)提到的“wokerCount未必和存活的線(xiàn)程數(shù)一致”
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)//線(xiàn)程池狀態(tài)發(fā)生變化時(shí)闰围,重新執(zhí)行前面的邏輯
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {//線(xiàn)程創(chuàng)建成功
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN后不再接收新任務(wù)
                        if (t.isAlive()) // precheck that t is startable
                            //t.isAlive()返回true赃绊,則表明t.start已被調(diào)用過(guò),而正常來(lái)說(shuō)羡榴,此時(shí)還未調(diào)用t.start
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)//largestPoolSize是線(xiàn)程池中出現(xiàn)過(guò)的最大線(xiàn)程的數(shù)量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//執(zhí)行數(shù)據(jù)回滾
        }
        return workerStarted;
    }

接下來(lái)看addWorkerFailed方法碧查。

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();//講ctl字段的workerCount減1
            tryTerminate();//查看是否需要終結(jié)線(xiàn)程池
        } finally {
            mainLock.unlock();
        }
    }

processWorkerExit和addWorkerFailed都涉及到了tryTerminate方法。
僅tryTerminate方法會(huì)將線(xiàn)程池狀態(tài)置為T(mén)IDYING和TERMINATED校仑,且需要滿(mǎn)足以下條件之一:

  1. 當(dāng)前狀態(tài)為SHUTDOWN忠售,并且線(xiàn)程池和隊(duì)列都為空者冤。
  2. 當(dāng)前狀態(tài)為STOP,并且隊(duì)列為空档痪。

在執(zhí)行了可能導(dǎo)致線(xiàn)程池TERMINATED的操作后涉枫,必須調(diào)用該方法,比如減少了worker的數(shù)量腐螟,或者shutdown之后從隊(duì)列中移除了任務(wù)愿汰。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||//線(xiàn)程池正在運(yùn)行中
                runStateAtLeast(c, TIDYING) ||//線(xiàn)程池正在terminate
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//或者不滿(mǎn)足條件1
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);//workerCount不為0時(shí),中斷一個(gè)空閑的worker乐纸,將中斷信號(hào)傳遞下去(如何傳遞請(qǐng)往下看)
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();//hook方法衬廷,可重寫(xiě)
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));//此處可以看出TIDYING只是一個(gè)過(guò)渡態(tài)。
                        termination.signalAll();//用于喚醒a(bǔ)waitTermination阻塞方法
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

至此我們就見(jiàn)到了可重寫(xiě)的3個(gè)hook方法:beforeExecute汽绢、afterExecute吗跋、terminate

termination.signalAll()這一語(yǔ)句宁昭,簡(jiǎn)單看一下awaitTermination

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            while (!runStateAtLeast(ctl.get(), TERMINATED)) {
                if (nanos <= 0L)
                    return false;
                nanos = termination.awaitNanos(nanos);//狀態(tài)不是TERMINATED跌宛,則阻塞下去,等待termination.signalAll()喚醒
            }
            return true;
        } finally {
            mainLock.unlock();
        }
    }

tryTerminate方法中調(diào)用了interruptIdleWorkers积仗,僅此處調(diào)用傳遞的onlyOne參數(shù)為true:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

前面說(shuō)了疆拘,調(diào)用interruptIdleWorkers方法是為了將終結(jié)信號(hào)傳遞下去,那究竟是如何傳遞的呢寂曹?

  1. interruptIdleWorkers會(huì)中斷一個(gè)Worker哎迄。
  2. Worker中斷,則runWorker方法就會(huì)調(diào)用finally塊中的processWorkerExit方法隆圆,參數(shù)completedAbruptly為true漱挚。
  3. processWorkerExit方法中會(huì)再次調(diào)用tryTerminate方法,從而完成終結(jié)信號(hào)的傳遞渺氧。

至此我們就學(xué)習(xí)了線(xiàn)程池的實(shí)現(xiàn)原理旨涝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市阶女,隨后出現(xiàn)的幾起案子颊糜,更是在濱河造成了極大的恐慌哩治,老刑警劉巖秃踩,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異业筏,居然都是意外死亡憔杨,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)蒜胖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)消别,“玉大人抛蚤,你說(shuō)我怎么就攤上這事⊙翱瘢” “怎么了岁经?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)蛇券。 經(jīng)常有香客問(wèn)我缀壤,道長(zhǎng),這世上最難降的妖魔是什么纠亚? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任塘慕,我火速辦了婚禮,結(jié)果婚禮上蒂胞,老公的妹妹穿的比我還像新娘图呢。我一直安慰自己,他們只是感情好骗随,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布蛤织。 她就那樣靜靜地躺著,像睡著了一般鸿染。 火紅的嫁衣襯著肌膚如雪瞳筏。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 50,084評(píng)論 1 291
  • 那天牡昆,我揣著相機(jī)與錄音姚炕,去河邊找鬼。 笑死丢烘,一個(gè)胖子當(dāng)著我的面吹牛柱宦,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播播瞳,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼掸刊,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了赢乓?” 一聲冷哼從身側(cè)響起忧侧,我...
    開(kāi)封第一講書(shū)人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤瓷耙,失蹤者是張志新(化名)和其女友劉穎堆巧,沒(méi)想到半個(gè)月后弄息,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拳锚,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谁尸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沸毁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了摩桶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烁试。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖驯击,靈堂內(nèi)的尸體忽然破棺而出烁兰,到底是詐尸還是另有隱情,我是刑警寧澤徊都,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布沪斟,位于F島的核電站,受9級(jí)特大地震影響暇矫,放射性物質(zhì)發(fā)生泄漏币喧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一袱耽、第九天 我趴在偏房一處隱蔽的房頂上張望杀餐。 院中可真熱鬧,春花似錦朱巨、人聲如沸史翘。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)琼讽。三九已至,卻和暖如春洪唐,著一層夾襖步出監(jiān)牢的瞬間钻蹬,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工凭需, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留问欠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓粒蜈,卻偏偏與公主長(zhǎng)得像顺献,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子枯怖,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351