基于Zookeeper的分布式共享鎖
實(shí)現(xiàn)原理
- 基于Zookeeper椅野、Lock實(shí)現(xiàn)的分布共享式鎖
- 構(gòu)造初始化Zookeeper連接
- 在lock中嘗試獲取鎖(tryLock)
- 首先創(chuàng)建當(dāng)前連接的節(jié)點(diǎn)
- 獲取所有相關(guān)節(jié)點(diǎn),并排序
- 若當(dāng)前為只有一個(gè)節(jié)點(diǎn)或?yàn)樽钚≈蛋兀苯臃祷孬@取鎖成功
- 否則獲取前一個(gè)節(jié)點(diǎn),監(jiān)聽事件猖任,讓當(dāng)前節(jié)點(diǎn)進(jìn)入等待狀態(tài)
- 如果監(jiān)聽到事件為上刪除事件忿偷,釋放鎖
- 刪除節(jié)點(diǎn),釋放資源
代碼實(shí)現(xiàn)
@Data
@Slf4j
public class MyZkDistributedLock implements Lock, Watcher {
// 超時(shí)時(shí)間
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts;
private String groupNode = "locks";
private String subNode = "sub";
private String lockName;
private ZooKeeper zk;
// 當(dāng)前client創(chuàng)建的子節(jié)點(diǎn)
private String thisPath;
// 當(dāng)前client等待的子節(jié)點(diǎn)
private String waitPath;
private List<Exception> exceptionList = new ArrayList<>();
private CountDownLatch latch = new CountDownLatch(1);
public MyZkDistributedLock(String hosts, String lockName) {
this.hosts = hosts;
this.lockName = lockName;
try {
// 連接zookeeper
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
Stat stat = zk.exists(groupNode, false);
if (stat == null) {
// 如果根節(jié)點(diǎn)不存在日丹,則創(chuàng)建根節(jié)點(diǎn)
zk.create(groupNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (IOException e) {
log.error("Zk連接異常", e);
exceptionList.add(e);
} catch (InterruptedException e) {
log.error("Zk連接異常", e);
exceptionList.add(e);
} catch (KeeperException e) {
log.error("Zk連接異常", e);
exceptionList.add(e);
}
}
@Override
public void lock() {
if (exceptionList.size() > 0) {
throw new LockException(exceptionList.get(0));
}
try {
if (this.tryLock()) {
log.info("------------>線程:{},鎖:{},獲得", Thread.currentThread().getName(), lockName);
return;
} else {
// 等待鎖
this.latch.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new MyZkDistributedLock.LockException("鎖名有誤");
}
// 創(chuàng)建子節(jié)點(diǎn)
thisPath = zk
.create("/" + groupNode + "/" + lockName + subNode + splitStr, null,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 注意, 沒有必要監(jiān)聽"/locks"的子節(jié)點(diǎn)的變化情況
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
// 取出所有l(wèi)ockName的鎖
List<String> lockObjects = new ArrayList<String>();
for (String node : childrenNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) {
lockObjects.add(node);
}
}
// 列表中只有一個(gè)子節(jié)點(diǎn), 那肯定就是thisPath, 說明client獲得鎖
if (lockObjects.size() == 1) {
return true;
} else {
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(lockObjects);
int index = lockObjects.indexOf(thisNode);
if (index == -1) {
// never happened
} else if (index == 0) {
// inddx == 0, 說明thisNode在列表中最小, 獲得鎖
return true;
} else {
// 獲得排名比thisPath前1位的節(jié)點(diǎn)
this.waitPath = "/" + groupNode + "/" + lockObjects.get(index - 1);
// 在waitPath上注冊監(jiān)聽器, 當(dāng)waitPath被刪除時(shí), zookeeper會回調(diào)監(jiān)聽器的process方法
zk.getData(waitPath, true, new Stat());
}
}
} catch (Exception e) {
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock();
}
@Override
public void unlock() {
try {
log.info("釋放鎖 {}", thisPath);
zk.delete(thisPath, -1);
thisPath = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent event) {
try {
// 發(fā)生了waitPath的刪除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
this.latch.countDown();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public class LockException extends RuntimeException {
private static final long serialVersionUID = 1L;
public LockException(String e) {
super(e);
}
public LockException(Exception e) {
super(e);
}
}
}
總結(jié)
總體來說瓷翻,實(shí)現(xiàn)并不難聚凹,我認(rèn)為主要就是排序號監(jiān)聽上一個(gè)節(jié)點(diǎn)的刪除事件割坠,依此類推齐帚,最后實(shí)現(xiàn)所有節(jié)點(diǎn)的監(jiān)聽