分布式鎖實(shí)戰(zhàn):基于Zookeeper的實(shí)現(xiàn)

1. Zookeeper概述

Zookeeper(后續(xù)簡(jiǎn)稱ZK)是一個(gè)分布式的闺属,開放源碼的分布式應(yīng)用程序協(xié)調(diào)服務(wù)汪疮,通常以集群模式運(yùn)轉(zhuǎn)循诉,其協(xié)調(diào)能力可以理解為是基于觀察者設(shè)計(jì)模式來實(shí)現(xiàn)的渡嚣;ZK服務(wù)會(huì)使用Znode存儲(chǔ)使用者的數(shù)據(jù)洁仗,并將這些數(shù)據(jù)以樹形目錄的形式來組織管理讲衫,支持使用者以觀察者的角色指定自己關(guān)注哪些節(jié)點(diǎn)\數(shù)據(jù)的變更缕棵,當(dāng)這些變更發(fā)生時(shí),ZK會(huì)通知其觀察者涉兽;為滿足本篇目標(biāo)所需招驴,著重介紹以下幾個(gè)關(guān)鍵特性:

  • 數(shù)據(jù)組織:數(shù)據(jù)節(jié)點(diǎn)以樹形目錄(類似文件系統(tǒng))組織管理,每一個(gè)節(jié)點(diǎn)中都會(huì)保存數(shù)據(jù)信息和節(jié)點(diǎn)信息枷畏。
  • 集群模式:通常是由3别厘、5個(gè)基數(shù)實(shí)例組成集群,當(dāng)超過半數(shù)服務(wù)實(shí)例正常工作就能對(duì)外提供服務(wù)矿辽,既能避免單點(diǎn)故障丹允,又盡量高可用,每個(gè)服務(wù)實(shí)例都有一個(gè)數(shù)據(jù)備份袋倔,以實(shí)現(xiàn)數(shù)據(jù)全局一致
  • 順序更新:更新請(qǐng)求都會(huì)轉(zhuǎn)由leader執(zhí)行雕蔽,來自同一客戶端的更新將按照發(fā)送的順序被寫入到ZK,處理寫請(qǐng)求創(chuàng)建Znode時(shí)宾娜,Znode名稱后會(huì)被分配一個(gè)全局唯一的遞增編號(hào)批狐,可以通過順序號(hào)推斷請(qǐng)求的順序,利用這個(gè)特性可以實(shí)現(xiàn)高級(jí)協(xié)調(diào)服務(wù)
  • 監(jiān)聽機(jī)制:給某個(gè)節(jié)點(diǎn)注冊(cè)監(jiān)聽器前塔,該節(jié)點(diǎn)一旦發(fā)生變更(例如更新或者刪除)晃虫,監(jiān)聽者就會(huì)收到一個(gè)Watch Event,可以感知到節(jié)點(diǎn)\數(shù)據(jù)的變更
  • 臨時(shí)節(jié)點(diǎn):session鏈接斷開臨時(shí)節(jié)點(diǎn)就沒了子寓,不能創(chuàng)建子節(jié)點(diǎn)(很關(guān)鍵)

ZK的分布式鎖正是基于以上特性來實(shí)現(xiàn)的滑废,簡(jiǎn)單來說是:

  • 臨時(shí)節(jié)點(diǎn):用于支撐異常情況下的鎖自動(dòng)釋放能力

  • 順序節(jié)點(diǎn):用于支撐公平鎖獲取鎖和排隊(duì)等待的能力

  • 監(jiān)聽機(jī)制:用于支撐搶鎖能力

  • 集群模式:用于支撐鎖服務(wù)的高可用

2. 加解鎖的流程描述

  1. 創(chuàng)建一個(gè)永久節(jié)點(diǎn)作為鎖節(jié)點(diǎn)(/lock2)
  2. 試圖加鎖的客戶端在指定鎖名稱節(jié)點(diǎn)(/lock2)下,創(chuàng)建臨時(shí)順序子節(jié)點(diǎn)
  3. 獲取鎖節(jié)點(diǎn)(/lock2)下所有子節(jié)點(diǎn)
  4. 對(duì)所獲取的子節(jié)點(diǎn)按節(jié)點(diǎn)自增序號(hào)從小到大排序
  5. 判斷自己是不是第一個(gè)子節(jié)點(diǎn)寂屏,若是贰谣,則獲取鎖
  6. 若不是,則監(jiān)聽比該節(jié)點(diǎn)小的那個(gè)節(jié)點(diǎn)的刪除事件(這種只監(jiān)聽前一個(gè)節(jié)點(diǎn)的方式避免了驚群效應(yīng))
  7. 若是阻塞申請(qǐng)鎖迁霎,則申請(qǐng)鎖的操作可增加阻塞等待
  8. 若監(jiān)聽事件生效(說明前節(jié)點(diǎn)釋放了吱抚,可以嘗試去獲取鎖),則回到第3步重新進(jìn)行判斷考廉,直到獲取到鎖
  9. 解鎖時(shí)秘豹,將第一個(gè)子節(jié)點(diǎn)刪除釋放

