一、zookeeper介紹
? ? ? ? zookeeper是源代碼開放的分布式協(xié)調(diào)服務(wù)鹊杖,由雅虎創(chuàng)建悴灵,是Google的開源實現(xiàn)。zookeeper是一個高性能的分布式數(shù)據(jù)一致性解決方案骂蓖,它將那些復雜的积瞒、容易出錯的分布式一致性服務(wù)封裝起來,構(gòu)成一個高效可靠的原語集涯竟,并提供一系列簡單易用的接口給用戶使用。
? ??????其本身提供了一致性保證空厌,特點如下:
????????(1)順序一致性:客戶端的更新順序與它們被發(fā)送的順序一致庐船。
????????(2)原子性:更新操作要么成功,要么失敗嘲更,沒有第三種結(jié)果筐钟。
????????(3)單系統(tǒng)鏡像:無論客戶端連接到哪一個服務(wù)器,他都將看到相同的zookeeper視圖赋朦。
????????(4)可靠性:一旦一個更新操作被應(yīng)用篓冲,那么在客戶端再次更新之前李破,其值不會再改變。
二壹将、應(yīng)用場景
? ? ? ? zookeeper可以應(yīng)用于很多的分布式服務(wù)場景嗤攻,包括:集群管理,master選舉诽俯,發(fā)布/訂閱妇菱,分布式鎖,分布式隊列暴区,分布式命名服務(wù)闯团,服務(wù)注冊于發(fā)現(xiàn),負載均衡等等仙粱。下面一個例子介紹zookeeper如何實現(xiàn)分布式隊列房交。
三、zookeeper分布式隊列實現(xiàn)
? ? ? ??zookeeper分布式隊列的實現(xiàn)完成以下幾個要素:
? ? ? ? (1)伐割、數(shù)據(jù)入隊候味,在一個節(jié)點下創(chuàng)建有序子節(jié)點,節(jié)點中設(shè)置需要入隊的數(shù)據(jù)口猜,完成數(shù)據(jù)的入隊操作负溪。
? ? ? ? (2)、數(shù)據(jù)出隊济炎,取出該節(jié)點下的所有子節(jié)點川抡,如果數(shù)量不為0,取出一個子節(jié)點须尚,并將子節(jié)點刪除崖堤。
? ? ? ? (3)、提供判斷是否有數(shù)據(jù)等的api耐床。
? ? ? ? 下面為具體代碼實現(xiàn)
1密幔、DistributedSimpleQueue類
public class DistributedSimpleQueue{
????????protected final ZkClient zkClient;
? ? ? ? protected final String root;
????????protected static final String Node_NAME = "n_";
????????public DistributedSimpleQueue(ZkClient zkClient, String root) {
????????????????this.zkClient = zkClient;
????????????????this.root = root;
? ? ? ? ?}
????????public int size() {
????????????return zkClient.getChildren(root).size();
????????}
????????public boolean isEmpty() {
????????????return zkClient.getChildren(root).size() == 0;
????????}?
? ? ? ? public boolean offer(T element) throws Exception{?
? ? ? ? ? ? ? ? String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
?????????????????try {?
?????????????????????????zkClient.createPersistentSequential(nodeFullPath , element);?
?????????????????}catch (ZkNoNodeException e) {?
?????????????????????????zkClient.createPersistent(root);?
?????????????????????????offer(element);?
?????????????????} catch (Exception e) {?
?????????????????????throw ExceptionUtil.convertToRuntimeException(e);?
?????????????????}?
?????????????????return true;?
? ? ? ? ? ? ?}
????????????@SuppressWarnings("unchecked")
? ? ? ? ? ? public T poll() throws Exception {
? ? ? ? ? ? ? ? ? ?try {
? ? ? ? ? ? ? ? ? ? ? ? ? ?List? list = zkClient.getChildren(root);
? ? ? ? ? ? ? ? ? ? ? ? ? ?if (list.size() == 0) {
????????????????????????????????return null;
????????????????????????????}
????????????????????????????Collections.sort(list, new Comparator() {
????????????????????????????????????public int compare(String lhs, String rhs) {
????????????????????????????????????????????return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
????????????????????????????????????}
????????????????????????????});
????????????????????????????for ( String nodeName : list ){
????????????????????????????????????String nodeFullPath = root.concat("/").concat(nodeName);
????????????????????????????????????try {
????????????????????????????????????????????T node = (T) zkClient.readData(nodeFullPath);
????????????????????????????????????????????zkClient.delete(nodeFullPath);
????????????????????????????????????????????return node;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} catch (ZkNoNodeException e) {
????????????????????????????????????????????????// ignore
????????????????????????????????????????}
????????????????????????????}
????????????????????????????return null;
? ? ? ? ? ? ? ? ? ? } catch (Exception e) {
????????????????????????????throw ExceptionUtil.convertToRuntimeException(e);
????????????????????}
????????????}
????????????private String getNodeNumber(String str, String nodeName) {
????????????????????int index = str.lastIndexOf(nodeName);
????????????????????if (index >= 0) {
????????????????????????index += Node_NAME.length();
????????????????????????return index <= str.length() ? str.substring(index) : "";
????????????????????}
????????????????????return str;
????????????}
}
2、阻塞隊列實現(xiàn)類
public class DistributedBlockingQueue? extends DistributedSimpleQueue{?
?????????public DistributedBlockingQueue(ZkClient zkClient, String root) {?
?????????????????super(zkClient, root);
????????}?
? ? ? ? @Override
? ? ? ? public T poll() throws Exception {
????????????????while (true){
????????????????????????final CountDownLatch latch = new CountDownLatch(1);
????????????????????????final IZkChildListener childListener = new IZkChildListener() {
????????????????????????????????public void handleChildChange(String parentPath, List currentChilds) throws Exception {
????????????????????????????????????????latch.countDown();
????????????????????????????????}
????????????????????????};
????????????????????????zkClient.subscribeChildChanges(root, childListener);
????????????????????????try{
????????????????????????????????T node = super.poll();
? ? ? ? ? ????????????????????? if ( node != null ){
? ? ? ? ? ? ? ? ????????????????????????return node;
? ? ? ? ? ????????????????????? }else{
? ? ? ? ? ????????????????????????????? latch.await();
? ? ? ? ? ????????????????????? }
????????????????????????}finally{
????????????????????????????????zkClient.unsubscribeChildChanges(root, childListener);
????????????????????????}
????????????????}
????????????}
}
? ? ? ? 阻塞隊列的實現(xiàn)利用了CountDownLatch 的特性撩轰。當子節(jié)點數(shù)量為0時胯甩,即隊列中沒有元素,這是線程在此等待堪嫂,同時監(jiān)聽子節(jié)點的變化偎箫,如果有數(shù)據(jù)入隊,則從等待返回皆串,取出數(shù)據(jù)淹办。
3、測試類
public class TestDistributedBlockingQueue {
????????????public static void main(String[] args) {
????????????????????ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
????????????????????int delayTime = 5;
? ? ? ? ? ? ? ? ? ? ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
????????????????????final DistributedBlockingQueuequeue = new DistributedBlockingQueue(zkClient,"/Queue");
????????????????????final User user1 = new User();
????????????????????user1.setId("1");
????????????????????user1.setName("xiao wang");
????????????????????final User user2 = new User();
????????????????????user2.setId("2");
????????????????????user2.setName("xiao wang");
????????????????????try {
????????????????????????????delayExector.schedule(new Runnable() {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try {
????????????????????????????????????????????????queue.offer(user1);
????????????????????????????????????????????????queue.offer(user2);
????????????????????????????????????????????} catch (Exception e) {
????????????????????????????????????????????????????e.printStackTrace();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
????????????????????????????????????????}
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}, delayTime , TimeUnit.SECONDS);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?System.out.println("ready poll!");
????????????????????????????????User u1 = (User) queue.poll();
????????????????????????????????User u2 = (User) queue.poll();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
????????????????????????????????????????System.out.println("Success!");
????????????????????????????????}
????????????????????} catch (Exception e) {
????????????????????????????e.printStackTrace();
????????????????????} finally{
????????????????????????????????delayExector.shutdown();
????????????????????try {
????????????????????????????????delayExector.awaitTermination(2, TimeUnit.SECONDS);
????????????????????} catch (InterruptedException e) {
????????????????????}
????????????}
????????}
}