pipeline和handler
ChannelPipline
pipeline可以譯為管道暑刃、流水線悠砚,正如工廠的流水線一樣壹粟,ChannelPipline將各種handler串聯(lián)起來(lái)杖挣,將IO事件在這些handler中進(jìn)行傳播,每個(gè)handler負(fù)責(zé)一部分邏輯。從ChannelPipeline接口定義的方法可以看出來(lái)骤星,它是一個(gè)雙向鏈表,處理過(guò)程類似于JavaWeb中的filter爆哑。這種責(zé)任鏈模式的設(shè)計(jì)不僅有利于解耦洞难,還能動(dòng)態(tài)調(diào)整pipeline中的handler,這一點(diǎn)在前文中的channelInitializerHandler已經(jīng)有所體現(xiàn)揭朝。
ChannelHandler
handler指的是ChannelHandler接口及其子類队贱,是處理讀寫(xiě)事件的類,也是實(shí)際開(kāi)發(fā)時(shí)主要編寫(xiě)的類潭袱。ChannelHandler作為跟借口柱嫌,定義了3個(gè)方法和一個(gè)注解。
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@interface Sharable {}
}
從方法的名字不難理解這3個(gè)方法分別在handler被添加屯换、移除编丘、拋出異常時(shí)回調(diào)觸發(fā)。而@Sharable注解表明某個(gè)handler實(shí)例可以被多個(gè)pipeline共享(也即多個(gè)channel共享)。
經(jīng)過(guò)pipeline的后嘉抓,handler處理過(guò)的事件會(huì)作為臨近handler的事件入口索守。netty將事件分成了入站事件和出站事件,這里的入和出是相對(duì)于netty所屬的應(yīng)用程序而言的掌眠,一般來(lái)說(shuō)蕾盯,由外部觸發(fā)的事件是inbound事件,而outbound事件是由應(yīng)用程序主動(dòng)請(qǐng)求而觸發(fā)的事件蓝丙。相應(yīng)的,handler也被分成inBoundHandler和outBoundHandler兩種望拖。顧名思義渺尘,inBoundHandler只會(huì)處理inBound事件,outBoundHandler只會(huì)處理outBound事件说敏。具體的入站和出站事件可以參考ChannelInboundHandler和ChannelOutboundHandler2個(gè)接口各自定義的方法鸥跟。
// inbound事件
fireChannelRegistered()
fireChannelActive()
fireChannelRead(Object)
fireChannelReadComplete()
fireExceptionCaught()
fireUserEventTriggered()
fireChannelWritabilityChanged()
fireChannelInactive()
fireChannelUnregistered()
// outbound事件
bind()
connect()
write()
flush()
read()
disconnect()
close()
deregister()
在上述事件中,別的事件都容易理解盔沫,唯獨(dú)read這個(gè)事件出現(xiàn)了3次医咨,容易混淆,所以單獨(dú)拿出來(lái)提一下架诞。
fireChannelRead(Object)和FireChannelReadComplete屬于inBound事件拟淮,而read屬于outBound事件,這表明谴忧,read事件是應(yīng)用程序主動(dòng)觸發(fā)的事件很泊。在ChannelOutBoundInvoker關(guān)于read方法的注釋中也提到,請(qǐng)求將channel中的數(shù)據(jù)讀入第一個(gè)inbound緩沖區(qū)沾谓,然后根據(jù)是否還有數(shù)據(jù)來(lái)決定觸發(fā)channelRead(Object)和channelReadComplete委造。
ChannelHandlerContext
為了使handler類更關(guān)注于實(shí)際對(duì)數(shù)據(jù)的邏輯處理,netty將handler與pipeline關(guān)聯(lián)的過(guò)程交由ChannelHandlerContext完成均驶。熟悉鏈表數(shù)據(jù)結(jié)構(gòu)的都知道昏兆,鏈表的每一個(gè)節(jié)點(diǎn)都包含數(shù)據(jù)域和指針域,顯然妇穴,handler和handlerContext的關(guān)系就像數(shù)據(jù)域和指針域爬虱。但context不僅僅只是一個(gè)指針域,從它的接口定義可以看出來(lái)伟骨,hannelHandlerContext一方面將handler包裹起來(lái)饮潦,繼而進(jìn)行inbound和outbound事件的傳播,另一方面繼承于attributeMap的attr方法也令其可以自定義一些屬性(已經(jīng)被廢棄携狭,轉(zhuǎn)而使用handler的attr方法)继蜡。此外,context還可以為handler賦予名稱、獲取內(nèi)存分配器稀并,它還持有pipeline的引用仅颇,以便在必要時(shí)刻從頭尾指針重新開(kāi)始處理。
ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker{...}
pipeline的初始化
對(duì)以上3個(gè)類有概述性的了解后碘举,我們先看一下pipeline是如何初始化的忘瓦。
在channel初始化時(shí),channel的構(gòu)造函數(shù)初始化了一個(gè)pipeline引颈。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
可以看到pipeline在初始化時(shí)耕皮,添加了Tail和Head2個(gè)ChannelHandlerContext,且將這2個(gè)節(jié)點(diǎn)作為哨兵節(jié)點(diǎn)蝙场,組成雙向鏈表這樣一個(gè)數(shù)據(jù)結(jié)構(gòu)凌停。
兩個(gè)哨兵的繼承關(guān)系如下
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {...}
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
...
}
可以看到tail節(jié)點(diǎn)只是InboundHandler,而head節(jié)點(diǎn)既是InboundHandler又是OutboundHandler售滤。tailContext通常做的是一個(gè)收尾的工作罚拟,比如異常沒(méi)有捕獲,傳遞到tail完箩,就會(huì)打印日志等等赐俗、釋放內(nèi)存等等;而headContext持有一個(gè)Unsafe對(duì)象弊知,在前文說(shuō)過(guò)阻逮,unsafe是實(shí)現(xiàn)底層數(shù)據(jù)讀寫(xiě)的一個(gè)類,也因此吉捶,head在處理inbount事件時(shí)夺鲜,會(huì)原封不動(dòng)的往下傳播,而處理outbound事件時(shí)呐舔,會(huì)委托給unsafe進(jìn)行處理币励。
這里還有一個(gè)小細(xì)節(jié)。在傳播事件時(shí)需要判斷下一個(gè)handler是否可以處理這個(gè)事件珊拼,netty于是將各種事件用位圖的形式區(qū)分食呻,采用這種方式大大節(jié)省判斷操作所需要的額外空間。
// ChannelHandlerMask類定義的部分事件位運(yùn)算
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
利用掩碼判斷handler處理對(duì)應(yīng)事件
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
handler的添加和刪除
handler在調(diào)用pipeline的addXXX系列方法里添加澎现,以addLast(ChannelHandler... handlers)方法為例仅胞,默認(rèn)情況下,該方法會(huì)重載到addLast(EventExecutorGroup group, String name, ChannelHandler handler)方法剑辫,默認(rèn)情況下group和name均為null
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
總的來(lái)說(shuō)可以分為3個(gè)步驟:
- 檢查handler是否重復(fù)添加干旧,主要是通過(guò)handler的@Sharable注解和added字段判斷;
- 創(chuàng)建HandlerContext,并添加到鏈表中妹蔽。
- 回調(diào)handlerAdded方法椎眯。
handler的刪除類似挠将,先通過(guò)參數(shù)找到對(duì)應(yīng)的handler,然后刪除鏈表中的context節(jié)點(diǎn)编整,最后回調(diào)handlerRemove方法舔稀。
handler的傳播順序
由于采用了責(zé)任鏈模式,鏈表節(jié)點(diǎn)之間的順序就顯得非常重要了掌测,先看一下inbound事件是如何在pipeline中傳播的
inbount事件的傳播
inbound以AbstractChannelHandlerContext中的fireChannelRead(Object)方法為例内贮。
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
可以看出,fireChannelRead做了2件事
- 通過(guò)事件對(duì)應(yīng)的掩碼找到下一個(gè)inboundHandler
- 將本節(jié)點(diǎn)處理好的數(shù)據(jù)傳播給下一個(gè)inboundHandler
// 步驟1
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
// 步驟2
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
}
步驟1的實(shí)現(xiàn)是不斷通過(guò)context的executionMask與事件掩碼做與運(yùn)算汞斧,直到與的結(jié)果不為0夜郁。這表明該context對(duì)應(yīng)的handler具備處理對(duì)應(yīng)事件的能力。此外要注意循環(huán)過(guò)程中粘勒,context是next方向拂酣。
步驟2則判斷當(dāng)前線程是否是eventLoop線程,若是仲义,則執(zhí)行下一個(gè)inboundHandlerContext的invokeChannelRead方法,若不是則添加到任務(wù)隊(duì)列里剑勾,待eventLoop線程來(lái)執(zhí)行
至于invokeChannelRead方法也很簡(jiǎn)單埃撵,先判斷該handler是否已存在于pipeline,然后調(diào)用handler的channelRead方法虽另。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
// 判斷是否添加到pipeline中或即將添加到pipeline中
private boolean invokeHandler() {
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
outbound事件傳播與inbound類似暂刘,只是在通過(guò)掩碼查詢下一個(gè)outboundHandler時(shí)為prev方向,與inbound相反捂刺。具體代碼略過(guò)谣拣。
pipeline與context調(diào)用傳播方法的區(qū)別
pipeline.fireChannelRead()和ChannelHandlerContext.fireChannelRead()在代碼中都時(shí)常出現(xiàn),那么它們的區(qū)別是什么?
不妨看一下DefaultChannelPipeline的fireChannelRead方法族展。
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
}
可以看出森缠,其將headContext作為參數(shù)傳入,調(diào)用了HandlerContext的invokeChannelRead(AbstractChannelHandlerContext, Object)靜態(tài)方法仪缸,這個(gè)靜態(tài)方法會(huì)調(diào)用傳入的HandlerContext的invokeChannelRead(Object)方法贵涵,繼而調(diào)用Context內(nèi)部持有的ChannelInboundHandler的channelRead(ChannelHandlerContext, Object)方法。這個(gè)方法由子類重寫(xiě)恰画,在這里就是HeadContext重寫(xiě)的方法宾茂,它調(diào)用傳入的ChannelHandlerContext,繼續(xù)往下傳播拴还。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
而DefaultChannelPipeline的read方法則調(diào)用tail的read方法跨晴,tail會(huì)傳播給它的前一個(gè)節(jié)點(diǎn)。
小結(jié)
pipeline調(diào)用傳播方法時(shí)片林,若是inbound事件端盆,從head開(kāi)始往tail方向傳播怀骤,若是outbound事件,從tail開(kāi)始往head方向傳播
context調(diào)用傳播方法爱谁,若是inbound事件晒喷,從當(dāng)前context節(jié)點(diǎn)開(kāi)始往tail方向傳播,若是outbound事件访敌,從當(dāng)前context節(jié)點(diǎn)開(kāi)始往head方向傳播
異常的傳播
異常的傳播路徑
在context處理各種事件時(shí)凉敲,用了channelRead的例子∷峦可以注意到invokeChannelRead方法實(shí)現(xiàn)用了一個(gè)try-catch的寫(xiě)法爷抓。當(dāng)拋出異常時(shí),會(huì)調(diào)用notifyHandlerException(Throwable)阻塑,代碼如下:
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
if (logger.isWarnEnabled()) {
logger.warn(
"An exception was thrown by a user handler " +
"while handling an exceptionCaught event", cause);
}
return;
}
invokeExceptionCaught(cause);
}
首先調(diào)用inExceptionCaught蓝撇,判斷異常是否發(fā)生在exceptionCaught方法內(nèi)。若是陈莽,則打印警告日志后直接返回渤昌,否則調(diào)用invokeExceptionCaught(Throwable)方法。該方法會(huì)調(diào)用handler復(fù)寫(xiě)的exceptionCaught方法走搁。
若復(fù)寫(xiě)方法調(diào)用了ChannelHandlerContext.fireExceptionCaught方法独柑,則異常會(huì)繼續(xù)往下傳播,不論下一個(gè)節(jié)點(diǎn)是inbound還是outbound私植。若一直傳播到tail忌栅,則會(huì)打印一個(gè)日志,并釋放異常占用的內(nèi)存曲稼。
異常優(yōu)雅處理
在springMvc體系中索绪,通常會(huì)有一個(gè)包含ControllerAdvice注解的類統(tǒng)一進(jìn)行異常的處理,在netty中贫悄,也可以在pipeline的末尾添加一個(gè)異常處理handler統(tǒng)一進(jìn)行異常處理瑞驱。甚至可以用策略模式,對(duì)不同異常類進(jìn)行分門別類的處理清女。