【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的線程池黔州,它給了我們足夠的理由來弄清楚它耍鬓。

下面我們就通過源碼來一步一步弄清楚它。

內(nèi)部狀態(tài)

線程有五種狀態(tài):新建流妻,就緒牲蜀,運(yùn)行,阻塞绅这,死亡涣达,
線程池同樣有五種狀態(tài):Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

變量ctl定義為AtomicInteger ,其功能非常強(qiáng)大度苔,記錄了“線程池中的任務(wù)數(shù)量”和“線程池的狀態(tài)”兩個信息匆篓。共32位,其中高3位表示”線程池狀態(tài)”寇窑,低29位表示”線程池中的任務(wù)數(shù)量”鸦概。

RUNNING            -- 對應(yīng)的高3位值是111。
SHUTDOWN       -- 對應(yīng)的高3位值是000甩骏。
STOP                   -- 對應(yīng)的高3位值是001窗市。
TIDYING              -- 對應(yīng)的高3位值是010。
TERMINATED     -- 對應(yīng)的高3位值是011饮笛。

RUNNING:處于RUNNING狀態(tài)的線程池能夠接受新任務(wù)咨察,以及對新添加的任務(wù)進(jìn)行處理。

SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接受新任務(wù)福青,但是可以對已添加的任務(wù)進(jìn)行處理摄狱。

STOP:處于STOP狀態(tài)的線程池不接收新任務(wù),不處理已添加的任務(wù)素跺,并且會中斷正在處理的任務(wù)二蓝。

TIDYING:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0指厌,線程池會變?yōu)門IDYING狀態(tài)刊愚。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()踩验。terminated()在ThreadPoolExecutor類中是空的鸥诽,若用戶想在線程池變?yōu)門IDYING時,進(jìn)行相應(yīng)的處理箕憾;可以通過重載terminated()函數(shù)來實現(xiàn)牡借。

TERMINATED:線程池徹底終止的狀態(tài)。

各個狀態(tài)的轉(zhuǎn)換如下:

線程池狀態(tài)轉(zhuǎn)換

創(chuàng)建線程池

我們可以通過ThreadPoolExecutor構(gòu)造函數(shù)來創(chuàng)建一個線程池:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

共有七個參數(shù)袭异,每個參數(shù)含義如下:

corePoolSize

線程池中核心線程的數(shù)量钠龙。當(dāng)提交一個任務(wù)時,線程池會新建一個線程來執(zhí)行任務(wù)御铃,直到當(dāng)前線程數(shù)等于corePoolSize碴里。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程上真。

maximumPoolSize

線程池中允許的最大線程數(shù)咬腋。線程池的阻塞隊列滿了之后,如果還有任務(wù)提交睡互,如果當(dāng)前的線程數(shù)小于maximumPoolSize根竿,則會新建線程來執(zhí)行任務(wù)陵像。注意,如果使用的是無界隊列寇壳,該參數(shù)也就沒有什么效果了醒颖。

keepAliveTime

線程空閑的時間。線程的創(chuàng)建和銷毀是需要代價的九巡。線程執(zhí)行完任務(wù)后不會立即銷毀图贸,而是繼續(xù)存活一段時間:keepAliveTime蹂季。默認(rèn)情況下冕广,該參數(shù)只有在線程數(shù)大于corePoolSize時才會生效。

unit

keepAliveTime的單位偿洁。TimeUnit

workQueue

用來保存等待執(zhí)行的任務(wù)的阻塞隊列撒汉,等待的任務(wù)必須實現(xiàn)Runnable接口。我們可以選擇如下幾種:

threadFactory

用于設(shè)置創(chuàng)建線程的工廠。該對象可以通過Executors.defaultThreadFactory()增拥,如下:

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

返回的是DefaultThreadFactory對象啄巧,源碼如下:

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

ThreadFactory的左右就是提供創(chuàng)建線程的功能的線程工廠。他是通過newThread()方法提供創(chuàng)建線程的功能掌栅,newThread()方法創(chuàng)建的線程都是“非守護(hù)線程”而且“線程優(yōu)先級都是Thread.NORM_PRIORITY”秩仆。

handler

