Netty源碼剖析

Netty服務(wù)端示例:

public class NettyServer {

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建兩個(gè)線(xiàn)程組bossGroup和workerGroup, 含有的子線(xiàn)程N(yùn)ioEventLoop的個(gè)數(shù)默認(rèn)為cpu核數(shù)的兩倍
        // bossGroup只是處理連接請(qǐng)求 ,真正的和客戶(hù)端業(yè)務(wù)處理幌蚊,會(huì)交給workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(3);
        EventLoopGroup workerGroup = new NioEventLoopGroup(8);
        try {
            // 創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用鏈?zhǔn)骄幊虂?lái)配置參數(shù)
            bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線(xiàn)程組
                    // 使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn)
                    .channel(NioServerSocketChannel.class)
                    // 初始化服務(wù)器連接隊(duì)列大小烟瞧,服務(wù)端處理客戶(hù)端連接請(qǐng)求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶(hù)端連接。
                    // 多個(gè)客戶(hù)端同時(shí)來(lái)的時(shí)候,服務(wù)端將不能處理的客戶(hù)端連接請(qǐng)求放在隊(duì)列中等待處理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {//創(chuàng)建通道初始化對(duì)象沮焕,設(shè)置初始化參數(shù),在 SocketChannel 建立起來(lái)之前執(zhí)行

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //對(duì)workerGroup的SocketChannel設(shè)置處理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start拉宗。峦树。");
            // 綁定一個(gè)端口并且同步, 生成了一個(gè)ChannelFuture異步對(duì)象,通過(guò)isDone()等方法可以判斷異步事件的執(zhí)行情況
            // 啟動(dòng)服務(wù)器(并綁定端口)旦事,bind是異步操作魁巩,sync方法是等待異步操作執(zhí)行完畢
            ChannelFuture cf = bootstrap.bind(9000).sync();
            // 給cf注冊(cè)監(jiān)聽(tīng)器,監(jiān)聽(tīng)我們關(guān)心的事件
            /*cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("監(jiān)聽(tīng)端口9000成功");
                    } else {
                        System.out.println("監(jiān)聽(tīng)端口9000失敗");
                    }
                }
            });*/
            // 等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉姐浮,closeFuture是異步操作
            // 通過(guò)sync方法同步等待通道關(guān)閉處理完畢谷遂,這里會(huì)阻塞等待通道關(guān)閉完成,內(nèi)部調(diào)用的是Object的wait()方法
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

1.NioEventLoopGroup和NioEventLoop

public NioEventLoopGroup() {
        this(0);
}
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

線(xiàn)程數(shù)默認(rèn)是核心數(shù)的兩倍卖鲤。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

重點(diǎn)看下newChild()

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }

NioEventLoop里面有兩個(gè)最核心的組件:

  • 1)在其父類(lèi)構(gòu)造方法SingleThreadEventExecutor()里面初始化了taskQueue肾扰,有可能是LinkedBlockingQueue畴嘶,也有可能是MpscUnboundedArrayQueue或者M(jìn)pscUnboundedAtomicArrayQueue
  • 2)selectorTuple = openSelector();

2.ServerBootstrap

配置參數(shù):

  • ServerBootstrap#group(parentGroup, childGroup),設(shè)置this.group = parentGroup, this.childGroup = childGroup集晚。
  • AbstractBootstrap#channel掠廓,設(shè)置this.channelFactory
  • AbstractBootstrap#option,設(shè)置參數(shù)
  • ServerBootstrap#childHandler(i)甩恼,設(shè)置this.childHandler

2.1 服務(wù)端向selector注冊(cè)ACCEPT事件并綁定端口地址

