原諒鏈接: https://mp.weixin.qq.com/s/PgDXSFGnA7kZNX7Qpxg04A
之前寫了一篇通過(guò)zkCli操作zookeeper的文章,這一篇是通過(guò)Java操作zookeeper的文章滔驾,代碼在這: https://github.com/liangyt/ZookeeperTest/tree/master/base
因?yàn)槭褂玫氖?zookeeper-3.5.5 版本硬霍,所以使用對(duì)應(yīng)的版本株憾。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
有一些回調(diào)的定義類比較簡(jiǎn)單遵绰,這里不就列表出來(lái)了烦感,可以進(jìn)代碼直接翻看就行了。
一基茵、連接服務(wù)
要操作zookeeper第一件事就是連接上zookeeper服務(wù)器:
在類 {common.ZkConnect}
/**
* 創(chuàng)建zk連接
* @return
* @throws IOException
*/
public static ZooKeeper instance() throws IOException {
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 連接的服務(wù)地址
5000, // 會(huì)話超時(shí)時(shí)間, 在超時(shí)時(shí)間內(nèi)會(huì)進(jìn)行心跳檢測(cè)奋构;如果超過(guò)這個(gè)時(shí)間沒(méi)有心跳檢測(cè),則服務(wù)端認(rèn)為這個(gè)會(huì)話超時(shí)了
new DefaultWatcher() // 默認(rèn)的會(huì)話監(jiān)聽(tīng)器, 如果設(shè)置為 null 則表示沒(méi)有默認(rèn)的監(jiān)聽(tīng)器了
);
return zk;
}
/**
* 創(chuàng)建zk連接
* @return
* @throws IOException
*/
public static ZooKeeper instance(CountDownLatch latch) throws IOException {
ZooKeeper zk = new ZooKeeper(
"localhost:2181", // 連接的服務(wù)地址
5000, // 會(huì)話超時(shí)時(shí)間, 在超時(shí)時(shí)間內(nèi)會(huì)進(jìn)行心跳檢測(cè)拱层;如果超過(guò)這個(gè)時(shí)間沒(méi)有心跳檢測(cè)弥臼,則服務(wù)端認(rèn)為這個(gè)會(huì)話超時(shí)了
new DefaultWatcher(latch) // 默認(rèn)的會(huì)話監(jiān)聽(tīng)器, 如果設(shè)置為 null 則表示沒(méi)有默認(rèn)的監(jiān)聽(tīng)器了
);
return zk;
}
/**
* 創(chuàng)建zk連接
* @param sessionId 會(huì)話id zk.getSessionId()
* @param sessionpwd 會(huì)話密碼 zk.getSessionPasswd()
* @return
* @throws IOException
*/
public static ZooKeeper instance(long sessionId, byte[] sessionpwd) throws IOException {
ZooKeeper zk = new ZooKeeper(
"127.0.0.1:2181",
5000,
new DefaultWatcher(),
sessionId,
sessionpwd
);
return zk;
}
/**
* 創(chuàng)建zk連接
* @param sessionId 會(huì)話id zk.getSessionId()
* @param sessionpwd 會(huì)話密碼 zk.getSessionPasswd()
* @Param latch 同步對(duì)象
* @return
* @throws IOException
*/
public static ZooKeeper instance(long sessionId, byte[] sessionpwd, CountDownLatch latch) throws IOException {
ZooKeeper zk = new ZooKeeper(
"127.0.0.1:2181",
5000,
new DefaultWatcher(latch),
sessionId,
sessionpwd
);
return zk;
}
創(chuàng)建了幾個(gè)連接zookeeper服務(wù)器的簡(jiǎn)易方法,設(shè)置了默認(rèn)的服務(wù)器地址[127.0.0.1:2181]根灯,這個(gè)地址可以是單臺(tái)服務(wù)径缅,也可以是集群服務(wù),如果是群集的話則格式為 [ip1:port1,ip2:port2 ...]烙肺。
二纳猪、創(chuàng)建節(jié)點(diǎn)
節(jié)點(diǎn)的創(chuàng)建方法有幾個(gè):
這6個(gè)方法分同步和異步,不帶 callback 的是同步方法桃笙。
具體使用方法看這個(gè)類:{base1.CreateTest}
private static void normalCreate(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.create(
"/zk-java-01", // 節(jié)點(diǎn)路徑
"你們好".getBytes(), // 節(jié)點(diǎn)內(nèi)容
/**
* 節(jié)點(diǎn)權(quán)限 ZooDefs.Ids.OPEN_ACL_UNSAFE -> 是 world anyone 所有的權(quán)限
* 可以自已定義一個(gè)權(quán)限列表: ArrayList<ACL>
* new ACL("per", "ID") 構(gòu)成一個(gè)權(quán)限對(duì)象 可以設(shè)置多個(gè)
* per 表示操作權(quán)限:READ = 1; WRITE = 2; CREATE = 4; DELETE = 8; ADMIN = 16; ALL = 31;
* ID 表示授權(quán)對(duì)象 new ID("scheme", "id") 如 new ID("world", "anyone")
*/
ZooDefs.Ids.OPEN_ACL_UNSAFE,
/**
* 節(jié)點(diǎn)類型: 使用 zkCli 客戶端創(chuàng)建節(jié)點(diǎn)一般是只有前四種
* PERSISTENT 持久型
* PERSISTENT_SEQUENTIAL 持久有序型
* EPHEMERAL 臨時(shí)型
* EPHEMERAL_SEQUENTIAL 臨時(shí)有序型
* CONTAINER 容器節(jié)點(diǎn)兆旬,用于Leader、Lock等特殊用途怎栽,當(dāng)容器節(jié)點(diǎn)不存在任何子節(jié)點(diǎn)時(shí),容器將成為服務(wù)器在將來(lái)某個(gè)時(shí)候刪除的候選節(jié)點(diǎn)
* PERSISTENT_WITH_TTL 有TTL[存活時(shí)間]的永久節(jié)點(diǎn)宿饱,節(jié)點(diǎn)在TTL時(shí)間之內(nèi)沒(méi)有得到更新并且無(wú)子節(jié)點(diǎn)熏瞄,就會(huì)被自動(dòng)刪除 需要配合另外一個(gè)參數(shù)一起
* PERSISTENT_SEQUENTIAL_WITH_TTL 有TTL[存活時(shí)間]和有序的永久節(jié)點(diǎn),節(jié)點(diǎn)在TTL時(shí)間之內(nèi)沒(méi)有得到更新并且無(wú)子節(jié)點(diǎn)谬以,就會(huì)被自動(dòng)刪除 需要配合另外一個(gè)參數(shù)一起
*/
CreateMode.PERSISTENT // 永久節(jié)點(diǎn)
);
}
public static void statCreate(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
// 用于保存節(jié)點(diǎn)創(chuàng)建完成后的狀態(tài)信息
Stat stat = new Stat();
String p = zk.create(
path,
"帶自定義狀態(tài)".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
/**
* 節(jié)點(diǎn)狀態(tài):
* czxid;
* mzxid;
* ctime;
* mtime;
* version;
* cversion;
* aversion;
* ephemeralOwner;
* dataLength;
* numChildren;
* pzxid;
*/
stat
);
System.out.println("path ->" + p);
System.out.println("stat -> " + stat);
}
private static void asyncCreate(ZooKeeper zk) throws InterruptedException {
zk.create(
"/zk-java-async-01",
"帶自定義狀態(tài)".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
new BaseStringCallback(), // 節(jié)點(diǎn)創(chuàng)建完成的回調(diào)
"我是異步創(chuàng)建節(jié)點(diǎn)"
);
}
三强饮、獲取節(jié)點(diǎn)數(shù)據(jù)
獲取節(jié)點(diǎn)數(shù)據(jù)有四個(gè)方法,使用主要看類:{base1.GetTest}
public static String getData(ZooKeeper zk, Stat stat) throws KeeperException, InterruptedException {
byte[] data = zk.getData(
"/zk-java-stat-01", // 節(jié)點(diǎn)路徑
true, // 是否使用默認(rèn)監(jiān)聽(tīng)器
stat // 用于存放服務(wù)器返回的 stat
);
System.out.println("data -> " + new String(data));
System.out.println("aversion -> " + stat.getAversion());
System.out.println("ctime -> " + stat.getCtime());
System.out.println("cversion -> " + stat.getCversion());
System.out.println("dataLength -> " + stat.getDataLength());
System.out.println("version -> " + stat.getVersion());
return new String(data);
}
public static String getDataNewWatcher(ZooKeeper zk) throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zk.getData(
"/zk-java-stat-01",
new CustomWatcher(), // 注冊(cè)節(jié)點(diǎn)內(nèi)容變更監(jiān)聽(tīng)
stat
);
System.out.println("data -> " + new String(data));
return new String(data);
}
private static void asyncGetData(ZooKeeper zk) {
zk.getData(
"/zk-java-stat-01",
false, // 不使用監(jiān)聽(tīng)器
new DataCallback(), // 異步獲取數(shù)據(jù)的回調(diào)
"異步獲取數(shù)據(jù)"
);
zk.getData(
"/zk-java-stat-01",
new CustomWatcher(), // 使用自定義數(shù)據(jù)變更監(jiān)聽(tīng)器 節(jié)點(diǎn)內(nèi)容變更監(jiān)聽(tīng)
new DataCallback(), // 異步獲取數(shù)據(jù)的回調(diào)
"異步獲取監(jiān)聽(tīng)數(shù)據(jù)"
);
}
四为黎、更新節(jié)點(diǎn)數(shù)據(jù)
更新節(jié)點(diǎn)數(shù)據(jù)方法只有兩個(gè)了邮丰,使用方法看類:{base1.UpdateTest}
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
Stat gStat = new Stat();
String oldData = GetTest.getData(zk, gStat);
System.out.println("oldData -> " + oldData);
System.out.println("oldVersion -> " + gStat.getVersion());
Stat stat = zk.setData(
"/zk-java-stat-01", // 節(jié)點(diǎn)路徑
"我是新數(shù)據(jù)".getBytes(), // 新數(shù)據(jù)
gStat.getVersion() // 如果版本號(hào)跟服務(wù)器上保存的不一樣行您, 則此時(shí)出現(xiàn)異常 org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /zk-java-stat-01
);
// 此時(shí)版本號(hào)已更新
System.out.println("newVersion -> " + stat.getVersion());
CountDownLatch latch1 = new CountDownLatch(1);
zk.setData(
"/zk-java-stat-01", // 節(jié)點(diǎn)路徑
"我是異步更新新數(shù)據(jù)".getBytes(), // 新數(shù)據(jù)
stat.getVersion(),
new StatCallback(latch1),
"異步更新數(shù)據(jù)"
);
latch1.await();
}
五、刪除節(jié)點(diǎn)
兩個(gè)方法剪廉,一個(gè)同步刪除娃循,一個(gè)異步刪除。代碼在類:{base1.DeleteTest}
private static void baseDelete(ZooKeeper zk) throws KeeperException, InterruptedException {
zk.delete(
"/zk-java-01", // 需要?jiǎng)h除的節(jié)點(diǎn)全路徑
0 // 刪除節(jié)點(diǎn)的版本號(hào) 如果版本號(hào)對(duì)不上的話則刪除失敗
);
}
private static void asyncDelete(ZooKeeper zk) {
zk.delete(
"/zk-java-01",
0,
new BaseVoidCallback(), // 刪除回調(diào)
"刪除基本節(jié)點(diǎn)"
);
}
六斗蒋、獲取子節(jié)點(diǎn)列表
獲取子節(jié)點(diǎn)列表的方法比較多捌斧,這里試了一個(gè)同步一個(gè)異步的,類:{base1.GetChildrenTest}
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
// 直接獲取節(jié)點(diǎn)的子節(jié)點(diǎn)
List<String> children = getChildren(zk, "/zk-java-stat-01");
System.out.println("children -> " + children);
// 添加一個(gè)子節(jié)點(diǎn)
// 重復(fù)添加子節(jié)點(diǎn)出現(xiàn)節(jié)點(diǎn)重復(fù)異常
// org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-stat-01/child
CreateTest.statCreate(zk, "/zk-java-stat-01/child01");
// 再次獲取子節(jié)點(diǎn)
children = getChildren(zk, "/zk-java-stat-01");
System.out.println("children -> " + children);
// 異常獲取子節(jié)點(diǎn)列表 并添加一個(gè)自定義子節(jié)點(diǎn)列表變更監(jiān)聽(tīng)
latch = new CountDownLatch(1);
zk.getChildren(
"/zk-java-stat-01",
new CustomWatcher(),
new ChildrenCallback(latch),
"異步獲取子節(jié)點(diǎn)列表"
);
latch.await();
}
private static List<String> getChildren(ZooKeeper zk, String pPath) throws KeeperException, InterruptedException {
// 返回的子節(jié)點(diǎn)列表路徑都是相對(duì)于父節(jié)點(diǎn)的路徑泉沾,而不是全路徑
return zk.getChildren(
pPath, // 需要獲取子節(jié)點(diǎn)列表的節(jié)點(diǎn)路徑
false // 不添加監(jiān)聽(tīng)
);
}
七捞蚂、權(quán)限
設(shè)置權(quán)限有兩個(gè)方法,也比較簡(jiǎn)單跷究;類:{base1.AclTest}
public static void main(String[] args) throws Exception {
String node = "/zk-java-acl";
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zk = ZkConnect.instance(latch);
latch.await();
// 設(shè)置超級(jí)權(quán)限
/**
* 設(shè)置超級(jí)賬戶連接姓迅, super:id 這是 digest:id 的一種特殊方式
* 啟用超級(jí)賬戶需要啟動(dòng)zookeeper服務(wù)的時(shí)候配置對(duì)應(yīng)的參數(shù)(這種提供兩種方式):
* 1. 啟動(dòng)的時(shí)候直接添加 -Dzookeeper.DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
* 2. 在配置文件 zoo.cfg 里面配置 DigestAuthenticationProvider.superDigest=zookeeper:qW/HnTfCSoQpB5G8LgkwT3IbiFc=
* 這兩種方式都可以配置超級(jí)賬戶,賬戶可以自定義俊马,結(jié)果: BASE64(SHA-1)
* 可以命令行生成也可以java代碼運(yùn)行生成:
* java: DigestAuthenticationProvider.generateDigest(name:password)
*
*/
zk.addAuthInfo("digest", "zookeeper:admin".getBytes());
// 先創(chuàng)建一個(gè)永久節(jié)點(diǎn), 權(quán)限為 ZooDefs.Ids.OPEN_ACL_UNSAFE
// 如果報(bào)節(jié)點(diǎn)已存在異常 則把這一行注釋掉
// org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zk-java-acl
CreateTest.statCreate(zk, node);
// 獲取節(jié)點(diǎn)的權(quán)限
Stat stat = new Stat();
List<ACL> acls = zk.getACL(
node, // 需要獲取權(quán)限信息的節(jié)點(diǎn)
stat // 回設(shè)節(jié)點(diǎn)狀態(tài)
);
//
// // [31,s{'world,'anyone}] 這是默認(rèn)創(chuàng)建的節(jié)點(diǎn)權(quán)限
System.out.println("acls ->" + acls);
System.out.println("aclVersion -> " + stat.getAversion());
// 異步獲取節(jié)點(diǎn)權(quán)限
zk.getACL(node, stat, new AsyncCallback.ACLCallback() {
@Override
public void processResult(int resultCode, String path, Object ctx, List<ACL> list, Stat stat) {
System.out.println("async acls ->" + list);
}
}, "異步獲取權(quán)限");
//
String auth = "name:password";
// 對(duì)節(jié)點(diǎn)設(shè)置權(quán)限
acl(node, zk, stat, auth);
// 另起一個(gè)無(wú)權(quán)限會(huì)話 讀取該節(jié)點(diǎn)的數(shù)據(jù)看看
// noAuthGetData(node);
// 另起一個(gè)會(huì)話并設(shè)置權(quán)限 讀取該節(jié)點(diǎn)數(shù)據(jù)
authGetData(node, auth);
// 刪除節(jié)點(diǎn)
zk.getData(node, null, stat);
zk.delete(node, stat.getVersion());
}
// 給節(jié)點(diǎn)設(shè)置權(quán)限
private static void acl(String node, ZooKeeper zk, Stat stat, String auth) throws NoSuchAlgorithmException, KeeperException, InterruptedException {
// 創(chuàng)建授權(quán)對(duì)象
Id id = new Id("digest", DigestUtil.digest(auth));
// 可以定義一個(gè) ip 模式的授權(quán)對(duì)象
// Id ipId = new Id("ip", "192.168.3.17");
// 定義操作權(quán)限
ACL aclRead = new ACL(ZooDefs.Perms.READ, id); // 讀取權(quán)限
ACL aclCrd = new ACL(ZooDefs.Perms.CREATE, id); // 創(chuàng)建權(quán)限
ACL aclDel = new ACL(ZooDefs.Perms.DELETE, id); // 刪除權(quán)限
ACL aclUpd = new ACL(ZooDefs.Perms.WRITE, id); // 更新權(quán)限
ACL aclAdm = new ACL(ZooDefs.Perms.ADMIN, id); // 權(quán)限管理權(quán)限
// 對(duì)節(jié)點(diǎn) /zk-java-acl 設(shè)置權(quán)限
List<ACL> acls = new ArrayList<>();
acls.add(aclRead);
acls.add(aclUpd);
// 返回狀態(tài) 權(quán)限版本號(hào)已改變了
Stat aclStat = zk.setACL(node, acls, stat.getAversion());
System.out.println("aclStatVersion -> " + aclStat.getAversion());
}
private static void authGetData(String node, String auth) throws IOException, InterruptedException, KeeperException {
CountDownLatch latch1 = new CountDownLatch(1);
ZooKeeper authZk = ZkConnect.instance(latch1);
latch1.await();
// 設(shè)置會(huì)話權(quán)限
authZk.addAuthInfo("digest", auth.getBytes());
Stat stat = new Stat();
byte[] dataAuth = authZk.getData(node, false, stat);
System.out.println("dataAuth -> " + new String(dataAuth));
// 沒(méi)有更新權(quán)限丁存,看看能否更新
authZk.setData(node, "試試能否更新成功".getBytes(), stat.getVersion());
}
// 測(cè)試沒(méi)有讀取權(quán)限的會(huì)話去讀取數(shù)據(jù)是什么樣的
private static void noAuthGetData(String node) throws IOException, InterruptedException, KeeperException {
CountDownLatch latch1 = new CountDownLatch(1);
ZooKeeper noAuthZk = ZkConnect.instance(latch1);
latch1.await();
// 出現(xiàn)異常
// org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-java-acl
byte[] dataNoAuth = noAuthZk.getData(node, false, null);
System.out.println("dataNoAuth -> " + new String(dataNoAuth));
}