ZooKeeper standalone mode: client startup

Preface

The previous article walked through how the ZooKeeper server starts up. This one analyzes how the ZooKeeper client starts up.

Creating the client connection object

With the native ZooKeeper client library, a connection is usually created like this:

ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);

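Let's go straight to the source of the ZooKeeper constructor. The simple constructor above eventually delegates to this one: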

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly,
        HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
        LOG.info(
            "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
            connectString,
            sessionTimeout,
            watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        // clientConfig holds the configurable properties of the ZooKeeper client
        this.clientConfig = clientConfig;
        // create the client-side watcher manager
        watchManager = defaultWatchManager();
        // set the default watcher of the watcher manager
        watchManager.defaultWatcher = watcher;
        // build a ConnectStringParser from the user-supplied connectString (its role is explained below)
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        // the connection string may list several servers; hostProvider encapsulates the choice of which one to connect to
        hostProvider = aHostProvider;
        // the client connection object, the core of client-to-server communication (analyzed in detail below)
        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
        // start the client's worker threads
        cnxn.start();
    }
ConnectStringParser

First, let's look at the ConnectStringParser class.


[Figure: ConnectStringParser.png, the fields of ConnectStringParser]

As the figure shows, ConnectStringParser has two important fields, chrootPath and serverAddresses, and its job is to fill them in from the connection string the user passes in. chrootPath is the path part of the connection string. For example, given "192.168.11.1:2181,192.168.11.2:2181/tt", the parsed chrootPath is "/tt" (the leading slash is kept). serverAddresses holds the parsed host:port pairs. For the same example, it contains the two unresolved addresses 192.168.11.1:2181 and 192.168.11.2:2181.
Here is the source code ConnectStringParser uses to parse the connection string:

public ConnectStringParser(String connectString) {
        // parse out chroot, if any
        // position of the chroot separator '/'
        int off = connectString.indexOf('/');
        if (off >= 0) {
            // extract the chroot path (including the leading '/')
            String chrootPath = connectString.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            // keep only the host:port list
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }
        // split the host:port entries on commas
        List<String> hostsList = split(connectString, ",");
        for (String host : hostsList) {
            int port = DEFAULT_PORT;
            try {
                // split the entry into host and port
                String[] hostAndPort = ConfigUtils.getHostAndPort(host);
                host = hostAndPort[0];
                if (hostAndPort.length == 2) {
                    port = Integer.parseInt(hostAndPort[1]);
                }
            } catch (ConfigException e) {
                e.printStackTrace();
            }
            // create an unresolved InetSocketAddress and add it to serverAddresses
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
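To make the parsing rules concrete, here is a minimal standalone sketch in the same spirit. It is a simplified version written for illustration, not ZooKeeper's class. It skips PathUtils.validatePath, does not handle bracketed IPv6 addresses, and assumes a default port of 2181 (the conventional ZooKeeper client port). The class name ConnectStringSketch is hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified parser, NOT ZooKeeper's ConnectStringParser:
// no path validation, no IPv6 bracket handling.
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default client port

    final String chrootPath; // null when absent or when it is just "/"
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String path = connectString.substring(off); // keeps the leading '/'
            chrootPath = (path.length() == 1) ? null : path;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String host : connectString.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // createUnresolved: no DNS lookup happens at parse time
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch p = new ConnectStringSketch("192.168.11.1:2181,192.168.11.2:2181/tt");
        System.out.println(p.chrootPath);             // /tt
        System.out.println(p.serverAddresses.size()); // 2
    }
}
```

Using createUnresolved defers DNS resolution until an address is actually used. That is what the resolve(addr) call in StaticHostProvider.next() takes care of.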


HostProvider

From ConnectStringParser we know the user may supply several server IP:port pairs. Which one should the client connect to? That is HostProvider's job. The default implementation of HostProvider is StaticHostProvider.


[Figure: StaticHostProvider.png, the fields of StaticHostProvider]

The figure shows the fields of StaticHostProvider. The three fields outlined in red, serverAddresses, lastIndex and currentIndex, are the core of how StaticHostProvider picks a server to connect to. Its next() method is the public entry point for choosing the next server.

next()
public InetSocketAddress next(long spinDelay) {
        boolean needToSleep = false;
        InetSocketAddress addr;

        synchronized (this) {
            // reconfigMode is a ZooKeeper feature for load-balancing client connections across servers
            if (reconfigMode) {
                addr = nextHostInReconfigMode();
                if (addr != null) {
                    currentIndex = serverAddresses.indexOf(addr);
                    return resolve(addr);
                }
                //tried all servers and couldn't connect
                reconfigMode = false;
                needToSleep = (spinDelay > 0);
            }
            // advance currentIndex
            ++currentIndex;
            // wrap around to 0 once currentIndex reaches the size of the server list
            if (currentIndex == serverAddresses.size()) {
                currentIndex = 0;
            }
            // take the server at currentIndex
            addr = serverAddresses.get(currentIndex);
            // decide whether we need to sleep before returning it
            needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
            if (lastIndex == -1) {
                // We don't want to sleep on the first ever connect attempt.
                // set lastIndex to 0 here; if lastIndex started at 0 instead of -1, needToSleep would already be true on the very first connection attempt, which is clearly undesirable
                lastIndex = 0;
            }
        }
        if (needToSleep) {
            try {
                // sleep briefly: once every server in the list has been tried, the next attempt waits spinDelay ms first
                // (my own reading: if none of the servers could be reached, they may be busy, so give them a little breathing room)
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        // return the (resolved) server address
        return resolve(addr);
    }
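The round-robin-with-back-off behaviour can be reproduced in a few lines. This is a hedged sketch, not StaticHostProvider itself. It drops reconfigMode and DNS resolution and uses plain strings as addresses. It also adds a hypothetical sleepCount field purely so the back-off can be observed. The onConnected() method reflects my understanding that the real provider records lastIndex = currentIndex once a connection succeeds; treat that detail as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the round-robin + back-off idea in StaticHostProvider.next().
// reconfigMode, shuffling and DNS resolution are deliberately left out.
final class RoundRobinHostSketch {
    private final List<String> servers;
    private int currentIndex = -1;
    private int lastIndex = -1;
    int sleepCount = 0; // hypothetical: only here to make the back-off observable

    RoundRobinHostSketch(List<String> servers) {
        this.servers = new ArrayList<>(servers);
    }

    String next(long spinDelay) {
        boolean needToSleep;
        String addr;
        synchronized (this) {
            ++currentIndex;
            if (currentIndex == servers.size()) {
                currentIndex = 0; // wrap around
            }
            addr = servers.get(currentIndex);
            // back to lastIndex means a full lap without success => back off
            needToSleep = currentIndex == lastIndex && spinDelay > 0;
            if (lastIndex == -1) {
                lastIndex = 0; // never sleep on the very first attempt
            }
        }
        if (needToSleep) {
            sleepCount++;
            try {
                Thread.sleep(spinDelay); // sleep outside the lock, as the original does
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return addr;
    }

    // Assumed to mirror StaticHostProvider.onConnected(): remember where we succeeded.
    synchronized void onConnected() {
        lastIndex = currentIndex;
    }
}
```

With two servers and a positive spinDelay, the first two calls return immediately. The third call wraps back to the first server (a full lap without success), so it sleeps before returning.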

ClientCnxn

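The client connection object. It holds everything about the client's connection to the server. It has quite a few fields; the figure below annotates some of them.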


[Figure: ClientCnxn.png, selected fields of ClientCnxn]

Here is the ClientCnxn constructor that the other constructors ultimately delegate to:

 public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        // create the SendThread, which handles all I/O; by default clientCnxnSocket is a ClientCnxnSocketNIO
        sendThread = new SendThread(clientCnxnSocket);
        // create the EventThread, which dispatches the events watchers are interested in
        eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
        initRequestTimeout();
    }
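Here is a quick worked example of the two derived timeouts. It assumes a sessionTimeout of 30000 ms and three servers, which gives connectTimeout = 30000 / 3 = 10000 ms and readTimeout = 30000 * 2 / 3 = 20000 ms. Both are integer divisions, as in the constructor. The class name is hypothetical.

```java
// Hypothetical helper reproducing the arithmetic in the ClientCnxn constructor (values in ms).
final class ClientTimeoutsSketch {
    // each server gets an equal share of the session timeout to accept the connection
    static int connectTimeout(int sessionTimeout, int serverCount) {
        return sessionTimeout / serverCount;
    }

    // the client waits at most two thirds of the session timeout for data from the server
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30_000; // example value
        System.out.println(connectTimeout(sessionTimeout, 3)); // 10000
        System.out.println(readTimeout(sessionTimeout));       // 20000
    }
}
```

Here is my reading of the design. Splitting sessionTimeout across the servers means that trying each server once roughly fits within one session timeout. Keeping readTimeout below sessionTimeout lets the client give up on a silent server before the session itself would expire.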
ClientCnxn.start()

ClientCnxn.start() starts both the SendThread and the EventThread:

  public void start() {
        sendThread.start();
        eventThread.start();
    }
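Why two threads? A plausible reason is that watcher callbacks can be slow, and running them on the I/O thread would delay heartbeats. The hedged sketch below shows the producer/consumer shape: a BlockingQueue plus a sentinel object that stops the consumer. It is loosely analogous to the eventThread.queueEvent and queueEventOfDeath calls that SendThread.run makes. All names here are hypothetical, and events are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer/consumer sketch of the SendThread -> EventThread hand-off.
final class TwoThreadSketch {
    private static final Object EVENT_OF_DEATH = new Object(); // sentinel that stops the consumer
    private final BlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> delivered = new ArrayList<>(); // written only by the event thread

    private final Thread eventThread = new Thread(() -> {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    return;
                }
                delivered.add((String) event); // a real client would invoke the watcher here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }, "event-thread-sketch");

    void start() { eventThread.start(); }

    // called from the I/O side; never blocks on watcher code
    void queueEvent(String event) { waitingEvents.add(event); }

    void queueEventOfDeath() { waitingEvents.add(EVENT_OF_DEATH); }

    void awaitTermination() {
        try {
            eventThread.join(); // join() also makes `delivered` safely visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```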

SendThread

SendThread handles all of the client's I/O. Let's look at its run method:

SendThread.run
 public void run() {
            // hand the sessionId and outgoingQueue to clientCnxnSocket
            // note: on the very first connection the server has not created a session yet, so sessionId is still the default 0
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow(); 
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
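            // upper bound on how long the client may stay silent before it must ping the server: 10 s by default
            // (while debugging, the author had temporarily raised this value to 10000000)
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds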
            InetSocketAddress serverAddress = null;
            // note this is a loop: as long as the state is alive (not closed, not auth-failed), the logic below keeps running
            while (state.isAlive()) {
                try {
                    // no socket connection to a server yet: start connecting
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            // ask hostProvider for the next server in the list
                            serverAddress = hostProvider.next(1000);
                        }
                        // open the socket connection to that server (source shown below)
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    // the session is already connected
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                    LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent) {
                                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                                if (state == States.AUTH_FAILED) {
                                    eventThread.queueEventOfDeath();
                                }
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        // still connecting: time left before the connect timeout expires
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    // to <= 0 means the operation has timed out, so throw
                    if (to <= 0) {
                        String warnInfo = String.format(
                            "Client session timed out, have not heard from server in %dms for session id 0x%s",
                            clientCnxnSocket.getIdleRecv(),
                            Long.toHexString(sessionId));
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                  
                    if (state.isConnected()) {
                        // connected: compute how long until the next heartbeat (ping) is due
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        int timeToNextPing = readTimeout / 2
                                             - clientCnxnSocket.getIdleSend()
                                             - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        // ping if timeToNextPing <= 0 (a heartbeat is due) or if nothing has been sent for longer than MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
                    // state == States.CONNECTEDREADONLY: see the English comment below
                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    // process I/O events (analyzed in detail later)
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        // closing so this is expected
                        LOG.warn(
                            "An exception was thrown while closing send thread for session 0x{}.",
                            Long.toHexString(getSessionId()),
                            e);
                        break;
                    } else {
                        LOG.warn(
                            "Session 0x{} for sever {}, Closing socket connection. "
                                + "Attempting reconnect except it is a SessionExpiredException.",
                            Long.toHexString(getSessionId()),
                            serverAddress,
                            e);

                        // At this point, there might still be new packets appended to outgoingQueue.
                        // they will be handled in next connection or cleared up if closed.
                        cleanAndNotifyState();
                    }
                }
            }
            // reaching this point means the loop has exited: the client is closing or its state is no longer alive
            synchronized (state) {
                // When it comes to this point, it guarantees that later queued
                // packet to outgoingQueue will be notified of death.
                cleanup();
            }
            clientCnxnSocket.close();
            if (state.isAlive()) {
                // tell the event thread that the connection to the server is gone
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
        }
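The timing arithmetic in run() is easier to follow with numbers. The sketch below reproduces one iteration's decision: how long to wait in doTransport, and whether to ping now. It uses hypothetical names, values in ms, and MAX_SEND_PING_INTERVAL at its 10 s default. Throwing IllegalStateException stands in for SessionTimeoutException.

```java
// Hypothetical sketch of the timing decisions in SendThread.run (values in ms).
final class SendLoopTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    static final class Decision {
        final boolean sendPing; // should a ping go out on this iteration?
        final int waitMs;       // timeout handed to doTransport/select
        Decision(boolean sendPing, int waitMs) {
            this.sendPing = sendPing;
            this.waitMs = waitMs;
        }
    }

    static Decision decide(boolean connected, int readTimeout, int connectTimeout,
                           int idleRecv, int idleSend) {
        int to = (connected ? readTimeout : connectTimeout) - idleRecv;
        if (to <= 0) {
            throw new IllegalStateException("session timed out: nothing heard for " + idleRecv + "ms");
        }
        if (connected) {
            // ping at half the read timeout, minus a 1 s margin once we have been idle > 1 s
            int timeToNextPing = readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
            if (timeToNextPing <= 0 || idleSend > MAX_SEND_PING_INTERVAL) {
                return new Decision(true, to);
            }
            to = Math.min(to, timeToNextPing);
        }
        return new Decision(false, to);
    }
}
```

For example, take readTimeout = 20000 ms and an idle connected client. timeToNextPing reaches 0 once idleSend hits 9000 ms (20000 / 2 - 9000 - 1000 = 0), so in steady state the client pings roughly every 9 seconds.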

The client-to-server socket connection

The client opens its socket to the server in ClientCnxnSocket.connect. Shown here is the NIO implementation:

  void connect(InetSocketAddress addr) throws IOException {
        // create the client SocketChannel
        SocketChannel sock = createSock();
        try {
            // register the channel with the selector for OP_CONNECT and
            // start a non-blocking connect to the remote server
            registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to {}", addr);
            sock.close();
            throw e;
        }
        // the session handshake has not completed yet
        initialized = false;

        /*
         * Reset incomingBuffer
         */
        // by default the connection is NIO-based; each message on the wire is a 4-byte length
        // followed by the message itself, which in turn consists of a header and a body.
        // lenBuffer is the 4-byte ByteBuffer that receives the length prefix
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
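The framing described in connect() is a 4-byte length followed by the message. It has to survive partial reads, because a non-blocking read may return any number of bytes. Below is a hedged, standalone sketch of the two-phase state machine that doIO implements with lenBuffer and incomingBuffer. The class name is hypothetical and the body is treated as opaque bytes. The sketch only rejects negative lengths and applies no upper bound on message size.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing: a 4-byte big-endian length, then that many bytes.
// Input may arrive in arbitrary chunks, just as it does from a non-blocking socket read.
final class LengthPrefixReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // the buffer currently being filled
    final List<byte[]> messages = new ArrayList<>();

    void feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // copy as many bytes as fit into the current buffer
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);

            if (incomingBuffer.hasRemaining()) {
                continue; // need more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                int len = lenBuffer.getInt();
                lenBuffer.clear(); // ready for the next prefix
                if (len < 0) {
                    throw new IllegalStateException("invalid length " + len);
                }
                if (len == 0) {
                    messages.add(new byte[0]);
                } else {
                    incomingBuffer = ByteBuffer.allocate(len); // now read the body
                }
            } else {
                messages.add(incomingBuffer.array()); // a complete message
                incomingBuffer = lenBuffer;           // back to reading a length
            }
        }
    }
}
```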

Once the connect request has been issued, the SendThread loop calls ClientCnxnSocketNIO.doTransport, the entry point for all of the client's I/O processing.

ClientCnxnSocketNIO.doTransport
 void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        // wait (up to waitTimeOut ms) for a registered event to fire
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                // OP_CONNECT fired: finish the non-blocking connect
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    // queue the session-creation (and auth) requests; analyzed below
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // read/write readiness: run doIO (analyzed below)
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            // if outgoingQueue holds a sendable packet, register interest in OP_WRITE.
            // one doIO call writes a limited amount of data (at most one packet, possibly only part of it),
            // so after doIO we check again whether outgoingQueue still has data to send
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        // clear the processed selection keys
        selected.clear();
    }
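For readers less familiar with NIO, here is a self-contained sketch of the same connect sequence using only the JDK. It registers for OP_CONNECT, starts a non-blocking connect, and calls finishConnect() once the selector reports the key. It is not ZooKeeper code. The loopback ServerSocketChannel exists only so the example has something to connect to. Switching the interest set to OP_READ | OP_WRITE merely marks the point where primeConnection() would run.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of a non-blocking connect driven by a Selector (plain JDK NIO).
final class NioConnectSketch {
    static boolean connectOnce(long timeoutMs) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free loopback port
            sock.configureBlocking(false);
            sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(server.getLocalAddress())) {
                return true; // connected immediately (possible on loopback)
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(Math.max(1, deadline - System.currentTimeMillis()));
                for (SelectionKey k : selector.selectedKeys()) {
                    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0 && sock.finishConnect()) {
                        // this is where ClientCnxnSocketNIO would call sendThread.primeConnection()
                        k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }
}
```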

Once the socket connection between client and server is up, the next step is establishing the session, which sendThread.primeConnection takes care of.

sendThread.primeConnection

After the socket connection to the server is established, the client sends the server a request to create a session. Here is the implementation:

void primeConnection() throws IOException {
            LOG.info(
                "Socket connection established, initiating session, client: {}, server: {}",
                clientCnxnSocket.getLocalSocketAddress(),
                clientCnxnSocket.getRemoteSocketAddress());
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            // build the session-creation request
            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
            // We add backwards since we are pushing into the front
            // Only send if there's a pending watch
            // TODO: here we have the only remaining use of zooKeeper in
            // this class. It's to be eliminated!
            if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
                // the long block below re-sends the client's existing watches to the server, in batches
                List<String> dataWatches = zooKeeper.getDataWatches();
                List<String> existWatches = zooKeeper.getExistWatches();
                List<String> childWatches = zooKeeper.getChildWatches();
                List<String> persistentWatches = zooKeeper.getPersistentWatches();
                List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
                if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                        || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
                    Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                    Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                    Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                    Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
                    Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
                    long setWatchesLastZxid = lastZxid;

                    while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                            || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                        List<String> dataWatchesBatch = new ArrayList<String>();
                        List<String> existWatchesBatch = new ArrayList<String>();
                        List<String> childWatchesBatch = new ArrayList<String>();
                        List<String> persistentWatchesBatch = new ArrayList<String>();
                        List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                        int batchLength = 0;

                        // Note, we may exceed our max length by a bit when we add the last
                        // watch in the batch. This isn't ideal, but it makes the code simpler.
                        while (batchLength < SET_WATCHES_MAX_LENGTH) {
                            final String watch;
                            if (dataWatchesIter.hasNext()) {
                                watch = dataWatchesIter.next();
                                dataWatchesBatch.add(watch);
                            } else if (existWatchesIter.hasNext()) {
                                watch = existWatchesIter.next();
                                existWatchesBatch.add(watch);
                            } else if (childWatchesIter.hasNext()) {
                                watch = childWatchesIter.next();
                                childWatchesBatch.add(watch);
                            }  else if (persistentWatchesIter.hasNext()) {
                                watch = persistentWatchesIter.next();
                                persistentWatchesBatch.add(watch);
                            } else if (persistentRecursiveWatchesIter.hasNext()) {
                                watch = persistentRecursiveWatchesIter.next();
                                persistentRecursiveWatchesBatch.add(watch);
                            } else {
                                break;
                            }
                            batchLength += watch.length();
                        }

                        Record record;
                        int opcode;
                        if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                            // maintain compatibility with older servers - if no persistent/recursive watchers
                            // are used, use the old version of SetWatches
                            record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                            opcode = OpCode.setWatches;
                        } else {
                            record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                                    childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                            opcode = OpCode.setWatches2;
                        }
                        // header of the set-watches request
                        RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                        // wrap header and body in a Packet and push it onto outgoingQueue to await sending
                        Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                        outgoingQueue.addFirst(packet);
                    }
                }
            }
           
            for (AuthData id : authInfo) {
                // push the client's auth data onto outgoingQueue
                outgoingQueue.addFirst(
                    new Packet(
                        new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                        null,
                        new AuthPacket(0, id.scheme, id.data),
                        null,
                        null));
            }
            // finally push the connect request onto the very front of outgoingQueue, so it is sent first
            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
            // connectionPrimed registers OP_READ and OP_WRITE interest with the selector
            clientCnxnSocket.connectionPrimed();
            LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
        }
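The batching loop above bounds each SetWatches request by the total length of the paths it carries. A simplified, single-watch-type sketch of that greedy rule follows. Here maxLength stands in for SET_WATCHES_MAX_LENGTH, and the class name is hypothetical. As the source comment notes, the last path added may push a batch slightly past the limit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the greedy batching in primeConnection (one watch type only).
final class WatchBatchSketch {
    static List<List<String>> batch(List<String> paths, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> current = new ArrayList<>();
            int batchLength = 0;
            // keep adding until the limit is reached; the last path may overshoot it
            while (batchLength < maxLength && it.hasNext()) {
                String watch = it.next();
                current.add(watch);
                batchLength += watch.length();
            }
            batches.add(current);
        }
        return batches;
    }
}
```

Presumably the point of batching is to keep each request below the server's maximum packet size when a client holds a very large number of watches.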

Writing out the queued connect request (and reading the reply) is handled by ClientCnxnSocketNIO.doIO.

ClientCnxnSocketNIO.doIO

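This is the method in which the client handles I/O events. It is long, so please bear with it. I was patient enough to analyze all of it, so I trust readers will be patient enough to read it.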

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        // get the SocketChannel from the SelectionKey
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // the channel is readable
        if (sockKey.isReadable()) {
            // read from the socket into incomingBuffer
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            
            if (!incomingBuffer.hasRemaining()) {
                // the buffer is full: flip it so its contents can be read
                incomingBuffer.flip();
   
                if (incomingBuffer == lenBuffer) {
                    // incomingBuffer == lenBuffer means we have just read the 4-byte length prefix
                    recvCount.getAndIncrement();
                    // with the length known, allocate a ByteBuffer of that size to receive the message
                    readLength();
                } else if (!initialized) {
                    // not initialized yet: the session handshake is still in progress, so this message
                    // is the server's ConnectResponse, handled by readConnectResult (analyzed below)
                    readConnectResult();
                    // register interest in OP_READ
                    enableRead();
                    // as before, enable OP_WRITE if outgoingQueue holds something sendable
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    // reset for the next length prefix and mark the session as initialized
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    // the length was read earlier, so this is a regular message body:
                    // hand it to readResponse (analyzed later)
                    sendThread.readResponse(incomingBuffer);
                    // reset lenBuffer
                    lenBuffer.clear();
                    // point incomingBuffer back at lenBuffer for the next message
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        // handle writability
        if (sockKey.isWritable()) {
            // take the first sendable packet from outgoingQueue
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                        // for requests other than ping and auth, set an xid in the header; xids come from an incrementing client-side counter, so requests are processed in order
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    // serialize the request into a ByteBuffer (analyzed below)
                    p.createBB();
                }
                // write the message to the server
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    // the whole packet has been written: remove it from outgoingQueue. If a write sent only
                    // part of it, the packet stays in outgoingQueue and the next write continues where it left off
                    sentCount.getAndIncrement();
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        // requests other than ping and auth await a response from the server, so move them to pendingQueue
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                // everything in outgoingQueue has been sent: stop watching OP_WRITE
                // No more packets to send: turn off write interest flag.
                // Will be turned on later by a later call to enableWrite(),
                // from within ZooKeeperSaslClient (if client is configured
                // to attempt SASL authentication), or in either doIO() or
                // in doTransport() if not.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // On initial connection, write the complete connect request
                // packet, but then disable further writes until after
                // receiving a successful connection response.  If the
                // session is expired, then the server sends the expiration
                // response and immediately closes its end of the socket.  If
                // the client is simultaneously writing on its end, then the
                // TCP stack may choose to abort with RST, in which case the
                // client would never receive the session expired event.  See
                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                // in short: if the session has expired, the server sends the expiration response and closes
                // the socket; if the client is writing at the same moment, TCP may reset the connection (RST)
                // and the client never sees the expiration. So while initialized == false, once the connect
                // request has been fully written, the client stops watching OP_WRITE
                disableWrite();
            } else {
                // Just in case
                // as the comment says, just in case: outgoingQueue still has data, so keep OP_WRITE registered
                enableWrite();
            }
        }
    }
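The write half of doIO boils down to this: write the head packet, and dequeue it only when its buffer is fully drained. The hedged sketch below shows that with a throttled in-memory channel that accepts at most a few bytes per call, simulating short socket writes. It is not ZooKeeper code. Packets are raw ByteBuffers and xid assignment is omitted. Every completed packet goes to pendingQueue, whereas the real code skips ping and auth packets.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the write side of doIO.
final class PartialWriteSketch {
    final Deque<ByteBuffer> outgoingQueue = new ArrayDeque<>();
    final List<ByteBuffer> pendingQueue = new ArrayList<>(); // packets now awaiting a reply

    // One "writable" event. Returns true if OP_WRITE interest should stay on.
    boolean onWritable(WritableByteChannel sock) throws IOException {
        ByteBuffer p = outgoingQueue.peekFirst();
        if (p != null) {
            sock.write(p);
            if (!p.hasRemaining()) { // fully written: move it to the pending queue
                outgoingQueue.removeFirst();
                pendingQueue.add(p);
            }                        // otherwise it stays at the head for the next write
        }
        return !outgoingQueue.isEmpty();
    }

    // A channel that accepts at most `max` bytes per write call, to simulate short writes.
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        private final int max;

        ThrottledChannel(int max) { this.max = max; }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(max, src.remaining());
            for (int i = 0; i < n; i++) {
                received.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() { return true; }

        @Override
        public void close() { }
    }
}
```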

That covers the logic of doIO, but a few details remain. First, how is a message turned into a ByteBuffer?
Let's start with the message object, Packet.


[Figure: Packet.png, the fields of Packet]

On the sending side, two fields of Packet matter: the request header requestHeader and the request body request.
Their data is serialized into the ByteBuffer field bb. Let's walk through that process:

 public void createBB() {
            try {
                // create the output stream
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                // the first 4 bytes hold the length of the rest of the message; it is not known yet, so write -1 as a placeholder
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                   //requestHeader序列化到消息輸出流中补疑,requestHeader會序列化兩個(gè)屬性:客戶端事物id(xid),請求類型碼type歹撒,其實(shí)這個(gè)對象序列化結(jié)果也是固定長度[ 4(xid) +  4(type) = 8個(gè)字節(jié) ]
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    //如果請求是創(chuàng)建會話連接莲组,序列化ConnectRequest到消息輸出流
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                //序列化請求到消息輸出流
                    request.serialize(boa, "request");
                }
         
                baos.close();
                //把消息輸出流轉(zhuǎn)化成ByteBuffer
                this.bb = ByteBuffer.wrap(baos.toByteArray());
              //設(shè)置消息的長度
                this.bb.putInt(this.bb.capacity() - 4);
               //為寫做準(zhǔn)備
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
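To make the length-prefix trick in createBB() concrete, here is a minimal sketch that builds the same frame layout with java.io.DataOutputStream instead of JUTE. It assumes, as far as we know from the JUTE source, that BinaryOutputArchive writes ints as 4-byte big-endian values through a DataOutput. ByteBuffer is also big-endian by default, so patching the placeholder with putInt() yields a consistent length field. The class and method names (FramingDemo, frame) and the xid/type values are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class FramingDemo {
    // Builds a length-prefixed frame with the same layout as createBB():
    // [len:int][xid:int][type:int][body...], where len excludes its own 4 bytes.
    static ByteBuffer frame(int xid, int type, byte[] body) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);   // placeholder for len, patched below
            out.writeInt(xid);  // header: 4 bytes
            out.writeInt(type); // header: 4 bytes
            out.write(body);    // already-serialized request body
            out.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // relative put at position 0 overwrites the -1
            bb.rewind();                  // position back to 0, ready to be written out
            return bb;
        } catch (IOException e) {
            // ByteArrayOutputStream does not really fail; rethrow unchecked just in case
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 4, new byte[] {1, 2, 3});
        System.out.println("frame bytes: " + bb.remaining()); // 15
        System.out.println("len field:   " + bb.getInt(0));   // 11
    }
}
```

The frame is 15 bytes (4 length + 8 header + 3 body) and its length field is 11, because the length counts everything except itself. Writing the placeholder first avoids computing the size of the serialized records in advance.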

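ZooKeeper uses its own JUTE library for serialization; interested readers can look into it further.
Having covered the structure of an outgoing message, let's now look at how the client handles the response once session creation completes.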

ClientCnxnSocket.readConnectResult
void readConnectResult() throws IOException {
    if (LOG.isTraceEnabled()) {
        StringBuilder buf = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            buf.append(Integer.toHexString(b)).append(",");
        }
        buf.append("]");
        if (LOG.isTraceEnabled()) {
            LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
        }
    }
    // Create an input stream over the received ByteBuffer
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    // Deserialize the ConnectResponse
    conRsp.deserialize(bbia, "connect");

    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }
    // Take the sessionId from the deserialized ConnectResponse
    this.sessionId = conRsp.getSessionId();
    // Hand the result to SendThread.onConnected(), which updates the connection
    // state and queues the resulting connection event for the EventThread
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
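The try/catch around readBool("readOnly") is a simple form of backward compatibility: the trailing flag is optional, and running out of bytes just means the server is too old to send it. The sketch below reproduces that pattern with DataInputStream. It is not ZooKeeper code: the Result class, the encode() helper and the assumed field order (protocolVersion, timeOut, sessionId, length-prefixed passwd) are our own illustration of a ConnectResponse-like payload, and it catches only EOFException where readConnectResult() catches any IOException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ConnectResultDemo {
    // Hypothetical stand-in for a decoded ConnectResponse plus the read-only flag.
    static final class Result {
        final int timeOut;
        final long sessionId;
        final byte[] passwd;
        final boolean readOnly;

        Result(int timeOut, long sessionId, byte[] passwd, boolean readOnly) {
            this.timeOut = timeOut;
            this.sessionId = sessionId;
            this.passwd = passwd;
            this.readOnly = readOnly;
        }
    }

    // Encodes a payload; readOnly == null simulates an old server without the flag.
    static byte[] encode(int timeOut, long sessionId, byte[] passwd, Boolean readOnly) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(0);             // protocolVersion
            out.writeInt(timeOut);
            out.writeLong(sessionId);
            out.writeInt(passwd.length); // length-prefixed byte array
            out.write(passwd);
            if (readOnly != null) {
                out.writeBoolean(readOnly);
            }
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes the fixed fields, then treats the trailing flag as optional.
    static Result decode(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readInt();                // protocolVersion, unused here
            int timeOut = in.readInt();
            long sessionId = in.readLong();
            byte[] passwd = new byte[in.readInt()];
            in.readFully(passwd);
            boolean readOnly = false;
            try {
                readOnly = in.readBoolean();
            } catch (EOFException e) {
                // No flag present: the server predates read-only mode.
            }
            return new Result(timeOut, sessionId, passwd, readOnly);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] pw = {1, 2, 3};
        Result fresh = decode(encode(30000, 0x1234L, pw, Boolean.TRUE));
        Result old = decode(encode(30000, 0x1234L, pw, null));
        System.out.println(fresh.readOnly + " " + old.readOnly); // true false
    }
}
```

Appending the new field at the end is what makes this work: an old payload is a strict prefix of a new one, so a new client can still parse it and simply falls back to the default.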

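At this point the ZooKeeper client has finished starting up. Client startup consists of establishing the socket connection and then establishing the session; the figure below gives a brief overview of the process.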


Connection creation.png
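The last step of this flow, SendThread.onConnected(), decides which state the client ends up in. The following is a simplified sketch of that decision as we understand it from the ClientCnxn source: a non-positive negotiated timeout means the server treated the session as expired and the client closes; otherwise it becomes CONNECTED or CONNECTEDREADONLY depending on the isRO flag. The enum, the method signatures and the readTimeout helper (two thirds of the negotiated timeout) are hypothetical simplifications; the real method also records the session id and password and queues a WatchedEvent for the EventThread.

```java
public class OnConnectedDemo {
    // Hypothetical subset of the client's connection states.
    enum State { CONNECTING, CONNECTED, CONNECTEDREADONLY, CLOSED }

    // Simplified decision made once the connect response has been decoded.
    static State onConnected(int negotiatedSessionTimeout, boolean isRO) {
        if (negotiatedSessionTimeout <= 0) {
            // The server rejected the session (for example, it had already expired).
            return State.CLOSED;
        }
        return isRO ? State.CONNECTEDREADONLY : State.CONNECTED;
    }

    // Assumption: the read timeout is two thirds of the negotiated session timeout.
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    public static void main(String[] args) {
        System.out.println(onConnected(0, false));     // CLOSED
        System.out.println(onConnected(30000, false)); // CONNECTED
        System.out.println(onConnected(30000, true));  // CONNECTEDREADONLY
        System.out.println(readTimeout(30000));        // 20000
    }
}
```

The timeout the client uses is the one negotiated by the server in the ConnectResponse, not the sessionTimeout passed to the ZooKeeper constructor, which is why onConnected() receives conRsp.getTimeOut().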
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.