Curator源碼分析--創(chuàng)建client

curator創(chuàng)建zookeeper連接
1.初始化CuratorFramework client = CuratorFrameworkFactory.newClient()
1)入?yún)ⅲ悍?wù)器IP地址,session超時(shí)時(shí)間澜汤,連接超時(shí)時(shí)間,重試策略
2)初始化ZookeeperFactory,實(shí)現(xiàn)newZooKeeper方法,該方法實(shí)現(xiàn)zookeeper的連接創(chuàng)建
3)初始化CuratorZookeeperClient居灯,傳入Watcher香浩,但這個(gè)并不是傳給zookeeper的,zookeeper回調(diào)時(shí)恭垦,會(huì)調(diào)到這個(gè)watcher,這個(gè)watcher會(huì)回調(diào)我們創(chuàng)建連接時(shí)的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));這個(gè)操作格嗅》Γ可以用于我們對(duì)事件的監(jiān)控與處理(eventReceived(CuratorFramework client, CuratorEvent event))。
4)初始化ConnectionState屯掖,將3)中的watcher傳給ConnectionState的parentWatchers屬性玄柏,ConnectionState也是最終傳給zookeeper的watcher
2.創(chuàng)建連接client.start();
1)連接狀態(tài)管理 connectionStateManager.start()
connectionStateManager中,BlockingQueue<ConnectionState>保存著連接狀態(tài)的狀態(tài)變化值贴铜,
start()方法中循環(huán)獲取隊(duì)列中的狀態(tài)值粪摘,然后執(zhí)行在創(chuàng)建client時(shí) client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));進(jìn)行狀態(tài)變化的監(jiān)控或處理
2)連接創(chuàng)建 client.start();
該方法主要執(zhí)行的是ConnectionState的reset()方法,reset ()主要完成老連接的關(guān)閉绍坝,和新連接的創(chuàng)建徘意,此處創(chuàng)建連接即調(diào)用初始化ZookeeperFactory實(shí)現(xiàn)的newZooKeeper方法
3.zookeeper回調(diào)watcher
傳入zookeeper的watcher是ConnectionState對(duì)象,則回調(diào)時(shí)轩褐,則先調(diào)用ConnectionState中的process方法椎咧,此處會(huì)判斷連接狀態(tài),SyncConnected灾挨,ConnectedReadOnly則連接成功邑退,Expired連接過(guò)期,則重新調(diào)用2中的reset()方法,其他狀態(tài)則連接失敗劳澄。如果連接狀態(tài)有變化則通過(guò)AtomicBoolean進(jìn)行保存地技。
此處還會(huì)調(diào)用之前初始化進(jìn)去的parentWatchers,回調(diào)到初始化CuratorZookeeperClient時(shí)傳入的watcher秒拔。此處莫矗,會(huì)校驗(yàn)連接狀態(tài),并將連接狀態(tài)加入ConnectionStateManager狀態(tài)管理器重進(jìn)行管理(ConnectionStateManager的BlockingQueue<ConnectionState>)。加入ConnectionStateManager管理的狀態(tài)會(huì)在connectionStateManager.start()中獲取到作谚,并可以通過(guò) client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));來(lái)監(jiān)控或處理

//創(chuàng)建Curator連接
public void init(){
        Assert.hasText(zkServer,"zkServer is empty");
        Assert.hasText(zkPath,"zkPath is empty");
        ip = IpUtils.getOneIpV4();
        //RetryNTimes 重試策略三娩,
        client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
        client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
        client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
        client.getUnhandledErrorListenable().addListener(new LcpErrorListener(ip));
        client.start();
    }

client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));

 public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
    {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }
//返回一個(gè)CuratorFrameworkImpl對(duì)象
public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
        }

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
    {
        //初始化ZookeeperFactory
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
       初始化ConnectionState,HandleHolder
        this.client = new CuratorZookeeperClient
            (
                localZookeeperFactory,
                builder.getEnsembleProvider(),
                builder.getSessionTimeoutMs(),
                builder.getConnectionTimeoutMs(),
                builder.getWaitForShutdownTimeoutMs(),
                //這個(gè)watcher并不是真正傳給zookeeper的watcher妹懒,傳給zookeeper的是ConnectionState雀监,
                //ConnectionState中重寫process(WatchedEvent event)方法中,會(huì)調(diào)用這個(gè)Watcher
                new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        //在這里眨唬,實(shí)現(xiàn)CuratorListener接口的listener重寫eventReceived方法会前,接收zk事件信息
                        processEvent(event);
                    }
                },
                builder.getRetryPolicy(),
                builder.canBeReadOnly(),
                builder.getConnectionHandlingPolicy()
            );

       //zk連接狀態(tài)的管理類, 狀態(tài)發(fā)生變化時(shí)匾竿,回掉listener的
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
       //K節(jié)點(diǎn)默認(rèn)值為本機(jī)IP瓦宜,ZK本身是不允許創(chuàng)建沒(méi)有value的節(jié)點(diǎn)的,但curator允許岭妖,就是使用了該默認(rèn)值
        byte[] builderDefaultData = builder.getDefaultData();
       //省略其他變量賦值
    }

