Netty源碼筆記之ChannelPipeline

ChannelPipeline是什么

ChannelPipelineChannelHandler的列表贷揽,用于攔截或處理Channel的入站事件和出站事件操作袜硫。ChannelPipeline實(shí)現(xiàn)了攔截過濾器模式衍慎,讓用戶完全控制如何處理事件以及管道中的ChannelHandler如何相互交互。

每個(gè)Channel都有各自的ChannelPipeline纱兑,并且一個(gè)新的Channel被創(chuàng)建時(shí)該ChannelPipeline同時(shí)也會(huì)被創(chuàng)建因俐。

事件是如何在Pipeline中流動(dòng)的

下圖描述了ChannelHandlerChannelPipeline中如何處理I/O事件榴嗅。I/O事件由ChannelInboundHandlerChannelOutboundHandler處理妄呕,并通過調(diào)用ChannelHandlerContext中定義的事件傳播方法(例如ChannelHandlerContext#fireChannelRead(Object)ChannelHandlerContext#write(Object)中定義的事件傳播方法轉(zhuǎn)發(fā)給最近的處理程序。

ChannelPipeline事件流動(dòng)

Channel與ChannelPipeline的關(guān)系

我們已經(jīng)知道當(dāng)在創(chuàng)建Channel時(shí)并會(huì)自動(dòng)創(chuàng)建對應(yīng)的ChannelPipeline嗽测,所以它們就是一對一的關(guān)系绪励,如下圖所示:

Channel與Pipeline

在創(chuàng)建Channel時(shí)會(huì)創(chuàng)建對應(yīng)的ChannelPipeline,而在創(chuàng)建ChannelPipeline時(shí)又會(huì)創(chuàng)建對應(yīng)的TailContextHeadContext,然后將其構(gòu)造成雙向鏈表的節(jié)點(diǎn)唠粥。

接下來我們來具體分析下ChannelPipeline的實(shí)現(xiàn)過程疏魏。

ChannelPipeline的實(shí)現(xiàn)

在前面分析BootstrapServerBootstrap的章節(jié)中我們知道在AbstractChannel構(gòu)造方法中會(huì)實(shí)例化ChannelPipeline,所以下面我們一步一步來進(jìn)行分析晤愧。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    //實(shí)例化ChannelPipeline的操作
    pipeline = newChannelPipeline();
}

這里我們只需關(guān)注newChannelPipeline方法即可大莫,如下:

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

該方法直接將當(dāng)前Channel對象傳遞給構(gòu)造函數(shù)并實(shí)例化了DefaultChannelPipeline對象,我們先來看看該類的一個(gè)類結(jié)構(gòu)圖:

DefaultChannelPipeline類圖

我們來分析下DefaultChannelPipeline對應(yīng)的父類接口ChannelInboundInvoker官份,該接口主要定義了入站事件方法只厘,如下所示:

 /** 
 * 入站事件傳播方法:
 * ChannelHandlerContext#fireChannelRegistered()
 * ChannelHandlerContext#fireChannelActive()
 * ChannelHandlerContext#fireChannelRead(Object)
 * ChannelHandlerContext#fireChannelReadComplete()
 * ChannelHandlerContext#fireExceptionCaught(Throwable)
 * ChannelHandlerContext#fireUserEventTriggered(Object)
 * ChannelHandlerContext#fireChannelWritabilityChanged()
 * ChannelHandlerContext#fireChannelInactive()
 * ChannelHandlerContext#fireChannelUnregistered()
 */
     /**
     * Channel已注冊到EventLoop
     */
    ChannelInboundInvoker fireChannelRegistered();

    /**
     * Channel已從其EventLoop中取消注冊
     */
    ChannelInboundInvoker fireChannelUnregistered();

    /**
     * Channel當(dāng)前處于活動(dòng)狀態(tài),表示已連接
     */
    ChannelInboundInvoker fireChannelActive();

    /**
     * Channel當(dāng)前處于非活動(dòng)狀態(tài)舅巷,表示連接已關(guān)閉
     */
    ChannelInboundInvoker fireChannelInactive();

    /**
     * Channel在入站操作中收到了異常
     */
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);

    /**
     * Channel接收到用戶自定義事件
     */
    ChannelInboundInvoker fireUserEventTriggered(Object event);

    /**
     * Channel接收到一條消息
     */
    ChannelInboundInvoker fireChannelRead(Object msg);

    /**
     * 觸發(fā)ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件到ChannelPipeline中的下一個(gè)ChannelInboundHandler事件
     */
    ChannelInboundInvoker fireChannelReadComplete();

    /**
     * 觸發(fā)ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)事件到ChannelPipeline中的下一個(gè)ChannelInboundHandler
     */
    ChannelInboundInvoker fireChannelWritabilityChanged();

