Netty Pipeline源碼分析(1)

原文鏈接:https://wangwei.one/posts/netty-pipeline-source-analyse-1.html

前面,我們分析了Netty EventLoop的 創(chuàng)建啟動(dòng) 原理,接下里我們來(lái)分析Netty中另外兩個(gè)重要組件—— ChannelHandlerPipeline张惹。Netty中I/O事件的傳播機(jī)制均由它負(fù)責(zé)困介,下面我們來(lái)看看它是如何實(shí)現(xiàn)的。

Netty版本:4.1.30

我們前面在講 Channel創(chuàng)建 時(shí)敛滋,在AbstractChannel的構(gòu)造函數(shù)中田篇, 一筆帶過(guò)地提到了Pipeline替废,現(xiàn)在我們來(lái)深入分析一下它的原理。

概述

Netty channel lifecycle

前面斯辰,我們?cè)诜治?Netty channel 源碼時(shí)舶担,分析了Channel的創(chuàng)建坡疼、初始化彬呻、注冊(cè)、綁定過(guò)程柄瑰。在Netty中闸氮,channel的生命周期如下所示:

Channel lifecycle
  • ChannelRegistered:Channel注冊(cè)到了EventLoop上
  • ChannelActive:Channel激活,連接到了遠(yuǎn)程某一個(gè)節(jié)點(diǎn)上教沾,可以收發(fā)數(shù)據(jù)了
  • ChannelInactive:斷開連接
  • ChannelUnregistered:Channel從EventLoop上取消注冊(cè)

Netty channelHandler

Channel 每一次狀態(tài)的變化蒲跨,都會(huì)產(chǎn)生一個(gè)事件,調(diào)用 ChannelHandler 中對(duì)應(yīng)的方法進(jìn)行處理授翻,我們看下 ChannelHandler的UML或悲,其中最為重要的兩個(gè)ChannelHandler:

ChannelHandlerUML

Netty ChannelPipeline

前面我們?cè)诜治鯟hannel創(chuàng)建過(guò)程時(shí)孙咪,每一個(gè)新創(chuàng)建的Channel都將會(huì)被分配一個(gè)新的ChannelPipeline。ChannelPipeline是一個(gè)攔截流經(jīng)Channel的入站和出站事件的ChannelHandler實(shí)例鏈巡语,如圖所示:

Channel-Pipeline

一個(gè) Channel 包含了一個(gè) ChannelPipeline翎蹈,ChannelPipeline內(nèi)部是一個(gè)雙向的鏈表結(jié)構(gòu),內(nèi)部由一個(gè)個(gè)的ChannelHandlerContext節(jié)點(diǎn)組成男公,ChannelPipeline有頭尾兩個(gè)固定的節(jié)點(diǎn)HeadContext與TailContext荤堪。用戶自定的ChannelHandler就是由ChannelHandlerContext包裝成Pipeline的節(jié)點(diǎn),參與Channel整個(gè)生命周期中所觸發(fā)的入站事件與出站事件以及相應(yīng)數(shù)據(jù)流的攔截處理枢赔。

根據(jù)事件的起源澄阳,事件將會(huì)被ChannelInboundHandler(入站處理器)或者ChannelOutboundHandler(出站處理器)處理。隨后踏拜,通過(guò)調(diào)用ChannelHandlerContext實(shí)現(xiàn)碎赢,它將被轉(zhuǎn)發(fā)給同一超類型的下一個(gè)ChannelHandler,如圖所示:

Channel Pipeline Event Flow

Pipeline UML

我們先來(lái)看下 ChannelPipeline 以及 ChannelHandlerContext 的類圖結(jié)構(gòu)速梗,它們都實(shí)現(xiàn)了ChannelInboundInvokerChannelOutboundInvoker接口揩抡。

Netty Pipeline UML

Pipeline初始化

AbstractChannel構(gòu)造函數(shù)如下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    // 創(chuàng)建默認(rèn)Pipeline
    pipeline = newChannelPipeline();
}

