mqtt離線消息的實現(xiàn)

在介紹mqtt離線消息之前夜矗,先了解下mqtt協(xié)議的幾個概念:
QoS(Quality of Service)
指代消息傳輸?shù)姆?wù)質(zhì)量。它包括以下級別:

服務(wù)質(zhì)量 具體含義
QoS0 代表最多分發(fā)一次
QoS1 代表至少達(dá)到一次
QoS2 代表僅分發(fā)一次

cleanSession
cleanSession 標(biāo)志是 MQTT 協(xié)議中對一個客戶端建立 TCP 連接后是否關(guān)心之前狀態(tài)的定義泻肯。具體語義如下:

cleanSession 具體含義
true 非持久化連接佳励,客戶端再次上線時,將不再關(guān)心之前所有的訂閱關(guān)系以及離線消息
false 持久化連接富腊,客戶端再次上線時坏逢,還需要處理之前的離線消息,而之前的訂閱關(guān)系也會持續(xù)生效

QoS 和 cleanSession 的不同組合產(chǎn)生的結(jié)果如下表所示:

QoS 級別 cleanSession=true cleanSession=false
QoS0 無離線消息赘被,在線消息只嘗試推一次 無離線消息是整,在線消息只嘗試推一次
QoS1 無離線消息,在線消息保證可達(dá) 有離線消息浮入,所有消息保證可達(dá)
QoS2 無離線消息,在線消息保證只推一次 有離線消息羊异,所有消息保證只推一次

對于 QoS > 0的消息,如果是持久化連接野舶,當(dāng)客戶端不在線時,發(fā)送消息會保存離線消息到broker平道,當(dāng)客戶端上線時,mqtt會從broker拉取消息推送給客戶端。
mqtt 離線消息實現(xiàn)相關(guān)的主要存儲結(jié)構(gòu)以及作用為:
inflightWindow :
為了提高消息吞吐效率和減少網(wǎng)絡(luò)波動帶來的影響窘疮,已發(fā)送但未確認(rèn)的報文將被存放在 inflightWindow 中直至完成確認(rèn)從inflightWindow 中移除,key為packetId(報文標(biāo)識符)

ConcurrentHashMap<Integer, InflightMessage> inflightWindow  

consumeOffsetTable:內(nèi)存中保存了消息的消費(fèi)進(jìn)度,當(dāng)客戶端斷開連接闸衫,從內(nèi)存中刪除,再次上線會重新創(chuàng)建

ConcurrentHashMap<String /*broker^rootTopic*/, Map<String /*queueId^clientId*/, Long/*consumeOffset*/>> consumeOffsetTable

最初的離線消息的方案:


mqtt離線消息實現(xiàn).png

這個方案在正常收發(fā)以及在producer先發(fā)送完所有的消息然后consumer上線拉取離線消息時是不會有問題的蔚出,之前測試也沒有發(fā)現(xiàn)問題,但是在以下場景進(jìn)行離線消息推送會存在問題身冬,最近一次變更測試中岔乔,測試了以下場景:
producer發(fā)送2000條消息
1、qos =1雏门、 cleansession=false的consumer訂閱了一個topicA
2嘿歌、producer發(fā)送500條消息時,consumer斷開連接茁影,到1000條消息的時候consumer再次連接宙帝,這個過程中producer一直發(fā)送消息,此時離線消息和在線消息會一起推送募闲,前500條消息步脓,后1000條消息是在線消息,中間還有500條是離線消息浩螺。
這個場景測試會有大量消息丟失和很多重復(fù)消息靴患,經(jīng)過打印大量日志分析造成這個現(xiàn)象的有兩個地方代碼邏輯需要優(yōu)化:
1、客戶端在線時要出,推送消息時鸳君,消息放入inflightWindow ,收到客戶端的ACK,從inflightWindow 中移除該消息患蹂,更新consumeOffsetTable或颊,當(dāng)客戶端掉線會更新消費(fèi)進(jìn)度到redis中
問題:先收到的ACK刪除的消息的消費(fèi)進(jìn)度不一定時最大的,比如如下圖所示


inflightWindow 中消息

