線程池實時管理與監(jiān)控工具的實現(xiàn)與思考

0. 前言

Java線程池作為最常使用到的并發(fā)工具揭朝,大多數(shù)同學(xué)都有使用的經(jīng)驗呢簸,但你確定你是在正確的使用線程池嗎扮叨?阿里Java代碼規(guī)范要求我們不使用 Executors來快速創(chuàng)建線程池漫玄,但是拋棄Executors敌土,使用其它方式創(chuàng)建線程池就一定不會出現(xiàn)問題嗎疫铜?本文詳細(xì)描述了一款Java線程池動態(tài)管理和實時監(jiān)控插件的開發(fā)過程茂浮,希望大家對線程池有新的認(rèn)識,主要內(nèi)容如下:

  • 我們在使用線程池過程中碰到了什么問題?
  • Java線程池是如何進(jìn)行容量管理的席揽?
  • 通過分析線程池源碼來講解如何動態(tài)修改Java線程池參數(shù)并進(jìn)行有效的監(jiān)控
  • 線上運行的實際成果展示

1. 問題

Spring提供了非常友好的@Async注解幫助我們快速方便構(gòu)造異步線程池顽馋,默認(rèn)情況下Spring Async使用SimpleAsyncTaskExecutor來處理線程,本質(zhì)上SimpleAsyncTaskExecutor不算線程池幌羞,每次方法調(diào)用都會創(chuàng)建新的線程寸谜,但是它提供了限流機制,通過concurrencyLimit屬性來控制限流的開啟(>=0:開啟新翎,-1:關(guān)閉程帕,默認(rèn)值為-1),所以通常情況下我們需要自定義線程池配置:

 @EnableAsync  
 @Configuration  
 public class SpringAsyncConfig { 
    @Bean(name = "commonExecutor")
    public Executor commonExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("CommonExecutor-");
        taskExecutor.initialize();
        return taskExecutor;
    }
} 

@Async("commonExecutor")
public void doSth() {...}

于是在項目中我們這樣使用線程池來進(jìn)行異步化處理

@Async("commonExecutor")
public void serviceA() {...}

@Async("commonExecutor")
public void serviceB() {...}

@Async("commonExecutor")
public void serviceC() {...}

在實際應(yīng)用當(dāng)中地啰,這是一個十分危險的操作方式愁拭,在線上運行過程中,serviceA的QPS遠(yuǎn)高于serviceB和serviceC, 這樣導(dǎo)致線程池被A大量占用亏吝,B和C的請求直接被reject或者因為長時間排隊而超時岭埠。


圖1 線程池觸發(fā)RejectedExecutionException

2. 思考

針對上述問題,團(tuán)隊做了如下思考:

  • 如何針對不同的接口使用相互隔離的線程池蔚鸥?
  • 如何合理的配置線程池參數(shù)惜论?
  • 如何動態(tài)的對線程池參數(shù)進(jìn)行調(diào)整?

2.1. 如何針對不同的業(yè)務(wù)接口使用相互隔離的線程池止喷?

這個問題讓我們很容易聯(lián)想到Hystrix的資源隔離馆类,同樣是使用線程池技術(shù)來實現(xiàn)的,hystrix資源隔離的原理圖如下圖2所示:

圖2 Hystrix線程隔離

在上圖中弹谁,線程池A用來處理對service1和service2的請求乾巧,線程池B處理service3的請求,線程池C處理service4的請求预愤,Hystrix通過@HystrixCommand( threadPoolKey="xxx" ) 指定當(dāng)前HystrixCommand實例的threadPoolKey沟于,相同threadPoolKey的方法將使用相同的線程池實例,為了讓大家聚焦到線程池本身植康,這里不再對hystrix線程隔離的具體原理進(jìn)行說明旷太。類似的處理方式,我們在Spring Async 中通過配置不同的線程池實例來實現(xiàn):

 @EnableAsync  
 @Configuration  
 public class SpringAsyncConfig { 
    @Bean(name = "executorA")
    public Executor executorA() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorA-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorB")
    public Executor executorB() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(8);
        taskExecutor.setQueueCapacity(20);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorB-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorC")
    public Executor executorC() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(40);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorC-");
        taskExecutor.initialize();
        return taskExecutor;
    }
} 

