Netty源碼_AbstractChannel和ChannelOutboundBuffer詳解

一. AbstractChannel

1.1 構(gòu)造方法

    /**
     * 創(chuàng)建一個新實例。
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // 創(chuàng)建
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    /**
     * 創(chuàng)建一個新實例。
     */
    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

可以看出在構(gòu)造方法中幼东,就綁定了這個通道的四個成員變量 parent,id,unsafe,pipeline

    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    /**
     * 由子類來實現(xiàn)肮街,創(chuàng)建對應(yīng)的 Unsafe 類型實例
     */
    protected abstract AbstractUnsafe newUnsafe();
  • idpipeline都是直接創(chuàng)建林束,默認是 DefaultChannelIdDefaultChannelPipeline 類型。
  • newUnsafe() 是抽樣方法蹄梢,有子類才能創(chuàng)建對應(yīng)的 Unsafe 類型實例疙筹。

1.2 ChannelOutboundInvoker 接口方法

Channel 還繼承了 ChannelOutboundInvoker 接口,也就是說通道是可以發(fā)送出站 IO 操作的禁炒。

 @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public ChannelFuture deregister() {
        return pipeline.deregister();
    }
  @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    ........

你會發(fā)現(xiàn)基本上都是調(diào)用 ChannelPipeline 對應(yīng)的方法而咆。

  • 也就是說直接調(diào)用通道Channel 的發(fā)送出站IO事件的方法,和調(diào)用管道pipeline() 發(fā)送出站IO事件的方法是一樣的幕袱。
  • 根據(jù) DefaultChannelPipeline 的分析暴备,我們知道這些出站 IO 事件最后都會調(diào)用到該通道的 Unsafe 屬性對應(yīng)方法進行處理。

1.3 抽樣方法

AbstractChannel 還有幾個需要子類實現(xiàn)抽樣方法们豌,由子類提供不同的處理邏輯:

  1. AbstractUnsafe newUnsafe()

    不同類型的 Channel有自己特定的 Unsafe 類型涯捻。

  2. boolean isCompatible(EventLoop loop)

    判斷給定的事件輪詢器 EventLoop 和當前的通道類型是不是兼容阁危。每種類型的通道Channel 都有自己特定的事件輪詢器。

  3. SocketAddress localAddress0()SocketAddress remoteAddress0()

    通道綁定的本地地址和通道連接的遠程地址汰瘫。

  4. void doBind(SocketAddress localAddress)

    進行綁定操作狂打,每種類型的通道綁定處理是不一樣的。

  5. void doDisconnect()

    進行連接操作混弥。

  6. void doClose()

    進行關(guān)閉連接操作趴乡。

  7. void doBeginRead()

    將通道設(shè)為開始讀操作。

  8. void doWrite(ChannelOutboundBuffer in)

    進行寫操作蝗拿。

AbstractChannel 真正重點的操作都是在 AbstractUnsafe 中實現(xiàn)的啊晾捏,下面講解 AbstractUnsafe

二. AbstractUnsafe 類

2.1 成員屬性

        // 寫緩沖區(qū) ChannelOutboundBuffer
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

        // 用于在接收數(shù)據(jù)時分配緩存區(qū) ByteBuf
        private RecvByteBufAllocator.Handle recvHandle;

        // 當前是否正在刷新數(shù)據(jù)哀托,防止重復刷新數(shù)據(jù)
        private boolean inFlush0;

        // 如果通道從未被注冊惦辛,則為true,否則為false
        private boolean neverRegistered = true;

