服務(wù)端
EchoServerHandler - 消息處理
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws FileNotFoundException {
//ReferenceCountUtil.release(msg);
ByteBuf buf = (ByteBuf)msg;
System.out.println("Server recevied:"+ buf.toString(Charset.forName("UTF-8")));
ctx.write(buf);
}
final ByteBuf byteBuffer = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("hello world!\r\n", CharsetUtil.UTF_8));
@Override
public void channelActive(ChannelHandlerContext ctx){
ChannelFuture cf= ctx.writeAndFlush(byteBuffer.duplicate());
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("Write successful!");
}else{
System.out.println("Write Error!");
future.cause().printStackTrace();
}
}
});
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE); //將未決消息沖刷到遠(yuǎn)程節(jié)點且關(guān)閉該Channel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
EchoServer- 啟動程序
public class EchoServer {
public static void main(String[] args) throws InterruptedException {
EchoServer echoNettyServer = new EchoServer();
echoNettyServer.start(9981);
}
public void start(int port) throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
final ByteBuf byteBuffer = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("HI!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new NioEventLoopGroup(); //非阻塞方式
try{
ServerBootstrap b = new ServerBootstrap(); //創(chuàng)建ServerBootstrap
b.group(group)//NioSctpChannel
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}//每個已接收的連接都調(diào)用它
});
ChannelFuture f = b.bind().sync(); //異步綁定服務(wù)器涡驮,調(diào)用sync()方法阻塞等待直接綁定完成
f.channel().closeFuture().sync(); //獲取Channel的CloseFuture,并且阻塞當(dāng)前線程直接它完成
}finally {
group.shutdownGracefully().sync();
}
}
}
客戶端
EchoClientHandler --消息處理
/**
* SimpleChannelInboundHandler:
* 1. 當(dāng)channnelRead0方法完成時稳吮,已傳入的消息并且已處理完它了。當(dāng)方法返回時织咧,SimpleChannelInboundHandler負(fù)責(zé)釋放指向保存該消息的ByteBuffer的內(nèi)存引用
* 2. 而服務(wù)端Handler,需要將傳入的消息回傳給發(fā)送者庄新,而write()是異步操作扶踊,直接ChannelRead()方法返回后可能仍然沒有完成。
*/
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 接收到消息朱巨,服務(wù)器發(fā)送的消息可能會被分塊接收(并非一次性全部接收)
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received:"+msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!!", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
EchoClient - 啟動程序
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
String host = "localhost";
int port = 9981;
new EchoClient().start(host,port);
}
public void start(String host,int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
try {
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler())
;
}
});
ChannelFuture f = b.connect().sync(); //連接到遠(yuǎn)程節(jié)點,阻塞等待直接連接完成
f.channel().closeFuture().sync();//阻塞枉长,直接Channel關(guān)閉
f.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("Connection established!");
}else{
System.out.println("Connection attempt failed!");
future.cause().printStackTrace();;
}
}
}
);
}finally {
group.shutdownGracefully().sync();
}
}
}