6.ChannelPipeline

pipeline和handler

ChannelPipline

pipeline可以譯為管道暑刃、流水線悠砚,正如工廠的流水線一樣壹粟,ChannelPipline將各種handler串聯(lián)起來(lái)杖挣,將IO事件在這些handler中進(jìn)行傳播,每個(gè)handler負(fù)責(zé)一部分邏輯。從ChannelPipeline接口定義的方法可以看出來(lái)骤星,它是一個(gè)雙向鏈表,處理過(guò)程類似于JavaWeb中的filter爆哑。這種責(zé)任鏈模式的設(shè)計(jì)不僅有利于解耦洞难,還能動(dòng)態(tài)調(diào)整pipeline中的handler,這一點(diǎn)在前文中的channelInitializerHandler已經(jīng)有所體現(xiàn)揭朝。

ChannelHandler

handler指的是ChannelHandler接口及其子類队贱,是處理讀寫(xiě)事件的類,也是實(shí)際開(kāi)發(fā)時(shí)主要編寫(xiě)的類潭袱。ChannelHandler作為跟借口柱嫌,定義了3個(gè)方法和一個(gè)注解。

public interface ChannelHandler {

    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @interface Sharable {}
}

從方法的名字不難理解這3個(gè)方法分別在handler被添加屯换、移除编丘、拋出異常時(shí)回調(diào)觸發(fā)。而@Sharable注解表明某個(gè)handler實(shí)例可以被多個(gè)pipeline共享(也即多個(gè)channel共享)。
經(jīng)過(guò)pipeline的后嘉抓,handler處理過(guò)的事件會(huì)作為臨近handler的事件入口索守。netty將事件分成了入站事件和出站事件,這里的入和出是相對(duì)于netty所屬的應(yīng)用程序而言的掌眠,一般來(lái)說(shuō)蕾盯,由外部觸發(fā)的事件是inbound事件,而outbound事件是由應(yīng)用程序主動(dòng)請(qǐng)求而觸發(fā)的事件蓝丙。相應(yīng)的,handler也被分成inBoundHandler和outBoundHandler兩種望拖。顧名思義渺尘,inBoundHandler只會(huì)處理inBound事件,outBoundHandler只會(huì)處理outBound事件说敏。具體的入站和出站事件可以參考ChannelInboundHandler和ChannelOutboundHandler2個(gè)接口各自定義的方法鸥跟。

// inbound事件
fireChannelRegistered()
fireChannelActive()
fireChannelRead(Object)
fireChannelReadComplete()
fireExceptionCaught()
fireUserEventTriggered()
fireChannelWritabilityChanged()
fireChannelInactive()
fireChannelUnregistered()

// outbound事件
bind()
connect()
write()
flush()
read()
disconnect()
close()
deregister()

在上述事件中,別的事件都容易理解盔沫,唯獨(dú)read這個(gè)事件出現(xiàn)了3次医咨,容易混淆,所以單獨(dú)拿出來(lái)提一下架诞。
fireChannelRead(Object)和FireChannelReadComplete屬于inBound事件拟淮,而read屬于outBound事件,這表明谴忧,read事件是應(yīng)用程序主動(dòng)觸發(fā)的事件很泊。在ChannelOutBoundInvoker關(guān)于read方法的注釋中也提到,請(qǐng)求將channel中的數(shù)據(jù)讀入第一個(gè)inbound緩沖區(qū)沾谓,然后根據(jù)是否還有數(shù)據(jù)來(lái)決定觸發(fā)channelRead(Object)和channelReadComplete委造。

ChannelHandlerContext

為了使handler類更關(guān)注于實(shí)際對(duì)數(shù)據(jù)的邏輯處理,netty將handler與pipeline關(guān)聯(lián)的過(guò)程交由ChannelHandlerContext完成均驶。熟悉鏈表數(shù)據(jù)結(jié)構(gòu)的都知道昏兆,鏈表的每一個(gè)節(jié)點(diǎn)都包含數(shù)據(jù)域和指針域,顯然妇穴,handler和handlerContext的關(guān)系就像數(shù)據(jù)域和指針域爬虱。但context不僅僅只是一個(gè)指針域,從它的接口定義可以看出來(lái)伟骨,hannelHandlerContext一方面將handler包裹起來(lái)饮潦,繼而進(jìn)行inbound和outbound事件的傳播,另一方面繼承于attributeMap的attr方法也令其可以自定義一些屬性(已經(jīng)被廢棄携狭,轉(zhuǎn)而使用handler的attr方法)继蜡。此外,context還可以為handler賦予名稱、獲取內(nèi)存分配器稀并,它還持有pipeline的引用仅颇,以便在必要時(shí)刻從頭尾指針重新開(kāi)始處理。

ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker{...}

pipeline的初始化

對(duì)以上3個(gè)類有概述性的了解后碘举,我們先看一下pipeline是如何初始化的忘瓦。
在channel初始化時(shí),channel的構(gòu)造函數(shù)初始化了一個(gè)pipeline引颈。

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

