Netty學習筆記(三)

  • Netty 應用實例:群聊系統(tǒng)互例。
  • 要求:①實現(xiàn)多人群聊粗井;②服務器端:可以監(jiān)測用戶上線枝哄,離線,并實現(xiàn)消息轉發(fā)功能键科;③客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有用戶闻丑,同時可以接受其它用戶發(fā)送的消息(有服務器轉發(fā)得到)。
  • GroupChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
    private int port; //監(jiān)聽端口
    public GroupChatServer(int port) {
        this.port = port;
    }
    //編寫run方法勋颖,處理客戶端的請求
    public void run() throws Exception {
        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //獲取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解碼器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入編碼器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的業(yè)務處理handler
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("netty 服務器啟動...");
            ChannelFuture channelFuture = b.bind(port).sync();
            //監(jiān)聽關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new GroupChatServer(7000).run();
    }
}
  • GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    //public static List<Channel> channels = new ArrayList<Channel>();
    //使用一個hashMap 管理
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();
    //定義一個channel 組嗦嗡,管理所有的channel
    //GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器(單例)
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    //handlerAdded 表示連接一旦建立,第一個被執(zhí)行
    //將當前channel 加入到 channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //將該客戶加入聊天的信息推送給其它在線的客戶端
        //該方法會將 channelGroup 中所有的channel 都遍歷一次饭玲,并發(fā)送消息侥祭,所以我們不需要自己遍歷
        channelGroup.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 加入聊天:" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);
    }
    //斷開連接,將xx客戶離開信息推送給當前在線的客戶
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 離開了\n");
        System.out.println("channelGroup size:" + channelGroup.size());
    }
    //表示channel 處于活動狀態(tài),提示 xxx 上線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上線了~");
    }
    //表示channel 處于不活動狀態(tài),提示 xxx離線了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 離線了~");
    }
    //讀取數(shù)據(jù)
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //獲取到當前channel
        Channel channel = ctx.channel();
        //這時我們遍歷channelGroup, 根據(jù)不同的情況殊者,回送不同的消息
        channelGroup.forEach(ch -> {
            //若不是當前的channel鲸沮,則轉發(fā)消息
            if (channel != ch) {
                ch.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 發(fā)送了消息:" + msg + "\n");
            } else {//回顯自己發(fā)送的消息給自己
                ch.writeAndFlush("[自己]發(fā)送了消息:" + msg + "\n");
            }
        });
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉通道
        ctx.close();
    }
}
  • GroupChatClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
    //屬性
    private final String host;
    private final int port;
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相關handler
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自定義的handler
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress() + "--------");
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通過channel 發(fā)送到服務器端
                channel.writeAndFlush(msg + "\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
  • GroupChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}
  • Netty 應用實例:心跳機制檢測往扔。
  • 要求:當服務器超過 3 秒沒有讀時,就提示讀空閑;當服務器超過 5 秒沒有寫操作時,就提示寫空閑琼牧,實現(xiàn)當服務器超過 7 秒沒有讀或者寫操作時恢筝,就提示讀寫空閑。
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //增加一個日志處理器
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一個netty 提供 IdleStateHandler
                    /*
                        1巨坊、IdleStateHandler 是netty 提供的處理空閑狀態(tài)的處理器
                        2撬槽、long readerIdleTime:表示多長時間沒有讀,就會發(fā)送一個心跳檢測包檢測是否連接
                        3抱究、long writerIdleTime:表示多長時間沒有寫恢氯,就會發(fā)送一個心跳檢測包檢測是否連接
                        4、long allIdleTime:表示多長時間沒有讀寫鼓寺,就會發(fā)送一個心跳檢測包檢測是否連接
                        5、文檔說明:triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
                        6勋磕、當 IdleStateEvent 觸發(fā)后妈候,就會傳遞給管道 的下一個handler去處理
                            通過調(diào)用(觸發(fā))下一個handler 的 userEventTriggered,在該方法中去處理 IdleStateEvent(讀空閑挂滓,寫空閑苦银,讀寫空閑)
                     */
                    pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
                    //加入一個對空閑檢測進一步處理的handler(自定義)
                    pipeline.addLast(new MyServerHandler());
                }
            });
            //啟動服務器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //將 evt 向下轉型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                    eventType = "讀空閑";
                    break;
                case WRITER_IDLE:
                    eventType = "寫空閑";
                    break;
                case ALL_IDLE:
                    eventType = "讀寫空閑";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
            System.out.println("服務器做相應處理...");
            //若發(fā)生空閑,則關閉通道
            //ctx.channel().close();
        }
    }
}
  • Netty 應用實例:Netty 通過 WebSocket 編程實現(xiàn)服務器和客戶端長連接赶站。
  • 要求:實現(xiàn)基于 WebSocket 的長連接的全雙工地交互幔虏,通過改變 Http 協(xié)議多次請求的約束,實現(xiàn)了長連接贝椿,服務器可以發(fā)送消息給瀏覽器想括。客戶端瀏覽器和服務器端會相互感知烙博,比如服務器關閉了瑟蜈,瀏覽器會感知,同樣瀏覽器關閉了渣窜,服務器會感知铺根。
  • hello.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8">
    <title>Title</title>
  </head>
  <body>
    <script>
      var socket;
      //判斷當前瀏覽器是否支持websocket
      if (window.WebSocket) {
        //go on
        socket = new WebSocket("ws://localhost:7000/hello");
        //相當于channelReado,ev收到服務器端回送的消息
        socket.onmessage = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + ev.data;
        };
        //相當于連接開啟(感知到連接開啟)
        socket.onopen = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = "連接開啟了.."
        };
        //相當于連接關閉(感知到連接關閉)
        socket.onclose = function (ev) {
          var rt = document.getElementById("responseText");
          rt.value = rt.value + "\n" + "連接關閉了.."
        }
      } else {
        alert("當前瀏覽器不支持websocket...")
      }
      //發(fā)送消息到服務器
      function send(message) {
        if (!window.socket) { //先判斷socket是否創(chuàng)建好
          return;
        }
        if (socket.readyState == WebSocket.OPEN) {
          //通過socket 發(fā)送消息
          socket.send(message)
        } else {
          alert("連接沒有開啟...");
        }
      }
    </script>
    <form onsubmit="return false">
      <textarea name="message" style="height: 300px; width: 300px"></textarea>
      <input type="button" value="發(fā)生消息" onclick="send(this.form.message.value)">
      <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
      <input type="button" value="清空內(nèi)容" onclick="document.getElementById('responseText').value=''">
    </form>
  </body>
