package com.shaolong;
public class Lock {
private String lockId;
private boolean isActive;
private String path;
public Lock() {
}
public Lock(String lockId, String path) {
this.lockId = lockId;
this.path = path;
}
public String getLockId() {
return lockId;
}
public void setLockId(String lockId) {
this.lockId = lockId;
}
public boolean isActive() {
return isActive;
}
public void setActive(boolean active) {
isActive = active;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
}
package com.shaolong;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
import java.util.stream.Collectors;
public class ZookeeperLock {
private ZkClient zkClient;
private String rootConcatPath = "/wms_lock/";// 從根路徑開始 拼接 /wms_lock/itm-0001-0000000026 節(jié)點
private String root = "/wms_lock";
public ZookeeperLock(){
zkClient = new ZkClient("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",2000,5000);
if(!zkClient.exists(root)){
zkClient.createPersistent(root);
}
}
public Lock lock(String lockId,long timeOut){
Lock lockNode = createLockNode(lockId);
lockNode = tryActiveLock(lockNode);
if(!lockNode.isActive()){
try {
synchronized (lockNode){
lockNode.wait(timeOut);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return lockNode;
}
public Lock tryActiveLock(Lock lock){
//判斷是否獲得鎖
List<String> list = zkClient.getChildren(root)
.stream().sorted()
.map(p -> root+"/" + p)
.collect(Collectors.toList());
String firstPath = list.get(0);
if(firstPath.equals(lock.getPath())){
lock.setActive(true);
}else{
//添加監(jiān)聽
String upNodePath = list.get(list.indexOf(lock.getPath()) - 1);
zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("刪除節(jié)點 "+dataPath);
Lock lockNode = tryActiveLock(lock);
synchronized (lock){
if(lockNode.isActive()){
lockNode.notify();
}
}
zkClient.unsubscribeDataChanges(upNodePath,this);
}
});
}
//添加上一個節(jié)點監(jiān)聽
//再次重試激活鎖
return lock;
}
public void unLock(Lock lock){
zkClient.delete(lock.getPath());
}
private Lock createLockNode(String lockId){
//創(chuàng)建好目錄
createMakedir(lockId);
//創(chuàng)建臨時節(jié)點
String path = zkClient.createEphemeralSequential(rootConcatPath + lockId, "w");
Lock lock = new Lock();
lock.setLockId(lockId);
lock.setPath(path);
lock.setActive(false);
return lock;
}
private void createMakedir(String lockId) {
if(lockId.contains("/")){
String temp = lockId;
if(lockId.startsWith("/")){
temp = lockId.substring(1);
}
String[] split = temp.split("/");
for (int i=0;i<split.length-1;i++) {
root+="/"+split[i];
if (!zkClient.exists(root)) {
zkClient.createPersistent(root);
}
}
if(lockId.startsWith("/")){
rootConcatPath = rootConcatPath.substring(0,rootConcatPath.length()-1);
}
}
}
}
package com.xinhua.zk;
import com.shaolong.Lock;
import com.shaolong.ZookeeperLock;
public class RawOut implements Runnable{
ZookeeperLock lock2 = new ZookeeperLock();
String lockId = "rawout/aaa/bbb/mm/itm-0001-";
@Override
public void run() {
Lock lock = null;
try {
lock = lock2.lock(lockId, 3000);
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}finally {
lock2.unLock(lock);
}
}
}
package com.xinhua.zk;
public class test {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new RawOut()).start();
}
}
}
最后編輯于 :2021.11.22 10:56:01
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者