zookeeper命令行操作
- 運行
zkCli.sh –server <ip>
進(jìn)入命令行工具 - 使用 ls 命令來查看當(dāng)前 ZooKeeper 中所包含的內(nèi)容:
ls /
- 創(chuàng)建一個新的 znode 月幌,使用 create /zk myData 。這個命令創(chuàng)建了一個新的 znode 節(jié)點“ zk ”以及與它關(guān)聯(lián)的字符串:
create /zk "myData"
- 我們運行 get 命令來確認(rèn) znode 是否包含我們所創(chuàng)建的字符串:
get /zk
- 監(jiān)聽這個節(jié)點的變化,當(dāng)另外一個客戶端改變/zk時,它會打出下面的
get /zk watch
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk
- 下面我們通過 set 命令來對 zk 所關(guān)聯(lián)的字符串進(jìn)行設(shè)置:
set /zk "zsl"
- 下面我們將剛才創(chuàng)建的 znode 刪除:
delete /zk
- 刪除節(jié)點:
rmr /zk
zookeeper-api應(yīng)用
功能 | 描述 |
---|---|
create | 在本地目錄樹中創(chuàng)建一個節(jié)點 |
delete | 刪除一個節(jié)點 |
exists | 測試本地是否存在目標(biāo)節(jié)點 |
get/set data | 從目標(biāo)節(jié)點上讀取 / 寫數(shù)據(jù) |
get/set ACL | 獲取 / 設(shè)置目標(biāo)節(jié)點訪問控制列表信息 |
get children | 檢索一個子節(jié)點上的列表 |
sync | 等待要被傳送的數(shù)據(jù) |
demo代碼
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
public class SimpleZkClient {
private static final String connectString = "192.168.127.61:2181,192.168.127.62:2181,192.168.127.63:2181";
private static final int sessionTimeout = 2000;
ZooKeeper zkClient = null;
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 收到事件通知后的回調(diào)函數(shù)(應(yīng)該是我們自己的事件處理邏輯)
System.out.println(event.getType() + "---" + event.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
}
}
});
}
/**
* 數(shù)據(jù)的增刪改查
*
* @throws InterruptedException
* @throws KeeperException
*/
// 創(chuàng)建數(shù)據(jù)節(jié)點到zk中
public void testCreate() throws KeeperException, InterruptedException {
// 參數(shù)1:要創(chuàng)建的節(jié)點的路徑 參數(shù)2:節(jié)點大數(shù)據(jù) 參數(shù)3:節(jié)點的權(quán)限 參數(shù)4:節(jié)點的類型
String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//上傳的數(shù)據(jù)可以是任何類型矢空,但都要轉(zhuǎn)成byte[]
}
//判斷znode是否存在
@Test
public void testExist() throws Exception{
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat==null?"not exist":"exist");
}
// 獲取子節(jié)點
@Test
public void getChildren() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//獲取znode的數(shù)據(jù)
@Test
public void getData() throws Exception {
byte[] data = zkClient.getData("/eclipse", false, null);
System.out.println(new String(data));
}
//刪除znode
@Test
public void deleteZnode() throws Exception {
//參數(shù)2:指定要刪除的版本霍弹,-1表示刪除所有版本
zkClient.delete("/eclipse", -1);
}
//刪除znode
@Test
public void setData() throws Exception {
zkClient.setData("/app1", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/app1", false, null);
System.out.println(new String(data));
}
}
Zookeeper的監(jiān)聽器工作機制
- 監(jiān)聽器是一個接口楣铁,我們的代碼中可以實現(xiàn)Wather這個接口,實現(xiàn)其中的process方法,方法中即我們自己的業(yè)務(wù)邏輯
- 監(jiān)聽器的注冊是在獲取數(shù)據(jù)的操作中實現(xiàn):
-
getData(path,watch?)
監(jiān)聽的事件是:節(jié)點數(shù)據(jù)變化事件 -
getChildren(path,watch?)
監(jiān)聽的事件是:節(jié)點下的子節(jié)點增減變化事件
-
分布式共享鎖的簡單實現(xiàn)
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedClientLock {
// 會話超時
private static final int SESSION_TIMEOUT = 2000;
// zookeeper集群地址
private String hosts = "mini1:2181,mini2:2181,mini3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 記錄自己創(chuàng)建的子節(jié)點路徑
private volatile String thisPath;
/**
* 連接zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 判斷事件類型房交,此處只處理子節(jié)點變化事件
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
//獲取子節(jié)點帜篇,并對父節(jié)點進(jìn)行監(jiān)聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 去比較是否自己是最小id
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
//訪問共享資源處理業(yè)務(wù)糙捺,并且在處理完成之后刪除鎖
doSomething();
//重新注冊一把新的鎖
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 1、程序一進(jìn)來就先注冊一把鎖到zk上
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會笙隙,便于觀察
Thread.sleep(new Random().nextInt(1000));
// 從zk的鎖父目錄下洪灯,獲取所有子節(jié)點,并且注冊對父節(jié)點的監(jiān)聽
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//如果爭搶資源的程序就只有自己竟痰,則可以直接去訪問共享資源
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 處理業(yè)務(wù)邏輯签钩,并且在最后釋放鎖
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
//釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientLock dl = new DistributedClientLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}