Netty源碼分析之pipeline

承接題意麦牺,平鋪直敘钮蛛。Netty中每個(gè)channel都有一個(gè)pipeline,可以看下channel的層次結(jié)構(gòu):

其實(shí)在之前的客戶端和服務(wù)端初始化的時(shí)候已經(jīng)說過了剖膳,在初始化Channel的時(shí)候魏颓,同時(shí)初始化pipeline;

//AbstractChannel
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

從DefaultChannelPipeline即可知道初始化pipeline的時(shí)候吱晒,head和tail是雙向鏈表甸饱。可以看下它們的繼承關(guān)系,再進(jìn)行其初始化的觀察叹话。

HeadContext:

TailContext:

從繼承結(jié)構(gòu)可以看出偷遗,HeadContext和TailContext都繼承了AbstractChannelHandlerContext和實(shí)現(xiàn)了ChannelOutboundHandler/ChannelInboundHandler,說明用戶雙重屬性驼壶,既是context同時(shí)也是handler氏豌,按我的理解意味著其即擁有上下文屬性也擁有handler屬性(處理業(yè)務(wù)邏輯)。在文章開始的圖里已經(jīng)說明:每個(gè)channel包含一個(gè)pipeline热凹,而pipeline又維護(hù)了一個(gè)雙向鏈表泵喘。

TailContext和HeadContext


這是TailContext和HeadContext構(gòu)造函數(shù),需要注意的是TailContext的inbound為true般妙,outbound為false纪铺,HeadContext則相反,這兩個(gè)參數(shù)和netty事件流向有關(guān)碟渺,具體情況下文說明霹陡。

重新分析ChannelInitializer和自定義handler的添加

Bootstrap.connect()-->Bootstrap.doResolveAndConnect()-->AbstractBootstrap.initAndRegister()-->Bootstrap.init()

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
    。止状。。
}

config.handler()獲取的就是ChannelInitializer攒霹,p.addLast(config.handler());就是把ChannelInitializer加入雙向鏈表怯疤,看代碼:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

讓我們來看看其中的關(guān)鍵代碼

newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

由上可知,在加入ChannelInitializer的過程中可以知道催束,為了添加一個(gè) handler 到pipeline中, 會把此handler包裝成ChannelHandlerContext集峦。同時(shí)addLast0說明ChannelInitializer是添加在tail之前。這個(gè)過程中注意下兩個(gè)有意思的方法:

private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

從源碼中可以看到, 當(dāng)一個(gè)handler實(shí)現(xiàn)了ChannelInboundHandler接口, 則 isInbound 返回真; 類似地, 當(dāng)一個(gè)handler實(shí)現(xiàn)了ChannelOutboundHandler接口, 則isOutbound就返回真抠刺。ChannelInitializer是實(shí)現(xiàn)了ChannelInboundHandlerAdapter塔淤,所以inbound傳入的是true。

自定義handler的添加

addLast方法中的另一條關(guān)鍵代碼如下:

callHandlerAdded0(newCtx);

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    速妖。高蜂。。    
    ctx.handler().handlerAdded(ctx);
    罕容。备恤。。
}

ctx.handler()取到的自然是ChannelInitializer锦秒,而handlerAdded(ctx)都做了什么呢:

handlerAdded()-->boolean initChannel()-->void initChannel()

public void  handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

ChannelInitializer在加入雙向鏈表后露泊,調(diào)用重寫initChannel()方法,在initChannel()方法中加入自定義handler旅择,最后remove(ctx);移除ChannelInitializer惭笑。
回過頭來看下channel的結(jié)構(gòu)層次圖,在初始化channel的時(shí)候會構(gòu)建一個(gè)pipeline座位channel的屬性(pipeline也有一個(gè)channel屬性),每個(gè)pipeline維護(hù)了一個(gè)由ChannelHandlerContext 組成的雙向鏈表. 這個(gè)鏈表的頭是HeadContext沉噩,鏈表的尾是TailContext捺宗,并且每個(gè)ChannelHandlerContext中又關(guān)聯(lián)著一個(gè)ChannelHandler。


pipeline的傳輸機(jī)制

