使用zookeeper實(shí)現(xiàn)配置動(dòng)態(tài)管理

zookeeper典型應(yīng)用場(chǎng)景之一就是利用發(fā)布訂閱模式實(shí)現(xiàn)配置動(dòng)態(tài)管理尸昧≠诵校基本原理就是將配置信息存在zk的某個(gè)節(jié)點(diǎn)中枉层,客戶端啟動(dòng)時(shí)從這個(gè)節(jié)點(diǎn)讀取配置信息啸臀,并Watcher届宠,一旦配置發(fā)生變化,客戶端會(huì)接收到變化通知,便可以再次讀取節(jié)點(diǎn)內(nèi)容席揽。

樣例

/**
 * A simple example program to use DataMonitor to start and
 * stop executables based on a znode. The program watches the
 * specified znode and saves the data that corresponds to the
 * znode in the filesystem. It also starts the specified program
 * with the specified arguments when the znode exists and kills
 * the program if the znode goes away.
 */
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class Executor
        implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
    String znode;

    DataMonitor dm;

    ZooKeeper zk;

    String filename;

    String exec[];

    Process child;

    public Executor(String hostPort, String znode, String filename,
                    String exec[]) throws KeeperException, IOException {
        this.filename = filename;
        this.exec = exec;
        zk = new ZooKeeper(hostPort, 3000, this);
        dm = new DataMonitor(zk, znode, null, this);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        if (args.length < 4) {
            System.err
                    .println("USAGE: Executor hostPort znode filename program [args ...]");
            System.exit(2);
        }
        String hostPort = args[0];
        String znode = args[1];
        String filename = args[2];
        String exec[] = new String[args.length - 3];
        System.arraycopy(args, 3, exec, 0, exec.length);
        try {
            new Executor(hostPort, znode, filename, exec).run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /***************************************************************************
     * We do process any events ourselves, we just need to forward them on.
     *
     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
     */
    @Override
    public void process(WatchedEvent event) {
        dm.process(event);
    }

    @Override
    public void run() {
        try {
            synchronized (this) {
                while (!dm.dead) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
        }
    }

    public void closing(int rc) {
        synchronized (this) {
            notifyAll();
        }
    }

    static class StreamWriter extends Thread {
        OutputStream os;

        InputStream is;

        StreamWriter(InputStream is, OutputStream os) {
            this.is = is;
            this.os = os;
            start();
        }

        public void run() {
            byte b[] = new byte[80];
            int rc;
            try {
                while ((rc = is.read(b)) > 0) {
                    os.write(b, 0, rc);
                }
            } catch (IOException e) {
            }

        }
    }

    public void exists(byte[] data) {
        if (data == null) {
            if (child != null) {
                System.out.println("Killing process");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                }
            }
            child = null;
        } else {
            if (child != null) {
                System.out.println("Stopping child");
                child.destroy();
                try {
                    child.waitFor();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                FileOutputStream fos = new FileOutputStream(filename);
                fos.write(data);
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                System.out.println("Starting child");
                child = Runtime.getRuntime().exec(exec);
                new StreamWriter(child.getInputStream(), System.out);
                new StreamWriter(child.getErrorStream(), System.err);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;

public class DataMonitor implements Watcher, StatCallback {

    ZooKeeper zk;

    String znode;

    Watcher chainedWatcher;

    boolean dead;

    DataMonitorListener listener;

    byte prevData[];

    public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
                       DataMonitorListener listener) {
        this.zk = zk;
        this.znode = znode;
        this.chainedWatcher = chainedWatcher;
        this.listener = listener;
        // Get things started by checking if the node exists. We are going
        // to be completely event driven
        zk.exists(znode, true, this, null);
    }

    /**
     * Other classes use the DataMonitor by implementing this method
     */
    public interface DataMonitorListener {
        /**
         * The existence status of the node has changed.
         */
        void exists(byte data[]);

        /**
         * The ZooKeeper session is no longer valid.
         *
         * @param rc
         *                the ZooKeeper reason code
         */
        void closing(int rc);
    }

    public void process(WatchedEvent event) {
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // We are are being told that the state of the
            // connection has changed
            switch (event.getState()) {
                case SyncConnected:
                    // In this particular example we don't need to do anything
                    // here - watches are automatically re-registered with
                    // server and any watches triggered while the client was
                    // disconnected will be delivered (in order of course)
                    break;
                case Expired:
                    // It's all over
                    dead = true;
                    listener.closing(KeeperException.Code.SessionExpired);
                    break;
            }
        } else {
            if (path != null && path.equals(znode)) {
                // Something has changed on the node, let's find out
                zk.exists(znode, true, this, null);
            }
        }
        if (chainedWatcher != null) {
            chainedWatcher.process(event);
        }
    }

    public void processResult(int rc, String path, Object ctx, Stat stat) {
        boolean exists;
        switch (rc) {
            case Code.Ok:
                exists = true;
                break;
            case Code.NoNode:
                exists = false;
                break;
            case Code.SessionExpired:
            case Code.NoAuth:
                dead = true;
                listener.closing(rc);
                return;
            default:
                // Retry errors
                zk.exists(znode, true, this, null);
                return;
        }

        byte b[] = null;
        if (exists) {
            try {
                b = zk.getData(znode, false, null);
            } catch (KeeperException e) {
                // We don't need to worry about recovering now. The watch
                // callbacks will kick off any exception handling
                e.printStackTrace();
            } catch (InterruptedException e) {
                return;
            }
        }
        if ((b == null && b != prevData)
                || (b != null && !Arrays.equals(prevData, b))) {
            listener.exists(b);
            prevData = b;
        }
    }
}

測(cè)試

以上兩個(gè)類(lèi)來(lái)自zookeeper官網(wǎng)顽馋,下面測(cè)試一下這兩個(gè)類(lèi)是如何工作的。

  • 創(chuàng)建zk節(jié)點(diǎn)

    設(shè)置初始值為Hello_world

[zk: localhost:2181(CONNECTED) 0] create /zk_tutorials Hello_world
Created /zk_tutorials
[zk: localhost:2181(CONNECTED) 1] get /zk_tutorials
Hello_world
cZxid = 0x300000017
ctime = Mon May 22 08:28:10 CST 2017
mZxid = 0x300000017
mtime = Mon May 22 08:28:10 CST 2017
pZxid = 0x300000017
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
```

  • 創(chuàng)建測(cè)試腳本count.sh
    output.txt保存的是/zk_tutorials的值幌羞,所以這個(gè)腳本的作用是每5秒輸出一次此節(jié)點(diǎn)的值寸谜。

count=0
while [ true ]; do
let count+=1
echo Count: $count using cat output.txt
sleep 5
done
```

  • 編譯Executor.javaDataMonitor

javac Executor.java DataMonitor.java -classpath /usr/hdp/2.5.0.0-1245/zookeeper/zookeeper-3.4.6.2.5.0.0-1245.jar
```

  • 運(yùn)行測(cè)試

    export ZK_HOME=/usr/hdp/2.5.0.0-1245/zookeeper
    export CLASSPATH=.:$ZK_HOME/zookeeper-3.4.6.2.5.0.0-1245.jar:$ZK_HOME/lib/slf4j-api-1.6.1.jar:$ZK_HOME/lib/slf4j-log4j12-1.6.1.jar:$ZK_HOME/lib/log4j-1.2.16.jar:$ZK_HOME/conf
    java Executor localhost:3181 /zk_tutorials output.txt ./count.sh
    

    會(huì)看到每5秒輸出一次結(jié)果

    Count: 1 using Hello_world
    Count: 2 using Hello_world
    Count: 3 using Hello_world
    

    這時(shí)在另一終端修改/zk_tutorials的值

    set /zk_tutorials Hello_zk
    

    可以看到結(jié)果已經(jīng)發(fā)生了改變

Count: 43 using Hello_world
Stopping child
Starting child
Count: 1 using Hello_zk
Count: 2 using Hello_zk
Count: 3 using Hello_zk
Count: 4 using Hello_zk
Count: 5 using Hello_zk
```

總結(jié)

通過(guò)上面的方法,利用zookeeper可以實(shí)現(xiàn)配置的動(dòng)態(tài)管理属桦,更多內(nèi)容請(qǐng)參考zookeeper-v3.4.10官網(wǎng)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末熊痴,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子聂宾,更是在濱河造成了極大的恐慌果善,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件系谐,死亡現(xiàn)場(chǎng)離奇詭異巾陕,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)纪他,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)鄙煤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人茶袒,你說(shuō)我怎么就攤上這事梯刚。” “怎么了薪寓?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵亡资,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我向叉,道長(zhǎng)锥腻,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任植康,我火速辦了婚禮旷太,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘销睁。我一直安慰自己供璧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布冻记。 她就那樣靜靜地躺著睡毒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪冗栗。 梳的紋絲不亂的頭發(fā)上演顾,一...
    開(kāi)封第一講書(shū)人閱讀 51,541評(píng)論 1 305
  • 那天供搀,我揣著相機(jī)與錄音,去河邊找鬼钠至。 笑死葛虐,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的棉钧。 我是一名探鬼主播屿脐,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼宪卿!你這毒婦竟也來(lái)了的诵?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤佑钾,失蹤者是張志新(化名)和其女友劉穎西疤,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體休溶,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡代赁,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了邮偎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片管跺。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡义黎,死狀恐怖禾进,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情廉涕,我是刑警寧澤泻云,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站狐蜕,受9級(jí)特大地震影響宠纯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜层释,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一婆瓜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧贡羔,春花似錦廉白、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至楣嘁,卻和暖如春磅轻,著一層夾襖步出監(jiān)牢的瞬間珍逸,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工聋溜, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留谆膳,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓撮躁,卻偏偏與公主長(zhǎng)得像摹量,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子馒胆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理缨称,服務(wù)發(fā)現(xiàn),斷路器祝迂,智...
    卡卡羅2017閱讀 134,657評(píng)論 18 139
  • 1 Zookeeper概述# ZooKeeper是一個(gè)為分布式應(yīng)用所設(shè)計(jì)的分布的睦尽、開(kāi)源的協(xié)調(diào)服務(wù),它主要是用來(lái)解決...
    七寸知架構(gòu)閱讀 7,354評(píng)論 0 101
  • ** 今天看了一下kafka官網(wǎng)型雳,嘗試著在自己電腦上安裝和配置当凡,然后學(xué)一下官方document。** Introd...
    RainChang閱讀 5,005評(píng)論 1 30
  • 1. Zookeeper介紹: 1.基本介紹: Zookeeper: 為分布式應(yīng)用提供分布式協(xié)作(協(xié)調(diào))服務(wù)纠俭。使用...
    奉先閱讀 4,572評(píng)論 0 10
  • 在某問(wèn)答平臺(tái)上冤荆,有人提出“如何在一年時(shí)間內(nèi)閱讀完200本書(shū)朴则?”。少年啊钓简,我們同樣的年輕而且我是一個(gè)大一的學(xué)生乌妒,正常...
    我是錢(qián)非閱讀 615評(píng)論 2 3