</html>
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //因為基于http協(xié)議乔宿,所以要使用http的編碼和解碼器
                    pipeline.addLast(new HttpServerCodec());
                    //數(shù)據(jù)以塊方式寫位迂,需要添加ChunkedWriteHandler處理器
                    pipeline.addLast(new ChunkedWriteHandler());
                    /*
                        1、http數(shù)據(jù)在傳輸過程中是分段详瑞,HttpObjectAggregator掂林,就是可以將多個段聚合
                        2、這就是為什么當瀏覽器發(fā)送大量數(shù)據(jù)時蛤虐,就會發(fā)出多次http請求
                     */
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    /*
                        1党饮、對應websocket,它的數(shù)據(jù)是以幀(frame) 形式傳遞
                        2驳庭、WebSocketFrame 有六個實現(xiàn)子類
                        3刑顺、瀏覽器請求 ws://localhost:7000/xxx:表示請求的uri
                        4氯窍、WebSocketServerProtocolHandler 核心功能是將http協(xié)議升級為ws協(xié)議(通過一個狀態(tài)碼 101來切換),保持長連接狀態(tài)
                     */
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                    //自定義的handler蹲堂,處理業(yè)務邏輯
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });
            //啟動服務器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyTextWebSocketFrameHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//這里 TextWebSocketFrame 類型狼讨,表示一個文本幀(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服務器收到消息:" + msg.text());
        //回復消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器時間:" + LocalDateTime.now() + " " + msg.text()));
    }
    //當web客戶端連接后,handlerAdded 方法被觸發(fā)執(zhí)行
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //id 表示唯一的值柒竞,LongText 是唯一的政供,ShortText 不是唯一
        System.out.println("handlerAdded 被調(diào)用:" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded 被調(diào)用:" + ctx.channel().id().asShortText());
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被調(diào)用:" + ctx.channel().id().asLongText());
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發(fā)生 " + cause.getMessage());
        ctx.close(); //關閉連接
    }
}
  • 編寫網(wǎng)絡應用程序時,因為數(shù)據(jù)在網(wǎng)絡中傳輸?shù)亩际嵌M制字節(jié)碼數(shù)據(jù)朽基,所以在發(fā)送數(shù)據(jù)時就需要編碼布隔,接收數(shù)據(jù)時就需要解碼。
  • codec(編解碼器)的組成部分有兩個:decoder(解碼器)和 encoder(編碼器)稼虎。encoder 負責把業(yè)務數(shù)據(jù)轉換成字節(jié)碼數(shù)據(jù)衅檀,decoder 負責把字節(jié)碼數(shù)據(jù)轉換成業(yè)務數(shù)據(jù)。
  • Netty 提供的編碼器:StringEncoder:對字符串數(shù)據(jù)進行編碼霎俩;ObjectEncoder:對Java對象進行編碼哀军。
  • Netty 提供的解碼器:StringDecoder:對字符串數(shù)據(jù)進行解碼;ObjectDecoder:對 Java 對象進行解碼打却。
  • Netty 本身自帶的ObjectDecoderObjectEncoder可以用來實現(xiàn) POJO 對象或各種業(yè)務對象的編碼和解碼杉适,底層使用的仍是Java序列化技術,而Java序列化技術本身效率就不高柳击,存在如下問題:①無法跨語言猿推;②序列化后的體積太大,是二進制編碼的5倍多腻暮;③序列化性能太低彤守。
  • Protobuf(Google Protocol Buffers)是一種輕便高效的結構化數(shù)據(jù)存儲格式,可以用于結構化數(shù)據(jù)串行化(序列化)哭靖。它很適合做數(shù)據(jù)存儲或 RPC 數(shù)據(jù)交換格式(遠程過程調(diào)用 remote procedure call)具垫。目前很多公司使用的數(shù)據(jù)交換技術:http + json tcp + protobuf。
  • Protobuf 是以message的方式來管理數(shù)據(jù)的试幽,并且支持跨平臺筝蚕、跨語言。使用 protobuf 編譯器能自動生成代碼铺坞,Protobuf 是將類的定義使用.proto文件進行描述起宽。
  • Protobuf 入門案例:要求:客戶端可以隨機發(fā)送StudentPoJo / WorkerPoJo對象到服務器,服務端能接StudentPoJo / WorkerPoJo對象(需要判斷是哪種類型)济榨,并顯示信息(通過 Protobuf 解碼)坯沪。
  • 導入protobuf依賴:
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.13.0</version>
</dependency>
  • Student.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.zzw.netty.codec2"; //指定生成到哪個包下
