ThreadPoolExecutor原理

1 ThreadPoolExecutor介紹

ThreadPoolExecutor是線程池是實(shí)現(xiàn)怒坯。包含了對(duì)線程生命周期的管理视译。ThreadPoolExecutor的核心參數(shù)包括:

    //線程工廠酷含,設(shè)置線程名字,方便定位線程
    private volatile ThreadFactory threadFactory;

    //超過(guò)最大線程數(shù)量之后執(zhí)行的飽和策略
     1.AbortPolicy:直接拋出異常。     2.CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù)慌闭。
     3.DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)丧失。
     4.DiscardPolicy:不處理布讹,丟棄掉。
     5.也可以根據(jù)自定義策略。如先持久化呼股,記錄日志或者數(shù)據(jù)庫(kù)吸奴,后續(xù)補(bǔ)償。
    private volatile RejectedExecutionHandler handler;

    //線程空閑時(shí)間逞度,超過(guò)這個(gè)時(shí)間,線程自動(dòng)退出(默認(rèn)針對(duì)超出corePoolSize的線程)
    private volatile long keepAliveTime;

    //是否回收核心線程
    private volatile boolean allowCoreThreadTimeOut;

    //核心線程數(shù)量:
    //小于該數(shù)量馆匿,執(zhí)行任務(wù)時(shí)會(huì)創(chuàng)建新的線程
    //默認(rèn)核心線程是不會(huì)回收的,當(dāng)然也可以設(shè)置成可回收
    private volatile int corePoolSize;

    //最大線程數(shù)量(包括線程隊(duì)列里面的線程數(shù)量),達(dá)到這個(gè)線程數(shù)的時(shí)候會(huì)執(zhí)行飽和策略
    private volatile int maximumPoolSize;

    //線程任務(wù)隊(duì)列
     1.ArrayBlockingQueue:基于數(shù)組的有界阻塞隊(duì)列呕臂,此隊(duì)列有序: FIFO(先進(jìn)先出)歧蒋。
     2.LinkedBlockingQueue:基于鏈表的阻塞隊(duì)列吴叶,此隊(duì)列有序: 吞吐量通常要高于ArrayBlockingQueue
     3.SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列敌呈。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用取出操作吭练,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue
     4.PriorityBlockingQueue:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列分尸。
    private final BlockingQueue<Runnable> workQueue;

    //默認(rèn)飽和策略
    private static final RejectedExecutionHandler defaultHandler =
            new ThreadPoolExecutor.AbortPolicy();

注意:

threadFactory一般自定義為線程池創(chuàng)建線程用尺上,通常會(huì)為線程創(chuàng)建一個(gè)易于理解的名稱卑吭。

allowCoreThreadTimeOut默認(rèn)是false马绝,當(dāng)為true是可以回收空閑的核心線程數(shù)豆赏。

handler通常可以自定義拒絕策略富稻,可以實(shí)現(xiàn)任務(wù)持久化河绽,便于后期補(bǔ)償。

2 ThreadPoolExecutor的狀態(tài)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000-1111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 111 - 0000000000000000000000000000(十進(jìn)制: -536, 870, 912) 
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 - 0000000000000000000000000(十進(jìn)制: 0)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 - 00000000000000000000000000(十進(jìn)制: 536,870唉窃, 912)
private static final int STOP       =  1 << COUNT_BITS;
// 010 - 00000000000000000000000000.(十進(jìn)制值: 1, 073, 741, 824)
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 - 000000000000000000000000000(十進(jìn)制值: 1, 610,612, 736)
private static final int TERMINATED =  3 << COUNT_BITS;

ThreadPoolExecutor通過(guò)ctl保存線程池的狀態(tài)和工作線程數(shù),ctl是一個(gè)32位整數(shù)蔓涧,高3位表示線程池狀態(tài)鉴未,低29位表示工作線程數(shù)量(workCount)巍糯,所有工作線程的最大數(shù)量是2^29-1咱筛。

線程池的狀態(tài)數(shù)值大小一次是RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED,并且只有RUNNING狀態(tài)是負(fù)數(shù)盟猖,這樣設(shè)計(jì)的原因是方便狀態(tài)判斷你弦。

