基于Zookeeper的分布式事務(wù)鎖

ZooKeeper是一個分布式的,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù)蜜徽,是Google的Chubby一個開源的實現(xiàn)腋寨,是Hadoop和Hbase的重要組件莹桅。它是一個為分布式應(yīng)用提供一致性服務(wù)的軟件粉寞,提供的功能包括:配置維護尼荆、域名服務(wù)、分布式同步唧垦、組服務(wù)等捅儒。

ZooKeeper的架構(gòu)通過冗余服務(wù)實現(xiàn)高可用性。因此振亮,如果第一次無應(yīng)答巧还,客戶端就可以詢問另一臺ZooKeeper主機。ZooKeeper節(jié)點將它們的數(shù)據(jù)存儲于一個分層的命名空間双炕,非常類似于一個文件系統(tǒng)或一個前綴樹結(jié)構(gòu)〈樽ィ客戶端可以在節(jié)點讀寫妇斤,從而以這種方式擁有一個共享的配置服務(wù)。更新是全序的丹拯。



基于ZooKeeper分布式鎖的流程

(1)在zookeeper指定節(jié)點(locks)下創(chuàng)建臨時順序節(jié)點node_n

(2)獲取locks下所有子節(jié)點children

(3)對子節(jié)點按節(jié)點自增序號從小到大排序

(4)判斷本節(jié)點是不是第一個子節(jié)點站超,若是,則獲取鎖乖酬;若不是死相,則監(jiān)聽比該節(jié)點小的那個節(jié)點的刪除事件

若監(jiān)聽事件生效,則回到第二步重新進行判斷咬像,直到獲取到鎖算撮。



具體實現(xiàn)

下面就具體使用java和zookeeper實現(xiàn)分布式鎖,操作zookeeper使用的是apache提供的zookeeper的包县昂。

通過實現(xiàn)Watch接口肮柜,實現(xiàn)process(WatchedEvent event)方法來實施監(jiān)控,使CountDownLatch來完成監(jiān)控倒彰,在等待鎖的時候使用CountDownLatch來計數(shù)审洞,等到后進行countDown,停止等待待讳,繼續(xù)運行芒澜。

以下整體流程基本與上述描述流程一致仰剿,只是在監(jiān)聽的時候使用的是CountDownLatch來監(jiān)聽前一個節(jié)點。

代碼如下

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class? DistributedLock implements Lock,Watcher{

? ? private ZooKeeper zk = null;

? ? // 根節(jié)點? ? private String ROOT_LOCK = "/locks";

? ? // 競爭的資源? ? private String lockName;

? ? // 等待的前一個鎖? ? private String WAIT_LOCK;

? ? // 當(dāng)前鎖? ? private String CURRENT_LOCK;

? ? // 計數(shù)器? ? private CountDownLatch countDownLatch;

? ? private int sessionTimeout = 30000;

? ? private List<Exception> exceptionList = new ArrayList<Exception>();

? ? /**? ? * 配置分布式鎖? ? *@paramconfig 連接的url? ? *@paramlockName 競爭資源? ? */? ? ? ? ? ?

? ?publicDistributedLock(String config, String lockName){

? ? ? ? this.lockName = lockName;

? ? ? ? try {

? ? ? ? ? ? // 連接zookeeper? ? ? ? ? ? zk = new ZooKeeper(config, sessionTimeout, this);

? ? ? ? ? ? Stat stat = zk.exists(ROOT_LOCK, false);

? ? ? ? ? ? if (stat == null) {

? ? ? ? ? ? ? ? // 如果根節(jié)點不存在痴晦,則創(chuàng)建根節(jié)點? ? ? ? ? ? ? ? zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

? ? ? ? ? ? }

? ? ? ? } catch (IOException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

? ? // 節(jié)點監(jiān)視器? ? publicvoidprocess(WatchedEvent event){

? ? ? ? if (this.countDownLatch != null) {

? ? ? ? ? ? this.countDownLatch.countDown();

? ? ? ? }

? ? }

? ? publicvoidlock(){

? ? ? ? if (exceptionList.size() > 0) {

? ? ? ? ? ? throw new LockException(exceptionList.get(0));

? ? ? ? }

? ? ? ? try {

? ? ? ? ? ? if (this.tryLock()) {

? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " " + lockName + "獲得了鎖");

? ? ? ? ? ? ? ? return;

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? // 等待鎖? ? ? ? ? ? ? ? waitForLock(WAIT_LOCK, sessionTimeout);

? ? ? ? ? ? }

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

? ? publicbooleantryLock(){

? ? ? ? try {

? ? ? ? ? ? String splitStr = "_lock_";

? ? ? ? ? ? if (lockName.contains(splitStr)) {

? ? ? ? ? ? ? ? throw new LockException("鎖名有誤");

? ? ? ? ? ? }

? ? ? ? ? ? // 創(chuàng)建臨時有序節(jié)點? ? ? ? ? ? CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],

? ? ? ? ? ? ? ? ? ? ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

? ? ? ? ? ? System.out.println(CURRENT_LOCK + " 已經(jīng)創(chuàng)建");

? ? ? ? ? ? // 取所有子節(jié)點? ? ? ? ? ? List<String> subNodes = zk.getChildren(ROOT_LOCK, false);

? ? ? ? ? ? // 取出所有l(wèi)ockName的鎖? ? ? ? ? ? List<String> lockObjects = new ArrayList<String>();

? ? ? ? ? ? for (String node : subNodes) {

? ? ? ? ? ? ? ? String _node = node.split(splitStr)[0];

? ? ? ? ? ? ? ? if (_node.equals(lockName)) {

? ? ? ? ? ? ? ? ? ? lockObjects.add(node);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? Collections.sort(lockObjects);

? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " 的鎖是 " + CURRENT_LOCK);

? ? ? ? ? ? // 若當(dāng)前節(jié)點為最小節(jié)點南吮,則獲取鎖成功? ? ? ? ? ? if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {

? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? }

? ? ? ? ? ? // 若不是最小節(jié)點,則找到自己的前一個節(jié)點? ? ? ? ? ? String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);

? ? ? ? ? ? WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1);

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? return false;

? ? }

? ? publicbooleantryLock(longtimeout, TimeUnit unit){

? ? ? ? try {

? ? ? ? ? ? if (this.tryLock()) {

? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? }

? ? ? ? ? ? return waitForLock(WAIT_LOCK, timeout);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? ? ? return false;

? ? }

? ? // 等待鎖? ? privatebooleanwaitForLock(String prev,longwaitTime)throwsKeeperException, InterruptedException{

? ? ? ? Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true);

? ? ? ? if (stat != null) {

? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + "等待鎖 " + ROOT_LOCK + "/" + prev);

? ? ? ? ? ? this.countDownLatch = new CountDownLatch(1);

? ? ? ? ? ? // 計數(shù)等待阅酪,若等到前一個節(jié)點消失旨袒,則precess中進行countDown,停止等待术辐,獲取鎖? ? ? ? ? ? this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);

? ? ? ? ? ? this.countDownLatch = null;

? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + " 等到了鎖");

? ? ? ? }

? ? ? ? return true;

? ? }

