[TOC]
本文主要記錄自己學(xué)習(xí)zookeeper時(shí)的一些個(gè)人筆記。不喜勿噴募疮。
1 環(huán)境準(zhǔn)備
隨便建個(gè)java項(xiàng)目即可。
maven坐標(biāo):
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
2 同步方式調(diào)用
public class ZKTestSync {
public static String connectStr = "192.168.161.128:2181";
public ZooKeeper keeper = null;
private static Stat stat = new Stat();
private static Logger log = LoggerFactory.getLogger(ZKTestSync.class);
@Before
public void init() {
try {
keeper = new ZooKeeper(connectStr, 3000, (WatchedEvent event) -> {
log.info("事件:{}", event);
if (event.getState() == KeeperState.SyncConnected) {
if (event.getType() == EventType.None && event.getPath() == null) {
} else {
if (event.getType() == EventType.NodeChildrenChanged) {
log.info("節(jié)點(diǎn){}發(fā)生變化", event.getPath());
} else if (EventType.NodeDataChanged == event.getType()) {
try {
log.info("節(jié)點(diǎn){}數(shù)據(jù)發(fā)生變化", event.getPath());
this.keeper.getData(event.getPath(), false, stat);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
if (keeper != null)
try {
keeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testCreateNode() {
try {
String string = this.keeper.create("/node_3", "3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(string);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testGetChildren() {
try {
List<String> list = this.keeper.getChildren("/", false);
list.stream().forEach(System.out::println);
// true表示對(duì)節(jié)點(diǎn)的變化感興趣,
// 在節(jié)點(diǎn)變化時(shí)在ZooKeeper構(gòu)造函數(shù)傳入的Watcher中可收到通知.
// 但是這種監(jiān)聽器只是一次性的
list = this.keeper.getChildren("/", true);
list.stream().forEach(System.out::println);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGetData() {
try {
String str = new String(this.keeper.getData("/node_8", true, stat));
System.out.println(str);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDelete() {
try {
this.keeper.delete("/node_40000000005", -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Test
public void testSetData() {
try {
Stat s = this.keeper.setData("/node_1", "ddd".getBytes(), -1);
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSetACL() {
try {
ACL ipAcl = new ACL(Perms.CREATE | Perms.DELETE | Perms.READ, new Id("ip", "192.168.161.1"));
ACL digestAcl = new ACL(Perms.READ | Perms.WRITE, new Id("digest", DigestAuthenticationProvider.generateDigest("hylexus:123456")));
List<ACL> acls = Arrays.asList(ipAcl, digestAcl);
String string = this.keeper.create("/node_11", "8".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println(string);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGenerateDigest() throws NoSuchAlgorithmException {
System.out.println(DigestAuthenticationProvider.generateDigest("hylexus:123"));
}
}
3 異步方式調(diào)用
和上面的同步代碼的最大區(qū)別就是僻弹,異步代碼沒法及時(shí)獲取返回值阿浓,或者說他沒有返回值。
只能通過提供回調(diào)函數(shù)的方式來處理操作完成后的工作蹋绽。
但是有了回調(diào)函數(shù)芭毙,難免代碼中有大量匿名類,為簡(jiǎn)單卸耘,此處使用java8的lambda代替匿名類退敦。
public class ZKTestASync {
public static String connectStr = "192.168.161.128:2181";
public ZooKeeper keeper = null;
private static final Logger log = LoggerFactory.getLogger(ZKTestASync.class);
private static Stat stat = new Stat();
@Before
public void init() {
try {
keeper = new ZooKeeper(connectStr, 3000, (WatchedEvent event) -> {
log.info("事件:{}", event);
if (event.getState() == KeeperState.SyncConnected) {
if (event.getType() == EventType.None && event.getPath() == null) {
} else {
if (event.getType() == EventType.NodeChildrenChanged) {
log.info("節(jié)點(diǎn){}發(fā)生變化", event.getPath());
} else if (EventType.NodeDataChanged == event.getType()) {
try {
log.info("節(jié)點(diǎn){}數(shù)據(jù)發(fā)生變化", event.getPath());
this.keeper.getData(event.getPath(), false, stat);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
if (keeper != null)
try {
keeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 異步創(chuàng)建
@Test
public void testCreateNodeASync() {
// 異步調(diào)用,沒有返回值,通過回調(diào)函數(shù)處理結(jié)果
this.keeper.create("/node_4", "3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, new StringCallback() {
/***
* @param rc:返回碼
* @param path:要?jiǎng)?chuàng)建的節(jié)點(diǎn)的完整路徑(想要?jiǎng)?chuàng)建的路徑)
* @param ctx:create方法傳入的上下文參數(shù),此處是
* "testCreateNodeASync"
* @param name:返回的創(chuàng)建的真實(shí)路徑(創(chuàng)建順序節(jié)點(diǎn)時(shí)返回的真實(shí)路徑和傳入的路徑是不同的)
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("name:{}", name);
}
}, "testCreateNodeASync");
}
@Test
public void testGetChildren() {
try {
/***
* @param rc:返回碼
* @param path:要?jiǎng)?chuàng)建的節(jié)點(diǎn)的完整路徑(想要?jiǎng)?chuàng)建的路徑)
* @param ctx:create方法傳入的上下文參數(shù)
* @param children:子節(jié)點(diǎn)列表
* @param stat:節(jié)點(diǎn)狀態(tài)
*/
this.keeper.getChildren("/", true, //
(int rc, String path, Object ctx, List<String> children, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("children:{}", children);
log.info("stat:{}", stat);
}, "這里可以傳入任何Object作為上下文以便在回調(diào)函數(shù)函數(shù)中使用");
Thread.sleep(3 * 60 * 100);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGetData() {
try {
this.keeper.getData("/node_1", true, (int rc, String path, Object ctx, byte[] data, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("data:{}", new String(data));
log.info("stat:{}", stat);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testDelete() {
try {
this.keeper.delete("/node_6", -1, (int rc, String path, Object ctx) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testSetData() {
try {
this.keeper.setData("/node_1", "aaa".getBytes(), -1, (int rc, String path, Object ctx, Stat stat) -> {
log.info("rc:{}", rc);
log.info("path:{}", path);
log.info("ctx:{}", ctx);
log.info("stat:{}", stat);
}, null);
Thread.sleep(3 * 60 * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4 總結(jié)
- 整個(gè)API還是簡(jiǎn)單易用的
- 注冊(cè)的監(jiān)聽器只是一次性的,沒有提供類似于自動(dòng)注冊(cè)多次的API
- session的超時(shí)重連可能導(dǎo)致watcher的重復(fù)執(zhí)行蚣抗,需要手動(dòng)自己控制
- 返回值中還有的是byte[],入?yún)⒁灿衎yte[]侈百。操作不是很舒服
當(dāng)然,也有zookeeper編程的其他框架可用翰铡,比如ZkClient等