我們在不同的業(yè)務(wù)方法中國呢使用@Async注解傳入不同的實例name來使用不同的線程池實例:

@Async("executorA")
public void doSthA() {...}

@Async("executorB")
public void doSthB() {...}

@Async("executorC")
public void doSthC() {...}

當(dāng)然销睁,在實際開發(fā)過程當(dāng)中供璧,不可能每個業(yè)務(wù)接口都單獨使用一個線程池,否則會引入額外的開銷冻记。我們要根據(jù)實際的業(yè)務(wù)情況來進(jìn)行線程池資源的規(guī)劃睡毒。

2.2. 如何合理的配置線程池參數(shù)?

公式1:Nthreads = Ncpu \ast Ucpu \ast \frac{W}{C}

其中:
Ncpu = cpu的核心數(shù) 檩赢,Ucpu = cpu的利用率
W = 線程等待時間,C = 線程計算時間

此方案偏理論化,cpu的實際利用率(即分配多少cpu給線程池使用)和線程的計算贞瞒,等待時間非常難評估偶房,并且最后計算出來的結(jié)果也很容易偏離實際應(yīng)用場景。

公式2:coreSize = 2 \ast Ncpu , maxSize = 25 \ast Ncpu

實際使用過程中不同的業(yè)務(wù)對線程池的需求不一樣军浆,所以統(tǒng)一采用cpu核心數(shù)來配置顯然不太合理

公式3:coreSize = tps \ast time , maxSize = tps \ast time \ast (1.7~2)

此種計算方式考慮到了實際的業(yè)務(wù)情況棕洋,引入了TPS和執(zhí)行時間,假定每個接口的流量分配是平均的情況下是比較合理的乒融,但是實際情況是接口的流量是隨機的掰盘,在業(yè)務(wù)低峰期很低,在業(yè)務(wù)高峰期很高赞季,如果按平均tps去配置線程池愧捕,可能沒法在業(yè)務(wù)高峰期扛住系統(tǒng)的壓力;按峰值來配置的話申钩,在大多數(shù)時候線程池都是空閑了次绘,增加了系統(tǒng)的開銷

綜上,不管哪種計算方式都無法準(zhǔn)確的評估線程池的規(guī)模撒遣,我們需要找到一種比較靈活的配置方式邮偎。

2.3. 如何動態(tài)的對線程池參數(shù)進(jìn)行調(diào)整?

有沒有辦法動態(tài)的調(diào)整線程池的各項參數(shù)义黎,要解決這個問題禾进,我們需要對Java線程池的原理有深入的了解。下圖3是Java線程池創(chuàng)建工作線程Worker的流程圖:

圖3:Java線程池工作線程創(chuàng)建流程

圖3描述了一個線程池執(zhí)行一個工作任務(wù)的基本流程廉涕,具體原理這里不再贅述泻云,這里聊一下自己對JAVA線程池模型的一些思考:

  • Java線程池并沒有使用非常復(fù)雜的數(shù)據(jù)結(jié)構(gòu)來管理工作線程,代碼也非常簡潔火的,只用了一個HashSet來存儲工作線程對象壶愤,Worker通過繼承AQS并自己實現(xiàn)了一個不可重入的獨占鎖來保證多線程的并發(fā)安全;
  • Java線程池本質(zhì)是一個生產(chǎn)者消費者模型馏鹤,Worker作為消費者不停的消費提交過來的Task, 所以如何提交任務(wù)消費的吞吐量和效率征椒,同時控制消費者的規(guī)模,避免過多的占用系統(tǒng)資源湃累,是線程池要解決的核心問題勃救。

那么,Java線程池是如何對線程的容量進(jìn)行管理的治力?對應(yīng)的源碼如下:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

通過源碼可以發(fā)現(xiàn)蒙秒,ThreadPoolExecutor使用一個32位的AtomicInteger類型來同時標(biāo)識線程池的運行狀態(tài)和線程數(shù)量,然后通過位運算的左移操作來修改狀態(tài)值宵统,這是一種非常巧妙的算法設(shè)計晕讲,具體計算過程如下圖所示:

圖4 線程池工作線程數(shù)量標(biāo)識

