像以往一樣赞庶,繼續(xù)回顧這幅圖。目前為止巡通,我們學習了Netty的EventLoop尘执、Channel以及ChannelFuture,還差最后兩個部分:ByteBuf和ChannelHandler宴凉。ByteBuf作為通道讀寫數(shù)據(jù)的緩沖區(qū)誊锭,Channel底層數(shù)據(jù)的讀寫細節(jié)正是由ByteBuf完成。ChannelHandler作為處理各種事件的處理器弥锄,為用戶提供實際的業(yè)務邏輯處理功能丧靡。在本章中蟆沫,我們將介紹ChannelHandler以及存儲它的容器ChannelPipeline。使用自頂向下的方法温治,首先介紹整體ChannePipeline饭庞,然后介紹ChannelHandler。
7.1 總述
7.1.1 ChannelPipeline
提到pipeline熬荆,我們首先想到的是*nix中的管道舟山,可實現(xiàn)將一個程序的輸出作為另一個程序的輸入。ChannelPipeline也實現(xiàn)類似的功能卤恳,不同的是:ChannelPipeline將一個ChannelHandler的處理后的數(shù)據(jù)作為下一個ChannelHandler處理的數(shù)據(jù)源累盗。Netty的ChannelPipeline示意圖如下:
Xnix的管道中流動的是數(shù)據(jù),ChnanelPipeline中流動的是事件(事件中可能附加數(shù)據(jù))突琳。Netty定義了兩種事件類型:入站(inbound)事件和出站(outbound)事件若债。ChannelPipeline使用攔截過濾器模式使用戶可以掌控ChannelHandler處理事件的流程。注意:事件在ChannelPipeline中不自動流動而需要調(diào)用ChannelHandlerContext中諸如fileXXX()或者read()類似的方法將事件從一個ChannelHandler傳播到下一個ChannelHandler拆融。
事實上蠢琳,ChannelHandler不處理具體的事件,處理具體的事件由相應的子類完成:ChannelInboundHandler處理和攔截入站事件镜豹,ChannelOutboundHandler處理和攔截出站事件傲须。那么事件是怎么在ChannelPipeline中流動的呢?我們使用代碼注釋中的例子:
ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
對于入站事件逛艰,處理序列為:1-->2-->5躏碳;對于出站事件,處理序列為:5-->4-->3散怖『鹗唬可見涩澡,入站事件與出站事件處理順序正好相反。事件不會在ChannelPipeline中自動流動,而完全由用戶控制劲妙,所以ChannelHandler處理的代碼可能如下:
public class InboundHandlerA implements ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!"); // 用戶自定義處理邏輯
ctx.fireChannelActive(); // 將channelActive事件傳播到InboundHandlerB
}
}
public class OutboundHandlerB extends ChannelOutboundHandler{
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing .."); // 用戶自定義處理邏輯
ctx.close(promise); // 將close事件傳播到OutboundHandlerA
}
}
入站事件一般由I/O線程觸發(fā)请敦,以下事件為入站事件:
ChannelRegistered() // Channel注冊到EventLoop
ChannelActive() // Channel激活
ChannelRead(Object) // Channel讀取到數(shù)據(jù)
ChannelReadComplete() // Channel讀取數(shù)據(jù)完畢
ExceptionCaught(Throwable) // 捕獲到異常
UserEventTriggered(Object) // 用戶自定義事件
ChannelWritabilityChanged() // Channnel可寫性改變贸营,由寫高低水位控制
ChannelInactive() // Channel不再激活
ChannelUnregistered() // Channel從EventLoop中注銷
出站事件一般由用戶觸發(fā)贾漏,以下事件為出站事件:
bind(SocketAddress, ChannelPromise) // 綁定到本地地址
connect(SocketAddress, SocketAddress, ChannelPromise) // 連接一個遠端機器
write(Object, ChannelPromise) // 寫數(shù)據(jù),實際只加到Netty出站緩沖區(qū)
flush() // flush數(shù)據(jù)具伍,實際執(zhí)行底層寫
read() // 讀數(shù)據(jù)翅雏,實際設置關心OP_READ事件,當數(shù)據(jù)到來時觸發(fā)ChannelRead入站事件
disconnect(ChannelPromise) // 斷開連接人芽,NIO Server和Client不支持望几,實際調(diào)用close
close(ChannelPromise) // 關閉Channel
deregister(ChannelPromise) // 從EventLoop注銷Channel
入站事件一般由I/O線程觸發(fā),用戶程序員也可根據(jù)實際情況觸發(fā)萤厅¢夏ǎ考慮這樣一種情況:一個協(xié)議由頭部和數(shù)據(jù)部分組成靴迫,其中頭部含有數(shù)據(jù)長度,由于數(shù)據(jù)量較大楼誓,客戶端分多次發(fā)送該協(xié)議的數(shù)據(jù)玉锌,服務端接收到數(shù)據(jù)后需要收集足夠的數(shù)據(jù),組裝為更有意義的數(shù)據(jù)傳給下一個ChannelInboudHandler疟羹。也許你已經(jīng)知道主守,這個收集數(shù)據(jù)的ChannelInboundHandler正是Netty中基本的Encoder,Encoder中會處理多次ChannelRead()事件阁猜,只觸發(fā)一次對下一個ChannelInboundHandler更有意義的ChannelRead()事件丸逸。
出站事件一般由用戶觸發(fā),而I/O線程也可能會觸發(fā)剃袍。比如,當用戶已配置ChannelOption.AutoRead選項捎谨,則I/O在執(zhí)行完ChannelReadComplete()事件民效,會調(diào)用read()方法繼續(xù)關心OP_READ事件,保證數(shù)據(jù)到達時自動觸發(fā)ChannelRead()事件涛救。
如果你初次接觸Netty畏邢,會對下面的方法感到疑惑,所以列出區(qū)別:
channelHandlerContext.close() // close事件傳播到下一個Handler
channel.close() // ==channelPipeline.close()
channelPipeline.close() // 事件沿整個ChannelPipeline傳播检吆,注意in/outboud的傳播起點
回憶AbstractChannel的構造方法:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
可見舒萎,新建一個Channel時會自動新建一個ChannelPipeline,也就是說他們之間是一對一的關系蹭沛。另外需要注意的是:ChannelPipeline是線程安全的臂寝,也就是說,我們可以動態(tài)的添加摊灭、刪除其中的ChannelHandler咆贬。考慮這樣的場景:服務器需要對用戶登錄信息進行加密帚呼,而其他信息不加密掏缎,則可以首先將加密Handler添加到ChannelPipeline,驗證完用戶信息后煤杀,主動從ChnanelPipeline中刪除眷蜈,從而實現(xiàn)該需求。
7.1.2 ChannelHandler
ChannelHandler并沒有方法處理事件沈自,而需要由子類處理:ChannelInboundHandler攔截和處理入站事件酌儒,ChannelOutboundHandler攔截和處理出站事件。我們已經(jīng)明白酥泛,ChannelPipeline中的事件不會自動流動今豆,而我們一般需求事件自動流動嫌拣,Netty提供了兩個Adapter:ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter來滿足這種需求。其中的實現(xiàn)類似如下:
// inboud事件默認處理過程
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered(); // 事件傳播到下一個Handler
}
// outboud事件默認處理過程
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise); // 事件傳播到下一個Handler
}
在Adapter中呆躲,事件默認自動傳播到下一個Handler异逐,這樣帶來的另一個好處是:用戶的Handler類可以繼承Adapter且覆蓋自己感興趣的事件實現(xiàn),其他事件使用默認實現(xiàn)插掂,不用再實現(xiàn)ChannelIn/outboudHandler接口中所有方法灰瞻,提高效率。
我們常常遇到這樣的需求:在一個業(yè)務邏輯處理器中辅甥,需要寫數(shù)據(jù)庫酝润、進行網(wǎng)絡連接等耗時業(yè)務。Netty的原則是不阻塞I/O線程璃弄,所以需指定Handler執(zhí)行的線程池要销,可使用如下代碼:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
// 簡單非阻塞業(yè)務,可以使用I/O線程執(zhí)行
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// 復雜耗時業(yè)務夏块,使用新的線程池
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
ChannelHandler中有一個Sharable注解疏咐,使用該注解后多個ChannelPipeline中的Handler對象實例只有一個,從而減少Handler對象實例的創(chuàng)建脐供。代碼示例如下:
public class DataServerInitializer extends ChannelInitializer<Channel> {
private static final DataServerHandler SHARED = new DataServerHandler();
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("handler", SHARED);
}
}
Sharable注解的使用是有限制的浑塞,多個ChannelPipeline只有一個實例,所以該Handler要求無狀態(tài)政己。上述示例中酌壕,DataServerHandler的事件處理方法中,不能使用或改變本身的私有變量歇由,因為ChannelHandler是非線程安全的卵牍,使用私有變量會造成線程競爭而產(chǎn)生錯誤結果。
7.1.3 ChannelHandlerContext
Context指上下文關系印蓖,ChannelHandler的Context指的是ChannleHandler之間的關系以及ChannelHandler與ChannelPipeline之間的關系辽慕。ChannelPipeline中的事件傳播主要依賴于ChannelHandlerContext實現(xiàn),由于ChannelHandlerContext中有ChannelHandler之間的關系赦肃,所以能得到ChannelHandler的后繼節(jié)點溅蛉,從而將事件傳播到下一個ChannelHandler。
ChannelHandlerContext繼承自AttributeMap他宛,所以提供了attr()方法設置和刪除一些狀態(tài)屬性值船侧,用戶可將業(yè)務邏輯中所需使用的狀態(tài)屬性值存入到Context中。此外厅各,Channel也繼承自AttributeMap镜撩,也有attr()方法,在Netty4.0中队塘,這兩個attr()方法并不等效袁梗,這會給用戶程序員帶來困惑并且增加內(nèi)存開銷宜鸯,所以Netty4.1中將channel.attr()==ctx.attr()。在使用Netty4.0時遮怜,建議只使用channel.attr()防止引起不必要的困惑淋袖。
一個Channel對應一個ChannelPipeline,一個ChannelHandlerContext對應一個ChannelHandler锯梁,但一個ChannelHandler可以對應多個ChannelHandlerContext即碗。當一個ChannelHandler使用Sharable注解修飾且添加同一個實例對象到不用的Channel時,只有一個ChannelHandler實例對象陌凳,但每個Channel中都有一個ChannelHandlerContext對象實例與之對應剥懒。