從上面的分析屁擅,我們知道AbstractChannelHandlerContext中有inbound和outbound兩個(gè)boolean變量, 分別用于標(biāo)識Context所對應(yīng)的handler的類型, 即:

  • inbound為真時(shí), 表示對應(yīng)的ChannelHandler實(shí)現(xiàn)了ChannelInboundHandler方法.
  • outbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelOutboundHandler 方法.
    pipieline的事件傳輸類型有兩種:inbound事件和outbound事件兩種:
                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

這個(gè)是netty官方文檔偿凭,可以很明顯地看出:inbound事件和outbound事件的流向相反。 inbound 的傳遞方式是通過調(diào)用相應(yīng)的ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過調(diào)用 ChannelHandlerContext.OUT_EVT() 方法派歌。例如ChannelHandlerContext.fireChannelRegistered()調(diào)用會發(fā)送一個(gè)ChannelRegistered 的inbound給下一個(gè)ChannelHandlerContext, 而ChannelHandlerContext.bind調(diào)用會發(fā)送一個(gè)bind的outbound事件給下一個(gè) ChannelHandlerContext弯囊。
讓我們來看下inbound事件傳播的方法有哪些:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead()
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught()
ChannelHandlerContext.fireUserEventTriggered()
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

outbound事件傳播的方法有:

ChannelHandlerContext.bind()
ChannelHandlerContext.connect()
ChannelHandlerContext.write()
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect()
ChannelHandlerContext.close()

讓我們具體來看看這兩類事件

outbound事件

outbound事件是請求事件,inbound事件是通知事件胶果,這個(gè)要區(qū)分清楚匾嘱。請求事件就是請求某件事即將發(fā)生,然后outbound事件進(jìn)行通知早抠。outbound事件的流向是:

tail -> customContext -> head

讓我們用connect事件代碼來證明:
當(dāng)調(diào)用Bootstrap.connect()的時(shí)候霎烙,會觸發(fā)一個(gè)outbound事件。以下是調(diào)用鏈

Bootstrap.connect -> Bootstrap.doConnect -> AbstractChannel.connect
讓我們看看AbstractChannel.connect

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}
//pipeline.connect的實(shí)現(xiàn)如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

當(dāng) outbound 事件(這里是 connect 事件)傳遞到 Pipeline 后, 它其實(shí)是以 tail 為起點(diǎn)開始傳播的蕊连。而 tail.connect 其實(shí)調(diào)用的是AbstractChannelHandlerContext.connect 方法悬垃。繼續(xù)跟進(jìn),在AbstractChannelHandlerContext中connect方法:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    甘苍。尝蠕。。
    final AbstractChannelHandlerContext next = findContextOutbound()
    next.invokeConnect(remoteAddress, localAddress, promise);
    载庭。看彼。。
}

讓我們看下其中的關(guān)鍵代碼:

private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

顧名思義囚聚,findContextOutbound就是找出以this context(tail)為基本節(jié)點(diǎn)靖榕,找出第一個(gè)outbound為true的context,然后通過ctx調(diào)用invokeConnect方法顽铸,如果

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

從tail往head方向獲取handler并且調(diào)用其connect茁计,如果用戶沒有從寫這個(gè)方法,那么會調(diào)用ChannelOutboundHandlerAdapter實(shí)現(xiàn)的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

connect又會調(diào)用AbstractChannelHandlerContext中connect方法找到下一個(gè)outbound為true的handler調(diào)用其connect跋破,這樣的循環(huán)中簸淀,直到connect事件傳遞到DefaultChannelPipeline的雙向鏈表的頭節(jié)點(diǎn), 即 head 中(head的outbound設(shè)置為true)。

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

outbound事件傳到head后毒返,因?yàn)閔ead本身也是handler租幕,handler()返回的的就它本身,讓我們看看它c(diǎn)onnect方法的實(shí)現(xiàn):

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

到這邊outbound事件就結(jié)束了拧簸。

inbound事件

inbound是通知事件劲绪,就是說某件事情已經(jīng)發(fā)生了,然后利用inbound事件進(jìn)行通知。inbound事件的傳輸方向和outbound剛好相反:

head -> customcontext -> tail

沿著connect繼續(xù)走贾富,在之后會有inbound事件歉眷,我們就以這個(gè)為例子進(jìn)行inbound事件講解。
承接上文颤枪,之前看到head的connect方法:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