3. ZK分布式鎖的能力

可能讀者是單篇閱讀,這里引入上一篇《分布式鎖上-初探》中的一些內(nèi)容昌粤,一個(gè)分布式鎖應(yīng)具備這樣一些功能特點(diǎn):

  • 互斥性:在同一時(shí)刻既绕,只有一個(gè)客戶端能持有鎖

  • 安全性:避免死鎖啄刹,如果某個(gè)客戶端獲得鎖之后處理時(shí)間超過最大約定時(shí)間,或者持鎖期間發(fā)生了故障導(dǎo)致無法主動(dòng)釋放鎖岸更,其持有的鎖也能夠被其他機(jī)制正確釋放鸵膏,并保證后續(xù)其它客戶端也能加鎖,整個(gè)處理流程繼續(xù)正常執(zhí)行

  • 可用性:也被稱作容錯(cuò)性怎炊,分布式鎖需要有高可用能力谭企,避免單點(diǎn)故障,當(dāng)提供鎖的服務(wù)節(jié)點(diǎn)故障(宕機(jī))時(shí)不影響服務(wù)運(yùn)行评肆,這里有兩種模式:一種是分布式鎖服務(wù)自身具備集群模式债查,遇到故障能自動(dòng)切換恢復(fù)工作;另一種是客戶端向多個(gè)獨(dú)立的鎖服務(wù)發(fā)起請(qǐng)求瓜挽,當(dāng)某個(gè)鎖服務(wù)故障時(shí)仍然可以從其他鎖服務(wù)讀取到鎖信息(Redlock)

  • 可重入性:對(duì)同一個(gè)鎖盹廷,加鎖和解鎖必須是同一個(gè)線程,即不能把其他線程程持有的鎖給釋放了

  • 高效靈活:加鎖久橙、解鎖的速度要快俄占;支持阻塞和非阻塞;支持公平鎖和非公平鎖

基于上文的內(nèi)容淆衷,這里簡(jiǎn)單總結(jié)一下ZK的能力矩陣(其它分布式鎖的情況會(huì)在后續(xù)文章中補(bǔ)充):

關(guān)于性能不太高的一種說法

因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過程中缸榄,都要?jiǎng)討B(tài)創(chuàng)建、銷毀臨時(shí)節(jié)點(diǎn)來實(shí)現(xiàn)鎖功能祝拯。ZK中創(chuàng)建和刪除節(jié)點(diǎn)只能通過Leader服務(wù)器來執(zhí)行甚带,然后Leader服務(wù)器還需要將數(shù)據(jù)同步到所有的Follower機(jī)器上,這樣頻繁的網(wǎng)絡(luò)通信佳头,性能的短板是非常突出的鹰贵。在高性能,高并發(fā)的場(chǎng)景下康嘉,不建議使用ZooKeeper的分布式鎖碉输。

由于ZooKeeper的高可用特性,在并發(fā)量不是太高的場(chǎng)景亭珍,也推薦使用ZK的分布式鎖敷钾。

4. InterProcessMutex 使用示例

Zookeeper 客戶端框架 Curator 提供的 InterProcessMutex 是分布式鎖的一種實(shí)現(xiàn),acquire 方法阻塞|非阻塞獲取鎖块蚌,release 方法釋放鎖,另外還提供了可撤銷膘格、可重入功能峭范。
4.1 接口介紹
// 獲取互斥鎖
public void acquire() throws Exception;// 在給定的時(shí)間內(nèi)獲取互斥鎖
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 釋放鎖處理public void release() throws Exception;// 如果當(dāng)前線程獲取了互斥鎖,則返回
trueboolean isAcquiredInThisProcess();
4.2 pom依賴
<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-core</artifactId>
  <version>2.8.2</version>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.5.7</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>4.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-client</artifactId>
  <version>4.3.0</version>
</dependency>
4.3 示例
package com.atguigu.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 創(chuàng)建分布式鎖1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 創(chuàng)建分布式鎖2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("線程1 獲取到鎖");

                    lock1.acquire();
                    System.out.println("線程1 再次獲取到鎖");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("線程1 釋放鎖");

                    lock1.release();
                    System.out.println("線程1  再次釋放鎖");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("線程2 獲取到鎖");

                    lock2.acquire();
                    System.out.println("線程2 再次獲取到鎖");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("線程2 釋放鎖");

                    lock2.release();
                    System.out.println("線程2  再次釋放鎖");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 啟動(dòng)客戶端
        client.start();

        System.out.println("zookeeper 啟動(dòng)成功");
        return client;
    }
}

5. DIY一個(gè)閹割版的分布式鎖

通過這個(gè)實(shí)例對(duì)照第2節(jié)內(nèi)容來理解加解鎖的流程瘪贱,以及如何避免驚群效應(yīng)纱控。