option java_outer_classname = "MyDataInfo"; // 外部類名, 文件名
//protobuf 可以使用 message 管理其他的message
message MyMessage {
  //定義一個枚舉類型
  enum DataType {
    StudentType = 0; //在proto3中要求enum的編號從0開始
    WorkerType = 1;
  }
  //用data_type 來標識傳的是哪一個枚舉類型
  DataType data_type = 1;
  //表示每次枚舉類型最多只能出現(xiàn)其中的一個,節(jié)省空間
  oneof dataBody {
    Student student = 2;
    Worker worker = 3;
  }
}
message Student {
  int32 id = 1;//Student類的屬性
  string name = 2;
}
message Worker {
  string name = 1;
  int32 age = 2;
}
  • 下載好protoc編譯器擒滑,配置好環(huán)境變量腐晾,然后編譯Student.proto文件并在當前路徑下存放生成的文件MyDataInfo.javaprotoc --java_out=. Student.proto叉弦。
  • NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建BossGroup 和 WorkerGroup
        //說明
        //1、創(chuàng)建兩個線程組 bossGroup 和 workerGroup
        //2藻糖、bossGroup 只是處理連接請求淹冰,真正的客戶端業(yè)務處理是會交給 workerGroup 完成
        //3、兩個都是無限循環(huán)
        //4巨柒、bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數(shù):默認值為 cpu核數(shù) * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個
        try {
            //創(chuàng)建服務器端的啟動對象樱拴,配置參數(shù)
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用鏈式編程來進行設置
            bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作為服務器的通道實現(xiàn)
                    .option(ChannelOption.SO_BACKLOG, 128) //設置線程隊列等待連接的個數(shù)
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態(tài)
                    //.handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() { //創(chuàng)建一個通道初始化對象(匿名對象)
                        //給 pipeline 設置處理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入 ProtobufDecoder
                            //指定對哪種對象進行解碼
                            pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
                            pipeline.addLast(new NettyServerHandler()); //給 workerGroup 的 EventLoop 對應的管道設置處理器
                        }
                    });
            System.out.println(".....服務器 is ready...");
            //綁定一個端口并且同步處理,生成了一個 ChannelFuture 對象
            //啟動服務器(并綁定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();
            //給 cf 注冊監(jiān)聽器洋满,監(jiān)控我們關心的事件
            //綁定端口是異步操作晶乔,當綁定操作處理完,將會調(diào)用相應的監(jiān)聽器處理邏輯
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("監(jiān)聽端口 6668 成功");
                    } else {
                        System.out.println("監(jiān)聽端口 6668 失敗");
                    }
                }
            });
            //對關閉通道進行監(jiān)聽
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • NettyServerHandler.java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * 說明:自定義一個Handler 需要繼承 netty 規(guī)定好的某個HandlerAdapter(規(guī)范)
 */
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
    //讀取數(shù)據(jù)實際(這里我們可以讀取客戶端發(fā)送的消息)
    /**
     * 1芦岂、ChannelHandlerContext ctx:上下文對象瘪弓,含有管道 pipeline,通道channel禽最,地址
     * 2、Object msg:客戶端發(fā)送的數(shù)據(jù)袱饭,默認是 Object
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
        //根據(jù)dataType來顯示不同的信息
        MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
        if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
            MyDataInfo.Student student = msg.getStudent();
            System.out.println("學生id:" + student.getId() + "川无,學生姓名:" + student.getName());
        } else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
            MyDataInfo.Worker worker = msg.getWorker();
            System.out.println("工人的年齡:" + worker.getAge() + ",工人的姓名:" + worker.getName());
        } else {
            System.out.println("傳輸?shù)念愋筒徽_虑乖!");
        }
    }
    //數(shù)據(jù)讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush:write + flush
        //將數(shù)據(jù)寫入到緩存懦趋,并刷新
        //一般需要對發(fā)送的數(shù)據(jù)進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }
    //發(fā)生異常時疹味,需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
  • NettyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
    public static void main(String[] args) throws Exception {
        //客戶端需要一個事件循環(huán)組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創(chuàng)建客戶端啟動對象
            //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置相關參數(shù)
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) // 設置客戶端通道的實現(xiàn)類(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            //在pipeline中加入編碼器 ProtobufEncoder()
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            //加入自定義的處理器
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            System.out.println("...客戶端 is ok...");
            //啟動客戶端去連接服務器端
            //關于ChannelFuture 涉及到netty的異步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //給關閉通道進行監(jiān)聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • NettyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    //當通道就緒就會觸發(fā)該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //隨機地發(fā)送 Student 或者 Worker 對象
        int num = new Random().nextInt(3);
        MyDataInfo.MyMessage myMessage = null;
        if (0 == num) {
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
                    .setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 盧俊義").build()).build();
        } else {
            //發(fā)送一個Worker 對象
            myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
                    .setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
        }
        ctx.writeAndFlush(myMessage);
    }
    //當通道有讀取事件時仅叫,會觸發(fā)
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務器回復的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器的地址:" + ctx.channel().remoteAddress());
    }
    //發(fā)生異常時,需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • Netty 的主要組件有 Channel糙捺、EventLoop诫咱、ChannelFuture、ChannelHandler洪灯、ChannelPipe 等坎缭。
  • ChannelHandler 充當了處理入站和出站數(shù)據(jù)的應用程序邏輯的容器。例如签钩,實現(xiàn) ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter)掏呼,就可以接收入站事件和數(shù)據(jù),這些數(shù)據(jù)會被業(yè)務邏輯處理铅檩。當要給客戶端發(fā)送響應時憎夷,也可以從 ChannelInboundHandler 沖刷數(shù)據(jù)。業(yè)務邏輯通常寫在一個或者多個 ChannelInboundHandler 中昧旨。ChannelOutboundHandler 原理一樣拾给,只不過它是用來處理出站數(shù)據(jù)的祥得。
  • ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例鸣戴,若事件的運動方向是從客戶端到服務端啃沪,則稱這些事件為出站的,即客戶端發(fā)送給服務端的數(shù)據(jù)會通過 pipeline 中的一系列 ChannelOutboundHandler窄锅,并被這些 Handler 處理创千,反之則稱為入站的。
  • 當 Netty 發(fā)送或者接受一個消息時入偷,就將會發(fā)生一次數(shù)據(jù)轉換追驴。入站消息會被解碼:從字節(jié)轉換為另一種格式(比如 java 對象);若是出站消息疏之,則會被編碼成字節(jié)殿雪。
  • Netty 提供一系列實用的編解碼器,它們都實現(xiàn)了 ChannelInboundHadnler 或ChannelOutboundHandler 接口锋爪。在這些類中丙曙,channelRead 方法已經(jīng)被重寫了。以入站為例其骄,對于每個從入站 Channel 讀取的消息亏镰,這個方法會被調(diào)用。隨后拯爽,它將調(diào)用由解碼器所提供的 decode() 方法進行解碼索抓,并將已經(jīng)解碼的字節(jié)轉發(fā)給 ChannelPipeline 中的下一個 ChannelInboundHandler。
解碼器-ByteToMessageDecoder
  • 由于不可能知道遠程節(jié)點是否會一次性發(fā)送一個完整的信息毯炮,tcp 有可能出現(xiàn)粘包拆包的問題逼肯,這個類會對入站數(shù)據(jù)進行緩沖,直到它準備好被處理為止桃煎。
關于 ByteToMessageDecoder 實例分析
  • Netty 應用實例:使用自定義的編碼器和解碼器來說明 Netty 的 handler 鏈調(diào)用機制篮幢,要求客戶端發(fā)送 long類型的數(shù)據(jù)給服務器,同時服務端也發(fā)送 long 類型的數(shù)據(jù)給客戶端备禀。
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
    public static void main(String[] args) throws Exception {
        //創(chuàng)建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            //自定義初始化類
            serverBootstrap.childHandler(new MyServerInitializer());
            //啟動服務器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //注意編碼解碼操作必須在handler操作前
        //入站的handler進行解碼
        pipeline.addLast(new MyByteToLongDecoder());
        //出站的handler進行編碼
        pipeline.addLast(new MyLongToByteEncoder());
        //自定義一個handler來處理業(yè)務邏輯
        pipeline.addLast(new MyServerHandler());
    }
}
  • MyServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("從客戶端:" + ctx.channel().remoteAddress() + " 讀到的數(shù)據(jù)為:" + msg);
        //給客戶端發(fā)送一個long類型的數(shù)據(jù)
        ctx.writeAndFlush(98765L);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • MyByteToLongDecoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyByteToLongDecoder extends ByteToMessageDecoder {
    /**
     * decode 會被調(diào)用多次洲拇,直到確定沒有新的元素被添加到 list 集合為止 或者 ByteBuf 沒有更多可讀的字節(jié)為止
     * 若 list out不為空,則會將 list 的內(nèi)容傳遞給下一個 channelinboundhandler 處理曲尸,該處理器的方法也會被調(diào)用多次
     * @param ctx 上下文對象
     * @param in  入站的 ByteBuf
     * @param out 將解碼后的數(shù)據(jù)傳給下一個 handler
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyByteToLongDecoder decode 被調(diào)用");
        if (in.readableBytes() >= 8) {
            out.add(in.readLong());
        }
    }
}
  • MyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
    public static void main(String[] args) throws Exception {
        //客戶端需要一個事件循環(huán)組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創(chuàng)建客戶端啟動對象
            //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置相關參數(shù)
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) // 設置客戶端通道的實現(xiàn)類(反射)
                    .handler(new MyClientInitializer());
            System.out.println("...客戶端 is ok...");
            //啟動客戶端去連接服務器端
            //關于ChannelFuture 涉及到netty的異步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
            //給關閉通道進行監(jiān)聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • MyClientInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //注意:順序不能相反
        //加入一個出站的handler赋续,對數(shù)據(jù)進行編碼
        pipeline.addLast(new MyLongToByteEncoder());
        //加入一個入站的handler,對數(shù)據(jù)進行解碼
        pipeline.addLast(new MyByteToLongDecoder());
        //加入一個自定義的handler來處理業(yè)務
        pipeline.addLast(new MyClientHandler());
    }
}
  • MyLongToByteEncoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("MyLongToByteEncoder encode 被調(diào)用");
        System.out.println("msg=" + msg);
        out.writeLong(msg);
    }
}
  • MyClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        System.out.println("服務器的ip:" + ctx.channel().remoteAddress());
        System.out.println("收到服務器的消息:" + msg);
    }
    //發(fā)送數(shù)據(jù)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler 發(fā)送數(shù)據(jù)");
        //1另患、"abcdabcdabcdabcd" 是 16個字節(jié)
        //2纽乱、該處理器的前一個handler 是 MyLongToByteEncoder
        //3、MyLongToByteEncoder 的父類:MessageToByteEncoder
        //4昆箕、父類 MessageToByteEncoder
        /*
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try { //判斷當前msg 是否為處理的類型鸦列,若是則處理租冠,否則跳過自定義的 encode 過程
            if (this.acceptOutboundMessage(msg)) {
                I cast = msg;
                buf = this.allocateBuffer(ctx, msg, this.preferDirect);
                try {
                    this.encode(ctx, cast, buf);
                } finally {
                    ReferenceCountUtil.release(msg);
                }
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise);
            }
        } catch (EncoderException var17) {
            throw var17;
        } catch (Throwable var18) {
            throw new EncoderException(var18);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }
        5、因此薯嗤,我們編寫 Encoder 是要注意傳入的數(shù)據(jù)類型和處理的數(shù)據(jù)類型一致
        */
        // ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
        ctx.writeAndFlush(1234567L);
    }
}
  • 總結:①不論解碼器 handler 還是編碼器 handler 即接收的消息類型必須與待處理的消息類型必須一致顽爹,否則該 handler 不會被執(zhí)行;②在解碼器進行數(shù)據(jù)解碼時骆姐,需要判斷緩存區(qū)(ByteBuf)的數(shù)據(jù)是否足夠镜粤,否則接收到的結果和期望結果可能不一致。
  • ReplayingDecoder解碼器(public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder)擴展了ByteToMessageDecoder類玻褪,使用了這個類肉渴,我們就不必調(diào)用readableBytes()方法。其中带射,參數(shù) S 指定了用戶狀態(tài)管理的類型同规,Void值代表不需要狀態(tài)管理。
  • 應用實例:使用 ReplayingDecoder編寫解碼器對上一個案例進行簡化窟社。
  • MyByteToLongDecoder2.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyByteToLongDecoder2 被調(diào)用");
        //在 ReplayingDecoder 不需要判斷數(shù)據(jù)是否足夠讀取券勺,內(nèi)部會進行處理判斷
        out.add(in.readLong());
    }
}
  • 雖然使用ReplayingDecoder很方便,但它也有一些局限性:
    • 并不是所有的 ByteBuf 操作都被支持灿里,若調(diào)用了一個不被支持的方法朱灿,則會拋出一個UnsupportedOperationException
    • ReplayingDecoder在某些情況下可能稍慢于ByteToMessageDecoder钠四,例如網(wǎng)絡緩慢并且消息格式復雜時,消息會被拆成了多個碎片跪楞,速度變慢缀去。
  • 其它的編解碼器:
    • LineBasedFrameDecoder:這個類在 Netty 內(nèi)部也有使用,它使用行尾控制字符(\n或者\r\n)作為分隔符來解析數(shù)據(jù)甸祭。
    • DelimiterBasedFrameDecoder:使用自定義的特殊字符作為消息的分隔符缕碎。
    • HttpObjectDecoder:一個 HTTP 數(shù)據(jù)的解碼器。
    • LengthFieldBasedFrameDecoder:通過指定長度來標識整包消息池户,這樣就可以自動地處理黏包和半包消息咏雌。
  • TCP 是面向連接的,面向流的校焦,提供高可靠性的服務赊抖。收發(fā)兩端(客戶端和服務器端)都要有一一成對的 socket。因此寨典,發(fā)送端為了將多個發(fā)給接收端的包更有效地發(fā)給對方氛雪,使用了優(yōu)化方法(Nagle算法),將多次間隔較小且數(shù)據(jù)量小的數(shù)據(jù)耸成,合并成一個大的數(shù)據(jù)塊报亩,然后進行封包浴鸿。這樣做雖然提高了效率,但是接收端難于分辨出完整的數(shù)據(jù)包了弦追,因為面向流的通信是無消息保護邊界的岳链。
  • 由于 TCP 無消息保護邊界,需要在接收端處理消息邊界問題劲件,也就是我們所說的粘包掸哑、拆包問題。
