curator 介紹 party1

在如今服務(wù)器的開發(fā)與部署時(shí)瞭吃,往往考慮的不是單機(jī)服務(wù)的承載力了郁副,而是更高一階,如何設(shè)計(jì)出高可用笼才,高負(fù)載尾膊,高容量的服務(wù)架構(gòu)媳危。并且業(yè)務(wù)開發(fā)不是簡(jiǎn)單的將所有業(yè)務(wù)糅合在單臺(tái)服務(wù)上,而是分模塊冈敛,分功能分而治理待笑,才有了微服務(wù)架構(gòu)。然后微服務(wù)設(shè)計(jì)中的每一個(gè)模塊都是可以設(shè)計(jì)為一個(gè)集群抓谴,例如常用的暮蹂,用戶模塊,權(quán)限模塊癌压,商品模塊仰泻,訂單模塊,支付模塊滩届,供應(yīng)鏈等等我纪。服務(wù)器的設(shè)計(jì)越來越復(fù)雜,對(duì)于開發(fā)人員的技術(shù)能力提出更高要求丐吓。
先如今微服務(wù)浅悉,分布式,集群的架構(gòu)思想中券犁,zookeeper成為了大多首選并且非常重要的中間件术健,例如我們熟悉的Hadoop,dubbo中粘衬,都出現(xiàn)了zookeeper 荞估。zookeeper有很多特性咳促,例如有注冊(cè)訂閱,分布式鎖勘伺,隊(duì)列等等功能跪腹。但是zookeeper中提供的java api 并不是很友好,使用起來容易踩坑飞醉。例如創(chuàng)建path時(shí)冲茸,需要判斷path的parent是否存在,必須先創(chuàng)建parent path才能創(chuàng)建子路徑缅帘。還有在添加watcher事件時(shí)轴术,一旦該事件觸發(fā)一次后,如果沒有主動(dòng)將事件重新設(shè)置钦无,他不會(huì)收到第二次逗栽。還有其他一些不太友好的api開發(fā)就不在贅述。
所以才引入了curator工具失暂,他實(shí)際是更高級(jí)的api彼宠,使用起來更加方便。但內(nèi)部核心也是使用zookeeper提供的api弟塞,只是在開發(fā)中不那么繁瑣而已凭峡。
舉個(gè)創(chuàng)建path例子:

  public void createPath() {
        String host = "127.0.0.1:2181";
        String path = root + "/my_path";
        CuratorFramework curator = CuratorClient.create(host);
        try {
            String last = curator.create().creatingParentsIfNeeded().forPath(path, "123".getBytes());
            logger.info("創(chuàng)建路徑完成 " + last);
        } catch (Exception e) {
            e.printStackTrace();
            logger.info("創(chuàng)建路徑失敗 異常類型:" + e.getClass().getName() + ", message:" + e.getMessage());
        }
    }

創(chuàng)建路徑,是不是很簡(jiǎn)單宣肚,不需要關(guān)心root是否已經(jīng)創(chuàng)建想罕。curator自己會(huì)去做驗(yàn)證判斷是否需要?jiǎng)?chuàng)建root路徑悠栓。然而霉涨,我們?cè)偕A一下,既然zookeeper提供了很多特性惭适,那么curator是否也能足夠支撐呢笙瑟?在curator組件中,recipes模塊中癞志,可以了解到很多有意思的地方:


image.png

可以看到他提供了很多功能往枷,例如原子計(jì)算,柵欄凄杯,緩存错洁,選舉,鎖戒突,隊(duì)列等屯碴,提供了很豐富的功能。 那么我們來分析一下 curator如何利用zookeeper的特性膊存,實(shí)現(xiàn)這些功能的导而。

首先需要了解一些基本嘗試忱叭,例如zookeeper中Watcher有哪些事件

public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);
}

