Netty4實(shí)戰(zhàn) - TCP粘包&拆包解決方案

Netty是目前業(yè)界最流行的NIO框架之一,它的健壯性膜廊、高性能乏沸、可定制和可擴(kuò)展性在同類框架中都是首屈一指。它已經(jīng)得到了成百上千的商業(yè)項(xiàng)目的驗(yàn)證爪瓜,例如Hadoop的RPC框架Avro就使用了Netty作為底層通信框架蹬跃,其他的業(yè)界主流RPC框架,例如:Dubbo铆铆、Google 開源的gRPC蝶缀、新浪微博開源的Motan、Twitter 開源的 finagle也使用Netty來構(gòu)建高性能的異步通信能力薄货。另外翁都,阿里巴巴開源的消息中間件RocketMQ也使用Netty作為底層通信框架。

TCP黏包/拆包

TCP是一個(gè)“流”協(xié)議谅猾,所謂流柄慰,就是沒有界限的一長(zhǎng)串二進(jìn)制數(shù)據(jù)。TCP作為傳輸層協(xié)議并不不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義税娜,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行數(shù)據(jù)包的劃分坐搔,所以在業(yè)務(wù)上認(rèn)為是一個(gè)完整的包,可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送敬矩,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送薯蝎,這就是所謂的TCP粘包和拆包問題。

粘包問題的解決策略

由于底層的TCP無(wú)法理解上層的業(yè)務(wù)數(shù)據(jù)谤绳,所以在底層是無(wú)法保證數(shù)據(jù)包不被拆分和重組的,這個(gè)問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計(jì)來解決袒哥。業(yè)界的主流協(xié)議的解決方案缩筛,可以歸納如下:

  1. 消息定長(zhǎng),報(bào)文大小固定長(zhǎng)度堡称,例如每個(gè)報(bào)文的長(zhǎng)度固定為200字節(jié)瞎抛,如果不夠空位補(bǔ)空格;
  2. 包尾添加特殊分隔符却紧,例如每條報(bào)文結(jié)束都添加回車換行符(例如FTP協(xié)議)或者指定特殊字符作為報(bào)文分隔符桐臊,接收方通過特殊分隔符切分報(bào)文區(qū)分胎撤;
  3. 將消息分為消息頭和消息體,消息頭中包含表示信息的總長(zhǎng)度(或者消息體長(zhǎng)度)的字段断凶;
  4. 更復(fù)雜的自定義應(yīng)用層協(xié)議伤提。

Netty粘包和拆包解決方案

Netty提供了多個(gè)解碼器,可以進(jìn)行分包的操作认烁,分別是:

  • LineBasedFrameDecoder
  • DelimiterBasedFrameDecoder(添加特殊分隔符報(bào)文來分包)
  • FixedLengthFrameDecoder(使用定長(zhǎng)的報(bào)文來分包)
  • LengthFieldBasedFrameDecoder

LineBasedFrameDecoder解碼器

LineBasedFrameDecoder是回車換行解碼器肿男,如果用戶發(fā)送的消息以回車換行符作為消息結(jié)束的標(biāo)識(shí),則可以直接使用Netty的LineBasedFrameDecoder對(duì)消息進(jìn)行解碼却嗡,只需要在初始化Netty服務(wù)端或者客戶端時(shí)將LineBasedFrameDecoder正確的添加到ChannelPipeline中即可舶沛,不需要自己重新實(shí)現(xiàn)一套換行解碼器。

Netty依賴

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.9.Final</version>
        </dependency>

1.1 Server端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author Ricky
 *
 */
public class LineBasedServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LineBasedFrameDecoder(1024));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new LineServerHandler());
                }
            });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new LineBasedServer().bind(Constants.PORT);
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineServerHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count = 0;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {

        count++;
        String body = (String) msg;
        logger.info("server read msg:{}, count:{}", body, count);

        String response = "hello from server"+System.getProperty("line.separator");
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.error("server caught exception", cause);
        ctx.close();
    }

}

