Preface
The previous article covered how the ZooKeeper server starts up; this one analyzes how the ZooKeeper client starts up.
Creating the client connection object
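With the native ZooKeeper library, a client is usually created like this:
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
Let's go straight to the source. The three-argument constructor eventually delegates to the constructor below, which additionally takes canBeReadOnly, a HostProvider and a ZKClientConfig: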
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
//clientConfig holds the configurable properties of the ZooKeeper client
this.clientConfig = clientConfig;
//create the client-side watch manager
watchManager = defaultWatchManager();
//register the user's watcher as the watch manager's default watcher
watchManager.defaultWatcher = watcher;
//parse the user's connectString with a ConnectStringParser (its role is explained below)
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
//the connect string may list several ip:port pairs; which one should the client connect to? hostProvider encapsulates that choice
hostProvider = aHostProvider;
//the client connection object, the core of how the client talks to the server (analyzed in detail below)
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
//start the client's worker threads (SendThread and EventThread)
cnxn.start();
}
ConnectStringParser
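Let's start with the ConnectStringParser class.
As the figure above shows, ConnectStringParser has two important fields, chrootPath and serverAddresses, and its job is to derive both from the connect string the user passes in. chrootPath is the path suffix of the connect string: for "192.168.11.1:2181,192.168.11.2:2181/tt", parsing yields chrootPath = "/tt" (the leading slash is kept, because the code takes substring(off)). serverAddresses holds the parsed host/port pairs; for the same example it contains unresolved InetSocketAddress entries for ["192.168.11.1:2181", "192.168.11.2:2181"].
Here is the source of the ConnectStringParser constructor that performs this parsing: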
public ConnectStringParser(String connectString) {
// parse out chroot, if any
//position of the '/' that starts the chroot path
int off = connectString.indexOf('/');
if (off >= 0) {
//extract chrootPath (including the leading '/')
String chrootPath = connectString.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
//keep only the ip:port list part of the connect string
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
//split the list on commas into individual ip:port entries
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
try {
//split each entry into host and port (port defaults to DEFAULT_PORT)
String[] hostAndPort = ConfigUtils.getHostAndPort(host);
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
} catch (ConfigException e) {
e.printStackTrace();
}
//create an unresolved InetSocketAddress from host and port and add it to serverAddresses
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
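To make the parsing rules concrete, here is a dependency-free sketch of the same steps. It is not ZooKeeper's class: the name SimpleConnectStringParser is hypothetical, hosts are kept as "host:port" strings instead of InetSocketAddress, and it skips the IPv6 "[addr]:port" handling of ConfigUtils.getHostAndPort and the PathUtils.validatePath check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified re-implementation of the ConnectStringParser steps above.
final class SimpleConnectStringParser {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath;                      // null when absent or just "/"
    final List<String> hosts = new ArrayList<>(); // "host:port" pairs

    SimpleConnectStringParser(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off); // keeps the leading '/'
            this.chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }
        for (String entry : connectString.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            hosts.add(host + ":" + port);
        }
    }
}
```

For the article's example, new SimpleConnectStringParser("192.168.11.1:2181,192.168.11.2:2181/tt") gives chrootPath "/tt" and two hosts, while a trailing lone "/" is treated as no chroot at all.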
HostProvider
From ConnectStringParser we know the user may supply several IP:Port pairs. Which one should the client connect to? That is HostProvider's job, and its default implementation is StaticHostProvider.
The figure above shows the fields of StaticHostProvider. The three outlined in red, serverAddresses, lastIndex and currentIndex, are the core of how StaticHostProvider picks a server to connect to, and its next() method exposes that choice to callers.
next()
public InetSocketAddress next(long spinDelay) {
boolean needToSleep = false;
InetSocketAddress addr;
synchronized (this) {
//reconfigMode is a ZooKeeper feature for load-balancing client connections across servers (used after the server list is reconfigured)
if (reconfigMode) {
addr = nextHostInReconfigMode();
if (addr != null) {
currentIndex = serverAddresses.indexOf(addr);
return resolve(addr);
}
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
}
//advance currentIndex
++currentIndex;
//if currentIndex reaches the size of the server list, wrap around to 0
if (currentIndex == serverAddresses.size()) {
currentIndex = 0;
}
//take the server at currentIndex
addr = serverAddresses.get(currentIndex);
//decide whether we need to sleep briefly before returning
needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
if (lastIndex == -1) {
// We don't want to sleep on the first ever connect attempt.
//initialize lastIndex to 0 here; if lastIndex started at 0 instead of -1, needToSleep would already be true on the very first connection attempt, which is clearly undesirable
lastIndex = 0;
}
}
if (needToSleep) {
try {
//sleep for spinDelay: once every server in the list has been tried, pause before the next round (my interpretation: if no server could be reached, the servers may be busy, so give them a moment to recover)
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
//return the (resolved) server address
return resolve(addr);
}
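The interplay of currentIndex and lastIndex is easier to see in isolation. The sketch below keeps only the round-robin part of next(): no reconfigMode, no address resolution, and it reports the sleep decision instead of sleeping. The class and the Pick holder are hypothetical; onConnected() is an assumed counterpart of the StaticHostProvider method that records which server the client last connected to successfully.

```java
import java.util.List;

// Hypothetical simplification of StaticHostProvider.next() for illustration only.
final class RoundRobinHostPicker {
    static final class Pick {
        final String server;
        final boolean needToSleep;

        Pick(String server, boolean needToSleep) {
            this.server = server;
            this.needToSleep = needToSleep;
        }
    }

    private final List<String> servers;
    private int currentIndex = -1; // index of the server handed out most recently
    private int lastIndex = -1;    // index of the last server we connected to

    RoundRobinHostPicker(List<String> servers) {
        this.servers = servers;
    }

    synchronized Pick next(long spinDelay) {
        ++currentIndex;
        if (currentIndex == servers.size()) {
            currentIndex = 0; // wrap around
        }
        // Back off once we have cycled back to the last server that worked.
        boolean needToSleep = currentIndex == lastIndex && spinDelay > 0;
        if (lastIndex == -1) {
            lastIndex = 0; // never sleep on the very first attempt
        }
        return new Pick(servers.get(currentIndex), needToSleep);
    }

    // Assumed counterpart of StaticHostProvider.onConnected().
    synchronized void onConnected() {
        lastIndex = currentIndex;
    }
}
```

With three servers and no successful connection, the first three picks do not sleep and the fourth (back at the first server) does; after onConnected(), the back-off moves to whichever server last worked.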
ClientCnxn
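ClientCnxn is the client connection object. It holds the state of the client's connection to the server and has many fields, so we comment on only a few of them.
Here is the ClientCnxn constructor that the other constructors ultimately delegate to: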
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZooKeeper zooKeeper,
ClientWatchManager watcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//create SendThread, which handles all IO; by default clientCnxnSocket is a ClientCnxnSocketNIO
sendThread = new SendThread(clientCnxnSocket);
//create EventThread, which dispatches the events that watchers are interested in
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
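The two timeouts set in the constructor are plain integer arithmetic. A worked example with a 30000 ms session timeout and three servers: each connection attempt gets 30000 / 3 = 10000 ms, so one pass over all servers roughly fits into one session timeout, and once connected the client tolerates 30000 * 2 / 3 = 20000 ms of silence. The helper class name is hypothetical.

```java
// Worked example of the timeouts derived in the ClientCnxn constructor above.
// Integer division, as in the Java source.
final class CnxnTimeouts {
    static int connectTimeout(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount; // budget for one connection attempt
    }

    static int readTimeout(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3; // max silence tolerated once connected
    }

    public static void main(String[] args) {
        int session = 30000;
        System.out.println(connectTimeout(session, 3)); // 10000
        System.out.println(readTimeout(session));       // 20000
    }
}
```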
ClientCnxn.start()
ClientCnxn.start() starts both SendThread and EventThread:
public void start() {
sendThread.start();
eventThread.start();
}
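Why two threads? SendThread only enqueues events (for example via eventThread.queueEvent and queueEventOfDeath in the run loop below), so slow watcher callbacks cannot stall network IO. The sketch below models that hand-off with a LinkedBlockingQueue and a sentinel "event of death"; the class name, the String events and the queue type are assumptions for illustration, not ZooKeeper's EventThread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the SendThread -> EventThread hand-off.
final class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object(); // sentinel
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> processed = new ArrayList<>();

    MiniEventThread() {
        setDaemon(true);
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    return; // everything queued before the sentinel has been handled
                }
                processed.add((String) event); // stand-in for invoking a Watcher
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the sentinel travels through the same FIFO queue, every event queued before it is still delivered, which matches the order of the Disconnected and Closed notifications at the end of SendThread.run.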
SendThread
SendThread handles all of the client's IO. Let's look at its run method.
SendThread.run
public void run() {
//give clientCnxnSocket the sessionId and outgoingQueue
//note: on the first connection the server has not assigned a sessionId yet, so it is still the default 0
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
//if the client has sent nothing for this long, it sends a ping; the upstream default is 10 s (the original article had raised it to 10000000 while debugging)
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
//note that this is a while loop: as long as the client state is alive, the logic below runs repeatedly
while (state.isAlive()) {
try {
//not connected to a server yet: start the connection process
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
//ask hostProvider for the next server in the list
serverAddress = hostProvider.next(1000);
}
//open the socket connection to the server (source shown below)
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
//already connected to the server
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
//not connected yet: remaining time before connectTimeout expires
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
//to <= 0 means nothing has been heard from the server within the timeout: throw
if (to <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
clientCnxnSocket.getIdleRecv(),
Long.toHexString(sessionId));
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//connected: compute when the next heartbeat (ping) to the server is due
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
//timeToNextPing <= 0 means a heartbeat is due; a ping is also sent if the client has sent nothing to the server for longer than MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
//state == States.CONNECTEDREADONLY: see the comment below
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
//handle pending IO events (analyzed in detail later)
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
// closing so this is expected
LOG.warn(
"An exception was thrown while closing send thread for session 0x{}.",
Long.toHexString(getSessionId()),
e);
break;
} else {
LOG.warn(
"Session 0x{} for sever {}, Closing socket connection. "
+ "Attempting reconnect except it is a SessionExpiredException.",
Long.toHexString(getSessionId()),
serverAddress,
e);
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
//reaching this point means the loop has exited: the client is closing or its state is no longer alive
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
//tell the event thread that the connection to the server has been lost
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(
LOG,
ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
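The timing logic of the connected branch in run() can be pulled out into pure functions, which makes the numbers easy to check. The sketch below is a hypothetical extraction (class and method names are invented): it decides whether a ping is due and what timeout to pass to doTransport. With readTimeout = 20000 ms and 500 ms since the last send, the next ping is due in 20000 / 2 - 500 = 9500 ms; the extra 1000 ms subtracted once idleSend exceeds 1 s is the safety margin the English comment in the source describes.

```java
// Hypothetical extraction of the per-iteration timing logic in SendThread.run()
// for the connected case.
final class PingScheduler {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 s

    /** Milliseconds until the next ping is due; <= 0 means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    /** Timeout for doTransport(): the remaining read budget, capped by the next ping. */
    static int selectTimeout(int readTimeout, int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv;
        if (to <= 0) {
            // the real code throws SessionTimeoutException here
            throw new IllegalStateException("session timed out");
        }
        int next = timeToNextPing(readTimeout, idleSend);
        if (!shouldPing(readTimeout, idleSend) && next < to) {
            to = next;
        }
        return to;
    }
}
```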
The client-to-server socket connection
The client opens its socket to the server in ClientCnxnSocketNIO.connect (called from SendThread.startConnect):
void connect(InetSocketAddress addr) throws IOException {
//create the client SocketChannel
SocketChannel sock = createSock();
try {
//register OP_CONNECT interest with the selector, and
//initiate the connection to the remote server
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to {}", addr);
sock.close();
throw e;
}
//the session is not initialized yet (no ConnectResponse received)
initialized = false;
/*
* Reset incomingBuffer
*/
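//ZooKeeper's default transport is NIO-based. Each message on the wire is a length prefix followed by the message itself, and the message is in turn [header, body]
//the length prefix is always 4 bytes and gives the length of the rest of the message; lenBuffer is the ByteBuffer used to read that prefix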
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
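The non-blocking connect sequence used here (configure non-blocking, register OP_CONNECT, call connect, wait in select, then finishConnect) can be reproduced with plain java.nio. This is a standalone sketch, not ZooKeeper code: the class name is hypothetical, and unlike ZooKeeper, which finishes the connect inside doTransport, it blocks in select() for up to timeoutMs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical standalone sketch of a selector-driven, non-blocking connect.
final class NioConnectSketch {
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open(); SocketChannel sock = SocketChannel.open()) {
            sock.configureBlocking(false);               // required before register()
            sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true;                             // connected immediately (e.g. loopback)
            }
            if (selector.select(timeoutMs) == 0) {
                return false;                            // no event within the timeout
            }
            for (SelectionKey k : selector.selectedKeys()) {
                if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                    // completes the TCP handshake; throws if the connection was refused
                    return ((SocketChannel) k.channel()).finishConnect();
                }
            }
            return false;
        }
    }
}
```

The immediate-connect branch matters: a non-blocking connect to a local address may succeed at once, in which case no OP_CONNECT event is ever reported.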
Once the client has initiated the connection, execution reaches ClientCnxnSocketNIO.doTransport, the entry point for the client's IO handling.
ClientCnxnSocketNIO.doTransport
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
//wait (up to waitTimeOut ms) for a registered event to occur
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
//OP_CONNECT is ready: complete the SocketChannel connection
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
//queue the session-establishment and authentication requests (analyzed below)
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//a read or write event: handle it in doIO (analyzed below)
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
//if outgoingQueue has a sendable packet, register interest in OP_WRITE
//one more note: a single NIO write may not flush everything, so after doIO we check again whether outgoingQueue still has data and, if so, keep OP_WRITE enabled
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
//clear the processed selected keys
selected.clear();
}
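enableWrite() and disableWrite() come down to toggling the OP_WRITE bit in the key's interest set while leaving OP_READ alone. The helpers below are a hypothetical sketch of that bit arithmetic using the real SelectionKey constants; on a live key one would apply them as key.interestOps(InterestOps.enableWrite(key.interestOps())).

```java
import java.nio.channels.SelectionKey;

// Hypothetical helpers showing the bit arithmetic behind enableWrite()/disableWrite().
final class InterestOps {
    static int enableWrite(int ops) {
        return ops | SelectionKey.OP_WRITE;  // add write interest, keep the rest
    }

    static int disableWrite(int ops) {
        return ops & ~SelectionKey.OP_WRITE; // drop write interest, keep the rest
    }

    static boolean wantsWrite(int ops) {
        return (ops & SelectionKey.OP_WRITE) != 0;
    }
}
```

Keeping OP_WRITE registered only while outgoingQueue has data avoids a busy loop: a connected socket is almost always writable, so a permanently set OP_WRITE would make select() return immediately on every iteration.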
Once the socket connection between client and server is up, the next step is establishing the session, which sendThread.primeConnection takes care of.
sendThread.primeConnection
After the socket connection to the server is established, the client sends the server a request to create a session. Here is the implementation:
void primeConnection() throws IOException {
LOG.info(
"Socket connection established, initiating session, client: {}, server: {}",
clientCnxnSocket.getLocalSocketAddress(),
clientCnxnSocket.getRemoteSocketAddress());
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
//build the session-creation request
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
//the long block below re-registers the client's existing watches with the server
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
List<String> persistentWatches = zooKeeper.getPersistentWatches();
List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
List<String> persistentWatchesBatch = new ArrayList<String>();
List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else if (persistentWatchesIter.hasNext()) {
watch = persistentWatchesIter.next();
persistentWatchesBatch.add(watch);
} else if (persistentRecursiveWatchesIter.hasNext()) {
watch = persistentRecursiveWatchesIter.next();
persistentRecursiveWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
Record record;
int opcode;
if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
// maintain compatibility with older servers - if no persistent/recursive watchers
// are used, use the old version of SetWatches
record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
opcode = OpCode.setWatches;
} else {
record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
opcode = OpCode.setWatches2;
}
//header of the set-watches request
RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
//wrap the header and body in a Packet and put it on outgoingQueue to be sent
Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
//put the client's authentication data on outgoingQueue
outgoingQueue.addFirst(
new Packet(
new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
null,
new AuthPacket(0, id.scheme, id.data),
null,
null));
}
//finally push the connect request to the head of outgoingQueue, so it is sent first
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
//connectionPrimed registers OP_READ and OP_WRITE interest with the selector
clientCnxnSocket.connectionPrimed();
LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
}
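Two ideas in primeConnection are worth isolating: watch paths are split into batches whose total path length is about SET_WATCHES_MAX_LENGTH (the last path may push a batch over the limit), and packets are pushed with addFirst in reverse, so the connect request ends up at the head of the queue, followed by auth packets and then set-watches packets. The sketch below is hypothetical and simplified: it batches a single flat list rather than walking the five watch types, and uses strings in place of Packet objects.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of watch batching and addFirst ordering in primeConnection().
final class PrimeConnectionSketch {
    static List<List<String>> batch(List<String> watches, int maxLength) {
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = watches.iterator();
        while (it.hasNext()) {
            List<String> current = new ArrayList<>();
            int batchLength = 0;
            // As in the original, the last watch added may push the batch past maxLength.
            while (batchLength < maxLength && it.hasNext()) {
                String watch = it.next();
                current.add(watch);
                batchLength += watch.length();
            }
            batches.add(current);
        }
        return batches;
    }

    static List<String> outgoingOrder(int setWatchesPackets, int authPackets) {
        Deque<String> outgoingQueue = new ArrayDeque<>();
        for (int i = 0; i < setWatchesPackets; i++) {
            outgoingQueue.addFirst("setWatches");
        }
        for (int i = 0; i < authPackets; i++) {
            outgoingQueue.addFirst("auth");
        }
        outgoingQueue.addFirst("connect"); // pushed last, so sent first
        return new ArrayList<>(outgoingQueue); // head-to-tail order
    }
}
```

For example, batching ["/aaaa", "/bbbb", "/cccc"] with a limit of 10 yields [["/aaaa", "/bbbb"], ["/cccc"]], and outgoingOrder(1, 2) yields connect, auth, auth, setWatches.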
The connect request queued above is then written out by ClientCnxnSocketNIO.doIO, which doTransport calls once the socket becomes writable.
ClientCnxnSocketNIO.doIO
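This is where the client handles IO events. The method is long as well, so please read it through patiently; I managed to analyze all of it, so I trust readers can manage to read it.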
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
//get the SocketChannel from the SelectionKey
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
//the socket is readable
if (sockKey.isReadable()) {
//read from the socket into incomingBuffer
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
//the buffer is full: flip it so its contents can be read
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
//incomingBuffer == lenBuffer means we have just read the 4-byte length prefix
recvCount.getAndIncrement();
//now that the length is known, allocate a ByteBuffer of that size for the message body
readLength();
} else if (!initialized) {
//not initialized yet means the session has not been established,
//so the server's ConnectResponse is handled by readConnectResult (analyzed below)
readConnectResult();
//register OP_READ interest
enableRead();
//likewise, enable OP_WRITE if outgoingQueue has something to send
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
//reset for the next message and mark the session as initialized
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
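//the branches above handle the length prefix and the connect response; this one handles a regular response body
//(readResponse is analyzed in detail later)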
sendThread.readResponse(incomingBuffer);
//reset lenBuffer
lenBuffer.clear();
//point incomingBuffer back at lenBuffer for the next message
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
//handle the write event
if (sockKey.isWritable()) {
//get the first sendable packet from outgoingQueue
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
//for requests other than ping and auth, set an xid in requestHeader so that requests are processed in order; the client generates xids by incrementing a counter
p.requestHeader.setXid(cnxn.getXid());
}
//serialize the request into a ByteBuffer (analyzed below)
p.createBB();
}
//write the message to the server through the socket
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
//the whole packet has been written, so remove it from outgoingQueue; if a write leaves part of a packet unsent, the packet stays on outgoingQueue and the next write continues where this one stopped
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
//requests other than ping and auth expect a response from the server, so add them to pendingQueue
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
//every packet in outgoingQueue has been sent: stop watching OP_WRITE
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
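//In short: if the session expires right after the initial connection, the server sends a session-expired response and closes the socket. If the client is writing at the same moment, TCP may abort with RST and the client never receives the expiration. So while the connection is not yet initialized (initialized == false), the client disables OP_WRITE once the connect request has been sent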
disableWrite();
} else {
// Just in case
//as the comment says, just in case: outgoingQueue still has data, so keep OP_WRITE enabled
enableWrite();
}
}
}
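The read path of doIO is a small state machine: fill the 4-byte lenBuffer, allocate a body buffer of exactly that length, fill it, hand it off, and point incomingBuffer back at lenBuffer. The sketch below reproduces that state machine over arbitrary byte chunks, since a non-blocking read may return any number of bytes. The class name and the maxLength sanity limit are assumptions (readLength itself is not shown in the article).

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the length-prefixed read path in doIO().
final class FrameReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;
    private final int maxLength; // assumed sanity limit on a frame body
    final List<byte[]> frames = new ArrayList<>();

    FrameReader(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Consumes all remaining bytes of chunk, collecting each completed frame body. */
    void feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // Copy as many bytes as the current target buffer still needs.
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer part = chunk.duplicate();
            part.limit(part.position() + n);
            incomingBuffer.put(part);
            chunk.position(chunk.position() + n);
            if (incomingBuffer.hasRemaining()) {
                continue; // need more bytes; only happens once chunk is exhausted
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                int len = lenBuffer.getInt(); // the length prefix
                if (len < 0 || len > maxLength) {
                    throw new IllegalStateException("Invalid frame length " + len);
                }
                if (len == 0) {
                    frames.add(new byte[0]);
                    lenBuffer.clear();
                } else {
                    incomingBuffer = ByteBuffer.allocate(len);
                }
            } else {
                byte[] body = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(body);
                frames.add(body);
                lenBuffer.clear();          // reset, as doIO does after readResponse
                incomingBuffer = lenBuffer;
            }
        }
    }
}
```

Feeding the same bytes one at a time or all at once yields the same frames, which is the property the lenBuffer/incomingBuffer swap in doIO relies on.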
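That covers the logic of doIO, but a few details remain. First, how is a message converted into a ByteBuffer?
Start with the message object.
On the sending side, a Packet carries two relevant fields: the request header requestHeader and the request body request.
createBB serializes these two fields into the ByteBuffer bb: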
public void createBB() {
try {
//create the output stream
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
//the first 4 bytes hold the message length, which is not known yet, so write -1 as a placeholder
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
//serialize requestHeader: it has two int fields, the client transaction id (xid) and the request type code (type), so it always serializes to 4 (xid) + 4 (type) = 8 bytes
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
//a session-creation request: serialize the ConnectRequest
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
//serialize any other request
request.serialize(boa, "request");
}
baos.close();
//wrap the serialized bytes in a ByteBuffer
this.bb = ByteBuffer.wrap(baos.toByteArray());
//overwrite the -1 placeholder with the real length (total size minus the 4-byte prefix)
this.bb.putInt(this.bb.capacity() - 4);
//rewind to position 0 so the whole buffer gets written to the socket
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
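The placeholder trick in createBB is easy to verify with a hand-rolled encoder. The sketch assumes, as Jute's BinaryOutputArchive does by writing through a DataOutput, that ints are big-endian; the class name is hypothetical and the body is raw bytes instead of a Jute record. For xid 1, an arbitrary type code 4 and a 2-byte body, the buffer is 4 + 8 + 2 = 14 bytes and the prefix reads 10.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of createBB(): placeholder length, header, body, then backfill.
final class PacketEncoder {
    static ByteBuffer encode(int xid, int type, byte[] body) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);   // length placeholder, filled in below
        out.writeInt(xid);  // request header: xid
        out.writeInt(type); // request header: type
        out.write(body);    // request body
        out.flush();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4); // overwrite the placeholder at position 0
        bb.rewind();                  // position back to 0, ready for a socket write
        return bb;
    }
}
```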
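ZooKeeper uses its own Jute library for serialization; it is worth a look if you are interested.
Having covered the structure of outgoing messages, let's look at how the client handles the response to session creation.
ClientCnxnSocket.readConnectResult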
void readConnectResult() throws IOException {
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b)).append(",");
}
buf.append("]");
if (LOG.isTraceEnabled()) {
LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
}
}
//create an input stream over the ByteBuffer
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
//deserialize the ConnectResponse
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
//take the sessionId assigned by the server from the deserialized ConnectResponse
this.sessionId = conRsp.getSessionId();
//hand the result to SendThread.onConnected, which notifies eventThread of the connection outcome
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
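The optional readOnly flag shows how the client stays compatible with older servers: it simply tries to read one more field and tolerates its absence. The sketch below decodes a ConnectResponse body by hand under an assumed Jute layout (int protocolVersion, int timeOut, long sessionId, buffer passwd written as an int length plus bytes, then an optional one-byte boolean). The class, its encode helper and that layout are assumptions to check against the Jute definitions, not ZooKeeper's generated code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical hand-written decoder for the assumed ConnectResponse layout.
final class ConnectResponseSketch {
    int protocolVersion;
    int timeOut;
    long sessionId;
    byte[] passwd;
    boolean readOnly;

    static ConnectResponseSketch decode(byte[] body) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(body));
        ConnectResponseSketch r = new ConnectResponseSketch();
        r.protocolVersion = in.readInt();
        r.timeOut = in.readInt();
        r.sessionId = in.readLong();
        int len = in.readInt();               // buffer length, -1 for null
        r.passwd = len < 0 ? null : new byte[len];
        if (r.passwd != null) {
            in.readFully(r.passwd);
        }
        try {
            r.readOnly = in.readBoolean();    // newer servers append this flag
        } catch (EOFException e) {
            r.readOnly = false;               // old server: field absent
        }
        return r;
    }

    // Test helper producing the same assumed layout; readOnly == null omits the flag.
    static byte[] encode(int timeOut, long sessionId, byte[] passwd, Boolean readOnly) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(0);
        out.writeInt(timeOut);
        out.writeLong(sessionId);
        out.writeInt(passwd.length);
        out.write(passwd);
        if (readOnly != null) {
            out.writeBoolean(readOnly);
        }
        out.flush();
        return baos.toByteArray();
    }
}
```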
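At this point the ZooKeeper client has finished starting up. Startup consists of establishing the socket connection and then the session; the figure below gives a brief overview of the process.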