zookeeper源碼分析(9)-Curator相關介紹

zookeeper常用的Java客戶端有三種:zookeeper原生的澎蛛、Apache Curator、開源的zkclient躏鱼。Curator官網(wǎng)上這么說

image

一般生產(chǎn)環(huán)境我們使用curator氮采,它主要解決了三類問題:
1.封裝ZooKeeper client與ZooKeeper server之間的連接處理,當會話超時時可自動重連染苛。
2.提供了一套流式風格的操作API
3.提供ZooKeeper各種分布式協(xié)調(diào)應用場景(recipe, 比如leader選舉鹊漠,分布式鎖主到,分布式緩存等
)的抽象封裝。

本文基于3.1.0版本結(jié)合curator的使用簡要介紹curator的啟動加載,會話管理躯概,通知方式recipe功能的實現(xiàn)登钥。

curator的用法

curator組件如下:


其中curator-recipes是建立在Curator Framework之上實現(xiàn)的,提供了zookeeper分布式協(xié)調(diào)相關的技巧娶靡,大多時候我們只需要依賴這一個jar包即可牧牢。

啟動加載

//1.初始化過程
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc
                .build();
//2.啟動過程
client.start();

可分為初始化過程和啟動過程。
初始化過程
通過以上builder模式即可創(chuàng)建一個CuratorFrameworkImpl客戶端實例姿锭,初始化方法主要為:

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
//定義創(chuàng)建原生客戶端實例zookeeper的工廠方法
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//zookeeper的包裝類塔鳍,可處理curator較低層次的會話保持和同步請求等
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );
//用于判斷連接斷開和連接超時的狀態(tài),設置curator的連接狀態(tài)呻此,并通過connectionStateManager觸發(fā)連接事件狀態(tài)通知
        internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
//接收事件的通知轮纫。后臺線程操作事件和連接狀態(tài)事件會觸發(fā)
        listeners = new ListenerContainer<CuratorListener>();
//當后臺線程發(fā)生異常或者handler發(fā)生異常的時候會觸發(fā)
        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
//后臺線程執(zhí)行的操作隊列
        backgroundOperations = new DelayQueue<OperationAndData<?>>();
//命名空間
        namespace = new NamespaceImpl(this, builder.getNamespace());
    //線程工廠方法焚鲜,初始化后臺線程池時會使用
        threadFactory = getThreadFactory(builder);
//負責連接狀態(tài)變化時的通知
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
//CuratorFrameworkImpl的狀態(tài)蜡感,調(diào)用start方法之前為 LATENT,調(diào)用start方法之后為 STARTED ,調(diào)用close()方法之后為STOPPED
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
//錯誤連接策略
        connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
//有保障的執(zhí)行刪除操作恃泪,其實是不斷嘗試直到刪除成功郑兴,通過遞歸調(diào)用實現(xiàn)
        failedDeleteManager = new FailedDeleteManager(this);
//有保障的執(zhí)行刪除watch操作
        failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
//服務端可用節(jié)點的檢測器,第一次連接和重連成功之后都會觸發(fā)重新獲取服務端列表
        ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
    }

可以看出贝乎,主要初始化了zookeeper客戶端包裝實例CuratorZookeeperClient情连,與后臺操作,連接事件览效,異常相關的listener容器却舀,命名空間和負載均衡等。這些都是與curator 功能密切相關的實現(xiàn)锤灿。這里具體看下CuratorZookeeperClientConnectionStateManager的初始化過程挽拔。
CuratorZookeeperClient初始化

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
//3.0.0版本后默認為StandardInternalConnectionHandler,之前session Expired是由服務端告知才會觸發(fā)Expired事件但校,
//StandardConnectionHandler當收到Disconnect事件后螃诅,如果在規(guī)定時間內(nèi)沒有重連到服務器,則會主動觸發(fā)Expired事件
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( sessionTimeoutMs < connectionTimeoutMs )
        {
            log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }
//重新嘗試連接的策略
        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
        this.connectionTimeoutMs = connectionTimeoutMs;
