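Overview
SendThread is a core I/O scheduling thread inside the client's ClientCnxn. It manages all network I/O between the client and the server. In a running ZooKeeper client, SendThread has the following roles:
1. It maintains the session lifecycle between client and server by sending PING packets to the server at a regular interval as a heartbeat. If the TCP connection drops during the session, it reconnects automatically and transparently.
2. It manages all request sending and response receiving for the client: it converts upper-layer client API calls into the corresponding request protocol, sends them to the server, and completes the return of synchronous calls and the callback of asynchronous calls.
3. It passes events coming from the server to EventThread for processing.
Source code
Fields
Their meanings are as follows: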
Field | Meaning |
---|---|
lastPingSentNs | nano time of the last ping |
clientCnxnSocket | the transport layer, ClientCnxnSocket |
r | random number generator |
isFirstConnect | whether this is the first connect |
rwServerAddress | address of a read/write server |
minPingRwTimeout | minimum timeout for pinging a read/write server |
maxPingRwTimeout | maximum timeout for pinging a read/write server |
pingRwTimeout | default timeout for pinging a read/write server |
saslLoginFailed | whether SASL login failed |
RETRY_CONN_MSG | log message |
Methods
A brief summary:
Method | Purpose | Notes |
---|---|---|
SendThread | Constructor | |
readResponse | Reads the server's reply, handles outgoingQueue and pendingQueue accordingly, fires events, and so on | Important |
getZkState | Gets the client state | |
getClientCnxnSocket | Gets the transport layer clientCnxnSocket | |
primeConnection | Main connection method: sends watches and authData to the server and enables read/write on clientCnxnSocket | Important |
prependChroot | Derives serverPath from clientPath and chrootPath | |
sendPing | Ping command: records the send time, builds the request, and adds it to outgoingQueue to be sent | Important |
startConnect | Starts connecting; mainly initiates the socket connect to the server | Important |
logStartConnect | Logging | |
run | Thread body: handles connection setup and authentication checks, timeout detection, pings, and network I/O | Important |
pingRwServer | Pings a read/write server | Important |
cleanup | Cleans up the socket, notifies the packets in both queues of the connection loss, and clears both queues | |
onConnected | Sets the related parameters after reading the server's connect response | Important |
close | Closes the socket | |
testableCloseSocket | | |
clientTunneledAuthenticationInProgress | Whether SASL authentication is in progress | |
sendPacket | Sends a packet | |
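The source of the important methods is walked through below.
readResponse method
It can be split into several parts:
1. Handle ping replies, AuthPacket replies, WatcherEvent notifications, and SASL authentication, then return
2. Take a packet off pendingQueue and validate it (ordering is guaranteed)
3. Call finishPacket to complete AsyncCallback handling and watcher registration
The code for part 1: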
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header"); // deserialize the reply header
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) ); // queue on eventThread
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) { // -1 marks a notification, i.e. a WatcherEvent
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response"); // deserialize the WatcherEvent
// convert from a server path to a client path
if (chrootPath != null) { // convert serverPath to clientPath
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event); // turn the WatcherEvent back into a WatchedEvent
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we ); // queue on eventThread
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (clientTunneledAuthenticationInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
As the code shows, every one of these branches returns directly.
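The xid checks above can be summarized as a small classifier. This is only an illustrative sketch: the ReplyKinds class and its Kind enum are made-up names, not part of ZooKeeper, and the SASL branch is left out because it depends on clientTunneledAuthenticationInProgress() rather than on the xid.

```java
// Hypothetical helper, not part of ZooKeeper: names the reply kinds by xid.
final class ReplyKinds {
    enum Kind { PING, AUTH, NOTIFICATION, REGULAR }

    // The xid values mirror the constants checked in readResponse.
    static Kind classify(int xid) {
        switch (xid) {
            case -2: return Kind.PING;          // heartbeat reply
            case -4: return Kind.AUTH;          // reply to an AuthPacket
            case -1: return Kind.NOTIFICATION;  // server-pushed WatcherEvent
            default: return Kind.REGULAR;       // goes on to the pendingQueue check
        }
    }

    public static void main(String[] args) {
        for (int xid : new int[] {-2, -4, -1, 7}) {
            System.out.println(xid + " -> " + classify(xid));
        }
    }
}
```

Only the REGULAR case continues to parts 2 and 3; the other three are fully handled before pendingQueue is touched.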
The code for parts 2 and 3:
Packet packet; // auth, ping, and in-progress SASL packets are never added to pendingQueue; fired watches are not in pendingQueue either (the server pushes them); AsyncCallback requests are (see finishPacket)
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove(); // a response arrived, so remove the head of pendingQueue
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid()); // populate replyHeader
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
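What this shows:
auth, ping, and in-progress SASL packets are never added to pendingQueue, and fired watches are not in pendingQueue either (the server pushes them on its own initiative), while requests with an AsyncCallback are in pendingQueue (see finishPacket).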
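The ordering check in parts 2 and 3 can be modeled with a plain FIFO queue. The sketch below is a simplified model, not the ZooKeeper code: PendingMatch is a hypothetical class, and bare xids stand in for whole Packet objects.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the pendingQueue check; xids stand in for packets.
final class PendingMatch {
    private final Deque<Integer> pending = new ArrayDeque<>();

    // Called when a request has been written to the socket.
    void sent(int xid) {
        pending.addLast(xid);
    }

    // Returns the matched xid, or throws as readResponse does on a mismatch.
    int onReply(int replyXid) throws IOException {
        Integer head = pending.pollFirst();
        if (head == null) {
            throw new IOException("Nothing in the queue, but got " + replyXid);
        }
        if (head != replyXid) {
            throw new IOException("Xid out of order. Got Xid " + replyXid
                    + " expected Xid " + head);
        }
        return head;
    }
}
```

Because requests are answered in the order they were sent, the reply must always match the head of the queue; anything else is treated as a broken connection.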
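primeConnection method
primeConnection mainly does the following:
Sets sessId according to whether a connection was made before, and builds the ConnectRequest
If disableAutoWatchReset is false, puts the existing watches into outgoingQueue; authData is put into outgoingQueue as well, ready to be sent
Enables read/write on clientCnxnSocket, meaning it is ready for I/O with the server
The code: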
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0; // reuse sessionId if a read/write server was seen before, otherwise default to 0
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd); // build the ConnectRequest
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator(); // convert to server paths using chrootPath
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) { // cap the batch length
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch); // build the SetWatches request
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet); // add to the send queue
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null)); // add authInfo to the send queue
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly)); // ConnectRequest goes to the head of the send queue
}
clientCnxnSocket.enableReadWriteOnly(); // enable read/write so the contents of outgoingQueue can be sent
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
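The batching loop in primeConnection is easier to follow in isolation. Here is a simplified sketch under stated assumptions: WatchBatcher is a hypothetical class, it handles a single list of paths instead of the three watch lists, and maxLength plays the role of SET_WATCHES_MAX_LENGTH.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified version of the batching loop: one list, not three.
final class WatchBatcher {
    // A batch is closed once the summed path length reaches maxLength.
    // As in the original, the last path added may push a batch over the limit.
    static List<List<String>> batch(List<String> paths, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> current = new ArrayList<>();
            int batchLength = 0;
            while (batchLength < maxLength && it.hasNext()) {
                String watch = it.next();
                current.add(watch);
                batchLength += watch.length();
            }
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // "/a" and "/bb" add up to 5 characters, so the first batch closes there.
        System.out.println(batch(Arrays.asList("/a", "/bb", "/ccc"), 4));
    }
}
```

Each resulting batch corresponds to one SetWatches packet, which keeps any single request from growing without bound when a client holds many watches.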
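Points to note:
1. sessId: on a reconnect after a previous connection, the previous sessionId is reused; otherwise it defaults to 0. For reconnects, see where ClientCnxn.SendThread#startConnect is called.
2. When does a connection have watches to register? On a reconnect with disableAutoWatchReset set to false.
3. ConnectRequest is placed at the head of outgoingQueue, so the connect request is the first thing sent (which guarantees that the first response is handled by ClientCnxnSocket#readConnectResult).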
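Point 3 follows from how addFirst works on a deque. A minimal sketch, using strings in place of Packet objects (PrimeOrder and the "getData" request are invented for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model: strings stand in for the packets primeConnection enqueues.
final class PrimeOrder {
    static List<String> sendOrder() {
        Deque<String> outgoingQueue = new ArrayDeque<>();
        outgoingQueue.addLast("getData");      // a request queued before the reconnect
        outgoingQueue.addFirst("setWatches");  // pushed first, ends up behind the next two
        outgoingQueue.addFirst("auth");
        outgoingQueue.addFirst("connect");     // pushed last, so it sits at the head
        return new ArrayList<>(outgoingQueue); // iteration runs from head to tail
    }

    public static void main(String[] args) {
        System.out.println(sendOrder()); // [connect, auth, setWatches, getData]
    }
}
```

This is why the source comment says "We add backwards since we are pushing into the front": the packet that must go out first is pushed last.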
sendPing method
This is just an asynchronous call:
private void sendPing() { // ping: record the send time, build the request, and add it to outgoingQueue to be sent
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
One thing to note: the run method sends the contents of outgoingQueue, and in ClientCnxnSocketNIO#doIO
the ping packet is not added to pendingQueue
startConnect method
What it does is simple:
1. Picks the server address: rwServerAddress if it is already set, otherwise the next one from hostProvider
2. SASL handling
3. Calls clientCnxnSocket.connect
Source:
private void startConnect() throws IOException { // start connecting
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress; // use rwServerAddress if it is set
rwServerAddress = null;
} else {
addr = hostProvider.next(1000); // otherwise take the next one from the server address list
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")")); // set the thread name
if (ZooKeeperSaslClient.isEnabled()) { // SASL is enabled; this part is not covered here
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);//log
clientCnxnSocket.connect(addr); // socket connect
}
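run method
This method is important. It does the following:
1. Initializes clientCnxnSocket
2. Keeps checking whether clientCnxnSocket is connected to the server, and connects if it is not
3. Checks for timeout: a read timeout when connected, a connect timeout when not connected
4. Keeps sending pings. Each time the server receives a ping it recomputes the session expiry from the current time, so a client that pings at a regular interval keeps its session from expiring
5. If the client is currently read-only, keeps looking for a server that supports read/write
6. Keeps performing I/O: sends the requests in the request queue and reads the response data from the server
7. When !state.isAlive(), performs the related cleanup
Part 1: clientCnxnSocket initialization
clientCnxnSocket.introduce(this,sessionId); // initialize clientCnxnSocket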
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
Part 2: keep checking whether clientCnxnSocket is connected to the server, and connect if it is not
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) { // i.e. clientCnxnSocket's SelectionKey is null
if(!isFirstConnect){ // not the first connect, so sleep briefly
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect(); // start connecting
clientCnxnSocket.updateLastSendAndHeard();
}
Part 3: timeout detection. When connected, check for a read timeout; when not connected, check for a connect timeout
// check for timeout: either a read timeout or a connect timeout
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
// connected: read timeout minus the time elapsed since the last read
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// not connected: connect timeout minus the time elapsed since the last read
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) { // the read timeout or the connect timeout has elapsed
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
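The value of to computed in part 3 is a remaining time budget. A minimal sketch of that arithmetic, with a hypothetical TimeoutBudget helper and made-up millisecond values:

```java
// Hypothetical helper mirroring the "to" computation; all values are milliseconds.
final class TimeoutBudget {
    // Time left before the connection is considered timed out.
    static int remaining(boolean connected, int readTimeout,
                         int connectTimeout, int idleRecv) {
        int limit = connected ? readTimeout : connectTimeout;
        return limit - idleRecv; // <= 0 means a SessionTimeoutException in run
    }

    public static void main(String[] args) {
        System.out.println(remaining(true, 20000, 10000, 5000));   // 15000
        System.out.println(remaining(false, 20000, 10000, 10000)); // 0, timed out
    }
}
```

The same variable is later shrunk by the ping and read/write-server checks and finally passed to doTransport as the maximum wait.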
Part 4: keep sending pings
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) { // nearly half of readTimeout has elapsed, or more than 10s have passed since the last send
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) { // the next ping is due before the timeout budget runs out, so wait only until then
to = timeToNextPing;
}
}
}
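The ping decision in part 4 comes down to one formula. The sketch below pulls it into a hypothetical PingSchedule class so it can be tried with concrete numbers; idleSend stands for clientCnxnSocket.getIdleSend(), and the sample values are made up.

```java
// Hypothetical helper mirroring the ping decision; all values are milliseconds.
final class PingSchedule {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in run

    // Milliseconds until the next ping is due; <= 0 means "send one now".
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println(timeToNextPing(20000, 500));  // 9500
        System.out.println(shouldPing(20000, 9000));     // true: 10000 - 9000 - 1000 = 0
        System.out.println(shouldPing(40000, 10001));    // true: idle for more than 10s
    }
}
```

With a readTimeout of 20000 ms, a ping goes out after roughly 9 seconds of send idleness, which leaves room for a second attempt before the read timeout expires.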
Part 5: if the client is currently read-only, keep looking for a server that supports read/write
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) { // connected in read-only mode
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
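Part 5 doubles pingRwTimeout on every probe until it hits maxPingRwTimeout, which is a capped exponential backoff. A small sketch of that progression, with a hypothetical RwPingBackoff class and made-up minimum and maximum values (they are parameters here, not the constants from the ZooKeeper source):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing the capped doubling of pingRwTimeout.
final class RwPingBackoff {
    // Same expression as in run: Math.min(2*pingRwTimeout, maxPingRwTimeout).
    static int next(int current, int max) {
        return Math.min(2 * current, max);
    }

    // Timeout values after each of the given number of probes.
    static List<Integer> afterProbes(int min, int max, int probes) {
        List<Integer> values = new ArrayList<>();
        int timeout = min;
        for (int i = 0; i < probes; i++) {
            timeout = next(timeout, max);
            values.add(timeout);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(afterProbes(100, 1000, 5)); // [200, 400, 800, 1000, 1000]
    }
}
```

When pingRwServer does find a read/write server, it resets pingRwTimeout to minPingRwTimeout, so the backoff starts over on the next read-only connection.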
Part 6: keep performing I/O, sending the requests in the request queue and reading the response data from the server
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); // network transfer over the two queues, bounded by the wait time "to"
See the clientCnxnSocket source for this step; it was covered earlier.
Part 7: when !state.isAlive(), perform the related cleanup
// below: the case where state is not alive
cleanup();
clientCnxnSocket.close(); // close the socket
if (state.isAlive()) { // ??? when can this happen
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
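It is not obvious why if (state.isAlive()) can still be true at this point. One path visible in the loop above: the loop also exits through break when closing is true, in which case state may still be alive.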
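pingRwServer method
While the client is connected to a read-only server, this method is called repeatedly to look for a read/write server via hostProvider
private void pingRwServer() throws RWServerFoundException { // find a read/write server and update rwServerAddress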
String result = null;
InetSocketAddress addr = hostProvider.next(0);
LOG.info("Checking server " + addr + " for being r/w." +
" Timeout " + pingRwTimeout);
Socket sock = null;
BufferedReader br = null;
try {
sock = new Socket(addr.getHostName(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
br = new BufferedReader(
new InputStreamReader(sock.getInputStream()));
result = br.readLine();
} catch (ConnectException e) {
// ignore, this just means server is not up
} catch (IOException e) {
// some unexpected error, warn about it
LOG.warn("Exception while seeking for r/w server " +
e.getMessage(), e);
} finally {
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
if (br != null) {
try {
br.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
}
if ("rw".equals(result)) {
pingRwTimeout = minPingRwTimeout;
// save the found address so that it's used during the next
// connection attempt
rwServerAddress = addr;
throw new RWServerFoundException("Majority server found at "
+ addr.getHostName() + ":" + addr.getPort());
}
}
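The final if condition is the part that matters.
Note that once rwServerAddress is updated, an exception is thrown. The run method handles it by cleaning up and reconnecting, so that the client reconnects to the read/write server.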
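The probe itself is the "isro" four-letter command, to which a server answers "ro" or "rw". Below is a standalone sketch of such a probe; IsroProbe is a hypothetical class, not the client code, and it uses try-with-resources instead of the manual close logic shown above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical standalone probe; it is not the ZooKeeper client code.
final class IsroProbe {
    // Sends "isro" and returns the first line of the reply, or null on failure.
    static String probe(String host, int port, int timeoutMs) {
        try (Socket sock = new Socket()) {
            sock.connect(new InetSocketAddress(host, port), timeoutMs);
            sock.setSoTimeout(timeoutMs);
            sock.getOutputStream().write("isro".getBytes(StandardCharsets.US_ASCII));
            sock.getOutputStream().flush();
            sock.shutdownOutput(); // signal that the command is complete
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(sock.getInputStream(), StandardCharsets.US_ASCII));
            return br.readLine();
        } catch (IOException e) {
            return null; // server down or unreachable
        }
    }

    // True only when the server explicitly reports read/write mode.
    static boolean isReadWrite(String host, int port, int timeoutMs) {
        return "rw".equals(probe(host, port, timeoutMs));
    }
}
```

A call such as IsroProbe.isReadWrite("127.0.0.1", 2181, 1000) would check a local server, assuming one listens on the usual client port and has the isro command enabled.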
onConnected method
This method sets a few parameters once the zk server's connect reply has arrived, and changes the zk state
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException { // set the related parameters after reading the server's connect response
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) { // the client does not allow read-only, but the server is read-only
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected(); // update the index into hostProvider's circular list
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED; // set state according to isRO
seenRwServerBefore |= !isRO; // whether a read/write server has been seen
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null)); // queue the WatchedEvent
}
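The two timeouts that run relies on are both derived from the negotiated session timeout in onConnected. A minimal sketch of that derivation; NegotiatedTimeouts is a hypothetical class, serverCount stands for hostProvider.size(), and the sample numbers are made up.

```java
// Hypothetical value holder for the timeouts derived in onConnected.
final class NegotiatedTimeouts {
    final int readTimeout;
    final int connectTimeout;

    NegotiatedTimeouts(int negotiatedSessionTimeout, int serverCount) {
        if (negotiatedSessionTimeout <= 0) {
            // onConnected treats this case as an expired session
            throw new IllegalArgumentException("session has expired");
        }
        if (serverCount <= 0) {
            throw new IllegalArgumentException("need at least one server");
        }
        readTimeout = negotiatedSessionTimeout * 2 / 3;
        connectTimeout = negotiatedSessionTimeout / serverCount;
    }

    public static void main(String[] args) {
        NegotiatedTimeouts t = new NegotiatedTimeouts(30000, 3);
        System.out.println(t.readTimeout);    // 20000
        System.out.println(t.connectTimeout); // 10000
    }
}
```

Dividing by the number of servers means the client can, in principle, try every server in the list once within a single session timeout.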
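Reflections
What the run method mainly does
1. Initializes clientCnxnSocket
2. Keeps checking whether clientCnxnSocket is connected to the server, and connects if it is not
3. Checks for timeout: a read timeout when connected, a connect timeout when not connected
4. Keeps sending pings. Each time the server receives a ping it recomputes the session expiry from the current time, so a client that pings at a regular interval keeps its session from expiring
5. If the client is currently read-only, keeps looking for a server that supports read/write
6. Keeps performing I/O: sends the requests in the request queue and reads the response data from the server
7. When !state.isAlive(), performs the related cleanup
What primeConnection does, and how it guarantees that the first request sent is the connect request
Sets sessId according to whether a connection was made before, and builds the ConnectRequest
If disableAutoWatchReset is false, puts the existing watches into outgoingQueue; authData is put into outgoingQueue as well, ready to be sent
Enables read/write on clientCnxnSocket, meaning it is ready for I/O with the server
The connect request is the first item in the queue
sessId and sessionId
Decided by seenRwServerBefore
If no read/write server has been seen before, sessId is 0
Otherwise sessId is the sessionId of the previous connection
The connect request uses sessId together with the global sessionPasswd
Which replies are not in pendingQueue
auth, ping, and in-progress SASL packets are never added to pendingQueue, and fired watches are not in pendingQueue either
What is the difference between startConnect and primeConnection
The difference lies in the NIO SelectionKey
The former covers only establishing the socket connection
The latter runs once the connection is established: it enables write and read, ready for socket I/O with the zk server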
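In terms of java.nio interest ops, the two phases can be sketched as follows. This is an assumption-laden illustration: InterestOps and its Phase enum are invented names, and the mapping reflects how the NIO socket implementation appears to behave (OP_CONNECT while connecting, OP_READ and OP_WRITE after primeConnection calls enableReadWriteOnly), not code quoted from it.

```java
import java.nio.channels.SelectionKey;

// Hypothetical mapping from connection phase to NIO interest ops.
final class InterestOps {
    enum Phase { CONNECTING, PRIMED }

    static int interestOps(Phase phase) {
        return phase == Phase.CONNECTING
                ? SelectionKey.OP_CONNECT                        // startConnect: wait for the connect to finish
                : SelectionKey.OP_READ | SelectionKey.OP_WRITE;  // primeConnection: ready for request/response I/O
    }

    public static void main(String[] args) {
        System.out.println(interestOps(Phase.CONNECTING) == SelectionKey.OP_CONNECT);
        System.out.println((interestOps(Phase.PRIMED) & SelectionKey.OP_READ) != 0);
    }
}
```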
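What is the difference between pingRwServer and sendPing
The former applies when the client is connected only to a read-only zk server; it is called repeatedly and updates rwServerAddress
The latter is the heartbeat, which is needed whatever mode the client is in
Why both clientCnxnSocket.isConnected() and isFirstConnect exist
clientCnxnSocket.isConnected() tells whether a socket connection currently exists. isFirstConnect marks the client's first connection: if this is not the first connection, the thread sleeps for a while and then picks the next server addr from hostProvider
The overall connection process
Connecting to a ZooKeeper server means establishing two layers:
1. The TCP connection between client and server
2. The session association built on top of the TCP connection
Once the TCP connection is established, the client sends a ConnectRequest to ask for a session. The server assigns a sessionId and password to the client and starts checking that session for timeout.
If the TCP connection drops within sessionTimeout, that is, before the session has timed out, the server still considers the sessionId alive.
The client then picks the next ZooKeeper server address and establishes a TCP connection to it. Once that is done, it sends a ConnectRequest carrying the previous sessionId and password. If the sessionId has not yet reached its timeout, the automatic reconnect succeeds.
All of this is transparent to the user of the client: it happens silently in the background and the ZooKeeper object remains valid.
If the session has already timed out by the time the TCP connection is re-established (the server will then have cleaned up the data related to that sessionId), the server returns a sessionTimeout of 0, a sessionid of 0, and an empty byte array as the password.
On receiving this, the client checks whether the negotiated sessionTimeout is less than or equal to 0. If so, it first uses the eventThread to emit a KeeperState.Expired event, notifying the corresponding Watchers.
It then ends the EventThread loop and heads toward shutdown. At this point the ZooKeeper object is invalid; a new ZooKeeper object must be created, which is assigned a new sessionId.
The client first connected to a ReadOnly server and later found rwServerAddress: how does it reconnect
ClientCnxn.SendThread#run catches the RWServerFoundException and then calls cleanup
After cleanup, ClientCnxnSocketNIO#isConnected returns false
so ClientCnxn.SendThread#run enters the connect branch again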
if (!clientCnxnSocket.isConnected()) { // i.e. clientCnxnSocket's SelectionKey is null
if(!isFirstConnect){ // not the first connect, so sleep briefly
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect(); // start connecting
clientCnxnSocket.updateLastSendAndHeard();
}
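Questions
A puzzle in SendThread#run
Why, after breaking out of while(state.isAlive()),
is there still an if (state.isAlive())
Notes
These need to be cleared up later, after reading the server's network I/O code:
How the server guarantees the order of processing and sending
When a server is read-only and when it is read/write
How the server assigns sessionId and pwd
How the server tells apart connect requests with different sessId values (to be read later)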
References
https://my.oschina.net/pingpangkuangmo/blog/486780 (the run method and the overall process)
http://www.cnblogs.com/leesf456/p/6098255.html (concepts)
http://www.voidcn.com/blog/aBOUNTWINTER/article/p-6400711.html (concepts)
http://shift-alt-ctrl.iteye.com/blog/1846971 (RwServer and seenRwServerBefore: notes on read/write servers)
《paxos到zk》 (the book "From Paxos to ZooKeeper")