Dubbo中關(guān)于服務(wù)的訂閱和通知主要發(fā)生在服務(wù)提供方暴露服務(wù)的過程和服務(wù)消費方初始化時候引用服務(wù)的過程中蹲诀。
服務(wù)引用過程中的訂閱和通知
在服務(wù)消費者初始化的過程中,會有一步是進(jìn)行服務(wù)的引用括细,具體的代碼是在RegistryProtocol的refer方法:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//在這一步獲取注冊中心實例的過程中,也會有notify的操作。(這里省略)
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
return doRefer(cluster, registry, type, url);
}
在refer方法中有一步是獲取注冊中心實例,這一步中也會有一個notify操作领迈,先暫時不解釋。接著就是doRefer方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
//訂閱的url
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//服務(wù)消費方向注冊中心注冊自己,供其他層使用狸捅,比如服務(wù)治理
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//訂閱服務(wù)提供方
//同時訂閱了三種類型providers衷蜓,routers,configurators尘喝。
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
在doRefer方法中服務(wù)消費者會訂閱服務(wù)磁浇,同時訂閱了三種類型:providers,routers朽褪,configurators置吓。
接續(xù)看directory.subscribe訂閱方法,這里directory是RegistryDirectory:
public void subscribe(URL url) {
//設(shè)置消費者url
setConsumerUrl(url);
//訂閱
//url為訂閱條件缔赠,不能為空
//第二個參數(shù)this衍锚,是變更事件監(jiān)聽器,不允許為空嗤堰,RegistryDirectory實現(xiàn)了NotifyListener接口戴质,因此是一個事件監(jiān)聽器
registry.subscribe(url, this);
}
這里registry是ZookeeperRegistry,在ZookeeperRegistry調(diào)用subscribe處理之前會先經(jīng)過AbstractRegistry的處理踢匣,然后經(jīng)過FailbackRegistry處理告匠,在FailbackRegistry中會調(diào)用ZookeeperRegistry的doSubscribe方法符糊。
首先看下AbstractRegistry中subscribe方法:
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
//從緩存中獲取已經(jīng)訂閱的url的監(jiān)聽器
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
//將當(dāng)前監(jiān)聽器添加到監(jiān)聽器的set中
listeners.add(listener);
}
然后是FailbackRegistry的subscribe方法:
public void subscribe(URL url, NotifyListener listener) {
//上面AbstractRegistry的處理
super.subscribe(url, listener);
//移除訂閱失敗的
removeFailedSubscribed(url, listener);
try {
// 向服務(wù)器端發(fā)送訂閱請求
//子類實現(xiàn),我們這里使用的是ZookeeperRegistry
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
//訂閱失敗行贪,進(jìn)行通知,重試
notify(url, listener, urls);
} else {
// 如果開啟了啟動時檢測模闲,則直接拋出異常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if(skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
}
}
// 將失敗的訂閱請求記錄到失敗列表建瘫,定時重試
addFailedSubscribed(url, listener);
}
}
這里總共進(jìn)行了一下幾件事情:
- AbstractRegistry的處理
- 移除訂閱失敗的
- 由具體的子類向服務(wù)器端發(fā)送訂閱請求
- 如果訂閱發(fā)生失敗了尸折,嘗試獲取緩存url,然后進(jìn)行失敗通知或者如果開啟了啟動時檢測实夹,則直接拋出異常
- 將失敗的訂閱請求記錄到失敗列表橄浓,定時重試
主要看下子類向服務(wù)器段發(fā)送訂閱請求的步驟,在ZookeeperRegistry的doSubscribe方法中:
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {//這里暫時沒用到先不解釋
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (! anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
//這里的path分別為providers荸实,routers缴淋,configurators三種
for (String path : toCategoriesPath(url)) {
//根據(jù)url獲取對應(yīng)的監(jiān)聽器map
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
//根據(jù)我們的listener獲取一個ChildListener實例
ChildListener zkListener = listeners.get(listener);
//沒有的話就創(chuàng)建一個ChildListener實例泄朴。
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//根據(jù)path在Zookeeper中創(chuàng)建節(jié)點露氮,這里就是訂閱服務(wù)
zkClient.create(path, false);
//這里zkClient是dubbo的ZookeeperClient,在addChildListener中會轉(zhuǎn)化為ZkClient的Listener
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//訂閱完成之后畔规,進(jìn)行通知
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
上面主要是分別對providers,routers详民,configurators三種不同類型的進(jìn)行訂閱陌兑,也就是往zookeeper中注冊節(jié)點,注冊之前先給url添加監(jiān)聽器兔综。最后是訂閱完之后進(jìn)行通知。
notify方法涧窒,這里notify方法實現(xiàn)是在ZookeeperRegistry的父類FailbackRegistry中:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
//doNotify方法中沒做處理锭亏,直接調(diào)用父類的notify方法
doNotify(url, listener, urls);
} catch (Exception t) {
// 將失敗的通知請求記錄到失敗列表,定時重試
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
}
}
看下AbstractRegistry的notify方法:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
//獲取catagory列表慧瘤,providers,routers糖儡,configurators
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//已經(jīng)通知過
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
//providers怔匣,routers,configurators中的一個
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
//還記得剛開始的時候每瞒,listener參數(shù)么,這里listener是RegistryDirectory
listener.notify(categoryList);
}
}
繼續(xù)看RegistryDirectory的notify方法:
public synchronized void notify(List<URL> urls) {
//三種類型分開
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
}
}
// configurators
//更新緩存的服務(wù)提供方配置規(guī)則
if (configuratorUrls != null && configuratorUrls.size() >0 ){
this.configurators = toConfigurators(configuratorUrls);
}
// routers
//更新緩存的路由配置規(guī)則
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override參數(shù)
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
//重建invoker實例
refreshInvoker(invokerUrls);
}
最重要的重建invoker實例呐矾,在服務(wù)引用的文章中已經(jīng)介紹過懦砂,不再重復(fù),還有上面說省略的獲取注冊中心實例的過程中罚随,也會有notify的操作羽资。(這里省略)
這里也是進(jìn)行了invoker實例的重建淘菩。
暴露服務(wù)過程中的訂閱和通知
服務(wù)暴露過程中的訂閱在RegistryProtocol的export方法中:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 訂閱override數(shù)據(jù)
// FIXME 提供者訂閱時屠升,會影響同一JVM即暴露服務(wù),又引用同一服務(wù)的的場景汇在,因為subscribed以服務(wù)名為緩存的key脏答,導(dǎo)致訂閱信息覆蓋糕殉。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
//OverrideListener是RegistryProtocol的內(nèi)部類
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//訂閱override數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保證每次export都返回一個新的exporter實例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
registry.subscribe訂閱override數(shù)據(jù)阿蝶,會首先經(jīng)過AbstractRegistry處理黄绩,然后經(jīng)過FailbackRegistry處理羡洁。處理方法在上面消費者發(fā)布訂閱的講解中都已經(jīng)介紹爽丹。往下的步驟基本相同,不同之處在于AbstractRegistry的notify方法:
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
//獲取catagory列表习劫,providers,routers袒餐,configurators
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//已經(jīng)通知過
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
//providers谤狡,routers,configurators中的一個
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
//對于消費者來說這里listener是RegistryDirectory
//而對于服務(wù)提供者來說這里是OverrideListener墓懂,是RegistryProtocol的內(nèi)部類
listener.notify(categoryList);
}
}
接下來看OverrideListener的notify方法:
/*
* provider 端可識別的override url只有這兩種.
* override://0.0.0.0/serviceName?timeout=10
* override://0.0.0.0/?timeout=10
*/
public void notify(List<URL> urls) {
List<URL> result = null;
for (URL url : urls) {
URL overrideUrl = url;
if (url.getParameter(Constants.CATEGORY_KEY) == null
&& Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
// 兼容舊版本
overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
}
if (! UrlUtils.isMatch(subscribeUrl, overrideUrl)) {
if (result == null) {
result = new ArrayList<URL>(urls);
}
result.remove(url);
logger.warn("Subsribe category=configurator, but notifed non-configurator urls. may be registry bug. unexcepted url: " + url);
}
}
if (result != null) {
urls = result;
}
this.configurators = RegistryDirectory.toConfigurators(urls);
List<ExporterChangeableWrapper<?>> exporters = new ArrayList<ExporterChangeableWrapper<?>>(bounds.values());
for (ExporterChangeableWrapper<?> exporter : exporters){
Invoker<?> invoker = exporter.getOriginInvoker();
final Invoker<?> originInvoker ;
if (invoker instanceof InvokerDelegete){
originInvoker = ((InvokerDelegete<?>)invoker).getInvoker();
}else {
originInvoker = invoker;
}
URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker);
URL newUrl = getNewInvokerUrl(originUrl, urls);
if (! originUrl.equals(newUrl)){
//對修改了url的invoker重新export
RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
}
}
}
這里也是對Invoker重新進(jìn)行了引用捕仔。