可以看到pipeline在初始化時(shí)耕皮,添加了Tail和Head2個(gè)ChannelHandlerContext,且將這2個(gè)節(jié)點(diǎn)作為哨兵節(jié)點(diǎn)蝙场,組成雙向鏈表這樣一個(gè)數(shù)據(jù)結(jié)構(gòu)凌停。
兩個(gè)哨兵的繼承關(guān)系如下

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {...}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    ...
}

可以看到tail節(jié)點(diǎn)只是InboundHandler,而head節(jié)點(diǎn)既是InboundHandler又是OutboundHandler售滤。tailContext通常做的是一個(gè)收尾的工作罚拟,比如異常沒(méi)有捕獲,傳遞到tail完箩,就會(huì)打印日志等等赐俗、釋放內(nèi)存等等;而headContext持有一個(gè)Unsafe對(duì)象弊知,在前文說(shuō)過(guò)阻逮,unsafe是實(shí)現(xiàn)底層數(shù)據(jù)讀寫(xiě)的一個(gè)類,也因此吉捶,head在處理inbount事件時(shí)夺鲜,會(huì)原封不動(dòng)的往下傳播,而處理outbound事件時(shí)呐舔,會(huì)委托給unsafe進(jìn)行處理币励。
這里還有一個(gè)小細(xì)節(jié)。在傳播事件時(shí)需要判斷下一個(gè)handler是否可以處理這個(gè)事件珊拼,netty于是將各種事件用位圖的形式區(qū)分食呻,采用這種方式大大節(jié)省判斷操作所需要的額外空間。
// ChannelHandlerMask類定義的部分事件位運(yùn)算

static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;

利用掩碼判斷handler處理對(duì)應(yīng)事件

do {
    ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);

handler的添加和刪除

handler在調(diào)用pipeline的addXXX系列方法里添加澎现,以addLast(ChannelHandler... handlers)方法為例仅胞,默認(rèn)情況下,該方法會(huì)重載到addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法剑辫,默認(rèn)情況下group和name均為null

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

總的來(lái)說(shuō)可以分為3個(gè)步驟:

  1. 檢查handler是否重復(fù)添加干旧,主要是通過(guò)handler的@Sharable注解和added字段判斷;
  2. 創(chuàng)建HandlerContext,并添加到鏈表中妹蔽。
  3. 回調(diào)handlerAdded方法椎眯。

handler的刪除類似挠将,先通過(guò)參數(shù)找到對(duì)應(yīng)的handler,然后刪除鏈表中的context節(jié)點(diǎn)编整,最后回調(diào)handlerRemove方法舔稀。

handler的傳播順序

由于采用了責(zé)任鏈模式,鏈表節(jié)點(diǎn)之間的順序就顯得非常重要了掌测,先看一下inbound事件是如何在pipeline中傳播的

inbount事件的傳播

inbound以AbstractChannelHandlerContext中的fireChannelRead(Object)方法為例内贮。

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

可以看出,fireChannelRead做了2件事

  1. 通過(guò)事件對(duì)應(yīng)的掩碼找到下一個(gè)inboundHandler
  2. 將本節(jié)點(diǎn)處理好的數(shù)據(jù)傳播給下一個(gè)inboundHandler
// 步驟1
private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}    
// 步驟2
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
}

步驟1的實(shí)現(xiàn)是不斷通過(guò)context的executionMask與事件掩碼做與運(yùn)算汞斧,直到與的結(jié)果不為0夜郁。這表明該context對(duì)應(yīng)的handler具備處理對(duì)應(yīng)事件的能力。此外要注意循環(huán)過(guò)程中粘勒,context是next方向拂酣。
步驟2則判斷當(dāng)前線程是否是eventLoop線程,若是仲义,則執(zhí)行下一個(gè)inboundHandlerContext的invokeChannelRead方法,若不是則添加到任務(wù)隊(duì)列里剑勾,待eventLoop線程來(lái)執(zhí)行
至于invokeChannelRead方法也很簡(jiǎn)單埃撵,先判斷該handler是否已存在于pipeline,然后調(diào)用handler的channelRead方法虽另。

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
// 判斷是否添加到pipeline中或即將添加到pipeline中
private boolean invokeHandler() {
    int handlerState = this.handlerState;
    return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

outbound事件傳播與inbound類似暂刘,只是在通過(guò)掩碼查詢下一個(gè)outboundHandler時(shí)為prev方向,與inbound相反捂刺。具體代碼略過(guò)谣拣。

pipeline與context調(diào)用傳播方法的區(qū)別

pipeline.fireChannelRead()和ChannelHandlerContext.fireChannelRead()在代碼中都時(shí)常出現(xiàn),那么它們的區(qū)別是什么?
不妨看一下DefaultChannelPipeline的fireChannelRead方法族展。

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
}

