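Introduction
This article analyzes Dubbo's thread pool strategies and threading model, working mainly from the source code and drawing on some existing articles found online.
My first impression after reading the source is that even a mature framework is built on the basic JDK thread pool. The one shortcoming is that this article does not sort out how Dubbo's configuration maps to the actual thread pool parameters; I plan to cover that in a follow-up article.
Dubbo Thread Pool Strategies
The file com.alibaba.dubbo.common.threadpool.ThreadPool under the resources directory: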
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
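Notes:
- Dubbo exposes its thread pool strategies through an SPI configuration file, defined in the com.alibaba.dubbo.common.threadpool.ThreadPool file.
- Three strategies are provided: fixed, cached, and limited.
- The definition of each strategy is shown in the source below.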
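To make the name-to-class mapping concrete, here is a minimal sketch that parses the three lines above with java.util.Properties and resolves a strategy name to its class name. It only illustrates the file format; Dubbo's own extension loading does considerably more, and the class SpiMappingSketch and its resolve method are hypothetical names made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: illustrates the key=class format of the SPI file only.
final class SpiMappingSketch {
    // The three entries copied from the ThreadPool SPI file shown above.
    static final String FILE_CONTENT =
            "fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool\n"
          + "cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool\n"
          + "limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool\n";

    // Returns the class name registered under the strategy name, or null if absent.
    static String resolve(String strategyName) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(FILE_CONTENT));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return mapping.getProperty(strategyName);
    }

    public static void main(String[] args) {
        System.out.println("fixed -> " + resolve("fixed"));
        System.out.println("limited -> " + resolve("limited"));
    }
}
```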
CachedThreadPool
public class CachedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
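Notes:
- CachedThreadPool is comparable to Executors.newCachedThreadPool() in the JDK: the maximum size defaults to Integer.MAX_VALUE, and idle threads beyond the core size are reclaimed after the alive timeout. Unlike the JDK method, the core size, maximum size, queue, and timeout can all be set through URL parameters.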
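The nested conditional in getExecutor that picks the work queue is easy to misread, so the sketch below isolates it as a standalone method. QueueSelectionSketch and select are hypothetical names; the three branches mirror the expression in the excerpt.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper that isolates the queue-selection expression from getExecutor.
final class QueueSelectionSketch {
    static BlockingQueue<Runnable> select(int queues) {
        if (queues == 0) {
            // No buffering: every task must be taken by a thread immediately.
            return new SynchronousQueue<Runnable>();
        }
        if (queues < 0) {
            // Unbounded queue (capacity Integer.MAX_VALUE).
            return new LinkedBlockingQueue<Runnable>();
        }
        // Bounded queue with exactly `queues` slots.
        return new LinkedBlockingQueue<Runnable>(queues);
    }

    public static void main(String[] args) {
        for (int queues : new int[] {0, -1, 100}) {
            BlockingQueue<Runnable> q = select(queues);
            System.out.println(queues + " -> " + q.getClass().getSimpleName()
                    + ", remaining capacity " + q.remainingCapacity());
        }
    }
}
```

The choice matters because a ThreadPoolExecutor only creates threads beyond the core size when the queue refuses a task, so a non-zero queues value changes when the pool grows.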
FixedThreadPool
public class FixedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
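Notes:
- FixedThreadPool is comparable to Executors.newFixedThreadPool() in the JDK: the core size and maximum size are both set to threads. The queue, however, is chosen by the queues parameter rather than always being unbounded.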
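One consequence of the queue choice is worth seeing in action. Executors.newFixedThreadPool() uses an unbounded queue, whereas the constructor call above uses a SynchronousQueue when queues is 0, so once every thread is busy a new task is rejected rather than queued. The sketch below reproduces this with plain JDK classes. It assumes a pool size of 2 (chosen for the demo, not a Dubbo default) and substitutes the JDK's AbortPolicy for AbortPolicyWithReport.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FixedPoolRejectionSketch {
    // Returns true if a third task is rejected while both threads are busy.
    static boolean thirdTaskRejected() {
        int threads = 2; // demo value, not a Dubbo default
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts worker 1
            pool.execute(blocker); // starts worker 2
            pool.execute(blocker); // no idle worker, no queue slot, pool at maximum
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskRejected());
    }
}
```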
LimitedThreadPool
public class LimitedThreadPool implements ThreadPool {
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
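Notes:
- LimitedThreadPool resembles Executors.newCachedThreadPool() in the JDK in that the pool can grow from cores up to threads.
- The difference is that threads never expire once created, because the keep-alive time is set to Long.MAX_VALUE. The pool therefore only grows and never shrinks.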
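The effect of the Long.MAX_VALUE keep-alive can be observed with plain JDK classes. The sketch below starts a burst of concurrent tasks, waits for them to finish, pauses briefly, and reports how many threads are still in the pool. LimitedPoolSketch, its method, and the sizes used are assumptions for the demo; the pool is built with the JDK default rejection policy rather than AbortPolicyWithReport, and tasks must not exceed threads or the burst is rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LimitedPoolSketch {
    // Runs `tasks` concurrent tasks, then returns the pool size shortly after they finish.
    static int poolSizeAfterBurst(int cores, int threads, long aliveMillis, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(cores, threads, aliveMillis,
                TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so the next task needs a new one
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            release.countDown();
            done.await();
            Thread.sleep(300); // give idle threads time to exit, if they are going to
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Keep-alive as in LimitedThreadPool: idle threads are retained.
        System.out.println("limited-style: " + poolSizeAfterBurst(0, 3, Long.MAX_VALUE, 3));
        // Short keep-alive for contrast: idle threads are expected to be reclaimed.
        System.out.println("short keep-alive: " + poolSizeAfterBurst(0, 3, 10, 3));
    }
}
```

With the Long.MAX_VALUE setting the three threads created during the burst remain in the pool afterwards; the second call depends on timing and is included only for contrast.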
NamedThreadFactory
public class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
    private final AtomicInteger mThreadNum = new AtomicInteger(1);
    private final String mPrefix;
    private final boolean mDaemon;
    private final ThreadGroup mGroup;

    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}
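Notes:
- This is the factory that creates the threads in the pool. When using a thread pool, it is strongly recommended to supply a custom thread factory so that thread names show what each thread is for.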
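To show what the naming scheme looks like in a thread dump, here is a reduced stand-in that follows the same prefix + "-thread-" + counter pattern but leaves out the ThreadGroup handling. NamedFactorySketch and the prefix "OrderService" are hypothetical.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedFactorySketch {
    // Simplified stand-in for NamedThreadFactory: same naming scheme, no ThreadGroup handling.
    static ThreadFactory named(String prefix, boolean daemon) {
        AtomicInteger seq = new AtomicInteger(1);
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-thread-" + seq.getAndIncrement());
            t.setDaemon(daemon);
            return t;
        };
    }

    public static void main(String[] args) {
        ThreadFactory factory = named("OrderService", true); // hypothetical prefix
        Runnable noop = () -> { };
        System.out.println(factory.newThread(noop).getName()); // OrderService-thread-1
        System.out.println(factory.newThread(noop).getName()); // OrderService-thread-2
    }
}
```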
Dubbo Threading Model
The file com.alibaba.dubbo.remoting.Dispatcher under the resources directory:
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
Notes:
- Dubbo's threading models are declared in the com.alibaba.dubbo.remoting.Dispatcher file under the resources directory.
- Five models are provided: all, direct, message, execution, and connection.
- The purpose of each is described below.
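Comparison of Dubbo Threading Models
Depending on whether an incoming message is handled on the IO thread or in the business thread pool, Dubbo provides the following threading models:
- all: (AllDispatcher) every message is dispatched to the business thread pool, including requests, responses, connect events, disconnect events, and heartbeats.
[Figure: AllDispatcher]
- direct: (DirectDispatcher) no message is dispatched to the business thread pool; everything runs directly on the IO thread.
[Figure: DirectDispatcher]
- message: (MessageOnlyDispatcher) only request and response messages are dispatched to the business thread pool; connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
[Figure: MessageOnlyDispatcher]
- execution: (ExecutionDispatcher) only request messages are dispatched to the business thread pool; responses, connect and disconnect events, heartbeats, and other messages run directly on the IO thread.
[Figure: ExecutionDispatcher]
- connection: (ConnectionOrderedDispatcher) on the IO thread, connect and disconnect events are put into a queue and executed one by one in order; all other messages are dispatched to the business thread pool.
[Figure: ConnectionOrderedDispatcher]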
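The five descriptions can be condensed into a small routing table. The sketch below encodes them exactly as stated in the list above, not the behavior of any particular Dubbo release; DispatchTableSketch, Event, Target, and route are hypothetical names.

```java
final class DispatchTableSketch {
    enum Event { REQUEST, RESPONSE, CONNECTED, DISCONNECTED, HEARTBEAT }

    enum Target { IO_THREAD, BUSINESS_POOL, ORDERED_QUEUE }

    // Encodes the five descriptions above; model names mirror the SPI keys.
    static Target route(String model, Event event) {
        boolean connectionEvent = event == Event.CONNECTED || event == Event.DISCONNECTED;
        switch (model) {
            case "all":
                return Target.BUSINESS_POOL;
            case "direct":
                return Target.IO_THREAD;
            case "message":
                return (event == Event.REQUEST || event == Event.RESPONSE)
                        ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case "execution":
                return event == Event.REQUEST ? Target.BUSINESS_POOL : Target.IO_THREAD;
            case "connection":
                return connectionEvent ? Target.ORDERED_QUEUE : Target.BUSINESS_POOL;
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }

    public static void main(String[] args) {
        String[] models = {"all", "direct", "message", "execution", "connection"};
        for (String model : models) {
            StringBuilder row = new StringBuilder(model).append(':');
            for (Event event : Event.values()) {
                row.append(' ').append(event).append('=').append(route(model, event));
            }
            System.out.println(row);
        }
    }
}
```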
AllDispatcher
public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

public class AllChannelHandler extends WrappedChannelHandler {
    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}
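Notes:
- AllChannelHandler hands connected, disconnected, received, and caught events to the business thread pool.
- Each event is submitted the same way, through cexecutor.execute(new ChannelEventRunnable(...)).
- getExecutorService() falls back to SHARED_EXECUTOR when the handler's own executor is null or already shut down.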
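The fallback in getExecutorService() is a small pattern that can be tried in isolation. The sketch below is a standalone imitation using JDK executors only; ExecutorFallbackSketch, SHARED, and choose are hypothetical names, and SHARED merely stands in for the SHARED_EXECUTOR field referenced in the excerpt.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ExecutorFallbackSketch {
    // Stand-in for the shared pool that the excerpt refers to as SHARED_EXECUTOR.
    static final ExecutorService SHARED = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "shared-fallback");
        t.setDaemon(true);
        return t;
    });

    // Prefer the handler's own executor; fall back when it is missing or shut down.
    static ExecutorService choose(ExecutorService preferred) {
        if (preferred == null || preferred.isShutdown()) {
            return SHARED;
        }
        return preferred;
    }

    public static void main(String[] args) {
        ExecutorService own = Executors.newSingleThreadExecutor();
        System.out.println(choose(own) == own);     // true
        own.shutdown();
        System.out.println(choose(own) == SHARED);  // true
        System.out.println(choose(null) == SHARED); // true
    }
}
```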
ConnectionOrderedDispatcher
public class ConnectionOrderedDispatcher implements Dispatcher {
    public static final String NAME = "connection";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }
}

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {}
    }

    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {}
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {}
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
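Notes:
- With ConnectionOrderedDispatcher, connected and disconnected events are submitted to connectionExecutor, a single-thread pool, so they are processed one at a time in the order they were submitted.
- received events (and caught events) are submitted to the separate business executor.
- Connection handling and message handling therefore run on different executors.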
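The ordering guarantee comes from the executor's shape: one thread draining a FIFO queue. The sketch below builds the same 1/1 ThreadPoolExecutor over a LinkedBlockingQueue with JDK classes and replays a list of event labels through it. OrderedEventsSketch, replay, and the event strings are made up for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class OrderedEventsSketch {
    // Submits each event to a single-thread pool and returns the order in which they ran.
    static List<String> replay(List<String> events) {
        ThreadPoolExecutor single = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        List<String> handled = Collections.synchronizedList(new ArrayList<String>());
        for (String event : events) {
            single.execute(() -> handled.add(event));
        }
        single.shutdown(); // already-queued tasks still run
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for events");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("connected#1", "disconnected#1", "connected#2");
        System.out.println(replay(events)); // [connected#1, disconnected#1, connected#2]
    }
}
```

The guarantee only covers events submitted from a single thread; concurrent submitters would be ordered by whichever reaches the queue first.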
DirectDispatcher
public class DirectDispatcher implements Dispatcher {
    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }
}
Notes:
- DirectDispatcher returns the handler unwrapped, so no thread pool is involved and everything is handled on the IO thread.
ExecutionDispatcher
public class ExecutionDispatcher implements Dispatcher {
    public static final String NAME = "execution";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}

public class ExecutionChannelHandler extends WrappedChannelHandler {
    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    }
}
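Notes:
- In this excerpt, the ExecutionDispatcher implementation is almost identical to AllDispatcher.
- Connection events and messages are all submitted to the business thread pool here, which is broader than the request-only behavior described for execution in the comparison above.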
MessageOnlyDispatcher
public class MessageOnlyDispatcher implements Dispatcher {
    public static final String NAME = "message";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new MessageOnlyChannelHandler(handler, url);
    }
}

public class MessageOnlyChannelHandler extends WrappedChannelHandler {
    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {}
    }
}
Notes:
- With MessageOnlyDispatcher, received messages are executed in the business thread pool.
- Connection events are executed on the IO thread, because only received is overridden.
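The split between the two threads can be reproduced with a heavily reduced model. In the sketch below, the calling thread plays the role of the IO thread, a hypothetical two-method Handler interface stands in for ChannelHandler, and a wrapper hands only received to a pool while connected runs inline. None of these names come from Dubbo.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MessageOnlySketch {
    // Hypothetical, heavily reduced stand-in for ChannelHandler.
    interface Handler {
        void connected();
        void received(String message);
    }

    // Fires one connected and one received event; returns the thread name that ran each.
    static Map<String, String> run() {
        Map<String, String> threadByEvent = new ConcurrentHashMap<>();
        Handler business = new Handler() {
            public void connected() {
                threadByEvent.put("connected", Thread.currentThread().getName());
            }
            public void received(String message) {
                threadByEvent.put("received", Thread.currentThread().getName());
            }
        };
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool-1"));
        Handler messageOnly = new Handler() {
            public void connected() {
                business.connected(); // not overridden in the real class: stays on the caller
            }
            public void received(String message) {
                pool.execute(() -> business.received(message)); // handed to the pool
            }
        };
        messageOnly.connected();
        messageOnly.received("hello");
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadByEvent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```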