RejectedExecutionHandler,線程池的拒絕策略猾封。所謂拒絕策略澄耍,是指將任務(wù)添加到線程池中時,線程池拒絕該任務(wù)所采取的相應(yīng)策略晌缘。當(dāng)向線程池中提交任務(wù)時齐莲,如果此時線程池中的線程已經(jīng)飽和了,而且阻塞隊列也已經(jīng)滿了枚钓,則線程池會選擇一種拒絕策略來處理該任務(wù)铅搓。

線程池提供了四種拒絕策略:

  1. AbortPolicy:直接拋出異常,默認(rèn)策略搀捷;
  2. CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)星掰;
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)多望,并執(zhí)行當(dāng)前任務(wù);
  4. DiscardPolicy:直接丟棄任務(wù)氢烘;
    當(dāng)然我們也可以實現(xiàn)自己的拒絕策略怀偷,例如記錄日志等等,實現(xiàn)RejectedExecutionHandler接口即可播玖。

線程池

Executor框架提供了三種線程池椎工,他們都可以通過工具類Executors來創(chuàng)建。

FixedThreadPool

FixedThreadPool蜀踏,可重用固定線程數(shù)的線程池维蒙,其定義如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize 和 maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads,意味著當(dāng)線程池滿時且阻塞隊列也已經(jīng)滿時果覆,如果繼續(xù)提交任務(wù)颅痊,則會直接走拒絕策略,該線程池不會再新建線程來執(zhí)行任務(wù)局待,而是直接走拒絕策略斑响。FixedThreadPool使用的是默認(rèn)的拒絕策略,即AbortPolicy钳榨,則直接拋出異常舰罚。

keepAliveTime設(shè)置為0L,表示空閑的線程會立刻終止薛耻。

workQueue則是使用LinkedBlockingQueue营罢,但是沒有設(shè)置范圍,那么則是最大值(Integer.MAX_VALUE)昭卓,這基本就相當(dāng)于一個無界隊列了愤钾。使用該“無界隊列”則會帶來哪些影響呢?當(dāng)線程池中的線程數(shù)量等于corePoolSize 時候醒,如果繼續(xù)提交任務(wù)能颁,該任務(wù)會被添加到阻塞隊列workQueue中,當(dāng)阻塞隊列也滿了之后倒淫,則線程池會新建線程執(zhí)行任務(wù)直到maximumPoolSize伙菊。由于FixedThreadPool使用的是“無界隊列”LinkedBlockingQueue,那么maximumPoolSize參數(shù)無效敌土,同時指定的拒絕策略AbortPolicy也將無效镜硕。而且該線程池也不會拒絕提交的任務(wù),如果客戶端提交任務(wù)的速度快于任務(wù)的執(zhí)行返干,那么keepAliveTime也是一個無效參數(shù)兴枯。

其運(yùn)行圖如下(參考《Java并發(fā)編程的藝術(shù)》):

FixedThreadPool

SingleThreadExecutor

SingleThreadExecutor是使用單個worker線程的Executor,定義如下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

作為單一worker線程的線程池矩欠,SingleThreadExecutor把corePool和maximumPoolSize均被設(shè)置為1财剖,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣悠夯。

SingleThreadExecutor

CachedThreadPool

CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池 ,他定義如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool的corePool為0躺坟,maximumPoolSize為Integer.MAX_VALUE沦补,這就意味著所有的任務(wù)一提交就會加入到阻塞隊列中。keepAliveTime這是為60L咪橙,unit設(shè)置為TimeUnit.SECONDS夕膀,意味著空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止美侦。阻塞隊列采用的SynchronousQueue产舞,而我們在【死磕Java并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue中了解到SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 音榜,maximumPoolSize = Integer.MAX_VALUE庞瘸,這樣就會存在一個問題捧弃,如果主線程提交任務(wù)的速度遠(yuǎn)遠(yuǎn)大于CachedThreadPool的處理速度赠叼,則CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),這樣有可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源违霞,所以在使用該線程池是嘴办,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題买鸽。

CachedThreadPool

任務(wù)提交

線程池根據(jù)業(yè)務(wù)不同的需求提供了兩種方式提交任務(wù):Executor.execute()涧郊、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務(wù)執(zhí)行的Future眼五。
我們以Executor.execute()為例妆艘,來看看線程池的任務(wù)提交經(jīng)歷了那些過程。

定義:

public interface Executor {
    void execute(Runnable command);
}

