導(dǎo)讀
原創(chuàng)文章浆洗,轉(zhuǎn)載請注明出處狈蚤。
本文源碼地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼
在"Pipeline的構(gòu)造"這一節(jié)中我們已經(jīng)講過了,Pipeline
中利用了責(zé)任鏈模式笋粟,而發(fā)揮責(zé)任鏈功能的數(shù)據(jù)結(jié)構(gòu)就是由多個HandlerContext
構(gòu)成的的雙向鏈表特占,而每一個HandlerContext
又對應(yīng)一個ChannelHandler
(組合模式或者繼承)乃摹。由于每一個HandlerContext
對應(yīng)一個ChannelHandler
,所以本文行文中有時候會把HandlerContext
和ChannelHandler
等同對待赢乓,例如文章中
從當(dāng)前
HandlerContext
(tail
)開始向head
方向遍歷尋找下一個ChannelOutboundHandler
實際上準確來說應(yīng)該是
從當(dāng)前
HandlerContext
(tail
)開始向head
方向遍歷尋找下一個HandlerContext
忧侧,并且該HandlerContext
包含或者實現(xiàn)了ChannelOutboundHandler
。
為了行文不那么拗口牌芋,我們就把HandlerContext
和ChannelHandler
等同對待了蚓炬。
本文我們以客戶端建立連接、收發(fā)數(shù)據(jù)為例來學(xué)習(xí)一下Pipeline
的工作原理躺屁。
1 ChannelHandler方法概覽
在ChannelPipeline
的構(gòu)造這篇文章中我們提到過ChannelHandler
肯夏,Pipeline
中雙向鏈表由HandlerContext
組成,而每一個HandlerContext
又包含一個ChannleHandler
犀暑。我們看一下其中的主要方法驯击,這是所有ChannelHandler
的公共接口,公共接口中主要有兩個方法handlerAdded
和handlerRemoved
耐亏,這是在ChannelHandler
添加和刪除完成之后的回調(diào)方法徊都。上一篇文章中我們已經(jīng)分析過Pipeline
中ChannelHandler
的添加和刪除了,這里不再贅述广辰。
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
}
我們重點來看ChannelHandler
的兩個子接口暇矫,ChannelInboundHandler
和ChannelOutboundHandler
主之。
首先來看ChannelInboundHandler
,ChannelInboundHandler
類上的注釋如下
which adds callbacks for state changes. This allows the user
to hook in to state changes easily.
翻譯過來就是“Channel狀態(tài)改變時的回調(diào)方法”李根。其中的方法名多為Channel
+ 動詞過去分詞的形式槽奕。
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
再來看ChannelOutboundHandler
,ChannelOutboundHandler
上的注釋如下
which will get notified for IO-outbound-operations.
翻譯過來就是“當(dāng)發(fā)生IO出站操作時的回調(diào)方法”朱巨,其中的方法名多為動詞原型的形式史翘。
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
2 ChannelInboundInvoker和ChannelOutboundInvoker
和本文相關(guān)的組件中,ChannelPipeline
和ChannelHandlerContext
都實現(xiàn)了ChannelInboundInvoker
和ChannelOutboundInvoker
接口冀续。
ChannelInboundInvoker
內(nèi)的方法的特點是大部分都是以fire
開頭琼讽,并且是fire
+ 動詞的過去分詞的形式。我們知道過去分詞是表被動的意思洪唐,所以這里呢钻蹬,一般是Channel
里發(fā)生了某些“被動事件”之后調(diào)用的。
public interface ChannelInboundInvoker {
ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();
}
ChannelOutboundInvoker
內(nèi)的方法的特點是除了后邊幾個以new
開頭的方法之外凭需,都是以單個或者兩個動詞原型的形式作為方法名稱问欠,比如read
、write
和writeAndFlush
粒蜈。這里直接用動詞原型就是表示主動的意思顺献,所以這里一般是向Channel
發(fā)送主動命令
使用的。
public interface ChannelOutboundInvoker {
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
ChannelPromise newPromise();
ChannelProgressivePromise newProgressivePromise();
ChannelFuture newSucceededFuture();
ChannelFuture newFailedFuture(Throwable cause);
ChannelPromise voidPromise();
我們來仔細分析一下上文中提到的ChannelInboundHandler
枯怖、ChannelInboundInvoker
注整、ChannelOutboundHandler
和ChannelOutboundInvoker
。
我們看到ChannelInboundHandler
和ChannelInboundInvoker
中的方法非常相似度硝,ChannelInboundHandler
中的方法名多以Channel
加動詞過去分詞的形式組成肿轨,而ChannelInboundInvoker
中的方法就是ChannelInboundHandler
中的方法加上fire
構(gòu)成。
ChannelOutboundHandler
和ChannelOutboundInvoker
中的方法名幾乎一模一樣蕊程,都是以動詞原形作為方法名椒袍。
我們來仔細琢磨一下這些方法的命名,Channel
加動詞過去分詞表示該Channel
發(fā)生了某種事件藻茂,而fireChannel
加動詞過去分詞表示在該Channel
上觸發(fā)這種事件驹暑。例如ChannelRead
表示該Channel
上讀到了數(shù)據(jù)事件,fireChannelRead
表示在該Channel
上觸發(fā)ChannelRead
事件辨赐。
直接用動詞原形作為方法名岗钩,表示命令該Channel
進行某種操作,例如read
表示命令該Channel
進行數(shù)據(jù)的讀取肖油。為什么ChannelOutboundInvoker
中的方法名沒有以fire
開頭呢兼吓,因為ChannelOutbound
傳播的不是事件,而是一種主動的操作森枪。
還有同學(xué)記得我在“Netty整體架構(gòu)”這篇文章中提到的“事件”和“命令”嗎视搏,在某些地方翻譯成“入站事件”和“出站事件”审孽,我個人認為翻譯成“事件”和“命令”更容易理解,因為我們感知到“事件”的發(fā)生是被動的浑娜,而發(fā)出“命令”是主動的佑力,都翻譯成“事件”不容易理解。
3 以客戶端為例分析Pipeline工作原理
本文的示例代碼在package com.zhongdaima.netty.analysis.pipeline
中筋遭。
服務(wù)端示例代碼如下打颤,這個服務(wù)端的功能就是將所有客戶端發(fā)過來的數(shù)據(jù)原封不動地返回。
/**
* 歡迎關(guān)注公眾號“種代碼“漓滔,獲取博主微信深入交流
*
* @author wangjianxin
*/
public class ServerBoot {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//連接發(fā)過來的數(shù)據(jù)原樣返回
if (msg instanceof ByteBuf) {
ctx.write(msg);
ctx.flush();
}
}
});
}
});
ChannelFuture f = b.bind(8000).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客戶端的示例代碼如下编饺,在客戶端中我們添加了3個ChannelHandler
,分別是A
响驴,B
透且,C
。
/**
* 歡迎關(guān)注公眾號“種代碼“豁鲤,獲取博主微信深入交流
*
* @author wangjianxin
*/
public class ClientBoot {
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("A", new AInBoundHandler());
ch.pipeline().addLast("B", new BOutBoundHandler());
ch.pipeline().addLast("C", new CDuplexHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000);
Channel channel = channelFuture.syncUninterruptibly().channel();
channel.write(Unpooled.wrappedBuffer("Hello, 種代碼".getBytes()));
channel.flush();
channel.closeFuture().awaitUninterruptibly();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
AInboundHandler
繼承自ChannelInboundHandlerAdapter
秽誊,ChannelInboundHandlerAdapter
是Netty
中ChannelInboundHandler
的默認實現(xiàn),AInboundHandler
是一個ChannelInboundHandler
琳骡。
public class AInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive in A");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
//這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
((ByteBuf) msg).getBytes(0, bytes);
System.out.println("ChannelRead in A, msg=" + new String(bytes));
}
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("ExceptionCaught in A, cause=" + cause);
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded in A");
super.handlerAdded(ctx);
}
}
BOutboundHandler
繼承自ChannelOutboundHandlerAdapter
锅论,ChannelOutboundHandlerAdapter
是netty中``ChannelOutboundHandler的默認實現(xiàn),
BOutboundHandler是一個
ChannelOutboundHandler`楣号。
public class BOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.println("Connect in B");
super.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("Read in B");
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf){
byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
//這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
((ByteBuf) msg).getBytes(0, bytes);
System.out.println("Write in B, msg=" + new String(bytes));
}
super.write(ctx, msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded in B");
super.handlerAdded(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("ExceptionCaught in B, cause=" + cause);
super.exceptionCaught(ctx, cause);
}
}
CDuplexHandler
繼承自ChannelDuplexHandler
棍厌,ChannelDuplexHandler
同時實現(xiàn)了ChannelInboundHandler
和ChannelOutboundHandler
,所以CDuplexHandler
既是一個ChannelInboundHandler
又是一個ChannelOutboundHandler
竖席。
public class CDuplexHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.println("Connect in C");
super.connect(ctx, remoteAddress, localAddress, promise);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("Read in C");
super.read(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
//這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
((ByteBuf) msg).getBytes(0, bytes);
System.out.println("Write in C, msg=" + new String(bytes));
}
super.write(ctx, msg, promise);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("ChannelActive in C");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
//這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
((ByteBuf) msg).getBytes(0, bytes);
System.out.println("ChannelRead in C, msg=" + new String(bytes));
}
super.channelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("ExceptionCaught in C, cause=" + cause);
super.exceptionCaught(ctx, cause);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("HandlerAdded in C");
super.handlerAdded(ctx);
}
}
在添加完這3個Handler
之后,我們來看一下客戶端的Pipeline
狀態(tài)敬肚,當(dāng)前Pipeline
的狀態(tài)如下圖所示毕荐。除了內(nèi)置的HeadContext
和TailContext
之外,還有我們手動添加的A
艳馒、B
憎亚、C
3個Handler
。
先啟動服務(wù)端弄慰,再啟動客戶端第美,我們可以在客戶端的控制臺中看到如下輸出。
Connect in C
Connect in B
ChannelActive in A
ChannelActive in C
Read in C
Read in B
Write in C, msg=Hello, 種代碼
Write in B, msg=Hello, 種代碼
ChannelRead in A, msg=Hello, 種代碼
ChannelRead in C, msg=Hello, 種代碼
Read in C
Read in B
4 ChannelPipeline的方向
我們在Pipeline
的構(gòu)造中提到過Pipeline
中有兩個特殊的ChannelHandlerContext
陆爽,分別是HeadContext
和TailContext
什往。既然有了Head
和Tail
那必然是有方向了,那Pipeline
的方向是什么樣的呢慌闭,我們看下面這張圖别威。
和Channel
直接接觸的方向是Head
躯舔,另一端是Tail
。
在“Netty整體架構(gòu)”這篇文章中也提到過ChannelInboundHandler
和ChannelOutboundHandler
省古,那Inbound
和Outbound
又是哪個方向呢粥庄。我們站在應(yīng)用的位置來看,即站在BizHandler
的位置來看豺妓,從Channel
讀取數(shù)據(jù)進入到我們的應(yīng)用即是Inbound
惜互,而從我們的應(yīng)用向Channel
寫數(shù)據(jù)即是Outbound
。
5 Pipeline的工作原理
我們在前面的文章中在牽涉到Pipeline
的地方都一筆帶過了琳拭,今天咱們來詳細分析一下曾經(jīng)略過的內(nèi)容训堆。
5.1 connect的傳播
在我們調(diào)用bootstrap.connect("127.0.0.1", 8000)
這行代碼進行連接時最終會調(diào)用到AbstractChannel#connect
方法,該方法邏輯很簡單臀栈,直接調(diào)用了pipeline
的connect
方法蔫慧,咱們跟進去看。
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
Pipeline
的connect
方法同樣很簡單权薯,調(diào)用了tail
的connect
方法姑躲,TailContext
繼承自AbstractChannelHandlerContext
,并沒有覆蓋connect
方法盟蚣,所以tail.connect
調(diào)用到了AbstractChannelHandlerContext
中的connect
方法黍析。
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
AbstractChannelHandlerContext
中的connect
方法,先是調(diào)用findContextOutbound
屎开,找到下一個ChannelOutboundHandler
阐枣,這個方法很簡單,就是從當(dāng)前HandlerContext
(tail
)開始向head
方向遍歷尋找下一個ChannelOutboundHandler
奄抽,在我們的示例中蔼两,tail
的下一個ChannelOutboundHandler
就是我們所添加的CDuplexHandler
,找到之后調(diào)用了next.invokeContext
方法逞度。
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//從當(dāng)前HandlerContext向`head`方向查找下一個包含`ChannelOutboundHandler`的`HandlerContext`
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
invokeConnect
方法首先調(diào)用invokeHandler()
方法判斷當(dāng)前Handler
是否已經(jīng)添加完成(當(dāng)前Handler
的handlerAdded
方法已經(jīng)被調(diào)用過)额划。如果返回true
就調(diào)用當(dāng)前Handler
的connect
方法,這就是我們的CDuplexHandler
的connect
方法被調(diào)用到的地方档泽,也就是我們在控制臺看到的第一個輸出Connect in C
俊戳。如果返回false
就繼續(xù)調(diào)用HandlerContext
的connect
方法繼續(xù)查找下一個ChannelOutboundHanlder
。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
//判斷當(dāng)前Handler是否已經(jīng)完成添加(即handlerAdded方法被調(diào)用過了)
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
回到咱們的CDuplexHandler
的connect
方法馆匿,打印完Connect in C
之后調(diào)用了super.connect
方法抑胎,即ChannelDuplexHandler#connect
方法。
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
System.out.println("Connect in C");
super.connect(ctx, remoteAddress, localAddress, promise);
}
ChannelDuplexHandler#connect
方法調(diào)用了它所在的HandlerContext
的connect
方法渐北,即AbstractChannelHandlerContext#connect
方法阿逃,熟悉的身影又回來了,咱們剛剛分析過這個方法,即tail.connect
盆昙,而TailContext
未覆蓋這個方法羽历,tail.connect
就是ChannelDuplexHandler#connect
。
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
顯而易見下一個ChannelOutboundHander
就是BOutboundHandler
淡喜,所以我們看到控制臺的第二行輸出Connect in B
秕磷。
最后一個ChannelOutboundHandler
是HeadContext
,我們到HeadContext
來看一下它的connnect
方法炼团。HeadContext
的connect
方法很簡單澎嚣,直接調(diào)用了unsafe
的connect
方法,而unsafe.connect
方法我們在“客戶端的啟動過程”這篇文章中已經(jīng)分析過了瘟芝,不再贅述易桃。
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
5.2 ChannelActive的傳播
我們在“客戶端的啟動過程”這篇文章中提到過AbstractNioUnsafe#finishConnect
方法,當(dāng)NioEventLoop
檢測到Channel
上有OP_CONNECT
事件發(fā)生時锌俱,會調(diào)用unsafe.finishConnect
方法晤郑。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
finishConnect
方法在AbstratNioUnsafe
中。在調(diào)用完doFinishConnect
方法之后會調(diào)用fulfillConnectPromise
方法贸宏,我們一起來看一下造寝。
@Override
public final void finishConnect() {
try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
} finally {
}
}
fulfillConnectPromise
方法里有很多內(nèi)容,這里咱們只關(guān)注本次要講的重點吭练,pipeline().fireChannelActive()
诫龙。
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}
}
我們看一下pipeline
的fireChannelActive
方法,這里調(diào)用了invokeChannelActive
方法鲫咽,傳遞的參數(shù)是HeadContext
签赃。
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
invokeChannelActive
方法調(diào)用了next.invokeChannelActive
,這里的next
就是HeadContext
分尸。
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
invokeChannelActive
方法調(diào)用了Handler
的channelActive
方法锦聊,這里的Handler
自然也是HeadContext
。
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
HeadContext
里的channelActive
方法調(diào)用了ct.fireChannelActive()
箩绍,顯然這里的ctx
就是HeadContext
本身孔庭。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
fireChannelActive
方法在AbstractChannelHandlerContext
中,它首先從當(dāng)前HandlerContext
查找下一個ChannelInboundHandler
伶选,然后調(diào)用invokeChannelActive
方法,熟悉的身影又回來了尖昏,invokeChannelActive
方法咱們剛剛也見過仰税。剛才調(diào)用invokeChannelActive
方法是由Pipeline
發(fā)起的,直接傳遞了head
作為參數(shù)抽诉,而這里是由head
發(fā)起的陨簇,參數(shù)是從head
開始的下一個包含ChannelInboundHandler
,顯然這個參數(shù)是我們所添加的AInboundHandler
,所以我們在控制臺看到ChannelActive in A
河绽。
@Override
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
return this;
}
繼續(xù)下去可以想見AInboundHandler
被調(diào)用后也會查找下一個ChannelInboundHandler
就是CDuplexHandler
己单,所以我們在控制臺看到的下一條輸出就是ChannelActive in C
。
5.3 read的傳播
剛才咱們在看HeadContext
里的channelActive
方法時耙饰,其中并不僅僅只有ctx.fireChannelActive()
調(diào)用纹笼,緊跟著的就是readIfIsAutoRead()
的調(diào)用。咱們一起來看一下苟跪。這里判斷channel
是否配置了自動讀取廷痘,默認情況下是true
,接著調(diào)用了channel.read()
件已。
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
channel.read
方法在AbstractChannel
中笋额,調(diào)用了pipeline.read
方法。
@Override
public Channel read() {
pipeline.read();
return this;
}
pipeline
的read
方法調(diào)用了tail.read
篷扩,這里的tail
就是指TailContext
了兄猩。
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
tail
沒有覆蓋read
方法,這個方法的實現(xiàn)在AbstractChannelHandlerContext
中鉴未,首先調(diào)用findContextOutbound
方法查找下一個ChannelOutboundHandler
枢冤。第一個查找到的就是CDuplexHandler
,調(diào)用了它的invokeRead
方法歼狼。
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
invokeRead
方法在AbstractChannelHandlerContext
中掏导,invokeRead
調(diào)用了它所包含的ChannelInboundHandler
的channelRead
方法,即CDuplexHandler
的channelRead
方法羽峰,在控制臺打印出Read in C
趟咆。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
CDuplexHandler
的channelRead
方法在打印完成后調(diào)用了super.read(ctx)
,我們跟下去看一下梅屉。
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
System.out.println("Read in C");
super.read(ctx);
}
接著調(diào)用了ctx.read()
值纱,即AbstractChannelHandlerContext#read
方法,熟悉的方法又出現(xiàn)了坯汤,接著調(diào)用下去就到了下一個ChannelOutboundHandler
即BOutboundHandler
虐唠,在控制臺打印出Read in B
。
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
接著往下調(diào)用就到了HeadContext
惰聂,HeadContext
的read
方法調(diào)用了unsafe.beginRead
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
unsafe
的beginRead
方法調(diào)用了doBeginRead
疆偿。
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
我們以AbstractNioChannel
為例看一下doBeginRead
方法。這里的doBeginRead
方法把readInterestOp
加入到興趣事件中搓幌,readInterestOp
在構(gòu)造方法中被賦值杆故,即是OP_READ
興趣事件。
@Override
protected void doBeginRead() throws Exception {
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
調(diào)用read
方法溉愁,并不能真正從Channel
中讀取數(shù)據(jù)处铛,只是把OP_READ
興趣事件加入到selectionKey
中。
5.4 write的傳播
在完成連接后,我們在客戶端代碼中調(diào)用了channel.write(Unpooled.wrappedBuffer("Hello, 種代碼".getBytes()));
向服務(wù)端發(fā)送數(shù)據(jù)撤蟆。咱們跟進去看看奕塑,這里的channel.write
方法在AbstractChannel
中,直接調(diào)用了pipeline.write
方法家肯。
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
pipeline
的write
方法調(diào)用了tail.write
方法龄砰,tail
即是TailContext
。
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
咱們就不再一步步跟下去了息楔,從控制臺打印出的Write in C, msg=Hello, 種代碼
和Write in B, msg=Hello, 種代碼
來推測寝贡,write
命令從tail
開始經(jīng)過了CDuplexHandler
和BOutboundHandler
最后到達HeadContext
。
5.5 ChannelRead的傳播
在服務(wù)端接收到數(shù)據(jù)之后值依,直接將數(shù)據(jù)原樣返回圃泡,當(dāng)Eventloop
發(fā)現(xiàn)Channel
中發(fā)生OP_READ
事件時,調(diào)用unsafe.read()
進行數(shù)據(jù)的讀取愿险。這部分代碼咱們在講EventLoop
的時候已經(jīng)提到過颇蜡,在NioEventLoop
的processSelectedKey
方法中。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
咱們跟進去看看unsafe.read
方法辆亏,方法實現(xiàn)在NioByteUnsafe
中风秤,其中分配緩沖區(qū)讀取數(shù)據(jù)的過程咱們先略過,這篇文章咱們著重介紹Pipeline
扮叨,直接看讀取數(shù)據(jù)之后調(diào)用了什么方法缤弦。在讀取數(shù)據(jù)之后(所分配的緩沖區(qū)填滿)調(diào)用了pipeline.fireChannelRead(byteBuf)
,而在讀完數(shù)據(jù)之后(tcp緩沖區(qū)數(shù)據(jù)被讀完)調(diào)用了pipeline.fireChannelReadComplete()
彻磁。
@Override
public final void read() {
try {
do {
//分配緩沖區(qū)
byteBuf = allocHandle.allocate(allocator);
//讀取數(shù)據(jù)
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//觸發(fā)ChannelRead事件
pipeline.fireChannelRead(byteBuf);
} while (allocHandle.continueReading());
//觸發(fā)ChannelReadComplete事件
pipeline.fireChannelReadComplete();
} catch (Throwable t) {
} finally {
}
}
}
先看看``pipeline.fireChannelRead(byteBuf)方法碍沐,很簡單,這里調(diào)用了
AbstractChannelHandlerContext.invokeChannelRead方法衷蜓,并傳入?yún)?shù)
head累提,即是
HeadContext`。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
invokeChannelRead
方法又調(diào)用了next.invokeChannelRead
磁浇,這里的next
即HeadContext
斋陪。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
到這兒里咱們也不再一步步跟下去了,根據(jù)控制臺打印出的ChannelRead in A, msg=Hello, 種代碼
和ChannelRead in C, msg=Hello, 種代碼
來推測置吓,ChannelRead
事件從HeadContext
開始經(jīng)過AInboundHandler
无虚,和CDuplexHandler
到達TailContext
。
最后在控制臺又打印出“Read in C”和“Read in B”是怎么回事呢衍锚,那是因為在pipeline.fireChannelReadComplete()
又發(fā)出了read
命令友题,咱們不再贅述。
6 總結(jié)
-
ChannelOutboundInvoker
:用來向Channel
發(fā)送“命令”的接口构拳。 -
ChannelInboundInvoker
:用來觸發(fā)Channel
上發(fā)生“事件”的接口咆爽。 -
ChannelOutboundHandler
:“命令”在Pipeline
上傳播時的回調(diào)方法。 -
ChannelInboundHandler
: “事件”在Pipeline
上傳播時的回調(diào)方法置森。 - 在
Pipeline
中主動發(fā)出的命令斗埂,例如connect
、read
凫海、write
等由TailContext
開始依次經(jīng)過所有的ChannelOutboundHandler
呛凶,最后到達HeadContext
。 - 在
Pipeline
中被動發(fā)生的事件行贪,例如ChannelActive
漾稀、ChannelRead
等由HeadContext
開始依次經(jīng)過所有的ChannelInboundHandler
,最后到達TailContext
建瘫。
關(guān)于作者
王建新崭捍,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負責(zé)服務(wù)治理啰脚、RPC框架殷蛇、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等橄浓。愛技術(shù)粒梦、愛學(xué)習(xí),歡迎聯(lián)系交流荸实。