dubbo服務引用初始化過程就是創(chuàng)建服務動態(tài)代理的過程,與服務發(fā)布一樣,同樣借助bean初始化完成動態(tài)代理的創(chuàng)建福也。具體調用過程:
com.alibaba.dubbo.config.ReferenceConfig:
get()-->init()-->createProxy()
在createProxy()方法中港准,服務引用分為三種情況:
1.JVM內部引用的代理
2.用戶指定URL 直連
3.通過注冊中心欲鹏,引用遠程的代理
直接看第三種
private T createProxy(Map<String, String> map) {
.....................................
List<URL> us = loadRegistries(false);//導入注冊中心url
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);//導入monitor相關信息,加入URL中
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {//只有一個注冊中心
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {//多注冊中心
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {//每個注冊中心初始化引用一遍
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一個registry url
}
}
if (registryURL != null) { // 有 注冊中心協(xié)議的URL
// 對有注冊中心的Cluster 只用 AvailableCluster
// 多個注冊中心空扎,封裝成一個Invoker
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注冊中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
.....................................
// 創(chuàng)建服務代理
return (T) proxyFactory.getProxy(invoker);
}
主要服務引用過程在這個方法里refprotocol.refer(interfaceClass, url)藏鹊,調用到RegistryProtocol.refer(),根據url獲取到注冊中心转锈,然后調用doRefer(),new 一個RegistryDirectory盘寡,主要緩存遠程服務相關信息黑忱,如ip:port菇曲,然后向注冊中心consumer端注冊自己,然后在相應節(jié)點providers端訂閱提供者的信息孵户,具體過程如下圖
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {//注冊自己
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//訂閱provider注冊中心節(jié)點信息,此處url需要攜帶providers參數
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
下面進入directory.subscribe()方法进胯,此方法非常重要诸衔,此處完成提供者信息的獲取,通信客戶端的初始化违崇,此方法調用鏈為:FailbackRegistry.subscribe()-->ZookeeperRegistry.doSubscribe()-->RegistryDirectory.notify(),下面來看ZookeeperRegistry.doSubscribe()方法脾还,此處主要完成提供端信息的獲取棺蛛,轉換成url椅野,訂閱的節(jié)點有三類:
/dubbo/xx.xx/providers;
/dubbo/xx.xx/routes;
/dubbo/xx.xx/configurations;
protected void doSubscribe(final URL url, final NotifyListener listener) {
List<URL> urls = new ArrayList<URL>();//緩存訂閱到信息
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
...............................................
zkClient.create(path, false);
//從zk訂閱到的信息,并添加子節(jié)點監(jiān)聽
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
然后調用到AbstractRegistry.notify()方法
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
..........................................
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);//url信息保存到本地
listener.notify(categoryList);
}
}
listener就是一路傳過來的RegistryDirectory妖爷,進入RegistryDirectory.notify()-->refreshInvoker()
private void refreshInvoker(List<URL> invokerUrls){
..........................................
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
// state change
//如果計算錯誤,則不進行處理.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
return ;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try{
destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker
}catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
上面的方法提供者url信息轉換成invoker炸裆,并與method對應起來洛史。下面進入toInvokers(invokerUrls):
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
//檢查提供者相關信息.......
....................
// 緩存key為沒有合并消費端參數的URL忆嗜,不管消費端如何合并參數捆毫,如果服務端URL發(fā)生變化闪湾,則重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 緩存中沒有,重新refer
try {
........................................
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
}
if (invoker != null) { // 將新的引用放入緩存
newUrlInvokerMap.put(key, invoker);
}
}else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
下面調用protocol.refer(serviceType, url)绩卤,此protocol為適配類途样,調用過程為ProtocolFilterWrapper.refer()-->ProtocolListenerWrapper.refer()-->DubboProtocol.refer():
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
然后調用getClients(url)江醇,創(chuàng)建ExchangeClient用來處理和提供端的鏈接,一般一個服務接口只會創(chuàng)建一個client
private ExchangeClient[] getClients(URL url){
//是否共享連接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置何暇,則共享連接陶夜,否則每服務每連接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
然后一路調用
Exchangers.connect()-->HeaderExchanger.connect()->Transporters.connect()-->NettyTransporter.connect(),最終初始化一個NettyClient赖晶,調用doopen()方法律适,完成網絡客戶端的初始化
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
回到RegistryProtocol.doRefer()中,cluster.join(directory)遏插,此處調用過程為:MockClusterWrapper.join()-->FailfastCluster.join()捂贿,最終初始化一個MockClusterInvoker對象,此對象持有RegistryDirectory和FailfastClusterInvoker的引用胳嘲,接下來調用JavassistProxyFactory.getProxy()創(chuàng)建代理:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
此對象spring注入服務接口中厂僧,形成一個透明化的本地服務,當調用本地接口方法時了牛,就會啟動動態(tài)代理颜屠,執(zhí)行InvokerInvocationHandler.invoke()方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//調用MockClusterInvoker.invoke()方法
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}