線程池核心ThreadPoolExecutor原理

1.線程池概覽

線程池主要用于線程資源的管理,防止頻繁的創(chuàng)建以及銷毀線程你踩,提升資源的利用率凰荚。JDK中的線程池實現(xiàn)昌渤,本質(zhì)上來說就是基于生產(chǎn)者-消費者模型來實現(xiàn)的,如圖所示:


線程池概覽.png

向線程池中提交待執(zhí)行任務(wù),首先進(jìn)入阻塞隊列中排隊等待,然后統(tǒng)一由消費者worker執(zhí)行(這里的說法不是太嚴(yán)謹(jǐn),如果worker沒有超過核心線程數(shù)的話贼穆,會被直接創(chuàng)建出來的worker執(zhí)行,具體后面會有分析)兰粉,本文基于JDK8講解ThreadPoolExecutor的原理故痊。

2.一些基本變量

  • workerCount:indicating the effective number of threads(線程池中的線程數(shù)量)
  • runState:indicating whether running, shutting down etc(線程池狀態(tài))
  • ctl: the combination of workerCount and runState(In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable.)(該變量為int類型,前3位保存線程池狀態(tài)玖姑,剩下29位保存線程數(shù)量愕秫,線程池中對該變量的更新都是使用CAS操作來保證線程安全)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

//計算ctl值
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池一共有五種狀態(tài)

  • RUNNING: Accept new tasks and process queued tasks(接收新的任務(wù)并且處理阻塞隊列中的任務(wù))
  • SHUTDOWN: Don't accept new tasks, but process queued tasks(不再接收新的任務(wù),但是處理阻塞隊列中的任務(wù))
  • STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks(新的任務(wù)以及阻塞隊列中的任務(wù)都不再處理焰络,中斷正在被處理的任務(wù))
  • TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING,will run the terminated() hook method(所有的任務(wù)都被終止戴甩,線程數(shù)量為0,線程池狀態(tài)改變闪彼,執(zhí)行terminated()鉤子函數(shù)甜孤,terminated函數(shù)是線程提供出去的擴(kuò)展點,使用者可以重寫該函數(shù)畏腕,在該階段執(zhí)行自己的邏輯)
  • TERMINATED: terminated() has completed(terminated()執(zhí)行完畢)
    整個狀態(tài)的流轉(zhuǎn)注釋里面也寫得很清楚
    * RUNNING -> SHUTDOWN
    *    On invocation of shutdown(), perhaps implicitly in finalize()
    * (RUNNING or SHUTDOWN) -> STOP
    *    On invocation of shutdownNow()
    * SHUTDOWN -> TIDYING
    *    When both queue and pool are empty
    * STOP -> TIDYING
    *    When pool is empty
    * TIDYING -> TERMINATED
    *    When the terminated() hook method has completed

流程圖如下:

線程池狀態(tài)流轉(zhuǎn).png

shutdown方法執(zhí)行時比較優(yōu)雅缴川,會允許線程池把阻塞隊列中的任務(wù)執(zhí)行完成再進(jìn)行下一步,shutdownNow方法就比較暴力描馅,直接會丟棄掉阻塞隊列中未執(zhí)行的任務(wù)

3.創(chuàng)建線程池任務(wù)流程

整體流程如下:


線程池提交任務(wù)流程概覽.png

線程池處理任務(wù)的主要流程分為4步驟把夸,如上圖中step所示,主要邏輯為

  • 提交任務(wù)铭污,判斷線程數(shù)決定是否創(chuàng)建worker對象
  • 創(chuàng)建worker對象(worker對象內(nèi)部保存線程池中的線程)
  • 啟動worker對象中的線程執(zhí)行
  • 直接執(zhí)行提交任務(wù)或者從阻塞隊列中獲取待執(zhí)行的任務(wù)并執(zhí)行

上述步驟中每次在創(chuàng)建線程對象時恋日,都會判斷當(dāng)前線程池中的線程數(shù)是否符合設(shè)置并做相應(yīng)的調(diào)整,具體細(xì)節(jié)會在下文討論嘹狞;針對于獲取任務(wù)的途徑岂膳,依據(jù)于當(dāng)前線程數(shù)量與核心線程數(shù)量的值來決定是直接讓創(chuàng)建出的worker對象來執(zhí)行或是先放入阻塞隊列中,讓運行中的worker從隊列中獲取任務(wù)來執(zhí)行磅网,也會在后文中詳細(xì)討論

