線程池及其靈魂ThreadPoolExecutor

先上類圖:


各級(jí)類功能瀏覽

  • Executors
    工廠類河狐,負(fù)責(zé)創(chuàng)建各式各樣的Threadpoolexecutors出來
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor() 
public static ExecutorService newCachedThreadPool() 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  • Executor
public interface Executor {
    // 一個(gè)抽象方法
    void execute(Runnable command);
}
  • ExecutorService

增加一寫生命周期管理,任務(wù)提交增強(qiáng)處理等api。

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

   
    boolean isShutdown();


    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

   
    <T> Future<T> submit(Callable<T> task);

    
    <T> Future<T> submit(Runnable task, T result);

    
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

  
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • AbstractExecutorService

提供了invokeAny, invokeAll,submit等方法的默認(rèn)實(shí)現(xiàn)馋艺。

  • ScheduledExecutorService

提供了兩個(gè)叼逼定時(shí)任務(wù)方法栅干,一看就很親切

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                          long delay, TimeUnit unit);

   public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                 long initialDelay,
                                                 long period,
                                                 TimeUnit unit);


   public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                    long initialDelay,
                                                    long delay,
                                                    TimeUnit unit);

}
  • ThreadPoolExecutor

這個(gè)叼逼在最后慢慢介紹

  • DelegatedExecutorService(圖是扣的,這筆我不認(rèn)識(shí)捐祠。碱鳞。。踱蛀。)

  • ThreadFactory

jdk提供的線程創(chuàng)建接口
有自己的默認(rèn)實(shí)現(xiàn)DefaultThreadFactory窿给,只是在創(chuàng)建線程時(shí)為線程添加了命名等。我們可以自己實(shí)現(xiàn)該接口實(shí)現(xiàn)自定義的任何創(chuàng)建線程的方法率拒。創(chuàng)建線程池時(shí)我們可以自定義填大。

  • 圖中沒有,不得不提一下RunnableFuture這個(gè)類
public interface RunnableFuture<V> extends Runnable, Future<V> {
   
    void run();
}

異步獲取線程池里線程執(zhí)行結(jié)果就靠他了G伍佟允华!

ThreadPoolExecutor規(guī)則

  • corePoolSize與maximumPoolSize 由于ThreadPoolExecutor 將根據(jù) corePoolSize和 maximumPoolSize設(shè)置的邊界自動(dòng)調(diào)整池大小,當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時(shí):
     ×绕(1)如果運(yùn)行的線程少于 corePoolSize靴寂,則創(chuàng)建新線程來處理請(qǐng)求,即使其他輔助線程是空閑的召耘;
     “倬妗(2)如果設(shè)置的corePoolSize 和 maximumPoolSize相同,則創(chuàng)建的線程池是大小固定的污它,如果運(yùn)行的線程與corePoolSize相同剖踊,當(dāng)有新請(qǐng)求過來時(shí),若workQueue未滿衫贬,則將請(qǐng)求放入workQueue中德澈,等待有空閑的線程去從workQueue中取任務(wù)并處理
      (3)如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize固惯,則僅當(dāng)隊(duì)列滿時(shí)才創(chuàng)建新線程才創(chuàng)建新的線程去處理請(qǐng)求梆造;
      (4)如果運(yùn)行的線程多于corePoolSize 并且等于maximumPoolSize葬毫,若隊(duì)列已經(jīng)滿了镇辉,則通過handler所指定的策略來處理新請(qǐng)求纵诞;
     ∠敖佟(5)如果將 maximumPoolSize 設(shè)置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)
      
      也就是說损敷,處理任務(wù)的優(yōu)先級(jí)為:
     ±谜(1) 核心線程corePoolSize > 任務(wù)隊(duì)列workQueue > 最大線程maximumPoolSize屹逛,如果三者都滿了础废,使用handler處理被拒絕的任務(wù)。
     〖逶础(2)當(dāng)池中的線程數(shù)大于corePoolSize的時(shí)候色迂,多余的線程會(huì)等待keepAliveTime長(zhǎng)的時(shí)間,如果無請(qǐng)求可處理就自行銷毀手销。

  • cache線程池使用SynchronousQueue歇僧,fixed使用LinkedBlockingQueue
    區(qū)別:
    a) cache,創(chuàng)業(yè)公司有特別多的活要干,那就有一個(gè)活來現(xiàn)在的人沒時(shí)間認(rèn)領(lǐng)就去招聘一個(gè)新人锋拖,后來活干完了少了诈悍,招聘進(jìn)來的人閑了,閑下來超過xxx時(shí)間后又把他們開除了兽埃。
    b) fixed,創(chuàng)業(yè)公司有特別多的活要干侥钳,但是公司就這么幾個(gè)人,有任務(wù)來就排期吧柄错,反正這幾個(gè)人一個(gè)個(gè)處理舷夺。。售貌。给猾。

  • 隊(duì)列滿了之后的應(yīng)對(duì)策略(rejecthanlder)
    a) 直接拋棄
    b) 拋出異常
    c) 主線程執(zhí)行
    d) 將最老的線程踢出去,把這個(gè)線程放入隊(duì)列颂跨。

