ZK的java客戶端—curator 基本使用
普通的增刪改查實(shí)現(xiàn)--同步接口
public class CuratorConTest {
static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重試3次
// 創(chuàng)建連接 -- 傳統(tǒng)寫(xiě)法
/* CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:32770",
5000,
3000,
policy);*/
// 創(chuàng)建連接-- 流式寫(xiě)法
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(policy)
.namespace("zk-jsy") // 如果指定了某個(gè)應(yīng)用只能在某一個(gè)節(jié)點(diǎn)下操作配深,
// 可以指定namespace,這里base表示路徑為/base痊末。記得不能直接用/捏顺,會(huì)報(bào)錯(cuò)柒莉。
.build();
public static void main(String[] args) throws Exception {
// 連接開(kāi)啟
// zkClient.start();
zkFluentClient.start();
// 測(cè)試創(chuàng)建
CuratorConTest test = new CuratorConTest();
test.testCreate();
// 測(cè)試獲取
test.testGet();
// 測(cè)試更新
test.testUpdate();
// 測(cè)試刪除
test.testDelete();
// Thread.sleep(Integer.MAX_VALUE);
}
private void testCreate() throws Exception {
// 1、創(chuàng)建默認(rèn)類(lèi)型節(jié)點(diǎn),書(shū)上說(shuō)默認(rèn)內(nèi)容為空,但是實(shí)際上上我本地的ip地址
//org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /zk-jsy/book
// 是因?yàn)閦ookeeper的版本和curator的版本不兼容導(dǎo)致的部翘,默認(rèn)zk的版本是3.5.1-Alpha搂根,降級(jí)成3.4.8即可
zkFluentClient.create().forPath("/book1-" + ThreadLocalRandom.current().nextInt());
// 2珍促、創(chuàng)建有默認(rèn)值的節(jié)點(diǎn)
zkFluentClient.create().forPath("/book2-" + ThreadLocalRandom.current().nextFloat(), "mytestbook2Create".getBytes());
// 3、創(chuàng)建臨時(shí)節(jié)點(diǎn)剩愧,斷開(kāi)后會(huì)自動(dòng)清除
zkFluentClient.create().withMode(CreateMode.EPHEMERAL).
forPath("/book3-" + ThreadLocalRandom.current().nextInt());
// 4猪叙、創(chuàng)建臨時(shí)節(jié)點(diǎn),同時(shí)如果父節(jié)點(diǎn)不存在仁卷,也把父節(jié)點(diǎn)創(chuàng)建了穴翩。但是父節(jié)點(diǎn)會(huì)是持久節(jié)點(diǎn)
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL).
forPath("/test/book4-test" + ThreadLocalRandom.current().nextInt());
}
private void testGet() throws Exception {
String path = "/getData/mydata-" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
("sogetdata" + ThreadLocalRandom.current().nextInt()).getBytes());
// 1、獲取,注意返回的是bytes
String value = new String(zkFluentClient.getData().forPath(path));
System.out.println(value);
// 2五督、獲取屬性
Stat stat = new Stat();
String value11 = new String(zkFluentClient.getData().storingStatIn(stat).forPath(path));
System.out.println(stat.toString());
System.out.println(value11);
}
private void testUpdate() throws Exception {
String path = "/updateData/mydata-" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
("toBeUpdate" + ThreadLocalRandom.current().nextInt()).getBytes());
System.out.println("originData:" + new String(zkFluentClient.getData().forPath(path)));
// 1藏否、 普通的update,不管version
Stat stat = zkFluentClient.setData().forPath(path, ("newData" + ThreadLocalRandom.current().nextInt()).getBytes());
System.out.println("newData:" + new String(zkFluentClient.getData().forPath(path)));
// 2充包、樂(lè)觀鎖更新副签,可以用來(lái)實(shí)現(xiàn)CAS,如果version不匹配基矮,是無(wú)法更新的
zkFluentClient.setData().withVersion(stat.getVersion()).forPath(path, ("UpdateByVersion:" + stat.getVersion()).getBytes());
System.out.println("updateByVersionData:" + new String(zkFluentClient.getData().forPath(path)));
// 2.1 測(cè)試cas淆储,傳入version=1,而當(dāng)前實(shí)際為2
zkFluentClient.setData().withVersion(1).forPath(path,"error".getBytes());
System.out.println("updateByErrorVersionData:" + new String(zkFluentClient.getData().forPath(path)));
// KeeperErrorCode = BadVersion for /zk-jsy/updateData/mydata--1431282676 返回這個(gè)異常,version不對(duì)
}
private void testDelete() throws Exception {
// 1家浇、普通刪除本砰,但是不能刪除含有葉子節(jié)點(diǎn)的父節(jié)點(diǎn)
String path = "/book/forDelete" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path);
//Thread.sleep(20000); // sleep期間可以看到對(duì)應(yīng)的節(jié)點(diǎn)
zkFluentClient.delete().forPath(path); // 刪除節(jié)點(diǎn)
// 2、刪除節(jié)點(diǎn)钢悲,以及遞歸刪除其子節(jié)點(diǎn)点额,如果傳入/,刪除的是 namespace下的根目錄
zkFluentClient.delete().deletingChildrenIfNeeded().forPath("/book");
// 3舔株、無(wú)論如何,只要客戶端連接存在还棱,就會(huì)一直重試载慈,直到刪除成功,避免因?yàn)榧哼x主等情況造成數(shù)據(jù)無(wú)法清除
zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
}
}
創(chuàng)建的異步實(shí)現(xiàn)
/**
* 異步實(shí)現(xiàn)
*/
public class AsyncCuratorTest {
static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 重試3次
// 創(chuàng)建連接-- 流式寫(xiě)法
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(policy)
.namespace("zk-asyncjsy") // 如果指定了某個(gè)應(yīng)用只能在某一個(gè)節(jié)點(diǎn)下操作,
// 可以指定namespace珍手,這里base表示路徑為/base办铡。記得不能直接用/,會(huì)報(bào)錯(cuò)琳要。
.build();
static CountDownLatch countDownLatch = new CountDownLatch(2); // countDownLatch
static ExecutorService tp = Executors.newFixedThreadPool(2); // ThreadPool
public static void main(String[] args) throws Exception {
String path = "/asyncCreate" + ThreadLocalRandom.current().nextInt();
zkFluentClient.start();
// 異步創(chuàng)建 -1
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
System.out.println("Thread of processResult:" + Thread.currentThread().getName());
countDownLatch.countDown();
}
}, tp).forPath(path, "createInfo".getBytes());
// 異步創(chuàng)建 -2 寡具,會(huì)重復(fù),返回錯(cuò)誤碼
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
System.out.println("Thread of processResult:" + Thread.currentThread().getName());
countDownLatch.countDown();
}
}).forPath(path, "createAgain".getBytes());
countDownLatch.await();
tp.shutdown();
}
}
執(zhí)行結(jié)果如下:
Event[code:0, type:CREATE
Thread of processResult:pool-3-thread-1
Event[code:-110, type:CREATE
Thread of processResult:main-EventThread
注意以下方面:
- 第一次異步回調(diào)傳入了 線程池tp稚补,用于在線程池中執(zhí)行對(duì)應(yīng)的回調(diào)童叠,可以從結(jié)果中看到,執(zhí)行的線程并不是Main線程
- 第二次異步回調(diào)课幕,并沒(méi)有傳入線程池tp拯钻,所以執(zhí)行操作的是主線程Main
- countDownLatch在這里只是為了保證線程執(zhí)行結(jié)束后,可以shutdown線程池
- 返回的event.getResultCode如果是0撰豺,表示操作成功,如果是其他值拼余,表示不成功污桦,比如-110,表示數(shù)據(jù)節(jié)點(diǎn)已存在匙监。