2.2 注冊 register

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");

            if (isRegistered()) {
                // 當前通道已經(jīng)注冊仓手,失敗胖齐,調(diào)用 promise 的setFailure方法進行通知
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 這個事件輪詢器和當前通道不兼容,失敗贬蛙,調(diào)用 promise 的setFailure方法進行通知
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                // 當前線程就是通道 事件輪詢器線程,直接調(diào)用 register0 方法
                register0(promise);
            } else {
                try {
                    // 通過 eventLoop.execute 方法,
                    // 保證 register0 方法在通道事件輪詢器線程中調(diào)用
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    // 發(fā)生異常蝉稳,要關(guān)閉通道饿这,并進行相關(guān)通知
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 檢查通道是否仍然打開
                // 當注冊操作在 eventLoop 線程之外調(diào)用的話舅列,
                // 有可能這時通道被別的線程關(guān)閉了
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // 第一次注冊
                boolean firstRegistration = neverRegistered;
                // 調(diào)用 AbstractChannel 的 doRegister 方法,進行注冊操作
                doRegister();
                neverRegistered = false;
                // 設(shè)置 AbstractChannel 的 registered 成員屬性摩渺,表示已經(jīng)注冊
                registered = true;

                // 確保在通道未注冊前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也會被調(diào)用
                // 這是必需的,因為用戶可能已經(jīng)通過ChannelFutureListener中的管道觸發(fā)了事件翼虫。
                pipeline.invokeHandlerAddedIfNeeded();

                // 注冊成功的通知
                safeSetSuccess(promise);
                // 發(fā)送注冊 入站IO事件
                pipeline.fireChannelRegistered();
                // 只有在通道從未注冊的情況下才觸發(fā) channelActive 事件珍剑。
                // 這可以防止在通道被取消注冊和重新注冊時觸發(fā)多個通道 channelActive 事件掸宛。
                if (isActive()) {
                    if (firstRegistration) {
                        // 第一次注冊時,才會發(fā)送 channelActive 事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 此通道在注冊之前招拙,設(shè)置了 autoRead()唧瘾。
                        // 這意味著我們需要重新設(shè)置開始讀取操作,以便介紹入站數(shù)據(jù)迫像。

                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // 發(fā)生異常劈愚,要關(guān)閉通道,并進行相關(guān)通知
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

將通道注冊到事件輪詢器EventLoop 上:

  • 如果當前通道已經(jīng)注冊闻妓,或者當前通道和事件輪詢器不兼容菌羽,那么注冊失敗,調(diào)用 promisesetFailure方法進行通知由缆。
  • 保證在事件輪詢器線程調(diào)用實際注冊register0方法注祖。
  • 調(diào)用 AbstractChanneldoRegister 方法,進行注冊操作均唉,發(fā)送注冊事件是晨。
  • 如果通道已活躍,第一次注冊的時候舔箭,就會發(fā)送 channelActive 事件;
  • 如果不是罩缴,那么就可能設(shè)置開始讀的操作。
  • 如果這期間發(fā)生異常层扶,就關(guān)閉通道箫章,并進行相關(guān)通知。

2.3 取消注冊 deregister

        @Override
        public final void deregister(final ChannelPromise promise) {
            assertEventLoop();

            deregister(promise, false);
        }

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            // 如果 promise不是 不可取消的镜会,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                // 當前通道沒有注冊檬寂,那么也表示取消注冊成功,進行成功通知
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.

            // See:
            // https://github.com/netty/netty/issues/4435

            // 通過 invokeLater 方法戳表,將 doDeregister() 方法放在下一個事件輪詢周期進行
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 調(diào)用 AbstractChannel 的 doDeregister 方法桶至,進行取消注冊操作
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            // 如果通道之前是注冊成功了,
                            // 這里才發(fā)送取消注冊的 IO 事件
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        // 取消綁定成功通知
                        safeSetSuccess(promise);
                    }
                }
            });
        }

重點就是調(diào)用AbstractChanneldoDeregister 方法匾旭,進行取消注冊操作镣屹。
如果 fireChannelInactive == true,將發(fā)送 ChannelInactive 事件季率。

2.4 綁定 bind

       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // 檢查通道是否仍然打開
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 調(diào)用 AbstractChannel 的 doBind 方法野瘦,進行綁定操作
                doBind(localAddress);
            } catch (Throwable t) {
                // 綁定失敗的通知
                safeSetFailure(promise, t);

                // doBind(localAddress) 方法有可能關(guān)閉這個通道,
                // 就可能需要進行關(guān)閉通道的通知
                closeIfClosed();
                return;
            }

           // 如果綁定操作后,通道從不活躍變成活躍鞭光,就要發(fā)送 ChannelActive 事件
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            // 綁定成功的通知
            safeSetSuccess(promise);
        }
  • 這個方法邏輯比較簡單吏廉,重點就是調(diào)用AbstractChanneldoBind方法,進行綁定操作惰许。
  • 如果綁定操作后席覆,通道從不活躍變成活躍,就要發(fā)送 ChannelActive 事件汹买。