TCP 粘包寇仓、拆包圖解
  • 對上圖的說明:假設客戶端分別發(fā)送了兩個數(shù)據(jù)包 D1 和 D2 給服務端举户,由于服務端一次讀取到字節(jié)數(shù)是不確定的,故可能存在以下四種情況:
    • 服務端分兩次讀取到了兩個獨立的數(shù)據(jù)包遍烦,分別是 D1 和 D2俭嘁,沒有粘包和拆包。
    • 服務端一次接受到了兩個數(shù)據(jù)包服猪,D1 和 D2 粘合在一起供填,稱之為TCP 粘包
    • 服務端分兩次讀取到了數(shù)據(jù)包罢猪,第一次讀取到了完整的 D1 包和 D2 包的部分內(nèi)容近她,第二次讀取到了 D2 包的剩余內(nèi)容,稱之為TCP 拆包膳帕。
    • 服務端分兩次讀取到了數(shù)據(jù)包粘捎,第一次讀取到了 D1 包的部分內(nèi)容 D1_1,第二次讀取到了 D1 包的剩余部分內(nèi)容 D1_2 和完整的 D2 包危彩,也稱之為TCP 拆包攒磨。
  • Netty應用實例:TCP 粘包和拆包現(xiàn)象實例。
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyServerHandler());
    }
}
  • MyServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //將buffer轉成字符串
        String message = new String(buffer, StandardCharsets.UTF_8);
        System.out.println("服務器接收到數(shù)據(jù):" + message);
        System.out.println("服務器接收到消息量為:" + (++this.count));
        //服務器回送數(shù)據(jù)給客戶端汤徽,回送一個隨機id
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", StandardCharsets.UTF_8);
        ctx.writeAndFlush(responseByteBuf);
    }
}
  • MyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定義一個初始化類
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}
  • MyClientInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}
  • MyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端發(fā)送10條數(shù)據(jù) hello娩缰,server + 編號
        for (int i = 0; i < 10; ++i) {
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server + " + i, StandardCharsets.UTF_8);
            ctx.writeAndFlush(buffer);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, StandardCharsets.UTF_8);
        System.out.println("客戶端接收到消息:" + message);
        System.out.println("客戶端接收消息數(shù)量為:" + (++this.count));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • TCP 粘包和拆包解決方案:使用自定義協(xié)議包+編解碼器來解決谒府,關鍵就是要解決服務器端每次讀取數(shù)據(jù)長度的問題拼坎,一旦解決了,就不會出現(xiàn)服務器多讀或少讀數(shù)據(jù)的問題完疫,從而避免的 TCP 粘包泰鸡、拆包。
  • Netty 應用實例:解決TCP 粘包趋惨、拆包問題鸟顺。
  • 要求:客戶端一共發(fā)送 5 個Message對象,每發(fā)送一個Message對象,服務器端就接收一個 Message讯嫂,將其解碼并回復一個 Message 對象給客戶端蹦锋。
  • MessageProtocol.java
