Zookeeper/Curator客戶端

Curator解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作,包括連接重連,反復(fù)注冊(cè)Watcher和NodeExistsException異常等。此外還有zkClient和Zooleeper自帶的Java API。

添加依賴:
在pom.xml文件中添加如下內(nèi)容即可欢嘿。

  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.8.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.8.0</version>
  </dependency>
  <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-client</artifactId>
      <version>2.8.0</version>
  </dependency> 

創(chuàng)建會(huì)話:
Curator除了使用一般方法創(chuàng)建會(huì)話外,還可以使用fluent風(fēng)格進(jìn)行創(chuàng)建也糊。

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class Create_Session_Sample {
        public static void main(String[] args) throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
            client.start();
            System.out.println("Zookeeper session established. ");        
        }
    }

    運(yùn)行結(jié)果: 
    Zookeeper session1 established. 
    Zookeeper session2 established. 

session會(huì)話含有隔離命名空間炼蹦,即客戶端對(duì)Zookeeper上數(shù)據(jù)節(jié)點(diǎn)的任何操作都是相對(duì)/base目錄進(jìn)行的,這有利于實(shí)現(xiàn)不同的Zookeeper的業(yè)務(wù)之間的隔離狸剃。當(dāng)然也可以不設(shè)置框弛。

創(chuàng)建節(jié)點(diǎn):
通過使用Fluent風(fēng)格的接口,開發(fā)人員可以進(jìn)行自由組合來完成各種類型節(jié)點(diǎn)的創(chuàng)建捕捂。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class Create_Node_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book/c1";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("base")
                    .build();

            client.start();
            
            client.create().creatingParentsIfNeeded()
                           .withMode(CreateMode.EPHEMERAL)
                           .forPath(path, "i am c1".getBytes());

            System.out.println("success create znode: " + path);
        }
    }

    運(yùn)行結(jié)果:
    success create znode: /zk-book/c1

其中,也創(chuàng)建了/base/zk-book/c1的父節(jié)點(diǎn)/base/zk-book節(jié)點(diǎn)斗搞。

刪除節(jié)點(diǎn):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Del_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book/c1";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("base")
                    .build();

            client.start();

            client.create().creatingParentsIfNeeded()
                           .withMode(CreateMode.EPHEMERAL)
                           .forPath(path, "i am c1".getBytes());

            System.out.println("success create znode: " + path);
            //以上指攒,節(jié)點(diǎn)創(chuàng)建完成。

            Stat stat = new Stat();
            System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
            client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
            System.out.println("success delete znode " + path);
        }
    }
    
    運(yùn)行結(jié)果: 
    i am c1
    success delete znode /zk-book/c1

獲取數(shù)據(jù):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Get_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am c1".getBytes());
            Stat stat = new Stat();
            byte b[] = client.getData().storingStatIn(stat).forPath(path);
            System.out.println(new String(b));
        }
    }

    運(yùn)行結(jié)果:
    i am c1

更新數(shù)據(jù):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;

    public class Set_Data_Sample {
        public static void main(String[] args) throws Exception {
            String path = "/zk-book";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am c1".getBytes());            

            stat = client.setData().withVersion(stat.getVersion()).forPath(path);
            System.out.println("Success set node for : " + path + ", new version: "+ stat.getVersion());
        }
    }

    運(yùn)行結(jié)果:  
    Success set node for : /zk-book, new version: 1

異步接口:
如同Zookeeper原生API提供了異步接口僻焚,Curator也提供了異步接口允悦。在Zookeeper中,所有的異步通知事件處理都是由EventThread這個(gè)線程來處理的虑啤,EventThread線程用于串行處理所有的事件通知隙弛,其可以保證對(duì)事件處理的順序性,但是一旦碰上復(fù)雜的處理單元狞山,會(huì)消耗過長(zhǎng)的處理時(shí)間全闷,從而影響其他事件的處理,Curator允許用戶傳入Executor實(shí)例萍启,這樣可以將比較復(fù)雜的事件處理放到一個(gè)專門的線程池中去总珠。

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class Create_Node_Background_Sample {
        static String path = "/zk-book";
        static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        
        static CountDownLatch semaphore = new CountDownLatch(2);
        static ExecutorService tp = Executors.newFixedThreadPool(2);

        public static void main(String[] args) throws Exception {
            client.start();
            System.out.println("Main thread: " + Thread.currentThread().getName());

            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                                                     .inBackground(
                                                         new BackgroundCallback(){
                                                             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                                                                 System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                                                                 semaphore.countDown();
                                                             }
                                                         }, tp
                                                         )
                                                    .forPath(path, "init".getBytes());

            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                                                     .inBackground(
                                                         new BackgroundCallback(){
                                                             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                                                                 System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                                                                 semaphore.countDown();
                                                             }
                                                         }
                                                         )
                                                     .forPath(path, "init".getBytes());

            semaphore.await();
            tp.shutdown();
        }
    }
    
    運(yùn)行結(jié)果:
    Main thread: main
    event[code: -110, type: CREATE], Thread of processResult: main-EventThread
    event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1

其中屏鳍,創(chuàng)建節(jié)點(diǎn)的事件由線程池自己處理,而非默認(rèn)線程處理局服。

節(jié)點(diǎn)監(jiān)聽:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class NodeCache_Sample {

        public static void main(String[] args) throws Exception {
            String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            //新建節(jié)點(diǎn)
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "i am nodecache".getBytes());
            
            //監(jiān)聽
            final NodeCache cache = new NodeCache(client, path, false);
            cache.start(true);
            cache.getListenable().addListener(new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
                }
            });

            //更新節(jié)點(diǎn)
            client.setData().forPath(path, "u".getBytes());
            Thread.sleep(1000);
        }
    }

    運(yùn)行結(jié)果:  
    Node data update, new data: u