父類接口ChannelOutboundInvoker主要定義了出站事件方法羔味,如下所示:

/**
 * 出站事件傳播方法:
 * ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
 * ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
 * ChannelHandlerContext#write(Object, ChannelPromise)
 * ChannelHandlerContext#flush()
 * ChannelHandlerContext#read()
 * ChannelHandlerContext#disconnect(ChannelPromise)
 * ChannelHandlerContext#close(ChannelPromise)
 * ChannelHandlerContext#deregister(ChannelPromise)
 */
    /**
     * 請求綁定給定的SocketAddress一旦操作完成并通知ChannelFuture,要么操作成功或者錯(cuò)誤
     */
    ChannelFuture bind(SocketAddress localAddress);

    /**
     * 請求連接給定的SocketAddress并在操作完成后通知ChannelFuture钠右,要么操作成功或者錯(cuò)誤
     */
    ChannelFuture connect(SocketAddress remoteAddress);

    /**
     * 請求連接給定的SocketAddress赋元,同時(shí)綁定到localAddress并在操作完成后通知ChannelFuture,要么操作成功或者錯(cuò)誤
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * 請求與遠(yuǎn)程節(jié)點(diǎn)斷開連接并在操作完成后通知ChannelFuture(要么操作成功要么因?yàn)殄e(cuò)誤)
     */
    ChannelFuture disconnect();

    /**
     * 請求關(guān)閉Channel并在操作完成后通知ChannelFuture,關(guān)閉后不能再重復(fù)使用
     */
    ChannelFuture close();

    /**
     * 請求從先前分配的EventExecutor中注銷,并在操作完成后通知ChannelFuture
     */
    ChannelFuture deregister();

    /**
     * 請求綁定給定的SocketAddress并在操作完成后通知給定的ChannelPromise
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);

    /**
     * 請求連接給定的SocketAddress并在操作完成后通知給定的ChannelPromise
     */
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);

    /**
     * 請求連接給定的SocketAddress同時(shí)綁定到localAddress搁凸,并在操作完成后通知ChannelPromise
     */
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
     * 請求斷開遠(yuǎn)程節(jié)點(diǎn)的連接并在操作完成后通知給定的ChannelPromise
     */
    ChannelFuture disconnect(ChannelPromise promise);

    /**
     * 請求關(guān)閉Channel并在操作完成后通知給定的ChannelPromise,該Channel關(guān)閉后不可復(fù)用
     */
    ChannelFuture close(ChannelPromise promise);

    /**
     * 請求從先前分配的EventExecutor中注銷媚值,并在操作完成后通知給定的ChannelPromise
     */
    ChannelFuture deregister(ChannelPromise promise);

    /**
     * 請求從Channel讀取數(shù)據(jù)到第一個(gè)入站緩沖區(qū),如果讀取了數(shù)據(jù)护糖,則觸發(fā)ChannelInboundHandler#channelRead(ChannelHandlerContext,Object)事件杂腰,并觸發(fā)ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件,以便處理程序可以決定是否繼續(xù)讀取椅文。如果已經(jīng)有一個(gè)待處理的讀取操作,則此方法不執(zhí)行任何操作惜颇。
     */
    ChannelOutboundInvoker read();

    /**
     * 通過ChannelPipeline請求通過ChannelHandler寫入消息皆刺。該方法不會(huì)執(zhí)行實(shí)際寫出消息,所以要確保執(zhí)行#flush()方法將所有未寫出的消息刷新到實(shí)際傳輸中凌摄。
     */
    ChannelFuture write(Object msg);

    /**
     * 通過ChannelPipeline請求通過ChannelHandlerContext寫出消息羡蛾。該方法不會(huì)執(zhí)行實(shí)際寫出消息,所以要確保執(zhí)行#flush()方法將所有未寫出的消息刷新到實(shí)際傳輸中锨亏。
     */
    ChannelFuture write(Object msg, ChannelPromise promise);

    /**
     * 請求通過ChannelOutboundInvoker刷新所有等待的消息
     */
    ChannelOutboundInvoker flush();

    /**
     * 簡化調(diào)用##write(Object, ChannelPromise)和#flush()
     */
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);

    /**
     * 簡化調(diào)用#write(Object)和#flush()
     */
    ChannelFuture writeAndFlush(Object msg);

    /**
     * 返回一個(gè)新的ChannelPromise
     */
    ChannelPromise newPromise();

    /**
 
     * 返回一個(gè)新的ChannelProgressPromise
     */
    ChannelProgressivePromise newProgressivePromise();

    /**
     * 創(chuàng)建一個(gè)新的ChannelFuture并將其標(biāo)記為已成功痴怨。因此ChannelFuture#isSuccess()返回為true。添加到其中的所有FutureListener將直接受到通知器予。通用每次調(diào)用阻塞方法都將返回而不會(huì)阻塞
     */
    ChannelFuture newSucceededFuture();

    /**
     * 創(chuàng)建一個(gè)新的ChannelFuture并將其標(biāo)記為已失敗浪藻。因此ChannelFuture#isSuccess()返回為false。添加到其中的所有FutureListener將直接受到通知乾翔。通用每次調(diào)用阻塞方法都將返回而不會(huì)阻塞
     */
    ChannelFuture newFailedFuture(Throwable cause);

    /**
     * 返回一個(gè)特殊的ChannelPromise爱葵,可以將其重復(fù)用于不同的操作
     */
    ChannelPromise voidPromise();

