<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!-- 應(yīng)用名毡证,通常與 artifactId 相同即可 -->
<dubbo:application name="demo-consumer"/>
<!-- 注冊中心 -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- 生成遠(yuǎn)端服務(wù)的代理對象, 之后可以向調(diào)用本地服務(wù)一樣調(diào)用遠(yuǎn)端服務(wù) -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>
服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理
// demoService 是代理對象瘪贱,代理對象是 DemoService 接口的實現(xiàn)類
DemoService demoService = (DemoService) context.getBean("demoService");
一卷雕、服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理簡圖
總體流程:(默認(rèn)配置情況下)
- 首先 ReferenceConfig 類的 init 方法調(diào)用
Protocol#refer
方法生成 Invoker 實例(如上圖中的紅色部分)抑堡,這是服務(wù)消費(fèi)的關(guān)鍵浸踩。- 然后使用 JavassistProxyFactory#getProxy 生成接口(DemoService)的代理對象 ref
- 服務(wù)引用第一步掌敬,有注冊中心的情況下(最常用)會調(diào)用
RegistryProtocol#refer(Class<T> type, URL url)
匀借,RegistryProtocol 實際上是其他具體 Protocol(eg. DubboProtocol)的 AOP 類碴倾,在refer(...)
中:
- 獲取注冊中心
- 創(chuàng)建 RegistryDirectory(AOP)
- 首先會獲取注冊中心 Registry逗噩,然后進(jìn)行服務(wù)注冊;(AOP)
- 訂閱providers跌榔、configurators异雁、routers
4.1. 做第一次服務(wù)發(fā)現(xiàn),獲取到 provider 節(jié)點(provider 以 URL 進(jìn)行表示)后僧须;(AOP)
4.2.之后使用具體的 DubboProtocol 將這些表示 provider 的 URL 轉(zhuǎn)化為 DubboInvoker纲刀,并且為每一個 provider 創(chuàng)建 nettyClient,與 nettyServer 進(jìn)行連接
(具體的 Protocol 做的事
)
4.3担平、進(jìn)行 DubboInvoker 的緩存(AOP)示绊,其中 RegistryDirectory#Map<String, List<Invoker<T>>> methodInvokerMap 是后續(xù)發(fā)起調(diào)用時獲取 Invoker 的真正容器(重要
)
- 將directory封裝成一個ClusterInvoker(MockClusterInvoker)。
public class RegistryProtocol implements Protocol {
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
...
// 1. 獲取注冊中心:創(chuàng)建ZkClient實例暂论,連接zk
Registry registry = registryFactory.getRegistry(url);
...
// 2.
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 1. 創(chuàng)建 RegistryDirectory 實例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
...
// 2. 向注冊中心注冊服務(wù)
registry.register(registeredConsumerUrl);
// 3. 訂閱providers面褐、configurators、routers(訂閱時取胎,調(diào)用了具體的Protocol,eg. DubboProtocol 的 refer(RegistryDirectory)展哭,方法,在該方法中,創(chuàng)建了 nettyClient 端匪傍,并建立了長連接)
directory.subscribe(subscribeUrl.addParameter("category","providers,configurators,routers"));
// 4. 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
Invoker invoker = cluster.join(directory);
...
return invoker;
}
}
大致看下后續(xù)發(fā)起調(diào)用時坝咐,是怎么從 RegistryDirectory 中獲取可執(zhí)行的 Invoker 的。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
// 以注釋處的例子為例析恢,初始化之后:{"sayHello":[A,B], "sayBye":[B], "*":[router過濾后的provider]}
private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
/************************* 初始化更新 newMethodInvokerMap *************************/
// 訂閱providers墨坚、configurators、routers時映挂,執(zhí)行的通知邏輯
@Override
public synchronized void notify(List<URL> urls) {
// 服務(wù)提供者URL
List<URL> invokerUrls = new ArrayList<URL>();
...
// 初始化 invokerUrls
for (URL url : urls) {
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
...
if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
}
}
...
// 只針對 providers 進(jìn)行調(diào)用
refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
...
// Translate url list to Invoker map => 將url轉(zhuǎn)換為InvokerDelegate
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// Change method name to map Invoker Map => 構(gòu)造{"sayHello":InvokerDelegate}這樣的鍵值對
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
...
// 初始化實例屬性 methodInvokerMap
this.methodInvokerMap = newMethodInvokerMap;
...
}
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
...
for (Invoker<T> invoker : invokersMap.values()) {
// 1. 獲取 provider 的所有方法 methods=xxx,yyy,zzz
// (同一個接口的不同 provider 的 methods 參數(shù)可能不同泽篮,例如 DemoService#sayHello() 在 providerA 中有,后續(xù)添加了 DemoService#sayBye() 之后柑船,部署到了 providerB帽撑,
// 此時 providerA 還沒部署,這一時刻鞍时,進(jìn)行的服務(wù)發(fā)現(xiàn)根據(jù) serviceKey 會發(fā)現(xiàn) providerA 和 providerB亏拉,但是二者所擁有的方法卻是不同的,那么經(jīng)過如下邏輯后逆巍,
// newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B]})
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
for (String method : methods) {
...
newMethodInvokerMap.put(method, methodInvokers);
...
}
}
// newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B], "*":[router過濾后的provider]}
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
...
return Collections.unmodifiableMap(newMethodInvokerMap);
}
/************************* 從 newMethodInvokerMap 中選擇 Invoker *************************/
@Override
public List<Invoker<T>> doList(Invocation invocation) {
...
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
...
// 1. 根據(jù)方法名進(jìn)行 Invoker 的獲燃疤痢:從 {"sayHello":List<InvokerDelegate實例>} 中根據(jù) methodName("sayHello")獲取List<InvokerDelegate實例>
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
// 2. 根據(jù) key=* 進(jìn)行獲取 List<Invoker>
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
// 3. 遍歷獲取一個 List<Invoker>
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
...
}
Q
: 為什么要按照方法名進(jìn)行 provider 的緩存而不是直接按照 interfaceName/group/version 這樣的格式?
A
: 同一個接口的不同 provider 的 methods 參數(shù)可能不同锐极,例如 DemoService#sayHello() 在 providerA 中有笙僚,后續(xù)添加了 DemoService#sayBye() 之后,部署到了 providerB灵再,同時將新包(包含sayBye())打包給 consumer肋层,consumer 部署完成后,假設(shè)此時 providerA 還沒部署翎迁,這一時刻栋猖,consumer 進(jìn)行的服務(wù)發(fā)現(xiàn)根據(jù) serviceKey 會發(fā)現(xiàn) providerA 和 providerB,但是二者所擁有的方法卻是不同的汪榔,那么經(jīng)過如下邏輯后蒲拉,newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B]}
),這樣后續(xù)如果執(zhí)行 DemoService#sayBye() 揍异,就會直接獲取到 B全陨。
注意
:在 2.7.x 中去掉了 newMethodInvokerMap 屬性,不再使用方法名作為 key衷掷,直接存儲 List<Invoker>辱姨,代碼雖然簡化了,但是也丟失了A
中描述的好處戚嗅。
服務(wù)引用的第二步雨涛,見下文第二小節(jié)分析和 第10章 Dubbo 代理層的設(shè)計與實現(xiàn)
二枢舶、服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理源碼梯形圖
ReferenceConfig.init()
-->createProxy(Map<String, String> map)
//一 獲取Invoker
-->RegistryProtocol.refer(Class<T> type, URL url)
//1 獲取注冊中心:創(chuàng)建ZkClient實例,連接zk
-->Registry registry = registryFactory.getRegistry(url)
-->AbstractRegistryFactory.getRegistry(URL url)
-->ZookeeperRegistryFactory.createRegistry(URL url)
-->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
-->ZkclientZookeeperTransporter.connect(URL url)
-->new ZkclientZookeeperClient(URL url)
-->new ZkClient(url.getBackupAddress())
-->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
-->doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
-->new RegistryDirectory<T>(type, url)
//2 向注冊中心注冊服務(wù)
-->registry.register(url)
-->ZookeeperRegistry.doRegister(URL url)
-->AbstractZookeeperClient.create(String path, boolean ephemeral)
//3 訂閱providers替久、configurators凉泄、routers
-->RegistryDirectory.subscribe(URL url)
-->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
//3.1 會獲取當(dāng)前節(jié)點下已經(jīng)存在的子節(jié)點(第一次服務(wù)發(fā)現(xiàn)發(fā)生在這里),添加子節(jié)點變化監(jiān)聽器
-->List<String> children = zkClient.addChildListener(path, zkListener)
-->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
-->saveProperties(url)
-->RegistryDirectory.notify(List<URL> urls)
//僅僅針對的是providers
-->refreshInvoker(List<URL> invokerUrls)
-->toInvokers(List<URL> urls)
-->ProtocolFilterWrapper.refer(Class<T> type, URL url)
-->DubboProtocol.refer(Class<T> serviceType, URL url)
//3.1.1 創(chuàng)建ExchangeClient蚯根,對第一次服務(wù)發(fā)現(xiàn)providers路徑下的相關(guān)url建立長連接
-->getClients(URL url)
-->getSharedClient(URL url)
-->ExchangeClient exchangeClient = initClient(url)
-->Exchangers.connect(url, requestHandler)
-->HeaderExchanger.connect(URL url, ExchangeHandler handler)
-->new DecodeHandler(new HeaderExchangeHandler(handler)))
-->Transporters.connect(URL url, ChannelHandler... handlers)
-->NettyTransporter.connect(URL url, ChannelHandler listener)
-->new NettyClient(url, listener)
-->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
-->getChannelCodec(url)//獲取Codec2后众,這里是DubboCountCodec實例
-->doOpen()//開啟netty客戶端
-->doConnect()//連接服務(wù)端,建立長連接
-->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例颅拦,needHeartbeat:true
-->startHeatbeatTimer()//啟動心跳計數(shù)器
-->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)/
-->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.10.10.10:20880", 上邊的ReferenceCountExchangeClient實例)
//3.2 創(chuàng)建DubboInvoker(nettyClient的持有者蒂誉,真正發(fā)起netty調(diào)用的Invoker,做兩件事:選擇nettyClient + 處理單向/異步/同步調(diào)用模板)
-->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
-->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例)
-->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
-->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
//3.3 將創(chuàng)建出來的Invoker緩存起來
-->newUrlInvokerMap.put("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001®ister.ip=10.10.10.10&remote.timestamp=1510127991625&side=consumer×tamp=1510128022123", 上邊的InvokerDelegate實例)
-->toMethodInvokers(newUrlInvokerMap)
-->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]}
//4 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
-->cluster.join(directory)
-->Cluster$Adaptive.join(directory)
-->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster
-->MockClusterWrapper.join(Directory<T> directory)
-->FailoverCluster.join(Directory<T> directory)
-->new FailoverClusterInvoker<T>(directory)
-->MockClusterInvoker(Directory<T> directory, Invoker<T> invoker)//invoker:上邊的FailoverClusterInvoker實例
//二 獲取代理
-->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例距帅, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
-->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
-->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態(tài)類
-->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例
消費(fèi)者發(fā)布的時候總體做了兩件事:創(chuàng)建 Invoker
和 創(chuàng)建 API 接口的代理對象
右锨。
2.1 創(chuàng)建Invoker
1. 獲取注冊中心:創(chuàng)建 ZkClient 實例,連接 zk
//1 獲取注冊中心:創(chuàng)建ZkClient實例碌秸,連接zk
-->Registry registry = registryFactory.getRegistry(url)
-->AbstractRegistryFactory.getRegistry(URL url)
-->ZookeeperRegistryFactory.createRegistry(URL url)
-->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
-->ZkclientZookeeperTransporter.connect(URL url)
-->new ZkclientZookeeperClient(URL url)
-->new ZkClient(url.getBackupAddress())
-->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
與provider相同绍移,不再贅述。
2. 向注冊中心注冊 consumer 服務(wù)
//2 向注冊中心注冊服務(wù)
-->registry.register(url)
-->ZookeeperRegistry.doRegister(URL url)
-->AbstractZookeeperClient.create(String path, boolean ephemeral)
consumer 完成注冊后讥电,會在 zk 上創(chuàng)建節(jié)點(url 解碼后):
/dubbo
- /com.alibaba.dubbo.demo.DemoService
-- /consumers
--- /consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer×tamp=1510225913509
3. 使用 RegistryDirectory 訂閱 providers蹂窖、configurators、routers 節(jié)點
其中允趟,configurators
節(jié)點用于覆蓋配置(實現(xiàn)“熱配置”)恼策,routers
節(jié)點用于配置路由信息。這里重點說一下 providers
節(jié)點潮剪,該節(jié)點下存儲著 DemoService 的服務(wù)提供者列表,當(dāng)該列表發(fā)生變化時(添加 provider 機(jī)器或者宕機(jī))分唾,會通知 consumer 進(jìn)行 refreshInvoker
操作抗碰。看一下 RegistryDirectory 訂閱 providers 的邏輯绽乔。
//3.1 會獲取當(dāng)前節(jié)點下已經(jīng)存在的字節(jié)點(第一次服務(wù)發(fā)現(xiàn)發(fā)生在這里)弧蝇,添加子節(jié)點變化監(jiān)聽器
-->List<String> children = zkClient.addChildListener(path, zkListener)
-->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
-->saveProperties(url)
-->RegistryDirectory.notify(List<URL> urls)
//僅僅針對的是providers
-->refreshInvoker(List<URL> invokerUrls)
-->toInvokers(List<URL> urls
-->ProtocolFilterWrapper.refer(Class<T> type, URL url)
-->DubboProtocol.refer(Class<T> serviceType, URL url)
//3.1.1 創(chuàng)建ExchangeClient,對第一次服務(wù)發(fā)現(xiàn)providers路徑下的相關(guān)url建立長連接
-->getClients(URL url)
-->getSharedClient(URL url)
-->ExchangeClient exchangeClient = initClient(url)
-->Exchangers.connect(url, requestHandler)
-->HeaderExchanger.connect(URL url, ExchangeHandler handler)
-->new DecodeHandler(new HeaderExchangeHandler(handler)))
-->Transporters.connect(URL url, ChannelHandler... handlers)
-->NettyTransporter.connect(URL url, ChannelHandler listener)
-->new NettyClient(url, listener)
-->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
-->getChannelCodec(url)//獲取Codec2折砸,這里是DubboCountCodec實例
-->doOpen()//開啟netty客戶端
-->doConnect()//連接服務(wù)端看疗,建立長連接
-->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例,needHeartbeat:true
-->startHeatbeatTimer()//啟動心跳計數(shù)器
-->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)/
-->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.10.10.10:20880", 上邊的ReferenceCountExchangeClient實例)
//3.2 創(chuàng)建DubboInvoker
-->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
-->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例)
-->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
-->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
//3.3 將創(chuàng)建出來的Invoker緩存起來
-->newUrlInvokerMap.put("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001®ister.ip=10.10.10.10&remote.timestamp=1510127991625&side=consumer×tamp=1510128022123", 上邊的InvokerDelegate實例)
-->toMethodInvokers(newUrlInvokerMap)
-->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]}
總體流程 :
- 首先添加 provider 子節(jié)點監(jiān)聽器睦授,同時進(jìn)行
第一次服務(wù)發(fā)現(xiàn)
两芳,找出當(dāng)前注冊在 zk 上的 provider 節(jié)點。- 然后創(chuàng)建
ReferenceCountExchangeClient
去枷,為每一個provider
創(chuàng)建一條Netty
長連接(開啟了Netty
客戶端并連接 provider 的Netty
服務(wù)端)- 開啟心跳定時器
- 緩存 ReferenceCountExchangeClient
- 將創(chuàng)建出來的 ReferenceCountExchangeClient 封裝到 DubboInvoker 實例中
- 根據(jù)條件獲取相關(guān)的 filter怖辆,之后使用這些 fiter 對 DubboInvoker 實例進(jìn)行鏈?zhǔn)桨b是复,將包裝后的 DubboInvoker 封裝到 InvokerDelegete 中
- 最后將該 InvokerDelegete 實例按照方法名封裝到
Map<String, List<Invoker<T>>> newMethodInvokerMap
中。(該 map 也是 consumer 調(diào)用 provider 時竖螃,獲取 provider 實例的地方淑廊,該 map 是 RegistryDirectory 的一個屬性,所以我們可以將 RegistryDirectory 看做是一個 provider 的客戶端緩存器
)
Directory.List<Router> routers 會在兩個地方被設(shè)置
:
- 初次創(chuàng)建 Directory 實例時特咆,在 Directory 構(gòu)造器中進(jìn)行設(shè)置季惩;
- notify 的時候會進(jìn)行 routers 的重新設(shè)置。
關(guān)于 Directory 的設(shè)計腻格,后續(xù)分析
4. 將 directory 封裝成一個 ClusterInvoker(MockClusterInvoker)
//4 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
-->cluster.join(directory)
-->Cluster$Adaptive.join(directory)
-->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster
-->MockClusterWrapper.join(Directory<T> directory)
-->FailoverCluster.join(Directory<T> directory)
-->new FailoverClusterInvoker<T>(directory)
-->MockClusterInvoker(Directory<T> directory, Invoker<T> invoker)//invoker:上邊的FailoverClusterInvoker實例
Dubbo 實現(xiàn)了集群容錯功能画拾。在 consumer 發(fā)布的最后流程中,實現(xiàn)了集群容錯荒叶。
- 首先根據(jù) Dubbo SPI 機(jī)制獲取指定類型的
Cluster
實現(xiàn)碾阁,這里默認(rèn)是FailoverCluster
(失敗重試機(jī)制);- 之后將上邊的
RegistryDirectory
封裝到FailoverClusterInvoker
實例中些楣;- 最后創(chuàng)建一個
MockClusterInvoker
實例脂凶,封裝了RegistryDirectory
和FailoverClusterInvoker
。(MockClusterInvoker
用于實現(xiàn)服務(wù)降級
功能)
到這里愁茁,consumer 創(chuàng)建 Invoker 的源碼就結(jié)束了蚕钦。總結(jié)一下
:
- 獲取注冊中心:創(chuàng)建 ZkClient 實例,連接 zk
- 向注冊中心注冊 consumer 服務(wù)
- 使用 RegistryDirectory 訂閱 providers鹅很、configurators嘶居、routers 節(jié)點
3.1. 添加 provider 子節(jié)點監(jiān)聽器,同時進(jìn)行第一次服務(wù)發(fā)現(xiàn)促煮,找出當(dāng)前注冊在 zk 上的 provider 節(jié)點
3.1.1. 創(chuàng)建 ReferenceCountExchangeClient邮屁,為每一個 provider 創(chuàng)建一條 Netty 長連接(開啟了 Netty 客戶端并連接 provider 的 Netty 服務(wù)端)
3.1.2. 開啟心跳定時器
3.2. 根據(jù)條件獲取相關(guān)的 filter,之后使用這些 fiter 對DubboInvoker
實例進(jìn)行鏈?zhǔn)桨b菠齿,將包裝后的 DubboInvoker 封裝到 InvokerDelegete 中
3.3. 最后將該 InvokerDelegete 實例按照方法名封裝到Map<String, List<Invoker<T>>> newMethodInvokerMap
中
- 將 directory 封裝成一個
ClusterInvoker
(MockClusterInvoker)4.1 首先根據(jù) Dubbo SPI 機(jī)制獲取指定類型的 Cluster 實現(xiàn)佑吝,這里默認(rèn)是 FailoverCluster(失敗重試機(jī)制);
4.2 之后將上邊的 RegistryDirectory 封裝到 FailoverClusterInvoker 實例中绳匀;
4.3 最后創(chuàng)建一個 MockClusterInvoker 實例芋忿,封裝了 RegistryDirectory 和 FailoverClusterInvoker。(MockClusterInvoker用于實現(xiàn)服務(wù)降級功能)
我們可以看到疾棵,最終獲取到的 Invoker 是 MockClusterInvoker 實例
戈钢。
關(guān)于 Cluster 的設(shè)計,后續(xù)分析
2.2 創(chuàng)建API接口的代理對象
//二 獲取代理
-->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例是尔, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
-->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
-->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態(tài)類
-->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例
獲取 API 接口代理的邏輯比較簡單殉了,注意這里的 Proxy 是 com.alibaba.dubbo.common.bytecode.Proxy
,而非 JDK 的 Proxy嗜历。
這里首先調(diào)用 Proxy.getProxy(interfaces)
獲取到一個創(chuàng)建代理的工廠類 com.alibaba.dubbo.common.bytecode.Proxy0
宣渗,如下:
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;
public class Proxy0 extends Proxy implements ClassGenerator.DC {
public Object newInstance(InvocationHandler invocationHandler) {
return new proxy0(invocationHandler);
}
}
之后調(diào)用了 Proxy0#newInstance
方法抖所,創(chuàng)建了一個 com.alibaba.dubbo.common.bytecode.proxy0
實例,該實例就是最終的 DemoService 的代理對象痕囱。
DemoService demoService = (DemoService) context.getBean("demoService");
這里的 demoService 就是上述的 com.alibaba.dubbo.common.bytecode.proxy0
實例田轧。
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String string) {
Object[] arrobject = new Object[]{string};
Object object = this.handler.invoke(this, methods[0], arrobject);
return (String)object;
}
public Object $echo(Object object) {
Object[] arrobject = new Object[]{object};
Object object2 = this.handler.invoke(this, methods[1], arrobject);
return object2;
}
public proxy0() {
}
public proxy0(InvocationHandler invocationHandler) {
this.handler = invocationHandler;
}
}
可以看到,當(dāng)調(diào)用 proxy0#sayHello
時鞍恢,實際上其內(nèi)部執(zhí)行的是 InvocationHandler#invoke
傻粘,來看一下 InvocationHandler。
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker; //上邊的MockClusterInvoker實例
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
...
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
InvocationHandler#invoke 調(diào)用的又是 MockClusterInvoker#invoke 方法帮掉。到此為止弦悉,整個服務(wù)引用的源碼分析就完成了。