1. 協(xié)議格式
* * * 自定義協(xié)議 數(shù)據(jù)包格式
* * * -----------------------------------
* * Length 數(shù)據(jù)包長度 Data長度 + Head長度
* * Version 協(xié)議版本號 默認0x0
* * Command 操作指令碼 十六進制
* * MsgType 數(shù)據(jù)類型 0x0:Json,0x1:ProtoBuf,0x2:Xml,默認:0x0
* * SecretKey 密鑰key 默認0x0,明文
* * Data 業(yè)務(wù)數(shù)據(jù)包
2. 自定義編碼器
/**
* @author wangxx
* @data 2020/6/19
* describe 自定義編碼器
* * * -----------------------------------
*/
public class CustomizeEncoder extends MessageToByteEncoder<IMMessage> {
private static final String TAG = "CustomizeEncoder";
/**
* 頭部固定長度7個字節(jié)
*/
public static final int HEAD_LENGTH = 7;
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, IMMessage message, ByteBuf byteBuf) throws Exception {
Log.i(TAG, channelHandlerContext.toString() + ",pk=" + message.toString());
if (null == message) {
throw new Exception();
}
String body = message.getData();
byte[] bodyBytes = null;
//包長度
int headLength = message.getLength();
if (!TextUtils.isEmpty(body)) {
bodyBytes = body.getBytes(Charset.forName("utf-8"));
headLength = bodyBytes.length+HEAD_LENGTH;
}
byteBuf.writeShort(headLength);
//版本號
byteBuf.writeByte(message.getVersion());
//操作指令
byteBuf.writeShort(message.getCommand());
//數(shù)據(jù)類型
byteBuf.writeByte(message.getMsgType());
//密鑰key
byteBuf.writeByte(message.getSecretKey());
//數(shù)據(jù)包
if (bodyBytes != null && bodyBytes.length > 0) {
byteBuf.writeBytes(bodyBytes);
}
}
}
3. 自定義解碼器
public class CustomizeDecoder extends ByteToMessageDecoder {
private static final String TAG = "CustomizeDecoder";
/**
* 頭部固定長度7個字節(jié)
*/
public static final int HEAD_LENGTH = 7;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception {
Log.i(TAG, "decode ChannelHandlerContext=" + channelHandlerContext.toString() + " ,bytebuf=" + buffer.toString() + ",list=" + out.toString());
Log.i(TAG, "decode ChannelHandlerContext=" + convertByteBufToString(buffer));
if (buffer.readableBytes() < HEAD_LENGTH) {
Log.i(TAG, "數(shù)據(jù)不足一條");
return;
}
// 記錄包頭開始的index
int beginReader;
int contentLength;
IMMessage message = new IMMessage();
while (true) {
// 獲取包頭開始的index
beginReader = buffer.readerIndex();
// 標記包頭開始的index
buffer.markReaderIndex();
// 讀到了協(xié)議的開始標志瞳脓,結(jié)束while循環(huán)
short headLength = buffer.readShort();
//版本號
byte version = buffer.readByte();
//操作指令
int command = buffer.readUnsignedShort();
//數(shù)據(jù)類型
byte msgType = buffer.readByte();
//密鑰key
byte secretKey = buffer.readByte();
message.setLength(headLength);
message.setVersion(version);
message.setCommand(command);
message.setMsgType(msgType);
message.setSecretKey(secretKey);
//按照協(xié)議計算data的長度
contentLength = headLength - HEAD_LENGTH;
if (buffer.readableBytes() == contentLength) {
break;
}
// 未讀到包頭亿卤,略過一個字節(jié)
// 每次略過,一個字節(jié),去讀取中姜,包頭信息的開始標記
buffer.resetReaderIndex();
buffer.readByte();
// 當略過睦柴,一個字節(jié)之后徙融,
// 數(shù)據(jù)包的長度毁菱,又變得不滿足
// 此時,應(yīng)該結(jié)束衷佃。等待后面的數(shù)據(jù)到達
if (buffer.readableBytes() < HEAD_LENGTH) {
return;
}
}
// 消息的長度
// 判斷請求數(shù)據(jù)包數(shù)據(jù)是否到齊
if (buffer.readableBytes() < contentLength) {
// 還原讀指針
buffer.readerIndex(beginReader);
return;
}
// 讀取data數(shù)據(jù)
byte[] bytes = new byte[contentLength];
buffer.readBytes(bytes);
if (message.getCommand() == TcpAPI.MIMSocketCommandHeartBeatPong) {
message.setTimeStamp(ByteUtil.bytesToInt(bytes));
} else {
message.setData(new String(bytes, StandardCharsets.UTF_8));
}
out.add(message);
}
public String convertByteBufToString(ByteBuf buf) {
String str;
if (buf.hasArray()) { // 處理堆緩沖區(qū)
str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
} else { // 處理直接緩沖區(qū)以及復合緩沖區(qū)
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
str = new String(bytes, 0, buf.readableBytes());
}
return str;
}
}
4. 協(xié)議
/**
* 包協(xié)議
*/
public class IMMessage {
/**
* 數(shù)據(jù)包長度
*/
private int Length=0x0;
/**
* 版本
*/
private byte Version = 0x0;
/**
* 命令
*/
private int Command;
/**
* 0x0:Json,0x1:ProtoBuf,0x2:Xml,默認:0x0
*/
private byte MsgType = 0x0;
/**
* 密鑰key | 默認0x0,明文
*/
private byte SecretKey = 0x0;
/**
* 數(shù)據(jù)
*/
private String Data;
/**
* 服務(wù)器時間(2分鐘收到服務(wù)端給的)
*/
private int timeStamp;
public int getLength() {
return Length;
}
public void setLength(int length) {
Length = length;
}
public byte getVersion() {
return Version;
}
public void setVersion(byte version) {
Version = version;
}
public int getCommand() {
return Command;
}
public void setCommand(int command) {
Command = command;
}
public byte getMsgType() {
return MsgType;
}
public void setMsgType(byte msgType) {
MsgType = msgType;
}
public byte getSecretKey() {
return SecretKey;
}
public void setSecretKey(byte secretKey) {
SecretKey = secretKey;
}
public String getData() {
return Data;
}
public void setData(String data) {
Data = data;
}
public void setData(BaseSocketBody body){
Gson gson = new Gson();
this.Data = gson.toJson(body);
}
public int getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(int timeStamp) {
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "IMMessage{" +
"Length=" + Length +
", Version=" + Version +
", Command=" + Command +
", MsgType=" + MsgType +
", SecretKey=" + SecretKey +
", Data='" + Data + '\'' +
", timeStamp=" + timeStamp +
'}';
}
}