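Curator handles much of the low-level detail work of writing a ZooKeeper client, including reconnecting after a dropped connection, re-registering Watchers, and dealing with NodeExistsException. Other client options include zkClient and the Java API bundled with ZooKeeper.
Adding the dependencies:
Add the following to pom.xml.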
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.8.0</version>
</dependency>
Creating a session:
Besides the ordinary factory method, Curator can also create a session through a fluent-style builder.
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Create_Session_Sample {
    public static void main(String[] args) throws Exception {
        // Base sleep of 1000 ms between retries, at most 3 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();
        client.start();
        System.out.println("Zookeeper session established. ");
    }
}
Output:
Zookeeper session established.
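The session sample passes new ExponentialBackoffRetry(1000, 3), that is, a base sleep of 1000 ms and at most 3 retries. To make the idea of exponential backoff concrete, here is a minimal plain-Java sketch. BackoffSketch and its methods are hypothetical names made up for this illustration; it is not Curator's source code, only a model in which the upper bound of the wait doubles with every retry.

```java
import java.util.Random;

// Hypothetical sketch of exponential backoff with jitter; not Curator's implementation.
class BackoffSketch {
    private final int baseSleepMs;
    private final int maxRetries;
    private final Random random = new Random();

    BackoffSketch(int baseSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
    }

    // Whether another attempt is allowed after retryCount retries (0-based).
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Exclusive upper bound of the sleep before retry number retryCount; doubles each time.
    long maxSleepMs(int retryCount) {
        return (long) baseSleepMs * (1L << (retryCount + 1));
    }

    // Actual sleep: a random value in [baseSleepMs, maxSleepMs(retryCount)).
    long sleepMs(int retryCount) {
        long bound = maxSleepMs(retryCount);
        return baseSleepMs + (long) (random.nextDouble() * (bound - baseSleepMs));
    }

    public static void main(String[] args) {
        BackoffSketch policy = new BackoffSketch(1000, 3);
        for (int retry = 0; policy.allowRetry(retry); retry++) {
            System.out.println("retry " + retry + ": sleep below " + policy.maxSleepMs(retry) + " ms");
        }
    }
}
```

With these arguments the bounds are 2000, 4000 and 8000 ms for the three retries. The random jitter keeps many clients from retrying at exactly the same moment.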
The session carries an isolated namespace: every operation the client performs on ZooKeeper data nodes is relative to the /base directory, which helps keep different applications that share one ZooKeeper ensemble separate. Setting a namespace is optional.
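The effect of the namespace can be pictured as a simple path prefix. The sketch below is plain Java with a hypothetical helper, NamespaceSketch.toAbsolute, invented for this illustration; it is not part of Curator's API and only models the mapping from the path the client uses to the path stored on the server.

```java
// Hypothetical helper showing how a namespace prefixes client paths; not Curator's API.
class NamespaceSketch {
    // Maps the path used by the client to the absolute path stored in ZooKeeper.
    static String toAbsolute(String namespace, String path) {
        if (namespace == null || namespace.isEmpty()) {
            return path; // no namespace configured: the path is used as-is
        }
        // The namespace root itself is "/<namespace>", without a trailing slash.
        return "/" + namespace + ("/".equals(path) ? "" : path);
    }

    public static void main(String[] args) {
        System.out.println(toAbsolute("base", "/zk-book/c1"));
    }
}
```

Under this model a client with namespace "base" that works on /zk-book/c1 is really touching /base/zk-book/c1, while a client without a namespace sees paths unchanged.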
Creating a node:
With the fluent-style interface, developers can freely combine calls to create any type of node.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base")
                .build();
        client.start();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "i am c1".getBytes());
        System.out.println("success create znode: " + path);
    }
}
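Output:
success create znode: /zk-book/c1
Because of creatingParentsIfNeeded(), the parent node /base/zk-book of /base/zk-book/c1 is created as well. Only the leaf node is ephemeral; ZooKeeper does not allow ephemeral nodes to have children, so the parent is a persistent node.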
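To see which parents a call like this has to take care of, the following plain-Java sketch lists the ancestors of a path from the outermost inwards. ParentPathsSketch.parentsOf is a hypothetical helper written for this post, not a Curator method; a client would create each listed node that does not exist yet before creating the leaf.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; shows which ancestor nodes must exist before a leaf can be created.
class ParentPathsSketch {
    // Returns every ancestor of path, outermost first, excluding "/" and the path itself.
    static List<String> parentsOf(String path) {
        List<String> parents = new ArrayList<>();
        int slash = path.indexOf('/', 1); // skip the leading slash
        while (slash != -1) {
            parents.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        return parents;
    }

    public static void main(String[] args) {
        // prints [/base, /base/zk-book]
        System.out.println(parentsOf("/base/zk-book/c1"));
    }
}
```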
Deleting a node:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base")
                .build();
        client.start();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "i am c1".getBytes());
        System.out.println("success create znode: " + path);
        // The node now exists. Read it back and capture its Stat.
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
        // Delete the node (and any children) only if its version is unchanged.
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        System.out.println("success delete znode " + path);
    }
}
Output:
i am c1
success delete znode /zk-book/c1
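The delete call in the sample uses deletingChildrenIfNeeded(). ZooKeeper refuses to delete a node that still has children, so a recursive delete has to remove the deepest nodes first. The sketch below models that ordering on an in-memory set of paths. RecursiveDeleteSketch is a hypothetical class written for this post and says nothing about how Curator implements the call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical in-memory stand-in for a znode tree; not ZooKeeper or Curator code.
class RecursiveDeleteSketch {
    private final TreeSet<String> nodes = new TreeSet<>();

    void create(String path) {
        nodes.add(path);
    }

    boolean exists(String path) {
        return nodes.contains(path);
    }

    // Deletes path and all descendants, children before parents; returns the deletion order.
    List<String> deleteWithChildren(String path) {
        List<String> doomed = new ArrayList<>();
        for (String node : nodes) { // sorted, so a parent always precedes its descendants
            if (node.equals(path) || node.startsWith(path + "/")) {
                doomed.add(node);
            }
        }
        Collections.reverse(doomed); // descendants now come before their ancestors
        nodes.removeAll(doomed);
        return doomed;
    }

    public static void main(String[] args) {
        RecursiveDeleteSketch tree = new RecursiveDeleteSketch();
        tree.create("/zk-book");
        tree.create("/zk-book/c1");
        tree.create("/zk-book/c1/x");
        // prints [/zk-book/c1/x, /zk-book/c1]
        System.out.println(tree.deleteWithChildren("/zk-book/c1"));
    }
}
```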
Reading data:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "i am c1".getBytes());
        Stat stat = new Stat();
        byte[] data = client.getData().storingStatIn(stat).forPath(path);
        System.out.println(new String(data));
    }
}
Output:
i am c1
Updating data:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Set_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "i am c1".getBytes());
        // Read the current Stat so the update can be conditioned on the node's version.
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        // The write succeeds only if the node is still at that version.
        stat = client.setData().withVersion(stat.getVersion()).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: " + stat.getVersion());
    }
}
Output:
Success set node for : /zk-book, new version: 1
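The withVersion(...) call turns the update into a compare-and-set: the write is applied only if the node is still at the version the client last saw, and each successful write bumps the version by one. The following plain-Java sketch models that rule for a single node. VersionedNodeSketch is a hypothetical class invented for this illustration, and the IllegalStateException stands in for the bad-version error a real server would report.

```java
// Hypothetical single-node model of ZooKeeper's versioned writes; not Curator code.
class VersionedNodeSketch {
    private byte[] data;
    private int version = 0; // a freshly created node starts at version 0

    VersionedNodeSketch(byte[] initialData) {
        this.data = initialData;
    }

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data;
    }

    // Applies the write only if expectedVersion matches; returns the new version.
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData;
        return ++version;
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch("i am c1".getBytes());
        int seen = node.getVersion();
        System.out.println("new version: " + node.setData("update".getBytes(), seen));
    }
}
```

A second write that still passes the old version would be rejected, which is how two clients are kept from silently overwriting each other's updates.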
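Asynchronous interface:
Just as the native ZooKeeper API offers asynchronous calls, so does Curator. In ZooKeeper, all asynchronous notifications are handled by a single thread, the EventThread, which processes them serially. This guarantees that events are handled in order, but a slow or complex handler delays every event queued behind it. Curator therefore lets the caller pass in an Executor, so that heavier event handling can be moved to a dedicated thread pool.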
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Background_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    static CountDownLatch semaphore = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());
        // First call: the callback is handed to the thread pool tp.
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]"
                                + ", Thread of processResult: " + Thread.currentThread().getName());
                        semaphore.countDown();
                    }
                }, tp)
                .forPath(path, "init".getBytes());
        // Second call: no Executor is given, so the callback runs on the default EventThread.
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                        System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]"
                                + ", Thread of processResult: " + Thread.currentThread().getName());
                        semaphore.countDown();
                    }
                })
                .forPath(path, "init".getBytes());
        // Wait for both callbacks before shutting the pool down.
        semaphore.await();
        tp.shutdown();
    }
}
Output:
Main thread: main
event[code: -110, type: CREATE], Thread of processResult: main-EventThread
event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1
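The callback of the create call that was given the thread pool runs on a pool thread rather than on the default thread, while the call without an Executor is handled on the EventThread. Result code 0 means success; -110 is the NodeExists error, returned because both calls try to create the same path.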
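The difference between the two callbacks can be reproduced without ZooKeeper. In the plain-Java sketch below a single-threaded executor plays the role of the EventThread, and a callback is handed off to a caller-supplied Executor when one is given. CallbackDispatchSketch, its complete method and the thread names are all hypothetical; this models the dispatch idea only and is not how Curator is built internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of callback dispatch; not Curator's internal design.
class CallbackDispatchSketch {
    // One thread delivers all results in order, like ZooKeeper's EventThread.
    private final ExecutorService eventThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-EventThread"));

    // Delivers result to callback; if executor is non-null, the callback runs there instead.
    void complete(String result, Consumer<String> callback, Executor executor) {
        eventThread.execute(() -> {
            if (executor == null) {
                callback.accept(result); // runs on the event thread and blocks later events
            } else {
                executor.execute(() -> callback.accept(result)); // handed off to the pool
            }
        });
    }

    void shutdown() {
        eventThread.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        CallbackDispatchSketch dispatcher = new CallbackDispatchSketch();
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-pool"));
        CountDownLatch done = new CountDownLatch(2);
        Consumer<String> printThread = result -> {
            System.out.println(result + " handled on " + Thread.currentThread().getName());
            done.countDown();
        };
        dispatcher.complete("create#1", printThread, pool); // like inBackground(callback, tp)
        dispatcher.complete("create#2", printThread, null); // like inBackground(callback)
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        dispatcher.shutdown();
    }
}
```

The hand-off costs the ordering guarantee for those callbacks, since pool threads may finish in any order, which is the trade-off for not blocking the event thread.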
Watching a node:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NodeCache_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/nodecache";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base")
                .build();
        client.start();
        // Create the node.
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(path, "i am nodecache".getBytes());
        // Watch the node; start(true) loads the current data before returning.
        final NodeCache cache = new NodeCache(client, path, false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
            }
        });
        // Update the node, then give the listener time to fire.
        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
    }
}
Output:
Node data update, new data: u
The listener is notified once the node's data has changed. NodeCache can watch not only changes to a node's content but also whether the node exists.
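The behaviour just described, one locally cached value plus a notification whenever the node's content or existence changes, can be modelled in a few lines of plain Java. NodeCacheSketch and onServerState are hypothetical names for this illustration; the class is single-threaded and is not Curator's NodeCache, and a null value is an assumption used here to represent "node does not exist".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, single-threaded model of a node cache; not Curator's NodeCache.
class NodeCacheSketch {
    private byte[] current; // null means the node does not exist
    private final List<Runnable> listeners = new ArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }

    byte[] getCurrentData() {
        return current;
    }

    // Feed in the latest server state: the node's data, or null if the node is gone.
    void onServerState(byte[] newData) {
        if (Arrays.equals(current, newData)) {
            return; // nothing changed, so nobody is notified
        }
        current = newData;
        for (Runnable listener : listeners) {
            listener.run();
        }
    }

    public static void main(String[] args) {
        NodeCacheSketch cache = new NodeCacheSketch();
        cache.addListener(() -> System.out.println(
                cache.getCurrentData() == null
                        ? "Node removed"
                        : "Node data update, new data: " + new String(cache.getCurrentData())));
        cache.onServerState("u".getBytes()); // created or updated
        cache.onServerState(null);           // deleted
    }
}
```

Because a deletion also fires the listener, code that reads the cached data inside the callback should check for the missing-node case first.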
Watching child nodes:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class PathChildrenCache_Sample {
    public static void main(String[] args) throws Exception {
        // Watch the children of /zk-book, matching the paths shown in the output.
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base")
                .build();
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED," + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED," + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED," + event.getData().getPath());
                        break;
                    default:
                        break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        // Pause between changes so the cache can observe each one separately.
        Thread.sleep(1000);
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
    }
}
Output:
CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1
The cache watches the children of a node and reports three kinds of events: child added, child data changed, and child removed.
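One way to picture where those three event types come from is to compare two snapshots of the child list. The plain-Java sketch below does exactly that, with each snapshot mapping a child name to its data version. ChildrenDiffSketch.diff is a hypothetical helper written for this post and is not a description of PathChildrenCache's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical snapshot comparison; not how Curator's PathChildrenCache is implemented.
class ChildrenDiffSketch {
    // Compares two snapshots (child name -> data version) and lists the resulting events.
    static List<String> diff(Map<String, Integer> before, Map<String, Integer> after) {
        List<String> events = new ArrayList<>();
        // TreeMap copies give a stable, name-sorted event order.
        for (Map.Entry<String, Integer> child : new TreeMap<>(after).entrySet()) {
            Integer oldVersion = before.get(child.getKey());
            if (oldVersion == null) {
                events.add("CHILD_ADDED," + child.getKey());
            } else if (!oldVersion.equals(child.getValue())) {
                events.add("CHILD_UPDATED," + child.getKey());
            }
        }
        for (String name : new TreeMap<>(before).keySet()) {
            if (!after.containsKey(name)) {
                events.add("CHILD_REMOVED," + name);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, Integer> before = new TreeMap<>();
        Map<String, Integer> after = new TreeMap<>();
        after.put("c1", 0);
        // prints [CHILD_ADDED,c1]
        System.out.println(diff(before, after));
    }
}
```

A snapshot comparison also shows why a child that is created and deleted between two snapshots produces no event at all.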
Master election:
With ZooKeeper, implementing Master election is straightforward. The general idea: pick a root node such as /master_select, and have several machines try at the same time to create the same child node, /master_select/lock, under it. ZooKeeper guarantees that only one of these create requests succeeds, and the machine whose request succeeds becomes the Master.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_MasterSelect {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/nodecache";
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .namespace("base")
                .build();
        client.start();
        LeaderSelector selector = new LeaderSelector(client, path, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // Leadership is held until this method returns.
                System.out.println("Became Master");
                Thread.sleep(3000);
                System.out.println("Master work done, releasing leadership");
            }
        });
        // Rejoin the election each time leadership is released.
        selector.autoRequeue();
        selector.start();
        // Keep the main thread alive so the election can keep cycling.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
Output:
Became Master
Master work done, releasing leadership
Became Master
The output above repeats indefinitely. An instance's takeLeadership method is called only after another instance has finished its Master logic: once one instance becomes Master, the others wait, and a new election starts only when the current Master crashes or exits.
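The "only one create succeeds" idea from the start of this section can be tried out without a server. In the plain-Java sketch below a ConcurrentHashMap stands in for ZooKeeper, and putIfAbsent plays the role of the atomic create. MasterElectionSketch and its methods are hypothetical names for this illustration; this models the basic idea only and is not a description of how LeaderSelector works internally.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory model of "first to create the lock node wins"; not Curator code.
class MasterElectionSketch {
    private final ConcurrentMap<String, String> nodes = new ConcurrentHashMap<>();

    // Tries to create the lock node; true means the caller is now Master.
    boolean tryBecomeMaster(String lockPath, String candidateId) {
        return nodes.putIfAbsent(lockPath, candidateId) == null;
    }

    // Removes the lock node if candidateId holds it, as when the Master exits or crashes.
    boolean release(String lockPath, String candidateId) {
        return nodes.remove(lockPath, candidateId);
    }

    String currentMaster(String lockPath) {
        return nodes.get(lockPath);
    }

    public static void main(String[] args) throws InterruptedException {
        MasterElectionSketch zk = new MasterElectionSketch();
        String lockPath = "/master_select/lock";
        Thread[] candidates = new Thread[5];
        for (int i = 0; i < candidates.length; i++) {
            String id = "machine-" + i;
            candidates[i] = new Thread(() -> {
                if (zk.tryBecomeMaster(lockPath, id)) {
                    System.out.println(id + " became Master");
                }
            });
            candidates[i].start();
        }
        for (Thread candidate : candidates) {
            candidate.join();
        }
        // Releasing the node lets the next election round pick a new Master.
        zk.release(lockPath, zk.currentMaster(lockPath));
    }
}
```

Exactly one of the five threads prints the message, though which one varies from run to run. In a real deployment the lock node would typically be ephemeral, so that it disappears when the Master's session ends and the remaining machines can compete again.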