1.2 Client

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class LineBasedClient {

    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new LineBasedFrameDecoder(1024));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new LineClientHandler());
                }
            });

            ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new LineBasedClient().connect(Constants.HOST, Constants.PORT);
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LineClientHandler extends ChannelInboundHandlerAdapter {
    private Logger logger = LoggerFactory.getLogger(getClass());

    private int count =0;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the message to Server
       for(int i=0; i<100; i++){

           String msg = "hello from client "+i;
           logger.info("client send message:{}", msg);

           ctx.writeAndFlush(msg+System.getProperty("line.separator"));
       }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String body = (String) msg;
        count++;
        logger.info("client read msg:{}, count:{}", body, count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.error("client caught exception", cause);
        ctx.close();
    }
}

DelimiterBasedFrameDecoder解碼器

DelimiterBasedFrameDecoder是分隔符解碼器窗价,用戶可以指定消息結(jié)束的分隔符如庭,它可以自動(dòng)完成以分隔符作為碼流結(jié)束標(biāo)識(shí)的消息的解碼『掣郏回車換行解碼器實(shí)際上是一種特殊的DelimiterBasedFrameDecoder解碼器坪它。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Ricky
 *
 */
public class DelimiterServer {
    private Logger logger = LoggerFactory.getLogger(getClass());


    public void bind(int port) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new DelimiterServerHandler());
                }
            });

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new DelimiterServer().bind(Constants.PORT);
    }
}

Client:

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class DelimiterClient {
    
    public void connect(String host, int port) throws InterruptedException {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                    ChannelPipeline p = ch.pipeline();
                    p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
                    p.addLast(new StringDecoder());
                    p.addLast(new StringEncoder());

                    p.addLast(new DelimiterClientHandler());
                }
            });

            ChannelFuture future = b.connect(Constants.HOST, Constants.PORT).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new DelimiterClient().connect(Constants.HOST, Constants.PORT);
    }
}

FixedLengthFrameDecoder解碼器

FixedLengthFrameDecoder是固定長(zhǎng)度解碼器,它能夠按照指定的長(zhǎng)度對(duì)消息進(jìn)行自動(dòng)解碼餐胀,開發(fā)者不需要考慮TCP的粘包/拆包等問題哟楷,非常實(shí)用。

對(duì)于定長(zhǎng)消息否灾,如果消息實(shí)際長(zhǎng)度小于定長(zhǎng)卖擅,則往往會(huì)進(jìn)行補(bǔ)位操作,它在一定程度上導(dǎo)致了空間和資源的浪費(fèi)墨技。但是它的優(yōu)點(diǎn)也是非常明顯的惩阶,編解碼比較簡(jiǎn)單,因此在實(shí)際項(xiàng)目中仍然有一定的應(yīng)用場(chǎng)景扣汪。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(getClass());

    public void bind(int port) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //配置服務(wù)器啟動(dòng)類
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))//配置日志輸出
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());

                            ch.pipeline().addLast(new ServerHandler());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            //等待服務(wù)器退出
            f.channel().closeFuture().sync();
        } finally {
            //釋放線程資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends ChannelInboundHandlerAdapter {
        private int counter = 0;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            logger.info("接收客戶端msg:{}", msg);

            ByteBuf echo = Unpooled.copiedBuffer(String.format("Hello from server:", counter).getBytes());
            ctx.writeAndFlush(echo);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        new NettyServer().bind(Constants.PORT);
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    private Logger logger = LoggerFactory.getLogger(getClass());

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());

                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {

            for(int i=0; i<100; i++){

                String msg = "hello from client "+i;
                logger.info("client send message:{}", msg);

                ctx.writeAndFlush(msg);
            }
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {

            logger.info("接收服務(wù)端msg:{}", msg);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }

    public static void main(String[] args) throws InterruptedException {

        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}

LengthFieldBasedFrameDecoder解碼器

大多數(shù)的協(xié)議(私有或者公有)断楷,協(xié)議頭中會(huì)攜帶長(zhǎng)度字段,用于標(biāo)識(shí)消息體或者整包消息的長(zhǎng)度崭别,例如SMPP冬筒、HTTP協(xié)議等。由于基于長(zhǎng)度解碼需求的通用性茅主,以及為了降低用戶的協(xié)議開發(fā)難度舞痰,Netty提供了LengthFieldBasedFrameDecoder,自動(dòng)屏蔽TCP底層的拆包和粘包問題诀姚,只需要傳入正確的參數(shù)响牛,即可輕松解決“讀半包“問題。

Message.java

import java.nio.charset.Charset;

/**
 * @author Ricky Fung
 */
public class Message {

    private final Charset charset = Charset.forName("utf-8");

    private byte magicType;
    private byte type;//消息類型  0xAF 表示心跳包    0xBF 表示超時(shí)包  0xCF 業(yè)務(wù)信息包
    private long requestId; //請(qǐng)求id
    private int length;
    private String body;

    public Message(){

    }

    public Message(byte magicType, byte type, long requestId, byte[] data) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = data.length;
        this.body = new String(data, charset);
    }

    public Message(byte magicType, byte type, long requestId, String body) {
        this.magicType = magicType;
        this.type = type;
        this.requestId = requestId;
        this.length = body.getBytes(charset).length;
        this.body = body;
    }
    ...setter/getter
}