當(dāng)需要修改工作線程的數(shù)量時,通過CAS(Compare And Swap)操作來保證并發(fā)安全,CAS使用JVM底層Unsafe提供的API來直接修改變量的值瓢省,類似于樂觀鎖的機制弄息。

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit). Other
     * decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

下面對線程池的幾個重要參數(shù)進(jìn)行說明,部分細(xì)節(jié)會引用線程池的源碼勤婚。

核心線程數(shù):corePoolSize 摹量, 最大線程數(shù):maxPoolSize
  • 如果workCount < corePoolSize,則創(chuàng)建新的Worker對象來處理任務(wù)馒胆,即使線程池容器中的其他Worker是空閑的缨称。
  • 如果corePoolSize < workCount < maximumPoolSize,若workQueue未滿祝迂,則將請求放入workQueue中睦尽,等待有空閑的線程去從workQueue中取任務(wù)并處理,只有當(dāng)workQueue滿時才創(chuàng)建新的Worker去處理任務(wù)液兽。
  • 如果workCount > maximumPoolSize且workQueue已滿骂删,通過handler所指定的拒絕策略來處理。
    所以四啰,任務(wù)提交時宁玫,判斷的順序為 corePoolSize –> queueSize –> maximumPoolSize。
最大線程數(shù)量: maximumPoolSize

線程池會保證所有可用狀態(tài)的Worker線程的數(shù)量不會超過此最大值

工作隊列: workQueue

當(dāng)工作任務(wù)的數(shù)量超過coreSize時會將新創(chuàng)建的Worker對象放入等待隊列柑晒,隊列的實現(xiàn)可以自己選擇欧瘪,可選的實現(xiàn)有SynchronousQueue,LinkedBlockingQueue匙赞,ArrayBlockingQueue等佛掖,具體原理本文不再詳述,請參考文章末尾引用的文檔涌庭。

非核心線程所允許的空閑時間: keepAliveTime

當(dāng)線程池中的線程數(shù)量大于corePoolSize的時候芥被,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀坐榆,而是會等待拴魄,直到超過了keepAliveTime。

拒絕策略執(zhí)行器: RejectedExecutionHandler

表示線程池的飽和策略席镀。如果阻塞隊列滿了并且沒有空閑的線程匹中,這時如果繼續(xù)提交任務(wù),就需要采取一種策略處理該任務(wù)豪诲。線程池提供了4種策略:

  • AbortPolicy:直接拋出異常(默認(rèn)策略)
  • CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù)
  • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù)顶捷,并執(zhí)行當(dāng)前任務(wù)
  • DiscardPolicy:直接丟棄任務(wù)

那我們有沒有辦法在線程池運行過程當(dāng)中,動態(tài)的修改corePoolSize和maxPoolSize的值呢屎篱,以setCorePoolSize方法為例服赎,我們看一下ThreadPoolExecutor的源碼:

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

通過源碼可以看出葵蒂,當(dāng)新設(shè)置的corePoolSize的值大于當(dāng)前值時,會按照等待隊列中的任務(wù)數(shù)量來創(chuàng)建新的工作線程重虑;當(dāng)新設(shè)置的corePoolSize小于當(dāng)前工作線程時刹勃,則會調(diào)用interruptIdleWorkers方法來中斷空閑的工作線程,我們繼續(xù)看一下interruptIdleWorkers()的源碼:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

interruptIdleWorkers方法遍歷workers集合中所有的工作線程嚎尤,如果通過tryLock獲取鎖成功,就中斷該線程伍宦。
這里為什么需要使用mainLock芽死?因為workers是HashSet類型的,不能保證線程安全次洼。我們再來看看其它線程池參數(shù)的set方法:

    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

發(fā)現(xiàn)都會調(diào)用interruptIdleWorkers()方法來對線程池的容量進(jìn)行調(diào)節(jié)关贵,所以結(jié)論是我們可以在線程池運行的過程中實時的對線程池的規(guī)模重新進(jìn)行調(diào)節(jié),并且是安全可靠的卖毁。

3. 實現(xiàn)

經(jīng)過充分的調(diào)研揖曾,我們決定實現(xiàn)一個非常輕量級的動態(tài)創(chuàng)建線程池SpringBoot插件,主要功能如下圖所示:


圖5 動態(tài)線程池架構(gòu)圖

3.1 配置管理

