在dubbo服務(wù)的啟動(dòng)過程中,看到浩蓉,所有的dubbo自定義標(biāo)簽都會(huì)由DubboNamespaceHandler
處理廓旬, 遇到reference
標(biāo)簽寨腔,如何發(fā)現(xiàn)服務(wù)的?
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
在Spring容器啟動(dòng)過程中抱既,遇到reference
標(biāo)簽會(huì)初始化一個(gè)ReferenceBean
,在初始化這個(gè)bean的過程中,獲取到服務(wù)提供方提供的proxy服務(wù)
扁誓。
官方文檔:
按照官方文檔的說法防泵,應(yīng)該是由ReferenceConfig
,調(diào)用DubboProtol
獲取DubboInvoker
,然后通過ProxyFactory
獲取到下游ref對(duì)象的代理蝗敢。
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
public Object getObject() throws Exception {
//返回的真實(shí)對(duì)象
return get();
}
public void afterPropertiesSet() throws Exception {
//省略一萬行代碼
Boolean b = isInit();
if (b != null && b.booleanValue()) {
getObject();
}
}
}
從ReferenceBean
對(duì)象定義實(shí)現(xiàn)了FactoryBean
接口能夠看到择克,該對(duì)象初始化完成之后返回的是getObject()
方法的返回對(duì)象,實(shí)現(xiàn)了InitializingBean
接口前普,在該對(duì)象初始化完成之后調(diào)用afterPropertiesSet()
肚邢,最終都會(huì)調(diào)用ReferenceConfig
的get()
方法獲取ref
對(duì)象。
// interface proxy reference
private transient volatile T ref;
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
//初始化
init();
}
return ref;
}
private void init() {
if (initialized) {
return;
}
initialized = true;
//拼接URLkey=value參數(shù)拭卿,保存在Map中
Map<String, String> map = new HashMap<String, String>();
Map<Object, Object> attributes = new HashMap<Object, Object>();
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prefix = StringUtils.getServiceKey(map);
//忽略很多代碼
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//attributes are stored by system context.
StaticContext.getSystemContext().putAll(attributes);
//重點(diǎn)在這里骡湖,創(chuàng)建一個(gè)Proxy,拿到ref對(duì)象峻厚。
ref = createProxy(map);
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
在創(chuàng)建ref對(duì)象時(shí)响蕴,是通過注冊(cè)中心配置,拼接URL惠桃,當(dāng)注冊(cè)中心只有一個(gè)服務(wù)時(shí)浦夷,直接去引用服務(wù)辖试,當(dāng)注冊(cè)中心有多個(gè)服務(wù)時(shí),則先將多個(gè)服務(wù)分別get到劈狐,然后對(duì)外統(tǒng)一組裝成一個(gè)服務(wù)罐孝,然后通過代理工廠產(chǎn)生了一個(gè)代理對(duì)象。
//這里忽略很多代碼 點(diǎn)對(duì)點(diǎn)肥缔,只分析dubbo協(xié)議莲兢,且只從注冊(cè)中心訂閱服務(wù)的場(chǎng)景
//這里加載配置中心
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
//registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=test-protocol-random-port&dubbo=2.0.0&pid=2264&refer=application%3Dtest-protocol-random-port%26dubbo%3D2.0.0%26injvm%3Dfalse%26interface%3Dcom.alibaba.dubbo.config.api.DemoService%26methods%3DsayName%2CgetUsers%2Cecho%2CthrowDemoException%2CgetBox%26pid%3D2264%26register.ip%3D192.168.5.5%26side%3Dconsumer%26timestamp%3D1520518336336®istry=multicast×tamp=1520518337358
//組裝URL
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
//忽略若干行代碼
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
return (T) proxyFactory.getProxy(invoker);
引用服務(wù),實(shí)質(zhì)是組裝一個(gè)DubboInvoker
//refprotocol.refer(interfaceClass, url)源碼
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
而將多個(gè)服務(wù)如何包裝成一個(gè)服務(wù)续膳?dubbo中有一層目錄結(jié)構(gòu)改艇,directory,將多個(gè)invoker組裝成一個(gè)FailoverClusterInvoker服務(wù)坟岔,實(shí)質(zhì)就是一個(gè)invoker服務(wù)谒兄。
cluster.join(new StaticDirectory(invokers));
//cluster定義,用到spi
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
//FailoverClusterInvoker定義
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T>
//AbstractClusterInvoker定義
public abstract class AbstractClusterInvoker<T> implements Invoker<T>
代理對(duì)象如何產(chǎn)生社付?這里和服務(wù)發(fā)布一樣舵变,使用proxyFactory獲取一個(gè)代理對(duì)象,實(shí)質(zhì)返回的是 InvokerInvocationHandler
對(duì)象瘦穆。
return (T) proxyFactory.getProxy(invoker);
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
這里有一個(gè)問題纪隙,當(dāng)我們?cè)谡{(diào)用的過程中,發(fā)現(xiàn)實(shí)際上調(diào)用的InvokerInvocationHandler
對(duì)象扛或,并不是FailoverClusterInvoker
绵咱,而是MockClusterInvoker
,這是為什么呢熙兔?是wrapper擴(kuò)展點(diǎn)加載機(jī)制導(dǎo)致的悲伶。后面再介紹。
fyi