一 協議體
/**
 * Protocol unit exchanged by the remoting layer.
 * The non-transient fields below form the protocol header; {@code body} carries
 * the payload that follows the header on the wire.
 */
public class RemotingCommand {
private int code;// request code
private LanguageCode language = LanguageCode.JAVA;// language of the sender, JAVA by default
private int version = 0;// RocketMQ version
private int opaque = requestId.getAndIncrement();// request id, taken from the shared requestId counter
private int flag = 0;// message type flag bits
private String remark;// descriptive text
private HashMap<String, String> extFields;// attribute values of the customHeader
// Typed custom header; transient, its attributes travel through extFields instead.
private transient CommandCustomHeader customHeader;
// Serialization type used for this command; defaults to the server-wide configured type.
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
// Everything above this point makes up the protocol header.
// Message body.
private transient byte[] body;
}
/**
 * Stamps the remoting version onto the given command.
 *
 * <p>When {@code configVersion} is already known (non-negative) it is applied directly.
 * Otherwise the system property named by {@code REMOTING_VERSION_KEY} is consulted; if it
 * is set, its integer value is applied to the command and cached in {@code configVersion}.
 * If the property is absent the command is left untouched.
 *
 * @param cmd the command whose version should be set
 * @throws NumberFormatException if the system property is present but not a valid integer
 */
private static void setCmdVersion(RemotingCommand cmd) {
    if (configVersion < 0) {
        final String property = System.getProperty(REMOTING_VERSION_KEY);
        if (property == null) {
            // Nothing configured anywhere: keep whatever version the command already has.
            return;
        }
        final int parsed = Integer.parseInt(property);
        cmd.setVersion(parsed);
        // Remember the parsed value so later calls skip the property lookup.
        configVersion = parsed;
        return;
    }
    cmd.setVersion(configVersion);
}
- createResponseCommand()創建報文體
1.1 編碼
- 報文序列化格式為:總長度|序列化類型+頭長度|頭|消息體,計算總長度,序列化數據
- 計算總長度,依次存儲數據。
/**
 * Serializes this command into a single frame laid out as
 * {@code total length | serialize type + header length | header | body}.
 *
 * <p>The leading total-length value counts everything after itself: the 4-byte
 * type/header-length word, the encoded header, and the body when one is present.
 *
 * @return a buffer flipped for reading that contains the complete frame
 */
public ByteBuffer encode() {
    final byte[] header = this.headerEncode();
    final int bodySize = (this.body == null) ? 0 : this.body.length;

    // 4 bytes for the type/header-length word, then header bytes, then body bytes.
    final int frameLength = 4 + header.length + bodySize;

    // Another 4 bytes up front hold frameLength itself.
    final ByteBuffer frame = ByteBuffer.allocate(4 + frameLength);
    frame.putInt(frameLength);
    frame.put(markProtocolType(header.length, serializeTypeCurrentRPC));
    frame.put(header);
    if (this.body != null) {
        frame.put(this.body);
    }

    frame.flip();
    return frame;
}
/**
 * Encodes the header of this command with the serializer selected by
 * {@code serializeTypeCurrentRPC}.
 *
 * <p>{@code makeCustomHeaderToNet()} is invoked first, before either serializer runs.
 *
 * @return the encoded header bytes
 */
