文章有點(diǎn)長(zhǎng)兄渺,親侵浸,要慢慢看!
1. 概述
1.1 注冊(cè)中心作用
- 在Dubbo中础嫡,注冊(cè)中心為核心模塊指么,Dubbo通過(guò)注冊(cè)中心實(shí)現(xiàn)各個(gè)服務(wù)之間的注冊(cè)與發(fā)現(xiàn)等功能,而本次源碼的分析為registry模塊的api和zookeeper的實(shí)現(xiàn)榴鼎。
- 服務(wù)的提供者和消費(fèi)者都需要把自己注冊(cè)到注冊(cè)中心伯诬,提供者讓消費(fèi)者感知到服務(wù)存在,從而消費(fèi)者發(fā)起遠(yuǎn)程調(diào)用巫财,也讓服務(wù)治理中心感知到有服務(wù)提供者上線盗似;消費(fèi)者則是讓服務(wù)治理中心可以發(fā)現(xiàn)自己。
1.2 Zookeeper
- Zookeeper是一個(gè)提供分布式協(xié)調(diào)服務(wù)的開(kāi)源軟件平项,常用于解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問(wèn)題赫舒。Zookeeper功能非常強(qiáng)大,可以實(shí)現(xiàn)如分布式應(yīng)用配置管理闽瓢、統(tǒng)一命名服務(wù)接癌、狀態(tài)同步服務(wù)、集群管理等功能扣讼。關(guān)于Zookeeper缺猛,大家如果想了解可以關(guān)注一下自行去搜索一下。
1.3 registry模塊
-
整個(gè)registry下的模塊
dubbo-registry api是注冊(cè)中心所有的API和抽象類(lèi)實(shí)現(xiàn)
default是注冊(cè)中心的內(nèi)存實(shí)現(xiàn)
zookeeper、redis荔燎、nacos就是基于不同的組件的實(shí)現(xiàn)
multicast是通過(guò)廣播實(shí)現(xiàn)
1.4 注冊(cè)中心工作流程
這張圖相信只要是用過(guò)的都不陌生耻姥,掛在dubbo.io的官網(wǎng)掛了很久很久了。那么這個(gè)流程主要是說(shuō)了什么呢有咨?
- 0.是生產(chǎn)者(服務(wù)提供方)初始化咏闪,就好比你寫(xiě)了個(gè)服務(wù)實(shí)現(xiàn)然后啟動(dòng)起來(lái)。
- 1.是服務(wù)提供方向啟動(dòng)器起來(lái)過(guò)后摔吏,就會(huì)向注冊(cè)中心提交自己的服務(wù)信息
- 2.是消費(fèi)者(服務(wù)消費(fèi)方)向注冊(cè)中心提交訂閱請(qǐng)求鸽嫂。就是你寫(xiě)了一個(gè)業(yè)務(wù)需要用到一個(gè)生產(chǎn)者服務(wù),這個(gè)時(shí)候你需要提前打招呼征讲,我需要它据某,有它的消息的時(shí)候讓注冊(cè)中心告訴你他的信息。
- 3.這個(gè)時(shí)候當(dāng)服務(wù)提供者離開(kāi)或者是有新的服務(wù)提供者加入诗箍,注冊(cè)中心就會(huì)將變化的信息發(fā)送給消費(fèi)者癣籽。
- 4.消費(fèi)者知道了生產(chǎn)者的信息,要用的時(shí)候就直接調(diào)用滤祖,注意這里的調(diào)用是不經(jīng)過(guò)注冊(cè)中心的筷狼,而是直接同步的網(wǎng)絡(luò)調(diào)用。
2. dubbo-registry-api
- api層主要是注冊(cè)中心所有API的抽象實(shí)現(xiàn)類(lèi)匠童,并不是實(shí)際提供服務(wù)的組件埂材。
-
模塊關(guān)系圖
image -
類(lèi)關(guān)系圖
image -
目錄結(jié)構(gòu)
image
2.1 Registry的相關(guān)實(shí)現(xiàn)
- 由類(lèi)的關(guān)系圖科看到Registry的實(shí)現(xiàn)關(guān)系,我們接下來(lái)就分析下各個(gè)接口和這個(gè)類(lèi)
2.1.1 RegistryService
- 注冊(cè)中心模塊的服務(wù)接口:提供了注冊(cè)汤求、取消注冊(cè)俏险、訂閱、取消訂閱扬绪、查詢(xún)符合條件的已注冊(cè)數(shù)據(jù)竖独。
- 雖然官方有解釋這個(gè)的地方但是還是復(fù)制一下方法解釋如下,官方地址是:http://dubbo.apache.org/zh-cn/docs/dev/impls/registry.html
public interface RegistryService {
/**
* 注冊(cè)服務(wù).
* @param url 注冊(cè)信息挤牛,不允許為空莹痢,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 取消注冊(cè)服務(wù).
* @param url 注冊(cè)信息,不允許為空墓赴,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 訂閱服務(wù).
* @param listener 變更事件監(jiān)聽(tīng)器竞膳,不允許為空
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消訂閱服務(wù).
* @param url 訂閱條件,不允許為空竣蹦,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 變更事件監(jiān)聽(tīng)器顶猜,不允許為空
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查詢(xún)注冊(cè)列表,與訂閱的推模式相對(duì)應(yīng)痘括,這里為拉模式,只返回一次結(jié)果。
*
* @see org.apache.dubbo.registry.NotifyListener#notify(List)
* @param url 查詢(xún)條件纲菌,不允許為空挠日,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 已注冊(cè)信息列表,可能為空翰舌,含義同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的參數(shù)嚣潜。
*/
List<URL> lookup(URL url);
}
2.1.2 Node (不在api中定義,在common模塊中)
- 節(jié)點(diǎn)的接口 里面聲明了一些關(guān)于節(jié)點(diǎn)的操作方法
public interface Node {
/**
* 獲取節(jié)點(diǎn)Url
*/
URL getUrl();
/**
* 是否可用
*/
boolean isAvailable();
/**
* 銷(xiāo)毀節(jié)點(diǎn)
*/
void destroy();
}
2.1.2 Registry
- 這個(gè)接口其實(shí)就是把節(jié)點(diǎn)以及注冊(cè)中心服務(wù)的方法放在了一起
public interface Registry extends Node, RegistryService {
}
2.1.3 AbstractRegistry
- AbstractRegistry實(shí)現(xiàn)了Registry接口,為減輕注冊(cè)中心的壓力椅贱,在該類(lèi)中實(shí)現(xiàn)了把本地URL緩存到property文件中的機(jī)制懂算,并且實(shí)現(xiàn)了注冊(cè)中心的注冊(cè)、訂閱等方法庇麦。
- 看下類(lèi)圖
- 首先是抽象類(lèi)的屬性
// url地址分隔符计技,用于文件緩存,服務(wù)提供程序url分隔
private static final char URL_SEPARATOR = ' ';
// URL地址分隔的正則表達(dá)式山橄,用于分析文件緩存中的服務(wù)提供程序URL列表
private static final String URL_SPLIT = "\\s+";
// 日志輸出
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 本地磁盤(pán)緩存垮媒,其中的特殊key為registies是記錄注冊(cè)表中心列表,其他是服務(wù)提供者的李彪
private final Properties properties = new Properties();
// 文件緩存寫(xiě)入執(zhí)行器 提供一個(gè)線程的線程池
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件標(biāo)志
private final boolean syncSaveFile;
// 這個(gè)是緩存的版本號(hào)
private final AtomicLong lastCacheChanged = new AtomicLong();
// 這個(gè)是已經(jīng)注冊(cè)的URL集合航棱,不僅僅是服務(wù)提供者的睡雇,也可以是服務(wù)消費(fèi)者的
private final Set<URL> registered = new ConcurrentHashSet<URL>();
// 已訂閱的url 值為url的監(jiān)聽(tīng)器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 消費(fèi)者或服務(wù)治理服務(wù)獲取注冊(cè)信息后的緩存對(duì)象
// 內(nèi)存中服務(wù)器緩存的notified對(duì)象是ConcurrentHashMap里面嵌套了一個(gè)Map,
// 外層Map的Key是消費(fèi)者的URL饮醇,
// 內(nèi)層的Map的key是分類(lèi)它抱,包括provider,consumer朴艰,routes抗愁,configurators四種,
// value則對(duì)應(yīng)服務(wù)列表呵晚,沒(méi)有服務(wù)提供者提供服務(wù)的URL蜘腌,會(huì)以一個(gè)特別的empty://前綴開(kāi)頭
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// 注冊(cè)中心的URL
private URL registryUrl;
// 本地磁盤(pán)緩存文件保存的是注冊(cè)中心的數(shù)據(jù)
private File file;
2.1.3.1 構(gòu)造方法
public AbstractRegistry(URL url) {
// 設(shè)置注冊(cè)中心的地址URL
setUrl(url);
// 從URL參數(shù)中獲取是否同步保存的狀態(tài),URL中如果不包含饵隙,那就設(shè)置默認(rèn)值為false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 獲取文件路徑
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
// 開(kāi)始讀入文件
File file = null;
// 不存在就拋出異常
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
// 把文件對(duì)象放到屬性上
this.file = file;
// 加載文件中的參數(shù)放入Properties撮珠,Properties繼承HashTable蛮放。
loadProperties();
// 通知監(jiān)聽(tīng)器 URL變化 見(jiàn)下面notify的源碼
notify(url.getBackupUrls());
}
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
// 把文件中的key-value讀進(jìn)來(lái)
in = new FileInputStream(file);
// Properties是一個(gè)繼承HashTable的類(lèi).
// 這個(gè)地方就是按行讀入涧卵,util里面的類(lèi)锅棕,里面調(diào)用了一個(gè)load0 方法會(huì)把key和value做分割然后放入Properties中袭景,汗菜。
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
// 關(guān)閉流
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
2.1.3.2 lookup
- 獲得消費(fèi)者url訂閱的服務(wù)URL列表
@Override
public List<URL> lookup(URL url) {
// 查找的結(jié)果數(shù)據(jù)
List<URL> result = new ArrayList<URL>();
// 獲取注冊(cè)信息中的分類(lèi)服務(wù)列表信息
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// 如果該消費(fèi)者訂閱了服務(wù)
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
// 把非空的加入結(jié)果集中
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
// 如果沒(méi)有訂閱服務(wù)
// 使用原子類(lèi)以保證在獲取注冊(cè)在注冊(cè)中心的服務(wù)url時(shí)能夠保證是最新的url集合
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
// 通知監(jiān)聽(tīng)器地淀。當(dāng)收到服務(wù)變更通知時(shí)觸發(fā)
NotifyListener listener = new NotifyListener() {
@Override
public void notify(List<URL> urls) {
reference.set(urls);
}
};
// 添加這個(gè)服務(wù)的監(jiān)聽(tīng)器
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
// 然后把非空結(jié)果放入結(jié)果集中
if (urls != null && !urls.isEmpty()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
2.1.3.3 register and unregister
- url注冊(cè)和取消注冊(cè)代碼很簡(jiǎn)單梧疲,就是向registered中add或者remove url
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
2.1.3.4 notify
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍歷已訂閱的URL
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 通知URL對(duì)應(yīng)的監(jiān)聽(tīng)器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 通知監(jiān)聽(tīng)器擦耀,看下方代碼注釋
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 將url進(jìn)行分類(lèi)
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 根據(jù)不同的category分別放到不同List中處理 以category的值做分類(lèi)
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
// 沒(méi)有分類(lèi)結(jié)果就直接return
if (result.size() == 0) {
return;
}
// 獲得消費(fèi)者被通知的url的Map
Map<String, List<URL>> categoryNotified = notified.get(url);
// 如果沒(méi)有 就創(chuàng)建一個(gè)
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
// 創(chuàng)建過(guò)后再獲取
categoryNotified = notified.get(url);
}
// 發(fā)送URL變化給監(jiān)聽(tīng)器
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分類(lèi)標(biāo)實(shí)和分類(lèi)后的列表放入notified的value中 覆蓋到 `notified`
// 當(dāng)分類(lèi)的數(shù)據(jù)為空饼酿,依然有urls 榕酒。不過(guò)其中的urls[0].protocol是empty胚膊,以此來(lái)處理所有服務(wù)提供者為空時(shí)的情況。
categoryNotified.put(category, categoryList);
// 保存一份到文件緩存中 中間做的 就是解析出參數(shù)然后同步或者異步保存到文件中
saveProperties(url);
// 通知監(jiān)聽(tīng)器
listener.notify(categoryList);
}
}
2.1.3.5 subscribe and unsubscribe
- 注冊(cè)中心服務(wù)實(shí)現(xiàn)的訂閱和取消訂閱
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 獲得url已經(jīng)訂閱的服務(wù)的監(jiān)聽(tīng)器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 然后把listener添加到上
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
// 獲得url已經(jīng)訂閱的服務(wù)的監(jiān)聽(tīng)器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
// 然后移除
listeners.remove(listener);
}
}
2.1.3.6 recover
- 注冊(cè)中心的連接斷開(kāi)后恢復(fù)時(shí)調(diào)用的方法想鹰,里面其實(shí)就是注冊(cè)和訂閱
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
//調(diào)用的上面的注冊(cè)方法
register(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 調(diào)用上面的訂閱方法
subscribe(url, listener);
}
}
}
}
2.1.3.7 destory
- 這個(gè)方法是在進(jìn)程關(guān)閉時(shí)紊婉,去取消注冊(cè)和訂閱,實(shí)際上就是調(diào)用unregister和unsubscribe
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 獲取以注冊(cè)的URL
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 取消注冊(cè)
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 獲取已訂閱的URL以及監(jiān)聽(tīng)器
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
// 去取消訂閱
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
2.1.4 FailbackRegistry
- 這個(gè)類(lèi)其實(shí)是為AbstractRegistry增加了失敗重試的機(jī)制作為抽象能力辑舷,后面不同的注冊(cè)中心具體實(shí)現(xiàn)繼承了這個(gè)類(lèi)就可以直接使用這個(gè)能力喻犁。
-
類(lèi)圖
image - 常規(guī)套路 類(lèi)的屬性
// Scheduled executor service
// 經(jīng)過(guò)固定時(shí)間后(默認(rèn)是5s),調(diào)用FailbackRegistry#retry方法
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失敗重試計(jì)時(shí)器何缓,定期檢查是否有失敗請(qǐng)求肢础,如果有,則無(wú)限制重試
private final ScheduledFuture<?> retryFuture;
// 注冊(cè)失敗的集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消注冊(cè)失敗的集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// 發(fā)起訂閱失敗的監(jiān)聽(tīng)器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 取消訂閱失敗的監(jiān)聽(tīng)器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 通知失敗的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/**
* The time in milliseconds the retryExecutor will wait
* RetryExecutor將等待的時(shí)間(毫秒)
*/
private final int retryPeriod;
2.1.4.1 構(gòu)造方法
public FailbackRegistry(URL url) {
super(url);
// 獲取重試的時(shí)間 如果沒(méi)有就設(shè)置成默認(rèn)的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 設(shè)置重試任務(wù) 里面就是調(diào)用retry方法 見(jiàn)下方retry方法的解析
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
2.1.4.2 register and unregister 碌廓、 subscribe and unsubscribe
- 注冊(cè)和取消注冊(cè)
@Override
public void register(URL url) {
// 緩存等注冊(cè)操作 見(jiàn)AbstractRegistry
super.register(url);
// 在失敗集合中將這個(gè)移除
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 向服務(wù)器發(fā)送注冊(cè)請(qǐng)求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 開(kāi)啟了啟動(dòng)時(shí)就檢測(cè)传轰,直接拋異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 記錄失敗的url
failedRegistered.add(url);
}
}
- 后面的unregister方法,subscribe unsubscribe都類(lèi)似 可以看下源碼氓皱, 中間的doXXXX這幾個(gè)方法都是abstract方法等著后面不同的服務(wù)來(lái)實(shí)現(xiàn)路召。
2.1.4.3 notify
- notify則與上面的 四個(gè)方法不同,它是默認(rèn)調(diào)用的父類(lèi)AbstractRegistry的notify方法
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
// 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表波材,定期重試
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 注意 這個(gè)是調(diào)用父類(lèi)的
super.notify(url, listener, urls);
}
2.1.4.4 recover
- recover方法也區(qū)別于AbstractRegistry股淡,他是直接添加到失敗重試的集合中,讓定時(shí)任務(wù)自己去重新注冊(cè)和訂閱
@Override
protected void recover() throws Exception {
// register
// 把已注冊(cè)的添加到失敗重試的列表中
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// 添加到失敗重試注冊(cè)列表
failedRegistered.add(url);
}
}
// subscribe
// 把已訂閱的添加到失敗重試的列表中
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 添加到失敗重試訂閱列表
addFailedSubscribed(url, listener);
}
}
}
}
2.1.4.5 retry
- 重試的方法廷区,其實(shí)也比較簡(jiǎn)單唯灵,就是把集合中的數(shù)據(jù)拿出來(lái),該做注冊(cè)做注冊(cè)隙轻,該訂閱就訂閱埠帕,成功了就從失敗重試集合中移除,失敗了就等下次再來(lái)玖绿。簡(jiǎn)單看下對(duì)注冊(cè)列表的代碼就明白了敛瓷。其他代碼都是類(lèi)似的
// Retry the failed actions
protected void retry() {
if (!failedRegistered.isEmpty()) {
// 不為空就把他URL拿到
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
// 然后遍歷它 做對(duì)應(yīng)的操作
for (URL url : failed) {
try {
// 做注冊(cè)操作
doRegister(url);
// 移除失敗集合中URL
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
......
}
2.1.4.6 destroy
@Override
public void destroy() {
// 調(diào)用父類(lèi)的方法
super.destroy();
try {
// 取消執(zhí)行任務(wù)
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
}
2.1.4.7 待實(shí)現(xiàn)的方法
- 這些方法都是交給不同的服務(wù)提供組件去自己實(shí)現(xiàn)的,后面的Zookeeper就針對(duì)這些方法做了實(shí)現(xiàn)斑匪。
// ==== Template method ====
protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2 Registry的相關(guān)Factory的實(shí)現(xiàn)
- 注冊(cè)中心的工廠類(lèi)呐籽,顧名思義就是生產(chǎn)上面的Registry的實(shí)現(xiàn)。
2.2.1 RegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
// 這個(gè)接口方法實(shí)際上就是獲取對(duì)注冊(cè)中心的連接蚀瘸,然后返回不同注冊(cè)中心的不同Regsitry的實(shí)現(xiàn)對(duì)象狡蝶,
// 注解就是根據(jù)設(shè)置不同的protocol(協(xié)議)來(lái)選擇不同的實(shí)現(xiàn),
// 比如Zookeeper贮勃,就會(huì)去使用Zookeeper的ZookeeperRegistryFactory,具體怎么選擇贪惹,后續(xù)博客再寫(xiě)
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
2.2.2 AbstractRegistryFactory
- 類(lèi)圖
- 這個(gè)抽象類(lèi)還是相對(duì)來(lái)說(shuō)比較簡(jiǎn)答的。咱們看一下他的類(lèi)屬性
// 注冊(cè)中心獲取過(guò)程的鎖
private static final ReentrantLock LOCK = new ReentrantLock();
// 注冊(cè)中心Map<注冊(cè)地址寂嘉,registry> 一個(gè)類(lèi)的緩存奏瞬。
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
2.2.2.1 getRegistryies
- 獲取所有的registry對(duì)象
/**
* Get all registries
* 獲取所有的registry對(duì)象
* @return all registries
*/
public static Collection<Registry> getRegistries() {
//得到一個(gè)集合的鏡像,它的返回結(jié)果不可直接被改變,否則會(huì)報(bào)錯(cuò)
return Collections.unmodifiableCollection(REGISTRIES.values());
}
2.2.2.2 destoryAll
- 關(guān)閉所有已創(chuàng)建的registry對(duì)象
/**
* Close all created registries
* 關(guān)閉所有已創(chuàng)建的registry對(duì)象
*/
// TODO: 2017/8/30 to move somewhere else better
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
//對(duì)注冊(cè)中心關(guān)閉操作加鎖
LOCK.lock();
try {
// 遍歷所有的注冊(cè)中心的操作類(lèi)枫绅,然后調(diào)用destroy來(lái)銷(xiāo)毀。
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 然后清除集合
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
2.2.2.3 getRegistry
- 獲取對(duì)應(yīng)注冊(cè)中心的操作實(shí)現(xiàn)類(lèi)
@Override
public Registry getRegistry(URL url) {
// 通過(guò)URL來(lái)獲取到注冊(cè)中心的類(lèi)型
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
// 鎖定注冊(cè)中心訪問(wèn)進(jìn)程以確保注冊(cè)表的單個(gè)實(shí)例
LOCK.lock();
try {
// 通過(guò)key來(lái)拿到對(duì)應(yīng)的注冊(cè)中心的操作類(lèi)
Registry registry = REGISTRIES.get(key);
// 有就直接返回
if (registry != null) {
return registry;
}
// 沒(méi)有就創(chuàng)建對(duì)應(yīng)的注冊(cè)中心操作類(lèi)
registry = createRegistry(url);
// 如果創(chuàng)建失敗丝格,報(bào)錯(cuò)
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 創(chuàng)建成功就放到結(jié)合中
REGISTRIES.put(key, registry);
// 然后再返回
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
2.2.2.4 createRegistry
- 抽象方法撑瞧,沒(méi)有實(shí)現(xiàn)棵譬,需要不同的服務(wù)提供工廠對(duì)象來(lái)自己實(shí)現(xiàn)對(duì)應(yīng)的創(chuàng)建方法
protected abstract Registry createRegistry(URL url);
2.3 Consumer And Provider InvokerWrapper
- 實(shí)現(xiàn)Invoker接口显蝌,主要包裝消費(fèi)者和服務(wù)提供者的屬性
- 主要為QOS提供服務(wù) 官方地址:http://dubbo.apache.org/zh-cn/docs/user/references/qos.html
- 什么是QOS? qos-server,是dubbo在線運(yùn)維命令服務(wù)订咸,默認(rèn)端口號(hào)為:2222曼尊,用于接口命令,運(yùn)維dubbo脏嚷。
2.3.1 ConsumerInvokerWrapper
// invoker對(duì)象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注冊(cè)中心的地址
private URL registryUrl;
// 消費(fèi)者的地址
private URL consumerUrl;
// 注冊(cè)中心的Directory
private RegistryDirectory registryDirectory;
2.3.2 ProviderInvokerWrapper
// invoker對(duì)象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注冊(cè)中心的地址
private URL registryUrl;
// 提供者的地址
private URL providerUrl;
// 是否注冊(cè)
private volatile boolean isReg;
2.4 ProviderConsumerRegTable
- 這個(gè)類(lèi)是消費(fèi)者和服務(wù)提供者的注冊(cè)表操作骆撇,也是用在QOS中。
- 主要類(lèi)屬性
// 服務(wù)提供者的Invokers集合
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
// 服務(wù)消費(fèi)者的Invokers集合
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
- 類(lèi)圖
- 里面就是一些對(duì)類(lèi)屬性集合的操作父叙,主要是QOS會(huì)用神郊。
2.5 RegistryStatusChecker
- 這個(gè)類(lèi)就一個(gè)方法 check方法,主要是做狀態(tài)校驗(yàn)趾唱。做注冊(cè)中心相關(guān)的狀態(tài)檢查校驗(yàn)
- 類(lèi)上面的@Activate 注解 使這個(gè)類(lèi)自動(dòng)被激活加載涌乳。
@Override
public Status check() {
// 獲取所有的注冊(cè)中心的對(duì)象
Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
if (registries.isEmpty()) {
return new Status(Status.Level.UNKNOWN);
}
Status.Level level = Status.Level.OK;
StringBuilder buf = new StringBuilder();
// 遍歷
for (Registry registry : registries) {
if (buf.length() > 0) {
buf.append(",");
}
// 把地址拼接到一起
buf.append(registry.getUrl().getAddress());
// 如果注冊(cè)中心的某個(gè)節(jié)點(diǎn)不可用就把狀態(tài)設(shè)置成error
if (!registry.isAvailable()) {
level = Status.Level.ERROR;
buf.append("(disconnected)");
} else {
buf.append("(connected)");
}
}
// 然后返回價(jià)差的結(jié)果對(duì)象
return new Status(level, buf.toString());
}
2.5 RegistryDirectory and RegistryProtocol
- 這兩個(gè)類(lèi)后續(xù)再說(shuō)。牽涉到其他地方的一些東西甜癞。
3. dubbo-registry-zookeeper
不知道大家看到這里有沒(méi)有忘記這張圖
-
模塊關(guān)系圖
image 所有的注冊(cè)中心實(shí)現(xiàn)FailbackRegistry 和 AbstractRegistryFactory來(lái)實(shí)現(xiàn)對(duì)應(yīng)的功能夕晓。
那么Zookeeper也是如此。Zookeeper主要就只有兩個(gè)類(lèi)
- 是ZookeeperRegistry
- 是ZookeeperRegistryFactory來(lái)實(shí)現(xiàn)對(duì)應(yīng)的功能
3.1 Dubbo在Zookeeper中的數(shù)據(jù)結(jié)構(gòu)
- dubbo在使用Zookeeper時(shí)只會(huì)創(chuàng)建永久節(jié)點(diǎn)和臨時(shí)節(jié)點(diǎn)悠咱。
- 根節(jié)點(diǎn)是注冊(cè)中心分組蒸辆,下面是很多的服務(wù)接口,分組來(lái)自用戶(hù)配置的<dubbo:registry>中的group屬性析既,默認(rèn)是/dubbo躬贡。
- 服務(wù)接口下是如圖所示的四種服務(wù)目錄,都是持久節(jié)點(diǎn)眼坏。
- 服務(wù)提供者路徑/dubbo/service/providers (這里方便標(biāo)識(shí)全部都用service替代接口com.demo.DemoService)拂玻,下面包含接口的多個(gè)服務(wù)提供者者的URL元數(shù)據(jù)信息。
- 服務(wù)提供者路徑/dubbo/service/consumers空骚,下面包含接口有多個(gè)消費(fèi)者的URL元數(shù)據(jù)信息
- 服務(wù)提供者路徑/dubbo/service/routers纺讲,下面包含多個(gè)用于消費(fèi)者路由策略URL元數(shù)據(jù)信息。
- 服務(wù)提供者路徑/dubbo/service/configurators囤屹,下面包含多個(gè)用于服務(wù)提供者動(dòng)態(tài)配置的URL元數(shù)據(jù)信息熬甚。
在Dubbo框架啟動(dòng)時(shí)會(huì)根據(jù)我們所寫(xiě)的服務(wù)相關(guān)的配置在注冊(cè)中心創(chuàng)建4個(gè)目錄,在providers和consumers目錄中分別存儲(chǔ)服務(wù)提供方肋坚、消費(fèi)方元數(shù)據(jù)信息乡括。包括:IP肃廓、端口、權(quán)重和應(yīng)用名等數(shù)據(jù)诲泌。
- 目錄包含信息
目錄名稱(chēng) | 存儲(chǔ)值樣例 |
---|---|
/dubbo/service/providers | dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&... |
/dubbo/service/consumers | dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&... |
/dubbo/service/routers | condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&... |
/dubbo/service/configurators | override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&... |
- 在Dubbo中啟用注冊(cè)中心:
<beans>
<!-- 適用于Zookeeper一個(gè)集群有多個(gè)節(jié)點(diǎn)盲赊,多個(gè)IP和端口用逗號(hào)分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
<!-- 適用于Zookeeper多個(gè)集群有多個(gè)節(jié)點(diǎn),多個(gè)IP和端口用豎線分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>
3.2 ZookeeperRegistry
- 慣例給大家一張類(lèi)圖
- 然后看下屬性
// Zookeeper的默認(rèn)端口號(hào)
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
// Dubbo在Zookeeper中注冊(cè)的默認(rèn)根節(jié)點(diǎn)
private final static String DEFAULT_ROOT = "dubbo";
// 組的名稱(chēng) 或者說(shuō)是 根節(jié)點(diǎn)的值
private final String root;
// 服務(wù)集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();
// zk節(jié)點(diǎn)的監(jiān)聽(tīng)器
// Dubbo底層封裝了2套Zookeeper API敷扫,所以通過(guò)ChildListener抽象了監(jiān)聽(tīng)器哀蘑,
// 但是在實(shí)際調(diào)用時(shí)會(huì)通過(guò)createTargetChildListener轉(zhuǎn)為對(duì)應(yīng)框架的監(jiān)聽(tīng)器實(shí)現(xiàn)
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
// zk的客戶(hù)端, 對(duì)節(jié)點(diǎn)進(jìn)行一些刪改等操作
private final ZookeeperClient zkClient;
- 關(guān)于Dubbo中的Zookeeper客戶(hù)端葵第,Dubbo實(shí)現(xiàn)了一個(gè)統(tǒng)一的Client API绘迁,但是用兩種不同的Zookeeper開(kāi)源庫(kù)來(lái)實(shí)現(xiàn),一個(gè)是Apache的Curator卒密,另一個(gè)是zkClient 如果用戶(hù)不設(shè)置缀台,則默認(rèn)使用Curator實(shí)現(xiàn)。
3.2.1 構(gòu)造方法
- 構(gòu)造方法比較簡(jiǎn)單哮奇,就是獲取組名膛腐,連接Zookeeper
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 調(diào)用FailbackRegistry的構(gòu)造方法
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名稱(chēng) 并復(fù)制給root
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 連接上Zookeeper
zkClient = zookeeperTransporter.connect(url);
// 添加連接狀態(tài)監(jiān)聽(tīng)器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
// 重連恢復(fù)
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
3.2.2 服務(wù)注冊(cè)發(fā)布與服務(wù)下線取消注冊(cè)
- 也比較簡(jiǎn)單就是創(chuàng)建節(jié)點(diǎn)和刪除節(jié)點(diǎn)
// 發(fā)布
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 取消發(fā)布
@Override
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3.2.3 服務(wù)訂閱和取消訂閱
- 訂閱有pull和push兩種方式,一種是客戶(hù)端定時(shí)輪詢(xún)注冊(cè)中心拉去配置鼎俘,另一種是注冊(cè)中心主動(dòng)推送數(shù)據(jù)給客戶(hù)端哲身。Dubbo目前采用的是第一次啟動(dòng)拉取然后接受事件再重新拉取。
- 再暴露服務(wù)的時(shí)候而芥,服務(wù)端會(huì)訂閱configurators監(jiān)聽(tīng)動(dòng)態(tài)配置律罢,消費(fèi)端啟動(dòng)的時(shí)候回訂閱providers、routers棍丐、configurators類(lèi)接收這三者的變更通知误辑。
- Dubbo在實(shí)現(xiàn)Zookeeper注冊(cè)中心的時(shí)候是,客戶(hù)端第一次連接獲取全量數(shù)據(jù)歌逢,然后在訂閱節(jié)點(diǎn)上注冊(cè)一個(gè)watcher巾钉,客戶(hù)端與注冊(cè)中心之間保持TCP長(zhǎng)連接,后續(xù)有節(jié)點(diǎn)發(fā)生變化則會(huì)觸發(fā)watcher事件來(lái)把對(duì)應(yīng)節(jié)點(diǎn)下的全量數(shù)據(jù)拉取過(guò)來(lái)秘案。
3.2.3.1 doSubscribe
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 訂閱所有數(shù)據(jù)
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
// 為空則把listeners放入到緩存的Map中
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 創(chuàng)建子節(jié)點(diǎn)監(jiān)聽(tīng)器砰苍,對(duì)root下的子節(jié)點(diǎn)做監(jiān)聽(tīng),一旦有子節(jié)點(diǎn)發(fā)生改變阱高,
// 那么就對(duì)這個(gè)節(jié)點(diǎn)進(jìn)行訂閱.
if (zkListener == null) {
// zkListener為空說(shuō)明是第一次拉取赚导,則新建一個(gè)listener
listeners.putIfAbsent(listener, new ChildListener() {
// 節(jié)點(diǎn)變更時(shí),觸發(fā)通知時(shí)執(zhí)行
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
// 遍歷所有節(jié)點(diǎn)
child = URL.decode(child);
// 如果有子節(jié)點(diǎn)還未被訂閱賊說(shuō)明是新節(jié)點(diǎn)赤惊,
if (!anyServices.contains(child)) {
// 加入到集合中
anyServices.add(child);
//就訂閱之
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
// 創(chuàng)建持久節(jié)點(diǎn)root吼旧,接下來(lái)訂閱持久節(jié)點(diǎn)的子節(jié)點(diǎn)
zkClient.create(root, false);
// 添加root節(jié)點(diǎn)的子節(jié)點(diǎn)監(jiān)聽(tīng)器,并返回當(dāng)前的services
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
// 遍歷所有的子節(jié)點(diǎn)進(jìn)行訂閱
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 增加當(dāng)前節(jié)點(diǎn)的訂閱未舟,并且會(huì)返回改節(jié)點(diǎn)下所有子節(jié)點(diǎn)的列表
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 訂閱類(lèi)別服務(wù)
} else {
List<URL> urls = new ArrayList<URL>();
// 將url轉(zhuǎn)變成
// /dubbo/com.demo.DemoService/providers
// /dubbo/com.demo.DemoService/configurators
// /dubbo/com.demo.DemoService/routers
// 根據(jù)url類(lèi)別獲取一組要訂閱的路徑
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 如果緩存沒(méi)有圈暗,則添加到緩存中
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 同樣如果監(jiān)聽(tīng)器緩存中沒(méi)有 則放入緩存
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 通知節(jié)點(diǎn)變化
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 訂閱并返回該節(jié)點(diǎn)下的子路徑并緩存
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 有子節(jié)點(diǎn)組裝掂为,沒(méi)有那么就將消費(fèi)者的協(xié)議變成empty作為url。
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 回調(diào)NotifyListener员串,更新本地緩存信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3.2.3.2 doUnsubscribe
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
// 通過(guò)url把監(jiān)聽(tīng)器全部拿到
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 直接刪除group下所有的
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 移除監(jiān)聽(tīng)器
zkClient.removeChildListener(root, zkListener);
} else {
// 移除類(lèi)別服務(wù)下的監(jiān)聽(tīng)器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
3.2.3.3 其他
- 其他代碼相對(duì)來(lái)說(shuō)不是很復(fù)雜可以自行看一下勇哗。
3.3 ZookeeperRegistryFactory
- 工廠類(lèi)的代碼極其短,隨意看下寸齐。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
3.3.1 關(guān)于ZookeeperTransporter
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
-
上面我提到過(guò)欲诺,Dubbo用Zookeeper的時(shí)候用了兩種方式實(shí)現(xiàn),一個(gè)是Apache Curator访忿,另一個(gè)是zkClient瞧栗,這個(gè)類(lèi)就是做看了一個(gè)轉(zhuǎn)換斯稳。如下圖
image 兩個(gè)類(lèi)都實(shí)現(xiàn)了該接口來(lái)向外提供統(tǒng)一的ZookeeperClient海铆。
這個(gè)實(shí)現(xiàn)在remoting模塊。暫時(shí)就不講了挣惰。
4. 結(jié)語(yǔ)
- 整個(gè)模塊卧斟,其他的Redis、Nacos等實(shí)現(xiàn)都是根據(jù)不同組件的特點(diǎn)來(lái)實(shí)現(xiàn)憎茂。功能都一樣珍语,只是實(shí)現(xiàn)不一樣,大家可以自己去探索一下竖幔。
- 整個(gè)模塊中我們單獨(dú)看的話主要是就是一個(gè)實(shí)現(xiàn)板乙,一個(gè)工廠,里面牽涉到了本地緩存拳氢、重試這些機(jī)制募逞。代碼量不是很大。認(rèn)真看還是不難的馋评。其中特別需要注意的就是注冊(cè)中心的數(shù)據(jù)結(jié)構(gòu) 和 發(fā)布訂閱這些的實(shí)現(xiàn)了放接。
- 結(jié)語(yǔ)有點(diǎn)亂。就這樣留特,不足之處希望留言指出纠脾,后續(xù)優(yōu)化。蜕青!感謝9兜浮!右核!
關(guān)于我
- 坐標(biāo)杭州慧脱,普通本科在讀,計(jì)算機(jī)科學(xué)與技術(shù)專(zhuān)業(yè)蒙兰,20年畢業(yè)磷瘤,目前處于實(shí)習(xí)階段芒篷。
- 主要做Java開(kāi)發(fā),會(huì)寫(xiě)點(diǎn)Golang采缚、Shell针炉。對(duì)微服務(wù)、大數(shù)據(jù)比較感興趣扳抽,預(yù)備做這個(gè)方向篡帕。
- 目前處于菜鳥(niǎo)階段,各位大佬輕噴贸呢,小弟正在瘋狂學(xué)習(xí)镰烧。
- 歡迎大家和我交流鴨!@阆荨怔鳖!