該文章基于個人的理解批什,翻譯自netty5.0 API务甥。
綜述
ChannelPipeline的作用是保存一個ChannelHandler的列表麻蹋,這些ChannelHandler用于處理或者攔截一個channel的inbound事件和outbound操作宛徊。ChannelPipeline實現(xiàn)了一個高級形式的I攔截過濾器模式歧杏,給了用戶完全的控制權來決定一個event如何被處理陪每,以及在pipeline中的channelHandler之間如何的交互影晓。
創(chuàng)建一個pipeline
每一個channel都有自己的pipeline,并且當一個新的channel被創(chuàng)建時其對應的pipeline也自動被創(chuàng)建檩禾。
event在pipeline中如何流動
下面的圖描述了I/O event如何被ChannelPipeline中的ChannelHandlers處理的典型情況挂签。一個I/O event被一個ChannelHandler處理,并被這個ChannelHandler向前遞交給其相鄰的下一個ChannelHandler盼产。如果必要的話饵婆,一個ChannelHandler同樣可以觸發(fā)一個任意的I/O event。為了遞交或者觸發(fā)一個event戏售,一個ChannelHandler調用定義在ChannelHandlerContext中的event繁殖方法侨核,比如:ChannelHandlerContext.fireChannelRead(Object)和ChannelHandlerContext.write(Object)。
一個inbound event被圖中左邊的ChannelHandlers從下至上地處理灌灾。一個inbound event通常被圖中底部的I/O線程所觸發(fā)搓译,以便于在Channel的狀態(tài)改變的時候(比如:新創(chuàng)建了連接和關閉了連接),或者在inbound數(shù)據(jù)被遠端讀取的時候锋喜,ChannelHandlers能夠被通知到些己。如果一個inbound event超過了圖中的頂部的范圍,那么它將根據(jù)你的日志等級決定是被拋棄還是被記錄入日志嘿般。
一個outbound event被圖中右邊的ChannelHandlers從上至下地處理段标。一個outbound event通常被你用于請求一個outbound I/O操作的代碼所觸發(fā),比如一個寫請求和一個連接的嘗試炉奴。如果一個outbound event超過了圖中ChannelHandlers底部的范圍逼庞,它將被與Channel相關的I/O線程所處理。I/O線程通常進行真正的output操作瞻赶,比如:SocketChannel.write(ByteBuffer)往堡。
將一個event向前遞交給下一個相鄰的handler
根據(jù)上面簡單的解釋械荷,一個ChannelHandler只能通過喚醒ChannelHandlerContext中event繁殖的方法,來將一個event遞交給它的下一個handler虑灰。這些方法包括:
- Inbound event propagation methods:
- ChannelHandlerContext.fireChannelRegistered()
- ChannelHandlerContext.fireChannelActive()
- ChannelHandlerContext.fireChannelRead(Object)
- ChannelHandlerContext.fireChannelReadComplete()
- ChannelHandlerContext.fireExceptionCaught(Throwable)
- ChannelHandlerContext.fireUserEventTriggered(Object)
- ChannelHandlerContext.fireChannelWritabilityChanged()
- ChannelHandlerContext.fireChannelInactive()
- Outbound event propagation methods:
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
- ChannelHandlerContext.write(Object, ChannelPromise)
- ChannelHandlerContext.flush()
- ChannelHandlerContext.read()
- ChannelHandlerContext.disconnect(ChannelPromise)
- ChannelHandlerContext.close(ChannelPromise)
下面的例子展示了event繁殖通常怎么做:
public class MyInboundHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
創(chuàng)建一個pipeline
用戶可能有一個或多個ChannelHandlers在一個pipeline中來接收I/O events(比如:讀)和請求I/O操作(比如:寫和關閉)吨瞎。例如,一個典型的服務端在每一個channel的pipeline中都會有以下的handlers穆咐。但是你的handler的個數(shù)通常是變化的颤诀,這取決于protocol和業(yè)務邏輯的復雜性和特點:
- Protocol解碼器:將二進制數(shù)據(jù)(比如:ByteBuf)轉成一個Java對象。
- Protocol編碼器:將一個Java對象轉成二進制數(shù)據(jù)对湃。
- 業(yè)務邏輯相關的Handler:處理真正的業(yè)務邏輯(比如:數(shù)據(jù)庫訪問)崖叫。
這有可能像下面的例子展示的這樣:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
// Tell the pipeline to run MyBusinessLogicHandler's event handler methods
// in a different thread than an I/O thread so that the I/O thread is not blocked by
// a time-consuming task.
// If your business logic is fully asynchronous or finished very quickly, you don't
// need to specify a group.
pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
線程安全
一個ChannelHandler在任何時候被添加或移除,因為一個ChannelPipeline是線程安全的拍柒。例如心傀,你可以在將要交換敏感信息的時候添加一個加密的handler,并在交換完畢之后移除它拆讯。