AbstractBootstrap#bind(int)

  • bind(SocketAddress)
  • AbstractBootstrap#doBind
    1)initAndRegister();
    ?1-1)channel = channelFactory.newChannel(); 這里調(diào)用ReflectiveChannelFactory#newChannel蟀瞧,然后會(huì)調(diào)用傳入類(lèi)的構(gòu)造方法constructor.newInstance();
    ??NioServerSocketChannel()構(gòu)造方法;
    ??this(newSocket(DEFAULT_SELECTOR_PROVIDER));其中newSocket()會(huì)調(diào)用SelectorProvider.provider(). openServerSocketChannel()創(chuàng)建ServerSocketChannel条摸。
    ??pipeline = newChannelPipeline();創(chuàng)建DefaultChannelPipeline悦污。
    ??super(null, channel, SelectionKey.OP_ACCEPT);
    ??this.readInterestOp = readInterestOp;關(guān)注ACCEPT事件;
    ??ch.configureBlocking(false);設(shè)置為非阻塞模式钉蒲;
    ?1-2)init(channel); 核心是向pipeline添加了一個(gè)ChannelHandler(ChannelInitializer一次性切端、初始化handler),負(fù)責(zé)添加一個(gè)ServerBootstrapAcceptor handler顷啼,添加完后踏枣,自己就移除了,ServerBootstrapAcceptor handler: 負(fù)責(zé)接收客戶(hù)端連接創(chuàng)建連接后钙蒙,對(duì)連接的初始化工作茵瀑。
    ?1-3)config().group().register(channel);
    ??MultithreadEventLoopGroup#register()
    ??SingleThreadEventLoop#register()
    ??AbstractChannel.AbstractUnsafe#register
    ??eventLoop.execute()提交了一個(gè)register0()任務(wù)。
    ???AbstractNioChannel#doRegister躬厌,調(diào)用selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    ???pipeline.invokeHandlerAddedIfNeeded();這里會(huì)調(diào)用上面設(shè)置的ChannelInitializer#initChannel方法马昨,移除自己添加ServerBootstrapAcceptor 。(參考DefaultChannelPipeline#addLast()-> callHandlerCallbackLater(newCtx, true)-> PendingHandlerAddedTask -> ChannelInitializer#handlerAdded)
    ???pipeline.fireChannelRegistered();
    ???beginRead()扛施,調(diào)用父類(lèi)AbstractNioChannel#doBeginRead鸿捧,這里會(huì)調(diào)用selectionKey.interestOps(interestOps | readInterestOp),也即關(guān)注ACCEPT事件疙渣。
    2)doBind0(regFuture, channel, localAddress, promise);
    ?channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

2.2 NioEventLoop#run

來(lái)分析一下SingleThreadEventExecutor#execute

  • 1)addTask(task);將任務(wù)加入到隊(duì)列中taskQueue.offer(task);
  • 2)startThread();
    SingleThreadEventExecutor#doStartThread匙奴,這里會(huì)調(diào)用executor.execute()執(zhí)行Runnable,Runnable的核心如下妄荔;
    SingleThreadEventExecutor.this.run();
    NioEventLoop#run

上面的executor是ThreadPerTaskExecutor泼菌,在MultithreadEventExecutorGroup構(gòu)造方法里面。

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

重點(diǎn)就在NioEventLoop#run

  • 1)SelectStrategy.SELECT:select(wakenUp.getAndSet(false));這里面核心是調(diào)用 int selectedKeys = selector.select(timeoutMillis);
  • 2)processSelectedKeys();
  • 3)runAllTasks();從taskQueue中取出任務(wù)并執(zhí)行懦冰。

無(wú)鎖串行化設(shè)計(jì)思想

Netty的NioEventLoop讀取到消息之后灶轰,直接調(diào)用ChannelPipeline的fireChannelRead(Object msg),只要用戶(hù)不主動(dòng)切換線(xiàn)程刷钢,一直會(huì)由NioEventLoop調(diào)用到用戶(hù)的Handler,期間不進(jìn)行線(xiàn)程切換乳附,這種串行化處理方式避免了多線(xiàn)程操作導(dǎo)致的鎖的競(jìng)爭(zhēng)内地,從性能角度看是最優(yōu)的伴澄。

3.pipeline責(zé)任鏈

來(lái)看看pipeline責(zé)任鏈調(diào)用流程:

  • DefaultChannelPipeline#fireChannelRegistered
  • AbstractChannelHandlerContext.invokeChannelRegistered(head);
  • 調(diào)用head.invokeChannelRegistered()
  • 調(diào)用HeadContext#channelRegistered
  • 核心是findContextInbound(MASK_CHANNEL_REGISTERED),找到下一個(gè)與MASK_CHANNEL_REGISTERED匹配的調(diào)用者阱缓,然后又重復(fù)調(diào)用AbstractChannelHandlerContext.invokeChannelRegistered()方法非凌,實(shí)現(xiàn)責(zé)任鏈調(diào)用

