zk源碼閱讀20:zk client之網(wǎng)絡(luò)I/O(四) ClientCnxn.SendThread

概述

SendThread是客戶端ClientCnxn內(nèi)部的一個核心I/O調(diào)度線程,用于管理客戶端與服務(wù)端之間的所有網(wǎng)絡(luò)I/O操作,在Zookeeper客戶端實(shí)際運(yùn)行中,SendThread的作用如下:

  1. 維護(hù)了客戶端與服務(wù)端之間的會話生命周期(通過一定周期頻率內(nèi)向服務(wù)端發(fā)送PING包檢測心跳),如果會話周期內(nèi)客戶端與服務(wù)端出現(xiàn)TCP連接斷開尼桶,那么就會自動且透明地完成重連操作。
  2. 管理了客戶端所有的請求發(fā)送和響應(yīng)接收操作锯仪,其將上層客戶端API操作轉(zhuǎn)換成相應(yīng)的請求協(xié)議并發(fā)送到服務(wù)端泵督,并完成對同步調(diào)用的返回和異步調(diào)用的回調(diào)。
  3. 將來自服務(wù)端的事件傳遞給EventThread去處理庶喜。

源碼

屬性

sendThread屬性

意義如下

字段 意義
lastPingSentNs 上一次ping的 nano time
clientCnxnSocket 通信層ClientCnxnSocket
r 隨機(jī)數(shù)生成器
isFirstConnect 是否第一次connect
rwServerAddress 讀寫server地址
minPingRwTimeout 最短ping 讀寫server的 timeout時間
maxPingRwTimeout 最長ping 讀寫server的 timeout時間
pingRwTimeout 默認(rèn)ping 讀寫server的 timeout時間
saslLoginFailed sasl登錄失敗
RETRY_CONN_MSG 日志

函數(shù)

SendThread函數(shù)

簡要介紹如下

方法 作用 備注
SendThread 構(gòu)造函數(shù)
readResponse 讀取server的回復(fù)小腊,進(jìn)行outgoingQueue以及pendingQueue的相關(guān)處理,事件觸發(fā)等等 重要
getZkState 或者client狀態(tài)
getClientCnxnSocket 獲取通信層clientCnxnSocket
primeConnection 主要連接函數(shù)久窟,用于將watches和authData傳給server秩冈,允許clientCnxnSocket可讀寫 重要
prependChroot 根據(jù)clientPath以及chrootPath得到serverPath
sendPing ping命令,記錄發(fā)出時間斥扛,生成請求入问,加入outgoingQueue待發(fā)送 重要
startConnect 開始連接,主要是和server的socket完成connect和accept 重要
logStartConnect log
run 線程方法稀颁,完成了連接驗(yàn)證芬失,超時檢測,ping命令匾灶,以及網(wǎng)絡(luò)IO 重要
pingRwServer ping讀寫server 重要
cleanup socket清理以及通知兩個queue失去連接 以及 清理兩個隊(duì)列
onConnected 讀取server的connect response后棱烂,設(shè)置相關(guān)參數(shù) 重要
close 關(guān)閉socket
testableCloseSocket
clientTunneledAuthenticationInProgress 是否在驗(yàn)證sasl
sendPacket 發(fā)送packet

將幾個重要的函數(shù)進(jìn)行源碼講解


readResponse方法

可以拆成幾部分,分別完成

1.處理ping命令,AuthPacket,WatcherEvent,驗(yàn)證sasl并返回
2.從pendingQueue取出packet進(jìn)行驗(yàn)證(有順序保證)
3.調(diào)用finishPacket完成AsyncCallBack處理以及watcher的注冊