線程池狀態(tài)分為5種RUNNINGSHUTDOWN磁浇、STOPTIDYING后专、TERMINATED

  • RUNNING:(運(yùn)行)接收新的任務(wù)并處理隊(duì)列里的任務(wù)
  • SHUTDOWN:(關(guān)閉)不接收新的任務(wù)殷蛇,但是隊(duì)列中任務(wù)會(huì)執(zhí)行
  • STOP:(停止)不接收新的任務(wù)露氮,不處理隊(duì)列中的任務(wù),并終止正在處理的任務(wù)
  • TIDYING:(整理)所有任務(wù)都被終止的,同時(shí)工作線程數(shù)量為0
  • TERMINATED:(已終止) terminated()方法執(zhí)行結(jié)束后會(huì)進(jìn)入這一狀態(tài)慧瘤,表示線程池已關(guān)閉桦沉。terminated()方法默認(rèn)是空方法,可以自定義實(shí)現(xiàn)淘菩。

下面是這五種狀態(tài)的轉(zhuǎn)換:

  • RUNNING -> SHUTDOWN:通過(guò)調(diào)用shutdown()方法實(shí)現(xiàn)殖告。

  • (RUNNING or SHUTDOWN) -> STOP:通過(guò)調(diào)用shutdownNow()方法實(shí)現(xiàn)。

  • SHUTDOWN -> TIDYING:task隊(duì)列和墓懂,worker集合為空焰宣。

  • STOP -> TIDYING:worker集合為空。

  • TIDYING -> TERMINATED:執(zhí)行完terminated()方法后捕仔。

注意:

ThreadPoolExecutor只提供了shutdown()shutdownNow()兩個(gè)方法轉(zhuǎn)換狀態(tài)匕积。其他狀態(tài)的轉(zhuǎn)變都是隱式裝換(tryTerminate()方法中完成)。但是可以通過(guò)如下方法查看線程池狀態(tài):

public boolean isTerminating() {  
    int c = ctl.get();
    return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
    return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean isShutdown() {
    return ! isRunning(ctl.get());
}
2.1 ThreadPoolExecutor獲取狀態(tài)和線程數(shù)
    //~CAPACITY: 11100000000000000000000000000001
    //return c & ~CAPACITY:獲取c高3位
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //CAPACITY:00011111111111111111111111111111
    //c & CAPACITY:返回的就是c的低29位
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //SHUTDOWN是0榜跌,小于SHUTDOWN闪唆,也就是判斷是否是RUNNING
    private static boolean isRunning(int c)     { return c < SHUTDOWN;}
    //通過(guò)狀態(tài)和線程數(shù)量組合ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2 線程池狀態(tài)判斷
// 判斷當(dāng)前線程池運(yùn)行狀態(tài)值是否小于給定值
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 判斷當(dāng)前線程池運(yùn)行狀態(tài)值是否大于等于給定值
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

3 ThreadPoolExecutor的運(yùn)行過(guò)程

3.1 execute方法
 public void execute(Runnable command) {
        if (command == null)
            //不能為null
            throw new NullPointerException();
        int c = ctl.get();
        //返回ctl的低29位,判斷是否小于核心線程數(shù)
        if (workerCountOf(c) < corePoolSize) {
            //創(chuàng)建核心線程執(zhí)行任務(wù)
            if (addWorker(command, true))
                //成功就返回
                return;
            //失敗重新獲取ctl
            c = ctl.get();
        }
        //走到這钓葫,說(shuō)明大于核心線程數(shù)悄蕾,則判斷是否是運(yùn)行狀態(tài),是就往隊(duì)列中加任務(wù)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //如果不是RUNNING础浮,就移除當(dāng)前任務(wù)
            if (! isRunning(recheck) && remove(command))
                //失敗就執(zhí)行拒絕策略
                reject(command);
            //判斷線程總數(shù)量是否為0(有可能線程執(zhí)行任務(wù)報(bào)錯(cuò)了帆调,然后就被銷毀了)
            else if (workerCountOf(recheck) == 0)
                //創(chuàng)建非核心線程執(zhí)行任務(wù)(注意這里是null)
                addWorker(null, false);
        }
        //走到這,說(shuō)明不是運(yùn)行狀態(tài)或者隊(duì)列滿了豆同,就創(chuàng)建非核心線程執(zhí)行任務(wù)
        else if (!addWorker(command, false))
            //失敗就執(zhí)行拒絕策略
            reject(command);
    }