// 創(chuàng)建默認(rèn)Pipeline
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline 構(gòu)造函數(shù)如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    // 設(shè)置尾部節(jié)點(diǎn)
    tail = new TailContext(this);
    // 設(shè)置頭部節(jié)點(diǎn)
    head = new HeadContext(this);
    
    // 將tail與head串聯(lián)起來(lái)
    head.next = tail;
    tail.prev = head;
}

我們可以看到Pipeline其實(shí)是一個(gè)雙向鏈表的結(jié)構(gòu),剛剛初始化的時(shí)候镀琉,Pipeline(管道)中只有兩個(gè)節(jié)點(diǎn)峦嗤,如圖:

Pipeline Init Ctx

接下來(lái)我們看看組成Pipeline節(jié)點(diǎn)的對(duì)象—— ChannelHandlerContext。

ChannelHandlerContext

ChannelHandlerContext 實(shí)現(xiàn)了AttributeMap屋摔、ChannelInboundInvoker烁设、ChannelOutboundInvoker接口。Pipeline中的事件傳播钓试,都是由ChannelHandlerContext負(fù)責(zé)装黑,將發(fā)生的事件從一個(gè)節(jié)點(diǎn)傳到下一個(gè)節(jié)點(diǎn)。

ChannelHandlerContext接口

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    // 返回ChannelHandlerContext中綁定的Channel
    Channel channel();

    // 返回專用于執(zhí)行任務(wù)的 EventExecutor
    EventExecutor executor();

    // 返回ChannelHandlerContext的唯一名稱弓熏。該名字將在ChannelHandler被添加到ChannelPipeline時(shí)會(huì)被用到恋谭,從ChannelPipeline中訪問(wèn)注冊(cè)的ChannelHandler時(shí),也會(huì)被用到挽鞠。
    String name();
    
    // 返回ChannelHandlerContext中綁定的ChannelHandler
    ChannelHandler handler();

    // 屬于這個(gè)ChannelHandlerContext的ChannelHandler從ChannelPipeline移除了疚颊,返回true
    boolean isRemoved();
        
    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    @Override
    ChannelHandlerContext read();

    @Override
    ChannelHandlerContext flush();

    // 返回分配的ChannelPipeline
    ChannelPipeline pipeline();

    // 返回用于分配ByteBuf的ByteBufAllocator
    ByteBufAllocator alloc();

}

AttributeMap接口

實(shí)現(xiàn) AttributeMap 接口,表示ChannelHandlerContext節(jié)點(diǎn)可以存儲(chǔ)自定義的屬性信认。

// 屬性Map接口
public interface AttributeMap {
    // 通過(guò)Key獲取屬性
    <T> Attribute<T> attr(AttributeKey<T> key);
    // 判斷屬性是否存在
    <T> boolean hasAttr(AttributeKey<T> key);
}

ChannelInboundInvoker接口

實(shí)現(xiàn)ChannelInboundInvoker接口材义,表示節(jié)點(diǎn)可以用于傳播入站相關(guān)的事件。

public interface ChannelInboundInvoker {
    // 當(dāng)Channel注冊(cè)到EventLoop上時(shí)
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法 
    ChannelInboundInvoker fireChannelRegistered();
    
    // 當(dāng)Channel從EventLoop上取消注冊(cè)
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelUnregistered(ChannelHandlerContext)方法 
    ChannelInboundInvoker fireChannelUnregistered();
    
    // 當(dāng)Channel處理激活狀態(tài)嫁赏,意味著連接已經(jīng)建立
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelActive(ChannelHandlerContext)方法 
    ChannelInboundInvoker fireChannelActive();
    
    // 當(dāng)Channel處理失效狀態(tài)其掂,意味著連接已經(jīng)斷開
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelInactive(ChannelHandlerContext)方法 
    ChannelInboundInvoker fireChannelInactive();
    