將線程池的基本參數(shù)存儲載Apollo配置中心和MySQL,可以很方便的進(jìn)行配置的修改操作(第一個版本只支持Apollo配置中心亥啦,后續(xù)會提供MySQL的支持)炭剪,Apollo配置參考如下:

#DataMonitor 監(jiān)控業(yè)務(wù)報表數(shù)據(jù)
naughty.threadpools.executors[0].corePoolSize = 10
naughty.threadpools.executors[0].maximumPoolSize = 30
naughty.threadpools.executors[0].keepAliveTime = 300
naughty.threadpools.waitRefreshConfigSeconds = 10
naughty.threadpools.executors[0].threadPoolName = DataMonitor
naughty.threadpools.executors[0].queueCapacity = 5
naughty.threadpools.executors[0].rejectedExecutionType = AbortPolicy

#ExposureExecutor 用戶準(zhǔn)入接口異步調(diào)用
naughty.threadpools.executors[1].threadPoolName = ExposureExecutor
naughty.threadpools.executors[1].queueCapacity = 1
naughty.threadpools.executors[1].rejectedExecutionType = CallerRunsPolicy
naughty.threadpools.executors[1].corePoolSize = 5
naughty.threadpools.executors[1].maximumPoolSize = 20
naughty.threadpools.executors[1].keepAliveTime = 300

3.2 配置監(jiān)聽

我們利用Apollo的ChangeListener來實現(xiàn)對配置變更的監(jiān)聽,(如果是MySQL翔脱,可以修改完配置后直接同過HTTP接口通知客戶端進(jìn)行配置刷新)奴拦,代碼片段如下:

public class ThreadPoolConfigUpdateListener {

    @Value("${apollo.bootstrap.namespaces:application}")
    private String namespace;

    @Autowired
    private DynamicThreadPoolFacade dynamicThreadPoolManager;

    @Autowired
    private DynamicThreadPoolProperties poolProperties;

    @PostConstruct
    public void init() {
        initConfigUpdateListener();
    }

    public void initConfigUpdateListener() {
        String apolloNamespace = namespace;
        if (StringUtils.hasText(poolProperties.getApolloNamespace())) {
            apolloNamespace = poolProperties.getApolloNamespace();
        }
        String finalApolloNamespace = apolloNamespace;
        Config config = ConfigService.getConfig(finalApolloNamespace);
        config.addChangeListener(changeEvent -> {
            try {
                Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000);
            } catch (InterruptedException e) {
                log.error("配置刷新異常",e);
            }
            dynamicThreadPoolManager.refreshThreadPoolExecutor();
            log.info("線程池配置有變化,刷新完成");
        });
    }

}

線程池配置的刷新的邏輯簡單描述如下:

  • 對于新增線程池配置届吁,我們需要創(chuàng)建一個線程池實例 错妖,調(diào)用Spring容器提供的defaultListableBeanFactory.registerSingleton()方法將ThreadPoolTaskExecutor實例添加到Spring容器,最后調(diào)用autowireCapableBeanFactory.autowireBean(executor)方法注入相關(guān)的依賴疚沐。
  • 如果是更新已經(jīng)存在的線程池配置暂氯,直接從Spring容器中取出該實例然后刷新相應(yīng)的配置即可。
    public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
            dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
                NaughtyThreadPoolTaskExecutor executor = getExecutor(poolProperties.getThreadPoolName());
                if (executor == null) {
                    executor = new NaughtyThreadPoolTaskExecutor();

                    managerExecutor(executor, poolProperties);
                    executor.setBlockingQueue(getBlockingQueue(poolProperties.getQueueType(), poolProperties.getQueueCapacity()));

                    executor.initialize();
                    //將new出的對象放入Spring容器中
                    defaultListableBeanFactory.registerSingleton(poolProperties.getThreadPoolName(), executor);
                    //自動注入依賴
                    autowireCapableBeanFactory.autowireBean(executor);
                }else{
                    managerExecutor(executor, poolProperties);
                    BlockingQueue<Runnable> queue = executor.getThreadPoolExecutor().getQueue();
                    if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
                        ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(poolProperties.getQueueCapacity());
                    }
                }

        });
    }

    private void managerExecutor(NaughtyThreadPoolTaskExecutor executor, ThreadPoolProperties poolProperties) {
        try {
            if (executor!=null) {
                executor.setBeanName(poolProperties.getThreadPoolName());
                executor.setCorePoolSize(poolProperties.getCorePoolSize());
                executor.setMaxPoolSize(poolProperties.getMaximumPoolSize());
                executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
                executor.setRejectedExecutionHandler(this.getRejectedExecutionHandler(poolProperties.getRejectedExecutionType(), poolProperties.getThreadPoolName()));
                executor.setThreadPoolName(poolProperties.getThreadPoolName());
            }
        }catch(Exception e){
            log.error("Executor 參數(shù)設(shè)置異常",e);
        }
    }

