本章要點:
- java序列化缺點
- 業(yè)界流行的集中編解碼框架介紹
3.1 java序列化缺點
- 無法跨越語言扳缕,是java序列化最致命的缺點饰潜;
- 序列化后的碼流太大
- 序列化性能太低
3.2 主流編輯碼框架
3.2.1 Google Protobuf編解碼
Protobuf在業(yè)界非常流行,Protobuf具有以下有點:
- 產品成熟
- 跨語言箕别、支持多種語言
- 編碼后消息更小铜幽,便與存儲和傳輸
- 編解碼性能高、
- 支持定義可選和必選字段
Protobuf是一個靈活串稀、高效除抛、結構化的數(shù)據(jù)序列化框架,相比于XML等傳統(tǒng)的序列化工具母截,它更小到忽、更快、更簡單清寇。
https://github.com/google/protobuf/releases
下載protof工具喘漏,protoc-3.5.1-win32.zip,并解壓华烟,會看到protoc.exe工具翩迈。
User.proto的文件內容如下:
package com.bj58.wuxian.protobuf;
option java_outer_classname = "UserProto";
message User{
required int32 id=1;
required string username=2;
required string password=3;
enum Sex{
nan=1;
nv=2;
}
required Sex sex=4;
}
執(zhí)行如下命令:
D:/develop/protoc-3.5.1-win32/bin/protoc.exe -I=./proto --java_out=D:/develop/protoc-3.5.1-win32/bin/proto/ ./proto/User.proto
生成了一個UserProto.java文件。
protoc.exe -I=proto的輸入目錄 --java_out=java類輸出目錄 proto的輸入目錄包括包括proto文件
此時將UserProto.java文件拷貝到IDE中會報錯垦江,那是因為缺少包依賴:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
protobuf序列化傳輸代碼實例:
UserServer代碼:
package com.bj58.wuxian.netty.codec.protobuf;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import com.bj58.wuxian.protobuf.UserProto;
public class UserServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());//用于半包處理
ch.pipeline().addLast(new ProtobufDecoder(UserProto.User.getDefaultInstance()));//ProtobufDecoder參數(shù)是所要解碼的目標類
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new UserServerHandler());
}
});
ChannelFuture f=bootstrap.bind(8888).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
UserServerHandler 代碼:
package com.bj58.wuxian.netty.codec.protobuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.bj58.wuxian.protobuf.UserProto;
import com.bj58.wuxian.protobuf.UserProto.User.Sex;
public class UserServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("**********server channelActive*************");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
UserProto.User user =(UserProto.User)msg;
System.out.println("request:"+msg);
UserProto.User.Builder builder = UserProto.User.newBuilder();
builder.setId(123);
builder.setSex(Sex.nv);
builder.setPassword("45678");
if("zhaoshichao".equalsIgnoreCase(user.getUsername())){
builder.setUsername("shangjing");
}else{
builder.setUsername("guanggunhan");
}
UserProto.User wife = builder.build();
ctx.writeAndFlush(wife);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
UserClient 代碼:
package com.bj58.wuxian.netty.codec.protobuf;
import com.bj58.wuxian.protobuf.UserProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
public class UserClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group=new NioEventLoopGroup();
try {
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(UserProto.User.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
ch.pipeline().addLast(new UserClientHandler());
}
});
ChannelFuture f=bootstrap.connect("127.0.0.1",8888).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
group.shutdownGracefully();
}
}
}
UserClientHandler 代碼:
package com.bj58.wuxian.netty.codec.protobuf;
import com.bj58.wuxian.protobuf.UserProto;
import com.bj58.wuxian.protobuf.UserProto.User.Sex;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class UserClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<=5;i++){
UserProto.User.Builder builder = UserProto.User.newBuilder();
builder.setId(123);
builder.setSex(Sex.nan);
builder.setPassword("45678");
if(i%2==0){
builder.setUsername("zhaoshichao");
}else{
builder.setUsername("zhaoshichao"+i);
}
UserProto.User wife = builder.build();
ctx.writeAndFlush(wife);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("response:"+(UserProto.User)msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
3.2.1 Message Pack編解碼
MessagePack特點如下:
- 編解碼高效帽馋,性能高
- 序列化之后的碼流小
- 支持跨語言
添加pom依賴:
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
自定義編碼器:
package com.bj58.wuxian.netty.codec.msgpack;
import org.msgpack.MessagePack;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MsgPackEncoder extends MessageToByteEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
throws Exception {
MessagePack msfPack=new MessagePack();
byte[] bytes=msfPack.write(msg);
out.writeBytes(bytes);
}
}
自定義解碼器:
package com.bj58.wuxian.netty.codec.msgpack;
import java.util.List;
import org.msgpack.MessagePack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) throws Exception {
byte[] bytes=new byte[msg.readableBytes()];
msg.readBytes(bytes);
MessagePack messagePack=new MessagePack();
out.add(messagePack.read(bytes,User.class));
}
}
UserServer代碼:
package com.bj58.wuxian.netty.codec.msgpack;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class UserServer {
public static void main(String[] argsStrings) throws Exception {
// 配置服務端NIO線程組(boss線程、worker線程)
EventLoopGroup bGroup = new NioEventLoopGroup();
EventLoopGroup wGroup = new NioEventLoopGroup();
// 創(chuàng)建啟動輔助類
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bGroup, wGroup).channel(NioServerSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 添加對象系列化編解碼器,同時提供粘包拆包支持
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
channel.pipeline().addLast("解碼器", new MsgPackDecoder());
channel.pipeline().addLast(new LengthFieldPrepender(2));
channel.pipeline().addLast("編碼器", new MsgPackEncoder());
channel.pipeline().addLast(new UserServerHandler());
}
});
try {
// 監(jiān)聽本地端口,同步等待監(jiān)聽結果
ChannelFuture future = bootstrap.bind(8888).sync();
// 等待服務端監(jiān)聽端口關閉,優(yōu)雅退出
future.channel().closeFuture().sync();
} finally {
bGroup.shutdownGracefully();
wGroup.shutdownGracefully();
}
}
}
ServerHanler代碼:
package com.bj58.wuxian.netty.codec.msgpack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class UserServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("**********server channelActive**********");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user=(User)msg;
User wife=new User();
if("zhaoshichao".equals(user.getUsername())){
wife.setPassword("1234");
wife.setUsername("shangjing");;
}else{
wife.setPassword("1234");
wife.setUsername("others");
}
ctx.writeAndFlush(wife);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
User客戶端:
package com.bj58.wuxian.netty.codec.msgpack;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class UserClient {
public static void main(String [] argsStrings) throws Exception {
//配置客戶端端NIO線程組
EventLoopGroup bGroup = new NioEventLoopGroup();
//創(chuàng)建客戶端啟動輔助類
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(bGroup).
channel(NioSocketChannel.class).
option(ChannelOption.TCP_NODELAY, true).
option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).
handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//添加對象系列化編解碼器,同時提供粘包拆包支持
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
channel.pipeline().addLast("解碼器", new MsgPackDecoder());
channel.pipeline().addLast(new LengthFieldPrepender(2));
channel.pipeline().addLast("編碼器", new MsgPackEncoder());
channel.pipeline().addLast(new UserClientHandler());
}
});
//發(fā)起異步連接
ChannelFuture future = bootstrap.connect("127.0.0.1", 8888).sync();
try {
//等待客戶端鏈路關閉
future.channel().closeFuture().sync();
} finally {
//優(yōu)雅退出,釋放資源
bGroup.shutdownGracefully();
}
}
}
ClientHandler代碼:
package com.bj58.wuxian.netty.codec.msgpack;
import com.bj58.wuxian.msgpack.model.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class UserClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<=5;i++){
User user=new User();
user.setId(123);
user.setPassword("45678");
if(i%2==0){
user.setUsername("zhaoshichao");
}else{
user.setUsername("zhaoshichao"+i);
}
ctx.writeAndFlush(user);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("response:"+(User)msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}