3.1 Step1 提交任務(wù)

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       //獲取表示線程數(shù)量以及線程池狀態(tài)的變量
        int c = ctl.get();
      //線程數(shù)量小于核心線程數(shù)谈截,直接將提交的任務(wù)傳入addWorker,
     //創(chuàng)建出worker對象后直接執(zhí)行該任務(wù)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
     //線程數(shù)量大于核心線程數(shù),所有提交任務(wù)直接放入到阻塞隊列中傻盟,
    //由運行中的worker從隊首獲取任務(wù)來執(zhí)行
        if (isRunning(c) && workQueue.offer(command)) {
           //double check,再次獲取線程池狀態(tài)以及線程數(shù)嫂丙,
          //以防中途線程池狀態(tài)改變或者線程數(shù)量減少為0
            int recheck = ctl.get();
           //線程池非running狀態(tài)娘赴,則將該任務(wù)從阻塞隊列中移除并且調(diào)用tryTerminate()方法
            if (! isRunning(recheck) && remove(command))
                reject(command);
           //線程數(shù)為0時,創(chuàng)建新的worker對象從阻塞隊列中獲取任務(wù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //1.阻塞隊列已滿跟啤,丟棄任務(wù)诽表,調(diào)用reject()方法
       //2.線程池為非running狀態(tài),丟棄任務(wù)隅肥,調(diào)用reject()方法
        else if (!addWorker(command, false))
            reject(command);
    }

3.2 Step2 創(chuàng)建worker對象

addWorker主要負(fù)責(zé)創(chuàng)建worker對象竿奏,每一個worker對象中有擁有一個線程,創(chuàng)建worker的本質(zhì)就是創(chuàng)建線程池中的執(zhí)行任務(wù)的線程腥放,同時worker繼承AbstractQueuedSynchronizer泛啸,自身攜帶鎖的機(jī)制,可以控制并發(fā)秃症,比如在后面提到的runWorker調(diào)用中候址,執(zhí)行任務(wù)前都會先獲取自身的鎖,在調(diào)用shutdown方法后种柑,中斷線程的調(diào)用中岗仑,也會先獲取鎖。addWorker 整體流程如下:

