1. 簡介
本篇文章皿伺,將開始分析 Dubbo 集群容錯方面的源碼。集群容錯源碼包含四個部分盒粮,分別是服務(wù)目錄 Directory心傀、服務(wù)路由 Router、集群 Cluster 和負(fù)載均衡 LoadBalance拆讯。這幾個部分的源碼邏輯相對比較獨立脂男,我們將會分四篇文章進(jìn)行分析。本篇文章作為集群容錯的開篇文章种呐,將和大家一起分析服務(wù)目錄相關(guān)的源碼宰翅。在進(jìn)行深入分析之前,我們先來了解一下服務(wù)目錄是什么爽室。服務(wù)目錄中存儲了一些和服務(wù)提供者有關(guān)的信息汁讼,通過服務(wù)目錄,服務(wù)消費者可獲取到服務(wù)提供者的信息阔墩,比如 ip嘿架、端口、服務(wù)協(xié)議等啸箫。通過這些信息耸彪,服務(wù)消費者就可通過 Netty 等客戶端進(jìn)行遠(yuǎn)程調(diào)用。在一個服務(wù)集群中忘苛,服務(wù)提供者數(shù)量并不是一成不變的蝉娜,如果集群中新增了一臺機器,相應(yīng)地在服務(wù)目錄中就要新增一條服務(wù)提供者記錄扎唾≌俅ǎ或者,如果服務(wù)提供者的配置修改了胸遇,服務(wù)目錄中的記錄也要做相應(yīng)的更新荧呐。如果這樣說,服務(wù)目錄和注冊中心的功能不就雷同了嗎纸镊?確實如此倍阐,這里這么說是為了方便大家理解。實際上服務(wù)目錄在獲取注冊中心的服務(wù)配置信息后薄腻,會為每條配置信息生成一個 Invoker 對象收捣,并把這個 Invoker 對象存儲起來,這個 Invoker 才是服務(wù)目錄最終持有的對象庵楷。Invoker 有什么用呢?看名字就知道了,這是一個具有遠(yuǎn)程調(diào)用功能的對象尽纽。講到這大家應(yīng)該知道了什么是服務(wù)目錄了咐蚯,它可以看做是 Invoker 集合,且這個集合中的元素會隨注冊中心的變化而進(jìn)行動態(tài)調(diào)整弄贿。
2. 繼承體系
服務(wù)目錄目前內(nèi)置的實現(xiàn)有兩個春锋,分別為 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類差凹。AbstractDirectory 實現(xiàn)了 Directory 接口期奔,這個接口包含了一個重要的方法定義,即 list(Invocation)危尿,用于列舉 Invoker呐萌。下面我們來看一下他們的繼承體系圖。
<img src="https://i.loli.net/2020/04/19/IOH7DwRMLErui9j.png" style="zoom: 50%;" />
如上谊娇,Directory 繼承自 Node 接口肺孤,Node 這個接口繼承者比較多,像 Registry济欢、Monitor赠堵、Invoker 等均繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl法褥,實現(xiàn)該接口的類可以向外提供配置信息茫叭。另外,大家注意看 RegistryDirectory 實現(xiàn)了 NotifyListener 接口半等,當(dāng)注冊中心節(jié)點信息發(fā)生變化后杂靶,RegistryDirectory 可以通過此接口方法得到變更信息,并根據(jù)變更信息動態(tài)調(diào)整內(nèi)部 Invoker 列表酱鸭。
3. 源碼分析
本章將分析 AbstractDirectory 和它兩個子類的源碼吗垮。AbstractDirectory 封裝了 Invoker 列舉流程,具體的列舉邏輯則由子類實現(xiàn)凹髓,這是典型的模板模式烁登。所以,接下來我們先來看一下 AbstractDirectory 的源碼蔚舀。
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
return doList(invocation);
}
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
關(guān)于 AbstractDirectory 就分析這么多饵沧,下面開始分析子類的源碼。
3.1 StaticDirectory
StaticDirectory 即靜態(tài)服務(wù)目錄赌躺,顧名思義狼牺,它內(nèi)部存放的 Invoker 是不會變動的。所以礼患,理論上它和不可變 List 的功能很相似是钥。
public class StaticDirectory<T> extends AbstractDirectory<T> {
// Invoker 列表
private final List<Invoker<T>> invokers;
// 省略構(gòu)造方法
@Override
public Class<T> getInterface() {
// 獲取接口類
return invokers.get(0).getInterface();
}
@Override
public List<Invoker<T>> getAllInvokers() {
return invokers;
}
// 檢測服務(wù)目錄是否可用
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
// 只要有一個 Invoker 是可用的掠归,就認(rèn)為當(dāng)前目錄是可用的
return true;
}
}
return false;
}
@Override
public void destroy() {
if (isDestroyed()) {
return;
}
// 調(diào)用父類銷毀邏輯
super.destroy();
// 遍歷 Invoker 列表,并執(zhí)行相應(yīng)的銷毀邏輯
for (Invoker<T> invoker : invokers) {
invoker.destroy();
}
invokers.clear();
}
public void buildRouterChain() {
RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
routerChain.setInvokers(invokers);
this.setRouterChain(routerChain);
}
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
List<Invoker<T>> finalInvokers = invokers;
if (routerChain != null) {
try {
//調(diào)用父類的路由信息查找對應(yīng)的 invoker
finalInvokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
return finalInvokers == null ? Collections.emptyList() : finalInvokers;
}
}
3.2 RegistryDirectory
RegistryDirectory 是一種動態(tài)服務(wù)目錄悄泥,實現(xiàn)了 NotifyListener 接口虏冻。當(dāng)注冊中心服務(wù)配置發(fā)生變化后,RegistryDirectory 可收到與當(dāng)前服務(wù)相關(guān)的變化弹囚。收到變更通知后厨相,RegistryDirectory 可根據(jù)配置變更信息刷新 Invoker 列表。
public interface NotifyListener {
/**
* Triggered when a service change notification is received.
* <p>
* Notify needs to support the contract: <br>
* 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br>
* 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>
* 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
* 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
* 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>
*
* @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
*/
void notify(List<URL> urls);
}
RegistryDirectory 中有幾個比較重要的邏輯:
- Invoker 的列舉
- 接收服務(wù)配置變更
- Invoker 列表的刷新
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 服務(wù)提供者關(guān)閉或禁用了服務(wù)鸥鹉,此時拋出 No provider 異常
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
// 異常日志
}
return invokers == null ? Collections.emptyList() : invokers;
}
后面兩個邏輯可以查看上一節(jié)的方法調(diào)用棧蛮穿。
我們看一下 RD 的類結(jié)構(gòu),可以看到兩個listener 內(nèi)部類:
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
void stop() {
this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
}
@Override
protected void notifyOverrides() {
// to notify configurator/router changes
directory.refreshInvoker(Collections.emptyList());
}
}
private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
List<RegistryDirectory> listeners = new ArrayList<>();
ConsumerConfigurationListener() {
this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}
void removeNotifyListener(RegistryDirectory listener) {
this.listeners.remove(listener);
}
@Override
protected void notifyOverrides() {
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}
可以看到更新的操作最終都是調(diào)用了 refreshInvoker 方法 毁渗。
/**
*
* 將invokerURL列表轉(zhuǎn)換為調(diào)用程序映射践磅。轉(zhuǎn)換的規(guī)則如下:
* 1、如果URL已轉(zhuǎn)換為invoker祝蝠,它將不再被重新引用并直接從緩存中獲得音诈,請注意URL中的任何參數(shù)更改都將被重新引用。
* 2绎狭、如果輸入的調(diào)用者列表不是空的细溅,這意味著它是最新的調(diào)用者列表。
* 3儡嘶、如果傳入的invokerUrl列表為空喇聊,則意味著該規(guī)則只是一個覆蓋規(guī)則或路由規(guī)則,需要對其進(jìn)行重新對比蹦狂,以決定是否重新引用誓篱。
*
* @param invokerUrls 參數(shù)不能為空
*/
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
// invokerUrls 僅有一個元素,且 url 協(xié)議頭為 empty凯楔,此時表示禁用所有服務(wù)
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
//刪除所有的 invokers
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
// 添加緩存 url 到 invokerUrls 中
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 將 url 轉(zhuǎn)成 Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
//刪除沒有使用的 invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
refreshInvoker 方法首先會根據(jù)入?yún)?invokerUrls 的數(shù)量和協(xié)議頭判斷是否禁用所有的服務(wù)窜骄,如果禁用,則將 forbidden 設(shè)為 true摆屯,并銷毀所有的 Invoker邻遏。若不禁用,則將 url 轉(zhuǎn)成 Invoker虐骑,得到 <url, Invoker> 的映射關(guān)系准验。然后進(jìn)一步進(jìn)行轉(zhuǎn)換,得到 <methodName, Invoker 列表> 映射關(guān)系廷没。之后進(jìn)行多組 Invoker 合并操作糊饱,并將合并結(jié)果賦值給 methodInvokerMap。methodInvokerMap 變量在 doList 方法中會被用到颠黎,doList 會對該變量進(jìn)行讀操作另锋,在這里是寫操作滞项。當(dāng)新的 Invoker 列表生成后,還要一個重要的工作要做砰蠢,就是銷毀無用的 Invoker蓖扑,避免服務(wù)消費者調(diào)用已下線的服務(wù)的服務(wù)唉铜。
到此關(guān)于 Invoker 列表的刷新邏輯就分析了台舱,這里對整個過程進(jìn)行簡單總結(jié)。如下:
- 檢測入?yún)⑹欠駜H包含一個 url潭流,且 url 協(xié)議頭為 empty
- 若第一步檢測結(jié)果為 true竞惋,表示禁用所有服務(wù),此時銷毀所有的 Invoker
- 若第一步檢測結(jié)果為 false灰嫉,此時將入?yún)⑥D(zhuǎn)為 Invoker 列表
- 對上一步邏輯生成的結(jié)果進(jìn)行進(jìn)一步處理拆宛,得到方法名到 Invoker 的映射關(guān)系表
- 合并多組 Invoker
- 銷毀無用 Invoker
Invoker 的刷新邏輯還是比較復(fù)雜的,大家在看的過程中多寫點 demo 進(jìn)行調(diào)試讼撒,以加深理解浑厚。
看完了底層更新的刷新 invoker 操作,RegistryDirectory 是一個動態(tài)服務(wù)目錄根盒,會隨注冊中心配置的變化進(jìn)行動態(tài)調(diào)整钳幅。因此 RegistryDirectory 實現(xiàn)了 NotifyListener 接口,通過這個接口獲取注冊中心變更通知炎滞。下面我們來看一下具體的邏輯敢艰。
public synchronized void notify(List<URL> urls) {
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
// 定義三個集合,分別用于存放配置器 url册赛,路由 url钠导,服務(wù)提供者 url
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
private void overrideDirectoryUrl() {
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
List<Configurator> localConfigurators = this.configurators; // local reference
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
doOverrideUrl(localAppDynamicConfigurators);
if (serviceConfigurationListener != null) {
List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
doOverrideUrl(localDynamicConfigurators);
}
}
private void doOverrideUrl(List<Configurator> configurators) {
if (CollectionUtils.isNotEmpty(configurators)) {
for (Configurator configurator : configurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
}
4.參考資料
本文參考于Dubbo官網(wǎng),詳情以官網(wǎng)最新文檔為準(zhǔn)森瘪。