Zookeeper入門之五-常見應(yīng)用場(chǎng)景及代碼

Master選舉實(shí)現(xiàn)

思路:選擇一個(gè)根節(jié)點(diǎn),例如/master_select酬诀,多臺(tái)機(jī)器同時(shí)向該節(jié)點(diǎn)創(chuàng)建一個(gè)子節(jié)點(diǎn) /master_select/lock鸳碧,利用zk的特性报强,最終只有一臺(tái)機(jī)器能夠創(chuàng)建成功,這臺(tái)機(jī)器就是master

 static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .namespace("master_select")
            .build();

    public static void main(String[] args) throws InterruptedException {

        zkFluentClient.start();

        String selectPath = "/master_select";

        LeaderSelector selector = new LeaderSelector(zkFluentClient, selectPath, new LeaderSelectorListenerAdapter() {
            @Override
            // 需要注意的是爬虱,一旦執(zhí)行完這個(gè)方法隶债,curator就會(huì)立即釋放Master的權(quán)利,然后重新開始新一輪的Master選舉
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("Be a Leader");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("釋放 Leader ");
            }
        });

        selector.autoRequeue();

        selector.start();

        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);

    }

如果同時(shí)有2個(gè)請(qǐng)求跑筝,可以看到交替執(zhí)行死讹,創(chuàng)建2個(gè)臨時(shí)節(jié)點(diǎn):

[zk: localhost:2181(CONNECTED) 21] ls /master_select/master_select
[_c_93265fd6-4b11-4668-baf8-e4211a8d1b5f-lock-0000000067, _c_36a0c859-efb1-442a-9dff-26121e7a1a7e-lock-0000000068]

這里的臨時(shí)節(jié)點(diǎn),在master失效的時(shí)候就會(huì)被刪除曲梗。

一旦takeLeaderShip執(zhí)行結(jié)束赞警,master的就會(huì)被釋放妓忍,然后重新開始新一輪的master選舉。

分布式鎖

使用InterProcessMutex來(lái)做分布式鎖處理

public class DistributeLockTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("lock")
            .build();

    public static void main(String[] args) {
        zkFluentClient.start();

        final InterProcessMutex lock = new InterProcessMutex(zkFluentClient, "/distribute_lock");

        final CountDownLatch latch = new CountDownLatch(1);

        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {

                    try {
                        latch.await();
                        lock.acquire(); // 獲取鎖

                        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                        String orderNo = sdf.format(Date.from(Instant.now()));
                        System.out.println("OrderNo is:" + orderNo);

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }


                }
            }).start();
        }

        latch.countDown();// 這里很有意思愧旦,在主線程啟動(dòng)了幾十個(gè)線程之后世剖,這些線程都是hold住的(通過(guò) countDownLatch.await()方法)
        // 然后主線程處理latch.countDown(),導(dǎo)致所有子線程同時(shí)滿足觸發(fā)條件,同時(shí)執(zhí)行笤虫,保證并發(fā)旁瘫。不過(guò)僅用在測(cè)試環(huán)節(jié)比較合適。
        // 其實(shí)latch可以去掉琼蚯,只是這樣并發(fā)沒(méi)有那么集中酬凳。
    }
}
分布式計(jì)數(shù)器

思路很類似,用上述分布式鎖的思路遭庶。

比如統(tǒng)計(jì)在線人數(shù)宁仔,指定zk的一個(gè)數(shù)據(jù)節(jié)點(diǎn)作為計(jì)數(shù)器,多個(gè)應(yīng)用實(shí)例在分布式鎖的控制下峦睡,通過(guò)更新該數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容來(lái)實(shí)現(xiàn)計(jì)數(shù)功能台诗。

public class DistributeCounterTest {

    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .namespace("counter")
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(new ExponentialBackoffRetry(800, 5))
            .build();

    public static void main(String[] args) throws Exception {

        zkFluentClient.start();

        // 計(jì)數(shù)器
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder",
                new RetryNTimes(3, 1000));

        AtomicValue<Integer> rc = atomicInteger.add(8);
        rc = atomicInteger.increment();
        rc = atomicInteger.decrement();
        atomicInteger.increment();

        System.out.println("Result:" + rc.succeeded());
        System.out.println("preValue:" + rc.preValue() + ",postValue:" + rc.postValue());
        System.out.println();

        String value = new String(zkFluentClient.getData().forPath("/adder"));
        System.out.println(value);

        // 試著重新取
        DistributedAtomicInteger newAtomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder", new RetryNTimes(3, 800));
        System.out.println(newAtomicInteger.get().preValue() + "_" + newAtomicInteger.get().postValue());
    }
}

可以看到,只要同一個(gè)路徑下赐俗,對(duì)應(yīng)的DistributeAtomicInteger的對(duì)象值都是同一個(gè)拉队,可以隨時(shí)創(chuàng)建一個(gè)對(duì)象直接使用。