上述代碼的執(zhí)行流程如下:

注意:

只有在線程池處于RUNNING狀態(tài)時(shí)提交任務(wù)的任務(wù)才會(huì)被執(zhí)行番刊,否則執(zhí)行拒絕策略。

3.2 addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 運(yùn)行狀態(tài)
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 分兩種情況考慮:
            // 1影锈、如果線程池狀態(tài)>SHUTDOWN芹务,即(STOP,TIDYING,TERMINATED)中的一個(gè),直接返回false(因?yàn)檫@些條件下要求 清空隊(duì)列鸭廷,正在運(yùn)行的任務(wù)也要停止)
            // 2枣抱、如果線程池狀態(tài)=SHUTDOWN & (firstTask!=null || 隊(duì)列為空)直接返回false, 
            // 換言之,在SHUTDOWN狀態(tài)下, 想要?jiǎng)?chuàng)建一個(gè)firstTask為空的新worker靴姿,需要確保隊(duì)列不為空(隊(duì)列為空就意味著這個(gè)新的worker暫時(shí)還沒有任務(wù)可以執(zhí)行沃但,所以也沒有創(chuàng)建worker的必要, 因?yàn)檫@個(gè)worker是用來(lái)消化隊(duì)列中的任務(wù))
            // 大部分情況下,我們需要新創(chuàng)建的worker的firstTask都有初始化任務(wù), 在SHUTDOWN狀態(tài)下佛吓,就不允許在創(chuàng)建worker了宵晚,直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 計(jì)算worker的數(shù)量
                int wc = workerCountOf(c);
                // 如果worker數(shù)量已經(jīng)超過(guò)指定大小,則不允許創(chuàng)建
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 嘗試cas累加worker的數(shù)量维雇,如果成功就跳出最外層循環(huán)    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 本次cas失敗淤刃,表明ctl已經(jīng)變了,檢查線程池狀態(tài)吱型,如果狀態(tài)變了就跳到最外層重新執(zhí)行一次
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // 如果到這里了說(shuō)明是worker的數(shù)量改變了導(dǎo)致的cas失敗逸贾,那就在內(nèi)層自旋操作 再來(lái)一次
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 狀態(tài)rs < SHUTDOWN表明是RUNNING狀態(tài)
                    // 如果線程池的狀態(tài)是SHUTDOWN,那么創(chuàng)建的worker是用來(lái)處理隊(duì)列中的任務(wù),因此需要滿足firstTask == null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果線程提前start則認(rèn)為是異常狀態(tài)
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        // 就是記錄一下線程池在運(yùn)行中線程數(shù)量的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 這里啟動(dòng)worker運(yùn)行
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 如果添加worker失敗了铝侵,這里需要處理一下
                addWorkerFailed(w);
        }
        return workerStarted;
    }  

當(dāng)創(chuàng)建新worker時(shí)灼伤,firstTask為null的含義:

  • 首先需要知道的是worker執(zhí)行的任務(wù)的來(lái)源,一是初始化時(shí)的firstTask 即當(dāng)前worker的第一個(gè)任務(wù), 二是從隊(duì)列里取

  • firstTask表示當(dāng)前worker初始化任務(wù)咪鲜,也就是第一個(gè)要執(zhí)行的任務(wù)狐赡;如果firstTask=null, 說(shuō)明此worker只從隊(duì)列中取任務(wù)來(lái)執(zhí)行

  • 所以當(dāng)創(chuàng)建firstTask為null的worker時(shí),只有隊(duì)列不為空才有創(chuàng)建的必要疟丙,因?yàn)槟康氖侨ハ?duì)列中的任務(wù)