包含了節(jié)點(diǎn)創(chuàng)建,節(jié)點(diǎn)刪除今艺,節(jié)點(diǎn)數(shù)據(jù)變革和子節(jié)點(diǎn)變更韵丑,這些是zookeeper自己的watcher事件類型。
那么curator組件還會(huì)提出哪些自己的事件呢虚缎?

public enum CuratorEventType
{
    /**
     * Corresponds to {@link CuratorFramework#create()}
     */
    CREATE,

    /**
     * Corresponds to {@link CuratorFramework#delete()}
     */
    DELETE,

    /**
     * Corresponds to {@link CuratorFramework#checkExists()}
     */
    EXISTS,

    /**
     * Corresponds to {@link CuratorFramework#getData()}
     */
    GET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#setData()}
     */
    SET_DATA,

    /**
     * Corresponds to {@link CuratorFramework#getChildren()}
     */
    CHILDREN,

  //....后面還有很多 事件
}

這些事件與zookeeper沒有直接關(guān)系撵彻,而是curator通過調(diào)用相應(yīng)api后,會(huì)觸發(fā)相應(yīng)的事件遥巴,例如調(diào)用create()方法千康,會(huì)觸發(fā)CREATE事件。如果調(diào)用checkExists方法铲掐,會(huì)觸發(fā)EXISTES事件拾弃。

cache包

該包內(nèi)主要熟悉NodeCache和PathChildrenCache

? NodeCache,是指可以從本地cache中得到節(jié)點(diǎn)數(shù)據(jù)摆霉,并且該node可以增加watcher事件豪椿,例如節(jié)點(diǎn)的 更新/創(chuàng)建/刪除。然后重新拉取數(shù)據(jù)携栋,然后通過本地注冊(cè)的listeners搭盾,他們會(huì)得到變更通知。

    private final CuratorFramework client;
    private final String path;
    private final boolean dataIsCompressed;
    private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    private final AtomicBoolean isConnected = new AtomicBoolean(true);
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener();
    private Watcher watcher = new Watcher();

這些是NodeCache的基本屬性婉支,
listeners是存儲(chǔ)了節(jié)點(diǎn)緩存變更的監(jiān)聽器鸯隅。
data是當(dāng)前節(jié)點(diǎn)的存儲(chǔ)的數(shù)據(jù),從zookeeper節(jié)點(diǎn)上緩存在本地的數(shù)據(jù)
connectionStateListener是連接狀態(tài)變更監(jiān)聽器向挖,例如重連蝌以,掉線等事件
watcher就是與zookeeper中的一樣,針對(duì)路徑進(jìn)行監(jiān)聽何之。

    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
    {
        this.client = client;
        this.path = PathUtils.validatePath(path);
        this.dataIsCompressed = dataIsCompressed;
    }

普通的構(gòu)造器跟畅,最重要的是一個(gè)path路徑和client,當(dāng)聲明對(duì)象后溶推,就要啟動(dòng)該NodeCache徊件。

    public void     start(boolean buildInitial) throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        client.getConnectionStateListenable().addListener(connectionStateListener);

        if ( buildInitial )
        {
            client.checkExists().creatingParentContainersIfNeeded().forPath(path);
            internalRebuild();
        }
        reset();
    }

首先會(huì)將connectionStateListener狀態(tài)監(jiān)聽器添加到client狀態(tài)監(jiān)聽列表中。如果buildInitial=true蒜危,需要初始化虱痕,那么嘗試創(chuàng)建parent,然后獲取zk上的節(jié)點(diǎn)數(shù)據(jù)辐赞。最終執(zhí)行reset方法部翘。


    private void     reset() throws Exception
    {
        if ( (state.get() == State.STARTED) && isConnected.get() )
        {
            client.checkExists().creatingParentContainersIfNeeded()
              .usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }

reset方法其實(shí)將watcher添加path路徑中,并且針對(duì)checkExist方法增加回調(diào)方法backgroundCallback占拍,那么該回調(diào)拿到的CuratorEvent事件肯定是EXIST事件略就。其實(shí)rest并沒有獲取節(jié)點(diǎn)數(shù)據(jù)捎迫。
看一下最終調(diào)用方法processBackgroundResult()方法:

    private void processBackgroundResult(CuratorEvent event) throws Exception
    {
        switch ( event.getType() )
        {
            case GET_DATA:
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }

            case EXISTS:
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    setNewData(null);
                }
                else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    if ( dataIsCompressed )
                    {                       
                          client.getData().decompressed().usingWatcher(watcher)
                                                 .inBackground(backgroundCallback).forPath(path);
                    }
                    else {
                        client.getData().usingWatcher(watcher)
                                  .inBackground(backgroundCallback).forPath(path);           
                    }
                }
                break;
            }
        }
    }

