目錄
dubbo 拓展機(jī)制 SPI
dubbo 自適應(yīng)拓展機(jī)制
dubbo 服務(wù)導(dǎo)出
dubbo 服務(wù)引用
dubbo 服務(wù)字典
dubbo 服務(wù)路由
dubbo 集群容錯策略
dubbo 負(fù)載均衡
dubbo 服務(wù)調(diào)用過程
Dubbo 服務(wù)導(dǎo)出過程始于 Spring 容器發(fā)布刷新事件咐扭,Dubbo 在接收到事件后燃观,會立即執(zhí)行服務(wù)導(dǎo)出邏輯。整個邏輯大致可分為三個部分嘱兼,第一部分是前置工作,主要用于檢查參數(shù)情屹,組裝 URL邻遏。第二部分是導(dǎo)出服務(wù),包含導(dǎo)出服務(wù)到本地 (JVM)舌狗,和導(dǎo)出服務(wù)到遠(yuǎn)程兩個過程。第三部分是向注冊中心注冊服務(wù)扔水,用于服務(wù)發(fā)現(xiàn)痛侍。
ServiceBean是整個服務(wù)導(dǎo)出的核心類,它實現(xiàn)了
- InitializingBean:從applicationContext中獲取如protocol魔市,module等配置信息主届,并且在注冊監(jiān)聽上下文刷新的事件赵哲,失敗時立即進(jìn)行服務(wù)導(dǎo)出服務(wù)。
- DisposableBean:bean的摧毀(2.7.3版本中已經(jīng)是空實現(xiàn)了君丁,因為前面已經(jīng)注冊了掛鉤)枫夺。
- ApplicationContextAware:保存applicationContext對象,注冊掛鉤當(dāng) jvm 關(guān)閉時關(guān)閉所有的鏈接以及摧毀已經(jīng)注冊了的 url 地址谈截,注冊監(jiān)聽上下文刷新的事件筷屡。
- ApplicationListener<ContextRefreshedEvent>:監(jiān)聽上下文的刷新,判斷是否需要導(dǎo)出服務(wù)簸喂。
- BeanNameAware:設(shè)置 beanName毙死。
- ApplicationEventPublisherAware:用于發(fā)布事件,當(dāng)導(dǎo)出服務(wù)完成時喻鳄,發(fā)布一個ServiceBeanExportedEvent事件扼倘,dubbo監(jiān)聽到這個事件后會查看這個事件包含的類是否在本地調(diào)用中,如果在就立刻執(zhí)行服務(wù)引入除呵。
等接口再菊。
大致流程:
首先在ServiceBean這個類初始化之前,將自己作為ApplicationListener添加到容器中颜曾,然后在容器啟動完成時纠拔,監(jiān)聽容器刷新完成的事件,也就是ServiceBean 的 onApplicationEvent方法泛豪。onApplicationEvent 是一個事件響應(yīng)方法稠诲,該方法會在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
// 保存 applicationContext,并且在 jvm 關(guān)閉時,刪除所有的register service和關(guān)閉所有的連接
SpringExtensionFactory.addApplicationContext(applicationContext);
// 當(dāng)上下文狀態(tài)變化時震嫉,添加監(jiān)聽事件
supportedApplicationListener = addApplicationListener(applicationContext, this);
}
@Override
/**
* 監(jiān)聽ContextRefreshedEvent事件,當(dāng)所有的bean都初始化完成并被成功裝載或后會觸發(fā)該事件
*/
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
// 導(dǎo)出服務(wù)
export();
}
}
代碼比較簡單劝萤,繼續(xù)向下看export方法
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
// 發(fā)布事件,spring攔截事件調(diào)用 referenceBean 的 get() 進(jìn)行服務(wù)引入
publishExportEvent();
}
這里主要分析的是super.export方法慎璧。
public synchronized void export() {
// 檢測和修改配置
checkAndUpdateSubConfigs();
// 是否顯示的使用AbstractServiceBuilder設(shè)置export床嫌,
// 否則使用<dubbo:provider export="" />的配置
if (!shouldExport()) {
return;
}
// 是否顯示的使用AbstractServiceBuilder設(shè)置delay,
// 否則使用<dubbo:provider delay="" />的配置
if (shouldDelay()) {
// 延遲發(fā)布服務(wù)
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 發(fā)布服務(wù)
doExport();
}
}
繼續(xù)看doExport方法炸卑,該方法中先是做了一些狀態(tài)判斷既鞠,最主要的是其中的doExportUrls方法
private void doExportUrls() {
// 多注冊中心組裝注冊中心的url
List<URL> registryURLs = loadRegistries(true);
// 多協(xié)議
for (ProtocolConfig protocolConfig : protocols) {
// 獲取protocol中配置的contextpath,缺省獲取provider中的contextpath
// group/(contextpath/path或path):version
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// 服務(wù)提供者模型
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
// 保存pathKey和providerModel的映射關(guān)系
ApplicationModel.initProviderModel(pathKey, providerModel);
// 組裝 URL
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
首先看看loadRegistries方法
loadRegistries 方法主要包含如下的邏輯:
1.檢測是否存在注冊中心配置類盖文,不存在則拋出異常
2.構(gòu)建參數(shù)映射集合,也就是 map
3.構(gòu)建注冊中心鏈接列表
4.遍歷鏈接列表蚯姆,并根據(jù)條件決定是否將其添加到 registryList 中
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
// 若 address 為空五续,則將其設(shè)為 0.0.0.0
address = ANYHOST_VALUE;
}
// address不為n/a
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// 添加 ApplicationConfig 中的字段信息到 map 中
appendParameters(map, application);
// 添加 RegistryConfig 字段信息到 map 中
appendParameters(map, config);
// 添加 path洒敏,protocol 等信息到 map 中
map.put(PATH_KEY, RegistryService.class.getName());
appendRuntimeParameters(map);
if (!map.containsKey(PROTOCOL_KEY)) {
map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
}
// 解析得到 URL 列表,address 可能包含多個注冊中心 ip疙驾,
// 因此解析得到的是一個 URL 列表
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = URLBuilder.from(url)
.addParameter(REGISTRY_KEY, url.getProtocol())
// 將 URL 協(xié)議頭設(shè)置為 registry
.setProtocol(REGISTRY_PROTOCOL)
.build();
// 通過判斷條件凶伙,決定是否添加 url 到 registryList 中,條件如下:
// (服務(wù)提供者 && register = true 或 null)
// || (非服務(wù)提供者 && subscribe = true 或 null)
if ((provider && url.getParameter(REGISTER_KEY, true))
|| (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
回到doExportUrls中它碎,在通過注冊中心配置了對應(yīng)的URL之后函荣,就是通過doExportUrlsFor1Protocol方法在各個協(xié)議的基礎(chǔ)上向所有的注冊中心注冊服務(wù)。
URL 是 Dubbo 配置的載體扳肛,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 默認(rèn)使用dubbo協(xié)議
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
// 添加 side傻挂、版本、時間戳以及進(jìn)程號等信息到 map 中
map.put(SIDE_KEY, PROVIDER_SIDE);
// 添加時間戳挖息,dubbo 版本金拒,pid等運行時參數(shù)
appendRuntimeParameters(map);
// 通過反射將對象的字段信息添加到 map 中
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標(biāo)簽的配置信息
// 這段代碼用于添加 Callback 配置到 map 中
if (CollectionUtils.isNotEmpty(methods)) {
for (MethodConfig method : methods) {
// 添加 MethodConfig 對象的字段信息到 map 中套腹,鍵 = 方法名.屬性名绪抛。
// 比如存儲 <dubbo:method name="sayHello" retries="2"> 對應(yīng)的 MethodConfig,
// 鍵 = sayHello.retries电禀,map = {"sayHello.retries": 2, "xxx": "yyy"}
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 檢測 MethodConfig retry 是否為 false幢码,若是,則設(shè)置重試次數(shù)為0
if (Boolean.FALSE.toString().equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 獲取 ArgumentConfig 列表
// 設(shè)置方法的參數(shù)和值到map中
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
// 檢測 ArgumentConfig 中的 type 屬性與方法參數(shù)列表中的參數(shù)名稱是否一致尖飞,不一致則拋出異常
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
// 添加 ArgumentConfig 字段信息到 map 中症副,
// 鍵前綴 = 方法名.index,比如:
// map = {"sayHello.3": true}
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
// 從參數(shù)類型列表中查找類型名稱為 argument.type 的參數(shù)
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 檢測 generic 是否為 "true"葫松,并根據(jù)檢測結(jié)果向 map 中添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
// 為接口生成包裹類 Wrapper瓦糕,Wrapper 中包含了接口的詳細(xì)信息,比如接口方法名數(shù)組腋么,字段信息等
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// 將逗號作為分隔符連接方法名咕娄,并將連接后的字符串放入 map 中
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 添加 token 到 map 中
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
// 生成隨機(jī)token
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
// export service
// 獲取 host 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 組裝 URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 加載 ConfiguratorFactory,并生成 Configurator 實例珊擂,然后通過實例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
// 當(dāng)scope為none時圣勒,不導(dǎo)出服務(wù)
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// scope != remote,導(dǎo)出到本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// scope != local摧扇,導(dǎo)出到遠(yuǎn)程
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
// dynamic配置圣贸,是否開啟動態(tài)注冊
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加載監(jiān)視器鏈接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 將監(jiān)視器鏈接作為參數(shù)添加到 url 中
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 為服務(wù)提供類(ref)生成 Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 導(dǎo)出服務(wù),并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
// 不存在注冊中心扛稽,僅導(dǎo)出服務(wù)
else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
* 2.7 以后精簡url中的配置項
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
代碼很長主要分成兩部分來看
- getWrapper對類生成代理吁峻,當(dāng)服務(wù)調(diào)用方調(diào)用方法的時候,會首先調(diào)用代理類,代理類再調(diào)用對應(yīng)的方法
- 執(zhí)行服務(wù)導(dǎo)出和服務(wù)注冊
1.getWrapper
public static Wrapper getWrapper(Class<?> c) {
// can not wrapper on dynamic class.
while (ClassGenerator.isDynamicClass(c))
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
通過makeWrapper方法生成代理類用含,然后保存代理類到WRAPPER_MAP緩存中
private static Wrapper makeWrapper(Class<?> c) {
// 檢測 c 是否為基本類型矮慕,若是則拋出異常
if (c.isPrimitive()) {
throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
}
String name = c.getName();
ClassLoader cl = ClassUtils.getClassLoader(c);
// c1 用于存儲 setPropertyValue 方法代碼
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
// c2 用于存儲 getPropertyValue 方法代碼
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
// c3 用于存儲 invokeMethod 方法代碼
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
// 生成類型轉(zhuǎn)換代碼及異常捕捉代碼,比如:
// DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
// pts 用于存儲成員變量名和類型
Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
// ms 用于存儲方法描述信息(可理解為方法簽名)及 Method 實例
Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
// mns 為方法名列表
List<String> mns = new ArrayList<>(); // method names.
// dmns 用于存儲“定義在當(dāng)前類中的方法”的名稱
List<String> dmns = new ArrayList<>(); // declaring method names.
// get all public field.
// 獲取 public 訪問級別的字段啄骇,并為所有字段生成條件判斷語句
for (Field f : c.getFields()) {
String fn = f.getName();
Class<?> ft = f.getType();
// 忽略關(guān)鍵字 static 或 transient 修飾的變量
if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
continue;
}
// 生成條件判斷及賦值語句痴鳄,比如:
// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
// 生成條件判斷及返回語句,比如:
// if( $2.equals("name") ) { return ($w)w.name; }
c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
// 存儲 <字段名, 字段類型> 鍵值對到 pts 中
pts.put(fn, ft);
}
Method[] methods = c.getMethods();
// get all public method.
// 檢測 c 中是否包含在當(dāng)前類中聲明的方法
boolean hasMethod = hasMethods(methods);
if (hasMethod) {
c3.append(" try{");
for (Method m : methods) {
//ignore Object's method.
// 忽略 Object 中定義的方法
if (m.getDeclaringClass() == Object.class) {
continue;
}
String mn = m.getName();
// 生成方法名判斷語句缸夹,比如:
// if ( "sayHello".equals( $2 )
c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
int len = m.getParameterTypes().length;
// 生成“運行時傳入的參數(shù)數(shù)量與方法參數(shù)列表長度”判斷語句痪寻,比如:
// && $3.length == 2
c3.append(" && ").append(" $3.length == ").append(len);
boolean override = false;
for (Method m2 : methods) {
// 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同
if (m != m2 && m.getName().equals(m2.getName())) {
override = true;
break;
}
}
// 對重載方法進(jìn)行處理虽惭,考慮下面的方法:
// 1. void sayHello(Integer, String)
// 2. void sayHello(Integer, Integer)
// 方法名相同橡类,參數(shù)列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等趟妥。
// 需要進(jìn)一步判斷方法的參數(shù)類型
if (override) {
if (len > 0) {
for (int l = 0; l < len; l++) {
// 生成參數(shù)類型進(jìn)行檢測代碼猫态,比如:
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")
c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
.append(m.getParameterTypes()[l].getName()).append("\")");
}
}
}
// 添加 ) {,完成方法判斷語句披摄,此時生成的代碼可能如下(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
c3.append(" ) { ");
// 根據(jù)返回值類型生成目標(biāo)方法調(diào)用語句
if (m.getReturnType() == Void.TYPE) {
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
} else {
// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
}
// 添加 }, 生成的代碼形如(已格式化):
// if ("sayHello".equals($2)
// && $3.length == 2
// && $3[0].getName().equals("java.lang.Integer")
// && $3[1].getName().equals("java.lang.String")) {
//
// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
// return null;
// }
c3.append(" }");
// 添加方法名到 mns 集合中
mns.add(mn);
// 檢測當(dāng)前方法是否在 c 中被聲明的
if (m.getDeclaringClass() == c) {
// 若是亲雪,則將當(dāng)前方法名添加到 dmns 中
dmns.add(mn);
}
ms.put(ReflectUtils.getDesc(m), m);
}
// 添加異常捕捉語句
c3.append(" } catch(Throwable e) { ");
c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");
c3.append(" }");
}
// 添加 NoSuchMethodException 異常拋出代碼
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
// deal with get/set method.
Matcher matcher;
for (Map.Entry<String, Method> entry : ms.entrySet()) {
String md = entry.getKey();
Method method = entry.getValue();
// 匹配以 get 開頭的方法
if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
// 獲取屬性名
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及返回語句,示例如下:
// if( $2.equals("name") ) { return ($w).w.getName(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
}
// 匹配以 is/has/can 開頭的方法
else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及返回語句疚膊,示例如下:
// if( $2.equals("dream") ) { return ($w).w.hasDream(); }
c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
pts.put(pn, method.getReturnType());
}
// 匹配以 set 開頭的方法
else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
Class<?> pt = method.getParameterTypes()[0];
String pn = propertyName(matcher.group(1));
// 生成屬性判斷以及 setter 調(diào)用語句义辕,示例如下:
// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
pts.put(pn, pt);
}
}
// 添加 NoSuchPropertyException 異常拋出代碼
c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
// make class
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
// 創(chuàng)建類生成器
ClassGenerator cc = ClassGenerator.newInstance(cl);
// 設(shè)置類名及超類
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
cc.setSuperClass(Wrapper.class);
// 添加默認(rèn)構(gòu)造方法
cc.addDefaultConstructor();
// 添加字段
cc.addField("public static String[] pns;"); // property name array.
cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
cc.addField("public static String[] mns;"); // all method name array.
cc.addField("public static String[] dmns;"); // declared method name array.
for (int i = 0, len = ms.size(); i < len; i++) {
cc.addField("public static Class[] mts" + i + ";");
}
// 添加方法代碼
cc.addMethod("public String[] getPropertyNames(){ return pns; }");
cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
cc.addMethod("public String[] getMethodNames(){ return mns; }");
cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
cc.addMethod(c1.toString());
cc.addMethod(c2.toString());
cc.addMethod(c3.toString());
try {
// 生成類
Class<?> wc = cc.toClass();
// setup static field.
// 設(shè)置字段值
wc.getField("pts").set(null, pts);
wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
wc.getField("mns").set(null, mns.toArray(new String[0]));
wc.getField("dmns").set(null, dmns.toArray(new String[0]));
int ix = 0;
for (Method m : ms.values()) {
wc.getField("mts" + ix++).set(null, m.getParameterTypes());
}
// 創(chuàng)建 Wrapper 實例
return (Wrapper) wc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Throwable e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
cc.release();
ms.clear();
mns.clear();
dmns.clear();
}
}
例如源碼demo中的DemoService生成的wrapper類如下:
public class Wrapper0
extends Wrapper
implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
public String[] getPropertyNames() {
return pns;
}
public boolean hasProperty(String string) {
return pts.containsKey(string);
}
public Class getPropertyType(String string) {
return (Class)pts.get(string);
}
public String[] getMethodNames() {
return mns;
}
public String[] getDeclaredMethodNames() {
return dmns;
}
public void setPropertyValue(Object object, String string, Object object2) {
try {
DemoService demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
}
public Object getPropertyValue(Object object, String string) {
try {
DemoService demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
}
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.DemoService.").toString());
}
}
2.導(dǎo)出服務(wù)到本地
根據(jù)執(zhí)行順序先來看看exportLocal
private void exportLocal(URL url) {
// 設(shè)置協(xié)議頭為 injvm
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// 創(chuàng)建 Invoker,并導(dǎo)出服務(wù)寓盗,這里的 protocol 會在運行時調(diào)用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
調(diào)用的是InjvmProtocol的實現(xiàn)灌砖,把生成的invoker保存在AbstractExporter中,當(dāng)調(diào)用invoker的doInvoke方法時會通過wrapper類調(diào)用具體實現(xiàn)類的方法
3.導(dǎo)出服務(wù)到遠(yuǎn)程
Invoker 是由 ProxyFactory 創(chuàng)建而來傀蚌。ProxyFactory有多個實現(xiàn)類基显,那么這里使用的是哪個實現(xiàn)類呢?答案就在前文的創(chuàng)建自適應(yīng)類中善炫,查看前文生成的ProxyFactory自適應(yīng)類撩幽,發(fā)現(xiàn)是根據(jù)URL的proxy參數(shù)獲取對應(yīng)的實現(xiàn)類,沒有配置則默認(rèn)取JavassistProxyFactory箩艺。下面到 JavassistProxyFactory 代碼中窜醉,探索 Invoker 的創(chuàng)建過程。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// 為目標(biāo)類創(chuàng)建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 創(chuàng)建匿名 Invoker 類對象艺谆,并實現(xiàn) doInvoke 方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調(diào)用 Wrapper 的 invokeMethod 方法榨惰,invokeMethod 最終會調(diào)用目標(biāo)方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
這里依然是通過getWrapper生成代理類,然后再使用AbstractProxyInvoker包裝一下静汤,AbstractProxyInvoker也就是直接調(diào)用代理類的invokeMethod方法琅催。
回到doExportUrlsFor1Protocol中居凶,接下來是導(dǎo)出服務(wù)到遠(yuǎn)程
Exporter<?> exporter = protocol.export(wrapperInvoker);
其中這里用到了前面提到的protocol的自適應(yīng)生成類,生成類export方法代碼例子如下
public Exporter export(Invoker invoker) throws RpcException {
String string;
if (invoker == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
}
URL uRL = invoker.getUrl();
String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (").append(uRL.toString()).append(") use keys([protocol])").toString());
}
Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
return protocol.export(invoker);
}
從這里看出會根據(jù)wrapperInvoker中的url屬性的protocol元素路由到具體的實現(xiàn)類恢暖。而在前面的loadRegistries方法會把url的protocol設(shè)置為registry排监。所以接下來就到了RegistryProtocol的export方法進(jìn)行遠(yuǎn)程發(fā)布狰右,接下來就看這個export方法(PS:這里從自適應(yīng)類之后是先到ProtocolListenerWrapper然后到ProtocolFilterWrapper杰捂,因為它們兩是protocol這個類的包裝類,但是在provider端export方法什么也不會做棋蚌,所以這里直接分析RegistryProtocol的export)嫁佳。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 獲取注冊中心 URL,以 zookeeper 注冊中心為例谷暮,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
// 獲取已注冊的服務(wù)提供者 URL蒿往,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
// 獲取訂閱 URL,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
// 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
// 創(chuàng)建監(jiān)聽器
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 根據(jù)配置修改providerUrl湿弦,
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
先來看下doLocalExport方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
假設(shè)運行時協(xié)議為 dubbo瓤漏,此處的 protocol 變量會在運行時加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法颊埃。所以蔬充,接下來目光轉(zhuǎn)移到 DubboProtocol 的 export 方法上,相關(guān)分析如下
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
// 獲取服務(wù)標(biāo)識班利,理解成服務(wù)坐標(biāo)也行饥漫。由服務(wù)組名,服務(wù)名罗标,服務(wù)版本號以及端口組成庸队。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 創(chuàng)建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對放入緩存中
exporterMap.put(key, exporter);
//export an stub service for dispatching event
// 本地存根相關(guān)代碼
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 啟動服務(wù)器
openServer(url);
// 優(yōu)化序列化
optimizeSerialization(url);
return exporter;
}
重點看看openServer方法,別的邏輯不理解也不影響理解服務(wù)導(dǎo)出的流程
private void openServer(URL url) {
// find server.
// 獲取 host:port闯割,并將其作為服務(wù)器實例的 key彻消,用于標(biāo)識當(dāng)前的服務(wù)器實例
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// 訪問緩存
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 創(chuàng)建服務(wù)器實例
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
// 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
server.reset(url);
}
}
}
在同一臺機(jī)器上(單網(wǎng)卡)宙拉,同一個端口上僅允許啟動一個服務(wù)器實例宾尚。若某個端口上已有服務(wù)器實例,此時則調(diào)用 reset 方法重置服務(wù)器的一些配置鼓黔。接下來看看createServer央勒。
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
// 添加心跳檢測配置到 url 中
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
// 添加編碼解碼器參數(shù)
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
// 獲取 server 參數(shù),默認(rèn)為 netty
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展澳化,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 創(chuàng)建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 獲取 client 參數(shù)崔步,可指定 netty,mina
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 獲取所有的 Transporter 實現(xiàn)類名稱集合缎谷,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測當(dāng)前 Dubbo 所支持的 Transporter 實現(xiàn)類名稱列表中井濒,
// 是否包含 client 所表示的 Transporter灶似,若不包含,則拋出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
createServer 包含三個核心的邏輯瑞你。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展酪惭,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實例者甲。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展春感,不存在也是拋出異常。
接下來繼續(xù)看bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// 獲取 Exchanger虏缸,默認(rèn)為 HeaderExchanger鲫懒。
// 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實例
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
這里又是通過dubbo SPI加載Exchanger的實現(xiàn)類,默認(rèn)實現(xiàn)是HeaderExchanger刽辙。接下來繼續(xù)看HeaderExchanger的bind方法
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
這里只有一行代碼窥岩,但是包含了三個邏輯:
- new HeaderExchangeHandler(handler),
- new DecodeHandler(new HeaderExchangeHandler(handler))
- Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
僅需關(guān)心 Transporters 的 bind 方法邏輯宰缤。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數(shù)量大于1颂翼,則創(chuàng)建 ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應(yīng) Transporter 實例,并調(diào)用實例方法
return getTransporter().bind(url, handler);
}
自適應(yīng)拓展加載 Transporter慨灭,默認(rèn)為 NettyTransporter
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 創(chuàng)建 NettyServer
return new NettyServer(url, listener);
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
接下來看看ChannelHandlers.wrap朦乏,這個方法很有意思,是dubbo的線程派發(fā)模型的實現(xiàn)缘挑。
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 一層又一層的裝飾器集歇,只關(guān)注最里面的dispatch方法
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
這里默認(rèn)的自適應(yīng)類是AllDispatcher,在后面的服務(wù)調(diào)用過程语淘,將會詳細(xì)講解一下诲宇。回到NettyServer惶翻,這里調(diào)用了父類AbstractServer的構(gòu)造方法姑蓝。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 獲取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 設(shè)置 ip 為 0.0.0.0
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 獲取最大可接受連接數(shù)
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 調(diào)用模板方法 doOpen 啟動服務(wù)器
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
doOpen方法由子類實現(xiàn),下面回到nettyservice中
protected void doOpen() throws Throwable {
// 創(chuàng)建 ServerBootstrap
bootstrap = new ServerBootstrap();
// 創(chuàng)建 boss 和 worker 線程池
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
// 設(shè)置 PipelineFactory
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
// 綁定到指定的 ip 和端口上
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
到此就已經(jīng)完成了在指定的端口開啟netty服務(wù)的過程吕粗,以及配置了一系列層層包裝的ChannelHandler纺荧。
回到RegistryProtocol的export,繼續(xù)看服務(wù)注冊和數(shù)據(jù)訂閱的邏輯
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
.......
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注冊服務(wù)
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
public void register(URL registryUrl, URL registeredProviderUrl) {
// 獲取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注冊服務(wù)
registry.register(registeredProviderUrl);
}
以 Zookeeper 注冊中心為例進(jìn)行分析颅筋。下面先來看一下 getRegistry 方法的源碼宙暇,這個方法由 ZookeeperRegistryFactory 的父類 AbstractRegistryFactory 實現(xiàn)。
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// 訪問緩存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 緩存未命中议泵,創(chuàng)建 Registry 實例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 寫入緩存
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
接下來去ZookeeperRegistryFactory中繼續(xù)看createRegistry方法
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名占贫,默認(rèn)為 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
// group = "/" + group
group = PATH_SEPARATOR + group;
}
this.root = group;
// 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加狀態(tài)監(jiān)聽器
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
// zookeeperTransporter 由 SPI 在運行時注入先口,類型為 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
/**
* Invisible injection of zookeeper client via IOC/SPI
* @param zookeeperTransporter
*/
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 創(chuàng)建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
這里提一下recover()方法型奥,在重連zookeeper后會做兩個動作瞳收。
- 1.添加所有的注冊連接到失敗連接集合中,并且創(chuàng)建定時timer默認(rèn)5秒中后重新建立連接厢汹,建立成功則從失敗連接集合中刪除螟深。
- 2.通知directory,刷新配置烫葬,通知成功則從失敗集合刪除界弧。
接下來看看zookeeperTransporter.connect方法怎么獲取zkclient的
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
// 根據(jù)連接地址從緩存獲取zkclient連接
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// 緩存中獲取不到時新建
zookeeperClient = createZookeeperClient(toClientURL(url));
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
// 寫進(jìn)緩存
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
其中createZookeeperClient由子類實現(xiàn)厘灼,繼續(xù)看CuratorZookeeperClient的createZookeeperClient方法
public ZookeeperClient createZookeeperClient(URL url) {
return new CuratorZookeeperClient(url);
}
public CuratorZookeeperClient(URL url) {
super(url);
try {
int timeout = url.getParameter(TIMEOUT_KEY, 5000);
// 創(chuàng)建 CuratorFramework 構(gòu)造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(timeout);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 構(gòu)建 CuratorFramework 實例
client = builder.build();
// 添加監(jiān)聽器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 啟動客戶端
client.start();
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
if (!connected) {
throw new IllegalStateException("zookeeper not connected");
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
獲取到Registry對象后就是調(diào)用registry.register正式的注冊服務(wù)夹纫,繼續(xù)看FailbackRegistry(ZookeeperRegistry的父類)的register方法
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// 模板方法,由子類實現(xiàn)
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 獲取 check 參數(shù)设凹,若 check = true 將會直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 記錄注冊失敗的鏈接
addFailedRegistered(url);
}
}
接著看doRegister,回到 FailbackRegistry 子類 ZookeeperRegistry 中
protected void doRegister(URL url) {
try {
// 通過 Zookeeper 客戶端創(chuàng)建節(jié)點茅姜,節(jié)點路徑由 toUrlPath 方法生成闪朱,路徑格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
到此,dubbo服務(wù)就把自己的信息通過zkClient注冊到了特定的路徑下钻洒,并且注冊的是zk的臨時節(jié)點奋姿。