分布式Barrier

先看一個(gè)JDK自帶的CyclicBarrier阻逮,先看下CyclicBarrier的說(shuō)明:

CyclicBarrier 的字面意思是可循環(huán)(Cyclic)使用的屏障(Barrier)粱快。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞叔扼,直到最后一個(gè)線程到達(dá)屏障時(shí)事哭,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)干活瓜富。線程進(jìn)入屏障通過(guò)CyclicBarrier的await()方法鳍咱。

CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量与柑,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障谤辜,然后當(dāng)前線程被阻塞。

實(shí)現(xiàn)原理:在CyclicBarrier的內(nèi)部定義了一個(gè)Lock對(duì)象价捧,每當(dāng)一個(gè)線程調(diào)用CyclicBarrier的await方法時(shí)丑念,將剩余攔截的線程數(shù)減1,然后判斷剩余攔截?cái)?shù)是否為0结蟋,如果不是脯倚,進(jìn)入Lock對(duì)象的條件隊(duì)列等待。如果是嵌屎,執(zhí)行barrierAction對(duì)象的Runnable方法推正,然后將鎖的條件隊(duì)列中的所有線程放入鎖等待隊(duì)列中恍涂,這些線程會(huì)依次的獲取鎖、釋放鎖植榕,接著先從await方法返回乳丰,再?gòu)腃yclicBarrier的await方法中返回。

CyclicBarrier主要用于一組線程之間的相互等待内贮,而CountDownLatch一般用于一組線程等待另一組線程。實(shí)際上可以通過(guò)CountDownLatch的countDown()和await()來(lái)實(shí)現(xiàn)CyclicBarrier的功能汞斧。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()夜郁。注意:在一個(gè)線程中先調(diào)用countDown(),然后調(diào)用await()粘勒。

先看代碼

public class DistributeCyclicBarrierTest {

    static CyclicBarrier jdkBarrier = new CyclicBarrier(3);
    

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.execute(new Thread(new JdkBasedRuner("jinsiyu")));
        executorService.execute(new Thread(new JdkBasedRuner("AMANDA")));
        executorService.execute(new Thread(new JdkBasedRuner("QQ")));
    }

    static class JdkBasedRuner implements Runnable {

        private String name;

        public JdkBasedRuner(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println(name + " Ready!!!");

            try {
                jdkBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(name + " GO!!");
        }
    }
}

結(jié)果如下:

jinsiyu Ready!!!
AMANDA Ready!!!
QQ Ready!!!

QQ GO!!
jinsiyu GO!!
AMANDA GO!!

可以看到竞端,只有當(dāng)CyclicBarrier中的值為0時(shí),才會(huì)統(tǒng)一執(zhí)行其后的操作庙睡,也就是“XXX GO”的語(yǔ)句打印事富。

而如果這里jdkBarrier如果設(shè)置的為4,那么下面三句“XXX GO”的語(yǔ)句根本不會(huì)打印乘陪,會(huì)一直等待统台。

ZK下的實(shí)現(xiàn):

    static DistributedBarrier distributedBarrier;

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .connectString("localhost:32770")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .namespace("cyclicBarrier").build();

.......
        zkClient.start();
                // distribute
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    distributedBarrier = new DistributedBarrier(zkClient, "/barrier");
                    System.out.println(Thread.currentThread().getName() + "號(hào)barrier設(shè)置");
                    try {
                        distributedBarrier.setBarrier(); // 看實(shí)現(xiàn),就是在create節(jié)點(diǎn)
                        distributedBarrier.waitOnBarrier(); // 等待啡邑,直到remove
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "Starting...");
                }
            }).start();
        }
        TimeUnit.SECONDS.sleep(2);
        distributedBarrier.removeBarrier(); // delete節(jié)點(diǎn)

ZKPaths & EnsurePath
public class ZKPathsTest {

    static String path = "/zkpath_sample";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        System.out.println(ZKPaths.fixForNamespace(path,"/sub"));
        System.out.println(ZKPaths.makePath(path,"/sub"));
        System.out.println(ZKPaths.getNodeFromPath("/zkpath_sample/sub1")); // 不存在節(jié)點(diǎn)也不會(huì)報(bào)錯(cuò)贱勃,從路徑str中截取