? ? publicvoidunlock(){

? ? ? ? try {

? ? ? ? ? ? System.out.println("釋放鎖 " + CURRENT_LOCK);

? ? ? ? ? ? zk.delete(CURRENT_LOCK, -1);

? ? ? ? ? ? CURRENT_LOCK = null;

? ? ? ? ? ? zk.close();

? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? } catch (KeeperException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

? ? publicConditionnewCondition(){

? ? ? ? return null;

? ? }

? ? publicvoidlockInterruptibly()throwsInterruptedException{

? ? ? ? this.lock();

? ? }

? ? public classLockExceptionextendsRuntimeException{

? ? ? ? private static final long serialVersionUID = 1L;

? ? ? ? publicLockException(String e){

? ? ? ? ? ? super(e);

? ? ? ? }

? ? ? ? publicLockException(Exception e){

? ? ? ? ? ? super(e);

? ? ? ? }

? ? }

}



測試代碼

public classTest{

? ? static int n = 500;

? ? publicstaticvoidsecskill(){

? ? ? ? System.out.println(--n);

? ? }

? ? publicstaticvoidmain(String[] args){


? ? ? ? Runnable runnable = new Runnable() {

? ? ? ? ? ? publicvoidrun(){

? ? ? ? ? ? ? ? DistributedLock lock = null;

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? lock = new DistributedLock("127.0.0.1:2181", "test1");

? ? ? ? ? ? ? ? ? ? lock.lock();

? ? ? ? ? ? ? ? ? ? secskill();

? ? ? ? ? ? ? ? ? ? System.out.println(Thread.currentThread().getName() + "正在運行");

? ? ? ? ? ? ? ? } finally {

? ? ? ? ? ? ? ? ? ? if (lock != null) {

? ? ? ? ? ? ? ? ? ? ? ? lock.unlock();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? };

? ? ? ? for (int i = 0; i < 10; i++) {

? ? ? ? ? ? Thread t = new Thread(runnable);

? ? ? ? ? ? t.start();

? ? ? ? }

? ? }

}



總體來說砚尽,如果了解到整個實現(xiàn)流程,使用zookeeper實現(xiàn)分布式鎖并不是很困難辉词,不過這也只是一個簡單的實現(xiàn)必孤,與前面實現(xiàn)Redis實現(xiàn)相比,本實現(xiàn)的穩(wěn)定性更強瑞躺,這是因為zookeeper的特性所致敷搪,在外界看來,zookeeper集群中每一個節(jié)點都是一致的幢哨。


https://www.cnblogs.com/liuyang0/p/6800538.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末赡勘,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子捞镰,更是在濱河造成了極大的恐慌闸与,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件岸售,死亡現(xiàn)場離奇詭異践樱,居然都是意外死亡,警方通過查閱死者的電腦和手機凸丸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門拷邢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人屎慢,你說我怎么就攤上這事瞭稼。” “怎么了腻惠?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵弛姜,是天一觀的道長。 經(jīng)常有香客問我妖枚,道長廷臼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮荠商,結(jié)果婚禮上寂恬,老公的妹妹穿的比我還像新娘。我一直安慰自己莱没,他們只是感情好初肉,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著饰躲,像睡著了一般牙咏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嘹裂,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天妄壶,我揣著相機與錄音,去河邊找鬼寄狼。 笑死丁寄,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的泊愧。 我是一名探鬼主播伊磺,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼删咱!你這毒婦竟也來了屑埋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤痰滋,失蹤者是張志新(化名)和其女友劉穎摘能,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體即寡,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡徊哑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年袜刷,在試婚紗的時候發(fā)現(xiàn)自己被綠了聪富。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡著蟹,死狀恐怖墩蔓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情萧豆,我是刑警寧澤奸披,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站涮雷,受9級特大地震影響阵面,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一样刷、第九天 我趴在偏房一處隱蔽的房頂上張望仑扑。 院中可真熱鬧,春花似錦置鼻、人聲如沸镇饮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽储藐。三九已至,卻和暖如春嘶是,著一層夾襖步出監(jiān)牢的瞬間钙勃,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工俊啼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肺缕,地道東北人。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓授帕,卻偏偏與公主長得像同木,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子跛十,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容