本系列主要參考官網(wǎng)文檔基跑、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實(shí)戰(zhàn)》一書糜烹。Dubbo版本為2.6.1养泡。
文章內(nèi)容順序:
1.服務(wù)引用的介紹
2.服務(wù)引用的入口方法getObject()=>createProxy()方法介紹3.本地引用
- 3.1是否為本地引用的判別方法璧微,InjvmProtocol#isInjvmRefer
- 3.2createProxy()中的本地引用鏈路
- 3.3InjvmIvoker介紹,引出proxyFactory
4.proxyFactory擴(kuò)展點(diǎn)
- 4.1proxyFactory的包裝類StubProxyFactoryWrapper,本地存根的作用
- 4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)
- 4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類的意義
- 4.4JavassistProxyFactory生成的代碼樣例及作用
- 4.5服務(wù)引用存在哪邑雅?
5.遠(yuǎn)程引用
- 5.1 createProxy()中的遠(yuǎn)程引用鏈路
- 5.2只配置了一個(gè)注冊(cè)中心的遠(yuǎn)程引用
- 5.3RegistryProtocol#refer()
- 5.4直連時(shí)的遠(yuǎn)程引用
- 5.5new DubboInvoker時(shí)的getClients(url)
- 5.6getClients(url)中的getSharedClient(url)
- 5.7getClients(url)中的initClient(url)
- 5.8initClient(url)中的自適應(yīng)Exchangers#connect()
- 5.9多注冊(cè)中心時(shí)鏈路
1.服務(wù)引用的介紹
來自官網(wǎng)的服務(wù)引用介紹:
- Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè)嗦玖,第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的
afterPropertiesSet
方法時(shí)引用服務(wù)患雇,第二個(gè)是在 ReferenceBean對(duì)應(yīng)的服務(wù)被注入到其他類中時(shí)引用。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于宇挫,第一個(gè)是餓漢式的苛吱,第二個(gè)是懶漢式的。- 默認(rèn)情況下器瘪,Dubbo 使用懶漢式引用服務(wù)翠储。如果需要使用餓漢式,可通過配置 <dubbo:reference> 的 init 屬性開啟橡疼。下面我們按照 Dubbo 默認(rèn)配置進(jìn)行分析援所,整個(gè)分析過程從 ReferenceBean 的
getObject
方法開始。當(dāng)我們的服務(wù)被注入到其他類中時(shí)欣除,Spring 會(huì)第一時(shí)間調(diào)用getObject
方法住拭,并由該方法執(zhí)行服務(wù)引用邏輯。- 按照慣例历帚,在進(jìn)行具體工作之前滔岳,需先進(jìn)行配置檢查與收集工作。接著根據(jù)收集到的信息決定服務(wù)用的方式挽牢,有三種谱煤,第一種是引用本地 (JVM) 服務(wù),第二是通過直連方式引用遠(yuǎn)程服務(wù)卓研,第三是通過注冊(cè)中心引用遠(yuǎn)程服務(wù)趴俘。不管是哪種引用方式睹簇,最后都會(huì)得到一個(gè) Invoker 實(shí)例。
- 如果有多個(gè)注冊(cè)中心寥闪,多個(gè)服務(wù)提供者太惠,這個(gè)時(shí)候會(huì)得到一組 Invoker 實(shí)例,此時(shí)需要通過集群管理類 Cluster 將多個(gè) Invoker 合并成一個(gè)實(shí)例疲憋。合并后的 Invoker 實(shí)例已經(jīng)具備調(diào)用本地或遠(yuǎn)程服務(wù)的能力了凿渊,但并不能將此實(shí)例暴露給用戶使用,這會(huì)對(duì)用戶業(yè)務(wù)代碼造成侵入缚柳。此時(shí)框架還需要通過代理工廠類 (ProxyFactory) 為服務(wù)接口生成代理類埃脏,并讓代理類去調(diào)用 Invoker 邏輯。避免了 Dubbo 框架代碼對(duì)業(yè)務(wù)代碼的侵入秋忙,同時(shí)也讓框架更容易使用彩掐。
本圖暫不考慮集群容錯(cuò)、網(wǎng)絡(luò)調(diào)用灰追、序列化反序列
2.服務(wù)引用的入口,getObject()=>createProxy()方法介紹
那么就從ReferenceBean的getObject
開始吧:
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 檢測(cè) ref (即service對(duì)象)是否為空堵幽,為空則通過 init 方法創(chuàng)建
if (ref == null) {
// init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
init();
}
return ref;
}
注意這個(gè)ReferenceConfig#init()
弹澎,主要邏輯就是配置解析封裝(實(shí)在太長就不貼了朴下,而且解析配置不在討論的點(diǎn)),該實(shí)例包含了事件通知配置苦蒿,比如 onreturn殴胧、onthrow、oninvoke 等佩迟。并在最后調(diào)用 createProxy ()
創(chuàng)建代理對(duì)象团滥。
從createProxy 我們也將開始真正的引用鏈路。
//map為 `side`报强,`dubbo`惫撰,`timestamp`,`pid`等 參數(shù)
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 是否本地引用
final boolean isJvmRefer;
// injvm 屬性為空躺涝,不通過該屬性判斷
if (isInjvm() == null) {
// 直連服務(wù)提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
// 通過 `tmpUrl` 判斷扼雏,是否需要本地引用
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
// 默認(rèn)不是
} else {
isJvmRefer = false;
}
// 通過 injvm 屬性坚嗜。
} else {
isJvmRefer = isInjvm();
}
// 本地引用
if (isJvmRefer) {
// 創(chuàng)建本地服務(wù)引用 URL 對(duì)象。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 引用服務(wù)诗充,返回 Invoker 對(duì)象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
// 正常流程苍蔬,一般為遠(yuǎn)程引用
} else {
// 定義直連地址,可以是服務(wù)提供者的地址蝴蜓,也可以是注冊(cè)中心的地址
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 拆分地址成數(shù)組碟绑,使用 ";" 分隔俺猿。
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
// 循環(huán)數(shù)組,添加到 `url` 中格仲。
if (us != null && us.length > 0) {
for (String u : us) {
// 創(chuàng)建 URL 對(duì)象
URL url = URL.valueOf(u);
// 設(shè)置默認(rèn)路徑
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// 注冊(cè)中心的地址押袍,帶上服務(wù)引用的配置參數(shù)
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
// 服務(wù)提供者的地址
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
// 注冊(cè)中心
} else { // assemble URL from register center's configuration
// 加載注冊(cè)中心 URL 數(shù)組
List<URL> us = loadRegistries(false);
// 循環(huán)數(shù)組火本,添加到 `url` 中扰才。
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加載監(jiān)控中心 URL
URL monitorUrl = loadMonitor(u);
// 服務(wù)引用配置對(duì)象 `map`,帶上監(jiān)控中心的 URL
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 注冊(cè)中心的地址凌停,帶上服務(wù)引用的配置參數(shù)
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注冊(cè)中心侮东,帶上服務(wù)引用的配置參數(shù)
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 單 `urls` 時(shí)圈盔,引用服務(wù),返回 Invoker 對(duì)象
if (urls.size() == 1) {
// 引用服務(wù)
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 循環(huán) `urls` 悄雅,引用服務(wù)驱敲,返回 Invoker 對(duì)象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服務(wù)
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最后一個(gè)注冊(cè)中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注冊(cè)中心
if (registryURL != null) { // registry url is available
// 對(duì)有注冊(cè)中心的 Cluster 只用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
// 無注冊(cè)中心,全部都是服務(wù)直連
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 啟動(dòng)時(shí)檢查
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 創(chuàng)建 Service 代理對(duì)象
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
3.createProxy()中的本地引用鏈路
3.1是否為本地引用的判別方法宽闲,InjvmProtocol#isInjvmRefer
同樣的众眨,服務(wù)引用也分為本地引用和遠(yuǎn)程引用,本地引用還是遠(yuǎn)程引用是從URL來辨別的便锨,先從本地引用來講起围辙。
這邊來簡(jiǎn)單介紹下他的判別方法InjvmProtocol#isInjvmRefer
,也比較簡(jiǎn)單
public boolean isInjvmRefer(URL url) {
final boolean isJvmRefer;
String scope = url.getParameter(Constants.SCOPE_KEY);
// Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
// 當(dāng) `protocol = injvm` 時(shí)放案,本身已經(jīng)是 jvm 協(xié)議了姚建,走正常流程就是了。
if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
isJvmRefer = false;
// 當(dāng) `scope = local` 或者 `injvm = true` 時(shí)吱殉,本地引用
} else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
// if it's declared as local reference
// 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
isJvmRefer = true;
// 當(dāng) `scope = remote` 時(shí)掸冤,遠(yuǎn)程引用
} else if (Constants.SCOPE_REMOTE.equals(scope)) {
// it's declared as remote reference
isJvmRefer = false;
// 當(dāng) `generic = true` 時(shí),即使用泛化調(diào)用友雳,遠(yuǎn)程引用稿湿。
} else if (url.getParameter(Constants.GENERIC_KEY, false)) {
// generic invocation is not local reference
isJvmRefer = false;
// 當(dāng)本地已經(jīng)有該 Exporter 時(shí),本地引用
} else if (getExporter(exporterMap, url) != null) {
// by default, go through local reference if there's the service exposed locally
isJvmRefer = true;
// 默認(rèn)押赊,遠(yuǎn)程引用
} else {
isJvmRefer = false;
}
return isJvmRefer;
}
}
上面的方法沒什么好說的饺藤,都已經(jīng)注釋好啦。
createProxy()中的本地引用鏈路
這邊我們先走本地引用的鏈路流礁,再貼一下createProxy()
的部分代碼如下涕俗,注意到在創(chuàng)建本地服務(wù)引用 URL 對(duì)象時(shí),已經(jīng)把Protocol設(shè)置成InjvmProtocol了神帅,
// 本地引用
if (isJvmRefer) {
// 創(chuàng)建本地服務(wù)引用 URL 對(duì)象再姑。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 引用服務(wù),返回 Invoker 對(duì)象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
看到直接調(diào)用了Protocol#refer(interface, url)
找御,根據(jù)url獲得對(duì)應(yīng) Protocol 拓展實(shí)現(xiàn)為 InjvmProtocol 元镀。同樣的還是SPI機(jī)制绍填,調(diào)用鏈路是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol
,已經(jīng)在SPI和服務(wù)暴露一文提過很多次了栖疑,包裝類里export()
和refer()
的邏輯都差不多讨永,就不再介紹。直接來看Protocol#refer(interface, url)
做了什么吧蔽挠。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
非常樸素的new了InjvmInvoker,在進(jìn)去看看他干了什么住闯。
3.3InjvmIvoker介紹,引出proxyFactory
/**
* Exporter 集合
*
* key: 服務(wù)鍵
*
* 該值實(shí)際就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
}
- 最后返回的就是這個(gè)InjvmIvoker,Invoker 是 Dubbo 的核心模型澳淑,代表一個(gè)可執(zhí)行體比原,這個(gè)InjvmIvoker他將一些屬性和本地的緩存聚合到一起形成了一個(gè)Invoker。
每個(gè)InjvmInvoker都會(huì)持有一個(gè)指向本地緩存的指針杠巡。- 拿到這個(gè)Invoker后量窘,在
ReferenceConfig#createProxy
最后調(diào)用// 創(chuàng)建 Service 代理對(duì)象 // create service proxy return (T) proxyFactory.getProxy(invoker);
調(diào)用
ProxyFactory#getProxy(invoker)
方法,創(chuàng)建 Service 代理對(duì)象氢拥。這邊的proxyFactory
同樣也是擴(kuò)展點(diǎn)蚌铜,由傳入的invoker.url來決定調(diào)用哪個(gè)。
順帶一提 Service 代理對(duì)象的內(nèi)部嫩海,可以調(diào)用Invoker#invoke(Invocation)
方法冬殃,進(jìn)行 Dubbo 服務(wù)的調(diào)用。
那么最后叁怪,getProxy()到底是怎么創(chuàng)建代理對(duì)象的呢审葬?
4.proxyFactory擴(kuò)展點(diǎn)
ProxyFactory同樣也是個(gè)擴(kuò)展類,有Stub包裝類奕谭,還有兩個(gè)擴(kuò)展類實(shí)現(xiàn)涣觉,可以自由決定用哪個(gè)Factory。Invoker通過Javassist動(dòng)態(tài)代理或者JDK動(dòng)態(tài)代理血柳,兩者都是通過生成字節(jié)碼實(shí)現(xiàn)的官册。
而技術(shù)的選型可以看下這張圖作者的解釋
image.png
4.1proxyFactory的包裝類StubProxyFactoryWrapper
ProxyFactory#getProxy(invoker)`鏈路其實(shí)是這樣的,StubProxyFactoryWrapper的作用就是生成本地存根。
image.png
在此之前我們先要知道本地存根的作用(也就是為什么要調(diào)用
StubProxyFactoryWrapper
):
遠(yuǎn)程服務(wù)后难捌,客戶端通常只剩下接口膝宁,而實(shí)現(xiàn)全在服務(wù)器端,但提供方有些時(shí)候想在客戶端也執(zhí)行部分邏輯根吁,比如:做 ThreadLocal 緩存昆汹,提前驗(yàn)證參數(shù),調(diào)用失敗后偽造容錯(cuò)數(shù)據(jù)等等婴栽,此時(shí)就需要在 API 中帶上 Stub,客戶端生成 Proxy 實(shí)例辈末,會(huì)把 Proxy 通過構(gòu)造函數(shù)傳給 Stub 愚争,然后把 Stub 暴露給用戶映皆,Stub 可以決定要不要去調(diào) Proxy。
說多了不如看一個(gè)簡(jiǎn)單的實(shí)例:Dubbo本地存根
StubProxyFactoryWrapper#getProxy(invoker)
代碼如下
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 獲得 Service Proxy 對(duì)象,這邊調(diào)用的就是指定的proxyFactory了
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) { // 非泛化引用
// 獲得 `stub` 配置項(xiàng)
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
// `stub = true` 的情況轰枝,使用接口 + `Stub` 字符串捅彻。
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
} else {
stub = serviceType.getName() + "Local";
}
}
try {
// 加載 Stub 類
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
}
try {
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
// 創(chuàng)建 Stub 對(duì)象,使用帶 Service Proxy 對(duì)象的構(gòu)造方法
proxy = (T) constructor.newInstance(new Object[]{proxy});
// 【TODO 8033】參數(shù)回調(diào)
//export stub service
URL url = invoker.getUrl();
if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
try {
export(proxy, (Class) invoker.getInterface(), url);
} catch (Exception e) {
LOGGER.error("export a stub service error.", e);
}
}
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
}
} catch (Throwable t) {
LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
// ignore
}
}
}
return proxy;
}
- 動(dòng)態(tài)的用proxyFactory(默認(rèn)是JavassistFactory)來獲取Service Proxy 對(duì)象
- 獲取配置中的stub 配置項(xiàng)鞍陨,而后調(diào)用
ReflectUtils#forName(stub)
方法步淹,加載我們自己寫的 Stub 類,注意是加載哦,就是拿到Class诚撵,初始化對(duì)象是在下面完成的缭裆。- 之后創(chuàng)建 Stub 對(duì)象,使用帶 Service Proxy 對(duì)象作為參數(shù)的構(gòu)造方法寿烟。例如澈驼,
public DemoServiceStub(DemoService demoService)
。通過這樣的方式筛武,我們就擁有了一個(gè)內(nèi)部有 Proxy Service 對(duì)象的 Stub 對(duì)象啦缝其,可以實(shí)現(xiàn)各種 OOXX 啦。最后將這個(gè)Stub對(duì)象返回徘六。- 再次提醒内边,所以我們有的,是這個(gè)Stub對(duì)象待锈,可以用這個(gè)對(duì)象來對(duì)執(zhí)行方法進(jìn)行一些本地的AOP攔截
4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)
StubProxyFactoryWrapper#getProxy(invoker)
第一行就調(diào)用了擴(kuò)展類的getProxy(invoker)
既然默認(rèn)實(shí)現(xiàn)是javassist漠其,那么我們就來看看javassist都干了什么吧
先來看看他的父類,AbstractProxyFactory
,
public abstract class AbstractProxyFactory implements ProxyFactory {
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null;
// TODO 8022 芋艿
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
// 增加 EchoService 接口炉擅,用于回生測(cè)試辉懒。參見文檔《回聲測(cè)試》https://dubbo.gitbooks.io/dubbo-user-book/demos/echo-service.html
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}
可以看到,該抽象類谍失,主要是實(shí)現(xiàn)了 #getProxy(invoker)
方法眶俩,獲得需要生成代理的接口們,而后調(diào)用了我們的子類JavassistProxyFactory#getProxy(invoker, types)
。接著往下看
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
/// 生成 Proxy 子類(Proxy 是抽象類)快鱼。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// TODO Wrapper類不能正確處理帶$的類名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
JavassistProxyFactory#getProxy(invoker, types)
return的時(shí)候生成 Proxy 子類(Proxy 是抽象類)颠印。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例。注意:其中傳入的參數(shù)是 InvokerInvocationHandler
類抹竹,通過這樣的方式线罕,讓 proxy 和真正的邏輯代碼解耦。
那我們來看看這個(gè)InvokerInvocationHandler
類到底是什么
4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類
public class InvokerInvocationHandler implements InvocationHandler {
/**
* Invoker 對(duì)象
*/
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// wait 等Object類的方法窃判,直接反射調(diào)用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 基礎(chǔ)方法钞楼,不使用 RPC 調(diào)用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// RPC 調(diào)用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
- 這個(gè)類就是攔截一些接口類調(diào)用的用途。將一些簡(jiǎn)單的方法本地調(diào)用袄琳,就不用浪費(fèi)網(wǎng)絡(luò)了通信了询件。
- 這里注意
InvokerInvocationHandler#invoke
return的調(diào)用invoker.invoke(new RpcInvocation(method, args)).recreate()
燃乍,如果消費(fèi)者調(diào)用invoke的話,會(huì)在這里終于開始了網(wǎng)絡(luò)調(diào)用
4.4JavassistProxyFactory生成的代碼樣例及作用
最終JavassistProxyFactory生成的代碼樣例如下:
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
- 這個(gè)proxy實(shí)現(xiàn)了我們的業(yè)務(wù)接口方法宛琅,使得我們自己調(diào)用業(yè)務(wù)方法刻蟹,比如
sayHello
的時(shí)候,可以進(jìn)行這樣的鏈路:
InvokerInvocationHandler對(duì)象(攔截基本方法嘿辟,使其不進(jìn)行網(wǎng)絡(luò)調(diào)用舆瘪,如toString()) => 網(wǎng)絡(luò)調(diào)用
- 而使用方對(duì)此是沒有感覺的,他只需要調(diào)用消費(fèi)者的接口红伦,Dubbo幫他實(shí)現(xiàn)了一切英古。
4.5服務(wù)引用存在哪?
別急色建,這就來
XML中的服務(wù)引用配置樣例如下:<dubbo:reference interface="cn.com.wang.service.UserService" id="userService"> </dubbo:reference>
createProxy()
方法最終返回的一個(gè)Stub(如果實(shí)現(xiàn)了本地存根的話)哺呜,getObject()
的返回值正是這個(gè)Stub,存儲(chǔ)在ReferenceBean中。- ReferenceBean 有個(gè)特殊之處箕戳,實(shí)現(xiàn)了FactoryBean ,FactoryBean就是spring的工廠bean某残,工廠bean也就是說當(dāng)我們要獲取dubbo:reference中interface時(shí)也就是我們的UserService,我們會(huì)直接注入到使用類中陵吸,spring就得從容器中找玻墅。
- 也因?yàn)樗枪Sbean,才會(huì)有上文我們提到的壮虫,調(diào)用FactoryBean的getObject()方法澳厢,這個(gè)方法返回的對(duì)象就會(huì)作為標(biāo)簽配置返回的對(duì)象。所以我們的引用囚似,也就是最后返回Service 代理對(duì)象剩拢,的實(shí)際上是存在Spring容器里的。
至此饶唤,我們簡(jiǎn)單分析了本地引用和proxyFactory的動(dòng)態(tài)代理以及本地存根的作用徐伐,接下來來看看遠(yuǎn)程引用與本地引用的區(qū)別吧。
5.遠(yuǎn)程調(diào)用
5.1 createProx()中的遠(yuǎn)程引入的鏈路
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 【省略代碼】是否本地引用
final boolean isJvmRefer;
if (isJvmRefer) {
// 【省略代碼】本地引用
} else {// 遠(yuǎn)程引用
// url 不為空募狂,表明用戶可能想進(jìn)行點(diǎn)對(duì)點(diǎn)調(diào)用
if (url != null && url.length() > 0) {
// 當(dāng)需要配置多個(gè) url 時(shí)办素,可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
// 設(shè)置接口全限定名為 url 路徑
url = url.setPath(interfaceName);
}
// 檢測(cè) url 協(xié)議是否為 registry祸穷,若是性穿,表明用戶想使用指定的注冊(cè)中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url雷滚,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性)需曾,
// 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本胯舷,group刻蚯,時(shí)間戳等
// 最后將合并后的配置設(shè)置為 url 查詢字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// url為空桑嘶,加載注冊(cè)中心 url,
//注意是這里開始將不同的注冊(cè)中心循環(huán)添加到urls
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加載監(jiān)控中心 URL
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
// 服務(wù)引用配置對(duì)象 `map`躬充,帶上監(jiān)控中心的 URL
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 參數(shù)到 url 中逃顶,并將 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注冊(cè)中心,拋出異常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference...");
}
}
// 單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連充甚,下同)
if (urls.size() == 1) {
// 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者以政,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
// 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會(huì)在運(yùn)行時(shí)
// 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例伴找,并調(diào)用實(shí)例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 如果注冊(cè)中心鏈接不為空盈蛮,則將使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true;
}
// invoker 可用性檢查
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("No provider available for the service...");
}
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
- 簡(jiǎn)單概括下遠(yuǎn)程引用的邏輯:讀取直連配置項(xiàng)技矮,或注冊(cè)中心 url抖誉,并將讀取到的 url 存儲(chǔ)到 urls 中。然后根據(jù) urls 元素?cái)?shù)量進(jìn)行后續(xù)操作衰倦。
- 若 urls 元素?cái)?shù)量為1袒炉,則直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口。
- 若 urls 元素?cái)?shù)量大于1樊零,即存在多個(gè)注冊(cè)中心或服務(wù)直連 url我磁,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個(gè) Invoker驻襟,最后調(diào)用 ProxyFactory 生成代理類夺艰。
- 對(duì)于上述的判斷,可以看下多注冊(cè)中心官網(wǎng)的配置樣例來輔助理解沉衣。多注冊(cè)中心
Invoker 的構(gòu)建過程以及代理類的過程比較重要郁副,就是refprotocol.refer(type, url)
這一方法的調(diào)用鏈路。
5.2只配置了一個(gè)注冊(cè)中心的遠(yuǎn)程引用:
截取上面的方法部分如下:
// 單個(gè)注冊(cè)中心或服務(wù)提供者
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者厢蒜,或者兩者混合
} else {
//…………
}
在上述方法中霞势,如果只配置了一個(gè)注冊(cè)中心的話,urls屬性樣例如下(配置了直連會(huì)有不同斑鸦,下面會(huì)介紹)
registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=26540&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D26540%26qos.port%3D33333%26register.ip%3D192.168.1.102%26side%3Dconsumer%26timestamp%3D1594622397172®istry=multicast×tamp=1594622397254
image.png調(diào)用
refprotocol.refer(type, url)
方法愕贡,引用服務(wù),返回 Invoker 對(duì)象巷屿。又到了我們喜聞樂見的SPI自適應(yīng)特性固以,自動(dòng)根據(jù) URL 參數(shù),獲得對(duì)應(yīng)的拓展實(shí)現(xiàn)。
在這里我們根據(jù)url可以得到調(diào)用的是RegistryProtocol
憨琳,#refer(...)
方法的調(diào)用順序是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
與服務(wù)暴露時(shí)類似诫钓,ProtocolFilterWrapper 用于創(chuàng)建過濾鏈,
ProtocolListenerWrapper
返回了一個(gè)ListenerInvokerWrapper
對(duì)象,ListenerInvokerWrapper
裝飾invoker, 在構(gòu)造器中遍歷listeners構(gòu)建referer的監(jiān)聽鏈篙螟,這兩個(gè)類都會(huì)放行RegistryProtocol菌湃。也就是什么都不做。
5.35.3RegistryProtocol#refer()
直接來看RegistryProtocol的refer(type, url)做了什么吧
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 獲得真實(shí)的注冊(cè)中心的 URL
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲得注冊(cè)中心
Registry registry = registryFactory.getRegistry(url);
// TODO 芋艿
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 將 url 查詢字符串轉(zhuǎn)為 Map,獲得服務(wù)引用配置參數(shù)集合
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
// 分組聚合遍略,參見文檔 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通過 SPI 加載 MergeableCluster 實(shí)例惧所,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 執(zhí)行服務(wù)引用
return doRefer(cluster, registry, type, url);
}
上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)從registryFactory加載注冊(cè)中心實(shí)例绪杏,如果用的是zk的協(xié)議下愈,那么注冊(cè)器就是zk的注冊(cè)器。然后獲取 group 配置蕾久,根據(jù) group 配置決定
doRefer()
第一個(gè)參數(shù)的類型势似。這里的重點(diǎn)是doRefer()
方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創(chuàng)建 RegistryDirectory 對(duì)象,并設(shè)置注冊(cè)中心和協(xié)議
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 服務(wù)引用配置集合
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務(wù)消費(fèi)者鏈接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 向注冊(cè)中心注冊(cè)自己(服務(wù)消費(fèi)者)
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 向注冊(cè)中心訂閱服務(wù)提供者 + 路由規(guī)則 + 配置規(guī)則
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 創(chuàng)建 Invoker 對(duì)象
Invoker invoker = cluster.join(directory);
// 向本地注冊(cè)表僧著,注冊(cè)消費(fèi)者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
- doRefer 方法創(chuàng)建一個(gè)
RegistryDirectory
實(shí)例履因,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊(cè)中心進(jìn)行注冊(cè)霹抛。- 注冊(cè)完畢后搓逾,緊接著訂閱 providers、configurators杯拐、routers 等節(jié)點(diǎn)下的數(shù)據(jù)霞篡。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息端逼。
- 由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上朗兵,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè)顶滩,并生成一個(gè) Invoker余掖。
注意這個(gè)cluster,他也是通過SPI機(jī)制依賴注入來的礁鲁,(關(guān)于Cluster盐欺,RegistryDirectory的內(nèi)容本篇暫不討論,等我下一篇再說仅醇! )
5.45.4直連時(shí)的遠(yuǎn)程引用
上面講的是沒有配置直連的情況冗美,這次我們改一下配置,將引用改成直連看看會(huì)咋樣析二。
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" url="dubbo://localhost:20880" />
同樣是到這一行代碼粉洼,
// 單個(gè)注冊(cè)中心或服務(wù)提供者
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者节预,或者兩者混合
} else {
//…………
}
此時(shí)的urls屬性如下
dubbo://localhost:20880/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2144&qos.port=33333®ister.ip=192.168.1.102&side=consumer×tamp=1594651670644
image.png
注意到頭部已經(jīng)被換成dubbo了,與上文的registry類似属韧,這邊就是調(diào)用DubboProtocol來進(jìn)行引用了安拟,鏈路如下
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
ProtocolFilterWrapper
創(chuàng)建好過濾器鏈,ProtocolListenerWrapper
開啟監(jiān)聽后宵喂,就開始調(diào)用DubboProtocol#refer()
了糠赦,來看一下這個(gè)方法做了什么吧
5.5DubboProtocol#refer()
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 初始化序列化優(yōu)化器
optimizeSerialization(url);
// 獲得遠(yuǎn)程通信客戶端數(shù)組
// 創(chuàng)建 DubboInvoker 對(duì)象
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 添加到 `invokers`
invokers.add(invoker);
return invoker;
}
上面方法看起來比較簡(jiǎn)單,不過這里有一個(gè)調(diào)用需要我們注意一下锅棕,即
getClients(url)
愉棱。這個(gè)方法用于獲取客戶端實(shí)例,實(shí)例類型為ExchangeClient
哲戚。ExchangeClient
實(shí)際上并不具備通信能力,它需要基于更底層的客戶端實(shí)例進(jìn)行通信艾岂。比如NettyClient
顺少、MinaClient
等,默認(rèn)情況下王浴,Dubbo 使用NettyClient
進(jìn)行通信脆炎。接下來,我們簡(jiǎn)單看一下getClients()
方法的邏輯氓辣。
5.5new DubboInvoker時(shí)的getClients(url)
private ExchangeClient[] getClients(URL url) {
// 是否共享連接
boolean service_share_connect = false;
// 獲取連接數(shù)秒裕,默認(rèn)為0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections钞啸,則共享連接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 獲取共享客戶端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客戶端
clients[i] = initClient(url);
}
}
return clients;
}
這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例几蜻,默認(rèn)情況下,使用共享客戶端實(shí)例体斩。
getSharedClient
方法中也會(huì)調(diào)用initClient 方法
梭稚,因此下面我們一起看一下這兩個(gè)方法,看看他是怎么共享絮吵,怎么初始化的弧烤。
5.6getClients(url)中的getSharedClient(url)
private ExchangeClient getSharedClient(URL url) {
// 從集合中,查找 ReferenceCountExchangeClient 對(duì)象
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// 若未關(guān)閉蹬敲,增加指向該 Client 的數(shù)量暇昂,并返回它
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
// 若已關(guān)閉,移除
} else {
referenceClientMap.remove(key);
}
}
// 同步伴嗡,創(chuàng)建 ExchangeClient 對(duì)象急波。
synchronized (key.intern()) {
// 創(chuàng)建 ExchangeClient 對(duì)象
ExchangeClient exchangeClient = initClient(url);
// 將 `exchangeClient` 包裝,創(chuàng)建 ReferenceCountExchangeClient 對(duì)象
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
// 添加到集合
referenceClientMap.put(key, client);
// 從 `ghostClientMap`移除
ghostClientMap.remove(key);
return client;
}
}
- 上面方法先訪問緩存闹究,若緩存未命中幔崖,則通過
initClient
方法創(chuàng)建新的 ExchangeClient 實(shí)例,并將該實(shí)例傳給ReferenceCountExchangeClient
構(gòu)造方法創(chuàng)建一個(gè)帶有引用計(jì)數(shù)功能的ExchangeClient
實(shí)例。ReferenceCountExchangeClient
類內(nèi)部使用引用計(jì)數(shù)的方式記錄共享的數(shù)量赏寇。
ghostClientMap
吉嫩,這個(gè)名字說實(shí)話確實(shí)有點(diǎn)難理解,這個(gè)實(shí)際上是一個(gè)存儲(chǔ)LazyConnectExchangeClient
的集合嗅定。
這個(gè)時(shí)候我們就要理解LazyConnectExchangeClient
的作用自娩,當(dāng)服務(wù)引用時(shí),我們并不想此時(shí)就是開始通信渠退,而是在調(diào)用的時(shí)候再與服務(wù)端通信忙迁,LazyConnectExchangeClient
就像是一個(gè)緩存,在服務(wù)調(diào)用的時(shí)候才會(huì)創(chuàng)建真正的Client去連接碎乃,節(jié)省了資源.- LazyConnectExchangeClient 在每次數(shù)據(jù)傳輸前姊扔,先判斷tcp連接狀態(tài),若連接斷開則先執(zhí)行connect建立連接梅誓。
5.7getClients(url)中的initClient(url)
在DubboProtocol#getSharedClient()
也也調(diào)用了DubboProtocol#initClient()
恰梢,看來是繞不過這方法了,來繼續(xù)看他的代碼梗掰。
private ExchangeClient initClient(URL url) {
// 獲取客戶端類型嵌言,默認(rèn)為 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 校驗(yàn) Client 的 Dubbo SPI 拓展是否存在,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
// 設(shè)置編解碼器為 Dubbo 及穗,即 DubboCountCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 默認(rèn)開啟 heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 連接服務(wù)器摧茴,創(chuàng)建客戶端
ExchangeClient client;
try {
// 懶連接,創(chuàng)建 LazyConnectExchangeClient 對(duì)象
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
// 直接連接埂陆,創(chuàng)建 HeaderExchangeClient 對(duì)象
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
- DubboProtocol的
requestHandler
是ExchangeHandler
的實(shí)現(xiàn)苛白,是remoting層接收數(shù)據(jù)后的回調(diào)。只定義了一個(gè)回復(fù)請(qǐng)求結(jié)果的方法猜惋,返回的是請(qǐng)求結(jié)果initClient()
方法首先獲取用戶配置的客戶端類型丸氛,默認(rèn)為netty
。然后檢測(cè)用戶配置的客戶端類型是否存在著摔,不存在則拋出異常缓窜。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的LazyConnectExchangeClient
代碼并不是很復(fù)雜谍咆,該類會(huì)在 request 方法被調(diào)用時(shí)通過Exchangers#connect
方法創(chuàng)建 ExchangeClient 客戶端禾锤,該類的代碼本節(jié)就不分析了。下面我們分析一下Exchangers#connect()
方法摹察。
5.8initClient(url)中的自適應(yīng)Exchangers#connect()
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger 實(shí)例恩掷,默認(rèn)為 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
Exchangers#getExchanger
會(huì)通過 SPI 加載HeaderExchanger
實(shí)例(在Dubbo中只有這一個(gè)實(shí)例),來就來看看HeaderExchanger#connect()
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 這里包含了多個(gè)調(diào)用供嚎,分別如下:
// 1. 創(chuàng)建 HeaderExchangeHandler 對(duì)象
// 2. 創(chuàng)建 DecodeHandler 對(duì)象
// 3. 通過 Transporters 構(gòu)建 Client 實(shí)例
// 4. 創(chuàng)建 HeaderExchangeClient 對(duì)象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
我們這里重點(diǎn)看一下
Transporters#connect
方法
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 數(shù)量大于1黄娘,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取 Transporter 自適應(yīng)拓展類峭状,并調(diào)用 connect 方法生成 Client 實(shí)例
return getTransporter().connect(url, handler);
}
如上,
getTransporter()
方法返回的是自適應(yīng)拓展類逼争,該類會(huì)在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的Transporter
實(shí)現(xiàn)類优床。若用戶未配置客戶端類型,則默認(rèn)加載NettyTransporter
誓焦,并調(diào)用該類的connect()
方法胆敞。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 創(chuàng)建 NettyClient 對(duì)象
return new NettyClient(url, listener);
}
到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的 API 構(gòu)建 Netty 客戶端了杂伟。
5.9多注冊(cè)中心時(shí)鏈路
else {
// 循環(huán) `urls` 移层,引用服務(wù),返回 Invoker 對(duì)象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服務(wù)
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最后一個(gè)注冊(cè)中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注冊(cè)中心
if (registryURL != null) { // registry url is available
// 對(duì)有注冊(cè)中心的 Cluster 只用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
// 無注冊(cè)中心赫粥,全部都是服務(wù)直連
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
可以看到多注冊(cè)中心的時(shí)候多了一步 cluster.join()观话,其他與上文分析的單注冊(cè)中心并無區(qū)別,僅僅是Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè)越平,并生成一個(gè) Invoker匪燕,這個(gè)我們留待下面章節(jié)來主要分析Cluster。
至此喧笔,我們的DubboProcotol#refer()也分析完畢告一段落,我們知道了他里面創(chuàng)建了一個(gè)DubboInvoker,其中有我們的Clients實(shí)例龟再,基于此书闸,我們才可以通過它來進(jìn)行遠(yuǎn)程調(diào)用了。