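0. Preface
Thread pools are among the most frequently used concurrency tools in Java, and most developers have used them. But are you sure you are using them correctly? Alibaba's Java coding guidelines tell us not to create thread pools through the Executors shortcuts. Once we abandon Executors and create pools some other way, are we guaranteed to avoid problems? This article describes in detail how we developed a plugin for managing Java thread pools dynamically and monitoring them in real time, in the hope of giving readers a new perspective on thread pools. It covers:
- What problems did we run into when using thread pools?
- How does a Java thread pool manage its capacity?
- How to change Java thread pool parameters dynamically and monitor them effectively, explained through the thread pool source code
- Results from production
1. The problem
Spring provides the convenient @Async annotation for quickly building asynchronous execution on a thread pool. By default, Spring Async runs tasks with SimpleAsyncTaskExecutor, which is not really a thread pool: it creates a new thread for every method call. It does offer a throttling mechanism controlled by the concurrencyLimit property (>= 0 enables throttling, -1 disables it; the default is -1). That is why we usually define our own thread pool configuration: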
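import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@Configuration
public class SpringAsyncConfig {
    @Bean(name = "commonExecutor")
    public Executor commonExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("CommonExecutor-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}

@Async("commonExecutor")
public void doSth() {...}
So in our project we used this thread pool for asynchronous processing like this: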
@Async("commonExecutor")
public void serviceA() {...}
@Async("commonExecutor")
public void serviceB() {...}
@Async("commonExecutor")
public void serviceC() {...}
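In practice this is a very dangerous approach. In production, serviceA's QPS was far higher than that of serviceB and serviceC, so A occupied most of the pool's threads and queue slots, and requests to B and C were either rejected outright or timed out after waiting in the queue too long.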
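The failure mode can be reproduced with a plain ThreadPoolExecutor. The sketch below is a scaled-down, hypothetical stand-in for the shared pool (2 threads, a queue of 2, the default AbortPolicy; the class and method names are illustrative): a burst of slow serviceA tasks fills both threads and the queue, so serviceB's single task is rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: one hot caller (serviceA) exhausts a pool it shares with serviceB. */
final class SharedPoolStarvation {

    /** Returns true if serviceB's task is rejected after serviceA has filled the shared pool. */
    static boolean serviceBRejected() {
        // Deliberately tiny shared pool: 2 threads, queue of 2, default AbortPolicy.
        ThreadPoolExecutor shared = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
        CountDownLatch release = new CountDownLatch(1);
        try {
            // serviceA's burst: 4 slow tasks occupy both threads and both queue slots.
            for (int i = 0; i < 4; i++) {
                shared.execute(() -> awaitQuietly(release));
            }
            try {
                shared.execute(() -> { /* serviceB's only task */ });
                return false;
            } catch (RejectedExecutionException e) {
                return true; // B is rejected although B itself sent a single request
            }
        } finally {
            release.countDown();
            shared.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("serviceB rejected: " + serviceBRejected());
    }
}
```

serviceBRejected() returns true: serviceB sent one task, yet it is rejected because serviceA's burst already holds every thread and queue slot.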
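2. Analysis
To address the problem above, the team considered the following questions:
- How can different interfaces use thread pools that are isolated from each other?
- How should thread pool parameters be configured sensibly?
- How can thread pool parameters be adjusted dynamically?
2.1. How can different business interfaces use isolated thread pools?
This naturally calls to mind Hystrix's resource isolation, which is also implemented with thread pools. Figure 2 illustrates how Hystrix resource isolation works:
In the figure, thread pool A handles requests to service1 and service2, pool B handles requests to service3, and pool C handles requests to service4. Hystrix sets the threadPoolKey of a HystrixCommand instance through @HystrixCommand(threadPoolKey = "xxx"), and methods with the same threadPoolKey share the same thread pool instance. To keep the focus on thread pools themselves, we won't go into how Hystrix thread isolation works. We achieve something similar in Spring Async by configuring separate thread pool instances: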
@EnableAsync
@Configuration
public class SpringAsyncConfig {
    @Bean(name = "executorA")
    public Executor executorA() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(50);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorA-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorB")
    public Executor executorB() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(8);
        taskExecutor.setQueueCapacity(20);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorB-");
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean(name = "executorC")
    public Executor executorC() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(40);
        taskExecutor.setQueueCapacity(100);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setAllowCoreThreadTimeOut(true);
        taskExecutor.setThreadNamePrefix("ExecutorC-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}
In each business method, we pass a different bean name to @Async to select a different thread pool instance:
@Async("executorA")
public void doSthA() {...}
@Async("executorB")
public void doSthB() {...}
@Async("executorC")
public void doSthC() {...}
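Of course, in real projects it is impractical to give every business interface its own thread pool, because every extra pool adds overhead (threads, memory, context switches). Thread pool resources have to be planned according to the actual business situation.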
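One way to do that planning is to map several business methods onto a small number of named pools, the same idea as Hystrix's threadPoolKey. The sketch below is hypothetical (the class, the route names, and the pool sizes are illustrative, not the plugin's code): it creates one ThreadPoolExecutor per key on first use, so service1 and service2 share pool A while service3 and service4 get pools of their own.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry: routes each business method to a pool identified by a key. */
final class IsolatedPools {
    private final Map<String, String> routeToKey;                           // method -> pool key
    private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

    IsolatedPools(Map<String, String> routeToKey) {
        this.routeToKey = Map.copyOf(routeToKey);
    }

    /** Methods mapped to the same key share one pool; unmapped methods fall back to "default". */
    ThreadPoolExecutor poolFor(String route) {
        String key = routeToKey.getOrDefault(route, "default");
        return pools.computeIfAbsent(key, IsolatedPools::newPool);
    }

    int poolCount() {
        return pools.size();
    }

    void shutdownAll() {
        pools.values().forEach(ThreadPoolExecutor::shutdown);
    }

    private static ThreadPoolExecutor newPool(String key) {
        AtomicInteger seq = new AtomicInteger();
        // Illustrative sizes; in practice each key would get its own configuration.
        return new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50),
                r -> new Thread(r, "pool-" + key + "-" + seq.incrementAndGet()));
    }

    public static void main(String[] args) {
        IsolatedPools pools = new IsolatedPools(Map.of(
                "service1", "A", "service2", "A", "service3", "B", "service4", "C"));
        pools.poolFor("service1").execute(() -> System.out.println(Thread.currentThread().getName()));
        pools.shutdownAll();
    }
}
```

Because pools are created lazily and keyed by name, adding a business method only means adding a route; it does not necessarily mean adding a pool.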
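2.2. How should thread pool parameters be configured sensibly?
Formula 1:
$$N_{threads} = N_{cpu} \times U_{cpu} \times \left(1 + \frac{W}{C}\right)$$
where:
$N_{cpu}$ = number of CPU cores, $U_{cpu}$ = target CPU utilization ($0 < U_{cpu} \le 1$)
$W$ = thread wait time, $C$ = thread compute time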
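This approach is rather theoretical: the real CPU utilization (that is, how much CPU to allocate to the pool) and the threads' compute and wait times are very hard to estimate, and the result can easily drift away from the actual workload.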
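To make Formula 1 concrete: with 8 cores, a 50% utilization target, and tasks that wait 90 ms for every 10 ms of computation, it gives 8 × 0.5 × (1 + 90/10) = 40 threads. The helper below is a minimal sketch of the same arithmetic (class and parameter names are illustrative). It rounds up, and its inputs are exactly the estimates that the paragraph above calls hard to obtain.

```java
/** Formula 1 from the text: Nthreads = Ncpu * Ucpu * (1 + W / C). */
final class WaitComputeSizing {

    /**
     * @param nCpu          number of CPU cores
     * @param utilization   target CPU utilization Ucpu, in (0, 1]
     * @param waitMillis    average time a task spends waiting (W)
     * @param computeMillis average time a task spends computing (C)
     */
    static int threads(int nCpu, double utilization, double waitMillis, double computeMillis) {
        if (nCpu <= 0 || utilization <= 0 || utilization > 1 || waitMillis < 0 || computeMillis <= 0) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double n = nCpu * utilization * (1 + waitMillis / computeMillis);
        return (int) Math.ceil(n); // round up so the pool does not undershoot the target
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("threads for this machine: " + threads(cores, 0.5, 90, 10));
    }
}
```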
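Formula 2:
$$coreSize = 2 \times N_{cpu}, \quad maxSize = 25 \times N_{cpu}$$
In practice, different services have different requirements for their thread pools, so sizing every pool uniformly by CPU core count is clearly unreasonable.
Formula 3 ($time$ is the average execution time of one task, in seconds):
$$coreSize = tps \times time, \quad maxSize = tps \times time \times (1.7 \sim 2)$$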
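This formula takes the actual business load into account by introducing TPS and execution time. It is reasonable if traffic to each interface is evenly distributed, but in reality traffic fluctuates: it is low in off-peak hours and high at peak. If the pool is sized for the average TPS, it may not withstand the load at peak; if it is sized for the peak, the pool sits idle most of the time and only adds system overhead.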
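Formula 3 is Little's law applied to a pool: the number of busy threads equals the arrival rate times the time each task holds a thread. The sketch below (illustrative names; integer arithmetic to avoid rounding noise) sizes the same 200 ms task for a hypothetical off-peak load of 100 TPS and a peak of 1000 TPS.

```java
/** Formula 3 from the text: coreSize = tps * time, maxSize = tps * time * (1.7 ~ 2). */
final class TrafficSizing {

    /** coreSize = ceil(tps * avgTaskMillis / 1000); the task time is converted from ms to seconds. */
    static int coreSize(long tps, long avgTaskMillis) {
        return Math.toIntExact((tps * avgTaskMillis + 999) / 1000);
    }

    /** Lower bound of maxSize: coreSize * 1.7, rounded up (computed as 17/10). */
    static int maxSizeLow(int coreSize) {
        return (coreSize * 17 + 9) / 10;
    }

    /** Upper bound of maxSize: coreSize * 2. */
    static int maxSizeHigh(int coreSize) {
        return coreSize * 2;
    }

    public static void main(String[] args) {
        long taskMillis = 200;
        for (long tps : new long[] {100, 1000}) { // hypothetical off-peak and peak traffic
            int core = coreSize(tps, taskMillis);
            System.out.printf("tps=%d core=%d max=%d..%d%n", tps, core, maxSizeLow(core), maxSizeHigh(core));
        }
    }
}
```

Running main prints `tps=100 core=20 max=34..40` and `tps=1000 core=200 max=340..400`: the peak pool is ten times the off-peak one, which is exactly the average-versus-peak dilemma described above.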
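In short, none of these formulas can accurately estimate the right pool size; we need a more flexible way to configure thread pools.
2.3. How can thread pool parameters be adjusted dynamically?
Is there a way to adjust a thread pool's parameters dynamically? Answering this requires a solid understanding of how Java thread pools work. Figure 3 is a flowchart of how a Java thread pool creates a Worker thread:
Figure 3 shows the basic flow of a thread pool executing a task. We won't repeat the details here; instead, here are a few thoughts on the Java thread pool model:
- Java thread pools manage worker threads without any elaborate data structure, and the code is concise: a single HashSet, guarded by mainLock, stores the Worker objects. Worker extends AQS to implement a simple non-reentrant exclusive lock, which it holds while running a task so that other threads can tell whether it is busy;
- A Java thread pool is essentially a producer-consumer model: Workers are consumers that keep taking the submitted tasks. The core problem a thread pool has to solve is therefore how to maximize the throughput and efficiency of task consumption while keeping the number of consumers under control, so that the pool does not take up too many system resources.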
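The producer-consumer view can be illustrated with a toy pool. The sketch below is hypothetical and far simpler than ThreadPoolExecutor (a fixed number of threads, no rejection, no resizing). It only shows the two ideas above: workers kept in a plain HashSet guarded by one lock, and workers consuming tasks from a shared BlockingQueue.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical toy pool: fixed workers consuming a shared queue (producer-consumer). */
final class MiniPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Set<Thread> workers = new HashSet<>();        // HashSet is not thread-safe...
    private final ReentrantLock mainLock = new ReentrantLock(); // ...so every access holds mainLock
    private volatile boolean shutdown;

    private MiniPool() {
    }

    static MiniPool create(int size) {
        MiniPool pool = new MiniPool();
        pool.mainLock.lock();
        try {
            for (int i = 0; i < size; i++) {
                Thread t = new Thread(pool::consume, "mini-worker-" + i);
                pool.workers.add(t);
                t.start();
            }
        } finally {
            pool.mainLock.unlock();
        }
        return pool;
    }

    /** Producer side: hand a task to the queue. */
    void submit(Runnable task) {
        if (shutdown) {
            throw new IllegalStateException("pool is shut down");
        }
        queue.add(task);
    }

    /** Consumer side: each worker takes tasks until shutdown is requested and the queue is empty. */
    private void consume() {
        while (true) {
            Runnable task;
            try {
                task = queue.poll(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return;
            }
            if (task != null) {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // a real pool would report this; here the worker simply keeps consuming
                }
            } else if (shutdown) {
                return;
            }
        }
    }

    /** Stops accepting tasks, lets the workers drain the queue, and waits for them to exit. */
    void shutdownAndJoin() {
        shutdown = true;
        Set<Thread> snapshot;
        mainLock.lock();
        try {
            snapshot = new HashSet<>(workers);
        } finally {
            mainLock.unlock();
        }
        for (Thread t : snapshot) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

What ThreadPoolExecutor adds on top of this skeleton is precisely the capacity management discussed next: creating workers lazily up to corePoolSize and maximumPoolSize, and retiring them after keepAliveTime.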
So how does a Java thread pool manage its thread capacity? Here is the relevant source (JDK 8):
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
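The source shows that ThreadPoolExecutor packs both the run state and the worker count into a single 32-bit AtomicInteger: the high 3 bits hold runState and the low 29 bits (COUNT_BITS = 29) hold workerCount. The state constants are built with left shifts, and the two fields are extracted with bit masks. It is a neat design; the calculation is shown in the figure below: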
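The same calculation can be reproduced outside the JDK. The class below copies the constants and helpers from the JDK 8 source quoted above (the class name and the bits helper are illustrative) and prints ctl as its 3 state bits and 29 count bits. Because RUNNING is negative and the other states are ordered 0 < STOP < TIDYING < TERMINATED, the pool can compare states with plain < and >=.

```java
/** Re-creates ThreadPoolExecutor's ctl packing, using the JDK 8 constants quoted above. */
final class CtlLayout {
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29 low bits hold workerCount
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // 000 followed by 29 ones
    static final int RUNNING = -1 << COUNT_BITS;       // 111 + 29 zeros (a negative int)
    static final int SHUTDOWN = 0 << COUNT_BITS;       // 000 + 29 zeros
    static final int STOP = 1 << COUNT_BITS;           // 001 + 29 zeros
    static final int TIDYING = 2 << COUNT_BITS;        // 010 + 29 zeros
    static final int TERMINATED = 3 << COUNT_BITS;     // 011 + 29 zeros

    static int runStateOf(int c) { return c & ~CAPACITY; }   // keep the high 3 bits
    static int workerCountOf(int c) { return c & CAPACITY; } // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }     // combine the two fields

    /** Formats c as "sss|ccc...c": 3 run-state bits, then 29 worker-count bits. */
    static String bits(int c) {
        String s = String.format("%32s", Integer.toBinaryString(c)).replace(' ', '0');
        return s.substring(0, 3) + "|" + s.substring(3);
    }

    public static void main(String[] args) {
        String[] names = {"RUNNING", "SHUTDOWN", "STOP", "TIDYING", "TERMINATED"};
        int[] states = {RUNNING, SHUTDOWN, STOP, TIDYING, TERMINATED};
        for (int i = 0; i < states.length; i++) {
            int c = ctlOf(states[i], 5); // the same state with 5 workers
            System.out.println(names[i] + "\t" + bits(c) + "\tworkers=" + workerCountOf(c));
        }
    }
}
```

For example, a running pool with 5 workers is `111|00000000000000000000000000101`: the state and the count can be read, and updated with a single CAS, as one int.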
When the worker count changes, a CAS (Compare And Swap) operation keeps the update thread-safe. AtomicInteger implements CAS on top of the JVM's low-level Unsafe API, which updates the value directly in memory. It works like an optimistic lock: instead of blocking, it retries when another thread has changed the value in the meantime, as decrementWorkerCount does below.
/**
 * Attempts to CAS-increment the workerCount field of ctl.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
/**
 * Decrements the workerCount field of ctl. This is called only on
 * abrupt termination of a thread (see processWorkerExit). Other
 * decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
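The retry pattern generalizes to a bounded counter. The sketch below (illustrative names) mimics how addWorker refuses to go past a limit: read the value, check the bound, try compareAndSet, and re-read on contention. Four threads racing for a cap of 2500 end with exactly 2500 successful increments and no lost updates. The real addWorker loop also re-checks the run state on every iteration, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical bounded counter using the same CAS retry pattern as workerCount. */
final class BoundedCasCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int cap;

    BoundedCasCounter(int cap) {
        this.cap = cap;
    }

    /** Optimistic increment: read, check the bound, CAS; if another thread won, re-read and retry. */
    boolean tryIncrement() {
        for (;;) {
            int current = count.get();
            if (current >= cap) {
                return false; // like addWorker refusing to exceed the pool limit
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    int get() {
        return count.get();
    }

    /** Starts `threads` threads that each attempt `attempts` increments; returns the final count. */
    static int race(int cap, int threads, int attempts) {
        BoundedCasCounter counter = new BoundedCasCounter(cap);
        AtomicInteger successes = new AtomicInteger();
        List<Thread> started = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    if (counter.tryIncrement()) {
                        successes.incrementAndGet();
                    }
                }
            });
            started.add(t);
            t.start();
        }
        for (Thread t : started) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        if (counter.get() != successes.get()) {
            throw new IllegalStateException("lost update");
        }
        return counter.get();
    }
}
```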
Below we describe the most important thread pool parameters, quoting the source where useful.
Core pool size corePoolSize and maximum pool size maxPoolSize
- If workerCount < corePoolSize, a new Worker is created to handle the task, even if other Workers in the pool are idle.
- If corePoolSize <= workerCount < maximumPoolSize, the task is put into workQueue as long as the queue is not full, and waits there until an idle thread takes it; a new Worker is created only when workQueue is full.
- If workerCount >= maximumPoolSize and workQueue is full, the task is handled by the rejection policy specified by handler.
So when a task is submitted, the checks are made in the order corePoolSize -> workQueue -> maximumPoolSize.
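The order can be observed directly. In this sketch (a hypothetical class; core 1, max 2, a queue of capacity 1, tasks that block until released), the first task gets a new core thread, the second is queued, the third finds the queue full and gets a non-core thread, and the fourth is rejected by the default AbortPolicy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the core -> queue -> max -> reject order (core 1, max 2, queue capacity 1). */
final class SubmissionOrder {

    static List<String> observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep every worker busy until we are done observing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": poolSize=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected"); // default AbortPolicy
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

observe() returns `task1: poolSize=1 queued=0`, `task2: poolSize=1 queued=1`, `task3: poolSize=2 queued=1`, `task4: rejected`. Note that the non-core thread is only created once the queue is full, so with an unbounded queue maximumPoolSize is never reached.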
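Maximum pool size: maximumPoolSize
The pool guarantees that the number of live Worker threads never exceeds this maximum.
Work queue: workQueue
Once the number of workers has reached corePoolSize, newly submitted tasks are placed in the work queue. The queue implementation is up to you, for example SynchronousQueue, LinkedBlockingQueue, or ArrayBlockingQueue. We won't go into how they work here; see the documents referenced at the end of the article.
Idle time allowed for non-core threads: keepAliveTime
When the pool has more threads than corePoolSize and no new tasks arrive, the threads beyond the core are not destroyed immediately; each one waits and exits only after it has been idle for longer than keepAliveTime. (If allowCoreThreadTimeOut is true, as in the configuration above, core threads time out the same way.)
Rejection handler: RejectedExecutionHandler
This is the pool's saturation policy. When the work queue is full and the pool cannot create any more threads, a further task has to be handled by some policy. The pool provides 4 policies:
- AbortPolicy: throws a RejectedExecutionException (the default policy)
- CallerRunsPolicy: runs the task in the caller's thread
- DiscardOldestPolicy: discards the task at the head of the queue (the oldest one) and retries submitting the current task
- DiscardPolicy: silently discards the task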
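The four policies differ in what happens to the extra task. The sketch below (illustrative names) saturates a pool that has one thread and a one-slot queue: task A occupies the thread, task B fills the queue, and task C is submitted under each policy. It returns the tasks that actually ran, in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: what each built-in policy does with a task submitted to a saturated pool. */
final class RejectionPolicies {

    /** Returns the tasks that ran, in order; "C@caller" means C ran on the submitting thread. */
    static List<String> run(RejectedExecutionHandler policy) {
        List<String> ran = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), policy);
        Thread caller = Thread.currentThread();
        pool.execute(() -> { awaitQuietly(release); ran.add("A"); }); // occupies the only thread
        pool.execute(() -> ran.add("B"));                             // fills the only queue slot
        try {
            pool.execute(() -> ran.add(Thread.currentThread() == caller ? "C@caller" : "C"));
        } catch (RejectedExecutionException e) {
            ran.add("C rejected");
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ran);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Abort:         " + run(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRuns:    " + run(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardOldest: " + run(new ThreadPoolExecutor.DiscardOldestPolicy()));
        System.out.println("Discard:       " + run(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

With AbortPolicy the result is [C rejected, A, B]; CallerRunsPolicy gives [C@caller, A, B] because C runs synchronously on the submitting thread, which also slows that thread down; DiscardOldestPolicy gives [A, C] because B is dropped to make room; DiscardPolicy gives [A, B] with no trace of C.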
Can we change corePoolSize and maxPoolSize while the pool is running? Taking setCorePoolSize as an example, here is the ThreadPoolExecutor source (JDK 8):
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}
The source shows that when the new corePoolSize is larger than the old one, the pool starts up to delta new workers, but no more than the number of tasks waiting in the queue; when the new corePoolSize is smaller than the current number of workers, it calls interruptIdleWorkers() to interrupt idle workers. The no-argument interruptIdleWorkers() simply calls interruptIdleWorkers(false); here is its source:
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
interruptIdleWorkers iterates over every worker in the workers set and interrupts its thread if tryLock succeeds. Because a Worker holds its own lock while it runs a task, tryLock succeeds only for workers that are not running a task, so busy threads are never interrupted.
Why is mainLock needed here? Because workers is a HashSet, which is not thread-safe. Now let's look at the setters for the other pool parameters:
public void setMaximumPoolSize(int maximumPoolSize) {
    if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();
}
public void setKeepAliveTime(long time, TimeUnit unit) {
    if (time < 0)
        throw new IllegalArgumentException();
    if (time == 0 && allowsCoreThreadTimeOut())
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    long keepAliveTime = unit.toNanos(time);
    long delta = keepAliveTime - this.keepAliveTime;
    this.keepAliveTime = keepAliveTime;
    if (delta < 0)
        interruptIdleWorkers();
}
Both setters also call interruptIdleWorkers() when needed to adjust the pool. Our conclusion is therefore that a thread pool's size can be adjusted in real time while it is running, safely and reliably. Shrinking is gradual, however: busy workers are never interrupted, and an interrupted idle worker exits only when it re-checks the pool limits while fetching its next task.
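Growing a running pool can be tried with a plain ThreadPoolExecutor. In this sketch (a hypothetical class), one task occupies a single-thread pool and three more wait in the queue. Raising corePoolSize from 1 to 4 makes setCorePoolSize prestart three workers for the queued tasks, so getPoolSize() returns 4 straight away. The maximum is raised first on purpose: since JDK 9, setCorePoolSize throws IllegalArgumentException when the new core size exceeds maximumPoolSize, whereas the JDK 8 version quoted above does not check this.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: grow a busy pool at runtime with setMaximumPoolSize and setCorePoolSize. */
final class LiveResize {

    /** Returns getPoolSize() observed right after raising corePoolSize from 1 to 4. */
    static int growWhileBusy() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) { // the first task runs, the other three wait in the queue
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.setMaximumPoolSize(4); // raise the ceiling first (required on JDK 9+)
            pool.setCorePoolSize(4);    // delta = 3 with three queued tasks: prestarts three workers
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after resize: " + growWhileBusy());
    }
}
```

When shrinking, the order is reversed: lower corePoolSize first, because setMaximumPoolSize rejects a maximum below the current core size.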
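3. Implementation
After thorough research, we decided to build a very lightweight Spring Boot plugin for creating and managing thread pools dynamically. Its main features are shown in the figure below:
3.1 Configuration management
The basic thread pool parameters are kept in a configuration store, either the Apollo configuration center or MySQL, so they are easy to change (the first version supports only Apollo; MySQL support will follow). A sample Apollo configuration: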
#DataMonitor: monitors business report data
naughty.threadpools.executors[0].corePoolSize = 10
naughty.threadpools.executors[0].maximumPoolSize = 30
naughty.threadpools.executors[0].keepAliveTime = 300
naughty.threadpools.waitRefreshConfigSeconds = 10
naughty.threadpools.executors[0].threadPoolName = DataMonitor
naughty.threadpools.executors[0].queueCapacity = 5
naughty.threadpools.executors[0].rejectedExecutionType = AbortPolicy
#ExposureExecutor: asynchronous calls to the user admission API
naughty.threadpools.executors[1].threadPoolName = ExposureExecutor
naughty.threadpools.executors[1].queueCapacity = 1
naughty.threadpools.executors[1].rejectedExecutionType = CallerRunsPolicy
naughty.threadpools.executors[1].corePoolSize = 5
naughty.threadpools.executors[1].maximumPoolSize = 20
naughty.threadpools.executors[1].keepAliveTime = 300
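3.2 Listening for configuration changes
We use Apollo's ChangeListener to listen for configuration changes (with MySQL, the client could instead be told to refresh through an HTTP endpoint after the configuration is changed). The code snippet:
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.StringUtils;

public class ThreadPoolConfigUpdateListener {
    private static final Logger log = LoggerFactory.getLogger(ThreadPoolConfigUpdateListener.class);

    @Value("${apollo.bootstrap.namespaces:application}")
    private String namespace;
    @Autowired
    private DynamicThreadPoolFacade dynamicThreadPoolManager;
    @Autowired
    private DynamicThreadPoolProperties poolProperties;

    @PostConstruct
    public void init() {
        initConfigUpdateListener();
    }

    public void initConfigUpdateListener() {
        String apolloNamespace = namespace;
        if (StringUtils.hasText(poolProperties.getApolloNamespace())) {
            apolloNamespace = poolProperties.getApolloNamespace();
        }
        Config config = ConfigService.getConfig(apolloNamespace);
        config.addChangeListener(changeEvent -> {
            try {
                // Wait waitRefreshConfigSeconds before rebuilding the pools
                Thread.sleep(poolProperties.getWaitRefreshConfigSeconds() * 1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                log.error("Error while waiting to refresh the configuration", e);
            }
            dynamicThreadPoolManager.refreshThreadPoolExecutor();
            log.info("Thread pool configuration changed; refresh complete");
        });
    }
}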
The refresh logic, in brief:
- For a newly added pool configuration, we create a thread pool instance, register the ThreadPoolTaskExecutor in the Spring container with defaultListableBeanFactory.registerSingleton(), and then call autowireCapableBeanFactory.autowireBean(executor) to inject its dependencies.
- For an existing pool configuration, we fetch the instance from the Spring container and refresh its settings.
public void refreshThreadPoolExecutor(DynamicThreadPoolProperties dynamicThreadPoolProperties) {
    dynamicThreadPoolProperties.getExecutors().forEach(poolProperties -> {
        NaughtyThreadPoolTaskExecutor executor = getExecutor(poolProperties.getThreadPoolName());
        if (executor == null) {
            executor = new NaughtyThreadPoolTaskExecutor();
            managerExecutor(executor, poolProperties);
            executor.setBlockingQueue(getBlockingQueue(poolProperties.getQueueType(), poolProperties.getQueueCapacity()));
            executor.initialize();
            // Register the newly created executor in the Spring container
            defaultListableBeanFactory.registerSingleton(poolProperties.getThreadPoolName(), executor);
            // Inject its dependencies
            autowireCapableBeanFactory.autowireBean(executor);
        } else {
            managerExecutor(executor, poolProperties);
            BlockingQueue<Runnable> queue = executor.getThreadPoolExecutor().getQueue();
            if (queue instanceof ResizableCapacityLinkedBlockIngQueue) {
                ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(poolProperties.getQueueCapacity());
            }
        }
    });
}
private void managerExecutor(NaughtyThreadPoolTaskExecutor executor, ThreadPoolProperties poolProperties) {
    try {
        if (executor != null) {
            executor.setBeanName(poolProperties.getThreadPoolName());
            int newCore = poolProperties.getCorePoolSize();
            int newMax = poolProperties.getMaximumPoolSize();
            // The JDK rejects maximumPoolSize < corePoolSize (and, since JDK 9, corePoolSize > maximumPoolSize),
            // so when the new core size exceeds the current maximum, raise the maximum first.
            if (newCore > executor.getMaxPoolSize()) {
                executor.setMaxPoolSize(newMax);
                executor.setCorePoolSize(newCore);
            } else {
                executor.setCorePoolSize(newCore);
                executor.setMaxPoolSize(newMax);
            }
            executor.setKeepAliveSeconds((int) poolProperties.getKeepAliveTime());
            executor.setRejectedExecutionHandler(this.getRejectedExecutionHandler(poolProperties.getRejectedExecutionType(), poolProperties.getThreadPoolName()));
            executor.setThreadPoolName(poolProperties.getThreadPoolName());
        }
    } catch (Exception e) {
        log.error("Failed to set executor parameters", e);
    }
}
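The refresh code relies on ResizableCapacityLinkedBlockIngQueue, whose source the article does not show. A custom queue is needed because the capacity of the JDK's LinkedBlockingQueue is a final field with no setter. Below is a hypothetical single-lock sketch of such a queue (the class name is illustrative, not the plugin's code; LinkedBlockingQueue itself uses two locks for throughput). Raising the capacity wakes blocked producers; lowering it never drops queued tasks and only refuses new ones until the queue drains below the new limit. ThreadPoolExecutor uses offer, timed poll, and take, all implemented here.

```java
import java.util.AbstractQueue;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical bounded blocking queue whose capacity can change at runtime (one lock, two conditions). */
final class ResizableBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private int capacity; // guarded by lock

    ResizableBlockingQueue(int capacity) { setCapacity(capacity); }

    /** Growing wakes blocked producers; shrinking keeps queued items and only refuses new ones. */
    void setCapacity(int newCapacity) {
        if (newCapacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        lock.lock();
        try { capacity = newCapacity; notFull.signalAll(); } finally { lock.unlock(); }
    }

    @Override public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        lock.lock();
        try { return items.size() < capacity && enqueue(e); } finally { lock.unlock(); }
    }

    @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.size() >= capacity) {
                if (nanos <= 0) return false;
                nanos = notFull.awaitNanos(nanos);
            }
            return enqueue(e);
        } finally { lock.unlock(); }
    }

    @Override public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (items.size() >= capacity) notFull.await();
            enqueue(e);
        } finally { lock.unlock(); }
    }

    @Override public E poll() { lock.lock(); try { return dequeue(); } finally { lock.unlock(); } }

    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0) return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally { lock.unlock(); }
    }

    @Override public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) notEmpty.await();
            return dequeue();
        } finally { lock.unlock(); }
    }

    @Override public E peek() { lock.lock(); try { return items.peekFirst(); } finally { lock.unlock(); } }
    @Override public int size() { lock.lock(); try { return items.size(); } finally { lock.unlock(); } }
    @Override public int remainingCapacity() {
        lock.lock(); try { return Math.max(0, capacity - items.size()); } finally { lock.unlock(); }
    }

    @Override public boolean remove(Object o) {
        lock.lock();
        try { boolean removed = items.remove(o); if (removed) notFull.signal(); return removed; }
        finally { lock.unlock(); }
    }

    /** Iterates over a read-only snapshot; use remove(Object) to remove elements. */
    @Override public Iterator<E> iterator() {
        lock.lock(); try { return Collections.unmodifiableList(new ArrayList<>(items)).iterator(); } finally { lock.unlock(); }
    }

    @Override public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
    @Override public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == this) throw new IllegalArgumentException();
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements && !items.isEmpty()) { c.add(items.pollFirst()); n++; }
            if (n > 0) notFull.signalAll();
            return n;
        } finally { lock.unlock(); }
    }

    private boolean enqueue(E e) { items.addLast(e); notEmpty.signal(); return true; } // lock held
    private E dequeue() { E e = items.pollFirst(); if (e != null) notFull.signal(); return e; } // lock held
}
```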
3.3 Status monitoring
ThreadPoolExecutor provides hook methods such as beforeExecute and afterExecute, in which we can report each task's execution time to CAT. The code snippet:
@Override
protected void beforeExecute(Thread t, Runnable r) {
    String threadName = Thread.currentThread().getName();
    Transaction transaction = Cat.newTransaction(threadPoolName, runnableNameMap.get(r.getClass().getSimpleName()));
    transactionMap.put(threadName, transaction);
    super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    String threadName = Thread.currentThread().getName();
    Transaction transaction = transactionMap.get(threadName);
    transaction.setStatus(Message.SUCCESS);
    if (t != null) {
        Cat.logError(t);
        transaction.setStatus(t);
    }
    transaction.complete();
    transactionMap.remove(threadName);
}
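Without CAT, the same hooks can collect plain timing statistics. The sketch below is a hypothetical ThreadPoolExecutor subclass (names illustrative). Because beforeExecute and afterExecute are both invoked on the worker thread that runs the task, a ThreadLocal can carry the start time instead of a map keyed by thread name. One caveat applies to the CAT version too: afterExecute receives a non-null Throwable only for tasks passed to execute(). Tasks passed to submit() are wrapped in a FutureTask that captures the exception, so the Throwable argument is null (the afterExecute Javadoc shows how to extract it from the Future).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical CAT-free variant: times each task in beforeExecute/afterExecute. */
final class TimedThreadPoolExecutor extends ThreadPoolExecutor {
    // Both hooks run on the worker thread that executes the task, so a ThreadLocal is enough.
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong failures = new AtomicLong();

    TimedThreadPoolExecutor(int coreSize, int maxSize) {
        super(coreSize, maxSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long start = startNanos.get();
            if (start != null) {
                totalNanos.addAndGet(System.nanoTime() - start);
            }
            // Non-null only if the task itself threw; submit() tasks keep their exception in a FutureTask.
            if (t != null) {
                failures.incrementAndGet();
            }
        } finally {
            startNanos.remove();
            super.afterExecute(r, t);
        }
    }

    long failures() {
        return failures.get();
    }

    /** Approximate average execution time in nanoseconds over completed tasks (0 if none). */
    long averageNanos() {
        long done = getCompletedTaskCount();
        return done == 0 ? 0 : totalNanos.get() / done;
    }
}
```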
The corresponding CAT monitoring data is shown in the figure below:
Using CAT's StatusExtension, the pool's runtime state can be sent to CAT periodically and rendered as bar charts. The implementation:
public StatusExtension registerStatusExtension(ThreadPoolProperties prop, Object object) {
    NaughtyThreadPoolTaskExecutor executor = (NaughtyThreadPoolTaskExecutor) object;
    StatusExtension statusExtension = new StatusExtension() {
        @Override
        public String getId() {
            return "thread.pool.info." + prop.getThreadPoolName();
        }
        @Override
        public String getDescription() {
            return "Thread pool monitoring";
        }
        @Override
        public Map<String, String> getProperties() {
            AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName());
            Map<String, String> pool = new HashMap<>();
            pool.put("activeCount", String.valueOf(executor.getActiveCount()));
            pool.put("keepAliveTime", String.valueOf(executor.getKeepAliveSeconds()));
            int coreSize = executor.getCorePoolSize();
            int maxSize = executor.getMaxPoolSize();
            if (coreSize != 0) {
                pool.put("active/core", String.valueOf(Float.valueOf(executor.getActiveCount()) / Float.valueOf(coreSize)));
            }
            if (maxSize != 0) {
                pool.put("active/max", String.valueOf(Float.valueOf(executor.getActiveCount()) / Float.valueOf(maxSize)));
            }
            pool.put("coreSize", String.valueOf(executor.getCorePoolSize()));
            pool.put("maxSize", String.valueOf(executor.getMaxPoolSize()));
            ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
            pool.put("completedTaskCount", String.valueOf(threadPoolExecutor.getCompletedTaskCount()));
            pool.put("largestPoolSize", String.valueOf(threadPoolExecutor.getLargestPoolSize()));
            pool.put("taskCount", String.valueOf(threadPoolExecutor.getTaskCount()));
            pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get()));
            pool.put("queueSize", String.valueOf(threadPoolExecutor.getQueue().size()));
            return pool;
        }
    };
    StatusExtensionRegister.getInstance().register(statusExtension);
    return statusExtension;
}
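The monitoring code reads a per-pool rejectCount through getRejectCount, and managerExecutor builds handlers through getRejectedExecutionHandler(type, poolName); the article shows neither method. One hypothetical way they could fit together is to wrap the JDK policy named in rejectedExecutionType in a handler that increments a per-pool counter before delegating (the class and helper names below are illustrative).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-pool rejection counters around the JDK's built-in policies. */
final class RejectCounters {
    private static final Map<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();

    /** Wraps a policy so every rejection for poolName is counted before the policy runs. */
    static RejectedExecutionHandler counting(String poolName, RejectedExecutionHandler delegate) {
        AtomicLong counter = COUNTS.computeIfAbsent(poolName, k -> new AtomicLong());
        return (r, executor) -> {
            counter.incrementAndGet(); // count first, so AbortPolicy's exception is still counted
            delegate.rejectedExecution(r, executor);
        };
    }

    /** Null if the pool was never wrapped, which matches the null check in getProperties(). */
    static AtomicLong getRejectCount(String poolName) {
        return COUNTS.get(poolName);
    }

    /** Maps a rejectedExecutionType value to a JDK policy; unknown names fall back to AbortPolicy. */
    static RejectedExecutionHandler byName(String type) {
        switch (type == null ? "" : type) {
            case "CallerRunsPolicy":
                return new ThreadPoolExecutor.CallerRunsPolicy();
            case "DiscardOldestPolicy":
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            case "DiscardPolicy":
                return new ThreadPoolExecutor.DiscardPolicy();
            default:
                return new ThreadPoolExecutor.AbortPolicy();
        }
    }
}
```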
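The metrics are explained below (some of these interpretations still need to be validated and are for reference only):
- active/coreSize: the ratio of active threads to core threads, where active = executor.getActiveCount(), the approximate number of threads currently executing tasks. This ratio reflects how busy the pool's threads are: if it stays very low for a long time, the pool should be scaled down; if it stays high, the threads are busy and the pool is well utilized.
- active/maxSize: the ratio of active threads to the maximum pool size, best read together with active/coreSize. When active/coreSize is above 100% but active/maxSize stays low, the pool's load is still low; if active/maxSize reaches 60% or more, the pool is overloaded and its capacity should be adjusted promptly.
- completedTaskCount: the approximate total number of tasks the pool has completed since it started.
- largestPoolSize: the largest number of threads the pool has ever had at the same time.
- rejectCount: the number of rejected tasks. Many rejections mean the pool has overflowed and its configuration should be adjusted promptly.
- queueSize: the number of tasks waiting in the queue. Many queued tasks mean coreSize is not large enough and can be adjusted to the actual situation; for workloads with strict latency requirements, increasing coreSize may be necessary to reduce queuing.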
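The rules of thumb above can be turned into a simple check. The sketch below is hypothetical: snapshot() collects the same metrics as getProperties() directly from a ThreadPoolExecutor, and classify() applies the text's heuristics in order of severity (any rejection means overflow, active/max at or above 60% means overload, a non-empty queue means tasks are waiting for a thread). The thresholds are the article's tentative rules, not tuned values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/** Hypothetical CAT-free version of the metrics above, plus a rough health classification. */
final class PoolHealth {
    enum Status { OK, QUEUING, OVERLOADED, OVERFLOW }

    /** Collects the same metrics as getProperties(), straight from a ThreadPoolExecutor. */
    static Map<String, Number> snapshot(ThreadPoolExecutor pool, long rejectCount) {
        int active = pool.getActiveCount();
        int core = pool.getCorePoolSize();
        int max = pool.getMaximumPoolSize(); // always > 0 for a ThreadPoolExecutor
        Map<String, Number> m = new LinkedHashMap<>();
        m.put("activeCount", active);
        m.put("coreSize", core);
        m.put("maxSize", max);
        m.put("active/core", core == 0 ? 0.0 : (double) active / core);
        m.put("active/max", (double) active / max);
        m.put("queueSize", pool.getQueue().size());
        m.put("completedTaskCount", pool.getCompletedTaskCount());
        m.put("largestPoolSize", pool.getLargestPoolSize());
        m.put("taskCount", pool.getTaskCount());
        m.put("rejectCount", rejectCount);
        return m;
    }

    /** Applies the article's rules of thumb, most severe first. */
    static Status classify(double activeToMax, int queueSize, long rejectCount) {
        if (rejectCount > 0) return Status.OVERFLOW;      // tasks are already being rejected
        if (activeToMax >= 0.6) return Status.OVERLOADED; // "60% or higher" from the text
        if (queueSize > 0) return Status.QUEUING;         // tasks are waiting for a thread
        return Status.OK;
    }
}
```

In a real alerting rule, queueSize would be compared against a per-pool threshold rather than zero.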
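The thread pool monitoring in our production environment is shown in the figure below:
Outlook
After adopting the thread pool monitoring plugin, the project gained the following benefits:
- The many batch jobs that run asynchronously in background thread pools are now isolated, managed, and monitored. The CAT metrics let us optimize a specific business's batch jobs in a targeted way, so they no longer threaten service stability.
- For high-concurrency thread pools, we are alerted as soon as the load gets high, which has all but eliminated API outages caused by thread pool overflow.
- As a lightweight plugin, it delivered good returns for a very small investment; we should keep looking for similarly cost-effective technical solutions to keep improving the team's productivity.
In the future we plan to keep iterating and try to add the following features:
- Support for MySQL or other configuration centers
- A web UI for configuration management
- A standalone, independently deployable thread pool monitoring portal that does not depend on CAT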