前面的章節(jié)里面對zk的介紹很少咖刃,這邊會介紹serviceBean在export的過程中僵缺,到底向zk寫了什么磕潮,以及訂閱了什么容贝。zk的功能無非是信息的存儲和變更通知。
我們還是從ServiceBean的export方法進(jìn)行跟蹤斤富。會一直的跟蹤到ServiceConfig類的doExportUrls方法。如下
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
由于我們只配置了zk一個注冊中心焕参,registryURLs返回如下
在ServiceConfig的doExportUrlsFor1Protocol方法的三句非常重要
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker)
前兩句就是將此ref封裝成Invoker叠纷,只有將ref封裝成Invoker之后,才會將此invoker的信息寫入到zk里面去崇众。
由于Invoker里面封裝的url的protocol為registry航厚,不難猜出。
Exporter<?> exporter = protocol.export(wrapperInvoker);
會調(diào)到RegistryProtocol類的export方法眯漩。那么核心的方法都封裝在RegistryProtocol類的export方法,如下
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 在本地暴露的意思是坤塞,在本地啟動一個nettyService進(jìn)行暴露
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//拿到zk的地址
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider 這個就是注冊provider
//根據(jù)originInvoker的url拿到Registry ZookeeperRegistry(其實(shí)就是連接zk service
的客戶端,里面封裝了各種的信息)
final Registry registry = getRegistry(originInvoker);
//服務(wù)端暴露的dubbo服務(wù)的本地信息
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//這里是核心灼狰,去注冊
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
其實(shí)上面的這個方法,就兩個目的
1 在本地進(jìn)行暴露份汗,啟動nettySevice蝴簇,這個在前面已經(jīng)講過了,這里帶過
2 在zk上面進(jìn)行注冊旁钧,這樣消費(fèi)者就能看到自己互拾。
我們看下 register(registryUrl, registeredProviderUrl)方法
public void register(URL registryUrl, URL registedProviderUrl) {
//這個就是前面說的ZookeeperRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
//registedProviderUrl,這個就是本地dubbo服務(wù)的url信息。
registry.register(registedProviderUrl);
}
而 registry.register(registedProviderUrl);最終會進(jìn)入到ZookeeperRegistry的doRegister方法颜矿,如下
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
而toUrlPath(url)返回值為,如下
可以看到田篇,我們再
/dubbo/com.ilsp.order.service.ContractService/providers節(jié)點(diǎn)下添加了此dubbo服務(wù)的本地信息。
經(jīng)過這步之后泊柬,那消費(fèi)者通過查詢/dubbo/com.ilsp.order.service.ContractService/providers節(jié)點(diǎn)就知道有哪些消費(fèi)者進(jìn)行注冊了彬呻。
而在RegistryProtocol類的export方法里面還有一句重要的語句
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
overrideSubscribeUrl路徑添加監(jiān)聽器overrideSubscribeListener進(jìn)行監(jiān)聽。
而這個方法最終也會進(jìn)入ZookeeperRegistry的doSubscribe方法闸氮,源碼如下
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
//path就是/dubbo/com.ilsp.order.service.ContractService/configurators
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//創(chuàng)建/dubbo/com.ilsp.order.service.ContractService/configurators的zk path
zkClient.create(path, false);
//對該path添加監(jiān)聽器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//回調(diào)監(jiān)聽器
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
而這個notify(url, listener, urls)會回調(diào)到RegistryProtocol的內(nèi)部私有類OverrideListener剪况。
如果configurators的配置發(fā)生了修改,同時也會修改本地暴露的dubbo服務(wù)的信息蒲跨。
如果我們打開dubbo控制臺译断,對于dubbo服務(wù)來說,只關(guān)心如下的兩項(xiàng)或悲。
ok 下一篇我們會看下 當(dāng)ReferenceBean在refer的時候孙咪,到底想zk寫了什么,訂閱了什么