分布式安裝部署
- 集群規(guī)劃
在hadoop-100仰冠,hadoop-101蜕依,hadoop-102三個節(jié)點上部署Zookeeper - 解壓Zookeeper
- 創(chuàng)建zkDate目錄
- 在zkData目錄下常見myid文件
- 修改zoo_sample.cfg文件為zoo.cfg文件
- 把hadoop-100上的/opt/module/zookeeper-3.4.10分發(fā)到hadoop-101和hadoop-102上
[hadoop@hadoop-100 module]$ xsync zookeeper-3.4.10/ - 配置服務(wù)器編號
在hadoop-100,hadoop-101劫哼,hadoop-102分別修改myid文件為1,2,3 - 配置zoo.cfg文件
在hadoop-100上修改數(shù)據(jù)存儲路徑
dataDir=/opt/module/zookeeper-3.4.10/zkData
并增加如下配置
server.1=hadoop-100:2888:3888
server.2=hadoop-101:2888:3888
server.3=hadoop-102:2888:3888
配置參數(shù)解讀
server.A=B:C:D
A是一個數(shù)字叮趴,表示這個是第幾號服務(wù)器
集群模式下配置一個文件myid,這個文件在dataDir目錄下权烧,這個文件里面有一個數(shù)據(jù)就是A的值疫向,Zookeeper啟動時讀取此文件,拿到里面的數(shù)據(jù)與zoo.cfg里面的配置信息比較從而判斷到底是哪個server
B是這個服務(wù)器的ip地址
C是這個服務(wù)器與集群中的Leader服務(wù)器交換信息的端口
D是萬一集群中的Leader服務(wù)器掛了豪嚎,需要一個端口來重新進(jìn)行選舉,選出一個新的Leader谈火,而這個端口就是用來執(zhí)行選舉時服務(wù)器相互通信的端口 - 同步zoo.cfg文件
[hadoop@hadoop-100 conf]$ xsync zoo.cfg - 集群操作
分別啟動Zookeeper
[hadoop@hadoop-100 zookeeper-3.4.10]$ bin/zkServer.sh start
[hadoop@hadoop-101 zookeeper-3.4.10]$ bin/zkServer.sh start
[hadoop@hadoop-102 zookeeper-3.4.10]$ bin/zkServer.sh start
查看狀態(tài)
[hadoop@hadoop-100 zookeeper-3.4.10]$ bin/zkServer.sh status
[hadoop@hadoop-101 zookeeper-3.4.10]$ bin/zkServer.sh status
[hadoop@hadoop-102 zookeeper-3.4.10]$ bin/zkServer.sh status
客戶端命令行操作
命令基本語法 | 功能描述 |
---|---|
help | 顯示所有操作命令 |
ls path [watch] | 使用 ls 命令來查看當(dāng)前znode中所包含的內(nèi)容 |
ls2 path [watch] | 查看當(dāng)前節(jié)點數(shù)據(jù)并能看到更新次數(shù)等數(shù)據(jù) |
create | 普通創(chuàng)建 -s 含有序列 -e 臨時(重啟或者超時消失) |
get path [watch] | 獲得節(jié)點的值 |
set | 設(shè)置節(jié)點的具體值 |
stat | 查看節(jié)點狀態(tài) |
delete | 刪除節(jié)點 |
rmr | 遞歸刪除節(jié)點 |
- 啟動客戶端
[hadoop@hadoop-100 zookeeper-3.4.10]$ bin/zkCli.sh - 顯示所有操作命令
[zk: localhost:2181(CONNECTED) 0] help - 查看當(dāng)前znode中所包含的內(nèi)容
[zk: localhost:2181(CONNECTED) 1] ls / - 查看當(dāng)前節(jié)點詳細(xì)數(shù)據(jù)
[zk: localhost:2181(CONNECTED) 2] ls2 / - 創(chuàng)建兩個普通節(jié)點
[zk: localhost:2181(CONNECTED) 3] create /test "hello Zookeeper"
[zk: localhost:2181(CONNECTED) 4] create /test/hello "hello i use zookeeper" - 獲得節(jié)點的值
[zk: localhost:2181(CONNECTED) 5] get /test
[zk: localhost:2181(CONNECTED) 6] get /test/hello - 創(chuàng)建短暫節(jié)點
create -e /test/ephemeral "Ephemeral data"
在當(dāng)前客戶端查看
[zk: localhost:2181(CONNECTED) 8] ls /test
退出當(dāng)前客戶端
[zk: localhost:2181(CONNECTED) 9] quit
重啟客戶端
[hadoop@hadoop-100 zookeeper-3.4.10]$ bin/zkCli.sh
再次查看侈询,節(jié)點已經(jīng)刪除
ls /test - 創(chuàng)建帶序號的節(jié)點
先創(chuàng)建普通節(jié)點
[zk: localhost:2181(CONNECTED) 3] create /test/sdata "sdata"
創(chuàng)建再序號的節(jié)點
[zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
[zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
[zk: localhost:2181(CONNECTED) 5] create -s /test/sdata/data "hello"
如果原來沒有序號節(jié)點,序號從0開始依次遞增糯耍。如果原節(jié)點下已有2個節(jié)點扔字,則再排序時從2開始,以此類推 - 修改節(jié)點數(shù)據(jù)值
[zk: localhost:2181(CONNECTED) 10] set /test/hello "hhh" - 節(jié)點的值變化監(jiān)聽
在hadoop-101上注冊監(jiān)聽/test/hello節(jié)點數(shù)據(jù)變化
[zk: localhost:2181(CONNECTED) 0] get /test/hello watch
在hadoop-100上修改/test/hello的數(shù)據(jù)
[zk: localhost:2181(CONNECTED) 11] set /test/hello "hhhh"
在hadoop-101上觀察收到的數(shù)據(jù)變化的監(jiān)聽
[zk: localhost:2181(CONNECTED) 1]
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/hello - 節(jié)點的子節(jié)點變化監(jiān)聽(路徑變化)
在hadoop-101上監(jiān)聽/test節(jié)點的子節(jié)點的變化
[zk: localhost:2181(CONNECTED) 2] ls /test watch
在hadoop-100/test節(jié)點上創(chuàng)建子節(jié)點
[zk: localhost:2181(CONNECTED) 12] create -s /test/watch "hhh"
在hadoop-101上觀察收到子節(jié)點變化的監(jiān)聽
[zk: localhost:2181(CONNECTED) 3]
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/test - 刪除節(jié)點
[zk: localhost:2181(CONNECTED) 14] delete /test/watch0000000003 - 遞歸刪除節(jié)點
[zk: localhost:2181(CONNECTED) 15] rmr /test - 查看節(jié)點狀態(tài)
stat /
API應(yīng)用
環(huán)境準(zhǔn)備
創(chuàng)建maven工程
添加pom文件
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
在resources目錄下温技,新增log4j.properties文件
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
創(chuàng)建Zookeeper客戶端
private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
private static int sessionTime = 2000;
private ZooKeeper zkClient = null;
@Test
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTime, null);
}
創(chuàng)建子節(jié)點
@Test
public void create() throws KeeperException, InterruptedException {
// 參數(shù)1:要創(chuàng)建的節(jié)點的路徑革为; 參數(shù)2:節(jié)點數(shù)據(jù) ; 參數(shù)3:節(jié)點權(quán)限 舵鳞;參數(shù)4:節(jié)點的類型
String s = zkClient.create("/test", "testApi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
獲取子節(jié)點并監(jiān)聽節(jié)點變化
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTime, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("watchedEvent.getType() = " + watchedEvent.getType());
System.out.println("watchedEvent.getPath() = " + watchedEvent.getPath());
try {
zkClient.getChildren("/", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test
public void getDataAndWatch() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/", true);
for(String data : children) {
System.out.println("data = " + data);
}
Thread.sleep(Long.MAX_VALUE);
}
判斷Znode是否存在
@Test
public void exist() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/test/apii", false);
System.out.println(exists == null ? "not exist" : "exist");
}
監(jiān)聽服務(wù)器節(jié)點動態(tài)上下線案例
需求
某分布式系統(tǒng)中震檩,主節(jié)點可以有多臺,可以動態(tài)上下線蜓堕,任意一臺客戶端都能實時感知到主節(jié)點服務(wù)器的上下線
需求分析
具體實現(xiàn)
- 先在集群上創(chuàng)建/servers節(jié)點
[zk: localhost:2181(CONNECTED) 10] create /servers "servers" - 服務(wù)端代碼
public class DistributeServer {
private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 創(chuàng)建到zk的客戶端連接
public void getConnect() throws IOException{
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注冊服務(wù)器
public void registServer(String hostname) throws Exception{
String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
// 業(yè)務(wù)功能
public void business(String hostname) throws Exception{
System.out.println(hostname+" is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1獲取zk連接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用zk連接注冊服務(wù)器信息
server.registServer(args[0]);
// 3 啟動業(yè)務(wù)功能
server.business(args[0]);
}
}
- 客戶端代碼
public class DistributeClient {
private static String connectString = "hadoop-100:2181,hadoop-101:2181,hadoop-102:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 創(chuàng)建到zk的客戶端連接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 再次啟動監(jiān)聽
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 獲取服務(wù)器列表信息
public void getServerList() throws Exception {
// 1獲取服務(wù)器子節(jié)點信息抛虏,并且對父節(jié)點進(jìn)行監(jiān)聽
List<String> children = zk.getChildren(parentNode, true);
// 2存儲服務(wù)器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3遍歷所有節(jié)點,獲取節(jié)點中的主機名稱信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
}
// 4打印服務(wù)器列表信息
System.out.println(servers);
}
// 業(yè)務(wù)功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1獲取zk連接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2獲取servers的子節(jié)點信息套才,從中獲取服務(wù)器信息列表
client.getServerList();
// 3業(yè)務(wù)進(jìn)程啟動
client.business();
}
}