在存儲第一篇中主要說了一些存儲文件的載體,和其他的管理類绞吁。至于消息的轉(zhuǎn)換存儲幢痘,中間的一些設(shè)計只是聊了一部分。
DefaultAppendMessageCallback 繼續(xù)聊
之前將了消息的大小計算家破,計算好了以后颜说,就可以進(jìn)行其他的驗證了
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
// 消息太大了
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
// 在消息放入到buffer中,放入失敗汰聋,需要將buffer標(biāo)識為文件已滿门粪。
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// END_FILE_MIN_BLANK_LENGTH 這個長度放在文件的末尾,當(dāng)讀取到該位置時烹困,發(fā)現(xiàn)時BLANK_MAGIC_CODE玄妈,
// 說明文件讀取到盡頭了,該換一個文件讀取
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank); // 先寫入剩余的內(nèi)容的長度
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 設(shè)置一個標(biāo)識位髓梅,表示讀到盡頭了
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
首先消息的長度不能太大了拟蜻,太大就是非法的消息。maxBlank參數(shù)是mappedFile中計算剩余多大的容量女淑。但是這里的判斷是否大于剩余量時,是通過消息的長度加上END_FILE_MIN_BLANK_LENGTH 的長度比較的辜御。并且END_FILE_MIN_BLANK_LENGTH是默認(rèn)的8個字節(jié)鸭你,有什么用意呢?往下看擒权,重置了msgStoreItemMemory袱巨,并且limit為maxBlank。但是先放入4個字節(jié)的maxBlank長度碳抄,然后又放入4個字節(jié)的CommitLog.BLANK_MAGIC_CODE愉老。在計算消息的大小時,也有4個字節(jié)的消息大小剖效,和一個4個字節(jié)的MAGICCODE標(biāo)識嫉入。
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected final static int BLANK_MAGIC_CODE = -875286124;
看一下焰盗,他們的code,一個標(biāo)識消息咒林,一個標(biāo)識空白熬拒。可以看出來垫竞,在怎么放消息到MappedFile中澎粟,文件是需要滿或者結(jié)束的,那怎么標(biāo)識這個文件內(nèi)容獲取時欢瞪,沒有消息了呢活烙,就可以通過BLANK_MAGIC_CODE標(biāo)識,說明該文件的存儲的消息已經(jīng)結(jié)束遣鼓,后面的內(nèi)容都是空啸盏。在查詢消息功能時,讀取到BLANK狀態(tài)時就可以停止了譬正,往下查詢了宫补。所以每個文件的結(jié)尾必須要包含BLANK_MAGIC_CODE,從而就需要自動占用8個字節(jié)了曾我。最后返回的AppendMessageResult中狀態(tài)為END_OF_FILE粉怕,告訴調(diào)用方,文件滿了抒巢,需要重新創(chuàng)建新的MappedFile贫贝。在考慮一下,MappedFile在調(diào)用callBack方法時蛉谜,會將自身的wrote值對result中寫入的數(shù)量進(jìn)行累加的稚晚,那么就算文件不能繼續(xù)寫了,也要告訴MappedFile本次寫入多少長度型诚,所以在AppendMessageResult中的wroteBytes參數(shù)值就是maxBlank值了客燕。
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 表示是個消息
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
// 當(dāng)消息沒有事務(wù),或者事務(wù)提交狰贯,則更新queue偏移量
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
接下來就是消息轉(zhuǎn)發(fā)成byte數(shù)組了也搓,依次按規(guī)則寫入到msgStoreItemMemory中,最終msgStoreItemMemory寫入到byteBuffer中涵紊。其中有個queueOffset參數(shù)是在第6個次序?qū)懭氲陌剩⑶以谑聞?wù)提交或者沒有事務(wù)時,進(jìn)行++queueOffset操作摸柄,放入到topicQueueTable中颤练。說明queueOffset依次遞增的,他的作用是什么呢驱负?
DefaultAppendMessageCallback的append方法已經(jīng)大概了解嗦玖,本文只是講了單個消息放置患雇,當(dāng)然還提供了批量消息放置,原理都差不多
再回到CommitLog中putMessag方法剩余片段
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
該邏輯就是解鎖掉unlockMappedFile文件踏揣,即釋放掉文件與內(nèi)存映射關(guān)系映射庆亡,因為不需要再寫了,只剩下讀了捞稿。然后做個統(tǒng)計同一個topic的生成次數(shù)又谋,和消息大小。
先是處理磁盤刷新的邏輯娱局,因為broker支持同步刷盤和異步刷盤的彰亥。同步刷屏的好處就是保證數(shù)據(jù)不丟失,但是性能會降低很多衰齐;異步刷屏則就有可能會丟消息數(shù)據(jù)了任斋。那么就看看同步和異步是如何實現(xiàn)的把?
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
+ " client address: " + messageExt.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
}
}
首先看一下異步方式耻涛,他只是通過ThreadService方法中喚醒線程wakeup()废酷,該flush消息線程就可以喚醒∧疲看一下FlushRealTimeService 實時刷新服務(wù)類
在線程實現(xiàn)方法中
try {
if (flushCommitLogTimed) {
Thread.sleep(interval);
} else {
this.waitForRunning(interval);
}
if (printFlushProgress) {
this.printFlushProgress();
}
long begin = System.currentTimeMillis();
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
// 物理消息時間戳更新
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
} catch (Throwable e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
this.printFlushProgress();
}
在刷新之前都會等待或者sleep一段時間澈蟆,然后通過mappedFileQueue執(zhí)行flush方法,并且更新了StoreCheckPoint的存儲消息的時間卓研。異步刷新很簡單趴俘,可以通過其他線程喚醒刷新線程,執(zhí)行刷盤操作奏赘。
同步刷新時寥闪,聲明了GroupCommitRequest請求,并且設(shè)置了內(nèi)部屬性nextOffset的值磨淌,該值是由消息的存儲起始位置+消息的寫入長度組合的疲憋。將該Request放入到了GroupCommitService服務(wù)中的Request列表中。該Request也存在倒計時監(jiān)聽器梁只,所以這段代碼request.waitForFlush()缚柳,進(jìn)行等待刷新完成。
GroupCommitService中代碼片段
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
public synchronized void putRequest(final GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
首先定義了2個屬性集合敛纲,一個是請求寫喂击,一個請求讀剂癌。在放入請求時淤翔,是將request對象放入到requestsWrite里面的,并且是鎖住requestsWrite對象佩谷。然后喚醒ServiceThread線程旁壮。在喚起線程是监嗜,會調(diào)用onWaiteEnd方法,而GroupCommitService實現(xiàn)該方法時調(diào)用了swapRequests()方法抡谐,
private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
起始就是將讀寫的集合進(jìn)行交換而已裁奇。而線程喚醒后,就會調(diào)用doCommit方法
private void doCommit() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
boolean flushOK = false;
for (int i = 0; i < 2 && !flushOK; i++) {
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) {
CommitLog.this.mappedFileQueue.flush(0);
}
}
req.wakeupCustomer(flushOK);
}
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
if (storeTimestamp > 0) {
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
this.requestsRead.clear();
} else {
// Because of individual messages is set to not sync flush, it
// will come to this process
CommitLog.this.mappedFileQueue.flush(0);
}
}
}
首先對讀集合進(jìn)行鎖定麦撵。在這里依次遍歷所有的請求刽肠,然后判斷mappedFileQueue中FlushedWhere與請求中nextOffset比較,如果大于則刷新成功了免胃,就可以直接喚醒等待request請求的線程音五,如果小于則調(diào)用mappedFileQueue的flush方法。并且可以保證2次刷新羔沙。通過這種方式躺涝,實現(xiàn)消息的同步刷屏的,但是性能的確不是很高扼雏。
刷新磁盤后坚嗜,還有handleHA()方法,該方法是高可用消息的處理方式诗充,如何實現(xiàn)的苍蔬,后面會專門聊聊如何實現(xiàn) Master/Slave功能
考慮一下幾點問題?
1.已經(jīng)存儲的消息其障,存儲在commitLog中的消息都是各種類型的topic消息银室,包括有延遲消息,事務(wù)消息励翼,普通消息如何區(qū)分消費蜈敢;
2.由于commitLog中存儲的所有的消息,消息的查詢設(shè)計的不好汽抚,效率特別低抓狭,最終導(dǎo)致消費進(jìn)度緩慢
3.還有一些特別需求,例如通過關(guān)鍵字或者時間段造烁,檢索消息否过,這些都是需要設(shè)計良好的方式,提升查詢效率惭蟋,從而可以加快消費進(jìn)度苗桂。
ReputMessageService
該類是DefaultMessageStore的內(nèi)部類,它繼承與ServiceThread類告组,也是一個線程類煤伟,在該類中只有一個屬性 reputFromOffset 簡單解釋為重放偏移量。既然是實現(xiàn)線程接口,看一下run方法
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
內(nèi)部也是一個線程自循環(huán)便锨,不停的調(diào)用doReput()方法围辙。
private void doReput() {
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
DefaultMessageStore.this.doDispatch(dispatchRequest);
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
} else if (size == 0) {
// 讀取到了文件的末尾,重新?lián)Q個文件讀取
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
// If user open the dledger pattern or the broker is master node,
// it will not ignore the exception and fix the reputFromOffset variable
if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);
this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}
先通過reputFromOffset偏移量從commitLog中的MappedFile中截取剩余部分的所有消息內(nèi)容SelectMappedBufferResult放案,之前在MappedFile中也講過SelectMappedBufferResult可能會存在多條消息姚建,他不是只有一條數(shù)據(jù),因為他截取的部分是從reputFromOffset到MappedFile的wrotePositon位置的數(shù)據(jù)吱殉。獲取到SelectMappedBufferResult時就開始遍歷數(shù)據(jù)掸冤。由于result中的ByteBuffer是順序讀取,所以內(nèi)部的pos位置隨著讀取也會越來越大友雳,但是不需要重置贩虾。通過CommitLog中的checkMessageAndReturnSize方法,就可以知道一個消息的大致信息沥阱,
public class DispatchRequest {
private final String topic;
private final int queueId;
private final long commitLogOffset;
private int msgSize;
private final long tagsCode;
private final long storeTimestamp;
private final long consumeQueueOffset;
private final String keys;
private final boolean success;
private final String uniqKey;
private final int sysFlag;
private final long preparedTransactionOffset;
private final Map<String, String> propertiesMap;
private byte[] bitMap;
private int bufferSize = -1;//the buffer size maybe larger than the msg size if the message is wrapped by something
// .....
}
其中得到的消息內(nèi)容都是一些關(guān)鍵屬性缎罢,例如topic,queueId考杉,msgSize等等策精,這些屬性有什么用,繼續(xù)講崇棠。因為得到dispatchRequest的結(jié)果不太相同的咽袜,例如文件讀到MAGIC_BLANK_CODE怎么處理的。首先dispatchRequest返回成功的枕稀,都是正常去讀的询刹,如果size大于0,存在消息萎坷。如果size=0說明文件末尾了凹联,需要換下一個文件讀取了,在這里commitLog.rollNextFile(reputFromOffset)就是指向了下一個文件的起始偏移量哆档。在存在消息的時蔽挠,首先調(diào)用了了doDispatch(request) 分發(fā)消息的方法,通過判斷條件進(jìn)行執(zhí)行消息到達(dá)監(jiān)聽器瓜浸,將消息的reputFromOffset加上了消息的size長度澳淑,然后做一些統(tǒng)計。重放線程主要功能還是在doDispatch()方法內(nèi)插佛。
public void doDispatch(DispatchRequest req) {
// 消息入磁盤成功杠巡,還有后續(xù)處理,例如創(chuàng)建索引雇寇,放入到消費隊列中氢拥,
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
CommitLogDispatcher 消息分發(fā)的接口绑改,在doDispatch方法只是遍歷一遍分發(fā)接口實現(xiàn)類,那么有哪些實現(xiàn)類
CommitLogDispatcherBuildConsumeQueue 構(gòu)建消費隊列
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
先判斷消息的事務(wù)類型兄一,如果是無事務(wù)或者事務(wù)提交,則執(zhí)行putMessagePositionInfo方法识腿,如果其他事務(wù)則不做任何處理出革。
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
// 得到消費隊列,然后進(jìn)行數(shù)據(jù)更新
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}
ok渡讼,這里就引入了ConsumeQueue的消費隊列骂束,在生成的時候已經(jīng)選擇好放入那個topic下的隊列編號,那么對于消費組成箫,也應(yīng)該知道消費的是哪個消費隊列展箱。基本上一個生成隊列對應(yīng)一個消費隊列蹬昌,除非讀寫權(quán)限控制了混驰。
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
} else {
map = newMap;
}
}
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
} else {
logic = newLogic;
}
}
return logic;
}
查找消費隊列,首先在一個broker下皂贩,topic是唯一的栖榨,但是topic下可以有多個不同編號的queueId組成的消費隊列ConsumeQueue。屬性一下消費隊列的屬性信息
public ConsumeQueue(
final String topic,
final int queueId,
final String storePath,
final int mappedFileSize,
final DefaultMessageStore defaultMessageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.defaultMessageStore = defaultMessageStore;
this.topic = topic;
this.queueId = queueId;
String queueDir = this.storePath
+ File.separator + topic
+ File.separator + queueId;
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
this.consumeQueueExt = new ConsumeQueueExt(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
);
}
}
這是一個消費隊列的構(gòu)造器方法明刷,包含了topic婴栽,queueId,也需要MappedFileQueue映射文件隊列辈末,說明該消費隊列也是需要存儲數(shù)據(jù)的愚争,只是他與CommitLog存儲的內(nèi)容可能不同而已。定義了文件的大小mappedFileSize挤聘,和其他的存儲根地址等等轰枝。
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
這段代碼是申請了CQ_STORE_UNIT_SIZE=20長度的字節(jié),為什么是20個字節(jié)组去?下面繼續(xù)說狸膏。在通過topic和queueId查詢得到了一個ConsumeQueue,然后執(zhí)行cq.putMessagePositionInfoWrapper方法添怔。
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
if (isExtWriteEnable()) {
// ...
}
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
// XXX: warn and notify me
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
在執(zhí)行putMessagePositionInfo方法湾戳,然后更新StoreCheckPoint中l(wèi)ogicsMsgTimestamp方法。如果失敗广料,則繼續(xù)嘗試砾脑。
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) {
log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
return true;
}
this.byteBufferIndex.flip(); // 長度縮短limit=pos,并且重置pos=0位置
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
// cqOffset是編號艾杏,他的真實地址是CQ_STORE_UNIT_SIZE的倍數(shù)
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
// 當(dāng)文件時隊列中的第一個韧衣,且消費對了的偏移量不為0.文件中寫入的數(shù)據(jù)為0,需要重置一下flush,commit偏移量
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
// 并且填充之前的數(shù)據(jù)畅铭,
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}
if (cqOffset != 0) {
// 進(jìn)行校驗氏淑,保證expectLogicOffset的偏移量與真正需要寫入的位置時一致的
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset + size;
// 將byte數(shù)組添加到mappedFile中
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
所以putMessagePositionInfo才是真正核心的方法,在方法中參數(shù)包括了物理偏移量 offset硕噩,消息大小size假残,消息tags碼,和消息邏輯順序cqOffset(在存儲消息時炉擅,會通過topic從CommitLog中topicQueueTable 中得到一個順序偏移量辉懒,消息存儲成功就行進(jìn)行 自身+1 操作)。byteBufferIndex之前說過長度為20個字節(jié)谍失,是固定的眶俩。在這里他放了哪些信息,8個字節(jié)的物理偏移量offset快鱼,4個字節(jié)的消息長度size颠印,8個字節(jié)的togasCode,剛剛組成20個字節(jié)抹竹,也說明了一個消息轉(zhuǎn)換成ConsumeQueu信息時嗽仪,只存儲了3個屬性值,并且是固定長度的20個字節(jié)柒莉。expectLogicOffset是存儲byteBufferIndex內(nèi)容的起始位置闻坚,通過MappedFileQueue得到了MappedFile。在確認(rèn)一下ConsumeQueue中一個MappedFile文件大芯ばⅰ:
public int getMappedFileSizeConsumeQueue() {
int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
}
他是20的倍數(shù)窿凤,說明一個文件是可以完整的記錄factor數(shù)量的數(shù)據(jù)。不需要類似消息存儲一樣跨蟹,需要有個結(jié)尾結(jié)束標(biāo)識雳殊。在看一下 存儲的消費信息,當(dāng)獲取符合條件的MappedFile時窗轩,判斷了該文件是否第一次創(chuàng)建即完全是沒有寫入過數(shù)據(jù)的夯秃,該文件需要初始化,設(shè)置了最小的邏輯偏移量minLogicOffset 痢艺,更新了刷新和提交的位置仓洼,還執(zhí)行了fillPreBlank填充方法。其他的都是一些偏移量校驗過程堤舒,然后更新maxPhysicOffset色建,相同topic下的最大物理偏移量,然后將byteBufferIndex轉(zhuǎn)換成byte數(shù)組添加到mappedFile中舌缤。
好了ConsumeQueue有什么特點箕戳,
1.他的存儲數(shù)據(jù)格式固定的某残,20個字節(jié)大小。2.他可以側(cè)面看到陵吸,topic下的消息存儲情況玻墅。3.由于commitLog是存儲了所有的消息,但是通過不同的topic和queueId時壮虫,存儲的簡化數(shù)據(jù)澳厢,方便以后數(shù)據(jù)定位及查找。4.有些系統(tǒng)自定義的topic旨指,例如延遲類型的topic,或者重試這樣的topic喳整,系統(tǒng)可以進(jìn)行單獨管理和分配谆构。
CommitLogDispatcherBuildIndex 構(gòu)建索引分發(fā)器
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
為了方便消息內(nèi)容查詢,例如數(shù)據(jù)庫設(shè)計中框都,引入索引搬素,就能快速定位到具體消息的位置。在RocketMQ中的設(shè)計魏保,索引的存儲設(shè)計熬尺,采用數(shù)組及鏈表結(jié)合方式的數(shù)據(jù)結(jié)構(gòu),與HashMap中的結(jié)構(gòu)設(shè)計類似谓罗。索引管理通過IndexService索引服務(wù)控制粱哼。
public void buildIndex(DispatchRequest req) {
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
索引存儲的結(jié)構(gòu)和內(nèi)容,在該代碼片段中檩咱,首先獲取到IndexFile索引文件揭措,然后通過DispatchRequest中topic,keys刻蚯,uniqKey等屬性進(jìn)行放置绊含。尤其是keys,他是多個關(guān)鍵字組成炊汹,但都會拆分多個key躬充,與topic組合成最終的key進(jìn)行存儲。索引一個消息可以有多個關(guān)鍵字組成讨便,或者一個唯一關(guān)鍵字組成充甚。那么IndexFile是如何存儲索引內(nèi)容的。
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
// 一個索引文件最大需需要占的子節(jié)霸褒,有頭文件(40)+ 槽 + 索引信息
int fileTotalSize =
IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.indexHeader = new IndexHeader(byteBuffer);
// 初始化時津坑,設(shè)置了起始的頭部信息
if (endPhyOffset > 0) {
this.indexHeader.setBeginPhyOffset(endPhyOffset);
this.indexHeader.setEndPhyOffset(endPhyOffset);
}
if (endTimestamp > 0) {
this.indexHeader.setBeginTimestamp(endTimestamp);
this.indexHeader.setEndTimestamp(endTimestamp);
}
}
這個索引文件的構(gòu)造器,
1.先是定義了文件的大小fileTotalSize傲霸,并且已經(jīng)確定了他的組成部門疆瑰,包括INDEX_HEADER_SIZE長度眉反,hash槽的長度,索引的長度穆役。也可以看出索引文件是三部分組成的寸五。頭文件,hash槽數(shù)據(jù)耿币,索引數(shù)據(jù)組成梳杏。
2.定義了索引文件的槽數(shù)量,和索引數(shù)量
3.得到了IndexHeader淹接,索引頭數(shù)據(jù)十性。
private AtomicLong beginTimestamp = new AtomicLong(0);
private AtomicLong endTimestamp = new AtomicLong(0);
private AtomicLong beginPhyOffset = new AtomicLong(0);
private AtomicLong endPhyOffset = new AtomicLong(0);
private AtomicInteger hashSlotCount = new AtomicInteger(0);
private AtomicInteger indexCount = new AtomicInteger(1);
indexHeader由6個屬性組成,開始塑悼,結(jié)束時間劲适。開始結(jié)束物理偏移量。槽數(shù)量厢蒜,索引數(shù)量霞势。因為IndexHeader也是存儲在磁盤中的,從屬性中斑鸦,可以確定一個IndexHeader占用了40個字節(jié)愕贡。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = indexKeyHashMethod(key); // 通過key hash進(jìn)行分配
int slotPos = keyHash % this.hashSlotNum; // 槽的位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 已經(jīng)占的位置,頭文件和所屬槽的地址
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// 記錄了這個槽對應(yīng)的值巷屿,該值是記錄最近一次put索引時的索引位置固以,但初始都是0
// 那么索引的位置怎么拿到?通過header中的indexCount獲取
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 需要put索引的真正位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 一個索引所占的位置,4個byte=hash值嘱巾,8個byte=消息物理偏移量嘴纺,4個byte=時間差,4個byte=上一個索引的位置
// 這個索引的設(shè)計類似與HashMap的結(jié)構(gòu)設(shè)計浓冒,采用數(shù)組與鏈表的形式
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); // 為什么要記錄時間差栽渴?因為省空間
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 重新更新一下槽當(dāng)前的索引位置,提供給下一個索引用
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
// 如果是第一個開始放置索引稳懒,更新開始物理偏移量和開始存儲時間
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
IndexFile是如何存儲key的闲擦,如何結(jié)合數(shù)據(jù)結(jié)構(gòu)存儲的?索引文件的基本結(jié)構(gòu)圖场梆,如下:
從文件的橫向看墅冷,和縱向看,了解清楚內(nèi)部結(jié)構(gòu)設(shè)計思路或油。
首先確定索引數(shù)量還能繼續(xù)放置寞忿。通過key得到一個keyHash值,然后通過keyHash百分比槽的數(shù)量顶岸,得到了slotPos腔彰,該位置就是可以對應(yīng)槽的位置叫编。但是slotPos只是相對順序位置,真實的存放位置還需要包含header部分霹抛。所以absSlotPos是槽的絕對位置搓逾。通過absSlotPos位置得到4個字節(jié)長度即slotValue,該值記錄的是最近一次放置索引的順序值杯拐。timeDiff為什么要時間差霞篡,并且轉(zhuǎn)換成了單位秒。因為文件的IndexHeader存儲了文件的開始時間的端逼,如果要得到索引的最終時間朗兵,就可以通過開始時間加上時間差。從而到達(dá)從long8個字節(jié)只需要int 4個字節(jié)存儲顶滩,磁盤空間可以剩下很多余掖。因為indexHeader中存放了當(dāng)前索引存放的順序位置,就能得到absIndexPos絕對索引位置诲祸,在存放索引數(shù)據(jù)時浊吏,一個索引存放需要20個字節(jié)而昨。除了存放的key的hash值救氯,物理偏移量,時間差等歌憨,還存儲了同一個hash槽的上一個索引順序位置着憨,這樣就能組合成了一個單向鏈表了。存儲索引后务嫡,更新了當(dāng)前hash槽中的索引順序編號甲抖。并且增加了indexHeader中的索引數(shù)量,更新了最大的物理偏移量phyOffset心铃,和最大存儲消息的時間准谚。
既然有了存儲的邏輯,那么查詢索引如何實現(xiàn)呢去扣?IndexFile中查詢方法
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// 找出hashslot 位置柱衔,得到索引編號,通過索引編號找出具體的索引信息愉棱,然后依次找出上一個索引的位置進(jìn)行遍歷
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
// 1.索引已經(jīng)沒有上一個索引位置唆铐。2.前一個索引編號大于了當(dāng)前編號,
// 3.索引編號一致奔滑,4艾岂,時間小于查詢的時間
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
先從查詢的key計算得到keyHash,定位到屬于哪個槽朋其,然后得到這個槽的絕對位置absSlotPos王浴。通過absSlotPos讀取4個字節(jié)脆炎,就能獲取到最近索引的順便編號。最終計算得到absIndexPos絕對索引位置叼耙,然后依次讀取響應(yīng)的數(shù)據(jù)腕窥,與查詢的時間,關(guān)鍵字比較等查找合適的物理偏移量筛婉。然后nextIndexToRead重新賦值到當(dāng)前索引指向的順序編號prevIndexRead簇爆,繼續(xù)循環(huán)。如何結(jié)束循環(huán)呢爽撒?直到索引存儲的上一個索引的編號為0入蛆,才查找結(jié)束,或者查找內(nèi)容滿了等等硕勿。
RocketMQ最重要的存儲有這些數(shù)據(jù)組成哨毁,包括消息元數(shù)據(jù),消費隊列數(shù)據(jù)源武,和索引數(shù)據(jù)等扼褪。在設(shè)計中運用了很多線程方式,解耦很多業(yè)務(wù)關(guān)聯(lián)粱栖。例如在存儲消息的時候话浇,也會將消息存儲到對應(yīng)的隊列中,但是RocketMQ設(shè)計中闹究,消息的存放是加了鎖的同步代碼塊幔崖,為了保證效率,提高代碼執(zhí)行速率渣淤,盡可能減少其他工作赏寇,能解耦的用異步方式處理,所以只將消息存放到MappedFile中价认。我們知道消息要進(jìn)行2次處理后梆砸,才能更加有效的查詢消息幽纷,所以用重放線程來控制消息的二次處理,包括消費隊列的控制,索引的添加等等盈魁。