Java并發(fā)JUC——ThreadPoolExecutor 深入解析

線程池的作用

● 利用線程池管理并復(fù)用線程、控制最大并發(fā)數(shù)等
既然使用了線程池就需要確保線程池是在復(fù)用的复哆,每次new一個(gè)線程池出來可能比不用線程池還糟糕。如果沒有直接聲明線程池而是使用其他人提供的類庫來獲得一個(gè)線程池吱涉,請(qǐng)務(wù)必查看源碼幅聘,以確認(rèn)線程池的實(shí)例化方式和配置是符合預(yù)期的。

● 實(shí)現(xiàn)任務(wù)線程隊(duì)列緩存策略和拒絕機(jī)制余寥。

● 實(shí)現(xiàn)某些與時(shí)間相關(guān)的功能领铐,如定時(shí)執(zhí)行悯森、周期執(zhí)行等

● 隔離線程環(huán)境
比如,交易服務(wù)和搜索服務(wù)在同一臺(tái)服務(wù)器上绪撵,分別開啟兩個(gè)線程池瓢姻,交易線程的資源消耗明顯要大;因此音诈,通過配置獨(dú)立的線程池幻碱,將較慢的交易服務(wù)與搜索服務(wù)隔離開,避免各服務(wù)線程相互影響改艇。

Java中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架收班,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序 都可以使用線程池。合理地使用線程池能夠帶來3個(gè)好處:

  • 1谒兄、降低資源消耗摔桦。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。

  • 2承疲、提高響應(yīng)速度邻耕。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行燕鸽。

  • 3兄世、提高線程的可管理性。線程是稀缺資源啊研,如果無限制地創(chuàng)建御滩,不僅會(huì)消耗系統(tǒng)資源, 還會(huì)降低系統(tǒng)的穩(wěn)定性党远,使用線程池可以進(jìn)行統(tǒng)一分配削解、調(diào)優(yōu)和監(jiān)控。

線程池的主要處理流程

接口定義和實(shí)現(xiàn)類

類型 名稱 描述
接口 Executor 最上層的接口沟娱,定義了執(zhí)行任務(wù)的方法execute
接口 ExecutorService 繼承了Executor接口氛驮,拓展了Callable、Future济似、關(guān)閉方法
接口 ScheduledExecutorService 繼承了ExecutorService矫废,增加了定時(shí)任務(wù)相關(guān)方法
實(shí)現(xiàn)類 ThreadPoolExecutor 基礎(chǔ)、標(biāo)準(zhǔn)的線程池實(shí)現(xiàn)
實(shí)現(xiàn)類 ScheduledThreadPoolExecutor 繼承了ThreadPoolExecutor砰蠢,實(shí)現(xiàn)了ScheduledExecutorService中相關(guān)定時(shí)任務(wù)的方法

ThreadPoolExecutor 類圖

java中的線程池都是基于ThreadPoolExecutor 來實(shí)現(xiàn)的蓖扑。

可以認(rèn)為ScheduledThreadPoolExecutor是最豐富的實(shí)現(xiàn)類。

ExecutorService 方法定義

public interface ExecutorService extends Executor {

    /**
     * 在之前提交的台舱,需要被執(zhí)行的任務(wù)中赵誓,有序的進(jìn)行關(guān)閉操作,此時(shí)不會(huì)再接受新的任務(wù),已經(jīng)提交的任務(wù)繼續(xù)執(zhí)行
     * 如果此時(shí)所有的任務(wù)已經(jīng)關(guān)閉的話俩功,那么就不會(huì)起到什么效果幻枉,因?yàn)橐呀?jīng)沒有任務(wù)可關(guān)閉了
     */
    void shutdown();

    /**
     * 嘗試關(guān)閉所有正在執(zhí)行的任務(wù),并且中斷正在等待要執(zhí)行的任務(wù)诡蜓,返回一個(gè)包含正在等待的任務(wù)的列表
     * @return
     */
    List<Runnable> shutdownNow();

    /**
     * 如果線程已經(jīng)關(guān)閉了熬甫,就返回true
     * @return
     */
    boolean isShutdown();

    /**
     * 如果所有的線程任務(wù)已經(jīng)關(guān)閉了,就返回true
     * @return
     */
    boolean isTerminated();

    /**
     * 只有當(dāng)所有的任務(wù)都成功執(zhí)行蔓罚,否則會(huì)一直處于阻塞狀態(tài)椿肩,只有當(dāng)以下情況發(fā)生時(shí),才會(huì)中斷阻塞
     * 例如收到一個(gè)關(guān)閉的請(qǐng)求豺谈,或者超時(shí)發(fā)生郑象、或者當(dāng)前的線程被中斷后
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一個(gè)需要返回結(jié)果的任務(wù)去執(zhí)行,返回一個(gè)有結(jié)果的消息體茬末,只有成功執(zhí)行后厂榛,才會(huì)返回結(jié)果
     * @param task
     * @param <T>
     * @return
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 只有當(dāng)任務(wù)成功被執(zhí)行后,才會(huì)返回給定的結(jié)果
     * @param task
     * @param result
     * @param <T>
     * @return
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一個(gè)Runnable任務(wù)用于執(zhí)行丽惭,和返回代表任務(wù)的Future击奶。
     * Future的get方法成功執(zhí)行后,返回null
     */
    Future<?> submit(Runnable task);