當(dāng)客戶端收到消息進(jìn)度為5的消息ACK時传于,consumeOffsetTable消費(fèi)進(jìn)度更新到5,此時客戶端掉線囱挑,consumeOffsetTable消費(fèi)進(jìn)度5會更新到redis中,而消費(fèi)進(jìn)度6格了、7看铆、8的消息會作為 NOT ACK的消息持久化到redis中,當(dāng)客戶端在次上線時盛末,會先取出NOT ACK的消息發(fā)送弹惦,然后從broker拉取消息會從消費(fèi)進(jìn)度5開始否淤,這樣6、7棠隐、8會重復(fù)消費(fèi)石抡,一次是inflightWindow 作為未收到ACK重發(fā),還有一次是作為離線消息從broker拉取消費(fèi)
2助泽、在上面的例子中啰扛,producer發(fā)送500條消息時,consumer斷開連接嗡贺,假設(shè)此時consumeOffsetTable中offset是500隐解,斷開連接會更新到redis中,當(dāng)producer發(fā)送到1000條消息的時候consumer再次連接诫睬,會從redis中取出offset煞茫,代碼邏輯如下:

    private long calcNextOffset(ConcurrentHashMap<String, Map<String, Long>> offsetTable, String key,
        String innerKey,
        PersistService persistService) {
        if (!offsetTable.containsKey(key)) {
            long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
            Map<String, Long> offsetMap = new HashMap<>();
            Map<String, Long> previous = offsetTable.putIfAbsent(key, offsetMap);
            if (previous != null) {
                offsetMap = previous;
            }
            offsetMap.putIfAbsent(innerKey, persistOffset);
        } else if (!offsetTable.get(key).containsKey(innerKey)) {
            long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
            Map<String, Long> offsetMap = offsetTable.get(key);
            offsetMap.putIfAbsent(innerKey, persistOffset);
        }
        return offsetTable.get(key).get(innerKey);
    }

但是由于producer在線消息也在發(fā)送,當(dāng)consumer上線時摄凡,假設(shè)此時在線消息的offset到了1021续徽,導(dǎo)致這個離線消息從broker拉取消息的offset并不是500而是1021,從而使得500到1000之間的離線消息都會丟失钦扭,這個問題實際上就是離線消息和在線消息consumeOffset混合使用導(dǎo)致
最后為了解決上述兩個問題床绪,優(yōu)化后的方案為:

方案主要作了兩個地方的邏輯修改:
1、改進(jìn)之前consumeOffsetTable的offset更新是在MQTT收到客戶端的ACK更新消費(fèi)進(jìn)度癞己,改為在消息放入inflightWindow時就把consumeOffsetTable的消費(fèi)進(jìn)度更新,即使inflightWindow中消息沒有收到ACK,也會作為NOT ACK的消息處理末秃,不會丟失消息
2、在客戶端上線推送離線消息時惰匙,計算拉取離線消息的消費(fèi)進(jìn)度改為直接從redis中取出铃将,不必判斷consumeOffsetTable里面是否有值,避免了在線跟離線消息混合使用consumeOffsetTable绘盟,把在線跟離線的消費(fèi)進(jìn)度分開處理

   private long calcNextOffset(String key, String innerKey, PersistService persistService) {
        long offlineMsgOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
        return offlineMsgOffset + 1;
    }

改進(jìn)后的方案如下:


改進(jìn)后的MQTT離線方案

除了以上問題龄毡,還有一個目前待MQTT離線消息解決的問題,先介紹一下這個問題祭隔,拉取離線消息主要就是獲取拉取消息消費(fèi)進(jìn)度的開始結(jié)束值路操,前面的問題都是解決了消費(fèi)進(jìn)度開始值的問題,還有一個遺留問題是拉取離線消息消費(fèi)進(jìn)度結(jié)束值有問題搞坝。
出現(xiàn)這個問題場景還是上述測試場景

   private long getMaxOffset(String enodeName,
        String topic,
        int queueId) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
        GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setQueueId(queueId);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);

     return this.mqttBridgeController.getEnodeService().getMaxOffsetInQueue(enodeName, topic, request);
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瞄沙,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子申尼,更是在濱河造成了極大的恐慌,老刑警劉巖粟按,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件霹粥,死亡現(xiàn)場離奇詭異,居然都是意外死亡庙曙,警方通過查閱死者的電腦和手機(jī)浩淘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門张抄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人左驾,你說我怎么就攤上這事」钣遥” “怎么了?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵嗜闻,是天一觀的道長琉雳。 經(jīng)常有香客問我友瘤,道長,這世上最難降的妖魔是什么束倍? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任盟戏,我火速辦了婚禮,結(jié)果婚禮上邮旷,老公的妹妹穿的比我還像新娘蝇摸。我一直安慰自己,他們只是感情好律歼,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布啡专。 她就那樣靜靜地躺著,像睡著了一般辱揭。 火紅的嫁衣襯著肌膚如雪病附。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天域庇,我揣著相機(jī)與錄音,去河邊找鬼听皿。 笑死,一個胖子當(dāng)著我的面吹牛庵朝,可吹牛的內(nèi)容都是我干的又厉。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼侄旬,長吁一口氣:“原來是場噩夢啊……” “哼煌妈!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起汰蜘,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤之宿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體炕婶,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡柠掂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了枪狂。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宋渔。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖严蓖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情颗胡,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布哑蔫,位于F島的核電站弧呐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏泉懦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一巡球、第九天 我趴在偏房一處隱蔽的房頂上張望酣栈。 院中可真熱鬧,春花似錦矿筝、人聲如沸棚贾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怯伊。三九已至,卻和暖如春耿芹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背吧秕。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工砸彬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疗涉,地道東北人吟秩。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓涵防,卻偏偏與公主長得像,于是被迫代替她去往敵國和親壮池。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容