netty之Pipeline

在netty中忘分,每個channel都有僅有一個ChannelPipeline與之對應档押,ChannelPipeline中又維護了一個由ChannelHandlerContext 組成的雙向鏈表,這個鏈表的頭是 HeadContext, 鏈表的尾是 TailContext, 并且每個 ChannelHandlerContext 中關聯著一個 ChannelHandler.


image.png

我們來回顧下Channel的初識化過程:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

AbstractChannel 有一個 pipeline 字段, 在構造器中會初始化它為 DefaultChannelPipeline的實例. 也即每個 Channel 都有一個 ChannelPipeline.

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

在 DefaultChannelPipeline 構造器中, 首先將與之關聯的 Channel 保存到字段 channel 中, 然后實例化兩個 ChannelHandlerContext, 一個是 HeadContext 實例 head, 另一個是 TailContext 實例 tail. 接著將 head 和 tail 互相指向, 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 為節(jié)點的雙向鏈表, 這個鏈表是 Netty 實現 Pipeline 機制的關鍵.

image.png
image.png

從類層次結構圖中可以很清楚地看到, head 實現了 ChannelInboundHandler, 而 tail 實現了 ChannelOutboundHandler 接口, 并且它們都實現了 ChannelHandlerContext 接口, 因此可以說 head 和 tail 即是一個 ChannelHandler, 又是一個 ChannelHandlerContext.

接著看一下 HeadContext 的構造器:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = false, outbound = true.
TailContext 的構造器與 HeadContext 的相反, 它調用了父類 AbstractChannelHandlerContext 的構造器, 并傳入參數 inbound = true, outbound = false.

ChannelInitializer 的添加

我們知道最開始的時候 ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 但是這個 Pipeline并不能實現什么特殊的功能, 因為我們還沒有給它添加自定義的 ChannelHandler.
通常來說, 我們在初始化 Bootstrap, 會添加我們自定義的 ChannelHandler, 就以我們熟悉的 EchoClient 來舉例吧:

Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

在調用 handler 時, 傳入了 ChannelInitializer 對象, 它提供了一個 initChannel 方法供我們初始化 ChannelHandler.
ChannelInitializer 實現了 ChannelHandler, 那么它是在什么時候添加到 ChannelPipeline 中的呢? 它是在 Bootstrap.init 方法中添加到 ChannelPipeline 中的.

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
    ...
}

上面的代碼將 handler() 返回的 ChannelHandler 添加到 Pipeline 中, 而 handler() 返回的是handler 其實就是我們在初始化 Bootstrap 調用 handler 設置的 ChannelInitializer 實例,因此這里就是將 ChannelInitializer 插入到了 Pipeline 的末端.

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 檢查此 handler 是否有重復的名字

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

上面的 addLast 方法中, 首先檢查這個 ChannelHandler 的名字是否是重復的, 如果不重復的話, 則為這個 Handler 創(chuàng)建一個對應的 DefaultChannelHandlerContext 實例, 并與之關聯起來(Context 中有一個 handler 屬性保存著對應的 Handler 實例). 判斷此 Handler 是否重名的方法很簡單: Netty 中有一個 name2ctx Map 字段, key 是 handler 的名字, 而 value 則是 handler 本身. 因此通過如下代碼就可以判斷一個 handler 是否重名了:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

為了添加一個 handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 因此在上面的代碼中我們可以看到新實例化了一個 newCtx 對象, 并將 handler 作為參數傳遞到構造方法中.

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

DefaultChannelHandlerContext 的構造器中, 調用了兩個很有意思的方法: isInbound 與 isOutbound, 這兩個方法是做什么的呢?

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

可以看到當一個 handler 實現了 ChannelInboundHandler 接口, 則 isInbound 返回真; 相似地, 當一個 handler 實現了 ChannelOutboundHandler 接口, 則 isOutbound 就返回真. 而這兩個 boolean 變量會傳遞到父類 AbstractChannelHandlerContext 中, 并初始化父類的兩個字段: inbound 與 outbound.
那么這里的 ChannelInitializer 所對應的 DefaultChannelHandlerContext 的 inbound 與 inbound 字段分別是什么呢? 那就看一下 ChannelInitializer 到底實現了哪個接口不就行了?


image.png

可以清楚地看到, ChannelInitializer 僅僅實現了 ChannelInboundHandler 接口, 因此這里實例化的 DefaultChannelHandlerContext 的 inbound = true, outbound = false. 其實這兩個字段關系到 pipeline 的事件的流向與分類塌西,這個我們后面再詳細分析脖阵。

當創(chuàng)建好 Context 后, 就將這個 Context 插入到 Pipeline 的雙向鏈表中:

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

自定義 ChannelHandler 的添加過程

在上面, 我們已經分析了一個 ChannelInitializer 如何插入到 Pipeline 中的, 接下來就來探討一下 ChannelInitializer 在哪里被調用, ChannelInitializer 的作用, 以及我們自定義的 ChannelHandler 是如何插入到 Pipeline 中的.

netty客戶端我們已經分析過 Channel 的注冊過程了, 這里簡單回顧一下:

首先在 AbstractBootstrap.initAndRegister中, 通過 group().register(channel), 調用 MultithreadEventLoopGroup.register 方法

在MultithreadEventLoopGroup.register 中, 通過 next() 獲取一個可用的 SingleThreadEventLoop, 然后調用它的 register

在 SingleThreadEventLoop.register 中, 通過 channel.unsafe().register(this, promise) 來獲取 channel 的 unsafe() 底層操作對象, 然后調用它的 register.

在 AbstractUnsafe.register 方法中, 調用 register0 方法注冊 Channel

在 AbstractUnsafe.register0 中, 調用 AbstractNioChannel#doRegister 方法

AbstractNioChannel.doRegister 方法通過 javaChannel().register(eventLoop().selector, 0, this) 將 Channel 對應的 Java NIO SockerChannel 注冊到一個 eventLoop 的 Selector 中, 并且將當前 Channel 作為 attachment.

而我們自定義 ChannelHandler 的添加過程, 發(fā)生在 AbstractUnsafe.register0 中, 在這個方法中調用了 pipeline.fireChannelRegistered() 方法, 其實現如下:

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}

上面的代碼很簡單, 就是調用了 head.fireChannelRegistered() 方法而已.
還記得 head 的 類層次結構圖不, head 是一個 AbstractChannelHandlerContext 實例, 并且它沒有重寫 fireChannelRegistered 方法, 因此 head.fireChannelRegistered 其實是調用的 AbstractChannelHandlerContext.fireChannelRegistered:

public ChannelHandlerContext fireChannelRegistered() {
        final AbstractChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
        return this;
    }

這個方法的第一句是調用 findContextInbound 獲取一個 Context, 那么它返回的 Context 到底是什么呢? 我們跟進代碼中看一下:

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

很顯然, 這個代碼會從 head 開始遍歷 Pipeline 的雙向鏈表, 然后找到第一個屬性 inbound 為 true 的 ChannelHandlerContext 實例. 想起來了沒? 我們在前面分析 ChannelInitializer 時, 花了大量的筆墨來分析了 inbound 和 outbound 屬性, 你看現在這里就用上了. 回想一下, ChannelInitializer 實現了 ChannelInboudHandler, 因此它所對應的 ChannelHandlerContext 的 inbound 屬性就是 true, 因此這里返回就是 ChannelInitializer 實例所對應的 ChannelHandlerContext. 即:


image.png

當獲取到 inbound 的 Context 后, 就調用它的 invokeChannelRegistered 方法:

private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

我們已經強調過了, 每個 ChannelHandler 都與一個 ChannelHandlerContext 關聯, 我們可以通過 ChannelHandlerContext 獲取到對應的 ChannelHandler. 因此很顯然了, 這里 handler() 返回的, 其實就是一開始我們實例化的 ChannelInitializer 對象, 并接著調用了 ChannelInitializer.channelRegistered 方法.

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

initChannel 這個方法我們很熟悉了吧, 它就是我們在初始化 Bootstrap 時, 調用 handler 方法傳入的匿名內部類所實現的方法:

.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

因此當調用了這個方法后, 我們自定義的 ChannelHandler 就插入到 Pipeline 了, 此時的 Pipeline 如下圖所示:

image.png

當添加了自定義的 ChannelHandler 后, 會刪除 ChannelInitializer 這個 ChannelHandler, 即 "ctx.pipeline().remove(this)", 因此最后的 Pipeline 如下:


image.png
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市荷辕,隨后出現的幾起案子,更是在濱河造成了極大的恐慌攀痊,老刑警劉巖桐腌,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拄显,死亡現場離奇詭異苟径,居然都是意外死亡,警方通過查閱死者的電腦和手機躬审,發(fā)現死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門棘街,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人承边,你說我怎么就攤上這事遭殉。” “怎么了博助?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵险污,是天一觀的道長。 經常有香客問我,道長蛔糯,這世上最難降的妖魔是什么拯腮? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮蚁飒,結果婚禮上动壤,老公的妹妹穿的比我還像新娘。我一直安慰自己淮逻,他們只是感情好琼懊,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著爬早,像睡著了一般哼丈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上凸椿,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天削祈,我揣著相機與錄音,去河邊找鬼脑漫。 笑死髓抑,一個胖子當著我的面吹牛,可吹牛的內容都是我干的优幸。 我是一名探鬼主播吨拍,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼网杆!你這毒婦竟也來了羹饰?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤碳却,失蹤者是張志新(化名)和其女友劉穎队秩,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體昼浦,經...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡馍资,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了关噪。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鸟蟹。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖使兔,靈堂內的尸體忽然破棺而出建钥,到底是詐尸還是另有隱情,我是刑警寧澤虐沥,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布熊经,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏镐依。R本人自食惡果不足惜悉盆,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望馋吗。 院中可真熱鬧焕盟,春花似錦、人聲如沸宏粤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽绍哎。三九已至来农,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間崇堰,已是汗流浹背沃于。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留海诲,地道東北人繁莹。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像特幔,于是被迫代替她去往敵國和親咨演。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內容