ConsumeQueue的作用
上一篇文章講到Broker在收到消息后喷屋,通過MessageStore
將消息存儲到commitLog
中拦宣,但是consumer在消費消息的時候是按照topic+queue的維度來拉取消息的诉植。為了方便讀取届腐,MessageStore
將CommitLog
中消息的offset按照topic+queueId劃分后适篙,存儲到不同的文件中往核,這就是ConsumeQueue
文件組織方式
回顧一下數(shù)據(jù)結(jié)構(gòu)圖中ConsumeQueue
相關(guān)的部分。
底層儲存跟
CommitLog
一樣使用MappedFile
嚷节,每個CQUnit
的大小是固定的聂儒,存儲了消息的offset、消息size和tagCode硫痰。存tag是為了在consumer取到消息offset后時候先根據(jù)tag做一次過濾衩婚,剩下的才需要到CommitLog
中取消息詳情。之前講過效斑,
MessageStore
通過ReputMessageService
來將消息的offset寫道ConsumeQueue
中非春,我們看下這部分代碼實現(xiàn)
ReputMessageService
這個Service是一個單線程的任務(wù),一直循環(huán)的調(diào)用doReput()
方法:
private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}
private void doReput() {
//1缓屠、判斷commitLog的maxOffset是否比上次讀取的offset大
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
//2奇昙、從上次的結(jié)束offset開始讀取commitLog文件中的消息
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//3、檢查message數(shù)據(jù)完整性并封裝成DispatchRequest
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//4敌完、分發(fā)消息到CommitLogDispatcher储耐,1)構(gòu)建索引; 2)更新consumeQueue
DefaultMessageStore.this.doDispatch(dispatchRequest);
//5、分發(fā)消息到MessageArrivingListener,喚醒等待的PullReqeust接收消息,Only Master?
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
//5滨溉、更新offset
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
//6什湘、如果讀到文件結(jié)尾,則切換到新文件
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
//7业踏、解析消息出錯禽炬,跳過。commitLog文件中消息數(shù)據(jù)損壞的情況下才會進(jìn)來
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
//8勤家、release對MappedFile的引用
result.release();
}
} else {
doNext = false;
}
}
}
/**
* 消息分發(fā)
*/
public void doDispatch(DispatchRequest req) {
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
- 第1步腹尖,每次處理完讀取消息后,都將當(dāng)前已經(jīng)處理的最大offset記錄下來,下次處理從這個offset開始讀取消息
- 第2步热幔,從commitLog文件中讀取消息詳情
- 第4步乐设,分發(fā)讀取到的消息,
MessageStore
在初始化的時候會往dispatcherList中添加兩個Dispatcher.
this.dispatcherList = new LinkedList<>();
//consumeQueue構(gòu)建Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
//索引更新Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
具體Dispatcher的處理邏輯绎巨,我們下面詳細(xì)說
- 第8步近尚,在通過commitLog讀取消息時,不會把消息數(shù)據(jù)復(fù)制到堆內(nèi)存中场勤,只是返回文件映射的byteBuffer戈锻,所以MappedFile記錄了有多少個引用,在數(shù)據(jù)使用完后需要釋放和媳。
Dispatcher構(gòu)建ConsumeQueue
CommitLogDispatcherBuildConsumeQueue
實現(xiàn)比較簡單格遭,直接調(diào)用的MessageStore
的接口
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
/** 對于非事務(wù)消息和commit事務(wù)消息 */
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
MessageStore
中的實現(xiàn):
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
//找到對應(yīng)的ComsumeQueue文件
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
前面已經(jīng)講過consumeQueue的數(shù)據(jù)存儲結(jié)構(gòu),每個topic+queueId
對應(yīng)一個ConsumeQueue
留瞳,每個ConsumeQueue
包含一系列MappedFile
拒迅。所以,這里第一步就是獲取對應(yīng)的ConsumeQueue
她倘,如果不存在的話就會新建一個璧微。后面就是調(diào)用CQ的put方法:
public void putMessagePositionInfoWrapper(DispatchRequest request) {
//1、寫入重試次數(shù)硬梁,最多30次
final int maxRetries = 30;
//2前硫、判斷CQ是否是可寫的
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
//3、如果需要寫ext文件荧止,則將消息的tagscode寫入
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());
long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
//4开瞭、寫入文件
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
//5、記錄check point
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
...
...
}
}
...
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
- 第3步罩息,將tagcode和bitMap記錄進(jìn)CQExt文件中,這個是一個過濾的擴(kuò)展功能个扰,采用的bloom過濾器先記錄消息的bitMap瓷炮,這樣consumer來讀取消息時先通過bloom過濾器判斷是否有符合過濾條件的消息
- 第4步,將消息offset寫入CQ文件中递宅,這邊代碼如下:
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset <= this.maxPhysicOffset) {
return true;
}
//一個CQUnit的大小是固定的20字節(jié)
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
//獲取最后一個MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
//對新創(chuàng)建的文件娘香,寫將所有CQUnit初始化0值
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset;
//CQUnit寫入文件中
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
寫文件的邏輯和寫CommitLog的邏輯是一樣的,首先封裝一個CQUnit办龄,這里面offset占8個字節(jié)烘绽,消息size占用4個字節(jié),tagcode占用8個字節(jié)俐填。然后找最后一個MappedFile安接,對于新建的文件,會有一個預(yù)熱的動作英融,寫把所有CQUnit初始化成0值盏檐。最后將Unit寫入到文件中歇式。
總結(jié)
ConsumeQueue
文件數(shù)據(jù)生成的整個步驟就講到這里了。Consumer來讀取文件的時候胡野,只要指定要讀的topic和queueId材失,以及開始o(jì)ffset。因為每個CQUnit的大小是固定的硫豆,所以很容易就可以在文件中定位到龙巨。找到開始的位置后,只需要連續(xù)讀取后面指定數(shù)量的Unit熊响,然后根據(jù)Unit中存的CommitLog
的offset就可以到CommitLog
中讀取消息詳情了旨别。