當(dāng)限制任然是EXISTS事件時(shí),判斷是否有節(jié)點(diǎn)表牢,如果沒有則setNewData方法窄绒,即設(shè)置Node的data數(shù)據(jù)為空崔兴。那么如果存在節(jié)點(diǎn)敲茄,他然后沒有去主動(dòng)獲取得到data數(shù)據(jù)位谋,怎么做的?看一下他執(zhí)行了getData()方法掏父,并且添加了watcher事件秆剪,但是任然通過回調(diào)方法赊淑,那么此時(shí)回調(diào)方法是GET_DATA事件了。最終processBackgroundResult方法是執(zhí)行了case GET_DATA這塊代碼陶缺。因?yàn)榇藭r(shí)發(fā)起獲取數(shù)據(jù)時(shí)饱岸,會(huì)將數(shù)據(jù)添加到CuratorEvent中徽千,此時(shí)生成了ChildData對(duì)象,包括了path罐栈,stat和節(jié)點(diǎn)數(shù)據(jù)等信息荠诬。
考慮一下zookeeper中Watcher是什么時(shí)候才能觸發(fā)事件柑贞,當(dāng)然是節(jié)點(diǎn)刪除聂抢,更新琳疏,或者創(chuàng)建才會(huì)發(fā)起,那么怎么才能使用在NodeCache中新荤。再來看一下NodeCache自帶的屬性watcher實(shí)現(xiàn)類

    private Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                reset();
            }
            catch(Exception e)
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

