Java線程池_ThreadPoolExecutor原理分析

線程池中有一定數(shù)量的工作線程寞蚌,工作線程會循環(huán)從任務隊列中獲取任務峻村,并執(zhí)行這個任務麸折。那么怎么去停止這些工作線程呢?
這里就涉及到線程池兩個重要概念:工作線程數(shù)量和線程池狀態(tài)粘昨。

一.線程池狀態(tài)和工作線程數(shù)量

這本來是兩個不同的概念垢啼,但是在ThreadPoolExecutor中我們使用一個變量ctl來存儲這兩個值窜锯,這樣我們只需要維護這一個變量的并發(fā)問題,提高運行效率芭析。

    /**
     * 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
     * int類型是32位锚扎,它的高3位,表示線程池的狀態(tài)馁启,低29位表示W(wǎng)orker的數(shù)量
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // COUNT_BITS 29位驾孔,
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 表示線程池中創(chuàng)建Worker工作線程數(shù)量的最大值。即 0b0001.....1(29位1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

怎么使用一個變量ctl存儲兩個值呢惯疙?
就是利用int變量的高3位來儲存線程池狀態(tài)翠勉,用int變量的低29位來儲存工作線程數(shù)量。

這樣就有兩個需要注意的地方:

  1. 工作線程數(shù)量最大值不能超過int類型29位的值CAPACITY 即0b0001.....1(29位1)
  2. 因為線程池狀態(tài)都是高3位儲存的霉颠,所以工作線程數(shù)量不會影響狀態(tài)值大小關系对碌。

1.1 線程池狀態(tài)

    // 高3位值是111
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 高3位值是000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 高3位值是001
    private static final int STOP       =  1 << COUNT_BITS;
    // 高3位值是010
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 高3位值是011
    private static final int TERMINATED =  3 << COUNT_BITS;

線程池狀態(tài)分析:

  1. RUNNING狀態(tài):線程池剛創(chuàng)建時的狀態(tài)。向任務隊列中添加任務蒿偎,并執(zhí)行任務隊列中的任務朽们。因為高3位值是111,即處于RUNNING狀態(tài)下的ctl值都是負數(shù)诉位。
  2. SHUTDOWN狀態(tài): 調(diào)用shutdown方法华坦,會將線程池設置成這個狀態(tài)。不能向任務隊列中添加任務不从,但是可以執(zhí)行任務隊列中已添加的任務惜姐。并且處于SHUTDOWN狀態(tài)下正在運行任務的工作線程不能中斷的,就是保證任務能夠執(zhí)行完成椿息。
  3. STOP狀態(tài): 調(diào)用shutdownNow方法歹袁,會將線程池設置成這個狀態(tài)。不能向任務隊列中添加任務寝优,也不能再執(zhí)行任務隊列中已添加的任務条舔。
  4. TIDYING狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設置成這個狀態(tài)乏矾。這個只是中斷過度狀態(tài)孟抗,表示線程池即將變成TERMINATED狀態(tài)。
  5. TERMINATED狀態(tài): 調(diào)用tryTerminate方法钻心,可能會將線程池設置成這個狀態(tài)凄硼。表示線程池已經(jīng)完全終止,即任務隊列為空捷沸,工作線程數(shù)量也是0.

線程池為什么要定義這么多狀態(tài)呢摊沉?按道理說線程池只應該有運行和終止這兩種狀態(tài)啊。
主要是因為終止線程池時痒给,要考慮正在執(zhí)行的任務和已經(jīng)添加到任務隊列中待執(zhí)行的任務該如何處理说墨,否則的話骏全,這些任務可能就會被丟失。

線程池提供了兩個方式處理:

  1. shutdown方法: 它會將線程池狀態(tài)變成SHUTDOWN 狀態(tài)尼斧。禁止向添加新的任務姜贡,但是會讓任務隊列中的任務繼續(xù)執(zhí)行,最后釋放所有的工作線程棺棵,讓線程池狀態(tài)變成TERMINATED狀態(tài)鲁豪。
  2. shutdownNow方法: 它會將線程池狀態(tài)變成STOP 狀態(tài)。禁止向添加新的任務律秃,也不會執(zhí)行任務隊列中的任務爬橡,但是會返回這個任務集合,釋放所有的工作線程棒动,讓線程池狀態(tài)變成TERMINATED狀態(tài)糙申。

1.2 操作ctl的方法

1.2.1 獲取線程池的狀態(tài)

    /**
     * 獲取線程池的狀態(tài)。因為線程池的狀態(tài)是使用高3位儲存船惨,所以屏蔽低29位就行了柜裸。
     * 所以就c與~CAPACITY(0b1110..0)進行&操作,屏蔽低29位的值了粱锐。
     * 注意:這里是屏蔽低29位的值疙挺,而不是右移29位。
     */
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

1.2.2 獲取工作線程數(shù)量

    /**
     * 獲取線程池中Worker工作線程的數(shù)量怜浅,
     * 因為只使用低29位保存Worker的數(shù)量铐然,只要屏蔽高3位的值就行了
     * 所以就c與CAPACITY(0b0001...1)進行&操作,屏蔽高3位的值了恶座。
     */
    private static int workerCountOf(int c)  { return c & CAPACITY; }

1.2.3 合并ctl的值

    /**
     * 得到ctl的值搀暑。
     * 接受兩個參數(shù)rs和wc。rs表示線程池的狀態(tài)跨琳,wc表示W(wǎng)orker工作線程的數(shù)量自点。
     * 對于rs來說我們只需要高3位的值,對于wc來說我們需要低29位的值脉让。
     * 所以我們將rs | wc就可以得到ctl的值了桂敛。
     */
    private static int ctlOf(int rs, int wc) { return rs | wc; }

