Zookeeper基礎(chǔ)(三):Zookeeper客戶端ZkClient

1涛碑、ZkClient簡(jiǎn)介

ZkClient是由Datameer的工程師開(kāi)發(fā)的開(kāi)源客戶端,對(duì)Zookeeper的原生API進(jìn)行了包裝,實(shí)現(xiàn)了超時(shí)重連枫疆、Watcher反復(fù)注冊(cè)等功能忘分;目前已經(jīng)應(yīng)用到了很多項(xiàng)目中棋枕,比如Dubbo、Kafka妒峦、Helix重斑;

Github:https://github.com/sgroschupf/zkclient
Maven依賴

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version></version>
</dependency>

或者

 <dependency>
     <groupId>com.github.sgroschupf</groupId>
     <artifactId>zkclient</artifactId>
     <version></version>
 </dependency>

2、ZkClient組件

image.png

IZKConnection:是一個(gè)ZkClient與Zookeeper之間的一個(gè)適配器肯骇;在代碼里直接使用的是ZKClient窥浪,實(shí)質(zhì)上還是委托了zookeeper來(lái)處理了。

在ZKClient中笛丙,根據(jù)事件類型漾脂,分為

  • 節(jié)點(diǎn)事件(數(shù)據(jù)事件),對(duì)應(yīng)的事件處理器是IZKDataListener胚鸯;
  • 子節(jié)點(diǎn)事件骨稿,對(duì)應(yīng)的事件處理器是IZKChildListener;
  • Session事件蠢琳,對(duì)應(yīng)的事件處理器是IZKStatusListener啊终;

ZkEventThread:是專門(mén)用來(lái)處理事件的線程

3、API介紹

  • 啟動(dòng)ZKClient:在創(chuàng)建ZKClient對(duì)象時(shí)傲须,就完成了到ZooKeeper服務(wù)器連接的建立
    1蓝牲、啟動(dòng)時(shí),制定好connection string泰讽,連接超時(shí)時(shí)間例衍,序列化工具等
    2、創(chuàng)建并啟動(dòng)eventThread已卸,用于接收事件佛玄,并調(diào)度事件監(jiān)聽(tīng)器Listener的執(zhí)行
    3、連接到Zookeeper服務(wù)器累澡,同時(shí)將ZKClient自身作為默認(rèn)的Watcher
image.png
  • 為節(jié)點(diǎn)注冊(cè)Watcher
    Zookeeper 原始API的三個(gè)方法:getData梦抢,getChildren、exists愧哟,ZKClient都提供了相應(yīng)的代理方法奥吩,比如exists哼蛆,
image.png

hasListeners是看有沒(méi)有與該數(shù)據(jù)節(jié)點(diǎn)綁定的listener


image.png

所以,默認(rèn)情況下霞赫,都會(huì)自動(dòng)的為指定的path注冊(cè)watcher腮介,并且是默認(rèn)的watcher(ZKClient),那么怎樣才能讓hasListeners值為true呢端衰,也就是怎么才能為path綁定Listener呢叠洗?
ZKClient提供了訂閱功能,一個(gè)新建的會(huì)話旅东,只需要在取得響應(yīng)的數(shù)據(jù)節(jié)點(diǎn)后灭抑,調(diào)用subscribeXXX就可以訂閱上相應(yīng)的事件了。


image.png
  • Zookeeper的CURD(節(jié)點(diǎn)的增刪查改)
    Zookeeper中提供的變更操作有:節(jié)點(diǎn)的創(chuàng)建玉锌、刪除名挥,節(jié)點(diǎn)數(shù)據(jù)的修改

1、創(chuàng)建操作主守,節(jié)點(diǎn)分為4種禀倔,所以ZKClient分別為他們提供了相應(yīng)的代理


image.png

2、刪除節(jié)點(diǎn)操作


image.png

3参淫、修改節(jié)點(diǎn)數(shù)據(jù)


image.png

updateDataSerialized:修改已系列化的數(shù)據(jù)救湖;執(zhí)行過(guò)程是,先讀取數(shù)據(jù)涎才,然后DataUpdater對(duì)數(shù)據(jù)修改鞋既,最后調(diào)用writeData將修改后的數(shù)據(jù)發(fā)送給服務(wù)端;
writeDataReturnStat:寫(xiě)數(shù)據(jù)并返回?cái)?shù)據(jù)的狀態(tài)耍铜;

4邑闺、客戶端處理變更流程

ZKClient是默認(rèn)的Watcher(ZKClient實(shí)現(xiàn)了Watcher接口),并且在為各個(gè)數(shù)據(jù)節(jié)點(diǎn)注冊(cè)的Watcher都是這個(gè)默認(rèn)的Watcher棕兼,那么該如何將各種事件通知給相應(yīng)的Listener呢:
1陡舅、判斷變更類型,變更類型分為state變更伴挚、ChildNode變更靶衍、NodeData變更;
2茎芋、取出與path關(guān)聯(lián)的Listeners颅眶,并為每一個(gè)Listener創(chuàng)建一個(gè)ZKEvent,將ZkEvent田弥,將ZkEvent交給ZkEventThread處理涛酗;
3、ZkEventThread線程,拿到ZkEvent后煤杀,只需要調(diào)用ZkEvent的run方法進(jìn)行處理就可以了眷蜈,所以,具體的如何調(diào)用Listener沈自,還要依賴于ZkEvent的run()實(shí)現(xiàn)

