在介紹mqtt離線消息之前夜矗,先了解下mqtt協(xié)議的幾個概念:
QoS(Quality of Service)
指代消息傳輸?shù)姆?wù)質(zhì)量。它包括以下級別:
服務(wù)質(zhì)量 | 具體含義 |
---|---|
QoS0 | 代表最多分發(fā)一次 |
QoS1 | 代表至少達(dá)到一次 |
QoS2 | 代表僅分發(fā)一次 |
cleanSession
cleanSession 標(biāo)志是 MQTT 協(xié)議中對一個客戶端建立 TCP 連接后是否關(guān)心之前狀態(tài)的定義泻肯。具體語義如下:
cleanSession | 具體含義 |
---|---|
true | 非持久化連接佳励,客戶端再次上線時,將不再關(guān)心之前所有的訂閱關(guān)系以及離線消息 |
false | 持久化連接富腊,客戶端再次上線時坏逢,還需要處理之前的離線消息,而之前的訂閱關(guān)系也會持續(xù)生效 |
QoS 和 cleanSession 的不同組合產(chǎn)生的結(jié)果如下表所示:
QoS 級別 | cleanSession=true | cleanSession=false |
---|---|---|
QoS0 | 無離線消息赘被,在線消息只嘗試推一次 | 無離線消息是整,在線消息只嘗試推一次 |
QoS1 | 無離線消息,在線消息保證可達(dá) | 有離線消息浮入,所有消息保證可達(dá) |
QoS2 | 無離線消息,在線消息保證只推一次 | 有離線消息羊异,所有消息保證只推一次 |
對于 QoS > 0的消息,如果是持久化連接野舶,當(dāng)客戶端不在線時,發(fā)送消息會保存離線消息到broker平道,當(dāng)客戶端上線時,mqtt會從broker拉取消息推送給客戶端。
mqtt 離線消息實現(xiàn)相關(guān)的主要存儲結(jié)構(gòu)以及作用為:
inflightWindow :
為了提高消息吞吐效率和減少網(wǎng)絡(luò)波動帶來的影響窘疮,已發(fā)送但未確認(rèn)的報文將被存放在 inflightWindow 中直至完成確認(rèn)從inflightWindow 中移除,key為packetId(報文標(biāo)識符)
ConcurrentHashMap<Integer, InflightMessage> inflightWindow
consumeOffsetTable:內(nèi)存中保存了消息的消費(fèi)進(jìn)度,當(dāng)客戶端斷開連接闸衫,從內(nèi)存中刪除,再次上線會重新創(chuàng)建
ConcurrentHashMap<String /*broker^rootTopic*/, Map<String /*queueId^clientId*/, Long/*consumeOffset*/>> consumeOffsetTable
最初的離線消息的方案:
這個方案在正常收發(fā)以及在producer先發(fā)送完所有的消息然后consumer上線拉取離線消息時是不會有問題的蔚出,之前測試也沒有發(fā)現(xiàn)問題,但是在以下場景進(jìn)行離線消息推送會存在問題身冬,最近一次變更測試中岔乔,測試了以下場景:
producer發(fā)送2000條消息
1、qos =1雏门、 cleansession=false的consumer訂閱了一個topicA
2嘿歌、producer發(fā)送500條消息時,consumer斷開連接茁影,到1000條消息的時候consumer再次連接宙帝,這個過程中producer一直發(fā)送消息,此時離線消息和在線消息會一起推送募闲,前500條消息步脓,后1000條消息是在線消息,中間還有500條是離線消息浩螺。
這個場景測試會有大量消息丟失和很多重復(fù)消息靴患,經(jīng)過打印大量日志分析造成這個現(xiàn)象的有兩個地方代碼邏輯需要優(yōu)化:
1、客戶端在線時要出,推送消息時鸳君,消息放入inflightWindow ,收到客戶端的ACK,從inflightWindow 中移除該消息患蹂,更新consumeOffsetTable或颊,當(dāng)客戶端掉線會更新消費(fèi)進(jìn)度到redis中
問題:先收到的ACK刪除的消息的消費(fèi)進(jìn)度不一定時最大的,比如如下圖所示
當(dāng)客戶端收到消息進(jìn)度為5的消息ACK時传于,consumeOffsetTable消費(fèi)進(jìn)度更新到5,此時客戶端掉線囱挑,consumeOffsetTable消費(fèi)進(jìn)度5會更新到redis中,而消費(fèi)進(jìn)度6格了、7看铆、8的消息會作為 NOT ACK的消息持久化到redis中,當(dāng)客戶端在次上線時盛末,會先取出NOT ACK的消息發(fā)送弹惦,然后從broker拉取消息會從消費(fèi)進(jìn)度5開始否淤,這樣6、7棠隐、8會重復(fù)消費(fèi)石抡,一次是inflightWindow 作為未收到ACK重發(fā),還有一次是作為離線消息從broker拉取消費(fèi)
2助泽、在上面的例子中啰扛,producer發(fā)送500條消息時,consumer斷開連接嗡贺,假設(shè)此時consumeOffsetTable中offset是500隐解,斷開連接會更新到redis中,當(dāng)producer發(fā)送到1000條消息的時候consumer再次連接诫睬,會從redis中取出offset煞茫,代碼邏輯如下:
private long calcNextOffset(ConcurrentHashMap<String, Map<String, Long>> offsetTable, String key,
String innerKey,
PersistService persistService) {
if (!offsetTable.containsKey(key)) {
long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
Map<String, Long> offsetMap = new HashMap<>();
Map<String, Long> previous = offsetTable.putIfAbsent(key, offsetMap);
if (previous != null) {
offsetMap = previous;
}
offsetMap.putIfAbsent(innerKey, persistOffset);
} else if (!offsetTable.get(key).containsKey(innerKey)) {
long persistOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
Map<String, Long> offsetMap = offsetTable.get(key);
offsetMap.putIfAbsent(innerKey, persistOffset);
}
return offsetTable.get(key).get(innerKey);
}
但是由于producer在線消息也在發(fā)送,當(dāng)consumer上線時摄凡,假設(shè)此時在線消息的offset到了1021续徽,導(dǎo)致這個離線消息從broker拉取消息的offset并不是500而是1021,從而使得500到1000之間的離線消息都會丟失钦扭,這個問題實際上就是離線消息和在線消息consumeOffset混合使用導(dǎo)致
最后為了解決上述兩個問題床绪,優(yōu)化后的方案為:
方案主要作了兩個地方的邏輯修改:
1、改進(jìn)之前consumeOffsetTable的offset更新是在MQTT收到客戶端的ACK更新消費(fèi)進(jìn)度癞己,改為在消息放入inflightWindow時就把consumeOffsetTable的消費(fèi)進(jìn)度更新,即使inflightWindow中消息沒有收到ACK,也會作為NOT ACK的消息處理末秃,不會丟失消息
2、在客戶端上線推送離線消息時惰匙,計算拉取離線消息的消費(fèi)進(jìn)度改為直接從redis中取出铃将,不必判斷consumeOffsetTable里面是否有值,避免了在線跟離線消息混合使用consumeOffsetTable绘盟,把在線跟離線的消費(fèi)進(jìn)度分開處理
private long calcNextOffset(String key, String innerKey, PersistService persistService) {
long offlineMsgOffset = persistService.queryConsumeOffset(new StringBuilder().append(key).append(KEY_SEPARATOR).append(innerKey).toString());
return offlineMsgOffset + 1;
}
改進(jìn)后的方案如下:
除了以上問題龄毡,還有一個目前待MQTT離線消息解決的問題,先介紹一下這個問題祭隔,拉取離線消息主要就是獲取拉取消息消費(fèi)進(jìn)度的開始結(jié)束值路操,前面的問題都是解決了消費(fèi)進(jìn)度開始值的問題,還有一個遺留問題是拉取離線消息消費(fèi)進(jìn)度結(jié)束值有問題搞坝。
出現(xiàn)這個問題場景還是上述測試場景
private long getMaxOffset(String enodeName,
String topic,
int queueId) throws InterruptedException, RemotingTimeoutException, RemotingCommandException, RemotingSendRequestException, RemotingConnectException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
return this.mqttBridgeController.getEnodeService().getMaxOffsetInQueue(enodeName, topic, request);
}