//curator注冊到原生客戶端上的defaultWatcher,會收到和連接狀態(tài)有關的事件通知等状囱,負責超時重連
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        setRetryPolicy(retryPolicy);
    }

可以看出他主要負責了連接的創(chuàng)建和保證連接正常术裸,此外如果直接同步調(diào)用客戶端與服務端操作,他也根據(jù)retryPolicy負責同步操作時候的連接保證亭枷。ConnectionState是注冊到原生客戶端上的defaultWatcher.

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if ( parentWatcher != null )
        {
//因為defaultWatcher只能有一個袭艺,通過parentWatchers可實現(xiàn)defaultWatcher接到事件通知時parentWatchers的回調(diào)
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }

ConnectionStateManager初始化
它主要負責curator相關連接狀態(tài)的處理和通知,如果我們想要監(jiān)聽連接狀態(tài)的改變叨粘,就需要向它的listeners上注冊一個ConnectionStateListener猾编。

//連接狀態(tài)事件通知隊列
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
//需要通知的listeners
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
//ConnectionStateManager的運行狀態(tài)
 private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
    {
        this.client = client;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.sessionExpirationPercent = sessionExpirationPercent;
        if ( threadFactory == null )
        {
            threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
        }
//事件隊列處理線程池
        service = Executors.newSingleThreadExecutor(threadFactory);
    }

啟動過程
建立與服務端的會話連接和相關功能的啟動
CuratorFrameworkImpl.start

public void start()
    {
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
        {
            throw new IllegalStateException("Cannot be started more than once");
        }

        try
        {
//啟動connectionStateManager
            connectionStateManager.start(); // ordering dependency - must be called before client.start()

            final ConnectionStateListener listener = new ConnectionStateListener()
            {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState)
                {
                    if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
                    {
                        logAsErrorConnectionErrors.set(true);
                    }
                }
            };

            this.getConnectionStateListenable().addListener(listener);
//建立與服務端的連接
            client.start();

            executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    backgroundOperationsLoop();
                    return null;
                }
            });

            ensembleTracker.start();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

可以看出分別啟動ConnectionStateManager和CuratorZookeeperClient瘤睹。啟動ConnectionStateManager可做好連接事件通知的準備,啟動CuratorZookeeperClient建立與服務端的會話連接答倡。

會話管理

curator的會話管理是在原生客戶端的會話管理基礎上包裝而來默蚌,在上面的啟動過程中我們介紹到ConnectionState會負責超時的重連,ConnectionStateManager會負責連接狀態(tài)的改變和通知苇羡, connectionHandlingPolicy會負責連接超時的主動觸發(fā)绸吸。此外,在客戶端執(zhí)行一些操作時如果感知到連接斷開设江,也可以主動進行連接重連锦茁。下面會介紹下curator如何在原生客戶端的會話管理基礎上進行會話狀態(tài)的通知會話超時的重連
我們知道會話連接狀態(tài)相關的事件類型為Watcher.Event.EventType.None叉存,會通知到客戶端上所有的Watcher码俩,ConnectionState作為defaultWatcher,它的事件回調(diào)如下:

 public void process(WatchedEvent event)
    {
        if ( LOG_EVENTS )
        {
            log.debug("ConnectState watcher: " + event);
        }

        if ( event.getType() == Watcher.Event.EventType.None )
        {
          //isConnected:客戶當前的連接狀態(tài),true表示已連接(SyncConnected和ConnectedReadOnly狀態(tài))
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            if ( newIsConnected != wasConnected )
            {
//如果連接狀態(tài)發(fā)生改變歼捏,則更新
                isConnected.set(newIsConnected);
                connectionStartMs = System.currentTimeMillis();
                if ( newIsConnected )
                {
//說明是重連稿存,更新會話超時協(xié)商時間
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                    log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                }
            }
        }
//通知parentWatchers,注意初始化的時候其實傳入了一個parentWatcher,會調(diào)用CuratorFrameworkImpl.processEvent
        for ( Watcher parentWatcher : parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
    }

可以看到,對連接狀態(tài)事件的處理主要是checkState方法

private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        case Disconnected:
        {
            isConnected = false;
            break;
        }

        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }

        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }

        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }

        if ( checkNewConnectionString )
        {
//如果服務端列表發(fā)生變化瞳秽,則更新
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {
                handleNewConnectionString(newConnectionString);
            }
        }

        return isConnected;
    }