可以看出森缠,其將headContext作為參數(shù)傳入,調(diào)用了HandlerContext的invokeChannelRead(AbstractChannelHandlerContext, Object)靜態(tài)方法仪缸,這個(gè)靜態(tài)方法會(huì)調(diào)用傳入的HandlerContext的invokeChannelRead(Object)方法贵涵,繼而調(diào)用Context內(nèi)部持有的ChannelInboundHandler的channelRead(ChannelHandlerContext, Object)方法。這個(gè)方法由子類重寫(xiě)恰画,在這里就是HeadContext重寫(xiě)的方法宾茂,它調(diào)用傳入的ChannelHandlerContext,繼續(xù)往下傳播拴还。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);
}

而DefaultChannelPipeline的read方法則調(diào)用tail的read方法跨晴,tail會(huì)傳播給它的前一個(gè)節(jié)點(diǎn)。
小結(jié)
pipeline調(diào)用傳播方法時(shí)片林,若是inbound事件端盆,從head開(kāi)始往tail方向傳播怀骤,若是outbound事件,從tail開(kāi)始往head方向傳播
context調(diào)用傳播方法爱谁,若是inbound事件晒喷,從當(dāng)前context節(jié)點(diǎn)開(kāi)始往tail方向傳播,若是outbound事件访敌,從當(dāng)前context節(jié)點(diǎn)開(kāi)始往head方向傳播

異常的傳播

異常的傳播路徑

在context處理各種事件時(shí)凉敲,用了channelRead的例子∷峦可以注意到invokeChannelRead方法實(shí)現(xiàn)用了一個(gè)try-catch的寫(xiě)法爷抓。當(dāng)拋出異常時(shí),會(huì)調(diào)用notifyHandlerException(Throwable)阻塑,代碼如下:

private void notifyHandlerException(Throwable cause) {
    if (inExceptionCaught(cause)) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception was thrown by a user handler " +
                            "while handling an exceptionCaught event", cause);
        }
        return;
    }
    invokeExceptionCaught(cause);
}

首先調(diào)用inExceptionCaught蓝撇,判斷異常是否發(fā)生在exceptionCaught方法內(nèi)。若是陈莽,則打印警告日志后直接返回渤昌,否則調(diào)用invokeExceptionCaught(Throwable)方法。該方法會(huì)調(diào)用handler復(fù)寫(xiě)的exceptionCaught方法走搁。
若復(fù)寫(xiě)方法調(diào)用了ChannelHandlerContext.fireExceptionCaught方法独柑,則異常會(huì)繼續(xù)往下傳播,不論下一個(gè)節(jié)點(diǎn)是inbound還是outbound私植。若一直傳播到tail忌栅,則會(huì)打印一個(gè)日志,并釋放異常占用的內(nèi)存曲稼。

異常優(yōu)雅處理

在springMvc體系中索绪,通常會(huì)有一個(gè)包含ControllerAdvice注解的類統(tǒng)一進(jìn)行異常的處理,在netty中贫悄,也可以在pipeline的末尾添加一個(gè)異常處理handler統(tǒng)一進(jìn)行異常處理瑞驱。甚至可以用策略模式,對(duì)不同異常類進(jìn)行分門別類的處理清女。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末钱烟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嫡丙,更是在濱河造成了極大的恐慌拴袭,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件曙博,死亡現(xiàn)場(chǎng)離奇詭異拥刻,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)父泳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門般哼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)吴汪,“玉大人,你說(shuō)我怎么就攤上這事蒸眠⊙龋” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵楞卡,是天一觀的道長(zhǎng)霜运。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蒋腮,這世上最難降的妖魔是什么淘捡? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮池摧,結(jié)果婚禮上焦除,老公的妹妹穿的比我還像新娘。我一直安慰自己作彤,他們只是感情好膘魄,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著竭讳,像睡著了一般瓣距。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上代咸,一...
    開(kāi)封第一講書(shū)人閱讀 49,829評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音成黄,去河邊找鬼呐芥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛奋岁,可吹牛的內(nèi)容都是我干的思瘟。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼闻伶,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼滨攻!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起蓝翰,我...
    開(kāi)封第一講書(shū)人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤光绕,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后畜份,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體诞帐,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年爆雹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了停蕉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片愕鼓。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖慧起,靈堂內(nèi)的尸體忽然破棺而出菇晃,到底是詐尸還是另有隱情,我是刑警寧澤蚓挤,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布磺送,位于F島的核電站,受9級(jí)特大地震影響屈尼,放射性物質(zhì)發(fā)生泄漏册着。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一脾歧、第九天 我趴在偏房一處隱蔽的房頂上張望甲捏。 院中可真熱鬧,春花似錦鞭执、人聲如沸司顿。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)大溜。三九已至,卻和暖如春估脆,著一層夾襖步出監(jiān)牢的瞬間钦奋,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工疙赠, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留付材,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓圃阳,卻偏偏與公主長(zhǎng)得像厌衔,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子捍岳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容