    // 在pipeline中某個(gè)一個(gè)入站(inbound)操作出現(xiàn)了異常
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的exceptionCaught(ChannelHandlerContext)方法 
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    
    // 收到用戶自定義的事件
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的userEventTriggered(ChannelHandlerContext)方法
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    
    // Channel接收到了消息
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelRead(ChannelHandlerContext)方法
    ChannelInboundInvoker fireChannelRead(Object msg);
    
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
    ChannelInboundInvoker fireChannelReadComplete();
    
    // 調(diào)用ChannelPipeline中下一個(gè)ChannelInboundHandler的channelWritabilityChanged(ChannelHandlerContext)方法
    ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker接口

實(shí)現(xiàn)ChannelOutboundInvoker接口,意味著節(jié)點(diǎn)可以用來(lái)處理出站相關(guān)的事件潦蝇。

public interface ChannelOutboundInvoker {
    
    // 將Channel綁定到一個(gè)本地地址款熬,這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    // 將Channel連接到一個(gè)遠(yuǎn)程地址深寥,這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    // 將Channel斷開連接。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutbound- Handler的disconnect(ChannelHandlerContext, Channel Promise)方法
    ChannelFuture disconnect();
    ChannelFuture disconnect(ChannelPromise promise);

    // 將Channel關(guān)閉贤牛。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutbound- Handler的close(ChannelHandlerContext, ChannelPromise)方法
    ChannelFuture close();
    ChannelFuture close(ChannelPromise promise);

    // 將Channel從它先前所分配的EventExecutor(即EventLoop)中注銷翩迈。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的deregister (ChannelHandlerContext, ChannelPromise)方法
    ChannelFuture deregister();
    ChannelFuture deregister(ChannelPromise promise);
    
    // 請(qǐng)求從Channel中讀取更多的數(shù)據(jù)。這將調(diào)用ChannelPipeline中的下一個(gè)ChannelOutboundHandler的read(ChannelHandlerContext)方法
    ChannelOutboundInvoker read();
    
    // 將消息寫入Channel盔夜。這將調(diào)用ChannelPipeline中的下一個(gè)Channel- OutboundHandler的write(ChannelHandlerContext, Object msg, Channel- Promise)方法负饲。注意:這并不會(huì)將消息寫入底層的Socket,而只會(huì)將它放入隊(duì)列中喂链。要將它寫入Socket返十,需要調(diào)用flush()或者writeAndFlush()方法 
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    
   // 沖刷Channel所有掛起的寫入。這將調(diào)用ChannelPipeline中的下一個(gè)Channel- OutboundHandler的flush(ChannelHandlerContext)方法
    ChannelOutboundInvoker flush();
    
    // 這是一個(gè)先調(diào)用write()方法再接著調(diào)用flush()方法的便利方法
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);

    ChannelPromise newPromise();

    ChannelProgressivePromise newProgressivePromise();

    ChannelFuture newSucceededFuture();

    ChannelFuture newFailedFuture(Throwable cause);

    ChannelPromise voidPromise();
}

TailContext & HeadContext

接下來(lái)椭微,我們看看Pipeline中的頭部與尾部節(jié)點(diǎn)洞坑。

TailContext節(jié)點(diǎn)

TailContext是尾部節(jié)點(diǎn),inbound類型蝇率,主要處理Pipeline中數(shù)據(jù)流的收尾工作迟杂。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    
    TailContext(DefaultChannelPipeline pipeline) {
        // 調(diào)用AbstractChannelHandlerContext構(gòu)造器
        // TailContext是一個(gè)inbound(入站)節(jié)點(diǎn)
        super(pipeline, null, TAIL_NAME, true, false);
        // 設(shè)置添加完成
        setAddComplete();
    }
    
    // 返回Handler,就是它自身
    @Override
    public ChannelHandler handler() {
        return this;
    }
    
    ...
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }
    
    ...
}

// 如果pipeline中有異常沒做處理本慕,最終會(huì)由TailContext打贏一個(gè)警告日志
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
            "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
            "It usually means the last handler in the pipeline did not handle the exception.",
            cause);
    } finally {
        // 釋放對(duì)象
        ReferenceCountUtil.release(cause);
    }
}

