線程池(2)execute()執(zhí)行過程源碼分析

線程池ThreadPoolExecutor源碼分析

所需知識(shí)點(diǎn):

  • 1贴捡、ReentranLock 重入鎖 以及 Condition的聯(lián)合使用。
  • 不可重入的互斥鎖汛骂,AQS AbstractQueuedSynchronizer
  • AQS是怎么實(shí)現(xiàn)不可重入互斥鎖的帘瞭。
  • 2抛腕、AtomicInteger 線程安全的int
  • 3、volatile 線程安全關(guān)鍵字
  • 4全封、BlockingQueue 阻塞線程隊(duì)列

源碼分析execute()執(zhí)行過程

老規(guī)矩我們先上圖:

image.png

1、如果線程池中的線程數(shù)量少于corePoolSize颂跨,就創(chuàng)建新的線程來執(zhí)行新添加的任務(wù)

2、如果線程池中的線程數(shù)量大于等于corePoolSize钓丰,但隊(duì)列workQueue未滿,則將新添加的任務(wù)放到workQueue中

3梦鉴、如果線程池中的線程數(shù)量大于等于corePoolSize,且隊(duì)列workQueue已滿存筏,但線程池中的線程數(shù)量小于maximumPoolSize予跌,則會(huì)創(chuàng)建新的線程來處理被添加的任務(wù)

4、如果線程池中的線程數(shù)量等于了maximumPoolSize汁掠,就用RejectedExecutionHandler來執(zhí)行拒絕策略

<span id="jump1">一宜狐、ThreadPoolExecutor.execute()方法</span>

流程圖:


image.png
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 如果運(yùn)行的線程少于corePoolSize络拌,嘗試開啟一個(gè)新線程去運(yùn)行command混萝,command作為這個(gè)線程的第一個(gè)任務(wù)
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果任務(wù)成功放入隊(duì)列允粤,我們?nèi)孕枰粋€(gè)雙重校驗(yàn)去確認(rèn)是否應(yīng)該新建一個(gè)線程(因?yàn)榭赡艽嬖谟行┚€程在我們上次檢查后死了) 或者 從我們進(jìn)入這個(gè)方法后类垫,pool被關(guān)閉了
     * 所以我們需要再次檢查state残家,如果線程池停止了需要回滾入隊(duì)列,如果池中沒有線程了,新開啟 一個(gè)線程
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 如果無法將任務(wù)入隊(duì)列(可能隊(duì)列滿了),需要新開區(qū)一個(gè)線程(自己:往maxPoolSize發(fā)展)
     * 如果失敗了,說明線程池shutdown 或者 飽和了,所以我們拒絕任務(wù)
     */
    //獲取線程池當(dāng)前狀態(tài)
    int c = ctl.get();
    /**
     * 1掂榔、獲取當(dāng)前工作的Worker數(shù)量瑞信,如果小于核心線程池?cái)?shù)量凡简。就創(chuàng)建新的Worker
     */
    if (workerCountOf(c) < corePoolSize) {
        //創(chuàng)建新worker對(duì)象秤涩,啟動(dòng)新線程浊竟,并且設(shè)置為true 核心線程必怜。
        //如果添加創(chuàng)建成功直接返回膏执。
        if (addWorker(command, true))
            return;
        //新增Worker失敗欺栗,重新獲取線程池狀態(tài)值
        /**
         * 沒有成功addWorker()类腮,再次獲取c(凡是需要再次用ctl做判斷時(shí),都會(huì)再次調(diào)用ctl.get())
         * 失敗的原因可能是:
         * 1、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù)
         * 2吗伤、workerCountOf(c) < corePoolSize 判斷后,由于并發(fā)丹鸿,別的線程先創(chuàng)建了worker線程,導(dǎo)致workerCount>=corePoolSize
         */
        c = ctl.get();
    }


    /**
     * 2、走到這,說明核心線程池已滿,或者線程池shutdown了拧粪。
     * 如果線程池在運(yùn)行狀態(tài)宴杀,并且workQueue隊(duì)列插入成功扁达。
     * BlockQueue #offer()特性叉讥,插入值失敗返回false
     */
    if (isRunning(c) && workQueue.offer(command)) {
        //再次校驗(yàn)位
        int recheck = ctl.get();
        /**
         * 再次校驗(yàn)放入workerQueue中的任務(wù)是否能被執(zhí)行
         * 1、如果線程池不是運(yùn)行狀態(tài)了碳竟,應(yīng)該拒絕添加新任務(wù)懂拾,從workQueue中刪除任務(wù)
         * 2唐断、如果線程池是運(yùn)行狀態(tài)钝的,或者從workQueue中刪除任務(wù)失斠诟獭(剛好有一個(gè)線程執(zhí)行完畢,并消耗了這個(gè)任務(wù))儒陨,
         *  那么addWorker(null,false)確保還有線程執(zhí)行任務(wù)(只要有一個(gè)就夠了)
         */

        //如果線程池不再運(yùn)行狀態(tài)隘击,并且workQueue成功刪除了剛添加的任務(wù),那么就調(diào)用拒絕handler方法。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果當(dāng)前worker數(shù)量為0缕粹,通過addWorker(null, false)創(chuàng)建一個(gè)線程侈沪,其任務(wù)為null
        //為什么只檢查運(yùn)行的worker數(shù)量是不是0呢情组?盲泛? 為什么不和corePoolSize比較呢??
        //只保證有一個(gè)worker線程可以從queue中獲取任務(wù)執(zhí)行就行了绞旅?琐鲁?
        //因?yàn)橹灰€有活動(dòng)的worker線程暴构,就可以消費(fèi)workerQueue中的任務(wù)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);//第一個(gè)參數(shù)為null,說明只為新建一個(gè)worker線程段磨,沒有指定firstTask
        //第二個(gè)參數(shù)為true代表占用corePoolSize取逾,false占用maxPoolSize
    }


    /**
     * 3、如果添加workQueue隊(duì)列失敗苹支,那么就嘗試添加非核心線程砾隅。
     * 直到線程擴(kuò)容超過maximumPoolSize,addWorker失敗就會(huì)調(diào)用拒絕handler方法
     */
    else if (!addWorker(command, false))
        reject(command);
}

