zkclient
1),可以遞歸創(chuàng)建
2),可以遞歸刪除
創(chuàng)建一個單機(jī)的zookeeper
systemctl stop firewalld#關(guān)閉防火墻
docker pull zookeeper:3.4 #拉取鏡像
docker run -d --name=zookeeper -p 2181:2181 zookeeper:3.4 #創(chuàng)建容器
docker exec -it zookeeper /bin/bash#進(jìn)入容器
zkCli.sh #進(jìn)入zookeeper客戶端
依賴
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
zkclientApi
package com.demo.service;
import com.demo.config.Person;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.List;
public class ZkService {
public static ZkClient zkClient =
new ZkClient("192.168.116.150:2181", 5000, 5000, new SerializableSerializer());
/**
* 創(chuàng)建節(jié)點
* path 節(jié)點路徑
* data 數(shù)據(jù) ,可謂null
* mode 4種類型 Ephemeral 臨時的會話 persistent 永久的會話
* acl acl策略
* callback 注冊一個異步回調(diào)函數(shù)
* context 傳遞一個對象
* createParents 指定是否創(chuàng)建父節(jié)點
*/
public static void create() {
zkClient.create("/test1","zkclient_test", CreateMode.EPHEMERAL);
zkClient.createEphemeral("/test2");
zkClient.createPersistent("/person",new Person("xiaoming","123456"));
zkClient.createPersistent("/hello/c1",true);
}
/**
* 刪除
*/
public static void delete() {
zkClient.delete("/person/xiaoming"); //刪除節(jié)點
//zkClient.deleteRecursive("/testzoo1");//刪除節(jié)點和子節(jié)點
}
/**
* 修改
*/
public static void update() {
//zkClient.writeData("/testzoo3","hello");//寫數(shù)據(jù),覆蓋原來的值
}
/**
* 是否存在
*/
public static void exists() {
zkClient.exists("/Person");
}
/**
* 讀取節(jié)點的值
* 對象要實現(xiàn)序列化接口
*/
public static void select() {
Stat stat = new Stat(); //節(jié)點的信息
Person person = zkClient.readData("/person", stat);
System.out.println(stat);
System.out.println(person);
}
/**
* 注冊監(jiān)聽會開啟一個新的線程來處理,無需自己在開一條線程單獨注冊
* 監(jiān)聽接口 注冊監(jiān)聽方法 解除監(jiān)聽
* IZkChildListener監(jiān)聽子節(jié)點 ZkClient的subscribeChildChanges方法 ZkClient的unsubscribeChildChanges方法
* IZkDataListener 監(jiān)聽數(shù)據(jù)的變化 ZkClient的subscribeDataChanges方法 ZkClient的subscribeDataChanges方法
* IZkStateListener監(jiān)聽服務(wù)狀態(tài)的狀態(tài) ZkClient的subscribeStateChanges方法 ZkClient的unsubscribeStateChanges方法
*/
public static void listen() {
zkClient.subscribeChildChanges("/person", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentpath" + parentPath + "--" + currentChilds);
}
});
}
/**
* 監(jiān)聽節(jié)點數(shù)據(jù)的變化
* */
public static void listen(String string) {
zkClient.subscribeDataChanges("/person", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
//變化觸發(fā)
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
//刪除時觸發(fā)
}
});
}
/**
* 添加數(shù)據(jù)
*/
public static void main(String[] args) throws InterruptedException {
ZkService.select();
Thread.sleep(60 * 1000);
}
}
zoo1.png
zoo2.png
zkclient分布式鎖
有序節(jié)點:假如當(dāng)前有一個父節(jié)點為/lock,我們可以在這個父節(jié)點下面創(chuàng)建子節(jié)點乳幸;zookeeper提供了一個可選的有序特性拭荤,例如我們可以創(chuàng)建子節(jié)點“/lock/node-”并且指明有序,那么zookeeper在生成子節(jié)點時會根據(jù)當(dāng)前的子節(jié)點數(shù)量自動添加整數(shù)序號,
臨時節(jié)點:客戶端可以建立一個臨時節(jié)點严拒,在會話結(jié)束或者會話超時后京痢,zookeeper會自動刪除該節(jié)點。
事件監(jiān)聽:在讀取數(shù)據(jù)時挥吵,我們可以同時對節(jié)點設(shè)置事件監(jiān)聽重父,當(dāng)節(jié)點數(shù)據(jù)或結(jié)構(gòu)變化時,zookeeper會通知客戶端忽匈。當(dāng)前zookeeper有如下四種事件:1)節(jié)點創(chuàng)建房午;2)節(jié)點刪除;3)節(jié)點數(shù)據(jù)修改丹允;4)子節(jié)點變更郭厌。
流程
- 客戶端連接zookeeper,并在/lock下創(chuàng)建臨時的且有序的子節(jié)點雕蔽,第一個客戶端對應(yīng)的子節(jié)點為/lock/lock-0000000000折柠,第二個為/lock/lock-0000000001,以此類推批狐。
- 客戶端獲取/lock下的子節(jié)點列表扇售,判斷自己創(chuàng)建的子節(jié)點是否為當(dāng)前子節(jié)點列表中序號最小的子節(jié)點,如果是則認(rèn)為獲得鎖嚣艇,否則監(jiān)聽/lock的子節(jié)點變更消息缘眶,獲得子節(jié)點變更通知后重復(fù)此步驟直至獲得鎖;
package com.demo.lock;
public interface BaseLock {
/**
* 獲取鎖
* */
boolean getlock();
/**
* 釋放鎖
* */
void unlock();
}
public class BaseLockImpl implements BaseLock {
private static final String ZOOKEEPER_IP_PORT = "192.168.116.150:2181";
private static final String LOCK_PATH = "/LOCK";
private CountDownLatch countDownLatch;
private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
private String beforePath;
private String currentPath;
// 判斷有沒有LOCK目錄髓废,沒有則創(chuàng)建
public BaseLockImpl() {
if (!this.client.exists(LOCK_PATH)) {
this.client.createPersistent(LOCK_PATH);
}
}
@Override
public boolean getlock() {
if (tryLock()) {
System.out.println("=======================獲取鎖");
return true;
} else {
waitForLock();
return getlock();
}
}
@Override
public void unlock() {
// 刪除當(dāng)前臨時節(jié)點
client.delete(currentPath);
System.out.println("======刪除節(jié)點=================");
}
/**
* 創(chuàng)建節(jié)點
*/
public Boolean tryLock() {
// 如果currentPath為空則為第一次嘗試加鎖巷懈,第一次加鎖賦值currentPath
if (currentPath == null || currentPath.length() <= 0) {
// 創(chuàng)建一個臨時順序節(jié)點
currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
// 獲取所有臨時節(jié)點并排序,臨時節(jié)點名稱為自增長的字符串如:0000000400
List<String> childrens = this.client.getChildren(LOCK_PATH);
//由小到大排序所有子節(jié)點
Collections.sort(childrens);
//判斷創(chuàng)建的子節(jié)點/LOCK/Node-n是否最小,即currentPath,如果當(dāng)前節(jié)點等于childrens中的最小的一個就占用鎖
if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
return true;
}
//監(jiān)聽前一個節(jié)點
else {
int wz = Collections.binarySearch(childrens, currentPath.substring(6));
beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
return false;
}
}
return true;
}
//等待鎖,對次小節(jié)點進(jìn)行監(jiān)聽
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 對次小節(jié)點進(jìn)行監(jiān)聽,排在前面的的節(jié)點增加數(shù)據(jù)刪除的watcher
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) {
countDownLatch = new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
業(yè)務(wù)類
public class Order {
// 自增長序列
private static int i = 0;
// 按照規(guī)則生成訂單編號
public synchronized String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(now) + ++i;
}
}
測試10個并發(fā)
public class LocalService implements Runnable{
private static Order order = new Order();
// 同時并發(fā)的線程數(shù)
private static final int num = 10;
// 按照線程數(shù)初始化倒計數(shù)器,倒計數(shù)器
//保證線程同時執(zhí)行
private static CountDownLatch cdl = new CountDownLatch(num);
private BaseLock baseLock = new BaseLockImpl();
public void createOrder(){
String orderCode = null;
baseLock.getlock();
try {
orderCode = order.getOrderCode();
System.out.println(orderCode);
}catch (Exception e){
//todo
}finally {
baseLock.unlock();
}
}
@Override
public void run() {
try {
cdl.await();
}catch (Exception e){
e.printStackTrace();
}
//創(chuàng)建訂單
createOrder();
}
public static void main(String[] args) {
for (int i = 1; i <= num; i++) {
// 按照線程數(shù)迭代實例化線程
new Thread(new LocalService()).start();
// 創(chuàng)建一個線程慌洪,倒計數(shù)器減1
cdl.countDown();
}
}
}
zoo3.png
zoo4.png
curator
curator是連接ZK應(yīng)用最廣泛的工具
zk分布式鎖顶燕,Master選舉等等凑保,curator包含了這些場景。
依賴
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
curator對節(jié)點的增刪改查,事件監(jiān)聽
public class CuratorService {
public static CuratorFramework client = null;
// 集群模式則是多個ip:port,ip:port
public static final String zkServerIps = "192.168.116.150:2181";
//緩存節(jié)點,監(jiān)聽節(jié)點數(shù)據(jù)變動
final static NodeCache nodeCache = new NodeCache(client,"/path");
// 為子節(jié)點添加watcher
// PathChildrenCache: 監(jiān)聽數(shù)據(jù)節(jié)點的增刪改涌攻,可以設(shè)置觸發(fā)的事件
final static PathChildrenCache childrenCache = new PathChildrenCache(client, "/path", true);
/**
* basesleeptimems 初始化sleep的時間
* maxretries 最大重試次數(shù)
* maxsleeoms 最大重試時間
*/
public CuratorService() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerIps)
.sessionTimeoutMs(10000)
.retryPolicy(retryPolicy)
.build();
client.start();
System.out.println("qi dong ke hu duan ...");
}
private void close() {
if (client != null) {
this.client.close();
}
}
public static void main(String[] args) throws Exception {
CuratorService curatorService = new CuratorService();
// 創(chuàng)建節(jié)點
String nodePath = "/super/testNode"; // 節(jié)點路徑
byte[] data = "this is a test data".getBytes(); // 節(jié)點數(shù)據(jù)
String result = curatorService.client.create().creatingParentsIfNeeded() // 創(chuàng)建父節(jié)點欧引,也就是會遞歸創(chuàng)建
.withMode(CreateMode.PERSISTENT) // 節(jié)點類型
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 節(jié)點的acl權(quán)限
.forPath(nodePath, data);
System.out.println(result + "節(jié)點,創(chuàng)建成功...");
// 更新節(jié)點數(shù)據(jù)
byte[] newData = "this is a new data".getBytes();
Stat resultStat = curatorService.client.setData().withVersion(0) // 指定數(shù)據(jù)版本
.forPath(nodePath, newData); // 需要修改的節(jié)點路徑以及新數(shù)據(jù)
// 刪除節(jié)點
curatorService.client.delete()
.guaranteed() // 如果刪除失敗恳谎,那么在后端還是會繼續(xù)刪除芝此,直到成功
.deletingChildrenIfNeeded() // 子節(jié)點也一并刪除,也就是會遞歸刪除
.withVersion(resultStat.getVersion())
.forPath(nodePath);
Thread.sleep(1000);
// 讀取節(jié)點數(shù)據(jù)
Stat stat = new Stat();
byte[] nodeData = curatorService.client.getData().storingStatIn(stat).forPath(nodePath);
System.out.println("節(jié)點 " + nodePath + " 的數(shù)據(jù)為:" + new String(nodeData));
System.out.println("該節(jié)點的數(shù)據(jù)版本號為:" + stat.getVersion());
// 獲取子節(jié)點列表
List<String> childNodes = curatorService.client.getChildren().forPath(nodePath);
System.out.println(nodePath + " 節(jié)點下的子節(jié)點列表:");
// 查詢某個節(jié)點是否存在因痛,存在就會返回該節(jié)點的狀態(tài)信息婚苹,如果不存在的話則返回空
Stat statExist = curatorService.client.checkExists().forPath(nodePath);
if (statExist == null) {
System.out.println(nodePath + " 節(jié)點不存在");
} else {
System.out.println(nodePath + " 節(jié)點存在");
}
for (String childNode : childNodes) {
System.out.println(childNode);
}
//緩存節(jié)點的數(shù)據(jù)
curatorService.nodeCache.start(true);
curatorService.nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 防止節(jié)點被刪除時發(fā)生錯誤
if (curatorService.nodeCache.getCurrentData() == null) {
System.out.println("獲取節(jié)點數(shù)據(jù)異常,無法獲取當(dāng)前緩存的節(jié)點數(shù)據(jù)鸵膏,可能該節(jié)點已被刪除");
return;
}
// 獲取節(jié)點最新的數(shù)據(jù)
String data = new String(curatorService.nodeCache.getCurrentData().getData());
System.out.println(curatorService.nodeCache.getCurrentData().getPath() + " 節(jié)點的數(shù)據(jù)發(fā)生變化膊升,最新的數(shù)據(jù)為:" + data);
}
});
/**
* 監(jiān)聽子節(jié)點初始化的方式
* POST_INITIALIZED_EVENT:異步初始化,初始化之后會觸發(fā)事件
* NORMAL:異步初始化
* BUILD_INITIAL_CACHE:同步初始化
* */
curatorService.childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// 列出子節(jié)點數(shù)據(jù)列表谭企,需要使用BUILD_INITIAL_CACHE同步初始化模式才能獲得廓译,異步是獲取不到的
List<ChildData> childDataList = childrenCache.getCurrentData();
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
// 通過判斷event type的方式來實現(xiàn)不同事件的觸發(fā)
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { // 子節(jié)點初始化時觸發(fā)
System.out.println("\n--------------\n");
System.out.println("子節(jié)點初始化成功");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { // 添加子節(jié)點時觸發(fā)
//if (event.getData().getPath().trim().equals(NODE_PATH)) {}
System.out.println("\n--------------\n");
System.out.print("子節(jié)點:" + event.getData().getPath() + " 添加成功,");
System.out.println("該子節(jié)點的數(shù)據(jù)為:" + new String(event.getData().getData()));
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // 刪除子節(jié)點時觸發(fā)
System.out.println("\n--------------\n");
System.out.println("子節(jié)點:" + event.getData().getPath() + " 刪除成功");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { // 修改子節(jié)點數(shù)據(jù)時觸發(fā)
System.out.println("\n--------------\n");
System.out.print("子節(jié)點:" + event.getData().getPath() + " 數(shù)據(jù)更新成功债查,");
System.out.println("子節(jié)點:" + event.getData().getPath() + " 新的數(shù)據(jù)為:" + new String(event.getData().getData()));
}
}
});
// 關(guān)閉客戶端
curatorService.close();
}
}
分布式鎖