其實(shí)他任然調(diào)用reset方法苛骨,就是這么簡(jiǎn)單苟呐。還是先是發(fā)起EXISTS事件牵素,然后GET_DATA事件笆呆。但是考慮清楚,watcher事件是一次性觸發(fā)功能单起,不會(huì)執(zhí)行第二次嘀倒,所以在reset中测蘑,都會(huì)對(duì)path添加watcher事件康二。
那么NodeCache節(jié)點(diǎn)變更沫勿,如何通知添加在監(jiān)聽容器內(nèi)的監(jiān)聽器的产雹?

    private void setNewData(ChildData newData) throws InterruptedException
    {
        ChildData   previousData = data.getAndSet(newData);
        if ( !Objects.equal(previousData, newData) )
        {
            listeners.forEach
            (
                new Function<NodeCacheListener, Void>()
                {
                    @Override
                    public Void apply(NodeCacheListener listener)
                    {
                        try
                        {
                            listener.nodeChanged();
                        }
                        catch ( Exception e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error("Calling listener", e);
                        }
                        return null;
                    }
                }
            );
//.....
    }

在setNewData方法中蔓挖,通過與原子應(yīng)用的data中之前存儲(chǔ)的previousData比較瘟判,如果不同。則那么需要遍歷容器內(nèi)的監(jiān)聽器了开镣,最終執(zhí)行nodeChanged方法咽扇。

? PathChildrenCache 子路徑緩存
考慮一下质欲,既然curator能在集群中使用嘶伟,那么舉個(gè)最簡(jiǎn)單的例子九昧,在集群中,增加或者減少服務(wù)癌别,需要及時(shí)發(fā)現(xiàn)才能防止繼續(xù)調(diào)用該服務(wù)展姐。那么在curator如何使用圾笨?當(dāng)然在集群中同等服務(wù)功能中每臺(tái)服務(wù)都是作為一個(gè)節(jié)點(diǎn)角色使用的擂达,那好胶滋,只要監(jiān)聽節(jié)點(diǎn)的變化例如節(jié)點(diǎn)移除镀钓,或者節(jié)點(diǎn)增加了丁溅。就能知道服務(wù)集群中的變更窟赏,那么節(jié)點(diǎn)該有哪些標(biāo)識(shí)涯穷,可以用服務(wù)器的ip和端口組成唯一標(biāo)識(shí)。
所以作煌,引申出來幾個(gè)概念粟誓,
1.需要監(jiān)聽的節(jié)點(diǎn)都是某個(gè)業(yè)務(wù)下parent的子節(jié)點(diǎn)children鹰服,2.針對(duì)添加子節(jié)點(diǎn)悲酷,任然必須在parent下變更设易;
在curator中引入了子節(jié)點(diǎn)管理的幾個(gè)事件
PathChildrenCacheEvent下的Type類型:

    public enum Type
    {
        /**
         * A child was added to the path
         */
        CHILD_ADDED,

        /**
         * A child's data was changed
         */
        CHILD_UPDATED,

        /**
         * A child was removed from the path
         */
        CHILD_REMOVED,
        //還有其他事件
}

例如子節(jié)點(diǎn)新增亡嫌,變更掘而,刪除等其他事件袍睡。在curator采用了大量的異步調(diào)用線程斑胜,并且在PathChildrenCache中通過推送事件方式通知節(jié)點(diǎn)狀態(tài)變更的止潘。
RefreshOperation 刷新事件凭戴,主要調(diào)用PathChildrenCache中refresh方法;
GetDataOperation 獲取節(jié)點(diǎn)數(shù)據(jù)事件者冤,主要調(diào)用getDataAndStat方法()涉枫,異步方式得到節(jié)點(diǎn)數(shù)據(jù)
EventOperation 推送事件愿汰,推送給記錄在事件容器中的監(jiān)聽器衬廷,發(fā)起childEvent方法
那么在PathChildrenCache 分為2種形式的基本路徑泵督,parentPath路徑小腊,和childPath路徑秩冈,在zk中入问,已經(jīng)提過,當(dāng)對(duì)parentPath進(jìn)行監(jiān)聽楣黍,如果parentPath新增節(jié)點(diǎn)租漂,就會(huì)觸發(fā)children事件哩治。所以PathChildrenCache也是利用了這點(diǎn)业筏。
在refresh方法中

    void refresh(final RefreshMode mode) throws Exception
    {
        ensurePath();

        final BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
                    // This ship is closed, don't handle the callback
                    return;
                }
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    processChildren(event.getChildren(), mode);
                }
            }
        };

        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
    }

對(duì)client添加了childrenWatcher蒜胖,然后內(nèi)部有個(gè)回調(diào)類callback翠勉,他接受的CuratorEventType類型肯定是CHILDREN事件对碌。在processChildren方法中朽们,

    private void processChildren(List<String> children, RefreshMode mode) throws Exception
    {
        Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
        for ( String child : children ) {
            removedNodes.remove(ZKPaths.makePath(path, child));
        }

        for ( String fullPath : removedNodes )
        {
            remove(fullPath);
        }

        for ( String name : children )
        {
            String fullPath = ZKPaths.makePath(path, name);

            if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
            {
                getDataAndStat(fullPath);
            }

            updateInitialSet(name, NULL_CHILD_DATA);
        }
        maybeOfferInitializedEvent(initialSet.get());
    }