private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    final boolean binaryHeader = SerializeType.ROCKETMQ == serializeTypeCurrentRPC;
    return binaryHeader
        ? RocketMQSerializable.rocketMQProtocolEncode(this)
        : RemotingSerializable.encode(this);
}
-
makeCustomHeaderToNet()
把customHeader屬性信息轉換成報文頭的擴展域數據
-
markProtocolType(headerData.length, serializeTypeCurrentRPC)
第一個字節存儲序列化類型,后三個字節存儲報文頭長度
1.2 解碼
- 獲取總長度信息,分配空間存儲數據
- 獲取序列化類型,反序列化報文頭信息
- 計算報文體長度,獲取報文體信息
/**
 * Rebuilds a command from a received frame.
 *
 * <p>The buffer's limit is taken as the frame size. The first int read is the combined
 * serialize-type/header-length word; the header bytes follow, and whatever remains
 * after the 4-byte word and the header is treated as the body.
 * NOTE(review): this appears to expect a buffer positioned at 0 with the leading
 * total-length field already removed — confirm against the caller.
 *
 * @param byteBuffer buffer holding the type/header-length word, the header and the body
 * @return the decoded command; its body is {@code null} when no body bytes remain
 */
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
    final int frameSize = byteBuffer.limit();
    final int typeAndHeaderLen = byteBuffer.getInt();
    final int headerSize = getHeaderLength(typeAndHeaderLen);

    final byte[] headerBytes = new byte[headerSize];
    byteBuffer.get(headerBytes);
    final RemotingCommand command = headerDecode(headerBytes, getProtocolType(typeAndHeaderLen));

    // Remaining bytes after the 4-byte word and the header belong to the body.
    final int bodySize = frameSize - 4 - headerSize;
    if (bodySize > 0) {
        final byte[] payload = new byte[bodySize];
        byteBuffer.get(payload);
        command.body = payload;
    } else {
        command.body = null;
    }
    return command;
}
二 序列化
/**
 * Serialization formats available for the protocol header, each identified by a
 * one-byte code.
 */
public enum SerializeType {
    JSON((byte) 0),
    ROCKETMQ((byte) 1);

    // Enum constants are shared singletons, so the code must never change after construction.
    private final byte code;

    SerializeType(byte code) {
        this.code = code;
    }