package com.rock.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * zk 分布式鎖 v1版本:
 * 完成功能 :
 *      1. 避免了驚群效應(yīng)
 * 缺失功能:
 *      1. 超時(shí)控制
 *      2. 讀寫鎖
 *      3. 重入控制
 */
public class DistributedLock {

    private String connectString;
    private int sessionTimeout;
    private ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentNode;
    private String LOCK_ROOT_PATH;

    private static String NODE_PREFIX = "w";

    public DistributedLock(String connectString, int sessionTimeout, String lockName) {
        //TODO:數(shù)據(jù)校驗(yàn)
        this.connectString = connectString;
        this.sessionTimeout = sessionTimeout;
        this.LOCK_ROOT_PATH = lockName;
    }


    public void init() throws IOException, KeeperException, InterruptedException {
        // 建聯(lián)
        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // connectLatch  連接上zk后  釋放
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectLatch.countDown();
            }
        });

        connectLatch.await();// 等待zk正常連接后

        // 判斷鎖名稱節(jié)點(diǎn)是否存在
        Stat stat = zk.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            // 創(chuàng)建一下鎖名稱節(jié)點(diǎn)
            try {
                zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                //并發(fā)創(chuàng)建沖突忽略辆毡。
                if (!e.code().name().equals("NODEEXISTS")) {
                    throw e;
                }
            }
        }
    }

    /**
     * 待補(bǔ)充功能:
     * 1. 超時(shí)設(shè)置
     * 2. 讀寫區(qū)分
     * 3. 重入控制
     */
    public void zklock() throws KeeperException, InterruptedException {
        if (!tryLock()) {
            waitLock();
            zklock();
        }
    }

    /**
     *
     */
    private void waitLock() throws KeeperException, InterruptedException {
        try {
            zk.getData(waitPath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    // waitLatch  需要釋放
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                        waitLatch.countDown();
                    }
                }
            }, new Stat());
            // 等待監(jiān)聽
            waitLatch.await();
        } catch (KeeperException.NoNodeException e) {
            //如果等待的節(jié)點(diǎn)已經(jīng)被清除了,不等了,再嘗試去搶鎖
            return;
        }

    }

    private boolean tryLock() throws KeeperException, InterruptedException {

        currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 判斷創(chuàng)建的節(jié)點(diǎn)是否是最小的序號(hào)節(jié)點(diǎn),如果是獲取到鎖甜害;如果不是舶掖,監(jiān)聽他序號(hào)前一個(gè)節(jié)點(diǎn)
        List<String> children = zk.getChildren(LOCK_ROOT_PATH, false);
        // 如果children 只有一個(gè)值,那就直接獲取鎖尔店; 如果有多個(gè)節(jié)點(diǎn)眨攘,需要判斷,誰最小
        if (children.size() == 1) {
            return true;
        } else {
            String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
            // 通過w00000000獲取該節(jié)點(diǎn)在children集合的位置
            int index = children.indexOf(thisNode);
            if (index == 0) {
                //自己就是第一個(gè)節(jié)點(diǎn)
                return true;
            }
            // 需要監(jiān)聽  他前一個(gè)節(jié)點(diǎn)變化
            waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
        }
        return false;
    }


    // 解鎖
    public void unZkLock() {
        // 刪除節(jié)點(diǎn)
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末嚣州,一起剝皮案震驚了整個(gè)濱河市鲫售,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌该肴,老刑警劉巖情竹,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異匀哄,居然都是意外死亡秦效,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門涎嚼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來阱州,“玉大人,你說我怎么就攤上這事铸抑」钡ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵鹊汛,是天一觀的道長(zhǎng)蒲赂。 經(jīng)常有香客問我,道長(zhǎng)刁憋,這世上最難降的妖魔是什么滥嘴? 我笑而不...
    開封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮至耻,結(jié)果婚禮上若皱,老公的妹妹穿的比我還像新娘。我一直安慰自己尘颓,他們只是感情好走触,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著疤苹,像睡著了一般互广。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天惫皱,我揣著相機(jī)與錄音像樊,去河邊找鬼。 笑死旅敷,一個(gè)胖子當(dāng)著我的面吹牛生棍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播媳谁,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼涂滴,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了韩脑?” 一聲冷哼從身側(cè)響起氢妈,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎段多,沒想到半個(gè)月后首量,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡进苍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年加缘,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片觉啊。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡拣宏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出杠人,到底是詐尸還是另有隱情勋乾,我是刑警寧澤,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布嗡善,位于F島的核電站辑莫,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏罩引。R本人自食惡果不足惜各吨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望袁铐。 院中可真熱鬧揭蜒,春花似錦、人聲如沸剔桨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洒缀。三九已至瑰谜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背似舵。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留葱峡,地道東北人砚哗。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像砰奕,于是被迫代替她去往敵國(guó)和親蛛芥。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容