// 如果pipeline中有read消息沒有處理排拷,最終會(huì)由TailContext打贏一個(gè)警告日志
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
            "Discarded inbound message {} that reached at the tail of the pipeline. " +
            "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

// 設(shè)置 ChannelHandlerContext 狀態(tài)為添加完成,狀態(tài)=2
final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

AbstractChannelHandlerContext

AbstractChannelHandlerContext 是 ChannelHandlerContext 的抽象實(shí)現(xiàn):

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
    
    ...
    
    // 下一個(gè)節(jié)點(diǎn)
    volatile AbstractChannelHandlerContext next;
    // 上一個(gè)節(jié)點(diǎn)
    volatile AbstractChannelHandlerContext prev;
    
    // 是否為inBound類型
    private final boolean inbound;
    // 是否為outbound類型
    private final boolean outbound;
    // 綁定的默認(rèn)pipeline
    private final DefaultChannelPipeline pipeline;
    // 節(jié)點(diǎn)名
    private final String name;
    private final boolean ordered;
    
    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    
    ...

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
        // 設(shè)置HandlerContext名稱
        this.name = ObjectUtil.checkNotNull(name, "name");
        // 綁定pipeline
        this.pipeline = pipeline;
        // 綁定executor(這里為null)
        this.executor = executor;
        // 如果節(jié)點(diǎn)為inbound類型就設(shè)置為true
        this.inbound = inbound;
        // 如果節(jié)點(diǎn)為outbound類型就設(shè)置為true
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    ...
    
}

DefaultChannelHandlerContext

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    
    private final ChannelHandler handler;
    
    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        // 調(diào)用 AbstractChannelHandlerContext 構(gòu)造函數(shù)
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    // 是否為inBound類型
    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    // 是否為outBound類型
    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

HeadContext

HeadContext是頭部節(jié)點(diǎn)锅尘,outbound類型监氢,用于傳播事件和進(jìn)行一些底層socket操作。

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;
    
    HeadContext(DefaultChannelPipeline pipeline) {
        // 調(diào)用AbstractChannelHandlerContext構(gòu)造器
        // HeadContext是一個(gè)outbound(出站)節(jié)點(diǎn)
        super(pipeline, null, HEAD_NAME, false, true);
        // 設(shè)置Unsafe對(duì)象
        unsafe = pipeline.channel().unsafe();
        // 設(shè)置添加完成
        setAddComplete();
    }
    
    // 返回ChannelHandler藤违,就只它自身
    @Override
    public ChannelHandler handler() {
        return this;
    }
    
    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
            throws Exception {
        // 調(diào)用 unsafe 進(jìn)行bind操作    
        unsafe.bind(localAddress, promise);
    }
    
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 connect 操作
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 disconnect 操作
        unsafe.disconnect(promise);
    }
    
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 close 操作
        unsafe.close(promise);
    }
    
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 deregister 操作
        unsafe.deregister(promise);
    }
    
    @Override
    public void read(ChannelHandlerContext ctx) {
      // 調(diào)用 unsafe 進(jìn)行 read 操作
        unsafe.beginRead();
    }
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 write 操作
        unsafe.write(msg, promise);
    }
    
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        // 調(diào)用 unsafe 進(jìn)行 flush 操作
        unsafe.flush();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 傳播ExceptionCaught事件
        ctx.fireExceptionCaught(cause);
    }
    
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        // 傳播channelRegistered事件
        ctx.fireChannelRegistered();
    }
    
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        // 傳播channelUnregistered事件
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 傳播 channelActive 事件
        ctx.fireChannelActive();
        // 在 https://wangwei.one/posts/netty-channel-source-analyse.html 中分析過(guò)了
        // 主要是在channel激活之后浪腐,向底層的selector注冊(cè)一個(gè)SelectionKey.OP_ACCEPT監(jiān)聽事件
        // 這樣channel在連接之后,就可以監(jiān)聽到一個(gè)read事件
        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 傳播 channelInactive 事件
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 傳播 channelRead 事件
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 傳播 channelReadComplete 事件
        ctx.fireChannelReadComplete();
        // 
        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // 傳播 userEventTriggered 事件
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        // 傳播 channelWritabilityChanged 事件
        ctx.fireChannelWritabilityChanged();
    }
}