ThreadPoolExecutor提供實現(xiàn):

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

執(zhí)行流程如下:

  1. 如果線程池當(dāng)前線程數(shù)小于corePoolSize看幼,則調(diào)用addWorker創(chuàng)建新線程執(zhí)行任務(wù)批旺,成功返回true,失敗執(zhí)行步驟2诵姜。
  2. 如果線程池處于RUNNING狀態(tài)汽煮,則嘗試加入阻塞隊列,如果加入阻塞隊列成功棚唆,則嘗試進(jìn)行Double Check暇赤,如果加入失敗,則執(zhí)行步驟3宵凌。
  3. 如果線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗鞋囊,則嘗試創(chuàng)建新線程直到maxPoolSize,如果失敗瞎惫,則調(diào)用reject()方法運(yùn)行相應(yīng)的拒絕策略溜腐。

在步驟2中如果加入阻塞隊列成功了坯门,則會進(jìn)行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊里中的線程是否可以被執(zhí)行逗扒。如果線程池不是RUNNING狀態(tài)古戴,則調(diào)用remove()方法從阻塞隊列中刪除該任務(wù),然后調(diào)用reject()方法處理任務(wù)矩肩。否則需要確保還有線程執(zhí)行现恼。

addWorker
當(dāng)線程中的當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker()創(chuàng)建新線程執(zhí)行任務(wù)黍檩,當(dāng)前線程數(shù)則是根據(jù)ctl變量來獲取的叉袍,調(diào)用workerCountOf(ctl)獲取低29位即可:

    private static int workerCountOf(int c)  { return c & CAPACITY; }

