1、Channel打厘、ChannelPipeline、ChannelHandler贺辰、ChannelHandlerContext關(guān)系
四者關(guān)系如下圖:
Channel:Channel為通信載體户盯,負責底層傳輸層的具體事件及消息處理,其封裝底層處理的復(fù)雜性饲化,通過統(tǒng)一接口將事件及消息交給ChannelPipeline處理莽鸭。
ChannelPipeline:ChannelPipeline為消息的管道,一個Channel對應(yīng)唯一ChannelPipeline吃靠,ChannelPipeline中包含多個ChannelHandlerContext硫眨,各個ChannelHandlerContext以鏈表的形式構(gòu)成消息處理的責任鏈,而ChannelPipeline并不對消息做處理巢块,其只是轉(zhuǎn)發(fā)給ChannelHandlerContext處理礁阁,而ChannelHandlerContext又交給具體的ChannelHandler處理,并將處理后的消息沿著鏈表轉(zhuǎn)發(fā)給下一個ChannelHandlerContext夕冲。
ChannelHandlerContext:ChannelHandlerContext為ChannelPipeline和ChannelHandler的上下文,其保存對應(yīng)的ChannelPipeline及ChannelHandler裂逐,并且根據(jù)添加順序歹鱼,多個ChannelHandlerContext之間構(gòu)成鏈表。ChannelHandlerContext提供和ChannelPipeline類似的方法卜高,但調(diào)用ChannelHandlerContext上的方法只會從當前的ChannelHandler開始向下一個ChannelHandler傳播弥姻;而調(diào)用ChannelPipeline上的方法會從鏈表頭或尾向下傳播南片。
ChannelHandler:ChannelHandler為具體的消息處理類,其由應(yīng)用層定義庭敦。消息由某個ChannelHandler處理完后疼进,會沿著鏈表將消息交由下個ChannelHandler處理。
2秧廉、ChannelPipeline源碼分析
ChannelPipeline類繼圖:
說明:
Iterable:遍歷器接口伞广,具體接口為:Iterable<Entry<String, ChannelHandler>>,其提供iterator()疼电、forEach()等方法嚼锄,用于遍歷管道中的ChannelHandler。
ChannelInboundInvoker:管道的入口事件處理接口蔽豺,對于Channel中的入口事件都是通過此接口進行處理的区丑。消息類型包括:register、unregister修陡、active沧侥、inactive、exception魄鸦、read等宴杀;
ChannelOutboundInvoker:管道的出口事件處理接口,對于Channel相關(guān)的出口事件都是通過此接口進行處理的号杏。消息類型包括:bind婴氮、connect、close盾致、write主经、flush等。
ChannelPipeline:管道相關(guān)操作接口庭惜,提供了對管道中的ChannelHandler進行增刪改查等接口罩驻,包括:addFirst、addLast等护赊。
DefaultChannelPipeline:ChannelPipeline的默認實現(xiàn)類惠遏。
2.1、遍歷處理器
pipeline提供其對應(yīng)的Handler的遍歷處理接口骏啰。Iterable<Entry<String, ChannelHandler>及ChannelPipeline中的部分方法节吮。
2.1.1、方法說明
方法名稱 | 返回值 | 功能說明 |
---|---|---|
iterator() | Iterator<Map.Entry<String, ChannelHandler>> | 返回Map.Entry<String, ChannelHandler>的遍歷器判耕,此map即為pipeline中所有Handler的name與Handler的map透绩。 |
names() | List<String> | 返回所有handler的name的集合 |
toMap() | Map<String, ChannelHandler> | 返回handler的name與ChannelHandler的map |
2.1.2、方法實現(xiàn)
iterator()及toMap()方法實現(xiàn):
public final Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == tail) {
return map;
}
map.put(ctx.name(), ctx.handler());
ctx = ctx.next;
}
}
@Override
public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
return toMap().entrySet().iterator();
}
由以上toMap()實現(xiàn)可知,map中為pipeline中從head到tail的handler的map帚豪;
2.2碳竟、inbound事件
當發(fā)生I/O事件時,如鏈路建立連接狸臣、鏈路關(guān)閉莹桅、讀取數(shù)據(jù)完成等,都會產(chǎn)生一個事件烛亦,事件在pipeline中進行傳播和處理诈泼,它是實際處理的總?cè)肟凇etty將有限的網(wǎng)絡(luò)I/O事件進行統(tǒng)一抽象此洲,ChannelInboundInvoker即為pipeline抽象的入口接口厂汗。pipeline中以fireXXX命名的方法都是從I/O線程流向用戶業(yè)務(wù)Handler的inbound消息。
2.2.1呜师、方法說明
方法名稱 | 返回值 | 功能說明 |
---|---|---|
fireChannelRegistered() | ChannelInboundInvoker | 當Channel 已經(jīng)注冊到它的EventLoop 并且能夠處理I/O 時被調(diào)用 |
fireChannelUnregistered() | ChannelInboundInvoker | 當Channel 從它的EventLoop 注銷并且無法處理任何I/O 時被調(diào)用 |
fireChannelActive() | ChannelInboundInvoker | 當Channel 處于活動狀態(tài)時被調(diào)用娶桦;Channel 已經(jīng)連接/綁定并且已經(jīng)就緒 |
fireChannelInactive() | ChannelInboundInvoker | 當Channel 離開活動狀態(tài)并且不再連接它的遠程節(jié)點時被調(diào)用 |
fireExceptionCaught(Throwable cause) | ChannelInboundInvoker | Channel異常事件 |
fireUserEventTriggered(Object event) | ChannelInboundInvoker | 當ChannelnboundHandler.fireUserEventTriggered()方法被調(diào) |
fireChannelRead(Object msg) | ChannelInboundInvoker | 當從Channel 讀取數(shù)據(jù)時被調(diào)用 |
fireChannelReadComplete() | ChannelInboundInvoker | 當Channel上的一個讀操作完成時被調(diào)用 |
fireChannelWritabilityChanged() | ChannelInboundInvoker | 當Channel 的可寫狀態(tài)發(fā)生改變時被調(diào)用。用戶可以確保寫操作不會完成得太快(以避免發(fā)生OutOfMemoryError)或者可以在Channel 變?yōu)樵俅慰蓪憰r恢復(fù)寫入汁汗≈云瑁可以通過調(diào)用Channel 的isWritable()方法來檢測Channel 的可寫性。與可寫性相關(guān)的閾值可以通過Channel.config().setWriteHighWatesetWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法來設(shè)置 |
2.2.2知牌、方法實現(xiàn)
pipeline中inbound事件的處理都非常簡單祈争,其主要交由AbstractChannelHandlerContext中對應(yīng)的靜態(tài)方法進行處理。
部分處理源碼如下:
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this;
}
2.3角寸、outbound事件
ChannelOutboundInvoker是outbound消息的接口菩混,由用戶或者代碼發(fā)起的I/O操作被稱為outbound消息,即為從pipeline中流程的消息的統(tǒng)稱扁藕。
2.3.1沮峡、方法說明
方法名稱 | 返回值 | 功能說明 |
---|---|---|
bind(SocketAddress localAddress) | ChannelFuture | 當請求將Channel 綁定到本地地址時被調(diào)用,綁定成功或失敗都通過ChannelFuture進行通知 |
connect(SocketAddress remoteAddress) | ChannelFuture | 當請求將Channel 連接到遠程節(jié)點時被調(diào)用亿柑,當連接超時時拋出ConnectTimeoutException邢疙,當連接被拒絕時,將拋出ConnectException |
connect(SocketAddress remoteAddress, SocketAddress localAddress) | ChannelFuture | |
disconnect() | ChannelFuture | 當請求將Channel 從遠程節(jié)點斷開時被調(diào)用望薄,不論處理成功或失敗疟游,都會進行通知 |
close() | ChannelFuture | 當請求關(guān)閉Channel 時被調(diào)用 |
deregister() | ChannelFuture | 當請求將Channel 從它的EventLoop 注銷時被調(diào)用 |
bind(SocketAddress localAddress, ChannelPromise promise) | ChannelFuture | 當請求將Channel 綁定到本地地址時被調(diào)用 |
connect(SocketAddress remoteAddress, ChannelPromise promise) | ChannelFuture | 當請求將Channel 連接到遠程節(jié)點時被調(diào)用 |
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) | ChannelFuture | 當請求將Channel 連接到遠程節(jié)點時被調(diào)用 |
disconnect(ChannelPromise promise) | ChannelFuture | 當請求將Channel 從遠程節(jié)點斷開時被調(diào)用 |
close(ChannelPromise promise) | ChannelFuture | 請求關(guān)閉Channel 時被調(diào)用 |
deregister(ChannelPromise promise) | ChannelFuture | 當請求將Channel 從它的EventLoop 注銷時被調(diào)用 |
read() | ChannelFuture | 當請求從Channel 讀取更多的數(shù)據(jù)時被調(diào)用 |
write(Object msg) | ChannelFuture | 當請求通過Channel 將數(shù)據(jù)寫到遠程節(jié)點時被調(diào)用 |
write(Object msg, ChannelPromise promise) | ChannelFuture | 當請求通過Channel 將數(shù)據(jù)寫到遠程節(jié)點時被調(diào)用 |
flush() | ChannelOutboundInvoker | 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用 |
writeAndFlush(Object msg, ChannelPromise promise) | ChannelFuture | 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用 |
writeAndFlush(Object msg) | ChannelFuture | 當請求通過Channel 將入隊數(shù)據(jù)沖刷到遠程節(jié)點時被調(diào)用 |
newPromise() | ChannelPromise | 返回一個新的ChannelPromise |
newProgressivePromise() | ChannelProgressivePromise | 返回一個新的ChannelProgressivePromise |
newSucceededFuture() | ChannelFuture | 返回一個已被標記為成功的ChannelFuture,所有與此ChannelFuture綁定的監(jiān)聽器都將被通知痕支,所有阻塞調(diào)用也將直接返回 |
newFailedFuture(Throwable cause) | ChannelFuture | 返回一個已被標記為失敗的ChannelFuture颁虐,所有與此ChannelFuture綁定的監(jiān)聽器都將被通知,所有阻塞調(diào)用也將直接返回 |
voidPromise() | ChannelPromise | 返回一個不同操作也重用的ChannelPromise卧须,但使用有一定限制另绩,需要小心使用 |
2.3.2瞬痘、實現(xiàn)源碼
實現(xiàn)源碼如下:
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return tail.connect(remoteAddress, localAddress);
}
@Override
public final ChannelFuture disconnect() {
return tail.disconnect();
}
@Override
public final ChannelFuture close() {
return tail.close();
}
@Override
public final ChannelFuture deregister() {
return tail.deregister();
}
@Override
public final ChannelPipeline flush() {
tail.flush();
return this;
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
return tail.disconnect(promise);
}
@Override
public final ChannelFuture close(ChannelPromise promise) {
return tail.close(promise);
}
@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise);
}
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
@Override
public final ChannelPromise newPromise() {
return new DefaultChannelPromise(channel);
}
@Override
public final ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel);
}
@Override
public final ChannelFuture newSucceededFuture() {
return succeededFuture;
}
@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(channel, null, cause);
}
@Override
public final ChannelPromise voidPromise() {
return voidPromise;
}
由以上源碼可知,outbound的具體實現(xiàn)都是交由tail(ChannelHandlerContext)來實現(xiàn)的板熊。
2.4、ChannelPipeline鏈表維護
ChannelPipeline中維護了一個ChannelHandlerContext的鏈表察绷,I/O事件通過鏈表在用戶的Handler中傳播干签。
2.4.1、鏈表維護接口
方法名稱 | 返回值 | 功能說明 |
---|---|---|
addFirst(String name, ChannelHandler handler) | ChannelPipeline | 將handler添加到pipeline隊列的頭部 |
addLast(String name, ChannelHandler handler) | ChannelPipeline | 將handler添加到pipeline隊列的尾部 |
addBefore(String baseName, String name, ChannelHandler handler) | ChannelPipeline | 將handler添加到baseName對應(yīng)的handler之前 |
addAfter(String baseName, String name, ChannelHandler handler) | ChannelPipeline | 將handler添加到baseName對應(yīng)的handler之后 |
addFirst(ChannelHandler... handlers) | ChannelPipeline | 按順序批量添加Handler到隊列頭部 |
addLast(ChannelHandler... handlers) | ChannelPipeline | 按順序批量添加Handler到隊列尾部 |
remove(ChannelHandler handler) | ChannelPipeline | 移除handler |
remove(String name) | ChannelHandler | 移除名字為name的handler |
remove(Class<T> handlerType) | ChannelPipeline | 移除類型為handlerType的handler |
removeFirst() | ChannelPipeline | 移除第一個handler |
removeLast() | ChannelPipeline | 移除最后一個handler |
replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) | ChannelPipeline | 用newHandler替換oldHandler |
first() | ChannelHandler | 獲取第一個Handler |
firstContext() | ChannelHandlerContext | 獲取第一個Context |
last() | ChannelHandler | 獲取最后一個Handler |
lastContext() | ChannelHandlerContext | 獲取最后一個Context |
get(String name) | ChannelHandler | 通過名字獲取Handler |
context(ChannelHandler handler) | ChannelHandlerContext | 通過Handler獲取其對應(yīng)的Context |
context(String name) | ChannelHandlerContext | 通過Handler的名字獲取其對應(yīng)的Context |
注:以上接口中添加的頭尾不包括head節(jié)點和tail節(jié)點拆撼,這兩節(jié)點為netty框架的節(jié)點容劳,不允許用戶修改。
2.4.2闸度、接口實現(xiàn)
以下對主要接口的源碼進行分析竭贩。
addFirst():
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler);
addFirst0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
主要流程為:
- 檢查Handler是否在多個pipeline中重復(fù)添加。被注解為@Sharable的Handler是可以在多個pipeline中重復(fù)添加的莺禁,否則為保證線程安全留量,不允許在多個pipeline中添加。
- 檢查handler名字是否重復(fù)哟冬。如果添加時的name為空楼熄,則由框架自動生成name,生成規(guī)則為:[SimpleName] + "#" + [數(shù)字]浩峡,數(shù)字從0累加可岂,知道名字不重復(fù)為止。如果添加時的name不空翰灾,則檢查name是否重復(fù)缕粹,重復(fù)則拋出IllegalArgumentException異常,否則驗證通過纸淮;
- 根據(jù)pipeline平斩、EventExecutorGroup、name萎馅、handler新建一個ChannelHandlerContext双戳;
- 挑用addFirst0()將新建的context添加到pipeline中head的下一個節(jié)點;
- 若Channel還未在EventLoop中注冊糜芳,則注冊PendingHandlerAddedTask任務(wù)飒货,當Channel注冊成功時,調(diào)用ChannelHandler.handlerAdded()方法峭竣;若Channel已經(jīng)注冊成功則直接調(diào)用callHandlerAdded0()方法來通過管道調(diào)用所有Handler的ChannelHandler.handlerAdded()方法塘辅。
addLast():
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addLast()的實現(xiàn)源碼與addFirst基本一樣,唯一區(qū)別是將handler添加的pipeline的tail節(jié)點的前一個節(jié)點皆撩。
2.5扣墩、DefaultChannelPipeline源碼分析
DefaultChannelPipeline為ChannelPipeline接口的實現(xiàn)哲银。也定義了Pipeline中的head和tail節(jié)點及實現(xiàn)等。
2.5.1呻惕、基本屬性
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
*
* We only keep the head because it is expected that the list is used infrequently and its size is small.
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
* complexity.
*/
private PendingHandlerCallback pendingHandlerCallbackHead;
/**
* Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
* change.
*/
private boolean registered;
HEAD_NAME:head對應(yīng)的Handler的名字荆责;
TAIL_NAME:tail對應(yīng)的handler的名字;
nameCaches:Handler與其name的map的緩存亚脆;
ESTIMATOR:消息中字節(jié)大小統(tǒng)計器做院;
head:pipeline隊列的頭節(jié)點,其是ChannelHandlerContext與ChannelHandler的實現(xiàn)濒持。
tail:pipeline隊列的尾節(jié)點键耕,其是ChannelHandlerContext與ChannelHandler的實現(xiàn)。
channel:pipeline對應(yīng)的Channel柑营;
succeededFuture:處理成功的異步結(jié)果屈雄;
voidPromise:通用的異步處理結(jié)果;
childExecutors:子執(zhí)行器官套;
estimatorHandle:消息字節(jié)大小統(tǒng)計器的處理器酒奶;
firstRegistration:是否第一次注冊
pendingHandlerCallbackHead:頭節(jié)點一些事件的異步回調(diào)任務(wù);
2.5.2奶赔、構(gòu)造函數(shù)
構(gòu)造函數(shù)源碼:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
構(gòu)造函數(shù)比較簡單讥蟆,主要新建的succeededFuture和voidPromise異步通知,以及鏈表的頭結(jié)點(head)和尾節(jié)點(tail)纺阔。
2.5.3瘸彤、HeadContext源碼解析
HeadContext為Pipeline的頭節(jié)點實現(xiàn),其即時ChannelHandlerContext的實現(xiàn)笛钝,也是ChannelHandler的實現(xiàn)质况。
2.5.3.1、HeadContext類繼承關(guān)系
HeadContext類繼承圖:
HeadContext實現(xiàn)ChannelHandler的inbound接口和outbound接口玻靡,也實現(xiàn)了ChannelHandlerContext的inbound及outbound接口结榄。
2.5.4、TailContext源碼解析
TailContext為pipeline的尾節(jié)點實現(xiàn)囤捻,其即時ChannelHandlerContext的實現(xiàn)臼朗,也是ChannelHandler的實現(xiàn)。
TailContext類繼承圖:
TailContext在實現(xiàn)ChannelHandlerContext接口蝎土,同時實現(xiàn)ChannelHandler的inbound接口视哑。
3、ChannelHandlerContext源碼解析
ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之間的關(guān)聯(lián)誊涯,每當有ChannelHandler 添加到ChannelPipeline 中時挡毅,都會創(chuàng)建ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的ChannelHandler 和在同一個ChannelPipeline 中的其他ChannelHandler 之間的交互暴构。
ChannelHandlerContext中的一些接口在ChannelPipeline中也有實現(xiàn)跪呈,但傳播方向有一點重要的不同段磨。如果調(diào)用Channel 或者ChannelPipeline 上的這些方法,它們將沿著整個ChannelPipeline 進行傳播耗绿。而調(diào)用位于ChannelHandlerContext上的相同方法苹支,則將從當前所關(guān)聯(lián)的ChannelHandler 開始,并且只會傳播給位于該ChannelPipeline 中的下一個能夠處理該事件的ChannelHandler误阻。
ChannelHandlerContext類繼承圖:
ChannelInboundInvoker:是網(wǎng)絡(luò)I/O的事件的統(tǒng)一抽象沐序,即為inbound事件,方法都以fireXXX開頭堕绩,pipeline也實現(xiàn)此接口。
ChannelOutboundInvoker:是用戶線程或代碼發(fā)起的I/O操作邑时,被稱為outbound事件奴紧。
AttributeMap:存儲屬性鍵值對;
AbstractChannelHandlerContext:ChannelHandlerContext的抽象實現(xiàn)類晶丘,對通用處理進行了處理黍氮;
DefaultChannelHandlerContext:ChannelHandlerContext的默認實現(xiàn),netty框架即使用此實現(xiàn)浅浮;
HeadContext/TailContext:pipeline的頭結(jié)點和尾節(jié)點實現(xiàn)沫浆;
3.1、AbstractChannelHandlerContext源碼分析
AbstractChannelHandlerContext為ChannelHandlerContext的抽象實現(xiàn)滚秩。
3.1.1专执、基本屬性
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0;
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
next:pipeline中的下一個ChannelHandlerContext節(jié)點;
prev:pipeline中的上一個ChannelHandlerContext節(jié)點郁油;
inbound:標識此Context對應(yīng)的Handler是否為ChannelInboundHandler類型本股;
outbound:標識此Context對應(yīng)的Handler是否為ChannelOutboundHandler類型;
pipeline:此Context對應(yīng)的Pipeline桐腌;
name:此Context的名字拄显;
ordered:事件順序標志;
executor:事件執(zhí)行線程案站;
succeededFuture:成功的異步處理結(jié)果躬审;
invokeChannelReadCompleteTask:讀完成處理任務(wù);
invokeReadTask:讀數(shù)據(jù)任務(wù)蟆盐;
invokeChannelWritableStateChangedTask:Channel寫狀態(tài)變更任務(wù)承边;
invokeFlushTask:沖刷數(shù)據(jù)任務(wù);
handlerState:當前Handler的狀態(tài)
handlerState有以下四種狀態(tài):
// 初始狀態(tài)
private static final int INIT = 0;
// 對應(yīng)Handler的handlerAdded方法將要被調(diào)用但還未調(diào)用
private static final int ADD_PENDING = 1;
// 對應(yīng)Handler的handlerAdded方法被調(diào)用
private static final int ADD_COMPLETE = 2;
// 對應(yīng)Handler的handlerRemoved方法被調(diào)用
private static final int REMOVE_COMPLETE = 3;
3.1.1石挂、構(gòu)造函數(shù)
構(gòu)造函數(shù)源碼:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
3.1.2炒刁、inbound事件
AbstractChannelHandlerContext中對inbound事件的處理大同小異,本處只對fireChannelRegistered進行分析誊稚,其他事件處理流程基本相同翔始;
源碼:
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
fireChannelRegistered():此方法主要是找到下個inbound類型的Context罗心,并交由invokeChannelRegistered(final AbstractChannelHandlerContext next):靜態(tài)方法進行處理;
invokeChannelRegistered(final AbstractChannelHandlerContext next):此靜態(tài)方法主要判斷事件處理是否在執(zhí)行線程中城瞎,是則直接處理渤闷;否則異步處理。同時脖镀,pipeline中也會調(diào)用此方法對注冊事件進行傳播飒箭,pipeline中fireChannelRegistered事件的處理就是調(diào)用此靜態(tài)方法,而參數(shù)為HeadContext蜒灰,即從head節(jié)點開始傳播注冊事件弦蹂;
invokeChannelRegistered():此方法首先判斷Context的Handler是否已經(jīng)在pipeline中添加完成,完成則直接調(diào)用對應(yīng)Handler的channelRegistered()方法對注冊事件進行處理强窖;否則直接調(diào)用fireChannelRegistered()將事件交由下個inbound類型的Context處理凸椿。
3.1.3、outbound事件
與inbound事件相同翅溺,Context的outbound事件的傳播流程也大體相同脑漫,本處以bind()事件為例進行傳播流程的分析,其他事件傳播流程類似咙崎。
源碼:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
從bind()方法可知优幸,其主要查找下一個Context并調(diào)用invokeBind()進行處理,而invokeBind()又調(diào)用Handler的bind()褪猛;Handler的bind()通用處理是沿著outbound的Context向head節(jié)點傳播网杆,其最終調(diào)用的是pipeline中head節(jié)點的Handler的bind()方法,而head節(jié)點的bind的方法會調(diào)用底層Channel的Unsafe的bind()方法進行最終的bind()操作伊滋。
3.2跛璧、DefaultChannelHandlerContext源碼分析
DefaultChannelHandlerContext為netty的默認ChannelHandlerContext實現(xiàn),其實現(xiàn)非常簡單新啼。
源碼:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
3.2.1追城、基本屬性
handler:context對應(yīng)的ChannelHandler;
3.2.1燥撞、構(gòu)造函數(shù)
構(gòu)造函數(shù)主要通過isInbound()方法和isOutbound()方法判斷此ChannelHandler為inbound或outbound處理器座柱。其他處理都交由AbstractChannelHandlerContext。
相關(guān)閱讀:
Netty源碼愫讀(一)ByteBuf相關(guān)源碼學(xué)習(xí) 【http://www.reibang.com/p/016daa404957】
Netty源碼愫讀(二)Channel相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/02eac974258e】
Netty源碼愫讀(四)ChannelHandler相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/6ee0a3b9d73a】
Netty源碼愫讀(五)EventLoop與EventLoopGroup相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/05096995d296】
Netty源碼愫讀(六)ServerBootstrap相關(guān)源碼學(xué)習(xí)【http://www.reibang.com/p/a71a9a0291f3】
參考書籍:
《Netty實戰(zhàn)》
《Netty權(quán)威指南》
參考博客:
http://www.reibang.com/p/4c35541eec10
http://www.reibang.com/p/0b79872eb515
http://www.reibang.com/p/a0a51fd79f62
http://www.wolfbe.com/detail/201609/379.html