    /**
     * 提交一批任務(wù)责掏,并返回一批任務(wù)的結(jié)果列表
     * @param tasks
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * 執(zhí)行給定的任務(wù)集合柜砾,執(zhí)行完畢或者超時(shí)后,返回結(jié)果换衬,其他任務(wù)終止
     *
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一批任務(wù)信息痰驱,當(dāng)其中一個(gè)成功的執(zhí)行,沒有返回異常的時(shí)候瞳浦,就返回結(jié)果
     * @param tasks
     * @param <T>
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 執(zhí)行給定的任務(wù)集合担映,任意一個(gè)執(zhí)行成功或超時(shí)后,返回結(jié)果术幔,其他任務(wù)終止
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {


    //創(chuàng)建并執(zhí)行一個(gè)一次性任務(wù), 過了延遲時(shí)間就會(huì)被執(zhí)行
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //創(chuàng)建并執(zhí)行一個(gè)一次性任務(wù), 過了延遲時(shí)間就會(huì)被執(zhí)行
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    //創(chuàng)建并執(zhí)行一個(gè)周期性任務(wù)
    //過了給定的初始延遲時(shí)間,會(huì)第一次被執(zhí)行
    //執(zhí)行過程中發(fā)生了異常,那么任務(wù)就停止
    //一次任務(wù) 執(zhí)行時(shí)長超過了周期時(shí)間湃密,下一次任務(wù)會(huì)等到該次任務(wù)執(zhí)行結(jié)束后诅挑,立刻執(zhí)行,
    //這也是它和scheduleWithFixedDelay的重要區(qū)別
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    //創(chuàng)建并執(zhí)行一個(gè)周期性任務(wù)
    //過了初始延遲時(shí)間泛源,第一次被執(zhí)行拔妥,后續(xù)以給定的周期時(shí)間執(zhí)行
    //執(zhí)行過程中發(fā)生了異常,那么任務(wù)就停止
    //一次任務(wù)執(zhí)行時(shí)長超過了周期時(shí)間,下一次任務(wù)會(huì)在該次任務(wù)執(zhí)行結(jié)束的時(shí)間基礎(chǔ)上达箍,計(jì)算執(zhí)行延時(shí)没龙。
    //對(duì)于超過周期的長時(shí)間處理任務(wù)的不同處理方式,這是它和scheduleAtFixedRate的重要區(qū)別。
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

Executors工具類

可以自己實(shí)例化線程池硬纤,也可用Executors創(chuàng)建線程池的工廠類解滓,推薦自己實(shí)例化線程池。

常用方法

ExecutorService 的抽象類AbstractExecutorService提供了submit筝家、invokeAll 等方法的實(shí)現(xiàn)洼裤,但是核心方法Executor.execute()并沒有在這里實(shí)現(xiàn)。
因?yàn)樗械娜蝿?wù)都在該方法執(zhí)行溪王,不同實(shí)現(xiàn)會(huì)帶來不同的執(zhí)行策略腮鞍。

通過Executors的靜態(tài)工廠方法可以創(chuàng)建三個(gè)線程池的包裝對(duì)象

  • ForkJoinPool
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

Executors.newWorkStealingPool
JDK8 引入,創(chuàng)建持有足夠線程的線程池支持給定的并行度莹菱,并通過使用多個(gè)隊(duì)列減少競(jìng)爭(zhēng)移国,構(gòu)造方法中把CPU數(shù)量設(shè)置為默認(rèn)的并行度。
返回ForkJoinPool ( JDK7引入)對(duì)象道伟,它也是AbstractExecutorService 的子類

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

Executors.newCachedThreadPool
創(chuàng)建一個(gè)無界的緩沖線程池迹缀,它的任務(wù)隊(duì)列是一個(gè)同步隊(duì)列。
任務(wù)加入到池中

  • 若池中有空閑線程皱卓,則用空閑線程執(zhí)行
  • 若無裹芝,則創(chuàng)建新線程執(zhí)行
  • 池中的線程空閑超過60秒,將被銷毀娜汁。線程數(shù)隨任務(wù)的多少變化嫂易。適用于執(zhí)行耗時(shí)較小的異步任務(wù)。
  • 線程池的核心線程數(shù)=0掐禁,最大線程數(shù)= Integer.MAX_ _VALUE
  • maximumPoolSize 最大可至Integer.MAX_VALUE怜械,是高度可伸縮的線程池。若達(dá)到該上限傅事,沒有服務(wù)器能夠繼續(xù)工作缕允,直接OOM。
  • keepAliveTime默認(rèn)為60秒;
  • 工作線程處于空閑狀態(tài)蹭越,則回收工作線程障本;如果任務(wù)數(shù)增加,再次創(chuàng)建出新線程處理任務(wù)响鹃。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Executors.newScheduledThreadPool
能定時(shí)執(zhí)行任務(wù)的線程池驾霜。該池的核心線程數(shù)由參數(shù)指定,線程數(shù)最大至Integer.MAX_ VALUE买置,與上述一樣存在OOM風(fēng)險(xiǎn)粪糙。
ScheduledExecutorService接口的實(shí)現(xiàn)類,支持定時(shí)及周期性任務(wù)執(zhí)行忿项;相比Timer蓉冈、ScheduledExecutorService 更安全城舞,功能更強(qiáng)大。
與newCachedThreadPool的區(qū)別是不回收工作線程寞酿。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

Executors.newSingleThreadExecutor
創(chuàng)建一個(gè)單線程的線程池家夺,相當(dāng)于單線程串行執(zhí)行所有任務(wù),保證按任務(wù)的提交順序依次執(zhí)行熟嫩。
只有1個(gè)線程來執(zhí)行無界任務(wù)隊(duì)列的單-線程池秦踪。該線程池確保任務(wù)按加入的順序一個(gè)一
個(gè)依次執(zhí)行。當(dāng)唯一的線程因任務(wù)異常中止時(shí)掸茅,將創(chuàng)建一個(gè)新的線程來繼續(xù)執(zhí)行后續(xù)的任務(wù)椅邓。
與newFixedThreadPool(1)的區(qū)別在于,單線程池的池大小在newSingleThreadExecutor方法中硬編碼昧狮,不能再改變的景馁。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

Executors.newFixedThreadPool
創(chuàng)建一個(gè)固定大小任務(wù)隊(duì)列容量無界的線程池,輸入的參數(shù)即是固定線程數(shù)逗鸣;既是核心線程數(shù)也是最大線程數(shù)合住;不存在空閑線程,所以keepAliveTime等于0撒璧。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor 核心屬性

// 狀態(tài)控制屬性:高3位表示線程池的運(yùn)行狀態(tài)透葛,剩下的29位表示當(dāng)前有效的線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 線程池的基本大小,當(dāng)提交一個(gè)任務(wù)到線程池時(shí)卿樱,線程池會(huì)創(chuàng)建一個(gè)線程來執(zhí)行任務(wù)僚害,
// 即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會(huì)創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于
// 線程池基本大小時(shí)就不再創(chuàng)建繁调。如果調(diào)用了線程池的prestartAllCoreThreads()方法萨蚕,
// 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。
private volatile int corePoolSize;

// 線程池線程最大數(shù)量蹄胰,如果隊(duì)列滿了岳遥,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),
// 則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)裕寨。如果使用了無界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒什么效果浩蓉。
private volatile int maximumPoolSize;

// 用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個(gè)創(chuàng)建出來的線程設(shè) 置更有意義的名字宾袜。
private volatile ThreadFactory threadFactory;

// 飽和策略捻艳,默認(rèn)情況下是AbortPolicy。
private volatile RejectedExecutionHandler handler;

// 線程池的工作線程空閑后试和,保持存活的時(shí)間讯泣。如果任務(wù)很多纫普,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短阅悍,
// 可以調(diào)大時(shí)間好渠,提高線程的利用率。
private volatile long keepAliveTime;

// 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列
private final BlockingQueue<Runnable> workQueue;

// 存放工作線程的容器节视,必須獲取到鎖才能訪問
private final HashSet<Worker> workers = new HashSet<Worker>();

// ctl的拆包和包裝
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl狀態(tài)控制屬性拳锚,高3位表示線程池的運(yùn)行狀態(tài)(runState),剩下的29位表示當(dāng)前有效的線程數(shù)量(workerCount)
線程池最大線程數(shù)是(1 << COUNT_BITS) - 1 = 536 870 911

@Native public static final int SIZE = 32;
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

線程池的運(yùn)行狀態(tài)runState

狀態(tài) 解釋
RUNNING 運(yùn)行態(tài)寻行,可處理新任務(wù)并執(zhí)行隊(duì)列中的任務(wù)
SHUTDOW 關(guān)閉態(tài)霍掺,不接受新任務(wù),但處理隊(duì)列中的任務(wù)
STOP 停止態(tài)拌蜘,不接受新任務(wù)杆烁,不處理隊(duì)列中任務(wù),且打斷運(yùn)行中任務(wù)
TIDYING 整理態(tài)简卧,所有任務(wù)已經(jīng)結(jié)束兔魂,workerCount = 0 ,將執(zhí)行terminated()方法
TERMINATED 結(jié)束態(tài)举娩,terminated() 方法已完成

RejectedExecutionHandler(拒絕策略)

  • AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常析校。
  • CallerRunsPolicy:由調(diào)用者所在線程來運(yùn)行任務(wù)。
  • DiscardOldestPolicy:丟棄隊(duì)列最前面(最老)的任務(wù)铜涉,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)智玻。
  • DiscardPolicy:也是丟棄任務(wù),但是不拋出異常芙代。

核心內(nèi)部類 Worker

private final class Worker  extends AbstractQueuedSynchronizer  implements Runnable {
    // 正在執(zhí)行任務(wù)的線程
    final Thread thread;
    // 線程創(chuàng)建時(shí)初始化的任務(wù)
    Runnable firstTask;
    // 完成任務(wù)計(jì)數(shù)器
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 在runWorker方法運(yùn)行之前禁止中斷吊奢,要中斷線程必須先獲取worker內(nèi)部的互斥鎖
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** delegates main run loop to outer runworker  */
    // 直接委托給外部runworker方法
    public void run() {
        runWorker(this);
    }
    