addWorker(Runnable firstTask, boolean core)方法用于創(chuàng)建線程執(zhí)行任務(wù),源碼如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();

            // 獲取當(dāng)前線程狀態(tài)
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            // 內(nèi)層循環(huán)刽酱,worker + 1
            for (;;) {
                // 線程數(shù)量
                int wc = workerCountOf(c);
                // 如果當(dāng)前線程數(shù)大于線程最大上限CAPACITY  return false
                // 若core == true喳逛,則與corePoolSize 比較,否則與maximumPoolSize 棵里,大于 return false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // worker + 1,成功跳出retry循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;

                // CAS add worker 失敗润文,再次讀取ctl
                c = ctl.get();

                // 如果狀態(tài)不等于之前獲取的state,跳出內(nèi)層循環(huán)殿怜,繼續(xù)去外層循環(huán)判斷
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {

            // 新建線程:Worker
            w = new Worker(firstTask);
            // 當(dāng)前線程
            final Thread t = w.thread;
            if (t != null) {
                // 獲取主鎖:mainLock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {

                    // 線程狀態(tài)
                    int rs = runStateOf(ctl.get());

                    // rs < SHUTDOWN ==> 線程處于RUNNING狀態(tài)
                    // 或者線程處于SHUTDOWN狀態(tài)典蝌,且firstTask == null(可能是workQueue中仍有未執(zhí)行完成的任務(wù),創(chuàng)建沒有初始任務(wù)的worker線程執(zhí)行)
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {

                        // 當(dāng)前線程已經(jīng)啟動头谜,拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();

                        // workers是一個HashSet<Worker>
                        workers.add(w);

                        // 設(shè)置最大的池大小largestPoolSize骏掀,workerAdded設(shè)置為true
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 啟動線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {

            // 線程啟動失敗
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  1. 判斷當(dāng)前線程是否可以添加任務(wù),如果可以則進(jìn)行下一步柱告,否則return false截驮;

    1. rs >= SHUTDOWN ,表示當(dāng)前線程處于SHUTDOWN 际度,STOP葵袭、TIDYING、TERMINATED狀態(tài)
    2. rs == SHUTDOWN , firstTask != null時不允許添加線程甲脏,因為線程處于SHUTDOWN 狀態(tài)眶熬,不允許添加任務(wù)
    3. rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true块请,不允許添加線程娜氏,因為firstTask == null是為了添加一個沒有任務(wù)的線程然后再從workQueue中獲取任務(wù)的,如果workQueue == null墩新,則說明添加的任務(wù)沒有任何意義贸弥。
  2. 內(nèi)嵌循環(huán),通過CAS worker + 1

  3. 獲取主鎖mailLock海渊,如果線程池處于RUNNING狀態(tài)獲取處于SHUTDOWN狀態(tài)且 firstTask == null绵疲,則將任務(wù)添加到workers Queue中哲鸳,然后釋放主鎖mainLock,然后啟動線程盔憨,然后return true徙菠,如果中途失敗導(dǎo)致workerStarted= false,則調(diào)用addWorkerFailed()方法進(jìn)行處理郁岩。

在這里需要好好理論addWorker中的參數(shù)婿奔,在execute()方法中,有三處調(diào)用了該方法:
第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true)问慎,這個很好理解萍摊,當(dāng)然線程池的線程數(shù)量小于 corePoolSize ,則新建線程執(zhí)行任務(wù)即可如叼,在執(zhí)行過程core == true冰木,內(nèi)部與corePoolSize比較即可。
第二次:加入阻塞隊列進(jìn)行Double Check時笼恰,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)硬毕。如果線程池中的線程==0人乓,按照道理應(yīng)該該任務(wù)應(yīng)該新建線程執(zhí)行任務(wù)镇辉,但是由于已經(jīng)該任務(wù)已經(jīng)添加到了阻塞隊列佛掖,那么就在線程池中新建一個空線程,然后從阻塞隊列中取線程即可猴仑。
第三次:線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗:else if (!addWorker(command, false))肥哎,這里core == fase辽俗,則意味著是與maximumPoolSize比較。

在新建線程執(zhí)行任務(wù)時篡诽,將講Runnable包裝成一個Worker崖飘,Woker為ThreadPoolExecutor的內(nèi)部類

Woker內(nèi)部類

Woker的源碼如下:

    private final class Worker extends AbstractQueuedSynchronizer
            implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        // task 的thread
        final Thread thread;

        // 運(yùn)行的任務(wù)task
        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {

            //設(shè)置AQS的同步狀態(tài)private volatile int state,是一個計數(shù)器杈女,大于0代表鎖已經(jīng)被獲取
            setState(-1);
            this.firstTask = firstTask;

            // 利用ThreadFactory和 Worker這個Runnable創(chuàng)建的線程對象
            this.thread = getThreadFactory().newThread(this);
        }

        // 任務(wù)執(zhí)行
        public void run() {
            runWorker(this);
        }

    }

從Worker的源碼中我們可以看到Woker繼承AQS朱浴,實現(xiàn)Runnable接口,所以可以認(rèn)為Worker既是一個可以執(zhí)行的任務(wù)达椰,也可以達(dá)到獲取鎖釋放鎖的效果翰蠢。這里繼承AQS主要是為了方便線程的中斷處理。這里注意兩個地方:構(gòu)造函數(shù)啰劲、run()梁沧。構(gòu)造函數(shù)主要是做三件事:1.設(shè)置同步狀態(tài)state為-1,同步狀態(tài)大于0表示就已經(jīng)獲取了鎖蝇裤,2.設(shè)置將當(dāng)前任務(wù)task設(shè)置為firstTask廷支,3.利用Worker本身對象this和ThreadFactory創(chuàng)建線程對象频鉴。

當(dāng)線程thread啟動(調(diào)用start()方法)時,其實就是執(zhí)行Worker的run()方法恋拍,內(nèi)部調(diào)用runWorker()垛孔。

runWorker

    final void runWorker(Worker w) {

        // 當(dāng)前線程
        Thread wt = Thread.currentThread();

        // 要執(zhí)行的任務(wù)
        Runnable task = w.firstTask;

        w.firstTask = null;

        // 釋放鎖,運(yùn)行中斷
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // worker 獲取鎖
                w.lock();

                // 確保只有當(dāng)線程是stoping時施敢,才會被設(shè)置為中斷似炎,否則清楚中斷標(biāo)示
                // 如果線程池狀態(tài) >= STOP ,且當(dāng)前線程沒有設(shè)置中斷狀態(tài),則wt.interrupt()
                // 如果線程池狀態(tài) < STOP悯姊,但是線程已經(jīng)中斷了羡藐,再次判斷線程池是否 >= STOP,如果是 wt.interrupt()
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 自定義方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執(zhí)行任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成任務(wù)數(shù) + 1
                    w.completedTasks++;
                    // 釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

運(yùn)行流程

  1. 根據(jù)worker獲取要執(zhí)行的任務(wù)task悯许,然后調(diào)用unlock()方法釋放鎖仆嗦,這里釋放鎖的主要目的在于中斷,因為在new Worker時先壕,設(shè)置的state為-1瘩扼,調(diào)用unlock()方法可以將state設(shè)置為0,這里主要原因就在于interruptWorkers()方法只有在state >= 0時才會執(zhí)行垃僚;
  2. 通過getTask()獲取執(zhí)行的任務(wù)集绰,調(diào)用task.run()執(zhí)行,當(dāng)然在執(zhí)行之前會調(diào)用worker.lock()上鎖谆棺,執(zhí)行之后調(diào)用worker.unlock()放鎖栽燕;
  3. 在任務(wù)執(zhí)行前后,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute() 和 afterExecute()方法改淑,則兩個方法在ThreadPoolExecutor中是空實現(xiàn)碍岔;
  4. 如果線程執(zhí)行完成,則會調(diào)用getTask()方法從阻塞隊列中獲取新任務(wù)朵夏,如果阻塞隊列為空蔼啦,則根據(jù)是否超時來判斷是否需要阻塞;
  5. task == null或者拋出異常(beforeExecute()仰猖、task.run()捏肢、afterExecute()均有可能)導(dǎo)致worker線程終止,則調(diào)用processWorkerExit()方法處理worker退出流程饥侵。

getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {

            // 線程池狀態(tài)
            int c = ctl.get();
            int rs = runStateOf(c);

            // 線程池中狀態(tài) >= STOP 或者 線程池狀態(tài) == SHUTDOWN且阻塞隊列為空鸵赫,則worker - 1,return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 判斷是否需要超時控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {

                // 從阻塞隊列中獲取task
                // 如果需要超時控制爆捞,則調(diào)用poll()奉瘤,否則調(diào)用take()
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

timed == true,調(diào)用poll()方法,如果在keepAliveTime時間內(nèi)還沒有獲取task的話盗温,則返回null藕赞,繼續(xù)循環(huán)。timed == false卖局,則調(diào)用take()方法斧蜕,該方法為一個阻塞方法,沒有任務(wù)時會一直阻塞掛起砚偶,直到有任務(wù)加入時對該線程喚醒批销,返回任務(wù)。

在runWorker()方法中染坯,無論最終結(jié)果如何均芽,都會執(zhí)行processWorkerExit()方法對worker進(jìn)行退出處理。

processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {

        // true:用戶線程運(yùn)行異常,需要扣減
        // false:getTask方法中扣減線程數(shù)量
        if (completedAbruptly)
            decrementWorkerCount();

        // 獲取主鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 從HashSet中移出worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 有worker線程移除单鹿,可能是最后一個線程退出需要嘗試終止線程池
        tryTerminate();

        int c = ctl.get();
        // 如果線程為running或shutdown狀態(tài)掀宋,即tryTerminate()沒有成功終止線程池,則判斷是否有必要一個worker
        if (runStateLessThan(c, STOP)) {
            // 正常退出仲锄,計算min:需要維護(hù)的最小線程數(shù)量
            if (!completedAbruptly) {
                // allowCoreThreadTimeOut 默認(rèn)false:是否需要維持核心線程的數(shù)量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min ==0 或者workerQueue為空劲妙,min = 1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;

                // 如果線程數(shù)量大于最少數(shù)量min,直接返回儒喊,不需要新增線程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 添加一個沒有firstTask的worker
            addWorker(null, false);
        }
    }

首先completedAbruptly的值來判斷是否需要對線程數(shù)-1處理镣奋,如果completedAbruptly == true,說明在任務(wù)運(yùn)行過程中出現(xiàn)了異常怀愧,那么需要進(jìn)行減1處理侨颈,否則不需要,因為減1處理在getTask()方法中處理了掸驱。然后從HashSet中移出該worker肛搬,過程需要獲取mainlock。然后調(diào)用tryTerminate()方法處理毕贼,該方法是對最后一個線程退出做終止線程池動作。如果線程池沒有終止蛤奢,那么線程池需要保持一定數(shù)量的線程鬼癣,則通過addWorker(null,false)新增一個空的線程。

addWorkerFailed()

在addWorker()方法中啤贩,如果線程t==null待秃,或者在add過程出現(xiàn)異常,會導(dǎo)致workerStarted == false痹屹,那么在最后會調(diào)用addWorkerFailed()方法:

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 從HashSet中移除該worker
            if (w != null)
                workers.remove(w);

            // 線程數(shù) - 1
            decrementWorkerCount();
            // 嘗試終止線程
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

整個邏輯顯得比較簡單章郁。

tryTerminate()

當(dāng)線程池涉及到要移除worker時候都會調(diào)用tryTerminate(),該方法主要用于判斷線程池中的線程是否已經(jīng)全部移除了,如果是的話則關(guān)閉線程池暖庄。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 線程池處于Running狀態(tài)
            // 線程池已經(jīng)終止了
            // 線程池處于ShutDown狀態(tài)聊替,但是阻塞隊列不為空
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            // 執(zhí)行到這里,就意味著線程池要么處于STOP狀態(tài)培廓,要么處于SHUTDOWN且阻塞隊列為空
            // 這時如果線程池中還存在線程惹悄,則會嘗試中斷線程
            if (workerCountOf(c) != 0) {
                // /線程池還有線程,但是隊列沒有任務(wù)了肩钠,需要中斷喚醒等待任務(wù)的線程
                // (runwoker的時候首先就通過w.unlock設(shè)置線程可中斷泣港,getTask最后面的catch處理中斷)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 嘗試終止線程池
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        // 線程池狀態(tài)轉(zhuǎn)為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

在關(guān)閉線程池的過程中,如果線程池處于STOP狀態(tài)或者處于SHUDOWN狀態(tài)且阻塞隊列為null价匠,則線程池會調(diào)用interruptIdleWorkers()方法中斷所有線程当纱,注意ONLY_ONE== true,表示僅中斷一個線程踩窖。

interruptIdleWorkers

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

onlyOne==true僅終止一個線程坡氯,否則終止所有線程。

線程終止

線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池毙石。

shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉廉沮,但是不接受新任務(wù)。

shutdownNow() :嘗試停止所有的活動執(zhí)行任務(wù)徐矩、暫停等待任務(wù)的處理滞时,并返回等待執(zhí)行的任務(wù)列表。

shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 推進(jìn)線程狀態(tài)
            advanceRunState(SHUTDOWN);
            // 中斷空閑的線程
            interruptIdleWorkers();
            // 交給子類實現(xiàn)
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 中斷所有線程
            interruptWorkers();
            // 返回等待執(zhí)行的任務(wù)列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

與shutdown不同滤灯,shutdownNow會調(diào)用interruptWorkers()方法中斷所有線程坪稽。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

同時會調(diào)用drainQueue()方法返回等待執(zhí)行到任務(wù)列表。

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鳞骤,一起剝皮案震驚了整個濱河市窒百,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌豫尽,老刑警劉巖篙梢,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異美旧,居然都是意外死亡渤滞,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門榴嗅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來妄呕,“玉大人,你說我怎么就攤上這事嗽测⌒骼” “怎么了?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長疏魏。 經(jīng)常有香客問我停做,道長,這世上最難降的妖魔是什么蠢护? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任雅宾,我火速辦了婚禮,結(jié)果婚禮上葵硕,老公的妹妹穿的比我還像新娘眉抬。我一直安慰自己,他們只是感情好懈凹,可當(dāng)我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布蜀变。 她就那樣靜靜地躺著,像睡著了一般介评。 火紅的嫁衣襯著肌膚如雪库北。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天们陆,我揣著相機(jī)與錄音寒瓦,去河邊找鬼。 笑死坪仇,一個胖子當(dāng)著我的面吹牛杂腰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播椅文,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼喂很,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了皆刺?” 一聲冷哼從身側(cè)響起少辣,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎羡蛾,沒想到半個月后漓帅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡痴怨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年煎殷,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腿箩。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖劣摇,靈堂內(nèi)的尸體忽然破棺而出珠移,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布钧惧,位于F島的核電站暇韧,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏浓瞪。R本人自食惡果不足惜懈玻,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望乾颁。 院中可真熱鬧涂乌,春花似錦、人聲如沸英岭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诅妹。三九已至罚勾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吭狡,已是汗流浹背尖殃。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留划煮,地道東北人送丰。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像般此,于是被迫代替她去往敵國和親蚪战。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,851評論 2 361