可以看到會根據(jù)不同的會話狀態(tài)判斷連接是否正常瓣履,isConnected = true表示正常。當會話超時過期Expired時练俐,會調(diào)用handleExpiredSession進行reset操作袖迎,也就是連接的關閉和重新建立新的會話連接。即會話超時的被動重連腺晾。在連接過程中燕锥,會根據(jù)客戶端設置的連接重試機制retryPolicy檢測重連是否超時。

  1. parentWatchers的回調(diào)
    其實在CuratorFramework client初始化時悯蝉,會初始化一個watcher添加到ConnectionStateparentWatcher中归形,負責連接狀態(tài)改變時的會話狀態(tài)改變。
//初始化的parentWatcher
new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                }

會話狀態(tài)改變時調(diào)用上面watcher的process方法鼻由,調(diào)用至CuratorFrameworkImpl.processEvent

private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
//狀態(tài)轉(zhuǎn)換
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }
//通知所有注冊的CuratorListener 
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

其中暇榴,validateConnection負責連接狀態(tài)的轉(zhuǎn)換

void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }

可以看出ConnectionStateManager負責curator連接狀態(tài)的更新。原生客戶端的連接狀態(tài)和curator包裝的連接狀態(tài)對應關系如下:


同時嗡靡,ConnectionStateManager會將當前狀態(tài)ConnectionState放入自身的事件隊列中跺撼,通知所有注冊到自身listenersConnectionStateListener

此外窟感,當相關操作(包括同步和后臺線程的操作讨彼,如getData)發(fā)現(xiàn)連接斷開了,也會調(diào)用client.getZooKeeper()重連柿祈,(注意底層建立客戶端連接是加鎖的哈误,保證一個客戶端只有一個線程可以創(chuàng)建會話成功)哩至。如CuratorFrameworkImpl的后臺線程任務:

 void performBackgroundOperation(OperationAndData<?> operationAndData)
    {
        try
        {
            if ( !operationAndData.isConnectionRequired() || client.isConnected() )
            {
                operationAndData.callPerformBackgroundOperation();
            }
            else
            {
                client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur
                if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
                {
                    throw new CuratorConnectionLossException();
                }
                operationAndData.sleepFor(1, TimeUnit.SECONDS);
                queueOperation(operationAndData);
            }
        }
        catch ( Throwable e )
        {
            ThreadUtils.checkInterrupted(e);

            /**
             * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
             * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
             * and callbacks need to get invoked, etc.
             */
            if ( e instanceof CuratorConnectionLossException )
            {
                WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
                CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
                if ( checkBackgroundRetry(operationAndData, event) )
                {
                    queueOperation(operationAndData);
                }
                else
                {
                    logError("Background retry gave up", e);
                }
            }
            else
            {
                handleBackgroundOperationException(operationAndData, e);
            }
        }
    }

通知機制

通知機制其實就是在事件發(fā)生的地方觸發(fā)已經(jīng)注冊好的listerner相應的回調(diào)函數(shù)(觀察者模式)。CuratorFrameworkImpl client可注冊listener的方式有:

  • 一次性watch
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground().forPath(path);

同原生客戶端的watch蜜自,只能生效一次菩貌,需要反復注冊。

  • 注冊CuratorListener
// this is one method of getting event/async notifications
        CuratorListener listener = new CuratorListener() {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // examine event for details
            }
        };
        client.getCuratorListenable().addListener(listener);