    ...
}

Worker 類將執(zhí)行任務(wù)的線程封裝到了內(nèi)部,在初始化Worker 的時(shí)候链蕊,會(huì)調(diào)用ThreadFactory初始化新線程事甜;Worker 繼承了AbstractQueuedSynchronizer,在內(nèi)部實(shí)現(xiàn)了一個(gè)互斥鎖滔韵,主要目的是控制工作線程的中斷狀態(tài)逻谦。

線程的中斷一般是由其他線程發(fā)起的,比如ThreadPoolExecutor#interruptIdleWorkers(boolean)方法陪蜻,它在調(diào)用過程中會(huì)去中斷worker內(nèi)部的工作線程邦马,Work的互斥鎖可以保證正在執(zhí)行的任務(wù)不被打斷。它是怎么保證的呢宴卖?在線程真正執(zhí)行任務(wù)的時(shí)候滋将,也就是runWorker方法被調(diào)用時(shí),它會(huì)先獲取到Work的鎖症昏,當(dāng)我們?cè)谄渌€程需要中斷當(dāng)前線程時(shí)也需要獲取到work的互斥鎖随闽,否則不能中斷。

構(gòu)造函數(shù)

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

通過構(gòu)造函數(shù)我們可以發(fā)現(xiàn)肝谭,構(gòu)造函數(shù)就是在對(duì)線程池核心屬性進(jìn)行賦值掘宪,下面我們來介紹一下這些核心屬性:

  • corePoolSize:核心線程數(shù)
  • maximumPoolSize:線程池最大數(shù)量
  • keepAliveTime:線程池的工作線程空閑后蛾扇,保持存活的時(shí)間。
  • unit:線程活動(dòng)保持時(shí)間的單位魏滚。
  • workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列镀首。
  • threadFactory:用于設(shè)置創(chuàng)建線程的工廠。
  • handler:飽和策略鼠次,默認(rèn)情況下是AbortPolicy更哄。

execute() 提交線程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 獲取控制的值
    int c = ctl.get();
    // 判斷工作線程數(shù)是否小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 新創(chuàng)建核心線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 工作線程數(shù)大于或等于corePoolSize
    // 判斷線程池是否處于運(yùn)行狀態(tài),如果是將任務(wù)command入隊(duì)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次檢查線程池的運(yùn)行狀態(tài)腥寇,如果不在運(yùn)行中成翩,那么將任務(wù)從隊(duì)列里面刪除,并嘗試結(jié)束線程池
        if (! isRunning(recheck) && remove(command))
            // 調(diào)用驅(qū)逐策略
            reject(command);
        // 檢查活躍線程總數(shù)是否為0
        else if (workerCountOf(recheck) == 0)
            // 新創(chuàng)建非核心線程
            addWorker(null, false);
    }
    // 隊(duì)列滿了赦役,新創(chuàng)建非核心線程
    else if (!addWorker(command, false))
        // 調(diào)用驅(qū)逐策略
        reject(command);
}

該方法是沒有返回值的

