netty之pipeline2

ChannelHandler 的名字

pipeline.addXXX 都有一個重載的方法, 例如 addLast, 它有一個重載的版本是:

ChannelPipeline addLast(String name, ChannelHandler handler);

第一個參數(shù)指定了所添加的 handler 的名字(更準(zhǔn)確地說是 ChannelHandlerContext 的名字, 不過我們通常是以 handler 作為敘述的對象, 因此說成 handler 的名字便于理解). 那么 handler 的名字有什么用呢? 如果我們不設(shè)置name, 那么 handler 會有怎樣的名字?

@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

第一個參數(shù)被設(shè)置為 null, 我們不關(guān)心它. 第二參數(shù)就是這個 handler 的名字. 看代碼可知, 在添加一個 handler 之前, 需要調(diào)用 checkDuplicateName 方法來確定此 handler 的名字是否和已添加的 handler 的名字重復(fù). 而這個 checkDuplicateName 方法我們在pipeline1
已經(jīng)有提到, 這里再回顧一下:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

Netty 判斷一個 handler 的名字是否重復(fù)的依據(jù)很簡單: DefaultChannelPipeline 中有一個 類型為 Map<String, AbstractChannelHandlerContext> 的 name2ctx 字段, 它的 key 是一個 handler 的名字, 而 value 則是這個 handler 所對應(yīng)的 ChannelHandlerContext. 每當(dāng)新添加一個 handler 時, 就會 put 到 name2ctx 中. 因此檢查 name2ctx 中是否包含這個 name 即可.
當(dāng)沒有重名的 handler 時, 就為這個 handler 生成一個關(guān)聯(lián)的 DefaultChannelHandlerContext 對象, 然后就將 name 和 newCtx 作為 key-value 對 放到 name2Ctx 中.

自動生成 handler 的名字

我們調(diào)用的是如下的 addLast 方法

ChannelPipeline addLast(ChannelHandler... handlers);

那么 Netty 會調(diào)用 generateName 為我們的 handler 自動生成一個名字:

private String generateName(ChannelHandler handler) {
    WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
    Class<?> handlerType = handler.getClass();
    String name;
    synchronized (cache) {
        name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
    }

    synchronized (this) {
        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (name2ctx.containsKey(name)) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (!name2ctx.containsKey(newName)) {
                    name = newName;
                    break;
                }
            }
        }
    }

    return name;
}

而 generateName 會接著調(diào)用 generateName0 來實(shí)際產(chǎn)生一個 handler 的名字:

private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}

自動生成的名字的規(guī)則很簡單, 就是 handler 的簡單類名加上 "#0", 因此我們的 EchoClientHandler 的名字就是 "EchoClientHandler#0",

Pipeline的事件傳輸機(jī)制

我們知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 兩個 boolean 變量, 分別用于標(biāo)識 Context 所對應(yīng)的 handler 的類型, 即:

  • inbound 為真時, 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelInboundHandler 方法.
  • outbound 為真時, 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelOutboundHandler 方法.

Netty 的事件可以分為 Inbound 和 Outbound 事件.

                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

從上圖可以看出, inbound 事件和 outbound 事件的流向是不一樣的, inbound 事件的流行是從下至上, 而 outbound 剛好相反, 是從上到下. 并且 inbound 的傳遞方式是通過調(diào)用相應(yīng)的 ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過調(diào)用 ChannelHandlerContext.OUT_EVT() 方法.
Inbound 事件傳播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Oubound 事件傳輸方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

如果我們捕獲了一個事件, 并且想讓這個事件繼續(xù)傳遞下去, 那么需要調(diào)用 Context 相應(yīng)的傳播方法.

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

上面的例子中, MyInboundHandler 收到了一個 channelActive 事件, 它在處理后, 如果希望將事件繼續(xù)傳播下去, 那么需要接著調(diào)用 ctx.fireChannelActive().

Outbound 操作(outbound operations of a channel)

Outbound 事件都是請求事件(request event), 即請求某件事情的發(fā)生, 然后通過 Outbound 事件進(jìn)行通知.Outbound 事件的傳播方向是 tail -> customContext -> head.
我們接下來以 connect 事件為例, 分析一下 Outbound 事件的傳播機(jī)制.
首先, 當(dāng)用戶調(diào)用了 Bootstrap.connect 方法時, 就會觸發(fā)一個 Connect 請求事件, 此調(diào)用會觸發(fā)如下調(diào)用鏈:

Bootstrap.connect -> Bootstrap.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect

繼續(xù)跟蹤的話, 我們就發(fā)現(xiàn), AbstractChannel.connect 其實(shí)是調(diào)用了 DefaultChannelPipeline.connect 方法:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

而 pipeline.connect 的實(shí)現(xiàn)如下:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

可以看到, 當(dāng) outbound 事件(這里是 connect 事件)傳遞到 Pipeline 后, 它其實(shí)是以 tail 為起點(diǎn)開始傳播的.而 tail.connect 其實(shí)調(diào)用的是 AbstractChannelHandlerContext.connect 方法:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeConnect(remoteAddress, localAddress, promise);
    ...
    return promise;
}

findContextOutbound() 顧名思義, 它的作用是以當(dāng)前 Context 為起點(diǎn), 向 Pipeline 中的 Context 雙向鏈表的前端尋找第一個 outbound 屬性為真的 Context(即關(guān)聯(lián)著 ChannelOutboundHandler 的 Context), 然后返回.
它的實(shí)現(xiàn)如下:

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

當(dāng)我們找到了一個 outbound 的 Context 后, 就調(diào)用它的 invokeConnect 方法, 這個方法中會調(diào)用 Context 所關(guān)聯(lián)著的 ChannelHandler 的 connect 方法:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

如果用戶沒有重寫 ChannelHandler 的 connect 方法, 那么會調(diào)用 ChannelOutboundHandlerAdapter 所實(shí)現(xiàn)的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

我們看到, ChannelOutboundHandlerAdapter.connect 僅僅調(diào)用了 ctx.connect, 而這個調(diào)用又回到了:

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

這樣的循環(huán)中, 直到 connect 事件傳遞到DefaultChannelPipeline 的雙向鏈表的頭節(jié)點(diǎn), 即 head 中. 為什么會傳遞到 head 中呢? 回想一下, head 實(shí)現(xiàn)了 ChannelOutboundHandler, 因此它的 outbound 屬性是 true.因?yàn)?head 本身既是一個 ChannelHandlerContext, 又實(shí)現(xiàn)了 ChannelOutboundHandler 接口, 因此當(dāng) connect 消息傳遞到 head 后, 會將消息轉(zhuǎn)遞到對應(yīng)的 ChannelHandler 中處理, 而恰好, head 的 handler() 返回的就是 head 本身:

@Override
public ChannelHandler handler() {
    return this;
}

因此最終 connect 事件是在 head 中處理的. head 的 connect 事件處理方法如下:

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

到這里, 整個 Connect 請求事件就結(jié)束了.

image.png

Inbound 事件

Inbound 事件是一個通知事件, 即某件事已經(jīng)發(fā)生了, 然后通過 Inbound 事件進(jìn)行通知. Inbound 通常發(fā)生在 Channel 的狀態(tài)的改變或 IO 事件就緒.
Inbound 的特點(diǎn)是它傳播方向是 head -> customContext -> tail.

既然上面我們分析了 Connect 這個 Outbound 事件, 那么接著分析 Connect 事件后會發(fā)生什么 Inbound 事件, 并最終找到 Outbound 和 Inbound 事件之間的聯(lián)系.

當(dāng) Connect 這個 Outbound 傳播到 unsafe 后, 其實(shí)是在 AbstractNioUnsafe.connect 方法中進(jìn)行處理的:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
    ...
}

在 AbstractNioUnsafe.connect 中, 首先調(diào)用 doConnect 方法進(jìn)行實(shí)際上的 Socket 連接, 當(dāng)連接上后, 會調(diào)用 fulfillConnectPromise 方法:

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    ...
    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive();
    }
    ...
}

我們看到, 在 fulfillConnectPromise 中, 會通過調(diào)用 pipeline().fireChannelActive() 將通道激活的消息(即 Socket 連接成功)發(fā)送出去.
而這里, 當(dāng)調(diào)用 pipeline.fireXXX 后, 就是 Inbound 事件的起點(diǎn).
因此當(dāng)調(diào)用了 pipeline().fireChannelActive() 后, 就產(chǎn)生了一個 ChannelActive Inbound 事件, 我們就從這里開始看看這個 Inbound 事件是怎么傳播的吧.

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

果然, 在 fireChannelActive 方法中, 調(diào)用的是 head.fireChannelActive, 因此可以證明了, Inbound 事件在 Pipeline 中傳輸?shù)钠瘘c(diǎn)是 head.那么, 在 head.fireChannelActive() 中又做了什么呢?

@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeChannelActive();
    ...
    return this;
}
  • 首先調(diào)用 findContextInbound, 從 Pipeline 的雙向鏈表中中找到第一個屬性 inbound 為真的 Context, 然后返回
  • 調(diào)用這個 Context 的 invokeChannelActive