第一部分代碼如下

    ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");//反序列化出 回復(fù)頭
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) ); //加入eventThread
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {//-1代表通知類型 即WatcherEvent

                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");//反序列化WatcherEvent

                // convert from a server path to a client path
                if (chrootPath != null) {//把serverPath轉(zhuǎn)化成clientPath
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);//WatcherEvent還原成WatchedEvent
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );//加入eventThread
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (clientTunneledAuthenticationInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

可以看出都是直接return的

第2,3部分代碼如下

    Packet packet;//auth和ping以及正在處理的sasl沒有加入pendingQueue,觸發(fā)的watch也沒有在pendingQueue中(是server主動發(fā)過來的),而AsyncCallBack在(見finishPacket)
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                packet = pendingQueue.remove();//得到了response阶女,從pendingQueue中移除
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }

                packet.replyHeader.setXid(replyHdr.getXid());//設(shè)置replyHeader
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                finishPacket(packet);
            }

這里可以看出
auth和ping以及正在處理的sasl沒有加入pendingQueue,觸發(fā)的watch也沒有在pendingQueue中(是server主動發(fā)過來的),而AsyncCallBack在pendingQueue中(見finishPacket)


primeConnection方法

primeConnection主要完成

  根據(jù)之前是否連接過設(shè)置sessId以及生成ConnectRequest
  根據(jù)disableAutoWatchReset將已有的watches和authData以及放入outgoingQueue準(zhǔn)備發(fā)送
  允許clientCnxnSocket可讀寫,表示和server準(zhǔn)備IO

代碼如下

    void primeConnection() throws IOException {
            LOG.info("Socket connection established to "
                     + clientCnxnSocket.getRemoteSocketAddress()
                     + ", initiating session");
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;//如果之前見過讀寫server就設(shè)置sessionId,否則默認(rèn)0
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);//生成ConnectRequest
            synchronized (outgoingQueue) {
                // We add backwards since we are pushing into the front
                // Only send if there's a pending watch
                // TODO: here we have the only remaining use of zooKeeper in
                // this class. It's to be eliminated!
                if (!disableAutoWatchReset) {
                    List<String> dataWatches = zooKeeper.getDataWatches();
                    List<String> existWatches = zooKeeper.getExistWatches();
                    List<String> childWatches = zooKeeper.getChildWatches();
                    if (!dataWatches.isEmpty()
                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();//根據(jù)chrootPath轉(zhuǎn)化成serverPath
                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                        long setWatchesLastZxid = lastZxid;

                        while (dataWatchesIter.hasNext()
                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                            List<String> dataWatchesBatch = new ArrayList<String>();
                            List<String> existWatchesBatch = new ArrayList<String>();
                            List<String> childWatchesBatch = new ArrayList<String>();
                            int batchLength = 0;

                            // Note, we may exceed our max length by a bit when we add the last
                            // watch in the batch. This isn't ideal, but it makes the code simpler.
                            while (batchLength < SET_WATCHES_MAX_LENGTH) {//限定長度
                                final String watch;
                                if (dataWatchesIter.hasNext()) {
                                    watch = dataWatchesIter.next();
                                    dataWatchesBatch.add(watch);
                                } else if (existWatchesIter.hasNext()) {
                                    watch = existWatchesIter.next();
                                    existWatchesBatch.add(watch);
                                } else if (childWatchesIter.hasNext()) {
                                    watch = childWatchesIter.next();
                                    childWatchesBatch.add(watch);
                                } else {
                                    break;
                                }
                                batchLength += watch.length();
                            }

                            SetWatches sw = new SetWatches(setWatchesLastZxid,
                                    dataWatchesBatch,
                                    existWatchesBatch,
                                    childWatchesBatch);//設(shè)置watches
                            RequestHeader h = new RequestHeader();
                            h.setType(ZooDefs.OpCode.setWatches);
                            h.setXid(-8);
                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                            outgoingQueue.addFirst(packet);//加入發(fā)送隊(duì)列
                        }
                    }
                }

                for (AuthData id : authInfo) {
                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                            OpCode.auth), null, new AuthPacket(0, id.scheme,
                            id.data), null, null));//authInfo加入發(fā)送隊(duì)列
                }
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));//ConnectRequest確保在發(fā)送隊(duì)列的第一個
            }
            clientCnxnSocket.enableReadWriteOnly();//開啟讀寫颊糜,這樣outgoingQueue內(nèi)容就可以發(fā)出去了
            if (LOG.isDebugEnabled()) {
                LOG.debug("Session establishment request sent on "
                        + clientCnxnSocket.getRemoteSocketAddress());
            }
        }