總結(jié)分析:
execute(Runnable command)

參數(shù):

command    提交執(zhí)行的任務(wù)债蜜,不能為空

執(zhí)行流程:

1晴埂、如果線程池當(dāng)前線程數(shù)量少于corePoolSize,則addWorker(command, true)創(chuàng)建新worker線程寻定,如創(chuàng)建成功返回儒洛,如沒創(chuàng)建成功,則執(zhí)行后續(xù)步驟狼速;

addWorker(command, true)失敗的原因可能是:
A琅锻、線程池已經(jīng)shutdown,shutdown的線程池不再接收新任務(wù)
B、workerCountOf(c) < corePoolSize 判斷后恼蓬,由于并發(fā)惊完,別的線程先創(chuàng)建了worker線程,導(dǎo)致workerCount>=corePoolSize

2处硬、如果線程池還在running狀態(tài)小槐,將task加入workQueue阻塞隊(duì)列中,如果加入成功郁油,進(jìn)行double-check本股,如果加入失敗(可能是隊(duì)列已滿)桐腌,則執(zhí)行后續(xù)步驟拄显;

double-check主要目的是判斷剛加入workQueue阻塞隊(duì)列的task是否能被執(zhí)行
A、如果線程池已經(jīng)不是running狀態(tài)了案站,應(yīng)該拒絕添加新任務(wù)躬审,從workQueue中刪除任務(wù)
B、如果線程池是運(yùn)行狀態(tài)蟆盐,或者從workQueue中刪除任務(wù)失敵斜摺(剛好有一個(gè)線程執(zhí)行完畢,并消耗了這個(gè)任務(wù))石挂,確保還有線程執(zhí)行任務(wù)(只要有一個(gè)就夠了)

3博助、如果線程池不是running狀態(tài) 或者 無法入隊(duì)列,嘗試開啟新線程痹愚,擴(kuò)容至maxPoolSize富岳,如果addWork(command, false)失敗了,拒絕當(dāng)前command

<span id="jump2">二拯腮、ThreadPoolExecutor.addWorker()方法</span>

