ZooKeeper Source Code Analysis (8): Session Management

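A ZooKeeper client and server maintain a long-lived TCP connection, and any normal communication between them requires a valid session. This article analyzes how the session state changes over the session lifecycle and how the client and the server manage sessions.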

The session states recorded by the client are:
ZooKeeper.States

public enum States {
        // the session is being created
        CONNECTING,
        // unused
        ASSOCIATING,
        // connection established
        CONNECTED,
        // connection established in read-only mode
        CONNECTEDREADONLY,
        // session closed, either closed by the client or expired
        CLOSED,
        // authorization failed: SASL authentication did not pass
        AUTH_FAILED,
        // initial state before the session is created
        NOT_CONNECTED;
}

The events fired by the native client when the session state changes are:

public enum KeeperState {
            /** Unused, this state is never generated by the server */
            @Deprecated
            Unknown (-1),

            // fired when the client detects that it has lost its connection to the server; it immediately tries to reconnect
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            NoSyncConnected (1),

            // in non-read-only mode, fired each time the client has just connected to a server
            SyncConnected (3),
            // fired when authentication is required and fails
            AuthFailed (4),

            // in read-only mode, fired each time the client has just connected to a server
            ConnectedReadOnly (5),

            // fired when authentication is required and succeeds
            SaslAuthenticated(6),

            // fired when the client talks to the server again, the server considers the session timed out, and it sends a session-expired response
            Expired (-112);
}

As shown above, a session mainly goes through three states: CONNECTING (being created), CONNECTED (created), and CLOSED (session closed). Each state change also fires a corresponding event so that the registered listeners can be notified.

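Before going through the individual state changes, here is the overall session state transition diagram:
[Figure: session state transition diagram]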


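Session creation

The earlier article "The creation process of a session" analyzed the complete creation flow. Here we focus on the session state changes and the events they trigger.

Client-side handling
The ClientCnxn.sendThread thread handles the connection and I/O with the server. On the first connection it calls
SendThread.startConnect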

private void startConnect() throws IOException {
//1
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }

          //2
            if (ZooKeeperSaslClient.isEnabled()) {
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName+"/"+addr.getHostString());
                } catch (LoginException e) {
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);
//3
            clientCnxnSocket.connect(addr);
        }

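The main flow is:
1. Set the connection state to States.CONNECTING.
2. Pick a server address from the server list; if SASL is enabled, set up the SASL client used for authentication.
3. Try to connect to the server. Once connected, the client sends its first request, which creates the session, and SendThread waits for the server's response. The method that handles I/O in this thread is clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
ClientCnxnSocket is the low-level implementation of communication with the server; it calls ClientCnxnSocketNIO.doIO to handle read and write events.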

void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sockKey.isReadable()) {
          if (!initialized) {
                    readConnectResult();
   
                    initialized = true;
                } 
               // ... unrelated code omitted ...
}

The first time response data is read from the server, readConnectResult is called

void readConnectResult() throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

It deserializes the response and calls sendThread.onConnected, the callback that completes session establishment

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();
                throw new SessionExpiredException(
                        "Unable to reconnect to ZooKeeper service, session 0x"
                                + Long.toHexString(sessionId) + " has expired");
            }
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }

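The main flow is:
1. If the session timeout returned by the server is less than or equal to 0, the session has already expired: set the session state to States.CLOSED and fire the Expired event.
2. Set the client's session attributes from the response, such as readTimeout, connectTimeout, and sessionId.
3. Set the session state and fire the event according to the isRO flag, that is, whether the client is connected in read-only mode.

  • If read-only, set the session state to States.CONNECTEDREADONLY and fire the ConnectedReadOnly event
  • If not read-only, set the session state to States.CONNECTED and fire the SyncConnected event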
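The decision made in onConnected can be condensed into a small sketch. This is not the ZooKeeper code: SketchState, SketchEvent, and ConnectOutcome are hypothetical stand-ins, hostCount stands in for hostProvider.size(), and the expired branch returns a value where the real method also throws SessionExpiredException.

```java
// Sketch only: simplified stand-ins for ZooKeeper.States and KeeperState.
enum SketchState { CONNECTED, CONNECTEDREADONLY, CLOSED }
enum SketchEvent { SyncConnected, ConnectedReadOnly, Expired }

final class ConnectOutcome {
    final SketchState state;
    final SketchEvent event;
    final int readTimeout;    // milliseconds
    final int connectTimeout; // milliseconds

    private ConnectOutcome(SketchState state, SketchEvent event,
                           int readTimeout, int connectTimeout) {
        this.state = state;
        this.event = event;
        this.readTimeout = readTimeout;
        this.connectTimeout = connectTimeout;
    }

    // hostCount is a hypothetical parameter standing in for hostProvider.size().
    static ConnectOutcome onConnected(int negotiatedTimeout, boolean isRO, int hostCount) {
        if (negotiatedTimeout <= 0) {
            // Step 1: a non-positive timeout means the session has expired.
            return new ConnectOutcome(SketchState.CLOSED, SketchEvent.Expired, 0, 0);
        }
        // Step 2: derive the client-side timeouts from the negotiated value.
        int readTimeout = negotiatedTimeout * 2 / 3;
        int connectTimeout = negotiatedTimeout / hostCount;
        // Step 3: pick state and event from the read-only flag.
        if (isRO) {
            return new ConnectOutcome(SketchState.CONNECTEDREADONLY,
                    SketchEvent.ConnectedReadOnly, readTimeout, connectTimeout);
        }
        return new ConnectOutcome(SketchState.CONNECTED,
                SketchEvent.SyncConnected, readTimeout, connectTimeout);
    }

    public static void main(String[] args) {
        ConnectOutcome ok = onConnected(30000, false, 3);
        System.out.println(ok.state + " " + ok.event
                + " read=" + ok.readTimeout + " connect=" + ok.connectTimeout);
        ConnectOutcome expired = onConnected(0, false, 3);
        System.out.println(expired.state + " " + expired.event);
    }
}
```

With a negotiated timeout of 30000 ms and three hosts, the sketch yields a read timeout of 20000 ms and a connect timeout of 10000 ms.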

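Server-side handling
Because the server manages sessions through a session tracker, we introduce the session tracker first.

When a server starts, it initializes its own session tracker, SessionTracker sessionTracker. On the Leader the implementation is LeaderSessionTracker; on Followers and Observers it is LearnerSessionTracker.
LeaderSessionTracker: responsible for activating all sessions, checking for session timeouts, and cleaning up sessions.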

public class LeaderSessionTracker extends UpgradeableSessionTracker {
    // whether local sessions are enabled; usually false
    private final boolean localSessionsEnabled;
    // global session tracker
    private final SessionTrackerImpl globalSessionTracker;

    /**
     * Server id of the leader
     */
    private final long serverId;

    public LeaderSessionTracker(SessionExpirer expirer,
            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
            int tickTime, long id, boolean localSessionsEnabled,
            ZooKeeperServerListener listener) {

        this.globalSessionTracker = new SessionTrackerImpl(
            expirer, sessionsWithTimeouts, tickTime, id, listener);

        this.localSessionsEnabled = localSessionsEnabled;
        if (this.localSessionsEnabled) {
            createLocalSessionTracker(expirer, tickTime, id, listener);
        }
        serverId = id;
    }
    // ... remaining code omitted ...
}

Its main member is SessionTrackerImpl globalSessionTracker, the implementation of global session management. The main member variables involved are:

    // key: sessionId, value: session object; every session is stored here
    protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
        new ConcurrentHashMap<Long, SessionImpl>();
    // key: sessionId, value: session timeout; shared with the in-memory database and periodically persisted to snapshot files
    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
    // sessions bucketed by expiry time (rounded to a multiple of the tick time) for fast timeout checks
    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
    // the latest sessionId created by this server
    private final AtomicLong nextSessionId = new AtomicLong();
    // cleans up sessions that have timed out
    private final SessionExpirer expirer;

    // local session information
    private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
    // local-session related
    protected LocalSessionTracker localSessionTracker;

Among these, sessionExpiryQueue manages sessions in buckets keyed by each session's expiry time.
ExpiryQueue

    // key: session object, value: the expiry time computed at the session's most recent activation
    private final ConcurrentHashMap<E, Long> elemMap =
        new ConcurrentHashMap<E, Long>();
    // key: expiry time, value: the set of sessions that expire at that time
    private final ConcurrentHashMap<Long, Set<E>> expiryMap =
        new ConcurrentHashMap<Long, Set<E>>();
    // the next time at which the expirer checks for expired sessions
    private final AtomicLong nextExpirationTime = new AtomicLong();
    // tick time
    private final int expirationInterval;

    public ExpiryQueue(int expirationInterval) {
        this.expirationInterval = expirationInterval;
        nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
    }
    // compute the expiry time, rounded up to the next interval boundary
    private long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

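All sessions are grouped into expiryMap by their expiry time. Normally a session's expiry time would be: session creation time (the current time) + session timeout. But sessions are created at arbitrary moments, and the server cannot check every session for expiry at every instant. The tick time roughly defines the interval at which the server periodically checks sessions. If both the tracker's check times and the sessions' expiry times are rounded to multiples of the tick time, sessions become much easier to manage.
The roundToNextInterval method rounds these times up to a multiple of the tick time.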
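As a worked illustration of the rounding, here is a minimal sketch that reuses the formula from roundToNextInterval. The class name ExpiryBucketSketch and the sample numbers (a 2000 ms interval and a 10000 ms session timeout) are assumptions chosen for the example.

```java
// Sketch only: the rounding formula from ExpiryQueue, with the interval passed in.
final class ExpiryBucketSketch {

    // Rounds timeMs up to the next multiple of intervalMs (always strictly later).
    static long roundToNextInterval(long timeMs, int intervalMs) {
        return (timeMs / intervalMs + 1) * intervalMs;
    }

    public static void main(String[] args) {
        int intervalMs = 2000;       // assumed tick time
        int sessionTimeoutMs = 10000; // assumed session timeout
        long[] activationTimes = {4500, 5100, 5999, 6000};
        for (long now : activationTimes) {
            long bucket = roundToNextInterval(now + sessionTimeoutMs, intervalMs);
            System.out.println("activated at " + now + " ms, expiry bucket " + bucket + " ms");
        }
    }
}
```

Sessions activated at 4500, 5100, and 5999 ms all land in the 16000 ms bucket, while one activated at 6000 ms lands in the 18000 ms bucket, so a single check per interval covers every session in a bucket.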
[Figure: session bucketing diagram]

The globalSessionTracker thread keeps polling sessionExpiryQueue for the sessions in the bucket at the next expiry point, nextExpirationTime, once that point has passed, and calls expirer.expire to clean them up.

public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

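LearnerSessionTracker: first, it records the sessions of the current follower or observer. When the leader sends its server-to-server heartbeat, the learner replies with all of its current sessions so that the leader can activate them. Second, if a session is a local session and has to be upgraded to a global session, it is taken from LearnerSessionTracker and handed to the leader, which creates the global session.
Its main member variables are: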

    private final SessionExpirer expirer;
    // key: sessionId, value: sessionTimeout; used to hand sessions to the leader for activation
    private final AtomicReference<Map<Long, Integer>> touchTable =
        new AtomicReference<Map<Long, Integer>>();

    private final long serverId;
    // the latest sessionId created by this server
    private final AtomicLong nextSessionId = new AtomicLong();
    // whether local sessions can be created; usually false
    private final boolean localSessionsEnabled;
    // global sessions; related to periodic snapshots
    private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
    // local session information
    private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
    // local-session related
    protected LocalSessionTracker localSessionTracker;

On the server side, when a session is created its information is stored on the leader in globalSessionTracker's sessionsById, sessionsWithTimeout, and sessionExpiryQueue. It is also stored on the learner in LearnerSessionTracker's touchTable and globalSessionsWithTimeouts. The response sent back is:

 ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);

That is, the session's sessionTimeout, sessionId, and sessionPasswd are sent to the client.

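Heartbeat maintenance

To keep its session valid, the client sends PING requests to the server within the session timeout. When the server receives a PING, it recomputes the session's expiry time and activates the session.

Client-side handling
The logic by which the client proactively sends PINGs is in sendThread.run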

public void run() {
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            while (state.isAlive()) {
                  if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
                // ... unrelated code omitted ...
}

Here timeToNextPing is the time remaining until the next PING. When the session is established, readTimeout is set to 2/3 of the session timeout, and clientCnxnSocket.getIdleSend is the time elapsed since data was last sent. So the client pings roughly every 1/3 of the session timeout. In addition, if the client has not sent anything to the server for more than MAX_SEND_PING_INTERVAL (10s), it also sends a PING.
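The arithmetic can be checked with a small sketch that copies the two expressions from sendThread.run into standalone methods. PingTimingSketch and the method shouldPing are names made up for this illustration; the formulas themselves come from the excerpt above.

```java
// Sketch only: the ping-timing expressions from SendThread.run as pure functions.
final class PingTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in the excerpt

    // Time left before the next PING, given readTimeout and the idle-send time (both ms).
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // True when the loop in the excerpt would call sendPing().
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30000;
        int readTimeout = sessionTimeout * 2 / 3; // 20000 ms
        int[] idleTimes = {0, 5000, 9000};
        for (int idle : idleTimes) {
            System.out.println("idle=" + idle
                    + " timeToNextPing=" + timeToNextPing(readTimeout, idle)
                    + " ping=" + shouldPing(readTimeout, idle));
        }
    }
}
```

For a 30000 ms session timeout, readTimeout is 20000 ms, so a PING is due once about 9000 ms have passed without sending (10000 ms minus the 1000 ms safety margin). With a 60000 ms session timeout the 10000 ms cap triggers first.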

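Server-side handling
For the server, both a client's PING and any other kind of request activate the session.
If the client's session is established with the Leader, the Leader activates the session as follows:
[Figure: session activation flow on the Leader]

Session activation happens in SessionTrackerImpl.touchSession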

synchronized public boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {
            logTraceTouchInvalidSession(sessionId, timeout);
            return false;
        }
//1
        if (s.isClosing()) {
            logTraceTouchClosingSession(sessionId, timeout);
            return false;
        }
//2
        updateSessionExpiry(s, timeout);
        return true;
    }

1. If the session is already marked as closing, it has timed out; it is not activated again and this client request is ignored.
2. Activate the session.
updateSessionExpiry

private void updateSessionExpiry(SessionImpl s, int timeout) {
        logTraceTouchSession(s.sessionId, timeout, "");
        sessionExpiryQueue.update(s, timeout);
    }

It mainly calls sessionExpiryQueue.update

public Long update(E elem, int timeout) {
//1
        Long prevExpiryTime = elemMap.get(elem);
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout);

        if (newExpiryTime.equals(prevExpiryTime)) {
            // No change, so nothing to update
            return null;
        }
//2
        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime);
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(
                new ConcurrentHashMap<E, Boolean>());
            // Put the new set in the map, but only if another thread
            // hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem);
//3
        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

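1. Recompute the session's new expiry time; if it equals the current expiry time, return immediately.
2. Add the session to the bucket for the new expiry time.
3. Record the new expiry time in elemMap and remove the session from its previous bucket.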
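The bucket move can be tried out with a single-threaded sketch. MiniExpiryQueue is a hypothetical simplification: it takes the clock as a parameter, uses plain HashMap instead of concurrent maps, keys sessions by id, and its poll method is an assumption about how the due bucket is drained rather than a copy of the real ExpiryQueue.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: single-threaded bucket queue with an explicit clock.
final class MiniExpiryQueue {
    private final Map<Long, Long> expiryBySession = new HashMap<>(); // sessionId -> bucket
    private final Map<Long, Set<Long>> buckets = new HashMap<>();    // bucket -> sessionIds
    private final int interval;
    private long nextExpirationTime;

    MiniExpiryQueue(int interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = round(now);
    }

    private long round(long time) {
        return (time / interval + 1) * interval;
    }

    // Returns the new bucket, or null when the session stays in the same bucket.
    Long update(long sessionId, int timeout, long now) {
        long newExpiry = round(now + timeout);
        Long prev = expiryBySession.get(sessionId);
        if (prev != null && prev.longValue() == newExpiry) {
            return null; // step 1: nothing to move
        }
        // Step 2: add to the new bucket first.
        buckets.computeIfAbsent(newExpiry, k -> new HashSet<>()).add(sessionId);
        // Step 3: record the new bucket, then remove from the old one.
        expiryBySession.put(sessionId, newExpiry);
        if (prev != null) {
            Set<Long> old = buckets.get(prev);
            if (old != null) {
                old.remove(sessionId);
            }
        }
        return newExpiry;
    }

    // Drains the bucket that is due at 'now'; empty when nothing is due yet.
    Set<Long> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.emptySet();
        }
        Set<Long> expired = buckets.remove(nextExpirationTime);
        nextExpirationTime += interval;
        if (expired == null) {
            return Collections.emptySet();
        }
        for (Long id : expired) {
            expiryBySession.remove(id);
        }
        return expired;
    }
}
```

With a 2000 ms interval and a 4000 ms timeout, a session touched at 500 ms goes into the 6000 ms bucket; touching it again at 2500 ms moves it to the 8000 ms bucket, so polling at 6000 ms returns nothing and polling at 8000 ms returns the session.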
[Figure: a session moving from its old expiry bucket to the new one]


If the client's session is established with a Learner, the server first calls
LearnerSessionTracker.touchSession

public boolean touchSession(long sessionId, int sessionTimeout) {
  
        touchTable.get().put(sessionId, sessionTimeout);
        return true;
    }

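This keeps the session in touchTable.
Next, the session held by the Learner is activated on the leader through the periodic heartbeat between the leader and the Learner.
First, look at how the heartbeat between servers is maintained:
1. The leader proactively sends PING:
Leader.lead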

            while (true) {
                synchronized (this) {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.tickTime / 2;
                    while (cur < end) {
                        wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }
                for (LearnerHandler f : getLearners()) {
                    f.ping();
                }
                // ... unrelated code omitted ...
            }

As shown, the leader sends a PING to every learner once per half tick (tickTime / 2).
2. How the learner handles a PING request
Learner.ping

protected void ping(QuorumPacket qp) throws IOException {
        // Send back the ping with our session data
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        Map<Long, Integer> touchTable = zk.getTouchSnapshot();
//
        for (Entry<Long, Integer> entry : touchTable.entrySet()) {
            dos.writeLong(entry.getKey());
            dos.writeInt(entry.getValue());
        }
        qp.setData(bos.toByteArray());
        writePacket(qp, true);
    }

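As shown, the learner sends its touchTable to the leader. When the leader receives the PING response from a follower, it iterates over the sessions in touchTable and calls LeaderSessionTracker.touchSession to activate them. Activating the sessions held by learners through the heartbeat between the servers of the ensemble is a neat approach.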

As long as the client sends data within the required time and the server can activate the session, the session's connection state stays States.CONNECTED.
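The round trip of the touch table can be sketched with plain java.io streams. TouchTableSketch is a made-up class; swapping in a fresh map when the snapshot is taken is an assumption about what getTouchSnapshot does, and decode is only an illustration of how the leader side could read back the (long, int) pairs written by Learner.ping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: learner-side touch table and the PING payload built from it.
final class TouchTableSketch {
    private final AtomicReference<Map<Long, Integer>> touchTable =
            new AtomicReference<>(new ConcurrentHashMap<>());

    // Learner side: remember that this session was active.
    void touchSession(long sessionId, int timeout) {
        touchTable.get().put(sessionId, timeout);
    }

    // Learner side: take a snapshot (assumed to reset the table) and serialize it.
    byte[] buildPingPayload() {
        Map<Long, Integer> snapshot = touchTable.getAndSet(new ConcurrentHashMap<>());
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            for (Map.Entry<Long, Integer> entry : snapshot.entrySet()) {
                dos.writeLong(entry.getKey());
                dos.writeInt(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Leader side: read back the (sessionId, timeout) pairs to be touched.
    static Map<Long, Integer> decode(byte[] payload) {
        Map<Long, Integer> sessions = new HashMap<>();
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload))) {
            while (dis.available() > 0) {
                long sessionId = dis.readLong();
                int timeout = dis.readInt();
                sessions.put(sessionId, timeout);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sessions;
    }
}
```

Each entry takes 12 bytes (an 8-byte long plus a 4-byte int), so the payload size grows linearly with the number of sessions touched since the previous PING.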

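Reconnecting after the session connection is lost

The network connection may drop because of network instability or similar causes. We discuss two cases: the connection drops and is re-established within the session timeout, and the reconnect happens after the session has timed out.
Socket disconnect within the session timeout
Client-side handling
1. Disconnect handling
When the client's sendThread hits an I/O error that definitely indicates a broken connection, it calls the cleanup method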

private void cleanup() {
//1
            clientCnxnSocket.cleanup();
            synchronized (pendingQueue) {
                for (Packet p : pendingQueue) {
//2
                    conLossPacket(p);
                }
                pendingQueue.clear();
            }
            // We can't call outgoingQueue.clear() here because
            // between iterating and clear up there might be new
            // packets added in queuePacket().
//3
            Iterator<Packet> iter = outgoingQueue.iterator();
            while (iter.hasNext()) {
                Packet p = iter.next();
                conLossPacket(p);
                iter.remove();
            }
        }

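1. clientCnxnSocket.cleanup handles the underlying socket: it closes the current socket and cancels the SelectionKey sockKey, so that sendThread knows the connection is gone and a reconnect is needed.
2. Notify the requests in the pending queue (awaiting responses) and in the outgoing queue that the connection has been lost.
conLossPacket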

private void conLossPacket(Packet p) {
        if (p.replyHeader == null) {
            return;
        }
        switch (state) {
        case AUTH_FAILED:
            p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
            break;
        case CLOSED:
            p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
            break;
        default:
            p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
        }
        finishPacket(p);
    }

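When the connection has just been lost, the err field of the request's reply header is set to KeeperException.Code.CONNECTIONLOSS. In the subsequent response handling, finishPacket uses this err value to notify all watchers associated with the request's path that a disconnect has occurred.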

 private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        // Add all the removed watch events to the event queue, so that the
        // clients will be notified with 'Data/Child WatchRemoved' event type.
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                // look up all watchers registered on the path
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                        .entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        // fire the disconnected event
                        queueEvent(p.watchDeregistration.getClientPath(), err,
                                watchers, entry.getKey());
                        // ignore connectionloss when removing from local
                        // session
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            }

queueEvent

void queueEvent(String clientPath, int err,
            Set<Watcher> materializedWatchers, EventType eventType) {
        KeeperState sessionState = KeeperState.SyncConnected;
        if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
                || KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
            sessionState = Event.KeeperState.Disconnected;
        }
        WatchedEvent event = new WatchedEvent(eventType, sessionState,
                clientPath);
        eventThread.queueEvent(event, materializedWatchers);
    }

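As shown, an Event.KeeperState.Disconnected event is sent.
3. Iterate over the outgoingQueue and notify the newly added requests that the connection has been lost.
4. After sendThread has called cleanup to notify the queued requests, it still fires a Disconnected event to notify all currently registered watches.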

// inside the run method
 // At this point, there might still be new packets appended to outgoingQueue.
                        // they will be handled in next connection or cleared up if closed.
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }

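5. In addition, once the client state is no longer alive (or the client is closing), any request added to the outgoing queue is also passed to conLossPacket so that it is told the connection is lost.
Sending data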

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                // notify the request that the connection is lost
                conLossPacket(packet);
            } 
}
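The two mappings used in the disconnect handling above (client state to error code in conLossPacket, and error code to watcher state in queueEvent) can be summarized in a sketch. ClientStateSketch and ConnLossSketch are hypothetical names, and the numeric constants are written from memory of KeeperException.Code, so treat them as illustrative.

```java
// Sketch only: the state -> error and error -> watcher-state mappings.
enum ClientStateSketch { CONNECTING, CONNECTED, CLOSED, AUTH_FAILED }

final class ConnLossSketch {
    // Assumed to mirror KeeperException.Code values.
    static final int CONNECTIONLOSS = -4;
    static final int SESSIONEXPIRED = -112;
    static final int AUTHFAILED = -115;

    // Mirrors the switch in conLossPacket.
    static int errorFor(ClientStateSketch state) {
        switch (state) {
            case AUTH_FAILED:
                return AUTHFAILED;
            case CLOSED:
                return SESSIONEXPIRED;
            default:
                return CONNECTIONLOSS;
        }
    }

    // Mirrors queueEvent: both error codes are reported to watchers as Disconnected.
    static String watcherStateFor(int err) {
        if (err == SESSIONEXPIRED || err == CONNECTIONLOSS) {
            return "Disconnected";
        }
        return "SyncConnected";
    }

    public static void main(String[] args) {
        for (ClientStateSketch s : ClientStateSketch.values()) {
            int err = errorFor(s);
            System.out.println(s + " -> err " + err + " -> " + watcherStateFor(err));
        }
    }
}
```

Note that in this path an expired session and a plain connection loss both surface to the removed watchers as Disconnected; the Expired event itself is raised separately in onConnected.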

2. Client reconnect
When sendThread finds that the client connection is down, it picks the next server address and reconnects, this time carrying the sessionId.

if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
}

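3. Receive the session-established response
As with the first session creation, the session state is set to States.CONNECTED and a SyncConnected event is fired.

Server-side handling
1. Disconnect handling
When the server finds that it can no longer communicate with the client over its ServerCnxn, NIOServerCnxn.doIO catches the exception and calls NIOServerCnxn.close, which removes the ServerCnxn from the cnxns list and closes the connection.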

/**
     * Close the cnxn and remove it from the factory cnxns list.
     */
    @Override
    public void close() {
        if (!factory.removeCnxn(this)) {
            return;
        }

        if (zkServer != null) {
            zkServer.removeCnxn(this);
        }

        if (sk != null) {
            try {
                // need to cancel this selection key from the selector
                sk.cancel();
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("ignoring exception during selectionkey cancel", e);
                }
            }
        }

        closeSock();
    }

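2. Receive the client's reconnect request
The client usually picks a different server and sends it the session creation request. Once that server has validated the session locally, it activates the session and establishes the socket connection with the client.
Handling the connect request: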

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
long sessionId = connReq.getSessionId();
        if (sessionId == 0) {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            createSession(cnxn, passwd, sessionTimeout);
        } else {
            // a non-zero sessionId means this is a reconnect
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            if (serverCnxnFactory != null) {
                serverCnxnFactory.closeSession(sessionId);
            }
            if (secureServerCnxnFactory != null) {
                secureServerCnxnFactory.closeSession(sessionId);
            }
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        }
}
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
        if (checkPasswd(sessionId, passwd)) {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        } else {
            LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
                    + " for session 0x" + Long.toHexString(sessionId));
            finishSessionInit(cnxn, false);
        }
    }

3. Send the session-established response:

ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
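The server-side decision for a connect request can be condensed into a sketch: a zero sessionId creates a session, a non-zero one is checked against the stored password and must still be known to the tracker. ReconnectSketch, its Response class, and passwordFor are hypothetical; in particular passwordFor is only a deterministic stand-in and not how ZooKeeper derives session passwords.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch only: create-or-reopen decision with an in-memory session table.
final class ReconnectSketch {
    static final class Response {
        final int timeout;
        final long sessionId;
        final byte[] passwd;

        Response(int timeout, long sessionId, byte[] passwd) {
            this.timeout = timeout;
            this.sessionId = sessionId;
            this.passwd = passwd;
        }
    }

    private final Map<Long, byte[]> liveSessions = new HashMap<>(); // sessionId -> password
    private long nextSessionId = 1;

    // Hypothetical stand-in for password generation.
    private static byte[] passwordFor(long sessionId) {
        byte[] passwd = new byte[16];
        new Random(sessionId).nextBytes(passwd);
        return passwd;
    }

    Response connect(long sessionId, byte[] passwd, int timeout) {
        if (sessionId == 0) {
            // New session: allocate an id and remember it.
            long id = nextSessionId++;
            byte[] generated = passwordFor(id);
            liveSessions.put(id, generated);
            return new Response(timeout, id, generated);
        }
        // Reconnect: the session must still exist and the password must match.
        byte[] expected = liveSessions.get(sessionId);
        boolean valid = expected != null && Arrays.equals(expected, passwd);
        if (!valid) {
            // Same shape as the invalid ConnectResponse: timeout 0, id 0, empty password.
            return new Response(0, 0, new byte[16]);
        }
        return new Response(timeout, sessionId, expected);
    }

    // Called when the session tracker expires the session.
    void expire(long sessionId) {
        liveSessions.remove(sessionId);
    }
}
```

A reconnect inside the timeout gets the original sessionId and a positive timeout back; after expire, or with a wrong password, the response carries a timeout of 0, which is what the client later interprets as an expired session.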

Reconnecting after the session has timed out

Server-side handling
1. Session cleanup
The Leader's session tracker thread detects expired sessions and cleans them up. The cleanup is:
SessionTrackerImpl.run

for (SessionImpl s : sessionExpiryQueue.poll()) {
//1
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }

expirer.expire

public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
//2
        close(sessionId);
    }
 private void close(long sessionId) {
        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
        setLocalSessionFlag(si);
        submitRequest(si);
    }
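  • Mark the session as closing: s.isClosing = true
  • Submit an OpCode.closeSession transaction request, which mainly does the following:
    • Delete the ephemeral nodes associated with the session
      This includes ephemeral nodes that are about to be created but have not yet been saved to the in-memory database.
    • Remove the session
      It is removed from each server's own SessionTracker.
    • Close the NIOServerCnxn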
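The order of the cleanup steps can be mimicked with an in-memory sketch. ExpireCleanupSketch is entirely hypothetical: it models the session table, the ephemeral nodes, and the data tree as plain collections, and it runs the close step synchronously where the real server submits a closeSession transaction.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: mark closing, delete ephemerals, then drop the session.
final class ExpireCleanupSketch {
    private final Map<Long, Boolean> closingBySession = new HashMap<>();  // sessionId -> isClosing
    private final Map<Long, Set<String>> ephemeralsBySession = new HashMap<>();
    private final Set<String> tree = new HashSet<>();                     // all node paths

    void createSession(long sessionId) {
        closingBySession.put(sessionId, false);
    }

    void createEphemeral(long sessionId, String path) {
        tree.add(path);
        ephemeralsBySession.computeIfAbsent(sessionId, k -> new HashSet<>()).add(path);
    }

    // Like touchSession: fails for unknown sessions and for sessions marked closing.
    boolean touch(long sessionId) {
        Boolean closing = closingBySession.get(sessionId);
        return closing != null && !closing;
    }

    void setSessionClosing(long sessionId) {
        if (closingBySession.containsKey(sessionId)) {
            closingBySession.put(sessionId, true);
        }
    }

    // Stand-in for processing the closeSession request.
    void closeSession(long sessionId) {
        Set<String> paths = ephemeralsBySession.remove(sessionId);
        if (paths != null) {
            tree.removeAll(paths);
        }
        closingBySession.remove(sessionId);
    }

    void expire(long sessionId) {
        setSessionClosing(sessionId);
        closeSession(sessionId);
    }

    boolean exists(String path) {
        return tree.contains(path);
    }
}
```

Marking the session as closing before the close request is processed is what makes touch fail in the window between expiry detection and the actual removal.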

2. Session-expired response
At this point the server no longer knows the session's sessionId, so revalidateSession cannot reactivate the session

protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {
        // returns false
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "Session 0x" + Long.toHexString(sessionId) +
                            " is valid: " + rc);
        }
        finishSessionInit(cnxn, rc);
    }

finishSessionInit(cnxn, rc);

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
else {

                LOG.info("Invalid session 0x"
                        + Long.toHexString(cnxn.getSessionId())
                        + " for client "
                        + cnxn.getRemoteSocketAddress()
                        + ", probably expired");
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            }
}

The server then sends ServerCnxnFactory.closeConn to the client so that the connection is closed.

 /**
     * The buffer will cause the connection to be close when we do a send.
     */
    static final ByteBuffer closeConn = ByteBuffer.allocate(0);

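Client-side handling
1. Reconnect
This scenario means the client sends its reconnect (session creation) request to the server only after the session timeout has elapsed.
2. Receive the close response
Because the response carries no valid negotiatedSessionTimeout, the connection state is set to States.CLOSED and a KeeperState.Expired event is sent to notify all watchers. The client also waits for eventThread to finish processing all events and then marks the thread as isRunning = false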

 void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                String warnInfo;
                warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                    + Long.toHexString(sessionId) + " has expired";
                LOG.warn(warnInfo);
                throw new SessionExpiredException(warnInfo);
            }

Thanks for reading. I am Monica23334 || Monica2333, and I have publicly committed to writing one original article every week. Follow me and see whether I keep that promise~
