Dubbo內部使用zkclient操作zookeeper
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
ZKClient示例
/**
 * Factory for the {@code ZkClient} sessions used by the zkclient examples.
 *
 * <p>Every client returned from here is configured with a
 * {@code SerializableSerializer} and has a state listener attached that logs
 * session events to standard output. The caller owns the returned client and
 * is responsible for closing it.
 */
public class CreateSession {

    /** Default ZooKeeper connect string used by {@link #connectZK()}. */
    private static final String DEFAULT_ZK_SERVERS = "192.168.99.100:2181";

    /** Session timeout handed to the ZkClient constructor, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 10000;

    /** Connection timeout handed to the ZkClient constructor, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 10000;

    /**
     * Connects to the default ZooKeeper address.
     *
     * @return a connected client with the logging state listener attached
     */
    public static ZkClient connectZK() {
        return connectZK(DEFAULT_ZK_SERVERS);
    }

    /**
     * Connects to the given ZooKeeper server(s).
     *
     * @param zkServers ZooKeeper connect string, e.g. {@code "host:2181"}
     * @return a connected client with the logging state listener attached
     */
    public static ZkClient connectZK(String zkServers) {
        ZkClient zkClient = new ZkClient(
                zkServers, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, new SerializableSerializer());
        zkClient.subscribeStateChanges(new IZkStateListener() {
            public void handleNewSession() throws Exception {
                System.out.println("handleNewSession()");
            }

            public void handleStateChanged(KeeperState stat) throws Exception {
                // Fired on connection state changes such as disconnect / reconnect.
                System.out.println( "handleStateChanged,stat:" + stat);
            }
        });
        System.out.println("conneted ok!");
        return zkClient;
    }
}
- 節點創建、更新、刪除的操作
package com.soa.other.zk;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
/**
 * Demonstrates creating, updating and deleting a node through zkclient.
 *
 * <p>Each operation opens its own session via {@code CreateSession.connectZK()}
 * and closes it again when done, so no ZooKeeper connection is leaked.
 *
 * <p>NOTE(review): the text of the printed messages looks mis-encoded; it is
 * kept byte-identical here so program output does not change.
 */
public class NodeManager {

    /** Path of the node every example method operates on. */
    private static final String NODE_PATH = "/aganNode";

    /**
     * Creates a persistent node at {@code /aganNode} holding a {@code User}
     * object and prints the path returned by the server.
     */
    public void createNode() {
        ZkClient zkClient = CreateSession.connectZK();
        try {
            User user = new User();
            user.setId(1);
            user.setName("agan1");
            // Arguments: node path, payload object, node type (persistent).
            String path = zkClient.create(NODE_PATH, user, CreateMode.PERSISTENT);
            // Print the path of the node that was created.
            System.out.println("創(chuàng)建節(jié)點(diǎn):" + path);
        } finally {
            zkClient.close();
        }
    }

    /**
     * Overwrites the data stored at {@code /aganNode} with a new {@code User}.
     */
    public void updateNode() {
        ZkClient zkClient = CreateSession.connectZK();
        try {
            User user = new User();
            user.setId(2);
            user.setName("agan2");
            // Arguments: node path, new payload object.
            zkClient.writeData(NODE_PATH, user);
            System.out.println("修改aganNode節(jié)點(diǎn)成功" );
        } finally {
            zkClient.close();
        }
    }

    /**
     * Deletes the single node {@code /aganNode} and prints whether the
     * deletion succeeded.
     */
    public void deleteNode(){
        ZkClient zkClient = CreateSession.connectZK();
        try {
            // delete() reports true when the node was removed, false otherwise.
            boolean deleted = zkClient.delete(NODE_PATH);
            System.out.println("刪除aganNode節(jié)點(diǎn)是否成功:"+deleted);
        } finally {
            zkClient.close();
        }
    }

    /**
     * Runs create, update and delete in sequence. Set a breakpoint between
     * the steps to inspect the node on the server.
     */
    public static void main(String[] args) {
        NodeManager nm=new NodeManager();
        nm.createNode();
        // breakpoint
        nm.updateNode();
        // breakpoint
        nm.deleteNode();
    }
}
- 訂閱子節點變化
package com.soa.other.zk;
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
/**
 * Subscribes to child-list changes of a node (the node being created or
 * deleted, or children being added) and logs every notification.
 */