ThreadPoolExecutor源碼

1.小技巧敢伸,把線程數(shù)和線程池狀態(tài)存在一個(gè)原子整數(shù)中,通過cas原子更新

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

低29位存線程數(shù)恒削,高3位存runState(線程池生命周期),這樣runState有5個(gè)值,但是出于簡(jiǎn)單就考慮以下三個(gè)值:
RUNNING狀態(tài):線程池正常運(yùn)行池颈,可以接受新的任務(wù)并處理隊(duì)列中的任務(wù);
SHUTDOWN狀態(tài):不再接受新的任務(wù)钓丰,但是會(huì)執(zhí)行隊(duì)列中的任務(wù)躯砰;
STOP狀態(tài):不再接受新任務(wù),不處理隊(duì)列中的任務(wù)

提供幾個(gè)工具函數(shù)針對(duì)gictl這個(gè)變量操作:

/**
 * 這個(gè)方法用于取出runState的值 因?yàn)镃APACITY值為:00011111111111111111111111111111
 * ~為按位取反操作斑粱,則~CAPACITY值為:11100000000000000000000000000000
 * 再同參數(shù)做&操作弃揽,就將低29位置0了,而高3位還是保持原先的值则北,也就是runState的值
 * 
 * @param c
 *            該參數(shù)為存儲(chǔ)runState和workerCount的int值
 * @return runState的值
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}


/**
 * 這個(gè)方法用于取出workerCount的值
 * 因?yàn)镃APACITY值為:00011111111111111111111111111111,所以&操作將參數(shù)的高3位置0了
 * 保留參數(shù)的低29位痕慢,也就是workerCount的值
 * 
 * @param c
 *            ctl, 存儲(chǔ)runState和workerCount的int值
 * @return workerCount的值
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

/**
 * 將runState和workerCount存到同一個(gè)int中
 * “|”運(yùn)算的意思是尚揣,假設(shè)rs的值是101000,wc的值是000111掖举,則他們位或運(yùn)算的值為101111
 * 
 * @param rs
 *            runState移位過后的值快骗,負(fù)責(zé)填充返回值的高3位
 * @param wc
 *            workerCount移位過后的值,負(fù)責(zé)填充返回值的低29位
 * @return 兩者或運(yùn)算過后的值
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

// 只有RUNNING狀態(tài)會(huì)小于0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

2. Worker

實(shí)現(xiàn)了互斥鎖但不可重入,所以沒有使用ReentrantLock方篮。
貼重要代碼:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 真正執(zhí)行的線程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 實(shí)現(xiàn)不可重入的互斥鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

3.開始從execute方法進(jìn)入

活動(dòng)線程小于corePoolSize的時(shí)候創(chuàng)建新的線程名秀;
活動(dòng)線程大于corePoolSize時(shí)都是先加入到任務(wù)隊(duì)列當(dāng)中;
任務(wù)隊(duì)列滿了再去啟動(dòng)新的線程藕溅,如果線程數(shù)達(dá)到最大值就拒絕任務(wù)匕得。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 活動(dòng)線程數(shù) < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啟動(dòng)新的線程。第二個(gè)參數(shù)true:addWorker中會(huì)重新檢查workerCount是否小于corePoolSize
        if (addWorker(command, true))
            // 添加成功返回
            return;
        c = ctl.get();
    }
    // 活動(dòng)線程數(shù) >= corePoolSize
    // runState為RUNNING && 隊(duì)列未滿
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double check
        // 非RUNNING狀態(tài) 則從workQueue中移除任務(wù)并拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 采用線程池指定的策略拒絕任務(wù)
        // 線程池處于RUNNING狀態(tài) || 線程池處于非RUNNING狀態(tài)但是任務(wù)移除失敗
        else if (workerCountOf(recheck) == 0)
            // 這行代碼是為了SHUTDOWN狀態(tài)下沒有活動(dòng)線程了巾表,但是隊(duì)列里還有任務(wù)沒執(zhí)行這種特殊情況汁掠。
            // 添加一個(gè)null任務(wù)是因?yàn)镾HUTDOWN狀態(tài)下,線程池不再接受新任務(wù)
            addWorker(null, false);

        // 兩種情況:
        // 1.非RUNNING狀態(tài)拒絕新的任務(wù)
        // 2.隊(duì)列滿了啟動(dòng)新的線程失敿摇(workCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

addWorker(null, false);這一行考阱,這要結(jié)合addWorker一起來看。 主要目的是防止HUTDOWN狀態(tài)下沒有活動(dòng)線程了鞠苟,但是隊(duì)列里還有任務(wù)沒執(zhí)行這種特殊情況乞榨。

4.addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);// 當(dāng)前線程池狀態(tài)

            // Check if queue empty only if necessary.
            // 這條語句等價(jià):rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 滿足下列調(diào)價(jià)則直接返回false,線程創(chuàng)建失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時(shí)不再接受新的任務(wù)当娱,且所有任務(wù)執(zhí)行結(jié)束
            // rs = SHUTDOWN:firtTask != null 此時(shí)不再接受任務(wù)吃既,但是仍然會(huì)執(zhí)行隊(duì)列中的任務(wù)
            // rs = SHUTDOWN:firtTask == null見execute方法的addWorker(null,
            // false),任務(wù)為null && 隊(duì)列為空
            // 最后一種情況也就是說SHUTDONW狀態(tài)下趾访,如果隊(duì)列不為空還得接著往下執(zhí)行态秧,為什么?add一個(gè)null任務(wù)目的到底是什么扼鞋?
            // 看execute方法只有workCount==0的時(shí)候firstTask才會(huì)為null結(jié)合這里的條件就是線程池SHUTDOWN了不再接受新任務(wù)
            // 但是此時(shí)隊(duì)列不為空申鱼,那么還得創(chuàng)建線程把任務(wù)給執(zhí)行完才行。
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.線程池狀態(tài)為RUNNING
            // 2.SHUTDOWN狀態(tài)云头,但隊(duì)列中還有任務(wù)需要執(zhí)行
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))// 原子操作遞增workCount
                    break retry;// 操作成功跳出的重試的循環(huán)
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)// 如果線程池的狀態(tài)發(fā)生變化則重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // wokerCount遞增成功

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 并發(fā)的訪問線程池workers對(duì)象必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // RUNNING狀態(tài) || SHUTDONW狀態(tài)下清理隊(duì)列中剩余的任務(wù)
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啟動(dòng)的線程添加到線程池中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啟動(dòng)新添加的線程捐友,這個(gè)線程首先執(zhí)行firstTask,然后不停的從隊(duì)列中取任務(wù)執(zhí)行
                // 當(dāng)?shù)却齥eepAlieTime還沒有任務(wù)執(zhí)行則該線程結(jié)束溃槐。見runWoker和getTask方法的代碼匣砖。
                if (workerAdded) {
                    t.start();// 最終執(zhí)行的是ThreadPoolExecutor的runWoker方法
                    workerStarted = true;
                }
            }
        } finally {
            // 線程啟動(dòng)失敗,則從wokers中移除w并遞減wokerCount
            if (!workerStarted)
                // 遞減wokerCount會(huì)觸發(fā)tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

5.runWorker方法

任務(wù)添加成功后實(shí)際執(zhí)行的是runWorker這個(gè)方法昏滴,這個(gè)方法非常重要猴鲫,簡(jiǎn)單來說它做的就是:

第一次啟動(dòng)會(huì)執(zhí)行初始化傳進(jìn)來的任務(wù)firstTask;
然后會(huì)從workQueue中取任務(wù)執(zhí)行谣殊,如果隊(duì)列為空則等待keepAliveTime這么長(zhǎng)時(shí)間拂共。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的構(gòu)造函數(shù)中抑制了線程中斷setState(-1)----AQS中設(shè)置了state為-1,unlock會(huì)把state重新置為0,所以這里需要unlock從而允許中斷
        w.unlock();
        // 用于標(biāo)識(shí)是否異常終止姻几,finally中processWorkerExit的方法會(huì)有不同邏輯
        // 為true的情況:1.執(zhí)行任務(wù)拋出異常宜狐;2.被中斷势告。
        boolean completedAbruptly = true;
        try {
            // 如果getTask返回null那么getTask中會(huì)將workerCount遞減,如果異常了這個(gè)遞減操作會(huì)在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任務(wù)執(zhí)行前可以插入一些處理抚恒,子類重載該方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();// 執(zhí)行用戶任務(wù)
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // 和beforeExecute一樣咱台,留給子類去重載
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }

            completedAbruptly = false;
        } finally {
            // 結(jié)束線程的一些清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }

從上面的源碼上可以看出,這里叉入了兩個(gè)鉤子俭驮,執(zhí)行前后處理回溺,所以可以自己擴(kuò)展ThreadPoolExecutor實(shí)現(xiàn)一些增強(qiáng)功能,如暫停執(zhí)行(jdk8源碼注釋上就有這個(gè)例子):

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();
 
    public PausableThreadPoolExecutor(...) { super(...); }
 
    protected void beforeExecute(Thread t, Runnable r) {
      super.beforeExecute(t, r);
      pauseLock.lock();
      try {
        while (isPaused) unpaused.await();
      } catch (InterruptedException ie) {
        t.interrupt();
      } finally {
        pauseLock.unlock();
      }
    }
 
    public void pause() {
      pauseLock.lock();
      try {
        isPaused = true;
      } finally {
        pauseLock.unlock();
      }
    }
 
    public void resume() {
      pauseLock.lock();
      try {
        isPaused = false;
        unpaused.signalAll();
      } finally {
        pauseLock.unlock();
      }
    }
 }}

6.getTask方法(這里實(shí)現(xiàn)了一只獲取不到任務(wù)表鳍,超時(shí)就終結(jié)掉線程)

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 所以rs至少等于STOP,這時(shí)不再處理隊(duì)列中的任務(wù)
            // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立馅而,這時(shí)還需要處理隊(duì)列中的任務(wù)除非隊(duì)列為空
            // 這兩種情況都會(huì)返回null讓runWoker退出while循環(huán)也就是當(dāng)前線程結(jié)束了,所以必須要decrement
            // wokerCount
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減workerCount值
                decrementWorkerCount();
                return null;
            }

            // 標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間
            boolean timed; // Are workers subject to culling?

            // 1.RUNING狀態(tài)
            // 2.SHUTDOWN狀態(tài)譬圣,但隊(duì)列中還有任務(wù)需要執(zhí)行
            for (;;) {
                int wc = workerCountOf(c);

                // 1.core thread允許被超時(shí)瓮恭,那么超過corePoolSize的的線程必定有超時(shí)
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize時(shí),一般都是這種情況厘熟,core thread即使空閑也不會(huì)被回收屯蹦,只要超過的線程才會(huì)
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                // 從addWorker可以看到一般wc不會(huì)大于maximumPoolSize,所以更關(guān)心后面半句的情形:
                // 1. timedOut == false 第一次執(zhí)行循環(huán)绳姨, 從隊(duì)列中取出任務(wù)不為null方法返回 或者
                // poll出異常了重試
                // 2.timeOut == true && timed ==
                // false:看后面的代碼workerQueue.poll超時(shí)時(shí)timeOut才為true登澜,
                // 并且timed要為false,這兩個(gè)條件相悖不可能同時(shí)成立(既然有超時(shí)那么timed肯定為true)
                // 所以超時(shí)不會(huì)繼續(xù)執(zhí)行而是return null結(jié)束線程飘庄。(重點(diǎn):線程是如何超時(shí)的脑蠕??跪削?)
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;

                // workerCount遞減谴仙,結(jié)束當(dāng)前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 需要重新檢查線程池狀態(tài),因?yàn)樯鲜霾僮鬟^程中線程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
// ============== 以上這一坨都是生命周期判斷以及cas遞減workcount碾盐。
            try {
                // 1.以指定的超時(shí)時(shí)間從隊(duì)列中取任務(wù)
                // 2.core thread沒有超時(shí)
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超時(shí)
            } catch (InterruptedException retry) {
                timedOut = false;// 線程被中斷重試
            }
        }
    }

7.processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的話再runWorker的getTask方法workerCount已經(jīng)被減一了
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加線程的completedTasks
            completedTaskCount += w.completedTasks;
            // 從線程池中移除超時(shí)或者出現(xiàn)異常的線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 嘗試停止線程池
        tryTerminate();

        int c = ctl.get();
        // runState為RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 線程不是異常結(jié)束
            if (!completedAbruptly) {
                // 線程池最小空閑數(shù)晃跺,允許core thread超時(shí)就是0,否則就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是隊(duì)列不為空要保證有1個(gè)線程來執(zhí)行隊(duì)列中的任務(wù)
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 線程池還不為空那就不用擔(dān)心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.線程異常退出
            // 2.線程池為空毫玖,但是隊(duì)列中還有任務(wù)沒執(zhí)行掀虎,看addWoker方法對(duì)這種情況的處理
            addWorker(null, false);
        }
    }

8.tryTerminate

每個(gè)worker退出時(shí),都會(huì)嘗試終止線程池付枫。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下狀態(tài)直接返回:
            // 1.線程池還處于RUNNING狀態(tài)
            // 2.SHUTDOWN狀態(tài)但是任務(wù)隊(duì)列非空
            // 3.runState >= TIDYING 線程池已經(jīng)停止了或在停止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;

            // 只能是以下情形會(huì)繼續(xù)下面的邏輯:結(jié)束線程池烹玉。
            // 1.SHUTDOWN狀態(tài),這時(shí)不再接受新任務(wù)而且任務(wù)隊(duì)列也空了
            // 2.STOP狀態(tài)阐滩,當(dāng)調(diào)用了shutdownNow方法

            // workerCount不為0則還不能停止線程池,而且這時(shí)線程都處于空閑等待的狀態(tài)
            // 需要中斷讓線程“醒”過來春霍,醒過來的線程才能繼續(xù)處理shutdown的信號(hào)。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷叶眉。
                // ONLY_ONE:這里只需要中斷1個(gè)線程去處理shutdown信號(hào)就可以了址儒。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 進(jìn)入TIDYING狀態(tài)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子類重載:一些資源清理工作
                        terminated();
                    } finally {
                        // TERMINATED狀態(tài)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 繼續(xù)awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

9.線程池的終止

  • shutdown
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 線程池狀態(tài)設(shè)為SHUTDOWN,如果已經(jīng)至少是這個(gè)狀態(tài)那么則直接返回
            advanceRunState(SHUTDOWN);
            // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進(jìn)入processWorkerExit →
            // tryTerminate方法中會(huì)保證隊(duì)列中剩余的任務(wù)得到執(zhí)行衅疙。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能獲取到鎖莲趣,說明該線程沒有在運(yùn)行,因?yàn)閞unWorker中執(zhí)行任務(wù)會(huì)先lock饱溢,
            // 因此保證了中斷的肯定是空閑的線程喧伞。
           // Worker實(shí)現(xiàn)的是不可重入鎖,獲取鎖失敗就說明任務(wù)正在執(zhí)行中
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
  • shutdownnow
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP狀態(tài):不再接受新任務(wù)且不再執(zhí)行隊(duì)列中的任務(wù)绩郎。
        advanceRunState(STOP);
        // 中斷所有線程
        interruptWorkers();
        // 返回隊(duì)列中還沒有被執(zhí)行的任務(wù)潘鲫。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

void interruptIfStarted() {
    Thread t;
    // 初始化時(shí)state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市肋杖,隨后出現(xiàn)的幾起案子溉仑,更是在濱河造成了極大的恐慌,老刑警劉巖状植,帶你破解...
    沈念sama閱讀 223,002評(píng)論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浊竟,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡津畸,警方通過查閱死者的電腦和手機(jī)振定,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肉拓,“玉大人后频,你說我怎么就攤上這事∨荆” “怎么了卑惜?”我有些...
    開封第一講書人閱讀 169,787評(píng)論 0 365
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)丧肴。 經(jīng)常有香客問我残揉,道長(zhǎng),這世上最難降的妖魔是什么芋浮? 我笑而不...
    開封第一講書人閱讀 60,237評(píng)論 1 300
  • 正文 為了忘掉前任抱环,我火速辦了婚禮,結(jié)果婚禮上纸巷,老公的妹妹穿的比我還像新娘镇草。我一直安慰自己,他們只是感情好瘤旨,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,237評(píng)論 6 398
  • 文/花漫 我一把揭開白布梯啤。 她就那樣靜靜地躺著,像睡著了一般存哲。 火紅的嫁衣襯著肌膚如雪因宇。 梳的紋絲不亂的頭發(fā)上七婴,一...
    開封第一講書人閱讀 52,821評(píng)論 1 314
  • 那天,我揣著相機(jī)與錄音察滑,去河邊找鬼打厘。 笑死,一個(gè)胖子當(dāng)著我的面吹牛贺辰,可吹牛的內(nèi)容都是我干的户盯。 我是一名探鬼主播,決...
    沈念sama閱讀 41,236評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼饲化,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼莽鸭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起吃靠,我...
    開封第一講書人閱讀 40,196評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤硫眨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后撩笆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捺球,經(jīng)...
    沈念sama閱讀 46,716評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,794評(píng)論 3 343
  • 正文 我和宋清朗相戀三年夕冲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了氮兵。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,928評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡歹鱼,死狀恐怖泣栈,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情弥姻,我是刑警寧澤南片,帶...
    沈念sama閱讀 36,583評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站庭敦,受9級(jí)特大地震影響疼进,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜秧廉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,264評(píng)論 3 336
  • 文/蒙蒙 一伞广、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧疼电,春花似錦嚼锄、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春沧侥,著一層夾襖步出監(jiān)牢的瞬間可霎,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評(píng)論 1 274
  • 我被黑心中介騙來泰國打工正什, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留啥纸,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,378評(píng)論 3 379
  • 正文 我出身青樓婴氮,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親盾致。 傳聞我的和親對(duì)象是個(gè)殘疾皇子主经,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,937評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容