AbstractChannelHandlerContext.invokeChannelRegistered:

    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

AbstractChannelHandlerContext#invokeChannelRegistered()

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

DefaultChannelPipeline.HeadContext#channelRegistered

       public void channelRegistered(ChannelHandlerContext ctx) {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

AbstractChannelHandlerContext#fireChannelRegistered

    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }

核心是AbstractChannelHandlerContext#findContextInbound:

  • 從前往后查找AbstractChannelHandlerContext ,直到找到與mask匹配為止
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

4.服務(wù)端Channel注冊(cè)并處理ACCEPT事件

NioEventLoop#processSelectedKeys

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //不用JDK的selector.selectedKeys(), 性能更好(1%-2%)荆针,垃圾回收更少
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            //呼應(yīng)于channel的register中的this: 例如:selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            //處理讀請(qǐng)求(斷開(kāi)連接)或接入連接
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

重點(diǎn)關(guān)注一下服務(wù)端處理SelectionKey.OP_ACCEPT請(qǐng)求:unsafe.read();

實(shí)際是AbstractNioMessageChannel.NioMessageUnsafe#read

  • 1)doReadMessages()首先會(huì)調(diào)用serverSocketChannel.accept()敞嗡,然后將其封裝成NioSocketChannel。
    ?NioSocketChannel構(gòu)造方法航背,會(huì)創(chuàng)建DefaultChannelPipeline喉悴,會(huì)關(guān)注SelectionKey.OP_READ事件(賦值給this.readInterestOp),會(huì)設(shè)置非阻塞模式ch.configureBlocking(false)玖媚。
  • 2)pipeline.fireChannelRead(readBuf.get(i));這里會(huì)觸發(fā)服務(wù)端的pipeline中的handler箕肃,核心是ServerBootstrapAcceptor#channelRead()。
    ?2-1)child.pipeline().addLast(childHandler)今魔,將netty服務(wù)端初始化時(shí)寫(xiě)的ChannelInitializer加入到客戶(hù)端socketChannel的pipeline里面勺像;
    ?2-2)childGroup.register(child).addListener(),跟上面服務(wù)端channel處理類(lèi)似错森。
    ??A)向workerGroup線(xiàn)程池某個(gè)NioEventLoop中的selector注冊(cè)讀事件(是在pipeline.fireChannelActive() -> .DefaultChannelPipeline.HeadContext#read -> AbstractChannel.AbstractUnsafe#beginRead吟宦,這里會(huì)注冊(cè)上面channel初始化傳入的讀事件),NioEventLoop#run死循環(huán)監(jiān)聽(tīng)該事件涩维;
    ??B)ChannelInitializer#initChannel方法督函,移除自己添加NettyServerHandler。
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

5.客戶(hù)端Channel處理READ事件

NioEventLoop#processSelectedKey()
-> AbstractNioByteChannel.NioByteUnsafe#read

  • 1)byteBuf = allocHandle.allocate(allocator); 分配byteBuf
  • 2) allocHandle.lastBytesRead(doReadBytes(byteBuf)); 從channel讀取數(shù)據(jù)激挪;
  • 3)pipeline.fireChannelRead(byteBuf)辰狡,pipeline上執(zhí)行,業(yè)務(wù)邏輯的處理就在這個(gè)地方

6.直接內(nèi)存垄分、零拷貝與ByteBuf內(nèi)存池

在上面分配byteBuf里面宛篇,就使用了直接內(nèi)存:

RecvByteBufAllocator.DelegatingHandle#allocate

        public ByteBuf allocate(ByteBufAllocator alloc) {
            return delegate.allocate(alloc);
        }

DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate

        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

AbstractByteBufAllocator#ioBuffer(int)

    public ByteBuf ioBuffer(int initialCapacity) {
        if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
            return directBuffer(initialCapacity);
        }
        return heapBuffer(initialCapacity);
    }

PooledByteBufAllocator#newDirectBuffer

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

使用直接內(nèi)存的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):

  • 不占用堆內(nèi)存空間,減少了發(fā)生GC的可能
  • java虛擬機(jī)實(shí)現(xiàn)上薄湿,本地IO會(huì)直接操作直接內(nèi)存(直接內(nèi)存=>系統(tǒng)調(diào)用=>硬盤(pán)/網(wǎng)卡)叫倍,而非直接內(nèi)存則需要二次拷貝(堆內(nèi)存=>直接內(nèi)存=>系統(tǒng)調(diào)用=>硬盤(pán)/網(wǎng)卡)

