ZooKeeper 典型應用場景-數據發(fā)布與訂閱

ZooKeeper 典型應用場景-數據發(fā)布與訂閱

ZooKeeper 是一個高可用的分布式數據管理與系統(tǒng)協(xié)調框架⊙嘎幔基于對 Paxos 算法的實現(xiàn)馋劈,使該框架保證了分布式環(huán)境中數據的強一致性,也正是基于這樣的特性偶洋,使得 ZooKeeper 可以解決很多分布式問題熟吏。

隨著互聯(lián)網系統(tǒng)規(guī)模的不斷擴大,大數據時代飛速到來玄窝,越來越多的分布式系統(tǒng)將 ZooKeeper 作為核心組件使用牵寺,如 Hadoop、Hbase恩脂、Kafka帽氓、Storm等,因此俩块,正確理解 ZooKeeper 的應用場景黎休,對于 ZooKeeper 的使用者來說顯得尤為重要浓领。本節(jié)主要將重點圍繞數據發(fā)布/訂閱、負載均衡势腮、命名服務联贩、分布式協(xié)調/通知、集群管理捎拯、Master選舉泪幌、分布式鎖和分布式隊列等方面來講解 ZooKeeper 的典型應用場景及實現(xiàn)。

1署照、數據發(fā)布/訂閱

發(fā)布/訂閱模式是一對多的關系祸泪,多個訂閱者對象同時監(jiān)聽某一主題對象,這個主題對象在自身狀態(tài)發(fā)生變化時會通知所有的訂閱者對象藤树。使它們能自動的更新自己的狀態(tài)浴滴。發(fā)布/訂閱可以使得發(fā)布方和訂閱方獨立封裝、獨立改變岁钓。當一個對象的改變需要同時改變其他對象升略,而且它不知道具體有多少對象需要改變時可以使用發(fā)布/訂閱模式。發(fā)布/訂閱模式在分布式系統(tǒng)中的典型應用有配置管理服務發(fā)現(xiàn)屡限、注冊品嚣。

配置管理是指如果集群中的機器擁有某些相同的配置并且這些配置信息需要動態(tài)的改變,我們可以使用發(fā)布/訂閱模式把配置做統(tǒng)一集中管理钧大,讓這些機器格子各自訂閱配置信息的改變翰撑,當配置發(fā)生改變時,這些機器就可以得到通知并更新為最新的配置啊央。

服務發(fā)現(xiàn)眶诈、注冊是指對集群中的服務上下線做統(tǒng)一管理。每個工作服務器都可以作為數據的發(fā)布方向集群注冊自己的基本信息瓜饥,而讓某些監(jiān)控服務器作為訂閱方逝撬,訂閱工作服務器的基本信息,當工作服務器的基本信息發(fā)生改變如上下線乓土、服務器角色或服務范圍變更宪潮,監(jiān)控服務器可以得到通知并響應這些變化。

1.1趣苏、配置管理

所謂的配置中心狡相,顧名思義就是發(fā)布者將數據發(fā)布到 ZooKeeper 的一個或一系列節(jié)點上,供訂閱者進行數據訂閱食磕,進而達到動態(tài)獲取數據的目的尽棕,實現(xiàn)配置信息的集中式管理和數據的動態(tài)更新。

發(fā)布/訂閱系統(tǒng)一般有兩種設計模式彬伦,分別是推(Push)模式和拉(Pull)模式滔悉。

  • 推模式

服務端主動將數據更新發(fā)送給所有訂閱的客戶端蟀悦。

  • 拉模式

客戶端通過采用定時輪詢拉取。

ZooKeeper采用的是推拉相結合的方式:客戶端向服務端注冊自己需要關注的節(jié)點氧敢,一旦該節(jié)點的數據發(fā)生變更,那么服務端就會向相應的客戶端發(fā)送Watcher事件通知询张,客戶端接收到這個消息通知之后孙乖,需要主動到服務端獲取最新的數據。

如果將配置信息存放到ZK上進行集中管理份氧,那么通常情況下唯袄,應用在啟動的時候會主動到ZK服務器上進行一次配置信息的獲取,同時蜗帜,在指定上注冊一個Watcher監(jiān)聽恋拷,這樣一來,但凡配置信息發(fā)生變更厅缺,服務器都會實時通知所有訂閱的客戶端蔬顾,從而達到實時獲取最新配置信息的目的。

下面我們通過一個“配置管理”的實際案例來展示ZK在“數據發(fā)布/訂閱”場景下的使用方式湘捎。

在我們平常的應用系統(tǒng)開發(fā)中诀豁,經常會碰到這樣的需求:系統(tǒng)中需要使用一些通用的配置信息,例如機器列表信息窥妇、運行時的開關配置舷胜、數據庫的配置信息等。這些全局配置信息通常具備以下特性:

1)活翩、數據量通常比較小

2)烹骨、數據內容在運行時會發(fā)生變化