addWorker.png

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
           //獲取當(dāng)前線程池狀態(tài)
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                //SHUTDOWN狀態(tài)時聚请,允許線程池把阻塞隊列中的任務(wù)執(zhí)行完荠雕,
                //如果workQueue還有任務(wù),是允許增加線程數(shù)量的驶赏,
                //firstTask == null代表不是外部新提交的任務(wù)
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //獲取線程數(shù)量
                int wc = workerCountOf(c);
                //是否大于能存儲線程數(shù)量的最大值
                if (wc >= CAPACITY ||
                   //根據(jù)core的值選擇是對核心線程數(shù)還是最大線程數(shù)比較
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //通過CAS操作增加線程池中記錄的線程數(shù)量
                if (compareAndIncrementWorkerCount(c))
                   //更新線程池線程數(shù)成功炸卑,繼續(xù)向下執(zhí)行
                    break retry;
               //更新失敗說明線程池數(shù)量發(fā)生改變,重新獲取線程池線程數(shù)量以及狀態(tài)
                c = ctl.get();  // Re-read ctl
               //如果狀態(tài)發(fā)生改變母市,則回到retry位置矾兜,重新執(zhí)行,判斷狀態(tài)等操作
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
               //(狀態(tài)沒有發(fā)生改變患久,繼續(xù)重新計算線程數(shù)椅寺,通過CAS操作更新線程數(shù))
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          /**創(chuàng)建worker對象,worker對象在創(chuàng)建的時候會調(diào)用threadFactory創(chuàng)建一個新的線程蒋失,
         新的線程會把worker對象傳入構(gòu)造函數(shù)
         小于核心線程數(shù)的時候返帕,firstTask不為null,提交的任務(wù)會被該線程直接執(zhí)行
         **/
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        //線程池為shutdown狀態(tài)并且沒有向線程池中提交任務(wù)的情況
                        (rs == SHUTDOWN && firstTask == null)) {
                        //檢查線程是否被提前啟動
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       //把worker添加到workers集合中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                   //啟動該worker中的線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
           //如果線程沒有正常啟動
            if (! workerStarted)
                //1.如果workers集合中存在worker篙挽,則移除 2.減少線程數(shù) 3.調(diào)用tryTerminate()方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.3 Step3 調(diào)用runWorker方法

runWorker方法是由每個worker對象中的線程來執(zhí)行荆萤,具體執(zhí)行的任務(wù)放在worker對象中的firstTask中,如果firstTask為null,則會調(diào)用getTask方法從阻塞隊列中獲取任務(wù)链韭,如下圖流程所示:

worker執(zhí)行任務(wù)的模型圖.png

簡單來說偏竟,runWorker就是線程執(zhí)行任務(wù)的地方,但是實際runWorker的設(shè)計是有許多地方的考量敞峭,包括:

  • 線程的回收:線程池狀態(tài)或者參數(shù)的變化時踊谋,已經(jīng)啟動的線程如何被回收,針對于線程池狀態(tài)的改變旋讹,該方法中會獲取一次線程池的狀態(tài)殖蚕,如果是STOPTIDYING沉迹,TERMINATED狀態(tài)睦疫,就會調(diào)用interrupt方法釋放中斷信號;線程池參數(shù)的變化是依賴getTask來實現(xiàn)的鞭呕,只要getTask返回為null蛤育,線程就會跳出while循環(huán),執(zhí)行結(jié)束葫松。例如:默認(rèn)allowCoreThreadTimeOut為false缨伊,阻塞隊列為空,線程數(shù)量大于設(shè)置的核心線程數(shù)之后进宝,在特定時間內(nèi)被回收的邏輯刻坊,或者線程池在運行中,動態(tài)修改核心線程數(shù)之后党晋,線程的是否會回收的邏輯均在getTask方法中谭胚,具體會在后文詳細(xì)分析getTask方法。
  • 擴(kuò)展性:線程執(zhí)行任務(wù)的上下文中穿插hook函數(shù)用以實現(xiàn)擴(kuò)展邏輯未玻,包括beforeExecute方法用以實現(xiàn)執(zhí)行任務(wù)前的邏輯以及afterExecute用以實現(xiàn)執(zhí)行任務(wù)后的邏輯灾而,使用者可以針對這兩個函數(shù)自行擴(kuò)展
  • 異常情況的處理:異常發(fā)生的地方主要有,beforeExecute方法產(chǎn)生的異常扳剿,線程執(zhí)行任務(wù)中產(chǎn)生的異常(最常見)旁趟,afterExecute方法產(chǎn)生的異常,當(dāng)產(chǎn)生異常的時庇绽,completedAbruptly會一直是true锡搜,同時異常會被拋出到開發(fā)者的代碼層面,在后續(xù)執(zhí)行processWorkerExit的方法中調(diào)用addWorker方法瞧掺,產(chǎn)生新的線程來彌補(bǔ)因為異常終止的線程耕餐。

runWorker詳細(xì)流程如下:

runWorker流程.png

源碼參照:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //第一次被創(chuàng)建時如果自帶任務(wù)則執(zhí)行該任務(wù),除此之外則從阻塞隊列中獲取
            while (task != null || (task = getTask()) != null) {
               //獲取自身的鎖辟狈,線程池調(diào)用shutdown方法肠缔,終止線程時也會搶奪該鎖
                w.lock();
                //再次判斷線程狀態(tài)夏跷,如果大于STOP狀態(tài),則釋放中斷信號
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //執(zhí)行提交任務(wù)前的hook函數(shù)明未,留給用戶自己實現(xiàn)
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    //真正的執(zhí)行提交給線程池的任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                   //執(zhí)行提交任務(wù)后的hook函數(shù)槽华,留給用戶自己實現(xiàn)
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //發(fā)生異常以及獲取task=null時的兜底方法
            processWorkerExit(w, completedAbruptly);
        }
    }