5、序列化處理

Zookeeper中辜妓,會(huì)涉及到序列化枯途、反序列化的操作有兩種:getData/setData;在ZkClient中籍滴,分別用readData/WriteData來(lái)替代了酪夷。

  • ReadData:先調(diào)用zookeeper的getData,然后使用ZKSerializer進(jìn)行反序列化工作
  • WriteData:先使用ZKSerializer將對(duì)象序列化后孽惰,再調(diào)用zookeeper的setData

6晚岭、注冊(cè)監(jiān)聽(tīng)

在ZkClient中客戶端可以通過(guò)注冊(cè)相關(guān)的事件監(jiān)聽(tīng)來(lái)實(shí)現(xiàn)對(duì)Zookeeper服務(wù)端事件的訂閱。


image.png

7勋功、demo

package com.xxx.api.zkclient;

import com.xxx.ZookeeperUtil;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ZkClientCrud<T> {

    ZkClient zkClient ;
    final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);

    public ZkClientCrud(ZkSerializer zkSerializer) {
        logger.info("鏈接zk開(kāi)始");
       // zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
        zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
    }


    public void createEphemeral(String path,Object data){

        zkClient.createEphemeral(path,data);
    }

    /***
     * 支持創(chuàng)建遞歸方式
     * @param path
     * @param createParents
     */
    public void createPersistent(String path,boolean createParents){

        zkClient.createPersistent(path,createParents);
    }

    /***
     * 創(chuàng)建節(jié)點(diǎn) 跟上data數(shù)據(jù)
     * @param path
     * @param data
     */
    public void createPersistent(String path,Object data){

        zkClient.createPersistent(path,data);
    }

    /***
     * 子節(jié)點(diǎn)
     * @param path
     * @return
     */
    public  List<String> getChildren(String path){
       return zkClient.getChildren(path);

    }

    public  T readData(String path){
        return zkClient.readData(path);
    }

    public  void  writeData(String path,Object data){
         zkClient.writeData(path,data);
    }

    //遞歸刪除
    public  void deleteRecursive(String path){
        zkClient.deleteRecursive(path);

    }
}

package com.xxx.api.zkclient;
import com.xxx.api.natives.crud.User;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class ZkClientCrudTest {
    final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
    public static void main(String[] args) {
        ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
        String path="/root";
        zkClientCrud.deleteRecursive(path);
        zkClientCrud.createPersistent(path,"hi");
     /*  zkClientCrud.createPersistent(path+"/a/b/c",true);//遞歸創(chuàng)建 但是不能設(shè)在value
       //zkClientCrud.createPersistent(path,"hi");
        logger.info(zkClientCrud.readData(path));
        //更新
        zkClientCrud.writeData(path,"hello");
        logger.info(zkClientCrud.readData(path));
        logger.info(String.valueOf(zkClientCrud.getChildren(path)));
        //子節(jié)點(diǎn)
        List<String> list=zkClientCrud.getChildren(path);
        for(String child:list){
            logger.info("子節(jié)點(diǎn):"+child);
        }*/

        User user=new User();
        user.setUserid(1);
        user.setUserName("張三");
        zkClientCrud.writeData(path,user);
        System.out.println(zkClientCrud.readData(path).getUserName());;


    }



}

package com.xxx.api.zkclient;


import com.xxx.ZookeeperUtil;
import org.I0Itec.zkclient.*;
import org.apache.zookeeper.Watcher;

import java.util.List;

public class ZkClientWatcher    {
    ZkClient zkClient;
    public ZkClientWatcher() {
        zkClient= new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
    }


    public void createPersistent(String path,Object data){
        zkClient.createPersistent(path,data);
    }


    public  void writeData(String path,Object object){
        zkClient.writeData(path,object);

    }

    public  void delete(String path){
        zkClient.delete(path);

    }

    public  boolean exists(String path){
        return zkClient.exists(path);

    }

    public  void deleteRecursive(String path){
        zkClient.deleteRecursive(path);

    }

    //對(duì)父節(jié)點(diǎn)添加監(jiān)聽(tīng)數(shù)據(jù)變化坦报。
    public void subscribe(String path){


        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.printf("變更的節(jié)點(diǎn)為:%s,數(shù)據(jù):%s\r\n", dataPath,data );
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("刪除的節(jié)點(diǎn)為:%s\r\n", dataPath );
            }
        });
    }
    //對(duì)父節(jié)點(diǎn)添加監(jiān)聽(tīng)子節(jié)點(diǎn)變化。
    public void subscribe2(String path){
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("父節(jié)點(diǎn): " + parentPath+",子節(jié)點(diǎn):"+currentChilds+"\r\n");
            }
        });
    }


    //客戶端狀態(tài)
    public void subscribe3(String path) {
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if(state== Watcher.Event.KeeperState.SyncConnected){
                    //當(dāng)我重新啟動(dòng)后start狂鞋,監(jiān)聽(tīng)觸發(fā)
                    System.out.println("連接成功");
                }else if(state== Watcher.Event.KeeperState.Disconnected){
                    System.out.println("連接斷開(kāi)");//當(dāng)我在服務(wù)端將zk服務(wù)stop時(shí)片择,監(jiān)聽(tīng)觸發(fā)
                }else
                    System.out.println("其他狀態(tài)"+state);
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");

            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {

            }
        });

    }


  /*  @Override
    public void handleDataChange(String dataPath, Object data) throws Exception {



    }

    @Override
    public void handleDataDeleted(String dataPath) throws Exception {

    }*/
}

