1.簡介
本篇文章检眯,我們來研究一下 Dubbo 導(dǎo)出服務(wù)的過程厘擂。Dubbo 服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后锰瘸,會立即執(zhí)行服務(wù)導(dǎo)出邏輯刽严。整個邏輯大致可分為三個部分,第一部分是前置工作避凝,主要用于檢查參數(shù)舞萄,組裝 URL。第二部分是導(dǎo)出服務(wù)管削,包含導(dǎo)出服務(wù)到本地 (JVM)倒脓,和導(dǎo)出服務(wù)到遠程兩個過程。第三部分是向注冊中心注冊服務(wù)含思,用于服務(wù)發(fā)現(xiàn)崎弃。本篇文章將會對這三個部分代碼進行詳細的分析甘晤。
2.源碼分析
2.1 源于事件監(jiān)聽
服務(wù)導(dǎo)出的入口方法是 DubboBootstrapApplicationListener
的 onApplicationContextEvent(ApplicationContextEvent event)
洽洁。我們來看一下這個類的結(jié)構(gòu):
onApplicationContextEvent(ApplicationContextEvent event)
是一個事件響應(yīng)方法瓦堵,該方法會在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。方法代碼如下:
@Override
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
// 啟動類封裝
dubboBootstrap.start();
}
我們來看一下start
方法執(zhí)行哪些操作蜒灰,代碼如下:
/**
* Start the bootstrap
*/
public DubboBootstrap start() {
// 保證刷新事件只會觸發(fā)一次duboo的啟動類
if (started.compareAndSet(false, true)) {
ready.set(false);
initialize();
if (logger.isInfoEnabled()) {
logger.info(NAME + " is starting...");
}
// 1. 導(dǎo)出Dubbo服務(wù)
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. 導(dǎo)出 MetadataService
exportMetadataService();
//3. 如果需要的話艇炎,注冊本地服務(wù)實例
registerServiceInstance();
}
referServices();
if (asyncExportingFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
logger.warn(NAME + " exportAsync occurred an exception.");
}
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}).start();
} else {
ready.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
}
}
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
}
return this;
}
我們看一下 exportServices 方法:
private void exportServices() {
configManager.getServices().forEach(sc -> {
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
這個方法,Debug看一下
[圖片上傳失敗...(image-a4b84c-1587054229281)]
先是 configManager
中獲取多個 serviceBean
酌伊,而 serviceBean
又是如何來的呢?這得了解Dubbo配置是怎樣實現(xiàn)的缀踪;
2.2 Dubbo配置實現(xiàn)
2.2.1 Config依賴
2.2.2 配置管理
我們知道spring加載xml或annotation,第一步需要將這些配置元數(shù)據(jù)載入spring容器中,首先確認下這些<dubbo:* >
標簽虹脯,對應(yīng)的數(shù)據(jù)載體類驴娃。
我們以官方給的服務(wù)導(dǎo)出的Demo的配置文件為例:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application metadata-type="remote" name="demo-provider"/>
<dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:protocol name="dubbo"/>
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
</beans>
找到 dubbo-config\dubbo-config-spring\src\main\resources\META-INF\spring.handlers 文件。找到負責具體解析dubbo標簽的handler循集。
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
查看DubboNamespaceHandler
代碼唇敞,可以看到,dubbo自定義標簽最終是由DubboNamespaceHandler
進行處理的咒彤,通過前面對spring自定義標簽的講解我們知道疆柔,該類必然實現(xiàn)了NamespaceHandler
接口,而我們只需要查看其init()
方法是如何實現(xiàn)的即可镶柱。如下是該類的源碼:
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
// 省略其他代碼...
}
對于dubbo的各個子標簽的bean的生成旷档,最終都是通過DubboBeanDefinitionParser
來實現(xiàn)的,而該Parser必然也實現(xiàn)了BeanDefinitionParser
接口歇拆,最終通過其parse()
方法來將標簽屬性解析為各個bean屬性鞋屈。這里我們直接閱讀其parse()
方法較長??:
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
// Register DubboConfigAliasPostProcessor
registerDubboConfigAliasPostProcessor(parserContext.getRegistry());
return parse(element, parserContext, beanClass, required);
}
@SuppressWarnings("unchecked")
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
// 這里會嘗試獲取當前標簽的id值,如果當前標簽不存在id值故觅,則會根據(jù)以下策略來為其生成一個bean name:
// 1. 獲取其name屬性厂庇,將其作為當前bean的名稱;
// 2. 如果name屬性不存在输吏,則獲取其interface屬性权旷,將其作為bean的名稱,這里如果beanClass
// 是ProtocolConfig贯溅,則直接以dubbo作為其名稱拄氯,這是因為ProtocolConfig中沒有interface屬性;
// 3. 如果還是無法獲取到名稱盗迟,則直接以beanClass的名稱作為其名稱坤邪;
// 4. 到這里,也就能保證一定會獲取到一個名稱罚缕,但是很有可能該名稱在當前spring容器中已經(jīng)使用過了艇纺,
// 那么這里會判斷當前容器中是否包含該名稱,如果包含,則在一個無限循環(huán)中在其名稱后加一個數(shù)字黔衡,
// 最終一定能夠保證生成的名稱是唯一的
String id = element.getAttribute("id");
if (StringUtils.isEmpty(id) && required) {
// 獲取name屬性的值
String generatedBeanName = element.getAttribute("name");
if (StringUtils.isEmpty(generatedBeanName)) {
if (ProtocolConfig.class.equals(beanClass)) {
generatedBeanName = "dubbo";
} else {
generatedBeanName = element.getAttribute("interface");
}
}
// 獲取beanClass的名稱
if (StringUtils.isEmpty(generatedBeanName)) {
generatedBeanName = beanClass.getName();
}
id = generatedBeanName;
int counter = 2;
// 通過無限循環(huán)生成唯一的bean名稱
while (parserContext.getRegistry().containsBeanDefinition(id)) {
id = generatedBeanName + (counter++);
}
}
if (StringUtils.isNotEmpty(id)) {
// 將當前的BeanDefinition注冊到BeanDefinitionRegistry中影所,并且這里會設(shè)置其id屬性值
if (parserContext.getRegistry().containsBeanDefinition(id)) {
throw new IllegalStateException("Duplicate spring bean id " + id);
}
parserContext.getRegistry().registerBeanDefinition(id, beanDefinition);
beanDefinition.getPropertyValues().addPropertyValue("id", id);
}
// 這里判斷當前注冊的beanClass是否為ProtocolConfig蟆豫,如果是壳繁,則在當前BeanDefinitionRegistry
// 中找到所有的包含這樣一種屬性的BeanDefinition皂冰,該屬性名為protocol萍丐,屬性值為ProtocolConfig
// 類型,如果找到了兆旬,則將當前生成的ProtocolConfig的屬性注入到這些找到的BeanDefinition中
if (ProtocolConfig.class.equals(beanClass)) {
for (String name : parserContext.getRegistry().getBeanDefinitionNames()) {
BeanDefinition definition = parserContext.getRegistry().getBeanDefinition(name);
PropertyValue property = definition.getPropertyValues().getPropertyValue("protocol");
if (property != null) {
Object value = property.getValue();
if (value instanceof ProtocolConfig && id.equals(((ProtocolConfig) value).getName())) {
definition.getPropertyValues().addPropertyValue("protocol", new RuntimeBeanReference(id));
}
}
}
// 如果當前beanClass是ServiceBean柠座,這種bean對應(yīng)的標簽是<dubbo:service/>,這里會獲取該標簽
// 中的class屬性值捧书,并以該class為準創(chuàng)建一個BeanDefinition吹泡,然后將該BeanDefinition作為當前
// BeanDefinition的ref屬性注入其中。
// 這里parseProperties()方法會獲取當前標簽的所有<property/>子標簽经瓷,
// 然后將其屬性注入到新生成的BeanDefinition中
} else if (ServiceBean.class.equals(beanClass)) {
String className = element.getAttribute("class");
if (StringUtils.isNotEmpty(className)) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
// 轉(zhuǎn)換<property/>子標簽的屬性值
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
// 這里判斷beanClass是否為ProviderConfig類型爆哑,如果是該類型,則將相關(guān)邏輯委托給parseNested()
// 方法進行處理舆吮,該方法的主要有兩個作用:
// 1. 獲取第一個標簽名為service的子標簽揭朝,判斷其是否有default屬性,如果有色冀,則將該屬性設(shè)置為當前
// BeanDefinition的default屬性值潭袱,也就是將當前的provider作為默認的provider;
// 2. 遍歷得到所有的標簽名為service的子標簽锋恬,通過遞歸的方式在當前BeanDefinitionRegistry中注冊
// 注冊ServiceBean屯换,并且將其provider設(shè)置為當前父標簽的provider。也就是說与学,通過這種方式彤悔,
// 我們可以為特定的ServiceBean自定義設(shè)置其provider配置。
} else if (ProviderConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ServiceBean.class, true, "service", "provider", id, beanDefinition);
// 這里的邏輯與上面的provider的處理方式一致索守,即配置一個默認的consumer晕窑,然后將其子標簽中定義的
// reference設(shè)置默認的consumer為當前的consumer
} else if (ConsumerConfig.class.equals(beanClass)) {
parseNested(element, parserContext, ReferenceBean.class, false, "reference", "consumer", id, beanDefinition);
}
Set<String> props = new HashSet<>();
ManagedMap parameters = null;
// 除去上面的特殊情況以外,下面的邏輯主要目的是獲取當前beanClass中的各個屬性名蕾盯,然后獲取當前標簽
// 中對應(yīng)于該屬性名的各個標簽值幕屹,并將其轉(zhuǎn)換到對應(yīng)的屬性中
for (Method setter : beanClass.getMethods()) {
// 獲取當前beanClass中所有的set方法,并且通過該方法獲取其后屬性的名稱
String name = setter.getName();
if (name.length() > 3 && name.startsWith("set")
&& Modifier.isPublic(setter.getModifiers())
&& setter.getParameterTypes().length == 1) {
Class<?> type = setter.getParameterTypes()[0];
String beanProperty = name.substring(3, 4).toLowerCase() + name.substring(4);
String property = StringUtils.camelToSplitName(beanProperty, "-");
props.add(property);
// check the setter/getter whether match
Method getter = null;
try {
// 判斷當前set方法對應(yīng)的屬性是否有對應(yīng)的get方法或is方法级遭,如果沒有則忽略該屬性
getter = beanClass.getMethod("get" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e) {
try {
getter = beanClass.getMethod("is" + name.substring(3), new Class<?>[0]);
} catch (NoSuchMethodException e2) {
// ignore, there is no need any log here since some class implement the interface: EnvironmentAware,
// ApplicationAware, etc. They only have setter method, otherwise will cause the error log during application start up.
}
}
// 沒有g(shù)et方法或is方法則忽略該屬性
if (getter == null
|| !Modifier.isPublic(getter.getModifiers())
|| !type.equals(getter.getReturnType())) {
continue;
}
if ("parameters".equals(property)) {
// 獲取當前標簽的所有名稱為parameter的子標簽望拖,將該標簽中設(shè)置的屬性值注入到當前
// BeanDefinition的parameters屬性中
parameters = parseParameters(element.getChildNodes(), beanDefinition);
} else if ("methods".equals(property)) {
// 獲取當前標簽的所有名稱為method的子標簽,并將這每一個子標簽都注冊
// 為一個MethodConfig的對象挫鸽,最終將這些對象注入到當前BeanDefinition
// 的methods屬性中
parseMethods(id, element.getChildNodes(), beanDefinition, parserContext);
} else if ("arguments".equals(property)) {
// 獲取當前標簽的所有名稱為argument的子標簽说敏,并將這每一個子標簽都注冊為一個
// ArgumentConfig的對象,最終將這些對象注入到當前BeanDefinition
// 的arguments屬性中
parseArguments(id, element.getChildNodes(), beanDefinition, parserContext);
} else {
// 如果當前屬性名不是上述的幾種特例情況丢郊,則會在當前標簽中獲取與屬性名同名的標簽的值盔沫,
// 如果該值為空医咨,則不進行處理
String value = element.getAttribute(property);
if (value != null) {
value = value.trim();
if (value.length() > 0) {
if ("registry".equals(property) && RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(value)) {
// 如果當前屬性名為registry,并且其值為N/A架诞,則為期生成一個空的
// RegistryConfig對象注入到當前BeanDefinition中
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(RegistryConfig.NO_AVAILABLE);
beanDefinition.getPropertyValues().addPropertyValue(beanProperty, registryConfig);
} else if ("provider".equals(property) || "registry".equals(property) || ("protocol".equals(property) && AbstractServiceConfig.class.isAssignableFrom(beanClass))) {
/**
* For 'provider' 'protocol' 'registry', keep literal value (should be id/name) and set the value to 'registryIds' 'providerIds' protocolIds'
* The following process should make sure each id refers to the corresponding instance, here's how to find the instance for different use cases:
* 1. Spring, check existing bean by id, see{@link ServiceBean#afterPropertiesSet()}; then try to use id to find configs defined in remote Config Center
* 2. API, directly use id to find configs defined in remote Config Center; if all config instances are defined locally, please use {@link ServiceConfig#setRegistries(List)}
*/
beanDefinition.getPropertyValues().addPropertyValue(beanProperty + "Ids", value);
} else {
Object reference;
if (isPrimitive(type)) {
// 如果當前屬性類型是基本數(shù)據(jù)類型拟淮,并且其值為默認值,
// 則將當前屬性設(shè)置為空
if ("async".equals(property) && "false".equals(value)
|| "timeout".equals(property) && "0".equals(value)
|| "delay".equals(property) && "0".equals(value)
|| "version".equals(property) && "0.0.0".equals(value)
|| "stat".equals(property) && "-1".equals(value)
|| "reliable".equals(property) && "false".equals(value)) {
// backward compatibility for the default value in old version's xsd
value = null;
}
reference = value;
} else if (ONRETURN.equals(property) || ONTHROW.equals(property) || ONINVOKE.equals(property)) {
// 如果當前屬性為上述幾種谴忧,則獲取該屬性所指定的bean名稱和方法名很泊,
// 將其設(shè)置到當前的BeanDefinition中
int index = value.lastIndexOf(".");
String ref = value.substring(0, index);
String method = value.substring(index + 1);
reference = new RuntimeBeanReference(ref);
beanDefinition.getPropertyValues().addPropertyValue(property + METHOD, method);
} else {
if ("ref".equals(property) && parserContext.getRegistry().containsBeanDefinition(value)) {
// 如果屬性名為ref,并且當前BeanDefinitionRegistry中包含有
// 該名稱的bean沾谓,則將該bean注入到當前BeanDefinition中
BeanDefinition refBean = parserContext.getRegistry().getBeanDefinition(value);
if (!refBean.isSingleton()) {
throw new IllegalStateException("The exported service ref " + value + " must be singleton! Please set the " + value + " bean scope to singleton, eg: <bean id=\"" + value + "\" scope=\"singleton\" ...>");
}
}
reference = new RuntimeBeanReference(value);
}
beanDefinition.getPropertyValues().addPropertyValue(beanProperty, reference);
}
}
}
}
}
}
// 對于那些在標簽中存在委造,但是在當前beanClass中不存在的屬性,dubbo會將其以鍵值對的形式
// 存入到當前BeanDefinition的parameters屬性中
NamedNodeMap attributes = element.getAttributes();
int len = attributes.getLength();
for (int i = 0; i < len; i++) {
Node node = attributes.item(i);
String name = node.getLocalName();
if (!props.contains(name)) {
if (parameters == null) {
parameters = new ManagedMap();
}
String value = node.getNodeValue();
parameters.put(name, new TypedStringValue(value, String.class));
}
}
if (parameters != null) {
beanDefinition.getPropertyValues().addPropertyValue("parameters", parameters);
}
return beanDefinition;
}
在上述解析過程中均驶,dubbo首先會為當前BeanDefinition生成一個名稱昏兆,然后判斷當前beanClass是否為ProtocolConfig
,ServiceBean
妇穴,ProviderConfig
和ConsumerConfig
中的一個爬虱,如果是,則會進行一定的特殊解析腾它。在特殊解析完成后饮潦,dubbo會獲取當前beanClass的所有屬性,然后在當前標簽中查找對應(yīng)的標簽值携狭,并將其設(shè)置到對應(yīng)的屬性中,最終完成所有屬性值的裝配回俐。
之后會調(diào)用AbstractCofig的標有@PostConstruct 注解的addIntoConfigManager
方法逛腿,將響應(yīng)的Config托給ConfigManager管理
/**
* Add {@link AbstractConfig instance} into {@link ConfigManager}
* <p>
* Current method will invoked by Spring or Java EE container automatically, or should be triggered manually.
*
* @see ConfigManager#addConfig(AbstractConfig)
* @since 2.7.5
*/
@PostConstruct
public void addIntoConfigManager() {
ApplicationModel.getConfigManager().addConfig(this);
}private final Map<String, Map<String, AbstractConfig>> configsCache = newMap();`
ConfigManager
中維護了一份配置緩存,存儲響應(yīng)的配置數(shù)據(jù)(我們本章關(guān)注的是dubbo:service 標簽的配置的導(dǎo)出的服務(wù)接口)
private final Map<String, Map<String, AbstractConfig>> configsCache = newMap();
最后通過從configsCache獲取serviceBean
仅颇,代碼如下:
protected <C extends AbstractConfig> Map<String, C> getConfigsMap(String configType) {
return (Map<String, C>) read(() -> configsCache.getOrDefault(configType, emptyMap()));
}
2.3 服務(wù)導(dǎo)出
我們繼續(xù)回到 2.1 章節(jié)单默,繼續(xù)了解 導(dǎo)出的詳細過程, ServiceBean 繼承自 ServiceConfig 忘瓦,ServiceConfig 本身帶了一個 ScheduledExecutorService 用于某個 bean 延遲啟動 (這個感覺類的對象消耗挺大的搁廓,因為每個 service bean 都帶有一個 ScheduledExecutorService 字段,而服務(wù)一般都會有多個)耕皮,我們在代碼 sc.export() 這句就會看到具體的實現(xiàn)境蜕。
// ServiceConfig#export()
@Override
public synchronized void export() {
if (!shouldExport()) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
// init serviceMetadata
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 延遲導(dǎo)出
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 最終調(diào)用這個方法
doExport();
}
exported();
}
// ServiceConfig#doExport()
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
}
// ServiceConfig#doExportUrls()
@SuppressWarnings({"unchecked", "rawtypes"})
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// URL 中 protocol屬性 zookeeper-> registry,??接下來會調(diào)用RegistryProtocol#export方法
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
通過調(diào)試 registryURLs 一個例子例如這樣 :
registry://172.20.xx.xx:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&metadata-type=remote&pid=7900&qos.port=22222®istry=zookeeper×tamp=1587026084220
可以知道前面的邏輯是獲取 protocol 信息凌停,獲取知道 某個 service 是通過什么 protocol 進行傳輸?shù)牧荒辏谑沁M入了這個方法??????又是很長的代碼 :
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
//這一段設(shè)置了一堆信息到 map中,見下圖
Map<String, String> map = new HashMap<String, String>();
省略.....
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
// 導(dǎo)出核心邏輯
// export service
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 加載 ConfiguratorFactory罚拟,并生成 Configurator 實例台诗,然后通過實例配置 url
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// 如果 scope = none完箩,則什么都不做
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// scope != remote,導(dǎo)出到本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local拉队,導(dǎo)出到遠程
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加載監(jiān)視器鏈接
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
// 將監(jiān)視器鏈接作為參數(shù)添加到 url 中
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// 省略無關(guān)代碼...(日志)
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//注意這里1字!使用 ProxyFactory 為服務(wù)提供類(ref)生成 Invoker (相當與 spring 中的 AOP 實現(xiàn))
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 導(dǎo)出服務(wù)粱快,并生成 Exporter
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注冊中心秩彤,僅導(dǎo)出服務(wù)
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
總結(jié)來說有以下幾個步驟:
- 封裝 map信息,示例如下:
- PROXY_FACTORY 生成 invoker
- 生成包裝類
- 調(diào)用 protocol 的 export 方法
上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式皆尔,分別如下:
- scope = none呐舔,不導(dǎo)出服務(wù)
- scope != remote,導(dǎo)出到本地
- scope != local慷蠕,導(dǎo)出到遠程
不管是導(dǎo)出到本地珊拼,還是遠程。進行服務(wù)導(dǎo)出之前流炕,均需要先創(chuàng)建 Invoker澎现,這是一個很重要的步驟。因此下面先來分析 Invoker 的創(chuàng)建過程每辟。
2.3.1 Invoker 創(chuàng)建過程
在 Dubbo 中剑辫,Invoker 是一個非常重要的模型。在服務(wù)提供端渠欺,以及服務(wù)引用端均會出現(xiàn) Invoker妹蔽。Dubbo 官方文檔中對 Invoker 進行了說明,這里引用一下挠将。
Invoker 是實體域胳岂,它是 Dubbo 的核心模型,其它模型都向它靠擾舔稀,或轉(zhuǎn)換成它乳丰,它代表一個可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用内贮,它有可能是一個本地的實現(xiàn)产园,也可能是一個遠程的實現(xiàn),也可能一個集群實現(xiàn)夜郁。
既然 Invoker 如此重要什燕,那么我們很有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 創(chuàng)建而來竞端,Dubbo 默認的 ProxyFactory 實現(xiàn)類是 JavassistProxyFactory秋冰。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創(chuàng)建過程婶熬。如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 為目標類創(chuàng)建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 創(chuàng)建匿名 Invoker 類對象剑勾,并實現(xiàn) doInvoke 方法埃撵。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調(diào)用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調(diào)用目標方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
如上虽另,JavassistProxyFactory 創(chuàng)建了一個繼承自 AbstractProxyInvoker 類的匿名對象暂刘,并覆寫了抽象方法 doInvoke谣拣。覆寫后的 doInvoke 邏輯比較簡單拴还,僅是將調(diào)用請求轉(zhuǎn)發(fā)給了 Wrapper 類的 invokeMethod 方法孝偎。Wrapper 用于“包裹”目標類渤昌,Wrapper 是一個抽象類,僅可通過 getWrapper(Class) 方法創(chuàng)建子類湖员。在創(chuàng)建 Wrapper 子類的過程中贫悄,子類代碼生成邏輯會對 getWrapper 方法傳入的 Class 對象進行解析,拿到諸如類方法娘摔,類成員變量等信息窄坦。以及生成 invokeMethod 方法代碼和其他一些方法代碼。代碼生成完畢后晰筛,通過 Javassist 生成 Class 對象嫡丙,最后再通過反射創(chuàng)建 Wrapper 實例。相關(guān)的代碼如下:
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) {
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key));
}
getWrapper 方法僅包含一些緩存操作邏輯读第,不難理解曙博。下面我們看一下 makeWrapper 方法。
private static Wrapper makeWrapper(Class<?> c) {
// 檢測 c 是否為基本類型怜瞒,若是則拋出異常
if (c.isPrimitive()) {
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
String name = c.getName();
ClassLoader cl = ClassUtils.getClassLoader(c);
// c1 用于存儲 setPropertyValue 方法代碼
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
// c2 用于存儲 getPropertyValue 方法代碼
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
// c3 用于存儲 invokeMethod 方法代碼
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
// 生成類型轉(zhuǎn)換代碼及異常捕捉代碼父泳,比如:
// DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
// pts 用于存儲成員變量名和類型
Map<String, Class<?>> pts = new HashMap<>();
// ms 用于存儲方法描述信息(可理解為方法簽名)及 Method 實例
Map<String, Method> ms = new LinkedHashMap<>();
// mns 為方法名列表
List<String> mns = new ArrayList<>();
// dmns 用于存儲“定義在當前類中的方法”的名稱
List<String> dmns = new ArrayList<>();
// --------------------------------? 分割線1 ?-------------------------------------
// 獲取 public 訪問級別的字段,并為所有字段生成條件判斷語句
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
// 忽略關(guān)鍵字 static 或 transient 修飾的變量
continue;
}
// 生成條件判斷及賦值語句吴汪,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
// 生成條件判斷及返回語句惠窄,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
// 存儲 <字段名, 字段類型> 鍵值對到 pts 中
pts.put(fn, ft);
}
// --------------------------------? 分割線2 ?-------------------------------------
Method[] methods = c.getMethods();
// 檢測 c 中是否包含在當前類中聲明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
//ignore Object's method.
if (m.getDeclaringClass() == Object.class) {
// 忽略 Object 中定義的方法
continue;
}
String mn = m.getName();
// 生成方法名判斷語句,比如:
// if ( "sayHello".equals( $2 )
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
// 生成“運行時傳入的參數(shù)數(shù)量與方法參數(shù)列表長度”判斷語句漾橙,比如:
// && $3.length == 2
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for (Method m2 : methods) {
// 檢測方法是否存在重載情況杆融,條件為:方法對象不同 && 方法名相同
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
// 對重載方法進行處理,考慮下面的方法:
// 1. void sayHello(Integer, String)
// 2. void sayHello(Integer, Integer)
// 方法名相同霜运,參數(shù)列表長度也相同脾歇,因此不能僅通過這兩項判斷兩個方法是否相等。
// 需要進一步判斷方法的參數(shù)類型
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成參數(shù)類型進行檢測代碼淘捡,比如:
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
// 添加 ) {藕各,完成方法判斷語句,此時生成的代碼可能如下(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
c3.append(" ) { ");
// 根據(jù)返回值類型生成目標方法調(diào)用語句
if (m.getReturnType() == Void.TYPE) {
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
} else {
// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
}
// 添加 }, 生成的代碼形如(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
//
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
// return null;
// }
c3.append(" }");
// 添加方法名到 mns 集合中
mns.add(mn);
// 檢測當前方法是否在 c 中被聲明的
if (m.getDeclaringClass() == c) {
// 若是焦除,則將當前方法名添加到 dmns 中
dmns.add(mn);
}
ms.put(ReflectUtils.getDesc(m), m);
}
// 添加異常捕捉語句
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
// 添加 NoSuchMethodException 異常拋出代碼
c3.append(" throw new ").append(NoSuchMethodException.class.getName()).append("(\"Not found method \\\"\"+$2+\"\\\" in class ").append(c.getName()).append(".\"); }");
// --------------------------------? 分割線3 ?-------------------------------------
// 處理 get/set 方法
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = entry.getValue();
// 匹配以 get 開頭的方法
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及返回語句激况,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 is/has/can 開頭的方法
} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及返回語句,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
// 匹配以 set 開頭的方法
} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及 setter 調(diào)用語句,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
pts.put(pn, pt);
}
}
// 添加 NoSuchPropertyException 異常拋出代碼
c1.append(" throw new ").append(NoSuchPropertyException.class.getName()).append("(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class ").append(c.getName()).append(".\"); }");
c2.append(" throw new ").append(NoSuchPropertyException.class.getName()).append("(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class ").append(c.getName()).append(".\"); }");
// --------------------------------? 分割線4 ?-------------------------------------
// make class
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
// 創(chuàng)建類生成器
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 設(shè)置類名及超類
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class);
// 添加默認構(gòu)造方法
cc.addDefaultConstructor();
// 添加字段
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
// 添加方法代碼
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
// 生成類
Class<?> wc = cc.toClass();
// 設(shè)置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
// 創(chuàng)建 Wrapper 實例
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
上面代碼很長乌逐,大家耐心看一下竭讳。我們在上面代碼中做了大量的注釋,并按功能對代碼進行了分塊黔帕,以幫助大家理解代碼邏輯代咸。下面對這段代碼進行講解。首先我們把目光移到分割線1之上的代碼成黄,這段代碼主要用于進行一些初始化操作呐芥。比如創(chuàng)建 c1、c2奋岁、c3 以及 pts思瘟、ms、mns 等變量闻伶,以及向 c1滨攻、c2、c3 中添加方法定義和類型轉(zhuǎn)換代碼蓝翰。接下來是分割線1到分割線2之間的代碼光绕,這段代碼用于為 public 級別的字段生成條件判斷取值與賦值代碼。這段代碼不是很難看懂畜份,就不多說了诞帐。繼續(xù)向下看,分割線2和分隔線3之間的代碼用于為定義在當前類中的方法生成判斷語句爆雹,和方法調(diào)用語句停蕉。因為需要對方法重載進行校驗,因此到這這段代碼看起來有點復(fù)雜钙态。不過耐心看一下慧起,也不是很難理解。接下來是分割線3和分隔線4之間的代碼册倒,這段代碼用于處理 getter蚓挤、setter 以及以 is/has/can 開頭的方法。處理方式是通過正則表達式獲取方法類型(get/set/is/...)驻子,以及屬性名灿意。之后為屬性名生成判斷語句,然后為方法生成調(diào)用語句拴孤。最后我們再來看一下分隔線4以下的代碼,這段代碼通過 ClassGenerator 為剛剛生成的代碼構(gòu)建 Class 類甲捏,并通過反射創(chuàng)建對象演熟。ClassGenerator 是 Dubbo 自己封裝的,該類的核心是 toClass() 的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過 javassist 構(gòu)建 Class芒粹。這里就不分析 toClass 方法了兄纺,大家請自行分析。
閱讀 Wrapper 類代碼需要對 javassist 框架有所了解化漆。關(guān)于 javassist估脆,大家如果不熟悉,請自行查閱資料座云,本節(jié)不打算介紹 javassist 相關(guān)內(nèi)容疙赠。
好了,關(guān)于 Wrapper 類生成過程就分析到這朦拖。如果大家看的不是很明白圃阳,可以單獨為 Wrapper 創(chuàng)建單元測試,然后單步調(diào)試璧帝。并將生成的代碼拷貝出來捍岳,格式化后再進行觀察和理解。
2.3.2 導(dǎo)出服務(wù)到本地
本節(jié)我們來看一下服務(wù)導(dǎo)出相關(guān)的代碼睬隶,按照代碼執(zhí)行順序锣夹,本節(jié)先來分析導(dǎo)出服務(wù)到本地的過程。相關(guān)代碼如下:
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) // 設(shè)置協(xié)議頭為 injvm
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// 創(chuàng)建 Invoker苏潜,并導(dǎo)出服務(wù)银萍,這里的 protocol 會在運行時調(diào)用 InjvmProtocol 的 export 方法
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
exportLocal 方法比較簡單,創(chuàng)建一個新的 URL 并將協(xié)議頭窖贤、主機名以及端口設(shè)置成新的值砖顷。然后創(chuàng)建 Invoker,并調(diào)用 InjvmProtocol 的 export 方法導(dǎo)出服務(wù)赃梧。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情滤蝠。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的 export 方法僅創(chuàng)建了一個 InjvmExporter授嘀,無其他邏輯物咳。到此導(dǎo)出服務(wù)到本地就分析完了,接下來蹄皱,我們繼續(xù)分析導(dǎo)出服務(wù)到遠程的過程览闰。
2.3.3 導(dǎo)出服務(wù)到遠程
與導(dǎo)出服務(wù)到本地相比,導(dǎo)出服務(wù)到遠程的過程要復(fù)雜不少巷折,其包含了服務(wù)導(dǎo)出與服務(wù)注冊兩個過程压鉴。這兩個過程涉及到了大量的調(diào)用,比較復(fù)雜锻拘。
下面開始分析油吭,我們把目光移動到 RegistryProtocol 的 export 方法上击蹲。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 獲取注冊中心 URL,以 zookeeper 注冊中心為例婉宰,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// 獲取已注冊的服務(wù)提供者 URL歌豺,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
URL providerUrl = getProviderUrl(originInvoker);
// 獲取訂閱 URL,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 創(chuàng)建監(jiān)聽器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
// 向注冊中心進行訂閱 override 數(shù)據(jù)
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// --------------------------------? 分割線1 ?-------------------------------------
// 導(dǎo)出服務(wù)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 向注冊中心注冊服務(wù)
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
在以上操作中心包,除了創(chuàng)建并返回 DestroyableExporter 沒什么難度外类咧,其他幾步操作都不是很簡單。這其中蟹腾,導(dǎo)出服務(wù)和注冊服務(wù)是本章要重點分析的邏輯痕惋。下面先來分析 doLocalExport 方法的邏輯,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
// 訪問緩存bounds岭佳,如果沒有創(chuàng)建并更新緩存bounds
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 創(chuàng)建 Invoker 為委托類對象
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
假設(shè)運行時協(xié)議為 dubbo血巍,此處的 protocol 變量會在運行時加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法珊随。所以述寡,接下來我們目光轉(zhuǎn)移到 DubboProtocol 的 export 方法上,相關(guān)分析如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 獲取服務(wù)標識叶洞,理解成服務(wù)坐標也行鲫凶。由服務(wù)組名,服務(wù)名衩辟,服務(wù)版本號以及端口組成螟炫。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 創(chuàng)建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對放入緩存中
exporterMap.put(key, exporter);
// 本地存根相關(guān)代碼
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// 省略日志打印代碼
}
}
// 啟動服務(wù)器
openServer(url);
// 優(yōu)化序列化
optimizeSerialization(url);
return exporter;
}
如上,我們重點關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,其他邏輯看不懂也沒關(guān)系,不影響理解服務(wù)導(dǎo)出過程宁否。另外悲立,DubboExporter 的代碼比較簡單坷檩,就不分析了。下面分析 openServer 方法。
private void openServer(URL url) {
// 獲取 host:port,并將其作為服務(wù)器實例的 key碗淌,用于標識當前的服務(wù)器實例
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 訪問緩存
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 創(chuàng)建服務(wù)器實例
serverMap.put(key, createServer(url));
}
}
} else {
// 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
server.reset(url);
}
}
}
如上抖锥,在同一臺機器上(單網(wǎng)卡)亿眠,同一個端口上僅允許啟動一個服務(wù)器實例。若某個端口上已有服務(wù)器實例磅废,此時則調(diào)用 reset 方法重置服務(wù)器的一些配置纳像。考慮到篇幅問題拯勉,關(guān)于服務(wù)器實例重置的代碼就不分析了竟趾。接下來分析服務(wù)器實例的創(chuàng)建過程耙考。如下:
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// 添加心跳檢測配置到 url 中
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 添加編碼解碼器參數(shù)
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 獲取 server 參數(shù),默認為 netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展潭兽,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 創(chuàng)建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 獲取 client 參數(shù),可指定 netty斗遏,mina
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 獲取所有的 Transporter 實現(xiàn)類名稱集合山卦,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測當前 Dubbo 所支持的 Transporter 實現(xiàn)類名稱列表中,
// 是否包含 client 所表示的 Transporter诵次,若不包含账蓉,則拋出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
如上,createServer 包含三個核心的邏輯逾一。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展铸本,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實例遵堵。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展箱玷,不存在也是拋出異常。兩次檢測操作所對應(yīng)的代碼比較直白了陌宿,無需多說锡足。但創(chuàng)建服務(wù)器的操作目前還不是很清晰,我們繼續(xù)往下看壳坪。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger舶得,默認為 HeaderExchanger。
// 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實例
return getExchanger(url).bind(url, handler);
}
上面代碼比較簡單爽蝴,就不多說了沐批。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 創(chuàng)建 HeaderExchangeServer 實例蝎亚,該方法包含了多個邏輯九孩,分別如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可颖对。該方法的代碼如下:
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數(shù)量大于1捻撑,則創(chuàng)建 ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應(yīng) Transporter 實例,并調(diào)用實例方法
return getTransporter().bind(url, handler);
}
如上缤底,getTransporter() 方法獲取的 Transporter 是在運行時動態(tài)創(chuàng)建的顾患,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類个唧。TransporterAdaptive 會在運行時根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter江解,默認為 NettyTransporter。下面我們繼續(xù)跟下去徙歼,這次分析的是 NettyTransporter 的 bind 方法犁河。
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
// 創(chuàng)建 NettyServer
return new NettyServer(url, handler);
}
這里僅有一句創(chuàng)建 NettyServer 的代碼鳖枕,無需多說,我們繼續(xù)向下看桨螺。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 調(diào)用父類構(gòu)造方法
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 調(diào)用父類構(gòu)造方法宾符,這里就不用跟進去了,沒什么復(fù)雜邏輯
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 獲取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 設(shè)置 ip 為 0.0.0.0
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 獲取最大可接受連接數(shù)
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 調(diào)用模板方法 doOpen 啟動服務(wù)器
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
上面代碼多為賦值代碼灭翔,不需要多講魏烫。我們重點關(guān)注 doOpen 抽象方法,該方法需要子類實現(xiàn)肝箱。下面回到 NettyServer 中哄褒。
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 創(chuàng)建 boss 和 worker 線程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 創(chuàng)建 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("backlog", getUrl().getPositiveParameter(BACKLOG_KEY, Constants.DEFAULT_BACKLOG));
// 設(shè)置 PipelineFactory
bootstrap.setPipelineFactory(() -> {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
});
// 綁定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
以上就是 NettyServer 創(chuàng)建的過程,熟悉netty的應(yīng)該都比較容易理解煌张,dubbo 2.7.6使用的 NettyServer 是基于 netty 4.1.35.Final 版本實現(xiàn)的呐赡。
本節(jié)內(nèi)容先到這里,接下來分析服務(wù)導(dǎo)出的另一塊邏輯 — 服務(wù)注冊骏融。
2.3.4 服務(wù)注冊
節(jié)我們來分析服務(wù)注冊過程链嘀,服務(wù)注冊操作對于 Dubbo 來說不是必需的,通過服務(wù)直連的方式就可以繞過注冊中心档玻。但通常我們不會這么做管闷,直連方式不利于服務(wù)治理,僅推薦在測試服務(wù)時使用窃肠。對于 Dubbo 來說包个,注冊中心雖不是必需,但卻是必要的冤留。因此碧囊,關(guān)于注冊中心以及服務(wù)注冊相關(guān)邏輯,我們也需要搞懂纤怒。
本節(jié)內(nèi)容以 Zookeeper 注冊中心作為分析目標糯而,其他類型注冊中心大家可自行分析。下面從服務(wù)注冊的入口方法開始分析泊窘,我們把目光再次移到 RegistryProtocol 的 export 方法上熄驼。如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ${導(dǎo)出服務(wù)}
// 省略其他代碼
if (register) {
// 注冊服務(wù)
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// 省略部分代碼
notifyExport(exporter);
return new DestroyableExporter<>(exporter);
}
進入register代碼:
private void register(URL registryUrl, URL registeredProviderUrl) {
// 獲取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注冊服務(wù)
registry.register(registeredProviderUrl);
}
register 方法包含兩步操作,第一步是獲取注冊中心實例烘豹,第二步是向注冊中心注冊服務(wù)瓜贾。接下來分兩節(jié)內(nèi)容對這兩步操作進行分析。
2.3.4.1 創(chuàng)建注冊中心
本節(jié)內(nèi)容以 Zookeeper 注冊中心為例進行分析携悯。下面先來看一下 getRegistry 方法的源碼祭芦,這個方法由 AbstractRegistryFactory 實現(xiàn)。如下:
public Registry getRegistry(URL url) {
if (destroyed.get()) {
return DEFAULT_NOP_REGISTRY;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 訪問緩存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 緩存未命中憔鬼,通過 spi/ioc 創(chuàng)建 Registry 實例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 寫入緩存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
如上龟劲,getRegistry 方法先訪問緩存胃夏,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry,然后寫入緩存昌跌。這里的 createRegistry 是一個模板方法仰禀,由具體的子類實現(xiàn)。因此蚕愤,下面我們到 ZookeeperRegistryFactory 中探究一番悼瘾。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
// zookeeperTransporter 由 SPI 在運行時注入,類型為 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 創(chuàng)建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
ZookeeperRegistryFactory 的 createRegistry 方法僅包含一句代碼审胸,無需解釋,繼續(xù)跟下去卸勺。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名砂沛,默認為 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 創(chuàng)建 Zookeeper 客戶端,默認為 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加狀態(tài)監(jiān)聽器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
在上面的代碼代碼中曙求,我們重點關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用碍庵,這個方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端悟狱,意味著注冊中心的創(chuàng)建過程就結(jié)束了静浴。接下來,再來分析一下 Zookeeper 客戶端的創(chuàng)建過程挤渐。
前面說過苹享,這里的 zookeeperTransporter 類型為自適應(yīng)拓展類,因此 connect 方法會在被調(diào)用時決定加載什么類型的 ZookeeperTransporter 拓展浴麻,默認為 CuratorZookeeperTransporter得问。下面我們到 CuratorZookeeperTransporter 中看一看。
public ZookeeperClient connect(URL url) {
// 創(chuàng)建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
繼續(xù)向下看软免。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 創(chuàng)建 CuratorFramework 構(gòu)造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 構(gòu)建 CuratorFramework 實例
client = builder.build();
// 添加監(jiān)聽器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 啟動客戶端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動 CuratorFramework 實例宫纬。以上基本上都是 Curator 框架的代碼,大家如果對 Curator 框架不是很了解膏萧,可以參考 Curator 官方文檔漓骚。
本節(jié)分析了 ZookeeperRegistry 實例的創(chuàng)建過程,整個過程并不是很復(fù)雜榛泛。大家在看完分析后蝌蹂,可以自行調(diào)試,以加深理解〔芟牵現(xiàn)在注冊中心實例創(chuàng)建好了叉信,接下來要做的事情是向注冊中心注冊服務(wù),我們繼續(xù)往下看艘希。
2.3.4.2 節(jié)點創(chuàng)建
以 Zookeeper 為例硼身,所謂的服務(wù)注冊硅急,本質(zhì)上是將服務(wù)配置數(shù)據(jù)寫入到 Zookeeper 的某個路徑的節(jié)點下。為了讓大家有一個直觀的了解佳遂,下面我們將 Dubbo 的 demo 跑起來营袜,然后通過 Zookeeper 可視化客戶端 ZooInspector 查看節(jié)點數(shù)據(jù)。如下:
從上圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個服務(wù)對應(yīng)的配置信息(存儲在 URL 中)最終被注冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節(jié)點下丑罪。搞懂了服務(wù)注冊的本質(zhì)荚板,那么接下來我們就可以去閱讀服務(wù)注冊的代碼了。服務(wù)注冊的接口為 register(URL)吩屹,這個方法定義在 FailbackRegistry 抽象類中跪另。代碼如下:
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 模板方法,由子類實現(xiàn)
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 獲取 check 參數(shù)煤搜,若 check = true 將會直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
} else {
logger.error("Failed to register");
}
// 記錄注冊失敗的鏈接
failedRegistered.add(url);
}
}
protected abstract void doRegister(URL url);
如上免绿,我們重點關(guān)注 doRegister 方法調(diào)用即可,其他的代碼先忽略擦盾。doRegister 方法是一個模板方法嘲驾,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進行分析。如下:
protected void doRegister(URL url) {
try {
// 通過 Zookeeper 客戶端創(chuàng)建節(jié)點迹卢,節(jié)點路徑由 toUrlPath 方法生成辽故,路徑格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點腐碱。節(jié)點路徑由 toUrlPath 方法生成誊垢,該方法邏輯不難理解,就不分析了症见。接下來分析 create 方法彤枢,如下:
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要創(chuàng)建的節(jié)點類型非臨時節(jié)點,那么這里要檢測節(jié)點是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 遞歸創(chuàng)建上一級路徑
create(path.substring(0, i), false);
}
// 根據(jù) ephemeral 的值創(chuàng)建臨時或持久節(jié)點
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
上面方法先是通過遞歸創(chuàng)建當前節(jié)點的上一級路徑筒饰,然后再根據(jù) ephemeral 的值決定創(chuàng)建臨時還是持久節(jié)點缴啡。createEphemeral 和 createPersistent 這兩個方法都比較簡單,這里簡單分析其中的一個瓷们。如下:
public void createEphemeral(String path) {
try {
// 通過 Curator 框架創(chuàng)建節(jié)點
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
好了业栅,到此關(guān)于服務(wù)注冊的過程就分析完了。整個過程可簡單總結(jié)為:先創(chuàng)建注冊中心實例谬晕,之后再通過注冊中心實例注冊服務(wù)碘裕。
3.總結(jié)
本篇文章詳細分析了 Dubbo 服務(wù)導(dǎo)出過程,包括配置檢測攒钳,URL 組裝帮孔,Invoker 創(chuàng)建過程、導(dǎo)出服務(wù)以及注冊服務(wù)等等。篇幅比較大文兢,需要大家耐心閱讀晤斩。本篇文章先就到這,如果文章有不妥錯誤之處姆坚,希望大家能夠進行反饋或修正澳泵。
4.參考資料
本文參考于Dubbo官網(wǎng),詳情以官網(wǎng)最新文檔為準兼呵。