public class SubscribeChildChanges {

    /** Logs every child-change notification delivered by zkclient. */
    private static class ZKChildListener implements IZkChildListener{
        /**
         * Handles a notification sent by the server.
         *
         * @param parentPath    path of the watched parent node
         * @param currentChilds relative paths of the current children; per the
         *                      zkclient listener contract this is {@code null}
         *                      when the parent node has been deleted
         */
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            // Plain concatenation prints "null" for a null list; calling
            // toString() on it would throw when the parent node is deleted.
            System.out.println("訂閱節(jié)點(diǎn)的信息改變(創(chuàng)建節(jié)點(diǎn)敌完,刪除節(jié)點(diǎn),添加子節(jié)點(diǎn))"+parentPath+" "+currentChilds);
        }
    }

    /**
     * Registers the listener on {@code /aganNode} and then blocks so that
     * notifications can keep arriving.
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = CreateSession.connectZK();
        // The watched node may already exist or may be created later.
        zkClient.subscribeChildChanges("/aganNode", new ZKChildListener());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
- 訂閱節點數據變化
package com.soa.other.zk;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Subscribes to changes of the data stored in a node and logs every
 * notification.
 */
public class SubscribeDataChanges {

    /** Logs data-changed and data-deleted notifications delivered by zkclient. */
    private static class ZKDataListener implements IZkDataListener {
        /**
         * Called when the data of the watched node changes.
         *
         * @param dataPath path of the watched node
         * @param data     deserialized node content; presumably {@code null}
         *                 when the node carries no payload — TODO confirm
         */
        public void handleDataChange(String dataPath, Object data)
                throws Exception {
            // Plain concatenation prints "null" instead of throwing when the
            // node has no data.
            System.out.println("訂閱節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容的變化"+dataPath + ":" + data);
        }

        /**
         * Called when the watched node is deleted.
         *
         * @param dataPath path of the node that was removed
         */
        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println("訂閱節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容被刪除"+dataPath);
        }
    }

    /**
     * Registers the listener on {@code /aganNode} and then blocks so that
     * notifications can keep arriving.
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = CreateSession.connectZK();
        zkClient.subscribeDataChanges("/aganNode", new ZKDataListener());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
Dubbo與ZK交互
Dubbo與zk交互主要是通過Registry這個接口的實現類ZookeeperRegistry來完成的,ZookeeperRegistry封裝了和注冊中心交互的細節,底層使用ZookeeperClient接口通過zkclient或者Curator操縱zk。
#com.alibaba.dubbo.registry.integration.RegistryProtocol#export
// Excerpt of RegistryProtocol#export (part of the body is elided below):
// exports the invoker locally, registers the provider URL with the registry,
// then subscribes an OverrideListener for that provider.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
// Obtain the registry instance.
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// Subscribe to override data.
// FIXME When a provider subscribes, this affects the case where the same JVM both exports and refers to the same service: "subscribed" uses the service name as its cache key, so the subscription information gets overwritten.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// Guarantee that every export returns a new exporter instance.
......
};
}
獲取Registry
1. 獲取RegistryFactory
獲取RegistryFactory 擴展點,擴展點名稱為zookeeper,用于創建Registry
/**
 * Resolves the registry instance for the URL carried by the invoker.
 *
 * <p>When the URL uses the generic registry protocol, the concrete protocol is
 * read from the {@code REGISTRY_KEY} parameter (falling back to
 * {@code DEFAULT_DIRECTORY}), set as the URL's protocol, and the parameter is
 * removed before the lookup.
 *
 * @param originInvoker invoker whose URL identifies the registry
 * @return the registry returned by the registry factory for that URL
 */