主要注意:
1.sessId:如果之前連接過,那么重連用之前的sessionId张肾,否則默認(rèn)0,重連參見ClientCnxn.SendThread#startConnect的調(diào)用
2.什么時候連接會有watches需要去注冊?重連且disableAutoWatchReset為false的時候
3.ConnectRequest是放在outgoingQueue第一個的芭析,確保最先發(fā)出去的是連接請求(保證了第一個response是被ClientCnxnSocket#readConnectResult處理)


sendPing方法

這個就是個異步調(diào)用

    private void sendPing() {//ping命令,記錄發(fā)出時間吞瞪,生成請求,加入outgoingQueue待發(fā)送
            lastPingSentNs = System.nanoTime();
            RequestHeader h = new RequestHeader(-2, OpCode.ping);
            queuePacket(h, null, null, null, null, null, null, null, null);
        }

注意一點(diǎn)驾孔,在run方法會將outgoingQueue的內(nèi)容發(fā)送出去芍秆,在ClientCnxnSocketNIO#doIO中惯疙,
ping命令的packet是沒有進(jìn)入pendingQueue的


startConnect方法
干的事情很簡單

    1.根據(jù)hostProvider或者已經(jīng)設(shè)置的讀寫服務(wù)器地址確定server 地址
    2.sasl相關(guān)處理
    3.調(diào)用clientCnxnSocket.connect

源碼如下

    private void startConnect() throws IOException {//開始連接
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;//有rwServerAddress 就設(shè)置
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);//沒有就從服務(wù)器地址列表取出來一個
            }

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));//設(shè)置線程名字
            if (ZooKeeperSaslClient.isEnabled()) {//如果開啟了sasl,這部分不清楚妖啥,忽略
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName+"/"+addr.getHostName());
                } catch (LoginException e) {
                    // An authentication error occurred when the SASL client tried to initialize:
                    // for Kerberos this means that the client failed to authenticate with the KDC.
                    // This is different from an authentication error that occurs during communication
                    // with the Zookeeper server, which is handled below.
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);//log

            clientCnxnSocket.connect(addr);//socket連接
        }

run方法

這個方法很重要

  1.clientCnxnSocket相關(guān)初始化
  2.不斷檢測clientCnxnSocket是否和服務(wù)器處于連接狀態(tài),沒有連接則進(jìn)行連接
  3.檢測是否超時:當(dāng)處于連接狀態(tài)時霉颠,檢測是否讀超時,當(dāng)處于未連接狀態(tài)時荆虱,檢測是否連接超時
  4.不斷的發(fā)送ping通知蒿偎,服務(wù)器端每接收到ping請求,就會從當(dāng)前時間重新計(jì)算session過期時間怀读,所以當(dāng)客戶端按照一定時間間隔不斷的發(fā)送ping請求诉位,就能保證客戶端的session不會過期
  5.如果當(dāng)前是只讀的話,不斷去找有沒有支持讀寫的server
  6.不斷進(jìn)行IO操作菜枷,發(fā)送請求隊(duì)列中的請求和讀取服務(wù)器端的響應(yīng)數(shù)據(jù)
  7.!state.isAlive()時苍糠,進(jìn)行相關(guān)清理工作

第1部分,clientCnxnSocket相關(guān)初始化

            clientCnxnSocket.introduce(this,sessionId);//clientCnxnSocket初始化
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = System.currentTimeMillis();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

第2部分,不斷檢測clientCnxnSocket是否和服務(wù)器處于連接狀態(tài),沒有連接則進(jìn)行連接

        while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {//如果clientCnxnSocket的SelectionKey為null
                        if(!isFirstConnect){//如果不是第一次連接就sleep一下
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        startConnect();//開始連接
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

第3部分,檢測是否超時:當(dāng)處于連接狀態(tài)時,檢測是否讀超時啤誊,當(dāng)處于未連接狀態(tài)時岳瞭,檢測是否連接超時

    //檢測是否超時,分為讀超時和連接超時
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent == true) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                            }
                        }
                        //如果已經(jīng)連接上,預(yù)計(jì)讀超時時間 - 距離上次讀已經(jīng)過去的時間
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        //如果沒連接上,預(yù)計(jì)連接時間 - 上次讀已經(jīng)過去的時間
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {//代表讀超時或連接超時
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }

第4部分,不斷的發(fā)送ping通知

    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        System.out.println("org.apache.zookeeper.ClientCnxn.SendThread.run readTimeout = " + readTimeout);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {//readTimeout已經(jīng)過了近一半的時間蚊锹,或者距離上次發(fā)送請求已過過了10s
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {//如果預(yù)計(jì)下次ping的時間 < 實(shí)際距離下次ping的時間
                                to = timeToNextPing;
                            }
                        }
                    }

第5部分瞳筏,如果當(dāng)前是只讀的話,不斷去找有沒有支持讀寫的server

                // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {//如果是只讀的話
                        long now = System.currentTimeMillis();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }

第6部分,不斷進(jìn)行IO操作牡昆,發(fā)送請求隊(duì)列中的請求和讀取服務(wù)器端的響應(yīng)數(shù)據(jù)

      clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);//在特定時間內(nèi)姚炕,根據(jù)兩個queue進(jìn)行網(wǎng)絡(luò)傳輸

這個看clientCnxnSocket內(nèi)的源碼,已經(jīng)在之前講過了

第7部分迁杨,!state.isAlive()時钻心,進(jìn)行相關(guān)清理工作

    //下面是state is not alive的情況
            cleanup();
            clientCnxnSocket.close();//關(guān)閉socket
            if (state.isAlive()) {//???什么時候會出現(xiàn)這種情況
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));

這個地方有點(diǎn)不理解,為什么還會出現(xiàn) if (state.isAlive()) 的情況


pingRwServer方法

這個方法是client連接了只讀的server時铅协,不斷根據(jù)hostProvider找到一個可讀寫的server

      private void pingRwServer() throws RWServerFoundException {//找到讀寫server捷沸,更新rwServerAddress
            String result = null;
            InetSocketAddress addr = hostProvider.next(0);
            LOG.info("Checking server " + addr + " for being r/w." +
                    " Timeout " + pingRwTimeout);

            Socket sock = null;
            BufferedReader br = null;
            try {
                sock = new Socket(addr.getHostName(), addr.getPort());
                sock.setSoLinger(false, -1);
                sock.setSoTimeout(1000);
                sock.setTcpNoDelay(true);
                sock.getOutputStream().write("isro".getBytes());
                sock.getOutputStream().flush();
                sock.shutdownOutput();
                br = new BufferedReader(
                        new InputStreamReader(sock.getInputStream()));
                result = br.readLine();
            } catch (ConnectException e) {
                // ignore, this just means server is not up
            } catch (IOException e) {
                // some unexpected error, warn about it
                LOG.warn("Exception while seeking for r/w server " +
                        e.getMessage(), e);
            } finally {
                if (sock != null) {
                    try {
                        sock.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
            }

            if ("rw".equals(result)) {
                pingRwTimeout = minPingRwTimeout;
                // save the found address so that it's used during the next
                // connection attempt
                rwServerAddress = addr;
                throw new RWServerFoundException("Majority server found at "
                        + addr.getHostName() + ":" + addr.getPort());
            }
        }

主要看最后的if條件就夠了
注意的是,如果更新了rwServerAddress 會拋異常,run方法處理異常狐史,會清理后進(jìn)行重連,來讓client重連到讀寫server上去

異常處理

onConnected方法

這個方法是收到了zk server的連接回復(fù)后的一些參數(shù)設(shè)置痒给,以及zk state的狀態(tài)改變

    void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {//讀取server的connect response后,設(shè)置相關(guān)參數(shù)
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                String warnInfo;
                warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                    + Long.toHexString(sessionId) + " has expired";
                LOG.warn(warnInfo);
                throw new SessionExpiredException(warnInfo);
            }
            if (!readOnly && isRO) {//如果client不允許只讀骏全,但是目前是只讀
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;/
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();//更新hostProvider循環(huán)列表的index
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;//根據(jù)isRO設(shè)置state
            seenRwServerBefore |= !isRO;//是否見過讀寫server
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));//加入watcherEvent
        }

