在上一節(jié)中介紹了Dubbo啟動過程中的一個操作:將Dubbo服務(wù)注冊到Zk上。下面就介紹啟動過程的另外一個重要操作:服務(wù)監(jiān)聽。
服務(wù)監(jiān)聽邏輯聽起來是比較簡單的。畢竟就四個字~~服務(wù)監(jiān)聽這里的邏輯比較繞,看了好久都感覺有點云里霧里的浓领,所以只能盡力去講一些自己看懂的地方了玉凯,有的地方如果自己說錯了請直接評論區(qū)回復(fù)就好势腮。
概念澄清
Channel:Dubbo中的Channel不同于Netty中Channel,也是為了屏蔽底層邏輯而自定義的接口類漫仆,比如NettyChannel就是包含了Dubbo中Channel與Netty中的Channel的映射關(guān)系捎拯。
ChannelHandler:含義類似于Netty中的ChannelHandler,主要是用于觸發(fā)某些事件盲厌,比如connected署照,disconnectd,received等事件吗浩。
裝飾者模式:在dubbo的源碼中充斥著裝飾者模式建芙,在我們看到的Handler出現(xiàn)的時候,往往都是裝飾者模式在運作著懂扼。Handler內(nèi)部中嵌套著Handler的情景非常之多禁荸,所以理清楚每個Handler都干了什么顯得十分重要(麻煩)。
監(jiān)聽端口的過程
從上一章知道DubboProtocol.export()是這次研究的入口阀湿,我們就從這里開始看:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
//這個key就是可以唯一區(qū)分所暴露的服務(wù)的key,group+interface+version
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//唯一的服務(wù)與exporter映射起來赶熟,這樣的話在DubboProtocol中就能根據(jù)三要素找到具體可以執(zhí)行的Exporter
exporterMap.put(key, exporter);
//重點
openServer(url);
return exporter;
}
private void openServer(URL url) {
// find server.
//host:port
String key = url.getAddress();
//client 也可以暴露一個只有server可以調(diào)用的服務(wù)。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
//因為Dubbo是基于長鏈接的陷嘴,所以每一個client和server只會通過一個長鏈接來進行通信映砖,所以這里通過client的key與server作為一個映射存儲起來
ExchangeServer server = serverMap.get(key);
if (server == null) {
//重點
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//默認開啟server關(guān)閉時發(fā)送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默認開啟heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 檢查是否有對應(yīng)的擴展類
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
//所以重點就在于對于Server的創(chuàng)建上
//requestHandler可以理解為對于request的處理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
從上文的代碼中我們看到,最后的重點落到了server = Exchangers.bind(url, requestHandler);上灾挨,在這句代碼中創(chuàng)建了server并完成了監(jiān)聽邑退。這里也是最初的Handler的原型,即requestHandler劳澄。下面我們先理一下Handler都具體做了哪些:
public interface ChannelHandler {
/**
* on channel connected.
*
* @param channel channel.
*/
void connected(Channel channel) throws RemotingException;
/**
* on channel disconnected.
*
* @param channel channel.
*/
void disconnected(Channel channel) throws RemotingException;
/**
* on message sent.
*
* @param channel channel.
* @param message message.
*/
void sent(Channel channel, Object message) throws RemotingException;
/**
* on message received.
*
* @param channel channel.
* @param message message.
*/
void received(Channel channel, Object message) throws RemotingException;
/**
* on exception caught.
*
* @param channel channel.
* @param exception exception.
*/
void caught(Channel channel, Throwable exception) throws RemotingException;
}
ChannelHandler主要是定義跟“管道“相關(guān)的接口瓜饥,這些操作是在netty收到某類遠程傳來的消息時候觸發(fā)的行為,是一個比較底層的接口類浴骂,之后我們凡是看到Handler結(jié)尾的類乓土,大多數(shù)都實現(xiàn)了這個接口。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
/**
* reply.
*
* @param channel
* @param request
* @return response
* @throws RemotingException
*/
Object reply(ExchangeChannel channel, Object request) throws RemotingException;
}
ExchangeHandler僅僅是作為Handler的擴展子類,其就多一個方法趣苏。但是這個方法恰恰表情ExchangeHandler的作用范圍:Exchange層面主要是用于信息交換的層面狡相,用于同步轉(zhuǎn)異步操作,其操作的核心就是Request和Response食磕。上面的reply方法就是將request轉(zhuǎn)換為Response的核心接口方法尽棕,定義在ExchangeHandler中也十分清晰。
然后經(jīng)過上面的介紹之后我們再看一下requestHandler的具體內(nèi)容:
//這個類是裝飾者模式下最底層的ChannelHandler類
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
//如果Message是調(diào)用請求的話
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//根據(jù)exporter領(lǐng)取相關(guān)聯(lián)的invoker,通過inv參數(shù)構(gòu)建出需要的三元素彬伦,然后找到對應(yīng)的exporter滔悉,再通過exporter找到對應(yīng)的invoker
Invoker<?> invoker = getInvoker(channel, inv);
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//最終還是委托到invoker去調(diào)用真實的接口
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
// 因為ExchangeHandlerAdapter也是ChannelHandler的接口實現(xiàn)類,所以需要實現(xiàn)相關(guān)接口
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
//目前這里并沒有其他操作
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if(logger.isInfoEnabled()){
logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
//根據(jù)三要素構(gòu)造出來對應(yīng)的Invocation单绑,然后調(diào)用其方法回官,這里主要處理的是配置的方法,
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)){
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
看完了requestHandler搂橙,我們再來繼續(xù)看看那server = Exchangers.bind(url, requestHandler);
繼續(xù)跟蹤方法看到其實調(diào)用的是:
server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
這里的裝飾者模式可以看一下:
==1.== handler歉提,也就是上面提到的requestHandler
==2.== HeaderExchangeHandler 包裝了handler
// 個人理解:Exchange主要是處理Request和Response的邏輯,所以這里就是調(diào)動底層的方法將Request的處理結(jié)果轉(zhuǎn)換為Response
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
//構(gòu)造Request對應(yīng)的Response
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {//如果請求本身有問題的話
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// 根據(jù)massage類型找到對應(yīng)的Handler
Object msg = req.getData();
try {
// 執(zhí)行與Exporter交接的最初的Handler
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
這一層主要是借助與requestHandler的返回結(jié)果区转,將其封裝成具體的Response然后返回苔巨。
==3.== DecodeHandler 包裝了HeaderExchangeHandler
DecodeHandler主要是復(fù)寫了received方法,將收到的請求信息解碼然后傳入下一層的Handler調(diào)用废离。因為編碼解碼的整體不打算在本章涉及侄泽,所以這里先忽略。
通過上面的三部蜻韭,我們看到了這里對外暴露的是DecodeHandler悼尾,然后我們繼續(xù)跟著原來的邏輯看:
server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
進一步跟蹤代碼可以得到下面的邏輯:
server = new NettyServer(url, listener),這里的listener其實就是剛才我們裝飾者模式下生成的DecodeHandler湘捎。然后我們進入到NettyServer看一下诀豁。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
//在URL山添加對應(yīng)的線程名稱然后包裝handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
這里又是裝飾者模式,而且又對剛才的DecodeHandler進行包裝窥妇,我們看一下包裝的邏輯:
public static ChannelHandler wrap(ChannelHandler handler, URL url){
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected ChannelHandlers() {}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
從上面的代碼中我們可以看到這里又對handler做了三層包裝舷胜。我們接著上面講過的三次包裝繼續(xù)看這寫包裝:
==4.== ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()獲得的是默認的AllDispatcher,所以這里的第四層包裝就是AllChannelHandler活翩。
public AllChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
}
父類構(gòu)造函數(shù)如下:
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 因為我司們?nèi)粘J褂玫亩际莊ixThreadPool烹骨,所以這里就以FixedThreadPool來一看具體獲取到的線程池:
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
//感覺就是區(qū)分consumer和provider端,這里看意義不大
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
// 根據(jù)url中取到的參數(shù)材泄,定義一個線程池沮焕,這個線程池非常重要,在我們的工程中執(zhí)行任務(wù)的線程就是這個線程池中的線程拉宗,
// 在我們的dubbo配置中峦树,threads和queues都是重要的參數(shù)
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
==5.== HeartbeatHandler 包裝了AllChannelHandler
這個Handler的操作顧名思義辣辫,就是處理心跳有關(guān)的操作,我們簡單看一下:
public void connected(Channel channel) throws RemotingException {
//在channel上設(shè)置對應(yīng)的讀寫時間
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
該Handler中比較重要的操作是:
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
//如果是心跳請求或者心跳響應(yīng)的話就會直接在此步驟進行處理魁巩,不會在繼續(xù)調(diào)用之后的handler急灭,減少了不必要的調(diào)用。
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if(logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
handler.received(channel, message);
}
綜上來看谷遂,有了這一層的好處就是某些請求可以在這里直接處理掉葬馋,不用再往之后的handler中傳遞。
==6.== MultiMessageHandler包裝了HeartbeatHandler
// 這個是MultiMessageHandler中的核心方法
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 在所有的Handler的最開端處理肾扰,如果請求信息是MultiMessage的話畴嘶,代表可能是多個請求合并而成的請求,所以遍歷處理
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage)message;
for(Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
到這里基本的Handler包裝已經(jīng)差不多完了集晚,然后我們就接著原來的邏輯繼續(xù)看:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{<br>
//在URL山添加對應(yīng)的線程名稱然后包裝handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
//獲取服務(wù)暴露所需參數(shù)窗悯,host,port
localAddress = getUrl().toInetSocketAddress();
String host = url.getParameter(Constants.ANYHOST_KEY, false)
|| NetUtils.isInvalidLocalHost(getUrl().getHost())
? NetUtils.ANYHOST : getUrl().getHost();
bindAddress = new InetSocketAddress(host, getUrl().getPort());
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
//具體的open操作落到的NettyServer的實現(xiàn)上
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
if (handler instanceof WrappedChannelHandler ){
executor = ((WrappedChannelHandler)handler).getExecutor();
}
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
//設(shè)置線程池(但是線程池中的線程都是守護線程甩恼,為的就是當JVM退出時候不用考慮守護線程是否已經(jīng)結(jié)束)
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory); //Netty啟動類
//定義NettyHandler(這個應(yīng)該是通用的Handler蟀瞧,只有在服務(wù)啟動的時候生效一次)
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder()); //增加解碼處理器
pipeline.addLast("encoder", adapter.getEncoder()); //增加編碼處理器
pipeline.addLast("handler", nettyHandler); //增加具體操作的處理器
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
這里的邏輯就是一般的Netty初始化邏輯沉颂,并沒有做任何特殊處理的地方条摸。唯一需要注意的就是nettyHandler,nettyHandler中包含了對于當前nettyServer的引用铸屉,所以最終的事件處理還是通過上面層層包裝的Handler來處理钉蒲。
到這里,整體上的服務(wù)暴露環(huán)節(jié)已經(jīng)講完彻坛。本節(jié)的重點就是理解每一層的Handler都干了什么事情顷啼,其實這里涉及很多細節(jié)的內(nèi)容都沒有講到,例如參數(shù)的含義昌屉,Stub钙蒙,回調(diào)的處理等等。因為這次是第一次讀源碼间驮,所以重點在于對于核心流程的理解躬厌,先理解核心概念,后面再在核心流程里面補充其他的細節(jié)竞帽。