ZooKeeper's Netty-Based Network Communication

Preface

In the earlier article series on the standalone ZooKeeper source code, the network communication layer we walked through was the NIO-based implementation. ZooKeeper actually also supports Netty; to use Netty as the network communication layer, you need to specify it on the client and the server separately (a programmatic equivalent is sketched right after this list):

  • The client needs this startup parameter:
-Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
  • The server needs this startup parameter:
-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
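
The same properties can also be set programmatically before the client or server starts; a minimal sketch (the property keys are exactly the flag names above):

    // Client side: select the Netty implementation of ClientCnxnSocket
    System.setProperty("zookeeper.clientCnxnSocket",
            "org.apache.zookeeper.ClientCnxnSocketNetty");
    // Server side: select the Netty implementation of ServerCnxnFactory
    System.setProperty("zookeeper.serverCnxnFactory",
            "org.apache.zookeeper.server.NettyServerCnxnFactory");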

Using Netty in the ZooKeeper Client

If you have read the earlier articles in this series, the following snippet may look familiar; it is the method the client calls to create the session-layer connection object.

 private ClientCnxnSocket getClientCnxnSocket() throws IOException {
        // Look up the client connection implementation class; we specified org.apache.zookeeper.ClientCnxnSocketNetty via the system property
        String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
           // Instantiate the client connection via reflection
            Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
                                                       .getDeclaredConstructor(ZKClientConfig.class);
            ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
            return clientCxnSocket;
        } catch (Exception e) {
            throw new IOException("Couldn't instantiate " + clientCnxnSocketName, e);
        }
    }
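
As the snippet shows, the implementation class is read from ZKClientConfig, so the client can also be configured per connection rather than through a global system property. A hedged sketch, assuming a server reachable at localhost:2181 (the connect string and timeout are placeholders; the ZooKeeper constructor overload taking a ZKClientConfig exists since 3.5):

    ZKClientConfig config = new ZKClientConfig();
    config.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
            "org.apache.zookeeper.ClientCnxnSocketNetty");
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { }, config);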
ClientCnxnSocketNetty

ClientCnxnSocketNetty is the Netty-based session-layer connection object in ZooKeeper. Let's start with how it is constructed.

 ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
        this.clientConfig = clientConfig;
        // Client only has 1 outgoing socket, so the event loop group only needs
        // a single thread.
        eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);

        initProperties();
    }
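
NettyUtils.newNioOrEpollEventLoopGroup chooses the transport for us. Conceptually it does something like the following simplified sketch (not the exact ZooKeeper code):

    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.epoll.Epoll;
    import io.netty.channel.epoll.EpollEventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;

    static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) {
        // Prefer the native epoll transport where available (Linux); otherwise fall back to NIO
        return Epoll.isAvailable()
                ? new EpollEventLoopGroup(nThreads)
                : new NioEventLoopGroup(nThreads);
    }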

SendThread.run calls ClientCnxn.startConnect to establish the client-to-server socket connection, and the connection is actually created in ClientCnxnSocket.connect. For ClientCnxnSocketNIO that means creating a SocketChannel and connecting it to the given IP:port, which we analyzed before. Here we analyze the Netty-based ClientCnxnSocketNetty.connect.

ClientCnxnSocketNetty.connect
void connect(InetSocketAddress addr) throws IOException {
        
        firstConnect = new CountDownLatch(1);
        // Initialize the client Bootstrap and set ZKClientPipelineFactory as its handler.
        // ZKClientPipelineFactory extends ChannelInitializer; we analyze its initChannel method below.
        Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
                                             .channel(NettyUtils.nioOrEpollSocketChannel())
                                             .option(ChannelOption.SO_LINGER, -1)
                                             .option(ChannelOption.TCP_NODELAY, true)
                                             .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
       // Configure the ByteBufAllocator
        bootstrap = configureBootstrapAllocator(bootstrap);
        bootstrap.validate();
        // Next comes the connect-to-server sequence. Take the lock first; the same lock is used later when handling the connect result.
        connectLock.lock();
        try {
            // Connect to the server asynchronously
            connectFuture = bootstrap.connect(addr);
           // Add a callback for the connect result
            connectFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    // this lock guarantees that channel won't be assigned after cleanup().
                    boolean connected = false;
                    connectLock.lock();
                    try {
                        
                        if (!channelFuture.isSuccess()) {
                            // The connection failed; just return
                            LOG.warn("future isn't success.", channelFuture.cause());
                            return;
                        } else if (connectFuture == null) {
                            LOG.info("connect attempt cancelled");
                            // If the connect attempt was cancelled but succeeded
                            // anyway, make sure to close the channel, otherwise
                            // we may leak a file descriptor.
                            channelFuture.channel().close();
                            return;
                        }
                        // setup channel, variables, connection, etc.
                        // Grab the established channel
                        channel = channelFuture.channel();
                         // Set disconnected to false: the connection is up
                        disconnected.set(false);
                        // Set initialized to false: the session has not been established yet
                        initialized = false;
                        // lenBuffer and incomingBuffer play the same role as in the earlier articles: reading data sent by the server
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        // primeConnection() sends the session-creation request (analyzed in the NIO series); below we trace how that request is actually sent out
                        sendThread.primeConnection();
                        updateNow();
                        updateLastSendAndHeard();

                        if (sendThread.tunnelAuthInProgress()) {
                            waitSasl.drainPermits();
                            needSasl.set(true);
                            sendPrimePacket();
                        } else {
                            needSasl.set(false);
                        }
                        connected = true;
                    } finally {
                        connectFuture = null;
                        // Release the connect lock
                        connectLock.unlock();
                        if (connected) {
                            LOG.info("channel is connected: {}", channelFuture.channel());
                        }
                        // need to wake on connect success or failure to avoid
                        // timing out ClientCnxn.SendThread which may be
                        // blocked waiting for first connect in doTransport().
                        wakeupCnxn();
                        // Release the first-connect latch
                        firstConnect.countDown();
                    }
                }
            });
        } finally {
            // Release the connect lock
            connectLock.unlock();
        }
    }

This walk through connect shows that the method builds the Netty client Bootstrap, establishes the client-to-server socket, and issues the session-creation request. Because connect is asynchronous, SendThread.run moves on to ClientCnxnSocketNetty.doTransport. The NIO articles covered ClientCnxnSocketNIO.doTransport; below we look at the Netty implementation.
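
The coordination between connect and doTransport is a standard Netty pattern: an asynchronous connect whose listener releases a latch that the sender thread waits on. Stripped of ZooKeeper specifics, the skeleton looks roughly like this:

    CountDownLatch firstConnect = new CountDownLatch(1);
    ChannelFuture f = bootstrap.connect(addr);      // returns immediately
    f.addListener((ChannelFutureListener) future -> {
        try {
            if (future.isSuccess()) {
                // bind the channel, send the session-creation request, ...
            }
        } finally {
            firstConnect.countDown();               // wake the sender thread either way
        }
    });
    // meanwhile, in doTransport():
    if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
        return;                                     // connect still pending; try again on the next loop
    }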

ClientCnxnSocketNetty.doTransport
void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        try {
           // Wait on firstConnect for the connection to be established
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
               // Take one request packet from outgoingQueue
                head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
            }
            // check if being waken up on closing.
         
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
               // If the server connection is down or not yet established and we already pulled a packet, put it back at the head of the outgoing queue via addBack
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost");
            }
            if (head != null) {
                // Write the request to the server
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

Unlike the ClientCnxnSocketNIO.doTransport logic, ClientCnxnSocketNetty writes request data out directly within doTransport.

ClientCnxnSocketNetty.doWrite
 private void doWrite(Queue<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        boolean anyPacketsSent = false;
        while (true) {
            
            if (p != WakeupPacket.getInstance()) {
               // This is not a wakeup-type packet
                if ((p.requestHeader != null)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.ping)
                    && (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                   // Assign the packet's xid to guarantee requests are processed in order
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
               // This is the entry point for writing data out via Netty
                sendPktOnly(p);
                anyPacketsSent = true;
            }
           // Keep writing as long as outgoingQueue still has data
            if (outgoingQueue.isEmpty()) {
               // outgoingQueue is drained; break out of doWrite
                break;
            }
            p = outgoingQueue.remove();
        }
        // TODO: maybe we should flush in the loop above every N packets/bytes?
        // But, how do we determine the right value for N ...
        if (anyPacketsSent) {
          // If any packets were written this round, flush them to the network via channel.flush
            channel.flush();
        }
    }

sendPktOnly is the entry point for writing data out with Netty; it ultimately calls ClientCnxnSocketNetty.sendPkt.

ClientCnxnSocketNetty.sendPkt
private ChannelFuture sendPkt(Packet p, boolean doFlush) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
         // Serialize the packet's request into a ByteBuffer
        p.createBB();
        updateLastSend();
        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
        // Send the data via channel.writeAndFlush or channel.write
        final ChannelFuture result = doFlush ? channel.writeAndFlush(writeBuffer) : channel.write(writeBuffer);
        result.addListener(onSendPktDoneListener);
        return result;
    }
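
Two details are worth noting here: Unpooled.wrappedBuffer(p.bb) wraps the existing ByteBuffer without copying it, and channel.write only enqueues data in Netty's outbound buffer, while channel.writeAndFlush also pushes it to the socket. A tiny illustration with a made-up payload:

    // wrappedBuffer shares the ByteBuffer's backing memory: no copy is made
    ByteBuffer bb = ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8));
    ByteBuf buf = Unpooled.wrappedBuffer(bb);
    channel.write(buf);   // only queued in the outbound buffer
    channel.flush();      // now actually written to the socket
    // channel.writeAndFlush(buf) would combine both steps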

The analysis above shows how the client uses Netty to send requests to the server. Next we look at how the client uses Netty to read response data from the server, which requires first looking at ZKClientPipelineFactory.initChannel.

ZKClientPipelineFactory.initChannel
 protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
                initSSL(pipeline);
            }
           // Via initChannel the ZooKeeper client registers a ZKClientHandler on the pipeline. ZKClientHandler is an inbound handler;
           // no outbound handler is registered, which is why the request-sending path above could be explained without covering ZKClientPipelineFactory.initChannel first.

            pipeline.addLast("handler", new ZKClientHandler());
        }

Let's look at the definition of ZKClientHandler.

ZKClientHandler
private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        AtomicBoolean channelClosed = new AtomicBoolean(false);

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
           // Handle the connection being dropped
            LOG.info("channel is disconnected: {}", ctx.channel());
            cleanup();
        }

        /**
         * netty handler has encountered problems. We are cleaning it up and tell outside to close
         * the channel/connection.
         */
        private void cleanup() {
           // Flip channelClosed to true; only the first caller proceeds
            if (!channelClosed.compareAndSet(false, true)) {
                return;
            }
            disconnected.set(true);
            onClosing();
        }
 
          // channelRead0 handles response data from the server
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
            updateNow();
           // The processing below closely mirrors the NIO implementation
            while (buf.isReadable()) {
                if (incomingBuffer.remaining() > buf.readableBytes()) {
                    int newLimit = incomingBuffer.position() + buf.readableBytes();
                    incomingBuffer.limit(newLimit);
                }
                // Read the data into incomingBuffer
                buf.readBytes(incomingBuffer);
                incomingBuffer.limit(incomingBuffer.capacity());

                if (!incomingBuffer.hasRemaining()) {
                    incomingBuffer.flip();
                    if (incomingBuffer == lenBuffer) {
                        // incomingBuffer == lenBuffer means we are reading the 4-byte prefix that carries the response length
                        recvCount.getAndIncrement();
                        readLength();
                    } else if (!initialized) {
                        // initialized == false means this response answers the session-creation request.
                        // readConnectResult was analyzed in the NIO series.
                        readConnectResult();
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        initialized = true;
                        updateLastHeard();
                    } else {
                       // At this point incomingBuffer holds the response to a regular request;
                       // sendThread.readResponse was covered in the NIO series.
                        sendThread.readResponse(incomingBuffer);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                        updateLastHeard();
                    }
                }
            }
   
            wakeupCnxn();
            // Note: SimpleChannelInboundHandler releases the ByteBuf for us
            // so we don't need to do it.
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOG.error("Unexpected throwable", cause);
            cleanup();
        }

    }

With the walk through ZKClientHandler above, you should have a good picture of how ZooKeeper uses Netty to read the server's response data.
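
As an aside, the lenBuffer/incomingBuffer state machine in channelRead0 is hand-rolled 4-byte length-prefixed framing. ZooKeeper does not do this, but in plain Netty the same framing could be expressed declaratively with a LengthFieldBasedFrameDecoder; a sketch for comparison (the maximum frame length is an arbitrary assumption):

    pipeline.addLast(new LengthFieldBasedFrameDecoder(
            1024 * 1024, // max frame length (assumed for this sketch)
            0,           // length field offset
            4,           // length field size: a 4-byte big-endian int
            0,           // no length adjustment
            4));         // strip the 4-byte prefix before passing the frame on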


This completes the discussion of how the ZooKeeper client uses Netty for network communication. Now let's turn to how Netty is used on the ZooKeeper server side.

Using Netty in the ZooKeeper Server

When the server starts it creates a ServerCnxnFactory. The default implementation is NIOServerCnxnFactory, and the implementation class is chosen via the zookeeper.serverCnxnFactory property, roughly as sketched below. Here we look at the NettyServerCnxnFactory implementation.
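
Roughly, ServerCnxnFactory.createFactory reads the zookeeper.serverCnxnFactory system property and falls back to NIOServerCnxnFactory, mirroring the reflection logic we saw on the client. A simplified sketch (the real method differs in details such as logging):

    public static ServerCnxnFactory createFactory() throws IOException {
        String factoryName = System.getProperty("zookeeper.serverCnxnFactory");
        if (factoryName == null) {
            factoryName = NIOServerCnxnFactory.class.getName();
        }
        try {
            return (ServerCnxnFactory) Class.forName(factoryName)
                    .getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new IOException("Couldn't instantiate " + factoryName, e);
        }
    }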

NettyServerCnxnFactory initialization
 NettyServerCnxnFactory() {
        x509Util = new ClientX509Util();

        boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
        LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
        if (usePortUnification) {
            try {
                QuorumPeerConfig.configureSSLAuth();
            } catch (QuorumPeerConfig.ConfigException e) {
                LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
                usePortUnification = false;
            }
        }
        this.shouldUsePortUnification = usePortUnification;

        this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
        LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);

        setOutstandingHandshakeLimit(Integer.getInteger(OUTSTANDING_HANDSHAKE_LIMIT, -1));
        // bossGroup accepts client connection requests; workerGroup handles each connection's I/O events: the classic reactor pattern
        EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
        EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
       // Standard ServerBootstrap construction, nothing special; the interesting part is the channelHandler logic, which we turn to next
        ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                                                         .channel(NettyUtils.nioOrEpollServerSocketChannel())
                                                         // parent channel options
                                                         .option(ChannelOption.SO_REUSEADDR, true)
                                                         // child channels options
                                                         .childOption(ChannelOption.TCP_NODELAY, true)
                                                         .childOption(ChannelOption.SO_LINGER, -1)
                                                         .childHandler(new ChannelInitializer<SocketChannel>() {
                                                             @Override
                                                             protected void initChannel(SocketChannel ch) throws Exception {
                                                                 ChannelPipeline pipeline = ch.pipeline();
                                                                 if (advancedFlowControlEnabled) {
                                                                     pipeline.addLast(readIssuedTrackingHandler);
                                                                 }
                                                                 if (secure) {
                                                                     initSSL(pipeline, false);
                                                                 } else if (shouldUsePortUnification) {
                                                                     initSSL(pipeline, true);
                                                                 }
                                                                 pipeline.addLast("servercnxnfactory", channelHandler);
                                                             }
                                                         });
        this.bootstrap = configureBootstrapAllocator(bootstrap);
        this.bootstrap.validate();
    }

The ZooKeeper server's ServerBootstrap contains a single business handler, CnxnChannelHandler, which is a duplex handler. Let's analyze its implementation.

CnxnChannelHandler definition
class CnxnChannelHandler extends ChannelDuplexHandler {

        // channelActive is called once the server has accepted a client socket connection.
       // In the normal case this is where the server-side session-layer connection object is created.
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel active {}", ctx.channel());
            }

            final Channel channel = ctx.channel();
            // If the total connection count has reached the server-wide limit, close the underlying socket and reject the new connection
            if (limitTotalNumberOfCnxns()) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                channel.close();
                return;
            }
            InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
            // If this client already holds the configured per-client maximum number of connections, reject the connection request
            if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
                channel.close();
                return;
            }
            // Create NettyServerCnxn, the session-layer connection object
            NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
           // Stash the session connection object on the channel as an attribute
            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);

            if (handshakeThrottlingEnabled) {
                // Favor to check and throttling even in dual mode which
                // accepts both secure and insecure connections, since
                // it's more efficient than throttling when we know it's
                // a secure connection in DualModeSslHandler.
                //
                // From benchmark, this reduced around 15% reconnect time.
                int outstandingHandshakesNum = outstandingHandshake.addAndGet(1);
                if (outstandingHandshakesNum > outstandingHandshakeLimit) {
                    outstandingHandshake.addAndGet(-1);
                    channel.close();
                    ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED.add(1);
                } else {
                    cnxn.setHandshakeState(HandshakeState.STARTED);
                }
            }

            if (secure) {
                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
            } else if (!shouldUsePortUnification) {
                allChannels.add(ctx.channel());
                addCnxn(cnxn);
            }
        }

         // Logic that runs when the connection is closed
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel inactive {}", ctx.channel());
            }

            allChannels.remove(ctx.channel());
            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
            if (cnxn != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Channel inactive caused close {}", cnxn);
                }
                updateHandshakeCountIfStarted(cnxn);
               // Close the connection
                cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_DISCONNECTED);
            }
        }

        // Exception handling
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOG.warn("Exception caught", cause);
            NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
            if (cnxn != null) {
                LOG.debug("Closing {}", cnxn);
                updateHandshakeCountIfStarted(cnxn);
                cnxn.close(ServerCnxn.DisconnectReason.CHANNEL_CLOSED_EXCEPTION);
            }
        }
         // Handle custom channel events: mainly events that enable or disable reading on the channel
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            try {
                if (evt == NettyServerCnxn.ReadEvent.ENABLE) {
                    LOG.debug("Received ReadEvent.ENABLE");
                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                    // TODO: Not sure if cnxn can be null here. It becomes null if channelInactive()
                    // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
                    // after either of those. Check for null just to be safe ...
                    if (cnxn != null) {
                        if (cnxn.getQueuedReadableBytes() > 0) {
                            cnxn.processQueuedBuffer();
                            if (advancedFlowControlEnabled && cnxn.getQueuedReadableBytes() == 0) {
                                // trigger a read if we have consumed all
                                // backlog
                                ctx.read();
                                LOG.debug("Issued a read after queuedBuffer drained");
                            }
                        }
                    }
                    if (!advancedFlowControlEnabled) {
                        ctx.channel().config().setAutoRead(true);
                    }
                } else if (evt == NettyServerCnxn.ReadEvent.DISABLE) {
                    LOG.debug("Received ReadEvent.DISABLE");
                    ctx.channel().config().setAutoRead(false);
                }
            } finally {
                ReferenceCountUtil.release(evt);
            }
        }

         // The server reads request data sent by the client
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message received called {}", msg);
                }
                try {
                    LOG.debug("New message {} from {}", msg, ctx.channel());
                    // Mirroring channelActive, fetch the session-layer connection object NettyServerCnxn from the channel
                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                    if (cnxn == null) {
                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                    } else {
                        // Let NettyServerCnxn process the client's request
                        cnxn.processMessage((ByteBuf) msg);
                    }
                } catch (Exception ex) {
                    LOG.error("Unexpected exception in receive", ex);
                    throw ex;
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            if (advancedFlowControlEnabled) {
                NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                if (cnxn != null && cnxn.getQueuedReadableBytes() == 0 && cnxn.readIssuedAfterReadComplete == 0) {
                    ctx.read();
                    LOG.debug("Issued a read since we do not have anything to consume after channelReadComplete");
                }
            }

            ctx.fireChannelReadComplete();
        }

        // Use a single listener instance to reduce GC
        // Note: this listener is only added when LOG.isTraceEnabled() is true,
        // so it should not do any work other than trace logging.
        private final GenericFutureListener<Future<Void>> onWriteCompletedTracer = (f) -> {
            if (LOG.isTraceEnabled()) {
                LOG.trace("write success: {}", f.isSuccess());
            }
        };

     
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (LOG.isTraceEnabled()) {
                promise.addListener(onWriteCompletedTracer);
            }
            super.write(ctx, msg, promise);
        }

    }
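
The ReadEvent.ENABLE/DISABLE handling in userEventTriggered implements backpressure by toggling Netty's autoRead flag: with autoRead off, Netty stops reading from the socket, the kernel buffers fill up, and TCP flow control pushes back on the client. Conceptually:

    // Throttle: stop pulling data off the socket
    channel.config().setAutoRead(false);
    // ... the server drains its queued buffers ...
    // Resume: Netty schedules reads on the event loop again
    channel.config().setAutoRead(true);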

Now let's look at how NettyServerCnxn.processMessage handles incoming request data.

NettyServerCnxn.processMessage
void processMessage(ByteBuf buf) {
        checkIsInEventLoop("processMessage");
        LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(sessionId), queuedBuffer);

        if (LOG.isTraceEnabled()) {
            LOG.trace("0x{} buf {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(buf));
        }

        if (throttled.get()) {
            LOG.debug("Received message while throttled");
            // we are throttled, so we need to queue
            if (queuedBuffer == null) {
                LOG.debug("allocating queue");
                queuedBuffer = channel.alloc().compositeBuffer();
            }
            appendToQueuedBuffer(buf.retainedDuplicate());
            if (LOG.isTraceEnabled()) {
                LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
            }
        } else {
            LOG.debug("not throttled");
            if (queuedBuffer != null) {
                appendToQueuedBuffer(buf.retainedDuplicate());
                processQueuedBuffer();
            } else {
               // Not throttled and nothing queued, so go straight to receiveMessage
                receiveMessage(buf);
                // Have to check !closingChannel, because an error in
                // receiveMessage() could have led to close() being called.
                if (!closingChannel && buf.isReadable()) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Before copy {}", buf);
                    }

                    if (queuedBuffer == null) {
                        queuedBuffer = channel.alloc().compositeBuffer();
                    }
                    appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Copy is {}", queuedBuffer);
                        LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
                    }
                }
            }
        }
    }
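
appendToQueuedBuffer accumulates leftover bytes in a CompositeByteBuf, which stitches buffers together without copying their contents. A minimal sketch of that pattern (names simplified):

    CompositeByteBuf queuedBuffer = channel.alloc().compositeBuffer();
    // 'true' advances the writer index so the added component becomes readable immediately
    queuedBuffer.addComponent(true, buf.retainedDuplicate());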
NettyServerCnxn.receiveMessage

NettyServerCnxn.receiveMessage is the heart of the ZooKeeper server's handling of client requests.

 private void receiveMessage(ByteBuf message) {
        checkIsInEventLoop("receiveMessage");
        try {
            while (message.isReadable() && !throttled.get()) {
                if (bb != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    if (bb.remaining() > message.readableBytes()) {
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }
                    // Read the client's request payload into bb
                    message.readBytes(bb);
                    bb.limit(bb.capacity());

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes 0x{} bb {}",
                                  Long.toHexString(sessionId),
                                  ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }
                    if (bb.remaining() == 0) {
                        bb.flip();
                        packetReceived(4 + bb.remaining());

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null || !zks.isRunning()) {
                            throw new IOException("ZK down");
                        }
                        if (initialized) {
                             // The session is initialized, so handle the request with zks.processPacket (analyzed in the NIO series)
                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                            // we could implement zero-copy queueing.
                            zks.processPacket(this, bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            // Not yet initialized, so this must be a session-creation request, handled by zks.processConnectRequest
                          // (also analyzed in the NIO series)
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                } else {
                   // Reaching here means bb has not been allocated yet; the first thing to read from the ByteBuf is the 4-byte request length
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
                        ByteBuffer dat = bbLen.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    if (message.readableBytes() < bbLen.remaining()) {
                        bbLen.limit(bbLen.position() + message.readableBytes());
                    }
                   // Read the first 4 bytes from message
                    message.readBytes(bbLen);
                    bbLen.limit(bbLen.capacity());
                    if (bbLen.remaining() == 0) {
                        bbLen.flip();

                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
                        }
                        // Extract the length of the client request payload
                        int len = bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
                        }

                        bbLen.clear();
                        if (!initialized) {
                            if (checkFourLetterWord(channel, message, len)) {
                                return;
                            }
                        }
                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + len);
                        }
                        // checkRequestSize will throw IOException if request is rejected
                        zkServer.checkRequestSizeWhenReceivingMessage(len);
                       // Allocate a ByteBuffer of capacity len, ready to receive the request payload
                        bb = ByteBuffer.allocate(len);
                    }
                }
            }
        } catch (IOException e) {
            LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.IO_EXCEPTION);
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);

            LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.CLIENT_RATE_LIMIT);
        }
    }

Through receiveMessage we now know how ZooKeeper uses Netty to read request messages from the client.
But after a client request has passed through the server's request-processor chain, how does the result get back to the client?
The response is built in FinalRequestProcessor and then sent out by NettyServerCnxn.sendResponse.

NettyServerCnxn.sendBuffer

sendResponse calls sendBuffer, which simply calls channel.writeAndFlush to send the response back to the client.

public void sendBuffer(ByteBuffer... buffers) {
        if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
            close(DisconnectReason.CLIENT_CLOSED_CONNECTION);
            return;
        }
        channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(onSendBufferDoneListener);
    }
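
Note that Unpooled.wrappedBuffer(ByteBuffer...) accepts several buffers and presents them as a single ByteBuf without copying, so a multi-part response (say, a length prefix plus a payload) goes out in one write. A small illustration with made-up contents:

    ByteBuffer header = ByteBuffer.wrap(new byte[] {0, 0, 0, 5});  // 4-byte length prefix
    ByteBuffer body   = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    channel.writeAndFlush(Unpooled.wrappedBuffer(header, body));   // one frame, zero copies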

That is how the ZooKeeper server uses Netty to handle I/O events.
