ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù)蜜徽,是Google的Chubby一個開源的實現(xiàn)腋寨,是Hadoop和Hbase的重要組件莹桅。它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件粉寞,提供的功能包括:配置維護尼荆、域名服務(wù)、分布式同步唧垦、組服務(wù)等捅儒。
ZooKeeper的架構(gòu)通過冗余服務(wù)實現(xiàn)高可用性。因此振亮,如果第一次無應(yīng)答巧还,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節(jié)點將它們的數(shù)據(jù)存儲于一個分層的命名空間双炕,非常類似于一個文件系統(tǒng)或一個前綴樹結(jié)構(gòu)〈樽ィ客戶端可以在節(jié)點讀寫妇斤,從而以這種方式擁有一個共享的配置服務(wù)。更新是全序的丹拯。
基于ZooKeeper分布式鎖的流程
(1)在zookeeper指定節(jié)點(locks)下創(chuàng)建臨時順序節(jié)點node_n
(2)獲取locks下所有子節(jié)點children
(3)對子節(jié)點按節(jié)點自增序號從小到大排序
(4)判斷本節(jié)點是不是第一個子節(jié)點站超,若是,則獲取鎖乖酬;若不是死相,則監(jiān)聽比該節(jié)點小的那個節(jié)點的刪除事件
若監(jiān)聽事件生效,則回到第二步重新進行判斷咬像,直到獲取到鎖算撮。
具體實現(xiàn)
下面就具體使用java和zookeeper實現(xiàn)分布式鎖,操作zookeeper使用的是apache提供的zookeeper的包县昂。
通過實現(xiàn)Watch接口肮柜,實現(xiàn)process(WatchedEvent event)方法來實施監(jiān)控,使CountDownLatch來完成監(jiān)控倒彰,在等待鎖的時候使用CountDownLatch來計數(shù)审洞,等到后進行countDown,停止等待待讳,繼續(xù)運行芒澜。
以下整體流程基本與上述描述流程一致仰剿,只是在監(jiān)聽的時候使用的是CountDownLatch來監(jiān)聽前一個節(jié)點。
代碼如下
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class? DistributedLock implements Lock,Watcher{
? ? private ZooKeeper zk = null;
? ? // 根節(jié)點? ? private String ROOT_LOCK = "/locks";
? ? // 競爭的資源? ? private String lockName;
? ? // 等待的前一個鎖? ? private String WAIT_LOCK;
? ? // 當(dāng)前鎖? ? private String CURRENT_LOCK;
? ? // 計數(shù)器? ? private CountDownLatch countDownLatch;
? ? private int sessionTimeout = 30000;
? ? private List<Exception> exceptionList = new ArrayList<Exception>();
? ? /**? ? * 配置分布式鎖? ? *@paramconfig 連接的url? ? *@paramlockName 競爭資源? ? */? ? ? ? ? ?
? ?publicDistributedLock(String config, String lockName){
? ? ? ? this.lockName = lockName;
? ? ? ? try {
? ? ? ? ? ? // 連接zookeeper? ? ? ? ? ? zk = new ZooKeeper(config, sessionTimeout, this);
? ? ? ? ? ? Stat stat = zk.exists(ROOT_LOCK, false);
? ? ? ? ? ? if (stat == null) {
? ? ? ? ? ? ? ? // 如果根節(jié)點不存在痴晦,則創(chuàng)建根節(jié)點? ? ? ? ? ? ? ? zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
? ? ? ? ? ? }
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? // 節(jié)點監(jiān)視器? ? publicvoidprocess(WatchedEvent event){
? ? ? ? if (this.countDownLatch != null) {
? ? ? ? ? ? this.countDownLatch.countDown();
? ? ? ? }
? ? }
? ? publicvoidlock(){
? ? ? ? if (exceptionList.size() > 0) {
? ? ? ? ? ? throw new LockException(exceptionList.get(0));
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? if (this.tryLock()) {
? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " " + lockName + "獲得了鎖");
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? // 等待鎖? ? ? ? ? ? ? ? waitForLock(WAIT_LOCK, sessionTimeout);
? ? ? ? ? ? }
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? publicbooleantryLock(){
? ? ? ? try {
? ? ? ? ? ? String splitStr = "_lock_";
? ? ? ? ? ? if (lockName.contains(splitStr)) {
? ? ? ? ? ? ? ? throw new LockException("鎖名有誤");
? ? ? ? ? ? }
? ? ? ? ? ? // 創(chuàng)建臨時有序節(jié)點? ? ? ? ? ? CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
? ? ? ? ? ? ? ? ? ? ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
? ? ? ? ? ? System.out.println(CURRENT_LOCK + " 已經(jīng)創(chuàng)建");
? ? ? ? ? ? // 取所有子節(jié)點? ? ? ? ? ? List<String> subNodes = zk.getChildren(ROOT_LOCK, false);
? ? ? ? ? ? // 取出所有l(wèi)ockName的鎖? ? ? ? ? ? List<String> lockObjects = new ArrayList<String>();
? ? ? ? ? ? for (String node : subNodes) {
? ? ? ? ? ? ? ? String _node = node.split(splitStr)[0];
? ? ? ? ? ? ? ? if (_node.equals(lockName)) {
? ? ? ? ? ? ? ? ? ? lockObjects.add(node);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? Collections.sort(lockObjects);
? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " 的鎖是 " + CURRENT_LOCK);
? ? ? ? ? ? // 若當(dāng)前節(jié)點為最小節(jié)點南吮,則獲取鎖成功? ? ? ? ? ? if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? ? ? // 若不是最小節(jié)點,則找到自己的前一個節(jié)點? ? ? ? ? ? String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
? ? ? ? ? ? WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return false;
? ? }
? ? publicbooleantryLock(longtimeout, TimeUnit unit){
? ? ? ? try {
? ? ? ? ? ? if (this.tryLock()) {
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? ? ? return waitForLock(WAIT_LOCK, timeout);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? ? ? return false;
? ? }
? ? // 等待鎖? ? privatebooleanwaitForLock(String prev,longwaitTime)throwsKeeperException, InterruptedException{
? ? ? ? Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);
? ? ? ? if (stat != null) {
? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + "等待鎖 " + ROOT_LOCK + "/" + prev);
? ? ? ? ? ? this.countDownLatch = new CountDownLatch(1);
? ? ? ? ? ? // 計數(shù)等待阅酪,若等到前一個節(jié)點消失旨袒,則precess中進行countDown,停止等待术辐,獲取鎖? ? ? ? ? ? this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
? ? ? ? ? ? this.countDownLatch = null;
? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " 等到了鎖");
? ? ? ? }
? ? ? ? return true;
? ? }
? ? publicvoidunlock(){
? ? ? ? try {
? ? ? ? ? ? System.out.println("釋放鎖 " + CURRENT_LOCK);
? ? ? ? ? ? zk.delete(CURRENT_LOCK, -1);
? ? ? ? ? ? CURRENT_LOCK = null;
? ? ? ? ? ? zk.close();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? publicConditionnewCondition(){
? ? ? ? return null;
? ? }
? ? publicvoidlockInterruptibly()throwsInterruptedException{
? ? ? ? this.lock();
? ? }
? ? public classLockExceptionextendsRuntimeException{
? ? ? ? private static final long serialVersionUID = 1L;
? ? ? ? publicLockException(String e){
? ? ? ? ? ? super(e);
? ? ? ? }
? ? ? ? publicLockException(Exception e){
? ? ? ? ? ? super(e);
? ? ? ? }
? ? }
}
測試代碼
public classTest{
? ? static int n = 500;
? ? publicstaticvoidsecskill(){
? ? ? ? System.out.println(--n);
? ? }
? ? publicstaticvoidmain(String[] args){
? ? ? ? Runnable runnable = new Runnable() {
? ? ? ? ? ? publicvoidrun(){
? ? ? ? ? ? ? ? DistributedLock lock = null;
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? lock = new DistributedLock("127.0.0.1:2181", "test1");
? ? ? ? ? ? ? ? ? ? lock.lock();
? ? ? ? ? ? ? ? ? ? secskill();
? ? ? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + "正在運行");
? ? ? ? ? ? ? ? } finally {
? ? ? ? ? ? ? ? ? ? if (lock != null) {
? ? ? ? ? ? ? ? ? ? ? ? lock.unlock();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? };
? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? Thread t = new Thread(runnable);
? ? ? ? ? ? t.start();
? ? ? ? }
? ? }
}
總體來說砚尽,如果了解到整個實現(xiàn)流程,使用zookeeper實現(xiàn)分布式鎖并不是很困難辉词,不過這也只是一個簡單的實現(xiàn)必孤,與前面實現(xiàn)Redis實現(xiàn)相比,本實現(xiàn)的穩(wěn)定性更強瑞躺,這是因為zookeeper的特性所致敷搪,在外界看來,zookeeper集群中每一個節(jié)點都是一致的幢哨。
https://www.cnblogs.com/liuyang0/p/6800538.html