addWorker() 新創(chuàng)建線程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 僅在必要的時(shí)候檢查隊(duì)列是否為NULL
        // 檢查隊(duì)列是否處于非運(yùn)行狀態(tài)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 獲取活躍線程數(shù)
            int wc = workerCountOf(c);
            // 判斷線程是否超過最大值捕传,當(dāng)隊(duì)列滿了則驗(yàn)證線程數(shù)是否大于maximumPoolSize,
            // 沒有滿則驗(yàn)證corePoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加活躍線程總數(shù)扩劝,否則重試
            if (compareAndIncrementWorkerCount(c))
                // 如果成功跳出外層循環(huán)
                break retry;
            c = ctl.get();  // Re-read ctl
            // 再次校驗(yàn)一下線程池運(yùn)行狀態(tài)
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 工作線程是否啟動(dòng)
    boolean workerStarted = false;
    // 工作線程是否創(chuàng)建
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 新創(chuàng)建線程
        w = new Worker(firstTask);
        // 獲取新創(chuàng)建的線程
        final Thread t = w.thread;
        if (t != null) {
            // 創(chuàng)建線程要獲得全局鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 檢查線程池的運(yùn)行狀態(tài)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 檢查線程的狀態(tài)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將新建工作線程存放到容器
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) {
                        // 跟蹤線程池最大的工作線程總數(shù)
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 啟動(dòng)工作線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 啟動(dòng)新的工作線程失敗庸论,
            // 1. 將工作線程移除workers容器
            // 2. 還原工作線程總數(shù)(workerCount)
            // 3. 嘗試結(jié)束線程
            addWorkerFailed(w);
    }
    return workerStarted;
}

如果啟動(dòng)新線程失敗那么addWorkerFailed()這個(gè)方法將做以下三件事:

  • 1、將工作線程移除workers容器
  • 2棒呛、還原工作線程總數(shù)(workerCount)
  • 3聂示、嘗試結(jié)束線程

execute() 執(zhí)行過程

  • 1、如果當(dāng)前運(yùn)行的線程少于corePoolSize簇秒,即使有空閑線程也會(huì)創(chuàng)建新線程來執(zhí)行任務(wù)鱼喉,(注意,執(zhí)行這一步驟 需要獲取全局鎖)趋观。如果調(diào)用了線程池的restartAllCoreThreads()方法扛禽, 線程池會(huì)提前創(chuàng)建并啟動(dòng)所有基本線程。

  • 2皱坛、如果運(yùn)行的線程等于或多于corePoolSize编曼,則將任務(wù)加入BlockingQueue。

  • 3剩辟、如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿)掐场,則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí) 行這一步驟需要獲取全局鎖)贩猎。

  • 4熊户、如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕吭服,并調(diào)用 RejectedExecutionHandler.rejectedExecution()方法嚷堡。

線程任務(wù)的執(zhí)行

線程的正在執(zhí)行是ThreadPoolExecutor.Worker#run()方法,但是這個(gè)方法直接委托給了外部的runWorker()方法艇棕,源碼如下:

// 直接委托給外部runworker方法
public void run() {
    runWorker(this);
}

runWorker() 執(zhí)行任務(wù)

final void runWorker(Worker w) {
    // 當(dāng)前Work中的工作線程
    Thread wt = Thread.currentThread();
    // 獲取初始任務(wù)
    Runnable task = w.firstTask;
    // 初始任務(wù)置NULL(表示不是建線程)
    w.firstTask = null;
    // 修改鎖的狀態(tài)蝌戒,使需發(fā)起中斷的線程可以獲取到鎖(使工作線程可以響應(yīng)中斷)
    w.unlock(); // allow interrupts
    // 工作線程是否是異常結(jié)束
    boolean completedAbruptly = true;
    try {
        // 循環(huán)的從隊(duì)列里面獲取任務(wù)
        while (task != null || (task = getTask()) != null) {
            // 每次執(zhí)行任務(wù)時(shí)需要獲取到內(nèi)置的互斥鎖
            w.lock();
            // 1. 當(dāng)前工作線程不是中斷狀態(tài)绿饵,且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
            // 2. 當(dāng)前工作線程是中斷狀態(tài)瓶颠,且線程池是STOP,TIDYING,TERMINATED狀態(tài),我們需要中斷當(dāng)前工作線程
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                // 中斷線程刺桃,中斷標(biāo)志位設(shè)置成true
                wt.interrupt();
            try {
                // 執(zhí)行任務(wù)前置方法,擴(kuò)展用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 執(zhí)行任務(wù)后置方法,擴(kuò)展用
                    afterExecute(task, thrown);
                }
            } finally {
                // 任務(wù)NULL表示已經(jīng)處理了
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 將工作線程從容器中剔除
        processWorkerExit(w, completedAbruptly);
    }
}

正在執(zhí)行線程的方法粹淋,執(zhí)行流程:

  • 1、獲取到當(dāng)前的工作線程

  • 2瑟慈、獲取初始化的線程任務(wù)

  • 3桃移、修改鎖的狀態(tài),使工作線程可以響應(yīng)中斷

  • 4葛碧、獲取工作線程的鎖(保證在任務(wù)執(zhí)行過程中工作線程不被外部線程中斷)借杰,如果獲取到的任務(wù)是NULL,則結(jié)束當(dāng)前工作線程

  • 5进泼、判斷先測(cè)試狀態(tài)蔗衡,看是否需要中斷當(dāng)前工作線程

  • 6、執(zhí)行任務(wù)前置方法beforeExecute(wt, task);

  • 7乳绕、執(zhí)行任務(wù)(執(zhí)行提交到線程池的線程)task.run();

  • 8召夹、執(zhí)行任務(wù)后置方法afterExecute(task, thrown);跪楞,處理異常信息

  • 9、修改完成任務(wù)的總數(shù)

  • 10、解除當(dāng)前工作線程的鎖

  • 11类少、獲取隊(duì)列里面的任務(wù),循環(huán)第4步

  • 12艺蝴、將工作線程從容器中剔除

  • wt.isInterrupted():獲取中斷狀態(tài)腿准,無副作用
  • Thread.interrupted():獲取中斷狀態(tài),并將中斷狀態(tài)恢重置成false(不中斷)
  • beforeExecute(wt, task);:執(zhí)行任務(wù)前置方法滓鸠,擴(kuò)展用雁乡。如果這個(gè)方法在執(zhí)行過程中拋出異常,那么會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收糜俗,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true蔗怠,任務(wù)線程不能被執(zhí)行
  • task.run();:執(zhí)行任務(wù)
  • afterExecute(task, thrown);:執(zhí)行任務(wù)后置方法,擴(kuò)展用吩跋。這個(gè)方法可以收集到任務(wù)運(yùn)行的異常信息寞射,這個(gè)方法如果有異常拋出,也會(huì)導(dǎo)致當(dāng)前工作線程直接死亡而被回收锌钮,工作線程異常結(jié)束標(biāo)記位completedAbruptly被設(shè)置成true
  • 任務(wù)運(yùn)行過程中的異常信息除了RuntimeException以外桥温,其他全部封裝成Error,然后被afterExecute方法收集
  • terminated()這也是一個(gè)擴(kuò)展方法梁丘,在線程池結(jié)束的時(shí)候調(diào)用

getTask() 獲取任務(wù)

private Runnable getTask() {
    // 記錄最后一次獲取任務(wù)是不是超時(shí)了
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        // 獲取線程池狀態(tài)
        int rs = runStateOf(c);

        // 線程池是停止?fàn)顟B(tài)或者狀態(tài)是關(guān)閉并且隊(duì)列為空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 扣減工作線程總數(shù)
            decrementWorkerCount();
            return null;
        }
        // 獲取工作線程總數(shù)
        int wc = workerCountOf(c);

        // 工作線程是否需要剔除
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 扣減工作線程總數(shù)
            if (compareAndDecrementWorkerCount(c))
                // 剔除工作線程侵浸,當(dāng)返回為NULL的時(shí)候旺韭,runWorker方法的while循環(huán)會(huì)結(jié)束
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask() 阻塞或定時(shí)獲取任務(wù)。當(dāng)該方法返回NULL時(shí)掏觉,當(dāng)前工作線程會(huì)結(jié)束区端,最后被回收,下面是返回NULL的幾種情況:

  • 1澳腹、當(dāng)前工作線程總數(shù)wc大于maximumPoolSize最大工作線程總數(shù)织盼。maximumPoolSize可能被setMaximumPoolSize方法改變。
  • 2酱塔、當(dāng)線程池處于停止?fàn)顟B(tài)時(shí)沥邻。
  • 3、當(dāng)線程池處于關(guān)閉狀態(tài)且阻塞隊(duì)列為空羊娃。
  • 4唐全、當(dāng)前工作線程超時(shí)等待任務(wù),并且當(dāng)前工作線程總數(shù)wc大于corePoolSize或者allowCoreThreadTimeOut=true允許核心線程超時(shí)被回收蕊玷,默認(rèn)是false邮利。