processWorkerExit作為runWorker方法中的重要一環(huán),主要承擔(dān)(cleanup)清理(bookkeeping)簿記dying worker的作用趟妥,dying worker其實就是指因為task==null或者執(zhí)行過程中因為異常走到finally代碼塊中的線程硼莽。執(zhí)行processWorkerExit時,線程池的參數(shù)設(shè)定以及狀態(tài)相對于runWorker方法中的代碼塊煮纵,也有可能是已經(jīng)發(fā)生變化了的,processWorkerExit也會有相應(yīng)的判斷偏螺,線程池的設(shè)計目的本來就是動態(tài)維護(hù)一定的線程數(shù)量行疏,因此但凡涉及到添加線程的操作,都會進(jìn)行狀態(tài)以及參數(shù)的判斷套像。
processWorkerExit參照的源碼如下:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
       //如果是發(fā)生異常時進(jìn)入到該方法中酿联,先將線程計數(shù)減1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
          //從workers集合中移除該worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
   
        tryTerminate();

        int c = ctl.get();
       //再次對線程池狀態(tài)進(jìn)行判斷
        if (runStateLessThan(c, STOP)) {
           //task==null時進(jìn)入該方法,如果設(shè)置核心線程始終存活夺巩,當(dāng)線程池數(shù)大于設(shè)置的核心 
          //線程數(shù)時贞让,則不需要補(bǔ)充線程;如果設(shè)置核心線程有存活時間并且當(dāng)前阻塞隊列中有
          //任務(wù)柳譬,當(dāng)線程數(shù)大于1時則不需要補(bǔ)充線程喳张,同樣當(dāng)阻塞隊列中沒有任務(wù)時,
          //也不需要補(bǔ)充
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //異常退出時美澳,直接調(diào)用addWorker向線程池補(bǔ)充新的線程销部,無論此時線程池配置
           //是否發(fā)生變化,都會在addWorker校驗
            addWorker(null, false);
        }
    }

3.4 Step4 調(diào)用getTask方法

getTask方法主要用于根據(jù)當(dāng)前線程池配置(線程池中的配置是能在啟動后動態(tài)設(shè)置的)獲取任務(wù)制跟,該過程主要包括獲取任務(wù)的過程中被阻塞以及在特定時間內(nèi)獲取任務(wù)舅桩,如果無法及時獲取到任務(wù)則會直接返回null,getTask返回null后雨膨,在上層的runWorker中就會跳出while循環(huán)擂涛,繼續(xù)往下執(zhí)行直到被回收。當(dāng)無法獲取任務(wù)即return null主要包括以下情況:

  • 最大線程數(shù)減少聊记,線程在運行過程中調(diào)用setMaximumPoolSize可以重新設(shè)置核心線程數(shù)
  • 線程池處于STOP狀態(tài)
  • 線程池處于SHUTDOWN并且阻塞隊列中任務(wù)為0
  • 從阻塞隊列中獲取任務(wù)超時并且超時的worker需要被終止(當(dāng)設(shè)置allowCoreThreadTimeOut為true允許核心線程數(shù)擁有存活時間限制或者當(dāng)前線程池中的線程數(shù)大于核心線程數(shù))

主要流程圖如下:


getTask.png

參照源碼:

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
           //檢查線程池狀態(tài)撒妈,線程池處于STOP狀態(tài)或者線程池處于SHUTDOWN并且阻塞隊列中 
           //任務(wù)為0時減少線程數(shù)量并return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
           //查看是否設(shè)置了核心線程數(shù)超時或者當(dāng)前線程線程數(shù)大于核心線程數(shù)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           //1.如果在上一次循環(huán)中獲取任務(wù)超時,只要當(dāng)前線程池中線程數(shù)量大于1排监,
           //就會回收worker并且return nll(該邏輯就是當(dāng)線程池中線程數(shù)量大于核心線程數(shù)小于最 
           //大線程數(shù)時踩身,空閑狀態(tài)下的線程會被回收的邏輯,同理當(dāng)設(shè)置核心線程數(shù)有存活時間 
           //時社露,核心線程也擁有了被回收掉的邏輯)
           //2.如果當(dāng)前線程數(shù)大于最大線程數(shù)挟阻,也會被回收掉
            if ((wc > maximumPoolSize || (timed && timedOut))
                //兜底策略,不論線程池配置如何設(shè)置,只要阻塞隊列中有任務(wù)附鸽,線程池中至少都要保 
                //留一個線程
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                   //設(shè)定獲取任務(wù)的時間脱拼,超時則在下一次循環(huán)中return null
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   //無timed邏輯則在此處直接阻塞,直到能獲取到任務(wù)
                    workQueue.take();
               //成功獲取任務(wù)則直接返回
                if (r != null)
                    return r;
               //獲取任務(wù)超時后設(shè)置超時標(biāo)志
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

3.5 拒絕策略

當(dāng)線程池中的阻塞隊列任務(wù)已滿坷备,繼續(xù)提交任務(wù)便會調(diào)用reject方法熄浓,觸發(fā)拒絕策略,四種RejectedExecutionHandler接口的實現(xiàn)如下:

  • AbortPolicy:直接拋出異常
    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
  • CallerRunsPolicy:當(dāng)線程池處于運行狀態(tài)中時省撑,直接在調(diào)用方線程中執(zhí)行提交給線程池的任務(wù)
    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
  • DiscardOldestPolicy:當(dāng)線程池處于運行狀態(tài)中時赌蔑,丟棄阻塞隊列中隊首的任務(wù),然后再嘗試向線程池中提交任務(wù)
    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
  • DiscardPolicy:針對丟棄的任務(wù)不做任何的處理
    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

4.線程池的關(guān)閉

處于運行中狀態(tài)的線程池調(diào)用shutdown或許shutdownNow方法就能終止線程竟秫,這兩者方法的主要區(qū)別在于shutdown會將線程池的狀態(tài)置為SHUTDOWN娃惯,該方法會允許阻塞隊列中的
任務(wù)被執(zhí)行完后才會流轉(zhuǎn)到TIDYING狀態(tài),而shutdownNow方法會將線程池的狀態(tài)置為STOP肥败,該方法會直接丟棄阻塞隊列中任務(wù)趾浅。
shutdown方法

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
           //檢查調(diào)用方是否有權(quán)限操作
            checkShutdownAccess();
           //更新線程池狀態(tài)
            advanceRunState(SHUTDOWN);
           //中斷空閑狀態(tài)的線程,調(diào)用worker的tryLock方法馒稍,如果能獲取到說明該線程處于
           //空閑狀態(tài)皿哨,沒有在執(zhí)行任務(wù)
            interruptIdleWorkers();
           //為ScheduledThreadPoolExecutor提供的鉤子函數(shù)
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
       //調(diào)用最后的terminated方法,線程池狀態(tài)變?yōu)門ERMINATED
        tryTerminate();
    }