private Registry getRegistry(final Invoker<?> originInvoker) {
    final URL invokerUrl = originInvoker.getUrl();
    if (!Constants.REGISTRY_PROTOCOL.equals(invokerUrl.getProtocol())) {
        // Already a concrete registry URL: use it unchanged.
        return registryFactory.getRegistry(invokerUrl);
    }
    final String concreteProtocol =
            invokerUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
    final URL concreteUrl =
            invokerUrl.setProtocol(concreteProtocol).removeParameter(Constants.REGISTRY_KEY);
    return registryFactory.getRegistry(concreteUrl);
}
// RegistryFactory implementation that picks the real factory by the URL's
// protocol and delegates to it.
// NOTE(review): presumably a dump of the adaptive class Dubbo generates at
// runtime, which is why the code is kept verbatim — confirm.
public class RegistryFactory$Adpative implements com.alibaba.dubbo.registry.RegistryFactory {
// Looks up the RegistryFactory extension named after url.getProtocol()
// ("dubbo" when the protocol is null) and forwards the call to it.
// Throws IllegalArgumentException when arg0 is null.
public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
// The ternary above already substitutes "dubbo" for a null protocol, so this
// check cannot fire.
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)
// In this walkthrough extName is "zookeeper".
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class)
.getExtension(extName);
return extension.getRegistry(arg0);
}
}
2.創建Registry
使用AbstractRegistryFactory#getRegistry
獲取Registry,調用子類ZookeeperRegistryFactory.createRegistry
創建Registry
#com.alibaba.dubbo.registry.support.AbstractRegistryFactory#getRegistry
/**
 * Returns the registry for the given URL, creating and caching it on first use.
 *
 * <p>The URL is first normalised: its path and {@code INTERFACE_KEY} parameter
 * are set to the RegistryService class name and the {@code EXPORT_KEY} /
 * {@code REFER_KEY} parameters are removed. Lookup and creation run under
 * {@code LOCK}, keyed by {@code url.toServiceString()}.
 *
 * @param url registry URL
 * @return the cached or newly created registry
 * @throws IllegalStateException if {@code createRegistry} returns null
 */
