最近 ONS 消息堆積的很嚴(yán)重濒旦,并且經(jīng)常發(fā)現(xiàn)部分幾乎沒有消息消費(fèi)的消費(fèi)者也提示堆積比伏,所以有必要深入了解一下
RocketMQ 的設(shè)計(jì)思路,來(lái)看看堆積量如何計(jì)算户魏,以及如何正確的使用 Topic 以及 Consumer 等組件澡刹。
產(chǎn)生的問(wèn)題背景在于呻征,由于一開始對(duì)于RocketMQ
不夠了解,同時(shí)足夠懶得原因罢浇,導(dǎo)致我們所有業(yè)務(wù)都僅適用了一個(gè)topic
陆赋,所有業(yè)務(wù)線通過(guò)訂閱不同的tag
來(lái)進(jìn)行消費(fèi)边篮,本次深入了解后將進(jìn)行業(yè)務(wù)重構(gòu),以正確的姿勢(shì)使用RocketMQ
奏甫。
本次要排查的問(wèn)題包括:
1戈轿、消息拉取時(shí)模型,是否會(huì)將非該消費(fèi)者消息的消息也拉取到客戶端阵子?
2思杯、如何計(jì)算堆積?
問(wèn)題1的本質(zhì)問(wèn)題是消息拉取的過(guò)濾模型在于客戶端挠进,還是在服務(wù)端色乾?問(wèn)題2的本質(zhì)問(wèn)題是消息如何存儲(chǔ)計(jì)算?欲探究該問(wèn)題則需要明確RocketMQ
的底層存儲(chǔ)模型設(shè)計(jì)领突,從頂層設(shè)計(jì)俯瞰消息隊(duì)列整個(gè)框架暖璧。
底層存儲(chǔ)模型
commitlog
是整個(gè)消息隊(duì)列存儲(chǔ)的核心文件,而consumerquque
是邏輯消息隊(duì)列君旦,主要存儲(chǔ)commitlog offset
澎办,消息長(zhǎng)度
,tag的hashcode
金砍,用于在消息消費(fèi)時(shí)快速定位消息在commit log
文件位置局蚀,便于讀取消息。IndexFile
俗稱索引文件恕稠,主要存儲(chǔ)消息key的hashcode
以及commitlog offset
琅绅,用于通過(guò)key快速定位到消息在commit log
文件位置,便于讀取消息鹅巍。
消息拉取模型分析
找到問(wèn)題1的答案之前千扶,先思考消息隊(duì)列投遞時(shí)做了什么?
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
以上是代碼是從官網(wǎng)的地址copy而來(lái)骆捧,雖簡(jiǎn)單但是從其中足以找到消息投遞時(shí)所需要的基本條件包括namesrvAddr
澎羞、topic
、tag
凑懂。
消息投遞
// DefaultProducerImpl#sendDefaultImpl()
// 省略大部分代碼煤痕,關(guān)鍵看備注部分
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 從本地緩存或namesrv遠(yuǎn)程讀取topic信息
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();‘
// 根據(jù)某種策略選擇一個(gè)邏輯消息隊(duì)列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
從文中可以看到,在消息投遞的過(guò)程中接谨,已經(jīng)在客戶端通過(guò)某種策略找到指定的topic
下的邏輯隊(duì)列,邏輯隊(duì)列具體指的是consumerqueue
文件塘匣,服務(wù)端對(duì)應(yīng)的處理主要是寫入脓豪,具體有興趣可以了解SendMessageProcessor
類,最終通過(guò)DefaultMessageStore
實(shí)現(xiàn)了數(shù)據(jù)的寫入忌卤,但是并未看到寫入consumerqueue
扫夜,因?yàn)閷?shí)現(xiàn)consumerqueue
文件寫入是通過(guò)另外的線程實(shí)現(xiàn)的,具體實(shí)現(xiàn)請(qǐng)參考ReputMessageService
,本文不再深入笤闯。
我們主要知道堕阔,在客戶端除了上傳基本屬性數(shù)據(jù)之外,同時(shí)還在客戶端選擇好了將要寫入的邏輯消息隊(duì)列颗味。
消息拉取
消息的拉取在客戶端就不進(jìn)行贅述了超陆,主要看服務(wù)端的實(shí)現(xiàn)。有興趣可以了解PullMessageService#run()
浦马。服務(wù)端則重點(diǎn)查閱PullMessageProcessor#processRequest()
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
// 構(gòu)建消息過(guò)濾
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 消息過(guò)濾的核心源碼在ExpressionMessageFilter#isMatchedByConsumeQueue方法
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
// by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}
// tagecode其實(shí)就是tag的hashcode
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
/// ....
}
// 接著PullMessageProcessor#processRequest()往下看
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
// 注意該消息讀取的參數(shù)时呀,包括topic, queueid, queueoffset, 已經(jīng)消息最大條數(shù)
// 通過(guò)DefaultMessageStore#getMessage()繼續(xù)查看
// 注意,這里的offset是queueoffset晶默,而不是commitlog offset
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
// ...
// 查找consumerqueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
//
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
/// .....
// 消息匹配谨娜,這個(gè)對(duì)象由前文的MessageFilter定義
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue; //不匹配的消息則繼續(xù)往下來(lái)讀取
}
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);// offsetPy與sizePy查找commitlog上存儲(chǔ)的消息內(nèi)容
///....
}
以上源碼閱讀完后,問(wèn)題1 不攻自破磺陡,在服務(wù)端上過(guò)濾好消息趴梢,但是很明顯,查閱完整地源碼可以清晰地確定币他,并非是每一次拉取消息都可以過(guò)濾到自己想要的消息垢油,即該消費(fèi)者拉取消息時(shí)可能在某一個(gè)comsumerqueue
上拉取不到消息,因?yàn)槌涑庵粋€(gè)topic
下的其他tag
的消息圆丹,也就意味著不是每次拉取都有意義滩愁,而阿里云ONS
的計(jì)費(fèi)上明顯提示拉取消息是要計(jì)算費(fèi)用的。
消息堆積
消息堆積意為著服務(wù)端要維護(hù)消息的消費(fèi)進(jìn)度辫封。
先來(lái)看一張圖硝枉,圖中的brokerOffset - consumerOffset = diffTotal, 而diffTotal就是指堆積量倦微,而描述堆積量的指標(biāo)是消息條數(shù)妻味。
從commitlog中來(lái)看,由于存儲(chǔ)了大量的消息文件欣福,并且消息消費(fèi)是非順序消費(fèi)责球,繼而很難從commitlog中看出哪個(gè)
哪個(gè)consumer堆積量。
那么哪里可以描述清楚消息條數(shù)呢拓劝?先來(lái)深入了解Consumer Queue
的設(shè)計(jì)
ConsumerQueue
consumerqueue
的設(shè)計(jì)以topic
作為邏輯分區(qū)雏逾,每個(gè)topic
下分多個(gè)消息隊(duì)列進(jìn)行,具體多少消息隊(duì)列存儲(chǔ)參照broker
的配置參數(shù)郑临,隊(duì)列名稱以數(shù)組0開始栖博,比如配置0,1,2,3 四個(gè)消息隊(duì)列。
配置參數(shù)請(qǐng)參考BrokerConfig
厢洞,其中有一個(gè)參數(shù)private int defaultTopicQueueNums = 8;
從語(yǔ)義上理解仇让,堆積量應(yīng)該指未被消費(fèi)的存在
broker
上的消息數(shù)量典奉,這是基本認(rèn)知。
commitlog
存儲(chǔ)著broker
上所有的消息丧叽,設(shè)想一下如果每次要查詢消息并消費(fèi)需要從該文件遍歷查詢卫玖,性能之差可想
而知,為了提高查詢的消息踊淳,優(yōu)先想到的是諸如MySQL
上的索引設(shè)計(jì)假瞬。同理,consumerqueue
的設(shè)計(jì)之初就是為了
快速定位到對(duì)應(yīng)的消費(fèi)者可以消費(fèi)的消息嚣崭,當(dāng)然RocketMQ
也提供了indexfile
笨触,俗稱索引文件,主要是解決通過(guò)key
快速定位消息的方式雹舀。
consumerqueue 消息結(jié)構(gòu)
在consumerqueue
的結(jié)構(gòu)設(shè)計(jì)芦劣,在consumequeue
的條目設(shè)計(jì)是固定的,并且它整好對(duì)應(yīng)一條消息说榆。consumerqueue
單個(gè)文件默認(rèn)是30w個(gè)條目虚吟,單個(gè)文件長(zhǎng)度30w * 20字節(jié)。從文件的存儲(chǔ)模型可以看出签财,consumerqueue
存儲(chǔ)維度是topic
串慰,并非是consumer
。那么如何找到consumer
的堆積量唱蒸?
假設(shè)
假設(shè)一個(gè)topic
對(duì)應(yīng)一個(gè)consumer
邦鲫,topic
的堆積量即consumer
的堆積量。從這個(gè)維度來(lái)推理神汹,前文提到部分consumer
是幾乎沒有消息庆捺,但是卻提示消息堆積即合理,因?yàn)槎逊e的消息并非是該consumer
的需要消費(fèi)的消息屁魏,而是該consumerqueue
對(duì)應(yīng)的topic
的堆積
論證過(guò)程
從rocketmq console
后臺(tái)看到的消費(fèi)者的堆積數(shù)量滔以,看到AdminBrokerProcess#getConsumeStats()
。
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
// ...
for (String topic : topics) {
// ...
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue();
mq.setTopic(topic);
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
mq.setQueueId(i);
OffsetWrapper offsetWrapper = new OffsetWrapper();
// 核心的問(wèn)題在于要確定brokerOffset 以及consumerOffset的語(yǔ)義
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
requestHeader.getConsumerGroup(),
topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
// ....
}
// 隊(duì)列最大索引
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
long offset = logic.getMaxOffsetInQueue();
return offset;
}
return 0;
}
public long getMaxOffsetInQueue() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
// 總的邏輯偏移量 / 20 = 總的消息條數(shù)
}
public static final int CQ_STORE_UNIT_SIZE = 20;// 前文提到每個(gè)條目固定20個(gè)字節(jié)
// 當(dāng)前消費(fèi)者的消費(fèi)進(jìn)度
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),topic,i);
if (consumerOffset < 0)
consumerOffset = 0;
public long queryOffset(final String group, final String topic, final int queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);// 從offsetTable中讀取
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
核心的問(wèn)題在于從offset緩存中讀取出來(lái)的氓拼,那么offset的數(shù)據(jù) 又是哪里來(lái)的你画?
// 通過(guò)IDE快速可以很快找到如下代碼
@Override
public String configFilePath() {
return
BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().
getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
if (jsonString != null) {
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString,ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}
也就是說(shuō)offset的數(shù)據(jù)是從json文件中加載進(jìn)來(lái)的。
這個(gè)文件描述的是topic與消費(fèi)者的關(guān)系桃漾,每一個(gè)隊(duì)列對(duì)應(yīng)的消費(fèi)進(jìn)度坏匪。但是消費(fèi)是實(shí)時(shí)更新的,所以必須實(shí)時(shí)更新消費(fèi)進(jìn)度呈队,消費(fèi)進(jìn)度的更新是從消息的拉取得到的剥槐。
DefaultStoreMessage
前文看過(guò)該類的部分代碼,主要是拉取的部分宪摧,這里補(bǔ)充拉取時(shí)的offset的值得語(yǔ)義粒竖。
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
// ...
// offsetPy 是commitlog的邏輯偏移量
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
// 消息過(guò)濾
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
// ....
}
// ...
//
// 計(jì)算下一次開始的offset,是前文的offset
// i 是ConsumeQueue.CQ_STORE_UNIT_SIZE的倍數(shù)
// ConsumeQueue.CQ_STORE_UNIT_SIZE是每一條consumerqueue中的條目的大小几于,20字節(jié)
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
看到此處蕊苗,可以明確消費(fèi)者拉取消息時(shí)的nextBeginOffset
就是consumerqueue
的偏移量/20,意味著類似下標(biāo)數(shù)組index
沿彭。
到此處還要再確認(rèn)拉取的這個(gè)消費(fèi)進(jìn)度是不是會(huì)更新到到offsetTable
朽砰?核心看RemoteBrokerOffsetStore
類
消息消費(fèi)
貼幾張圖簡(jiǎn)單了解客戶端上報(bào)消費(fèi)進(jìn)度的過(guò)程
至此,可以看到堆積量的實(shí)際是根據(jù)topic來(lái)算喉刘,按照前文最開始的假設(shè)推斷其實(shí)是成立的瞧柔,那么現(xiàn)在那些沒有消息堆積的消息為何還會(huì)顯示堆積就可以理解了。
總結(jié)
消息消費(fèi)屬于服務(wù)端過(guò)濾模式睦裳,不過(guò)其實(shí)還要其他的消息過(guò)濾模式
造锅,只是本文并未提及(Class
)。但是由于topic
使用的不合理導(dǎo)致消息可能存在拉取不到數(shù)據(jù)廉邑,但是ONS是計(jì)算收費(fèi)的哥蔚。同時(shí)消息的堆積意義明朗,那么使用RocketMQ
的姿勢(shì)也就不言而喻蛛蒙,按照業(yè)務(wù)合理使用topic
以及tag
等糙箍。
參考資料
源碼:https://github.com/apache/rocketmq
官網(wǎng):http://rocketmq.apache.org/docs/rmq-deployment/
書籍:《RocketMQ技術(shù)內(nèi)幕》,特別推薦該書牵祟,讓你對(duì)RocketMQ
的架構(gòu)設(shè)計(jì)深夯,代碼有更深的了解