Pipeline 節(jié)點(diǎn)添加

上面我們分析了Pipeline的基本結(jié)構(gòu)顿乒,接下來(lái)我們看看Pipeline添加節(jié)點(diǎn)(也就是Handler處理器)的過(guò)程议街。該過(guò)程主要分為三步:

  • 判斷是否重復(fù)添加
  • 創(chuàng)建節(jié)點(diǎn)并添加至鏈表
  • 回調(diào)添加完成事件

以這段常見的代碼為例:

ServerBootstrap b = new ServerBootstrap();
b.group(group)
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(port))
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                // 添加 serverHandler
                ch.pipeline().addLast(serverHandler);
            }
        });
ChannelFuture f = b.bind().sync();

我們從 ChannelPipeline.addLast() 方法進(jìn)去:

public class DefaultChannelPipeline implements ChannelPipeline {
    
    ...
    
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        // 循環(huán)處理
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查是否重復(fù)
            checkMultiplicity(handler);
            // 創(chuàng)建新節(jié)點(diǎn)
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加新節(jié)點(diǎn)
            addLast0(newCtx);
            
            // 如果 registered 為 false,則表示這個(gè)channel還未注冊(cè)到EventLoop上.
            // 在這種情況下,我們添加一個(gè)Task到PendingHandlerCallback中璧榄,
            // 等到這個(gè)channel注冊(cè)成功之后特漩,將會(huì)調(diào)用立即調(diào)用 ChannelHandler.handlerAdded(...) 方法,已達(dá)到channel添加的目的
            if (!registered) {
                // 設(shè)置為待添加狀態(tài)
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            
            // 獲取executor
            EventExecutor executor = newCtx.executor();

            if (!executor.inEventLoop()) {
                // 設(shè)置為待添加狀態(tài)
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 回調(diào)添加完成事件
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        // 回調(diào)添加完成事件
        callHandlerAdded0(newCtx);
        return this;
    }

    // 檢查是否重復(fù)
    private static void checkMultiplicity(ChannelHandler handler) {
        // handler是否為ChannelHandlerAdapter類型犹菱,不是則不做處理
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // 判斷handler是否添加了Sharable注解 && 是否添加過(guò)了
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    // 創(chuàng)建新的節(jié)點(diǎn)
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        // 調(diào)用DefaultChannelHandlerContext的構(gòu)造函數(shù)
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    // 在tail節(jié)點(diǎn)之前添加新節(jié)點(diǎn)
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    
    // 回調(diào)ChannelHandler中的handlerAdded方法
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // 我們必須在handlerAdded方法之前調(diào)用setAddComplete方法拾稳。否則的話吮炕,一旦handlerAdded方法產(chǎn)生了任何pipeline事件腊脱,由于狀態(tài)的緣故,ctx.handler()將會(huì)丟失這些事件的處理龙亲。
            // 設(shè)置新節(jié)點(diǎn)的狀態(tài)為添加完成狀態(tài)
            ctx.setAddComplete();
            // 調(diào)用handlerAdded接口
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            ...
            // 如果添加失敗陕凹,則刪除新節(jié)點(diǎn)    
            remove0(ctx);
            ...
        }
    }
    
    ...

}

我們來(lái)看下setAddComplete()方法:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
     
    ...
    
    // 通過(guò)自旋操作悍抑,設(shè)置狀態(tài)為ADD_COMPLETE
    final void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
            // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
            // exposing ordering guarantees.
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }
    
    ...
    
    // 設(shè)置為 ADD_PENDING 狀態(tài)
    final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        assert updated; 
        // This should always be true as it MUST be called before setAddComplete() or setRemoved().
    }
    
    ...
}