public Registry getRegistry(URL url) {
    final String registryServiceName = RegistryService.class.getName();
    url = url.setPath(registryServiceName)
            .addParameter(Constants.INTERFACE_KEY, registryServiceName)
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    final String cacheKey = url.toServiceString();
    // Serialise lookup and creation so that each key maps to one instance.
    LOCK.lock();
    try {
        Registry cached = REGISTRIES.get(cacheKey);
        if (cached == null) {
            // Not cached yet: create the registry and remember it.
            cached = createRegistry(url);
            if (cached == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(cacheKey, cached);
        }
        return cached;
    } finally {
        // Always release the lock.
        LOCK.unlock();
    }
}
#com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
/**
 * Builds a {@code ZookeeperRegistry} for the given URL, handing it this
 * factory's {@code zookeeperTransporter}.
 *
 * @param url registry URL
 * @return a new ZookeeperRegistry
 */
public Registry createRegistry(URL url) {
    final Registry zookeeperRegistry = new ZookeeperRegistry(url, zookeeperTransporter);
    return zookeeperRegistry;
}
Registry的繼承關系如下
父類
AbstractRegistry
中統一完成緩存配置文件的加載
/**
 * Initialises the registry: stores the URL, resolves the local cache file
 * (creating its parent directory when needed), loads the cached properties
 * and notifies with the URL's backup URLs.
 *
 * @param url registry URL
 * @throws IllegalArgumentException if the cache file's parent directory does
 *         not exist and cannot be created
 */
public AbstractRegistry(URL url) {
    setUrl(url);
    // Read the REGISTRY_FILESAVE_SYNC_KEY flag from the URL (default false).
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    final String defaultCachePath =
            System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache";
    final String cachePath = url.getParameter(Constants.FILE_KEY, defaultCachePath);
    File cacheFile = null;
    if (ConfigUtils.isNotEmpty(cachePath)) {
        cacheFile = new File(cachePath);
        final File parentDir = cacheFile.getParentFile();
        final boolean parentMissing = !cacheFile.exists() && parentDir != null && !parentDir.exists();
        if (parentMissing && !parentDir.mkdirs()) {
            throw new IllegalArgumentException("Invalid registry store file " + cacheFile + ", cause: Failed to create directory " + parentDir + "!");
        }
    }
    this.file = cacheFile;
    loadProperties();
    notify(url.getBackupUrls());
}
把文件C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache中的內容加載為properties,內容為每個接口對應注冊中心的地址的緩存
最終得到的Registry實例為ZookeeperRegistry類型
/**
 * Creates a registry backed by ZooKeeper.
 *
 * <p>The root path is taken from the URL's {@code GROUP_KEY} parameter
 * (default {@code DEFAULT_ROOT}) and prefixed with the path separator when
 * missing. A client is then obtained from the transporter and a state listener
 * is registered that calls {@code recover()} on RECONNECTED.
 *
 * @param url                  registry URL
 * @param zookeeperTransporter transporter used to open the ZooKeeper client
 * @throws IllegalStateException if {@code url.isAnyHost()} is true
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    final String configuredGroup = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    this.root = configuredGroup.startsWith(Constants.PATH_SEPARATOR)
            ? configuredGroup
            : Constants.PATH_SEPARATOR + configuredGroup;
    // Open the ZooKeeper client.
    zkClient = zookeeperTransporter.connect(url);
    // Receive the state events forwarded by the client layer below.
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state != RECONNECTED) {
                return;
            }
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}
3.初始化ZKClient
zookeeperTransporter.connect(url);
初始化ZK
/**
 * {@code ZookeeperTransporter} that hands out zkclient-based clients.
 */
public class ZkclientZookeeperTransporter implements ZookeeperTransporter {

    /**
     * Opens a client for the given URL.
     *
     * @param url registry URL
     * @return a new {@code ZkclientZookeeperClient}
     */
    public ZookeeperClient connect(URL url) {
        final ZookeeperClient zkclientBacked = new ZkclientZookeeperClient(url);
        return zkclientBacked;
    }
}
#com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient#ZkclientZookeeperClient
/**
 * Creates the underlying {@code ZkClient} from the URL's backup address and
 * translates zkclient state callbacks into {@code StateListener} constants:
 * Disconnected maps to DISCONNECTED, SyncConnected to CONNECTED, and a new
 * session to RECONNECTED.
 *
 * @param url registry URL
 */
public ZkclientZookeeperClient(URL url) {
    super(url);
    client = new ZkClient(url.getBackupAddress());
    client.subscribeStateChanges(new IZkStateListener() {
        public void handleNewSession() throws Exception {
            stateChanged(StateListener.RECONNECTED);
        }

        public void handleStateChanged(KeeperState newState) throws Exception {
            // Remember the latest raw state before translating it.
            ZkclientZookeeperClient.this.state = newState;
            if (newState == KeeperState.SyncConnected) {
                stateChanged(StateListener.CONNECTED);
            } else if (newState == KeeperState.Disconnected) {
                stateChanged(StateListener.DISCONNECTED);
            }
        }
    });
}
使用中間轉換層解耦底層不同客戶端的事件機制,將底層不同的事件轉換后向上層傳遞
小結
RegistryProtocol.export
-->>registryFactory.getRegistry
-->>ZookeeperRegistryFactory.createRegistry
-->>ZookeeperRegistry
-->>zookeeperTransporter.connect
-->>ZookeeperClient
-->>ZkclientZookeeperClient
注冊provider到zk
創建并獲取到連接了zk的registry之后就向zk注冊provider的節點,執行流程如下:
-->registry.register(registedProviderUrl)//創建節點
-->AbstractRegistry.register
-->FailbackRegistry.register
-->doRegister(url)//向zk服務器端發送注冊請求
-->ZookeeperRegistry.doRegister
-->zkClient.create
-->AbstractZookeeperClient.create//dubbo/com.alibaba.dubbo.demo.DemoService/providers/
dubbo%3A%2F%2F192.168.100.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3D
com.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3
Dwilliam%26pid%3D2416%26side%3Dprovider%26timestamp%3D1474276306353
-->createEphemeral(path);//臨時節點 dubbo%3A%2F%2F192.168.100.52%3A20880%2F.............
-->createPersistent(path);//持久化節點 dubbo/com.alibaba.dubbo.demo.DemoService/providers
#com.alibaba.dubbo.registry.integration.RegistryProtocol#export
// Annotated excerpt of RegistryProtocol#export; the excerpt ends after the
// subscribe call and does not show the rest of the method.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
// 1. Start netty and expose the service remotely.
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
// 2. Connect to zk and obtain the registry (ZookeeperRegistry); the registry encapsulates interaction with the registry center and heartbeating.
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// 3. Register the service with zk, creating its node.
registry.register(registedProviderUrl);
// Subscribe to override data.
// FIXME When a provider subscribes, this affects the case where the same JVM both exports and refers to the same service: "subscribed" uses the service name as its cache key, so the subscription information gets overwritten.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
#com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
/**
 * Registers the URL by creating its node through the ZooKeeper client.
 *
 * <p>The node is created as ephemeral when the URL's {@code DYNAMIC_KEY}
 * parameter is true (the default), otherwise as persistent.
 *
 * @param url URL to register
 * @throws RpcException wrapping any failure raised while creating the node
 */
protected void doRegister(URL url) {
    try {
        final String nodePath = toUrlPath(url);
        final boolean ephemeral = url.getParameter(Constants.DYNAMIC_KEY, true);
        zkClient.create(nodePath, ephemeral);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
#com.alibaba.dubbo.remoting.zookeeper.support.AbstractZookeeperClient#create
//中間層感帅,下層是zkclient和curator
/**
 * Creates the node at {@code path}, first creating every ancestor as a
 * persistent node by recursing on the parent path.
 *
 * <p>This is the client-neutral middle layer; {@code createEphemeral} and
 * {@code createPersistent} are supplied by the zkclient / curator subclasses.
 *
 * @param path      absolute node path
 * @param ephemeral true to create the leaf as an ephemeral node, false for a
 *                  persistent node
 */
public void create(String path, boolean ephemeral) {
    // Split off the last segment and create the parent chain first.
    final int lastSeparator = path.lastIndexOf('/');
    if (lastSeparator > 0) {
        final String parentPath = path.substring(0, lastSeparator);
        create(parentPath, false);
    }
    if (!ephemeral) {
        // Persistent node.
        createPersistent(path);
        return;
    }
    // Provider entries are ephemeral nodes: removed automatically when the
    // session expires.
    createEphemeral(path);
}
#com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient#createEphemeral
/**
 * Creates an ephemeral node at {@code path}; a node that already exists is
 * silently accepted.
 *
 * @param path absolute node path
 */
public void createEphemeral(String path) {
    try {
        client.createEphemeral(path);
    } catch (ZkNodeExistsException ignored) {
        // The node is already there, so there is nothing left to do.
    }
}
創建并訂閱configurators
/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
包含服務的配置信息,在注冊完成provider之后需要訂閱該節點,當這個節點數據變更的時候zk會通知已訂閱的監聽器
- 入口
RegistryProtocol#export
#com.alibaba.dubbo.registry.integration.RegistryProtocol#export
// Excerpt of RegistryProtocol#export: only the subscribe call, which is the
// entry point discussed in this section, is shown; the rest is elided.
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
...
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
...
}
- 代碼執行流程
-->registry.subscribe//訂閱ZK
-->AbstractRegistry.subscribe
-->FailbackRegistry.subscribe
-->doSubscribe(url, listener)// 向服務器端發送訂閱請求
-->ZookeeperRegistry.doSubscribe
-->new ChildListener()
-->實現了 childChanged
-->實現并執行 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
//A
-->zkClient.create(path, false);//第一步:先創建持久化節點/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->zkClient.addChildListener(path, zkListener)
-->AbstractZookeeperClient.addChildListener
//C
-->createTargetChildListener(path, listener)//第三步:收到訂閱后的處理,交給FailbackRegistry.notify處理
-->ZkclientZookeeperClient.createTargetChildListener
-->new IZkChildListener()
-->實現了 handleChildChange //收到訂閱后的處理
-->listener.childChanged(parentPath, currentChilds);
-->實現并執行ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
-->收到訂閱后處理 FailbackRegistry.notify
//B
-->addTargetChildListener(path, targetListener)//第二步
-->ZkclientZookeeperClient.addTargetChildListener
-->client.subscribeChildChanges(path, listener)//第二步:啟動加入訂閱/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
-->notify(url, listener, urls)
-->FailbackRegistry.notify
-->doNotify(url, listener, urls);
-->AbstractRegistry.notify
-->saveProperties(url);//把服務端的注冊url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
-->registryCacheExecutor.execute(new SaveProperties(version));//采用線程池來處理
-->listener.notify(categoryList)
-->RegistryProtocol.notify
-->RegistryProtocol.this.getProviderUrl(originInvoker)//通過invoker的url 獲取 providerUrl的地址
創建持久化節點/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
,然后創建上層統一的監聽器ChildListener,通過抽象類AbstractZookeeperClient
完成事件的監聽,并在方法回調的時候觸發ZookeeperRegistry.this.notify
#com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
// Excerpt of ZookeeperRegistry#doSubscribe (leading and trailing parts elided).
// For every category path of the URL it obtains or creates the ChildListener
// bound to the given NotifyListener, creates the category node as a persistent
// node, registers the listener on it, collects the URLs built from the
// children returned by that call, and finally notifies the listener with them.
protected void doSubscribe(final URL url, final NotifyListener listener) {
...
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
// Per-URL map from NotifyListener to its ChildListener; created on first use.
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// Create the listener.
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
// Handle a change of the node's children.
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
// Re-read so the instance actually stored in the map is the one used.
zkListener = listeners.get(listener);
}
// Create the persistent node /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
zkClient.create(path, false);
// Register the listener with zk.
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
...
}
將上層統一的監聽器轉換成為zkclient(IZkChildListener)或者curator需要的監聽器,并注冊到各自的實現中
#com.alibaba.dubbo.remoting.zookeeper.support.AbstractZookeeperClient#addChildListener
/**
 * Registers a child listener on {@code path}.
 *
 * <p>The client-neutral {@code ChildListener} is mapped, once per path and
 * listener, to the client-specific listener produced by
 * {@code createTargetChildListener}; that listener is then passed to
 * {@code addTargetChildListener}.
 *
 * @param path     node whose children are watched
 * @param listener client-neutral listener to notify
 * @return whatever {@code addTargetChildListener} returns for the path
 */
public List<String> addChildListener(String path, final ChildListener listener) {
    ConcurrentMap<ChildListener, TargetChildListener> perPath = childListeners.get(path);
    if (perPath == null) {
        childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
        perPath = childListeners.get(path);
    }
    TargetChildListener nativeListener = perPath.get(listener);
    if (nativeListener == null) {
        // Hook for subclasses: wrap the neutral listener in the client's own
        // listener type, which calls back into it when an event arrives.
        perPath.putIfAbsent(listener, createTargetChildListener(path, listener));
        nativeListener = perPath.get(listener);
    }
    // Neither the path nor the moment of registration is known in advance,
    // so this listener cannot be registered up front like the state listener.
    return addTargetChildListener(path, nativeListener);
}
#com.alibaba.dubbo.remoting.zookeeper.zkclient.ZkclientZookeeperClient#addTargetChildListener
/**
 * Wraps a client-neutral {@code ChildListener} in a zkclient
 * {@code IZkChildListener} that forwards every child-change callback to it.
 *
 * @param path     watched path (not used by the adapter itself)
 * @param listener listener that receives the forwarded callbacks
 * @return the forwarding adapter
 */
public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
    final IZkChildListener forwardingAdapter = new IZkChildListener() {
        public void handleChildChange(String parentPath, List<String> currentChilds)
                throws Exception {
            listener.childChanged(parentPath, currentChilds);
        }
    };
    return forwardingAdapter;
}
/**
 * Performs the actual subscription on the zkclient client.
 *
 * @param path     node whose children are watched
 * @param listener zkclient listener to register
 * @return the list returned by {@code client.subscribeChildChanges}
 */
public List<String> addTargetChildListener(String path, final IZkChildListener listener) {
    final List<String> currentChildren = client.subscribeChildChanges(path, listener);
    return currentChildren;
}