1.2.4 其他方法

    // 因為RUNNING狀態(tài)高三位是111,所以狀態(tài)值rs與工作線程數(shù)量ws相與的結(jié)果值c一定是個負數(shù)溅潜,
    // 而其他狀態(tài)值都是大于等于0的數(shù)术唬,所以c是負數(shù),那么表示當前線程處于運行狀態(tài)伟恶。
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 使用CAS函數(shù)將ctl值自增
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * 使用CAS函數(shù)將ctl值自減
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    /**
     * 使用CAS函數(shù)加循環(huán)方法這種樂觀鎖的方式碴开,解決并發(fā)問題毅该。
     * 保證使ctl值減一
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

二. 重要成員變量

    // 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 任務線程的阻塞隊列博秫,因為是阻塞隊列潦牛,所以它是并發(fā)安全的
    private final BlockingQueue<Runnable> workQueue;

    // 獨占鎖,用來保證操作成員變量的并發(fā)安全問題
    private final ReentrantLock mainLock = new ReentrantLock();

    // 等待線程池完全終止的條件Condition挡育,
    private final Condition termination = mainLock.newCondition();

    //-----------------  需要mainLock來保證并發(fā)安全-------------------------//
    // 線程池中工作線程集合巴碗。Worker中持有線程thread變量
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 線程池中曾擁有過的最大工作線程個數(shù)
    private int largestPoolSize;

    // 線程池完成過任務的總個數(shù)
    private long completedTaskCount;
    //-----------------  需要mainLock來保證并發(fā)安全-------------------------//

    // 創(chuàng)建線程的工廠類
    private volatile ThreadFactory threadFactory;

    // 當任務被拒絕時,用來處理這個被拒絕的任務
    private volatile RejectedExecutionHandler handler;
    // 工作線程空閑的超時時間keepAliveTime
    private volatile long keepAliveTime;

    // 是否允許核心池線程超時釋放
    private volatile boolean allowCoreThreadTimeOut;

    // 線程池核心池線程個數(shù)
    private volatile int corePoolSize;

    // 線程池最大的線程個數(shù)
    private volatile int maximumPoolSize;

成員變量的含義已經(jīng)標注了:

  1. mainLock:使用mainLock來保證會發(fā)生變化成員變量的并發(fā)安全問題即寒。會發(fā)生的成員變量有5個:ctl橡淆、workQueue、workers母赵、largestPoolSize和completedTaskCount逸爵。但是其中ctl和workQueue的類型本身就是多線程安全的,所以不用mainLock鎖保護凹嘲。
  2. termination:等待線程池完全終止的條件师倔,如果線程池沒有完全終止,調(diào)用它的awaitNanos方法周蹭,讓線程等待趋艘。當線程池完全終止后,調(diào)用它的signalAll方法凶朗,喚醒所有等待termination條件的線程瓷胧。
  3. workers:記錄所有的工作線程Worker
  4. workQueue:記錄所有待執(zhí)行的任務。使用阻塞隊列BlockingQueue棚愤,可以在隊列為空時搓萧,線程等待,隊列有值時宛畦,喚醒等待的線程矛绘。
  5. largestPoolSize:線程池中曾擁有過的最大工作線程個數(shù)
  6. completedTaskCount:線程池完成過任務的總個數(shù)
  7. threadFactory:創(chuàng)建線程的工廠類
  8. handler:當任務被拒絕時,用來處理這個被拒絕的任務
  9. keepAliveTime:工作線程允許空閑的超時時間刃永,一般都是針對超過核心池數(shù)量的工作線程货矮。
  10. allowCoreThreadTimeOut: 是否允許核心池的工作線程超時釋放。
  11. corePoolSize:線程池核心池線程個數(shù)斯够。
  12. maximumPoolSize: 線程池最大的線程個數(shù)囚玫。

這里注意一下兩個概念核心池個數(shù)和最大線程池個數(shù):

  1. 核心池個數(shù)就是線程池能夠維持的常用工作線程個數(shù),當工作線程沒有執(zhí)行任務空閑時读规,它不會被銷毀抓督,而是在等待。但是如果設置allowCoreThreadTimeOut為true束亏,那么核心池工作線程也是會被銷毀铃在。
  2. 最大線程池個數(shù)就是線程池允許開啟的最大工作線程個數(shù)。最大線程池的意義就是當核心池的工作線程不夠用,且任務隊列也已經(jīng)滿了定铜,不能添加新的任務了阳液,那么就要開啟新的工作線程來執(zhí)行任務。

三. 執(zhí)行任務execute方法

在線程池中如何執(zhí)行一個任務command揣炕,要分三種情況:

  1. 線程池中工作線程的數(shù)量沒有達到核心池個數(shù)帘皿,那么線程池就應該開啟新的工作線程來執(zhí)行任務。
  2. 線程池中工作線程的數(shù)量達到核心池個數(shù)畸陡,那么就應該將任務添加到任務隊列中鹰溜,等待著工作線程去任務隊列中獲取任務并執(zhí)行。
  3. 如果任務添加到任務隊列失敗丁恭,那么就要開啟新的工作線程來執(zhí)行任務曹动。

    public void execute(Runnable command) {
        // 如果command為null,拋出異常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 分為三個步驟:
         * 1. 如果運行的工作線程數(shù)量少于核心池數(shù)量corePoolSize牲览,
         * 那么就調(diào)用addWorker方法開啟一個新的工作線程仁期,運行任務command。
         * 2. 如果開啟新的工作線程失敗竭恬,就將任務添加到任務隊列中跛蛋。
         * 3. 添加到任務隊列失敗,
         * 那么仍然addWorker方法在最大池中開啟一個新的工作線程痊硕,運行任務command赊级。     
         */
        int c = ctl.get();
        // 運行的工作線程數(shù)量少于核心池數(shù)量corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            /**
             * 開啟一個新的工作線程,運行任務command岔绸。
             * 返回true理逊,表示開啟工作線程成功,直接return盒揉。
             * 返回false晋被,表示沒有開啟新線程。那么任務command就沒有運行刚盈,所以要執(zhí)行下面代碼羡洛。
              */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 線程池處于運行狀態(tài),
        // 且任務添加到任務阻塞隊列workQueue中成功藕漱,即workQueue隊列有剩余空間欲侮。
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次檢查線程池狀態(tài)和工作線程數(shù)量
            int recheck = ctl.get();
            /**
             * 如果線程池不在運行狀態(tài),那么就調(diào)用remove方法移除workQueue隊列這個任務command肋联,
             * 如果移除成功威蕉,那么調(diào)用reject(command)方法,進行拒絕任務的處理橄仍。
             * 如果移除失敗韧涨,那么這個任務還是會被執(zhí)行牍戚,那么就不用調(diào)用reject(command)方法
             */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作線程數(shù)量為0,但是workQueue隊列中我們添加過任務虑粥,
            // 那么必須調(diào)用addWorker方法如孝,開啟一個新的工作線程。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 調(diào)用addWorker方法舀奶,開啟一個新的工作線程暑竟,運行任務command斋射。
        // 如果還是失敗育勺,那么這個任務command就不會不可能執(zhí)行了,
        // 那么調(diào)用reject(command)方法拒絕這個任務
        else if (!addWorker(command, false))
            reject(command);
    }

