前言
我在前面的文章Zookeeper單機(jī)版源碼解析系列的解析zookeeper源代碼汗茄,在前面介紹中悬包,zookeeper網(wǎng)絡(luò)通信層是基于NIO實(shí)現(xiàn)的,其實(shí)zookeeper還提供了對netty的支持蕴茴,如果想使用netty作為zookeeper網(wǎng)絡(luò)通信層的實(shí)現(xiàn),需要在client和server端分別去指定
- Client端需要設(shè)置啟動(dòng)參數(shù)
-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
- Server端需要設(shè)置啟動(dòng)參數(shù)
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
Netty在Zookeeper客戶端的使用
如果大家看過之前系列的代碼姐直,可能對下面這段代碼有印象,這段代碼是客戶端創(chuàng)建session層連接表示對象時(shí)調(diào)用的方法
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
//獲取客戶端連接的實(shí)現(xiàn)類倦淀,我們通過設(shè)置指定為org.apache.zookeeper.ClientCnxnSocketNetty
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
//通過反射生成客戶端連接實(shí)例
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
}
}
ClientCnxnSocketNetty
ClientCnxnSocketNetty是zookeeper基于netty創(chuàng)建的session層連接表示對象
我們分析下ClientCnxnSocketNetty的創(chuàng)建
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
// Client only has 1 outgoing socket, so the event loop group only needs
// a single thread.
//客戶端創(chuàng)建只包含一個(gè)執(zhí)行線程的eventLoopGroup
eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
initProperties();
}
SendThread.run會調(diào)用ClientCxnx.startConnect來創(chuàng)建客戶端到服務(wù)端socket連接,真正創(chuàng)建連接的地方是ClientCnxnSocket.connect方法声畏,對于ClientCnxnSocketNIO來說就是創(chuàng)建SocketChannel撞叽,然后SocketChannel根據(jù)指定的IP:Port去連接遠(yuǎn)程服務(wù)器,這個(gè)之前解析過插龄。我們現(xiàn)在分析基于netty實(shí)現(xiàn)的ClientCnxnSocketNetty.connect
ClientCnxnSocketNetty.connect
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
//初始化客戶端bootstrap愿棋,并設(shè)定客戶端的handler為ZKClientPipelineFactory,
//ZKClientPipelineFactory繼承了ChannelInitializer均牢,下面我們會分析它的initChannel方法
Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
.channel(NettyUtils.nioOrEpollSocketChannel())
.option(ChannelOption.SO_LINGER, -1)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
//設(shè)置ByteBufAllocator
bootstrap = configureBootstrapAllocator(bootstrap);
bootstrap.validate();
//下面是執(zhí)行連接到服務(wù)端的過程糠雨,先加鎖,這把鎖將來會在處理連接響應(yīng)的時(shí)候用到
connectLock.lock();
try {
//異步的執(zhí)行連接到服務(wù)端
connectFuture = bootstrap.connect(addr);
//添加連接結(jié)果回調(diào)
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this lock guarantees that channel won't be assigned after cleanup().
boolean connected = false;
connectLock.lock();
try {
if (!channelFuture.isSuccess()) {
//連接不成功徘跪,直接返回
LOG.warn("future isn't success.", channelFuture.cause());
return;
} else if (connectFuture == null) {
LOG.info("connect attempt cancelled");
// If the connect attempt was cancelled but succeeded
// anyway, make sure to close the channel, otherwise
// we may leak a file descriptor.
channelFuture.channel().close();
return;
}
// setup channel, variables, connection, etc.
//獲取建立的連接通道
channel = channelFuture.channel();
//設(shè)置disconnect狀態(tài)為false甘邀,表示已經(jīng)連接上
disconnected.set(false);
//設(shè)置initialized為false表示session會話還沒有建立
initialized = false;
//lenBuffer,incomingBuffer和之前文章中分析的作用一樣垮庐,用來讀取服務(wù)端發(fā)送來的數(shù)據(jù)
lenBuffer.clear();
incomingBuffer = lenBuffer;
//primeConnection發(fā)送建立session會話請求(這個(gè)在NIO通信的系列文章中已經(jīng)分析過)松邪,下面我們會解析這個(gè)請求是如何被發(fā)送出去的
sendThread.primeConnection();
updateNow();
updateLastSendAndHeard();
if (sendThread.tunnelAuthInProgress()) {
waitSasl.drainPermits();
needSasl.set(true);
sendPrimePacket();
} else {
needSasl.set(false);
}
connected = true;
} finally {
connectFuture = null;
//釋放連接鎖
connectLock.unlock();
if (connected) {
LOG.info("channel is connected: {}", channelFuture.channel());
}
// need to wake on connect success or failure to avoid
// timing out ClientCnxn.SendThread which may be
// blocked waiting for first connect in doTransport().
wakeupCnxn();
//完成連接計(jì)數(shù)釋放
firstConnect.countDown();
}
}
});
} finally {
// 釋放連接鎖
connectLock.unlock();
}
}
通過對connect分析,可以看到這個(gè)方法完成了netty客戶端bootstrap建立哨查,客戶端到服務(wù)端socket建立逗抑,創(chuàng)建session會話請求。因?yàn)閏onnect方法是異步的寒亥,所以SendThread.run會繼續(xù)執(zhí)行到ClientCnxnSocketNetty.doTransport邮府,之前NIO系列文章有解析過ClientCnxnSocketNIO.doTransport的源代碼,下面我們解析ClientCnxnSocketNetty.doTransport的實(shí)現(xiàn)
ClientCnxnSocketNetty.doTransport
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
try {
//firstConnect等待連接完成
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
//從outgoingQueue取出一個(gè)請求packet
head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the packet to notify of failure in conLossPacket().
//如果服務(wù)端連接掛了或者連接還沒有建立好溉奕,這個(gè)時(shí)候如果有請求packet被取到褂傀,那么直接使用addBack把該請求加入到outgoing的頭部
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost");
}
if (head != null) {
//把請求寫到服務(wù)端
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
不同于ClientCnxnSocketNIO.doTransport處理邏輯,在ClientCnxnSocketNetty中直接處理寫數(shù)據(jù)的請求
ClientCnxnSocketNetty.doWrite
private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
//如果不是wakeUp型的packet
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != ZooDefs.OpCode.ping)
&& (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
//設(shè)置packet的xid用來保證請求的順序處理
p.requestHeader.setXid(cnxn.getXid());
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
//這個(gè)方法就是使用netty把數(shù)據(jù)寫出去的入口
sendPktOnly(p);
anyPacketsSent = true;
}
//如果outgoingQueue中有數(shù)據(jù)那么一直去寫腐宋,
if (outgoingQueue.isEmpty()) {
//outgoingQueue中的數(shù)據(jù)寫完了紊服,跳出doWrite
break;
}
p = outgoingQueue.remove();
}
// TODO: maybe we should flush in the loop above every N packets/bytes?
// But, how do we determine the right value for N ...
if (anyPacketsSent) {
//如果本次有數(shù)據(jù)被寫出去檀轨,那么通過channel.flush把數(shù)據(jù)刷出到網(wǎng)絡(luò)
channel.flush();
}
}
sendPktOnly是使用netty把數(shù)據(jù)寫出去的入口,它最終會調(diào)用ClientCnxnSocketNetty.sendPkt
ClientCnxnSocketNetty.sendPkt
private ChannelFuture sendPkt(Packet p, boolean doFlush) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
//把packet中的請求轉(zhuǎn)化成ByteBuffer
p.createBB();
updateLastSend();
final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb)
//通過channel.writeAndFlush或者write把數(shù)據(jù)發(fā)送出去
final ChannelFuture result = doFlush ? channel.writeAndFlush(writeBuffer) : channel.write(writeBuffer);
result.addListener(onSendPktDoneListener);
return result;
}
通過上面的分析我們可以了解到客戶端是如何使用netty把請求發(fā)送給服務(wù)端欺嗤,接下來我分析客戶端是如何使用netty讀取來自服務(wù)端的響應(yīng)數(shù)據(jù)参萄,這需要先看下ZKClientPipelineFactory.initChannel方法
ZKClientPipelineFactory.initChannel
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
initSSL(pipeline);
}
//zookeeper客戶端通過initChannel向pipeline注冊了一個(gè)ZKClientHandler,ZKClientHandler是一個(gè)inbound類型的handler
//沒有注冊任何outbound類型的handler煎饼,所以在上面講解zookeeper是如何使用netty來發(fā)送請求到服務(wù)端的時(shí)候讹挎,我們沒有先講解ZKClientPipelineFactory.initChannel
pipeline.addLast("handler", new ZKClientHandler());
}
我們看下ZKClientHandler定義
ZKClientHandler
private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
AtomicBoolean channelClosed = new AtomicBoolean(false);
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//處理連接斷開的情況
LOG.info("channel is disconnected: {}", ctx.channel());
cleanup();
}
/**
* netty handler has encountered problems. We are cleaning it up and tell outside to close
* the channel/connection.
*/
private void cleanup() {
//設(shè)置channelClosed為true
if (!channelClosed.compareAndSet(false, true)) {
return;
}
disconnected.set(true);
onClosing();
}
//channelRead0處理來自服務(wù)端的響應(yīng)數(shù)據(jù)
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
updateNow();
//下面對請求的處理過程和NIO很像
while (buf.isReadable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position() + buf.readableBytes();
incomingBuffer.limit(newLimit);
}
//用incomingBuffer來讀數(shù)據(jù)
buf.readBytes(incomingBuffer);
incomingBuffer.limit(incomingBuffer.capacity());
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
//incomingBuffer == lenBuffer說明這個(gè)時(shí)候讀的是4個(gè)字節(jié)表示響應(yīng)數(shù)據(jù)長度的數(shù)據(jù)
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
// initialized如果為false說明這個(gè)響應(yīng)是建立session會話的響應(yīng)
//readConnectResult之前在NIO系列文章中有解析
readConnectResult();
lenBuffer.clear();
incomingBuffer = lenBuffer;
initialized = true;
updateLastHeard();
} else {
//這個(gè)時(shí)候incomingBuffer包含的數(shù)據(jù)是請求的響應(yīng)對象,
//sendThread.readResponse之前在NIO 系列文章中已經(jīng)講解過
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
wakeupCnxn();
// Note: SimpleChannelInboundHandler releases the ByteBuf for us
// so we don't need to do it.
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Unexpected throwable", cause);
cleanup();
}
}
通過上面對ZKClientHandler的解析吆玖,相信大家對zookeeper如何使用netty讀取服務(wù)端響應(yīng)數(shù)據(jù)有了一定的了解筒溃。
到此我們就完成zookeeper客戶端是如何使用netty來完成網(wǎng)絡(luò)通信的講解,下面我們來說說netty在zookeeper服務(wù)端的使用
Netty在zookeeper server端的使用
服務(wù)端在啟動(dòng)時(shí)候會創(chuàng)建ServerCnxnFactory沾乘,默認(rèn)ServerCnxnFactory的實(shí)現(xiàn)是NIOServerCnxnFactory怜奖,下面我們看下NettyServerCnxnFactory的實(shí)現(xiàn)。
NettyServerCnxnFactory初始化
NettyServerCnxnFactory() {
x509Util = new ClientX509Util();
boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
if (usePortUnification) {
try {
QuorumPeerConfig.configureSSLAuth();
} catch (QuorumPeerConfig.ConfigException e) {
LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
usePortUnification = false;
}
}
this.shouldUsePortUnification = usePortUnification;
this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);
setOutstandingHandshakeLimit(Integer.getInteger(OUTSTANDING_HANDSHAKE_LIMIT, -1));
//bossGroup處理客戶端的連接請求翅阵,workerGroup負(fù)責(zé)每個(gè)連接IO事件的處理歪玲,典型的reactor模式
EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
//下面是創(chuàng)建服務(wù)端ServerBootstrap的典型模式?jīng)]有什么好講解的,我們直接看channelHandler的邏輯
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NettyUtils.nioOrEpollServerSocketChannel())
// parent channel options
.option(ChannelOption.SO_REUSEADDR, true)
// child channels options
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, -1)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (advancedFlowControlEnabled) {
pipeline.addLast(readIssuedTrackingHandler);
}
if (secure) {
initSSL(pipeline, false);
} else if (shouldUsePortUnification) {
initSSL(pipeline, true);
}
pipeline.addLast("servercnxnfactory", channelHandler);
}
});
this.bootstrap = configureBootstrapAllocator(bootstrap);
this.bootstrap.validate();
}
zookeeper 服務(wù)端的ServerBootstrap只包含一個(gè)業(yè)務(wù)處理handler: CnxnChannelHandler掷匠,CnxnChannelHandler是一個(gè)duplexHandler滥崩,下面我們分析下它的實(shí)現(xiàn)
CnxnChannelHandler定義
class CnxnChannelHandler extends ChannelDuplexHandler {
//當(dāng)服務(wù)端接受客戶端的socket連接之后,channelActive會被調(diào)用
//正常情況下通過channelActive服務(wù)端session層的連接表示對象會被建立起來
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel active {}", ctx.channel());
}
final Channel channel = ctx.channel();
//連接數(shù)有沒有達(dá)到服務(wù)端設(shè)置的連接最大數(shù)讹语,如果達(dá)到了钙皮,直接關(guān)閉底層socket,拒絕新的連接請求
if (limitTotalNumberOfCnxns()) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
channel.close();
return;
}
InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
//單個(gè)客戶端的連接數(shù)是不是超過了用戶設(shè)置的最大可建立連接數(shù)顽决,如果達(dá)到了拒絕客戶端的連接請求
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
channel.close();
return;
}
//創(chuàng)建session會話層的連接表示對象NettyServerCnxn
NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
//把session會話連接表示對象存儲到channel中
ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
if (handshakeThrottlingEnabled) {
// Favor to check and throttling even in dual mode which
// accepts both secure and insecure connections, since
// it's more efficient than throttling when we know it's
// a secure connection in DualModeSslHandler.
//
// From benchmark, this reduced around 15% reconnect time.
int outstandingHandshakesNum = outstandingHandshake.addAndGet(1);
if (outstandingHandshakesNum > outstandingHandshakeLimit) {
outstandingHandshake.addAndGet(-1);
channel.close();
ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED.add(1);
} else {
cnxn.setHandshakeState(HandshakeState.STARTED);
}
}
if (secure) {
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
} else if (!shouldUsePortUnification) {
allChannels.add(ctx.channel());
addCnxn(cnxn);
}
}
//連接關(guān)閉時(shí)的處理邏輯
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel inactive {}", ctx.channel());
}
allChannels.remove(ctx.channel());
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Channel inactive caused close {}", cnxn);
}
updateHandshakeCountIfStarted(cnxn);
//關(guān)閉連接
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_DISCONNECTED);
}
}
//異常處理方法
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.warn("Exception caught", cause);
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
LOG.debug("Closing {}", cnxn);
updateHandshakeCountIfStarted(cnxn);
cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_CLOSED_EXCEPTION);
}
}
//處理用自定義的channel事件:主要是處理channel讀和不讀的事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
try {
if (evt == NettyServerCnxn.ReadEvent.ENABLE) {
LOG.debug("Received ReadEvent.ENABLE");
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
// TODO: Not sure if cnxn can be null here. It becomes null if channelInactive()
// or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
// after either of those. Check for null just to be safe ...
if (cnxn != null) {
if (cnxn.getQueuedReadableBytes() > 0) {
cnxn.processQueuedBuffer();
if (advancedFlowControlEnabled && cnxn.getQueuedReadableBytes() == 0) {
// trigger a read if we have consumed all
// backlog
ctx.read();
LOG.debug("Issued a read after queuedBuffer drained");
}
}
}
if (!advancedFlowControlEnabled) {
ctx.channel().config().setAutoRead(true);
}
} else if (evt == NettyServerCnxn.ReadEvent.DISABLE) {
LOG.debug("Received ReadEvent.DISABLE");
ctx.channel().config().setAutoRead(false);
}
} finally {
ReferenceCountUtil.release(evt);
}
}
//服務(wù)端讀取客戶端發(fā)送來的請求數(shù)據(jù)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("message received called {}", msg);
}
try {
LOG.debug("New message {} from {}", msg, ctx.channel());
//對應(yīng)于channelActive短条,從channel中獲取session層連接表示對象NettyServerCnxn
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn == null) {
LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
//NettyServerCnxn處理客戶端發(fā)送過來的請求
cnxn.processMessage((ByteBuf) msg);
}
} catch (Exception ex) {
LOG.error("Unexpected exception in receive", ex);
throw ex;
}
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (advancedFlowControlEnabled) {
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
if (cnxn != null && cnxn.getQueuedReadableBytes() == 0 && cnxn.readIssuedAfterReadComplete == 0) {
ctx.read();
LOG.debug("Issued a read since we do not have anything to consume after channelReadComplete");
}
}
ctx.fireChannelReadComplete();
}
// Use a single listener instance to reduce GC
// Note: this listener is only added when LOG.isTraceEnabled() is true,
// so it should not do any work other than trace logging.
private final GenericFutureListener<Future<Void>> onWriteCompletedTracer = (f) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("write success: {}", f.isSuccess());
}
};
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
promise.addListener(onWriteCompletedTracer);
}
super.write(ctx, msg, promise);
}
}
現(xiàn)在我們看下NettyServerCnxn.processMessage是如何處理讀入的請求信息
NettyServerCnxn.processMessage
void processMessage(ByteBuf buf) {
checkIsInEventLoop("processMessage");
LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(sessionId), queuedBuffer);
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} buf {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(buf));
}
if (throttled.get()) {
LOG.debug("Received message while throttled");
// we are throttled, so we need to queue
if (queuedBuffer == null) {
LOG.debug("allocating queue");
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedDuplicate());
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
}
} else {
LOG.debug("not throttled");
if (queuedBuffer != null) {
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
//上面的直接跳過,來到receiveMessage
receiveMessage(buf);
// Have to check !closingChannel, because an error in
// receiveMessage() could have led to close() being called.
if (!closingChannel && buf.isReadable()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Before copy {}", buf);
}
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().compositeBuffer();
}
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
if (LOG.isTraceEnabled()) {
LOG.trace("Copy is {}", queuedBuffer);
LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
}
}
}
}
}
NettyServerCnxn.receiveMessage
NettyServerCnxn.receiveMessage是zookeeper server處理客戶端請求的核心
private void receiveMessage(ByteBuf message) {
checkIsInEventLoop("receiveMessage");
try {
while (message.isReadable() && !throttled.get()) {
if (bb != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() > message.readableBytes()) {
int newLimit = bb.position() + message.readableBytes();
bb.limit(newLimit);
}
//使用bb接受客戶端發(fā)送來的請求對象
message.readBytes(bb);
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
LOG.trace("after readbytes 0x{} bb {}",
Long.toHexString(sessionId),
ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() == 0) {
bb.flip();
packetReceived(4 + bb.remaining());
ZooKeeperServer zks = this.zkServer;
if (zks == null || !zks.isRunning()) {
throw new IOException("ZK down");
}
if (initialized) {
//如果連接已經(jīng)完成擎值,那么使用zks.processPacket來處理這個(gè)請求慌烧,processPacket在NIO系列文章中已經(jīng)解析過
// TODO: if zks.processPacket() is changed to take a ByteBuffer[],
// we could implement zero-copy queueing.
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
//如果連接沒有完成說明這個(gè)請求是一個(gè)session會話創(chuàng)建請求逐抑,那么使用zks.processConnectRequest來處理這個(gè)請求鸠儿,
//zks.processConnectRequest在NIO系列文章中已經(jīng)解析過
zks.processConnectRequest(this, bb);
initialized = true;
}
bb = null;
}
} else {
//代碼執(zhí)行到這里說明bb還沒有被初始化,那么第一次需要從ByteBuf讀取的數(shù)據(jù)是前4個(gè)字節(jié)表示的是請求的長度
if (LOG.isTraceEnabled()) {
LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (message.readableBytes() < bbLen.remaining()) {
bbLen.limit(bbLen.position() + message.readableBytes());
}
//從message中讀取前4個(gè)字節(jié)
message.readBytes(bbLen);
bbLen.limit(bbLen.capacity());
if (bbLen.remaining() == 0) {
bbLen.flip();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
//獲取客戶請求數(shù)據(jù)的長度
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
}
bbLen.clear();
if (!initialized) {
if (checkFourLetterWord(channel, message, len)) {
return;
}
}
if (len < 0 || len > BinaryInputArchive.maxBuffer) {
throw new IOException("Len error " + len);
}
// checkRequestSize will throw IOException if request is rejected
zkServer.checkRequestSizeWhenReceivingMessage(len);
//創(chuàng)建容量為len的ByteBuffer厕氨,為接受客戶請求數(shù)據(jù)做準(zhǔn)備
bb = ByteBuffer.allocate(len);
}
}
}
} catch (IOException e) {
LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.IO_EXCEPTION);
} catch (ClientCnxnLimitException e) {
// Common case exception, print at debug level
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
close(DisconnectReason.CLIENT_RATE_LIMIT);
}
}
通過receiveMessage我們知道了zookeeper是如何利用netty從客戶端讀取請求消息进每。
一個(gè)客戶端請求經(jīng)過zookeeper server請求處理鏈處理之后處理結(jié)果是如何被發(fā)送到客戶端的呢?
請求的Response會在FinalRequestProcessor中形成然后被NettyServerCnxn.sendResponse發(fā)送出去
NettyServerCnxn.sendBuffer
sendResponse會調(diào)用sendBuffer命斧,sendBuffer直接調(diào)用channel.writeAndFlush把請求響應(yīng)發(fā)送給客戶端
public void sendBuffer(ByteBuffer... buffers) {
if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
close(DisconnectReason.CLIENT_CLOSED_CONNECTION);
return;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(onSendBufferDoneListener);
}
上面就是zookeeper server端如何使用netty處理IO事件的過程