image.png
  /**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 *  檢查根據(jù)當(dāng)前線程池的狀態(tài)和給定的邊界(core or maximum)是否可以創(chuàng)建一個(gè)新的worker
 *  * 如果是這樣的話窖式,worker的數(shù)量做相應(yīng)的調(diào)整,如果可能的話动壤,創(chuàng)建一個(gè)新的worker并啟動(dòng)萝喘,參數(shù)中的firstTask作為worker的第一個(gè)任務(wù)
 *  * 如果方法返回false,可能因?yàn)閜ool已經(jīng)關(guān)閉或者調(diào)用過了shutdown
 *  * 如果線程工廠創(chuàng)建線程失敗琼懊,也會(huì)失敗阁簸,返回false
 *  * 如果線程創(chuàng)建失敗,要么是因?yàn)榫€程工廠返回null哼丈,要么是發(fā)生了OutOfMemoryError
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    //外層循環(huán)启妹,負(fù)責(zé)判斷線程池狀態(tài)
    retry:
    for (;;) {
        int c = ctl.get();
        //線程池狀態(tài)
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 線程池的state越小越是運(yùn)行狀態(tài),running=-1削祈,shutdown=0, stop=1, tidying=2翅溺,terminated=3
         * 1、如果線程池state已經(jīng)至少是shutdown狀態(tài)了
         * 2髓抑、并且以下3個(gè)條件任意一個(gè)是false
         *   rs == SHUTDOWN         (隱含:rs>=SHUTDOWN)false情況: 線程池狀態(tài)已經(jīng)超過shutdown咙崎,可能是stop、tidying吨拍、terminated其中一個(gè)褪猛,即線程池已經(jīng)終止
         *   firstTask == null      (隱含:rs==SHUTDOWN)false情況: firstTask不為空,rs==SHUTDOWN 且 firstTask不為空羹饰,return false伊滋,場(chǎng)景是在線程池已經(jīng)shutdown后,還要添加新的任務(wù)队秩,拒絕
         *   ! workQueue.isEmpty()  (隱含:rs==SHUTDOWN笑旺,firstTask==null)false情況: workQueue為空,當(dāng)firstTask為空時(shí)是為了創(chuàng)建一個(gè)沒有任務(wù)的線程馍资,再?gòu)膚orkQueue中獲取任務(wù)筒主,如果workQueue已經(jīng)為空,那么就沒有添加新worker線程的必要了
         * return false鸟蟹,即無法addWorker()
         */
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
                )
            return false;

        //內(nèi)層循環(huán)乌妙,負(fù)責(zé)worker數(shù)量+1
        for (;;) {
            //獲取當(dāng)前worker數(shù)量
            int wc = workerCountOf(c);
            //如果worker數(shù)量>線程池最大上限CAPACITY(即使用int低29位可以容納的最大值)
            //或者( worker數(shù)量>corePoolSize 或  worker數(shù)量>maximumPoolSize ),即已經(jīng)超過了給定的邊界
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;

            //調(diào)用unsafe CAS操作建钥,使得worker數(shù)量+1藤韵,成功則跳出外層retry循環(huán)
            //CAS 即Compare and Swap 調(diào)用AtomicInteger的同步+1方法,這個(gè)方法可能會(huì)失敗熊经,返回true成功泽艘,false失敗
            if (compareAndIncrementWorkerCount(c))
                break retry;

            //重新驗(yàn)證線程池運(yùn)行狀態(tài)
            c = ctl.get();  // Re-read ctl
            //如果當(dāng)前狀態(tài)和外層循環(huán)開始時(shí)不一樣了,那么回到外層循環(huán)重新開始奈搜。
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }


    /**
     * worker數(shù)量+1成功的后續(xù)操作
     * 添加到workers Set集合悉盆,并啟動(dòng)worker線程
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //1、設(shè)置worker這個(gè)AQS鎖的同步狀態(tài)state=-1
        //2馋吗、將firstTask設(shè)置給worker的成員變量firstTask
        //3焕盟、使用worker自身這個(gè)runnable,調(diào)用ThreadFactory創(chuàng)建一個(gè)線程宏粤,并設(shè)置給worker的成員變量thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //--------------------------------------------這部分代碼是上鎖的
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 當(dāng)獲取到鎖后脚翘,再次檢查線程池運(yùn)行狀態(tài),
                int rs = runStateOf(ctl.get());

                //如果線程池在運(yùn)行running<shutdown 或者 線程池已經(jīng)shutdown绍哎,且firstTask==null(可能是workQueue中仍有未執(zhí)行完成的任務(wù)来农,創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)
                //worker數(shù)量-1的操作在addWorkerFailed()
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable 線程已經(jīng)啟動(dòng),拋非法線程狀態(tài)異常
                        throw new IllegalThreadStateException();

                    //workers是一個(gè)HashSet<Worker>,將worker存入
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;

                    //標(biāo)識(shí)worker添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //添加成功崇堰,啟動(dòng)線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果啟動(dòng)失敗沃于,回滾操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

/**
 * 如果worker不為空涩咖,就從workQueue移除
 * @param w
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        //里邊就是調(diào)用AtomicInteger#compareAndSet(expect, expect - 1) 減了1
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

addWorker(Runnable firstTask, boolean core)

參數(shù):

firstTask:    worker線程的初始任務(wù),可以為空
core:           true:將corePoolSize作為上限繁莹,false:將maximumPoolSize作為上限

addWorker方法有4種傳參的方式:

1檩互、addWorker(command, true)

2、addWorker(command, false)

3咨演、addWorker(null, false)

4闸昨、addWorker(null, true)

在execute方法中就使用了前3種,結(jié)合這個(gè)核心方法進(jìn)行以下分析

第一個(gè):線程數(shù)小于corePoolSize時(shí)薄风,放一個(gè)需要處理的task進(jìn)Workers Set饵较。如果Workers Set長(zhǎng)度超過corePoolSize,就返回false
第二個(gè):當(dāng)隊(duì)列被放滿時(shí)遭赂,就嘗試將這個(gè)新來的task直接放入Workers Set循诉,而此時(shí)Workers Set的長(zhǎng)度限制是maximumPoolSize。如果線程池也滿了的話就返回false
第三個(gè):放入一個(gè)空的task進(jìn)workers Set撇他,長(zhǎng)度限制是maximumPoolSize打洼。這樣一個(gè)task為空的worker在線程執(zhí)行的時(shí)候會(huì)去任務(wù)隊(duì)列里拿任務(wù),這樣就相當(dāng)于創(chuàng)建了一個(gè)新的線程逆粹,只是沒有馬上分配任務(wù)
第四個(gè):這個(gè)方法就是放一個(gè)null的task進(jìn)Workers Set募疮,而且是在小于corePoolSize時(shí),如果此時(shí)Set中的數(shù)量已經(jīng)達(dá)到corePoolSize那就返回false僻弹,什么也不干阿浓。實(shí)際使用中是在prestartAllCoreThreads()方法,這個(gè)方法用來為線程池預(yù)先啟動(dòng)corePoolSize個(gè)worker等待從workQueue中獲取任務(wù)執(zhí)行

執(zhí)行流程:

1蹋绽、判斷線程池當(dāng)前是否為可以添加worker線程的狀態(tài)芭毙,可以則繼續(xù)下一步,不可以return false:
    A卸耘、線程池狀態(tài)>shutdown退敦,可能為stop、tidying蚣抗、terminated侈百,不能添加worker線程
    B、線程池狀態(tài)==shutdown翰铡,firstTask不為空钝域,不能添加worker線程,因?yàn)閟hutdown狀態(tài)的線程池不接收新任務(wù)
    C锭魔、線程池狀態(tài)==shutdown例证,firstTask==null,workQueue為空迷捧,不能添加worker線程织咧,因?yàn)閒irstTask為空是為了
    添加一個(gè)沒有任務(wù)的線程再?gòu)膚orkQueue獲取task胀葱,而workQueue為空,說明添加無任務(wù)線程已經(jīng)沒有意義
2笙蒙、線程池當(dāng)前線程數(shù)量是否超過上限(corePoolSize 或 maximumPoolSize)巡社,超過了return false,沒超過則對(duì)workerCount+1手趣,繼續(xù)下一步
3、在線程池的ReentrantLock保證下肥荔,向Workers Set中添加新創(chuàng)建的worker實(shí)例绿渣,添加完成后解鎖,并啟動(dòng)worker線程燕耿,
如果這一切都成功了中符,return true,如果添加worker入Set失敗或啟動(dòng)失敗誉帅,調(diào)用addWorkerFailed()邏輯

<span id="jump3">三淀散、Worker內(nèi)部類</span>

 /**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.
 *
 * We implement a simple non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.
 *
 * Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 *
 * Worker類大體上管理著運(yùn)行線程的中斷狀態(tài) 和 一些指標(biāo)
 * Worker類投機(jī)取巧的繼承了AbstractQueuedSynchronizer來簡(jiǎn)化在執(zhí)行任務(wù)時(shí)的獲取、釋放鎖
 * 這樣防止了中斷在運(yùn)行中的任務(wù)蚜锨,只會(huì)喚醒(中斷)在等待從workQueue中獲取任務(wù)的線程
 *   解釋:
 *    為什么不直接在execute(runnable command)直接執(zhí)行command档插,而要包一層Worker呢?
 *        1亚再、主要目的是為了控制線程中斷郭膛,使用不可重入的互斥鎖AQS,來限制同一線程中其他操作導(dǎo)致線程中斷氛悬。
 *        2则剃、正常shutdown()方法,調(diào)用的是interruptIdleWorkers()如捅,這個(gè)方法是需要w.tryLock()的棍现,
 *        也就是用不可重入鎖AQS協(xié)助攔截正在運(yùn)行的線程調(diào)用t.intercept()中斷。所以shutdown()方法
 *        不會(huì)中斷正在執(zhí)行任務(wù)的worker線程镜遣。
 *        3己肮、但如果shutdownNow()方法,調(diào)用的是interruptWorkers()悲关,這個(gè)方法并不加鎖朴肺,而是直接遍歷
 *        所有worker,并t.intercept()坚洽。所以shutdownNow()相當(dāng)于會(huì)中斷正在執(zhí)行的Worker線程戈稿。
 *
 * worker實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的不可重入的互斥鎖,而不是用ReentrantLock可重入鎖
 * 因?yàn)槲覀儾幌胱屧谡{(diào)用比如setCorePoolSize()這種線程池控制方法時(shí)可以再次獲取鎖(重入)
 *   解釋:
 *     1讶舰、setCorePoolSize()時(shí)會(huì)調(diào)用interruptIdleWorkers()鞍盗,通過這個(gè)方法里w.tryLock()來攔截
 *     利用不可重入鎖的特性需了,保證同一線程中執(zhí)行時(shí)也會(huì)阻塞,來保證worker不被中斷般甲。
 *     2肋乍、類似的方法還有(只要調(diào)用interruptIdleWorkers()的全是):
 *     shutdown()
 *     setMaximumPoolSize()
 *     setKeepAliveTime()
 *     allowCoreThreadTimeOut()
 *
 * 另外,為了保證只有worker中的線程已經(jīng)在運(yùn)行狀態(tài)才能被中斷敷存。我們初始化state = -1墓造,并在runWorker()
 * 啟動(dòng)線程時(shí),將state設(shè)置 = 0锚烦。
 *    解釋:
 *      1觅闽、創(chuàng)建Worker過程并沒有真正的t.start()啟動(dòng)線程。在runWorker()中才調(diào)用啟動(dòng)線程涮俄。
 *      2蛉拙、所以在t.start()之前并沒有必要去中斷線程。只有state >= 0的時(shí)候彻亲,才表示有線程可中斷孕锄。
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     *
     * 這個(gè)類永遠(yuǎn)都不會(huì)被序列化,我們提供序列號(hào)id只是解決javac 的警告
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter 記錄已經(jīng)完成的任務(wù)數(shù)*/
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        //初始化state為-1苞尝,在runWorker()調(diào)用t.start()時(shí)再設(shè)置為0
        setState(-1); // inhibit interrupts until runWorker
        //任務(wù)可能為null畸肆,為空時(shí)runWorker()時(shí)就會(huì)自旋,getTask()宙址,不斷獲取workQueue中的任務(wù)恼除。
        this.firstTask = firstTask;
        //通過線程工廠創(chuàng)建線程
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //
    // 獲取鎖狀態(tài),0表示未鎖曼氛,1表示已鎖

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * 嘗試獲取鎖方法豁辉。AQS獲取鎖的時(shí)候會(huì)調(diào)用這個(gè),本身就是讓子類實(shí)現(xiàn)的舀患。
     *
     * 這里判斷邏輯是通過(CAS)unsafe.compareAndSwapInt的原子性徽级,來比較并設(shè)置值。
     * 最終是比較當(dāng)前state == 0 聊浅?那么就設(shè)置為 1 并返回true餐抢。
     *   true: 將當(dāng)前線程綁定上。return true表示獲取鎖成功
     *   false: 獲取鎖失敗低匙。
     */
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    /**
     * 釋放鎖旷痕,將state置為0
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    //lock方法也會(huì)調(diào)用tryAcquire()方法,但是獲取失敗會(huì)中斷線程顽冶。
    public void lock()        { acquire(1); }
    //嘗試獲取鎖
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    /**
     * 結(jié)束線程t欺抗。如果線程已經(jīng)start(),并且線程t不為空强重,也不是中斷狀態(tài)绞呈,那么就中斷贸人。
     * 這個(gè)方法再shutdownNow()中使用了,不獲取鎖直接中斷佃声,
     */
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

