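The ZooKeeper client and server maintain a long-lived TCP connection, and all normal communication between them requires a valid session. This article looks at how the session state changes over the session lifecycle, and how the client and the server manage sessions.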
The session states recorded by the client are defined in ZooKeeper.States:
public enum States {
//connecting (or reconnecting) to a server; the session is being established
CONNECTING,
//not used
ASSOCIATING,
//connection established
CONNECTED,
//connection established in read-only mode
CONNECTEDREADONLY,
//session closed, either by the client or because the session expired
CLOSED,
//authentication failed (SASL authentication did not pass)
AUTH_FAILED,
//initial state before any session has been created
NOT_CONNECTED;
}
The events that the native client fires when the session state changes are defined in Watcher.Event.KeeperState:
public enum KeeperState {
/** Unused, this state is never generated by the server */
@Deprecated
Unknown (-1),
//fired as soon as the client detects that it has lost its connection to the server; the client then immediately tries to reconnect
Disconnected (0),
/** Unused, this state is never generated by the server */
@Deprecated
NoSyncConnected (1),
//fired each time the client connects to a server in non-read-only mode
SyncConnected (3),
//fired when authentication is in use and fails
AuthFailed (4),
//fired each time the client connects to a server in read-only mode
ConnectedReadOnly (5),
//fired when authentication is in use and succeeds
SaslAuthenticated(6),
//fired when the client reconnects and the server, having already timed out the session, replies that the session has expired
Expired (-112);
}
As these enums show, a session mainly moves between three states: CONNECTING (being established), CONNECTED (established) and CLOSED (closed). Each state change also fires the corresponding event so that the registered watchers are notified.
Before going through each state change, here is the session state transition diagram:
[Figure: session state transitions]
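To make the relationship between states and events concrete, here is a minimal sketch that models only the transitions discussed in this article: establishing a connection, losing it, and session expiry. `SessionStateModel` and `transition` are hypothetical names. The enum constants mirror ZooKeeper's States and KeeperState, but this is not the client's code, and other paths (such as an explicit client close) are deliberately left out.

```java
// Hypothetical, simplified model of the client-side transitions discussed in
// this article. Constant names mirror ZooKeeper's States and KeeperState,
// but this is not ZooKeeper code.
class SessionStateModel {
    enum State { NOT_CONNECTED, CONNECTING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED }

    enum Event { SyncConnected, ConnectedReadOnly, Disconnected, Expired, AuthFailed }

    // Returns the event fired when the state changes from 'from' to 'to',
    // or null when this model fires no event for that change.
    static Event transition(State from, State to) {
        switch (to) {
            case CONNECTED:
                return Event.SyncConnected;
            case CONNECTEDREADONLY:
                return Event.ConnectedReadOnly;
            case CONNECTING:
                // Losing an established connection: the client reconnects
                // and notifies watchers with Disconnected.
                return (from == State.CONNECTED || from == State.CONNECTEDREADONLY)
                        ? Event.Disconnected : null;
            case CLOSED:
                // Only the expiry path is modelled: the server answered a
                // reconnect with a non-positive negotiated timeout.
                return Event.Expired;
            case AUTH_FAILED:
                return Event.AuthFailed;
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(transition(State.CONNECTING, State.CONNECTED)); // SyncConnected
        System.out.println(transition(State.CONNECTED, State.CONNECTING)); // Disconnected
        System.out.println(transition(State.CONNECTING, State.CLOSED));    // Expired
    }
}
```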
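Session creation
The earlier article on how a session is created walked through the complete creation process; here we focus on the session state changes and the events they trigger.
Client side
The ClientCnxn.sendThread thread handles the connection to the server and all of its IO. On the first connection attempt (and on every reconnect) it calls SendThread.startConnect: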
private void startConnect() throws IOException {
//1
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);
}
//2
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostString());
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
//3
clientCnxnSocket.connect(addr);
}
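The main steps are:
1. Set the connection state to States.CONNECTING.
2. Pick a server address: rwServerAddress if one has been found, otherwise the next address from hostProvider. If SASL is enabled, create the SASL client; if the SASL login fails, an AuthFailed event is queued and the client continues without SASL authentication.
3. Try to connect to the server. Once connected, the client sends its first request, the session creation request, and SendThread waits for the server's response. The thread performs IO via clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);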
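Step 2's server selection boils down to cycling through the configured addresses, so that each reconnect tries the next server. The sketch below shows only that idea; `RoundRobinHosts` is hypothetical. ZooKeeper's own StaticHostProvider does more (to my knowledge it shuffles the list and may sleep for the spin delay passed to next, 1000 ms here, after a full cycle), which is not modelled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-robin host list illustrating what hostProvider.next()
// is used for. Shuffling and back-off are intentionally left out.
class RoundRobinHosts {
    private final List<InetSocketAddress> hosts;
    private int index = -1;

    RoundRobinHosts(List<InetSocketAddress> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one server address is required");
        }
        this.hosts = new ArrayList<>(hosts);
    }

    // Returns the next address, wrapping around after the last one.
    InetSocketAddress next() {
        index = (index + 1) % hosts.size();
        return hosts.get(index);
    }

    // Number of servers; onConnected divides the session timeout by this.
    int size() {
        return hosts.size();
    }

    public static void main(String[] args) {
        RoundRobinHosts provider = new RoundRobinHosts(List.of(
                InetSocketAddress.createUnresolved("zk1", 2181),
                InetSocketAddress.createUnresolved("zk2", 2181),
                InetSocketAddress.createUnresolved("zk3", 2181)));
        for (int attempt = 0; attempt < 4; attempt++) {
            // prints zk1, zk2, zk3, then zk1 again
            System.out.println(provider.next().getHostString());
        }
    }
}
```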
ClientCnxnSocket is the low-level transport to the server; its NIO implementation handles read and write events in ClientCnxnSocketNIO.doIO:
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sockKey.isReadable()) {
if (!initialized) {
readConnectResult();
initialized = true;
}
// ... (unrelated code omitted)
}
// ... (rest of doIO omitted)
}
The first time it reads a response from the server, it calls readConnectResult:
void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
This deserializes the ConnectResponse and calls sendThread.onConnected to complete session setup:
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
throw new SessionExpiredException(
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
}
if (!readOnly && isRO) {
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;
seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));
}
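The main steps are:
1. If the negotiated session timeout returned by the server is less than or equal to 0, the session has already expired: the state is set to States.CLOSED, an Expired event is fired, and a SessionExpiredException is thrown.
2. The client's session attributes are set from the response, such as readTimeout, connectTimeout and sessionId.
3. The session state and the event depend on whether the server it connected to is read-only (isRO):
- for a read-only server, the state is set to States.CONNECTEDREADONLY and a ConnectedReadOnly event is fired;
- otherwise, the state is set to States.CONNECTED and a SyncConnected event is fired.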
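The timeout arithmetic in onConnected is easy to check by hand. The sketch below (a hypothetical `ConnectOutcome` class, with no networking) reproduces only the values shown in the excerpt: with a negotiated timeout of 30 s and three servers, readTimeout is 20 s and connectTimeout is 10 s. Unlike the real method, it returns a result instead of throwing SessionExpiredException when the session has expired.

```java
// Hypothetical helper that redoes the arithmetic and state choice of
// SendThread.onConnected shown above, without any networking.
final class ConnectOutcome {
    final String state;       // States constant the client would switch to
    final String event;       // KeeperState constant that would be queued
    final int readTimeout;    // ms; 0 when the session has expired
    final int connectTimeout; // ms; 0 when the session has expired

    private ConnectOutcome(String state, String event, int readTimeout, int connectTimeout) {
        this.state = state;
        this.event = event;
        this.readTimeout = readTimeout;
        this.connectTimeout = connectTimeout;
    }

    static ConnectOutcome onConnected(int negotiatedTimeout, int hostCount, boolean isRO) {
        if (negotiatedTimeout <= 0) {
            // The server refused to renew the session.
            return new ConnectOutcome("CLOSED", "Expired", 0, 0);
        }
        int readTimeout = negotiatedTimeout * 2 / 3;
        int connectTimeout = negotiatedTimeout / hostCount;
        return isRO
                ? new ConnectOutcome("CONNECTEDREADONLY", "ConnectedReadOnly", readTimeout, connectTimeout)
                : new ConnectOutcome("CONNECTED", "SyncConnected", readTimeout, connectTimeout);
    }

    public static void main(String[] args) {
        ConnectOutcome ok = onConnected(30_000, 3, false);
        // prints: CONNECTED SyncConnected 20000 10000
        System.out.println(ok.state + " " + ok.event + " " + ok.readTimeout + " " + ok.connectTimeout);
    }
}
```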
Server side
The server manages sessions through a session tracker, so let's look at the session tracker first.
During initialization each server creates its own session tracker, SessionTracker sessionTracker. The leader uses LeaderSessionTracker; followers and observers use LearnerSessionTracker.
LeaderSessionTracker: responsible for touching (activating) all sessions, checking for session timeouts, and cleaning up expired sessions.
public class LeaderSessionTracker extends UpgradeableSessionTracker {
//whether local sessions are enabled; usually false
private final boolean localSessionsEnabled;
//the global session tracker
private final SessionTrackerImpl globalSessionTracker;
/**
* Server id of the leader
*/
private final long serverId;
public LeaderSessionTracker(SessionExpirer expirer,
ConcurrentMap<Long, Integer> sessionsWithTimeouts,
int tickTime, long id, boolean localSessionsEnabled,
ZooKeeperServerListener listener) {
this.globalSessionTracker = new SessionTrackerImpl(
expirer, sessionsWithTimeouts, tickTime, id, listener);
this.localSessionsEnabled = localSessionsEnabled;
if (this.localSessionsEnabled) {
createLocalSessionTracker(expirer, tickTime, id, listener);
}
serverId = id;
}
// ... (omitted)
}
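Its main member is SessionTrackerImpl globalSessionTracker, which implements global session management. The main fields involved are listed below; the last two, used for local sessions, are declared in UpgradeableSessionTracker (the parent class of LeaderSessionTracker) rather than in SessionTrackerImpl: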
//key: sessionId, value: session object; every session is stored here
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
new ConcurrentHashMap<Long, SessionImpl>();
//key: sessionId, value: session timeout; shared with the in-memory database and periodically persisted to snapshot files
private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
//sessions bucketed by their expiration time (rounded up to a multiple of the tick time) for fast expiry checks
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
//the next sessionId this server will assign
private final AtomicLong nextSessionId = new AtomicLong();
//the session expirer, which cleans up timed-out sessions
private final SessionExpirer expirer;
//local session information
private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
//local session support
protected LocalSessionTracker localSessionTracker;
sessionExpiryQueue groups sessions into buckets by each session's expiration time.
ExpiryQueue
//key: session object, value: expiration time computed at the session's most recent touch
private final ConcurrentHashMap<E, Long> elemMap =
new ConcurrentHashMap<E, Long>();
//key: expiration time, value: set of sessions that expire at that time
private final ConcurrentHashMap<Long, Set<E>> expiryMap =
new ConcurrentHashMap<Long, Set<E>>();
//the next time at which the expirer checks for expired sessions
private final AtomicLong nextExpirationTime = new AtomicLong();
//the expiration interval, i.e. the tick time
private final int expirationInterval;
public ExpiryQueue(int expirationInterval) {
this.expirationInterval = expirationInterval;
nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}
//round a time up to the next multiple of expirationInterval
private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
All sessions are grouped in expiryMap by their expiration time. Naively, a session's expiration time would be the current time (when it was created or last active) plus its session timeout, but those times are arbitrary, and the server cannot check every session continuously. The tick time is roughly the interval at which the server checks sessions. If both the tracker's check times and each session's expiration time are converted to multiples of the tick time, sessions become much easier to manage.
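roundToNextInterval performs this conversion: it rounds a time up to the next multiple of the tick time (a time that is already an exact multiple still moves up one interval).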
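Here is a quick worked example of the bucketing, assuming a tick time of 2000 ms and a session timeout of 4000 ms. `ExpiryRounding` is a hypothetical wrapper around the same formula as roundToNextInterval. It shows two sessions touched 1.4 s apart landing in the same bucket.

```java
// Standalone copy of the rounding formula from ExpiryQueue.roundToNextInterval,
// used to show which bucket a session lands in.
class ExpiryRounding {
    static long roundToNextInterval(long time, int interval) {
        return (time / interval + 1) * interval;
    }

    public static void main(String[] args) {
        int tickTime = 2000;       // assumed tick time in ms
        int sessionTimeout = 4000; // assumed session timeout in ms
        // 10500 + 4000 = 14500 -> 14500 / 2000 = 7 -> (7 + 1) * 2000 = 16000
        System.out.println(roundToNextInterval(10_500 + sessionTimeout, tickTime)); // 16000
        // 11900 + 4000 = 15900 -> 15900 / 2000 = 7 -> (7 + 1) * 2000 = 16000
        System.out.println(roundToNextInterval(11_900 + sessionTimeout, tickTime)); // 16000
        // An exact multiple still moves to the next interval: 14000 -> 16000
        System.out.println(roundToNextInterval(14_000, tickTime)); // 16000
    }
}
```

Because of this rounding, the expirer only needs to look at one bucket per tick instead of scanning every session.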
[Figure: session bucket management, with sessions grouped by expiration time]
The globalSessionTracker thread (SessionTrackerImpl.run) repeatedly waits until nextExpirationTime, takes the bucket of sessions due at that time from sessionExpiryQueue, and calls expirer.expire on each of them to clean them up:
public void run() {
try {
while (running) {
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll()) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
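LearnerSessionTracker serves two purposes. First, it records the sessions on the current follower or observer; when the leader sends its server-to-server heartbeat, the learner replies with all of these sessions so that the leader can touch them. Second, when a local session has to be upgraded to a global session, the session is taken out of LearnerSessionTracker and handed to the leader, which creates the global session.
Its main fields are: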
private final SessionExpirer expirer;
// key: sessionId, value: sessionTimeout; used to hand sessions to the leader for touching
private final AtomicReference<Map<Long, Integer>> touchTable =
new AtomicReference<Map<Long, Integer>>();
private final long serverId;
//the next sessionId this server will assign
private final AtomicLong nextSessionId = new AtomicLong();
//whether local sessions may be created; usually false
private final boolean localSessionsEnabled;
//global sessions; related to periodic snapshots
private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
//local session information
private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
//local session support
protected LocalSessionTracker localSessionTracker;
On the server side, once a session has been created, its information is stored on the leader in globalSessionTracker's sessionsById, sessionsWithTimeout and sessionExpiryQueue. It is also stored on the learners, in LearnerSessionTracker's touchTable and globalSessionsWithTimeouts. The response sent to the client is built as follows:
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
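That is, the session's sessionTimeout, sessionId and sessionPasswd are sent to the client (zeros and an all-zero password if the session is no longer valid).
Heartbeats
To keep its session valid, the client sends PING requests to the server within the session timeout. When the server receives a PING, it recomputes the session's expiration time, which touches (activates) the session.
Client side
The client sends PINGs from SendThread.run: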
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to; // assigned in code omitted from this excerpt, before it is used below
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// ... (unrelated code omitted)
}
// ... (rest of run omitted)
}
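timeToNextPing is the time remaining until the next PING. When the session is established, readTimeout is set to 2/3 of the session timeout, and clientCnxnSocket.getIdleSend is the time elapsed since data was last sent. The client therefore pings roughly every 1/3 of the session timeout (readTimeout / 2), less up to one second of margin. In addition, if the client has not sent anything to the server for more than MAX_SEND_PING_INTERVAL (10 s), it also sends a PING.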
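The ping schedule can be checked with a few numbers. The sketch below (a hypothetical `PingTiming` class) copies the two conditions from the excerpt. For a 30 s session, readTimeout is 20 s, so a PING becomes due after about 9 s without sending (10 s minus the 1 s margin). For a 60 s session, the 10 s cap triggers first.

```java
// Standalone version of the ping-scheduling arithmetic in SendThread.run.
class PingTiming {
    static final int MAX_SEND_PING_INTERVAL = 10_000; // ms, as in the excerpt

    // Remaining time before the next PING is due (<= 0 means "due now").
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // Same condition the client checks before calling sendPing().
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int sessionTimeout = 30_000;
        int readTimeout = sessionTimeout * 2 / 3;               // 20000
        System.out.println(timeToNextPing(readTimeout, 8_000)); // 1000: not yet
        System.out.println(shouldPing(readTimeout, 9_000));     // true: 10000 - 9000 - 1000 = 0
        // 60 s session: readTimeout 40000, but the 10 s cap kicks in first
        System.out.println(shouldPing(40_000, 10_001));         // true
    }
}
```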
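Server side
On the server, any communication from the client, whether a PING or any other request, touches the session.
If the client established its session with the leader, the leader touches the session as shown in this flow chart:
[Figure: flow of the leader touching a session]
Touching is done in SessionTrackerImpl.touchSession: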
synchronized public boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
//1
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
//2
updateSessionExpiry(s, timeout);
return true;
}
1. If the session is already marked as closing, it has already timed out and will no longer be reactivated; the client request is ignored.
2. Otherwise the session is touched via updateSessionExpiry:
private void updateSessionExpiry(SessionImpl s, int timeout) {
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
}
which mainly calls sessionExpiryQueue.update:
public Long update(E elem, int timeout) {
//1
Long prevExpiryTime = elemMap.get(elem);
long now = Time.currentElapsedTime();
Long newExpiryTime = roundToNextInterval(now + timeout);
if (newExpiryTime.equals(prevExpiryTime)) {
// No change, so nothing to update
return null;
}
//2
// First add the elem to the new expiry time bucket in expiryMap.
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(
new ConcurrentHashMap<E, Boolean>());
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(elem);
//3
// Map the elem to the new expiry time. If a different previous
// mapping was present, clean up the previous expiry bucket.
prevExpiryTime = elemMap.put(elem, newExpiryTime);
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
Set<E> prevSet = expiryMap.get(prevExpiryTime);
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
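1. Compute the session's new expiration time; if it is the same as the current one, return immediately.
2. Add the session to the bucket for the new expiration time.
3. Map the session to the new expiration time in elemMap and remove it from its previous bucket.
[Figure: a session moving from its old expiry bucket to the new one]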
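To see the bucket move and the expiry check together, here is a simplified, single-threaded sketch. `MiniExpiryQueue` is hypothetical. It takes the current time as a parameter instead of calling Time.currentElapsedTime(), and it uses plain HashMaps instead of concurrent maps. Its poll(now) follows my understanding of what SessionTrackerImpl.run relies on: once now reaches nextExpirationTime, the bucket for that time is removed and nextExpirationTime advances by one interval.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded sketch of ExpiryQueue's bucketing.
// Time is passed in explicitly so the behaviour is deterministic.
class MiniExpiryQueue<E> {
    private final Map<E, Long> elemMap = new HashMap<>();        // elem -> bucket
    private final Map<Long, Set<E>> expiryMap = new HashMap<>(); // bucket -> elems
    private final int expirationInterval;
    private long nextExpirationTime;

    MiniExpiryQueue(int expirationInterval, long now) {
        this.expirationInterval = expirationInterval;
        this.nextExpirationTime = roundToNextInterval(now);
    }

    private long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Touch: move elem to the bucket for now + timeout.
    // Returns the new bucket, or null if the bucket did not change.
    Long update(E elem, int timeout, long now) {
        Long prevExpiryTime = elemMap.get(elem);
        long newExpiryTime = roundToNextInterval(now + timeout);
        if (prevExpiryTime != null && prevExpiryTime == newExpiryTime) {
            return null;
        }
        expiryMap.computeIfAbsent(newExpiryTime, k -> new HashSet<>()).add(elem);
        elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiryTime;
    }

    // How long the expirer thread should sleep before the next check.
    long getWaitTime(long now) {
        return Math.max(0, nextExpirationTime - now);
    }

    // Once 'now' reaches nextExpirationTime, hand out that bucket and advance
    // the check time by one interval; otherwise return an empty set.
    Set<E> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.<E>emptySet();
        }
        Set<E> due = expiryMap.remove(nextExpirationTime);
        nextExpirationTime += expirationInterval;
        return due == null ? Collections.<E>emptySet() : due;
    }

    public static void main(String[] args) {
        MiniExpiryQueue<String> q = new MiniExpiryQueue<>(2000, 0); // tick 2 s
        q.update("s1", 4000, 500);  // bucket 6000
        q.update("s2", 4000, 1500); // bucket 6000
        q.update("s1", 4000, 2500); // s1 touched again: moves to bucket 8000
        for (long t = 2000; t <= 8000; t += 2000) {
            System.out.println(t + " -> " + q.poll(t)); // s2 at 6000, s1 at 8000
        }
    }
}
```

The touch from s1 at 2500 is what saves it from expiring at 6000 together with s2.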
If the client established its session with a learner, the learner first calls LearnerSessionTracker.touchSession:
public boolean touchSession(long sessionId, int sessionTimeout) {
touchTable.get().put(sessionId, sessionTimeout);
return true;
}
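which keeps the session recorded in touchTable.
Then the periodic heartbeat between the leader and the learner carries these sessions to the leader, which touches them there. Let's look at the server-to-server heartbeat:
1. The leader actively sends PINGs, in Leader.lead: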
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
for (LearnerHandler f : getLearners()) {
f.ping();
}
// ... (unrelated code omitted)
}
}
So every half tick (tickTime / 2) the leader sends a PING to each learner.
2. A learner handles the PING in Learner.ping:
protected void ping(QuorumPacket qp) throws IOException {
// Send back the ping with our session data
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
Map<Long, Integer> touchTable = zk.getTouchSnapshot();
// one entry per touched session: 8-byte sessionId followed by 4-byte timeout
for (Entry<Long, Integer> entry : touchTable.entrySet()) {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
}
qp.setData(bos.toByteArray());
writePacket(qp, true);
}
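The learner sends its touchTable to the leader. When the leader receives the PING reply from a follower or observer, it iterates over the sessions in the touchTable and touches each one in its own session tracker (LeaderSessionTracker, which touches global sessions in SessionTrackerImpl). Using the cluster's own heartbeats to keep the learners' sessions alive is quite an elegant approach.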
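The learner-to-leader path can be sketched end to end. Sessions are touched into a table, the table is snapshotted when the leader's PING arrives, written as (long sessionId, int timeout) pairs exactly as in Learner.ping, and read back on the leader. `LearnerTouchSketch` is hypothetical. Two parts are assumptions rather than code shown in this article: the snapshot swaps in a fresh empty map (my understanding of how LearnerSessionTracker's snapshot works; check your version), and the decode loop is modelled on how the leader's LearnerHandler reads the PING payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the learner-to-leader session touch path.
class LearnerTouchSketch {
    // Learner side: sessions touched since the last leader PING.
    private final AtomicReference<Map<Long, Integer>> touchTable =
            new AtomicReference<Map<Long, Integer>>(new ConcurrentHashMap<Long, Integer>());

    void touchSession(long sessionId, int sessionTimeout) {
        touchTable.get().put(sessionId, sessionTimeout);
    }

    // Assumed swap semantics: hand out the current table and start a new one,
    // so each touched session is reported once per PING round.
    Map<Long, Integer> snapshot() {
        return touchTable.getAndSet(new ConcurrentHashMap<Long, Integer>());
    }

    // Learner side: body of the PING reply, written as in Learner.ping.
    static byte[] encode(Map<Long, Integer> sessions) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
                dos.writeLong(e.getKey());  // 8 bytes: sessionId
                dos.writeInt(e.getValue()); // 4 bytes: session timeout
            }
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
    }

    // Leader side (assumed): read pairs until the payload is exhausted; each
    // pair would then be passed to the leader's own touchSession.
    static Map<Long, Integer> decode(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            Map<Long, Integer> sessions = new LinkedHashMap<>();
            while (dis.available() > 0) {
                long sessionId = dis.readLong();
                int timeout = dis.readInt();
                sessions.put(sessionId, timeout);
            }
            return sessions;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LearnerTouchSketch learner = new LearnerTouchSketch();
        learner.touchSession(0x100000001L, 30_000);
        learner.touchSession(0x100000002L, 40_000);
        byte[] payload = encode(learner.snapshot());
        System.out.println(payload.length);            // 24 = 2 * (8 + 4)
        System.out.println(decode(payload).size());    // 2
        System.out.println(learner.snapshot().size()); // 0: the table was reset
    }
}
```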
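As long as the client sends data within the required time and the server can touch the session, the session state stays States.CONNECTED.
Reconnecting after a connection loss
The network connection can drop, for example because the network is unstable. We discuss two cases: the connection drops and is re-established within the session timeout, and the client reconnects after the session has timed out.
Socket disconnection within the session timeout
Client side
1. Handling the disconnection
When SendThread runs into an IO exception showing that the connection is broken, it calls cleanup: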
private void cleanup() {
//1
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
for (Packet p : pendingQueue) {
//2
conLossPacket(p);
}
pendingQueue.clear();
}
// We can't call outgoingQueue.clear() here because
// between iterating and clear up there might be new
// packets added in queuePacket().
//3
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
conLossPacket(p);
iter.remove();
}
}
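1. clientCnxnSocket.cleanup handles the underlying socket: it closes the current socket and cancels the SelectionKey sockKey, so SendThread knows that the connection is gone and that it needs to reconnect.
2. Every request in pendingQueue (already sent, waiting for a reply) is told that the connection was lost, via conLossPacket: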
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
finishPacket(p);
}
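Right after the connection is lost (the state is neither AUTH_FAILED nor CLOSED), the err field of each request's reply header is set to KeeperException.Code.CONNECTIONLOSS. finishPacket then acts on this err: if the packet carries a watch deregistration (as removeWatches requests do), the watchers registered on that path are notified of the connection loss.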
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
// Add all the removed watch events to the event queue, so that the
// clients will be notified with 'Data/Child WatchRemoved' event type.
if (p.watchDeregistration != null) {
Map<EventType, Set<Watcher>> materializedWatchers = null;
try {
//collect all watchers registered on the path
materializedWatchers = p.watchDeregistration.unregister(err);
for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
.entrySet()) {
Set<Watcher> watchers = entry.getValue();
if (watchers.size() > 0) {
//fire the disconnection event
queueEvent(p.watchDeregistration.getClientPath(), err,
watchers, entry.getKey());
// ignore connectionloss when removing from local
// session
p.replyHeader.setErr(Code.OK.intValue());
}
}
}
// ... (catch block and the rest of finishPacket omitted)
queueEvent
void queueEvent(String clientPath, int err,
Set<Watcher> materializedWatchers, EventType eventType) {
KeeperState sessionState = KeeperState.SyncConnected;
if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
|| KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
sessionState = Event.KeeperState.Disconnected;
}
WatchedEvent event = new WatchedEvent(eventType, sessionState,
clientPath);
eventThread.queueEvent(event, materializedWatchers);
}
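So the watchers receive an event whose state is Event.KeeperState.Disconnected.
3. The requests in the outgoingQueue send queue are iterated, and each one, including newly added ones, is told that the connection was lost.
4. After cleanup has notified the queued requests, SendThread still fires a Disconnected event (if the client is still alive), which notifies all currently registered watchers:
//in SendThread.run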
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
5. In addition, once the client is no longer alive (state CLOSED or AUTH_FAILED) or is closing, queuePacket fails any new request immediately with conLossPacket.
Sending data (ClientCnxn.queuePacket):
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
//fail the request right away
conLossPacket(packet);
}
}
// ... (rest of queuePacket omitted)
2. Client reconnection
When SendThread finds that the client is disconnected, it picks the next server address and reconnects, this time sending the existing sessionId:
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
3. Receiving the successful session creation response
As with the first session creation, the session state is set to States.CONNECTED and a SyncConnected event is fired.
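The client applies two mappings while tearing down a broken connection: client state to error code in conLossPacket, and error code to KeeperState in queueEvent. Both fit in a few lines. `ConnectionLossMapping` is a hypothetical name; the numeric values are those of KeeperException.Code (CONNECTIONLOSS = -4, SESSIONEXPIRED = -112, AUTHFAILED = -115, OK = 0).

```java
// Hypothetical summary of two small mappings from the excerpts above.
class ConnectionLossMapping {
    enum State { CONNECTING, CONNECTED, CONNECTEDREADONLY, CLOSED, AUTH_FAILED }

    enum KeeperState { SyncConnected, Disconnected }

    // KeeperException.Code values used here
    static final int OK = 0;
    static final int CONNECTIONLOSS = -4;
    static final int SESSIONEXPIRED = -112;
    static final int AUTHFAILED = -115;

    // conLossPacket: the error a queued request is failed with.
    static int errorFor(State state) {
        switch (state) {
            case AUTH_FAILED:
                return AUTHFAILED;
            case CLOSED:
                return SESSIONEXPIRED;
            default:
                return CONNECTIONLOSS;
        }
    }

    // queueEvent: the KeeperState attached to the resulting watcher event.
    static KeeperState keeperStateFor(int err) {
        return (err == SESSIONEXPIRED || err == CONNECTIONLOSS)
                ? KeeperState.Disconnected
                : KeeperState.SyncConnected;
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            int err = errorFor(s);
            System.out.println(s + " -> err " + err + " -> " + keeperStateFor(err));
        }
    }
}
```

Note that, following the excerpt, an AUTHFAILED error maps to SyncConnected in queueEvent; only connection loss and session expiry produce Disconnected.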
Server side
1. Handling the disconnection
When the server can no longer communicate with the client's ServerCnxn, NIOServerCnxn.doIO catches the exception and calls NIOServerCnxn.close, which removes the ServerCnxn from the cnxns list and closes the connection:
/**
* Close the cnxn and remove it from the factory cnxns list.
*/
@Override
public void close() {
if (!factory.removeCnxn(this)) {
return;
}
if (zkServer != null) {
zkServer.removeCnxn(this);
}
if (sk != null) {
try {
// need to cancel this selection key from the selector
sk.cancel();
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ignoring exception during selectionkey cancel", e);
}
}
}
closeSock();
}
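2. Receiving the client's reconnection request
The client usually sends its session creation request to a different server. Once that server validates the session (a non-zero sessionId with the correct password), it touches the session again, and the new socket connection with the client is established.
Processing the connection request: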
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// ... (connReq, passwd and sessionTimeout are parsed from incomingBuffer in code omitted here)
long sessionId = connReq.getSessionId();
if (sessionId == 0) {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
} else {
//a non-zero sessionId means the client is reconnecting
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
}
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+ " for session 0x" + Long.toHexString(sessionId));
finishSessionInit(cnxn, false);
}
}
3. Sending the session creation response:
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
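reopenSession only revalidates a session when checkPasswd succeeds. As far as I know, ZooKeeperServer does not store session passwords. Instead, it derives them deterministically from the sessionId with a java.util.Random seeded from the sessionId XOR a fixed secret, so any server can recompute and compare the password. The sketch below follows that idea. `SessionPasswords` is hypothetical, and SECRET is a placeholder, not ZooKeeper's constant.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch of deterministic session passwords.
class SessionPasswords {
    // Placeholder secret for illustration; not ZooKeeper's actual value.
    private static final long SECRET = 0x5EC12E7L;

    // The same sessionId always yields the same 16 bytes, on any server.
    static byte[] generatePasswd(long sessionId) {
        byte[] passwd = new byte[16];
        new Random(sessionId ^ SECRET).nextBytes(passwd);
        return passwd;
    }

    // sessionId 0 means "new session", so it never matches a password.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        long sessionId = 0x100000001L;
        byte[] issued = generatePasswd(sessionId);              // sent in ConnectResponse
        System.out.println(checkPasswd(sessionId, issued));     // true: session is revalidated
        System.out.println(checkPasswd(sessionId + 1, issued)); // false: finishSessionInit(cnxn, false)
    }
}
```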
Reconnecting after the session has timed out
Server side
1. Session cleanup
The leader's session tracker thread detects expired sessions and cleans them up, in SessionTrackerImpl.run:
for (SessionImpl s : sessionExpiryQueue.poll()) {
//1
setSessionClosing(s.sessionId);
expirer.expire(s);
}
expirer.expire
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
//2
close(sessionId);
}
private void close(long sessionId) {
Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
setLocalSessionFlag(si);
submitRequest(si);
}
- Step 1 marks the session as closing: s.isClosing = true.
- Step 2 submits an OpCode.closeSession transaction request, which mainly:
  - deletes the ephemeral nodes associated with the session, including ones that are about to be created but have not yet been saved to the in-memory database;
  - removes the session from each server's SessionTracker;
  - closes the NIOServerCnxn.
2. Session expired response
The server no longer has the session's sessionId, so when revalidateSession checks the session, it cannot reactivate it:
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
//returns false: the session is no longer tracked
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
finishSessionInit(cnxn, rc);
}
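finishSessionInit, invalid-session branch:
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
// ... (ConnectResponse construction and the valid-session branch omitted)
if (valid) {
// ...
} else {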
LOG.info("Invalid session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " for client "
+ cnxn.getRemoteSocketAddress()
+ ", probably expired");
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
}
The server then queues ServerCnxnFactory.closeConn, a marker buffer that causes the connection to be closed when it is reached during sending:
/**
* The buffer will cause the connection to be close when we do a send.
*/
static final ByteBuffer closeConn = ByteBuffer.allocate(0);
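Client side
1. Reconnecting
In this scenario, the client sends its reconnection (session creation) request to the server only after the session timeout has passed.
2. Receiving the close response
The ConnectResponse that the server sends for an invalid session carries a timeout of 0, so negotiatedSessionTimeout <= 0. The client therefore sets the state to States.CLOSED and fires a KeeperState.Expired event to notify all watchers. It also queues the "event of death", so the EventThread processes the remaining events and then stops (isRunning = false):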
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
// ... (rest of onConnected omitted)
}
Thanks for reading! I'm Monica23334 || Monica2333, and I've set myself the goal of writing one original article a week. Follow me and see whether I manage to keep it~