概述:
每個(gè) MQTT 控制報(bào)文都包含一個(gè)固定報(bào)頭猬腰。固定報(bào)頭由兩個(gè)字節(jié)組成,第一個(gè)字節(jié)的高4位為報(bào)文類型悍缠,低4位為報(bào)文標(biāo)志卦绣;第二個(gè)字節(jié)開始為剩余長度,最多4個(gè)字節(jié)飞蚓,也就是說剩余長度最少占用一個(gè)字節(jié)滤港,最多占用4個(gè)字節(jié)。由此可以看出, MQTT 報(bào)文最少就只有兩個(gè)字節(jié)溅漾,確實(shí)是個(gè)很輕量級(jí)的協(xié)議了山叮。
MQTT 規(guī)范:https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
代碼實(shí)現(xiàn):
在 paho 中關(guān)于 MQTT 報(bào)文的編碼解碼的實(shí)現(xiàn)主要是 MqttWireMessage 類及其子類。MqttWireMessage 是個(gè)抽象類添履,里面存在兩個(gè)未實(shí)現(xiàn)的抽象方法
getMessageInfo()
和getVariableHeader()
屁倔,通過跟蹤代碼我們可以發(fā)現(xiàn)getHeader()
與createWireMessage(InputStream inputStream)
是編解碼的核心實(shí)現(xiàn)方法。
// MqttWireMessage#getHeader
public byte[] getHeader() throws MqttException {
try {
// 這行代碼主要是計(jì)算出固定報(bào)頭的第一個(gè)字節(jié)暮胧,
int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
// 可變報(bào)頭通過調(diào)用子類實(shí)現(xiàn)獲取
byte[] varHeader = getVariableHeader();
// 剩余長度=可變報(bào)頭長度+有效荷載的長度
int remLen = varHeader.length + getPayload().length;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeByte(first);
// 這里調(diào)用了encodeMBI方法來編碼剩余長度
dos.write(encodeMBI(remLen));
dos.write(varHeader);
dos.flush();
return baos.toByteArray();
} catch (IOException ioe) {
throw new MqttException(ioe);
}
}
// MqttWireMessage#createWireMessage
private static MqttWireMessage createWireMessage(InputStream inputStream) throws MqttException {
try {
CountingInputStream counter = new CountingInputStream(inputStream);
DataInputStream in = new DataInputStream(counter);
// 讀取第一個(gè)字節(jié)锐借,并解析
int first = in.readUnsignedByte();
// 報(bào)文類型
byte type = (byte) (first >> 4);
// 標(biāo)記位,不同的報(bào)文會(huì)由不同的涵義往衷,交給子類來處理
byte info = (byte) (first &= 0x0f);
// 這里調(diào)用了readMBI方法來解碼剩余長度
long remLen = readMBI(in).getValue();
long totalToRead = counter.getCounter() + remLen;
MqttWireMessage result;
long remainder = totalToRead - counter.getCounter();
// 剩余有效數(shù)據(jù)钞翔,包括可變報(bào)頭和有效荷載,交由子類處理
byte[] data = new byte[0];
// The remaining bytes must be the payload...
if (remainder > 0) {
data = new byte[(int) remainder];
// 讀取剩余有效數(shù)據(jù)
in.readFully(data, 0, data.length);
}
// 根據(jù)類型構(gòu)造不同的消息
if (type == MqttWireMessage.MESSAGE_TYPE_CONNECT) {
result = new MqttConnect(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBLISH) {
result = new MqttPublish(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBACK) {
result = new MqttPubAck(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBCOMP) {
result = new MqttPubComp(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_CONNACK) {
result = new MqttConnack(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PINGREQ) {
result = new MqttPingReq(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PINGRESP) {
result = new MqttPingResp(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_SUBSCRIBE) {
result = new MqttSubscribe(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_SUBACK) {
result = new MqttSuback(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBSCRIBE) {
result = new MqttUnsubscribe(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBACK) {
result = new MqttUnsubAck(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREL) {
result = new MqttPubRel(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREC) {
result = new MqttPubRec(info, data);
} else if (type == MqttWireMessage.MESSAGE_TYPE_DISCONNECT) {
result = new MqttDisconnect(info, data);
} else {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
return result;
} catch (IOException io) {
throw new MqttException(io);
}
}
// MqttWireMessage#encodeMBI
public static byte[] encodeMBI(long number) {
validateVariableByteInt((int) number);
int numBytes = 0;
long no = number;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
// Encode the remaining length fields in the four bytes
do {
byte digit = (byte) (no % 128);
no = no / 128;
if (no > 0) {
digit |= 0x80;
}
bos.write(digit);
numBytes++;
} while ((no > 0) && (numBytes < 4));
return bos.toByteArray();
}
// MqttWireMessage#readMBI
public static MultiByteInteger readMBI(DataInputStream in) throws IOException {
byte digit;
int msgLength = 0;
int multiplier = 1;
int count = 0;
do {
digit = in.readByte();
count++;
msgLength += ((digit & 0x7F) * multiplier);
multiplier *= 128; // multiplier <<= 7;
} while ((digit & 0x80) != 0);
if (msgLength < 0 || msgLength > VARIABLE_BYTE_INT_MAX) {
throw new IOException("This property must be a number between 0 and " + VARIABLE_BYTE_INT_MAX
+ ". Read value was: " + msgLength);
}
return new MultiByteInteger(msgLength, count);
}