其實是將listener添加到CuratorFrameworkImpl.listeners中重荠。當后臺線程完成操作會觸發(fā)相應的事件通知該listener箭阶,如異步創(chuàng)建路徑會觸發(fā)CuratorEventType.CREATE事件。此外當連接狀態(tài)事件觸發(fā)時戈鲁,parentWatcher也會回調(diào)這些listeners.

  • 注冊ConnectionStateListener
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
          //Some details
        }
    };
client.getConnectionStateListenable().addListener(connectionStateListener);

其實是將connectionStateListener添加到connectionStateManager.listeners中仇参,在連接狀態(tài)發(fā)生改變時,會收到通知婆殿。

  • 注冊UnhandledErrorListener
UnhandledErrorListener unhandledErrorListener = new UnhandledErrorListener() {
            @Override
            public void unhandledError(String message, Throwable e) {
                //
            }
        };
        client.getUnhandledErrorListenable().addListener(unhandledErrorListener);

其實是將unhandledErrorListener添加到CuratorFrameworkImpl.unhandledErrorListeners中诈乒,當后臺線程操作發(fā)生異常或者handler發(fā)生異常的時候會觸發(fā)婆芦,收到通知怕磨。

  • 后臺線程操作完成時的回調(diào)
    public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

對于不同的操作,如setData,可通過鏈式調(diào)用inBackground(callback)傳入回調(diào)函數(shù)callback消约,這樣當操作完成后肠鲫,會執(zhí)行回調(diào)函數(shù)。

  • 緩存機制或粮,多次注冊
    curator提供了三種類型的緩存方式:Path Cache,Node Cache 和Tree Cache滩届。相當于和服務端的節(jié)點數(shù)據(jù)進行對比,沒當數(shù)據(jù)不一致時被啼,會通過watch機制觸發(fā)帜消,后臺回調(diào)更新本地緩存數(shù)據(jù),同時再次注冊相應的watch浓体。同時泡挺,每次重連成功之后也會重新注冊watch,保證了watch不丟失命浴。

結(jié)合上面會話管理和通知機制的介紹娄猫,可以知道原生客戶端的watcher是同步通知的,當然可以在特定watcher中的處理做異步生闲。connectionStateManager.listeners是由內(nèi)部的線程池做異步通知的媳溺,CuratorFrameworkImpl.listeners對于連接狀態(tài)的通知是與watcher通知線程同步,由后臺線程通知時為異步碍讯。如果客戶端watcher注冊過多悬蔽,那么可能就會導致重連之后watch丟失(重連會清空sendThread的發(fā)送和接收隊列,可能會導致watch丟失)捉兴,甚至重連不成功(本文分析的版本3.1.0中只要調(diào)用client.getZooKeeper()就會重連蝎困,和當時的連接狀態(tài)無關录语。所以我覺得看版本,如參考資料二)禾乘。

recipes功能

curator實現(xiàn)的recipes功能主要有:分布式鎖澎埠,Leader選舉Barriers始藕,計數(shù)器蒲稳,緩存隊列伍派,事務等弟塞。對于隊列和事務,我們可以使用其他中間件拙已,如kafka,TCC-Transaction等解決决记,這里不做介紹。

分布式鎖

