ChannelPipeline是什么
ChannelPipeline
是ChannelHandler
的列表贷揽,用于攔截或處理Channel
的入站事件和出站事件操作袜硫。ChannelPipeline
實(shí)現(xiàn)了攔截過濾器模式衍慎,讓用戶完全控制如何處理事件以及管道中的ChannelHandler
如何相互交互。
每個(gè)Channel
都有各自的ChannelPipeline
纱兑,并且一個(gè)新的Channel
被創(chuàng)建時(shí)該ChannelPipeline
同時(shí)也會(huì)被創(chuàng)建因俐。
事件是如何在Pipeline中流動(dòng)的
下圖描述了ChannelHandler
在ChannelPipeline
中如何處理I/O
事件榴嗅。I/O
事件由ChannelInboundHandler
或ChannelOutboundHandler
處理妄呕,并通過調(diào)用ChannelHandlerContext
中定義的事件傳播方法(例如ChannelHandlerContext#fireChannelRead(Object)
和ChannelHandlerContext#write(Object)
中定義的事件傳播方法轉(zhuǎn)發(fā)給最近的處理程序。
Channel與ChannelPipeline的關(guān)系
我們已經(jīng)知道當(dāng)在創(chuàng)建Channel
時(shí)并會(huì)自動(dòng)創(chuàng)建對應(yīng)的ChannelPipeline
嗽测,所以它們就是一對一的關(guān)系绪励,如下圖所示:
在創(chuàng)建Channel
時(shí)會(huì)創(chuàng)建對應(yīng)的ChannelPipeline
,而在創(chuàng)建ChannelPipeline
時(shí)又會(huì)創(chuàng)建對應(yīng)的TailContext
和HeadContext
,然后將其構(gòu)造成雙向鏈表的節(jié)點(diǎn)唠粥。
接下來我們來具體分析下ChannelPipeline
的實(shí)現(xiàn)過程疏魏。
ChannelPipeline的實(shí)現(xiàn)
在前面分析Bootstrap
和ServerBootstrap
的章節(jié)中我們知道在AbstractChannel
構(gòu)造方法中會(huì)實(shí)例化ChannelPipeline
,所以下面我們一步一步來進(jìn)行分析晤愧。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
//實(shí)例化ChannelPipeline的操作
pipeline = newChannelPipeline();
}
這里我們只需關(guān)注newChannelPipeline
方法即可大莫,如下:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
該方法直接將當(dāng)前Channel
對象傳遞給構(gòu)造函數(shù)并實(shí)例化了DefaultChannelPipeline
對象,我們先來看看該類的一個(gè)類結(jié)構(gòu)圖:
我們來分析下DefaultChannelPipeline
對應(yīng)的父類接口ChannelInboundInvoker
官份,該接口主要定義了入站事件方法只厘,如下所示:
/**
* 入站事件傳播方法:
* ChannelHandlerContext#fireChannelRegistered()
* ChannelHandlerContext#fireChannelActive()
* ChannelHandlerContext#fireChannelRead(Object)
* ChannelHandlerContext#fireChannelReadComplete()
* ChannelHandlerContext#fireExceptionCaught(Throwable)
* ChannelHandlerContext#fireUserEventTriggered(Object)
* ChannelHandlerContext#fireChannelWritabilityChanged()
* ChannelHandlerContext#fireChannelInactive()
* ChannelHandlerContext#fireChannelUnregistered()
*/
/**
* Channel已注冊到EventLoop
*/
ChannelInboundInvoker fireChannelRegistered();
/**
* Channel已從其EventLoop中取消注冊
*/
ChannelInboundInvoker fireChannelUnregistered();
/**
* Channel當(dāng)前處于活動(dòng)狀態(tài),表示已連接
*/
ChannelInboundInvoker fireChannelActive();
/**
* Channel當(dāng)前處于非活動(dòng)狀態(tài)舅巷,表示連接已關(guān)閉
*/
ChannelInboundInvoker fireChannelInactive();
/**
* Channel在入站操作中收到了異常
*/
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
/**
* Channel接收到用戶自定義事件
*/
ChannelInboundInvoker fireUserEventTriggered(Object event);
/**
* Channel接收到一條消息
*/
ChannelInboundInvoker fireChannelRead(Object msg);
/**
* 觸發(fā)ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件到ChannelPipeline中的下一個(gè)ChannelInboundHandler事件
*/
ChannelInboundInvoker fireChannelReadComplete();
/**
* 觸發(fā)ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)事件到ChannelPipeline中的下一個(gè)ChannelInboundHandler
*/
ChannelInboundInvoker fireChannelWritabilityChanged();
父類接口ChannelOutboundInvoker
主要定義了出站事件方法羔味,如下所示:
/**
* 出站事件傳播方法:
* ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
* ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
* ChannelHandlerContext#write(Object, ChannelPromise)
* ChannelHandlerContext#flush()
* ChannelHandlerContext#read()
* ChannelHandlerContext#disconnect(ChannelPromise)
* ChannelHandlerContext#close(ChannelPromise)
* ChannelHandlerContext#deregister(ChannelPromise)
*/
/**
* 請求綁定給定的SocketAddress一旦操作完成并通知ChannelFuture,要么操作成功或者錯(cuò)誤
*/
ChannelFuture bind(SocketAddress localAddress);
/**
* 請求連接給定的SocketAddress并在操作完成后通知ChannelFuture钠右,要么操作成功或者錯(cuò)誤
*/
ChannelFuture connect(SocketAddress remoteAddress);
/**
* 請求連接給定的SocketAddress赋元,同時(shí)綁定到localAddress并在操作完成后通知ChannelFuture,要么操作成功或者錯(cuò)誤
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
/**
* 請求與遠(yuǎn)程節(jié)點(diǎn)斷開連接并在操作完成后通知ChannelFuture(要么操作成功要么因?yàn)殄e(cuò)誤)
*/
ChannelFuture disconnect();
/**
* 請求關(guān)閉Channel并在操作完成后通知ChannelFuture,關(guān)閉后不能再重復(fù)使用
*/
ChannelFuture close();
/**
* 請求從先前分配的EventExecutor中注銷,并在操作完成后通知ChannelFuture
*/
ChannelFuture deregister();
/**
* 請求綁定給定的SocketAddress并在操作完成后通知給定的ChannelPromise
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
/**
* 請求連接給定的SocketAddress并在操作完成后通知給定的ChannelPromise
*/
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
/**
* 請求連接給定的SocketAddress同時(shí)綁定到localAddress搁凸,并在操作完成后通知ChannelPromise
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* 請求斷開遠(yuǎn)程節(jié)點(diǎn)的連接并在操作完成后通知給定的ChannelPromise
*/
ChannelFuture disconnect(ChannelPromise promise);
/**
* 請求關(guān)閉Channel并在操作完成后通知給定的ChannelPromise,該Channel關(guān)閉后不可復(fù)用
*/
ChannelFuture close(ChannelPromise promise);
/**
* 請求從先前分配的EventExecutor中注銷媚值,并在操作完成后通知給定的ChannelPromise
*/
ChannelFuture deregister(ChannelPromise promise);
/**
* 請求從Channel讀取數(shù)據(jù)到第一個(gè)入站緩沖區(qū),如果讀取了數(shù)據(jù)护糖,則觸發(fā)ChannelInboundHandler#channelRead(ChannelHandlerContext,Object)事件杂腰,并觸發(fā)ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件,以便處理程序可以決定是否繼續(xù)讀取椅文。如果已經(jīng)有一個(gè)待處理的讀取操作,則此方法不執(zhí)行任何操作惜颇。
*/
ChannelOutboundInvoker read();
/**
* 通過ChannelPipeline請求通過ChannelHandler寫入消息皆刺。該方法不會(huì)執(zhí)行實(shí)際寫出消息,所以要確保執(zhí)行#flush()方法將所有未寫出的消息刷新到實(shí)際傳輸中凌摄。
*/
ChannelFuture write(Object msg);
/**
* 通過ChannelPipeline請求通過ChannelHandlerContext寫出消息羡蛾。該方法不會(huì)執(zhí)行實(shí)際寫出消息,所以要確保執(zhí)行#flush()方法將所有未寫出的消息刷新到實(shí)際傳輸中锨亏。
*/
ChannelFuture write(Object msg, ChannelPromise promise);
/**
* 請求通過ChannelOutboundInvoker刷新所有等待的消息
*/
ChannelOutboundInvoker flush();
/**
* 簡化調(diào)用##write(Object, ChannelPromise)和#flush()
*/
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
/**
* 簡化調(diào)用#write(Object)和#flush()
*/
ChannelFuture writeAndFlush(Object msg);
/**
* 返回一個(gè)新的ChannelPromise
*/
ChannelPromise newPromise();
/**
* 返回一個(gè)新的ChannelProgressPromise
*/
ChannelProgressivePromise newProgressivePromise();
/**
* 創(chuàng)建一個(gè)新的ChannelFuture并將其標(biāo)記為已成功痴怨。因此ChannelFuture#isSuccess()返回為true。添加到其中的所有FutureListener將直接受到通知器予。通用每次調(diào)用阻塞方法都將返回而不會(huì)阻塞
*/
ChannelFuture newSucceededFuture();
/**
* 創(chuàng)建一個(gè)新的ChannelFuture并將其標(biāo)記為已失敗浪藻。因此ChannelFuture#isSuccess()返回為false。添加到其中的所有FutureListener將直接受到通知乾翔。通用每次調(diào)用阻塞方法都將返回而不會(huì)阻塞
*/
ChannelFuture newFailedFuture(Throwable cause);
/**
* 返回一個(gè)特殊的ChannelPromise爱葵,可以將其重復(fù)用于不同的操作
*/
ChannelPromise voidPromise();
ChannelPipeline接口主要定義了一些ChannelHandler的一些操作,如下所示:
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
<T extends ChannelHandler> T get(Class<T> handlerType);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
Channel channel();
List<String> names();
Map<String, ChannelHandler> toMap();
}
ChannelPipeline
接口主要定義了一些常用添加ChannelHandler
的操作反浓,這里就不過講解了萌丈。
講完了ChannelPipeline
的繼承接口之后,我們來看看具體實(shí)現(xiàn)DefaultChannelPipeline
的實(shí)現(xiàn)雷则。
DefaultChannelPipeline
的構(gòu)造方法如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
這里先將Channel
進(jìn)行保存辆雾,然后分別實(shí)例化了TailContext
和HeadContext
,因?yàn)檫@2個(gè)比較重要,所以來研究下其具體實(shí)現(xiàn)月劈,這里先來看看TailContext
的類圖度迂。
由上圖可知,TailContext
繼承了AbstractChannelHandlerContext
并且實(shí)現(xiàn)了ChannelInboundHandlerHandler
接口艺栈,所以本質(zhì)上TailContext
既是一個(gè)標(biāo)準(zhǔn)的Handler
也是一個(gè)HandlerContext
英岭, 接下來看下其源碼:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}
這里構(gòu)造方法中調(diào)用了父類AbstractChannelHandlerContext
的構(gòu)造方法,進(jìn)來看看:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
因?yàn)檫@里都是賦值操作湿右,我們來看看#mask()方法是如何計(jì)算得出executionMask的诅妹。
這里說明一下executionMask是干什么的,早在之前Netty
的版本中是用instanceof
來判斷是inbound
事件還是outbound
事件,這樣判斷的結(jié)果是比較暴力的吭狡,因?yàn)槲覀內(nèi)绻x了一個(gè)inbound
處理程序尖殃,但是我這個(gè)程序只希望處理我想處理的事件,因?yàn)槭褂?code>instanceof判斷的方式使我們不得不處理我們不想處理的事件划煮,所以在一定的程度上加大了耦合程度送丰,在新版Netty
中采用了位運(yùn)算來判斷,并且粒度更細(xì)(方法級別)弛秋,使得我們不必關(guān)系我們不關(guān)系的事件器躏,使得Handler
更加靈活。
接下來我們看下ChannelHandlerMask類定義的粒度:
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
因?yàn)閑xecutionMask是通過mask方法計(jì)算得出蟹略,這里跟進(jìn)mask方法:
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
mask方法中做了一層緩存處理登失,這里我們直接跟進(jìn)mask0()方法是如何計(jì)算得出的:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
我們先來看下isSkippable方法的實(shí)現(xiàn),最后再來說明該方法:
private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
該方法用于判斷給定的Class
對象是否有給定的方法或者是否有對應(yīng)Skip
注解挖炬,如果有則返回true
,否則返回false
揽浙。
再回到mask0()方法,該方法中意敛,首先mask=MASK_EXCEPTION_CAUGHT馅巷,然后判斷給定的Handler
是屬于ChannelInboundHandler
還是ChannelOutboundHandler
,進(jìn)而再判斷對應(yīng)的事件方法草姻,如下整理了其大致結(jié)構(gòu):
InboundHandler事件:
- MASK_EXCEPTION_CAUGHT
- MASK_CHANNEL_REGISTERED
- MASK_CHANNEL_UNREGISTERED
- MASK_CHANNEL_ACTIVE
- MASK_CHANNEL_INACTIVE
- MASK_CHANNEL_READ
- MASK_CHANNEL_READ_COMPLETE
- MASK_CHANNEL_WRITABILITY_CHANGED
- MASK_USER_EVENT_TRIGGERED
OutboundHandler事件:
- MASK_EXCEPTION_CAUGHT
- MASK_BIND
- MASK_CONNECT
- MASK_DISCONNECT
- MASK_CLOSE
- MASK_DEREGISTER
- MASK_READ
- MASK_WRITE
- MASK_FLUSH
也就是如果給定的Handler
類型為inbound
則該mask默認(rèn)處理所有對應(yīng)的inbound
事件钓猬,然后通過isSkippable方法來判斷該handler
是否有處理該事件的方法或者該方法是否有@Skip注解,如果存在該條件則從該mask中移除該事件撩独,表示后面ChannelPipeline
通過位運(yùn)算查找對應(yīng)事件處理的Handler
時(shí)逗噩,該Handler
默認(rèn)會(huì)將其過濾掉,自然也不會(huì)觸發(fā)該事件了跌榔。
在TailContext
中异雁,因?yàn)閷?shí)現(xiàn)了ChannelInboundHandler
,執(zhí)行結(jié)果如下,所以最終計(jì)算得出的executionMask為511僧须。
int mask = MASK_EXCEPTION_CAUGHT; // mask = 1
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND; // mask=511
}
//bit
// 1 |= 511
// 0000 0000 0001
//|0001 1111 1111
//---------------
// 0001 1111 1111
//mask = 511
我們再回到TailContext
的構(gòu)造函數(shù)中纲刀,接下來執(zhí)行了一個(gè)setAddComplete()方法,如下所示:
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
該方法采用CAS
的方式改變Handler
狀態(tài)為已添加完成担平。
到此為止示绊,TailContext
的工作到這里就完成了,接下里我們看看它的兄弟HeadContext
暂论。
HeadContext
HeadContext
和TailContext
的不同之處在于該對象同時(shí)實(shí)現(xiàn)了ChannelInboundHandler
和ChannelOutboundHandler
面褐,同時(shí)也繼承了AbstractHandlerContext
,一起來看下類結(jié)構(gòu)圖取胎。
接下來看看其代碼實(shí)現(xiàn):
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
該對象與TailContext
大致相同展哭,所以就不具體講解了湃窍,不同之處該對象引用了一個(gè)Unsafe
的引用。因?yàn)?code>HeadContext同時(shí)實(shí)現(xiàn)了ChannelInboundHandler
和ChannelOutboundHandler
匪傍,所以其executionMask=131071您市。
我們再回到DefaultChannelPipeline
構(gòu)造函數(shù)中,如下:
protected DefaultChannelPipeline(Channel channel) {
//...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在完成了TailContext
和HeadContext
實(shí)例化后役衡,此時(shí)tail和head就分別指向了不同的AbstractHandlerContext
茵休,最終通過head.next=tail和tail.prev = head進(jìn)行鏈表關(guān)聯(lián),如下所示手蝎。
ChannelHandler是如何添加的
當(dāng)我們要添加一個(gè)Handler
的時(shí)候榕莺,內(nèi)部是如何實(shí)現(xiàn)的呢,接下來使用addLast方法來分析其實(shí)現(xiàn)棵介。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
當(dāng)我們調(diào)用ChannelPipeline
添加一個(gè)Handler
時(shí)帽撑,我們假設(shè)調(diào)用的是addLast方法,首先會(huì)調(diào)用checkMultiplicity方法來檢測是否重復(fù)添加鞍时,如下所示:
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
該方法判斷Handler
是否重復(fù)添加,判斷條件為Handler
的added字段是否為true扣蜻,或者該Handler
是否被標(biāo)記為@Sharable重復(fù)使用逆巍,否則直接拋出錯(cuò)誤,重復(fù)添加Handler
莽使,否則該Handler
為首次添加锐极,將added字段設(shè)置為true,表示已經(jīng)添加芳肌,防止重復(fù)添加的判斷條件灵再。
判斷重復(fù)添加工作后,接下來調(diào)用了newContext方法實(shí)例化了一個(gè)AbstractChannelHandlerContext對象亿笤,代碼如下:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
該方法直接實(shí)例化了一個(gè)DefaultChannelHandlerContext
對象翎迁,一起來看看該類的實(shí)現(xiàn):
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
該對象和TailContext
、HeadContext
的實(shí)現(xiàn)基本上差不多净薛,前文有提汪榔,這里就不過多深入講解了。
執(zhí)行完newContext()方法后肃拜,接下來就執(zhí)行了addLast0()方法了痴腌,如下所示:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
可以看到的是,這是將節(jié)點(diǎn)進(jìn)行鏈接到對應(yīng)的鏈表節(jié)點(diǎn)上燃领,執(zhí)行完的鏈表結(jié)果如下所示:
到此為止士聪,一個(gè)完整的Handler
添加過程就完成了,當(dāng)然還有一些其它操作過程這里就不細(xì)講了猛蔽,有興趣可以自行了解剥悟。
總結(jié)
通過源碼分析,我們可以知道ChannelPipeline
就像是一個(gè)大管家,管理著多個(gè)Handler
和HandlerContext
的關(guān)聯(lián)懦胞,任何入站和出站的事件通過位運(yùn)算找到對應(yīng)需要處理的Handler
替久,然后根據(jù)規(guī)則流經(jīng)不同的Handler
事件處理方法。當(dāng)一個(gè)入站事件觸發(fā)的時(shí)候會(huì)從head節(jié)點(diǎn)躏尉,依次找到合適節(jié)點(diǎn)進(jìn)行處理蚯根,出站事件則會(huì)從tail節(jié)點(diǎn)開始依次找到合適的節(jié)點(diǎn)進(jìn)行處理。