ChannelPipeline接口主要定義了一些ChannelHandler的一些操作,如下所示:

public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    ChannelPipeline addFirst(String name, ChannelHandler handler);

    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addLast(String name, ChannelHandler handler);

    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

 
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addFirst(ChannelHandler... handlers);

    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline addLast(ChannelHandler... handlers);

    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline remove(ChannelHandler handler);

    ChannelHandler remove(String name);

    <T extends ChannelHandler> T remove(Class<T> handlerType);

    ChannelHandler removeFirst();

    ChannelHandler removeLast();

    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);

    ChannelHandler first();

    ChannelHandlerContext firstContext();
    
    ChannelHandler last();

    ChannelHandlerContext lastContext();

    ChannelHandler get(String name);

    <T extends ChannelHandler> T get(Class<T> handlerType);

    ChannelHandlerContext context(ChannelHandler handler);

    ChannelHandlerContext context(String name);

    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

    Channel channel();

    List<String> names();

    Map<String, ChannelHandler> toMap();
}

ChannelPipeline接口主要定義了一些常用添加ChannelHandler的操作反浓,這里就不過講解了萌丈。

講完了ChannelPipeline的繼承接口之后,我們來看看具體實(shí)現(xiàn)DefaultChannelPipeline的實(shí)現(xiàn)雷则。

DefaultChannelPipeline的構(gòu)造方法如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

這里先將Channel進(jìn)行保存辆雾,然后分別實(shí)例化了TailContextHeadContext,因?yàn)檫@2個(gè)比較重要,所以來研究下其具體實(shí)現(xiàn)月劈,這里先來看看TailContext的類圖度迂。

TailContext

由上圖可知,TailContext繼承了AbstractChannelHandlerContext并且實(shí)現(xiàn)了ChannelInboundHandlerHandler接口艺栈,所以本質(zhì)上TailContext既是一個(gè)標(biāo)準(zhǔn)的Handler也是一個(gè)HandlerContext英岭, 接下來看下其源碼:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, TailContext.class);
        setAddComplete();
    }
}

這里構(gòu)造方法中調(diào)用了父類AbstractChannelHandlerContext的構(gòu)造方法,進(jìn)來看看:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

因?yàn)檫@里都是賦值操作湿右,我們來看看#mask()方法是如何計(jì)算得出executionMask的诅妹。

這里說明一下executionMask是干什么的,早在之前Netty的版本中是用instanceof來判斷是inbound事件還是outbound事件,這樣判斷的結(jié)果是比較暴力的吭狡,因?yàn)槲覀內(nèi)绻x了一個(gè)inbound處理程序尖殃,但是我這個(gè)程序只希望處理我想處理的事件,因?yàn)槭褂?code>instanceof判斷的方式使我們不得不處理我們不想處理的事件划煮,所以在一定的程度上加大了耦合程度送丰,在新版Netty中采用了位運(yùn)算來判斷,并且粒度更細(xì)(方法級別)弛秋,使得我們不必關(guān)系我們不關(guān)系的事件器躏,使得Handler更加靈活。

接下來我們看下ChannelHandlerMask類定義的粒度:

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;

static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
        MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
        MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
        MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

因?yàn)閑xecutionMask是通過mask方法計(jì)算得出蟹略,這里跟進(jìn)mask方法:

static int mask(Class<? extends ChannelHandler> clazz) {
    // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
    // lookup in the future.
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    if (mask == null) {
        mask = mask0(clazz);
        cache.put(clazz, mask);
    }
    return mask;
}