2.5 取消連接 disconnect

        @Override
        public final void disconnect(final ChannelPromise promise) {
            assertEventLoop();
            // 如果 promise不是 不可取消的佩伤,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            boolean wasActive = isActive();
            try {
                doDisconnect();
                // 重置 remoteAddress and localAddress
                remoteAddress = null;
                localAddress = null;
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                // doDisconnect() 方法有可能關(guān)閉這個通道,
                // 就可能需要進行關(guān)閉通道的通知
                closeIfClosed();
                return;
            }

            // 如果取消連接后晦毙,通道從活躍變成不活躍生巡,就要發(fā)送 ChannelInactive 事件
            if (wasActive && !isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }

            safeSetSuccess(promise);
            // doDisconnect() 方法有可能關(guān)閉這個通道,
            // 就可能需要進行關(guān)閉通道的通知
            closeIfClosed();
        }
  • 這個方法邏輯比較簡單见妒,重點就是調(diào)用AbstractChanneldoDisconnect()方法孤荣,進行取消連接操作。
  • 如果取消連接操作成功后须揣,通道從活躍變成不活躍盐股,就要發(fā)送 ChannelInactive 事件。

2.6關(guān)閉 close

        public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
            close(promise, closedChannelException, closedChannelException, false);
        }

     private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            // 如果 promise不是 不可取消的耻卡,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                // closeInitiated == true疯汁,已經(jīng)調(diào)用過關(guān)閉操作了,就要return 返回了卵酪。
                if (closeFuture.isDone()) {
                    // 已經(jīng)通道已經(jīng)關(guān)閉了幌蚊,通知 promise
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // 當前通道正在關(guān)閉,那么就添加一個監(jiān)聽器溃卡,當關(guān)閉成功后霹肝,再通知 promise
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            // 保證關(guān)閉方法只調(diào)用一次,不能重復調(diào)用
            closeInitiated = true;

            final boolean wasActive = isActive();

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作塑煎。
            this.outboundBuffer = null;
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // 使寫緩沖區(qū)中所有排隊的消息失敗
                                        outboundBuffer.failFlushed(cause, notify);
                                         // 關(guān)閉寫緩沖區(qū)
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                         // 使寫緩沖區(qū)中所有排隊的消息失敗
                        outboundBuffer.failFlushed(cause, notify);
                         // 關(guān)閉寫緩沖區(qū)
                        outboundBuffer.close(closeCause);
                    }
                }

                if (inFlush0) {
                    // 如果正在刷新操作,那么就讓 fireChannelInactiveAndDeregister 操作臭蚁,
                    // 放到下一個事件輪詢周期中處理
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    // 取消注冊和可能發(fā)送 ChannelInactive 事件
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

        private void doClose0(ChannelPromise promise) {
            try {
                doClose();
                closeFuture.setClosed();
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

        private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            deregister(voidPromise(), wasActive && !isActive());
        }

方法流程:

  1. 通過 closeInitiated 成員屬性保證關(guān)閉方法只調(diào)用一次最铁,不能重復調(diào)用。
  2. 因為關(guān)閉連接垮兑,需要考慮寫緩沖區(qū) ChannelOutboundBuffer 中的待寫入數(shù)據(jù)的問題冷尉。
  3. 通過 prepareToClose() 方法,返回一個關(guān)閉通道的事件執(zhí)行器系枪。
    • 如果不為空雀哨,那么就在這個事件執(zhí)行器中進行接下來的關(guān)閉操作。
    • 如果為空,那么就在當前線程進行接下來的關(guān)閉操作雾棺。
  4. 調(diào)用 doClose0(promise) 方法膊夹,進行關(guān)閉以及操作成功或失敗的相關(guān)通知。
  5. 處理寫緩沖區(qū) outboundBuffer 中的數(shù)據(jù)捌浩,并關(guān)閉寫緩沖區(qū)放刨。
  6. 最后調(diào)用 fireChannelInactiveAndDeregister 方法,取消管道注冊尸饺,以及可能會發(fā)送 ChannelInactive 事件进统。

    如果在 doClose() 方法之后,通道從活躍變成不活躍的情況下浪听,才會發(fā)送 ChannelInactive 事件螟碎。

2.7 shutdownOutput

       @UnstableApi
        public final void shutdownOutput(final ChannelPromise promise) {
            assertEventLoop();
            shutdownOutput(promise, null);
        }

        /**
         * 關(guān)閉相應(yīng)通道的輸出部分。
         * 例如迹栓,這將清理ChannelOutboundBuffer并不再允許任何寫操作掉分。
         */
        private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            // 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
            this.outboundBuffer = null;

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 調(diào)用 AbstractChannel 的 doShutdownOutput 方法迈螟,
                            // 進行 shutdown 操作
                            doShutdownOutput();
                            // 操作成功通知
                            promise.setSuccess();
                            // 操作失敗通知
                        } catch (Throwable err) {
                            promise.setFailure(err);
                        } finally {
                            // Dispatch to the EventLoop
                            eventLoop().execute(new Runnable() {
                                @Override
                                public void run() {
                                    // 在 Shutdown 的時候叉抡,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
                                    // 并發(fā)送用戶通知事件
                                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
                    // 進行 shutdown 操作
                    doShutdownOutput();
                    // 操作成功通知
                    promise.setSuccess();
                } catch (Throwable err) {
                    // 操作失敗通知
                    promise.setFailure(err);
                } finally {
                    // 在 Shutdown 的時候答毫,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
                    // 并發(fā)送用戶通知事件
                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                }
            }
        }

        /**
         * 在 Shutdown 的時候褥民,關(guān)閉寫緩沖區(qū) ChannelOutboundBuffer,
         * 并發(fā)送用戶通知事件
         */
        private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {

            // 使寫緩沖區(qū)中所有排隊的消息失敗
            buffer.failFlushed(cause, false);
            // 關(guān)閉寫緩沖區(qū)
            buffer.close(cause, true);
            // 發(fā)送一個通道 Shutdown 的用戶通知事件
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }

shutdown 的方法流程和 close 很像洗搂,區(qū)別點:

  • shutdown 是調(diào)用AbstractChanneldoShutdownOutput() 方法進行相關(guān)操作消返,而 close 是調(diào)用AbstractChanneldoClose() 方法。
  • close 最后會取消注冊耘拇,以及可能會發(fā)送ChannelInactive 事件撵颊。
  • shutdown 會發(fā)送一個 ChannelOutputShutdownEvent.INSTANCE 用戶自定義的通知事件。

2.8 強制關(guān)閉 closeForcibly

        @Override
        public final void closeForcibly() {
            assertEventLoop();

            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }

你會發(fā)現(xiàn)只調(diào)用了AbstractChanneldoClose() 方法進行關(guān)閉操作惫叛,不觸發(fā)任何事件倡勇,也不處理寫緩沖區(qū)。只可能在某些特殊情況下調(diào)用嘉涌,例如嘗試注冊失敗的時候妻熊。

2.9 開始讀 beginRead

        @Override
        public final void beginRead() {
            assertEventLoop();

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

調(diào)用AbstractChanneldoBeginRead() 方法設(shè)置通道開始讀取數(shù)據(jù)。

2.10 寫操作 write

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // 寫緩沖區(qū)為 null仑最,
                try {
                    // 現(xiàn)在釋放資源扔役,以防止資源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    //  如果outboundBuffer為空,我們就知道通道被關(guān)閉了警医,所以立即進行失敗通知亿胸。
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            int size;
            try {
                // 進行消息的轉(zhuǎn)換坯钦,例如將堆緩沖區(qū)變成直接緩沖區(qū)
                msg = filterOutboundMessage(msg);
                // 估算數(shù)據(jù)的大小
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                try {
                    // 失敗時需要釋放資源,以防止資源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    // 進行操作失敗的通知
                    safeSetFailure(promise, t);
                }
                return;
            }

            // 將數(shù)據(jù)添加到寫緩沖區(qū) outboundBuffer 中
            outboundBuffer.addMessage(msg, size, promise);
        }

方法流程

  1. 先判斷寫緩沖區(qū) outboundBuffer 是不是為 null侈玄,為空說明通道已關(guān)閉婉刀,進行失敗通知。
  2. 通過 filterOutboundMessage(msg) 方法進行數(shù)據(jù)轉(zhuǎn)換拗馒,例如將堆緩沖區(qū)變成直接緩沖區(qū)路星。
  3. 估算數(shù)據(jù)大小。
  4. 通過 outboundBuffer.addMessage(...) 方法诱桂,將數(shù)據(jù)添加到寫緩沖區(qū) outboundBuffer 中洋丐。
  5. 如果發(fā)送異常,記得釋放數(shù)據(jù) msg 的引用挥等,防止內(nèi)存泄露友绝,并進行操作失敗通知。

2.11 刷新 flush

       @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 寫緩沖區(qū)為空肝劲,直接返回
            if (outboundBuffer == null) {
                return;
            }

            // 將寫緩沖區(qū)中的消息都標記成待刷新
            outboundBuffer.addFlush();
            // 進行刷新操作
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // 避免重復刷新
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 當前寫緩沖區(qū)沒有數(shù)據(jù)迁客,那么直接返回
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // 避免重復刷新
            inFlush0 = true;

            // 如果通道處于非活動狀態(tài),則將所有掛起的寫請求標記為失敗辞槐。
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    // 刷新操作完成掷漱,將 inFlush0重新設(shè)置為 false,以便下次刷新榄檬。
                    inFlush0 = false;
                }
                return;
            }

            try {
                // 將給定緩沖區(qū)的內(nèi)容刷新到遠端
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                // 刷新操作完成卜范,將 inFlush0重新設(shè)置為 false,以便下次刷新鹿榜。
                inFlush0 = false;
            }
        }
  • 通過 inFlush0 成員屬性海雪,來避免重復刷新。
  • 如果通道處于非活動狀態(tài)舱殿,則將所有掛起的寫請求標記為失敗奥裸。
  • 通過 AbstractChanneldoWrite(outboundBuffer) 方法,將緩沖區(qū)的內(nèi)容刷新到遠端沪袭。

