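1. Preface
When a Dubbo service starts, it also goes through service registration, that is, service export. This article records how Dubbo exports a service. Export begins when the Spring container publishes its refresh event. Note: the source code examined here is version 2.7.6.
2. Source Code Analysis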
DubboBootstrapApplicationListener#onApplicationContextEvent
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
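The main job of this code is to run Dubbo initialization when the Spring container starts or refreshes.
- When the Spring container starts or refreshes, it publishes a ContextRefreshedEvent
- Dubbo listens for that event and calls DubboBootstrap#start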
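The dispatch described above can be tried without a Spring container. The following is a minimal sketch, assuming hypothetical stand-in event classes (`ContextEvent`, `RefreshedEvent`, `ClosedEvent`) in place of Spring's real event hierarchy; it only illustrates the `instanceof` routing, not Dubbo's actual listener.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Spring's ApplicationContextEvent hierarchy.
abstract class ContextEvent {}
class RefreshedEvent extends ContextEvent {}
class ClosedEvent extends ContextEvent {}

// Hypothetical listener mirroring the instanceof dispatch shown above.
class BootstrapListenerSketch {
    // Records which handler ran, so the routing can be observed.
    final List<String> calls = new ArrayList<>();

    void onEvent(ContextEvent event) {
        if (event instanceof RefreshedEvent) {
            calls.add("start");   // where dubboBootstrap.start() would be called
        } else if (event instanceof ClosedEvent) {
            calls.add("closed");  // where the close handler would run
        }
    }

    public static void main(String[] args) {
        BootstrapListenerSketch listener = new BootstrapListenerSketch();
        listener.onEvent(new RefreshedEvent());
        listener.onEvent(new ClosedEvent());
        System.out.println(listener.calls);
    }
}
```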
DubboBootstrap#start
/**
* Start the bootstrap
*/
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. export Dubbo Services
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
private void exportServices() {
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
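The start method mainly performs initialization; the part we care about is exportServices, which exports the services. Its logic is simple: depending on the exportAsync switch it exports each service synchronously or asynchronously, and both paths end up in ServiceConfig#export.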
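Two ideas in this method are worth isolating: the `compareAndSet` guard that makes `start()` run only once, and the switch between synchronous and asynchronous export. The sketch below is an illustration under stated assumptions, not Dubbo's code: plain strings stand in for `ServiceConfig` objects, and the class name `ExportModeSketch` and the fixed-size pool are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: service names stand in for ServiceConfig objects.
class ExportModeSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    final List<String> exported = new CopyOnWriteArrayList<>();

    /** Returns true if this call performed the start, false if already started. */
    boolean start(List<String> services, boolean exportAsync) {
        if (!started.compareAndSet(false, true)) {
            return false; // a second start() is a no-op
        }
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (String service : services) {
                if (exportAsync) {
                    // asynchronous path: hand the export to a worker thread
                    futures.add(executor.submit(() -> { exported.add(service); }));
                } else {
                    // synchronous path: export on the calling thread
                    exported.add(service);
                }
            }
            for (Future<?> future : futures) {
                future.get(); // wait here only so the demo result is deterministic
            }
        } catch (Exception e) {
            throw new IllegalStateException("export failed", e);
        } finally {
            executor.shutdown();
        }
        return true;
    }

    public static void main(String[] args) {
        ExportModeSketch bootstrap = new ExportModeSketch();
        System.out.println(bootstrap.start(Arrays.asList("DemoService", "OrderService"), true));
        System.out.println(bootstrap.start(Arrays.asList("Ignored"), false));
        System.out.println(bootstrap.exported.size());
    }
}
```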
ServiceConfig#export
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
// check the configuration
checkAndUpdateSubConfigs();
//init serviceMetadata
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// decide whether the export should be delayed
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// export the service
doExport();
}
// post-export handling:
// dispatch the service-exported event
exported();
}
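Main logic:
- Check the configuration and fill in defaults for anything left unset
- Initialize the service metadata
- Decide from the delay parameter whether the export should be delayed
- Export the service
- Run the post-export handling, which mainly dispatches an event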
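The delayed-export branch boils down to scheduling the same `doExport` call on a `ScheduledExecutorService` instead of running it inline. The following is a minimal sketch of that pattern, assuming a hypothetical class `DelayedExportSketch` and a latch added purely so the example can observe when the export ran.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "export now or export after a delay".
class DelayedExportSketch {
    private final CountDownLatch done = new CountDownLatch(1);

    void export(ScheduledExecutorService scheduler, long delayMillis) {
        if (delayMillis > 0) {
            // delayed path: the scheduler thread runs doExport later
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            // immediate path: run on the calling thread
            doExport();
        }
    }

    private void doExport() {
        done.countDown(); // stands in for the real export work
    }

    /** Waits until doExport has run, or the timeout elapses. */
    boolean awaitExported(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        DelayedExportSketch service = new DelayedExportSketch();
        service.export(scheduler, 50);
        System.out.println("exported: " + service.awaitExported(2000));
        scheduler.shutdown();
    }
}
```

Note that in the excerpt above, exported() is called right after the scheduling call returns, so with a delay configured it runs before the scheduled doExport does.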
ServiceConfig#doExport
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// export the service
doExportUrls();
}
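This method contains little logic; it mainly decides whether the export should go ahead: it refuses to export a service that has already been unexported, returns early if the service is already exported, and otherwise hands off to doExportUrls. Export can also be switched off by configuration: <dubbo:provider> offers a parameter that cancels service export, which is handy for local debugging. The same parameter on a single service looks like this: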
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" export="false"/>
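The two flags in doExport form a small state machine: export happens at most once, and never after unexport. A minimal sketch of that guard follows; the class `ExportGuardSketch` and the `exportCount` counter are hypothetical and exist only to make the behaviour observable.

```java
// Hypothetical sketch of the exported/unexported guard in doExport.
class ExportGuardSketch {
    private boolean exported;
    private boolean unexported;
    int exportCount; // how many times the real export work ran

    synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service has already unexported!");
        }
        if (exported) {
            return; // repeated calls do nothing
        }
        exported = true;
        exportCount++; // stands in for doExportUrls()
    }

    synchronized void unexport() {
        unexported = true;
    }

    public static void main(String[] args) {
        ExportGuardSketch service = new ExportGuardSketch();
        service.doExport();
        service.doExport();
        System.out.println(service.exportCount);
        service.unexport();
        try {
            service.doExport();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```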
ServiceConfig#doExportUrls
private void doExportUrls() {
// add the current service to the ServiceRepository
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// load the registry URLs
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// iterate over protocols and export the service under each protocol
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
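Main logic:
- Add the current service's information to the ServiceRepository (ApplicationModel holds the basic information about service providers and consumers)
- Load the registry URLs (the registry addresses from the user's configuration are converted into URL objects)
- Iterate over protocols and export the service under each protocol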
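The path key computed inside the loop combines an optional context path with the service path, then adds group and version. The sketch below illustrates that composition; the `group/path:version` layout in `buildKey` is an assumption about the key format made for this example, and `PathKeySketch` is a hypothetical class, not Dubbo's `URL.buildKey`.

```java
import java.util.Optional;

// Hypothetical sketch of how a per-protocol path key could be assembled.
class PathKeySketch {
    // Assumed layout: [group/]path[:version]
    static String buildKey(String path, String group, String version) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.toString();
    }

    // Same Optional.map(...).orElse(...) shape as the excerpt above.
    static String pathKey(Optional<String> contextPath, String path, String group, String version) {
        String fullPath = contextPath.map(p -> p + "/" + path).orElse(path);
        return buildKey(fullPath, group, version);
    }

    public static void main(String[] args) {
        String service = "org.apache.dubbo.demo.DemoService";
        System.out.println(pathKey(Optional.empty(), service, null, null));
        System.out.println(pathKey(Optional.of("ctx"), service, "g1", "1.0.0"));
    }
}
```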
ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
Map<String, String> map = new HashMap<String, String>();
// (code omitted) mainly copies the information in ProtocolConfig into the map, which is used to build the URL
// export service
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
// build the URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// (code omitted)
}
This method builds the URL for the protocol. Its main logic is to collect various settings and config-object fields into a Map, whose contents are then passed to the URL object. (To keep the overall flow easy to follow, the code above omits the details of how each parameter is obtained.)
The resulting URL looks roughly like this:
dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=127.0.0.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=3983&qos.port=22222&release=&side=provider&timestamp=1630134753068
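To make the "map becomes URL" step concrete, here is a minimal sketch that turns a protocol, host, port, path and parameter map into a string of the same shape as the URL above. It is an illustration only: `ProviderUrlSketch` is a hypothetical helper, not Dubbo's `URL` class, and sorting the parameters by key is a choice made here so the output is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch: assemble protocol://host:port/path?k=v&... from a map.
class ProviderUrlSketch {
    static String build(String protocol, String host, int port, String path, Map<String, String> params) {
        // TreeMap sorts by key so the query string is stable across runs
        String query = new TreeMap<>(params).entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        String base = protocol + "://" + host + ":" + port + "/" + path;
        return query.isEmpty() ? base : base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("side", "provider");
        map.put("anyhost", "true");
        map.put("interface", "org.apache.dubbo.demo.DemoService");
        System.out.println(build("dubbo", "127.0.0.1", 20880, "org.apache.dubbo.demo.DemoService", map));
    }
}
```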
Once the URL object has been built, the rest of the method performs the actual export:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// no registry is configured: export the service directly, without registering it
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
The main logic of this code is:
- Read the scope parameter
- If scope is none, nothing is exported
- If scope != remote, export locally
- If scope != local, export remotely
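The scope rules above are easy to misread because they are expressed as negations. The sketch below restates them as a function from the scope value to the set of export targets. It assumes the scope strings are `none`, `local` and `remote`; `ScopeSketch` and its `Target` enum are hypothetical names for this example.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of the scope decision.
class ScopeSketch {
    enum Target { LOCAL, REMOTE }

    static Set<Target> targets(String scope) {
        Set<Target> result = EnumSet.noneOf(Target.class);
        if ("none".equalsIgnoreCase(scope)) {
            return result; // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add(Target.LOCAL);  // anything except "remote" exports locally
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add(Target.REMOTE); // anything except "local" exports remotely
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(null));
        System.out.println(targets("local"));
        System.out.println(targets("remote"));
        System.out.println(targets("none"));
    }
}
```

The consequence worth noticing is that an unset scope exports both locally and remotely.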
Next, let's look at the method that exports the service locally:
private void exportLocal(URL url) {
// build a URL whose protocol is injvm
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// create the Invoker and call InjvmProtocol.export
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
Main logic:
- Build a URL with the injvm protocol
- Create the Invoker and call InjvmProtocol.export
Next, let's look at InjvmProtocol.export:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// create the InjvmExporter
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
The logic here is simple: create an InjvmExporter and add it to the exporter map, keyed by the service name.
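An in-JVM export is essentially a map from service key to something callable. The sketch below shows that idea with hypothetical types (`InjvmSketch`, its `Invoker` interface and `Exporter` class); having the exporter register itself in its constructor is an assumption modelled on the constructor call `new InjvmExporter<T>(invoker, key, exporterMap)` shown above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an in-JVM exporter registry keyed by service key.
class InjvmSketch {
    interface Invoker {
        String invoke(String arg);
    }

    static final class Exporter {
        final Invoker invoker;

        Exporter(Invoker invoker, String key, Map<String, Exporter> exporterMap) {
            this.invoker = invoker;
            exporterMap.put(key, this); // assumption: the exporter registers itself
        }
    }

    final Map<String, Exporter> exporterMap = new ConcurrentHashMap<>();

    Exporter export(String serviceKey, Invoker invoker) {
        return new Exporter(invoker, serviceKey, exporterMap);
    }

    // A local call looks the exporter up by key and invokes it directly.
    String call(String serviceKey, String arg) {
        Exporter exporter = exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new IllegalStateException("Service not exported: " + serviceKey);
        }
        return exporter.invoker.invoke(arg);
    }

    public static void main(String[] args) {
        InjvmSketch protocol = new InjvmSketch();
        protocol.export("org.apache.dubbo.demo.DemoService", name -> "Hello " + name);
        System.out.println(protocol.call("org.apache.dubbo.demo.DemoService", "dubbo"));
    }
}
```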
Next, let's look at exporting the service remotely. We go straight to RegistryProtocol#export:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// get the registry URL, for example:
// zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F127.0.0.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D127.0.0.1%26bind.port%3D20880%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26metadata-type%3Dremote%26methods%3DsayHello%2CsayHelloAsync%26pid%3D4292%26qos.port%3D22222%26release%3D%26side%3Dprovider%26timestamp%3D1630136961051&metadata-type=remote&pid=4292&qos.port=22222&timestamp=1630136958853
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
// get the service provider URL, for example:
// dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=127.0.0.1&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid=4292&qos.port=22222&release=&side=provider&timestamp=1630136961051
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// get the override subscription URL
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// create the listener
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
// export the service
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
// decide whether the service should be registered
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// register the service
register(registryUrl, registeredProviderUrl);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
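Main logic:
- Get the registry URL
- Get the service provider URL
- Get the override subscription URL
- Create the subscription listener
- Export the service
- Decide whether the service needs to be registered and, if so, register it
The two key calls in this method are exporting the service (org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport) and registering the service (org.apache.dubbo.registry.integration.RegistryProtocol#register).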
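The sample registry URL in the comments carries the whole provider URL, percent-encoded, in its `export` parameter, and the provider URL is recovered from there. The sketch below shows that round trip using only the JDK's `URLEncoder` and `URLDecoder`. `ExportParamSketch` and its string-based parsing are hypothetical simplifications and say nothing about how `getProviderUrl` is implemented.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical sketch: embed a provider URL in a registry URL and read it back.
class ExportParamSketch {
    static String embed(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    static String extract(String registryUrl) {
        int q = registryUrl.indexOf('?');
        if (q < 0) {
            return null;
        }
        // The embedded URL's own '&' characters are encoded as %26, so splitting is safe.
        for (String pair : registryUrl.substring(q + 1).split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String provider = "dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&side=provider";
        String registry = "zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider";
        String combined = embed(registry, provider);
        System.out.println(combined);
        System.out.println(extract(combined));
    }
}
```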
First, the export step, org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
// create an Exporter object
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// export through the Dubbo protocol to obtain an exporter
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// get the URL
URL url = invoker.getUrl();
// export service. key=org.apache.dubbo.demo.DemoService:20880
String key = serviceKey(url);
// create the exporter and put it in the cache
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// create the listening server (NettyServer by default)
openServer(url);
optimizeSerialization(url);
return exporter;
}
Main logic:
- Get the URL and create the Exporter object
- Put the exporter in the cache
- On the first export, create the listening server
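"Create the server on first export" means several services exported on the same address share one listening server. One way to get that once-per-address behaviour is `ConcurrentHashMap.computeIfAbsent`, the same call doLocalExport uses for its exporter cache. The sketch below is an assumption-laden illustration: `ServerCacheSketch` is hypothetical, a string stands in for the server object, and it is not claimed to match how `openServer` is written.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: at most one "server" per host:port.
class ServerCacheSketch {
    private final Map<String, String> servers = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger(); // counts real creations

    String openServer(String host, int port) {
        String address = host + ":" + port;
        // The mapping function runs only when no server exists for this address.
        return servers.computeIfAbsent(address, a -> {
            created.incrementAndGet();
            return "server@" + a; // stands in for binding a real server
        });
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        protocol.openServer("127.0.0.1", 20880); // first export creates the server
        protocol.openServer("127.0.0.1", 20880); // second export reuses it
        protocol.openServer("127.0.0.1", 20881); // a new port needs a new server
        System.out.println(protocol.created.get());
    }
}
```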
Next, let's see how the service is registered with the registry (Zookeeper):
org.apache.dubbo.registry.integration.RegistryProtocol#register
public void register(URL registryUrl, URL registeredProviderUrl) {
// get the Registry instance
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
model.addStatedUrl(new ProviderModel.RegisterStatedURL(
registeredProviderUrl,
registryUrl,
true
));
}
This eventually calls ZookeeperRegistry's doRegister method, which creates a node in Zookeeper.
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
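The node path passed to zkClient.create is derived from the provider URL. The sketch below shows one plausible shape for such a path, assuming a layout of `/<root>/<interface>/providers/<encoded provider URL>` with `/dubbo` as the root. That layout, the class `ZkPathSketch` and its method are assumptions for illustration, so check toUrlPath in the source for the exact rules. In doRegister above, the second argument to create is the dynamic parameter, which defaults to true.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical sketch of a Zookeeper node path for a registered provider.
class ZkPathSketch {
    // Assumed layout: root/interface/providers/encoded-provider-url
    static String toUrlPath(String root, String interfaceName, String providerUrl) {
        return root + "/" + encode(interfaceName) + "/providers/" + encode(providerUrl);
    }

    private static String encode(String s) {
        try {
            // Encoding keeps '/' and ':' in the URL from being read as path separators.
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath(
                "/dubbo",
                "org.apache.dubbo.demo.DemoService",
                "dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService"));
    }
}
```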
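Finally, we can connect to Zookeeper and inspect the registered node.
3. Summary
To wrap up, a diagram walks through the Dubbo service export flow.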