摘要
本節(jié)講解ClientCnxnSocket以及ClientCnxnSocketNIO
涉及一些NIO的知識蛙埂,希望自行了解。
相關(guān)源碼分析很少驰弄,沒有什么參照
主要講解
ClientCnxnSocket抽象類結(jié)構(gòu)
readConnectResult方法 讀取server的connect的response
readLength方法 讀取buffer長度并給incomingBuffer分配對應(yīng)大小空間
ClientCnxnSocketNIO實現(xiàn)
findSendablePacket函數(shù)
根據(jù)sasl以及outgoingQueue情況獲取發(fā)送的Packet
doIO函數(shù)
讀就緒時,讀取response
寫就緒時,從findSendablePacket找到可發(fā)送的Packet
doTransport函數(shù)
如果是連接就緒麻汰,調(diào)用sendThread連接操作
若讀寫就緒,調(diào)用doIO函數(shù)
connect戚篙,createSock五鲫,registerAndConnect函數(shù)
完成client到server的socket連接(僅為網(wǎng)絡(luò)連接,并沒有和server進行IO岔擂,更沒有得到server的connect的response)
簡介
ClientCnxnSocket定義了底層Socket通信的接口.默認是現(xiàn)實ClientCnxnSocketNIO.
類圖如下
源碼
抽象類ClientCnxnSocket
主要講解變量以及已經(jīng)實現(xiàn)的方法
屬性
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);
protected boolean initialized;//是否初始化
/**
* This buffer is only used to read the length of the incoming message.
*/
protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);//僅僅用來讀取 incoming message的長度
/**
* After the length is read, a new incomingBuffer is allocated in
* readLength() to receive the full message.
*/
protected ByteBuffer incomingBuffer = lenBuffer;
protected long sentCount = 0;//send次數(shù)
protected long recvCount = 0;//接收次數(shù)
protected long lastHeard;//上次接收時間
protected long lastSend;//上次發(fā)送時間
protected long now;//當(dāng)前時間
protected ClientCnxn.SendThread sendThread;//客戶端通信的發(fā)送線程
/**
* The sessionId is only available here for Log and Exception messages.
* Otherwise the socket doesn't need to know it.
*/
protected long sessionId;//僅僅用來輔助log和Exception記錄用的
方法
void introduce(ClientCnxn.SendThread sendThread, long sessionId) {//設(shè)置sendThread以及sessionId
this.sendThread = sendThread;
this.sessionId = sessionId;
}
void updateNow() {//更新now時間
now = System.currentTimeMillis();
}
int getIdleRecv() {//獲取接收的閑置時間
return (int) (now - lastHeard);
}
int getIdleSend() {//獲取發(fā)送的閑置時間
return (int) (now - lastSend);
}
long getSentCount() {//發(fā)送次數(shù)
return sentCount;
}
long getRecvCount() {//接收次數(shù)
return recvCount;
}
void updateLastHeard() {//更新最后一次監(jiān)聽的時間
this.lastHeard = now;
}
void updateLastSend() {//更新最后一次發(fā)送的時間
this.lastSend = now;
}
void updateLastSendAndHeard() {//同時更新最后一次監(jiān)聽和發(fā)送的時間
this.lastSend = now;
this.lastHeard = now;
}
protected void readLength() throws IOException {//讀取incoming message的length
int len = incomingBuffer.getInt();
if (len < 0 || len >= ClientCnxn.packetLen) {//默認長度[0,4M]之間
throw new IOException("Packet len" + len + " is out of range!");
}
incomingBuffer = ByteBuffer.allocate(len);//分配對應(yīng)長度的空間
}
void readConnectResult() throws IOException {//讀取connect的response
if (LOG.isTraceEnabled()) {
StringBuilder buf = new StringBuilder("0x[");
for (byte b : incomingBuffer.array()) {
buf.append(Integer.toHexString(b) + ",");
}
buf.append("]");
LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
+ buf.toString());
}
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// read "is read-only" flag
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");//反序列化,看是否是只讀的
} catch (IOException e) {
// this is ok -- just a packet from an old server which
// doesn't contain readOnly field
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);//sendThread完成connect時一些參數(shù)驗證以及zk state更新以及事件處理
}
主要就是各種次數(shù)位喂,時間的設(shè)置以及獲取
其次就是注意readLength和readConnectResult方法即可
子類ClientCnxnSocketNIO
類圖如下
屬性
private static final Logger LOG = LoggerFactory
.getLogger(ClientCnxnSocketNIO.class);
private final Selector selector = Selector.open();
private SelectionKey sockKey;
主要就是NIO的東西
方法
按照一定的順序來講
client連接時
org.apache.zookeeper.ClientCnxn.SendThread#run
org.apache.zookeeper.ClientCnxn.SendThread#startConnect
org.apache.zookeeper.ClientCnxnSocketNIO#connect
@Override
void connect(InetSocketAddress addr) throws IOException {//參數(shù)是某一個zk server的地址
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);//注冊SelectionKey到zk server
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;//還沒有初始化,connect ok了但是還讀到server的response
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
里面調(diào)用了createSock和registerAndConnect方法,如下
/**
* create a socket channel.
* @return the created socket channel
* @throws IOException
*/
SocketChannel createSock() throws IOException {//創(chuàng)建SocketChannel
SocketChannel sock;
sock = SocketChannel.open();
sock.configureBlocking(false);//非阻塞
sock.socket().setSoLinger(false, -1);
sock.socket().setTcpNoDelay(true);
return sock;
}
/**
* register with the selection and connect
* @param sock the {@link SocketChannel}
* @param addr the address of remote host
* @throws IOException
*/
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);//注冊乱灵,監(jiān)聽connect事件
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {//如果立即建立了連接
sendThread.primeConnection();//client把watches和authData等數(shù)據(jù)發(fā)過去忆某,并更新SelectionKey為讀寫
}
}
這里注意一點
registerAndConnect中如果立即connect就調(diào)用sendThread.primeConnection();
如果沒有立即connect上,那么就在下面介紹的doTransport中等待SocketChannel finishConnect再調(diào)用
client 和 server的網(wǎng)絡(luò)交互
主要函數(shù)
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);//找到就緒的keys個數(shù)
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {//如果就緒的是connect事件阔蛉,這個出現(xiàn)在registerAndConnect函數(shù)沒有立即連接成功
if (sc.finishConnect()) {//如果次數(shù)完成了連接
updateLastSendAndHeard();//更新時間
sendThread.primeConnection();//client把watches和authData等數(shù)據(jù)發(fā)過去弃舒,并更新SelectionKey為讀寫
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {//如果就緒的是讀或者寫事件
doIO(pendingQueue, outgoingQueue, cnxn);//利用pendingQueue和outgoingQueue進行IO
}
}
if (sendThread.getZkState().isConnected()) {//如果zk的state是已連接
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以發(fā)送的packet
enableWrite();//允許寫
}
}
}
selected.clear();//清空
}
參數(shù)pendingQueue 以及 outgoingQueue簡單介紹如下
outgoingQueue 是請求發(fā)送隊列,是client存儲需要被發(fā)送到server端的Packet隊列
pendingQueue是已經(jīng)從client發(fā)送状原,但是要等待server響應(yīng)的packet隊列
后面章節(jié)再細講
主要調(diào)用了doIO 以及 findSendablePacket方法
doIO方法如下
/**
* @return true if a packet was received
* @throws InterruptedException
* @throws IOException
*/
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {//若讀就緒
int rc = sock.read(incomingBuffer);//讀出len
if (rc < 0) {//如果<0,表示讀到末尾了,這種情況出現(xiàn)在連接關(guān)閉的時候
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {//如果還有數(shù)據(jù)
incomingBuffer.flip();//切換到讀模式
if (incomingBuffer == lenBuffer) {
recvCount++;//接收次數(shù)+1
readLength();//獲取len并給incomingBuffer分配對應(yīng)空間
} else if (!initialized) {//如果client和server的連接還沒有初始化
readConnectResult();//讀取connect 回復(fù)
enableRead();//啟用讀
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {//如果有可以發(fā)送的packet
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();//允許寫聋呢,因為有要發(fā)送的packet
}
lenBuffer.clear();
incomingBuffer = lenBuffer;//還原incomingBuffer
updateLastHeard();
initialized = true;//client和server連接初始化完成
} else { //如果已連接,并且已經(jīng)給incomingBuffer分配了對應(yīng)len的空間
sendThread.readResponse(incomingBuffer);//讀取response
lenBuffer.clear();
incomingBuffer = lenBuffer;//還原incomingBuffer
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {//若寫就緒
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());//找到可以發(fā)送的Packet
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();//如果packet還沒有生成byteBuffer颠区,那就生成byteBuffer
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);//從待發(fā)送隊列中取出該packet
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);//加入待回復(fù)的隊列
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();//如果沒有要發(fā)的削锰,就禁止寫
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
流程接見如下
主要分為讀或者寫兩個case
讀:
沒有初始化就完成初始化
讀取len再給incomingBuffer分配對應(yīng)空間
讀取對應(yīng)的response
寫:
找到可以發(fā)送的Packet
如果Packet的byteBuffer沒有創(chuàng)建,那么就創(chuàng)建
byteBuffer寫入socketChannel
把Packet從outgoingQueue中取出來毕莱,放到pendingQueue中
相關(guān)讀寫的處理
主要注意器贩,讀的時候是分兩次讀的
第一次只讀len颅夺,然后給incomingBuffer分配對應(yīng)的空間
第二次再把剩下的內(nèi)容讀完
findSendablePacket方法如下
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
boolean clientTunneledAuthenticationInProgress) {//bool參數(shù)是表示 如果當(dāng)前client和server在處理sasl的權(quán)限
synchronized (outgoingQueue) {
if (outgoingQueue.isEmpty()) {//如果沒有要發(fā)送的
return null;
}
if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
|| !clientTunneledAuthenticationInProgress) {//如果有要發(fā)送的 或者 沒有在處理sasl的權(quán)限
return outgoingQueue.getFirst();
}
// Since client's authentication with server is in progress,
// send only the null-header packet queued by primeConnection().
// This packet must be sent so that the SASL authentication process
// can proceed, but all other packets should wait until
// SASL authentication completes.
ListIterator<Packet> iter = outgoingQueue.listIterator();
while (iter.hasNext()) {
Packet p = iter.next();
if (p.requestHeader == null) {//如果在處理sasl的權(quán)限,那么只有requestHeader為null的Packet可以被發(fā)送
// We've found the priming-packet. Move it to the beginning of the queue.
iter.remove();
outgoingQueue.add(0, p);
return p;
} else {
// Non-priming packet: defer it until later, leaving it in the queue
// until authentication completes.
if (LOG.isDebugEnabled()) {
LOG.debug("deferring non-priming packet: " + p +
"until SASL authentication completes.");
}
}
}
// no sendable packet found.
return null;
}
}
主要流程簡介如下
如果沒有要發(fā)送的就返回null
如果有要發(fā)送的或者client沒有在處理sasl的權(quán)限蛹稍,那么就拿隊列第一個
如果在處理sasl吧黄,那么遍歷隊列,把沒有requestHeader為null的放到隊頭唆姐,返回該packet
這個地方主要涉及到sasl驗證拗慨,并不是很了解這個機制,沒有深究
其他函數(shù)
@Override
boolean isConnected() {//這個只是說SelectionKey有沒有初始化奉芦,來標(biāo)示赵抢,并不是真正的Connected
return sockKey != null;
}
部分函數(shù)表格列舉
函數(shù) | 備注 |
---|---|
void cleanup() | socketChannel關(guān)閉,SelectionKey置空 |
void close() | selector關(guān)閉 |
SocketAddress getRemoteSocketAddress() | 獲取遠端地址 |
SocketAddress getLocalSocketAddress() | 獲取本地地址 |
synchronized void wakeupCnxn() | 喚醒selector |
void testableCloseSocket() | 測試socket關(guān)閉 |
synchronized void enableWrite() | 開啟寫 |
public synchronized void disableWrite() | 禁止寫 |
synchronized private void enableRead() | 開啟讀 |
synchronized void enableReadWriteOnly() | 僅允許讀寫 |
Selector getSelector() | 獲取selector |
void sendPacket(Packet p) | 發(fā)送packet |
思考
何時調(diào)用sendThread.primeConnection();以及里面干了什么
如果瞬間連上声功,就直接調(diào)用
否則的話就等到sc.finishConnect()再調(diào)用
這個函數(shù)完成了一些watches和authData的傳遞以及允許更改SelectionKey烦却,允許clientCnxnSocket可讀寫,
org.apache.zookeeper.ClientCnxnSocket#initialized意義
參數(shù)指的是zk client收到的zk server的正確response之后先巴,才算初始化成功
不是說NIO中的connect上了就算成功
兩者的區(qū)別在于NIO的SelectionKey
前者已經(jīng)從connect變化到了write和read
后者僅限于connect
org.apache.zookeeper.ClientCnxnSocketNIO#doIO處理讀就緒的時候短绸,為什么分兩次
第一次只讀len,然后給incomingBuffer分配對應(yīng)的空間
第二次再把剩下的內(nèi)容讀完
唯一能夠想到的優(yōu)點就是節(jié)省空間了
請求發(fā)送與接收 流程圖
吐槽以及問題
1.方法沒有注釋,甚至是錯的注釋
如錯誤的方法注釋 org.apache.zookeeper.ClientCnxnSocketNIO#doIO
2.ClientCnxnSocketNIO中connect以及state相關(guān)的函數(shù)太多了筹裕,有點繞
3.SelectionKey中醋闭,讀寫一會開一會關(guān)的目的是什么,代碼看起來很麻煩
為什么不一直允許讀寫朝卒,單個開關(guān)弄來弄去讓人疑惑证逻,除非close
是有場景需要禁止讀后者禁止寫么,還是這樣會提升性能?
4.org.apache.zookeeper.ClientCnxnSocketNIO#isConnected用SelectionKey是否初始化判斷是否Connected
不太合理抗斤,有可能剛初始化但是還沒有connect呢???
備注
sendThread在下面兩節(jié)中講到囚企,是client完成和server通信的線程
sessionId也會在后面講會話的時候進行講解
pendingQueue和outingQueue之后再講解