ZK應(yīng)用場(chǎng)景
監(jiān)聽+回調(diào)—NodeCache
// 節(jié)點(diǎn)監(jiān)控測(cè)試
public class NodeWatcherTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
String path = "/nodecache";
zkFluentClient.start();
// 創(chuàng)建節(jié)點(diǎn)
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
// 創(chuàng)建監(jiān)控watcher
final NodeCache nodeCache = new NodeCache(zkFluentClient, path);
nodeCache.start();// 開啟監(jiān)控
// 注冊(cè)對(duì)應(yīng)的listener
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 這里只是打印出來
System.out.println("Node Data updated,new Data:" + new String(nodeCache.getCurrentData().getData()));
}
});
// 做個(gè)實(shí)際的更新處理--set
zkFluentClient.setData().forPath(path,"update my data".getBytes());
TimeUnit.SECONDS.sleep(10);
// 清除對(duì)應(yīng)的數(shù)據(jù)
zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
TimeUnit.MICROSECONDS.sleep(Integer.MAX_VALUE);
}
}
這里關(guān)鍵是NodeCache,理解這個(gè)東西。
new NodeCache第三個(gè)參數(shù)箱硕,默認(rèn)為false。如果為true悟衩,那么NodeCache在第一次啟動(dòng)的時(shí)候就會(huì)立刻從zk上讀取對(duì)應(yīng)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容剧罩,并保存在Cache中。
NodeCache不僅可以用于監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容變更座泳,也可以監(jiān)聽指定節(jié)點(diǎn)是否存在惠昔。如果原節(jié)點(diǎn)不存在,那么Cache就會(huì)在節(jié)點(diǎn)被創(chuàng)建后觸發(fā)listener挑势。但是如果數(shù)據(jù)節(jié)點(diǎn)被刪除镇防,就無法觸發(fā)listener。
子節(jié)點(diǎn)監(jiān)聽--PathChildrenCache
注意潮饱,無法監(jiān)聽二級(jí)子節(jié)點(diǎn)来氧。
子節(jié)點(diǎn)變化有三種事件:CHILD_ADD\CHILD_UPDATED\CHILD_REMOVED
import java.util.concurrent.TimeUnit;
public class PathChildrenCacheTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("child-watcher")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
zkFluentClient.start();
final String path = "/child3";
// 指定需要監(jiān)控的父目錄
final PathChildrenCache pathChildrenCache = new PathChildrenCache(zkFluentClient, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
// 注冊(cè)listener
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED" + pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED" + pathChildrenCacheEvent.getData().getPath());
break;
default:
break;
}
}
});
// 創(chuàng)建父節(jié)點(diǎn),如果存在就跳過
//zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
// TimeUnit.SECONDS.sleep(1);
zkFluentClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
zkFluentClient.setData().forPath(path + "/c1","update data".getBytes()); // update 沒看到執(zhí)行回調(diào),但是數(shù)據(jù)內(nèi)容已經(jīng)更新了
zkFluentClient.delete().guaranteed().forPath(path + "/c1");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
}
這里會(huì)關(guān)注子節(jié)點(diǎn)的變化香拉,但是update內(nèi)容貌似并沒有觸發(fā)listener的執(zhí)行啦扬,好神奇。