package com.xxx.api.zkclient;

import java.util.concurrent.TimeUnit;

public class ZkClientWatcherTest {
    public static void main(String[] args) throws InterruptedException {
        ZkClientWatcher zkClientWatche=new ZkClientWatcher();
        String path="/root";
        zkClientWatche.deleteRecursive(path);
        zkClientWatche.createPersistent(path,"hello");
        zkClientWatche.subscribe(path);
        zkClientWatche.subscribe2(path);
       // zkClientWatche.subscribe3(path);//需要啟服務(wù)
       // Thread.sleep(Integer.MAX_VALUE);
        zkClientWatche.createPersistent(path+"/root2","word");
        TimeUnit.SECONDS.sleep(1);
        zkClientWatche.writeData(path,"hi");
        TimeUnit.SECONDS.sleep(1);
        //zkClientWatche.delete(path);//如果目錄下有內(nèi)容 不能刪除 會(huì)報(bào) Directory not empty for /root的異常
        zkClientWatche.deleteRecursive(path);
        TimeUnit.SECONDS.sleep(1); //這個(gè)main線程就結(jié)束

    }
}

package com.xxx;

public class ZookeeperUtil {

    /** zookeeper服務(wù)器地址 */
    public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
    /** 定義session失效時(shí)間 */
    public static final int sessionTimeout = 5000;
    public static final String path = "/root";
}



最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市骚揍,隨后出現(xiàn)的幾起案子字管,更是在濱河造成了極大的恐慌,老刑警劉巖信不,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘲叔,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡抽活,警方通過(guò)查閱死者的電腦和手機(jī)硫戈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)酌壕,“玉大人掏愁,你說(shuō)我怎么就攤上這事÷央梗” “怎么了果港?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)糊昙。 經(jīng)常有香客問(wèn)我辛掠,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任萝衩,我火速辦了婚禮回挽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘猩谊。我一直安慰自己千劈,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布牌捷。 她就那樣靜靜地躺著墙牌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪暗甥。 梳的紋絲不亂的頭發(fā)上喜滨,一...
    開(kāi)封第一講書(shū)人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音撤防,去河邊找鬼虽风。 笑死,一個(gè)胖子當(dāng)著我的面吹牛寄月,可吹牛的內(nèi)容都是我干的辜膝。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼剥懒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼内舟!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起初橘,我...
    開(kāi)封第一講書(shū)人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤验游,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后保檐,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體耕蝉,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年夜只,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了垒在。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扔亥,死狀恐怖场躯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情旅挤,我是刑警寧澤踢关,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站粘茄,受9級(jí)特大地震影響签舞,放射性物質(zhì)發(fā)生泄漏秕脓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一儒搭、第九天 我趴在偏房一處隱蔽的房頂上張望吠架。 院中可真熱鬧,春花似錦搂鲫、人聲如沸傍药。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)怔檩。三九已至,卻和暖如春蓄诽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背媒吗。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人版仔。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓钝凶,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親甫何。 傳聞我的和親對(duì)象是個(gè)殘疾皇子出吹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • ... 一、相關(guān)概念 中間件:為分布式系統(tǒng)提供協(xié)調(diào)服務(wù)的組件辙喂,如專門(mén)用于計(jì)算服務(wù)的機(jī)器就是一個(gè)計(jì)算型中間件捶牢,還有專...
    帥可兒妞閱讀 476評(píng)論 0 0
  • 轉(zhuǎn)自:http://www.reibang.com/p/84ad63127cd1作者:Jeffbond 簡(jiǎn)介 Z...
    小北覓閱讀 933評(píng)論 0 8
  • Zookeeper系列文章1.Zookeeper簡(jiǎn)介2.Zookeeper集群安裝3.原生API操作Zookeep...
    deve_雨軒閱讀 570評(píng)論 0 0
  • 時(shí)常聽(tīng)到心靈雞湯:開(kāi)心一天是過(guò),不開(kāi)心一天也是過(guò)巍耗,為何不開(kāi)開(kāi)心心度過(guò)每一天呢秋麸?簡(jiǎn)單樸素的話語(yǔ)直抵心間,可似乎總有些...
    Christy_22ba閱讀 161評(píng)論 0 0
  • 我喜歡黑夜的顏色 我更喜歡在夜深人靜的時(shí)候 用我的雙眼 去看清這夜晚的城市 但我什么也看不見(jiàn) 此刻炬太,我想到了鬼 只...
    柳四公子閱讀 361評(píng)論 0 0