2.12 小結(jié)

對比 Unsafe 的方法湾宙,你會發(fā)現(xiàn) AbstractUnsafe 中沒有實現(xiàn) connect(...) 連接方法。

對比發(fā)送入站IO事件:

  1. ChannelRegisteredChannelUnregistered

    • register 方法會發(fā)送 ChannelRegistered 事件冈绊。
    • deregister 方法只有在通道之前已經(jīng)注冊之后创倔,才會發(fā)送 ChannelUnregistered 事件。
  2. ChannelActiveChannelInactive

    • 一般都是通道Channel從不活躍變成活躍焚碌,要發(fā)送 ChannelActive 事件;可能引起這個變化的操作有 bindconnect 操作霸妹。
    • 通道Channel從活躍變成不活躍十电,就要發(fā)送 ChannelInactive 事件;可能引起這個變化的操作有 disconnect,closeshutdown
    • 最后如果第一次注冊時鹃骂,且當前通道是活躍狀態(tài)台盯,也會發(fā)送 ChannelActive 事件。

三. ChannelOutboundBuffer

AbstractChannel.Unsafe 中看到用戶調(diào)用write(...) 方法寫的數(shù)據(jù)畏线,會先添加到寫緩沖區(qū) ChannelOutboundBuffer 中静盅,然后調(diào)用 flush() 方法,才將寫緩沖區(qū)中的數(shù)據(jù)發(fā)送到遠端寝殴。

