??Dubbo的Provider隆檀,Consumer在啟動時都會創(chuàng)建一個注冊中心,注冊中心可以選擇Zookeeper宠哄,Redis垫蛆。常用的是Zookeeper禽最,我們這篇博客主要講的就是Dubbo與Zookeeper的注冊交互過程。
??Dubbo里默認使用zkclient
來操作zookeeper服務器袱饭,其對zookeeper原始客戶單做了一定的封裝川无,操作zookeeper時能便捷一些,比如不需要手動處理session
超時虑乖,不需要重復注冊watcher
等等懦趋。
??分布式服務框架Dubbo中使用zookeeper來作為其命名服務,維護全局的服務地址列表疹味。在Dubbo是實現(xiàn)中:服務提供者provider在啟動的時候仅叫,向Zookeeper的指定節(jié)點:~/dubbo/${serviceName}/providers
目錄下寫入自己的URL地址,這個操作就會完成服務的發(fā)布糙捺。一般數(shù)據(jù)存放路徑為 /zookeeper-xxx/data/version-xx/log.x
诫咱。
??服務消費者啟動的時候,訂閱~/dubbo/${serviceName}/providers
目錄下的提供者URL地址洪灯。注意:所有向ZK上注冊的地址都是臨時節(jié)點遂跟,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。Dubbo還有針對服務粒度的監(jiān)控婴渡,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費者的信息。
Dubbo在Zookeeper上注冊的節(jié)點目錄:假設接口名稱是:com.bob.dubbo.service.CityDubboService
如果注冊中心集群都掛掉凯亮,發(fā)布者和訂閱者之間還能通信么边臼?
可以通信,啟動dubbo時假消,消費者會從zk拉去注冊的生產(chǎn)者的地址接口作為數(shù)據(jù)柠并,緩存在本地,每次調(diào)用安裝本地的緩存地址進行調(diào)用。
在具體講解ZookeeperRegistry的相關源碼之前臼予,先來分析下dubbo在zookeeper的目錄結(jié)構以及dubbo如何利用這個特性:
針對每個接口節(jié)點會存在以下4個子節(jié)點:
節(jié)點名 | 作用 |
---|---|
consumers | 存儲消費者節(jié)點url |
configuators | 存儲override或者absent url鸣戴,用于服務治理 |
routers | 用于設置路由url,用于服務治理 |
providers | 存儲在線提供者url |
如圖:
Dubbo啟動時粘拾,Consumer和Provider都會把自身的URL格式化為字符串窄锅,然后注冊到zookeeper相應節(jié)點下,作為一個臨時節(jié)點缰雇,當連斷開時入偷,節(jié)點被刪除。
Consumer在啟動時械哟,不僅僅會注冊自身到 …/consumers/目錄下疏之,同時還會訂閱…/providers目錄,實時獲取其上Provider的URL字符串信息暇咆。
zookeeper注冊中心的源碼為com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry
锋爪。
ZookeeperRegistry
類繼承自 FailbackRegistry
,FailbackRegistry
又繼承自 AbstractRegistry
爸业,AbstractRegistry
實現(xiàn)了 RegistryService
接口其骄。
因此我們閱讀源碼順序為:RegistryService -> AbstractRegistry -> FailbackRegistry -> ZookeeperRegistry
RegistryService 接口
AbstractRegistry 抽象類
從構造函數(shù)可以看出 AbstractRegistry
抽象類主要是提供了對注冊中心數(shù)據(jù)的文件緩存。
Dubbo會在用戶目錄創(chuàng)建
./dubbo
文件夾及緩存文件,以windows為例,生成的緩存文件為:C:\Users\你的登錄用戶名/.dubbo/dubbo-registry-127.0.0.1.cache
FailbackRegistry 抽象類
FailbackRegistry
顧名思義是主要提供的是失敗自動恢復馆揉,同樣看一下構造函數(shù)龄砰,在構造函數(shù)中會通過 ScheduledExecutorService
一直執(zhí)行Retry
方法進行重試。
retry()方法主要的從各個操作中的失敗列表取出失敗的操作進行重試员魏。
ZookeeperRegistry 類
同時提供了幾個抽象方法
ZookeeperRegistry流程
服務提供者啟動時
向/dubbo/com.foo.BarService/providers目錄下寫入自己的URL地址。
服務消費者啟動時
訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址。
并向/dubbo/com.foo.BarService/consumers目錄下寫入自己的URL地址否副。
監(jiān)控中心啟動時
訂閱/dubbo/com.foo.BarService目錄下的所有提供者和消費者URL地址。
ZookeeperRegistry
主要是實現(xiàn)了FailbackRegistry
的那幾個抽象方法崎坊。本次也主要分析 doRegister(),doSubscribe()這兩個方法备禀。
注冊:
doRegister() 主要是調(diào)用zkClient創(chuàng)建一個節(jié)點。 create()以遞歸的方式創(chuàng)建節(jié)點奈揍,通過判斷Url中dynamic=false 判斷創(chuàng)建的是持久化節(jié)點還是臨時節(jié)點曲尸。
主要做了以下幾步:
1)記錄注冊注冊地址 2) 注冊節(jié)點到zookeeper上 3) 捕捉錯誤信息,出錯則記錄下來男翰,等待定期器去重新執(zhí)行
訂閱
doSubscribe()
doSubscribe()
訂閱Zookeeper節(jié)點是通過創(chuàng)建ChildListener
來實現(xiàn)的具體調(diào)用的方法是 addChildListener()
addChildListener()
又調(diào)用 AbstractZookeeperClient.addTargetChildListener()
然后調(diào)用subscribeChildChanges()
最后調(diào)用ZkclientZookeeperClient ZkClientd.watchForChilds()
消費者在引用服務時另患,會訂閱接口下的providers的節(jié)點。一旦providers下的子節(jié)點發(fā)生改變(提供者的服務器增加或者刪除)蛾绎,會通知到消費者昆箕。消費者會把提供者的集群地址緩存到本地鸦列。
主要做了以下幾步操作(以具體接口為例)
1) 將訂閱信息記錄到集合中 `
2) 將路徑轉(zhuǎn)變成/dubbo/xxService/providers,
/dubbo/xxService/configurators
鹏倘,/dubbo/xxService/routers 循環(huán)這三個路徑
如果消費者的接口沒有創(chuàng)建過子節(jié)點監(jiān)聽器薯嗤,那么就創(chuàng)建子節(jié)點監(jiān)聽器
創(chuàng)建路徑節(jié)點,并將子節(jié)點監(jiān)聽器放入到節(jié)點上纤泵。(一旦子節(jié)點發(fā)生改變骆姐,就通知)
獲取到當前路徑節(jié)點下的所有子節(jié)點(提供者),將這些子節(jié)點組裝成集合夕吻;
如果沒有節(jié)點诲锹,那么就將消費者的地址的協(xié)議變成empty、
empty://10.118.14.204/com.test.ITestService?
application\=testservice&category\=configurators
通知
3) 出現(xiàn)異常涉馅,根據(jù)url從本地緩存文件中獲取到提供者的地址归园,通知
[站外圖片上傳中...(image-3598a1-1550486301310)]
ZookeeperRegistry
接口是* (對所有的接口進行訂閱,有點類似于遞歸訂閱)
`1) 如果在集合中沒有創(chuàng)建過*的子節(jié)點監(jiān)聽器稚矿,那么就創(chuàng)建子節(jié)點監(jiān)聽器庸诱,一旦root下的子節(jié)點(service)發(fā)生改變,那么就對這個節(jié)點就行訂閱NotifyListener 晤揣。(這時就有具體的接口了)
- 創(chuàng)建root節(jié)點桥爽,將子節(jié)點監(jiān)聽器放入到root上。并返回root下的所有的接口昧识,對這些接口訂閱NotifyListener`
接口是具體 (以providers為例)
`1)將接口名稱轉(zhuǎn)變成/dubbo/com.test.ITestService/providers钠四,集合中沒有沒有providers的子節(jié)點監(jiān)聽器,就創(chuàng)建子節(jié)點監(jiān)聽器跪楞。一旦子節(jié)點發(fā)生改變缀去,那么就通知
- 創(chuàng)建 /dubbo/com.test.ITestService/providers,并且將子節(jié)點監(jiān)聽器放入到這個節(jié)點上甸祭,并返回所有的子節(jié)點(提供者)缕碎,通知。`
總結(jié)
當消費者要訂閱接口中的提供者時
會監(jiān)聽/dubbo/xxService/providers下的所有提供者池户。一旦提供者的節(jié)點刪除或增加時咏雌,都會通知到消費者的url(consumer://10.118.14.204/com.test.ITestService…….)
它會監(jiān)聽以下三個節(jié)點的子節(jié)點
1) /dubbo/xxService/providers 2)/dubbo/xxService/configurators 3)/dubbo/xxService/routers
組裝的url集合(即提供者的子節(jié)點providers,configurators,routers下的子節(jié)點)。如果沒有子節(jié)點(沒有提供者)校焦,那么就將消費者的協(xié)議變成empty作為url赊抖。
protected void doSubscribe(final URL url, final NotifyListener listener) {
//接口名稱(*代表需要監(jiān)聽root下面的所有節(jié)點)
if ("*".equals(url.getServiceInterface())) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.
get(url);
//如果listeners為空創(chuàng)建并放入到map中...
ChildListener zkListener = listeners.get(listener);
/**
root下的子節(jié)點是service接口
創(chuàng)建子節(jié)點監(jiān)聽器,對root下的子節(jié)點做監(jiān)聽寨典,一旦有子節(jié)點發(fā)生改變熏迹,
那么就對這個節(jié)點進行訂閱.
**/
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String>
currentChilds) {
for (String child : currentChilds) {
//如果不存在,才訂閱
if (! anyServices.contains(child)) {
anyServices.add(child);
//訂閱
subscribe(url.setPath(child).addParameters(
"interface", child,"check", "false"), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
//創(chuàng)建root節(jié)點
zkClient.create(root, false);
//添加root節(jié)點的子節(jié)點監(jiān)聽器凝赛,并返回當前的services
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
//對root下的所有service節(jié)點進行訂閱
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters("interface",
service, "check", "false"), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
/**將url轉(zhuǎn)變成
/dubbo/com.test.ITestService/providers
/dubbo/com.test.ITestService/configurators
/dubbo/com.test.ITestService/routers
**/
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.get(url);
//如果listeners為空就創(chuàng)建并放入盜map中
ChildListener zkListener = listeners.get(listener);
/**
對接口下的providers的子節(jié)點進行監(jiān)聽注暗,一旦發(fā)生改變,就通知
**/
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String>
currentChilds) {
//通知
ZookeeperRegistry.this.notify(url, listener,
toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//創(chuàng)建/dubbo/com.test.ITestService/providers
zkClient.create(path, false);
//獲取到providers的所有子節(jié)點(提供者)
List<String> children = zkClient.addChildListener(path,
zkListener);
//獲取到所有的提供者墓猎,組裝起來
if (children != null) {
//有子節(jié)點組裝捆昏,沒有那么就將消費者的協(xié)議變成empty作為url。
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//通知/dubbo/com.test.ITestService/providers的所有子節(jié)點
notify(url, listener, urls);
}
}
/**
根據(jù)url獲取到哪些類型
consumer://10.118.14.204/com.test.ITestService?application=testservice
&category=providers,configurators,routers&...
這里的category是重點
**/
private String[] toCategoriesPath(URL url) {
String[] categroies;
//如果是*
if ("*".equals(url.getParameter(Constants.CATEGORY_KEY)))
categroies = new String[] {"providers", "consumers",
"routers", "configurators"};
else
//從url獲取到category的值毙沾,沒有的話就默認providers
categroies = url.getParameter("category",
new String[] {"providers"});
String[] paths = new String[categroies.length];
//將格式轉(zhuǎn)變成/dubbo/xxService/類型
for (int i = 0; i < categroies.length; i ++) {
paths[i] = toServicePath(url) + "/" + categroies[i];
}
return paths;
}
/**
組裝providers骗卜、routers、configurators下的url左胞。
如果有提供者那么就組裝寇仓;沒有的話,就將消費者的協(xié)議變成empty
**/
private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
if (urls.isEmpty()) {
int i = path.lastIndexOf('/');
String category = i < 0 ? path : path.substring(i + 1);
URL empty = consumer.setProtocol("empty").addParameter(
"category", category);
urls.add(empty);
}
return urls;
}
通知
有三個參數(shù)
url: 消費者的地址 consumer://10.118.14.204/com….. listener: 監(jiān)聽器 urls: providers烤宙,configurators和routers
FailbackRegistry
主要是鋪捉到異常時放入到集合中遍烦,定時重試
AbstractRegistry
urls三種
1) providers(providers下的子節(jié)點) dubbo://10.118.22.29:20710/com.test.ITestService?anyhost=true&application=testservice&default.cluster=failfast…
2) configurators(configurators下的子節(jié)點為空,將消費者的url變成empty ) empty://10.118.14.204/com.test.ITestService?application=testservice&category=configurators&default.check=false..
3) routers(routers下的子節(jié)點為空躺枕,將消費者的url變成empty) empty://10.118.14.204/com.test.ITestService?application=testservice&category=routers&default.check=false..
保存到本地緩存文件
組裝url保存到properties中服猪,如果是同步,直接保存到本地緩存文件中拐云,否則文件緩存定時寫入
首先會有個dubbo-registry-10.118.22.25.cache.lock
罢猪,會獲取這個文件的鎖,然后保存dubbo-registry-10.118.22.25.cache
文件叉瘩,再釋放鎖膳帕。
public void doSaveProperties(long version) {
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
Properties newProperties = new Properties();
// 保存之前先讀取一遍,防止多個注冊中心之間沖突
InputStream in = null;
try {
if (file.exists()) {
in = new FileInputStream(file);
newProperties.load(in);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file, cause: " + e.getMessage(), e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
// 保存
try {
newProperties.putAll(properties);
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
try {
FileChannel channel = raf.getChannel();
try {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// 保存
try {
if (!file.exists()) {
file.createNewFile();
}
FileOutputStream outputFile = new FileOutputStream(file);
try {
newProperties.store(outputFile, "Dubbo Registry Cache");
} finally {
outputFile.close();
}
} finally {
lock.release();
}
} finally {
channel.close();
}
} finally {
raf.close();
}
} catch (Throwable e) {
if (version < lastCacheChanged.get()) {
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
}
}
監(jiān)聽器通知 (當收到服務變更通知時觸發(fā)薇缅。)
當收到提供者的地址發(fā)生改變時危彩,這時刷新緩存中的invoker,如果url不存在捅暴,那么重新refer(根據(jù)dubbo協(xié)議)
通知需處理契約:
1. 總是以服務接口和數(shù)據(jù)類型為維度全量通知恬砂,即不會通知一個服務的同類型的部分數(shù)據(jù),用戶不需要對比上一次通知結(jié)果蓬痒。
2. 訂閱時的第一次通知泻骤,必須是一個服務的所有類型數(shù)據(jù)的全量通知。
3. 中途變更時梧奢,允許不同類型的數(shù)據(jù)分開通知狱掂,比如:providers, consumers, routers, overrides,允許只通知其中一種類型亲轨,但該類型的數(shù)據(jù)必須是全量的趋惨,不是增量的。
4. 如果一種類型的數(shù)據(jù)為空惦蚊,需通知一個empty協(xié)議并帶category參數(shù)的標識性URL數(shù)據(jù)器虾。
5. 通知者(即注冊中心實現(xiàn))需保證通知的順序讯嫂,比如:單線程推送,隊列串行化兆沙,帶版本對比欧芽。