回調(diào)用戶自定義Handler中的handlerAdded方法:

@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
    
    ...
       
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.printf("ServerHandler added ....");
    }
 
    ...
    
}    

ChannelInitializer

關(guān)于回調(diào)ChannelHandler中的handlerAdded()方法,最常見的一個(gè)場(chǎng)景就是杜耙,使用 ChannelInitializer 來(lái)添加我們自定義的ChannelHandler搜骡。ChannelInitializer被添加完成之后,會(huì)回調(diào)到它的 initChannel 方法佑女。

接下來(lái)记靡,我們看看 ChannelInitializer 這個(gè)類,它是一個(gè)特殊的ChannelInboundHandler团驱,它提供了一種在Channel注冊(cè)到EventLoop后初始化Channel的簡(jiǎn)便方法摸吠。

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

   private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
   
   /**
    * 當(dāng) ch 注冊(cè)成功之后,該方法就會(huì)被調(diào)用嚎花,該方法結(jié)束返回之后寸痢,此ChannelInitializer實(shí)例將會(huì)從Channel所綁定的ChannelPipeline中移除
    * 
    * @param ch 所注冊(cè)的Channel
    * 
    */
   protected abstract void initChannel(C ch) throws Exception;
   
   ...
   
   // ChannelInitializer 添加成功后,會(huì)回調(diào)到handlerAdded()接口
   @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            initChannel(ctx);
        }
    }  
    
    @SuppressWarnings("unchecked")
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        // 標(biāo)記ctx為true,且之前沒有標(biāo)記過(guò)紊选。防止重復(fù)執(zhí)行
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                // 調(diào)用initChannel方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                // 最終會(huì)刪除 ChannelInitializer 實(shí)例
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    // 刪除 ChannelInitializer 實(shí)例
    private void remove(ChannelHandlerContext ctx) {
        try {
            // 獲取 Pipeline
            ChannelPipeline pipeline = ctx.pipeline();
            // 從 Pipeline 中返回 ChannelInitializer 實(shí)例
            if (pipeline.context(this) != null) {
                // 刪除 ChannelInitializer 實(shí)例
                // 刪除邏輯請(qǐng)看下一小節(jié)
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }

}

遍歷 ChannelHandlerContext 節(jié)點(diǎn)查詢出ChannelHandler實(shí)例

public class DefaultChannelPipeline implements ChannelPipeline {

    ...
    
    // 通過(guò)handler獲取ChannelHandlerContext
    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
                
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }
    
    ...
    
}    

Pipeline中除了addLast方法外啼止, 還有addFirst、addBefore兵罢、addAfter等方法献烦,邏輯類似,可以自行研究學(xué)習(xí)卖词。

Pipeline 節(jié)點(diǎn)刪除

上面仿荆,我們講了Pipeline節(jié)點(diǎn)的添加,這小結(jié)我們看看Pipeline節(jié)點(diǎn)的刪除功能坏平。

netty 有個(gè)最大的特性之一就是Handler可插拔拢操,做到動(dòng)態(tài)編織pipeline,比如在首次建立連接的時(shí)候舶替,需要通過(guò)進(jìn)行權(quán)限認(rèn)證令境,在認(rèn)證通過(guò)之后,就可以將此context移除顾瞪,下次pipeline在傳播事件的時(shí)候就就不會(huì)調(diào)用到權(quán)限認(rèn)證處理器舔庶。

下面是權(quán)限認(rèn)證Handler最簡(jiǎn)單的實(shí)現(xiàn),第一個(gè)數(shù)據(jù)包傳來(lái)的是認(rèn)證信息陈醒,如果校驗(yàn)通過(guò)惕橙,就刪除此Handler,否則钉跷,直接關(guān)閉連接

// 鑒權(quán)Handler
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    ...
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("AuthHandler has been removed ! ");
    }
}

