前言
本文接著深入分析服務(wù)引用的核心流程胁后。
Dubbo 支持兩種方式引用遠(yuǎn)程的服務(wù):
1挟阻、服務(wù)直連的方式琼娘,僅適合在調(diào)試服務(wù)的時(shí)候使用。
2赁濒、基于注冊(cè)中心引用服務(wù)轨奄,這是生產(chǎn)環(huán)境中使用的服務(wù)引用方式孟害。
DubboBootstrap 入口
在介紹服務(wù)發(fā)布的時(shí)候拒炎,介紹了 DubboBootstrap.start() 方法的核心流程,其中除了會(huì)調(diào)用 exportServices() 方法完成服務(wù)發(fā)布之外挨务,還會(huì)調(diào)用 referServices() 方法完成服務(wù)引用击你。
在 DubboBootstrap.referServices() 方法中,會(huì)從 ConfigManager 中獲取所有 ReferenceConfig 列表谎柄,并根據(jù) ReferenceConfig 獲取對(duì)應(yīng)的代理對(duì)象丁侄,入口邏輯如下:
public class DubboBootstrap extends GenericEventListener {
private final ConfigManager configManager;
private ReferenceConfigCache cache;
private final ExecutorRepository executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
private volatile boolean referAsync;
private List<CompletableFuture<Object>> asyncReferringFutures = new ArrayList<>();
private void referServices() {
if (cache == null) {
// 初始ReferenceConfigCache
cache = ReferenceConfigCache.getCache();
}
// 遍歷ReferenceConfig列表
configManager.getReferences().forEach(rc -> {
ReferenceConfig referenceConfig = (ReferenceConfig) rc;
referenceConfig.setBootstrap(this);
// 檢測(cè)ReferenceConfig是否已經(jīng)初始化
if (rc.shouldInit()) {
// 異步
if (referAsync) {
CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
executorRepository.getServiceExporterExecutor(),
() -> cache.get(rc)
);
asyncReferringFutures.add(future);
} else {
// 同步
cache.get(rc);
}
}
});
}
}
Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè),第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時(shí)引用服務(wù)朝巫,第二個(gè)是在 ReferenceBean 對(duì)應(yīng)的服務(wù)被注入到其他類中時(shí)引用鸿摇。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于,第一個(gè)是餓漢式的劈猿,第二個(gè)是懶漢式的拙吉。默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)揪荣。
新建的 ReferenceConfig 對(duì)象會(huì)通過 DubboBootstrap.reference() 方法添加到 ConfigManager 中進(jìn)行管理筷黔,如下所示:
public class DubboBootstrap extends GenericEventListener {
private final ConfigManager configManager;
public DubboBootstrap reference(ReferenceConfig<?> referenceConfig) {
configManager.addReference(referenceConfig);
return this;
}
}
ReferenceConfigCache
服務(wù)引用的核心實(shí)現(xiàn)在 ReferenceConfig 之中,一個(gè) ReferenceConfig 對(duì)象對(duì)應(yīng)一個(gè)服務(wù)接口仗颈,每個(gè) ReferenceConfig 對(duì)象中都封裝了與注冊(cè)中心的網(wǎng)絡(luò)連接佛舱,以及與 Provider 的網(wǎng)絡(luò)連接,這是一個(gè)非常重要的對(duì)象。
為了避免底層連接泄漏造成性能問題请祖,從 Dubbo 2.4.0 版本開始订歪,Dubbo 提供了 ReferenceConfigCache 用于緩存 ReferenceConfig 實(shí)例。
在 dubbo-demo-api-consumer 示例中肆捕,我們可以看到 ReferenceConfigCache 的基本使用方式:
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setInterface(DemoService.class);
...
// 這一步在DubboBootstrap.start()方法中完成
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
...
DemoService demoService = ReferenceConfigCache.getCache().get(reference);
在 ReferenceConfigCache 中維護(hù)了一個(gè)靜態(tài)的 Map(CACHE_HOLDER)字段陌粹,其中 Key 是由 Group、服務(wù)接口和 version 構(gòu)成福压,Value 是一個(gè) ReferenceConfigCache 對(duì)象掏秩。在 ReferenceConfigCache 中可以傳入一個(gè) KeyGenerator 用來修改緩存 Key 的生成邏輯,KeyGenerator 接口的定義如下:
public class ReferenceConfigCache {
public interface KeyGenerator {
String generateKey(ReferenceConfigBase<?> referenceConfig);
}
}
默認(rèn)的 KeyGenerator 實(shí)現(xiàn)是 ReferenceConfigCache 中的匿名內(nèi)部類荆姆,其對(duì)象由 DEFAULT_KEY_GENERATOR 這個(gè)靜態(tài)字段引用蒙幻,具體實(shí)現(xiàn)如下:
public class ReferenceConfigCache {
public static final String DEFAULT_NAME = "_DEFAULT_";
public static final KeyGenerator DEFAULT_KEY_GENERATOR = referenceConfig -> {
String iName = referenceConfig.getInterface();
if (StringUtils.isBlank(iName)) {
// 獲取服務(wù)接口名稱
Class<?> clazz = referenceConfig.getInterfaceClass();
iName = clazz.getName();
}
if (StringUtils.isBlank(iName)) {
throw new IllegalArgumentException("No interface info in ReferenceConfig" + referenceConfig);
}
// Key的格式是group/interface:version
StringBuilder ret = new StringBuilder();
if (!StringUtils.isBlank(referenceConfig.getGroup())) {
ret.append(referenceConfig.getGroup()).append("/");
}
ret.append(iName);
if (!StringUtils.isBlank(referenceConfig.getVersion())) {
ret.append(":").append(referenceConfig.getVersion());
}
return ret.toString();
};
}
在 ReferenceConfigCache 實(shí)例對(duì)象中,會(huì)維護(hù)下面兩個(gè) Map 集合:
proxies(ConcurrentMap<Class<?>, ConcurrentMap<String, Object>>類型):該集合用來存儲(chǔ)服務(wù)接口的全部代理對(duì)象胆筒,其中第一層 Key 是服務(wù)接口的類型邮破,第二層 Key 是上面介紹的 KeyGenerator 為不同服務(wù)提供方生成的 Key,Value 是服務(wù)的代理對(duì)象仆救。
referredReferences(ConcurrentMap<String, ReferenceConfigBase<?>> 類型):該集合用來存儲(chǔ)已經(jīng)被處理的 ReferenceConfig 對(duì)象抒和。
回到 DubboBootstrap.referServices() 方法中,看一下其中與 ReferenceConfigCache 相關(guān)的邏輯彤蔽。
首先是 ReferenceConfigCache.getCache() 這個(gè)靜態(tài)方法摧莽,會(huì)在 CACHE_HOLDER 集合中添加一個(gè) Key 為“DEFAULT”的 ReferenceConfigCache 對(duì)象(使用默認(rèn)的 KeyGenerator 實(shí)現(xiàn)),它將作為默認(rèn)的 ReferenceConfigCache 對(duì)象顿痪。
接下來镊辕,無論是同步服務(wù)引用還是異步服務(wù)引用,都會(huì)調(diào)用 ReferenceConfigCache.get() 方法蚁袭,創(chuàng)建并緩存代理對(duì)象征懈。下面就是 ReferenceConfigCache.get() 方法的核心實(shí)現(xiàn):
public class ReferenceConfigCache {
public <T> T get(ReferenceConfigBase<T> referenceConfig) {
// 生成服務(wù)提供方對(duì)應(yīng)的Key
String key = generator.generateKey(referenceConfig);
// 獲取接口類型
Class<?> type = referenceConfig.getInterfaceClass();
// 獲取該接口對(duì)應(yīng)代理對(duì)象集合
proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());
ConcurrentMap<String, Object> proxiesOfType = proxies.get(type);
// 根據(jù)Key獲取服務(wù)提供方對(duì)應(yīng)的代理對(duì)象
proxiesOfType.computeIfAbsent(key, _k -> {
// 服務(wù)引用
Object proxy = referenceConfig.get();
// 將ReferenceConfig記錄到referredReferences集合
referredReferences.put(key, referenceConfig);
return proxy;
});
return (T) proxiesOfType.get(key);
}
}
ReferenceConfig
通過前面的介紹知道,ReferenceConfig 是服務(wù)引用的真正入口揩悄,其中會(huì)創(chuàng)建相關(guān)的代理對(duì)象卖哎。下面先來看 ReferenceConfig.get() 方法:
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
public synchronized T get() {
// 檢測(cè)當(dāng)前ReferenceConfig狀態(tài)
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
// 檢測(cè) ref 是否為空,為空則通過 init 方法創(chuàng)建
if (ref == null) {// ref指向了服務(wù)的代理對(duì)象
// 啟動(dòng)初始化操作 init 方法主要用于處理配置删性,以及調(diào)用 createProxy 生成代理類
init();
}
return ref;
}
}
在 ReferenceConfig.init() 方法中亏娜,首先會(huì)對(duì)服務(wù)引用的配置進(jìn)行處理,以保證配置的正確性镇匀。
ReferenceConfig.init() 方法的核心邏輯是調(diào)用 createProxy() 方法照藻,調(diào)用之前會(huì)從配置中獲取 createProxy() 方法需要的參數(shù):
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private transient volatile T ref;
private transient volatile boolean initialized;
private DubboBootstrap bootstrap;
public synchronized void init() {
//避免重復(fù)加載
if (initialized) {
return;
}
//獲取Dubbo核心容器
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
//進(jìn)行Dubbo核心配置的加載和檢查
bootstrap.initialize();
}
//在對(duì)象創(chuàng)建后在使用其他配置模塊配置對(duì)象之前檢查對(duì)象配置并重寫默認(rèn)配置
checkAndUpdateSubConfigs();
//檢查并生成sub配置和Local配置是否合法
checkStubAndLocal(interfaceClass);
//判斷對(duì)象是否有mock并生成mock信息
ConfigValidationUtils.checkMock(interfaceClass, this);
//保存對(duì)象屬性map信息
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
//添加版本信息,包含dubbo版本汗侵,release版本幸缕,timestamp運(yùn)行時(shí)間戳和sid_key等信息
ReferenceConfigBase.appendRuntimeParameters(map);
//添加泛型 revision信息
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
//生成服務(wù)的代理對(duì)象群发,跟服務(wù)導(dǎo)出是一樣,通過代理對(duì)象來代理发乔,返回代理方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
//添加需要代理的方法
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
//添加interface名
map.put(INTERFACE_KEY, interfaceName);
//添加重試信息
AbstractConfig.appendParameters(map, getMetrics());
//檢查獲取并添加Application信息
AbstractConfig.appendParameters(map, getApplication());
//檢查獲取并添加Module信息
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
//檢查獲取并添加consumer信息
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
//設(shè)置方法重試信息并收集方法異步調(diào)用信息
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
//獲取服務(wù)消費(fèi)者 ip 地址
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
//添加服務(wù)注冊(cè)信息
map.put(REGISTER_IP_KEY, hostToRegistry);
//將配置保存如服務(wù)元信息中
serviceMetadata.getAttachments().putAll(map);
//創(chuàng)建代理
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
// 根據(jù)服務(wù)名熟妓,ReferenceConfig,代理類構(gòu)建 ConsumerModel栏尚,
// 并將 ConsumerModel 存入到 ApplicationModel 中
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
//檢查引入的服務(wù)是否可用
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
}
ReferenceConfig.createProxy() 方法中處理了多種服務(wù)引用的場(chǎng)景起愈,例如,直連單個(gè)/多個(gè)Provider译仗、單個(gè)/多個(gè)注冊(cè)中心抬虽。下面是 createProxy() 方法的核心流程,大致可以梳理出這么 5 個(gè)步驟:
1纵菌、根據(jù)傳入的參數(shù)集合判斷協(xié)議是否為 injvm 協(xié)議阐污,如果是,直接通過 InjvmProtocol 引用服務(wù)咱圆。
2笛辟、構(gòu)造 urls 集合。Dubbo 支持直連 Provider和依賴注冊(cè)中心兩種服務(wù)引用方式序苏。如果是直連服務(wù)的模式手幢,我們可以通過 url 參數(shù)指定一個(gè)或者多個(gè) Provider 地址,會(huì)被解析并填充到 urls 集合忱详;如果通過注冊(cè)中心的方式進(jìn)行服務(wù)引用围来,則會(huì)調(diào)用 AbstractInterfaceConfig.loadRegistries() 方法加載所有注冊(cè)中心。
3踱阿、如果 urls 集合中只記錄了一個(gè) URL管钳,通過 Protocol 適配器選擇合適的 Protocol 擴(kuò)展實(shí)現(xiàn)創(chuàng)建 Invoker 對(duì)象钦铁。如果是直連 Provider 的場(chǎng)景软舌,則 URL 為 dubbo 協(xié)議,這里就會(huì)使用 DubboProtocol 這個(gè)實(shí)現(xiàn)牛曹;如果依賴注冊(cè)中心佛点,則使用 RegistryProtocol 這個(gè)實(shí)現(xiàn)。
4黎比、如果 urls 集合中有多個(gè)注冊(cè)中心超营,則使用 ZoneAwareCluster 作為 Cluster 的默認(rèn)實(shí)現(xiàn),生成對(duì)應(yīng)的 Invoker 對(duì)象阅虫;如果 urls 集合中記錄的是多個(gè)直連服務(wù)的地址演闭,則使用 Cluster 適配器選擇合適的擴(kuò)展實(shí)現(xiàn)生成 Invoker 對(duì)象。
5颓帝、通過 ProxyFactory 適配器選擇合適的 ProxyFactory 擴(kuò)展實(shí)現(xiàn)米碰,將 Invoker 包裝成服務(wù)接口的代理對(duì)象窝革。
通過上面的流程我們可以看出createProxy() 方法中有兩個(gè)核心:
- 1、通過 Protocol 適配器選擇合適的 Protocol 擴(kuò)展實(shí)現(xiàn)創(chuàng)建 Invoker 對(duì)象吕座。
- 2虐译、通過 ProxyFactory 適配器選擇合適的 ProxyFactory 創(chuàng)建代理對(duì)象。
下面我們來看 createProxy() 方法的具體實(shí)現(xiàn):
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private T createProxy(Map<String, String> map) {
//jvm本地引入
// 根據(jù)url的協(xié)議吴趴、scope以及injvm等參數(shù)檢測(cè)是否需要本地引用
if (shouldJvmRefer(map)) {
// 創(chuàng)建injvm協(xié)議的URL
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
// 本地引用invoker生成
// 通過Protocol的適配器選擇對(duì)應(yīng)的Protocol實(shí)現(xiàn)創(chuàng)建Invoker對(duì)象
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
// 用戶配置url信息,表明用戶可能想進(jìn)行點(diǎn)對(duì)點(diǎn)調(diào)用
if (url != null && url.length() > 0) {
// 當(dāng)需要配置多個(gè) url 時(shí)漆诽,可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
// 設(shè)置接口全限定名為 url 路徑
url = url.setPath(interfaceName);
}
// 檢測(cè) url 協(xié)議是否為 registry锣枝,若是厢拭,表明用戶想使用指定的注冊(cè)中心
if (UrlUtils.isRegistry(url)) {
// 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url撇叁,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性)蚪腐,
// 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置税朴,比如版本回季,group,時(shí)間戳等
// 最后將合并后的配置設(shè)置為 url 查詢字符串中正林。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
// 從注冊(cè)中心的配置中組裝url信息
// if protocols not injvm checkRegistry
// 如果協(xié)議不是在jvm本地中
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
//檢查注冊(cè)中心是否存在(如果當(dāng)前配置不存在則獲取服務(wù)默認(rèn)配置),然后將他們轉(zhuǎn)換到RegistryConfig中
checkRegistry();
//通過注冊(cè)中心配置信息組裝URL
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
//添加monitor監(jiān)控信息
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 將map中的參數(shù)整理成refer參數(shù)泡一,添加到RegistryURL中
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 既不是服務(wù)直連,也沒有配置注冊(cè)中心觅廓,拋出異常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
//單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連鼻忠,下同)
if (urls.size() == 1) {
// 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
// 在單注冊(cè)中心或是直連單個(gè)服務(wù)提供方的時(shí)候,通過Protocol的適配器選擇對(duì)應(yīng)的Protocol實(shí)現(xiàn)創(chuàng)建Invoker對(duì)象
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
//多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者杈绸,或者兩者混合
// 多注冊(cè)中心或是直連多個(gè)服務(wù)提供方的時(shí)候帖蔓,會(huì)根據(jù)每個(gè)URL創(chuàng)建Invoker對(duì)象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {// 確定是多注冊(cè)中心,還是直連多個(gè)Provider
// 保存使用注冊(cè)中心的最新的URL信息
registryURL = url; // use last registry url
}
}
// 注冊(cè)中心URL存在
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
// 多注冊(cè)中心的場(chǎng)景中瞳脓,會(huì)使用ZoneAwareCluster作為Cluster默認(rèn)實(shí)現(xiàn)塑娇,多注冊(cè)中心之間的選擇
// 對(duì)于對(duì)區(qū)域訂閱方案,默認(rèn)使用"zone-aware"區(qū)域
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
// invoker 包裝順序: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
// 如果不存在注冊(cè)中心連接劫侧,只能使用直連
//如果訂閱區(qū)域未設(shè)置埋酬,則設(shè)置為默認(rèn)區(qū)域"zone-aware"
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
// 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// 通過ProxyFactory適配器選擇合適的ProxyFactory擴(kuò)展實(shí)現(xiàn)烧栋,創(chuàng)建代理對(duì)象
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
RegistryProtocol
在直連 Provider 的場(chǎng)景中写妥,會(huì)使用 DubboProtocol.refer() 方法完成服務(wù)引用,DubboProtocol.refer() 方法的具體實(shí)現(xiàn)在之前已經(jīng)詳細(xì)介紹過了审姓,這里我們重點(diǎn)來看存在注冊(cè)中心的場(chǎng)景中珍特,Dubbo Consumer 是如何通過 RegistryProtocol 完成服務(wù)引用的。
在 RegistryProtocol.refer() 方法中魔吐,會(huì)先根據(jù) URL 獲取注冊(cè)中心的 URL扎筒,再調(diào)用 doRefer 方法生成 Invoker呼猪,在 refer() 方法中會(huì)使用 MergeableCluster 處理多 group 引用的場(chǎng)景。
public class RegistryProtocol implements Protocol {
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 從URL中獲取注冊(cè)中心的URL
url = getRegistryUrl(url);
// 獲取Registry實(shí)例,這里的RegistryFactory對(duì)象是通過Dubbo SPI的自動(dòng)裝載機(jī)制注入的
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 從注冊(cè)中心URL的refer參數(shù)中獲取此次服務(wù)引用的一些參數(shù),其中就包括group
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 如果此次可以引用多個(gè)group的服務(wù)器紧,則Cluser實(shí)現(xiàn)使用MergeableCluster實(shí)現(xiàn)捏浊,
// 這里的getMergeableCluster()方法就會(huì)通過Dubbo SPI方式找到MergeableCluster實(shí)例
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
// 如果沒有g(shù)roup參數(shù)或是只指定了一個(gè)group,則通過Cluster適配器選擇Cluster實(shí)現(xiàn)
return doRefer(cluster, registry, type, url);
}
}
在 doRefer() 方法中,首先會(huì)根據(jù) URL 初始化 RegistryDirectory 實(shí)例,然后生成 Subscribe URL 并進(jìn)行注冊(cè),之后會(huì)通過 Registry 訂閱服務(wù)壶唤,最后通過 Cluster 將多個(gè) Invoker 合并成一個(gè) Invoker 返回給上層,具體實(shí)現(xiàn)如下:
public class RegistryProtocol implements Protocol {
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
return interceptInvoker(getInvoker(cluster, registry, type, url), url);
}
protected <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創(chuàng)建RegistryDirectory實(shí)例
DynamicDirectory<T> directory = createDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 生成SubscribeUrl棕所,協(xié)議為consumer闸盔,具體的參數(shù)是RegistryURL中refer參數(shù)指定的參數(shù)
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
// 在SubscribeUrl中添加category=consumers和check=false參數(shù)
directory.setRegisteredConsumerUrl(urlToRegistry);
// 服務(wù)注冊(cè),在Zookeeper的consumers節(jié)點(diǎn)下琳省,添加該Consumer對(duì)應(yīng)的節(jié)點(diǎn)
registry.register(directory.getRegisteredConsumerUrl());
}
// 根據(jù)SubscribeUrl創(chuàng)建服務(wù)路由
directory.buildRouterChain(urlToRegistry);
// 訂閱服務(wù)迎吵,toSubscribeUrl()方法會(huì)將SubscribeUrl中category參數(shù)修改為"providers,configurators,routers"
// RegistryDirectory的subscribe()在前面詳細(xì)分析過了,其中會(huì)通過Registry訂閱服務(wù)针贬,同時(shí)還會(huì)添加相應(yīng)的監(jiān)聽器
directory.subscribe(toSubscribeUrl(urlToRegistry));
// 注冊(cè)中心中可能包含多個(gè)Provider击费,相應(yīng)地,也就有多個(gè)Invoker桦他,
// 這里通過前面選擇的Cluster將多個(gè)Invoker對(duì)象封裝成一個(gè)Invoker對(duì)象
return (ClusterInvoker<T>) cluster.join(directory);
}
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url) {
// 根據(jù)URL中的registry.protocol.listener參數(shù)加載相應(yīng)的監(jiān)聽器實(shí)現(xiàn)
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
//引入了RegistryProtocol偵聽器蔫巩,以使用戶有機(jī)會(huì)自定義或更改導(dǎo)出并引用RegistryProtocol的行為。
// 例如:在滿足某些條件時(shí)立即重新導(dǎo)出或重新引用快压。
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, invoker);
}
return invoker;
}
}
總結(jié)
本文重點(diǎn)介紹了 Dubbo 服務(wù)引用的整個(gè)流程:
首先圆仔,我們介紹了 DubboBootStrap 這個(gè)入口門面類與服務(wù)引用相關(guān)的方法,其中涉及 referServices()蔫劣、reference() 等核心方法坪郭。
接下來,我們分析了 ReferenceConfigCache 這個(gè) ReferenceConfig 對(duì)象緩存拦宣,以及 ReferenceConfig 實(shí)現(xiàn)服務(wù)引用的核心流程截粗。
最后,我們還講解了 RegistryProtocol 從注冊(cè)中心引用服務(wù)的核心實(shí)現(xiàn)鸵隧。