分三種情況來(lái)看創(chuàng)建worker:

  • 正常情況下線程池的狀態(tài)是RUNNING颖侄,這個(gè)時(shí)候只需根據(jù)corePoolSize或者maximumPoolSize來(lái)判斷是否應(yīng)該創(chuàng)建新的woker

  • 如果是STOP,TIDYING,TERMINATED狀態(tài),表明線程池處于清理資源享郊,關(guān)閉線程池(清空隊(duì)列览祖,終止正在運(yùn)行的任務(wù),清空worker)炊琉,這個(gè)時(shí)候不允許創(chuàng)建新的worker

  • SHUTDOWN狀態(tài)展蒂,此狀態(tài)比較特殊,因?yàn)樵诖藸顟B(tài)會(huì)繼續(xù)處理隊(duì)列中的任務(wù)苔咪,但是不允許往隊(duì)列中新增任務(wù)玄货,同時(shí)正常處理的任務(wù)也會(huì)繼續(xù)處理; 此狀態(tài)下悼泌,在firstTask為null并且隊(duì)列不為空的情況下可以創(chuàng)建新的worker來(lái)處理隊(duì)列中的任務(wù)松捉,其他情況是不允許的

3.3 addWorkerFailed方法

當(dāng)worker啟動(dòng)失敗時(shí)處理:

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            // worker計(jì)數(shù)器減1    
            decrementWorkerCount();
            // 嘗試終止線程池
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    } 

4 Worker線程

worker繼承AbstractQueuedSynchronizer,實(shí)現(xiàn)了非重入鎖

  • state=0, 表示鎖未被持有
  • state=1, 表示鎖被持有
  • state=-1, 初始化的值馆里,防止worker線程在真正運(yùn)行task之前被中斷隘世,

Worker也是一個(gè)任務(wù),實(shí)現(xiàn)了Runnable接口鸠踪。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        // 每個(gè)worker中都會(huì)封裝一個(gè)真正用于處理任務(wù)的線程
        final Thread thread;
        // 這個(gè)worker初始化的時(shí)候分配的首個(gè)任務(wù)
        Runnable firstTask;
        // 記錄當(dāng)前worker處理完的任務(wù)數(shù)量
        volatile long completedTasks;

        // 構(gòu)造方法
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 使用ThreadFactory創(chuàng)建線程
            // 這里this是當(dāng)前實(shí)現(xiàn)了Runnable接口的Worker對(duì)象丙者,也就是說(shuō)當(dāng)我調(diào)用thread.start()方法時(shí),就會(huì)調(diào)用worker的入口方法run
            this.thread = getThreadFactory().newThread(this);
        }

        // 基于AQS實(shí)現(xiàn)的非重入鎖
        protected boolean isHeldExclusively() {
             return getState() != 0;
         }

         // 實(shí)際并未使用傳入的參數(shù)
         // state=0表示可以獲取营密,將state設(shè)置為1表示獲取成功
         protected boolean tryAcquire(int unused) {
             if (compareAndSetState(0, 1)) {
                 setExclusiveOwnerThread(Thread.currentThread());
                 return true;
             }
             return false;
         }

         // 實(shí)際并未使用傳入的參數(shù)
         // 將state設(shè)置為0表示鎖釋放成功
         protected boolean tryRelease(int unused) {
             setExclusiveOwnerThread(null);
             setState(0);
             return true;
         }

         public void lock()        { acquire(1); }
         public boolean tryLock()  { return tryAcquire(1); }
         public void unlock()      { release(1); }
         public boolean isLocked() { return isHeldExclusively(); }        

        // worker的主要入口邏輯
        public void run() {
            runWorker(this);
        }
}
4.1 runWorker方法
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 和setState(-1); 相對(duì)應(yīng)
        // 這個(gè)時(shí)候state=0了械媒,也就可以被中斷了,(ThreadPoolExecutor.Worker#interruptIfStarted)
        w.unlock(); // allow interrupts
        // 是否有異常產(chǎn)生
        boolean completedAbruptly = true;
        try {
            // task的來(lái)源要么是firstTask评汰,要么是隊(duì)列
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果線程池是STOP及之后的狀態(tài)纷捞,需要確保它是中斷的
                // 如果是STOP之前的狀態(tài),就要確保它不能被中斷(如果有的話就要清除中斷標(biāo)志被去,Thread.interrupted會(huì)清除中斷標(biāo)志)
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 執(zhí)行成功的任務(wù)數(shù)量++
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 處理worker退出
            //  一種是異常退出
            //  一種是普通線程(非核心)主儡,空閑時(shí)間到了退出
            processWorkerExit(w, completedAbruptly);
        }
    } 