我們來(lái)看看 DefaultChannelPipeline 中的 remove 方法:

public class DefaultChannelPipeline implements ChannelPipeline {
    
    ...
    
    // 從Pipeline中刪除ChannelHandler
    @Override
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }
    
    ...
    
    // 獲取 ChannelHandler 弥鹦,獲取不到就拋出異常
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    
    ...
    
    // 刪除
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        // ctx不能為heand與tail
        assert ctx != head && ctx != tail;

        synchronized (this) {
            // 從pipeline中刪除ChannelHandlerContext節(jié)點(diǎn)
            remove0(ctx);
            
            // 如果為false,則表明channel還沒有注冊(cè)到eventloop上
            // 在刪除這種場(chǎng)景下,我們先添加一個(gè)Task彬坏,一旦channel注冊(cè)成功就會(huì)調(diào)用這個(gè)Task朦促,這個(gè)Task就會(huì)立即調(diào)用ChannelHandler.handlerRemoved(...)方法,來(lái)從pipeline中刪除context栓始。
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 回調(diào) handlerRemoved 方法
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        // 回調(diào) handlerRemoved 方法
        callHandlerRemoved0(ctx);
        return ctx;
    }
    
    ...
 
    // 刪除節(jié)點(diǎn) ChannelHandlerContext
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
    
    ...
        
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            try {
                // 回調(diào) handlerRemoved 方法
                // 也就是我們前面例子 AuthHandler 中的 handlerRemoved() 方法
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // 設(shè)置為ctx 狀態(tài)為 REMOVE_COMPLETE 
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
    
    ...
    
}    

好了务冕, 刪除的邏輯就分析到這里了。

小結(jié)

這一講我們分析了Pipeline的創(chuàng)建過(guò)程幻赚,了解Pipeline中的鏈表結(jié)構(gòu)以及每個(gè)節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)禀忆。還分析了Pipeline是如何添加節(jié)點(diǎn)的,又是如何刪除節(jié)點(diǎn)的落恼。接下來(lái) 油湖,我們會(huì)分析Pipeline如何進(jìn)行事件傳播的。

參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末领跛,一起剝皮案震驚了整個(gè)濱河市乏德,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吠昭,老刑警劉巖喊括,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異矢棚,居然都是意外死亡郑什,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門蒲肋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蘑拯,“玉大人,你說(shuō)我怎么就攤上這事兜粘∩昃剑” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵孔轴,是天一觀的道長(zhǎng)剃法。 經(jīng)常有香客問(wèn)我,道長(zhǎng)路鹰,這世上最難降的妖魔是什么贷洲? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮晋柱,結(jié)果婚禮上优构,老公的妹妹穿的比我還像新娘。我一直安慰自己雁竞,他們只是感情好钦椭,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般玉凯。 火紅的嫁衣襯著肌膚如雪势腮。 梳的紋絲不亂的頭發(fā)上联贩,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天漫仆,我揣著相機(jī)與錄音,去河邊找鬼泪幌。 笑死盲厌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的祸泪。 我是一名探鬼主播吗浩,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼没隘!你這毒婦竟也來(lái)了懂扼?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤右蒲,失蹤者是張志新(化名)和其女友劉穎阀湿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瑰妄,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡陷嘴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了间坐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灾挨。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖竹宋,靈堂內(nèi)的尸體忽然破棺而出劳澄,到底是詐尸還是另有隱情,我是刑警寧澤蜈七,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布浴骂,位于F島的核電站,受9級(jí)特大地震影響宪潮,放射性物質(zhì)發(fā)生泄漏溯警。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一狡相、第九天 我趴在偏房一處隱蔽的房頂上張望梯轻。 院中可真熱鬧,春花似錦尽棕、人聲如沸喳挑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)伊诵。三九已至单绑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間曹宴,已是汗流浹背搂橙。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留笛坦,地道東北人区转。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像版扩,于是被迫代替她去往敵國(guó)和親废离。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容