mask方法中做了一層緩存處理登失,這里我們直接跟進(jìn)mask0()方法是如何計(jì)算得出的:

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;

            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_USER_EVENT_TRIGGERED;
            }
        }

        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;

            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                mask &= ~MASK_READ;
            }
            if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                    Object.class, ChannelPromise.class)) {
                mask &= ~MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                mask &= ~MASK_FLUSH;
            }
        }

        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }

    return mask;
}

我們先來看下isSkippable方法的實(shí)現(xiàn),最后再來說明該方法:

private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
    return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
        @Override
        public Boolean run() throws Exception {
            Method m;
            try {
                m = handlerType.getMethod(methodName, paramTypes);
            } catch (NoSuchMethodException e) {
                return false;
            }
            return m != null && m.isAnnotationPresent(Skip.class);
        }
    });
}

該方法用于判斷給定的Class對象是否有給定的方法或者是否有對應(yīng)Skip注解挖炬,如果有則返回true,否則返回false揽浙。

再回到mask0()方法,該方法中意敛,首先mask=MASK_EXCEPTION_CAUGHT馅巷,然后判斷給定的Handler是屬于ChannelInboundHandler還是ChannelOutboundHandler,進(jìn)而再判斷對應(yīng)的事件方法草姻,如下整理了其大致結(jié)構(gòu):

InboundHandler事件:

  • MASK_EXCEPTION_CAUGHT
  • MASK_CHANNEL_REGISTERED
  • MASK_CHANNEL_UNREGISTERED
  • MASK_CHANNEL_ACTIVE
  • MASK_CHANNEL_INACTIVE
  • MASK_CHANNEL_READ
  • MASK_CHANNEL_READ_COMPLETE
  • MASK_CHANNEL_WRITABILITY_CHANGED
  • MASK_USER_EVENT_TRIGGERED

OutboundHandler事件:

  • MASK_EXCEPTION_CAUGHT
  • MASK_BIND
  • MASK_CONNECT
  • MASK_DISCONNECT
  • MASK_CLOSE
  • MASK_DEREGISTER
  • MASK_READ
  • MASK_WRITE
  • MASK_FLUSH

也就是如果給定的Handler類型為inbound則該mask默認(rèn)處理所有對應(yīng)的inbound事件钓猬,然后通過isSkippable方法來判斷該handler是否有處理該事件的方法或者該方法是否有@Skip注解,如果存在該條件則從該mask中移除該事件撩独,表示后面ChannelPipeline通過位運(yùn)算查找對應(yīng)事件處理的Handler時(shí)逗噩,該Handler默認(rèn)會(huì)將其過濾掉,自然也不會(huì)觸發(fā)該事件了跌榔。

TailContext中异雁,因?yàn)閷?shí)現(xiàn)了ChannelInboundHandler,執(zhí)行結(jié)果如下,所以最終計(jì)算得出的executionMask為511僧须。

int mask = MASK_EXCEPTION_CAUGHT; // mask = 1
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
    mask |= MASK_ALL_INBOUND; // mask=511
}
//bit
// 1 |= 511
// 0000 0000 0001
//|0001 1111 1111
//---------------
// 0001 1111 1111
//mask = 511     

我們再回到TailContext的構(gòu)造函數(shù)中纲刀,接下來執(zhí)行了一個(gè)setAddComplete()方法,如下所示:

final boolean setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE) {
            return false;
        }
        // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
        // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
        // exposing ordering guarantees.
        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return true;
        }
    }
}

該方法采用CAS的方式改變Handler狀態(tài)為已添加完成担平。

到此為止示绊,TailContext的工作到這里就完成了,接下里我們看看它的兄弟HeadContext暂论。

HeadContext

HeadContextTailContext的不同之處在于該對象同時(shí)實(shí)現(xiàn)了ChannelInboundHandlerChannelOutboundHandler面褐,同時(shí)也繼承了AbstractHandlerContext,一起來看下類結(jié)構(gòu)圖取胎。

HeadContext

接下來看看其代碼實(shí)現(xiàn):

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
}

該對象與TailContext大致相同展哭,所以就不具體講解了湃窍,不同之處該對象引用了一個(gè)Unsafe的引用。因?yàn)?code>HeadContext同時(shí)實(shí)現(xiàn)了ChannelInboundHandlerChannelOutboundHandler匪傍,所以其executionMask=131071您市。

我們再回到DefaultChannelPipeline構(gòu)造函數(shù)中,如下:

protected DefaultChannelPipeline(Channel channel) {
    //...
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

在完成了TailContextHeadContext實(shí)例化后役衡,此時(shí)tail和head就分別指向了不同的AbstractHandlerContext茵休,最終通過head.next=tail和tail.prev = head進(jìn)行鏈表關(guān)聯(lián),如下所示手蝎。

節(jié)點(diǎn)關(guān)聯(lián)

ChannelHandler是如何添加的

當(dāng)我們要添加一個(gè)Handler的時(shí)候榕莺,內(nèi)部是如何實(shí)現(xiàn)的呢,接下來使用addLast方法來分析其實(shí)現(xiàn)棵介。

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

當(dāng)我們調(diào)用ChannelPipeline添加一個(gè)Handler時(shí)帽撑,我們假設(shè)調(diào)用的是addLast方法,首先會(huì)調(diào)用checkMultiplicity方法來檢測是否重復(fù)添加鞍时,如下所示:

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

該方法判斷Handler是否重復(fù)添加,判斷條件為Handler的added字段是否為true扣蜻,或者該Handler是否被標(biāo)記為@Sharable重復(fù)使用逆巍,否則直接拋出錯(cuò)誤,重復(fù)添加Handler莽使,否則該Handler為首次添加锐极,將added字段設(shè)置為true,表示已經(jīng)添加芳肌,防止重復(fù)添加的判斷條件灵再。

判斷重復(fù)添加工作后,接下來調(diào)用了newContext方法實(shí)例化了一個(gè)AbstractChannelHandlerContext對象亿笤,代碼如下:

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

該方法直接實(shí)例化了一個(gè)DefaultChannelHandlerContext對象翎迁,一起來看看該類的實(shí)現(xiàn):

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}

該對象和TailContextHeadContext的實(shí)現(xiàn)基本上差不多净薛,前文有提汪榔,這里就不過多深入講解了。

執(zhí)行完newContext()方法后肃拜,接下來就執(zhí)行了addLast0()方法了痴腌,如下所示:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

可以看到的是,這是將節(jié)點(diǎn)進(jìn)行鏈接到對應(yīng)的鏈表節(jié)點(diǎn)上燃领,執(zhí)行完的鏈表結(jié)果如下所示:

鏈表節(jié)點(diǎn)

到此為止士聪,一個(gè)完整的Handler添加過程就完成了,當(dāng)然還有一些其它操作過程這里就不細(xì)講了猛蔽,有興趣可以自行了解剥悟。

總結(jié)

通過源碼分析,我們可以知道ChannelPipeline就像是一個(gè)大管家,管理著多個(gè)HandlerHandlerContext的關(guān)聯(lián)懦胞,任何入站和出站的事件通過位運(yùn)算找到對應(yīng)需要處理的Handler替久,然后根據(jù)規(guī)則流經(jīng)不同的Handler事件處理方法。當(dāng)一個(gè)入站事件觸發(fā)的時(shí)候會(huì)從head節(jié)點(diǎn)躏尉,依次找到合適節(jié)點(diǎn)進(jìn)行處理蚯根,出站事件則會(huì)從tail節(jié)點(diǎn)開始依次找到合適的節(jié)點(diǎn)進(jìn)行處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末胀糜,一起剝皮案震驚了整個(gè)濱河市颅拦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌距帅,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件括堤,死亡現(xiàn)場離奇詭異碌秸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)悄窃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門讥电,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人轧抗,你說我怎么就攤上這事恩敌。” “怎么了横媚?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵纠炮,是天一觀的道長。 經(jīng)常有香客問我灯蝴,道長恢口,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任穷躁,我火速辦了婚禮弧蝇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘折砸。我一直安慰自己看疗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布睦授。 她就那樣靜靜地躺著两芳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪去枷。 梳的紋絲不亂的頭發(fā)上怖辆,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天是复,我揣著相機(jī)與錄音,去河邊找鬼竖螃。 笑死淑廊,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的特咆。 我是一名探鬼主播季惩,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼腻格!你這毒婦竟也來了画拾?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤菜职,失蹤者是張志新(化名)和其女友劉穎青抛,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酬核,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蜜另,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嫡意。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片举瑰。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖鹅很,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情罪帖,我是刑警寧澤促煮,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站整袁,受9級特大地震影響菠齿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜坐昙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一绳匀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧炸客,春花似錦疾棵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至开仰,卻和暖如春拟枚,著一層夾襖步出監(jiān)牢的瞬間薪铜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工恩溅, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留隔箍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓脚乡,卻偏偏與公主長得像蜒滩,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子每窖,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344