這里unsafe.connect調(diào)用的是AbstractNioChannel.connect()汗捡,關(guān)鍵代碼如下:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ···
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } 
        ···
}

在doConnect完成連接之后調(diào)用了fulfillConnectPromise,

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        return;
    }
    boolean active = isActive();
    boolean promiseSet = promise.trySuccess();
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }
    if (!promiseSet) {
        close(voidPromise());
    }
}

讓我們看pipeline().fireChannelActive();pipeline().fireChannelActive()將通道激活的消息(即 Socket 連接成功)發(fā)送出去畏纲。這里就是inbound事件的起點(diǎn)扇住,往下走看這個(gè)過程是怎么樣的:

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

很明顯,以head(HeadContext)為起點(diǎn)盗胀,讓我們看下在invokeChannelActive做了什么

static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelInactive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelInactive();
            }
        });
    }
}
//next.invokeChannelInactive()實(shí)現(xiàn)
private void invokeChannelInactive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelInactive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelInactive();
    }
}

這個(gè)方法和 Outbound 的對應(yīng)方法(例如 invokeConnect) 如出一轍. 同Outbound一樣, 如果用戶沒有重寫channelActive方法, 那么會調(diào)用ChannelInboundHandlerAdapter 的 channelActive 方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}

和outbound事件一樣艘蹋,一樣的循環(huán),最后事件傳輸?shù)絫ail票灰。tail 本身既實(shí)現(xiàn)了ChannelInboundHandler接口, 又實(shí)現(xiàn)了ChannelHandlerContext接口女阀,因此當(dāng)channelActive消息傳遞到tail后,會將消息轉(zhuǎn)遞到對應(yīng)的ChannelHandler中處理屑迂,tail的handler()返回的就是tail本身浸策,最后的channelActive即是tail中的。
inbound事件到這里也就結(jié)束了惹盼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末的榛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子逻锐,更是在濱河造成了極大的恐慌,老刑警劉巖雕薪,帶你破解...
    沈念sama閱讀 222,627評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昧诱,死亡現(xiàn)場離奇詭異,居然都是意外死亡所袁,警方通過查閱死者的電腦和手機(jī)盏档,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來燥爷,“玉大人蜈亩,你說我怎么就攤上這事∏棒幔” “怎么了稚配?”我有些...
    開封第一講書人閱讀 169,346評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長港华。 經(jīng)常有香客問我道川,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,097評論 1 300
  • 正文 為了忘掉前任冒萄,我火速辦了婚禮臊岸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘尊流。我一直安慰自己帅戒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評論 6 398
  • 文/花漫 我一把揭開白布崖技。 她就那樣靜靜地躺著逻住,像睡著了一般。 火紅的嫁衣襯著肌膚如雪响疚。 梳的紋絲不亂的頭發(fā)上鄙信,一...
    開封第一講書人閱讀 52,696評論 1 312
  • 那天,我揣著相機(jī)與錄音忿晕,去河邊找鬼装诡。 笑死,一個(gè)胖子當(dāng)著我的面吹牛践盼,可吹牛的內(nèi)容都是我干的鸦采。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼咕幻,長吁一口氣:“原來是場噩夢啊……” “哼渔伯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起肄程,我...
    開封第一講書人閱讀 40,108評論 0 277
  • 序言:老撾萬榮一對情侶失蹤锣吼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后蓝厌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玄叠,經(jīng)...
    沈念sama閱讀 46,646評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評論 3 342
  • 正文 我和宋清朗相戀三年拓提,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了读恃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,861評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡代态,死狀恐怖寺惫,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蹦疑,我是刑警寧澤西雀,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站歉摧,受9級特大地震影響蒋搜,放射性物質(zhì)發(fā)生泄漏篡撵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評論 3 336
  • 文/蒙蒙 一豆挽、第九天 我趴在偏房一處隱蔽的房頂上張望育谬。 院中可真熱鬧,春花似錦帮哈、人聲如沸膛檀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咖刃。三九已至,卻和暖如春憾筏,著一層夾襖步出監(jiān)牢的瞬間嚎杨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評論 1 274
  • 我被黑心中介騙來泰國打工氧腰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枫浙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,287評論 3 379
  • 正文 我出身青樓古拴,卻偏偏與公主長得像箩帚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子黄痪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評論 2 361

推薦閱讀更多精彩內(nèi)容