processWorkerExit() 工作線程結(jié)束

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 判斷是否是異常情況導(dǎo)致工作線程被回收
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        // 如果是扣減工作線程總數(shù),如果不是在getTask()方法就已經(jīng)扣減了
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
        completedTaskCount += w.completedTasks;
        // 剔除當(dāng)前工作線程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 嘗試結(jié)束線程池
    tryTerminate();

    // 判刑是否需要新實(shí)例化工程線程
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

剔除線程流程:

  • 1垃帅、判斷是否是異常情況導(dǎo)致工作線程被回收近弟,如果是workerCount--
  • 2、獲取到全局鎖
  • 3挺智、將當(dāng)前工作線程完成任務(wù)的總數(shù)加到completedTaskCount標(biāo)志位上
  • 4祷愉、剔除工作線程
  • 5、解鎖
  • 6赦颇、嘗試結(jié)束線程池tryTerminate()
  • 7二鳄、判斷是否需要重新實(shí)例化工程線程放到workers容器

結(jié)束線程池

shutdown() 關(guān)閉線程池

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 檢查權(quán)限
        checkShutdownAccess();
        // 設(shè)置線程池狀態(tài)為關(guān)閉
        advanceRunState(SHUTDOWN);
        // 中斷線程
        interruptIdleWorkers();
        // 擴(kuò)展方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 嘗試結(jié)束線池
    tryTerminate();
}
  • 1、通過遍歷工作線程容器workers媒怯,然后逐個(gè)中斷工作線程订讼,如果無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。
  • 2扇苞、shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài)欺殿,然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

shutdown() 關(guān)閉線程池

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 檢查權(quán)限
        checkShutdownAccess();
        // 設(shè)置線程池狀態(tài)為停止?fàn)顟B(tài)
        advanceRunState(STOP);
        // 中斷線程
        interruptIdleWorkers();
        // 將所有任務(wù)移動(dòng)到list容器
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 嘗試結(jié)束線池
    tryTerminate();
    // 返回所有未執(zhí)行的任務(wù)
    return tasks;
}
  • 1鳖敷、通過遍歷工作線程容器workers脖苏,然后逐個(gè)中斷工作線程,如果無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止定踱。
  • 2棍潘、shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表亦歉。

tryTerminate() 嘗試結(jié)束線程池

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //  判斷是否在運(yùn)行中,如果是直接返回
        if (isRunning(c) ||
            // 判斷是否進(jìn)入整理狀態(tài)恤浪,如果進(jìn)入了直接返回
            runStateAtLeast(c, TIDYING) ||
            // 如果是狀態(tài)是關(guān)閉并且隊(duì)列非空,也直接返回(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完)
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 判斷工作線程是否都關(guān)閉了
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // 中斷空閑線程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 將狀態(tài)替換成整理狀態(tài)
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 整理發(fā)放執(zhí)行
                    terminated();
                } finally {
                    // 狀態(tài)替換成結(jié)束狀態(tài)
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

結(jié)束線程池大致流程為:

  • 1肴楷、判斷是否在運(yùn)行中水由,如果是則不結(jié)束線程
  • 2、判斷是否進(jìn)入整理狀態(tài)赛蔫,如果是也不用執(zhí)行后面內(nèi)容了
  • 3砂客、判斷如果線程池是關(guān)閉狀態(tài)并且隊(duì)列非空,則不結(jié)束線程池(關(guān)閉狀態(tài)需要等到隊(duì)列里面的線程處理完)
  • 4濒募、判斷工作線程是否都關(guān)閉了,如果沒有就發(fā)起中斷工作線程的請(qǐng)求
  • 5圾结、獲取全局鎖將線程池狀態(tài)替換成整理狀態(tài)
  • 6瑰剃、調(diào)用terminated();擴(kuò)展方法(這也是一個(gè)擴(kuò)展方法,在線程池結(jié)束的時(shí)候調(diào)用)
  • 7筝野、將線程池狀態(tài)替換成結(jié)束狀態(tài)
  • 8晌姚、解除全局鎖

注意:

  • 1、我們可以通過的shutdown或shutdownNow方法來結(jié)束線程池歇竟。他們都是通過遍歷工作線程容器挥唠,然后逐個(gè)中斷工作線程,所以無法響應(yīng)中斷的任務(wù) 可能永遠(yuǎn)無法終止焕议。
  • 2宝磨、shutdown和shutdownNow的區(qū)別在于:shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程盅安,并返回等待執(zhí)行任務(wù)的列表唤锉;而 shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程别瞭。
  • 3窿祥、只要調(diào)用了shutdown和shutdownNow那么isShutdown方法就會(huì)返回true。
  • 4蝙寨、當(dāng)所有的任務(wù)都已關(guān)閉后晒衩,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會(huì)返回true墙歪。

線程池的監(jiān)控

通過擴(kuò)展線程池進(jìn)行監(jiān)控听系。可以通過繼承線程池來自定義線程池虹菲,重寫線程池的 beforeExecute跛锌、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí) 行一些代碼來進(jìn)行監(jiān)控髓帽。例如菠赚,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等郑藏。這幾個(gè)方法在線程池里是空方法衡查。