其中children 是當(dāng)前parent地下所有的子路徑的名字(不是完整的路徑)。與本地記錄的當(dāng)前數(shù)據(jù)苍糠,比較出需要移除的節(jié)點(diǎn)岳瞭,發(fā)送EventOperation事件中的子節(jié)點(diǎn)移除事件瞳筏。然后通過RefreshMode模式或者當(dāng)前currentData中沒有保護(hù)子節(jié)點(diǎn)的全路徑姚炕,那么需要獲取數(shù)據(jù)柱宦。
在getDataAndStat()方法中

    void getDataAndStat(final String fullPath) throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
            }
        };

        if ( USE_EXISTS && !cacheData )
        {
            client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
        }
        else
        {
            // always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            if ( dataIsCompressed && cacheData )
            {
                client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
            }
            else
            {
                client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
            }
        }
    }

通過是否需要保持節(jié)點(diǎn)數(shù)據(jù)執(zhí)行相應(yīng)的方法摊沉,但是這里都有個(gè)共同的是说墨,對(duì)該子節(jié)點(diǎn)全路徑添加dataWatcher事件尼斧,那么該路徑的刪除或者變更棺棵,都會(huì)通知到dataWatcher事件中。
在PathChildrenCache中存在2中Watcher事件對(duì)象

    private volatile Watcher childrenWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };

    private volatile Watcher dataWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                if ( event.getType() == Event.EventType.NodeDeleted )
                {
                    remove(event.getPath());
                }
                else if ( event.getType() == Event.EventType.NodeDataChanged )
                {
                    offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

有針對(duì)parent路徑添加childrenWatcher事件,基本上子節(jié)點(diǎn)變更苹熏,都會(huì)觸發(fā)轨域,然后通過刷新事件干发,異步方式重新執(zhí)行refresh方法枉长。
由這對(duì)具體的child路徑田間dataWatcher事件必峰,主要是子節(jié)點(diǎn)路徑刪除自点,或者數(shù)據(jù)變更桂敛,做出相應(yīng)的移除事件或者獲取數(shù)據(jù)事件動(dòng)作术唬。
熟悉了PathChildrenPath的工作原理粗仓,那么在工作中如何整合借浊。首先我們要聲明自己的PathChildrenCacheListener 監(jiān)聽器實(shí)現(xiàn)類蚂斤,有了他曙蒸,才能知道節(jié)點(diǎn)的變更情況。 假設(shè)有個(gè)業(yè)務(wù)功能是多組服務(wù)器支撐提供肖油,需要保證他能可動(dòng)態(tài)調(diào)整服務(wù)器資源森枪。那么上游調(diào)用者就可以通過PathChildrenPath工具監(jiān)聽當(dāng)前提供服務(wù)器組有哪些凶朗,而不用實(shí)時(shí)關(guān)心棚愤,在發(fā)起調(diào)用時(shí),去判斷當(dāng)前存在的服務(wù)器信息了瘸洛。

locks包

在分布式系統(tǒng)中反肋,如果要使用公用某一資源時(shí)候石蔗,往往會(huì)申請(qǐng)一個(gè)分布式鎖养距。curator也提供了分布式鎖,利用了zk的特性耘纱。使用方式很簡(jiǎn)單:

    public void testDistributeLock() throws Exception {
        String host = "127.0.0.1:2181";
        String path = root + "/lock_test";
        CuratorFramework curator = CuratorClient.create(host);
        InterProcessMutex mutex = new InterProcessMutex(curator, path);
        if(mutex.acquire(10, TimeUnit.SECONDS)) {
            try {
                // 業(yè)務(wù)邏輯
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                mutex.release();
            }
        }
    }

path就是比作資源束析,鎖針對(duì)path就行資源獲取畸陡,然后執(zhí)行業(yè)務(wù)邏輯丁恭,最終都需要release鎖資源牲览。觀察一下InterProcessMutex工作原理第献。
首先通過acquire方法了解internalLock方法

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

為什么通過當(dāng)前線程得知鎖的狀態(tài)數(shù)據(jù)仔拟,加入從threadData拿到了lockData數(shù)據(jù)利花,說明在這個(gè)線程之前就已經(jīng)獲取鎖資源了炒事,如果重復(fù)獲取同一個(gè)鎖挠乳,那么只要記錄lockCount數(shù)量即可睡扬。當(dāng)前線程沒有保存的鎖資源威蕉,需要通過internals內(nèi)置鎖工具嘗試獲取鎖韧涨,最終得到一個(gè)lockPath,然后進(jìn)行封裝成LockData保存在threadData中宪哩,沒有返回路徑锁孟,說明獲取鎖失敗了。
看一下LockInternals 類attemptLock方法甜熔,是如何嘗試獲取鎖

    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
    {
        this.driver = driver;
        this.lockName = lockName;
        this.maxLeases = maxLeases;

        this.client = client;
        this.basePath = PathUtils.validatePath(path);
        this.path = ZKPaths.makePath(path, lockName);
    }

在構(gòu)造器中腔稀,有兩個(gè)路徑焊虏,basePath基本路徑诵闭,還有path這個(gè)路徑是通過basePath與lockName(默認(rèn)名字為lock-)組合起來涂圆,說明這里的path是basePath的子節(jié)點(diǎn)路徑模狭。還有一個(gè)參數(shù) driver嚼鹉,默認(rèn)通過StandardLockInternalsDriver類實(shí)現(xiàn)的锚赤,該類主要負(fù)責(zé)創(chuàng)建路徑线脚,判斷是否能獲取鎖浑侥。看一下StandardLockInternalsDriver創(chuàng)建路徑代碼:

    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
                  .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
                  .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

其中path路徑時(shí)基本路徑與lockName組合起來的最終路徑名稱寓落,但是創(chuàng)建的時(shí)候括丁,采用了EPHEMERAL_SEQUENTIAL模式得到的路徑,首先路徑是非永久狀態(tài)存儲(chǔ)的伶选,如果連接端口史飞,該ourPath就會(huì)刪除尖昏。然后還有特點(diǎn)是有序的,就是ourPath的路徑是path路徑與順序編號(hào)組合在一起的构资,并且是有序遞增編號(hào)的路徑,例如 /test/lock-000001,test/lock-000002蚯窥。每次創(chuàng)建都會(huì)增加編號(hào)掸鹅,而且不會(huì)重復(fù),這是zookeeper中一個(gè)特性拦赠。
再來熟悉LockInternals中的attemptLock方法巍沙。

    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        while ( !isDone )
        {
            isDone = true;
            try {
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e ) {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                    isDone = false;
                }
                else  {
                    throw e;
                }
            }
        }

        if ( hasTheLock ) {
            return ourPath;
        }

        return null;
    }