        ZKPaths.PathAndNode pn = ZKPaths.getPathAndNode("/zkpath_sample/sub1"); // 獲取節(jié)點(diǎn),不存在也不會(huì)報(bào)錯(cuò),只是從路徑上截取
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        // 獲取zookeeper谤逼,這個(gè)是干啥的贵扰?
        ZooKeeper zooKeeper = zkClient.getZookeeperClient().getZooKeeper();

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zooKeeper,dir1); // 創(chuàng)建目錄,如果存在不會(huì)報(bào)錯(cuò)流部,也不會(huì)拋異常
        ZKPaths.mkdirs(zooKeeper,dir2);

        System.out.println(ZKPaths.getSortedChildren(zooKeeper,path)); // 獲取已排序的子節(jié)點(diǎn)

        ZKPaths.deleteChildren(zooKeeper,path,false); // 刪除子節(jié)點(diǎn)戚绕,如果最后一個(gè)參數(shù)為true,會(huì)刪除本身
    }
}
public class EnsurePathTest {

    static String path = "/path2";

    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectionTimeoutMs(3000)
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .connectString("localhost:32770")
            .namespace("ensure")
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();

        EnsurePath ensurePath = new EnsurePath(path);  // 這里跟namespace沒(méi)關(guān)系枝冀,只會(huì)從根目錄下開始建舞丛,所以是絕對(duì)路徑了
        ensurePath.ensure(zkClient.getZookeeperClient());

        EnsurePath ensurePath1 = zkClient.newNamespaceAwareEnsurePath("/c2"); // 用這個(gè)方法,namespace生效
        ensurePath1.ensure(zkClient.getZookeeperClient());
    }

}

不過(guò)EnsurePath貌似已經(jīng)不推薦使用了果漾。

順序節(jié)點(diǎn)
    static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .namespace("sequence-jin")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    public static void main(String[] args) throws Exception {

        zkClient.start();
        // 創(chuàng)建順序節(jié)點(diǎn)
        zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                .forPath("/seq");
    }

執(zhí)行多次后瓷马,結(jié)果如下:

ls /sequence-jin
[seq0000000001, seq0000000000, seq0000000002]

可以看到,順序節(jié)點(diǎn)跨晴。欧聘。。就是如此

可以通過(guò)臨時(shí)節(jié)點(diǎn)來(lái)代替心跳端盆,來(lái)判斷client端是否存在怀骤。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末费封,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蒋伦,更是在濱河造成了極大的恐慌弓摘,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痕届,死亡現(xiàn)場(chǎng)離奇詭異韧献,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)研叫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門锤窑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人嚷炉,你說(shuō)我怎么就攤上這事渊啰。” “怎么了申屹?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵绘证,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我哗讥,道長(zhǎng)嚷那,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任杆煞,我火速辦了婚禮车酣,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘索绪。我一直安慰自己湖员,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布瑞驱。 她就那樣靜靜地躺著娘摔,像睡著了一般。 火紅的嫁衣襯著肌膚如雪唤反。 梳的紋絲不亂的頭發(fā)上凳寺,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音彤侍,去河邊找鬼肠缨。 笑死,一個(gè)胖子當(dāng)著我的面吹牛盏阶,可吹牛的內(nèi)容都是我干的晒奕。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼脑慧!你這毒婦竟也來(lái)了魄眉?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤闷袒,失蹤者是張志新(化名)和其女友劉穎坑律,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體囊骤,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡晃择,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了也物。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宫屠。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖焦除,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情作彤,我是刑警寧澤膘魄,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站竭讳,受9級(jí)特大地震影響创葡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜绢慢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一灿渴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧胰舆,春花似錦骚露、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至倦零,卻和暖如春误续,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背扫茅。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工蹋嵌, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人葫隙。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓栽烂,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子愕鼓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • 此篇博客所有源碼均來(lái)自JDK 1.8 在上篇博客中介紹了Java四大并發(fā)工具之一的CyclicBarrier钙态,今天...
    chenssy閱讀 2,913評(píng)論 2 27
  • 進(jìn)程與線程 1.進(jìn)程: 進(jìn)程是資源分配的基本單位,又是調(diào)度運(yùn)行的基本單位菇晃,是系統(tǒng)中并發(fā)執(zhí)行的單位册倒。進(jìn)程是一個(gè)具...
    Sponge1128閱讀 670評(píng)論 0 1
  • CountDownLatch 介紹 CountDownLatch是一個(gè)同步協(xié)助類,允許一個(gè)或多個(gè)線程等待磺送,直到其他...
    tomas家的小撥浪鼓閱讀 3,178評(píng)論 0 9
  • 我看到一個(gè)人驻子,我在他的身上找到了一絲你的模樣,我覺(jué)得他呼出的白霧都是性感的估灿,我喜歡你崇呵,我想你,可你對(duì)我的定位很普通馅袁。
    怪咖與笨咖閱讀 80評(píng)論 0 0
  • 前段時(shí)間看了《滾蛋吧腫瘤君》域慷,有歡笑也有淚水。里面有很多情節(jié)汗销,特能觸動(dòng)我的淚腺犹褒。雖然與原著有所改動(dòng),但不能否認(rèn)白百...
    慕溪水閱讀 446評(píng)論 2 1