本系列參考官網(wǎng)文檔统求、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實戰(zhàn)》一書乘碑。Dubbo版本為2.6.1
這篇文章是為了理清Dubbo
里的線程模型挖息、Handler
機制,順便解釋在服務(wù)暴露時候最后遺留下來的問題兽肤,即在最后的DubboProtocol#createServer(url)
方法里調(diào)用了Exchangers.bind(url, requestHandler)
做了什么套腹。
我們將討論當收到請求的時候是怎么從Netty轉(zhuǎn)到Dubbo的邏輯來的,再介紹Handler的調(diào)用鏈路轿衔,并且分析如何將解碼這一IO操作Dubbo的業(yè)務(wù)線程池做的
文章內(nèi)容順序:
1. Dubbo的線程模型
1.1 Dubbo的線程模型調(diào)用流程
1.2 Netty線程模型
1.3 Dubbo中的線程池
1.4 那么什么業(yè)務(wù)會由Dubbo自己的線程池來實現(xiàn)呢沉迹?
2. Handler的包裝流程
2.1 Handler包裝圖解和描述
2.2 創(chuàng)建一個requestHandler
2.3 Exchange層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))
2.4 HeaderExchangeHandler#received
2.5 DecodeHandler#received
2.6 Transporters#bind
2.6.1 ChannelHandlerDispatcher#received
2.7 NettyTransporter(netty3)
2.8 NettyServer構(gòu)造方法
2.9 ChannelHandlers#wrap 又進行三層包裝
2.10 AllChannelHandler 與Dubbo線程池耦合
2.10.1 ChannelEventRunnable 線程的執(zhí)行邏輯
2.11 HeartbeatHandler 處理心跳請求
2.12 MultiMessageHandler 處理批量請求
2.13 NettyServer
2.14 NettyServer#doOpen創(chuàng)建Netty的執(zhí)行鏈睦疫,包裝到NettyHandler里
2.15 NettyHandler做了什么害驹,為什么要封裝?
3. Handler的調(diào)用順序
4. 需要注意的幾個類
5. Dubbo是怎么用到Netty的
1. Dubbo的線程模型模型
首先來講講Dubbo的線程模型蛤育,可以看下這篇博客的介紹宛官,下面的圖也是來此博客。
dubbo線程模型
1.1Dubbo的線程調(diào)用流程
image.png
客戶端的主線程發(fā)出一個請求后獲得future
瓦糕,在執(zhí)行get
時進行阻塞等待底洗;
服務(wù)端使用worker線程
(netty通信模型)接收到請求后,將請求提交到server線程池
(Dubbo線程池)中進行處理
server
線程處理完成之后咕娄,將相應(yīng)結(jié)果返回給客戶端的worker
線程池(netty通信模型)亥揖,最后,worker線程
將響應(yīng)結(jié)果提交到client線程池
進行處理
client線程將
響應(yīng)結(jié)果填充到future
中,然后喚醒等待的主線程费变,主線程獲取結(jié)果摧扇,返回給客戶端
這邊再簡單概括下博客的內(nèi)容:
1.2Netty線程模型
Dubbo使用netty作為網(wǎng)絡(luò)傳輸框架,所以我們先來簡單了解下Netty挚歧,下圖為Netty的線程模型扛稽。
Netty
中存在兩種線程:boss線程
和worker線程
。
boss線程
的作用:
accept客戶端的連接滑负;將接收到的連接注冊到一個worker線程上
個數(shù):通常情況下在张,服務(wù)端每綁定一個端口,開啟一個boss線程
worker線程
的作用:
處理注冊在其身上的連接connection上的各種io事件
個數(shù):默認是核數(shù)+1
注意:
一個worker線程可以注冊多個connection
一個connection只能注冊在一個worker線程上
1.3Dubbo中的線程池
為了配合worker線程工作矮慕,在Dubbo中還實現(xiàn)了自己的線程池來執(zhí)行各種業(yè)務(wù)帮匾。
Dubbo擴展接口 ThreadPool 的SPI實現(xiàn)有如下幾種:
- fixed:固定大小線程池,啟動時建立線程痴鳄,不關(guān)閉辟狈,一直持有(默認實現(xiàn))。
coresize:200
maxsize:200
隊列:SynchronousQueue
回絕策略:AbortPolicyWithReport - 打印線程信息jstack夏跷,之后拋出異常- cached:緩存線程池哼转,空閑一分鐘自動刪除,需要時重建槽华。
- limited:可伸縮線程池壹蔓,但池中的線程數(shù)只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然帶來大流量引起性能問題猫态。
1.4那么什么業(yè)務(wù)會由Dubbo自己的線程池來實現(xiàn)呢佣蓉?
有5種派發(fā)策略:
- 默認是all:所有消息都派發(fā)到Dubbo線程池,包括請求亲雪,響應(yīng)勇凭,連接事件,斷開事件义辕,心跳等虾标。 即worker線程接收到事件后,將該事件提交到業(yè)務(wù)線程池中灌砖,自己再去處理其他事璧函。
- direct:worker線程接收到事件后,由worker執(zhí)行到底基显。
- message:只有請求響應(yīng)消息派發(fā)到Dubbo線程池蘸吓,其它連接斷開事件,心跳等消息撩幽,直接在 IO線程上執(zhí)行
- execution:只請求消息派發(fā)到Dubbo線程池库继,不含響應(yīng)(客戶端線程池),響應(yīng)和其它連接斷開事件,心跳等消息宪萄,直接在 IO 線程上執(zhí)行
- connection:在 IO 線程上舅桩,將連接斷開事件放入隊列,有序逐個執(zhí)行雨膨,其它消息派發(fā)到Dubbo線程池擂涛。
image.png
如圖所示切揭,各個策略的實現(xiàn)也很簡單从橘,直接返回了一個對應(yīng)名稱的XXXChannelHandler
,說明具體代碼邏輯在這個Handler
里面實現(xiàn)运褪。
以上就是基本的Dubbo線程模型了排监。
2. Handler的包裝流程
接下來來講講Dubbo里的Handler狰右,會一步步來分析他是怎么包裝起來的。
參考如下:dubbo的handler機制
2.1 Handler包裝圖解和描述
這是使用
Dubbo Protocol
并且使用Netty
作為服務(wù)器的情況下Handler
的整個包裝過程
image.png上圖是來自參考鏈接的圖的一部分舆床,他的
Transport
層部分畫錯了棋蚌,我就自己畫了一個,如下圖挨队,比較簡陋谷暮,下圖就是完整的handler
包裝。(我也知道你們喜歡彩色的圖啊盛垦,可惜我懶)
image.png
1.在
DubboProtocol
中構(gòu)建ExchangeHandler
命名requestHandler
2.在
Exchange
層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))
湿弦,具體參考類:HeaderExchanger
? ① 使用HeaderExchangeHandler
做一次包裝,HeaderExchangeHandler
的作用是實現(xiàn)了Reques
t和Response
的概念腾夯,當接到received
請求后颊埃,將請求轉(zhuǎn)為reply
。請參考類HeaderExchangeHandler
蝶俱,
? ② 使用DecodeHandler
做一次包裝班利,DecodeHandler
的作用是用來對Request Message
和Response Message
做解碼操作,解碼完成后才能給HeaderExchangeHandler
使用榨呆。3.在
Exchange
層包裝后的Handler
會被傳遞到Transporter
層(NettyTransporter
)并且把類型轉(zhuǎn)換成ChannelHandler
罗标,因為ChannelHandler
更為抽象。4.
Handler
在Transporter
層流轉(zhuǎn)愕提,會被傳遞到NettyServer
中5.在
NettyServer
中被AllChannelHandler
包裝馒稍,其作用是把NettyServer
接收到的請求轉(zhuǎn)移給Transporter
層的線程池來處理皿哨。同步轉(zhuǎn)異步浅侨。6.接著就先被
HeartbeatHandler
包裝用以處理心跳請求,接著被MultiMessageHandler
包裝用以處理批量請求7.這個
MultiMessageHandler
會在NettyServer
中以構(gòu)造函數(shù)的方式注入進來8.
NettyServer
被再次NettyHandler
包裝证膨,NettyHandler
的父類是SimpleChannelHandler
如输。它屬于Netty
的Handler
。由Netty
來管理和調(diào)用其中的回調(diào)方法。Netty
在接受到channelActive
不见,channelRead
等方法后澳化,會把請求轉(zhuǎn)移給Dubbo
的Handler
,這樣每當請求過來稳吮,Netty
的Handler
接到請求就立馬把數(shù)據(jù)和相關(guān)信息轉(zhuǎn)交給Dubbo
的Handler
缎谷,由Dubbo
的Handler
來管理了。
上面是簡單的概括灶似,我們來一個個上代碼
2.2創(chuàng)建一個requestHandler
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
// 獲得請求對應(yīng)的 Invoker 對象
Invoker<?> invoker = getInvoker(channel, inv);
// 如果是callback 需要處理高版本調(diào)用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
// 設(shè)置調(diào)用方的地址
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 執(zhí)行調(diào)用
return invoker.invoke(inv);
}
throw new RemotingException(channel, message.getClass().getName() + ": " + message
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
this.reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) {
this.invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isInfoEnabled()) {
logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
this.invoke(channel, Constants.ON_DISCONNECT_KEY);
}
/**
* 調(diào)用方法
*
* @param channel 通道
* @param methodKey 方法名
*/
private void invoke(Channel channel, String methodKey) {
// 創(chuàng)建 Invocation 對象
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
// 調(diào)用 received 方法列林,執(zhí)行對應(yīng)的方法
if (invocation != null) {
try {
this.received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
他實現(xiàn)了ChannelHandler
接口5個關(guān)鍵方法,連接酪惭,斷開連接希痴,發(fā)送消息,接受消息和異常處理方法春感。也是rpc調(diào)用的常用處理方法砌创。 同時也是線程派發(fā)處理關(guān)注的方法。
而requestHandler
第一次用到鲫懒,也就是在服務(wù)暴露那篇最后提的DubboProtocol#Exchangers.bind(url, requestHandler)
方法中
2.3 Exchange層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))
進去后發(fā)現(xiàn)
Exchangers
是個門面類嫩实,調(diào)用的是HeaderExchanger#bind
方法,事實上也只有HeaderExchanger
這一個實現(xiàn)窥岩。如下圖
image.png
image.png
接著來看HeaderExchanger#bind
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
注意舶赔,這里我們的requestHandler
直接被包裝了兩層,對應(yīng)的是最上面圖里的Exchange層的包裝谦秧,從里到外看看這兩個包裝的Handler
做了什么(主要看他的received
方法)
2.4 HeaderExchangeHandler#received
/**
* ExchangeReceiver
*
* 基于消息頭部( Header )的信息交換處理器實現(xiàn)類
*/
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
// 設(shè)置最后的讀時間
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
// 創(chuàng)建 ExchangeChannel 對象
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 處理請求( Request )
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
// 處理事件請求
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// 處理普通請求竟纳,判斷是否要響應(yīng)(即雙向通信)
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
// 將調(diào)用結(jié)果返回給服務(wù)消費端
channel.send(response);
// 如果是單向通信,僅向后調(diào)用指定服務(wù)即可疚鲤,無需返回調(diào)用結(jié)果
} else {
handler.received(exchangeChannel, request.getData());
}
}
// 處理響應(yīng)( Response )
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
// 處理 String
} else if (message instanceof String) {
// 客戶端側(cè)锥累,不支持 String
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
// 服務(wù)端側(cè),目前是 telnet 命令
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
// 提交給裝飾的 `handler`集歇,繼續(xù)處理
} else {
handler.received(exchangeChannel, message);
}
} finally {
// 移除 ExchangeChannel 對象桶略,若已斷開
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
//省略其他代碼
}
代碼注釋已經(jīng)比較清晰,簡單來說就是會辨別收到的message
是服務(wù)端收到的Request還是消費端發(fā)起請求后得到的Reponse
進行一系列業(yè)務(wù)判斷操作(比如對于不同的請求诲宇,有事件請求际歼、需要響應(yīng)的和不需要響應(yīng)的,都有不同的執(zhí)行邏輯)
如果需要響應(yīng)還會傳到更里面一層也就是我們的requestHandler
執(zhí)行received
方法
再來看看在更外面一層的DecodeHandler
2.5DecodeHandler#received
public class DecodeHandler extends AbstractChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
// 對 Decodeable 接口實現(xiàn)類對象進行解碼
decode(message);
}
if (message instanceof Request) {
// 對 Request 的 data 字段進行解碼
decode(((Request) message).getData());
}
if (message instanceof Response) {
// 對 Request 的 result 字段進行解碼
decode(((Response) message).getResult());
}
// 執(zhí)行后續(xù)邏輯
handler.received(channel, message);
}
private void decode(Object message) {
if (message != null && message instanceof Decodeable) {
try {
((Decodeable) message).decode(); // 解析消息
if (log.isDebugEnabled()) {
log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
}
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
}
} // ~ end of catch
} // ~ end of if
} // ~ end of method decode
//省略其他代碼
}
注意看這個
received
中的方法姑蓝,很明顯這個DecodeHandler
就是用來解碼的鹅心,根據(jù)不同的message類型,執(zhí)行不同的邏輯纺荧,,這里的if (message instanceof Decodeable)
判斷下的代碼就是我們解碼篇提到的交由業(yè)務(wù)線程池來執(zhí)行解碼操作旭愧。
每個Handler都會執(zhí)行到這一步颅筋,如果被解碼過自然不用再解碼了(在DecodeableRpcInvocation
或者DecodeableRpcResult
中會有是否解碼已完成的標志位),如果還未解碼输枯,當前線程就會進行解碼操作
接下來進入到Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
中的Transporters#bind
方法
2.6 Transporters#bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 創(chuàng)建 handler
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 創(chuàng)建 Server 對象
return getTransporter().bind(url, handler);
}
同樣是門面類议泵,會調(diào)用真正的Transporter
來執(zhí)行bind方法,
這里我們只傳了一個ChannelHandler
對象桃熄,所以直接到了rerun
先口,如果ChannelHandler
有多個的情況下,說明這些handler是同級的瞳收,new ChannelHandlerDispatcher(handlers)
每個實現(xiàn)的方法池充,都會循環(huán)調(diào)用 channelHandlers
的方法
2.6.1ChannelHandlerDispatcher#received
如下面的代碼例子所示:
public class ChannelHandlerDispatcher implements ChannelHandler {
public void received(Channel channel, Object message) {
for (ChannelHandler listener : channelHandlers) {
try {
listener.received(channel, message);
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
//省略其他代碼
}
那我們便直接來到真正的
Transporter#bind
方法中一探究竟。
image.png這個Transporter有四種實現(xiàn)缎讼,其中又有
netty3
和netty4
收夸,默認實現(xiàn)為netty3
,我們就來看看netty3的實現(xiàn)吧
netty3的NettyTransporter 實現(xiàn)如下
2.7 NettyTransporter(netty3)
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
接著來看NettyServer
的實現(xiàn)
2.8 NettyServer構(gòu)造方法
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
又是一層對handler
的包裝血崭,調(diào)用了ChannelHandlers#wrap
方法卧惜,跟進去看看
這里注意一下!<腥摇咽瓷!我們先關(guān)注
ChannelHandlers#warp
的調(diào)用,這里的super
方法我們最后會分析舰讹。
2.9 ChannelHandlers#wrap 又進行三層包裝
public class ChannelHandlers {
/**
* 單例
*/
private static ChannelHandlers INSTANCE = new ChannelHandlers();
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(
new HeartbeatHandler(
ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)
)
);
}
//省略其他方法
}
wrap
方法直接調(diào)用了他自己實現(xiàn)的wrapInternal
方法茅姜,套了一層又一層。
注意ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)
這一段代碼月匣,dispatch(handler, url)
前面很好理解钻洒,就是拿到Dispatcher
這個接口的擴展類,我們的默認擴展類是AllDispatcher
锄开,在前面已經(jīng)介紹過了素标,我們就以默認的類來繼續(xù)講,后面直接調(diào)用了這個AllDispatcher#dispatch
方法萍悴,這個我們前面也同樣介紹過头遭,再來簡單復(fù)習下下面的圖,就是直接把我們的handler又又又封裝了一層癣诱,封裝到了AllChannelHandler
中计维。那么就來看看這個AllChannelHandler
又做了什么吧
2.10 AllChannelHandler 與Dubbo線程池耦合
/**
* `all` 所有消息都派發(fā)到線程池,包括請求撕予,響應(yīng)剑按,連接事件艺蝴,斷開事件,心跳等。
*/
public class AllChannelHandler extends WrappedChannelHandler {
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
//用來解決線程池已滿后無法將異常信息發(fā)送到另一端的問題的臨時解決方案,仍然需要重構(gòu)
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
}
這個類還調(diào)用了父類的有參構(gòu)造,父類的構(gòu)造方法里面就是通過SPI機制拿到url后創(chuàng)建對應(yīng)的線程池(還記得我們Dubbo中的三種線程池不,默認是fixed
)疗杉,這里就不貼代碼了蚕礼。
還是來看看這個AllChannelHandler#received
方法,直接調(diào)用了線程池來執(zhí)行方法奠蹬。
2.10.1 ChannelEventRunnable 線程的執(zhí)行邏輯
熟悉線程池的小伙伴肯定猜出來了玖翅,這個執(zhí)行的ChannelEventRunnable
肯定是實現(xiàn)了Runnable接口的類了金度。再來進去看看他的執(zhí)行邏輯吧
public class ChannelEventRunnable implements Runnable {
public void run() {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case RECEIVED:
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
//省略其他代碼
}
這里的線程池很明顯丢胚,用了case
語句來判斷到底執(zhí)行的我們傳入的包裝類DecodeHandler
里的什么方法。(AllChannelHandler里面的一層是DecodeHandler)
到了這里峡蟋,我們別忘了在ChannelHandlers#wrapInternal
還沒分析完,外面還有一層HeartbeatHandler
和一層MultiMessageHandler
赖舟。先來看HeartbeatHandler
2.11 HeartbeatHandler 處理心跳請求
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
public void received(Channel channel, Object message) throws RemotingException {
// 設(shè)置最后的讀時間
setReadTimestamp(channel);
// 如果是心跳事件請求,返回心跳事件的響應(yīng)
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
// 如果是心跳事件響應(yīng)宾抓,返回
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(new StringBuilder(32).append("Receive heartbeat response in thread ").append(Thread.currentThread().getName()).toString());
}
return;
}
// 提交給裝飾的 `handler`,繼續(xù)處理
handler.received(channel, message);
}
//省略其他代碼
}
可以看到HeartbeatHandler
對received
方法進行了處理痛单,所以心跳的消息的接受和發(fā)送是不會派發(fā)到Dubbo線程池
的。
接著是外面的那層MultiMessageHandler啦
2.12 MultiMessageHandler 處理批量請求
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) { // 多消息
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
}
從他的名字就可以看出來主要是處理多消息的劲腿,主要完成多消息類型的循環(huán)解析接收。
這里來做下收尾:還記得我當時三個!:鲂佟!的NettyServer
構(gòu)造方法調(diào)用的父類構(gòu)造方法嘛袋倔,一起來看看批狐。
2.13 NettyServer
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
// 服務(wù)地址
localAddress = getUrl().toInetSocketAddress();
// 綁定地址
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 服務(wù)器最大可接受連接數(shù)
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
// 空閑超時時間
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
// 開啟服務(wù)器
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
// 獲得線程池
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
//省略其他代碼
}
這個AbstractServer
就是NettyServer
的父類,可以看到這個構(gòu)造方法中前塔,開啟服務(wù)器時調(diào)用了doOpen()
方法嚣艇,這是交由子類自己的實現(xiàn)的方法承冰。(注意,我們這里只關(guān)注handler
的傳遞過程食零,其他代碼的介紹就略過啦)
2.14 NettyServer#doOpen創(chuàng)建Netty的執(zhí)行鏈困乒,包裝到NettyHandler里
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() {
// 設(shè)置日志工廠
NettyHelper.setNettyLoggerFactory();
// 創(chuàng)建線程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
// 創(chuàng)建 ChannelFactory 對象
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 實例化 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
// 創(chuàng)建 NettyHandler 對象,著重注意;藕椤6パ唷凑保!
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// 設(shè)置 `channels` 屬性
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
// 創(chuàng)建 NettyCodecAdapter 對象
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder()); // 解碼
pipeline.addLast("encoder", adapter.getEncoder()); // 解碼
pipeline.addLast("handler", nettyHandler); // 處理器冈爹,著重注意!E芬频伤!
return pipeline;
}
});
// 服務(wù)器綁定端口監(jiān)聽
// bind
channel = bootstrap.bind(getBindAddress());
}
//省略其他代碼
}
這個就是
NettyServer#doOpen()
的代碼啦,注意代碼打上著重注意Vゴ恕1镄ぁ!的兩行婚苹,
一行是final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
將這個NettyServer
又包裝進了NettyHandler
里岸更,
還有一行是pipeline.addLast("handler", nettyHandler);
,這是Netty的機制膊升,
可以參考詳細講解Netty中Pipeline責任鏈做一個簡單的了解怎炊,這里解釋了為什么收到消息后能調(diào)用到我們的Handler。
順帶一提廓译,NettyServer
其實也實現(xiàn)了ChannelHandler
评肆,他也算是個"handler",看下圖非区。
image.png
閑話至此瓜挽,我們來看下NettyHandler做了什么,為什么要封裝征绸?
2.15 NettyHandler做了什么久橙,為什么要封裝?
@Sharable
public class NettyHandler extends SimpleChannelHandler {
public NettyHandler(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
//省略其他代碼
}
這個
SimpleChannelHandler
是Netty
的接口管怠,說明這個NettyHandler
是Netty handler服務(wù)
與Dubbo handler職責
交接的地方淆衷,Netty
首先接收到信息后在內(nèi)部進行一連串調(diào)用,然后調(diào)用到Dubbo
自己實現(xiàn)的Netty接口的方法排惨,從此開始在Dubbo
中進行方法的調(diào)用吭敢。
再順帶一提,當我們debug
的時到NettyHandler#messageReceived
這行的時候暮芭,
這里的handler
一個個分解下去鹿驼,就是包裝的順序啦欲低,看下圖。
image.png
這里再再順便提一嘴畜晰,我們介紹的都是以Netty3
砾莱,如果換成Netty4
實現(xiàn)的話,最外面的handler類名
不是現(xiàn)在的NettyHandle
r而是NettyServerHandler
凄鼻。(一開始我迷糊了后來才知道所以在這一提腊瑟,可能會有跟我一樣迷糊的人呢)
3. Handler的調(diào)用順序
至此,我們的Handler終于分析完畢了块蚌!闰非,再來捋一遍調(diào)用的順序:
- netty處理接收請求
- ->NioServerSocketPipelineSink$Boss
- ->注冊NioWorker線程用于處理長連接和IO操作
- ->DefaultChannelPipeline(處理請求的反序列化和分發(fā)handler,初始化過程見NettyServer峭范,會往pipeline加入decoder财松、encoder和NettyHandler)
- ->decoder做反序列化
- ->NettyHandler處理請求
- ->NettyServer處理請求(NettyServer初始化封裝了MultiMessageHandler、HeartbeatHandler以及根據(jù)dispatcher得到的channelHandler(里面還有封裝纱控,就不往下說了)辆毡,默認是AllDispatcher)
- ->MultiMessageHandler for each處理批量請求
- ->HeartbeatHandler處理心跳請求
- ->AllDispatcher得到的AllChannelHandler處理將通道所有狀態(tài)變更用新線程處理(包括建立連接、斷開連接甜害、接收請求舶掖、異常)
- ->從指定的線程池拿到線程執(zhí)行rpc請求
- ->DecodeHandler進行對message的解碼,此操作可能會被放在work線程進行尔店,如果work線程已經(jīng)解碼過了眨攘,得到的message內(nèi)部會有標識,解碼的方法會直接返回進行下一步操作
- ->HeaderExchangeHandler判斷雙向和單向處理請求闹获,雙向則將結(jié)果使用當前channel回寫
- ->到DubboProtocol$ExchangeHandlerAdapter處理請求調(diào)用
- ->到此服務(wù)端網(wǎng)絡(luò)傳輸層結(jié)束
4. 需要注意的幾個類
1.DubboProtocol類期犬,Dubbo Handler初始化創(chuàng)建的地方
2.HeaderExchangeHandler類,Request和Response概念重點提現(xiàn)的地方
3.DecodeHandler類避诽,Dubbo線程池幫忙解碼的地方
3.NettyHandler類龟虎,Netty Handler服務(wù)與Dubbo Handler職責交接的地方
5. Dubbo是怎么用到Netty的?
可以看到沙庐,
dubbo
使用Netty
還是挺簡單的鲤妥,消費者使用NettyClient
,提供者使用NettyServer
拱雏,Provider
啟動的時候棉安,會開啟端口監(jiān)聽,使用我們平時啟動Netty
一樣的方式铸抑。而Client
在Spring getBean
的時候贡耽,會創(chuàng)建Client
,當調(diào)用遠程方法的時候,將數(shù)據(jù)通過dubbo
協(xié)議編碼發(fā)送到NettyServer
蒲赂,然后NettServer
收到數(shù)據(jù)后解碼阱冶,并調(diào)用本地方法,并返回數(shù)據(jù)滥嘴,完成一次完美的 RPC 調(diào)用木蹬。