為什么在一個(gè)循環(huán)體內(nèi),是為了容錯(cuò)荷鼠,在創(chuàng)建ourPath失敗時(shí)句携,進(jìn)行重復(fù)嘗試。通過driver創(chuàng)建了一個(gè)非持久的并且有序編號(hào)的ourPath路徑允乐,那么考慮一下矮嫉,因?yàn)槁窂綍r(shí)的編號(hào)是遞增的,那么編號(hào)越小牍疏,那么他獲得鎖的概率應(yīng)該是最大的蠢笋,因?yàn)樗亲钤鐒?chuàng)建路徑,也就分配的編號(hào)小了鳞陨。當(dāng)定義完這個(gè)獲取鎖的規(guī)則后昨寞,后續(xù)就方便很多了。
在internalLockLoop方法中厦滤,如何與等待時(shí)間相結(jié)合援岩,當(dāng)獲得鎖后,就可以成功呢掏导?

    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try {
            if ( revocable.get() != null ) {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() ) {
                    haveTheLock = true;
                }
                else {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this) {
                        try  {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null ) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 ) {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                wait(millisToWait);
                            }
                            else {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )  {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

在while循環(huán)體內(nèi)享怀,獲取了所有basePath下的children節(jié)點(diǎn)名稱,并且進(jìn)行從小到大編號(hào)排序趟咆。然后通過driver中的getsTheLock方法得到我節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)名稱添瓷,如果不存在,說明我的節(jié)點(diǎn)是第一個(gè)忍啸,那么我就能獲取鎖仰坦。
如果存在前一個(gè)節(jié)點(diǎn),構(gòu)建完整的路徑previousSequencePath计雌,并對(duì)該路徑進(jìn)行監(jiān)聽悄晃,增加watcher事件。為什么要這么?還是那個(gè)問題妈橄,編號(hào)是有序遞增的庶近,只有當(dāng)我前一個(gè)節(jié)點(diǎn)釋放鎖了,下一個(gè)是我,我就能得到鎖。那么前一個(gè)節(jié)點(diǎn)如何釋放鎖册烈,可以主動(dòng)刪除節(jié)點(diǎn)砚婆,或者掉線系統(tǒng)自動(dòng)刪除潮模。在對(duì)previousSequencePath添加watcher事件后,進(jìn)入等待,那么當(dāng)前線程等待時(shí)通過誰來喚醒呢?當(dāng)然是通過watcher來喚醒投队,通過調(diào)用notifyAll方式喚醒線程,然后重新執(zhí)行循環(huán)爵川,知道超時(shí)敷鸦,或者得到鎖。這里有個(gè)需要考慮寝贡,在超時(shí)時(shí)扒披,將doDelete標(biāo)記為刪除,然后再finally方法中通過這個(gè)狀態(tài)去刪除ourPath節(jié)點(diǎn)圃泡,為什么要這樣呢碟案?因?yàn)槌瑫r(shí)情況下,認(rèn)定是沒有獲取鎖洞焙,但是路徑我已經(jīng)創(chuàng)建了蟆淀,如果不去主動(dòng)刪除,那么他會(huì)一直占用澡匪,在ourPath后面的路徑就會(huì)一直等著他主動(dòng)刪除。在考慮一下褒链,這里為什么會(huì)存在 KeeperException.NoNodeException異常呢唁情?因?yàn)樵趯?duì)previousSequencePath進(jìn)行監(jiān)聽時(shí),假設(shè)這個(gè)鎖剛好釋放了甫匹,已經(jīng)刪除了previousSequencePath路徑甸鸟,那么當(dāng)前去監(jiān)聽時(shí),路徑就會(huì)不存在兵迅,然后會(huì)拋出節(jié)點(diǎn)不存在的異常抢韭。
這是一個(gè)完整的獲取鎖的流程,也很嚴(yán)謹(jǐn)?shù)奶幚砀鞣N出現(xiàn)異常時(shí)的邏輯恍箭。當(dāng)然獲取鎖刻恭,用完就要進(jìn)行釋放。
在Mutex中的release方法中

    public void release() throws Exception
    {        
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )   {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }

        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 ) {
            return;
        }
        if ( newLockCount < 0 ) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

