paho 中 MQTT 協(xié)議固定報(bào)頭實(shí)現(xiàn)

概述:

每個(gè) MQTT 控制報(bào)文都包含一個(gè)固定報(bào)頭猬腰。固定報(bào)頭由兩個(gè)字節(jié)組成,第一個(gè)字節(jié)的高4位為報(bào)文類型悍缠,低4位為報(bào)文標(biāo)志卦绣;第二個(gè)字節(jié)開始為剩余長度,最多4個(gè)字節(jié)飞蚓,也就是說剩余長度最少占用一個(gè)字節(jié)滤港,最多占用4個(gè)字節(jié)。由此可以看出, MQTT 報(bào)文最少就只有兩個(gè)字節(jié)溅漾,確實(shí)是個(gè)很輕量級(jí)的協(xié)議了山叮。

MQTT 規(guī)范:https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md

代碼實(shí)現(xiàn):

在 paho 中關(guān)于 MQTT 報(bào)文的編碼解碼的實(shí)現(xiàn)主要是 MqttWireMessage 類及其子類。MqttWireMessage 是個(gè)抽象類添履,里面存在兩個(gè)未實(shí)現(xiàn)的抽象方法getMessageInfo()getVariableHeader()屁倔,通過跟蹤代碼我們可以發(fā)現(xiàn)getHeader()createWireMessage(InputStream inputStream)是編解碼的核心實(shí)現(xiàn)方法。


// MqttWireMessage#getHeader

public byte[] getHeader() throws MqttException {

    try {

        // 這行代碼主要是計(jì)算出固定報(bào)頭的第一個(gè)字節(jié)暮胧,

        int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);

        // 可變報(bào)頭通過調(diào)用子類實(shí)現(xiàn)獲取

        byte[] varHeader = getVariableHeader();

        // 剩余長度=可變報(bào)頭長度+有效荷載的長度

        int remLen = varHeader.length + getPayload().length;

        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        DataOutputStream dos = new DataOutputStream(baos);

        dos.writeByte(first);

        // 這里調(diào)用了encodeMBI方法來編碼剩余長度

        dos.write(encodeMBI(remLen));

        dos.write(varHeader);

        dos.flush();

        return baos.toByteArray();

    } catch (IOException ioe) {

        throw new MqttException(ioe);

    }

}

// MqttWireMessage#createWireMessage

private static MqttWireMessage createWireMessage(InputStream inputStream) throws MqttException {

    try {

        CountingInputStream counter = new CountingInputStream(inputStream);

        DataInputStream in = new DataInputStream(counter);

        // 讀取第一個(gè)字節(jié)锐借,并解析

        int first = in.readUnsignedByte();

        // 報(bào)文類型

        byte type = (byte) (first >> 4);

        // 標(biāo)記位,不同的報(bào)文會(huì)由不同的涵義往衷,交給子類來處理

        byte info = (byte) (first &= 0x0f);

        // 這里調(diào)用了readMBI方法來解碼剩余長度

        long remLen = readMBI(in).getValue();

        long totalToRead = counter.getCounter() + remLen;

        MqttWireMessage result;

        long remainder = totalToRead - counter.getCounter();

        // 剩余有效數(shù)據(jù)钞翔,包括可變報(bào)頭和有效荷載,交由子類處理

        byte[] data = new byte[0];

        // The remaining bytes must be the payload...

        if (remainder > 0) {

            data = new byte[(int) remainder];

            // 讀取剩余有效數(shù)據(jù)

            in.readFully(data, 0, data.length);

        }

        // 根據(jù)類型構(gòu)造不同的消息

        if (type == MqttWireMessage.MESSAGE_TYPE_CONNECT) {

            result = new MqttConnect(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PUBLISH) {

            result = new MqttPublish(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PUBACK) {

            result = new MqttPubAck(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PUBCOMP) {

            result = new MqttPubComp(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_CONNACK) {

            result = new MqttConnack(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PINGREQ) {

            result = new MqttPingReq(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PINGRESP) {

            result = new MqttPingResp(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_SUBSCRIBE) {

            result = new MqttSubscribe(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_SUBACK) {

            result = new MqttSuback(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBSCRIBE) {

            result = new MqttUnsubscribe(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_UNSUBACK) {

            result = new MqttUnsubAck(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREL) {

            result = new MqttPubRel(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_PUBREC) {

            result = new MqttPubRec(info, data);

        } else if (type == MqttWireMessage.MESSAGE_TYPE_DISCONNECT) {

            result = new MqttDisconnect(info, data);

        } else {

            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);

        }

        return result;

    } catch (IOException io) {

        throw new MqttException(io);

    }

}

// MqttWireMessage#encodeMBI

public static byte[] encodeMBI(long number) {

    validateVariableByteInt((int) number);

    int numBytes = 0;

    long no = number;

    ByteArrayOutputStream bos = new ByteArrayOutputStream();

    // Encode the remaining length fields in the four bytes

    do {

        byte digit = (byte) (no % 128);

        no = no / 128;

        if (no > 0) {

            digit |= 0x80;

        }

        bos.write(digit);

        numBytes++;

    } while ((no > 0) && (numBytes < 4));

    return bos.toByteArray();

}

// MqttWireMessage#readMBI

public static MultiByteInteger readMBI(DataInputStream in) throws IOException {

    byte digit;

    int msgLength = 0;

    int multiplier = 1;

    int count = 0;

    do {

        digit = in.readByte();

        count++;

        msgLength += ((digit & 0x7F) * multiplier);

        multiplier *= 128; // multiplier <<= 7;

    } while ((digit & 0x80) != 0);

    if (msgLength < 0 || msgLength > VARIABLE_BYTE_INT_MAX) {

        throw new IOException("This property must be a number between 0 and " + VARIABLE_BYTE_INT_MAX

                              + ". Read value was: " + msgLength);

    }

    return new MultiByteInteger(msgLength, count);

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末席舍,一起剝皮案震驚了整個(gè)濱河市布轿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌俺亮,老刑警劉巖驮捍,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異脚曾,居然都是意外死亡东且,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門本讥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來珊泳,“玉大人,你說我怎么就攤上這事拷沸∩椋” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵撞芍,是天一觀的道長秧了。 經(jīng)常有香客問我,道長序无,這世上最難降的妖魔是什么验毡? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮帝嗡,結(jié)果婚禮上晶通,老公的妹妹穿的比我還像新娘。我一直安慰自己哟玷,他們只是感情好狮辽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般喉脖。 火紅的嫁衣襯著肌膚如雪椰苟。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天动看,我揣著相機(jī)與錄音尊剔,去河邊找鬼。 笑死菱皆,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的挨稿。 我是一名探鬼主播仇轻,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼奶甘!你這毒婦竟也來了篷店?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤臭家,失蹤者是張志新(化名)和其女友劉穎疲陕,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體钉赁,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蹄殃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了你踩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诅岩。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖带膜,靈堂內(nèi)的尸體忽然破棺而出吩谦,到底是詐尸還是另有隱情,我是刑警寧澤膝藕,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布式廷,位于F島的核電站,受9級(jí)特大地震影響芭挽,放射性物質(zhì)發(fā)生泄漏滑废。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一览绿、第九天 我趴在偏房一處隱蔽的房頂上張望策严。 院中可真熱鬧,春花似錦饿敲、人聲如沸妻导。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽倔韭。三九已至术浪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間寿酌,已是汗流浹背胰苏。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留醇疼,地道東北人硕并。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像秧荆,于是被迫代替她去往敵國和親倔毙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355