3.1 重要成員屬性

    // 在鏈表結(jié)構(gòu)中第一個被刷新的節(jié)點
    private Entry flushedEntry;

    // 在鏈表結(jié)構(gòu)中第一個未刷新的節(jié)點
    private Entry unflushedEntry;

    // 表示鏈表中最后一個節(jié)點
    private Entry tailEntry;
    // 等待刷新節(jié)點的數(shù)量
    private int flushed;

寫緩沖區(qū)通過鏈表來儲存數(shù)據(jù)(依靠 Entry.next 來實現(xiàn)鏈表)蒿叠,鏈表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

  • flushedEntry 表示第一個被刷新的節(jié)點,在鏈表頭蚣常,當然也是通過 addFlush() 方法設(shè)置的市咽。
  • unflushedEntry 表示第一個未刷新的節(jié)點,表示還沒有被標記刷新的第一個節(jié)點抵蚊。
  • tailEntry 最后一個節(jié)點施绎。
  • flushed 刷新節(jié)點的數(shù)量,這個屬性很重要贞绳,靠它來標記刷新節(jié)點谷醉,也就是說從 flushedEntry 開始, flushed 數(shù)量的節(jié)點都被標記為刷新節(jié)點了冈闭。

3.2 重要方法

3.2.1 添加數(shù)據(jù)