CuratorFramework.start();

 @Override
    public void start()
    {
       try
          {
            //開啟 連接狀態(tài)管理
            connectionStateManager.start(); 
            //CuratorZookeeperClient中的start方法临庇,真正與ZK建立連接
            client.start();
            }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

CuratorZookeeperClient

 public void start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }
        //ConnectionState 的start方法
        state.start();
    }

ConnectionState類

void start() throws Exception
    {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

synchronized void reset() throws Exception
    {
        log.debug("reset");
        //用來(lái)記錄zookeeper實(shí)例創(chuàng)建次數(shù)
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        //連接開始時(shí)間
        connectionStartMs = System.currentTimeMillis();
        //HandleHolder  關(guān)閉老的zookeeper實(shí)例,重新構(gòu)建新的helper
        zooKeeper.closeAndReset();
        //調(diào)用zookeeperFactory.newZooKeeper創(chuàng)建原生zookeeper連接
        zooKeeper.getZooKeeper();  
    }

HandleHolder類

void closeAndReset() throws Exception
    {
        //如果有的話關(guān)閉之前的zookeeper實(shí)例昵慌,重構(gòu)HandleHolder
        internalClose(0);
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;

            @Override
            public ZooKeeper getZooKeeper() throws Exception
            {
                synchronized(this)
                {
                    if ( zooKeeperHandle == null )
                    {
                        connectionString = ensembleProvider.getConnectionString();
                      //這里創(chuàng)建zookeeper連接假夺,傳入的watcher就是 ConnectionState
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    helper = new Helper()
                    {
                        @Override
                        public ZooKeeper getZooKeeper() throws Exception
                        {
                            return zooKeeperHandle;
                        }

                        @Override
                        public String getConnectionString()
                        {
                            return connectionString;
                        }

                        @Override
                        public int getNegotiatedSessionTimeoutMs()
                        {
                            return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
                        }
                    };

                    return zooKeeperHandle;
                }
            }

            @Override
            public String getConnectionString()
            {
                return connectionString;
            }

            @Override
            public int getNegotiatedSessionTimeoutMs()
            {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
//通過(guò)上面的helper實(shí)現(xiàn)知道這里真正與ZK建立連接
ZooKeeper getZooKeeper() throws Exception
    {
        return (helper != null) ? helper.getZooKeeper() : null;
    }

創(chuàng)建zookeeper連接之后,watcher接收zookeeper返回的連接事件并進(jìn)行處理废离,這里的watcher就是ConnectionState類侄泽,執(zhí)行其中的process方法

    @Override
    public void process(WatchedEvent event)
    {
        //這里為None說(shuō)明收到的事件是ZK連接狀態(tài)改變的事件
        if ( event.getType() == Watcher.Event.EventType.None )
        {
            boolean wasConnected = isConnected.get();
            boolean newIsConnected = checkState(event.getState(), wasConnected);
            //連接狀態(tài)發(fā)生變化
            if ( newIsConnected != wasConnected )
            {
                isConnected.set(newIsConnected);
                //記錄連接開始時(shí)間
                connectionStartMs = System.currentTimeMillis();
                //連接狀態(tài)變化為已連接則記錄新協(xié)商的回話超市時(shí)間
                if ( newIsConnected )
                {
                    //重置session超時(shí)時(shí)間
                    lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
                    log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
                }
            }
        }
        //回調(diào)CuratorZookeeperClient創(chuàng)建時(shí)的watcher,
        for ( Watcher parentWatcher : parentWatchers )
        {
            OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
             //回調(diào)CuratorZookeeperClient創(chuàng)建時(shí)的watcher
            parentWatcher.process(event);
            trace.commit();
        }
    }
// 獲取當(dāng)前連接狀態(tài)
private boolean checkState(Event.KeeperState state, boolean wasConnected)
    {
        // AtomicBoolean isConnected = new AtomicBoolean(false); 原子boolean保存連接狀態(tài)
        boolean isConnected = wasConnected;
        boolean checkNewConnectionString = true;
        switch ( state )
        {
        default:
        //連接斷開
        case Disconnected:
        {
            isConnected = false;
            break;
        }
      //連接成功
        case SyncConnected:
        case ConnectedReadOnly:
        {
            isConnected = true;
            break;
        }
        //權(quán)限驗(yàn)證失敗連接失敗
        case AuthFailed:
        {
            isConnected = false;
            log.error("Authentication failed");
            break;
        }
      //連接過(guò)期
        case Expired:
        {
            isConnected = false;
            checkNewConnectionString = false;
            //處理連接過(guò)期
            //調(diào)用ConnectionState.reset() 重新構(gòu)建zookeeper連接
            handleExpiredSession();
            break;
        }

        case SaslAuthenticated:
        {
            // NOP
            break;
        }
        }
         //當(dāng)連接狀態(tài)發(fā)生改變且不是會(huì)話過(guò)期時(shí)蜻韭,檢查ZK地址是否發(fā)生變化
        if ( checkNewConnectionString )
        {
            String newConnectionString = zooKeeper.getNewConnectionString();
            if ( newConnectionString != null )
            {  //處理ZK地址發(fā)生變化
                handleNewConnectionString(newConnectionString);
            }
        }
        return isConnected;
    }

parentWatcher.process(event);回調(diào)初始化CuratorZookeeperClient時(shí)傳入的watcher

new Watcher()
                {
                    @Override
                    public void process(WatchedEvent watchedEvent)
                    {  //將zookeeper的event包裝成CuratorEvent
                        CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                        processEvent(event);
                    }
                }


private void processEvent(final CuratorEvent curatorEvent)
    {
        if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {
            //校驗(yàn)連接狀態(tài),并將狀態(tài)加入connectionStateManager進(jìn)行管理
            validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {
                    OperationTrace trace = client.startAdvancedTracer("EventListener");
                    //去回調(diào)創(chuàng)建client時(shí)的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

void validateConnection(Watcher.Event.KeeperState state)
    {
        if ( state == Watcher.Event.KeeperState.Disconnected )
        {
            internalConnectionHandler.suspendConnection(this);
        }
        else if ( state == Watcher.Event.KeeperState.Expired )
        {  //將狀態(tài)加入 阻塞隊(duì)列中柿扣,在connectionStateManager.start()中循環(huán)獲取該隊(duì)列中的狀態(tài)數(shù)據(jù)肖方,并執(zhí)行我們初始化client時(shí)的getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
            connectionStateManager.addStateChange(ConnectionState.LOST);
        }
        else if ( state == Watcher.Event.KeeperState.SyncConnected )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
            unSleepBackgroundOperations();
        }
        else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
        {
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
        }
    }


connectionStateManager.start(); 開啟連接狀態(tài)管理

public void start()
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }

 private void processEvents()
    {    //注意是個(gè)循環(huán),一直在獲取 上面那個(gè)阻塞隊(duì)列中的狀態(tài)值
        while ( state.get() == State.STARTED )
        {
            try
            {
                //第一次ZK還沒(méi)有建立連接未状,這里得到的就是用戶指定的會(huì)話超時(shí)時(shí)間
                int useSessionTimeoutMs = getUseSessionTimeoutMs();
                long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                long pollMaxMs = useSessionTimeoutMs - elapsedMs;

                //這個(gè)隊(duì)列就是剛才放進(jìn)去事件的隊(duì)列
                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                if ( newState != null )
                {
                    if ( listeners.size() == 0 )
                    {
                        log.warn("There are no ConnectionStateListeners registered.");
                    }
                     //這里僅僅就是回調(diào)監(jiān)聽器StandardListenerManager<ConnectionStateListener>
                    //client.getConnectionStateListenable().addListener(new ConnectionStateListener());
                    //連接狀態(tài)變化
                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                }
                //該值默認(rèn)100俯画,如果長(zhǎng)時(shí)間沒(méi)有收到事件變化就判斷下會(huì)話是否過(guò)期
                else if ( sessionExpirationPercent > 0 )
                {
                    synchronized(this)
                    {
                        checkSessionExpiration();
                    }
                }
            }
            catch ( InterruptedException e )
            {
                // swallow the interrupt as it's only possible from either a background
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
            }
        }
    }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市司草,隨后出現(xiàn)的幾起案子艰垂,更是在濱河造成了極大的恐慌,老刑警劉巖埋虹,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件猜憎,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡搔课,警方通過(guò)查閱死者的電腦和手機(jī)胰柑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人柬讨,你說(shuō)我怎么就攤上這事崩瓤。” “怎么了踩官?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵却桶,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蔗牡,道長(zhǎng)颖系,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任蛋逾,我火速辦了婚禮集晚,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘区匣。我一直安慰自己偷拔,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布亏钩。 她就那樣靜靜地躺著莲绰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪姑丑。 梳的紋絲不亂的頭發(fā)上蛤签,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音栅哀,去河邊找鬼震肮。 笑死,一個(gè)胖子當(dāng)著我的面吹牛留拾,可吹牛的內(nèi)容都是我干的戳晌。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼痴柔,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼沦偎!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起咳蔚,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤豪嚎,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后谈火,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體侈询,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年堆巧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了妄荔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片泼菌。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖啦租,靈堂內(nèi)的尸體忽然破棺而出哗伯,到底是詐尸還是另有隱情,我是刑警寧澤篷角,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布焊刹,位于F島的核電站,受9級(jí)特大地震影響恳蹲,放射性物質(zhì)發(fā)生泄漏虐块。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一嘉蕾、第九天 我趴在偏房一處隱蔽的房頂上張望贺奠。 院中可真熱鬧,春花似錦错忱、人聲如沸儡率。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)儿普。三九已至,卻和暖如春掷倔,著一層夾襖步出監(jiān)牢的瞬間眉孩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工勒葱, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留浪汪,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓凛虽,卻偏偏與公主長(zhǎng)得像吟宦,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子涩维,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容