缺點(diǎn):

  • 初始分配較慢
  • 沒(méi)有JVM直接幫助管理內(nèi)存,容易發(fā)生內(nèi)存溢出豺瘤。為了避免一直沒(méi)有FULL GC吆倦,最終導(dǎo)致直接內(nèi)存把物理內(nèi)存耗完。我們可以指定直接內(nèi)存的最大值坐求,通過(guò)-XX:MaxDirectMemorySize來(lái)指定蚕泽,當(dāng)達(dá)到閾值的時(shí)候,調(diào)用system.gc來(lái)進(jìn)行一次FULL GC,間接把那些沒(méi)有被使用的直接內(nèi)存回收掉须妻。

對(duì)于堆外直接內(nèi)存的分配和回收仔蝌,是一件耗時(shí)的操作。為了盡量重用緩沖區(qū)荒吏,Netty提供了基于ByteBuf內(nèi)存池的緩沖區(qū)重用機(jī)制敛惊。需要的時(shí)候直接從池子里獲取ByteBuf使用即可,使用完畢之后就重新放回到池子里去绰更。

PooledByteBufAllocator#newDirectBuffer
-> PoolArena#allocate()
-> PoolArena.DirectArena#newByteBuf
-> PooledUnsafeDirectByteBuf#newInstance
-> 最終通過(guò)RECYCLER內(nèi)存池獲取ByteBuf對(duì)象瞧挤,如果是非內(nèi)存池實(shí)現(xiàn),則直接創(chuàng)建一個(gè)新的ByteBuf對(duì)象儡湾。

PoolArena#allocate()

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }

PoolArena.DirectArena#newByteBuf

        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

PooledUnsafeDirectByteBuf#newInstance

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

7.ByteBuf擴(kuò)容機(jī)制

ByteBuf.writeByte()->AbstractByteBuf

AbstractByteBuf#writeByte

    public ByteBuf writeByte(int value) {
        ensureWritable0(1);
        _setByte(writerIndex++, value);
        return this;
    }

AbstractByteBuf#ensureWritable0

    final void ensureWritable0(int minWritableBytes) {
        ensureAccessible();
        if (minWritableBytes <= writableBytes()) {
            return;
        }
        final int writerIndex = writerIndex();
        if (checkBounds) {
            if (minWritableBytes > maxCapacity - writerIndex) {
                throw new IndexOutOfBoundsException(String.format(
                        "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                        writerIndex, minWritableBytes, maxCapacity, this));
            }
        }

        // Normalize the current capacity to the power of 2.
        int minNewCapacity = writerIndex + minWritableBytes;
        int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);

        int fastCapacity = writerIndex + maxFastWritableBytes();
        // Grow by a smaller amount if it will avoid reallocation
        if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
            newCapacity = fastCapacity;
        }

        // Adjust to the new capacity.
        capacity(newCapacity);
    }

AbstractByteBufAllocator#calculateNewCapacity

    public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

        if (minNewCapacity == threshold) {
            return threshold;
        }

        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) {
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }

Netty的ByteBuf需要?jiǎng)討B(tài)擴(kuò)容來(lái)滿(mǎn)足需要特恬,擴(kuò)容過(guò)程: 默認(rèn)門(mén)限閾值為4MB(這個(gè)閾值是一個(gè)經(jīng)驗(yàn)值,不同場(chǎng)景盒粮,可能取值不同)鸵鸥,當(dāng)需要的容量等于門(mén)限閾值,使用閾值作為新的緩存區(qū)容量 目標(biāo)容量丹皱,如果大于閾值妒穴,采用每次步進(jìn)4MB的方式進(jìn)行內(nèi)存擴(kuò)張((需要擴(kuò)容值/4MB)*4MB),擴(kuò)張后需要和最大內(nèi)存(maxCapacity)進(jìn)行比較摊崭,大于maxCapacity的話(huà)就用maxCapacity,否則使用擴(kuò)容值 目標(biāo)容量讼油,如果小于閾值,采用倍增的方式呢簸,以64(字節(jié))作為基本數(shù)值矮台,每次翻倍增長(zhǎng)64 -->128 --> 256,直到倍增后的結(jié)果大于或等于需要的容量值根时。