Worker存在的意義:

- 1艺智、控制中斷,保證task能夠完整執(zhí)行圾亏。防止在shutdown()等情況下意外中斷十拣。
- 2、控制自旋獲取getTask()志鹃。
- 3夭问、控制自身退出時(shí)機(jī)

Worker如何實(shí)現(xiàn)不可重入鎖:state (-1,0,1)

控制中斷,主要意義就是盡量保證worker中正在執(zhí)行的task能成功執(zhí)行完弄跌。
不要在執(zhí)行過程中被其他方法intercept()了。
- 1尝苇、繼承AQS(AbstractQueuedSynchronizer)實(shí)現(xiàn)不可重入鎖
- 2铛只、在new Worker()時(shí),線程還沒有start()時(shí),state設(shè)置-1。此時(shí)線程如果調(diào)用intercept()沒有意義糠溜。
在runWoker()=>t.start()時(shí)再設(shè)置state=0
- 3淳玩、在tryLock()->tryAcquire(),使用CAS先比較再重置的方式設(shè)置state 0=>1非竿,就保證了不可重入鎖蜕着。
因?yàn)橹挥衧tate=0時(shí)才能重新設(shè)置,其他-1红柱,1的情況都不能設(shè)置成功承匣。

Worker如何控制中斷

- 1、初始化AQS state = -1锤悄,此時(shí)不允許調(diào)用interrupt()韧骗。只有runWorker()將state設(shè)置為0才允許調(diào)用中斷。
- 2零聚、shutdown()袍暴,setMaximumPoolSize()等安全退出或改變線程池的方法,都會(huì)調(diào)用interruptIdleWorkers();中斷空閑worker的方法隶症。
這個(gè)方法中會(huì)遍歷所有worker政模,然后嘗試獲取鎖tryLock(),如果tryLock()獲取成功蚂会,則說明該worker屬于空閑狀態(tài)淋样,
因?yàn)閣orker自旋時(shí)如果獲取到task的話就會(huì)lock()。只有在getTask()阻塞狀態(tài)時(shí)才會(huì)釋放鎖胁住。故如果w.tryLock()能成功獲取鎖习蓬,
則說明worker空閑纽什。這一點(diǎn)就是利用不可重入鎖的特點(diǎn)來實(shí)現(xiàn)的。
- 3躲叼、shutdownNow()芦缰,調(diào)用的是interruptWorkers(),這個(gè)方法是直接遍歷worker枫慷,直接調(diào)用interrupt()让蕾,并不會(huì)去獲取鎖。
但是判斷state是不是>-1或听,因?yàn)閟tate=-1時(shí)探孝,線程都沒有start()呢,沒必要中斷誉裆。