getTaskCount()

public long getTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers) {
            n += w.completedTasks;
            if (w.isLocked())
                ++n;
        }
        return n + workQueue.size();
    } finally {
        mainLock.unlock();
    }
}

獲取線程池需要執(zhí)行的任務(wù)數(shù)量”馗牵總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)+正在執(zhí)行的任務(wù)數(shù)(w.isLocked())+還未執(zhí)行的任務(wù)數(shù)(workQueue.size())

getCompletedTaskCount()

public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

獲取線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量拌牲。總數(shù)=已經(jīng)結(jié)束線工作程完成的任務(wù)數(shù)(completedTaskCount) + 還未結(jié)束線程工作線程完成的任務(wù)數(shù)(w.completedTasks)

getLargestPoolSize()

public int getLargestPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}

獲取線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量歌粥。通過這個(gè)數(shù)據(jù)可以知道線程池是 否曾經(jīng)滿過塌忽。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過失驶。

getPoolSize()

public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove rare and surprising possibility of
        // isTerminated() && getPoolSize() > 0
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}

獲取線程池的線程數(shù)量土居。如果線程池不銷毀的話,線程池里的線程不會(huì)自動(dòng)銷 毀嬉探,所以這個(gè)大小只增不減擦耀。

getActiveCount()

public int getActiveCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        int n = 0;
        for (Worker w : workers)
            if (w.isLocked())
                ++n;
        return n;
    } finally {
        mainLock.unlock();
    }
}

獲取活動(dòng)的線程數(shù)。

合理地配置線程池

要想合理地配置線程池涩堤,就必須首先分析任務(wù)特性眷蜓,可以從以下幾個(gè)角度來分析。

  • 任務(wù)的性質(zhì):CPU密集型任務(wù)胎围、IO密集型任務(wù)和混合型任務(wù)吁系。
  • 任務(wù)的優(yōu)先級(jí):高、中和低白魂。
  • 任務(wù)的執(zhí)行時(shí)間:長垮抗、中和短。
  • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源碧聪,如數(shù)據(jù)庫連接冒版。

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能小的 線程逞姿,如配置Ncpu+1個(gè)線程的線程池辞嗡。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配 置盡可能多的線程滞造,如2*Ncpu续室。混合型的任務(wù)谒养,如果可以拆分挺狰,將其拆分成一個(gè)CPU密集型任務(wù) 和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量 將高于串行執(zhí)行的吞吐量丰泊。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大薯定,則沒必要進(jìn)行分解。

優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來處理瞳购。它可以讓優(yōu)先級(jí)高 的任務(wù)先執(zhí)行话侄。

  • 如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能 執(zhí)行学赛。
  • 可以通過 Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)年堆。
  • 建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力盏浇,可以根據(jù)需要設(shè)大一點(diǎn) 兒变丧,比如幾千。無界隊(duì)列在某些異常情況下可能會(huì)撐爆內(nèi)存绢掰。

N核服務(wù)器痒蓬,通過執(zhí)行業(yè)務(wù)的單線程分析出本地計(jì)算時(shí)間為x,等待時(shí)間為y曼月,則工作線程數(shù)(線程池線程數(shù))設(shè)置為 N*(x+y)/x谊却,能讓CPU的利用率最大化柔昼,詳情可以參考線程數(shù)究竟設(shè)多少合理哑芹。

線程池的參數(shù)動(dòng)態(tài)調(diào)整

線程執(zhí)行流程,一圖勝千言:


阻塞隊(duì)列成員表捕透,一覽無余:


線程池使用面臨的核心的問題在于:線程池的參數(shù)并不好配置聪姿。

  • 一方面線程池的運(yùn)行機(jī)制不是很好理解,配置合理需要強(qiáng)依賴開發(fā)人員的個(gè)人經(jīng)驗(yàn)和知識(shí)乙嘀;

  • 另一方面末购,線程池執(zhí)行的情況和任務(wù)類型相關(guān)性較大,IO密集型和CPU密集型的任務(wù)運(yùn)行起來的情況差異非常大虎谢。

這導(dǎo)致業(yè)界并沒有一些成熟的經(jīng)驗(yàn)策略幫助開發(fā)人員參考盟榴。

線程池參數(shù)動(dòng)態(tài)化

盡管經(jīng)過謹(jǐn)慎的評(píng)估,仍然不能夠保證一次計(jì)算出來合適的參數(shù)婴噩,那么我們是否可以將修改線程池參數(shù)的成本降下來擎场,這樣至少可以發(fā)生故障的時(shí)候可以快速調(diào)整從而縮短故障恢復(fù)的時(shí)間?

基于這個(gè)思考几莽,我們是否可以將線程池的參數(shù)從代碼中遷移到分布式配置中心上迅办,實(shí)現(xiàn)線程池參數(shù)可動(dòng)態(tài)配置和即時(shí)生效,線程池參數(shù)動(dòng)態(tài)化前后的參數(shù)修改流程對(duì)比如下:

現(xiàn)有的解決方案的痛點(diǎn)章蚣。

現(xiàn)在市面上大多數(shù)的答案都是先區(qū)分線程池中的任務(wù)是 IO 密集型還是 CPU 密集型站欺。

CPU 密集型的任務(wù)

可以把核心線程數(shù)設(shè)置為核心數(shù)+1。

《Java并發(fā)編程實(shí)戰(zhàn)》一書中給出的原因是:即使當(dāng)計(jì)算(CPU)密集型的線程偶爾由于頁缺失故障或者其他原因而暫停時(shí),這個(gè)“額外”的線程也能確保 CPU 的時(shí)鐘周期不會(huì)被浪費(fèi)矾策。即可以理解為一個(gè)備份的線程磷账。