    /**
     * Looks up the serialization type for a code.
     *
     * @param code the one-byte code to resolve
     * @return the matching constant, or {@code null} when no constant uses that code
     */
    public static SerializeType valueOf(byte code) {
        for (SerializeType candidate : values()) {
            if (candidate.code == code) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * @return the one-byte code of this serialization type
     */
    public byte getCode() {
        return code;
    }
}
2.1 JSON
/**
 * Serializes an object to JSON (non-pretty) and returns the text encoded with
 * {@code CHARSET_UTF8}.
 *
 * @param obj the object to serialize
 * @return the encoded JSON bytes, or {@code null} when {@code toJson} yields {@code null}
 */
public static byte[] encode(final Object obj) {
    final String text = toJson(obj, false);
    return (text == null) ? null : text.getBytes(CHARSET_UTF8);
}
/**
 * Renders an object as a JSON string by delegating to {@code JSON.toJSONString}.
 *
 * @param obj          the object to render
 * @param prettyFormat passed through to the JSON library as its pretty-format switch
 * @return the JSON text produced by the library
 */
public static String toJson(final Object obj, boolean prettyFormat) {
    final String rendered = JSON.toJSONString(obj, prettyFormat);
    return rendered;
}
/**
 * Decodes bytes as text using {@code CHARSET_UTF8} and parses that text as JSON into
 * an instance of the requested class.
 *
 * @param data     the encoded JSON bytes
 * @param classOfT the target class
 * @param <T>      the target type
 * @return the object produced by {@code fromJson}
 */
public static <T> T decode(final byte[] data, Class<T> classOfT) {
    return fromJson(new String(data, CHARSET_UTF8), classOfT);
}
/**
 * Parses JSON text into an instance of the requested class by delegating to
 * {@code JSON.parseObject}.
 *
 * @param json     the JSON text
 * @param classOfT the target class
 * @param <T>      the target type
 * @return the object produced by the JSON library
 */
public static <T> T fromJson(String json, Class<T> classOfT) {
    final T parsed = JSON.parseObject(json, classOfT);
    return parsed;
}
2.2 ROCKETMQ
2.2.1 編碼
// String remark
// Encoded with CHARSET_UTF8 only when the remark is non-null and non-empty;
// otherwise remarkBytes stays null and remarkLen stays 0.
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
// Serialized through mapSerialize only when the map is non-null and non-empty;
// otherwise extFieldsBytes stays null and extLen stays 0.
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
/**
 * Computes the number of bytes the binary header occupies.
 *
 * <p>Fixed-width part: code (2) + language (1) + version (2) + opaque (4) + flag (4).
 * Each of the two variable-length sections adds a 4-byte length prefix plus its data.
 *
 * @param remark byte length of the encoded remark
 * @param ext    byte length of the serialized extFields
 * @return total header length in bytes
 */
private static int calTotalLen(int remark, int ext) {
    // code(2) + language(1) + version(2) + opaque(4) + flag(4)
    final int fixedPart = 2 + 1 + 2 + 4 + 4;
    // 4-byte length prefix followed by the remark bytes
    final int remarkPart = 4 + remark;
    // 4-byte length prefix followed by the extFields bytes
    final int extPart = 4 + ext;
    return fixedPart + remarkPart + extPart;
}
- 分配內存
// Allocate a buffer of totalLen bytes to receive the serialized header.
// NOTE(review): totalLen is presumably calTotalLen(remarkLen, extLen) — confirm in the full method.
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
- 寫入報文數據
// int code(~32767): request code, narrowed to a short
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767): MQ version, narrowed to a short
headerBuffer.putShort((short) cmd.getVersion());
// int opaque: request id
headerBuffer.putInt(cmd.getOpaque());
// int flag: message type
headerBuffer.putInt(cmd.getFlag());
// String remark: descriptive text. The length is written first, then the data;
// a length of 0 is written when there is no remark.
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields: extension info. The length is written first,
// then the data; a length of 0 is written when there are no extFields.
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
// Hand back the buffer's backing array as the encoded header.
return headerBuffer.array();
- flag標識報文類型,
第一位為1表示響應報文,0表示請求報文
第二位為1表示單向報文,0表示雙向報文
// Bit position within flag that markResponseType() sets (1 << RPC_TYPE).
RPC_TYPE = 0;
// Bit position within flag that markOnewayRPC() sets (1 << RPC_ONEWAY).
RPC_ONEWAY = 1;
/**
 * Marks this command as a response by setting the {@code RPC_TYPE} bit in {@code flag}.
 * Other bits of {@code flag} are left unchanged.
 */
public void markResponseType() {
    this.flag = this.flag | (1 << RPC_TYPE);
}
/**
 * Marks this command as one-way by setting the {@code RPC_ONEWAY} bit in {@code flag}.
 * Other bits of {@code flag} are left unchanged.
 */
public void markOnewayRPC() {
    this.flag = this.flag | (1 << RPC_ONEWAY);
}
2.2.1.1 mapSerialize
/**
 * Serializes a string map into a flat byte array.
 *
 * <p>Each entry is written as {@code keySize(short) | key | valSize(int) | val}, with
 * keys and values encoded using {@code CHARSET_UTF8}. Entries whose key or value is
 * {@code null} are skipped.
 *
 * @param map the map to serialize
 * @return the serialized bytes, or {@code null} when the map is {@code null} or empty
 */
public static byte[] mapSerialize(HashMap<String, String> map) {
    if (map == null || map.isEmpty()) {
        return null;
    }

    // First pass: work out how many bytes the serialized form needs.
    int requiredBytes = 0;
    for (Map.Entry<String, String> pair : map.entrySet()) {
        if (pair.getKey() == null || pair.getValue() == null) {
            continue;
        }
        // 2-byte key length + key bytes, then 4-byte value length + value bytes
        requiredBytes += 2 + pair.getKey().getBytes(CHARSET_UTF8).length
            + 4 + pair.getValue().getBytes(CHARSET_UTF8).length;
    }

    // Second pass: write key length, key, value length, value for every usable entry.
    final ByteBuffer out = ByteBuffer.allocate(requiredBytes);
    for (Map.Entry<String, String> pair : map.entrySet()) {
        if (pair.getKey() == null || pair.getValue() == null) {
            continue;
        }
        final byte[] keyBytes = pair.getKey().getBytes(CHARSET_UTF8);
        final byte[] valueBytes = pair.getValue().getBytes(CHARSET_UTF8);
        out.putShort((short) keyBytes.length);
        out.put(keyBytes);
        out.putInt(valueBytes.length);
        out.put(valueBytes);
    }
    return out.array();
}
2.2.2 解碼
- 根據編碼規則反向解析數據為結構化的對象。