<span id="jump4">四顿颅、ThreadPoolExecutor.runWorker()</span>

[圖片上傳失敗...(image-107cc8-1618402940252)]

 /**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 * 重復(fù)的從隊(duì)列中獲取任務(wù)并執(zhí)行,同時(shí)應(yīng)對(duì)一些問題:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 * 我們可能使用一個(gè)初始化任務(wù)開始足丢,即firstTask為null
 * 然后只要線程池在運(yùn)行粱腻,我們就從getTask()獲取任務(wù)
 * 如果getTask()返回null,則worker由于改變了線程池狀態(tài)或參數(shù)配置而退出
 * 其它退出因?yàn)橥獠看a拋異常了斩跌,這會(huì)使得completedAbruptly為true绍些,這會(huì)導(dǎo)致在processWorkerExit()方法中替換當(dāng)前線程
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and
 * clearInterruptsForTaskRun called to ensure that unless pool is
 * stopping, this thread does not have its interrupt set.
 * 在任何任務(wù)執(zhí)行之前,都需要對(duì)worker加鎖去防止在任務(wù)運(yùn)行時(shí)耀鸦,其它的線程池中斷操作
 * clearInterruptsForTaskRun保證除非線程池正在stoping柬批,線程不會(huì)被設(shè)置中斷標(biāo)示
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 * 每個(gè)任務(wù)執(zhí)行前會(huì)調(diào)用beforeExecute(),其中可能拋出一個(gè)異常袖订,這種情況下會(huì)導(dǎo)致線程die(跳出循環(huán)氮帐,且completedAbruptly==true),沒有執(zhí)行任務(wù)
 * 因?yàn)閎eforeExecute()的異常沒有cache住洛姑,會(huì)上拋揪漩,跳出循環(huán)
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to
 * afterExecute. We separately handle RuntimeException, Error
 * (both of which the specs guarantee that we trap) and arbitrary
 * Throwables.  Because we cannot rethrow Throwables within
 * Runnable.run, we wrap them within Errors on the way out (to the
 * thread's UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 * 假定beforeExecute()正常完成,我們執(zhí)行任務(wù)
 * 匯總?cè)魏螔伋龅漠惓2l(fā)送給afterExecute(task, thrown)
 * 因?yàn)槲覀儾荒茉赗unnable.run()方法中重新上拋Throwables吏口,我們將Throwables包裝到Errors上拋(會(huì)到線程的UncaughtExceptionHandler去處理)
 * 任何上拋的異常都會(huì)導(dǎo)致線程die
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 * 任務(wù)執(zhí)行結(jié)束后奄容,調(diào)用afterExecute(),也可能拋異常产徊,也會(huì)導(dǎo)致線程die
 * 根據(jù)JLS Sec 14.20昂勒,這個(gè)異常(finally中的異常)會(huì)生效
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
//      runWorker()這個(gè)方法執(zhí)行節(jié)點(diǎn)是Worker.run(),而run()方法執(zhí)行節(jié)點(diǎn)是addWorker()里的worker.t.start()舟铜。
//      所以執(zhí)行到這個(gè)方法的時(shí)候戈盈,說明線程已經(jīng)start()了。
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;

    // 目的是調(diào)用worker的tryRelease()方法,將state 置為 0塘娶。
    // 意思是當(dāng)前線程已經(jīng)運(yùn)行了归斤,如果有人需要intercept()中斷。就可以調(diào)用了刁岸。
    w.unlock(); // allow interrupts

    //標(biāo)識(shí)線程是否為正常退出的脏里。不拋意想不到的異常就會(huì)將這個(gè)值置為false。
    //這個(gè)標(biāo)識(shí)會(huì)傳到processWorkerExit()中虹曙,在里邊判斷如果不是正常退出會(huì)啟動(dòng)一個(gè)新worker來繼續(xù)處理出現(xiàn)問題的task迫横。
    boolean completedAbruptly = true;
    try {
        //判斷firstTask不為空,或者從workQueue阻塞隊(duì)列中拿到了任務(wù)酝碳。就就開始執(zhí)行task矾踱。
        //自旋
        while (task != null || (task = getTask()) != null) {
            //開始任務(wù)之前先加鎖,而且是不可重入的互斥鎖疏哗。不可重入的目的在Worker中已經(jīng)說過了呛讲。
            //上鎖,不是為了防止并發(fā)執(zhí)行任務(wù)返奉,為了在shutdown()時(shí)不終止正在運(yùn)行的worker
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * clearInterruptsForTaskRun操作
             * 確保只有在線程stoping時(shí)贝搁,才會(huì)被設(shè)置中斷標(biāo)示,否則清除中斷標(biāo)示
             * 1衡瓶、如果線程池狀態(tài)>=stop徘公,且當(dāng)前線程沒有設(shè)置中斷狀態(tài)牲证,wt.interrupt()
             * 2哮针、如果一開始判斷線程池狀態(tài)<stop丧诺,但Thread.interrupted()為true淤年,即線程已經(jīng)被中斷,又清除了中斷標(biāo)示探橱,再次判斷線程池狀態(tài)是否>=stop
             *   是捂齐,再次設(shè)置中斷標(biāo)示蛮放,wt.interrupt()
             *   否,不做操作奠宜,清除中斷標(biāo)示后進(jìn)行后續(xù)步驟
             */
            // RUNNING    = -1
            // SHUTDOWN   =  0
            // STOP       =  1
            // TIDYING    =  2
            // TERMINATED =  3
            if (
                    (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                     && !wt.isInterrupted()
            )
                wt.interrupt();
            try {
                //任務(wù)執(zhí)行前(子類實(shí)現(xiàn)包颁,可自定義操作)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任務(wù)執(zhí)行后(子類實(shí)現(xiàn),可自定義操作)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;//完成任務(wù)數(shù)+1
                //解鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //處理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

執(zhí)行流程:

1压真、Worker線程啟動(dòng)后娩嚼,通過Worker類的run()方法調(diào)用runWorker(this)
2、執(zhí)行任務(wù)之前滴肿,首先worker.unlock()岳悟,將AQS的state置為0,允許中斷當(dāng)前worker線程
3、開始執(zhí)行firstTask贵少,調(diào)用task.run()呵俏,在執(zhí)行任務(wù)前會(huì)上鎖wroker.lock(),在執(zhí)行完任務(wù)后會(huì)解鎖滔灶,為了防止在任務(wù)運(yùn)行時(shí)被線程池一些中斷操作中斷
4普碎、在任務(wù)執(zhí)行前后,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute() 和 afterExecute()方法
5宽气、無論在beforeExecute()随常、task.run()、afterExecute()發(fā)生異常上拋萄涯,都會(huì)導(dǎo)致worker線程終止绪氛,進(jìn)入processWorkerExit()處理worker退出的流程
6、如正常執(zhí)行完當(dāng)前task后涝影,會(huì)通過getTask()從阻塞隊(duì)列中獲取新任務(wù)枣察,當(dāng)隊(duì)列中沒有任務(wù),且獲取任務(wù)超時(shí)燃逻,那么當(dāng)前worker也會(huì)進(jìn)入退出流程

<span id="jump5">五序目、ThreadPoolExecutor.getTask()方法</span>

[圖片上傳失敗...(image-7d2362-1618402940252)]

/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
*    a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
*    workers are subject to termination (that is,
*    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
*    both before and after the timed wait, and if the queue is
*    non-empty, this worker is not the last thread in the pool.
*
* 執(zhí)行阻塞隊(duì)列的:阻塞獲取方法:take() 或者 超時(shí)等待方法:poll(timeout)方法,取決于當(dāng)前的配置伯襟。
* 如果這個(gè)worker返回null猿涨,必須滿足如下條件:
* 1、超過最大線程數(shù)量(因?yàn)檎{(diào)用了setMaximumPoolSize())
* 2姆怪、線程池stop了
* 3叛赚、線程池shutdown了,并且任務(wù)隊(duì)列queue為空
* 4稽揭、這個(gè)worker不是核心線程俺附、或者設(shè)置了允許核心線程退出。并且超過了keepAliveTime的等待時(shí)間溪掀,
* 仍然沒有獲取到task事镣,那么return null
*
*
*  如果返回null,那么worker就是要退出了揪胃,所以把工作計(jì)數(shù) - 1
* @return task, or null if the worker must exit, in which case
*         workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
    // 獲取線程池狀態(tài)
    int c = ctl.get();
    int rs = runStateOf(c);

    //線程池狀態(tài)已經(jīng)在SHUTDOWN之后了 && (線程池狀態(tài)在STOP之后了  ||  工作隊(duì)列為空了)
    //那么返回null璃哟,退出線程。并且調(diào)用decrementWorkerCount喊递,(CAS)方式核減調(diào)當(dāng)前線程數(shù)随闪。
//            RUNNING    = -1 << COUNT_BITS;
//            SHUTDOWN   =  0 << COUNT_BITS;
//            STOP       =  1 << COUNT_BITS;
//            TIDYING    =  2 << COUNT_BITS;
//            TERMINATED =  3 << COUNT_BITS;
    // Check if queue empty only if necessary.
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//                (CAS)方式核減調(diào)當(dāng)前線程數(shù)。
        //方法里最終是循環(huán)調(diào)用 ctl.compareAndSet(expect, expect - 1);册舞,直到成功
        decrementWorkerCount();
        return null;
    }

    // 獲取當(dāng)前線程數(shù)
    int wc = workerCountOf(c);

    // worker是否允許退出蕴掏?
    // 設(shè)置了允許核心線程退出 || 當(dāng)前線程數(shù) > 核心線程數(shù)
    // Are workers subject to culling?
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    //(當(dāng)前線程數(shù) > maximumPollSize || (允許線程退出 && timedOut超時(shí)事件過后仍然沒有任務(wù)))
    // && (線程數(shù) > 1 || 任務(wù)隊(duì)列queue為空)
    if (
            (wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())
    ) {
        //調(diào)用CAS方式,把工作線程數(shù)-1,如果-1成功就直接返回null盛杰。如果失敗就再循環(huán)一圈挽荡。
        if (compareAndDecrementWorkerCount(c))
            return null;
        continue;
    }

    try {
        //允許線程退出 ? 調(diào)用poll() : 調(diào)用take()
        //poll(keepAliveTime)方法即供,是等待超過keepAliveTime之后會(huì)返回null定拟。
        //take()方法,是一直處于阻塞狀態(tài)逗嫡,直到隊(duì)列中有新數(shù)據(jù)插入時(shí)青自,拿到數(shù)據(jù)。
        //workQueue的源碼分析過驱证,
        // 1延窜、其實(shí)就是take()獲取時(shí)通過線程鎖的Condition屬性控制線程睡眠掛起await()
        //      Condition notEmpty = lock.newCondition()
        //      notEmpty.await()睡眠等待。
        // 2抹锄、在插入時(shí)逆瑞,notEmpty.signal();進(jìn)行線程喚醒而已。
        Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
        if (r != null)
            return r;
        //執(zhí)行到這伙单,意味著當(dāng)前線程具備退出條件了获高。下次循環(huán)后再結(jié)合其他條件確定是否返回null
        timedOut = true;
    } catch (InterruptedException retry) {
        //如果拋異常,那么退出標(biāo)識(shí)就先置為false吻育,繼續(xù)自旋
        timedOut = false;
    }
}
}