首先釋放時(shí),從LockData中對(duì)lockCount進(jìn)行扣減鳍贾,如果任然大于0鞍匾,就return了,表示該鎖沒有用完骑科。最終通過internals執(zhí)行releaseLock方法橡淑,然后移除掉threadData中的currentThread數(shù)據(jù),在LockInternal中釋放就很簡(jiǎn)單了咆爽,就是調(diào)用刪除路徑功能梁棠,達(dá)到釋放資源效果。那么監(jiān)聽此時(shí)的lockPath時(shí)斗埂,就能監(jiān)聽到刪除事件掰茶,就會(huì)獲取鎖。
在java并發(fā)包中存在讀寫鎖蜜笤,那么在curator中也存在這樣的讀寫分布式鎖-InterProcessReadWriteLock濒蒋。
大致與InterProcessMutex思想一致的,但是內(nèi)部有兩個(gè)InterProcessMutex組成把兔,一個(gè)是readMutex沪伙,一個(gè)是writeMutex。與Java中的ReentrantReadWriteLock一樣县好,如果當(dāng)前是read獲取到了資源围橡,那么另外一個(gè)read線程也能獲取資源,都是read資源是不進(jìn)行互斥的缕贡,但是如果有write資源翁授,那么就會(huì)互斥。寫與寫資源也是存在互斥的晾咪。所以InterProcessReadWriteLock是如何實(shí)現(xiàn)功能的收擦?
無論read,write鎖谍倦,他們的basePath肯定是一致的塞赂,而且在zk中,創(chuàng)建子節(jié)點(diǎn)為序列化的時(shí)候昼蛀,不會(huì)因?yàn)樽庸?jié)點(diǎn)的名稱不一樣宴猾,編號(hào)會(huì)重置。而且同等對(duì)待叼旋,編號(hào)永遠(yuǎn)是有序遞增的仇哆。那么好了,writeMutex就是互斥鎖夫植,與什么請(qǐng)求資源無關(guān)讹剔,readMutex只要判斷在之前的節(jié)點(diǎn)中存在write路徑,那么就需要等待。那么怎么判斷呢辟拷。在StandardLockInternalsDriver中有個(gè)getsTheLock方法撞羽,該方法返回的PredicateResults結(jié)果才能知道是否能拿到鎖,或者對(duì)前一個(gè)路徑進(jìn)行監(jiān)聽衫冻。
在readMutex鎖重新了該方法

    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
    {
        if ( writeMutex.isOwnedByCurrentThread() )
        {
            return new PredicateResults(null, true);
        }

        int         index = 0;
        int         firstWriteIndex = Integer.MAX_VALUE;
        int         ourIndex = -1;
        for ( String node : children )
        {
            if ( node.contains(WRITE_LOCK_NAME) )
            {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            }
            else if ( node.startsWith(sequenceNodeName) )
            {
                ourIndex = index;
                break;
            }

            ++index;
        }

        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

        boolean     getsTheLock = (ourIndex < firstWriteIndex);
        String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }

首先判斷是否是重復(fù)鎖诀紊,然后查詢第一個(gè)firstWriteIndex寫路徑的位置,與ourIndex自己的位置隅俘,進(jìn)行比較邻奠。如果ourIndex小,那么就可以獲得鎖了为居,如果大碌宴,那么需要監(jiān)聽firstWriteIndex的對(duì)應(yīng)的路徑了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蒙畴,一起剝皮案震驚了整個(gè)濱河市贰镣,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌膳凝,老刑警劉巖碑隆,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蹬音,居然都是意外死亡上煤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門著淆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來劫狠,“玉大人,你說我怎么就攤上這事永部《琅ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵扬舒,是天一觀的道長(zhǎng)阐肤。 經(jīng)常有香客問我,道長(zhǎng)讲坎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任愧薛,我火速辦了婚禮晨炕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘毫炉。我一直安慰自己瓮栗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著费奸,像睡著了一般弥激。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上愿阐,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天微服,我揣著相機(jī)與錄音,去河邊找鬼缨历。 笑死以蕴,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的辛孵。 我是一名探鬼主播丛肮,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼魄缚!你這毒婦竟也來了宝与?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤冶匹,失蹤者是張志新(化名)和其女友劉穎习劫,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體徙硅,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡榜聂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嗓蘑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片须肆。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖桩皿,靈堂內(nèi)的尸體忽然破棺而出豌汇,到底是詐尸還是另有隱情,我是刑警寧澤泄隔,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布拒贱,位于F島的核電站,受9級(jí)特大地震影響佛嬉,放射性物質(zhì)發(fā)生泄漏逻澳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一暖呕、第九天 我趴在偏房一處隱蔽的房頂上張望斜做。 院中可真熱鬧,春花似錦湾揽、人聲如沸瓤逼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽霸旗。三九已至贷帮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間诱告,已是汗流浹背撵枢。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蔬啡,地道東北人诲侮。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像箱蟆,于是被迫代替她去往敵國和親沟绪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容