invokeChannelActive 方法如下:

private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

這個方法和 Outbound 的對應(yīng)方法(例如 invokeConnect) 如出一轍. 同 Outbound 一樣, 如果用戶沒有重寫 channelActive 方法, 那么會調(diào)用 ChannelInboundHandlerAdapter 的 channelActive 方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

同樣地, 在 ChannelInboundHandlerAdapter.channelActive 中, 僅僅調(diào)用了 ctx.fireChannelActive 方法, 因此就會有如下循環(huán):

Context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive

這樣的循環(huán)中. 同理, tail 本身 既實(shí)現(xiàn)了 ChannelInboundHandler 接口, 又實(shí)現(xiàn)了 ChannelHandlerContext 接口, 因此當(dāng) channelActive 消息傳遞到 tail 后, 會將消息轉(zhuǎn)遞到對應(yīng)的 ChannelHandler 中處理, 而恰好, tail 的 handler() 返回的就是 tail 本身:

@Override
public ChannelHandler handler() {
    return this;
}

因此 channelActive Inbound 事件最終是在 tail 中處理的, 我們看一下它的處理方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

TailContext.channelActive 方法是空的. 可見, 如果是 Inbound, 當(dāng)用戶沒有實(shí)現(xiàn)自定義的處理器時, 那么默認(rèn)是不處理的.


image.png

小結(jié)

對于 Outbound事件:

  • Outbound 事件是請求事件(由 Connect 發(fā)起一個請求, 并最終由 unsafe 處理這個請求)
  • Outbound 事件的發(fā)起者是 Channel
  • Outbound 事件的處理者是 unsafe
  • Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.
  • 在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最后一個 Hnalder, 則需要調(diào)用 ctx.xxx (例如 ctx.connect) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會提前終止.
  • Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

對于 Inbound 事件:

  • Inbound 事件是通知事件, 當(dāng)某件事情已經(jīng)就緒后, 通知上層.
  • Inbound 事件發(fā)起者是 unsafe
  • Inbound 事件的處理者是 Channel, 如果用戶沒有實(shí)現(xiàn)自定義的處理方法, 那么Inbound 事件默認(rèn)的處理者是 TailContext, 并且其處理方法是空實(shí)現(xiàn).
  • Inbound 事件在 Pipeline 中傳輸方向是 head -> tail
  • 在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最后一個 Handler, 則需要調(diào)用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續(xù)傳播下去. 如果不這樣做, 那么此事件的傳播會提前終止.
  • Inbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末脾拆,一起剝皮案震驚了整個濱河市围来,隨后出現(xiàn)的幾起案子枷踏,更是在濱河造成了極大的恐慌括眠,老刑警劉巖骏啰,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件实抡,死亡現(xiàn)場離奇詭異阁簸,居然都是意外死亡畔柔,警方通過查閱死者的電腦和手機(jī)纷责,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進(jìn)店門捍掺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人再膳,你說我怎么就攤上這事挺勿。” “怎么了喂柒?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵不瓶,是天一觀的道長。 經(jīng)常有香客問我灾杰,道長蚊丐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任麦备,我火速辦了婚禮,結(jié)果婚禮上昭娩,老公的妹妹穿的比我還像新娘凛篙。我一直安慰自己,他們只是感情好栏渺,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布呛梆。 她就那樣靜靜地躺著,像睡著了一般迈嘹。 火紅的嫁衣襯著肌膚如雪削彬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天秀仲,我揣著相機(jī)與錄音融痛,去河邊找鬼。 笑死神僵,一個胖子當(dāng)著我的面吹牛雁刷,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播保礼,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼沛励,長吁一口氣:“原來是場噩夢啊……” “哼责语!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起目派,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤坤候,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后企蹭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體白筹,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年谅摄,在試婚紗的時候發(fā)現(xiàn)自己被綠了徒河。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡送漠,死狀恐怖顽照,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情闽寡,我是刑警寧澤代兵,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站下隧,受9級特大地震影響奢人,放射性物質(zhì)發(fā)生泄漏谓媒。R本人自食惡果不足惜淆院,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望句惯。 院中可真熱鬧土辩,春花似錦、人聲如沸抢野。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽指孤。三九已至启涯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間恃轩,已是汗流浹背结洼。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叉跛,地道東北人松忍。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像筷厘,于是被迫代替她去往敵國和親鸣峭。 傳聞我的和親對象是個殘疾皇子宏所,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容