核心就是利用workQueue阻塞隊(duì)列的特性:

A念秧、workQueue.poll():如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒有任務(wù)布疼,返回null
B摊趾、workQueue.take():如果阻塞隊(duì)列為空,當(dāng)前線程會(huì)被掛起等待await()缎除;當(dāng)隊(duì)列中有任務(wù)加入時(shí)严就,線程被喚醒signal()总寻,take方法返回任務(wù)

<span id="jump6">六器罐、ThreadPoolExecutor.processWorkerExit()方法</span>

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * 為被干掉的worker調(diào)用清理方法和記錄。這個(gè)方法只會(huì)被worker線程調(diào)用渐行。
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    /**
     * 1轰坊、worker數(shù)量-1
     * 如果是突然終止,說明是task執(zhí)行時(shí)異常情況導(dǎo)致祟印,即run()方法執(zhí)行時(shí)發(fā)生了異常肴沫,那么正在工作的worker線程數(shù)量需要-1
     * 如果不是突然終止,說明是worker線程沒有task可執(zhí)行了蕴忆,不用-1颤芬,因?yàn)橐呀?jīng)在getTask()方法中-1了
     */
    //在runWorker中拋異常了才為true,那么就調(diào)用CAS方式將工作線程-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    /**
     * 2、從Workers Set中移除worker
     */
    //線程加鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //已完成任務(wù)總數(shù) += worker的完成數(shù)
        completedTaskCount += w.completedTasks;
        //將該worker從HashSet中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    /**
     * 3站蝠、在對(duì)線程池有負(fù)效益的操作時(shí)汰具,都需要“嘗試終止”線程池
     * 主要是判斷線程池是否滿足終止的狀態(tài)
     * 如果狀態(tài)滿足,但還有線程池還有線程菱魔,嘗試對(duì)其發(fā)出中斷響應(yīng)留荔,使其能進(jìn)入退出流程
     * 沒有線程了,更新狀態(tài)為tidying->terminated
     */
    tryTerminate();

    /**
     * 4澜倦、是否需要增加worker線程
     *    1.線程池狀態(tài)是running 或 shutdown
     *    2.如果當(dāng)前線程是突然終止的聚蝶,addWorker()
     *    3.如果當(dāng)前線程不是突然終止的,但當(dāng)前線程數(shù)量 < 要維護(hù)的線程數(shù)量藻治,addWorker()
     * 故如果調(diào)用線程池shutdown()碘勉,直到workQueue為空前,
     * 線程池都會(huì)維持corePoolSize個(gè)或者1個(gè)線程桩卵,然后再逐漸銷毀這corePoolSize個(gè)或者1個(gè)線程
     */
    int c = ctl.get();
    //如果狀態(tài)是running恰聘、shutdown,即tryTerminate()沒有成功終止線程池吸占,嘗試再添加一個(gè)worker
    if (runStateLessThan(c, STOP)) {
        //不是突然完成的晴叨,即沒有task任務(wù)可以獲取而完成的,計(jì)算min矾屯,并根據(jù)當(dāng)前worker數(shù)量判斷是否需要addWorker()
        if (!completedAbruptly) {
            //最小值min = 如果允許核心線程退出兼蕊,那么最小值就是0。否則就是核心線程數(shù)件蚕。
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果最小值為0 && 工作隊(duì)列不為空孙技,那么意味著任務(wù)沒有消化完,至少還需要一個(gè)worker去消化排作。
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //如果當(dāng)前的工作worker數(shù)量 >= 最小值牵啦,就不需要加worker了。否則就會(huì)執(zhí)行下邊的addWorker(null,false)方法妄痪,添加worker
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //添加一個(gè)沒有firstTask的worker
        //只要worker是completedAbruptly突然終止的哈雏,或者線程數(shù)量小于要維護(hù)的數(shù)量,就新添一個(gè)worker線程衫生。
        //因?yàn)閽伋霎惓5那闆r可能是workQueue的task并沒有完成執(zhí)行完裳瘪。啟動(dòng)一個(gè)空task的worker去消化workQueue隊(duì)列
        addWorker(null, false);
    }
}