3)、集群中各機器共享材泄、配置一致

對于這類配置信息沮焕,一般的做法通常可以選擇將其存儲的本地配置文件或是內存變量中脸爱。無論采取哪種配置都可以實現(xiàn)相應的操作遇汞。但是一旦遇到集群規(guī)模比較大的情況的話,兩種方式就不再可取簿废。而我們還需要能夠快速的做到全部配置信息的變更空入,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案族檬。

接下來我們以“數據庫切換”的應用場景展開歪赢,看看如何使用ZK來實現(xiàn)配置管理。

配置存儲

在進行配置管理之前单料,首先我們需要將初始化配置存儲到ZK上去埋凯,一般情況下点楼,我們可以在ZK上選取一個數據節(jié)點用于配置的存儲,我們將需要集中管理的配置信息寫入到該數據節(jié)點中去白对。

配置獲取

集群中每臺機器在啟動初始化階段掠廓,首先會從上面提到的ZK的配置節(jié)點上讀取數據庫信息,同時甩恼,客戶端還需要在該配置節(jié)點上注冊一個數據變更的Watcher監(jiān)聽蟀瞧,一旦發(fā)生節(jié)點數據變更,所有訂閱的客戶端都能夠獲取數據變更通知条摸。

配置變更

在系統(tǒng)運行過程中悦污,可能會出現(xiàn)需要進行書局切換的情況,這個時候就需要進行配置變更钉蒲。借助ZK切端,我們只需要對ZK上配置節(jié)點的內容進行更新,ZK就能夠幫我們將數據變更的通知發(fā)送到各個客戶端顷啼,每個客戶端在接收到這個變更通知后踏枣,就可以重新進行最新數據的獲取。

1.2线梗、服務發(fā)現(xiàn)椰于、注冊

1.3、綜合例子

架構圖:

[圖片上傳失敗...(image-8f75c4-1587351379309)]

Manage Server 程序主體流程:

[圖片上傳失敗...(image-cbb9bd-1587351379309)]

Work Server 程序主體流程:

[圖片上傳失敗...(image-a7b1d7-1587351379309)]

系統(tǒng)的核心類:

[圖片上傳失敗...(image-29066e-1587351379309)]

1.4仪搔、程序代碼實現(xiàn)

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ServerConfig { private String dbUrl; private String dbPwd; private String dbUser; public String getDbUrl() { return dbUrl;
} public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl;
} public String getDbPwd() { return dbPwd;
} public void setDbPwd(String dbPwd) { this.dbPwd = dbPwd;
} public String getDbUser() { return dbUser;
} public void setDbUser(String dbUser) { this.dbUser = dbUser;
}

@Override public String toString() { return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]";
}

}</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ServerData { private String address; private Integer id; private String name; public String getAddress() { return address;
} public void setAddress(String address) { this.address = address;
} public Integer getId() { return id;
} public void setId(Integer id) { this.id = id;
} public String getName() { return name;
} public void setName(String name) { this.name = name;
}

@Override public String toString() { return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]";
}

}</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import com.alibaba.fastjson.JSON; public class WorkServer { private ZkClient zkClient; private String configPath; private String serversPath; private ServerData serverData; private ServerConfig serverConfig; private IZkDataListener dataListener; public WorkServer(String configPath, String serversPath, ServerData serverData, ZkClient zkClient, ServerConfig initConfig) { this.zkClient = zkClient; this.configPath = configPath; this.serversPath = serversPath; this.serverData = serverData; this.serverConfig = initConfig; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception {

        } public void handleDataChange(String dataPath, Object data) throws Exception {
            String retJson = new String((byte[]) data);
            ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson, ServerConfig.class);
            updateConfig(serverConfigLocal);
            System.out.println("new Work server config is:" + serverConfig.toString());
        }
    };
} public void start() {
    System.out.println("work server start...");
    initRunning();
} public void stop() {
    System.out.println("work server stop...");
    zkClient.unsubscribeDataChanges(configPath, dataListener);
} private void initRunning() {
    registMe();
    zkClient.subscribeDataChanges(configPath, dataListener);
} private void registMe() {
    String mePath = serversPath.concat("/").concat(serverData.getAddress()); try {
        zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
                .getBytes());
    } catch (ZkNoNodeException e) {
        zkClient.createPersistent(serversPath, true);
        registMe();
    }
} private void updateConfig(ServerConfig serverConfig) { this.serverConfig = serverConfig;
}

}</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; public class ManageServer { private String serversPath; private String commandPath; private String configPath; private ZkClient zkClient; private ServerConfig config; private IZkChildListener childListener; private IZkDataListener dataListener; private List<String> workServerList; public ManageServer(String serversPath, String commandPath,
String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { public void handleChildChange(String parentPath,
List<String> currentChilds) throws Exception {
workServerList = currentChilds;

            System.out.println("work server list changed, new list is ");
            execList();
        }
    }; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { // ignore;

} public void handleDataChange(String dataPath, Object data) throws Exception {
String cmd = new String((byte[]) data);
System.out.println("cmd:" + cmd);
exeCmd(cmd);
}
};
} private void initRunning() {
zkClient.subscribeDataChanges(commandPath, dataListener);
zkClient.subscribeChildChanges(serversPath, childListener);
} /* * 1: list 2: create 3: modify */
private void exeCmd(String cmdType) { if ("list".equals(cmdType)) {
execList();

    } else if ("create".equals(cmdType)) {
        execCreate();
    } else if ("modify".equals(cmdType)) {
        execModify();
    } else {
        System.out.println("error command!" + cmdType);
    }
} private void execList() {
    System.out.println(workServerList.toString());
} private void execCreate() { if (!zkClient.exists(configPath)) { try {
            zkClient.createPersistent(configPath, JSON.toJSONString(config)
                    .getBytes());
        } catch (ZkNodeExistsException e) {
            zkClient.writeData(configPath, JSON.toJSONString(config)
                    .getBytes());
        } catch (ZkNoNodeException e) {
            String parentDir = configPath.substring(0,
                    configPath.lastIndexOf('/'));
            zkClient.createPersistent(parentDir, true);
            execCreate();
        }
    }
} private void execModify() {
    config.setDbUser(config.getDbUser() + "_modify"); try {
        zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
    } catch (ZkNoNodeException e) {
        execCreate();
    }
} public void start() {
    initRunning();
} public void stop() {
    zkClient.unsubscribeChildChanges(serversPath, childListener);
    zkClient.unsubscribeDataChanges(commandPath, dataListener);
}

}</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

[
復制代碼

](javascript:void(0); "復制代碼")

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import com.sql.zookeeper.common.ZookeeperConstant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; public class SubscribeZkClient { private static final int CLIENT_QTY = 5; private static final String CONFIG_PATH = "/config"; private static final String COMMAND_PATH = "/command"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) throws Exception {

    List<ZkClient> clients = new ArrayList<ZkClient>();
    List<WorkServer> workServers = new ArrayList<WorkServer>();
    ManageServer manageServer = null; try {
        ServerConfig initConfig = new ServerConfig();
        initConfig.setDbPwd("123456");
        initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
        initConfig.setDbUser("root");

        ZkClient clientManage = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
        manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH, CONFIG_PATH, clientManage, initConfig);
        manageServer.start(); for (int i = 0; i < CLIENT_QTY; ++i) {
            ZkClient client = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
            clients.add(client);
            ServerData serverData = new ServerData();
            serverData.setId(i);
            serverData.setName("WorkServer#" + i);
            serverData.setAddress("192.168.1." + i);

            WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
            workServers.add(workServer);
            workServer.start();
        }
        System.out.println("敲回車鍵退出瘾婿!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine();

    } finally {
        System.out.println("Shutting down..."); for (WorkServer workServer : workServers) { try {
                workServer.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        } for (ZkClient client : clients) { try {
                client.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

}</pre>

[
復制代碼

](javascript:void(0); "復制代碼")

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市烤咧,隨后出現(xiàn)的幾起案子偏陪,更是在濱河造成了極大的恐慌,老刑警劉巖煮嫌,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件笛谦,死亡現(xiàn)場離奇詭異,居然都是意外死亡昌阿,警方通過查閱死者的電腦和手機饥脑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來懦冰,“玉大人灶轰,你說我怎么就攤上這事∷⒏郑” “怎么了笋颤?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長内地。 經常有香客問我伴澄,道長赋除,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任非凌,我火速辦了婚禮举农,結果婚禮上,老公的妹妹穿的比我還像新娘敞嗡。我一直安慰自己并蝗,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布秸妥。 她就那樣靜靜地躺著,像睡著了一般沃粗。 火紅的嫁衣襯著肌膚如雪粥惧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天最盅,我揣著相機與錄音突雪,去河邊找鬼。 笑死涡贱,一個胖子當著我的面吹牛咏删,可吹牛的內容都是我干的。 我是一名探鬼主播问词,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼督函,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了激挪?” 一聲冷哼從身側響起辰狡,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎垄分,沒想到半個月后宛篇,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡薄湿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年叫倍,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片豺瘤。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡吆倦,死狀恐怖,靈堂內的尸體忽然破棺而出炉奴,到底是詐尸還是另有隱情逼庞,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布瞻赶,位于F島的核電站赛糟,受9級特大地震影響派任,放射性物質發(fā)生泄漏。R本人自食惡果不足惜璧南,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一掌逛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧司倚,春花似錦豆混、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盒粮,卻和暖如春鸵鸥,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背丹皱。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工妒穴, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摊崭。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓讼油,卻偏偏與公主長得像,于是被迫代替她去往敵國和親呢簸。 傳聞我的和親對象是個殘疾皇子矮台,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內容