String hello = demoService.sayHello("world");
一触创、服務(wù)通信簡圖
- consumer
- proxy0(demoService)調(diào)用其屬性 InvocationHandler
- InvocationHandler 調(diào)用其屬性 MockClusterInvoker
- MockClusterInvoker 調(diào)用其屬性 FailoverClusterInvoker
- FailoverClusterInvoker 調(diào)用其屬性 RegistryDirectory表悬,從中根據(jù) method 獲取 List<DubboInvoker(filtered)>
- RegistryDirectory 調(diào)用其 List<Router> 對(duì) List<DubboInvoker(filtered) 進(jìn)行過濾
- FailoverClusterInvoker 獲取 LoadBalancer球昨,從 5 的結(jié)果獲取一個(gè) DubboInvoker(filtered)
- 執(zhí)行 DubboInvoker(filtered) 的過濾鏈应又,最后執(zhí)行 DubboInvoker,發(fā)起 Netty 調(diào)用
- provider
- 根據(jù)請(qǐng)求參數(shù) RpcInvocation(獲取 interface尉剩、group间坐、version) 和通信通道 Channel(獲取 port) 組裝 serviceKey,根據(jù) serviceKey 獲取 DubboExporter
- 從 DubboExporter 的取出其屬性 AbstractProxyInvoker(filtered)
- 執(zhí)行 AbstractProxyInvoker(filtered) 的過濾鏈少孝,最后執(zhí)行 AbstractProxyInvoker
- AbstractProxyInvoker 調(diào)用 Wrapper继低,Wrapper 調(diào)用 DemoServiceImpl
注意
通過 第7章 Dubbo 服務(wù)暴露流程的設(shè)計(jì)與實(shí)現(xiàn) 第8章 Dubbo 服務(wù)引用流程的設(shè)計(jì)與實(shí)現(xiàn) 這兩章的分析,我們發(fā)現(xiàn)在 Dubbo 中存在四類 Invoker稍走,而 Invoker 也是 Dubbo 中對(duì)調(diào)用邏輯進(jìn)行封裝的一個(gè)模型體:
AbstractProxyInvoker
:服務(wù)端袁翁,提供了對(duì)具體實(shí)現(xiàn)(eg. DemoServiceImpl)的調(diào)用封裝;DubboInvoker
:客戶端婿脸,封裝了 NettyClient粱胜,進(jìn)行遠(yuǎn)程調(diào)用的發(fā)起;- 具體的
XxxClusterInvoker
:客戶端盖淡,將多個(gè)Invoker偽裝成一個(gè)集群版的Invoker年柠;InvokerWrapper 包裝類
:eg.MockClusterInvoker
,應(yīng)用于客戶端褪迟,是具體的XxxClusterInvoker
的包裝類冗恨,提供 mock操作。
二味赃、服務(wù)通信源碼梯形圖
服務(wù)通信分為:客戶端發(fā)出請(qǐng)求掀抹;服務(wù)端接收請(qǐng)求并返回響應(yīng);客戶端接收響應(yīng)心俗。
2.1 客戶端發(fā)出請(qǐng)求
//代理發(fā)出請(qǐng)求
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
-->new RpcInvocation(method, args)
-->MockClusterInvoker.invoke(Invocation invocation)//服務(wù)降級(jí)的地方
//ClusterInvoker將多個(gè)Invoker偽裝成一個(gè)集群版的Invoker
-->AbstractClusterInvoker.invoke(Invocation invocation)
//獲取Invokers
-->list(Invocation invocation)
-->AbstractDirectory.list(Invocation invocation)
-->RegistryDirectory.doList(Invocation invocation)//從Map<String, List<Invoker<T>>> methodInvokerMap中獲取key為sayHello的List<Invoker<T>>
-->MockInvokersSelector.getNormalInvokers(List<Invoker<T>> invokers)//對(duì)上述的List<Invoker<T>>再進(jìn)行一次過濾(這里比如說過濾出所有協(xié)議為mock的Invoker傲武,如果一個(gè)也沒有就全部返回),這就是router的作用
//獲取負(fù)載均衡器
-->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))//默認(rèn)為random
-->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation)//異步操作添加invocationID
-->FailoverClusterInvoker.doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance)
//使用負(fù)載均衡器選擇一個(gè)Invoker出來:RegistryDirectory$InvokerDelegete實(shí)例
-->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
-->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
//執(zhí)行l(wèi)istener和filter鏈
-->ListenerInvokerWrapper.invoke
-->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation)//設(shè)置一些RpcContext屬性城榛,并且設(shè)置invocation中的invoker屬性
-->FutureFilter.invoke(Invocation invocation)
-->MonitorFilter.invoke(Invocation invocation)//monitor在這里收集數(shù)據(jù)
-->AbstractInvoker.invoke(Invocation inv)//重新設(shè)置了invocation中的invoker屬性和attachment屬性
-->DubboInvoker.doInvoke(Invocation invocation)
//獲取ExchangeClient進(jìn)行消息的發(fā)送
-->ReferenceCountExchangeClient.request(Object request, int timeout)
-->HeaderExchangeClient.request(Object request, int timeout)
-->HeaderExchangeChannel.request(Object request, int timeout)
-->AbstractClient.send(Object message, boolean sent)//NettyClient的父類
-->getChannel()//NettyChannel實(shí)例揪利,其內(nèi)部channel實(shí)例=NioClientSocketChannel實(shí)例
-->NettyChannel.send(Object message, boolean sent)
-->NioClientSocketChannel.write(Object message)//已經(jīng)是netty的東西了,這里的message=Request實(shí)例:最重要的是RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
總體流程(默認(rèn)情況下):
- 代理
proxy0
(demoService)調(diào)用InvokerInvocationHandler
執(zhí)行 sayHelloInvokerInvocationHandler
先將請(qǐng)求信息(methodName狠持,parameterTypes疟位,arguments)封裝成 RpcInvocation 對(duì)象,之后調(diào)用MockClusterInvoker
#invokeMockClusterInvoker
根據(jù)是否配置了mock信息(服務(wù)降級(jí)信息)決定走mock邏輯還是正常邏輯(關(guān)于服務(wù)降級(jí)的詳細(xì)源碼解析喘垂,見 http://www.cnblogs.com/java-zhao/p/8320519.html甜刻,這里只走正常邏輯)MockClusterInvoker
調(diào)用FailoverClusterInvoker
#invokeFailoverClusterInvoker
先從RegistryDirectory
中的newMethodInvokerMap
中根據(jù) methodName 獲取 InvokerDelegate 實(shí)例列表(即可用的 provider 列表)- 使用
Router
對(duì)獲取到的 InvokerDelegate 實(shí)例列表再進(jìn)行一次選擇(這里就可以實(shí)現(xiàn)服務(wù)的讀寫分離)- 根據(jù) Dubbo SPI 機(jī)制創(chuàng)建負(fù)載均衡器(默認(rèn)是 RandomLoadBalance)
- 使用
RandomLoadBalance
從被 Router 過濾過的 InvokerDelegate 實(shí)例列表選擇一個(gè)實(shí)例出來(即從 provider 列表選出一臺(tái) provider 機(jī)器來)- 之后執(zhí)行
InvokerDelegate
#invoke 方法:這里首先執(zhí)行 filter 鏈,最后執(zhí)行到DubboInvoker
#doInvoke 方法正勒,在這里首先為 RpcInvocation 添加了新的參數(shù)得院,然后選取了一個(gè)ReferenceCountExchangeClient
,向服務(wù)端發(fā)出了請(qǐng)求章贞。
注意最終的 RpcInvocation
實(shí)例包含的屬性:
methodName=sayHello
parameterTypes=[class java.lang.String]
arguments=[world]
attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}
2.2 服務(wù)端接收請(qǐng)求并返回響應(yīng)
服務(wù)端接收請(qǐng)求消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->Response response = handleRequest(exchangeChannel, request)
-->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//這里的message就是上邊的RpcInvocation
//首先獲取exporter祥绞,之后再獲取invoker
-->getInvoker(Channel channel, Invocation inv)//組裝serviceKey=com.alibaba.dubbo.demo.DemoService:20880
-->(DubboExporter<?>) exporterMap.get(serviceKey)//從Map<String, Exporter<?>> exporterMap中根據(jù)serviceKey獲取DubboExport實(shí)例,
-->exporter.getInvoker()//獲取RegistryProtocol$InvokerDelegete實(shí)例
//執(zhí)行filter鏈
-->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
-->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
//執(zhí)行真正的invoker調(diào)用
-->AbstractProxyInvoker.invoke(Invocation invocation)
-->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
-->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
-->DemoServiceImpl.sayHello(String name)
-->new RpcResult(Object result)//將返回值result包裝成RpcResult(最后該參數(shù)會(huì)被包裝為Response)
服務(wù)端發(fā)送響應(yīng)消息
-->channel.send(response)//NettyChannel
-->NioAcceptedSocketChannel.write(Object message)//已經(jīng)是netty的東西了,這里的message=Response實(shí)例:最重要的是RpcResult [result=Hello world, response form provider: 10.211.55.2:20880, exception=null]
總體流程(默認(rèn)情況下):
- NettyServer 接收到請(qǐng)求消息后蜕径,進(jìn)行解碼怪蔑,之后交給 provider 端業(yè)務(wù)線程池進(jìn)行處理;
- 業(yè)務(wù)線程調(diào)用
DubboProtocol$requestHandler#reply
方法丧荐,該方法首先根據(jù)請(qǐng)求信息 RpcInvocation 組裝 serviceKey,之后根據(jù) serviceKey 從 DubboProtocol 的 exporterMap 中獲取指定服務(wù)的DubboExporter
(serviceKey 是由 RpcInvocation 中的group/path:version:port
組成喧枷,其中 port 來自于 channel)- 之后從
DubboExporter
中獲取存儲(chǔ)在其內(nèi)的 InvokerDelegete 實(shí)例虹统,之后執(zhí)行 filter 鏈,最后執(zhí)行到AbstractProxyInvoker#invoke
方法隧甚,
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
/**
* 真實(shí)對(duì)象 ref, eg. DemoServiceImpl
*/
private final T proxy;
...
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 子類覆寫的真正調(diào)用的方法
protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
...
}
AbstractProxyInvoker 的實(shí)現(xiàn)是在 ProxyFactory 中創(chuàng)建的匿名內(nèi)部類车荔。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
這是在講 provider 服務(wù)暴露時(shí)的一段代碼,這里實(shí)現(xiàn)了 AbstractProxyInvoker#doInvoke 方法戚扳,在該方法中又調(diào)用了 wrapper#invokeMethod
/**
* @param o 實(shí)現(xiàn)類
* @param n 方法名稱
* @param p 參數(shù)類型
* @param v 參數(shù)值
* @return
* @throws java.lang.reflect.InvocationTargetException
*/
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.alibaba.dubbo.demo.provider.DemoServiceImpl w;
try {
w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) o);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals(n) && p.length == 1) {
return ($w) w.sayHello((java.lang.String) v[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl.");
}
這里最終調(diào)到了 DemoServiceImpl#sayHello 方法忧便。最后將響應(yīng)結(jié)果封裝成 RpcResult,返回給客戶端帽借。
2.3 客戶端接收響應(yīng)
客戶端接收響應(yīng)消息
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService()
-->cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
-->handleResponse(Channel channel, Response response)
-->DefaultFuture.received(channel, response)
-->doReceived(Response res)//異步轉(zhuǎn)同步
客戶端接收到服務(wù)端的響應(yīng)之后珠增,先解碼,之后將響應(yīng)信息交給業(yè)務(wù)線程池去處理砍艾,然后就涉及到 Dubbo 的異步轉(zhuǎn)同步的實(shí)現(xiàn)蒂教,直接看 Dubbo 線程模型。
三脆荷、Dubbo 線程模型
以同步調(diào)用為例凝垛。
線程池
在provider端,存在三個(gè)線程池:
- boss 線程池:Netty 所有蜓谋,默認(rèn)只包含一個(gè) NioEventLoop梦皮。用于接收客戶端的連接 channel,并且之后將 channel 注冊(cè)到 worker 線程池中的一個(gè) NioEventLoop 上(實(shí)際上是注冊(cè)在 NioEventLoop 所擁有的那個(gè) Selector 上)桃焕;
- worker 線程池:Netty 所有剑肯,在 Dubbo 中默認(rèn)包含“核數(shù)+1”個(gè) NioEventLoop(在 Netty 中默認(rèn)是2*核數(shù))。worker 線程池中的每一個(gè) NioEventLoop 去阻塞(Selector.select())獲取注冊(cè)在其上的 channel 準(zhǔn)備就緒的事件覆旭,然后做出相應(yīng)處理退子;
- server 線程池:Dubbo 服務(wù)端的業(yè)務(wù)線程池,默認(rèn) worker 線程會(huì)將解碼后的請(qǐng)求消息交由該線程池進(jìn)行處理型将。
在consumer端寂祥,存在兩個(gè)線程池:
- worker 線程池:同 provider 的 worker 線程池
- client 線程池:Dubbo 服務(wù)端的業(yè)務(wù)線程池,默認(rèn) worker 線程會(huì)將解碼后的響應(yīng)消息交由該線程池進(jìn)行處理七兜。
通信流程
在上一小節(jié)中總結(jié)了通信流程的源碼調(diào)用鏈丸凭,這一節(jié)從線程模型的角度來看通信流程。(下面以同步調(diào)用為例)
- consumer 端用戶線程在發(fā)出請(qǐng)求之前會(huì)先創(chuàng)建一個(gè)
DefaultFuture
對(duì)象;并將requestID
作為 DefaultFuture 對(duì)象的 key 存儲(chǔ)在Map<Long, DefaultFuture> FUTURES
中(注意:每一個(gè)requestID
是一個(gè)請(qǐng)求的唯一標(biāo)識(shí)惜犀,最后相應(yīng)的響應(yīng) Response 的responseID
就等于這個(gè)requestID
)- 之后調(diào)用 Netty 編碼并發(fā)出請(qǐng)求铛碑,然后馬上調(diào)用 DefaultFuture#get 進(jìn)行阻塞等待(阻塞等待 response 不為空);
- provider 端 NettyServer 接收到請(qǐng)求后虽界,解碼汽烦,然后交由 server 線程池進(jìn)行處理;
- server 線程池處理完成之后莉御,調(diào)用 Netty 編碼并發(fā)送響應(yīng)消息給 consumer 端撇吞;
- consumer 端接收到響應(yīng)后,解碼礁叔,然后交給 client 線程池處理牍颈,client 線程池從
Map<Long, DefaultFuture> FUTURES
中獲取key=responseID
的DefaultFuture
對(duì)象,然后將響應(yīng)消息填充到其 response 屬性后琅关,喚醒 consumer 端阻塞的用戶線程煮岁;- 最后 consumer 得到了響應(yīng)