使用Curator去做分布式的實(shí)時(shí)參數(shù)配置

Curator簡(jiǎn)介

Apache Curator是一個(gè)比較完善的ZooKeeper客戶端框架真友,通過封裝的一套高級(jí)API 簡(jiǎn)化了ZooKeeper的操作今豆。通過查看官方文檔,可以發(fā)現(xiàn)Curator主要解決了三類問題:

  • 封裝ZooKeeper client與ZooKeeper server之間的連接處理
  • 提供了一套Fluent風(fēng)格的操作API
  • 提供ZooKeeper各種應(yīng)用場(chǎng)景(recipe卢鹦, 比如:分布式鎖服務(wù)锻离、集群領(lǐng)導(dǎo)選舉欢顷、共享計(jì)數(shù)器槽棍、緩存機(jī)制、分布式隊(duì)列等)的抽象封裝

Curator主要從以下幾個(gè)方面降低了zk使用的復(fù)雜性:

  • 重試機(jī)制:提供可插拔的重試機(jī)制, 它將給捕獲所有可恢復(fù)的異常配置一個(gè)重試策略抬驴,并且內(nèi)部也提供了幾種標(biāo)準(zhǔn)的重試策略(比如指數(shù)補(bǔ)償)
  • 連接狀態(tài)監(jiān)控: Curator初始化之后會(huì)一直對(duì)zk連接進(jìn)行監(jiān)聽炼七,一旦發(fā)現(xiàn)連接狀態(tài)發(fā)生變化將會(huì)作出相應(yīng)的處理
  • zk客戶端實(shí)例管理:Curator會(huì)對(duì)zk客戶端到server集群的連接進(jìn)行管理,并在需要的時(shí)候重建zk實(shí)例布持,保證與zk集群連接的可靠性
  • 各種使用場(chǎng)景支持:Curator實(shí)現(xiàn)了zk支持的大部分使用場(chǎng)景(甚至包括zk自身不支持的場(chǎng)景)豌拙,這些實(shí)現(xiàn)都遵循了zk的最佳實(shí)踐,并考慮了各種極端情況

采用Curator去是使用zookeeper往往操作簡(jiǎn)單鳖链,代碼量少姆蘸,可以很簡(jiǎn)單地實(shí)現(xiàn)斷線重連墩莫,監(jiān)聽器等使用起來也比較簡(jiǎn)單芙委,還支持分布式鎖等一些zookeeper常用的功能的代碼實(shí)現(xiàn)。

使用Curator監(jiān)聽目錄的數(shù)據(jù)變化

集群中的機(jī)器去監(jiān)聽zookeeper某一個(gè)目錄的數(shù)據(jù)變化狂秦,就可以動(dòng)態(tài)地灌侣、實(shí)時(shí)地將新的配置信息修改至集群的每一臺(tái)機(jī)器,省去了手工配置地麻煩裂问,還可以在程序運(yùn)行過程中動(dòng)態(tài)地更新一些配置侧啼。采用Curator的NodeCache可以完成一次注冊(cè)n次監(jiān)聽,這個(gè)對(duì)象可以緩存節(jié)點(diǎn)數(shù)據(jù)堪簿,在節(jié)點(diǎn)數(shù)據(jù)發(fā)生改變的時(shí)候痊乾,就會(huì)觸發(fā)這個(gè)事件。

下面分別是修改配置的客戶端和監(jiān)聽器程序代碼:
修改的客戶端代碼:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

public class myTest {
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "localhost:2181";
    /** session超時(shí)時(shí)間 */
    static final int SESSION_OUTTIME = 5000;//ms 
    public static void main(String[] args) throws Exception {
        String path = "/myApplication/myIPs";
        //1 重試策略:初試時(shí)間為1s 重試10次
                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
                //2 通過工廠創(chuàng)建連接
                CuratorFramework cf = CuratorFrameworkFactory.builder()
                            .connectString(CONNECT_ADDR)
                            .sessionTimeoutMs(SESSION_OUTTIME)
                            .retryPolicy(retryPolicy)
                            .build();
                //3 開啟連接
                cf.start();
                
                Stat stat = cf.checkExists().forPath(path);
                 
                if (stat == null)
                {
                    System.out.println("節(jié)點(diǎn)尚不存在");
                    cf.create().forPath(path, "192.168.1.102".getBytes());
                }
                cf.setData().forPath(path, "192.168.1.102,192.168.2.11,192.168.2.11".getBytes());
                cf.delete().forPath(path);
                if (cf != null) {
                     cf.close();
                    }
                
    }

}