思考

run方法主要干的幾件事情

  1.clientCnxnSocket相關(guān)初始化
  2.不斷檢測clientCnxnSocket是否和服務(wù)器處于連接狀態(tài),沒有連接則進(jìn)行連接
  3.檢測是否超時:當(dāng)處于連接狀態(tài)時苍柏,檢測是否讀超時,當(dāng)處于未連接狀態(tài)時姜贡,檢測是否連接超時
  4.不斷的發(fā)送ping通知试吁,服務(wù)器端每接收到ping請求,就會從當(dāng)前時間重新計(jì)算session過期時間,所以當(dāng)客戶端按照一定時間間隔不斷的發(fā)送ping請求熄捍,就能保證客戶端的session不會過期
  5.如果當(dāng)前是只讀的話烛恤,不斷去找有沒有支持讀寫的server
  6.不斷進(jìn)行IO操作,發(fā)送請求隊(duì)列中的請求和讀取服務(wù)器端的響應(yīng)數(shù)據(jù)
  7.!state.isAlive()時余耽,進(jìn)行相關(guān)清理工作

主要連接primeConnection干了什么,如果保證第一個發(fā)出去的請求是connect請求

  根據(jù)之前是否連接過設(shè)置sessId以及生成ConnectRequest
  根據(jù)disableAutoWatchReset將已有的watches和authData以及放入outgoingQueue準(zhǔn)備發(fā)送
  允許clientCnxnSocket可讀寫,表示和server準(zhǔn)備IO

隊(duì)列里面第一個就是connect請求

sessId和sessionId

根據(jù)seenRwServerBefore判斷
沒有連接過 sessId就是0
有連接過 則sessId就是上次連接的sessionId
用sessId和全局的sessionPasswd去連接

哪些回復(fù)是不存在于pendingQueue當(dāng)中的

auth和ping以及正在處理的sasl沒有加入pendingQueue,觸發(fā)的watch也沒有在pendingQueue中

startConnect和primeConnection區(qū)別是什么

兩者的區(qū)別在于NIO的SelectionKey
前者限于connect和accept
后者已經(jīng)連接完成缚柏,開始了write和read,準(zhǔn)備開始和zk server完成socket io

pingRwServer和sendPing兩個函數(shù)區(qū)別是什么

前者是目前client只連接了只讀的zk server碟贾,會不斷地調(diào)用币喧,更新rwServerAddress
后者是不論client處于什么模式,都要進(jìn)行的心跳驗(yàn)證

clientCnxnSocket.isConnected()和isFirstConnect為什么有這兩個參數(shù)

isFirstConnect代表client 第一次連接袱耽,如果不是第一次連接,就sleep一段時間杀餐,然后從hostProvider選出下一個server addr

大體連接過程

首先與ZooKeeper服務(wù)器建立連接,有兩層連接要建立扛邑。

1.客戶端與服務(wù)器端的TCP連接
2.在TCP連接的基礎(chǔ)上建立session關(guān)聯(lián)

建立TCP連接之后怜浅,客戶端發(fā)送ConnectRequest請求,申請建立session關(guān)聯(lián)蔬崩,此時服務(wù)器端會為該客戶端分配sessionId和密碼恶座,同時開啟對該session是否超時的檢測。

當(dāng)在sessionTimeout時間內(nèi)沥阳,即還未超時跨琳,此時TCP連接斷開,服務(wù)器端仍然認(rèn)為該sessionId處于存活狀態(tài)桐罕。
此時脉让,客戶端會選擇下一個ZooKeeper服務(wù)器地址進(jìn)行TCP連接建立,TCP連接建立完成后功炮,拿著之前的sessionId和密碼發(fā)送ConnectRequest請求溅潜,如果還未到該sessionId的超時時間,則表示自動重連成功薪伏。
對客戶端用戶是透明的滚澜,一切都在背后默默執(zhí)行,ZooKeeper對象是有效的嫁怀。

