Dubbo調(diào)用過(guò)程參與者有服務(wù)提供方蕉陋、注冊(cè)中心捐凭、服務(wù)消費(fèi)方。其中注冊(cè)中心是單獨(dú)部署的凳鬓,服務(wù)提供方和消費(fèi)方是集成在業(yè)務(wù)里面的茁肠,今天來(lái)分析下服務(wù)提供方服務(wù)暴露的流程。
不管通過(guò)哪種啟動(dòng)方式(Dubbo的幾種啟動(dòng)方式)缩举,服務(wù)接口暴露的起點(diǎn)都是從ServiceConfig.export方法開(kāi)始的垦梆,這里先簡(jiǎn)單分析下通過(guò)xml中配置dubbo到export的流程。
從xml配置到ServiceConfig.export
xml中dubbo服務(wù)提供方的配置:
<dubbo:application name="provider"/>
<dubbo:registry address="zookeeper://ip:2181" timeout="3000"/>
<dubbo:protocol name="dubbo" port="20880" host="127.0.0.1"/>
<dubbo:service interface="com.provider.service.TestService" class="com.provider.service.TestServiceImpl"/>
可以看到仅孩,配置文件中使用的是dubbo的自定義標(biāo)簽托猩,那么,就先來(lái)看dubbo的自定義標(biāo)簽解析器辽慕。通過(guò)dubbo的spring.handlers文件京腥,知道dubbo標(biāo)簽是通過(guò)
DubboNamespaceHandler來(lái)處理的:
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
可以看到解析service標(biāo)簽委托給了DubboBeanDefinitionParser,beanClass為ServiceBean溅蛉,繼續(xù)看DubboBeanDefinitionParser.parse方法:
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
...
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
...
parse的代碼做了刪減绞旅,主要的邏輯是配置bean的屬性并往spring容器中注冊(cè)了beanClass的beanDefinition,此時(shí)是ServiceBean温艇。所以通過(guò)xml進(jìn)行配置dubbo其實(shí)主要是在完成一件事情因悲,就是配置好ServiceBean的屬性,然后將它注冊(cè)到spring容器中勺爱。而ServiceBean是ServiceConfig的子類晃琳,同時(shí)實(shí)現(xiàn)了InitializingBean、ApplicationContextAware琐鲁、ApplicationListener<ContextRefreshedEvent>等spring擴(kuò)展功能接口卫旱。根據(jù)ApplicationContext是否支持事件監(jiān)聽(tīng)機(jī)制來(lái)決定是在收到ContextRefreshedEvent時(shí)調(diào)用export還是初始化的時(shí)候進(jìn)行:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
public void afterPropertiesSet() throws Exception {
......
if (!supportedApplicationListener) {
export();
}
}
ServiceBean的export中調(diào)用了super.export,也就是ServiceConfig.export方法围段。
@Override
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
上面流程分析了xml中配置dubbo也是通過(guò)ServiceConfig.export方法來(lái)暴露服務(wù)的顾翼,下面從ServiceConfig.export來(lái)分析服務(wù)啟動(dòng)的過(guò)程。
ServiceConfig.export服務(wù)暴露流程分析
首先從需求角度來(lái)大至服務(wù)暴露需要完成哪些工作奈泪?
服務(wù)暴露成功后适贸,消費(fèi)端就可以通過(guò)接口透明的調(diào)用服務(wù)了灸芳,由于是RPC調(diào)用,是通過(guò)網(wǎng)絡(luò)訪問(wèn)來(lái)完成的拜姿,所以服務(wù)暴露肯定需要?jiǎng)?chuàng)建一個(gè)網(wǎng)絡(luò)服務(wù)器等待消費(fèi)者來(lái)連接通信烙样;要進(jìn)行通信肯定需要定義協(xié)議,才能保證雙方能夠正常的通信蕊肥;消費(fèi)方并不是直接寫死的服務(wù)提供方主機(jī)和端口谒获,而是從注冊(cè)中心拉取的,這樣才靈活可擴(kuò)展壁却,所以服務(wù)暴露的過(guò)程肯定需要將自己的信息注冊(cè)到注冊(cè)中心批狱。下面就從ServiceConfig.export源碼開(kāi)始看下這個(gè)過(guò)程
public synchronized void export() {
checkAndUpdateSubConfigs();
if (!shouldExport()) {
return;
}
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
做了一些校驗(yàn)和延時(shí)暴露的判斷,關(guān)鍵流程在doExport中:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
通過(guò)標(biāo)識(shí)位設(shè)置避免重復(fù)暴露展东,然后進(jìn)入doExportUrls:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
迭代針對(duì)每一種配置的協(xié)議調(diào)用doExportUrlsFor1Protocol進(jìn)行暴露(一個(gè)接口可以通過(guò)多種協(xié)議同時(shí)進(jìn)行暴露)精耐,這個(gè)方法里面開(kāi)始了上述邏輯,方法體比較長(zhǎng)琅锻,節(jié)選幾個(gè)比較關(guān)鍵的部分:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略一些參數(shù)配置的部分
// export service
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
//將協(xié)議卦停、接口、主機(jī)恼蓬、端口和配置參數(shù)封裝到URL對(duì)象里面
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
...
//下面是服務(wù)暴露的關(guān)鍵代碼
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
最后幾行是服務(wù)暴露的關(guān)鍵代碼惊完,著重看下。
- PROXY_FACTORY.getInvoker是將服務(wù)的接口的實(shí)現(xiàn)類封裝成Invoker, Invoker是 Dubbo 的核心模型处硬,其它模型都向它靠擾小槐,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體荷辕,可向它發(fā)起 invoke 調(diào)用凿跳,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)疮方,也可能一個(gè)集群實(shí)現(xiàn)控嗜。當(dāng)遠(yuǎn)程請(qǐng)求過(guò)來(lái)是,就是通過(guò)這個(gè)invoker來(lái)調(diào)用服務(wù)端接口實(shí)現(xiàn)類的具體方法的骡显。
- protocol.export(wrapperInvoker)是通過(guò)協(xié)議將invoker進(jìn)行暴露疆栏,是最關(guān)鍵的地方了。接下來(lái)跟進(jìn)這個(gè)方法里面詳細(xì)查看惫谤,由于protocol是一個(gè)自適應(yīng)拓展點(diǎn)壁顶,而且Protocol接口沒(méi)有默認(rèn)的自適應(yīng)實(shí)現(xiàn)類(有關(guān)拓展點(diǎn)可查看Dubbo拓展點(diǎn)),所以protocol.export的執(zhí)行是根據(jù)URL中protocol參數(shù)來(lái)決定的溜歪。
在有注冊(cè)中心的情況下(沒(méi)有注冊(cè)中心的情況比較簡(jiǎn)單若专,且包含在了有注冊(cè)中心的情況下),invoker中的url是registryURL蝴猪,協(xié)議參數(shù)為registry调衰,所以export方法調(diào)用的是RegistryProtocol. export:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
其中doLocalExport(originInvoker, providerUrl)是本地暴露膊爪,而register(registryUrl, registeredProviderUrl)則是將本地暴露的URL注冊(cè)到注冊(cè)中心了。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
本地暴露的時(shí)候窖式,也是調(diào)用的protocol.export,不過(guò)此時(shí)的invoker中的URL是providerURL动壤,協(xié)議參數(shù)為具體的協(xié)議萝喘,默認(rèn)是dubbo,會(huì)進(jìn)入dubboProtocol.export方法琼懊,如果沒(méi)有配置注冊(cè)中心的話阁簸,也是直接進(jìn)入這個(gè)方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
看到openServer(url)猜測(cè)應(yīng)該就是開(kāi)啟服務(wù)器,監(jiān)聽(tīng)遠(yuǎn)程網(wǎng)絡(luò)連接了哼丈,源碼就不一一列舉了启妹,簡(jiǎn)單理解就是開(kāi)啟了NettyServer并注冊(cè)了requestHandler來(lái)處理入站數(shù)據(jù)。
這樣本地暴露就完成了醉旦,結(jié)果是創(chuàng)建了一個(gè)netty服務(wù)器等待遠(yuǎn)程連接來(lái)訪問(wèn)饶米,這個(gè)時(shí)候已經(jīng)可以通過(guò)消費(fèi)端來(lái)調(diào)用接口了,但是寫死的服務(wù)端ip车胡、端口不夠靈活檬输,所以一般都使用注冊(cè)中心來(lái)進(jìn)行服務(wù)發(fā)現(xiàn)⌒偌回過(guò)頭來(lái)看服務(wù)注冊(cè)到注冊(cè)中心的流程:register(registryUrl, registeredProviderUrl):
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
實(shí)際上就是通過(guò)工廠方法獲取注冊(cè)中心丧慈,然后注冊(cè)服務(wù)提供方的URL,注冊(cè)中心有多種實(shí)現(xiàn)主卫,以zookeeper為例逃默,
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
實(shí)現(xiàn)為向zookeeper根據(jù)接口參數(shù)創(chuàng)建一個(gè)臨時(shí)目錄,這樣在消費(fèi)端連接上注冊(cè)中心后簇搅,就可以從對(duì)應(yīng)的接口目錄下獲取provider的信息了完域。