3.3 狀態(tài)監(jiān)控

ThreadPoolExecutor提供了beforeExecute, afterExecute 等鉤子方法亮蛔,我們可以可以在鉤子方法中對線程池任務(wù)的執(zhí)行時間上報CAT,代碼片段如下:

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        String threadName = Thread.currentThread().getName();
        Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
        transactionMap.put(threadName, transaction);
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        String threadName = Thread.currentThread().getName();
        Transaction transaction = transactionMap.get(threadName);
        transaction.setStatus(Message.SUCCESS);
        if (t != null) {
            Cat.logError(t);
            transaction.setStatus(t);
        }
        transaction.complete();
        transactionMap.remove(threadName);
    }

對應(yīng)的CAT監(jiān)控數(shù)據(jù)如下圖所示:


圖6 線程池執(zhí)行時間監(jiān)控

通過使用CAT的StatusExtension痴施,可以定時將線程池的運行時狀態(tài)數(shù)據(jù)發(fā)送到CAT并生成柱狀圖,相關(guān)實現(xiàn)代碼如下:

    public StatusExtension registerStatusExtension(ThreadPoolProperties prop, Object object) {
        NaughtyThreadPoolTaskExecutor executor = (NaughtyThreadPoolTaskExecutor) object;
        StatusExtension statusExtension =  new StatusExtension() {
            @Override
            public String getId() {
                return "thread.pool.info." + prop.getThreadPoolName();
            }

            @Override
            public String getDescription() {
                return "線程池監(jiān)控";
            }

            @Override
            public Map<String, String> getProperties() {
                AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());

                Map<String, String> pool = new HashMap<>();
              pool.put("activeCount", String.valueOf(executor.getActiveCount()));
                    pool.put("keepAliveTime", String.valueOf(executor.getKeepAliveSeconds()));
                    int coreSize = executor.getCorePoolSize();
                    int maxSize = executor.getMaxPoolSize();
                    if (coreSize!=0){
                        pool.put("active/core", String.valueOf(Float.valueOf(executor.getActiveCount())/Float.valueOf(coreSize)));
                    }
                    if (maxSize!=0){
                        pool.put("active/max", String.valueOf(Float.valueOf(executor.getActiveCount())/Float.valueOf(maxSize)));
                    }
                    pool.put("coreSize", String.valueOf(executor.getCorePoolSize()));
                    pool.put("maxSize", String.valueOf(executor.getMaxPoolSize()));
                    ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
                    pool.put("completedTaskCount", String.valueOf(threadPoolExecutor.getCompletedTaskCount()));
                    pool.put("largestPoolSize", String.valueOf(threadPoolExecutor.getLargestPoolSize()));
                    pool.put("taskCount", String.valueOf(threadPoolExecutor.getTaskCount()));
                    pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
                    pool.put("queueSize", String.valueOf(threadPoolExecutor.getQueue().size()));
                return pool;
            }
        };
        StatusExtensionRegister.getInstance().register(statusExtension);
        return statusExtension;
    }

