前言
Dubbo的線程模型中可使用4種線程池
- CachedThreadPool
- LimitedThreadPool
- FixedThreadPool
- EagerThreadPool
想深入了解線程池原理的同學渣蜗,可以閱讀我的線程池那些事專欄酥宴。
在Dubbo中什么時候會用到線程池
我們的線程主要執(zhí)行2種邏輯,一是普通IO事件宽气,比如建立連接俄认,斷開連接,二是請求IO事件,執(zhí)行業(yè)務(wù)邏輯共屈。
在Dubbo的Dispatcher擴展點會使用到這些線程池,Dispatcher這個擴展點用于決定Netty ChannelHandler中的那些事件在Dubbo提供的線程池中執(zhí)行党窜。
源碼解析
CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
緩沖線程池拗引,默認配置如下
配置 | 配置值 |
---|---|
corePoolSize | 0 |
maximumPoolSize | Integer.MAX_VALUE |
keepAliveTime | 60s |
workQueue | 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue,默認queue=0幌衣,所以是SynchronousQueue |
threadFactory | NamedInternalThreadFactory |
rejectHandler | AbortPolicyWithReport |
就默認配置來看矾削,和Executors創(chuàng)建的差不多,存在內(nèi)存溢出風險。
NamedInternalThreadFactory主要用于修改線程名哼凯,方便我們排查問題欲间。
AbortPolicyWithReport對拒絕的任務(wù)打印日志,也是方便排查問題断部。
LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
配置 | 配置值 |
---|---|
corePoolSize | 0 |
maximumPoolSize | 200 |
keepAliveTime | Long.MAX_VALUE,相當于無限長 |
workQueue | 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue猎贴,默認queue=0,所以是SynchronousQueue |
threadFactory | NamedInternalThreadFactory |
rejectHandler | AbortPolicyWithReport |
從keepAliveTime的配置可以看出來家坎,LimitedThreadPool線程池的特性是線程數(shù)只會增加不會減少嘱能。
FixedThreadPool
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
配置 | 配置值 |
---|---|
corePoolSize | 200 |
maximumPoolSize | 200 |
keepAliveTime | 0 |
workQueue | 根據(jù)queue決定是SynchronousQueue還是LinkedBlockingQueue,默認queue=0虱疏,所以是SynchronousQueue |
threadFactory | NamedInternalThreadFactory |
rejectHandler | AbortPolicyWithReport |
Dubbo的默認線程池惹骂,固定200個線程,就配置來看和LimitedThreadPool基本一致做瞪。
如果一定要說區(qū)別对粪,那就是FixedThreadPool等到創(chuàng)建完200個線程,再往隊列放任務(wù)装蓬。而LimitedThreadPool是先放隊列放任務(wù)著拭,放滿了之后才創(chuàng)建線程。
EagerThreadPool
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
配置 | 配置值 |
---|---|
corePoolSize | 0 |
maximumPoolSize | Integer.MAX_VALUE |
keepAliveTime | 60s |
workQueue | 自定義實現(xiàn)TaskQueue牍帚,默認長度為1儡遮,使用時要自己配置下 |
threadFactory | NamedInternalThreadFactory |
rejectHandler | AbortPolicyWithReport |
我們知道,當線程數(shù)量達到corePoolSize之后暗赶,只有當workqueue滿了之后鄙币,才會增加工作線程。
這個線程池就是對這個特性做了優(yōu)化蹂随,首先繼承ThreadPoolExecutor實現(xiàn)EagerThreadPoolExecutor十嘿,對當前線程池提交的任務(wù)數(shù)submittedTaskCount進行記錄。
其次是通過自定義TaskQueue作為workQueue岳锁,它會在提交任務(wù)時判斷是否currentPoolSize<submittedTaskCount<maxPoolSize绩衷,然后通過workQueue的offer方法返回false導致增加工作線程。
為什么返回false會增加工作線程激率,我們回顧下ThreadPoolExecutor的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//在這一步如果offer方法返回false咳燕,那么會進入到下一個else分支判斷
//如果這個時候當前工作線程數(shù)沒有達到上限,那么就會增加一個工作線程
//對于普通的workQueue在沒有滿的情況下是不會返回false的
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
然后看下TaskQueue的offer方法邏輯
public boolean offer(Runnable runnable) {
//TaskQueue持有executor引用乒躺,用于獲取當前提交任務(wù)數(shù)
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
//如果提交任務(wù)數(shù)小于當前工作線程數(shù)迟郎,說明當前工作線程足夠處理任務(wù),將提交的任務(wù)插入到工作隊列
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
//如果提交任務(wù)數(shù)大于當前工作線程數(shù)并且小于最大線程數(shù)聪蘸,說明提交的任務(wù)量線程已經(jīng)處理不過來,那么需要增加線程數(shù),返回false
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
//工作線程數(shù)到達最大線程數(shù)健爬,插入到workqueue
return super.offer(runnable);
}
最后看下EagerThreadPoolExecutor是如何統(tǒng)計已提交的任務(wù)
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
//任務(wù)執(zhí)行完畢后控乾,submittedTaskCount--
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
//執(zhí)行任務(wù)前,submittedTaskCount++
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//失敗嘗試重新加入到workqueue
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
//如果重新加入失敗娜遵,那么拋出異常蜕衡,并且統(tǒng)計數(shù)-1
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
}
}
最后
一般來講使用Dubbo的默認配置,我們公司的業(yè)務(wù)量還沒到需要對線程池進行特殊配置的地步设拟。本文主要目的是慨仿,通過一個成熟框架對線程池的配置點,指導我們在實際使用線程池中需要注意的點纳胧。