這個方法一般在 AbstractChannel.AbstractUnsafewrite(...) 方法中調(diào)用俱尼。

 /**
     * 將給定的消息 msg 添加到ChannelOutboundBuffer中。
     * 一旦消息寫入拒秘,給定的ChannelPromise將被通知号显。
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 將給定消息封裝成一個節(jié)點
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            // 將新消息節(jié)點添加到隊列尾
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // 如果未刷新節(jié)點為空,說明隊列節(jié)點都變成刷新節(jié)點了躺酒,
            // 那么這個新添加的節(jié)點押蚤,就是未刷新節(jié)點的頭了。
            unflushedEntry = entry;
        }

        // See https://github.com/netty/netty/issues/1619
        // 向未刷新的數(shù)組添加消息后羹应,增加掛起的字節(jié)數(shù)揽碘。
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
  • 先將數(shù)據(jù) msg 封裝成一個節(jié)點 entry,并將節(jié)點添加到鏈表尾园匹。
  • 如果 unflushedEntrynull,那么這個節(jié)點就是第一個未刷新節(jié)點雳刺。
  • incrementPendingOutboundBytes(...) 方法,增加掛起的字節(jié)數(shù)裸违,看是否需要改變通道的 可寫屬性掖桦。

3.2.2 標記刷新

這個方法一般在 AbstractChannel.AbstractUnsafeflush() 方法中調(diào)用。

  /**
     * 向此ChannelOutboundBuffer添加刷新供汛。
     * 這意味著所有以前添加的消息都被標記為刷新枪汪,因此您將能夠處理它們涌穆。
     */
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        // 未刷新節(jié)點后面的鏈表示新添加的節(jié)點列表,都是要加入到刷新中
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                // 將所有要刷新的節(jié)點變成不可取消的
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    // 掛起消息被取消雀久,所以確保我們釋放內(nèi)存并通知釋放的字節(jié)
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // 節(jié)點都變成已刷新的了宿稀,未刷新節(jié)點就設(shè)置為 null
            unflushedEntry = null;
        }
    }
  • 將從 unflushedEntry 未刷新節(jié)點開始到鏈表尾的所有節(jié)點都標記為刷新。通過 flushed++ 來增加刷新節(jié)點數(shù)量赖捌。
  • 調(diào)用 setUncancellable(...) 要寫入的節(jié)點是不可取消的祝沸,如果設(shè)置失敗,就要取消掛起數(shù)據(jù)越庇,并調(diào)用 decrementPendingOutboundBytes(...) 減少掛起字節(jié)數(shù)罩锐,看是否需要改變通道的 可寫屬性。

3.2.3 刪除節(jié)點


    /**
     * 將刪除當前消息悦荒,將其ChannelPromise標記為success并返回true唯欣。
     * 如果在調(diào)用此方法時不存在刷新的消息,則返回false搬味,表示沒有準備好處理的消息境氢。
     */
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
        e.recycle();

        return true;
    }

    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // flushed == 0, 表示所有刷新節(jié)點都被處理了
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 將下一個節(jié)點變成刷新節(jié)點
            flushedEntry = e.next;
        }
    }

當緩存區(qū)當前刷新節(jié)點數(shù)據(jù)被寫入到遠端了,那么調(diào)用這個 remove() 方法碰纬,移除當前節(jié)點萍聊,得到下一個刷新節(jié)點。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悦析,一起剝皮案震驚了整個濱河市寿桨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌强戴,老刑警劉巖亭螟,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異骑歹,居然都是意外死亡预烙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門道媚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扁掸,“玉大人,你說我怎么就攤上這事最域∏捶郑” “怎么了肄满?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵袖牙,是天一觀的道長。 經(jīng)常有香客問我撮执,道長薄翅,這世上最難降的妖魔是什么钞馁? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任虑省,我火速辦了婚禮,結(jié)果婚禮上僧凰,老公的妹妹穿的比我還像新娘。我一直安慰自己熟丸,他們只是感情好训措,可當我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著光羞,像睡著了一般绩鸣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上纱兑,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天呀闻,我揣著相機與錄音,去河邊找鬼潜慎。 笑死捡多,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的铐炫。 我是一名探鬼主播垒手,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼倒信!你這毒婦竟也來了科贬?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤鳖悠,失蹤者是張志新(化名)和其女友劉穎榜掌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乘综,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡憎账,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了瘾带。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鼠哥。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖看政,靈堂內(nèi)的尸體忽然破棺而出朴恳,到底是詐尸還是另有隱情,我是刑警寧澤允蚣,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布于颖,位于F島的核電站,受9級特大地震影響嚷兔,放射性物質(zhì)發(fā)生泄漏森渐。R本人自食惡果不足惜做入,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望同衣。 院中可真熱鬧竟块,春花似錦、人聲如沸耐齐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽埠况。三九已至耸携,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間辕翰,已是汗流浹背夺衍。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留喜命,地道東北人沟沙。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像渊抄,于是被迫代替她去往敵國和親尝胆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容