監(jiān)聽器代碼:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class CuratorWatcher1 {
    
    private static List<String> ips = new ArrayList<String>();
    
    /** zookeeper地址 */
    static final String CONNECT_ADDR = "localhost:2181";
    /** session超時(shí)時(shí)間 */
    static final int SESSION_OUTTIME = 5000;//ms 
    
    public static void main(String[] args) throws Exception {
        
        //1 重試策略:初試時(shí)間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        //2 通過工廠創(chuàng)建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_ADDR)
                    .sessionTimeoutMs(SESSION_OUTTIME)
                    .retryPolicy(retryPolicy)
                    .build();
        
        //3 建立連接
        cf.start();
        /**
         * 想要實(shí)現(xiàn)watch一次注冊(cè)n次監(jiān)聽的話椭更,我們需要使用到curator里的一個(gè)NodeCache對(duì)象哪审。
         * 這個(gè)對(duì)象可以用來緩存節(jié)點(diǎn)數(shù)據(jù),并且可以給節(jié)點(diǎn)添加nodeChange事件虑瀑,當(dāng)節(jié)點(diǎn)的數(shù)據(jù)發(fā)生變化就會(huì)觸發(fā)這個(gè)事件
         */
        //4 建立一個(gè)cache緩存    Curator之nodeCache一次注冊(cè)湿滓,N次監(jiān)聽
        final NodeCache cache = new NodeCache(cf, "/myApplication/myIPs", false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            /**
             * <B>方法名稱:</B>nodeChanged<BR>
        
             * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged()
             */
            @Override
            public void nodeChanged() throws Exception {
                 // 防止節(jié)點(diǎn)被刪除時(shí)發(fā)生錯(cuò)誤
                if (cache.getCurrentData() == null) {
                    System.out.println("獲取節(jié)點(diǎn)數(shù)據(jù)異常,無法獲取當(dāng)前緩存的節(jié)點(diǎn)數(shù)據(jù)舌狗,可能該節(jié)點(diǎn)已被刪除");
                    return;
                }

                 // 獲取節(jié)點(diǎn)最新的數(shù)據(jù)
                String data = new String(cache.getCurrentData().getData());
                System.out.println(cache.getCurrentData().getPath() + " 節(jié)點(diǎn)的數(shù)據(jù)發(fā)生變化叽奥,最新的數(shù)據(jù)為:" + data);
                if(data!=null){
                    String[] ipsArray = data.split(",");
                    System.out.println("change the Parameter ips !!!");
                    synchronized(ips){
                        ips.clear();
                        if(ipsArray!=null&&ipsArray.length>0){
                            for(int i=0;i<ipsArray.length;i++){
                                ips.add(ipsArray[i]);
                            }
                        }
                    }
                }
            
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
        // 獲取當(dāng)前客戶端的狀態(tài)
        boolean isZkCuratorStarted = cf.isStarted();
        System.out.println("當(dāng)前客戶端的狀態(tài):" + (isZkCuratorStarted ? "連接中..." : "已關(guān)閉..."));
    }
}

運(yùn)行結(jié)果:


curator監(jiān)聽器.png

程序持續(xù)運(yùn)行,就可以將全局的靜態(tài)參數(shù)ips 動(dòng)態(tài)修改痛侍〕ィ可以看到,節(jié)點(diǎn)創(chuàng)建并賦值的過程,更新數(shù)據(jù)的時(shí)候以及刪除的時(shí)候都觸發(fā)了監(jiān)聽器赵哲。

當(dāng)然嘹狞,這只是一個(gè)簡(jiǎn)單的demo,很多實(shí)際的場(chǎng)景中缝呕,我們還得有參數(shù)修改后的重新加載過程疹启,這里要根據(jù)需求來進(jìn)行編碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末竹捉,一起剝皮案震驚了整個(gè)濱河市筷屡,隨后出現(xiàn)的幾起案子涧偷,更是在濱河造成了極大的恐慌,老刑警劉巖毙死,帶你破解...
    沈念sama閱讀 222,681評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件燎潮,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡扼倘,警方通過查閱死者的電腦和手機(jī)确封,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來再菊,“玉大人爪喘,你說我怎么就攤上這事【腊危” “怎么了秉剑?”我有些...
    開封第一講書人閱讀 169,421評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)稠诲。 經(jīng)常有香客問我侦鹏,道長(zhǎng),這世上最難降的妖魔是什么臀叙? 我笑而不...
    開封第一講書人閱讀 60,114評(píng)論 1 300
  • 正文 為了忘掉前任略水,我火速辦了婚禮,結(jié)果婚禮上劝萤,老公的妹妹穿的比我還像新娘渊涝。我一直安慰自己,他們只是感情好稳其,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,116評(píng)論 6 398
  • 文/花漫 我一把揭開白布驶赏。 她就那樣靜靜地躺著,像睡著了一般既鞠。 火紅的嫁衣襯著肌膚如雪煤傍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,713評(píng)論 1 312
  • 那天嘱蛋,我揣著相機(jī)與錄音蚯姆,去河邊找鬼五续。 笑死,一個(gè)胖子當(dāng)著我的面吹牛龄恋,可吹牛的內(nèi)容都是我干的疙驾。 我是一名探鬼主播,決...
    沈念sama閱讀 41,170評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼郭毕,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼它碎!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起显押,我...
    開封第一講書人閱讀 40,116評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤扳肛,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后乘碑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體挖息,經(jīng)...
    沈念sama閱讀 46,651評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,714評(píng)論 3 342
  • 正文 我和宋清朗相戀三年兽肤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了套腹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,865評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡资铡,死狀恐怖电禀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情害驹,我是刑警寧澤鞭呕,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站宛官,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏瓦糕。R本人自食惡果不足惜底洗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,211評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望咕娄。 院中可真熱鬧亥揖,春花似錦、人聲如沸圣勒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)圣贸。三九已至挚歧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吁峻,已是汗流浹背滑负。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工在张, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人矮慕。 一個(gè)月前我還...
    沈念sama閱讀 49,299評(píng)論 3 379
  • 正文 我出身青樓帮匾,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親痴鳄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子瘟斜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,870評(píng)論 2 361