MessageDecoder.java

import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {
    private Logger logger = LoggerFactory.getLogger(getClass());

    //頭部信息的大小應(yīng)該是 byte+byte+int = 1+1+8+4 = 14
    private static final int HEADER_SIZE = 14;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }

        if (in.readableBytes() <= HEADER_SIZE) {
            return null;
        }

        in.markReaderIndex();

        byte magic = in.readByte();
        byte type = in.readByte();
        long requestId = in.readLong();
        int dataLength = in.readInt();

        // FIXME 如果dataLength過大,可能導(dǎo)致問題
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return null;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);

        String body = new String(data, "UTF-8");
        Message msg = new Message(magic, type, requestId, body);
        return msg;
    }
}

MessageEncoder.java

import com.mindflow.netty4.unpack.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.Charset;

/**
 * @author Ricky Fung
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {
    private final Charset charset = Charset.forName("utf-8");

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {

        //
        out.writeByte(msg.getMagicType());
        out.writeByte(msg.getType());
        out.writeLong(msg.getRequestId());

        byte[] data = msg.getBody().getBytes(charset);
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}

服務(wù)端:

import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Ricky Fung
 */
public class NettyServer {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void bind(int port) throws InterruptedException {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MessageDecoder(1<<20, 10, 4));
                            p.addLast(new MessageEncoder());
                            p.addLast(new ServerHandler());
                        }
                    });

            // Bind and start to accept incoming connections.
            ChannelFuture future = b.bind(port).sync(); // (7)

            logger.info("server bind port:{}", port);

            // Wait until the server socket is closed.
            future.channel().closeFuture().sync();

        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ServerHandler extends SimpleChannelInboundHandler<Message> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {

            logger.info("server read msg:{}", msg);

            Message resp = new Message(msg.getMagicType(), msg.getType(), msg.getRequestId(), "Hello world from server");
            ctx.writeAndFlush(resp);
        }
    }

    public static void main(String[] args) throws Exception {

        new NettyServer().bind(Constants.PORT);
    }
}

客戶端:

import com.mindflow.netty4.unpack.model.Message;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author Ricky Fung
 */
public class NettyClient {