如果重新建立TCP連接后设捐,已經(jīng)達(dá)到該sessionId的超時時間了(服務(wù)器端就會清理與該sessionId相關(guān)的數(shù)據(jù)),則返回給客戶端的sessionTimeout時間為0塘淑,sessionid為0萝招,密碼為空字節(jié)數(shù)組。
客戶端接收到該數(shù)據(jù)后存捺,會判斷協(xié)商后的sessionTimeout時間是否小于等于0槐沼,如果小于等于0,則使用eventThread線程先發(fā)出一個KeeperState.Expired事件,通知相應(yīng)的Watcher母赵。
然后結(jié)束EventThread線程的循環(huán)逸爵,開始走向結(jié)束具滴。此時ZooKeeper對象就是無效的了凹嘲,必須要重新new一個新的ZooKeeper對象,分配新的sessionId了构韵。

client一開始連接到了ReadOnly的server周蹭,后續(xù)找到rwServerAddress如何完成的重新連接

ClientCnxn.SendThread#run接收到RWServerFoundException異常,然后調(diào)用了


相關(guān)清理操作

cleanUp調(diào)用后使得ClientCnxnSocketNIO#isConnected為false
因此ClientCnxn.SendThread#run方法又進(jìn)入了連接的操作

                      if (!clientCnxnSocket.isConnected()) {//如果clientCnxnSocket的SelectionKey為null
                        if(!isFirstConnect){//如果不是第一次連接就sleep一下
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        startConnect();//開始連接
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

問題

SendThread#run的疑惑

為什么while(state.isAlive()) break出去之后
還有 if (state.isAlive())

備注

這是后續(xù)看server的網(wǎng)絡(luò)IO代碼后需要搞清楚的

怎么保證server的處理和發(fā)送順序
什么時候server是只可讀的疲恢,什么時候是讀寫的
server如何分配的sessionId和pwd
server是如何區(qū)分連接請求的不同sessId的凶朗,后續(xù)待看

refer

https://my.oschina.net/pingpangkuangmo/blog/486780 run方法以及大體過程
http://www.cnblogs.com/leesf456/p/6098255.html 概念
http://www.voidcn.com/blog/aBOUNTWINTER/article/p-6400711.html 概念
http://shift-alt-ctrl.iteye.com/blog/1846971 RwServer,seenRwServerBefore讀寫server相關(guān)說明
《paxos到zk》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市显拳,隨后出現(xiàn)的幾起案子棚愤,更是在濱河造成了極大的恐慌,老刑警劉巖杂数,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宛畦,死亡現(xiàn)場離奇詭異,居然都是意外死亡揍移,警方通過查閱死者的電腦和手機(jī)次和,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來那伐,“玉大人踏施,你說我怎么就攤上這事『毖” “怎么了畅形?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長诉探。 經(jīng)常有香客問我日熬,道長,這世上最難降的妖魔是什么阵具? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任碍遍,我火速辦了婚禮,結(jié)果婚禮上阳液,老公的妹妹穿的比我還像新娘怕敬。我一直安慰自己,他們只是感情好帘皿,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布东跪。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪虽填。 梳的紋絲不亂的頭發(fā)上喳挑,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天,我揣著相機(jī)與錄音沐寺,去河邊找鬼挠羔。 笑死,一個胖子當(dāng)著我的面吹牛恶守,可吹牛的內(nèi)容都是我干的第献。 我是一名探鬼主播,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼兔港,長吁一口氣:“原來是場噩夢啊……” “哼庸毫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起衫樊,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤飒赃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后科侈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體载佳,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年兑徘,在試婚紗的時候發(fā)現(xiàn)自己被綠了刚盈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡挂脑,死狀恐怖藕漱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情崭闲,我是刑警寧澤肋联,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站刁俭,受9級特大地震影響橄仍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜牍戚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一侮繁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧如孝,春花似錦宪哩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽彬祖。三九已至,卻和暖如春品抽,著一層夾襖步出監(jiān)牢的瞬間储笑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工圆恤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留突倍,地道東北人。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓哑了,卻偏偏與公主長得像赘方,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子弱左,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容