5.Handler業(yè)務(wù)處理器
在Reactor反應(yīng)器經(jīng)典模型中果复,反應(yīng)器查詢到IO事件后费坊,分發(fā)到Handler業(yè)務(wù)處理器,由Handler完成IO操作和業(yè)務(wù)處理秉犹。整個的IO處理操作環(huán)節(jié)包括:從通道讀數(shù)據(jù)包殖氏、數(shù)據(jù)包解碼晚树、業(yè)務(wù)處理、目標數(shù)據(jù)編碼雅采、把數(shù)據(jù)包寫到通道爵憎,然后由通道發(fā)送到對端
用戶程序主要在Handler業(yè)務(wù)處理器中,Handler涉及的環(huán)節(jié)為:數(shù)據(jù)包解碼总滩、業(yè)務(wù)處理纲堵、目標數(shù)據(jù)編碼、把數(shù)據(jù)包寫到通道中闰渔。
從應(yīng)用程序開發(fā)人員的角度來看席函,有入站和出站兩種類型操作。
· 入站處理冈涧,觸發(fā)的方向為:自底向上茂附,Netty的內(nèi)部(如通道)到ChannelInboundHandler入站處理器正蛙。
· 出站處理,觸發(fā)的方向為:自頂向下营曼,從ChannelOutboundHandler出站處理器到Netty的內(nèi)部(如通道)乒验。
-
ChannelInboundHandler通道入站處理器
-
ChannelOutboundHandler通道出站處理器
5.1ChannelInitializer通道初始化處理器
通道和Handler業(yè)務(wù)處理器的關(guān)系是:一條Netty的通道擁有一條Handler業(yè)務(wù)處理器流水線,負責裝配自己的Handler業(yè)務(wù)處理器
如果向流水線中裝配業(yè)務(wù)處理器呢蒂阱?這就得借助通道的初始化類——ChannelInitializer锻全。
- initChannel()方法是ChannelInitializer定義的一個抽象方法,這個抽象方法需要開發(fā)人員自己實現(xiàn)录煤。在父通道調(diào)用initChannel()方法時鳄厌,會將新接收的通道作為參數(shù),傳遞給initChannel()方法妈踊。initChannel()方法內(nèi)部大致的業(yè)務(wù)代碼是:拿到新連接通道作為實際參數(shù)了嚎,往它的流水線中裝配Handler業(yè)務(wù)處理器。
5.2ChannelInboundHandler的生命周期
ChannelInboundHandler的生命周期分2類:
- 生命周期方法
(1)handlerAdded() :當業(yè)務(wù)處理器被加入到流水線后廊营,此方法被回調(diào)歪泳。也就是在完成ch.pipeline().addLast(handler)語句之后,會回調(diào)handlerAdded()露筒。
(2)channelRegistered():當通道成功綁定一個NioEventLoop線程后呐伞,會通過流水線回調(diào)所有業(yè)務(wù)處理器的channelRegistered()方法。(3)channelActive():當通道激活成功后邀窃,會通過流水線回調(diào)所有業(yè)務(wù)處理器的channelActive()方法荸哟。通道激活成功指的是假哎,所有的業(yè)務(wù)處理器添加瞬捕、注冊的異步任務(wù)完成,并且NioEventLoop線程綁定的異步任務(wù)完成舵抹。(4)channelInactive():當通道的底層連接已經(jīng)不是ESTABLISH狀態(tài)肪虎,或者底層連接已經(jīng)關(guān)閉時,會首先回調(diào)所有業(yè)務(wù)處理器的channelInactive()方法惧蛹。
(5)channelUnregistered():通道和NioEventLoop線程解除綁定扇救,移除掉對這條通道的事件處理之后,回調(diào)所有業(yè)務(wù)處理器的channelUnregistered ()方法香嗓。
(6)handlerRemoved():最后迅腔,Netty會移除掉通道上所有的業(yè)務(wù)處理器,并且回調(diào)所有的業(yè)務(wù)處理器的handlerRemoved()方法靠娱。
-入棽琢遥回調(diào)方法
(1)channelRead():有數(shù)據(jù)包入站,通道可讀像云。流水線會啟動入站處理流程锌雀,從前向后蚂夕,入站處理器的channelRead()方法會被依次回調(diào)到。
(2)channelReadComplete():流水線完成入站處理后腋逆,會從前向后婿牍,依次回調(diào)每個入站處理器的channelReadComplete()方法,表示數(shù)據(jù)讀取完畢惩歉。
5.3代碼示例
public class InHandlerDemo extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:handlerAdded()");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:channelRegistered()");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:channelActive()");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Logger.info("被調(diào)用:channelRead()");
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:channelReadComplete()");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:channelInactive()");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用: channelUnregistered()");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Logger.info("被調(diào)用:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
測試類
public class InHandlerDemoTester {
@Test
public void testInHandlerLifeCircle() {
final InHandlerDemo inHandler = new InHandlerDemo();
//初始化處理器
ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
@Override
protected void initChannel(EmbeddedChannel ch) {
ch.pipeline().addLast(inHandler);
}
};
//創(chuàng)建嵌入式通道
EmbeddedChannel channel = new EmbeddedChannel(i);
ByteBuf buf = Unpooled.buffer();
buf.writeInt(1);
//模擬入站等脂,寫一個入站包
channel.writeInbound(buf);
channel.flush();
//模擬入站,再寫一個入站包
channel.writeInbound(buf);
channel.flush();
//通道關(guān)閉
channel.close();
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}