zk 原生API 連接

對zk 操作的方式有一下幾種:

  • 基于java的 shell命令缭黔,zkCli.sh
  • 原生的java api
  • zkClient
  • Curator
  • spring cloud zookeeper

原生API

代碼展示

  • ZKConstant 常量
public class ZKConstant {
    public static final String CONNET_STR = "192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181";
    public static final int SESSION_TIMEOUT = 5000;
}
  • CreateSession
public class CreateSession_API {
    private static ZooKeeper zk1;
    private static CountDownLatch connectSemaphore = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
//        createSession();
        createSessionWithSID();
    }

    public static void createSession() throws Exception {
        //Zookeeper是API提供的1個類,我們連接zk集群,進(jìn)行相應(yīng)的znode操作,都是通過ZooKeeper的實(shí)例進(jìn)行,這個實(shí)例就是zk client权旷,和命令行客戶端是同樣的角色
        //Zookeeper實(shí)例的創(chuàng)建需要傳遞3個參數(shù)
        //connectString 代表要連接zk集群服務(wù),通過逗號分隔
        // 注冊watcher事件
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
//                這個方法只會調(diào)用一次贯溅,在這個session建立完成調(diào)用
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectSemaphore.countDown();
                    System.out.println("event:" + watchedEvent);
                    System.out.println("receive session established.");
                }
            }
        });
        System.out.println(zk1.getState());
        connectSemaphore.await();
        System.out.println("zk session established");
    }

    // 重復(fù)使用上次session, 利用sessionId和passwd
    public static void createSessionWithSID() throws Exception {
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
                new MyWatcher());
        connectSemaphore.await();
        long sessionId = zk1.getSessionId();
        byte[] passwd = zk1.getSessionPasswd();

        zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
                new MyWatcher(),
                1l, "test".getBytes());

        zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
                new MyWatcher(),
                sessionId,
                passwd);
        Thread.sleep(Integer.MAX_VALUE);
    }

    static class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
//            只注冊一次
            System.out.println("receive watched event:" + watchedEvent);
            if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                connectSemaphore.countDown();
            }
        }
    }
}
  • CreateNode
public class CreateNode_API {
    private static ZooKeeper zk1;
    private static CountDownLatch connectSemaphore = new CountDownLatch(1); // 同步計(jì)數(shù)器
    public static void main(String[] args) throws Exception {
//        createNodeASync();
        createNodeSync();
    }
    public static void createNodeSync() throws Exception {
        ZooKeeper zookeeper = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectSemaphore.await();
        // 創(chuàng)建臨時節(jié)點(diǎn)
        String path1 = zookeeper.create("/zk-test-ephemeral-",
                "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        System.out.println("Success create znode: " + path1);

        String path2 = zookeeper.create("/zk-test-ephemeral-",
                "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("Success create znode: " + path2);
    }
    public static void createNodeASync() throws Exception {
        ZooKeeper zk1 = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
                new MyWatcher());
        connectSemaphore.await();
        // 創(chuàng)建臨時節(jié)點(diǎn)
        zk1.create("/zk-test-eph-", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "I am context1.");
        zk1.create("/zk-test-eph-", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallback(), "i am context2");
        //  創(chuàng)建臨時有序節(jié)點(diǎn)
        zk1.create("/zk-test-eph-", "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallback(), "i am context3");
        Thread.sleep(Integer.MAX_VALUE);
    }
    static class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
//            建立連接成功回調(diào)
            if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                connectSemaphore.countDown();
            }
        }
    }
    // 創(chuàng)建節(jié)點(diǎn)成功回調(diào)
    static class IStringCallback implements AsyncCallback.StringCallback {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("create path result: [" + rc + "," + path + "," + ctx + ", real path name:" + name);
        }
    }
}

  • DeleteNode
public class DeleteNode_API {
   private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
   private static ZooKeeper zk;
   public static void main(String[] args) throws Exception {
       deleteNodeSync();
   }
   public static void deleteNodeSync() throws Exception {
       String path = "/zk_book";
       zk = new ZooKeeper(ZKConstant.CONNET_STR, ZKConstant.SESSION_TIMEOUT,
               new DeleteWatcher());
       connectedSemaphore.await();
       zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
       zk.delete(path, -1);
       Thread.sleep(Integer.MAX_VALUE);
   }
   static class DeleteWatcher implements Watcher {
       @Override
       public void process(WatchedEvent watchedEvent) {
           if (Event.KeeperState.SyncConnected == watchedEvent.getState() && watchedEvent.getPath() == null) {
               connectedSemaphore.countDown();
           }
       }
   }
}
  • ExistsNode
public class ExistsNode_API {
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static ZooKeeper zk;
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        zk = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
//         對path 路勁進(jìn)行監(jiān)聽
        zk.exists(path, true);
        zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.setData(path, "123".getBytes(), -1);
        zk.create(path+"/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.setData(path + "/c1", "000".getBytes(), -1);
        zk.delete(path + "/c1", -1);
        zk.delete(path, -1);
        Thread.sleep(Integer.MAX_VALUE);
    }
    static class MyWatcher implements Watcher {
        public void process(WatchedEvent watchedEvent) {
            try {
                if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                    if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                        connectedSemaphore.countDown();
                    } else if (Event.EventType.NodeCreated == watchedEvent.getType()) {
                        System.out.println("node (" + watchedEvent.getPath() + ") created ");
                        zk.exists(watchedEvent.getPath(), true);
                    } else if (Event.EventType.NodeDeleted == watchedEvent.getType()) {
                        System.out.println("node (" + watchedEvent.getPath() + ") deleted ");
                        zk.exists(watchedEvent.getPath(), true);
                    } else if (Event.EventType.NodeDataChanged == watchedEvent.getType()) {
                        System.out.println("node (" + watchedEvent.getPath() + ") dataChanged");
                        zk.exists(watchedEvent.getPath(), true);
                    }
                }
            } catch (Exception e) {
            }
        }
    }
}
  • GetData