注意:

Worker有兩種方式結(jié)束線程執(zhí)行:

  1. task.run()執(zhí)行業(yè)務(wù)方法時(shí)拋出異常,或者響應(yīng)wt.interrupt()中斷拋出異常惨缆。

  2. getTask()方法返回null糜值,正常退出執(zhí)行丰捷。

線程退出時(shí)會(huì)執(zhí)行processWorkerExit()方法,completedAbruptlytrue表示異常退出寂汇,false表示正常退出病往。

Worker線程通過(guò)加鎖改變state狀態(tài)來(lái)表示線程是否空閑。beforeExecuteafterExecute可以自定義實(shí)現(xiàn)擴(kuò)展骄瓣。

4.2 getTask方法
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 線程池狀態(tài)
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 兩種情況從隊(duì)列里面取不到task
            // 1荣恐、線程池是STOP狀態(tài),這個(gè)狀態(tài)會(huì)清空隊(duì)列累贤,同時(shí)停止正在處理的任務(wù),自然少漆,如果是這個(gè)狀態(tài)臼膏,直接返回null,表明取不到task
            // 2、線程池是SHUTDOWN狀態(tài)并且隊(duì)列為空示损,因?yàn)榇藸顟B(tài)下 隊(duì)列里是不會(huì)新增任何task渗磅,所以在隊(duì)列為空的情況下,自然也是取不到
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // worker數(shù)量減一
                decrementWorkerCount();
                return null;
            }

            // 計(jì)算worker數(shù)量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 通過(guò)最大線程數(shù)量或者獲取task超時(shí) 來(lái)決定是否要消減此worker
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 1检访、timed期限的始鱼,非核心線程如果空閑時(shí)間超過(guò)keepAliveTime,就會(huì)被清理掉
                // 因此如果這里從阻塞隊(duì)列里在keepAliveTime時(shí)間內(nèi)都沒有取到task脆贵,說(shuō)明處理超時(shí)了
                // 2医清、沒有timed限制的,take阻塞的取task
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

此方法有兩處返回null卖氨,用于退出線程会烙。

第一處:

  • 當(dāng)線程池處于STOP及之后的狀態(tài)

  • 線程池處于SHUTDOWN狀態(tài)并且任務(wù)隊(duì)列沒有任務(wù)。

第二處:

  • 線程數(shù)超過(guò)最大線程數(shù)

  • 線程獲取任務(wù)超時(shí)

注意:

此方法是用于清除空閑線程筒捺,通過(guò)超時(shí)獲取任務(wù)隊(duì)列來(lái)清除非核心線程柏腻,通過(guò)設(shè)置allowCoreThreadTimeOuttrue也可以清除空閑的核心線程。

4.3 processWorkerExit方法

對(duì)worker執(zhí)行結(jié)束之后系吭,清理掉當(dāng)前worker之后考慮是否采取用新的worker來(lái)替換

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //異常結(jié)束workerCount減一五嫂,正常結(jié)束在getTask已經(jīng)
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 當(dāng)前worker完成的task數(shù)量累加
            completedTaskCount += w.completedTasks;
            // 從woker Set中移除當(dāng)前worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 每次worker結(jié)束后都要嘗試終止線程池,說(shuō)不定某個(gè)時(shí)刻worker都被清理了并且達(dá)到了線程池終止的條件
        // 就可以從這里結(jié)束
        tryTerminate();

        int c = ctl.get();
        // 如果線程池狀態(tài)小于STOP
        if (runStateLessThan(c, STOP)) {
            // 如果不是異常
            // 如果用戶任務(wù)發(fā)生了異常肯尺,嘗試替換worker
            // 或者核心線程數(shù)量小于corePoolSize沃缘,嘗試添加worker
            // 或者隊(duì)列非空,但是已經(jīng)沒有worker了则吟,嘗試添加worker
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }

            // 這里新添加worker孩灯,沒有初始化的firstTask, 這里worker處理的任務(wù)來(lái)自于隊(duì)列
            addWorker(null, false);
        }
    }   