shutdownNow方法

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
           //設(shè)置線程池狀態(tài)為STOP
            advanceRunState(STOP);
           //遍歷workers集合纽谒,中斷所有線程
            interruptWorkers();
           //返回阻塞隊列中沒有執(zhí)行完的任務(wù)
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //調(diào)用最后的terminated方法证膨,線程池狀態(tài)變?yōu)門ERMINATED
        tryTerminate();
        return tasks;
    }

5.結(jié)尾

線程池的本質(zhì)就是動態(tài)維護(hù)一定數(shù)量的線程來執(zhí)行任務(wù),采用生產(chǎn)者-消費者模型鼓黔。但是整體分析下來發(fā)現(xiàn)實現(xiàn)其實并不簡單椎例,主要原因有:

  • 線程池狀態(tài)是變化的
  • 線程數(shù)量是變化的
  • 線程池的配置參數(shù)也可能在運行中被外部調(diào)用方法改變,比如最大線程數(shù)请祖,核心線程數(shù)等等

難點在于變化订歪,因此不論是增加線程或者是中斷線程設(shè)置是獲取任務(wù),都離不開狀態(tài)與線程數(shù)的判斷肆捕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末刷晋,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子慎陵,更是在濱河造成了極大的恐慌眼虱,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件席纽,死亡現(xiàn)場離奇詭異捏悬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)润梯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門过牙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甥厦,“玉大人,你說我怎么就攤上這事寇钉〉陡恚” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵扫倡,是天一觀的道長谦秧。 經(jīng)常有香客問我,道長撵溃,這世上最難降的妖魔是什么疚鲤? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮缘挑,結(jié)果婚禮上集歇,老公的妹妹穿的比我還像新娘。我一直安慰自己卖哎,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布删性。 她就那樣靜靜地躺著亏娜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹬挺。 梳的紋絲不亂的頭發(fā)上维贺,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機(jī)與錄音巴帮,去河邊找鬼溯泣。 笑死,一個胖子當(dāng)著我的面吹牛榕茧,可吹牛的內(nèi)容都是我干的垃沦。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼用押,長吁一口氣:“原來是場噩夢啊……” “哼肢簿!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起蜻拨,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤池充,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后缎讼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體收夸,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年血崭,在試婚紗的時候發(fā)現(xiàn)自己被綠了卧惜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片厘灼。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖序苏,靈堂內(nèi)的尸體忽然破棺而出手幢,到底是詐尸還是另有隱情,我是刑警寧澤忱详,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布围来,位于F島的核電站,受9級特大地震影響匈睁,放射性物質(zhì)發(fā)生泄漏监透。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一航唆、第九天 我趴在偏房一處隱蔽的房頂上張望胀蛮。 院中可真熱鬧,春花似錦糯钙、人聲如沸粪狼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽再榄。三九已至,卻和暖如春享潜,著一層夾襖步出監(jiān)牢的瞬間困鸥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工剑按, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留疾就,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓艺蝴,卻偏偏與公主長得像猬腰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子猜敢,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345