1.服務發(fā)布概述
Dubbo 服務導出過程始于 Spring 容器發(fā)布刷新事件[dubbo:service --> ServiceBean --> onApplicationEvent(ContextRefreshedEvent event)],在接收到ContextRefreshedEvent 事件后執(zhí)行服務導出邏輯。整個邏輯大致可分為三個部分:
第一部分是前置工作孽椰,主要用于檢查參數(shù)泛烙,組裝 URL叹谁;
第二部分是導出服務矢腻,包含導出服務到本地 (JVM)瓶殃,和導出服務到遠程兩個過程敲茄;
第三部分是向注冊中心注冊服務位谋,用于服務發(fā)現(xiàn),包括注冊到zk和訂閱zk堰燎。
本文的重點實在整個發(fā)布流程掏父,一些細節(jié)簡單描述省略,比如配置檢查秆剪,URL組裝赊淑。
2.源碼環(huán)境說明
基于dubbo2.6.4版本爵政,使用官方的dubbo-demo項目,項目結構圖如下:
修改注冊中心為zookeeper
接口和實現(xiàn)類代碼:
public interface DemoService {
String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
}
3.源碼分析
服務發(fā)布的入口方法是 ServiceBean 的 onApplicationEvent陶缺,如下:
代碼塊 ServiceBean #onApplicationEvent
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否有延遲導出 && 是否已導出 && 是不是已被取消導出
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
3.1 服務發(fā)布前置工作
3.1.1 概述
前置工作主要包含兩個部分钾挟,分別是配置檢查,以及 URL 裝配组哩。在導出服務之前等龙,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補充缺省配置伶贰。配置檢查完成后蛛砰,接下來需要根據(jù)這些配置組裝 URL。在 Dubbo 中黍衙,URL 的作用十分重要泥畅。Dubbo 使用 URL 作為配置載體,所有的拓展點都是通過 URL 獲取配置琅翻。
代碼塊 ServiceConfig#doExport
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
// 檢測 interfaceName 是否合法
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("interface not allow null!");
}
// 檢測 provider 是否為空位仁,為空則新建一個,并通過系統(tǒng)變量為其初始化
checkDefault();
// 下面幾個 if 語句用于檢測 provider方椎、application 等核心配置類對象是否為空聂抢,
// 若為空,則嘗試從其他配置類對象中獲取相應的實例棠众。
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {...}
if (monitor == null) {...}
if (protocols == null) {...}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {...}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {...}
}
// 檢測 ref 是否為泛化服務類型
if (ref instanceof GenericService) {
// 設置 interfaceClass 為 GenericService.class
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
// 設置 generic = "true"
generic = Boolean.TRUE.toString();
}
// ref 非 GenericService 類型
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 對 interfaceClass琳疏,以及 <dubbo:method> 標簽中的必要字段進行檢查
checkInterfaceAndMethods(interfaceClass, methods);
// 對 ref 合法性進行檢測
checkRef();
// 設置 generic = "false"
generic = Boolean.FALSE.toString();
}
// local 和 stub 在功能應該是一致的,用于配置本地存根
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
// 獲取本地存根類
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 檢測本地存根類是否可賦值給接口類闸拿,若不可賦值則會拋出異常空盼,提醒使用者本地存根類類型不合法
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
// 此處的代碼和上一個 if 分支的代碼基本一致,這里省略
}
// 檢測各種對象是否為空新荤,為空則新建揽趾,或者拋出異常
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 導出服務
doExportUrls();
// ProviderModel 表示服務提供者模型,此對象中存儲了與服務提供者相關的信息苛骨。
// 比如服務的配置信息篱瞎,服務實例等。每個被導出的服務對應一個 ProviderModel智袭。
// ApplicationModel 持有所有的 ProviderModel奔缠。
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
3.1.2 對配置檢查的邏輯進行簡單的總結:
檢測 <dubbo:service> 標簽的 interface 屬性合法性,不合法則拋出異常
檢測 ProviderConfig吼野、ApplicationConfig 等核心配置類對象是否為空,若為空两波,則嘗試從其他配置類對象中獲取相應的實例瞳步。
檢測并處理泛化服務和普通服務類
檢測本地存根配置闷哆,并進行相應的處理
對 ApplicationConfig、RegistryConfig 等配置類進行檢測单起,為空則嘗試創(chuàng)建抱怔,若無法創(chuàng)建則拋出異常
3.2 服務暴露
下面進入doExportUrls();
方法:
private void doExportUrls() {
// 加載注冊中心鏈接
List<URL> registryURLs = loadRegistries(true);
// 遍歷 protocols,并在每個協(xié)議下導出服務
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
代碼塊:ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
/***
代碼有點長嘀倒,省略組裝url部分的代碼
配置檢查完畢后屈留,緊接著要做的事情是根據(jù)配置,以及其他一些信息組裝 URL测蘑。
URL 是 Dubbo 配置的載體灌危,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。
***/
//...
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
/***此處組裝的url示例:
dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.43.174&bind.port=20880&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8564&qos.port=22222&side=provider×tamp=1578456375449
***/
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//下面開始要進入暴露服務的代碼了
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
//暴露服務到本地
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//暴露服務到遠程
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
上面代碼根據(jù) url 中的 scope 參數(shù)決定服務導出方式碳胳,分別如下:
- scope = none勇蝙,不導出服務,注意這里是none字符串
- scope != remote挨约,導出到本地
- scope != local味混,導出到遠程
我們示例中到這里socpe=null,所以會同時暴露服務到本地和遠程
3.2.1 暴露服務到本地
接下來進入ServiceConfig#exportLocal(URL url)方法
private void exportLocal(URL url) {
// 如果 URL 的協(xié)議頭等于 injvm诫惭,說明已經(jīng)導出到本地了翁锡,無需再次導出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 設置協(xié)議頭為 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 創(chuàng)建 Invoker,并導出服務夕土,這里的 protocol 會在運行時調用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
到這里看出服務暴露的結果是生成了一個Exporter對象存起來馆衔,關聯(lián)一個Invoker對象,這兩個是什么呢隘弊?
介紹Invoker和Exporter
Invoker 是實體域哈踱,它是 Dubbo 的核心模型,其它模型都向它靠擾梨熙,或轉換成它开镣,它代表一個可執(zhí)行體,可向它發(fā)起 invoke 調用咽扇,它有可能是一個本地的實現(xiàn)邪财,也可能是一個遠程的實現(xiàn),也可能一個集群實現(xiàn)质欲。
這是官方描述树埠,看起來還是不清楚具體Invoker是做什么的,有什么用嘶伟?
這樣說怎憋,以開頭的DemoService為例,中有一個sayHello(String s)方法,這個方法是給其他地方使用的绊袋,有可能是本地也可能是遠程調用毕匀,通過對應的Invoker.invoke()方法就可以調用了。調用invoker的結果就是最終調用DemoService.sayHello()癌别。
public interface Exporter<T> {
Invoker<T> getInvoker();
void unexport();
}
通過Exporter可以獲取到Invoker皂岔,把緩存起來,后面需要調用的時候就可以獲取inoker調用對應的本地或者遠程方法了展姐。先這么理解就可以了躁垛,Invoker具體如何來的就先不分析了
接下來繼續(xù)看這段代碼:
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
此處protocol為生產的動態(tài)代理類Protocol$Adaptive如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
//執(zhí)行到此處的時候extName=Injvm
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
protocol.export執(zhí)行的時候先獲取Protocol的擴展實例,在這里是InjvmProtocol圾笨,然后調用InjvmProtocol#export方法(如下)返回了一個InjvmExporter教馆。
InjvmProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
到這里服務本地暴露就分析完了。
3.2.2 暴露服務到遠程
然后回到ServiceConfig#doExportUrlsFor1Protocol
中的這行代碼 Exporter<?> exporter = protocol.export(wrapperInvoker);
這里的wrapperInvoker信息如下:
interface com.alibaba.dubbo.demo.DemoService -> registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.43.174%26bind.port%3D20880%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16888%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1578470846696&pid=16888&qos.port=22222®istry=zookeeper×tamp=1578470846603
protocol.export在執(zhí)行的時候會根據(jù)protocol擴展名獲取具體的實現(xiàn):
Protocol$Adaptive#export 方法中部分代碼(這個類在上面以及貼過了)
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
//在這里extension就是RegistryProtocol了
extension.refer(arg0, arg1);
RegistryProtocol #export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 導出服務
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 獲取注冊中心 URL墅拭,以 zookeeper 注冊中心為例活玲,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// 根據(jù) URL 加載 Registry 實現(xiàn)類,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 獲取已注冊的服務提供者 URL谍婉,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 獲取 register 參數(shù)
boolean register = registeredProviderUrl.getParameter("register", true);
// 向服務提供者與消費者注冊表中注冊服務提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 根據(jù) register 的值決定是否注冊服務
if (register) {
// 向注冊中心注冊服務
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 獲取訂閱 URL舒憾,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 創(chuàng)建監(jiān)聽器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注冊中心進行訂閱 override 數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 創(chuàng)建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
上面代碼看起來比較復雜,主要做如下一些操作:
- 調用 doLocalExport 導出服務
- 向注冊中心注冊服務
- 向注冊中心進行訂閱 override 數(shù)據(jù)
- 創(chuàng)建并返回 DestroyableExporter
下面先來分析 doLocalExport 方法的邏輯穗熬,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
// 訪問緩存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 創(chuàng)建 Invoker 為委托類對象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 調用 protocol 的 export 方法導出服務
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
// 寫緩存
bounds.put(key, exporter);
}
}
}
return exporter;
}
接下來镀迂,我們把重點放在 Protocol 的 export 方法上。假設運行時協(xié)議為 dubbo唤蔗,此處的 protocol 變量會在運行時加載 DubboProtocol探遵,并調用 DubboProtocol 的 export 方法。所以妓柜,接下來我們目光轉移到 DubboProtocol 的 export 方法上箱季,相關分析如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 獲取服務標識,理解成服務坐標也行棍掐。由服務組名藏雏,服務名,服務版本號以及端口組成作煌。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 創(chuàng)建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對放入緩存中
exporterMap.put(key, exporter);
// 本地存根相關代碼
//本地存根是一個代理對象掘殴,一般用于在真正調用服務前做一些參數(shù)見檢查之類的
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// 省略日志打印代碼
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 啟動服務器
openServer(url);
// 優(yōu)化序列化
optimizeSerialization(url);
return exporter;
}
重點關注 DubboExporter 的創(chuàng)建以及 openServer 方法,下面分析 openServer 方法。
private void openServer(URL url) {
// 獲取 host:port粟誓,并將其作為服務器實例的 key奏寨,用于標識當前的服務器實例
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 訪問緩存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 創(chuàng)建服務器實例
serverMap.put(key, createServer(url));
} else {
// 服務器已創(chuàng)建,則根據(jù) url 中的配置重置服務器
//在同一臺機器上(單網(wǎng)卡)鹰服,同一個端口上僅允許啟動一個服務器實例病瞳。若某個端口上已有服務器實例,此時則調用 reset 方法重置服務器的一些配置。
server.reset(url);
}
}
}
接下來分析服務器實例的創(chuàng)建過程仍源,如下:
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
// 添加心跳檢測配置到 url 中
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 獲取 server 參數(shù)心褐,默認為 netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展舔涎,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加編碼解碼器參數(shù)
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 創(chuàng)建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server...");
}
// 獲取 client 參數(shù)笼踩,可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 獲取所有的 Transporter 實現(xiàn)類名稱集合亡嫌,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測當前 Dubbo 所支持的 Transporter 實現(xiàn)類名稱列表中嚎于,
// 是否包含 client 所表示的 Transporter,若不包含挟冠,則拋出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type...");
}
}
return server;
}
繼續(xù)看創(chuàng)建服務器的部分:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger于购,默認為 HeaderExchanger。
// 緊接著調用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實例
return getExchanger(url).bind(url, handler);
}
下面看一下 HeaderExchanger 的 bind 方法知染。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 創(chuàng)建 HeaderExchangeServer 實例肋僧,該方法包含了多個邏輯,分別如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger 的 bind 方法包含的邏輯比較多控淡,但目前我們僅需關心 Transporters 的 bind 方法邏輯即可嫌吠。該方法的代碼如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數(shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應 Transporter 實例掺炭,并調用實例方法
return getTransporter().bind(url, handler);
}
如上辫诅,getTransporter() 方法獲取的 Transporter 是在運行時動態(tài)創(chuàng)建的,類名為 TransporterAdaptive涧狮,也就是自適應拓展類炕矮。TransporterAdaptive 會在運行時根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認為 NettyTransporter者冤。下面我們繼續(xù)跟下去肤视,這次分析的是 NettyTransporter 的 bind 方法。
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 創(chuàng)建 NettyServer
return new NettyServer(url, listener);
}
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 調用父類構造方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
//doOpen()..
//doClose()..
//...
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 調用父類構造方法涉枫,這里就不用跟進去了邢滑,沒什么復雜邏輯
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 獲取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 設置 ip 為 0.0.0.0
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 獲取最大可接受連接數(shù)
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 調用模板方法 doOpen 啟動服務器
doOpen();
} catch (Throwable t) {
throw new RemotingException("Failed to bind ");
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
}
我們重點關注 doOpen 抽象方法,該方法需要子類實現(xiàn)
NettyServer#doOpen
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 創(chuàng)建 boss 和 worker 線程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 創(chuàng)建 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
// 設置 PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 綁定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
看到這段代碼用過netty的同學應該很熟悉了拜银,其啟動netty服務端翩迈。到這里服務暴露到遠程就分析完了。
上面涉及到protocol姓惑,exchange搀庶,transport這幾個概念,回顧一下:
-
protocol 遠程調用層:封裝 RPC 調用泵督,以
Invocation
,Result
為中心趾盐,擴展接口為Protocol
,Invoker
,Exporter
-
exchange 信息交換層:封裝請求響應模式,同步轉異步,以
Request
,Response
為中心救鲤,擴展接口為Exchanger
,ExchangeChannel
,ExchangeClient
,ExchangeServer
-
transport 網(wǎng)絡傳輸層:抽象 mina 和 netty 為統(tǒng)一接口久窟,以
Message
為中心,擴展接口為Channel
,Transporter
,Client
,Server
,Codec
3.3 服務注冊
回到RegistryProtocol#export 方法上
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ${導出服務}
// 省略其他代碼
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 注冊服務
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 訂閱 override 數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代碼
}
RegistryProtocol 的 export 方法包含了服務導出本缠,注冊斥扛,以及數(shù)據(jù)訂閱等邏輯。其中服務導出邏輯上一節(jié)已經(jīng)分析過了丹锹,本節(jié)將分析服務注冊邏輯稀颁,相關代碼如下:
public void register(URL registryUrl, URL registedProviderUrl) {
// 獲取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注冊服務
registry.register(registedProviderUrl);
}
register 方法包含兩步操作,第一步是獲取注冊中心實例楣黍,第二步是向注冊中心注冊服務匾灶。
3.3.1 創(chuàng)建注冊中心
文章開頭已經(jīng)說了,本文使用的注冊中心是 Zookeeper
AbstractRegistryFactory #getRegistry
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 訪問緩存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 緩存未命中租漂,創(chuàng)建 Registry 實例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry...");
}
// 寫入緩存
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
如上阶女,getRegistry 方法先訪問緩存,緩存未命中則調用 createRegistry 創(chuàng)建 Registry哩治,然后寫入緩存秃踩。這里的 createRegistry 是一個模板方法,由具體的子類實現(xiàn)锚扎。
ZookeeperRegistryFactory #AbstractRegistryFactory
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
// zookeeperTransporter 由 SPI 在運行時注入吞瞪,類型為 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 創(chuàng)建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名,默認為 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 創(chuàng)建 Zookeeper 客戶端驾孔,默認為 CuratorZookeeperTransporter
//在2.5.x版本默認的是ZkclientZookeeperClient芍秆,
//在2.6.4默認的CuratorZookeeperClient
//在2.7.x版本已經(jīng)移除Zkclient,若要使用需要自己擴展
zkClient = zookeeperTransporter.connect(url);
// 添加狀態(tài)監(jiān)聽器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
在上面的代碼代碼中翠勉,我們重點關注 ZookeeperTransporter 的 connect 方法調用妖啥,這個方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端对碌,意味著注冊中心的創(chuàng)建過程就結束了荆虱。接下來,再來分析一下 Zookeeper 客戶端的創(chuàng)建過程朽们。
前面說過怀读,這里的 zookeeperTransporter 類型為自適應拓展類,因此 connect 方法會在被調用時決定加載什么類型的 ZookeeperTransporter 拓展骑脱,默認為 CuratorZookeeperTransporter菜枷。下面我們到 CuratorZookeeperTransporter 中看一看。
public ZookeeperClient connect(URL url) {
// 創(chuàng)建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 創(chuàng)建 CuratorFramework 構造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 構建 CuratorFramework 實例
client = builder.build();
// 添加監(jiān)聽器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 啟動客戶端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
再順便看下ZkclientZookeeperClient
public class ZkclientZookeeperClient extends AbstractZookeeperClient<IZkChildListener> {
private final ZkClientWrapper client;
private volatile KeeperState state = KeeperState.SyncConnected;
public ZkclientZookeeperClient(URL url) {
super(url);
client = new ZkClientWrapper(url.getBackupAddress(), 30000);
client.addListener(new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
ZkclientZookeeperClient.this.state = state;
if (state == KeeperState.Disconnected) {
stateChanged(StateListener.DISCONNECTED);
} else if (state == KeeperState.SyncConnected) {
stateChanged(StateListener.CONNECTED);
}
}
@Override
public void handleNewSession() throws Exception {
stateChanged(StateListener.RECONNECTED);
}
});
client.start();
}
}
過程類似叁丧,都是創(chuàng)建客戶端啤誊,然后增加一個監(jiān)聽器岳瞭。
到這里注冊中心實例創(chuàng)建好了,接下來要做的事情是向注冊中心注冊服務蚊锹。
3.3.2 服務注冊
以 Zookeeper 為例瞳筏,所謂的服務注冊,本質上是將服務配置數(shù)據(jù)寫入到 Zookeeper 的某個路徑的節(jié)點下牡昆。
Zookeeper 可視化客戶端 ZooInspector 查看節(jié)點數(shù)據(jù)如下:
圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個服務對應的配置信息(存儲在 URL 中)最終被注冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節(jié)點下姚炕。
附一張dubbo注冊到zookeper的節(jié)點層次說明圖:
像注冊中心注冊的代碼在RegistryProtocol#register(registryUrl, registeredProviderUrl)
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
FailbackRegistry#register(URL url)
public void register(URL url) {
//需要注冊的url: dubbo://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16024&side=provider×tamp=1578478503772
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 模板方法,由子類實現(xiàn)
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 獲取 check 參數(shù)迁杨,若 check = true 將會直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
} else {
logger.error("Failed to register");
}
// 記錄注冊失敗的鏈接
failedRegistered.add(url);
}
}
protected abstract void doRegister(URL url);
doRegister 方法是一個模板方法钻心,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進行分析。如下:
protected void doRegister(URL url) {
try {
// 通過 Zookeeper 客戶端創(chuàng)建節(jié)點铅协,節(jié)點路徑由 toUrlPath 方法生成,路徑格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
如上摊沉,ZookeeperRegistry 在 doRegister 中調用了 Zookeeper 客戶端創(chuàng)建服務節(jié)點狐史。節(jié)點路徑由 toUrlPath 方法生成,該方法邏輯不難理解说墨,就不分析了骏全。接下來分析 create 方法,如下:
public void create(String path, boolean ephemeral) {
//path:
///dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772
if (!ephemeral) {
// 如果要創(chuàng)建的節(jié)點類型非臨時節(jié)點尼斧,那么這里要檢測節(jié)點是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 遞歸創(chuàng)建上一級路徑
create(path.substring(0, i), false);
}
// 根據(jù) ephemeral 的值創(chuàng)建臨時或持久節(jié)點
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
經(jīng)過這段代碼會創(chuàng)建這些節(jié)點:
持久節(jié)點 /dubbo
持久節(jié)點 /com.alibaba.dubbo.demo.DemoService
持久節(jié)點 /providers
臨時節(jié)點
/dubbo%3A%2F%2F192.168.43.174%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D16024%26side%3Dprovider%26timestamp%3D1578478503772
對于上面的樹型結構數(shù)據(jù)
3.3.2 訂閱override 數(shù)據(jù)
又得回到RegistryProtocol#export方法姜贡,再貼一次
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 導出服務
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 獲取注冊中心 URL,以 zookeeper 注冊中心為例棺棵,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// 根據(jù) URL 加載 Registry 實現(xiàn)類楼咳,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 獲取已注冊的服務提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 獲取 register 參數(shù)
boolean register = registeredProviderUrl.getParameter("register", true);
// 向服務提供者與消費者注冊表中注冊服務提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 根據(jù) register 的值決定是否注冊服務
if (register) {
// 向注冊中心注冊服務
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 獲取訂閱 URL烛恤,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
//表示訂閱的是服務提供者provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService的configurators節(jié)點的信息
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 創(chuàng)建監(jiān)聽器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注冊中心進行訂閱 override 數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 創(chuàng)建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
關注: registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
FailbackRegistry#subscribe
public void subscribe(URL url, NotifyListener listener) {
//url示例
//provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=17976&side=provider×tamp=1578479464018
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
關注doSubscribe(url, listener);方法
ZookeeperRegistry#doSubscribe(url, listener)
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//省略...
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
//toCategoriesPath(url)解析出要訂閱的節(jié)點路徑
//path:/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
//添加監(jiān)聽器母怜,如果有變化調用notify(url, listener, urls)
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//創(chuàng)建持久節(jié)點 /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//toUrlsWithEmpty(url, path, children) 這個方法url的協(xié)議頭由provider替換為了empty
//獲得provider中,和consumer匹配的url數(shù)組
//若不存在則創(chuàng)建 empty://的url返回缚柏,可以處理類似服務提供者為空的情況
urls.addAll(toUrlsWithEmpty(url, path, children));
//此時url為:empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1384&side=provider×tamp=1578532572533
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
notify(url, listener, urls)方法會調用AbstractRegistry# notify(URL url, NotifyListener listener, List<URL> urls)方法苹熏,如下:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
/***
表示服務提供者存入本地緩存文件key=com.alibaba.dubbo.demo.DemoService
value=
provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider×tamp=1578536147384
***/
saveProperties(url);
//調用RegistryProtocol中的OverrideListener#notify(List<URL> urls)方法
listener.notify(categoryList);
}
}
OverrideListener#notify(List<URL> urls)方法
@Override
public synchronized void notify(List<URL> urls) {
//urls這里只有一條數(shù)據(jù)empty://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider×tamp=1578536147384
logger.debug("original override urls: " + urls);
//subscribeUrl
//provider://192.168.43.174:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16140&side=provider×tamp=1578536147384
//獲取匹配的url
List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
// No matching results
if (matchedUrls.isEmpty()) {
return;
}
//提取出變化的配置
List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);
final Invoker<?> invoker;
if (originInvoker instanceof InvokerDelegete) {
invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
} else {
invoker = originInvoker;
}
//The origin invoker
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<?> exporter = bounds.get(key);
if (exporter == null) {
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return;
}
//The current, may have been merged many times
URL currentUrl = exporter.getInvoker().getUrl();
//Merged with this configuration
//根據(jù)變化的配置信息組裝新的url
URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
if (!currentUrl.equals(newUrl)) {
//如果新的url和原來的不一樣,則重新導出服務
RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
}
}
到這里訂閱override數(shù)據(jù)的部分也分析完了
3.3.4 小結
到這里服務注冊的過程分析完了币喧,分為兩個部分:先創(chuàng)建注冊中心實例轨域,之后再通過注冊中心實例注冊服務,然后訂閱配置信息變化杀餐。
4.總結
服務發(fā)布整個流程講完了干发,總結下主要由以下一個步驟:
- 前置工作:檢查參數(shù)組裝URl
- 暴露服務到本地
- 暴露服務到遠程
- 啟動netty暴露服務
- 創(chuàng)建連接zk注冊中心
- 服務注冊到zk
- 到zk訂閱override數(shù)據(jù)