流程說(shuō)明:

  1. 從worker set中移除當(dāng)前worker

  2. 嘗試終止線程池

  3. 如果線程池狀態(tài)還未達(dá)到STOP,則可以根據(jù)以下情況添加新的worker

    • 用戶task拋出了異常(也就是completedAbruptly=true)

    • 運(yùn)行的線程數(shù)已經(jīng)小于核心線程數(shù)

    • 運(yùn)行的線程數(shù)為0逾滥,但是隊(duì)列中的任務(wù)不為空

5 線程池關(guān)閉

5.1 shutDown方法
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 如果有security管理峰档,需要確保有權(quán)限去shutdown線程池
            checkShutdownAccess();
            // 設(shè)置線程池狀態(tài)為SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中斷所有空閑線程
            interruptIdleWorkers();
            // 為ScheduledThreadPoolExecutor提供的鉤子方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止線程池
        tryTerminate();
    }
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 空閑狀態(tài)下worker的鎖資源肯定是可以直接獲取到的败匹,因此根據(jù)此便可以判別線程是否空閑
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 打上中斷標(biāo)志
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    } 

流程說(shuō)明

  • 先將線程池的狀態(tài)改為SHUTDOWN
  • 嘗試中斷所有空閑線程(通過(guò)線程是否可加鎖判斷是否空閑)
  • 嘗試終止線程池
5.2 shutDownNow方法
 // 返回隊(duì)列中未執(zhí)行的task列表
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 將線池的狀態(tài)設(shè)置為STOP
            advanceRunState(STOP);
            // 中斷所有worker
            interruptWorkers();
            // 清空隊(duì)列
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止線程池
        // 雖然線程池狀態(tài)已經(jīng)改成了STOP狀態(tài),但還需要workers被清空之后才會(huì)真正變成TERMINATED狀態(tài)
        // 所以這里不一定會(huì)成功, runWorker方法中處理worker退出時(shí)會(huì)觸發(fā)tryTerminate
        tryTerminate();
        return tasks;
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

        // Worker內(nèi)部方法
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }        

流程說(shuō)明:

  • 先將線程池的狀態(tài)改為STOP

  • 嘗試中斷所有線程

  • 清空任務(wù)隊(duì)列

  • 嘗試終止線程池

5.3 tryTerminate方法