<span id="jump7">七、ThreadPoolExecutor.tryTerminate()方法</span>

/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 * 
 * 在以下情況將線程池變?yōu)門ERMINATED終止?fàn)顟B(tài)
 * shutdown 且 正在運(yùn)行的worker 和 workQueue隊(duì)列 都empty
 * stop 且  沒有正在運(yùn)行的worker
 * 
 * 這個(gè)方法必須在任何可能導(dǎo)致線程池終止的情況下被調(diào)用罪针,如:
 * 減少worker數(shù)量
 * shutdown時(shí)從queue中移除任務(wù)
 * 
 * 這個(gè)方法不是私有的彭羹,所以允許子類ScheduledThreadPoolExecutor調(diào)用
 */
final void tryTerminate() {
    //這個(gè)for循環(huán)主要是和進(jìn)入關(guān)閉線程池操作的CAS判斷結(jié)合使用的
    for (;;) {
        int c = ctl.get();
         
        /**
         * 線程池是否需要終止
         * 如果以下3中情況任一為true,return泪酱,不進(jìn)行終止
         * 1派殷、還在運(yùn)行狀態(tài)
         * 2还最、狀態(tài)是TIDYING、或 TERMINATED毡惜,已經(jīng)終止過了
         * 3憋活、SHUTDOWN 且 workQueue不為空
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
         
        /**
         * 只有shutdown狀態(tài) 且 workQueue為空,或者 stop狀態(tài)能執(zhí)行到這一步
         * 如果此時(shí)線程池還有線程(正在運(yùn)行任務(wù)虱黄,正在等待任務(wù))
         * 中斷喚醒一個(gè)正在等任務(wù)的空閑worker
         * 喚醒后再次判斷線程池狀態(tài)悦即,會(huì)return null,進(jìn)入processWorkerExit()流程
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate 資格終止
            interruptIdleWorkers(ONLY_ONE); //中斷workers集合中的空閑任務(wù)橱乱,參數(shù)為true辜梳,只中斷一個(gè)
            return;
        }
 
        /**
         * 如果狀態(tài)是SHUTDOWN,workQueue也為空了泳叠,正在運(yùn)行的worker也沒有了作瞄,開始terminated
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //CAS:將線程池的ctl變成TIDYING(所有的任務(wù)被終止,workCount為0危纫,為此狀態(tài)時(shí)將會(huì)調(diào)用terminated()方法)宗挥,期間ctl有變化就會(huì)失敗,會(huì)再次for循環(huán)
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); //需子類實(shí)現(xiàn)
                } 
                finally {
                    ctl.set(ctlOf(TERMINATED, 0)); //將線程池的ctl變成TERMINATED
                    termination.signalAll(); //喚醒調(diào)用了 等待線程池終止的線程 awaitTermination() 
                }
                return;
            }
        }
        finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
        // 如果上面的CAS判斷false种蝶,再次循環(huán)
    }
}

<span id="jump8">八契耿、ThreadPoolExecutor.interruptIdleWorkers()方法</span>

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 * 中斷在等待任務(wù)的線程(沒有上鎖的),中斷喚醒后螃征,可以判斷線程池狀態(tài)是否變化來決定是否繼續(xù)
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case(以免) all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 * 
 * onlyOne如果為true搪桂,最多interrupt一個(gè)worker
 * 只有當(dāng)終止流程已經(jīng)開始,但線程池還有worker線程時(shí),tryTerminate()方法會(huì)做調(diào)用onlyOne為true的調(diào)用
 * (終止流程已經(jīng)開始指的是:shutdown狀態(tài) 且 workQueue為空盯滚,或者 stop狀態(tài))
 * 在這種情況下踢械,最多有一個(gè)worker被中斷,為了傳播shutdown信號(hào)魄藕,以免所有的線程都在等待
 * 為保證線程池最終能終止内列,這個(gè)操作總是中斷一個(gè)空閑worker
 * 而shutdown()中斷所有空閑worker,來保證空閑線程及時(shí)退出
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上鎖
    try {
        for (Worker w : workers) { 
            Thread t = w.thread;
            //w.tryLock()背率,只有執(zhí)行完task话瞧,正在getTask()阻塞狀態(tài)的worker才能獲取到lock。因?yàn)閞unWorker()時(shí)獲取到task,會(huì)先lock()
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); //解鎖
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末退渗,一起剝皮案震驚了整個(gè)濱河市移稳,隨后出現(xiàn)的幾起案子蕴纳,更是在濱河造成了極大的恐慌会油,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件古毛,死亡現(xiàn)場(chǎng)離奇詭異翻翩,居然都是意外死亡都许,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門嫂冻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來胶征,“玉大人,你說我怎么就攤上這事桨仿【Φ停” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵服傍,是天一觀的道長(zhǎng)钱雷。 經(jīng)常有香客問我,道長(zhǎng)吹零,這世上最難降的妖魔是什么罩抗? 我笑而不...
    開封第一講書人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮灿椅,結(jié)果婚禮上套蒂,老公的妹妹穿的比我還像新娘。我一直安慰自己茫蛹,他們只是感情好操刀,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著婴洼,像睡著了一般馍刮。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上窃蹋,一...
    開封第一講書人閱讀 51,521評(píng)論 1 304
  • 那天卡啰,我揣著相機(jī)與錄音,去河邊找鬼警没。 笑死匈辱,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的杀迹。 我是一名探鬼主播亡脸,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼树酪!你這毒婦竟也來了浅碾?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤续语,失蹤者是張志新(化名)和其女友劉穎垂谢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疮茄,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滥朱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年根暑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片徙邻。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡排嫌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出缰犁,到底是詐尸還是另有隱情淳地,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布帅容,位于F島的核電站薇芝,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏丰嘉。R本人自食惡果不足惜夯到,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望饮亏。 院中可真熱鬧路幸,春花似錦简肴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至测摔,卻和暖如春舟肉,著一層夾襖步出監(jiān)牢的瞬間黄琼,已是汗流浹背围苫。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工腺占, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓漱贱,卻偏偏與公主長(zhǎng)得像蚜退,于是被迫代替她去往敵國(guó)和親配猫。 傳聞我的和親對(duì)象是個(gè)殘疾皇子捆交,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容