1涛碑、ZkClient簡(jiǎn)介
ZkClient是由Datameer的工程師開(kāi)發(fā)的開(kāi)源客戶端,對(duì)Zookeeper的原生API進(jìn)行了包裝,實(shí)現(xiàn)了超時(shí)重連枫疆、Watcher反復(fù)注冊(cè)等功能忘分;目前已經(jīng)應(yīng)用到了很多項(xiàng)目中棋枕,比如Dubbo、Kafka妒峦、Helix重斑;
Github:https://github.com/sgroschupf/zkclient
Maven依賴
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version></version>
</dependency>
或者
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version></version>
</dependency>
2、ZkClient組件
IZKConnection:是一個(gè)ZkClient與Zookeeper之間的一個(gè)適配器肯骇;在代碼里直接使用的是ZKClient窥浪,實(shí)質(zhì)上還是委托了zookeeper來(lái)處理了。
在ZKClient中笛丙,根據(jù)事件類型漾脂,分為
- 節(jié)點(diǎn)事件(數(shù)據(jù)事件),對(duì)應(yīng)的事件處理器是IZKDataListener胚鸯;
- 子節(jié)點(diǎn)事件骨稿,對(duì)應(yīng)的事件處理器是IZKChildListener;
- Session事件蠢琳,對(duì)應(yīng)的事件處理器是IZKStatusListener啊终;
ZkEventThread:是專門(mén)用來(lái)處理事件的線程
3、API介紹
- 啟動(dòng)ZKClient:在創(chuàng)建ZKClient對(duì)象時(shí)傲须,就完成了到ZooKeeper服務(wù)器連接的建立
1蓝牲、啟動(dòng)時(shí),制定好connection string泰讽,連接超時(shí)時(shí)間例衍,序列化工具等
2、創(chuàng)建并啟動(dòng)eventThread已卸,用于接收事件佛玄,并調(diào)度事件監(jiān)聽(tīng)器Listener的執(zhí)行
3、連接到Zookeeper服務(wù)器累澡,同時(shí)將ZKClient自身作為默認(rèn)的Watcher
- 為節(jié)點(diǎn)注冊(cè)Watcher
Zookeeper 原始API的三個(gè)方法:getData梦抢,getChildren、exists愧哟,ZKClient都提供了相應(yīng)的代理方法奥吩,比如exists哼蛆,
hasListeners是看有沒(méi)有與該數(shù)據(jù)節(jié)點(diǎn)綁定的listener
所以,默認(rèn)情況下霞赫,都會(huì)自動(dòng)的為指定的path注冊(cè)watcher腮介,并且是默認(rèn)的watcher(ZKClient),那么怎樣才能讓hasListeners值為true呢端衰,也就是怎么才能為path綁定Listener呢叠洗?
ZKClient提供了訂閱功能,一個(gè)新建的會(huì)話旅东,只需要在取得響應(yīng)的數(shù)據(jù)節(jié)點(diǎn)后灭抑,調(diào)用subscribeXXX就可以訂閱上相應(yīng)的事件了。
- Zookeeper的CURD(節(jié)點(diǎn)的增刪查改)
Zookeeper中提供的變更操作有:節(jié)點(diǎn)的創(chuàng)建玉锌、刪除名挥,節(jié)點(diǎn)數(shù)據(jù)的修改
1、創(chuàng)建操作主守,節(jié)點(diǎn)分為4種禀倔,所以ZKClient分別為他們提供了相應(yīng)的代理
2、刪除節(jié)點(diǎn)操作
3参淫、修改節(jié)點(diǎn)數(shù)據(jù)
updateDataSerialized:修改已系列化的數(shù)據(jù)救湖;執(zhí)行過(guò)程是,先讀取數(shù)據(jù)涎才,然后DataUpdater對(duì)數(shù)據(jù)修改鞋既,最后調(diào)用writeData將修改后的數(shù)據(jù)發(fā)送給服務(wù)端;
writeDataReturnStat:寫(xiě)數(shù)據(jù)并返回?cái)?shù)據(jù)的狀態(tài)耍铜;
4邑闺、客戶端處理變更流程
ZKClient是默認(rèn)的Watcher(ZKClient實(shí)現(xiàn)了Watcher接口),并且在為各個(gè)數(shù)據(jù)節(jié)點(diǎn)注冊(cè)的Watcher都是這個(gè)默認(rèn)的Watcher棕兼,那么該如何將各種事件通知給相應(yīng)的Listener呢:
1陡舅、判斷變更類型,變更類型分為state變更伴挚、ChildNode變更靶衍、NodeData變更;
2茎芋、取出與path關(guān)聯(lián)的Listeners颅眶,并為每一個(gè)Listener創(chuàng)建一個(gè)ZKEvent,將ZkEvent田弥,將ZkEvent交給ZkEventThread處理涛酗;
3、ZkEventThread線程,拿到ZkEvent后煤杀,只需要調(diào)用ZkEvent的run方法進(jìn)行處理就可以了眷蜈,所以,具體的如何調(diào)用Listener沈自,還要依賴于ZkEvent的run()實(shí)現(xiàn)
5、序列化處理
Zookeeper中辜妓,會(huì)涉及到序列化枯途、反序列化的操作有兩種:getData/setData;在ZkClient中籍滴,分別用readData/WriteData來(lái)替代了酪夷。
- ReadData:先調(diào)用zookeeper的getData,然后使用ZKSerializer進(jìn)行反序列化工作
- WriteData:先使用ZKSerializer將對(duì)象序列化后孽惰,再調(diào)用zookeeper的setData
6晚岭、注冊(cè)監(jiān)聽(tīng)
在ZkClient中客戶端可以通過(guò)注冊(cè)相關(guān)的事件監(jiān)聽(tīng)來(lái)實(shí)現(xiàn)對(duì)Zookeeper服務(wù)端事件的訂閱。
7勋功、demo
package com.xxx.api.zkclient;
import com.xxx.ZookeeperUtil;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ZkClientCrud<T> {
ZkClient zkClient ;
final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);
public ZkClientCrud(ZkSerializer zkSerializer) {
logger.info("鏈接zk開(kāi)始");
// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
}
public void createEphemeral(String path,Object data){
zkClient.createEphemeral(path,data);
}
/***
* 支持創(chuàng)建遞歸方式
* @param path
* @param createParents
*/
public void createPersistent(String path,boolean createParents){
zkClient.createPersistent(path,createParents);
}
/***
* 創(chuàng)建節(jié)點(diǎn) 跟上data數(shù)據(jù)
* @param path
* @param data
*/
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
/***
* 子節(jié)點(diǎn)
* @param path
* @return
*/
public List<String> getChildren(String path){
return zkClient.getChildren(path);
}
public T readData(String path){
return zkClient.readData(path);
}
public void writeData(String path,Object data){
zkClient.writeData(path,data);
}
//遞歸刪除
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
}
package com.xxx.api.zkclient;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class ZkClientCrudTest {
final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
public static void main(String[] args) {
ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
String path="/root";
zkClientCrud.deleteRecursive(path);
zkClientCrud.createPersistent(path,"hi");
/* zkClientCrud.createPersistent(path+"/a/b/c",true);//遞歸創(chuàng)建 但是不能設(shè)在value
//zkClientCrud.createPersistent(path,"hi");
logger.info(zkClientCrud.readData(path));
//更新
zkClientCrud.writeData(path,"hello");
logger.info(zkClientCrud.readData(path));
logger.info(String.valueOf(zkClientCrud.getChildren(path)));
//子節(jié)點(diǎn)
List<String> list=zkClientCrud.getChildren(path);
for(String child:list){
logger.info("子節(jié)點(diǎn):"+child);
}*/
User user=new User();
user.setUserid(1);
user.setUserName("張三");
zkClientCrud.writeData(path,user);
System.out.println(zkClientCrud.readData(path).getUserName());;
}
}
package com.xxx.api.zkclient;
import com.xxx.ZookeeperUtil;
import org.I0Itec.zkclient.*;
import org.apache.zookeeper.Watcher;
import java.util.List;
public class ZkClientWatcher {
ZkClient zkClient;
public ZkClientWatcher() {
zkClient= new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
}
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
public void writeData(String path,Object object){
zkClient.writeData(path,object);
}
public void delete(String path){
zkClient.delete(path);
}
public boolean exists(String path){
return zkClient.exists(path);
}
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
//對(duì)父節(jié)點(diǎn)添加監(jiān)聽(tīng)數(shù)據(jù)變化坦报。
public void subscribe(String path){
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf("變更的節(jié)點(diǎn)為:%s,數(shù)據(jù):%s\r\n", dataPath,data );
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf("刪除的節(jié)點(diǎn)為:%s\r\n", dataPath );
}
});
}
//對(duì)父節(jié)點(diǎn)添加監(jiān)聽(tīng)子節(jié)點(diǎn)變化。
public void subscribe2(String path){
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父節(jié)點(diǎn): " + parentPath+",子節(jié)點(diǎn):"+currentChilds+"\r\n");
}
});
}
//客戶端狀態(tài)
public void subscribe3(String path) {
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if(state== Watcher.Event.KeeperState.SyncConnected){
//當(dāng)我重新啟動(dòng)后start狂鞋,監(jiān)聽(tīng)觸發(fā)
System.out.println("連接成功");
}else if(state== Watcher.Event.KeeperState.Disconnected){
System.out.println("連接斷開(kāi)");//當(dāng)我在服務(wù)端將zk服務(wù)stop時(shí)片择,監(jiān)聽(tīng)觸發(fā)
}else
System.out.println("其他狀態(tài)"+state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
}
/* @Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
}*/
}
package com.xxx.api.zkclient;
import java.util.concurrent.TimeUnit;
public class ZkClientWatcherTest {
public static void main(String[] args) throws InterruptedException {
ZkClientWatcher zkClientWatche=new ZkClientWatcher();
String path="/root";
zkClientWatche.deleteRecursive(path);
zkClientWatche.createPersistent(path,"hello");
zkClientWatche.subscribe(path);
zkClientWatche.subscribe2(path);
// zkClientWatche.subscribe3(path);//需要啟服務(wù)
// Thread.sleep(Integer.MAX_VALUE);
zkClientWatche.createPersistent(path+"/root2","word");
TimeUnit.SECONDS.sleep(1);
zkClientWatche.writeData(path,"hi");
TimeUnit.SECONDS.sleep(1);
//zkClientWatche.delete(path);//如果目錄下有內(nèi)容 不能刪除 會(huì)報(bào) Directory not empty for /root的異常
zkClientWatche.deleteRecursive(path);
TimeUnit.SECONDS.sleep(1); //這個(gè)main線程就結(jié)束
}
}
package com.xxx;
public class ZookeeperUtil {
/** zookeeper服務(wù)器地址 */
public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
/** 定義session失效時(shí)間 */
public static final int sessionTimeout = 5000;
public static final String path = "/root";
}