    public void connect(String host, int port) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MessageDecoder(1<<20, 10, 4));
                            p.addLast(new MessageEncoder());

                            p.addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(host, port).sync();

            future.awaitUninterruptibly(2000, TimeUnit.MILLISECONDS);
            if(future.channel().isActive()){

                for(int i=0; i<100; i++) {

                    String body = "Hello world from client:"+ i;
                    Message msg = new Message((byte) 0XAF, (byte) 0XBF, i, body);

                    future.channel().writeAndFlush(msg);
                }
            }

            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private class ClientHandler extends ChannelInboundHandlerAdapter {
        private Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {

            logger.info("client read msg:{}, ", msg);

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            logger.error("client caught exception", cause);
            ctx.close();
        }
    }

    public static void main(String[] args) throws Exception {

        new NettyClient().connect(Constants.HOST, Constants.PORT);
    }
}

參考資料

Netty API Reference

Netty系列之Netty編解碼框架分析

源碼下載

https://github.com/TiFG/netty4-in-action/tree/master/netty4-unpack-demo

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末呀打,一起剝皮案震驚了整個(gè)濱河市矢赁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌贬丛,老刑警劉巖撩银,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異瘫寝,居然都是意外死亡蜒蕾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門焕阿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咪啡,“玉大人,你說我怎么就攤上這事暮屡〕访” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵褒纲,是天一觀的道長(zhǎng)准夷。 經(jīng)常有香客問我,道長(zhǎng)莺掠,這世上最難降的妖魔是什么衫嵌? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮彻秆,結(jié)果婚禮上楔绞,老公的妹妹穿的比我還像新娘。我一直安慰自己唇兑,他們只是感情好酒朵,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著扎附,像睡著了一般蔫耽。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上留夜,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天匙铡,我揣著相機(jī)與錄音,去河邊找鬼碍粥。 笑死慰枕,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的即纲。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼博肋,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼低斋!你這毒婦竟也來了蜂厅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤膊畴,失蹤者是張志新(化名)和其女友劉穎掘猿,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體唇跨,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稠通,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了买猖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片改橘。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖玉控,靈堂內(nèi)的尸體忽然破棺而出飞主,到底是詐尸還是另有隱情,我是刑警寧澤高诺,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布碌识,位于F島的核電站,受9級(jí)特大地震影響虱而,放射性物質(zhì)發(fā)生泄漏筏餐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一牡拇、第九天 我趴在偏房一處隱蔽的房頂上張望魁瞪。 院中可真熱鬧,春花似錦诅迷、人聲如沸佩番。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)趟畏。三九已至,卻和暖如春滩租,著一層夾襖步出監(jiān)牢的瞬間赋秀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工律想, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留猎莲,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓技即,卻偏偏與公主長(zhǎng)得像著洼,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 什么是TCP協(xié)議身笤? TCP(Transmission Control Protocol傳輸控制協(xié)議)是Intern...
    BlackManba_24閱讀 2,075評(píng)論 0 5
  • 簡(jiǎn)介 用簡(jiǎn)單的話來定義tcpdump豹悬,就是:dump the traffic on a network,根據(jù)使用者...
    保川閱讀 5,946評(píng)論 1 13
  • 個(gè)人認(rèn)為液荸,Goodboy1881先生的TCP /IP 協(xié)議詳解學(xué)習(xí)博客系列博客是一部非常精彩的學(xué)習(xí)筆記瞻佛,這雖然只是...
    貳零壹柒_fc10閱讀 5,051評(píng)論 0 8
  • 1.這篇文章不是本人原創(chuàng)的,只是個(gè)人為了對(duì)這部分知識(shí)做一個(gè)整理和系統(tǒng)的輸出而編輯成的娇钱,在此鄭重地向本文所引用文章的...
    SOMCENT閱讀 13,049評(píng)論 6 174
  • 《不能承受的生命之輕》:人在美感的引導(dǎo)下,把偶然的事件變成了一個(gè)主題细疚,然后記錄在生命的樂章中蔗彤。猶如作曲家譜寫奏鳴曲...
    LWALLE閱讀 243評(píng)論 0 0