類似于Java的j.u.c包中的鎖倍踪,recipes提供了分布式協(xié)調(diào)下(不同JVM)的互斥鎖(可重入/不可重入)系宫,可重入讀寫鎖,信號量和多鎖對象建车。

  • 可重入互斥鎖
    InterProcessMutex類實現(xiàn)扩借。基于臨時順序節(jié)點實現(xiàn)缤至,獲取鎖規(guī)則為:
    1.當前存在的創(chuàng)建時間最早的臨時節(jié)點獲得鎖權限潮罪,再次進入可重新獲得鎖,內(nèi)部維持了一個ConcurrentMap<Thread, LockData> threadData記錄鎖的重入次數(shù)领斥。
    2.不是第一個節(jié)點的每個臨時順序節(jié)點都在前一個臨時順序節(jié)點上注冊節(jié)點watch
    當前一個節(jié)點被刪除時嫉到,后一個節(jié)點會重新根據(jù)鎖規(guī)則競爭鎖。

  • 不可重入互斥鎖
    InterProcessSemaphoreMutex實現(xiàn)月洛,相比InterProcessMutex只是少了重入功能何恶,實現(xiàn)原理是在互斥鎖InterProcessMutex的基礎上構(gòu)造一個租約,由InterProcessSemaphoreV2類實現(xiàn)(下文的信號量)嚼黔。每次只允許客戶端獲得一個租約细层,重入InterProcessSemaphoreMutex這個鎖就因為租約不夠而無法獲得。

  • 可重入讀寫鎖
    類似JDK的ReentrantReadWriteLock唬涧,讀寫鎖擁有一個可重入讀鎖和可重入寫鎖疫赎。讀讀操作不互斥,涉及寫操作和其他讀寫操作都互斥碎节,寫鎖可降級為讀鎖捧搞。在使用寫鎖時,應該先獲取讀鎖,再釋放寫鎖实牡。由InterProcessReadWriteLock類和InternalInterProcessMutex類實現(xiàn)陌僵。
    InterProcessReadWriteLock類封裝了兩個由InternalInterProcessMutex類實現(xiàn)的讀鎖readMutex和寫鎖writeMutex轴合。它們是在可重入互斥鎖InterProcessMutex的基礎上創(chuàng)建的创坞。
    客戶端在獲取讀鎖或?qū)戞i時均在相同的父路徑下創(chuàng)建臨時順序節(jié)點。獲取讀鎖時受葛,如果是寫鎖線程或前面節(jié)點沒有寫鎖臨時節(jié)點题涨,可直接獲取讀鎖。否則讀鎖客戶端watch在前面節(jié)點最小的寫鎖上面总滩,直到前面沒有寫鎖節(jié)點時獲取鎖纲堵。獲取寫鎖時,如果前面沒有臨時順序節(jié)點,則直接獲得闰渔,否則寫鎖客戶端watch在前面一個臨時順序節(jié)點上席函,直到成為第一個臨時順序節(jié)點時獲取到寫鎖。

  • 信號量
    由InterProcessSemaphoreV2實現(xiàn)冈涧,可指定租約數(shù)茂附。在獲取租約時,先獲取由其維護的互斥鎖督弓,如果租約數(shù)量足夠(也就是與租約有關的臨時順序節(jié)點數(shù)目不到租約數(shù))营曼,可獲得租約,即獲取信號量愚隧,然后釋放互斥鎖進行資源的操作蒂阱,可保證其他線程可繼續(xù)獲取互斥鎖,然后獲取剩余的租約狂塘。同理录煤,歸還租約時,只需刪除與租約有關的臨時順序節(jié)點即可荞胡。

  • 多鎖對象
    InterProcessMultiLock實現(xiàn)辐赞,內(nèi)部維護了一個互斥鎖(可為重入或非重入)列表locks,只有同時獲得列表中所有的互斥鎖時硝训,才保證了獲取到了這個多鎖對象响委。釋放多鎖對象時同樣需要釋放列表中的每個互斥鎖。

需要注意的是窖梁,這些鎖內(nèi)部沒有使用ConnectionStateListener監(jiān)聽連接狀態(tài)赘风,也就是說當擁有鎖的客戶端會話過期服務端刪掉其臨時節(jié)點了,擁有鎖的客戶端并不能及時感知到這種變化纵刘。

Leader選舉

分布式場景下邀窃,通常選出一個leader負責任務的分派,數(shù)據(jù)的寫入等。此外瞬捕,當leader意外宕機鞍历,新的leader要被選舉出來。recipes提供了兩種選舉方式:Leader latchLeaderSelector肪虎。
Leader latch
客戶端分別在相同的path下創(chuàng)建臨時順序節(jié)點劣砍,選舉規(guī)則為:
1.當前存在的創(chuàng)建時間最早的臨時節(jié)點獲得leader權限。
2.不是第一個節(jié)點的每個臨時順序節(jié)點都在前一個臨時順序節(jié)點上注冊節(jié)點watch
當前一個節(jié)點被刪除時扇救,后一個節(jié)點會重新根據(jù)選舉規(guī)則進行選舉刑枝。

