Dubbo Thread Model
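According to the official documentation, Dubbo's thread model supports separating business threads from I/O threads and offers 5 different dispatch strategies.
Take the Netty transport (Netty 4.x) as an example. The NettyServer constructor calls ChannelHandlers#wrap, which wraps the handler in MultiMessageHandler and HeartbeatHandler and selects the dispatch strategy through Dubbo's SPI extension mechanism.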
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // thread dispatching
}
ChannelHandlers#wrapInternal
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// select the dispatch strategy; the default is "all"
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
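Next, look at NettyServer#doOpen. It sets the Netty boss thread pool size to 1 and the worker thread pool (the I/O threads) to the iothreads value, which defaults to CPU cores + 1 (capped at 32), and registers the handlers in the Netty pipeline that encode, decode and process messages.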
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// multi-threaded (boss/worker) model
// boss thread pool: accepts new connections from consumers
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// worker thread pool: reads and writes data on established connections
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // disable Nagle's algorithm
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) // allow address reuse (e.g. sockets in TIME_WAIT)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // pooled buffer allocator
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // codec: decoder and encoder
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind the port
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
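The default worker count can be checked with a little arithmetic. The sketch below assumes the Dubbo 2.7.x definition of Constants.DEFAULT_IO_THREADS (CPU cores + 1, capped at 32); the class and method names are hypothetical, so verify the formula against the Dubbo version you actually run.

```java
// Hypothetical helper reproducing the assumed default of the "iothreads" parameter.
final class IoThreadDefaults {
    private IoThreadDefaults() {}

    // Assumption: mirrors Math.min(availableProcessors() + 1, 32) from Dubbo 2.7.x Constants.
    static int defaultIoThreads(int availableProcessors) {
        return Math.min(availableProcessors + 1, 32);
    }

    // What the worker group would get on this machine when iothreads is not configured.
    static int defaultIoThreadsForThisMachine() {
        return defaultIoThreads(Runtime.getRuntime().availableProcessors());
    }
}
```

On an 8-core machine this gives 9 I/O threads, and on a 64-core machine the cap keeps it at 32. The boss group stays at one thread either way because it only accepts connections.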
As you can see, if a JVM process exposes only one Dubbo service port, it has exactly one NettyServer instance and one NettyServerHandler instance. The code above also shows that Dubbo registers only three handlers in the Netty pipeline (decoder, encoder and handler). Dubbo additionally defines its own ChannelHandler interface to chain channel-related processing, and the first Dubbo ChannelHandler in that chain is invoked by NettyServerHandler. Interestingly, NettyServer itself is also a (Dubbo) ChannelHandler. After Dubbo wraps the service instances from the Spring container in dynamic proxies, it exposes the service port through NettyServer#doOpen and then registers the service with the registry. Once these steps are complete, consumers can connect to the provider; the consumer always initiates the connection. When a connection is initialized, the provider calls NettyServerHandler#channelActive to create a NettyChannel:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
// get or create the NettyChannel for this Netty channel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
// <ip:port, channel>
channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
// the handler here is the NettyServer
handler.connected(channel);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
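Just as Netty and Dubbo each have their own ChannelHandler, they also each have their own Channel. At the end, this method calls NettyServer#connected, which checks whether the new channel pushes the connection count past the provider's accepts setting. If it does, the server logs an error and closes the channel, and the consumer consequently receives a connection-closed exception. The check is implemented in AbstractServer#connected: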
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClosed()) {
logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
ch.close();
return;
}
Collection<Channel> channels = getChannels();
// close any TCP connection beyond the accepts limit
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
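The accepts check can be illustrated without Netty. The sketch below is a hypothetical ConnectionLimiter that follows the same order as the code above: the new channel is already registered when the check runs, so the limit is exceeded only when the count is strictly greater than accepts, and a value of 0 or less means unlimited. String addresses stand in for Dubbo Channel objects, and, like the original, the size check is not atomic across threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the provider-side accepts check.
final class ConnectionLimiter {
    private final int accepts; // <= 0 means unlimited, as in Dubbo's accepts
    private final Set<String> channels = ConcurrentHashMap.newKeySet();

    ConnectionLimiter(int accepts) {
        this.accepts = accepts;
    }

    // Register first, then check, mirroring channelActive -> connected.
    boolean onConnected(String remoteAddress) {
        channels.add(remoteAddress);
        if (accepts > 0 && channels.size() > accepts) {
            channels.remove(remoteAddress); // the real server closes the channel here
            return false;
        }
        return true;
    }

    void onDisconnected(String remoteAddress) {
        channels.remove(remoteAddress);
    }

    int size() {
        return channels.size();
    }
}
```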
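Note that by default a Dubbo consumer opens only one long-lived TCP connection to a provider. To increase the throughput of a consumer's calls to the provider, add the following configuration on the consumer side: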
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
On the provider side, accepts limits the number of long-lived connections so the provider is not overwhelmed by too many connections:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
Once a connection is established, the consumer can call the provider's services. When a request arrives, the provider passes it through the following handlers in order:
--->NettyServerHandler#channelRead: receives the request message.
--->AbstractPeer#received: if the server is already closed, returns; otherwise passes the message to the next handler.
--->MultiMessageHandler#received: if the message is a batch, passes each message in turn to the next handler.
--->HeartbeatHandler#received: handles heartbeat messages.
--->AllChannelHandler#received: a very important handler, because this is where the I/O threads hand off to the business thread pool; everything after this step runs on a business thread.
--->DecodeHandler#received: message decoding.
--->HeaderExchangeHandler#received: message handling.
--->DubboProtocol: remote invocation.
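The chain above is a series of decorators, each wrapping the next. The following is a heavily simplified sketch with a hypothetical single-method MsgHandler interface (not Dubbo's ChannelHandler); a List stands in for a batched message and the string "HEARTBEAT" stands in for a heartbeat request. It shows how the outer handlers unpack batches and filter heartbeats before anything reaches the dispatch step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Dubbo's ChannelHandler (only "received").
interface MsgHandler {
    void received(Object message);
}

// Plays the role of MultiMessageHandler: unpack a batch, forward each element.
class BatchUnpackingHandler implements MsgHandler {
    private final MsgHandler next;

    BatchUnpackingHandler(MsgHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        if (message instanceof List) {
            for (Object single : (List<?>) message) {
                next.received(single);
            }
        } else {
            next.received(message);
        }
    }
}

// Plays the role of HeartbeatHandler: consume heartbeats, forward everything else.
class HeartbeatFilteringHandler implements MsgHandler {
    private final MsgHandler next;
    int heartbeats = 0;

    HeartbeatFilteringHandler(MsgHandler next) {
        this.next = next;
    }

    @Override
    public void received(Object message) {
        if ("HEARTBEAT".equals(message)) {
            heartbeats++; // a real server would answer the heartbeat here
            return;
        }
        next.received(message);
    }
}

// Stands in for the dispatcher and everything after it: records what arrives.
class RecordingHandler implements MsgHandler {
    final List<Object> seen = new ArrayList<>();

    @Override
    public void received(Object message) {
        seen.add(message);
    }
}
```

Wiring it as new BatchUnpackingHandler(new HeartbeatFilteringHandler(sink)) mirrors the nesting order used in wrapInternal.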
Here is AllChannelHandler#received:
public void received(Channel channel, Object message) throws RemotingException {
// get the business thread pool
ExecutorService cexecutor = getExecutorService();
try {
// hand the message off to the business thread pool
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
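execute is wrapped in a try/catch here because the I/O threads hand off every incoming message without any back-pressure, while the business thread pool may be bounded (a fixed number of threads and a limited queue), so execute can throw a RejectedExecutionException.
So where does the business thread pool come from? WrappedChannelHandler is the base class of the xxxChannelHandler classes (AllChannelHandler extends it), so when the Dispatcher SPI creates an AllChannelHandler, the WrappedChannelHandler constructor runs first: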
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// obtain the business thread pool through the ThreadPool SPI
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
The code above shows that Dubbo obtains the business thread pool when a WrappedChannelHandler is instantiated.
Next, let's look at the business thread pool implementations Dubbo provides:
- FixedThreadPool:
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// thread name, e.g. DubboServerHandler-host:port
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// thread count, default 200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// queue capacity, default 0: 0 = SynchronousQueue, < 0 = unbounded, > 0 = bounded
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
By default it uses 200 threads and a SynchronousQueue, which means that once every thread is busy, any new task is rejected immediately.
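The queues parameter decides which work queue FixedThreadPool uses, and with the default of 0 a SynchronousQueue rejects any task that arrives while every thread is busy. The sketch below rebuilds that choice with the plain JDK ThreadPoolExecutor; FixedPoolSketch is a hypothetical name, and the JDK's ThreadPoolExecutor.AbortPolicy replaces Dubbo's AbortPolicyWithReport (which additionally logs a report).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FixedPoolSketch {
    private FixedPoolSketch() {}

    // Same rule as FixedThreadPool: 0 -> direct hand-off, < 0 -> unbounded, > 0 -> bounded.
    static BlockingQueue<Runnable> queueFor(int queues) {
        if (queues == 0) {
            return new SynchronousQueue<>();
        }
        return queues < 0 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(queues);
    }

    // core == max and no keep-alive: the thread count never changes once started.
    static ThreadPoolExecutor fixed(int threads, int queues) {
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queueFor(queues), new ThreadPoolExecutor.AbortPolicy());
    }
}
```

With fixed(1, 0), a second task submitted while the only thread is busy is rejected at once, which is the default FixedThreadPool behaviour scaled down to one thread.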
- CachedThreadPool:
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// core thread count, default 0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// maximum thread count, default Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// queue capacity, default 0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// keep-alive time of idle threads (ms)
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
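A cached thread pool: whenever tasks are submitted faster than they complete, it keeps creating new threads (up to threads, which defaults to Integer.MAX_VALUE), and under extreme conditions it can exhaust CPU and memory. It is not suitable when sudden traffic spikes are expected.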
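The growth behaviour is easy to observe with the JDK alone. This sketch builds a ThreadPoolExecutor with the CachedThreadPool defaults quoted above (0 core threads, Integer.MAX_VALUE maximum, 60-second keep-alive, SynchronousQueue); CachedGrowthDemo and its methods are hypothetical names. Every task that arrives while the existing threads are busy forces a brand-new thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CachedGrowthDemo {
    private CachedGrowthDemo() {}

    // Mirrors the CachedThreadPool defaults: cores=0, threads=Integer.MAX_VALUE, alive=60s, queues=0.
    static ThreadPoolExecutor cachedPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60_000, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>());
    }

    // Submits `tasks` tasks that all block until released and returns the resulting pool size.
    static int threadsForBlockedTasks(int tasks) {
        ThreadPoolExecutor pool = cachedPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize(); // evaluated before the finally block releases the tasks
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With 50 blocked tasks the pool holds 50 threads; under a real traffic spike nothing short of Integer.MAX_VALUE stops it, which is the risk described above.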
- LimitedThreadPool:
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// core thread count, default 0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// maximum thread count, default 200
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// queue capacity, default 0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// keep-alive of Long.MAX_VALUE: idle threads are never reclaimed
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
With default settings it behaves almost the same as FixedThreadPool (at most 200 threads and a SynchronousQueue). The difference is that it starts with 0 threads and grows on demand, and because the keep-alive time is Long.MAX_VALUE, threads are never reclaimed: the pool only grows and never shrinks.
- EagerThreadPool:
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// default 0
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// default Integer.MAX_VALUE
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// default 0
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// default 60s
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
// init queue and executor
// the task queue capacity is 1 when queues <= 0
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
EagerThreadPoolExecutor:
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// number of submitted tasks (queued or running)
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// rejected because the pool is at its maximum thread count:
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
TaskQueue:
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// current number of threads in the pool
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
// (fewer submitted, unfinished tasks than threads means at least one worker is idle)
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// return false to let executor create new worker.
// (the thread count is still below the maximum, so the executor adds a worker)
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
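EagerThreadPool prefers creating Worker threads: when the number of tasks exceeds corePoolSize but is still below maximumPoolSize, it creates a new Worker to handle the task instead of queueing it. Once the number of tasks exceeds maximumPoolSize, tasks are placed in the blocking queue, and a RejectedExecutionException is thrown when the queue is full. (Compared with cached: with its default queue of 0, cached throws the exception as soon as the task count exceeds maximumPoolSize instead of putting the task into a blocking queue.)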
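The eager behaviour comes from a queue whose offer deliberately returns false while the pool can still grow, which makes ThreadPoolExecutor start a new worker instead of queueing. Below is a simplified, self-contained re-creation of that idea on the JDK; EagerQueue and EagerExecutor are hypothetical names, and details such as Dubbo's timeout-based retryOffer and its catch-all error handling are left out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Queue that says "no" while the pool can still grow, so the executor adds a worker first.
class EagerQueue extends LinkedBlockingQueue<Runnable> {
    private volatile EagerExecutor executor;

    EagerQueue(int capacity) {
        super(capacity);
    }

    void setExecutor(EagerExecutor executor) {
        this.executor = executor;
    }

    @Override
    public boolean offer(Runnable task) {
        int poolSize = executor.getPoolSize();
        // Fewer unfinished tasks than threads: some worker is idle, let it take the task.
        if (executor.submitted.get() < poolSize) {
            return super.offer(task);
        }
        // Pool can still grow: returning false makes ThreadPoolExecutor start a new worker.
        if (poolSize < executor.getMaximumPoolSize()) {
            return false;
        }
        // Pool is at its maximum: fall back to queueing.
        return super.offer(task);
    }

    // Bypasses the eager check; used after the executor rejected the task.
    boolean retryOffer(Runnable task) {
        return super.offer(task);
    }
}

class EagerExecutor extends ThreadPoolExecutor {
    // Tasks submitted but not yet finished (queued or running).
    final AtomicInteger submitted = new AtomicInteger();

    private EagerExecutor(int core, int max, EagerQueue queue) {
        super(core, max, 60, TimeUnit.SECONDS, queue);
    }

    static EagerExecutor create(int core, int max, int queueCapacity) {
        EagerQueue queue = new EagerQueue(queueCapacity);
        EagerExecutor executor = new EagerExecutor(core, max, queue);
        queue.setExecutor(executor);
        return executor;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submitted.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        submitted.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // Lost the race for the last worker slot, or the queue is full: try the queue once more.
            if (!((EagerQueue) getQueue()).retryOffer(command)) {
                submitted.decrementAndGet();
                throw new RejectedExecutionException("Queue capacity is full.", rx);
            }
        }
    }
}
```

With create(1, 3, 2), the first three blocking tasks each get their own thread, the next two wait in the queue, and the sixth is rejected. A stock ThreadPoolExecutor with the same sizes would queue tasks 2 and 3 instead of starting threads for them.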
Based on the analysis above, if consumer requests arrive too quickly, the provider's business thread pool may throw a RejectedExecutionException. The exception is thrown by AbortPolicyWithReport#rejectedExecution, the rejection policy Dubbo uses, and in recent versions it is reported back to the consumer. The simple fix is to enlarge the provider's service thread pool, for example:
<dubbo:provider threads="500"/>
or
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>
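To make sure the module's primary services always have threads available (so that secondary services cannot take up too many service threads), you can limit the concurrency of secondary services, for example: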
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
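The executes attribute caps how many calls to one service may run at the same time on the provider. The following is a hypothetical semaphore-based sketch of that idea, not Dubbo's own implementation (Dubbo enforces executes in a filter); a call that finds no free permit fails fast instead of occupying another business thread.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical per-service concurrency cap in the spirit of executes="100".
final class ExecuteLimiter {
    private final Semaphore permits;

    ExecuteLimiter(int executes) {
        this.permits = new Semaphore(executes);
    }

    // Runs the call if a permit is free; otherwise fails fast without waiting.
    <T> T invoke(Supplier<T> call) {
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("executes limit reached");
        }
        try {
            return call.get();
        } finally {
            permits.release();
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```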
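Coming back to the Dispatcher strategy: Dubbo's default is all, but a better approach is to separate I/O work from business work more cleanly, so message is usually the better setting. In addition, using all with an older Dubbo version can trigger a Dubbo bug.
For the cause, see the article "The problem with dispatching all requests to the business thread pool".
Since Dubbo resumed active maintenance, this bug has been fixed: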
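For reference, the five dispatch strategies differ only in which events leave the I/O thread. The sketch below encodes the behaviour described in the Dubbo documentation for all, direct, message, execution and connection; the enum and method names are hypothetical, and heartbeats and exception events are left out for simplicity.

```java
import java.util.Locale;

// Where an event is processed (hypothetical model).
enum Where { IO_THREAD, BUSINESS_POOL, ORDERED_CONNECTION_QUEUE }

enum ChannelEvent { CONNECTED, DISCONNECTED, REQUEST, RESPONSE }

final class DispatchPolicy {
    private DispatchPolicy() {}

    static Where route(String dispatcher, ChannelEvent event) {
        boolean connectionEvent = event == ChannelEvent.CONNECTED || event == ChannelEvent.DISCONNECTED;
        switch (dispatcher.toLowerCase(Locale.ROOT)) {
            case "all":        // every event goes to the business pool
                return Where.BUSINESS_POOL;
            case "direct":     // nothing leaves the I/O thread
                return Where.IO_THREAD;
            case "message":    // only requests and responses are dispatched
                return connectionEvent ? Where.IO_THREAD : Where.BUSINESS_POOL;
            case "execution":  // only requests are dispatched
                return event == ChannelEvent.REQUEST ? Where.BUSINESS_POOL : Where.IO_THREAD;
            case "connection": // connect/disconnect run in order on a queue, the rest go to the pool
                return connectionEvent ? Where.ORDERED_CONNECTION_QUEUE : Where.BUSINESS_POOL;
            default:
                throw new IllegalArgumentException("unknown dispatcher: " + dispatcher);
        }
    }
}
```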
public void received(Channel channel, Object message) throws RemotingException {
// get the business thread pool
ExecutorService cexecutor = getExecutorService();
try {
// hand the message off to the business thread pool
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
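If the exception is a RejectedExecutionException and the request is two-way, an error response is sent straight back to the consumer instead of leaving it to wait for a timeout.
The recommended configuration is: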
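The fix boils down to one rule: if the hand-off is rejected and the caller is waiting for a reply, answer with an error instead of staying silent. The sketch below models that rule with hypothetical RpcRequest and RpcResponse classes and a Consumer standing in for channel.send; the status string is illustrative and is not Dubbo's SERVER_THREADPOOL_EXHAUSTED_ERROR code.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical minimal request: an id and whether the caller waits for a reply.
final class RpcRequest {
    final long id;
    final boolean twoWay;

    RpcRequest(long id, boolean twoWay) {
        this.id = id;
        this.twoWay = twoWay;
    }
}

// Hypothetical minimal response carrying an error status.
final class RpcResponse {
    final long id;
    final String status;
    final String errorMessage;

    RpcResponse(long id, String status, String errorMessage) {
        this.id = id;
        this.status = status;
        this.errorMessage = errorMessage;
    }
}

final class RejectAwareDispatcher {
    private final Executor businessPool;
    private final Consumer<RpcResponse> channel; // stands in for channel.send(response)

    RejectAwareDispatcher(Executor businessPool, Consumer<RpcResponse> channel) {
        this.businessPool = businessPool;
        this.channel = channel;
    }

    void received(RpcRequest request, Runnable work) {
        try {
            businessPool.execute(work);
        } catch (RejectedExecutionException e) {
            if (request.twoWay) {
                // reply immediately so the consumer does not wait for its timeout
                channel.accept(new RpcResponse(request.id, "THREADPOOL_EXHAUSTED",
                        "server thread pool is exhausted: " + e.getMessage()));
                return;
            }
            throw e; // a one-way call has nobody to answer
        }
    }
}
```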
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />