ZooKeeper 典型應用場景-數據發(fā)布與訂閱
ZooKeeper 是一個高可用的分布式數據管理與系統(tǒng)協(xié)調框架⊙嘎幔基于對 Paxos 算法的實現(xiàn)馋劈,使該框架保證了分布式環(huán)境中數據的強一致性,也正是基于這樣的特性偶洋,使得 ZooKeeper 可以解決很多分布式問題熟吏。
隨著互聯(lián)網系統(tǒng)規(guī)模的不斷擴大,大數據時代飛速到來玄窝,越來越多的分布式系統(tǒng)將 ZooKeeper 作為核心組件使用牵寺,如 Hadoop、Hbase恩脂、Kafka帽氓、Storm等,因此俩块,正確理解 ZooKeeper 的應用場景黎休,對于 ZooKeeper 的使用者來說顯得尤為重要浓领。本節(jié)主要將重點圍繞數據發(fā)布/訂閱、負載均衡势腮、命名服務联贩、分布式協(xié)調/通知、集群管理捎拯、Master選舉泪幌、分布式鎖和分布式隊列等方面來講解 ZooKeeper 的典型應用場景及實現(xiàn)。
1署照、數據發(fā)布/訂閱
發(fā)布/訂閱模式是一對多的關系祸泪,多個訂閱者對象同時監(jiān)聽某一主題對象,這個主題對象在自身狀態(tài)發(fā)生變化時會通知所有的訂閱者對象藤树。使它們能自動的更新自己的狀態(tài)浴滴。發(fā)布/訂閱可以使得發(fā)布方和訂閱方獨立封裝、獨立改變岁钓。當一個對象的改變需要同時改變其他對象升略,而且它不知道具體有多少對象需要改變時可以使用發(fā)布/訂閱模式。發(fā)布/訂閱模式在分布式系統(tǒng)中的典型應用有配置管理和服務發(fā)現(xiàn)屡限、注冊品嚣。
配置管理是指如果集群中的機器擁有某些相同的配置并且這些配置信息需要動態(tài)的改變,我們可以使用發(fā)布/訂閱模式把配置做統(tǒng)一集中管理钧大,讓這些機器格子各自訂閱配置信息的改變翰撑,當配置發(fā)生改變時,這些機器就可以得到通知并更新為最新的配置啊央。
服務發(fā)現(xiàn)眶诈、注冊是指對集群中的服務上下線做統(tǒng)一管理。每個工作服務器都可以作為數據的發(fā)布方向集群注冊自己的基本信息瓜饥,而讓某些監(jiān)控服務器作為訂閱方逝撬,訂閱工作服務器的基本信息,當工作服務器的基本信息發(fā)生改變如上下線乓土、服務器角色或服務范圍變更宪潮,監(jiān)控服務器可以得到通知并響應這些變化。
1.1趣苏、配置管理
所謂的配置中心狡相,顧名思義就是發(fā)布者將數據發(fā)布到 ZooKeeper 的一個或一系列節(jié)點上,供訂閱者進行數據訂閱食磕,進而達到動態(tài)獲取數據的目的尽棕,實現(xiàn)配置信息的集中式管理和數據的動態(tài)更新。
發(fā)布/訂閱系統(tǒng)一般有兩種設計模式彬伦,分別是推(Push)模式和拉(Pull)模式滔悉。
- 推模式
服務端主動將數據更新發(fā)送給所有訂閱的客戶端蟀悦。
- 拉模式
客戶端通過采用定時輪詢拉取。
ZooKeeper采用的是推拉相結合的方式:客戶端向服務端注冊自己需要關注的節(jié)點氧敢,一旦該節(jié)點的數據發(fā)生變更,那么服務端就會向相應的客戶端發(fā)送Watcher事件通知询张,客戶端接收到這個消息通知之后孙乖,需要主動到服務端獲取最新的數據。
如果將配置信息存放到ZK上進行集中管理份氧,那么通常情況下唯袄,應用在啟動的時候會主動到ZK服務器上進行一次配置信息的獲取,同時蜗帜,在指定上注冊一個Watcher監(jiān)聽恋拷,這樣一來,但凡配置信息發(fā)生變更厅缺,服務器都會實時通知所有訂閱的客戶端蔬顾,從而達到實時獲取最新配置信息的目的。
下面我們通過一個“配置管理”的實際案例來展示ZK在“數據發(fā)布/訂閱”場景下的使用方式湘捎。
在我們平常的應用系統(tǒng)開發(fā)中诀豁,經常會碰到這樣的需求:系統(tǒng)中需要使用一些通用的配置信息,例如機器列表信息窥妇、運行時的開關配置舷胜、數據庫的配置信息等。這些全局配置信息通常具備以下特性:
1)活翩、數據量通常比較小
2)烹骨、數據內容在運行時會發(fā)生變化
3)、集群中各機器共享材泄、配置一致
對于這類配置信息沮焕,一般的做法通常可以選擇將其存儲的本地配置文件或是內存變量中脸爱。無論采取哪種配置都可以實現(xiàn)相應的操作遇汞。但是一旦遇到集群規(guī)模比較大的情況的話,兩種方式就不再可取簿废。而我們還需要能夠快速的做到全部配置信息的變更空入,同時希望變更成本足夠小,因此我們需要一種更為分布式的解決方案族檬。
接下來我們以“數據庫切換”的應用場景展開歪赢,看看如何使用ZK來實現(xiàn)配置管理。
配置存儲
在進行配置管理之前单料,首先我們需要將初始化配置存儲到ZK上去埋凯,一般情況下点楼,我們可以在ZK上選取一個數據節(jié)點用于配置的存儲,我們將需要集中管理的配置信息寫入到該數據節(jié)點中去白对。
配置獲取
集群中每臺機器在啟動初始化階段掠廓,首先會從上面提到的ZK的配置節(jié)點上讀取數據庫信息,同時甩恼,客戶端還需要在該配置節(jié)點上注冊一個數據變更的Watcher監(jiān)聽蟀瞧,一旦發(fā)生節(jié)點數據變更,所有訂閱的客戶端都能夠獲取數據變更通知条摸。
配置變更
在系統(tǒng)運行過程中悦污,可能會出現(xiàn)需要進行書局切換的情況,這個時候就需要進行配置變更钉蒲。借助ZK切端,我們只需要對ZK上配置節(jié)點的內容進行更新,ZK就能夠幫我們將數據變更的通知發(fā)送到各個客戶端顷啼,每個客戶端在接收到這個變更通知后踏枣,就可以重新進行最新數據的獲取。
1.2线梗、服務發(fā)現(xiàn)椰于、注冊
1.3、綜合例子
架構圖:
[圖片上傳失敗...(image-8f75c4-1587351379309)]
Manage Server 程序主體流程:
[圖片上傳失敗...(image-cbb9bd-1587351379309)]
Work Server 程序主體流程:
[圖片上傳失敗...(image-a7b1d7-1587351379309)]
系統(tǒng)的核心類:
[圖片上傳失敗...(image-29066e-1587351379309)]
1.4仪搔、程序代碼實現(xiàn)
[](javascript:void(0); "復制代碼")
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ServerConfig { private String dbUrl; private String dbPwd; private String dbUser; public String getDbUrl() { return dbUrl;
} public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl;
} public String getDbPwd() { return dbPwd;
} public void setDbPwd(String dbPwd) { this.dbPwd = dbPwd;
} public String getDbUser() { return dbUser;
} public void setDbUser(String dbUser) { this.dbUser = dbUser;
}
@Override public String toString() { return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]";
}
}</pre>
[](javascript:void(0); "復制代碼")
[](javascript:void(0); "復制代碼")
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">public class ServerData { private String address; private Integer id; private String name; public String getAddress() { return address;
} public void setAddress(String address) { this.address = address;
} public Integer getId() { return id;
} public void setId(Integer id) { this.id = id;
} public String getName() { return name;
} public void setName(String name) { this.name = name;
}
@Override public String toString() { return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]";
}
}</pre>
[](javascript:void(0); "復制代碼")
[](javascript:void(0); "復制代碼")
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import com.alibaba.fastjson.JSON; public class WorkServer { private ZkClient zkClient; private String configPath; private String serversPath; private ServerData serverData; private ServerConfig serverConfig; private IZkDataListener dataListener; public WorkServer(String configPath, String serversPath, ServerData serverData, ZkClient zkClient, ServerConfig initConfig) { this.zkClient = zkClient; this.configPath = configPath; this.serversPath = serversPath; this.serverData = serverData; this.serverConfig = initConfig; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception {
} public void handleDataChange(String dataPath, Object data) throws Exception {
String retJson = new String((byte[]) data);
ServerConfig serverConfigLocal = (ServerConfig) JSON.parseObject(retJson, ServerConfig.class);
updateConfig(serverConfigLocal);
System.out.println("new Work server config is:" + serverConfig.toString());
}
};
} public void start() {
System.out.println("work server start...");
initRunning();
} public void stop() {
System.out.println("work server stop...");
zkClient.unsubscribeDataChanges(configPath, dataListener);
} private void initRunning() {
registMe();
zkClient.subscribeDataChanges(configPath, dataListener);
} private void registMe() {
String mePath = serversPath.concat("/").concat(serverData.getAddress()); try {
zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
.getBytes());
} catch (ZkNoNodeException e) {
zkClient.createPersistent(serversPath, true);
registMe();
}
} private void updateConfig(ServerConfig serverConfig) { this.serverConfig = serverConfig;
}
}</pre>
[](javascript:void(0); "復制代碼")
[](javascript:void(0); "復制代碼")
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; public class ManageServer { private String serversPath; private String commandPath; private String configPath; private ZkClient zkClient; private ServerConfig config; private IZkChildListener childListener; private IZkDataListener dataListener; private List<String> workServerList; public ManageServer(String serversPath, String commandPath,
String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { public void handleChildChange(String parentPath,
List<String> currentChilds) throws Exception {
workServerList = currentChilds;
System.out.println("work server list changed, new list is ");
execList();
}
}; this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { // ignore;
} public void handleDataChange(String dataPath, Object data) throws Exception {
String cmd = new String((byte[]) data);
System.out.println("cmd:" + cmd);
exeCmd(cmd);
}
};
} private void initRunning() {
zkClient.subscribeDataChanges(commandPath, dataListener);
zkClient.subscribeChildChanges(serversPath, childListener);
} /* * 1: list 2: create 3: modify */
private void exeCmd(String cmdType) { if ("list".equals(cmdType)) {
execList();
} else if ("create".equals(cmdType)) {
execCreate();
} else if ("modify".equals(cmdType)) {
execModify();
} else {
System.out.println("error command!" + cmdType);
}
} private void execList() {
System.out.println(workServerList.toString());
} private void execCreate() { if (!zkClient.exists(configPath)) { try {
zkClient.createPersistent(configPath, JSON.toJSONString(config)
.getBytes());
} catch (ZkNodeExistsException e) {
zkClient.writeData(configPath, JSON.toJSONString(config)
.getBytes());
} catch (ZkNoNodeException e) {
String parentDir = configPath.substring(0,
configPath.lastIndexOf('/'));
zkClient.createPersistent(parentDir, true);
execCreate();
}
}
} private void execModify() {
config.setDbUser(config.getDbUser() + "_modify"); try {
zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
} catch (ZkNoNodeException e) {
execCreate();
}
} public void start() {
initRunning();
} public void stop() {
zkClient.unsubscribeChildChanges(serversPath, childListener);
zkClient.unsubscribeDataChanges(commandPath, dataListener);
}
}</pre>
[](javascript:void(0); "復制代碼")
[](javascript:void(0); "復制代碼")
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import com.sql.zookeeper.common.ZookeeperConstant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; public class SubscribeZkClient { private static final int CLIENT_QTY = 5; private static final String CONFIG_PATH = "/config"; private static final String COMMAND_PATH = "/command"; private static final String SERVERS_PATH = "/servers"; public static void main(String[] args) throws Exception {
List<ZkClient> clients = new ArrayList<ZkClient>();
List<WorkServer> workServers = new ArrayList<WorkServer>();
ManageServer manageServer = null; try {
ServerConfig initConfig = new ServerConfig();
initConfig.setDbPwd("123456");
initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
initConfig.setDbUser("root");
ZkClient clientManage = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH, CONFIG_PATH, clientManage, initConfig);
manageServer.start(); for (int i = 0; i < CLIENT_QTY; ++i) {
ZkClient client = new ZkClient(ZookeeperConstant.ZK_CONNECTION_STRING, 5000, 5000, new BytesPushThroughSerializer());
clients.add(client);
ServerData serverData = new ServerData();
serverData.setId(i);
serverData.setName("WorkServer#" + i);
serverData.setAddress("192.168.1." + i);
WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
workServers.add(workServer);
workServer.start();
}
System.out.println("敲回車鍵退出瘾婿!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down..."); for (WorkServer workServer : workServers) { try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
} for (ZkClient client : clients) { try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}</pre>
[](javascript:void(0); "復制代碼")