其實(shí)在之前的客戶端和服務(wù)端初始化的時(shí)候已經(jīng)說過了剖膳,在初始化Channel的時(shí)候魏颓,同時(shí)初始化pipeline;
//AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
從DefaultChannelPipeline即可知道初始化pipeline的時(shí)候吱晒,head和tail是雙向鏈表甸饱。可以看下它們的繼承關(guān)系,再進(jìn)行其初始化的觀察叹话。
TailContext:
從繼承結(jié)構(gòu)可以看出偷遗,HeadContext和TailContext都繼承了AbstractChannelHandlerContext和實(shí)現(xiàn)了ChannelOutboundHandler/ChannelInboundHandler,說明用戶雙重屬性驼壶,既是context同時(shí)也是handler氏豌,按我的理解意味著其即擁有上下文屬性也擁有handler屬性(處理業(yè)務(wù)邏輯)。在文章開始的圖里已經(jīng)說明:每個(gè)channel包含一個(gè)pipeline热凹,而pipeline又維護(hù)了一個(gè)雙向鏈表泵喘。
TailContext和HeadContext
這是TailContext和HeadContext構(gòu)造函數(shù),需要注意的是TailContext的inbound為true般妙,outbound為false纪铺,HeadContext則相反,這兩個(gè)參數(shù)和netty事件流向有關(guān)碟渺,具體情況下文說明霹陡。
重新分析ChannelInitializer和自定義handler的添加
Bootstrap.connect()-->Bootstrap.doResolveAndConnect()-->AbstractBootstrap.initAndRegister()-->Bootstrap.init()
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
。止状。。
}
config.handler()獲取的就是ChannelInitializer攒霹,p.addLast(config.handler());就是把ChannelInitializer加入雙向鏈表怯疤,看代碼:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
讓我們來看看其中的關(guān)鍵代碼
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
由上可知,在加入ChannelInitializer的過程中可以知道催束,為了添加一個(gè) handler 到pipeline中, 會把此handler包裝成ChannelHandlerContext集峦。同時(shí)addLast0說明ChannelInitializer是添加在tail之前。這個(gè)過程中注意下兩個(gè)有意思的方法:
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
從源碼中可以看到, 當(dāng)一個(gè)handler實(shí)現(xiàn)了ChannelInboundHandler接口, 則 isInbound 返回真; 類似地, 當(dāng)一個(gè)handler實(shí)現(xiàn)了ChannelOutboundHandler接口, 則isOutbound就返回真抠刺。ChannelInitializer是實(shí)現(xiàn)了ChannelInboundHandlerAdapter塔淤,所以inbound傳入的是true。
自定義handler的添加
addLast方法中的另一條關(guān)鍵代碼如下:
callHandlerAdded0(newCtx);
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
速妖。高蜂。。
ctx.handler().handlerAdded(ctx);
罕容。备恤。。
}
ctx.handler()取到的自然是ChannelInitializer锦秒,而handlerAdded(ctx)都做了什么呢:
handlerAdded()-->boolean initChannel()-->void initChannel()
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
ChannelInitializer在加入雙向鏈表后露泊,調(diào)用重寫initChannel()方法,在initChannel()方法中加入自定義handler旅择,最后remove(ctx);移除ChannelInitializer惭笑。
回過頭來看下channel的結(jié)構(gòu)層次圖,在初始化channel的時(shí)候會構(gòu)建一個(gè)pipeline座位channel的屬性(pipeline也有一個(gè)channel屬性),每個(gè)pipeline維護(hù)了一個(gè)由ChannelHandlerContext 組成的雙向鏈表. 這個(gè)鏈表的頭是HeadContext沉噩,鏈表的尾是TailContext捺宗,并且每個(gè)ChannelHandlerContext中又關(guān)聯(lián)著一個(gè)ChannelHandler。
pipeline的傳輸機(jī)制
從上面的分析屁擅,我們知道AbstractChannelHandlerContext中有inbound和outbound兩個(gè)boolean變量, 分別用于標(biāo)識Context所對應(yīng)的handler的類型, 即:
- inbound為真時(shí), 表示對應(yīng)的ChannelHandler實(shí)現(xiàn)了ChannelInboundHandler方法.
- outbound 為真時(shí), 表示對應(yīng)的 ChannelHandler 實(shí)現(xiàn)了 ChannelOutboundHandler 方法.
pipieline的事件傳輸類型有兩種:inbound事件和outbound事件兩種:
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
這個(gè)是netty官方文檔偿凭,可以很明顯地看出:inbound事件和outbound事件的流向相反。 inbound 的傳遞方式是通過調(diào)用相應(yīng)的ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的傳遞方式是通過調(diào)用 ChannelHandlerContext.OUT_EVT() 方法派歌。例如ChannelHandlerContext.fireChannelRegistered()調(diào)用會發(fā)送一個(gè)ChannelRegistered 的inbound給下一個(gè)ChannelHandlerContext, 而ChannelHandlerContext.bind調(diào)用會發(fā)送一個(gè)bind的outbound事件給下一個(gè) ChannelHandlerContext弯囊。
讓我們來看下inbound事件傳播的方法有哪些:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead()
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught()
ChannelHandlerContext.fireUserEventTriggered()
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
outbound事件傳播的方法有:
ChannelHandlerContext.bind()
ChannelHandlerContext.connect()
ChannelHandlerContext.write()
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect()
ChannelHandlerContext.close()
讓我們具體來看看這兩類事件
outbound事件
outbound事件是請求事件,inbound事件是通知事件胶果,這個(gè)要區(qū)分清楚匾嘱。請求事件就是請求某件事即將發(fā)生,然后outbound事件進(jìn)行通知早抠。outbound事件的流向是:
tail -> customContext -> head
讓我們用connect事件代碼來證明:
當(dāng)調(diào)用Bootstrap.connect()的時(shí)候霎烙,會觸發(fā)一個(gè)outbound事件。以下是調(diào)用鏈
Bootstrap.connect -> Bootstrap.doConnect -> AbstractChannel.connect
讓我們看看AbstractChannel.connect
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
//pipeline.connect的實(shí)現(xiàn)如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
當(dāng) outbound 事件(這里是 connect 事件)傳遞到 Pipeline 后, 它其實(shí)是以 tail 為起點(diǎn)開始傳播的蕊连。而 tail.connect 其實(shí)調(diào)用的是AbstractChannelHandlerContext.connect 方法悬垃。繼續(xù)跟進(jìn),在AbstractChannelHandlerContext中connect方法:
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
甘苍。尝蠕。。
final AbstractChannelHandlerContext next = findContextOutbound()
next.invokeConnect(remoteAddress, localAddress, promise);
载庭。看彼。。
}
讓我們看下其中的關(guān)鍵代碼:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
顧名思義囚聚,findContextOutbound就是找出以this context(tail)為基本節(jié)點(diǎn)靖榕,找出第一個(gè)outbound為true的context,然后通過ctx調(diào)用invokeConnect方法顽铸,如果
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
從tail往head方向獲取handler并且調(diào)用其connect茁计,如果用戶沒有從寫這個(gè)方法,那么會調(diào)用ChannelOutboundHandlerAdapter實(shí)現(xiàn)的方法:
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
connect又會調(diào)用AbstractChannelHandlerContext中connect方法找到下一個(gè)outbound為true的handler調(diào)用其connect跋破,這樣的循環(huán)中簸淀,直到connect事件傳遞到DefaultChannelPipeline的雙向鏈表的頭節(jié)點(diǎn), 即 head 中(head的outbound設(shè)置為true)。
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
outbound事件傳到head后毒返,因?yàn)閔ead本身也是handler租幕,handler()返回的的就它本身,讓我們看看它c(diǎn)onnect方法的實(shí)現(xiàn):
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
到這邊outbound事件就結(jié)束了拧簸。inbound事件
inbound是通知事件劲绪,就是說某件事情已經(jīng)發(fā)生了,然后利用inbound事件進(jìn)行通知。inbound事件的傳輸方向和outbound剛好相反:
head -> customcontext -> tail
沿著connect繼續(xù)走贾富,在之后會有inbound事件歉眷,我們就以這個(gè)為例子進(jìn)行inbound事件講解。
承接上文颤枪,之前看到head的connect方法:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
這里unsafe.connect調(diào)用的是AbstractNioChannel.connect()汗捡,關(guān)鍵代碼如下:
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
···
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
}
···
}
在doConnect完成連接之后調(diào)用了fulfillConnectPromise,
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
讓我們看pipeline().fireChannelActive();pipeline().fireChannelActive()將通道激活的消息(即 Socket 連接成功)發(fā)送出去畏纲。這里就是inbound事件的起點(diǎn)扇住,往下走看這個(gè)過程是怎么樣的:
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
很明顯,以head(HeadContext)為起點(diǎn)盗胀,讓我們看下在invokeChannelActive做了什么
static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelInactive();
}
});
}
}
//next.invokeChannelInactive()實(shí)現(xiàn)
private void invokeChannelInactive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelInactive();
}
}
這個(gè)方法和 Outbound 的對應(yīng)方法(例如 invokeConnect) 如出一轍. 同Outbound一樣, 如果用戶沒有重寫channelActive方法, 那么會調(diào)用ChannelInboundHandlerAdapter 的 channelActive 方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
return this;
}
和outbound事件一樣艘蹋,一樣的循環(huán),最后事件傳輸?shù)絫ail票灰。tail 本身既實(shí)現(xiàn)了ChannelInboundHandler接口, 又實(shí)現(xiàn)了ChannelHandlerContext接口女阀,因此當(dāng)channelActive消息傳遞到tail后,會將消息轉(zhuǎn)遞到對應(yīng)的ChannelHandler中處理屑迂,tail的handler()返回的就是tail本身浸策,最后的channelActive即是tail中的。
inbound事件到這里也就結(jié)束了惹盼。