Netty服務(wù)器構(gòu)成
- 至少一個(gè)ChannelHandler——該組件實(shí)現(xiàn)了服務(wù)器對(duì)從客戶端接受的數(shù)據(jù)的處理诬垂,即它的業(yè)務(wù)邏輯
- 引導(dǎo)——配置服務(wù)器的啟動(dòng)代碼凛澎。至少,它會(huì)將服務(wù)器綁定到它要監(jiān)聽(tīng)連接請(qǐng)求的端口上。
ChannelHandler和業(yè)務(wù)邏輯
?ChannelHandler是一個(gè)接口族的父接口燕刻,它的實(shí)現(xiàn)負(fù)責(zé)接受并響應(yīng)事件通知,在Netty應(yīng)用程序中剖笙,所有的數(shù)據(jù)處理邏輯都包含在這些核心抽象的實(shí)現(xiàn)中卵洗。
?Echo服務(wù)器會(huì)響應(yīng)傳入的消息,因此需要實(shí)現(xiàn)ChannelInboundHandler接口弥咪,用來(lái)定義響應(yīng)入站事件的方法过蹂。由于Echo服務(wù)器的應(yīng)用程序只需要用到少量的方法,所以只需要繼承ChannelInboundHandlerAdapter類(lèi)聚至,它提供了ChannelInboundHandler的默認(rèn)實(shí)現(xiàn)酷勺。
?在ChannelInboundHandler中,我們感興趣的方法有:
- channelRead()——對(duì)于每個(gè)傳入的消息都要調(diào)用
- channelReadComplete()——通知ChannelInboundHandler最后一次對(duì)channelRead()的調(diào)用是當(dāng)前批量讀取中的最后一條消息
- execeptionCaught()——在讀取操作期間扳躬,有異常拋出時(shí)會(huì)調(diào)用脆诉。
EchoServerHandler實(shí)現(xiàn)
package cn.sh.demo.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author sh
* @ChannelHandler.Sharable 標(biāo)示一個(gè)ChannelHandler可以被多個(gè)Channel安全地共享
*/
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
//將接受到的消息輸出到客戶端
System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
//將接收到的消息寫(xiě)給發(fā)送者,而不沖刷出站消息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//將消息沖刷到客戶端贷币,并且關(guān)閉該Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
//打印異常堆棧跟蹤
cause.printStackTrace();
//關(guān)閉該Channel
ctx.close();
}
}
備注
- ChannelInbounHandlerAdapter每個(gè)方法都可以被重寫(xiě)然后掛鉤到事件生命周期的恰當(dāng)點(diǎn)上击胜。
- 重寫(xiě)exceptionCaught()方法允許你對(duì)Throwable的任何子類(lèi)型作出反應(yīng)。
- 每個(gè)Channel都擁有一個(gè)與之關(guān)聯(lián)的ChannelPipeline片择,ChannelPipeline持有一個(gè)ChannelHandler的實(shí)例鏈潜的。在默認(rèn)情況下,ChannelHandler會(huì)把對(duì)方法的調(diào)用轉(zhuǎn)發(fā)給鏈中的下一個(gè)ChannelHandler字管。因此啰挪,如果exceptionCaught()方法沒(méi)有被該鏈中的某處實(shí)現(xiàn),那么異常將會(huì)被傳遞到ChannelPipeline的末端進(jìn)行記錄
引導(dǎo)服務(wù)器
主要涉及的內(nèi)容
- 綁定監(jiān)聽(tīng)并接受傳入連接請(qǐng)求的端口
- 配置Channel嘲叔,將有關(guān)的入站消息通知給EchoServerHandler實(shí)例
Echo服務(wù)引導(dǎo)示例代碼
package cn.sh.demo.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void startServer() throws InterruptedException {
EchoServerHandler serverHandler = new EchoServerHandler();
//創(chuàng)建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
//創(chuàng)建ServerBootstrap
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
//指定所使用的NIO傳輸Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口套接字
.localAddress(new InetSocketAddress(port))
//添加一個(gè)EchoServerHandler到子Channel的ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//此處由于EchoServerHandler被注解標(biāo)注為@Shareble亡呵,所以我們總是使用相同的實(shí)例
channel.pipeline().addLast(serverHandler);
}
});
try {
//異步的綁定服務(wù)器,調(diào)用sync()方法阻塞等待直到綁定完成
ChannelFuture channelFuture = bootstrap.bind().sync();
//獲取Channel的CloseFuture硫戈,并且阻塞當(dāng)前線程直到它完成
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//關(guān)閉EventLoopGroup锰什,釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
if (args.length != 1) {
System.err.println("參數(shù)類(lèi)型或者個(gè)數(shù)不正確");
return;
}
//設(shè)置端口值
int port = Integer.parseInt(args[0]);
//啟動(dòng)Echo服務(wù)器
new EchoServer(port).startServer();
}
}
備注
- 此處使用了一個(gè)特殊的類(lèi)——ChannelInitializer。當(dāng)一個(gè)新的連接被接受時(shí),一個(gè)新的子Channel會(huì)被創(chuàng)建汁胆,此時(shí)ChannelInitializer就會(huì)把一個(gè)EchoServerHandler的示例添加到該Channel的ChannelPipeline中梭姓,這個(gè)ChannelHandler將會(huì)收到有關(guān)入站消息的通知。
回顧引導(dǎo)服務(wù)
- 創(chuàng)建一個(gè)ServerBootStrap實(shí)例來(lái)引導(dǎo)和綁定服務(wù)器
- 創(chuàng)建并分配一個(gè)NioEventLoopGroup實(shí)例進(jìn)行事件的處理嫩码,如接受新連接以及讀/寫(xiě)數(shù)據(jù)
- 指定服務(wù)器綁定的本地InetSocketAddress
- 使用EchoServerHandler的實(shí)例初始化每一個(gè)新的Channel
- 調(diào)用ServerBootstrap.bind()方法來(lái)綁定服務(wù)器
Echo客戶端
客戶端主要包括的操作:
- 連接到服務(wù)器
- 發(fā)送一個(gè)或多個(gè)消息
- 對(duì)于每個(gè)消息誉尖,等待并接受從服務(wù)器發(fā)回相同的消息
- 關(guān)閉連接
編寫(xiě)客戶端主要包括業(yè)務(wù)邏輯和引導(dǎo)
ChannelHandler實(shí)現(xiàn)客戶端邏輯
在該示例中,我們使用SimpleChannelInboundHandler類(lèi)來(lái)處理所有的事件铸题,主要的方法有:
- channelActive()——在和服務(wù)器的連接已經(jīng)建立之后被調(diào)用
- channelRead0()——當(dāng)從服務(wù)器接收到一條消息時(shí)被調(diào)用
- exceptionCaught()——在處理過(guò)程中引發(fā)異常時(shí)被調(diào)用
示例代碼如下:
package cn.sh.demo.echo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @author sh
* @ChannelHandler.Sharable 標(biāo)記該類(lèi)的示例可以被多個(gè)Channel共享
*/
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//當(dāng)一個(gè)連接被服務(wù)器接受并建立后铡恕,發(fā)送一條消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
//記錄客戶端接收到服務(wù)器的消息
System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 在發(fā)生異常時(shí),記錄錯(cuò)誤并關(guān)閉Channel
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
備注
?每次在接受數(shù)據(jù)時(shí)丢间,都會(huì)調(diào)用channelRead0()方法探熔。需要注意的是,由服務(wù)器發(fā)送的消息可能會(huì)被分塊接受烘挫。也就是說(shuō)诀艰,如果服務(wù)器發(fā)送了5字節(jié),那么不能保證這5字節(jié)會(huì)被一次性接受墙牌。即使是對(duì)于這么少量的數(shù)據(jù)涡驮,channelRead0()方法也可能會(huì)被調(diào)用兩次,第一次使用一個(gè)持有3字節(jié)的ByteBuf(Netty的字節(jié)容器)喜滨,第二次使用一個(gè)持有2字節(jié)的ByteBuf捉捅。作為一個(gè)面向流的協(xié)議,TCP保證了字節(jié)數(shù)組會(huì)按照服務(wù)器發(fā)送它們的順序被接受虽风。
為什么客戶端使用SimpleChannelInboundHandler而不是ChannelInboundHandlerAdapter棒口?
主要和業(yè)務(wù)邏輯如何處理消息以及Netty如何管理資源有關(guān)
客戶端中,當(dāng)channelRead0()方法完成時(shí)辜膝,已經(jīng)接受了消息并且處理完畢无牵,當(dāng)該方法返回時(shí),SimpleChannelInboundHandler負(fù)責(zé)釋放指向保存該消息的ByteBuf的內(nèi)存引用厂抖。
但是在服務(wù)器端茎毁,你需要將消息返回給客戶端,write()操作是異步的忱辅,直到channelRead()方法返回后有可能仍然沒(méi)有完成七蜘,ChannelInboundHandlerAdapter在這個(gè)時(shí)間點(diǎn)上不會(huì)釋放消息。
客戶端的消息是在channelComplete()方法中墙懂,通過(guò)writeAndFlush()方法調(diào)用時(shí)被釋放橡卤。
引導(dǎo)客戶端
客戶端使用主機(jī)和端口參數(shù)來(lái)連接遠(yuǎn)程地址,也就是Echo服務(wù)器的地址损搬,而不是綁定到一個(gè)一直被監(jiān)聽(tīng)的端口碧库。
示例代碼如下:
package cn.sh.demo.echo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
//創(chuàng)建客戶端引導(dǎo)器
Bootstrap bootstrap = new Bootstrap();
//指定使用NioEventLoopGroup來(lái)處理客戶端事件
bootstrap.group(group)
//指定使用NIO傳輸?shù)腃hannel類(lèi)型
.channel(NioSocketChannel.class)
//設(shè)置服務(wù)器的InetSocketAddress
.remoteAddress(new InetSocketAddress(host, port))
//在創(chuàng)建Channel時(shí)柜与,向ChannelPipeline中添加一個(gè)EchoHandler實(shí)例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
try {
//連接到遠(yuǎn)程節(jié)點(diǎn),阻塞等待直到連接完成
ChannelFuture future = bootstrap.connect().sync();
//阻塞直到Channel關(guān)閉
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//關(guān)閉線程池并且釋放所有的資源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
if (args.length != 2) {
System.err.println("參數(shù)個(gè)數(shù)不正確");
return;
}
int port = Integer.parseInt(args[1]);
new EchoClient(args[0], port).start();
}
}
備注
服務(wù)器和客戶端均使用了NIO傳輸嵌灰,但是弄匕,客戶端和服務(wù)端可以使用不同的傳輸,例如伞鲫,在服務(wù)器使用NIO傳輸粘茄,客戶端可以使用OIO傳輸
回顧引導(dǎo)服務(wù)器
- 創(chuàng)建一個(gè)Bootstrap實(shí)例,引導(dǎo)并創(chuàng)建客戶端
- 創(chuàng)建一個(gè)NioEventLoopGroup實(shí)例來(lái)進(jìn)行事件處理秕脓,其中事件處理包括創(chuàng)建新的連接以及處理入站和出站數(shù)據(jù)
- 為服務(wù)器連接創(chuàng)建了一個(gè)InetSocketAddress實(shí)例
- 當(dāng)連接建立時(shí),一個(gè)EchoClientHandler實(shí)例會(huì)被添加到(該Channel)的ChannelPipeline中
- 設(shè)置完成后儒搭,調(diào)用Bootstrap.connetc()連接到遠(yuǎn)程節(jié)點(diǎn)
運(yùn)行程序
- 啟動(dòng)服務(wù)端
- 再啟動(dòng)客戶端
服務(wù)端的輸出如下:
客戶端的輸出如下:
代碼地址
該文章的示例代碼位于cn.sh.demo.echo包下吠架。