服務發(fā)布分析完了塘辅,下面讓我們開始服務消費端晃虫。首先看下官方的時序圖,有個總體印象
根據(jù)上一篇服務發(fā)布的邏輯扣墩,先看我們配置文件的使用
<dubbo:application name="dubbo-client"/>
<dubbo:registry id="nina-register" address="127.0.0.1" port="2181" protocol="zookeeper"/>
<!--故意配置兩個注冊中心的情況測試-->
<dubbo:registry id="nina-register2" address="127.0.0.1" port="2181" protocol="zookeeper"/>
<dubbo:reference interface="com.alibaba.dubbo.kai.api.TestApi" id="TestApi" protocol="dubbo" check="true" version="0.0" timeout="10000"/>
<!--測試通過generic泛化方式實現(xiàn)調(diào)用-->
<dubbo:reference interface="com.alibaba.dubbo.kai.api.HelloApi" id="helloApi2" protocol="dubbo" check="true" version="0.0" timeout="10000" generic="true"/>
根據(jù)spring自定義配置文件的規(guī)則哲银,我們一樣可以找到入口ReferenceBean
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
public void init() {
......
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
......
}
}
先看ReferenceBean的定義
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
發(fā)現(xiàn)其實現(xiàn)了 FactoryBean和InitializingBean接口,想必和服務發(fā)布一樣InitializingBean對應的afterPropertiesSet用來服務注入呻惕,F(xiàn)actoryBean對應的getObject一定是去實現(xiàn)代理實現(xiàn)的荆责。首先大家一看看下我最初到代碼分析流水圖(代碼分析到一種方法),有個總體印象:
按圖索驥亚脆,我們找到afterPropertiesSet中進行參數(shù)設置
public void afterPropertiesSet() throws Exception {
if (getConsumer() == null) {
Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);
if (consumerConfigMap != null && consumerConfigMap.size() > 0) {
ConsumerConfig consumerConfig = null;
.......
}
if (getApplication() == null
&& (getConsumer() == null || getConsumer().getApplication() == null)) {
........
}
if (getModule() == null
&& (getConsumer() == null || getConsumer().getModule() == null)) {
........
}
if ((getRegistries() == null || getRegistries().size() == 0)
&& (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().size() == 0)
&& (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {
.......
}
if (getMonitor() == null
&& (getConsumer() == null || getConsumer().getMonitor() == null)
&& (getApplication() == null || getApplication().getMonitor() == null)) {
........
}
Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b.booleanValue()) {
getObject();
}
}
發(fā)現(xiàn)實際入口的確在getObject方法做院,而此時b又不會等于true,所以此時不會進入次方法型酥,則實際進入getObject的一定是通過FactoryBean的特性山憨。下面開始重點分析怎么獲取到的這個引用對象
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
進去init方法
private void init() {
//和服務端對應賦值
.....
// 獲取消費者全局配置
checkDefault();
appendProperties(this);
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}
if (ProtocolUtils.isGeneric(getGeneric())) {
.....
}
String resolve = System.getProperty(interfaceName);
String resolveFile = null;
if (resolve == null || resolve.length() == 0) {
.....
}
if (consumer != null) {
...
}
if (module != null) {
...
}
checkApplication();
checkStubAndMock(interfaceClass);
....
//attributes通過系統(tǒng)context進行存儲.
StaticContext.getSystemContext().putAll(attributes);
//重點,去創(chuàng)建引用代理
ref = createProxy(map);
}
此方法中重點是把相關參數(shù)加載到paramterMap中(服務端也又類似操作)弥喉,然后進入createProxy核心方法
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
......
if (isJvmRefer) {
//同一jvm引用郁竟,不是分析重點
....
} else {
if (url != null && url.length() > 0) {
// 用戶指定URL,指定的URL可能是對點對直連地址由境,也可能是注冊中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 通過注冊中心配置拼裝URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一個registry url
}
}
if (registryURL != null) { // 有 注冊中心協(xié)議的URL
// 對有注冊中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注冊中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
.......
// 創(chuàng)建服務代理
return (T) proxyFactory.getProxy(invoker);
}
(略過tempUrl對應的injvm邏輯棚亩,此處是同一jvm內(nèi)引用情況)首先檢查用戶是否在此引用上單獨設置對端(或在多個注冊中心情況下指定哪個url)url蓖议,如果配置則直接用此url,否則加載配置文件所有配置的注冊中心生成url
List<URL> us = loadRegistries(false);
此方法在服務發(fā)布已經(jīng)分析過一次讥蟆,只是此時入?yún)閒alse(服務端為true)勒虾,區(qū)別在哪呢?
protected List<URL> loadRegistries(boolean provider) {
......
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
return registryList;
大致意思是當服務端且沒有禁用register或當客戶端且沒有禁用訂閱時添加到注冊url列表中瘸彤。
loadRegistries方法會返回所有<dubbo:registry>配置到對象修然,我們現(xiàn)在模擬的是兩個register的情況,但配置一樣质况,所以會返回兩個一樣但url
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.0.0&pid=19028®istry=zookeeper×tamp=1540603677848
然后進行了一段和服務端類似端操作愕宋,將剛才生成的參數(shù)map生成string,存入map中结榄,以refer為key(服務端key為exporter)中贝。
if (us != null && us.size() > 0) {
for (URL u : us) {
....
//監(jiān)控相關省略,有機會分享
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
獲取到注冊中心url后臼朗,值大概樣子是:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.0.0&pid=20267&refer=application%3Ddubbo-client%26check%3Dtrue%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.kai.api.TestApi%26methods%3Dgo%26pid%3D20267%26protocol%3Ddubbo%26register.ip%3D192.168.199.130%26revision%3D0.0%26side%3Dconsumer%26timeout%3D10000%26timestamp%3D1540622309609%26version%3D0.0®istry=zookeeper×tamp=1540622309684
下面到重點是什么呢邻寿?當然應該是從注冊中心中訂閱到對應服務
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一個registry url
}
}
if (registryURL != null) { // 有 注冊中心協(xié)議的URL
// 對有注冊中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注冊中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
他先判斷了注冊中心是否只有一個,如果只有一個則會直接發(fā)布賦值invoker视哑,如果又多個绣否,會生成個invokerList,將每個注冊中心訂閱到到url添加進去黎炉,最后生成一個新到clusterInvoker(具體是什么后邊會講)枝秤。
那么我們只需要先分析完單個注冊中心發(fā)布的過程,多個注冊中心的發(fā)布過程就只是在上面又套了一層StaticDirectory而已了慷嗜。
好了淀弹,讓我們看每個url是怎么發(fā)布的:
-
refprotocol.refer(interfaceClass, url)
這是服務發(fā)布生成invoker的重點了,首先refprotocol這個是什么庆械?
//根據(jù)spi機制薇溃,此處適配protocol為ProtocolAdaptive
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
根據(jù)我們spi的知識,已經(jīng)找到此類
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
.......
public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
........
}
適配邏輯和服務端一樣缭乘,主要還是看url中的protocol沐序,回頭看下url發(fā)現(xiàn)此時協(xié)議為registry,所以最終會進入RegistryProtocol的refer方法中堕绩,
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//還原最初的注冊中心協(xié)議頭zookeeper
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//從工廠中獲取registry對象策幼,此處根據(jù)spi機制肯定獲取到ZookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*" 分組邏輯,暫時不考慮奴紧,不影響主流程
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//核心引用邏輯
return doRefer(cluster, registry, type, url);
}
我們看到此時獲取到了ZookeeperRegistry特姐,再調(diào)用了doRefer(cluster, registry, type, url)方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創(chuàng)建RegistryDirectory對象,此為注冊中心目錄對象黍氮,重點對象
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 獲取REFER_KEY的所有屬性
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//生成自身的消費端urlconsumer://192.168.199.130/com.alibaba.dubbo.kai.api.TestApi?application=dubbo-client&check=true&dubbo=2.0.0&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=20496&protocol=dubbo&revision=0.0&side=consumer&timeout=10000×tamp=1540623417681&version=0.0
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//注冊自己本身端訂閱地址 唐含,供別人可見 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//核心邏輯浅浮,訂閱providers、configurators捷枯、routers三個目錄下信息滚秩,生成一個復合的invoker
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//cluster進行join聯(lián)合,根據(jù)cluster適配選擇不同的集群調(diào)用策略
return cluster.join(directory);
}
這里提出了引用端重要的一個對象RegistryDirectory淮捆,顧名思義是注冊中心的一個文件目錄郁油,里邊應該包含了當前注冊中心的所有內(nèi)容。方法中先是創(chuàng)建了ci對象争剿,然后將當前消費者自身注冊到注冊中心已艰,以便別人能看到自己。然后重點是通過 directory.subscribe方法去訂閱providers蚕苇、configurators、routers三個文件的目錄凿叠,看下此方法:
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
落到了 registry.subscribe(url, this)方法上涩笤,此方法我們是不是很眼熟?對了盒件,分析服務發(fā)布時蹬碧,也曾分析過此段代碼,看下當時那張大圖端這塊
大致意思是炒刁,服務端訂閱了configurtors目錄恩沽,此目錄內(nèi)發(fā)生改變時,zookeeper回調(diào)監(jiān)聽器端通知方法notify()翔始,然后判斷新端url和舊的url是否一致罗心,不一致則重新發(fā)布服務。那消費端呢城瞎?
我們看到添加監(jiān)聽邏輯和服務端一樣最終都會落到zookeeperRegistry的doSubscribe(final URL url, final NotifyListener listener) 的方法上(具體跳轉(zhuǎn)流程渤闷,可看服務發(fā)布)
protected void doSubscribe(final URL url, final NotifyListener listener) {
.............
. List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
.........
}
此處和服務端不同端是服務端 toCategoriesPath(url)返回只有
/dubbo/com.alibaba.dubbo.kai.api.TestApi/configurators,而消費端有/dubbo/com.alibaba.dubbo.kai.api.TestApi/configurators脖镀、/dubbo/com.alibaba.dubbo.kai.api.TestApi/routers和/dubbo/com.alibaba.dubbo.kai.api.TestApi/providers三個目錄飒箭。
創(chuàng)建完訂閱目錄,消費端一樣會調(diào)用監(jiān)聽器端notify方法蜒灰,而此時端listener為RegistryDirectory
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override參數(shù),更該配置時刷新invoker
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}
前邊是對configurator和router的合并弦蹂,更新overrideDirectoryUrl,此時初次引用過程configurator和router都沒空强窖,直接調(diào)用refreshInvoker(invokerUrls)
/**
* 根據(jù)invokerURL列表轉(zhuǎn)換為invoker列表凸椿。轉(zhuǎn)換規(guī)則如下:
* 1.如果url已經(jīng)被轉(zhuǎn)換為invoker,則不在重新引用毕骡,直接從緩存中獲取削饵,注意如果url中任何一個參數(shù)變更也會重新引用
* 2.如果傳入的invoker列表不為空岩瘦,則表示最新的invoker列表
* 3.如果傳入的invokerUrl列表是空,則表示只是下發(fā)的override規(guī)則或route規(guī)則窿撬,需要重新交叉對比启昧,決定是否需要重新引用。
*
* @param invokerUrls 傳入的參數(shù)不能為null
*/
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止訪問
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 關閉所有Invoker
} else {
this.forbidden = false; // 允許訪問
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表劈伴,便于交叉對比
}
if (invokerUrls.size() == 0) {
return;
}
// 將URL列表轉(zhuǎn)成Invoker列表
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// 換方法名映射Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// state change
//如果計算錯誤密末,則不進行處理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 關閉未使用的Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
我們看到主流程重點在
// 將URL列表轉(zhuǎn)成Invoker列表
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// 換方法名映射Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
這兩個方法,第一個將url生成對應都invoker跛璧,第二將生成都invoker改成對應目的方法名對應都invoker严里,看下toInvokers(invokerUrls)生成邏輯
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.size() == 0) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//如果reference端配置了protocol,則只選擇匹配的protocol
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
//對于空目錄追城,沒有配置都直接跳過
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//合并consumer和provider兩面都url生成新都url
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL參數(shù)是排序的
if (keys.contains(key)) { // 重復URL
continue;
}
keys.add(key);
// 緩存key為沒有合并消費端參數(shù)的URL刹碾,不管消費端如何合并參數(shù),如果服務端URL發(fā)生變化座柱,則重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 緩存中沒有迷帜,重新refer
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
//重點生成invoker邏輯
//url為合并后地址dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=dubbo-client&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=9629&protocol=dubbo®ister.ip=172.18.166.201&remote.timestamp=1540454917247&revision=0.0&server=netty4&side=consumer&timeout=10000×tamp=1540456251008&version=0.0
//providerUrl為服務端地址dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=kai-nina-server&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=9346&revision=0.0&server=netty4&side=provider×tamp=1540454917247&version=0.0
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // 將新的引用放入緩存
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
還是先做了url合并檢查邏輯(畢竟修改和新增都是通過此方法),當緩存中沒有此invoker時通過消費端的invokerDelegete委派器生成新的invoker色洞,看其入?yún)⒃俅苏{(diào)用了protocol.refer(serviceType, url)方法進行發(fā)布戏锹,此時的url是什么呢?
dubbo://192.168.199.130:20880/com.alibaba.dubbo.kai.api.TestApi?anyhost=true&application=dubbo-client&check=false&client=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.TestApi&methods=go&pid=21873&protocol=dubbo®ister.ip=192.168.199.130&remote.timestamp=1540603631243&revision=0.0&server=netty4&side=consumer&timeout=10000×tamp=1540632189708&version=0.0
因為上邊 mergeUrl(providerUrl)方法火诸,已將服務端url進行合并锦针,改為dubbo協(xié)議頭地址,此時發(fā)布將會調(diào)用DubboProtocol的refer方法(是不是和服務發(fā)布的邏輯類似)
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
生成了DubboInvoker對象置蜀,想必這就是最終返回的invoker對象奈搜,我們注意到其入?yún)⒂袀€getClients(url)
private ExchangeClient[] getClients(URL url) {
//是否共享連接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,則共享連接盾碗,否則每服務每連接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
//對服務端鏈接端封裝
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
看到這個方法感覺終于看到曙光了媚污,又是一個Exchange,顯然是在對應著服務端端socket服務廷雅,生成端客戶端耗美。此處有個connections參數(shù),表示dubbo可以配置客戶端是否所有invoker用同一條netty通道航缀,我們沒配置端化默認就是共享商架,進入getSharedClient(url)方法
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
synchronized (key.intern()) {
ExchangeClient exchangeClient = initClient(url);
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
}
發(fā)現(xiàn)dubbo是用了個緩存來保證唯一,緩存有則移除原有芥玉,然后重新生成一個蛇摸,最后還是通過initClient(url)生成一個新的客戶端
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
//默認開啟heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在嚴重性能問題,暫時不允許使用
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
//設置連接應該是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
初始化client通過 client = Exchangers.connect(url, requestHandler)方法進行灿巧,Exchangers類就是我們上一篇看到的發(fā)布類赶袄,此處服務引用一樣用的這個類(肯定都是一一對應的)揽涮。再看看這個類
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
一樣還是調(diào)用getExchanger(url)獲取Exhanger對象,不重復解釋饿肺,這里獲取到到HeaderExchanger
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
生成了一個HeaderExchangeClient蒋困,內(nèi)部一樣包裝了一個Transporters進行管道連接
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
這里包裝了一個ChannelHandlerDispatcher作為委派器,里邊不過時當handler有多個handler循環(huán)進行處理敬辣,最后一樣getTransporter()肯定獲得當還是和服務對應的NettyTransporter
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
誒雪标,找到了,終于看到了熟悉的NettyClient溉跃,進去看一眼
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
}
bootstrap.handler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())//
.addLast("encoder", adapter.getEncoder())//
.addLast("handler", nettyClientHandler);
}
});
}
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(3000, TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// 關閉舊的連接
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
一樣的套路村刨,構(gòu)造方法調(diào)用父類,然后兩個do方法進行實際操作撰茎,父類方法肯定還是個模版模式嵌牺,調(diào)用實際的do方法,不過調(diào)用之前和服務端一樣都是有一個包裝的操作wrapChannelHandler(url, handler)
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
和服務端一樣了龄糊,最后都生成了
new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)))
這個和服務端做功能的一一對應髓梅。
兩個do方法就簡單了doOpen常規(guī)的Netty客戶端代碼,doConnect進行服務鏈接獲取通道進行緩存绎签。而其處理消息的核心handler還是一樣是通過DubboProtocol傳過來的requestHandler
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本調(diào)用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
.........
};
所以消費端和客戶端對消息處理的handler一致,收消息一定最后調(diào)用這個reply方法酝锅,那發(fā)消息呢诡必?
還記得NettyClient的這段代碼嗎?
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
這里邊有個NettyChannel每次有廢棄通道時搔扁,為什么會調(diào)用這個方法呢爸舒?原來dubbo中的netty通道都緩存在此類中
final class NettyChannel extends AbstractChannel {
private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
private final org.jboss.netty.channel.Channel channel;
channelMap緩存了所有正在使用都channel,并包裝成dubbo內(nèi)部都NettyChannel稿蹲,在此類中對netty對channel進行了些包裝扭勉,主要處理狀態(tài)檢查等等操作。當需要發(fā)送消息時苛聘,也是從NettyChannel對靜態(tài)方法getOrAddChannel中獲取到通道進行發(fā)送的涂炎。
好了,到這里client終于連接上服務端了设哗〕罚回到起點,生成了
ExchangeClient exchangeClient = initClient(url)
這個client對象网梢,會被client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap)包裝一層震缭,和服務端的ExporterChangeableWrapper類型,一個可變的引用包裝战虏。完成后拣宰,生成我們的DubboInvoker(還記得吧党涕,可以結(jié)合大圖看),看眼大圖
回到上文的RegistryDirectory訂閱后的notify邏輯
通過此refreshInvoker方法將所有服務端url轉(zhuǎn)換為invoker對象巡社,
//轉(zhuǎn)換起點在這
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
此時訂閱的主流程就完成了膛堤,當然還有一寫廢棄以前就invoker等等操作,都是為了更新時使用的重贺。
流程回到RegistryProtocol的refer中
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
directory已經(jīng)包含了此接口對應的所有invoker對象骑祟,又引入了另外一個概念cluster。
顧名思義气笙,cluster是集群的意思次企,必然是dubbo對集群處理的不同實現(xiàn),看看這個cluster對象
private Cluster cluster;
是個成員變量潜圃,那么一定是通過spi依賴注入來的
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
看到字節(jié)碼生成的動態(tài)類默認用的是failover擴展名缸棵,我們找到對應的類是public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
調(diào)用其join方法,會返回一個對應其策略的FailoverClusterInvoker谭期。
那么知識點來了:
-
Failover Cluster(默認)
失敗自動切換堵第,當出現(xiàn)失敗,重試其它服務器 隧出。通常用于讀操作踏志,但重試會帶來更長延遲≌偷桑可通過 retries="2" 來設置重試次數(shù)(不含第一次)针余。重試次數(shù)配置如下:
<dubbo:service retries="2" />
<dubbo:reference retries="2" />
<dubbo:reference>
<dubbo:method name="sayHello" retries="2" />
</dubbo:reference>
- Failfast Cluster
快速失敗,只發(fā)起一次調(diào)用凄诞,失敗立即報錯圆雁。通常用于非冪等性的寫操作,比如新增記錄 - Failsafe Cluster
失敗安全帆谍,出現(xiàn)異常時伪朽,直接忽略。通常用于寫入審計日志等操作汛蝙。 - Failback Cluster
失敗自動恢復烈涮,后臺記錄失敗請求,定時重發(fā)患雇。通常用于消息通知操作跃脊。 - Forking Cluster
并行調(diào)用多個服務器,只要一個成功即返回苛吱。通常用于實時性要求較高的讀操作酪术,但需要浪費更多服務資源。可通過 forks="2" 來設置最大并行數(shù)绘雁。 - Broadcast Cluster
廣播調(diào)用所有提供者橡疼,逐個調(diào)用,任意一臺報錯則報錯 [2]庐舟。通常用于通知所有提供者更新緩存或日志等本地資源信息
dubbo一共主要又這六種集群處理機制(沒有算mock欣除,和多注冊中心的AvailableCluster操作),每個都實現(xiàn)了一種調(diào)用挪略,通過父類的AbstractClusterInvoker的select和doSelect進行負載均衡算法历帚,然后通過自己實現(xiàn)doInvoke方法趁怔,每個ClusterInvoker進行不同的容錯處理機制今瀑,欲知詳情痴昧,請看下章調(diào)用過程講解吩蔑。
生成了clusterInvoker,基本的引用創(chuàng)建invoker過程就基本完了隙轻。
回到ReferenceConfig的 createProxy(Map<String, String> map)方法中恨诱,生成完invoker晕窑,如果是多注冊中心室叉,會進行一次AvailableCluster包裝
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一個registry url
}
}
if (registryURL != null) { // 有 注冊中心協(xié)議的URL
// 對有注冊中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注冊中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
這邏輯就大同小異了睹栖,總之最后獲得了clusterinvoker。
再往下
return (T) proxyFactory.getProxy(invoker);
通過proxyFactory生成對應invoker的代理類茧痕,看下實現(xiàn)野来,最后都回到
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
生成了一個動態(tài)代理,當然此處proxyFactory又兩個實現(xiàn)一個JdkProxyFactory和JavassistProxyFactory踪旷。前者使用jdk的動態(tài)代碼實現(xiàn)梁只,后者則通過javasssist的方法先手寫一個的class文件,然后去創(chuàng)建對象
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
.......
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
if (!Modifier.isPublic(ics[i].getModifiers())) {
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
ccp.addInterface(ics[i]);
for (Method method : ics[i].getMethods()) {
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc))
continue;
worked.add(desc);
int ix = methods.size();
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if (!Void.TYPE.equals(rt))
code.append(" return ").append(asArgument(rt, "ret")).append(";");
methods.add(method);
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null)
pkg = PACKAGE_NAME;
// create ProxyInstance class.
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
ccm.setSuperClass(Proxy.class);
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
..........
return proxy;
}
此處會生成一個通過javassist生成一個代理類繼承Proxy且實現(xiàn)你的接口埃脏,最終調(diào)用時通過定義的InvokerInvocationHandler進行實際調(diào)用(和jdk動態(tài)代理實際機制一樣)。我們看一眼InvokerInvocationHandler類
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
內(nèi)容很少秋忙,最主要的就是最后通過剛才生成invoker進行最終調(diào)用彩掐。
好了,到這里最終的代理對象終于生成完了灰追,讓我們總體看下類圖
(上圖中的HeaderExchangeChannel是在HeaderExchangeClient的一個屬性堵幽,NettyClient將被包裝在此channel中,而DubboProtocol$requestHandler并沒有在此圖最下面展示弹澎,因為消費端接收消息并沒有用到朴下,下章會細講)
總結(jié)下,dubbo最終生成代理類苦蒿,把生成的invoker集合以目錄類Directory的形式存入代理類的handler中殴胧,并通過cluster層包裝來實現(xiàn)集群調(diào)用策略。不過我們分析過程中少了一部分 DubboProtocol調(diào)用時肯定會經(jīng)過對應的wrapper類,此時會將invoker進行層層包裝形成圖中的filtersInvoker团滥,此處和服務端一樣竿屹,不再贅述。
好了到此灸姊,服務引用分析完了拱燃,要想看怎么調(diào)用的,請看下節(jié)吧力惯!
下一篇 ??? dubbo調(diào)用源碼之消費端
首頁 ??? dubbo源碼欣賞簡介