此外,每個客戶端都會注冊一個ConnectionStateListener監(jiān)聽連接狀態(tài)迅腔。當連接異常時装畅,會根據(jù)當前連接策略決定是否釋放leader權限。重新連接上之后如果原來的leader喪失了權限沧烈,會刪除原來節(jié)點并創(chuàng)建新的節(jié)點掠兄,重新參與選舉。
LeaderSelector
基于互斥鎖InterProcessMutex實現(xiàn)的锌雀,獲取鎖即擁有l(wèi)eader權限蚂夕,用完了會釋放鎖。如果連接異常汤锨,會中斷任務執(zhí)行線程双抽。與LeaderLatch相比, 通過LeaderSelectorListener可以對領導權進行控制闲礼, 用完了就釋放領導權牍汹,這樣每個節(jié)點都有可能獲得領導權。 而LeaderLatch只有主動調(diào)用close方法才會釋放領導權柬泽。

Barriers

分布式柵欄會使等待在相同節(jié)點路徑上一批線程阻塞慎菲,直到某個條件滿足時,才允許他們繼續(xù)運行锨并。類似于j.u.c包中的CountDownlatch與CyclicBarrier露该,分為DistributedBarrierDistributedDoubleBarrier兩類Barriers。前者線程間可使用一次等待第煮,后者可重復使用解幼。
DistributedBarrier
1.調(diào)用setBarrier創(chuàng)建barrierPath持久節(jié)點
2.等待線程調(diào)用waitOnBarrier分別在相同的barrierPath下注冊節(jié)點watch,然后阻塞
3.當調(diào)用釋放柵欄的removeBarrier方法時,會刪掉持久節(jié)點包警,等待線程繼續(xù)運行
DistributedDoubleBarrier
允許一組固定數(shù)量的分布式進程撵摆,相互等待。直到最后一個進程到達害晦,才允許所有進程繼續(xù)運行特铝。同時離開的時候,也需相互等待,直到最后一個進程要離開鲫剿,才允許所有進程繼續(xù)運行鳄逾。
在分析之前先知道下文中watch的作用:
在節(jié)點上注冊的watch主要做的事情是:1.當節(jié)點被刪除或創(chuàng)建時锰镀,喚醒注冊watch的線程挟纱。2。當當前線程所在客戶端連接斷開時抚笔,喚醒注冊watch的線程笆呆。
1.調(diào)用enter方法等待線程在barrierPath下創(chuàng)建臨時節(jié)點并在barrierPath/ready節(jié)點下注冊節(jié)點watch请琳,如果此時臨時節(jié)點數(shù)未達到等待線程的數(shù)目,則阻塞粱挡。否則創(chuàng)建barrierPath/ready節(jié)點赠幕,并放行。
2.調(diào)用leave時询筏,將這一批等待線程按照臨時節(jié)點名字排序榕堰,最小節(jié)點線程不斷在最大節(jié)點上面注冊watch,而其他節(jié)點線程都在最小節(jié)點上面注冊watch,其他節(jié)點會主動刪除嫌套,然后阻塞直到只剩下最小節(jié)點的時候最小節(jié)點也刪除逆屡。此時當所有節(jié)點都刪掉了,所有線程可跳出無限循環(huán)踱讨,繼續(xù)運行魏蔗。

計數(shù)器