8.空輪詢(xún)bug處理

NioEventLoop#select

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            //按scheduled的task時(shí)間來(lái)計(jì)算select timeout時(shí)間瘦赫。
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
            if (nextWakeupTime != normalizedDeadlineNanos) {
                nextWakeupTime = normalizedDeadlineNanos;
            }

            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) { //已經(jīng)有定時(shí)task需要執(zhí)行了,或者超過(guò)最長(zhǎng)等待時(shí)間了
                    if (selectCnt == 0) {
                        //非阻塞蛤迎,沒(méi)有數(shù)據(jù)返回0
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don't, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                //下面select阻塞中确虱,別人喚醒也可以可以的
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

若Selector的輪詢(xún)結(jié)果為空,也沒(méi)有wakeup或新消息處理替裆,則發(fā)生空輪詢(xún)校辩,CPU使用率100%。

Netty的解決辦法:

  • 1辆童、對(duì)Selector的select操作周期進(jìn)行統(tǒng)計(jì)宜咒,每完成一次空的select操作進(jìn)行一次計(jì)數(shù),若在某個(gè)周期內(nèi)連續(xù)發(fā)生N次空輪詢(xún)把鉴,則觸發(fā)了epoll死循環(huán)bug故黑。
  • 2、重建Selector,判斷是否是其他線(xiàn)程發(fā)起的重建請(qǐng)求倍阐,若不是則將原SocketChannel從舊的Selector上去除注冊(cè)概疆,重新注冊(cè)到新的Selector上逗威,并將原來(lái)的Selector關(guān)閉峰搪。

Netty解決辦法具體步驟:

  • 1、先定義當(dāng)前時(shí)間currentTimeNanos凯旭。
  • 2概耻、接著計(jì)算出一個(gè)執(zhí)行最少需要的時(shí)間timeoutMillis。
  • 3罐呼、每次對(duì)selectCnt做++操作鞠柄。
  • 4、進(jìn)行判斷嫉柴,如果執(zhí)行達(dá)到或者超過(guò)了最少時(shí)間厌杜,則seletCnt重置為1(過(guò)濾到select超時(shí)返回情況)。
  • 5计螺、一旦到達(dá)SELECTOR_AUTO_REBUILD_THRESHOLD這個(gè)閥值夯尽,就需要重建selector來(lái)解決這個(gè)問(wèn)題。
  • 6登馒、這個(gè)閥值默認(rèn)是512匙握。

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末陈轿,一起剝皮案震驚了整個(gè)濱河市圈纺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌麦射,老刑警劉巖蛾娶,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異潜秋,居然都是意外死亡蛔琅,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén)半等,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)揍愁,“玉大人,你說(shuō)我怎么就攤上這事杀饵∶Ф冢” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵切距,是天一觀(guān)的道長(zhǎng)朽缎。 經(jīng)常有香客問(wèn)我,道長(zhǎng),這世上最難降的妖魔是什么话肖? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任北秽,我火速辦了婚禮,結(jié)果婚禮上最筒,老公的妹妹穿的比我還像新娘贺氓。我一直安慰自己,他們只是感情好床蜘,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布辙培。 她就那樣靜靜地躺著,像睡著了一般邢锯。 火紅的嫁衣襯著肌膚如雪扬蕊。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,166評(píng)論 1 284
  • 那天丹擎,我揣著相機(jī)與錄音尾抑,去河邊找鬼。 笑死蒂培,一個(gè)胖子當(dāng)著我的面吹牛再愈,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播毁渗,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼践磅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了灸异?” 一聲冷哼從身側(cè)響起府适,我...
    開(kāi)封第一講書(shū)人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肺樟,沒(méi)想到半個(gè)月后檐春,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡么伯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年疟暖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片田柔。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡俐巴,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出硬爆,到底是詐尸還是另有隱情欣舵,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布缀磕,位于F島的核電站缘圈,受9級(jí)特大地震影響劣光,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜糟把,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一绢涡、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧遣疯,春花似錦雄可、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)狭归。三九已至夭坪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間过椎,已是汗流浹背室梅。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疚宇,地道東北人亡鼠。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像敷待,于是被迫代替她去往敵國(guó)和親间涵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容