Netty是目前業(yè)界最流行的NIO框架之一,它的健壯性膜廊、高性能乏沸、可定制和可擴(kuò)展性在同類框架中都是首屈一指。它已經(jīng)得到了成百上千的商業(yè)項(xiàng)目的驗(yàn)證爪瓜,例如Hadoop的RPC框架Avro就使用了Netty作為底層通信框架蹬跃,其他的業(yè)界主流RPC框架,例如:Dubbo铆铆、Google 開源的gRPC蝶缀、新浪微博開源的Motan、Twitter 開源的 finagle也使用Netty來構(gòu)建高性能的異步通信能力薄货。另外翁都,阿里巴巴開源的消息中間件RocketMQ也使用Netty作為底層通信框架。
TCP黏包/拆包
TCP是一個(gè)“流”協(xié)議谅猾,所謂流柄慰,就是沒有界限的一長(zhǎng)串二進(jìn)制數(shù)據(jù)。TCP作為傳輸層協(xié)議并不不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義税娜,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行數(shù)據(jù)包的劃分坐搔,所以在業(yè)務(wù)上認(rèn)為是一個(gè)完整的包,可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送敬矩,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送薯蝎,這就是所謂的TCP粘包和拆包問題。
粘包問題的解決策略
由于底層的TCP無(wú)法理解上層的業(yè)務(wù)數(shù)據(jù)谤绳,所以在底層是無(wú)法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計(jì)來解決袒哥。業(yè)界的主流協(xié)議的解決方案缩筛,可以歸納如下:
- 消息定長(zhǎng),報(bào)文大小固定長(zhǎng)度堡称,例如每個(gè)報(bào)文的長(zhǎng)度固定為200字節(jié)瞎抛,如果不夠空位補(bǔ)空格;
- 包尾添加特殊分隔符却紧,例如每條報(bào)文結(jié)束都添加回車換行符(例如FTP協(xié)議)或者指定特殊字符作為報(bào)文分隔符桐臊,接收方通過特殊分隔符切分報(bào)文區(qū)分胎撤;
- 將消息分為消息頭和消息體,消息頭中包含表示信息的總長(zhǎng)度(或者消息體長(zhǎng)度)的字段断凶;
- 更復(fù)雜的自定義應(yīng)用層協(xié)議伤提。
Netty粘包和拆包解決方案
Netty提供了多個(gè)解碼器,可以進(jìn)行分包的操作认烁,分別是:
- LineBasedFrameDecoder
- DelimiterBasedFrameDecoder(添加特殊分隔符報(bào)文來分包)
- FixedLengthFrameDecoder(使用定長(zhǎng)的報(bào)文來分包)
- LengthFieldBasedFrameDecoder
LineBasedFrameDecoder解碼器
LineBasedFrameDecoder是回車換行解碼器肿男,如果用戶發(fā)送的消息以回車換行符作為消息結(jié)束的標(biāo)識(shí),則可以直接使用Netty的LineBasedFrameDecoder對(duì)消息進(jìn)行解碼却嗡,只需要在初始化Netty服務(wù)端或者客戶端時(shí)將LineBasedFrameDecoder正確的添加到ChannelPipeline中即可舶沛,不需要自己重新實(shí)現(xiàn)一套換行解碼器。
Netty依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.9.Final</version>
</dependency>
1.1 Server端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Ricky
*
*/
public class LineBasedServer {
private Logger logger = LoggerFactory.getLogger(getClass());
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LineBasedFrameDecoder(1024));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new LineServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
logger.info("server bind port:{}", port);
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new LineBasedServer().bind(Constants.PORT);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LineServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(getClass());
private int count = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
count++;
String body = (String) msg;
logger.info("server read msg:{}, count:{}", body, count);
String response = "hello from server"+System.getProperty("line.separator");
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
logger.error("server caught exception", cause);
ctx.close();
}
}
1.2 Client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class LineBasedClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LineBasedFrameDecoder(1024));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new LineClientHandler());
}
});
ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new LineBasedClient().connect(Constants.HOST, Constants.PORT);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LineClientHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(getClass());
private int count =0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Send the message to Server
for(int i=0; i<100; i++){
String msg = "hello from client "+i;
logger.info("client send message:{}", msg);
ctx.writeAndFlush(msg+System.getProperty("line.separator"));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
String body = (String) msg;
count++;
logger.info("client read msg:{}, count:{}", body, count);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
logger.error("client caught exception", cause);
ctx.close();
}
}
DelimiterBasedFrameDecoder解碼器
DelimiterBasedFrameDecoder是分隔符解碼器窗价,用戶可以指定消息結(jié)束的分隔符如庭,它可以自動(dòng)完成以分隔符作為碼流結(jié)束標(biāo)識(shí)的消息的解碼『掣郏回車換行解碼器實(shí)際上是一種特殊的DelimiterBasedFrameDecoder解碼器坪它。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Ricky
*
*/
public class DelimiterServer {
private Logger logger = LoggerFactory.getLogger(getClass());
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new DelimiterServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
logger.info("server bind port:{}", port);
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new DelimiterServer().bind(Constants.PORT);
}
}
Client:
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class DelimiterClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new DelimiterClientHandler());
}
});
ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new DelimiterClient().connect(Constants.HOST, Constants.PORT);
}
}
FixedLengthFrameDecoder解碼器
FixedLengthFrameDecoder是固定長(zhǎng)度解碼器,它能夠按照指定的長(zhǎng)度對(duì)消息進(jìn)行自動(dòng)解碼餐胀,開發(fā)者不需要考慮TCP的粘包/拆包等問題哟楷,非常實(shí)用。
對(duì)于定長(zhǎng)消息否灾,如果消息實(shí)際長(zhǎng)度小于定長(zhǎng)卖擅,則往往會(huì)進(jìn)行補(bǔ)位操作,它在一定程度上導(dǎo)致了空間和資源的浪費(fèi)墨技。但是它的優(yōu)點(diǎn)也是非常明顯的惩阶,編解碼比較簡(jiǎn)單,因此在實(shí)際項(xiàng)目中仍然有一定的應(yīng)用場(chǎng)景扣汪。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Ricky Fung
*/
public class NettyServer {
private Logger logger = LoggerFactory.getLogger(getClass());
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//配置服務(wù)器啟動(dòng)類
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))//配置日志輸出
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture f = b.bind(port).sync();
//等待服務(wù)器退出
f.channel().closeFuture().sync();
} finally {
//釋放線程資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ServerHandler extends ChannelInboundHandlerAdapter {
private int counter = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("接收客戶端msg:{}", msg);
ByteBuf echo = Unpooled.copiedBuffer(String.format("Hello from server:", counter).getBytes());
ctx.writeAndFlush(echo);
}
}
public static void main(String[] args) throws InterruptedException {
new NettyServer().bind(Constants.PORT);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Ricky Fung
*/
public class NettyClient {
private Logger logger = LoggerFactory.getLogger(getClass());
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0; i<100; i++){
String msg = "hello from client "+i;
logger.info("client send message:{}", msg);
ctx.writeAndFlush(msg);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
logger.info("接收服務(wù)端msg:{}", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
public static void main(String[] args) throws InterruptedException {
new NettyClient().connect(Constants.HOST, Constants.PORT);
}
}
LengthFieldBasedFrameDecoder解碼器
大多數(shù)的協(xié)議(私有或者公有)断楷,協(xié)議頭中會(huì)攜帶長(zhǎng)度字段,用于標(biāo)識(shí)消息體或者整包消息的長(zhǎng)度崭别,例如SMPP冬筒、HTTP協(xié)議等。由于基于長(zhǎng)度解碼需求的通用性茅主,以及為了降低用戶的協(xié)議開發(fā)難度舞痰,Netty提供了LengthFieldBasedFrameDecoder,自動(dòng)屏蔽TCP底層的拆包和粘包問題诀姚,只需要傳入正確的參數(shù)响牛,即可輕松解決“讀半包“問題。
Message.java
import java.nio.charset.Charset;
/**
* @author Ricky Fung
*/
public class Message {
private final Charset charset = Charset.forName("utf-8");
private byte magicType;
private byte type;//消息類型 0xAF 表示心跳包 0xBF 表示超時(shí)包 0xCF 業(yè)務(wù)信息包
private long requestId; //請(qǐng)求id
private int length;
private String body;
public Message(){
}
public Message(byte magicType, byte type, long requestId, byte[] data) {
this.magicType = magicType;
this.type = type;
this.requestId = requestId;
this.length = data.length;
this.body = new String(data, charset);
}
public Message(byte magicType, byte type, long requestId, String body) {
this.magicType = magicType;
this.type = type;
this.requestId = requestId;
this.length = body.getBytes(charset).length;
this.body = body;
}
...setter/getter
}
MessageDecoder.java
import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Ricky Fung
*/
public class MessageDecoder extends LengthFieldBasedFrameDecoder {
private Logger logger = LoggerFactory.getLogger(getClass());
//頭部信息的大小應(yīng)該是 byte+byte+int = 1+1+8+4 = 14
private static final int HEADER_SIZE = 14;
public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in == null) {
return null;
}
if (in.readableBytes() <= HEADER_SIZE) {
return null;
}
in.markReaderIndex();
byte magic = in.readByte();
byte type = in.readByte();
long requestId = in.readLong();
int dataLength = in.readInt();
// FIXME 如果dataLength過大,可能導(dǎo)致問題
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return null;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
String body = new String(data, "UTF-8");
Message msg = new Message(magic, type, requestId, body);
return msg;
}
}
MessageEncoder.java
import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.Charset;
/**
* @author Ricky Fung
*/
public class MessageEncoder extends MessageToByteEncoder<Message> {
private final Charset charset = Charset.forName("utf-8");
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
//
out.writeByte(msg.getMagicType());
out.writeByte(msg.getType());
out.writeLong(msg.getRequestId());
byte[] data = msg.getBody().getBytes(charset);
out.writeInt(data.length);
out.writeBytes(data);
}
}
服務(wù)端:
import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Ricky Fung
*/
public class NettyServer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MessageDecoder(1<<20, 10, 4));
p.addLast(new MessageEncoder());
p.addLast(new ServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture future = b.bind(port).sync(); // (7)
logger.info("server bind port:{}", port);
// Wait until the server socket is closed.
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ServerHandler extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
logger.info("server read msg:{}", msg);
Message resp = new Message(msg.getMagicType(), msg.getType(), msg.getRequestId(), "Hello world from server");
ctx.writeAndFlush(resp);
}
}
public static void main(String[] args) throws Exception {
new NettyServer().bind(Constants.PORT);
}
}
客戶端:
import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @author Ricky Fung
*/
public class NettyClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new MessageDecoder(1<<20, 10, 4));
p.addLast(new MessageEncoder());
p.addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(host, port).sync();
future.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
if(future.channel().isActive()){
for(int i=0; i<100; i++) {
String body = "Hello world from client:"+ i;
Message msg = new Message((byte) 0XAF, (byte) 0XBF, i, body);
future.channel().writeAndFlush(msg);
}
}
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class ClientHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
logger.info("client read msg:{}, ", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
logger.error("client caught exception", cause);
ctx.close();
}
}
public static void main(String[] args) throws Exception {
new NettyClient().connect(Constants.HOST, Constants.PORT);
}
}
參考資料
源碼下載
https://github.com/TiFG/netty4-in-action/tree/master/netty4-unpack-demo