//協(xié)議包
public class MessageProtocol {
    private int len; //關鍵
    private byte[] content;
    public int getLen() {
        return len;
    }
    public void setLen(int len) {
        this.len = len;
    }
    public byte[] getContent() {
        return content;
    }
    public void setContent(byte[] content) {
        this.content = content;
    }
}
  • MyMessageEncoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被調(diào)用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
  • MyMessageDecoder.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被調(diào)用");
        //需要將得到二進制字節(jié)碼轉成 MessageProtocol 數(shù)據(jù)包(對象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封裝成 MessageProtocol 對象,放入 out欧芽, 傳遞下一個handler業(yè)務處理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}
  • MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
  • MyServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageDecoder());//解碼器
        pipeline.addLast(new MyMessageEncoder());//編碼器
        pipeline.addLast(new MyServerHandler());
    }
}
  • MyServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
//處理業(yè)務的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //cause.printStackTrace();
        ctx.close();
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到數(shù)據(jù)莉掂,并處理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println();
        System.out.println("服務器接收到信息如下:");
        System.out.println("長度=" + len);
        System.out.println("內(nèi)容=" + new String(content, StandardCharsets.UTF_8));
        System.out.println("服務器接收到消息包數(shù)量=" + (++this.count));
        //回復消息
        String responseContent = UUID.randomUUID().toString();
        int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
        byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
        //構建一個協(xié)議包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(responseLen);
        messageProtocol.setContent(responseContent2);
        ctx.writeAndFlush(messageProtocol);
    }
}
  • MyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
    public static void main(String[] args)  throws  Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer()); //自定義一個初始化類
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            group.shutdownGracefully();
        }
    }
}
  • MyClientInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyMessageEncoder()); //加入編碼器
        pipeline.addLast(new MyMessageDecoder()); //加入解碼器
        pipeline.addLast(new MyClientHandler());
    }
}
  • MyClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客戶端發(fā)送10條數(shù)據(jù) "今天天氣冷,吃火鍋" 編號
        for (int i = 0; i < 5; i++) {
            String mes = "今天天氣冷千扔,吃火鍋憎妙!";
            byte[] content = mes.getBytes(StandardCharsets.UTF_8);
            int length = content.length;
            //創(chuàng)建協(xié)議包對象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);
        }
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客戶端接收到消息如下:");
        System.out.println("長度=" + len);
        System.out.println("內(nèi)容=" + new String(content, StandardCharsets.UTF_8));
        System.out.println("客戶端接收消息數(shù)量=" + (++this.count));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常消息:" + cause.getMessage());
        ctx.close();
    }
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市曲楚,隨后出現(xiàn)的幾起案子厘唾,更是在濱河造成了極大的恐慌,老刑警劉巖龙誊,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抚垃,死亡現(xiàn)場離奇詭異,居然都是意外死亡趟大,警方通過查閱死者的電腦和手機鹤树,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來逊朽,“玉大人罕伯,你說我怎么就攤上這事∵椿洌” “怎么了追他?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長岛蚤。 經(jīng)常有香客問我湿酸,道長,這世上最難降的妖魔是什么灭美? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮昂利,結果婚禮上届腐,老公的妹妹穿的比我還像新娘。我一直安慰自己蜂奸,他們只是感情好犁苏,可當我...
    茶點故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著扩所,像睡著了一般围详。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天助赞,我揣著相機與錄音买羞,去河邊找鬼。 笑死雹食,一個胖子當著我的面吹牛畜普,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播群叶,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼吃挑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了街立?” 一聲冷哼從身側響起舶衬,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎赎离,沒想到半個月后逛犹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡蟹瘾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年圾浅,在試婚紗的時候發(fā)現(xiàn)自己被綠了渊啰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盼樟。...
    茶點故事閱讀 39,731評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖乐设,靈堂內(nèi)的尸體忽然破棺而出众雷,到底是詐尸還是另有隱情灸拍,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布砾省,位于F島的核電站鸡岗,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏编兄。R本人自食惡果不足惜轩性,卻給世界環(huán)境...
    茶點故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望狠鸳。 院中可真熱鬧揣苏,春花似錦、人聲如沸件舵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽铅祸。三九已至坑质,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背涡扼。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工稼跳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人壳澳。 一個月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓岂贩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親巷波。 傳聞我的和親對象是個殘疾皇子萎津,可洞房花燭夜當晚...
    茶點故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容