當(dāng)節(jié)點(diǎn)數(shù)據(jù)變更后收到了通知钓瞭。NodeCache不僅可以監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容變更,也能監(jiān)聽指定節(jié)點(diǎn)是否存在。

子節(jié)點(diǎn)監(jiān)聽:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    public class PathChildrenCache_Sample {

        public static void main(String[] args) throws Exception {
            String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            PathChildrenCache cache = new PathChildrenCache(client, path, true);
            cache.start(StartMode.POST_INITIALIZED_EVENT);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("CHILD_ADDED," + event.getData().getPath());
                        break;
                    case CHILD_UPDATED:
                        System.out.println("CHILD_UPDATED," + event.getData().getPath());
                        break;
                    case CHILD_REMOVED:
                        System.out.println("CHILD_REMOVED," + event.getData().getPath());
                        break;
                    default:
                        break;
                    }
                }
            });

            client.create().withMode(CreateMode.PERSISTENT).forPath(path);
            client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
            client.delete().forPath(path + "/c1");
            Thread.sleep(1000);
        }
    }

    運(yùn)行結(jié)果:
    CHILD_ADDED,/zk-book/c1
    CHILD_REMOVED,/zk-book/c1

監(jiān)聽節(jié)點(diǎn)的子節(jié)點(diǎn),包括新增历造、數(shù)據(jù)變化芹扭、刪除三類事件。

Master選舉:
借助Zookeeper褥傍,開發(fā)者可以很方便地實(shí)現(xiàn)Master選舉功能,其大體思路如下:選擇一個(gè)根節(jié)點(diǎn),如/master_select系吩,多臺(tái)機(jī)器同時(shí)向該節(jié)點(diǎn)創(chuàng)建一個(gè)子節(jié)點(diǎn)/master_select/lock,利用Zookeeper特性妒蔚,最終只有一臺(tái)機(jī)器能夠成功創(chuàng)建穿挨,成功的那臺(tái)機(jī)器就是Master。

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.leader.LeaderSelector;
    import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class Recipes_MasterSelect {

        public static void main(String[] args) throws Exception {
        String path = "/zk-book/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder()
                                                            .connectString("127.0.0.1:2181")
                                                            .sessionTimeoutMs(5000)
                                                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                                                            .namespace("base")
                                                            .build();
            client.start();

            LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
                public void takeLeadership(CuratorFramework client) throws Exception {
                    System.out.println("成為Master角色");
                    Thread.sleep(3000);
                    System.out.println("完成Master操作肴盏,釋放Master權(quán)利");
                }
            });
            selector.autoRequeue();
            selector.start();
            Thread.sleep(1000);
        }
    }

    運(yùn)行結(jié)果:
    成為Master角色
    完成Master操作科盛,釋放Master權(quán)利
    成為Master角色

以上結(jié)果會(huì)反復(fù)循環(huán),并且當(dāng)一個(gè)應(yīng)用程序完成Master邏輯后菜皂,另外一個(gè)應(yīng)用程序的相應(yīng)方法才會(huì)被調(diào)用贞绵,即當(dāng)一個(gè)應(yīng)用實(shí)例成為Master后,其他應(yīng)用實(shí)例會(huì)進(jìn)入等待恍飘,直到當(dāng)前Master掛了或者推出后才會(huì)開始選舉Master榨崩。


參考:http://www.cnblogs.com/leesf456/p/6032716.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市章母,隨后出現(xiàn)的幾起案子母蛛,更是在濱河造成了極大的恐慌,老刑警劉巖乳怎,帶你破解...
    沈念sama閱讀 207,248評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件彩郊,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蚪缀,警方通過查閱死者的電腦和手機(jī)秫逝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來询枚,“玉大人违帆,你說我怎么就攤上這事〗鹗瘢” “怎么了前方?”我有些...
    開封第一講書人閱讀 153,443評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵狈醉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我惠险,道長(zhǎng)苗傅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評(píng)論 1 279
  • 正文 為了忘掉前任班巩,我火速辦了婚禮渣慕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抱慌。我一直安慰自己逊桦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評(píng)論 5 374
  • 文/花漫 我一把揭開白布抑进。 她就那樣靜靜地躺著强经,像睡著了一般。 火紅的嫁衣襯著肌膚如雪寺渗。 梳的紋絲不亂的頭發(fā)上匿情,一...
    開封第一講書人閱讀 49,185評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音信殊,去河邊找鬼炬称。 笑死,一個(gè)胖子當(dāng)著我的面吹牛涡拘,可吹牛的內(nèi)容都是我干的玲躯。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼鳄乏,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼跷车!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起橱野,我...
    開封第一講書人閱讀 37,112評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤朽缴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后仲吏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,609評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蝌焚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評(píng)論 2 325
  • 正文 我和宋清朗相戀三年裹唆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片只洒。...
    茶點(diǎn)故事閱讀 38,163評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡许帐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出毕谴,到底是詐尸還是另有隱情成畦,我是刑警寧澤距芬,帶...
    沈念sama閱讀 33,803評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站循帐,受9級(jí)特大地震影響框仔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拄养,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評(píng)論 3 307
  • 文/蒙蒙 一离斩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瘪匿,春花似錦跛梗、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至顽染,卻和暖如春漾岳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背家乘。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評(píng)論 1 261
  • 我被黑心中介騙來泰國打工蝗羊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仁锯。 一個(gè)月前我還...
    沈念sama閱讀 45,636評(píng)論 2 355
  • 正文 我出身青樓耀找,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親业崖。 傳聞我的和親對(duì)象是個(gè)殘疾皇子野芒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容