上一節(jié)中介紹的java NIO的開發(fā)责掏,回顧下NIO開發(fā)的步驟:
1、創(chuàng)建ServerSocketChannel并設(shè)置為非阻塞模式
2小槐、綁定監(jiān)聽端口
3皂股、創(chuàng)建多路服務(wù)器Selector夕凝,將創(chuàng)建的ServerSocketChannel注冊到Selector,監(jiān)聽SelectKey.Accept事件
4努潘、創(chuàng)建IO線程輪詢Selector.select
5呻此、如果輪詢到就緒的Channel如果是OP_ACCEPT狀態(tài)說明是新的客戶端接入則調(diào)用ServerSocketChannel.accept方法接受新的客戶端
6火俄、設(shè)置客戶端鏈路SocketChannel為非阻塞模式棘利,并且將其注冊到Selector上呢撞,監(jiān)聽SelectKey.OP_READ操作位
7蒜茴、如果輪詢到Channel為OP_READ,則說明SocketChannel中有新的就緒的數(shù)據(jù)包需要讀取洋机,則通過ByteBuffer讀取數(shù)據(jù)
由上來看,直接通過Java原生的類進(jìn)行NIO編程非常復(fù)雜繁瑣浇冰,而Netty正好解決了這個(gè)難題
下面通過一個(gè)簡單的Netty程序了解Netty開發(fā)
netty服務(wù)端代碼:
public class TimeServer {
public void bind(int port) throws Exception {
//創(chuàng)建兩個(gè)線程組 一個(gè)用于服務(wù)端接收客戶端的連接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//一個(gè)用于網(wǎng)絡(luò)讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHander());
ChannelFuture future = b.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHander extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
try {
new TimeServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
ByteBuf byteBuf = (ByteBuf)msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("the time server receive order:"+body);
String currentTIme = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()
).toString():"BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTIme.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
程序剛開始創(chuàng)建了兩個(gè)EventLoopGroup線程贬媒,一個(gè)專門監(jiān)聽服務(wù)端接收客戶端請求,一個(gè)專門處理業(yè)務(wù)邏輯肘习,之后創(chuàng)建一個(gè)ServerBootstrap類,并將線程組傳遞到ServerBootstrap际乘,設(shè)置的Channel為NioServerSocketChannel并將事件處理類設(shè)置為ChildChannelHander,最后調(diào)用bind方法綁定監(jiān)聽端口漂佩,最后future.channel().closeFuture().sync()會(huì)阻塞直到服務(wù)端斷開連接
客戶端代碼:
public class TimeClient {
public void connect(int port,String host) throws Exception {
//創(chuàng)建讀寫io線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("1");
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
System.out.println("2");
ChannelFuture f = b.connect(host,port).sync();
System.out.println("3");
f.channel().closeFuture().sync();
System.out.println("4");
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length >0) {
try {
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e) {
}
}
try {
new TimeClient().connect(port,"127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Netty客戶端代碼更為簡單脖含,第一步創(chuàng)建NioEventLoopGroup線程組,跟服務(wù)端一樣再創(chuàng)建客戶端輔助類Bootstrap并將線程組和渠道作為參數(shù)出傳入投蝉,并且指定handler實(shí)現(xiàn)initChannel方法养葵,其作用就是當(dāng)創(chuàng)建NioSocketChannel之后回調(diào)initChannel方法處理網(wǎng)絡(luò)IO事件
下面是TimeChientHander代碼
public class TimeClientHandler extends ChannelHandlerAdapter {
private final ByteBuf firstMsg;
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
firstMsg = Unpooled.buffer(req.length);
firstMsg.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(firstMsg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is : "+body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Unexpected exception from downstream :"+cause.getMessage());
ctx.close();
}
}
TimeChientHander實(shí)現(xiàn)了channelActive,channelRead,exceptionCaught方法,
channelActive為客戶端或者服務(wù)端TCP鏈路建立連接之后回調(diào)瘩缆,當(dāng)服務(wù)端寫數(shù)據(jù)到客戶端時(shí)channelRead回調(diào)关拒,當(dāng)發(fā)生異常時(shí)會(huì)回調(diào)exceptionCaught方法。
從上面代碼可以看出Netty將IO事件的處理抽象為ChannelHandler接口,開發(fā)者可以繼承ChannelHandler接口來自定義事件處理邏輯庸娱。
ChannelHandler接口主要有以下方法:
channelRegistered(ChannelHandlerContext ctx) 注冊到EventLoop成功回調(diào)
channelActive(ChannelHandlerContext ctx) 當(dāng)建立連接后回調(diào)
channelRead(ChannelHandlerContext ctx, Object msg) 當(dāng)前channel讀到數(shù)據(jù)時(shí)回調(diào)
channelReadComplete(ChannelHandlerContext ctx) 讀取數(shù)據(jù)完成后回調(diào)
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 發(fā)生異常時(shí)回調(diào)
在實(shí)際Netty開發(fā)中基本都是繼承ChannelHandler實(shí)現(xiàn)業(yè)務(wù)邏輯
在初始化Channel時(shí)需要將自定義的ChannelHandler添加到該Channel上着绊,Netty實(shí)現(xiàn)是ChannelPipeline.addLast
源碼如下
public interface ChannelPipeline extends Iterable<Entry<String, ChannelHandler>> {
……
}
其中繼承Iterable迭代器接口,可以看出其hannelPipeline是一系列ChannelHandler的集合(可以理解為鏈表)自定義的ChannelHandler可以調(diào)用它的addFirst涌韩、addLast添加到IO處理pipeLine的頭或者尾
主要方法有以下:
ChannelPipeline addFirst(String name, ChannelHandler handler) 將handler插入到handler鏈表頭部
ChannelPipeline addLast(ChannelHandler... handlers) 將handler插入到handler鏈表尾部
自動(dòng)以的ChannelHandler一般通過上述方法添加到事件處理鏈表中