InterProcessMutex Curator 分布式鎖

curator分布式鎖,大概過程:
創(chuàng)建臨時有序節(jié)點(diǎn)上煤,排序祟绊,最先創(chuàng)建節(jié)點(diǎn)的獲取到鎖牧抽,其他節(jié)點(diǎn)監(jiān)聽前一個節(jié)點(diǎn)刪除事件。當(dāng)監(jiān)聽到時扬舒,則重新進(jìn)行排序讲坎,index最小的獲取到鎖。


public class lockService {
    @Autowired
    private ZookeeperDao dao;
    InterProcessMutex lock = new InterProcessMutex(new CuratorClient().getClient(),"/lock");

    public  void loclMethod(){

        try {
            //獲取鎖
            //一直等待鎖
            //lock.acquire();
            //嘗試獲取鎖,如果在指定時間獲取鎖,則返回true
            if (lock.acquire(1000, TimeUnit.SECONDS)){
                int check = dao.check();
                if (check > 0){
                    System.out.println("售出");
                    dao.des(--check);
                }else {
                    System.out.println("沒有庫存");
                }
            }

            //Thread.sleep(50);
        } catch (Exception e) {
            System.out.println(e);
        }finally {
            try {
                //釋放鎖
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
}

interProcessMutex.acquire(1000, TimeUnit.SECONDS)

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
    private final LockInternals internals;
    private final String basePath;

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        //分布式鎖重入次數(shù)
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

    private static final String LOCK_NAME = "lock-";
    
    @Override
    public void acquire() throws Exception
    {  //獲取鎖瓮栗,一直等待
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    //獲取鎖,等待指定時間
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {    //獲取當(dāng)前線程
        Thread currentThread = Thread.currentThread();
        //從ConcurrentMap<Thread, LockData>中獲取當(dāng)前線程的鎖數(shù)據(jù)弥激,不為空微服,則直接獲取鎖
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {    //實(shí)現(xiàn)可重入
            // 統(tǒng)計重入次數(shù)
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //ConcurrentMap<Thread, LockData>中沒有當(dāng)前線程的所數(shù)據(jù)缨历,則當(dāng)前線程嘗試去獲取鎖
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {  //獲取到鎖辛孵,封裝鎖數(shù)據(jù)
            LockData newLockData = new LockData(currentThread, lockPath);
            //保存到ConcurrentMap<Thread, LockData>緩存中
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
}

嘗試獲取鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

public class LockInternals
{
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        //無線等待時millisToWait 為null
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //節(jié)點(diǎn)數(shù)據(jù)
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        //當(dāng)前重試獲取鎖次數(shù)
        int             retryCount = 0;
        // 在Zookeeper中創(chuàng)建的臨時順序節(jié)點(diǎn)的路徑觉吭,相當(dāng)于一把待激活的分布式鎖
        // 激活條件:同級目錄子節(jié)點(diǎn)鲜滩,名稱排序最小(排隊(duì)榜聂,公平鎖)
        String          ourPath = null;
        // 是否已經(jīng)持有分布式鎖
        boolean         hasTheLock = false;
        // 是否已經(jīng)完成嘗試獲取分布式鎖的操作
        boolean         isDone = false;
        while ( !isDone )
        {    
            isDone = true;
            try
            {  //StandardLockInternalsDriver须肆,創(chuàng)建有序臨時節(jié)點(diǎn)
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //是否獲取當(dāng)前鎖
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
        if ( hasTheLock )
        {  //成功獲取鎖
            return ourPath;
        }
        return null;
    }
     //是否獲取當(dāng)鎖
     private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {  //是否擁有分布式鎖
        boolean     haveTheLock = false;
        //是否需要刪除子節(jié)點(diǎn)
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            //循環(huán)嘗試獲取鎖
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {  //排序節(jié)點(diǎn)
                List<String>        children = getSortedChildren();
                // 獲取前面自己創(chuàng)建的臨時順序子節(jié)點(diǎn)的名稱
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1);
                // StandardLockInternalsDriver
                //判斷是否回去鎖,沒有獲取返回監(jiān)聽路徑
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {  //獲得鎖
                    haveTheLock = true;
                }
                else
                {    //沒有所得到鎖泄隔,監(jiān)聽上一個臨時順序節(jié)點(diǎn)
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                    synchronized(this)
                    {
                        try
                        {    //上一個臨時順序節(jié)點(diǎn)如果被刪除拒贱,會喚醒當(dāng)前線程繼續(xù)競爭鎖
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {  //獲取鎖超時,標(biāo)記刪除之前創(chuàng)建的臨時順序節(jié)點(diǎn)
                                    doDelete = true;  
                                    break;
                                }
                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
    //創(chuàng)建有序臨時節(jié)點(diǎn)
     @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
}

StandardLockInternalsDriver.getsTheLock(client, children, sequenceNodeName, maxLeases);

public class StandardLockInternalsDriver implements LockInternalsDriver
{
     @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
    {    // 之前創(chuàng)建的臨時順序節(jié)點(diǎn)在排序后的子節(jié)點(diǎn)列表中的索引
        int             ourIndex = children.indexOf(sequenceNodeName);
         // 校驗(yàn)之前創(chuàng)建的臨時順序節(jié)點(diǎn)是否有效
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases 初始化為1
        //ourIndex 為0 表示是當(dāng)前排序的節(jié)點(diǎn)里最先創(chuàng)建出節(jié)點(diǎn)的連接佛嬉,也就是越早創(chuàng)建節(jié)點(diǎn)的越早獲取到鎖
        boolean         getsTheLock = ourIndex < maxLeases;
        //獲取到鎖則不需要監(jiān)聽逻澳,沒有獲取到鎖,則監(jiān)聽前一個節(jié)點(diǎn)的刪除事件
        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
         // 返回獲取鎖的結(jié)果暖呕,交由上層繼續(xù)處理(添加監(jiān)聽等操作)
        return new PredicateResults(pathToWatch, getsTheLock);
    }
//校驗(yàn)節(jié)點(diǎn)有效性
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
    {
        if ( ourIndex < 0 )
        {     // 由于會話過期或連接丟失等原因斜做,該線程創(chuàng)建的臨時順序節(jié)點(diǎn)被Zookeeper服務(wù)端刪除,往外拋出NoNodeException
            throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
        }
    }
}

釋放鎖

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
{
      @Override
    public void release() throws Exception
    {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {  // 無法從映射表中獲取鎖信息湾揽,不持有鎖
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {  // 鎖是可重入的,初始值為1钝腺,原子-1到0抛姑,鎖才釋放
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {    //釋放鎖
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {    //從映射中刪除當(dāng)前線程信息
            threadData.remove(currentThread);
        }
    }
}


public class LockInternals
{
       final void releaseLock(String lockPath) throws Exception
    {    //刪除監(jiān)聽
        client.removeWatchers();
        revocable.set(null);
        //刪除節(jié)點(diǎn)
        deleteOurPath(lockPath);
    }

    private void deleteOurPath(String ourPath) throws Exception
    {
        try
        {    // 后臺不斷嘗試刪除
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市艳狐,隨后出現(xiàn)的幾起案子定硝,更是在濱河造成了極大的恐慌,老刑警劉巖毫目,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔬啡,死亡現(xiàn)場離奇詭異侥加,居然都是意外死亡翩肌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進(jìn)店門怔檩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刮便,“玉大人空猜,你說我怎么就攤上這事。” “怎么了辈毯?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵坝疼,是天一觀的道長。 經(jīng)常有香客問我谆沃,道長钝凶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任唁影,我火速辦了婚禮耕陷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘据沈。我一直安慰自己哟沫,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布卓舵。 她就那樣靜靜地躺著南用,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掏湾。 梳的紋絲不亂的頭發(fā)上裹虫,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天,我揣著相機(jī)與錄音融击,去河邊找鬼筑公。 笑死,一個胖子當(dāng)著我的面吹牛尊浪,可吹牛的內(nèi)容都是我干的匣屡。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼拇涤,長吁一口氣:“原來是場噩夢啊……” “哼捣作!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鹅士,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤券躁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后掉盅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體也拜,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年趾痘,在試婚紗的時候發(fā)現(xiàn)自己被綠了慢哈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡永票,死狀恐怖卵贱,靈堂內(nèi)的尸體忽然破棺而出滥沫,到底是詐尸還是另有隱情,我是刑警寧澤艰赞,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布佣谐,位于F島的核電站,受9級特大地震影響方妖,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜罚攀,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一党觅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧斋泄,春花似錦杯瞻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至募胃,卻和暖如春旗唁,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背痹束。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工检疫, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人祷嘶。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓屎媳,卻偏偏與公主長得像,于是被迫代替她去往敵國和親论巍。 傳聞我的和親對象是個殘疾皇子烛谊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容