嘗試關(guān)閉線程池

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 如果線程池是RUNNING狀態(tài)的
            // 如果線程池是TIDYING讥巡、TERMINATED狀態(tài)的不管(基本已經(jīng)處于關(guān)閉狀態(tài)了)
            // 如果線程池是SHUTDOWN狀態(tài)并且隊(duì)列中還有task(SHUTDOWN態(tài)下還是會(huì)處理現(xiàn)有的task)
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            // 到這里就說(shuō)明需要開始線程池終止操作  
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 操作1:設(shè)置線程池狀態(tài)為TIDYING掀亩,worker數(shù)量為0
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // terminated鉤子方法
                        terminated();
                    } finally {
                        // 操作1設(shè)置后,又處理了terminated方法欢顷,那么就可以把線程池置為TERMINATED狀態(tài)了(線程池已完全關(guān)閉)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知在termination條件上等待的操作
                        // 比如awaitTermination方法的等待操作槽棍,這里喚醒后,可能會(huì)提前結(jié)束等到操作
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

當(dāng)線程池處于以下狀態(tài)不需要關(guān)閉:

  • 如果線程池是RUNNING狀態(tài)的

  • 如果線程池是SHUTDOWN狀態(tài)并且隊(duì)列中還有task(SHUTDOWN狀態(tài)下還是會(huì)處理現(xiàn)有的task)

ThreadPoolExecutor線程池源碼分析_持之以恒-CSDN博客

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抬驴,一起剝皮案震驚了整個(gè)濱河市炼七,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌布持,老刑警劉巖豌拙,帶你破解...
    沈念sama閱讀 219,366評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異题暖,居然都是意外死亡按傅,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門胧卤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)唯绍,“玉大人,你說(shuō)我怎么就攤上這事枝誊】雒ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵叶撒,是天一觀的道長(zhǎng)牛柒。 經(jīng)常有香客問(wèn)我,道長(zhǎng)痊乾,這世上最難降的妖魔是什么皮壁? 我笑而不...
    開封第一講書人閱讀 58,925評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮哪审,結(jié)果婚禮上蛾魄,老公的妹妹穿的比我還像新娘。我一直安慰自己湿滓,他們只是感情好滴须,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,942評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著叽奥,像睡著了一般扔水。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上朝氓,一...
    開封第一講書人閱讀 51,727評(píng)論 1 305
  • 那天魔市,我揣著相機(jī)與錄音主届,去河邊找鬼。 笑死待德,一個(gè)胖子當(dāng)著我的面吹牛君丁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播将宪,決...
    沈念sama閱讀 40,447評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼绘闷,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了较坛?” 一聲冷哼從身側(cè)響起印蔗,我...
    開封第一講書人閱讀 39,349評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎丑勤,沒想到半個(gè)月后华嘹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,820評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡确封,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,990評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了再菊。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片爪喘。...
    茶點(diǎn)故事閱讀 40,127評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖纠拔,靈堂內(nèi)的尸體忽然破棺而出秉剑,到底是詐尸還是另有隱情,我是刑警寧澤稠诲,帶...
    沈念sama閱讀 35,812評(píng)論 5 346
  • 正文 年R本政府宣布侦鹏,位于F島的核電站,受9級(jí)特大地震影響臀叙,放射性物質(zhì)發(fā)生泄漏略水。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,471評(píng)論 3 331
  • 文/蒙蒙 一劝萤、第九天 我趴在偏房一處隱蔽的房頂上張望渊涝。 院中可真熱鬧,春花似錦床嫌、人聲如沸跨释。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)鳖谈。三九已至,卻和暖如春阔涉,著一層夾襖步出監(jiān)牢的瞬間缆娃,已是汗流浹背捷绒。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留龄恋,地道東北人疙驾。 一個(gè)月前我還...
    沈念sama閱讀 48,388評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像郭毕,于是被迫代替她去往敵國(guó)和親它碎。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,066評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 線程池中有一定數(shù)量的工作線程显押,工作線程會(huì)循環(huán)從任務(wù)隊(duì)列中獲取任務(wù)扳肛,并執(zhí)行這個(gè)任務(wù)。那么怎么去停止這些工作線程呢乘碑?這...
    wo883721閱讀 1,625評(píng)論 0 14
  • 背景 ThreadExecutorPool是使用最多的線程池組件挖息,了解它的原始資料最好是從從設(shè)計(jì)者(Doug Le...
    ninetyhe_閱讀 307評(píng)論 0 1
  • 一、線程池簡(jiǎn)介 線程池的使用主要是解決兩個(gè)問(wèn)題:①當(dāng)執(zhí)行大量異步任務(wù)的時(shí)候線程池能夠提供更好的性能兽肤,在不使用線程池...
    java菜閱讀 345評(píng)論 0 0
  • 1.線程池概覽 線程池主要用于線程資源的管理套腹,防止頻繁的創(chuàng)建以及銷毀線程,提升資源的利用率资铡。JDK中的線程池實(shí)現(xiàn)电禀,...
    prozombie閱讀 2,110評(píng)論 0 37
  • 一、 前言 線程池主要解決兩個(gè)問(wèn)題:一方面當(dāng)執(zhí)行大量異步任務(wù)時(shí)候線程池能夠提供較好的性能笤休,尖飞,這是因?yàn)槭褂镁€程池可以...
    阿里加多閱讀 3,671評(píng)論 2 17