Curator有兩個計數(shù)器, SharedCount是用int來計數(shù)痹筛,DistributedAtomicLong是用long來計數(shù)莺治。
SharedCount
SharedCount管理一個相同path下的共享int值,start的時候在該path節(jié)點下注冊watch帚稠,能夠感知到節(jié)點數(shù)據(jù)的變化谣旁,并更新本地緩存的數(shù)據(jù)值和版本值。如果想改變共享值滋早,則通過本地版本號+最新值 去更新節(jié)點榄审,如果版本過期,則更新失敗并更新本地數(shù)據(jù)杆麸,否則更新成功搁进。類似樂觀鎖。
此外昔头,可以為它增加一個SharedCountListener饼问,可以接收到共享值和連接狀態(tài)的改變事件。
DistributedAtomicLong
DistributedAtomicLong管理一個相同path下的共享long值,調(diào)用trySet修改時减细,首先會采用樂觀鎖的方式(版本+最新值)進行修改匆瓜,按照嘗試策略retryPolicy修改直到成功。如果嘗試策略次數(shù)用完仍然失敗,如果允許使用悲觀鎖方式(初始化promotedToLock!=null驮吱,會初始化一個互斥鎖)修改茧妒,則會獲取互斥鎖,然后再修改左冬。

緩存

緩存是客戶端對服務端的數(shù)據(jù)的緩存桐筏,如果服務端數(shù)據(jù)發(fā)生變化,通過watch機制對客戶端通知拇砰,更新緩存數(shù)據(jù)并重新watch梅忌。可分為Path Cache除破、Node Cache和Tree Cache牧氮。

我們知道zookeeper原生的watch是一次性的,每次觸發(fā)之后服務端和客戶端都會清理掉瑰枫。watch分為三種:node watch踱葛,path watch和default watch。
node watch:當監(jiān)控路徑下的節(jié)點數(shù)據(jù)變化光坝,節(jié)點被創(chuàng)建和刪除時觸發(fā)尸诽,通過調(diào)用existsgetData方法可注冊該類watch盯另。
path watch:當監(jiān)控路徑下的節(jié)點被刪除性含,新增或刪除子節(jié)點時觸發(fā),通過調(diào)用getChildren方法可注冊該類watch鸳惯。
default watch:每當連接狀態(tài)發(fā)生改變時商蕴,都會觸發(fā)。通過初始化zookeeper的時候注冊悲敷。同時也可以作為node watch和path watch傳入exists究恤,getDatagetChildren方法中,此時對于監(jiān)控路徑是一次性的后德。

Path Cache
是對節(jié)點路徑下子節(jié)點的新增部宿,修改和刪除的監(jiān)控。當一個子節(jié)點增加瓢湃, 更新理张,刪除時, Path Cache會改變它的數(shù)據(jù)和狀態(tài)绵患。
調(diào)用start方法雾叭,連接異常時重連成功(通過啟動時注冊connectionStateListener),每次收到pathwatch的回調(diào)時都重新注冊path watch達到始終監(jiān)控子節(jié)點新增和刪除的效果落蝙。同時每次收到watch時织狐,都會獲得子節(jié)點的所有信息暂幼,更新緩存數(shù)據(jù)。如果是新增子節(jié)點移迫,則注冊node watch旺嬉,達到監(jiān)控子節(jié)點數(shù)據(jù)更新的效果,從而觸發(fā)對應的子節(jié)點增加厨埋, 更新邪媳,刪除事件,并通知注冊的PathChildrenCacheListener荡陷。
Node Cache
NodeCache是對一個節(jié)點的監(jiān)控雨效。當節(jié)點數(shù)據(jù)內(nèi)容修改或者刪除節(jié)點時,都會觸發(fā)本地緩存的更新废赞。
node cache在調(diào)用start方法徽龟,連接異常時重連成功(通過啟動時注冊connectionStateListener),每次收到node watch的回調(diào)時都重新注冊node watch達到始終監(jiān)控節(jié)點的效果蛹头。此外顿肺,每次注冊watch時是后臺線程發(fā)起的戏溺,會在他的響應數(shù)據(jù)上注冊一個回調(diào)函數(shù)渣蜗,負責獲得最新節(jié)點數(shù)據(jù),當節(jié)點數(shù)據(jù)與之前的本地緩存不一樣時旷祸,觸發(fā)ListenerContainer<NodeCacheListener> listeners的回調(diào)耕拷,同時使注冊的NodeCacheListener感知到。
Tree Cache
Tree Cache是對監(jiān)控路徑下所有節(jié)點(一棵樹)的新增托享,修改和刪除的監(jiān)控骚烧。
首先對于樹上每個節(jié)點會把它當成一個TreeNode,并在節(jié)點上注冊node watch 和 path watch闰围。當調(diào)用start方法赃绊,連接異常時重連成功(通過啟動時注冊connectionStateListener),每次收到節(jié)點上對應 watch的回調(diào)時都會根據(jù)條件重新構(gòu)造相應TreeNode羡榴,注冊對應的node watch 和 path watch碧查。同時樹上的節(jié)點變化會通知到注冊的TreeCacheListener。