方法流程上面已經(jīng)有標注罗岖,注意有以下幾點:

  1. addWorker(Runnable firstTask, boolean core):表示開啟一個新的工作線程執(zhí)行任務firstTask涧至。core是用來判斷核心池還是最大池。返回false桑包,表示開啟新線程失敗南蓬,即任務firstTask沒有機會執(zhí)行。
  2. isRunning(c)線程池處于RUNNING狀態(tài),只有處于RUNNING狀態(tài)下哑了,才能將任務添加到任務隊列赘方。
  3. reject(command) 當任務command不能在線程池中執(zhí)行時,就會調(diào)用這個方法弱左,告訴調(diào)用值窄陡,線程池拒絕執(zhí)行這個任務。

四. 添加工作線程addWorker方法

就是利用任務task創(chuàng)建一個新的工作線程Work拆火,然后將它添加到工作線程集合workers中跳夭。但是需要注意多線程并發(fā)問題。

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 利用死循環(huán)和CAS函數(shù)们镜,實現(xiàn)樂觀鎖币叹,來實現(xiàn)多線程改變ctl值的并發(fā)問題
        // 因為ctl值代表兩個東西,工作線程數(shù)量和線程池狀態(tài)模狭。
        // 這里就用了兩個for循環(huán)颈抚,一個是線程池狀態(tài)的for循環(huán),一個是工作線程數(shù)量的for循環(huán)
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取線程池運行狀態(tài)rs嚼鹉,
            int rs = runStateOf(c);

            // 首先判斷線程池狀態(tài)和任務隊列狀態(tài)邪意,
            // 來判斷能否創(chuàng)建新的工作線程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            
            for (;;) {
                // 線程池中工作線程數(shù)量wc
                int wc = workerCountOf(c);
                // 當線程池工作線程數(shù)量wc大于線程上限CAPACITY,
                // 或者用戶規(guī)定核心池數(shù)量corePoolSize或用戶規(guī)定最大線程池數(shù)量maximumPoolSize
                // 表示不能創(chuàng)建工作線程了反砌,所以返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 使用CAS函數(shù)雾鬼,使工作線程數(shù)量wc加一
                if (compareAndIncrementWorkerCount(c))
                    // 跳出retry循環(huán)
                    break retry;
                // 來到這里表示CAS函數(shù)失敗,那么就要循環(huán)重新判斷
                // 但是c還代表線程狀態(tài)宴树,如果線程狀態(tài)改變策菜,那么就必須跳轉(zhuǎn)到retry循環(huán)
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // 工作線程是否開始,即調(diào)用了線程的start方法
        boolean workerStarted = false;
        // 工作線程是否添加到工作線程隊列workers中
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 創(chuàng)建一個Worker對象
            w = new Worker(firstTask);
            // 得到Worker所擁有的線程thread
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 并發(fā)鎖
                mainLock.lock();
                try {
                    //  獲取線程池運行狀態(tài)rs
                    int rs = runStateOf(ctl.get());

                    // 當線程池是運行狀態(tài),或者是SHUTDOWN狀態(tài)但firstTask為null又憨,
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 如果線程t已經(jīng)被開啟翠霍,就拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將w添加到工作線程集合workers中
                        workers.add(w);
                        // 獲取工作線程集合workers的個數(shù)
                        int s = workers.size();
                        // 記錄線程池歷史最大的工作線程個數(shù)
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果已經(jīng)添加到工作線程隊列中,那么開啟線程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果開啟工作線程失敗蠢莺,那么這個任務也就沒有執(zhí)行
            // 因此移除這個任務w(如果隊列中有)寒匙,減少工作線程數(shù)量,因為這個數(shù)量在之前已經(jīng)增加了
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

添加一個新的工作線程躏将,就涉及到兩個成員變量的改變锄弱,一個是工作線程數(shù)量ctl,一個是工作線程集合workers祸憋。而ctl的類型是AtomicInteger会宪,所以它可以使用樂觀鎖解決并發(fā)問題,workers就只能使用mainLock互斥鎖來保證并發(fā)安全問題蚯窥。

4.1 更改工作線程數(shù)量ctl

因為ctl儲存了兩個值掸鹅,工作線程數(shù)量和線程池狀態(tài)。所以使用了兩個for循環(huán)來監(jiān)控多線程對這兩個值的更改拦赠。
用線程池狀態(tài)來判斷是否允許添加新的工作線程:

            //  是對addWorker中線程狀態(tài)if判斷的拆分
            // 當線程池不是處于運行狀態(tài)
            if (rs >= SHUTDOWN) {
                /**
                 * 線程池狀態(tài)不是SHUTDOWN巍沙,或者firstTask不為null,或者任務隊列為空,
                 * 都直接返回false荷鼠,表示開啟新工作線程失敗句携。
                 * 只有當線程池狀態(tài)是SHUTDOWN,firstTask為null颊咬,任務隊列不為空時务甥,
                 * 需要創(chuàng)建新的工作線程。
                 * 從execute(Runnable command)方法中分析喳篇,firstTask參數(shù)為空只有一種情況敞临,
                 * 此時線程池中工作線程數(shù)量是0,而任務隊列不為空麸澜,
                 * 那么就要開啟一個新工作線程去執(zhí)行任務隊列中的任務涡贱,否則這些任務會被丟失欢峰。
                 */
                if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) {
                    return false;
                }
            }

由此可以得出腊凶,只有兩種情形允許添加新的工作線程:

  1. 線程池處于RUNNING狀態(tài)
  2. 線程池雖然處于SHUTDOWN狀態(tài)狼渊,但是線程池工作線程個數(shù)是0(即這里的firstTask != null),且任務隊列workQueue不為空,那么就要開啟一個新工作線程去執(zhí)行任務隊列中的任務馁害。

然后使用for循環(huán)和CAS函數(shù)方式窄俏,來給工作線程數(shù)量加一。注意此時工作線程還沒有創(chuàng)建碘菜,并添加到線程集合workers中凹蜈,所以如果線程添加失敗限寞,那么還要將工作線程數(shù)量減一。

4.2 添加工作線程集合workers

創(chuàng)建一個工作線程Worker仰坦,將它添加到線程集合workers中履植,然后開啟這個工作線程,使用mainLock獨占鎖保證成員變量workers的并發(fā)安全問題悄晃。

五. 內(nèi)部類Worker

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        /** 該Worker所擁有的工作線程 */
        final Thread thread;
        /** Worker擁有的第一個任務玫霎,初始化的時候賦值 */
        Runnable firstTask;
        /** 該工作線程Worker完成任務的數(shù)量 */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // 將state設置為-1,禁止發(fā)起中斷請求妈橄,
            // 直到調(diào)用過runWorker方法庶近,即線程已經(jīng)運行時。
            setState(-1);
            // 第一個任務
            this.firstTask = firstTask;
            // 創(chuàng)建一個thread線程對象眷细,它的run方法就是本W(wǎng)orker的run方法
            // 這個thread就是Worker真正執(zhí)行任務的工作線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** 復寫的是Runnable中的run方法拦盹,所以當工作線程開啟運行后鹃祖,會調(diào)用這個方法溪椎。  */
        public void run() {
            runWorker(this);
        }

        // 當前獨占鎖是否空閑
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 嘗試獲取獨占鎖
        protected boolean tryAcquire(int unused) {
            // 如果通過CAS函數(shù),可以將state值從0改變成1恬口,那么表示獲取獨占鎖成功校读。
            // 否則獨占鎖被別的線程獲取了。
            if (compareAndSetState(0, 1)) {
                // 設置擁有獨占鎖的線程是當前線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 釋放獨占鎖
        protected boolean tryRelease(int unused) {
            // 設置擁有獨占鎖的線程為null
            setExclusiveOwnerThread(null);
            // 設置獲取獨占鎖的次數(shù)是0祖能,表示鎖是空閑狀態(tài)
            setState(0);
            return true;
        }

        // 獲取獨占鎖歉秫,如果鎖被別的獲取,就一直等待养铸。
        public void lock()        { acquire(1); }
        // 嘗試獲取獨占鎖雁芙,如果鎖被別的獲取,就直接返回false钞螟,表示獲取失敗兔甘。
        public boolean tryLock()  { return tryAcquire(1); }
        // 釋放獨占鎖
        public void unlock()      { release(1); }
        // 當前獨占鎖是否空閑
        public boolean isLocked() { return isHeldExclusively(); }

        // 如果Worker的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求鳞滨。
        void interruptIfStarted() {
            Thread t;
            // getState() >= 0表示thread已經(jīng)開啟
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

Worker實現(xiàn)了Runnable接口洞焙,那么就可以通過Worker對象創(chuàng)建一個新線程thread,這個thread就是Worker的工作線程拯啦,而任務都在run方法中執(zhí)行澡匪。
Worker還繼承自AbstractQueuedSynchronizer類。我們知道可以通過AQS類實現(xiàn)獨占鎖和共享鎖褒链,而Worker中實現(xiàn)了tryAcquire和tryRelease方法唁情,說明Worker對象也是個獨占鎖對象。我們可以考慮一下Worker這個獨占鎖的作用是什么甫匹?在后面會介紹到甸鸟。

六. 工作線程運行任務runWorker方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 將w的state狀態(tài)設置成0夯巷,這樣就允許對w的thread線程進行中斷請求了。
        w.unlock();
        // completedAbruptly表示線程突然終結(jié)
        boolean completedAbruptly = true;
        try {
            // 通過getTask從任務隊列中獲取任務task執(zhí)行哀墓,這個方法是個阻塞方法趁餐。
            while (task != null || (task = getTask()) != null) {
                // 獲取w獨占鎖,保證當本工作線程運行任務時篮绰,
                // 不能對該線程進行中斷請求后雷。
                w.lock();
                /**
                 * 如果線程池大于STOP狀態(tài),且Worker工作線程中斷標志位是false吠各,
                 * 那么就調(diào)用wt的interrupt方法發(fā)起中斷請求臀突。
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // Worker工作線程發(fā)起中斷請求
                    wt.interrupt();
                try {
                    // 鉤子方法,提供給子類贾漏。在執(zhí)行任務之前調(diào)用
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 調(diào)用run方法候学,執(zhí)行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 鉤子方法,提供給子類纵散。在執(zhí)行任務完成后調(diào)用
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 將task設置為null梳码,進行下一次循環(huán)
                    task = null;
                    // 將work完成的任務數(shù)completedTasks加一
                    w.completedTasks++;
                    // 釋放w獨占鎖
                    w.unlock();
                }
            }
            // completedAbruptly = false表示線程正常完成終結(jié)
            completedAbruptly = false;
        } finally {
            // 進行一個工作線程完結(jié)后的后續(xù)操作
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法是在每個工作線程的run方法中調(diào)用,通過getTask()方法從任務隊列中獲取任務task執(zhí)行伍掀,這個方法可以阻塞當前工作線程掰茶,如果getTask()方法返回null,那么工作線程就會運行結(jié)束蜜笤,釋放線程濒蒋。

雖然runWorker方法運行在每個工作線程中,但是對于一個Worker來說把兔,只會有它的工作線程能夠運行runWorker方法沪伙,而且改變的也是這個Worker的成員變量,且這些成員變量也只能在runWorker方法改變县好,那么它沒有多線程并發(fā)問題啊围橡,那么為什么在這里加鎖呢?
這是因為Worker中有一個變量是可以被其他線程改變的聘惦,就是它的工作線程thread的中斷請求某饰,所以Worker獨占鎖的作用就是控制別的線程對它的工作線程thread中斷請求的。

最后調(diào)用processWorkerExit方法善绎,進行一個工作線程完結(jié)后的后續(xù)操作黔漂。

七. 獲取任務getTask方法

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // 獲取線程池狀態(tài)rs
            int rs = runStateOf(c);

            // 如果有需要檢查任務隊列workQueue是否為空
            // 即rs >= STOP或者rs == SHUTDOWN且workQueue為空,那么返回null禀酱,停止工作線程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 將工作線程數(shù)量減一
                decrementWorkerCount();
                return null;
            }

            // 獲取工作線程數(shù)量wc
            int wc = workerCountOf(c);

            /**
             * 如果allowCoreThreadTimeOut為true或者wc > corePoolSize時炬守,
             * 就要減少工作線程數(shù)量了。
             * 當工作線程在keepAliveTime時間內(nèi)剂跟,沒有獲取到可執(zhí)行的任務减途,
             * 那么該工作線程就要被銷毀酣藻。
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 工作線程數(shù)量減一,返回null鳍置,銷毀工作線程辽剧。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 從任務隊列workQueue中獲取了任務r,會阻塞當前線程税产。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果r不為null怕轿,返回這個任務r
                if (r != null)
                    return r;
                // r是null,表示獲取任務超時
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

從阻塞任務隊列workQueue中獲取任務返回辟拷,因為是阻塞任務隊列撞羽,所以可以阻塞當前線程。如果返回null衫冻,那么會完結(jié)調(diào)用getTask方法的那個工作線程诀紊。那么getTask方法在什么情況下返回null呢?

  1. 線程池的狀態(tài)大于等于STOP隅俘,或者線程狀態(tài)是SHUTDOWN且當前任務隊列為空邻奠,那么返回null,停止工作線程考赛。
  2. 獲取任務時間超時惕澎,那么也會返回null莉测,停止工作線程颜骤。因為線程池一般只維護一定數(shù)量的工作線程,如果超過這個數(shù)量捣卤,那么超過數(shù)量的工作線程忍抽,在空閑一定時間后,應該被釋放董朝。

八. 終止線程池的方法

8.1 shutdown和shutdownNow方法

    /**
     * 終止線程池鸠项。不能在添加新任務了,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行子姜。
     * 且對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否擁有Shutdown的權(quán)限
            checkShutdownAccess();
            // 將線程池狀態(tài)變成SHUTDOWN狀態(tài)
            advanceRunState(SHUTDOWN);
            // 對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求
            interruptIdleWorkers();
            // 鉤子方法祟绊,提供給子類實現(xiàn)。表示線程池已經(jīng)shutdown了
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        // 嘗試去終結(jié)線程池
        tryTerminate();
    }

    /**
     * 終止線程池哥捕。不能在添加新任務了牧抽,也不會執(zhí)行已經(jīng)添加到任務隊列的任務,只是將這些任務返回遥赚。
     * 且對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 檢查是否擁有Shutdown的權(quán)限
            checkShutdownAccess();
            // 將線程池狀態(tài)變成STOP狀態(tài)
            advanceRunState(STOP);
            // 對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
            interruptWorkers();
            // 返回阻塞隊列workQueue中未執(zhí)行任務的集合
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 嘗試去終結(jié)線程池
        tryTerminate();
        return tasks;
    }

shutdown和shutdownNow區(qū)別:

  1. shutdown方法將線程池設置成SHUTDOWN狀態(tài)扬舒,shutdownNow將線程池設置成STOP狀態(tài)。
  2. shutdown方法調(diào)用之后不能在添加新任務了凫佛,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行讲坎。shutdownNow方法調(diào)用之后不能在添加新任務了孕惜,也不會執(zhí)行已經(jīng)添加到任務隊列的任務,只是將這些任務返回晨炕。
  3. shutdown方法會對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求衫画,shutdownNow方法會對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務。

8.2 advanceRunState方法

    private void advanceRunState(int targetState) {
        // 采用樂觀鎖的方法瓮栗,來并發(fā)更改線程池狀態(tài)碧磅。
        for (;;) {
            int c = ctl.get();
            // 如果runStateAtLeast方法返回true,表示當前線程池狀態(tài)已經(jīng)是目標狀態(tài)targetState
            // 采用CAS函數(shù)嘗試更改線程池狀態(tài)遵馆,如果失敗就循環(huán)繼續(xù)鲸郊。
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

這個方法來改變線程池狀態(tài),使用樂觀鎖的方式保證并發(fā)安全货邓。

8.3 中斷空閑狀態(tài)下的工作線程

    /**
     * 對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求秆撮。
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍歷工作線程Worker集合
            for (Worker w : workers) {
                Thread t = w.thread;
                // 如果工作線程中斷標志位是false,
                // 且能夠獲取鎖换况,即當前工作線程沒有運行任務
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 發(fā)起中斷請求职辨。
                        // 因為獲取了鎖,所以在進入中斷請求時戈二,worker工作線程不會執(zhí)行任務
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // 釋放鎖
                        w.unlock();
                    }
                }
                // 是否只進行一個工作線程的中斷請求舒裤。
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

遍歷工作線程Worker集合,如果工作線程出于空閑狀態(tài)觉吭,且沒有被中斷腾供,那么就發(fā)起中斷請求。通過獨占鎖Worker知道鲜滩,當前工作線程是否在執(zhí)行任務伴鳖。

8.4 對所有已開啟的工作線程發(fā)起中斷請求

    /**
     * 對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                 // 如果w的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求徙硅。
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

遍歷工作線程Worker集合榜聂,調(diào)用Worker的interruptIfStarted方法,如果工作線程已開啟嗓蘑,那么就會發(fā)起中斷须肆。

8.5 嘗試完結(jié)線程池的方法

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 如果線程池是RUNNING狀態(tài),
             * 或者線程池是TIDYING狀態(tài)(是因為已經(jīng)有別的線程在終止線程池了)
             * 或者線程池是SHUTDOWN狀態(tài)且任務隊列不為空桩皿,
             * 線程池不能被terminate終止豌汇,直接return返回
             *
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 線程池中工作線程數(shù)量不是0,線程池不能被terminate終止业簿,所以要return
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 鉤子方法瘤礁,提供給子類實現(xiàn)。表示線程池已經(jīng)終止梅尤。
                        terminated();
                    } finally {
                        // 設置線程池狀態(tài)是TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

線程池在什么情況下算是完全停止了呢柜思?有三個條件:

  1. 線程池不是RUNNING狀態(tài)岩调。
  2. 線程池中工作線程數(shù)量是0。
  3. 線程池中任務隊列為空赡盘。

所以在看看tryTerminate()中号枕,前面兩個if判斷條件,就可以理解了陨享。

8.6 等待線程池完結(jié)的方法

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                // 如果是TERMINATED已終止狀態(tài)葱淳,那么就返回true
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                // 如果已經(jīng)超時就返回false
                if (nanos <= 0)
                    return false;
                // 讓當前線程等待。并設置超時時間nanos
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

如果線程池不是TERMINATED狀態(tài)抛姑,就讓當前線程在termination條件上等待赞厕,直到線程池變成TERMINATED狀態(tài),或者等待時間超時才會被喚醒定硝。

8.7 工作線程退出的方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果工作線程突然被終結(jié)皿桑,那么工作線程的數(shù)量就沒有減一。
        if (completedAbruptly)
            // 將工作線程數(shù)量減一蔬啡。
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將工作線程的任務完成數(shù)添加到線程池完成任務總數(shù)中
            completedTaskCount += w.completedTasks;
            // 從工作線程集合中移除本工作線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 因為有一個工作線程已經(jīng)完成被釋放诲侮,那么就去嘗試終結(jié)線程池。
        tryTerminate();

        int c = ctl.get();
        // 如果線程池狀態(tài)小于STOP,
        // 就要判斷終結(jié)了這個工作線程之后箱蟆,線程池中工作線程數(shù)量是否滿足需求沟绪。
        if (runStateLessThan(c, STOP)) {
            // 如果工作線程正常終結(jié),
            // 那么要看線程池中工作線程數(shù)量是否滿足需求空猜。
            if (!completedAbruptly) {
                // 不允許核心池線程釋放绽慈,那么最小值是corePoolSize,否則就可以為0
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 但是如果任務隊列中還有任務抄肖,那么工作線程數(shù)量最少為1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果工作線程數(shù)量小于min值久信,就要創(chuàng)建新的工作線程了。
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 開啟一個新的工作線程
            addWorker(null, false);
        }
    }

工作線程被釋放漓摩,有兩種情況,一種是運行完成正常結(jié)束入客,一種是發(fā)生異常意外終止管毙。
當工作線程被釋放時,需要將它從工作線程集合workers中移除桌硫,將該工作線程任務完成數(shù)添加到線程池完成任務總數(shù)中夭咬。調(diào)用tryTerminate方法嘗試終結(jié)線程池。
另外因為有一個工作線程被釋放铆隘,那么就要考慮線程池中當前工作線程數(shù)量是否符合要求卓舵,要不要添加新的工作線程。

九. 創(chuàng)建線程池的方法膀钠。

上面分析完線程池的功能方法后掏湾,再來說說怎樣創(chuàng)建一個線程池裹虫。

9.1 構(gòu)造函數(shù)

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 判斷設置的核心池數(shù)量corePoolSize、最大池數(shù)量maximumPoolSize融击、
        // 與線程空閑存活時間keepAliveTime的值筑公,是否符合要求
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();

        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();

        // 賦值
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor類一共有四個構(gòu)造函數(shù),前面三個構(gòu)造函數(shù)都是調(diào)用后面那個構(gòu)造函數(shù)來實現(xiàn)的尊浪。參數(shù)意義:

  1. corePoolSize: 線程池核心池線程個數(shù)匣屡。
  2. maximumPoolSize: 線程池允許最大的線程個數(shù)。
  3. keepAliveTime: 線程空閑時拇涤,允許存活的時間捣作。
  4. unit:輔助變量,用來將keepAliveTime參數(shù)鹅士,轉(zhuǎn)成對應納秒值虾宇。
  5. workQueue:儲存所有待執(zhí)行任務的阻塞隊列
  6. threadFactory:用來創(chuàng)建線程的工廠類
  7. handler:通過它來通知調(diào)用值,線程池拒絕了任務如绸。
    注:有沒有注意到嘱朽,沒有傳遞allowCoreThreadTimeOut這個參數(shù),那么怎么設置這個成語變量呢怔接?通過allowCoreThreadTimeOut(boolean value)方法來設置搪泳。

一般我們不用自己來new ThreadPoolExecutor對象,而是通過Executors這個工具類來創(chuàng)建ThreadPoolExecutor實例扼脐。

9.2 創(chuàng)建固定數(shù)量的線程池

    // 創(chuàng)建固定數(shù)量的線程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    // 創(chuàng)建固定數(shù)量的線程池
    public static ExecutorService newFixedThreadPool(int nThreads, 
              ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

根據(jù)我們前面講解岸军,要想線程池維持固定數(shù)量的工作線程,那么工作線程就不能被釋放瓦侮,就要做到兩點:

  1. allowCoreThreadTimeOut為false艰赞,這個是默認的。keepAliveTime設置為0肚吏,這樣當調(diào)用allowCoreThreadTimeOut(boolean value)方法修改allowCoreThreadTimeOut值時方妖,會拋出異常,不允許修改罚攀。
  2. 核心池數(shù)量和最大池數(shù)量一樣党觅,防止添加新的工作線程池。任務隊列容量要足夠大斋泄,防止任務添加到任務隊列中失敗杯瞻,不能執(zhí)行。

9.3 創(chuàng)建單個線程的線程池

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(
              ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

與固定數(shù)量的線程池相比:

  1. 將固定數(shù)量nThreads變成了1
  2. 使用了FinalizableDelegatedExecutorService這個代理類炫掐,主要作用就是當對象被銷毀時魁莉,會調(diào)用shutdown方法,停止線程池。

9.4 創(chuàng)建緩存線程池

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

什么叫做緩存線程池旗唁,當有任務執(zhí)行時畦浓,會創(chuàng)建工作線程來執(zhí)行任務,當任務執(zhí)行完畢后逆皮,工作線程會等待一段時間宅粥,如果還是沒有任務需要執(zhí)行,那么就會釋放工作線程电谣。

十. ThreadPoolExecutor 重要參數(shù)方法

10.1 getActiveCount 正在執(zhí)行任務線程數(shù)

    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                // isLocked() 表示這個 Worker 正在執(zhí)行任務
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

這個方法獲取正在執(zhí)行任務的線程數(shù)量秽梅。w.isLocked() 表示正在執(zhí)行任務的 Worker

10.2 getCompletedTaskCount 返回完成任務大致數(shù)量

  public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和剿牺。
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

返回已完成執(zhí)行的任務的大致總數(shù)企垦。由于任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,因此返回值只是一個近似值晒来。

completedTaskCount 表示已完成 WorkercompletedTasks 數(shù)量之和钞诡。在 Worker 退出方法 processWorkerExit() 中進行增加操作。

10.3 getTaskCount 返回已計劃執(zhí)行的任務的大致總數(shù)

    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和湃崩。
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                // w.isLocked() 表示一個任務正在執(zhí)行
                if (w.isLocked())
                    ++n;
            }
            // 再加上待執(zhí)行的任務數(shù)量
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

返回已計劃執(zhí)行的任務的大致總數(shù)荧降。由于任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,因此返回值只是一個近似值攒读。

10.4 getCorePoolSize 方法

    public int getCorePoolSize() {
        return corePoolSize;
    }

返回線程池核心線程數(shù)量朵诫。

10.5 getKeepAliveTime 方法

   public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

返回線程保持活動時間。一是當線程超過核心線程數(shù)時薄扁,超過的線程超過 keepAliveTime 時間沒有執(zhí)行任務剪返,就會關閉;二是 allowCoreThreadTimeOut 值為 true邓梅,那么核心線程空閑事件超過 keepAliveTime 也會關閉脱盲。

10.6 getLargestPoolSize 方法

    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

獲取曾經(jīng)出現(xiàn)的最大線程數(shù)。

10.7 getMaximumPoolSize 方法

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

獲取最大線程數(shù)日缨。

10.8 getPoolSize 方法

    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

返回池中的當前線程數(shù)钱反。

10.9 getQueue 方法

    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

返回待執(zhí)行任務隊列。

十一 總結(jié)

線程池有兩個概念核心池與最大池殿遂。

  1. 核心池:線程池應該維持的工作線程數(shù)量诈铛,如果線程池中工作線程數(shù)量小于核心池數(shù)量,就會創(chuàng)建新的工作線程添加到線程池中墨礁。
  2. 最大池: 線程池中臨時存在的工作線程,當任務隊列不能添加新任務時耳峦,就會創(chuàng)建新的工作線程添加到線程池中恩静。執(zhí)行完任務后,超過一定時間沒有接受到新任務,這個臨時工作線程就會被釋放驶乾。

兩者的區(qū)別:

  1. 線程釋放:最大池中的線程當超過一定時間沒有接受到新任務邑飒,就會被釋放,而核心池中的線程级乐,一般不釋放疙咸,只有設置allowCoreThreadTimeOut為true,且超過一定時間沒有接受到新任務风科,也會被釋放撒轮。
  2. 創(chuàng)建時機:線程池中工作線程數(shù)量小于核心池數(shù)量,就會創(chuàng)建核心池線程贼穆。但是對于最大池來說题山,只有任務隊列已滿,不能添加新任務時故痊,才會創(chuàng)建新線程顶瞳,放入最大池中。
    注:一般稱小于等于corePoolSize數(shù)量的工作線程池是核心池中的線程愕秫,大于corePoolSize數(shù)量的工作線程池就是最大池中的線程慨菱。

11.1 線程池執(zhí)行任務流程

通過execute方法執(zhí)行新任務command,分為三個步驟:

  1. 線程池中工作線程數(shù)量小于核心池數(shù)量戴甩,那么就開啟新的工作線程來執(zhí)行任務符喝。
  2. 線程池中工作線程數(shù)量達到核心池數(shù)量,那么就將新任務添加到任務隊列中等恐。
  3. 如果新任務添加到任務隊列失敗洲劣,那么就開啟新的工作線程來執(zhí)行任務(這個線程就在最大池中了)。

在每個工作線程课蔬,會通過循環(huán)囱稽,調(diào)用getTask方法,不斷地從任務隊列中獲取任務來執(zhí)行二跋。如果任務隊列中沒有任務战惊,那么getTask方法會阻塞當前工作線程。

但是工作線程被喚醒后扎即,getTask方法返回null吞获,那么就會跳出循環(huán),該工作線程運行結(jié)束谚鄙,準備釋放各拷。

11.2 終止線程池

線程池不可能立即就終止,因為涉及到線程池正在執(zhí)行任務的線程和任務隊列中等待執(zhí)行的任務該如何處理問題闷营,有兩個方式:

  1. shutdown方法:不能再向線程池中添加新任務了烤黍,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行知市,也不會對正在執(zhí)行任務的線程發(fā)起中斷請求。等待任務隊列任務執(zhí)行完成速蕊,釋放線程池中所有線程嫂丙,線程池進入完全終止狀態(tài)。
  2. shutdownNow方法:不能再向線程池中添加新任務了规哲,也不會執(zhí)行已經(jīng)添加到任務隊列的任務跟啤,但是會返回未執(zhí)行的任務集合。而且對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務唉锌。等待線程池中所有線程釋放隅肥,線程池進入完全終止狀態(tài)。

兩者的區(qū)別:

兩者都不能再向線程池中添加新任務了糊秆。shutdown方法還是會將已添加的任務都執(zhí)行完畢武福,而shutdownNow方法不會再執(zhí)行任何新任務了。
注:對于正在執(zhí)行的任務是可能執(zhí)行完成的痘番,因為中斷請求只能中斷處于WAITING與TIMED_WAITING狀態(tài)的線程捉片,對于處于其他狀態(tài)的線程不起作用。

十二. 重要示例

12.1 正常運行線程池

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }
    }
}

運行結(jié)果:

--線程1開始運行 任務1
--線程2開始運行 任務2
--線程3開始運行 任務3
=======線程1結(jié)束 任務1
--線程1開始運行 任務4
=======線程2結(jié)束 任務2
--線程2開始運行 任務5
=======線程3結(jié)束 任務3
--線程3開始運行 任務6
=======線程1結(jié)束 任務4
=======線程2結(jié)束 任務5
=======線程3結(jié)束 任務6

這里使用的是固定數(shù)量的線程池汞舱,所以只有三個線程來執(zhí)行任務伍纫,未執(zhí)行到的任務只能等待。
如果換成單個線程的線程池昂芜,那么只有一個線程在執(zhí)行任務莹规。
而緩存線程池呢?你就會發(fā)現(xiàn)居然有六個線程在執(zhí)行任務泌神,就是有多少任務創(chuàng)建多少個線程良漱。

運行完任務后,你會發(fā)現(xiàn)程序沒有結(jié)束欢际,那是因為線程池沒有被終止母市。

12.2 終止線程池

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常  exception=="+e.getMessage());
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }
        // 還是會執(zhí)行完已經(jīng)添加的任務
        service.shutdown();
    }
}

運行結(jié)果:

--線程1開始運行 任務1
--線程3開始運行 任務3
--線程2開始運行 任務2
=======線程1結(jié)束 任務1
--線程1開始運行 任務4
=======線程2結(jié)束 任務2
--線程2開始運行 任務5
=======線程3結(jié)束 任務3
--線程3開始運行 任務6
=======線程1結(jié)束 任務4
=======線程2結(jié)束 任務5
=======線程3結(jié)束 任務6

Process finished with exit code 0

使用shutdown方法艇棕,還是會執(zhí)行完已經(jīng)添加的任務此改。最后程序退出。

package com.zhang._22._5;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

class Run implements Runnable {

    private int index;

    public Run(int index) {
        this.index = index+1;
    }

    @Override
    public void run() {
        System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
        try {
            int waitTime = 100 + index * 10;
            Thread.sleep(waitTime);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常  exception=="+e.getMessage());
        }
        System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
    }
}

class MyThreadFactory implements ThreadFactory {
    private int sequenceNumber = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "線程"+(++sequenceNumber));
    }
}

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {

        ThreadFactory threadFactory = new MyThreadFactory();

        // 固定數(shù)量的線程池
        ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);

//        // 單個線程的線程池
//        ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
//        // 緩存線程池
//        ExecutorService service = Executors.newCachedThreadPool(threadFactory);

        for (int i = 0; i < 6; i++) {
            service.execute(new Run(i));
        }

        service.shutdownNow();
    }
}

運行結(jié)果:

--線程1開始運行 任務1
--線程2開始運行 任務2
--線程3開始運行 任務3
線程2 發(fā)生中斷異常  exception==sleep interrupted
線程1 發(fā)生中斷異常  exception==sleep interrupted
=======線程1結(jié)束 任務1
=======線程2結(jié)束 任務2
線程3 發(fā)生中斷異常  exception==sleep interrupted
=======線程3結(jié)束 任務3

Process finished with exit code 0

使用shutdownNow方法示启,在任務隊列中等待的任務是不會執(zhí)行的浑槽,而且立即發(fā)起線程中斷請求蒋失。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市桐玻,隨后出現(xiàn)的幾起案子篙挽,更是在濱河造成了極大的恐慌,老刑警劉巖镊靴,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嫉髓,死亡現(xiàn)場離奇詭異观腊,居然都是意外死亡邑闲,警方通過查閱死者的電腦和手機算行,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來苫耸,“玉大人州邢,你說我怎么就攤上這事⊥首樱” “怎么了量淌?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長嫌褪。 經(jīng)常有香客問我呀枢,道長,這世上最難降的妖魔是什么笼痛? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任裙秋,我火速辦了婚禮,結(jié)果婚禮上缨伊,老公的妹妹穿的比我還像新娘摘刑。我一直安慰自己,他們只是感情好刻坊,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布枷恕。 她就那樣靜靜地躺著,像睡著了一般谭胚。 火紅的嫁衣襯著肌膚如雪徐块。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天灾而,我揣著相機與錄音胡控,去河邊找鬼。 笑死绰疤,一個胖子當著我的面吹牛铜犬,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播轻庆,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼癣猾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了余爆?” 一聲冷哼從身側(cè)響起纷宇,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蛾方,沒想到半個月后像捶,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體上陕,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年拓春,在試婚紗的時候發(fā)現(xiàn)自己被綠了释簿。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡硼莽,死狀恐怖庶溶,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情懂鸵,我是刑警寧澤偏螺,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站匆光,受9級特大地震影響套像,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜终息,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一夺巩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧采幌,春花似錦劲够、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至磨取,卻和暖如春人柿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背忙厌。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工凫岖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人逢净。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓哥放,卻偏偏與公主長得像,于是被迫代替她去往敵國和親爹土。 傳聞我的和親對象是個殘疾皇子甥雕,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容