public class GetData_API {
    private static ZooKeeper zk1;
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private static Stat stat = new Stat();
    static String path = "/zk-book";
    public static void main(String[] args) throws Exception {
        sync_setData();
    }
    public static void sync_setData() throws Exception {
        String path = "/zk-book";
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
        Stat stat = zk1.setData(path, "haha".getBytes(), -1);
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," +
                stat.getVersion());
        Stat stat2 = zk1.setData(path, "haha".getBytes(), -1);
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," +
                stat.getVersion());
        try {
            // 指定version拄氯, 需要正確的version才可以通過
            zk1.setData(path, "456".getBytes(), stat.getVersion());
        } catch (KeeperException e) {
            System.out.println("Error: " + e.code() + "," + e.getMessage());
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void async_setData() throws Exception {
        String path = "/zk-book";
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
        zk1.setData(path, "456".getBytes(), -1, new AsyncCallback.StatCallback() {
            public void processResult(int i, String s, Object o, Stat stat) {
                if (i == 0) {
                    System.out.println("SUCCESS");
                }
            }
        }, null);
        Thread.sleep(Integer.MAX_VALUE);
    }
    public static void sync_getChildren() throws Exception {
        String path = "/zk-book";
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
//        zk1.delete(path+"/c1", 0);
        zk1.delete(path, 0);
        zk1.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        zk1.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        List<String> childrenList = zk1.getChildren(path, true);
        System.out.println(childrenList);
        zk1.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void async_getChildren() throws Exception {
        String path = "/zk-book";
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
        zk1.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        zk1.create(path + "/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        // 只會響應(yīng)一次
        zk1.getChildren(path, true, new AsyncCallback.Children2Callback() {
            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                System.out.println("Get Children znode result: [response code: " + rc + ", param path: " + path
                        + ", ctx: " + ctx + ", children list: " + children + ", stat: " + stat);
            }
        }, null);
        zk1.create(path + "/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        Thread.sleep(Integer.MAX_VALUE);
    }
    public static void sync_getData() throws Exception {
        String path = "/zk-book";
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
        zk1.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(new String(zk1.getData(path, true, stat)));
        System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
        zk1.setData(path, "456".getBytes(), -1);
        Thread.sleep(Integer.MAX_VALUE);
    }
    public static void async_getData() throws Exception {
        zk1 = new ZooKeeper(ZKConstant.CONNET_STR,
                ZKConstant.SESSION_TIMEOUT, //
                new MyWatcher());
        connectedSemaphore.await();
        zk1.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zk1.getData(path, true, new IDataCallback(), null);
        zk1.setData(path, "456".getBytes(), -1);
        Thread.sleep(Integer.MAX_VALUE);
    }
    static class MyWatcher implements Watcher {
        public void process(WatchedEvent watchedEvent) {
            if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
                    connectedSemaphore.countDown();
                } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                    try {
                        zk1.getData(watchedEvent.getPath(), true, new IDataCallback(), null);
                    } catch (Exception e) {
                    }
                }
            }
        }
    }
    static class IDataCallback implements AsyncCallback.DataCallback {
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            System.out.println(rc + ", " + path + ", " + new String(data));
            System.out.println("--" + stat.getCzxid() + "," +
                    stat.getMzxid() + "," +
                    stat.getVersion());
        }
    }
}

PS: 若你覺得可以、還行它浅、過得去译柏、甚至不太差的話,可以“關(guān)注”或者“點(diǎn)贊”一下姐霍,就此謝過!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鄙麦,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子镊折,更是在濱河造成了極大的恐慌胯府,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件恨胚,死亡現(xiàn)場離奇詭異骂因,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)赃泡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進(jìn)店門寒波,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人升熊,你說我怎么就攤上這事俄烁。” “怎么了级野?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵页屠,是天一觀的道長。 經(jīng)常有香客問我,道長卷中,這世上最難降的妖魔是什么矛双? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮蟆豫,結(jié)果婚禮上议忽,老公的妹妹穿的比我還像新娘。我一直安慰自己十减,他們只是感情好栈幸,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著帮辟,像睡著了一般速址。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上由驹,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天芍锚,我揣著相機(jī)與錄音,去河邊找鬼蔓榄。 笑死并炮,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的甥郑。 我是一名探鬼主播逃魄,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼澜搅!你這毒婦竟也來了伍俘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤勉躺,失蹤者是張志新(化名)和其女友劉穎癌瘾,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饵溅,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡柳弄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了概说。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡嚣伐,死狀恐怖糖赔,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情轩端,我是刑警寧澤放典,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響奋构,放射性物質(zhì)發(fā)生泄漏壳影。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一弥臼、第九天 我趴在偏房一處隱蔽的房頂上張望宴咧。 院中可真熱鬧,春花似錦径缅、人聲如沸掺栅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽氧卧。三九已至,卻和暖如春氏堤,著一層夾襖步出監(jiān)牢的瞬間沙绝,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工鼠锈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留闪檬,地道東北人。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓脚祟,卻偏偏與公主長得像谬以,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子由桌,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評論 2 361