這個(gè)地方還有個(gè)需要注意的小點(diǎn)就是,如果你的服務(wù)器上部署的不止一個(gè)應(yīng)用蝴韭,你就得考慮其他的應(yīng)用的線程池配置情況够颠。

包含 IO 操作的任務(wù)

《Java并發(fā)編程實(shí)戰(zhàn)》一書中給出的計(jì)算方式是這樣的:


  • 第一個(gè)就是上面說的,和實(shí)際業(yè)務(wù)場(chǎng)景有所偏離榄鉴。

  • 第二個(gè)設(shè)置為 2*CPU 核心數(shù)履磨,有點(diǎn)像是把任務(wù)都當(dāng)做 IO 密集型去處理了。而且一個(gè)項(xiàng)目里面一般來說不止一個(gè)自定義線程池吧庆尘?比如有專門處理數(shù)據(jù)上送的線程池剃诅,有專門處理查詢請(qǐng)求的線程池,這樣去做一個(gè)簡(jiǎn)單的線程隔離驶忌。但是如果都用這樣的參數(shù)配置的話矛辕,顯然是不合理的。

  • 第三個(gè)不說了付魔,理想狀態(tài)聊品。流量是不可能這么均衡的,就拿美團(tuán)來說几苍,下午3翻屈,4點(diǎn)的流量,和 12 點(diǎn)左右午飯時(shí)的流量相差很大妻坝。

動(dòng)態(tài)更新的工作原理

ThreadPoolExecutor#setCorePoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }
}

在運(yùn)行期線程池使用方調(diào)用此方法設(shè)置corePoolSize之后伸眶,線程池會(huì)直接覆蓋原來的corePoolSize值,并且基于當(dāng)前值和原始值的比較結(jié)果采取不同的處理策略刽宪。

對(duì)于當(dāng)前值小于當(dāng)前工作線程數(shù)的情況厘贼,說明有多余的worker線程,此時(shí)會(huì)向當(dāng)前空閑的worker線程發(fā)起中斷請(qǐng)求以實(shí)現(xiàn)回收圣拄,多余的worker在下次空閑的時(shí)候也會(huì)被回收嘴秸;

對(duì)于當(dāng)前值大于原始值且當(dāng)前隊(duì)列中有待執(zhí)行任務(wù),則線程池會(huì)創(chuàng)建新的worker線程來執(zhí)行隊(duì)列任務(wù)庇谆,setCorePoolSize具體流程如下:

  • Spring 的 ThreadPoolTaskExecutor類
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
    private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;

    @Nullable
    private TaskDecorator taskDecorator;

    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;
    
    /**
     * Set the ThreadPoolExecutor's core pool size.
     * Default is 1.
     * <p><b>This setting can be modified at runtime, for example through JMX.</b>
     */ 
    public void setCorePoolSize(int corePoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.corePoolSize = corePoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
            }
        }
    }
}

Spring 的 ThreadPoolTaskExecutor類 (就是對(duì)JDK ThreadPoolExecutor 的一層包裝岳掐,可以理解為裝飾者模式)的 setCorePoolSize 方法: 注釋上寫的清清楚楚,可以在線程池運(yùn)行時(shí)修改該參數(shù)族铆。

JDK 的源碼岩四,其實(shí)源碼也體現(xiàn)出了有修改的含義的,兩個(gè)值去做差值哥攘,只是第一次設(shè)置的時(shí)候原來的值為 0 而已剖煌。

ThreadPoolExecutor#setMaximumPoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }
}

這個(gè)地方就很簡(jiǎn)單了材鹦,邏輯不太復(fù)雜。

  • 1.首先是參數(shù)合法性校驗(yàn)耕姊。

  • 2.然后用傳遞進(jìn)來的值桶唐,覆蓋原來的值。

  • 3.判斷工作線程是否是大于最大線程數(shù)茉兰,如果大于尤泽,則對(duì)空閑線程發(fā)起中斷請(qǐng)求。

經(jīng)過前面兩個(gè)方法的分析规脸,我們知道了最大線程數(shù)和核心線程數(shù)可以動(dòng)態(tài)調(diào)整坯约。

注意:
當(dāng) allowCoreThreadTimeOut 參數(shù)設(shè)置為 true 的時(shí)候,核心線程在空閑了 keepAliveTime 的時(shí)間后也會(huì)被回收的莫鸭,相當(dāng)于線程池自動(dòng)給你動(dòng)態(tài)修改了闹丐。

如何動(dòng)態(tài)指定隊(duì)列長度

ThreadPoolExecutor并沒有提供動(dòng)態(tài)設(shè)置隊(duì)列長度的set方法。

查看LinkedBlockingQueue的源碼發(fā)現(xiàn)被因,capacity是final修飾的卿拴。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
        
    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;
    
}   

要想實(shí)現(xiàn)隊(duì)列長度動(dòng)態(tài)調(diào)整,可以自定義隊(duì)列梨与。

可以把 LinkedBlockingQueue代碼粘貼一份出來堕花,修改個(gè)名字,然后把 Capacity 參數(shù)的 final 修飾符去掉粥鞋,并提供其對(duì)應(yīng)的 get/set 方法缘挽。

這個(gè)過程中涉及到的面試題

  • 問題一:線程池被創(chuàng)建后里面有線程嗎?如果沒有的話陷虎,你知道有什么方法對(duì)線程池進(jìn)行預(yù)熱嗎到踏?

線程池被創(chuàng)建后如果沒有任務(wù)過來杠袱,里面是不會(huì)有線程的尚猿。如果需要預(yù)熱的話可以調(diào)用下面的兩個(gè)方法:

  • 全部啟動(dòng):
public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }
}
  • 僅啟動(dòng)一個(gè):
public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return {@code false}
     * if all core threads have already been started.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
}
  • 問題二:核心線程數(shù)會(huì)被回收嗎?需要什么設(shè)置楣富?

核心線程數(shù)默認(rèn)是不會(huì)被回收的凿掂,如果需要回收核心線程數(shù),需要調(diào)用下面的方法:


allowCoreThreadTimeOut 該值默認(rèn)為 false纹蝴。

public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;
}

線程池的參數(shù)動(dòng)態(tài)調(diào)整具體操作

