前言
本文就先來重點(diǎn)關(guān)注 Provider 節(jié)點(diǎn)發(fā)布服務(wù)的過程盏檐,從 DubboBootstrap 這個(gè)入口類開始介紹歇式,分析 Provider URL 的組裝以及服務(wù)發(fā)布流程,其中會(huì)詳細(xì)介紹本地發(fā)布和遠(yuǎn)程發(fā)布的核心流程胡野。
DubboBootstrap 入口
整個(gè) Provider 節(jié)點(diǎn)的啟動(dòng)入口是 DubboBootstrap.start() 方法材失,在該方法中會(huì)執(zhí)行一些初始化操作,以及一些狀態(tài)控制字段的更新硫豆,具體實(shí)現(xiàn)如下:
public class DubboBootstrap extends GenericEventListener {
public DubboBootstrap start() {
// CAS操作龙巨,保證啟動(dòng)一次
if (started.compareAndSet(false, true)) {
// 用于判斷當(dāng)前節(jié)點(diǎn)是否已經(jīng)啟動(dòng)完畢,在后面的Dubbo QoS中會(huì)使用到該字段
ready.set(false);
// 初始化一些基礎(chǔ)組件熊响,例如旨别,配置中心相關(guān)組件、事件監(jiān)聽汗茄、元數(shù)據(jù)相關(guān)組件
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
// 重點(diǎn):發(fā)布服務(wù)
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
// 用于暴露本地元數(shù)據(jù)服務(wù)
exportMetadataService();
//3. Register the local ServiceInstance if required
// 用于將服務(wù)實(shí)例注冊到專用于服務(wù)發(fā)現(xiàn)的注冊中心
registerServiceInstance();
}
// 處理Consumer的ReferenceConfig
referServices();
if (asyncExportingFutures.size() > 0) {
// 異步發(fā)布服務(wù)秸弛,會(huì)啟動(dòng)一個(gè)線程監(jiān)聽發(fā)布是否完成,完成之后會(huì)將ready設(shè)置為true
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class);
exts.getSupportedExtensionInstances().forEach(ext -> ext.onStart(this));
}).start();
} else {
// 同步發(fā)布服務(wù)成功之后洪碳,會(huì)將ready設(shè)置為true
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
ExtensionLoader<DubboBootstrapStartStopListener> exts = getExtensionLoader(DubboBootstrapStartStopListener.class);
exts.getSupportedExtensionInstances().forEach(ext -> ext.onStart(this));
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
}
不僅是直接通過 API 啟動(dòng) Provider 的方式會(huì)使用到 DubboBootstrap递览,在 Spring 與 Dubbo 集成的時(shí)候也是使用 DubboBootstrap 作為服務(wù)發(fā)布入口的,具體邏輯在 DubboBootstrapApplicationListener 這個(gè) Spring Context 監(jiān)聽器中瞳腌,如下所示:
public class DubboBootstrapApplicationListener extends OnceApplicationContextEventListener implements Ordered {
/**
* The bean name of {@link DubboBootstrapApplicationListener}
*
* @since 2.7.6
*/
public static final String BEAN_NAME = "dubboBootstrapApplicationListener";
private final DubboBootstrap dubboBootstrap;
public DubboBootstrapApplicationListener(ApplicationContext applicationContext) {
super(applicationContext);
// 初始化DubboBootstrap對象
this.dubboBootstrap = DubboBootstrap.getInstance();
DubboBootstrapStartStopListenerSpringAdapter.applicationContext = applicationContext;
}
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
// 監(jiān)聽ContextRefreshedEvent事件和ContextClosedEvent事件
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 啟動(dòng)DubboBootstrap
dubboBootstrap.start();
}
private void onContextClosedEvent(ContextClosedEvent event) {
dubboBootstrap.stop();
}
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
}
這里重點(diǎn)關(guān)注的是exportServices() 方法绞铃,它是服務(wù)發(fā)布核心邏輯的入口,其中每一個(gè)服務(wù)接口都會(huì)轉(zhuǎn)換為對應(yīng)的 ServiceConfig 實(shí)例嫂侍,然后通過代理的方式轉(zhuǎn)換成 Invoker儿捧,最終轉(zhuǎn)換成 Exporter 進(jìn)行發(fā)布荚坞。服務(wù)發(fā)布流程中涉及的核心對象轉(zhuǎn)換,如下圖所示:
exportServices() 方法的具體實(shí)現(xiàn)如下:
public class DubboBootstrap extends GenericEventListener {
private void exportServices() {
// 從配置管理器中獲取到所有的要暴露的服務(wù)配置菲盾,一個(gè)接口類對應(yīng)一個(gè)ServiceConfigBase實(shí)例
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
// 異步模式颓影,獲取一個(gè)線程池來異步執(zhí)行服務(wù)發(fā)布邏輯
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
// 記錄異步發(fā)布的Future
asyncExportingFutures.add(future);
} else {
// 同步發(fā)布
sc.export();
exportedServices.add(sc);
}
});
}
}
ServiceConfig
在 ServiceConfig.export() 方法中,服務(wù)發(fā)布的第一步是檢查參數(shù)亿汞,第二步會(huì)根據(jù)當(dāng)前配置決定是延遲發(fā)布還是立即調(diào)用 doExport() 方法進(jìn)行發(fā)布瞭空,第三步會(huì)通過 exported() 方法回調(diào)相關(guān)監(jiān)聽器,具體實(shí)現(xiàn)如下:
public class ServiceConfig<T> extends ServiceConfigBase<T> {
public synchronized void export() {
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
// 檢查并更新各項(xiàng)配置
checkAndUpdateSubConfigs();
//init serviceMetadata
// 初始化元數(shù)據(jù)相關(guān)服務(wù)
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
if (!shouldExport()) {
return;
}
// 延遲發(fā)布
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 立即發(fā)布
doExport();
}
// 回調(diào)監(jiān)聽器
exported();
}
}
在 checkAndUpdateSubConfigs() 方法中疗我,會(huì)去檢查各項(xiàng)配置是否合理咆畏,并補(bǔ)齊一些缺省的配置信息,這個(gè)方法非常冗長吴裤。
完成配置的檢查之后旧找,再來看 doExport() 方法,其中首先調(diào)用 loadRegistries() 方法加載注冊中心信息麦牺,即將 RegistryConfig 配置解析成 registryUrl钮蛛。無論是使用 XML、Annotation剖膳,還是 API 配置方式魏颓,都可以配置多個(gè)注冊中心地址,一個(gè)服務(wù)接口可以同時(shí)注冊在多個(gè)不同的注冊中心吱晒。
RegistryConfig 是 Dubbo 的多個(gè)配置對象之一甸饱,可以通過解析 XML、Annotation 中注冊中心相關(guān)的配置得到仑濒,對應(yīng)的配置如下(當(dāng)然叹话,也可以直接通過 API 創(chuàng)建得到):
<dubbo:registry address="zookeeper://127.0.0.1:2181" protocol="zookeeper" port="2181" />
RegistryUrl 的格式大致如下(為了方便查看,這里將每個(gè) URL 參數(shù)單獨(dú)放在一行中展示):
// path是Zookeeper的地址
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider
&dubbo=2.0.2
&pid=9405
®istry=zookeeper // 使用的注冊中心是Zookeeper
×tamp=1600307343086
加載注冊中心信息得到 RegistryUrl 之后墩瞳,會(huì)遍歷所有的 ProtocolConfig驼壶,依次調(diào)用 doExportUrlsFor1Protocol(protocolConfig, registryURLs) 在每個(gè)注冊中心發(fā)布服務(wù)。一個(gè)服務(wù)接口可以以多種協(xié)議進(jìn)行發(fā)布喉酌,每種協(xié)議都對應(yīng)一個(gè) ProtocolConfig热凹,例如我們在 Demo 示例中,只使用了 dubbo 協(xié)議泪电,對應(yīng)的配置是:<dubbo:protocol name="dubbo" />
般妙。
組裝服務(wù) URL
doExportUrlsFor1Protocol() 方法的代碼非常長,這里我們分成兩個(gè)部分進(jìn)行介紹:一部分是組裝服務(wù)的 URL歪架,另一部分就是后面緊接著介紹的服務(wù)發(fā)布股冗。
組裝服務(wù)的 URL核心步驟有如下 7 步:
1霹陡、獲取此次發(fā)布使用的協(xié)議和蚪,默認(rèn)使用 dubbo 協(xié)議止状。
2、設(shè)置服務(wù) URL 中的參數(shù)攒霹,這里會(huì)從 MetricsConfig怯疤、ApplicationConfig、ModuleConfig催束、ProviderConfig集峦、ProtocolConfig 中獲取配置信息,并作為參數(shù)添加到 URL 中抠刺。這里調(diào)用的 appendParameters() 方法會(huì)將 AbstractConfig 中的配置信息存儲(chǔ)到 Map 集合中塔淤,后續(xù)在構(gòu)造 URL 的時(shí)候,會(huì)將該集合中的 KV 作為 URL 的參數(shù)速妖。
3高蜂、解析指定方法的 MethodConfig 配置以及方法參數(shù)的 ArgumentConfig 配置,得到的配置信息也是記錄到 Map 集合中,后續(xù)作為 URL 參數(shù)。
4讼稚、根據(jù)此次調(diào)用是泛化調(diào)用還是普通調(diào)用力崇,向 Map 集合中添加不同的鍵值對。
5步悠、獲取 token 配置,并添加到 Map 集合中,默認(rèn)隨機(jī)生成 UUID惭笑。
6、獲取 host砌左、port 值脖咐,并開始組裝服務(wù)的 URL。
7汇歹、根據(jù) Configurator 覆蓋或新增 URL 參數(shù)屁擅。
下面是 doExportUrlsFor1Protocol() 方法組裝 URL 的核心實(shí)現(xiàn):
public class ServiceConfig<T> extends ServiceConfigBase<T> {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//首先是將一些信息,比如版本产弹、時(shí)間戳派歌、方法名以及各種配置對象的字段信息放入到 map 中
//map 中的內(nèi)容將作為 URL 的查詢字符串。構(gòu)建好 map 后痰哨,緊接著是獲取上下文路徑胶果、主機(jī)名以及端口號(hào)等信息。
//最后將 map 和主機(jī)名等數(shù)據(jù)傳給 URL 構(gòu)造方法創(chuàng)建 URL 對象斤斧。需要注意的是早抠,這里出現(xiàn)的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL撬讽。
String name = protocolConfig.getName();
// 如果協(xié)議名為空蕊连,或空串悬垃,則將協(xié)議名變量設(shè)置為 dubbo
if (StringUtils.isEmpty(name)) {
//<dubbo:protocol name=""/>默認(rèn)為dubbo
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
// 添加 side、版本甘苍、時(shí)間戳以及進(jìn)程號(hào)等信息到 map 中
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
// 通過反射將對象的字段信息添加到 map 中
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
// methods 為 MethodConfig 集合尝蠕,MethodConfig 中存儲(chǔ)了 <dubbo:method> 標(biāo)簽的配置信息
if (CollectionUtils.isNotEmpty(getMethods())) {
//檢測 <dubbo:method> 標(biāo)簽中的配置信息,并將相關(guān)配置添加到 map 中
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
//genericService設(shè)置
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
//接口版本號(hào)载庭,跟隨jar包 例如:1.0-SNAPSHOT
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//接口的方法名數(shù)組
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
//空方法標(biāo)志map.put("methods","*")
map.put(METHODS_KEY, ANY_VALUE);
} else {
//方法數(shù)組字符串map.put("methods","insert,update")
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
/**
* Here the token value configured by the provider is used to assign the value to ServiceConfig#token
* Service的token驗(yàn)證
*/
if (ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
//init serviceMetadata attachments
//map放入到serviceMetadata中
serviceMetadata.getAttachments().putAll(map);
// export service
// 獲取協(xié)議host看彼,默認(rèn)獲取本機(jī)ip <dubbo:protocol host="" />
String host = findConfigedHosts(protocolConfig, registryURLs, map);
//獲取協(xié)議port,dubbo默認(rèn)20880 <dubbo:protocol port="" />
Integer port = findConfigedPorts(protocolConfig, name, map);
//生成url格式如下
//dubbo://192.168.56.1:20880/org.study.service.UserService?anyhost=true&application=dubbo-demo&bind.ip=192.168.56.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.study.service.UserService&methods=getUserById,update,insert,transactionalTest,getUserByUserId,delete&pid=13252&release=2.7.5&revision=1.0-SNAPSHOT&side=provider×tamp=1584192937036
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//........
}
}
經(jīng)過上述準(zhǔn)備操作之后囚聚,得到的服務(wù) URL 如下所示(為了方便查看靖榕,這里將每個(gè) URL 參數(shù)單獨(dú)放在一行中展示):
dubbo://172.17.108.185:20880/org.apache.dubbo.demo.DemoService?
anyhost=true
&application=dubbo-demo-api-provider
&bind.ip=172.17.108.185
&bind.port=20880
&default=true
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello,sayHelloAsync
&pid=3918
&release=
&side=provider
×tamp=1600437404483
服務(wù)發(fā)布入口
完成了服務(wù) URL 的組裝之后,doExportUrlsFor1Protocol() 方法開始執(zhí)行服務(wù)發(fā)布顽铸。服務(wù)發(fā)布可以分為遠(yuǎn)程發(fā)布和本地發(fā)布序矩,具體發(fā)布方式與服務(wù) URL 中的 scope 參數(shù)有關(guān)。
scope 參數(shù)有三個(gè)可選值跋破,分別是 none簸淀、remote 和 local,分別代表不發(fā)布毒返、發(fā)布到本地和發(fā)布到遠(yuǎn)端注冊中心租幕,從下面介紹的 doExportUrlsFor1Protocol() 方法代碼中可以看到:
發(fā)布到本地的條件是 scope != remote;
發(fā)布到注冊中心的條件是 scope != local拧簸。
scope 參數(shù)的默認(rèn)值為 null劲绪,也就是說,默認(rèn)會(huì)同時(shí)在本地和注冊中心發(fā)布該服務(wù)盆赤。下面來看 doExportUrlsFor1Protocol() 方法中發(fā)布服務(wù)的具體實(shí)現(xiàn):
public class ServiceConfig<T> extends ServiceConfigBase<T> {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略組裝服務(wù)URL的過程
// 從URL中獲取scope參數(shù)贾富,其中可選值有none、remote牺六、local三個(gè)颤枪,
// 分別代表不發(fā)布、發(fā)布到本地以及發(fā)布到遠(yuǎn)端淑际,具體含義在下面一一介紹
//獲取作用域scope屬性
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
// scope == "none",不注冊畏纲,一般為null
// scope不為none,才進(jìn)行發(fā)布
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
//scope != "remote" ,注冊到本地春缕,一般為null會(huì)注冊到本地
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// 發(fā)布到遠(yuǎn)端的注冊中心
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
//scope != "local" ,注冊到注冊表盗胀,一般為null會(huì)注冊到注冊表
if (CollectionUtils.isNotEmpty(registryURLs)) {// 當(dāng)前配置了至少一個(gè)注冊中心
for (URL registryURL : registryURLs) {// 向每個(gè)注冊中心發(fā)布服務(wù)
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
// injvm協(xié)議只在exportLocal()中有用,不會(huì)將服務(wù)發(fā)布到注冊中心
//injvm,內(nèi)部調(diào)用不注冊
continue;
}
//url存在dynamic保留锄贼,不存在賦值為registryURL的dynamic屬性值
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 創(chuàng)建monitorUrl票灰,并作為monitor參數(shù)添加到服務(wù)URL中
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
// 設(shè)置服務(wù)URL的proxy參數(shù),即生成動(dòng)態(tài)代理方式(jdk或是javassist),作為參數(shù)添加到RegistryURL中
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 為服務(wù)實(shí)現(xiàn)類的對象創(chuàng)建相應(yīng)的Invoker屑迂,getInvoker()方法的第三個(gè)參數(shù)中强品,會(huì)將服務(wù)URL作為export參數(shù)添加到RegistryURL中
// 這里的PROXY_FACTORY是ProxyFactory接口的適配器
//創(chuàng)建一個(gè)AbstractProxyInvoker的子類實(shí)例new AbstractProxyInvoker(ref, interfaceClass,rul)
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker是個(gè)裝飾類,
// 將當(dāng)前ServiceConfig和Invoker關(guān)聯(lián)起來而已屈糊,invoke()方法透傳給底層Invoker對象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//默認(rèn)DubboProtocol.export注冊到遠(yuǎn)程注冊表zookeeper中
// 調(diào)用Protocol實(shí)現(xiàn),進(jìn)行發(fā)布
// 這里的PROTOCOL是Protocol接口的適配器
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注冊中心琼了,僅導(dǎo)出服務(wù)
} else {
// 不存在注冊中心逻锐,僅發(fā)布服務(wù),不會(huì)將服務(wù)信息發(fā)布到注冊中心雕薪。
// Consumer沒法在注冊中心找到該服務(wù)的信息昧诱,但是可以直連
// 具體的發(fā)布過程與上面的過程類似,只不過不會(huì)發(fā)布到注冊中心
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
}
本地發(fā)布
了解了本地發(fā)布所袁、遠(yuǎn)程發(fā)布的入口邏輯之后盏档,下面開始深入本地發(fā)布的邏輯。
在 exportLocal() 方法中燥爷,會(huì)將 Protocol 替換成 injvm 協(xié)議蜈亩,將 host 設(shè)置成 127.0.0.1,將 port 設(shè)置為 0前翎,得到新的 LocalURL稚配,大致如下:
injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true
&application=dubbo-demo-api-provider
&bind.ip=172.17.108.185
&bind.port=20880
&default=true
&deprecated=false
&dubbo=2.0.2
&dynamic=true
&generic=false
&interface=org.apache.dubbo.demo.DemoService
&methods=sayHello,sayHelloAsync
&pid=4249
&release=
&side=provider
×tamp=1600440074214
之后,會(huì)通過 ProxyFactory 接口適配器找到對應(yīng)的 ProxyFactory 實(shí)現(xiàn)(默認(rèn)使用 JavassistProxyFactory)港华,并調(diào)用 getInvoker() 方法創(chuàng)建 Invoker 對象道川;最后,通過 Protocol 接口的適配器查找到 InjvmProtocol 實(shí)現(xiàn)立宜,并調(diào)用 export() 方法進(jìn)行發(fā)布冒萄。 exportLocal() 方法的具體實(shí)現(xiàn)如下:
public class ServiceConfig<T> extends ServiceConfigBase<T> {
private void exportLocal(URL url) {
//進(jìn)行本地URL的構(gòu)建
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
//根據(jù)本地的URL來實(shí)現(xiàn)對應(yīng)的Invoker
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
}
String LOCALHOST_VALUE = "127.0.0.1";
String LOCAL_PROTOCOL = "injvm";
可見發(fā)布到本地是重新構(gòu)建了protocol,injvm就是代表在本地的JVM里橙数,host與port都統(tǒng)一默認(rèn)127.0.0.1:0尊流。
遠(yuǎn)程發(fā)布
介紹完本地發(fā)布之后,再來看遠(yuǎn)程發(fā)布的核心邏輯灯帮,遠(yuǎn)程服務(wù)發(fā)布的流程相較本地發(fā)布流程奠旺,要復(fù)雜得多。
在 doExportUrlsFor1Protocol() 方法中施流,遠(yuǎn)程發(fā)布服務(wù)時(shí)响疚,會(huì)遍歷全部 RegistryURL,并根據(jù) RegistryURL 選擇對應(yīng)的 Protocol 擴(kuò)展實(shí)現(xiàn)進(jìn)行發(fā)布瞪醋。我們知道 RegistryURL 是 "registry://" 協(xié)議忿晕,所以這里使用的是 RegistryProtocol 實(shí)現(xiàn)。
下面來看 RegistryProtocol.export() 方法的核心流程:
public class RegistryProtocol implements Protocol {
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 將"registry://"協(xié)議轉(zhuǎn)換成"zookeeper://"協(xié)議
//注冊url银受,zookeeper
//zookeeper://mcip:2291/org.apache.dubbo.registry.RegistryService?application=dubbo-demo&backup=mcip:2292,mcip:2293&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.study.service.UserService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.study.service.UserService%26methods%3DgetUserById%2Cupdate%2Cinsert%2CtransactionalTest%2CgetUserByUserId%2Cdelete%26pid%3D12956%26release%3D2.7.5%26revision%3D1.0-SNAPSHOT%26side%3Dprovider%26timestamp%3D1584262152345&pid=12956&release=2.7.5×tamp=1584262152342
URL registryUrl = getRegistryUrl(originInvoker);
// 獲取export參數(shù)践盼,其中存儲(chǔ)了一個(gè)"dubbo://"協(xié)議的ProviderURL
// 暴露服務(wù)的url
//dubbo://192.168.56.1:20880/org.study.service.UserService?anyhost=true&application=dubbo-demo&bind.ip=192.168.56.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.study.service.UserService&methods=getUserById,update,insert,transactionalTest,getUserByUserId,delete&pid=12956&release=2.7.5&revision=1.0-SNAPSHOT&side=provider×tamp=1584262152345
URL providerUrl = getProviderUrl(originInvoker);
// 獲取要監(jiān)聽的配置目錄鸦采,這里會(huì)在ProviderURL的基礎(chǔ)上添加category=configurators參數(shù),
// 并封裝成對OverrideListener記錄到overrideListeners集合中
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
//url與service綁定咕幻,放入容器中渔伯,遠(yuǎn)程調(diào)用時(shí)根據(jù)url找到serviceimpl
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
//向訂閱中心推送監(jiān)聽器
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 初始化時(shí)會(huì)檢測一次Override配置,重寫ProviderURL
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//導(dǎo)出服務(wù)
//注冊invoker到本地dubbo.expo
//調(diào)用DubboProtocol.export肄程,其中openServer開啟netty監(jiān)聽锣吼,
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 最終的注冊到zookeeper
final Registry registry = getRegistry(originInvoker);
// 獲取將要發(fā)布到注冊中心上的Provider URL,其中會(huì)刪除一些多余的參數(shù)信息
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 獲取 register 參數(shù)
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
// 根據(jù) register 的值決定是否注冊服務(wù)
if (register) {
// 調(diào)用Registry.register()方法將registeredProviderUrl發(fā)布到注冊中心
register(registryUrl, registeredProviderUrl);
}
// 將Provider相關(guān)信息記錄到的ProviderModel中
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
/// 向注冊中心進(jìn)行訂閱override數(shù)據(jù)蓝厌,主要是監(jiān)聽該服務(wù)的configurators節(jié)點(diǎn)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 確認(rèn)注冊完
// 觸發(fā)RegistryProtocolListener監(jiān)聽器
notifyExport(exporter);
// 創(chuàng)建并返回 DestroyableExporter
return new DestroyableExporter<>(exporter);
}
}
可以看到玄叠,遠(yuǎn)程發(fā)布流程大致可分為下面 5 個(gè)步驟:
1、準(zhǔn)備 URL拓提,比如 ProviderURL读恃、RegistryURL 和 OverrideSubscribeUrl。
2代态、發(fā)布 Dubbo 服務(wù)寺惫。在 doLocalExport() 方法中調(diào)用 DubboProtocol.export() 方法啟動(dòng) Provider 端底層 Server。
3蹦疑、注冊 Dubbo 服務(wù)肌蜻。在 register() 方法中,調(diào)用 ZookeeperRegistry.register() 方法向 Zookeeper 注冊服務(wù)必尼。
4蒋搜、訂閱 Provider 端的 Override 配置。調(diào)用 ZookeeperRegistry.subscribe() 方法訂閱注冊中心 configurators 節(jié)點(diǎn)下的配置變更判莉。
5豆挽、觸發(fā) RegistryProtocolListener 監(jiān)聽器。
遠(yuǎn)程發(fā)布的詳細(xì)流程如下圖所示:
總結(jié)
本文重點(diǎn)介紹了 Dubbo 服務(wù)發(fā)布的核心流程券盅。
首先介紹了 DubboBootstrap 這個(gè)入口門面類中與服務(wù)發(fā)布相關(guān)的方法帮哈,重點(diǎn)是 start() 和 exportServices() 兩個(gè)方法;然后詳細(xì)介紹了 ServiceConfig 類的三個(gè)核心步驟:檢查參數(shù)锰镀、立即(或延遲)執(zhí)行 doExport() 方法進(jìn)行發(fā)布娘侍、回調(diào)服務(wù)發(fā)布的相關(guān)監(jiān)聽器。
接下來泳炉,分析了doExportUrlsFor1Protocol() 方法憾筏,它是發(fā)布一個(gè)服務(wù)的入口,也是規(guī)定服務(wù)發(fā)布流程的地方花鹅,其中涉及 Provider URL 的組裝氧腰、本地服務(wù)發(fā)布流程以及遠(yuǎn)程服務(wù)發(fā)布流程。