各項監(jiān)控指標(biāo)的說明如下:(以下部分觀點可能需要經(jīng)過進(jìn)一步的驗證尔邓,僅供大家參考)

  • active/coreSize :活動線程數(shù)和核心線程數(shù)的比值晾剖, 其中active = executor.getActiveCount(),表示所有運行中的工作線程的數(shù)量梯嗽,這個比值反應(yīng)線程池的線程活躍狀態(tài)齿尽,如果一直維持在一個很低的水平,則說明線程池需要進(jìn)行縮容灯节;如果長時間維持一個很大的數(shù)值循头,說明活躍度好绵估,線程池利用率高。
  • active/maxSize :活動線程數(shù)和最大線程數(shù)的比值卡骂,這個值可以配合上面的 active/coreSize 來看国裳,當(dāng)active/coreSize大于100%的時候,如果active/maxSize維持在一個較低的值全跨,則說明當(dāng)前線程池的負(fù)載偏低缝左,如果大于60%或者更高,則說明線程池過載浓若,需要及時調(diào)整線程池容量配置渺杉。
  • completedTaskCount:執(zhí)行完畢的工作線程的總數(shù),包含歷史所有挪钓。
  • largestPoolSize:歷史上線程池容量觸達(dá)過的最大值
  • rejectCount:被拒絕的線程的數(shù)量是越,如果大量線程被拒絕,則說明當(dāng)前線程池已經(jīng)溢出了碌上,需要及時調(diào)整線程池配置
  • queueSize:隊列中工作線程的數(shù)量倚评,如果大量的線程池在排隊,說明coreSize已經(jīng)不夠用了馏予,可以根據(jù)實際情況來調(diào)整天梧,對于執(zhí)行時間要求很嚴(yán)格的業(yè)務(wù)場景,可能需要通過提升coreSize來減少排隊情況霞丧。

實際生產(chǎn)環(huán)境的線程池狀態(tài)監(jiān)控如下圖所示:


圖7 生產(chǎn)環(huán)境線程池監(jiān)控

展望

項目在使用線程池監(jiān)控插件以后腿倚,獲得了如下收益:

  • 大量在后臺使用線程池異步運行的批處理任務(wù)得到了有效隔離,管理和監(jiān)控蚯妇,通過查看CAT的各項埋點能夠有針對性的對某個業(yè)務(wù)的批處理任務(wù)進(jìn)行優(yōu)化敷燎,避免影響服務(wù)的穩(wěn)定性
  • 對于高并發(fā)的線程池場景,在高負(fù)載的情況下能夠第一時間收到告警箩言,基本杜絕了因線程池溢出導(dǎo)致的接口不可用
  • 作為一個輕量插件硬贯,通過極小的投入獲得了還不錯的收益,我們需要挖掘類似的高性價比的技術(shù)解決方案陨收,不斷提高研發(fā)團(tuán)隊的效能饭豹。

未來我們會考慮進(jìn)行后續(xù)版本的迭代,嘗試加入以下功能:

  • 支持MySQL配置或其它配置中心
  • 提供配置管理Web界面
  • 提供可以獨立部署的線程池監(jiān)控Portal务漩,不依賴CAT
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末拄衰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子饵骨,更是在濱河造成了極大的恐慌翘悉,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件居触,死亡現(xiàn)場離奇詭異妖混,居然都是意外死亡老赤,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門制市,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抬旺,“玉大人,你說我怎么就攤上這事祥楣】疲” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵误褪,是天一觀的道長床未。 經(jīng)常有香客問我,道長振坚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任斋扰,我火速辦了婚禮渡八,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘传货。我一直安慰自己屎鳍,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布问裕。 她就那樣靜靜地躺著逮壁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪粮宛。 梳的紋絲不亂的頭發(fā)上窥淆,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機與錄音巍杈,去河邊找鬼忧饭。 笑死,一個胖子當(dāng)著我的面吹牛筷畦,可吹牛的內(nèi)容都是我干的词裤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼鳖宾,長吁一口氣:“原來是場噩夢啊……” “哼吼砂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鼎文,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤渔肩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后拇惋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體赖瞒,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡女揭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了栏饮。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吧兔。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖袍嬉,靈堂內(nèi)的尸體忽然破棺而出境蔼,到底是詐尸還是另有隱情,我是刑警寧澤伺通,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布箍土,位于F島的核電站,受9級特大地震影響罐监,放射性物質(zhì)發(fā)生泄漏吴藻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一弓柱、第九天 我趴在偏房一處隱蔽的房頂上張望沟堡。 院中可真熱鬧,春花似錦矢空、人聲如沸航罗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粥血。三九已至,卻和暖如春酿箭,著一層夾襖步出監(jiān)牢的瞬間复亏,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工缭嫡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜓耻,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓械巡,卻偏偏與公主長得像刹淌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子讥耗,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容