前言
Registry是dubbo對(duì)注冊(cè)中心的抽象分唾,提供服務(wù)的注冊(cè)抗碰、注銷、查找绽乔、訂閱弧蝇、取消訂閱等功能。本文按照dubbo中Registry的組織形式折砸,分析Registry的核心邏輯看疗。首先來(lái)看注冊(cè)中心的創(chuàng)建,RegistryFactory接口定義注冊(cè)中心的創(chuàng)建(工廠模式實(shí)現(xiàn))睦授,支持SPI擴(kuò)展鹃觉,默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory。注冊(cè)中心Registry接口繼承Node睹逃、RegistryService盗扇,抽象基類AbstractRegistry直接實(shí)現(xiàn)Registry,F(xiàn)ailbackRegistry繼承自基類AbstractRegistry沉填,所有注冊(cè)中心實(shí)現(xiàn)均繼承自FailbackRegistry疗隶,這里可以看出,所有注冊(cè)中心均支持失敗重試(Failback)翼闹。
一斑鼻、RegistryFactory(注冊(cè)中心工廠)
RegistryFactory的UML類圖如下:
先來(lái)看RegistryFactory接口定義,比較簡(jiǎn)單:
@SPI("dubbo")
public interface RegistryFactory {
/**
* Connect to the registry
* 1猎荠、check=false時(shí)坚弱,連接無(wú)需check,否則連接斷開(kāi)時(shí)直接拋異常
* 2关摇、支持URL的用戶名荒叶、密碼權(quán)限校驗(yàn)
* 3、支持注冊(cè)中心族備份地址:10.20.153.10
* 4输虱、支持注冊(cè)中心本地緩存文件
* 5些楣、支持超時(shí)設(shè)置
* 6、支持session 60s過(guò)期
*/
@Adaptive({"protocol"})
Registry getRegistry(URL url);
1.1 AbstractRegistryFactory
接著是抽象基類AbstractRegistryFactory宪睹,實(shí)現(xiàn)RegistryFactory接口的getRegistry方法愁茁,同時(shí)定義模板方法createRegistry供子類實(shí)現(xiàn);AbstractRegistryFactory的getRegistry方法先從Map緩存查詢注冊(cè)中心亭病,查不到則執(zhí)行模板方法createRegistry創(chuàng)建注冊(cè)中心鹅很,并放入緩存,然后返回該registry
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
//加鎖罪帖,保證單例
LOCK.lock();
try {
// 緩存查詢注冊(cè)中心
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 不存在則通過(guò)spi促煮、ioc方式創(chuàng)建registry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// 鎖釋放
LOCK.unlock();
}
}
1.2 其他實(shí)現(xiàn)
基類AbstractRegistryFactory的子類實(shí)現(xiàn)中食听,比較重要的是DubboRegistryFactory(介紹Protocol時(shí)已經(jīng)做了解析),其他實(shí)現(xiàn)比如RedisRegistryFactory污茵、ZookeeperRegistryFactory樱报、MulticastRegistryFactory的邏輯非常簡(jiǎn)單,直接返回對(duì)應(yīng)的注冊(cè)中心實(shí)現(xiàn)泞当,代碼就省略了迹蛤。
二、Registry(注冊(cè)中心)
來(lái)看dubbo中Registry實(shí)現(xiàn)襟士。UML類圖如下:
方便理解起見(jiàn)盗飒,這里把注冊(cè)中實(shí)現(xiàn)分為三個(gè)層次,分別是Registry接口陋桂、FailbackRegistry實(shí)現(xiàn)逆趣、注冊(cè)中心實(shí)現(xiàn)。
2.1嗜历、Registry接口
上面UML類圖中可以看出宣渗,Registry繼承Node和RegistryService接口(Registry接口內(nèi)部無(wú)新增方法),重點(diǎn)關(guān)注RegistryService接口梨州。RegistryService抽象了服務(wù)的注冊(cè)痕囱、注銷、訂閱暴匠、取消訂閱鞍恢、查找等核心功能,來(lái)看接口定義:
public interface RegistryService {
/**
* 注冊(cè)數(shù)據(jù)每窖,比如提供者服務(wù)帮掉、消費(fèi)者地址、路由規(guī)則窒典、override規(guī)則以及其他數(shù)據(jù)
* 1蟆炊、若URL中check=false,那么注冊(cè)失敗會(huì)進(jìn)行重試且不會(huì)拋出異常崇败,否則直接拋異常
* 2盅称、若URL中dynamic=false肩祥,那么URL中信息會(huì)被持久化存儲(chǔ)后室,否則注冊(cè)過(guò)程異常退出,URL信息應(yīng)當(dāng)被刪除
* 3混狠、若URL中category=routers岸霹,那么意味著分類存儲(chǔ),默認(rèn)catetory=providers将饺,且會(huì)根據(jù)分類區(qū)域進(jìn)行通知更新
* 4贡避、當(dāng)注冊(cè)中心重啟,網(wǎng)絡(luò)波動(dòng),數(shù)據(jù)不能被丟棄蟆盐,包括刪除損壞的流水線中數(shù)據(jù)的刪除
* 5啄踊、參數(shù)不同的URL可以共存,不能相互覆蓋
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 注銷
* 1杀捻、若是dynamic=false的持久化存儲(chǔ)井厌,如果找不到注冊(cè)數(shù)據(jù),那么會(huì)拋出非法狀態(tài)異常致讥,否則會(huì)忽略
* 2仅仆、根據(jù)完整URL進(jìn)行匹配注銷
* @param url Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 訂閱注冊(cè)數(shù)據(jù),并在注冊(cè)數(shù)據(jù)更新的時(shí)候自動(dòng)推送
* 服務(wù)訂閱
* 1垢袱、若URL中check=false墓拜,那么當(dāng)注冊(cè)失敗時(shí),會(huì)直接在后臺(tái)重試不會(huì)拋異常
* 2请契、若URL中category=routers咳榜,那么只會(huì)通知特定分類數(shù)據(jù);多個(gè)分類用逗號(hào)隔開(kāi)爽锥;允許使用*全部匹配
* 3贿衍、允許接口、組救恨、版本以及分類作為查詢條件
* 4贸辈、查詢條件允許使用*進(jìn)行匹配,意味著訂閱接口的所有版本
* 5肠槽、若注冊(cè)中心重啟擎淤、網(wǎng)絡(luò)波動(dòng),必須自動(dòng)保存訂閱請(qǐng)求
* 6秸仙、參數(shù)不同的URL可以共存嘴拢,不能相互覆蓋
* 7、訂閱過(guò)程必須是阻塞的
*/
void subscribe(URL url, NotifyListener listener);
/**
* 1寂纪、沒(méi)有訂閱席吴,則直接忽略
* 2、根據(jù)URL完全匹配
**/
void unsubscribe(URL url, NotifyListener listener);
/**
* 根據(jù)條件匹配捞蛋,查詢注冊(cè)數(shù)據(jù)孝冒;對(duì)應(yīng)訂閱的推模式,提供拉模式拟杉,且僅返回一個(gè)結(jié)果
*/
List<URL> lookup(URL url);
從接口定義可以看出庄涡,RegsitryService對(duì)數(shù)據(jù)的注冊(cè)、注銷搬设、訂閱穴店、取消訂閱撕捍、查找等功能做了定義約束,所有對(duì)RegistryService的實(shí)現(xiàn)都必須滿足這個(gè)約束泣洞。
2.2忧风、AbstractRegistry & FailbackRegistry
接下來(lái)看Registry接口的基類實(shí)現(xiàn)AbstractRegistry、FailbackRegistry球凰。
2.2.1 AbstractRegistry
先來(lái)看AbstractRegistry阀蒂,重點(diǎn)關(guān)注構(gòu)造方法,AbstractRegistry基類的構(gòu)造過(guò)程的核心邏輯分為三部分:1、創(chuàng)建注冊(cè)中心緩存文件弟蚀;2蚤霞、cache文件加載至properties;3义钉、同步backUpUrl信息昧绣。我們先來(lái)看構(gòu)造方法的定義,然后再分步來(lái)看:
public AbstractRegistry(URL url) {
// 注冊(cè)中心URL負(fù)載
setUrl(url);
// Start file save timer
// 1捶闸、注冊(cè)中心文件同步保存開(kāi)關(guān)夜畴,默認(rèn)關(guān)閉,即默認(rèn)異步保存
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 默認(rèn)文件路徑: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
// 文件目錄創(chuàng)建失敗删壮,直接拋異常
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
// When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
// 2贪绘、加載注冊(cè)中心cache文件到內(nèi)存,用于容錯(cuò)
loadProperties();
// 3央碟、backUpUrl數(shù)據(jù)同步税灌,同步所有訂閱者、更新properties緩存亿虽。
notify(url.getBackupUrls());
}
2.2.1.1菱涤、創(chuàng)建注冊(cè)中心緩存
這一步比較簡(jiǎn)單,首先根據(jù)URL信息洛勉,確定緩存文件保存方式(異步粘秆、同步);然后收毫,拼接緩存文件名稱攻走,根據(jù)文件名創(chuàng)建緩存文件,創(chuàng)建失敗則直接拋異常此再,否則初始化緩存文件file昔搂。
2.2.1.2、注冊(cè)中心cache加載
加載本地cache文件到內(nèi)存緩存properties引润,將上一步中的cache文件內(nèi)容巩趁,加載到properties。比較容易理解淳附,啟動(dòng)時(shí)先讀本地緩存议慰,用于容錯(cuò)。
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
// 省略try-catch
in = new FileInputStream(file);
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry cache file " + file + ", data: " + properties);
}
}
}
2.2.1.3奴曙、backupUrl數(shù)據(jù)同步
重點(diǎn)來(lái)看backupUrl的數(shù)據(jù)同步别凹,backupUrl的生成方式前面我們已經(jīng)講過(guò)了,這一步主要是將生成的backupUrl列表同步給各訂閱者以及內(nèi)存緩存Properties洽糟,來(lái)看代碼:
protected void notify(List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
return;
}
// 根據(jù)已訂閱的URL炉菲,以及訂閱者listener進(jìn)行同步
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
//過(guò)濾掉不匹配的URL
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 核心邏輯,執(zhí)行URL信息同步
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
繼續(xù)來(lái)看notify方法:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
// 按照category分組
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//緩存分組后的Notified結(jié)果
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
// 同步URL信息坤溃,這里有兩種實(shí)現(xiàn)拍霜,分別是RegistryDirectory和RegistryProtocol$OverrideListener,listener通過(guò)subscribe方法被注冊(cè)到subscribed緩存,
listener.notify(categoryList);
// 每次notify都會(huì)更新cache緩存文件(同步或者異步)薪介,保證注冊(cè)中心properties內(nèi)容與各訂閱者拿到的信息一致祠饺。
saveProperties(url);
}
}
同步URL信息到訂閱者的邏輯主要在RegistryDirectory和RegistryProtocol$OverrideListener,這里就不再做解析了汁政。來(lái)看同步到內(nèi)存properties的邏輯道偷,把所有待同步的URL序列化為字符串(每個(gè)url中間用空格隔開(kāi)),然后將URL數(shù)據(jù)保存(同步或異步)至緩存文件(即2.2.1.1中創(chuàng)建的緩存文件)记劈,注意勺鸦,這里異步保存實(shí)際執(zhí)行的邏輯與同步保存完全一致,核心邏輯在doSaveProperties方法目木。
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
// url緩存信息組裝,每個(gè)url中間用空格隔開(kāi)
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
// 更新內(nèi)存緩存
properties.setProperty(url.getServiceKey(), buf.toString());
// 版本控制换途,可以看出,MVCC并非DB專有
long version = lastCacheChanged.incrementAndGet();
// 同步保存文件則直接執(zhí)行save,將properties緩存內(nèi)容同步至緩存文件刽射,否則放入線程池異步調(diào)度
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
繼續(xù)來(lái)看doSaveProperties方法怀跛,方法參數(shù)是當(dāng)前文件的版本號(hào),可以看到柄冲,防止并發(fā)操作吻谋,版本號(hào)用于版本控制
// 保存配置到本地緩存文件,文件版本
public void doSaveProperties(long version) {
// 防止版本回退
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
// 創(chuàng)建本地緩存文件鎖,不存在則直接創(chuàng)建现横;
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
//拿到鎖才可以操作
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) {
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
}
}
為了進(jìn)一步加深理解漓拾,我們把AbstractRegistry構(gòu)建過(guò)程中的數(shù)據(jù)流圖示如下:
2.2.1.4、數(shù)據(jù)注冊(cè) - register
基類中的注冊(cè)等方法邏輯非常簡(jiǎn)單戒祠,只是將URL放入緩存,非常簡(jiǎn)單骇两,不做過(guò)多說(shuō)明。
2.2.1.5姜盈、數(shù)據(jù)注銷 - unRegister
同樣的低千,注銷方法邏輯也非常簡(jiǎn)單,將URL從緩存中刪除。
2.2.1.6示血、數(shù)據(jù)訂閱 - subscribe
數(shù)據(jù)訂閱需要注意棋傍,訂閱的邏輯核心是注冊(cè)監(jiān)聽(tīng)器,以便數(shù)據(jù)變更時(shí)难审,同步更新瘫拣;subscribed緩存的結(jié)構(gòu)比較特殊,來(lái)看代碼:
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// subscribed緩存結(jié)構(gòu):ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}
2.2.1.7告喊、取消數(shù)據(jù)訂閱 - unSubscribe
取消訂閱即刪除監(jiān)聽(tīng)器麸拄,從subscribed緩存中刪除對(duì)應(yīng)監(jiān)聽(tīng)器。
2.2.1.8黔姜、數(shù)據(jù)查找 - lookup
查找邏輯即拢切,從已同步過(guò)的URL列表(notified緩存)中,過(guò)濾滿足要求的URL秆吵;若當(dāng)前已同步過(guò)的URL集合為空淮椰,則
@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
// 即注冊(cè)監(jiān)聽(tīng)器,保證首次notify有數(shù)據(jù)返回
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
2.2.1.9帮毁、注冊(cè)中心銷毀- destory
銷毀的邏輯比較簡(jiǎn)單实苞,分為注銷數(shù)據(jù)和注銷監(jiān)聽(tīng)器兩部分,來(lái)看代碼:
@Override
// 主要做兩件事情:1烈疚、注銷URL黔牵,從registered列表中剔除所有URL;2爷肝、移除所有NotifyListener猾浦,也就是說(shuō)不再接受配置變更同步消息。
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 將所有URL從registered移除,即注銷全部數(shù)據(jù)
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 注銷監(jiān)聽(tīng)器,即從subscribed移除所有訂閱URL的listener
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
AbstractRegistry基類的邏輯就分析到這里灯抛,接著來(lái)看FailbackRegistry金赦。
2.2.2、FailbackRegistry
FailbackRegistry 顧名思義对嚼,支持失敗恢復(fù)的注冊(cè)中心夹抗,繼承自基類AbstractRegistry,可以看到在基類基礎(chǔ)上擴(kuò)展了失敗恢復(fù)功能纵竖,先來(lái)看幾個(gè)緩存變量:
// 重試任務(wù)map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();
幾個(gè)重試任務(wù)map漠烧,用于緩存失敗的任務(wù)以便于重試;接著來(lái)看構(gòu)造方法,在父類構(gòu)造方法的基礎(chǔ)上靡砌,新增了HashedWheelTimer實(shí)例已脓,用于定時(shí)重試失敗任務(wù),默認(rèn)重試時(shí)間間隔5s通殃。
public FailbackRegistry(URL url) {
super(url);
// 默認(rèn)重試時(shí)間間隔 5s
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 利用hashTimer實(shí)現(xiàn)定時(shí)重試,
retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
除此之外度液,F(xiàn)ailbackRegistry還定義了幾個(gè)關(guān)鍵的模板方法,由子類實(shí)現(xiàn):
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、數(shù)據(jù)注冊(cè)-register
注冊(cè)邏輯在父類基礎(chǔ)上新增了失敗以后的操作堕担,邏輯比較簡(jiǎn)單已慢,直接來(lái)看代碼:
@Override
public void register(URL url) {
// 父類注冊(cè)方法,保存url到已注冊(cè)列表
super.register(url);
// 將url從注冊(cè)失敗列表中剔除
removeFailedRegistered(url);
// 將url從注銷失敗列表中剔除
removeFailedUnregistered(url);
try {
// 執(zhí)行模板方法邏輯
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 如果啟動(dòng)檢測(cè)開(kāi)關(guān)開(kāi)啟(默認(rèn)開(kāi)啟)照宝,失敗會(huì)直接拋異常蛇受,否則加入失敗列表句葵,用于重試
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 加入注冊(cè)失敗列表厕鹃,用于重試
addFailedRegistered(url);
}
}
2.2.2.2、數(shù)據(jù)注銷-unRegister
注銷邏輯與注冊(cè)類似乍丈,直接來(lái)看代碼:
@Override
public void unregister(URL url) {
// 父類注銷邏輯
super.unregister(url);
// 從注冊(cè)失敗緩存中刪除
removeFailedRegistered(url);
// 從注銷失敗列表中刪除
removeFailedUnregistered(url);
try {
// 執(zhí)行模板方法
doUnregister(url);
} catch (Exception e) {
Throwable t = e;
// 如果啟動(dòng)檢測(cè)開(kāi)關(guān)開(kāi)啟(默認(rèn)開(kāi)啟)剂碴,失敗會(huì)直接拋異常,否則加入失敗列表轻专,用于重試
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
//加入失敗重試緩存
addFailedUnregistered(url);
}
}
2.2.2.3忆矛、數(shù)據(jù)訂閱-subscribe
與注冊(cè)、注銷邏輯類似请垛,數(shù)據(jù)訂閱邏輯也非常簡(jiǎn)單催训,代碼就不再展示了
2.2.2.4、取消數(shù)據(jù)訂閱-unSubscribe
與注冊(cè)宗收、注銷邏輯類似匈勋,不做過(guò)多解析璃俗。
2.3喊衫、注冊(cè)中心實(shí)現(xiàn)
好了充尉,前面的鋪墊結(jié)束了鲜锚,本節(jié)來(lái)看具體的注冊(cè)中心實(shí)現(xiàn)周拐,下面按照DubboRegistry、MulticastRegistry吏够、RedisRegistry勾给、ZookeeperRegistry的順序依次分析滩报。
2.3.1、DubboRegistry
DubboRegistry作為dubbo的默認(rèn)注冊(cè)中心實(shí)現(xiàn)(RegistryFactory默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory)播急,嚴(yán)格意義上來(lái)講脓钾,DubboRegistry實(shí)際上是一個(gè)Registry代理,核心邏輯全部由代理也即registryService實(shí)現(xiàn)桩警。DubboRegistry的創(chuàng)建在DubboRegistryFactory中已經(jīng)做了解析可训,這里不做過(guò)多說(shuō)明。重點(diǎn)關(guān)注DubboRegistry的構(gòu)造方法
public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
super(registryInvoker.getUrl());
this.registryInvoker = registryInvoker;
// Registry代理捶枢,核心邏輯借助registryService實(shí)現(xiàn)
this.registryService = registryService;
// 重連定時(shí)器握截,默認(rèn)重連間隔時(shí)間3s
this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
// 初始化調(diào)度任務(wù)邏輯,具體邏輯參考recover方法
reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
try {
// 重連邏輯
connect();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
}
}, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}
介紹FailbackRegistry時(shí)說(shuō)過(guò)柱蟀,所有的注冊(cè)中心實(shí)現(xiàn)都支持自動(dòng)恢復(fù)川蒙,DubboRegistry的自動(dòng)恢復(fù)實(shí)現(xiàn)原理是若當(dāng)前注冊(cè)中心不可用蚜厉,則直接將該URL注冊(cè)信息加入到注冊(cè)失敗长已、訂閱失敗緩存(借助父類FailbackRegistry的addFailedRegistered、addFailedSubscribed方法實(shí)現(xiàn))昼牛,由父類的HashedWheelTimer重新調(diào)度术瓮,進(jìn)行恢復(fù),來(lái)看失敗后加入緩存的邏輯(外層方法是connect贰健,內(nèi)部實(shí)際上執(zhí)行的recover方法):
// 核心邏輯:注冊(cè)失敗胞四、訂閱失敗的url分別放入對(duì)應(yīng)列表,用于hashedWheelTimer調(diào)度
@Override
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
// 放入注冊(cè)失敗列表
for (URL url : recoverRegistered) {
addFailedRegistered(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
2.3.2伶椿、MulticastRegistry
MulticastRegistry即多播注冊(cè)中心辜伟,顧名思義,采用多播實(shí)現(xiàn)脊另;注冊(cè)导狡、注銷、訂閱偎痛、取消訂閱等均通過(guò)多播方式實(shí)現(xiàn)旱捧。核心邏輯在構(gòu)造方法,包括多播組的創(chuàng)建以及多播消息的處理踩麦;創(chuàng)建多播組比較容易理解枚赡,重點(diǎn)關(guān)注多播消息的處理,在構(gòu)造方法中創(chuàng)建并啟動(dòng)一個(gè)守護(hù)線程谓谦,用于接收并處理多播消息(注冊(cè)消息贫橙、注銷消息、訂閱消息)反粥,處理多播消息的入口是receive方法卢肃。
2.3.2.1谓松、構(gòu)造方法
先來(lái)看構(gòu)造方法:
// 創(chuàng)建并啟動(dòng)daemon線程,用于接收廣播消息践剂,對(duì)接收到的消息處理邏輯在receive方法
public MulticastRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
try {
// 創(chuàng)建并加入多播組
multicastAddress = InetAddress.getByName(url.getHost());
checkMulticastAddress(multicastAddress);
multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
multicastSocket = new MulticastSocket(multicastPort);
// 注冊(cè)中心URL地址加入多播組
NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
// 啟動(dòng)daemon線程鬼譬,用于接收廣播socket消息
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
byte[] buf = new byte[2048];
// UDP包封裝
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (!multicastSocket.isClosed()) {
try {
// 接收UDP報(bào)文
multicastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > 0) {
msg = msg.substring(0, i).trim();
}
// 多播消息接收處理
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte) 0);
} catch (Throwable e) {
if (!multicastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
}
}
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 這里利用定時(shí)調(diào)度線程池,定時(shí)清理received緩存中不可用socket逊脯;默認(rèn)清理時(shí)間間隔60s;
this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
if (url.getParameter("clean", true)) {
this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
clean(); // Remove the expired
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
}
}
}, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
} else {
this.cleanFuture = null;
}
}
receive方法實(shí)現(xiàn)比較簡(jiǎn)單优质,根據(jù)msg類型(通過(guò)消息前綴判斷)執(zhí)行相應(yīng)的注冊(cè)、注銷军洼、訂閱操作巩螃;重點(diǎn)關(guān)注registered、unregistered匕争、multicast三個(gè)方法(MulticastRegistry的注冊(cè)避乏、注銷、訂閱邏輯均通過(guò)這三個(gè)方法實(shí)現(xiàn))甘桑。
private void receive(String msg, InetSocketAddress remoteAddress) {
if (logger.isInfoEnabled()) {
logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
}
// 廣播注冊(cè)消息
if (msg.startsWith(Constants.REGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
registered(url);
// 廣播注銷消息
} else if (msg.startsWith(Constants.UNREGISTER)) {
URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
unregistered(url);
// 廣播訂閱消息
} else if (msg.startsWith(Constants.SUBSCRIBE)) {
URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
// 根據(jù)注冊(cè)的地址拍皮,發(fā)送單播、多播消息
Set<URL> urls = getRegistered();
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
// 發(fā)送單播跑杭,多播消息
if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
&& !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
unicast(Constants.REGISTER + " " + u.toFullString(), host);
} else {
multicast(Constants.REGISTER + " " + u.toFullString());
}
}
}
}
}/* else if (msg.startsWith(UNSUBSCRIBE)) {
}*/
}
先來(lái)看registered方法铆帽,核心邏輯是將URL與subscried緩存中的URL進(jìn)行匹配,匹配成功則加入received德谅,同時(shí)同步給訂閱者(通過(guò)NotifyListener的notify方法),然后通知當(dāng)前l(fā)istener(在subscribe時(shí)wait*)
protected void registered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls == null) {
received.putIfAbsent(key, new ConcurrentHashSet<URL>());
urls = received.get(key);
}
urls.add(url);
List<URL> list = toList(urls);
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
synchronized (listener) {
listener.notify();
}
}
}
}
}
再來(lái)看unRegistered爹橱,邏輯上大體與registered類似,將多播消息中的URL與subscried中URL匹配窄做,匹配成功則將該URL從received中剔除愧驱;若當(dāng)前received中該URL對(duì)應(yīng)URL列表為空,則重置該URL的protocol為empty椭盏;最后同步變更給訂閱者组砚。
protected void unregistered(URL url) {
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL key = entry.getKey();
if (UrlUtils.isMatch(key, url)) {
Set<URL> urls = received.get(key);
if (urls != null) {
urls.remove(url);
}
// received中url對(duì)應(yīng)URL列表為空,則直接重置該url協(xié)議為empty
if (urls == null || urls.isEmpty()) {
if (urls == null) {
urls = new ConcurrentHashSet<URL>();
}
URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
urls.add(empty);
}
List<URL> list = toList(urls);
// 同步變更消息到各訂閱者
for (NotifyListener listener : entry.getValue()) {
notify(key, listener, list);
}
}
}
}
最后來(lái)看multicast方法庸汗,邏輯比較簡(jiǎn)單惫确,即為發(fā)送多播消息到多播消息組(多播消息由構(gòu)造方法中創(chuàng)建的線程異步處理)
private void multicast(String msg) {
if (logger.isInfoEnabled()) {
logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
}
try {
byte[] data = (msg + "\n").getBytes();
DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
multicastSocket.send(hi);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
2.3.2.2、其他邏輯
MulticastRegistry的其他邏輯包括doRegister蚯舱、doUnregister改化、doSubscribe、doUnsubscribe枉昏、destroy陈肛,前面四個(gè)方法的實(shí)現(xiàn)方式完全一致,即拼接并發(fā)送多播消息到多播組兄裂,這里以doRegister為例句旱,代碼如下:
@Override
public void doRegister(URL url) {
multicast(Constants.REGISTER + " " + url.toFullString());
}
最后來(lái)看destroy阳藻,銷毀需要處理多播組、關(guān)閉所有線程池谈撒、關(guān)閉所有socket腥泥,來(lái)看代碼:
@Override
public void destroy() {
super.destroy();
try {
// 取消定時(shí)清理任務(wù)
ExecutorUtil.cancelScheduledFuture(cleanFuture);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
// 退出多播組,并關(guān)閉socket
multicastSocket.leaveGroup(multicastAddress);
multicastSocket.close();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
// 關(guān)閉定時(shí)清理線程池,這里關(guān)注下線程池的優(yōu)雅關(guān)閉
ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}
來(lái)看下dubbo線程池的優(yōu)雅關(guān)閉:
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// 新任務(wù)不能再提交
es.shutdown();
} catch (SecurityException ex2) {
return;
} catch (NullPointerException ex2) {
return;
}
try {
// 等待隊(duì)列內(nèi)剩余任務(wù)結(jié)束啃匿,立即關(guān)閉線程池
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
// 若線程池仍未關(guān)閉蛔外,則新建線程用于線程池的關(guān)閉
if (!isTerminated(es)) {
newThreadToCloseExecutor(es);
}
}
// 專門用于關(guān)閉線程池的線程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
if (!isTerminated(es)) {
shutdownExecutor.execute(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 1000; i++) {
es.shutdownNow();
if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
break;
}
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
});
}
}
了解完MulticastRegistry的所有方法,我們來(lái)看整個(gè)MulticastRegistry中的數(shù)據(jù)流溯乒,執(zhí)行注冊(cè)操作(這里以register為例夹厌,其他操作類似)將數(shù)據(jù)多播至多播組,然后由deamon線程異步將數(shù)據(jù)同步至訂閱者裆悄,如下圖所示:
2.3.3矛纹、RedisRegistry
RedisRegistry,即使用Redis存儲(chǔ)URL數(shù)據(jù)的注冊(cè)中心光稼,這里從初始化或南、數(shù)據(jù)流以及核心方法等幾個(gè)方面進(jìn)行解析。RedisRegisty除了使用Redis緩存之外钟哥,還使用了Redis的消息隊(duì)列迎献,用于doSubscribe過(guò)程中的URL數(shù)據(jù)變更消息處理。
2.3.3.1腻贰、初始化
先來(lái)看RedisRegistry的初始化,大致可以分為父類構(gòu)造方法扒秸、注冊(cè)中心參數(shù)初始化播演、RedisPool連接池創(chuàng)建、過(guò)期調(diào)度線程池啟動(dòng)伴奥。父類構(gòu)造方法主要是AbstractRegistry構(gòu)造方法的調(diào)用写烤;參數(shù)初始化主要包括RedisPool連接池參數(shù)初始化、注冊(cè)中心重連時(shí)間間隔初始化拾徙、過(guò)期線程池調(diào)度時(shí)間間隔初始化等洲炊;RedisPool的創(chuàng)建比較簡(jiǎn)單,即直接取上一步中的參數(shù)創(chuàng)建RedisPool連接池(這里需要注意尼啡,會(huì)對(duì)同一個(gè)URL的多個(gè)backup地址單獨(dú)創(chuàng)建RedisPool暂衡,多個(gè)地址之間進(jìn)行了隔離,保證URL的可用性)崖瞭;過(guò)期調(diào)度線程池的調(diào)度比較容易理解狂巢,也就是說(shuō)在注冊(cè)中心構(gòu)造過(guò)程中已經(jīng)啟動(dòng)了對(duì)過(guò)期URL數(shù)據(jù)的定時(shí)清理,默認(rèn)調(diào)度間隔30s书聚。
然后來(lái)看構(gòu)造方法:
public RedisRegistry(URL url) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 借用對(duì)象池管理配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
// config參數(shù)配置唧领,略去
// 支持的redis集群模式藻雌,failover和replicate
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster);
List<String> addresses = new ArrayList<>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
if (ArrayUtils.isNotEmpty(backups)) {
addresses.addAll(Arrays.asList(backups));
}
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
// 每個(gè)地址對(duì)應(yīng)一個(gè)jedis連接池,URL的各地址之間互不影響斩个,保證可用性
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0)));
}
this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
if (!group.endsWith(Constants.PATH_SEPARATOR)) {
group = group + Constants.PATH_SEPARATOR;
}
// group = "/group/"
this.root = group;
// 配置過(guò)期定時(shí)調(diào)度
this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
// 延遲過(guò)期
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}
// 延遲過(guò)期
private void deferExpired() {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
// 延長(zhǎng)緩存過(guò)期時(shí)間胯杭,并發(fā)布隊(duì)列消息
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
jedis.publish(key, Constants.REGISTER);
}
}
}
// 如果開(kāi)啟了強(qiáng)制清理開(kāi)關(guān);則直接清理redis中數(shù)據(jù),并發(fā)布注銷消息
if (admin) {
clean(jedis);
}
// 無(wú)需創(chuàng)建副本受啥,則只寫單臺(tái)redis歉摧,直接break;
if (!replicate) {
break;// If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
再來(lái)看整個(gè)RedisRegistry中的數(shù)據(jù)流腔呜,可以發(fā)現(xiàn)叁温,RedisRegistry通過(guò)Redis緩存與Redis消息隊(duì)列,異步+同步的方式將數(shù)據(jù)同步至本地cache文件核畴、內(nèi)存緩存Properties以及具體的數(shù)據(jù)訂閱者膝但,如RegistryDirectory:
2.3.3.2、注冊(cè)(doRegister)
了解完RedisRegistry中的數(shù)據(jù)流谤草,再來(lái)看注冊(cè)過(guò)程就比較容易理解了跟束。主要包括兩個(gè)核心操作:將當(dāng)前URL緩存至Redis;然后將URL信息發(fā)布至Redis消息隊(duì)列丑孩。直接來(lái)看代碼:
public void doRegister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
// 當(dāng)前URL的所有可用地址冀宴,遍歷,存入redis温学,并發(fā)布注冊(cè)消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset(key, value, expire);
jedis.publish(key, Constants.REGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.3略贮、注銷(doUnregister)
注銷邏輯即注冊(cè)邏輯的反向操作,先刪除redis緩存仗岖,再發(fā)布注銷消息:
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
// 遍歷當(dāng)前URL的所有可用節(jié)點(diǎn)地址逃延,遍歷從redis中刪除,并發(fā)布注銷消息
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
jedis.hdel(key, value);
jedis.publish(key, Constants.UNREGISTER);
success = true;
if (!replicate) {
break; // If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
2.3.3.4轧拄、訂閱(doSubscribe)
訂閱邏輯稍微復(fù)雜揽祥,支持同步和異步方式將URL信息同步至訂閱者,如RegistryDirectory檩电;同步邏輯比較簡(jiǎn)單拄丰,將被訂閱的URL與redis中緩存URL進(jìn)行交叉過(guò)濾,最終通過(guò)父類notify方法(AbstractRegistry的notify方法)俐末,完成URL數(shù)據(jù)同步料按;異步邏輯由Notifier線程實(shí)現(xiàn),Notifier依照線性退避規(guī)則執(zhí)行鹅搪,執(zhí)行邏輯除了同步URL信息之外站绪,還會(huì)訂閱redis消息隊(duì)列并消費(fèi)隊(duì)列中消息,當(dāng)然丽柿,消費(fèi)的核心邏輯也是執(zhí)行doNotify方法恢准。
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
// 每個(gè)service對(duì)應(yīng)一個(gè)notifier線程
Notifier notifier = notifiers.get(service);
// 異步方式魂挂,創(chuàng)建Notifier線程,并啟動(dòng)馁筐,線性回避執(zhí)行涂召,數(shù)據(jù)流 : redis緩存、redis消息隊(duì)列 -> 訂閱者
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
// notifier線程創(chuàng)建后即啟動(dòng)
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
// 同步方式訂閱 數(shù)據(jù)流 redis緩存 -> 訂閱者敏沉,如RegistryDirector
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 區(qū)分所有服務(wù)與單個(gè)服務(wù)果正。
if (service.endsWith(Constants.ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
} else {
doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 異常處理邏輯略去
}
來(lái)看doNotify實(shí)現(xiàn)
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
if (!Constants.ANY_VALUE.equals(consumerService)) {
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
String category = toCategoryName(key);
if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
continue;
}
List<URL> urls = new ArrayList<>();
// redis緩存URL信息與內(nèi)存中被訂閱的URL信息進(jìn)行交叉過(guò)濾
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
if (!u.getParameter(Constants.DYNAMIC_KEY, true)
|| Long.parseLong(entry.getValue()) >= now) {
if (UrlUtils.isMatch(url, u)) {
// 與url匹配的緩存URL放入待通知列表
urls.add(u);
}
}
}
}
// 若無(wú)有效URL,則填充urls地址為任意值*
if (urls.isEmpty()) {
urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
.setAddress(Constants.ANYHOST_VALUE)
.setPath(toServiceName(key))
.addParameter(Constants.CATEGORY_KEY, category));
}
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (CollectionUtils.isEmpty(result)) {
return;
}
// 由父類nofity方法完成最終URL數(shù)據(jù)的同步
for (NotifyListener listener : listeners) {
notify(url, listener, result);
}
}
再來(lái)看Notifier實(shí)現(xiàn)
private class Notifier extends Thread {
private final String service;
private final AtomicInteger connectSkip = new AtomicInteger();
private final AtomicInteger connectSkipped = new AtomicInteger();
private volatile Jedis jedis;
private volatile boolean first = true;
private volatile boolean running = true;
private volatile int connectRandom;
public Notifier(String service) {
super.setDaemon(true);
super.setName("DubboRedisSubscribe");
this.service = service;
}
private void resetSkip() {
connectSkip.set(0);
connectSkipped.set(0);
connectRandom = 0;
}
// 首次結(jié)果為false盟迟;線性退避算法
private boolean isSkip() {
// 初始值均為0
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) {
if (connectRandom == 0) {
connectRandom = ThreadLocalRandom.current().nextInt(10);
}
skip = 10 + connectRandom;
}
// 初始值 false秋泳,
// 第一次:false,0-1
// 第二次:true攒菠,1-1
// 第三次:false迫皱,0-2
// 第四次:true,1-2
// 第五次:true,2-2
// 第五次:false辖众,0-3
if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
// skip 自增
connectSkip.incrementAndGet();
// skiped重置0
connectSkipped.set(0);
connectRandom = 0;
return false;
}
@Override
public void run() {
while (running) {
try {
// 線性回避卓起,是否跳過(guò)
if (!isSkip()) {
try {
for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
JedisPool jedisPool = entry.getValue();
try {
jedis = jedisPool.getResource();
try {
// service是具體類型還是所有服務(wù)
if (service.endsWith(Constants.ANY_VALUE)) {
if (!first) {
first = false;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
for (String s : keys) {
// 同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
doNotify(jedis, s);
}
}
// 同步完之后,重置skip
resetSkip();
}
// 處理Redis消息隊(duì)列中所有與service匹配的消息
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
} else {
if (!first) {
first = false;
// 同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
doNotify(jedis, service);
resetSkip();
}
// 處理Redis消息隊(duì)列中所有與service匹配的消息凹炸,同樣的同步至各數(shù)據(jù)訂閱者以及本地cache文件戏阅。
jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
}
break;
} finally {
jedis.close();
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
public void shutdown() {
try {
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
這里順便提一下NotifySub,繼承自JedisPubSub啤它,用于redis消息隊(duì)列中消息的消費(fèi)奕筐,核心邏輯在onMessage方法:
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
// 只處理注冊(cè)、注銷類消息
if (msg.equals(Constants.REGISTER)
|| msg.equals(Constants.UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
doNotify(jedis, key);
} finally {
jedis.close();
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
2.3.4蚕键、ZookeeperRegistry
ZookeeperRegistry救欧,即zk注冊(cè)中心,也是dubbo官方默認(rèn)采用的注冊(cè)中心锣光。先來(lái)看構(gòu)造方法:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 初始化zkClient
zkClient = zookeeperTransporter.connect(url);
// 若是重連狀態(tài),執(zhí)行恢復(fù)邏輯铝耻,即將已注冊(cè)過(guò)的URL誊爹,放入retry列表,重新注冊(cè)瓢捉;
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
構(gòu)造方法里有一個(gè)參數(shù)频丘,ZookeeperTransporter,先來(lái)看一下這個(gè)ZookeeperTransporter泡态。
2.3.4.1甥材、ZookeeperTransporter
ZookeeperTransporter是dubbo對(duì)zk客戶端的適配接口蚕涤,支持SPI(方法級(jí)),內(nèi)部只有一個(gè)connect方法鱼鸠,返回ZookeeperClient實(shí)例。我們知道厢呵,zk客戶端有常用的兩個(gè)實(shí)現(xiàn),分別是:
-
Curator,Netflix公司開(kāi)源的一套zookeeper客戶端框架,jar包gav如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.1</version> </dependency>
-
Zkclient,Github上一個(gè)開(kāi)源的Zookeeper客戶端,在Zookeeper原生 API接口之上進(jìn)行了包裝员萍,是一個(gè)更加易用的Zookeeper客戶端,jar包gav如下:
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
dubbo中借助這兩種客戶端組件,ZookeeperClient的實(shí)現(xiàn)有CuratorZookeeperClient拣度、ZkclientZookeeperClient碎绎,內(nèi)部通過(guò)客戶端組件完成zk的連接、監(jiān)聽(tīng)等動(dòng)作抗果,具體邏輯這里不做詳細(xì)解析筋帖。繼續(xù)來(lái)看ZookeeperTransporter,基于接口的基類實(shí)現(xiàn)AbstractZookeeperTransporter冤馏,實(shí)現(xiàn)了connect方法日麸,同時(shí)定義模板方法createZookeeperClient由具體的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter宿接,邏輯比較簡(jiǎn)單赘淮,省略)實(shí)現(xiàn)。直接來(lái)看基類的connect方法:
@Override public ZookeeperClient connect(URL url) { ZookeeperClient zookeeperClient; // backUrl解析 List<String> addressList = getURLBackupAddress(url); // The field define the zookeeper server , including protocol, host, port, username, password // fetchAndUpdateZookeeperClientCache 邏輯比較簡(jiǎn)單睦霎,從緩存里取梢卸,取不到則返回null; if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // avoid creating too many connections副女, so add lock蛤高,加鎖,防并發(fā)碑幅。 synchronized (zookeeperClientMap) { if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) { logger.info("find valid zookeeper client from the cache for address: " + url); return zookeeperClient; } // 緩存沒(méi)取到戴陡,創(chuàng)建zkClient,并緩存到map zookeeperClient = createZookeeperClient(toClientURL(url)); logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url); writeToClientMap(addressList, zookeeperClient); } return zookeeperClient; }
2.3.4.2沟涨、doRegister方法
基類FailbackRegistry的模板方法實(shí)現(xiàn)恤批,邏輯非常簡(jiǎn)單:
@Override
public void doRegister(URL url) {
try {
// 創(chuàng)建zk臨時(shí)節(jié)點(diǎn),節(jié)點(diǎn)目錄類似 /dubbo/xxx/xxx/xxx.xxx.xxxService
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.3裹赴、doUnregister方法
基類FailbackRegistry的模板方法實(shí)現(xiàn)喜庞,邏輯同樣非常簡(jiǎn)單:
@Override
public void doUnregister(URL url) {
try {
// 刪除zk節(jié)點(diǎn)
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.4、doSubscribe方法
基類FailbackRegistry的模板方法實(shí)現(xiàn)棋返,邏輯相對(duì)復(fù)雜延都,主要分為幾部分1)zkListener的初始化,2)遞歸訂閱url變更信息睛竣,創(chuàng)建zk永久節(jié)點(diǎn)晰房,3)執(zhí)行父類notify(邏輯參考AbstractRegistry解析部分),代碼如下:
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 無(wú)指定服務(wù)
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 根節(jié)點(diǎn)/dubbo
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
// 初始化zkListener
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 初始化anyServices 注冊(cè) ChildListener到listener內(nèi)存緩存,訂閱childChange變更殊者,即當(dāng)執(zhí)行childChanged時(shí)与境,完成對(duì)所有child的訂閱。
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
// 遞歸訂閱url變更消息幽污,最終會(huì)走到else嚷辅,結(jié)束
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
// 創(chuàng)建永久節(jié)點(diǎn)/dubbo
zkClient.create(root, false);
// 訂閱URL變更信息
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 遞歸訂閱url變更消息,最終會(huì)走到else距误,結(jié)束
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
//指定URL變更處理
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// childChange事件簸搞,只做notify到指定listener
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 監(jiān)聽(tīng)該path
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 父類AbstractRegistry的notify邏輯,略去
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.3.4.5准潭、doUnsubscribe方法
基類FailbackRegistry的模板方法實(shí)現(xiàn)趁俊,直接上代碼:
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 邏輯比較簡(jiǎn)單,即不再訂閱對(duì)應(yīng)的listener
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
2.3.4.6刑然、lookup方法
基類AbstractRegistry的模板方法實(shí)現(xiàn),核心邏輯是從zk中查詢指定URL對(duì)應(yīng)地址的所有可用URL(不同Category)寺擂,直接來(lái)看代碼:
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
return toUrlsWithoutEmpty(url, providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
小結(jié)
本文重點(diǎn)解析了dubbo中Registry的相關(guān)核心實(shí)現(xiàn),從RegistryFactory到Registry泼掠,dubbo抽象了一系列注冊(cè)中心怔软,為用戶自定義擴(kuò)展提供了非常優(yōu)良的入口;同時(shí)择镇,dubbo為我們提供了基于緩存挡逼、多播、zk等的注冊(cè)中心實(shí)現(xiàn)腻豌,非常方便家坎,不得不感嘆設(shè)計(jì)的非常好。
注:源碼版本 2.7.1吝梅。8月以來(lái)虱疏,接連經(jīng)歷了迎接小生命的驚喜,跳槽后的適應(yīng)期苏携,中間耽誤了挺長(zhǎng)一段時(shí)間做瞪,后面我會(huì)努力持續(xù)更新的,come on右冻。