添加maven依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>

    <dependency>
        <groupId>com.purgeteam</groupId>
        <artifactId>dynamic-config-spring-boot-starter</artifactId>
        <version>0.1.1.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.79</version>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2.2.5.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

dynamic-config-spring-boot-starter就是動(dòng)態(tài)監(jiān)聽配置變化的關(guān)鍵依賴

@EnableDynamicConfigEvent

簡(jiǎn)介:開啟這個(gè)特性注解庄萎,具備配置推送更新監(jiān)聽能力。

啟動(dòng)類添加 @EnableDynamicConfigEvent 注解開啟配置變化監(jiān)聽功能塘安。

@SpringBootApplication
@EnableDiscoveryClient
@EnableDynamicConfigEvent
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

編寫事件接收器

創(chuàng)建 NacosListener(名稱隨意) 實(shí)現(xiàn) ApplicationListener<ActionConfigEvent>#onApplicationEvent 方法

@Slf4j
@Component
public class NacosListener implements ApplicationListener<ActionConfigEvent> {

    @Autowired
    private ThreadPoolService threadPoolService;

    @Override
    public void onApplicationEvent(ActionConfigEvent event) {
        log.info("接收事件");
        log.info(event.getPropertyMap().toString());
        Map<String, HashMap> propertyMap = event.getPropertyMap();
        String corePoolSize = (String)propertyMap.get("thread.pool.corePoolSize").get("after");
        if(!StringUtils.isEmpty(corePoolSize)){
            threadPoolService.setCorePoolSize(Integer.valueOf(corePoolSize));
        }
        String maximumPoolSize = (String)propertyMap.get("thread.pool.maximumPoolSize").get("after");
        if(!StringUtils.isEmpty(maximumPoolSize)){
            threadPoolService.setMaximumPoolSize(Integer.valueOf(maximumPoolSize));
        }
    }
}

@Slf4j
@Component
public class ThreadPoolService {

    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
            60L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public void setCorePoolSize(Integer corePoolSize) {
        threadPoolExecutor.setCorePoolSize(corePoolSize);
    }

    public void setMaximumPoolSize(Integer maximumPoolSize) {
        threadPoolExecutor.setMaximumPoolSize(maximumPoolSize);
    }
}

在 NacosListener#onApplicationEvent 方法里獲取目標(biāo)值, 作相應(yīng)的邏輯處理糠涛。

ActionConfigEvent event:

public class ActionConfigEvent extends ApplicationEvent {

    public ActionConfigEvent(Object source) {
        super(source);
    }

    // 事件說明
    private String eventDesc;

    // 更新變化結(jié)果集
    private Map<String, HashMap> propertyMap;


    public String getEventDesc() {
        return eventDesc;
    }

    public void setEventDesc(String eventDesc) {
        this.eventDesc = eventDesc;
    }

    public Map<String, HashMap> getPropertyMap() {
        return propertyMap;
    }

    public void setPropertyMap(Map<String, HashMap> propertyMap) {
        this.propertyMap = propertyMap;
    }
}

ActionConfigEvent 主要包含 Map<String, HashMap> propertyMap;, 從這里可以獲取更新變化結(jié)果, propertyMap結(jié)構(gòu)如下:

{
    `被更新的配置key`:{
        before: `原來的值`,
        after: `更新后的值`
    },
    `被更新的配置key`:{
        before: `原來的值`兼犯,
        after: `更新后的值`
    }
}

更新配置演示

ps: 示例為 Nacos 其它配置中心無差別忍捡。

原始配置:

thread.pool.corePoolSize=5
thread.pool.maximumPoolSize=10

修改配置:

thread.pool.corePoolSize=8
thread.pool.maximumPoolSize=16
  • Nacos配置中心的配置調(diào)整


  • 在 NacosListener#onApplicationEvent 方法加入端點(diǎn)調(diào)試觀察 ActionConfigEvent 對(duì)象參數(shù)


  • ThreadPoolExecutor線程池中的corePoolSize和maximumPoolSize的變化


注意:

  • 線程池阻塞隊(duì)列容量動(dòng)態(tài)調(diào)整集漾,必須構(gòu)造新的BlockingQueue,并且將容量capacity改為非final修飾砸脊,并且是volatile修飾的具篇,這樣才可以對(duì)阻塞隊(duì)列容量進(jìn)行動(dòng)態(tài)調(diào)整。

  • 因?yàn)長inkedBlockingQueue擴(kuò)容不涉及數(shù)據(jù)遷移凌埂,保證線程安全的情況下可直接修改capacity(推薦)驱显,而類似于ArrayBlockingQueue擴(kuò)容后需要數(shù)據(jù)遷移(不推薦)。

參考:
https://blog.csdn.net/lilizhou2008/article/details/114714745

https://www.imooc.com/article/42990

https://www.cnblogs.com/root429/p/12799234.html

https://segmentfault.com/a/1190000020723172

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末瞳抓,一起剝皮案震驚了整個(gè)濱河市埃疫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌孩哑,老刑警劉巖熔恢,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異臭笆,居然都是意外死亡叙淌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門愁铺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鹰霍,“玉大人,你說我怎么就攤上這事茵乱∶鳎” “怎么了?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵瓶竭,是天一觀的道長督勺。 經(jīng)常有香客問我,道長斤贰,這世上最難降的妖魔是什么智哀? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮荧恍,結(jié)果婚禮上瓷叫,老公的妹妹穿的比我還像新娘。我一直安慰自己送巡,他們只是感情好摹菠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著骗爆,像睡著了一般次氨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上摘投,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天煮寡,我揣著相機(jī)與錄音屉佳,去河邊找鬼。 笑死洲押,一個(gè)胖子當(dāng)著我的面吹牛武花,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播杈帐,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼体箕,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了挑童?” 一聲冷哼從身側(cè)響起累铅,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎站叼,沒想到半個(gè)月后娃兽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡尽楔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年投储,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阔馋。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡玛荞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出呕寝,到底是詐尸還是另有隱情勋眯,我是刑警寧澤,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布下梢,位于F島的核電站客蹋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏孽江。R本人自食惡果不足惜讶坯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望竟坛。 院中可真熱鬧闽巩,春花似錦钧舌、人聲如沸担汤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽崭歧。三九已至,卻和暖如春撞牢,著一層夾襖步出監(jiān)牢的瞬間率碾,已是汗流浹背叔营。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留所宰,地道東北人绒尊。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像仔粥,于是被迫代替她去往敵國和親婴谱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容