參考資料:
ZooKeeper的Java客戶端使用
跟著實例學習ZooKeeper的用法: 文章匯總
Zookeeper Client架構(gòu)分析——ZK鏈接重連失敗排查
http://zookeeper.apache.org/doc/r3.4.9/recipes.html#sc_recipes_Locks

感謝您的閱讀校仑,歡迎關注我的公眾號:「碼農(nóng)知識點」,和我進一步交流學習~

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末忠售,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子迄沫,更是在濱河造成了極大的恐慌稻扬,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件羊瘩,死亡現(xiàn)場離奇詭異泰佳,居然都是意外死亡盼砍,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門逝她,熙熙樓的掌柜王于貴愁眉苦臉地迎上來衬廷,“玉大人,你說我怎么就攤上這事汽绢÷鸢希” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵宁昭,是天一觀的道長跌宛。 經(jīng)常有香客問我,道長积仗,這世上最難降的妖魔是什么疆拘? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮寂曹,結(jié)果婚禮上哎迄,老公的妹妹穿的比我還像新娘。我一直安慰自己隆圆,他們只是感情好漱挚,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著渺氧,像睡著了一般旨涝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上侣背,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天白华,我揣著相機與錄音,去河邊找鬼贩耐。 笑死弧腥,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的潮太。 我是一名探鬼主播管搪,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼消别!你這毒婦竟也來了抛蚤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤寻狂,失蹤者是張志新(化名)和其女友劉穎岁经,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛇券,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡缀壤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年樊拓,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片塘慕。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡筋夏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出图呢,到底是詐尸還是另有隱情条篷,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布蛤织,位于F島的核電站赴叹,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏指蚜。R本人自食惡果不足惜乞巧,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望摊鸡。 院中可真熱鬧绽媒,春花似錦、人聲如沸免猾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽掸刊。三九已至免糕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間忧侧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工牌芋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蚓炬,地道東北人。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓躺屁,卻偏偏與公主長得像肯夏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子犀暑,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容

  • ZooKeeper 分布式過程協(xié)同技術詳解 簡介 分布式系統(tǒng) 分布式系統(tǒng)是同時跨越多個物理主機, 獨立運行的多個軟...
    魯云飛_閱讀 2,637評論 0 1
  • 本文將從系統(tǒng)模型耐亏、序列化與協(xié)議徊都、客戶端工作原理、會話广辰、服務端工作原理以及數(shù)據(jù)存儲等方面來揭示ZooKeeper的技...
    端木軒閱讀 3,792評論 0 42
  • 分布式服務框架 Zookeeper Zookeeper名字的由來是比較有趣的暇矫,下面的片段摘抄自《從PAXOS到ZO...
    史路比閱讀 1,385評論 0 6
  • 到點了主之,是時候在工作之余開心一笑了,先給大家分一個民航逗比小故事: 故事的背景是民航系統(tǒng)兩大主角:飛行員和管制員李根,...
    飛翔的豆渣閱讀 126評論 0 0
  • 其實今天的工作時間真的很少槽奕。中午出去吃飯,耗到快三點才回公司房轿。上午走在路上就開始參加線上的面試粤攒。其實完全是客串。但...
    SHINJI君閱讀 174評論 0 0