producer 發送消息後，broker 端開始存儲消息，會調用 store 模塊的 DefaultMessageStore.putMessage 進行消息存儲。
DefaultMessageStore.putMessage
/**
 * Checks that the store may accept {@code msg}, delegates the write to the commit log
 * and records timing and failure statistics.
 *
 * <p>Checks are made in this order, each returning immediately on failure: store shut
 * down, broker is a SLAVE, store not writeable (all {@code SERVICE_NOT_AVAILABLE});
 * topic longer than {@code Byte.MAX_VALUE} characters ({@code MESSAGE_ILLEGAL});
 * properties string longer than {@code Short.MAX_VALUE} characters
 * ({@code PROPERTIES_SIZE_EXCEEDED}); page cache busy ({@code OS_PAGECACHE_BUSY}).
 *
 * @param msg the message to store
 * @return the rejection result, or the result produced by {@code commitLog.putMessage}
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // A SLAVE never accepts writes.
    if (this.messageStoreConfig.getBrokerRole() == BrokerRole.SLAVE) {
        // Rate-limit the warning: only calls whose counter value is a multiple of 50000 log.
        final long rejectedCalls = this.printTimes.getAndIncrement();
        if (rejectedCalls % 50000 == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    if (this.runningFlags.isWriteable()) {
        // Writeable again: restart the rate-limit counter.
        this.printTimes.set(0);
    } else {
        final long rejectedCalls = this.printTimes.getAndIncrement();
        if (rejectedCalls % 50000 == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // The topic may be at most Byte.MAX_VALUE (127) characters long.
    final int topicChars = msg.getTopic().length();
    if (topicChars > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + topicChars);
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // The properties string may be at most Short.MAX_VALUE (32767) characters long.
    final String properties = msg.getPropertiesString();
    if (properties != null && properties.length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + properties.length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    // Time the whole commit-log write, including any wait for its lock.
    final long startedAt = this.getSystemClock().now();
    final PutMessageResult result = this.commitLog.putMessage(msg);
    final long elapsedMs = this.getSystemClock().now() - startedAt;
    if (elapsedMs > 500) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", elapsedMs, msg.getBody().length);
    }

    // Statistics: longest put time seen, and number of failed puts.
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedMs);
    final boolean failed = (result == null) || !result.isOk();
    if (failed) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return result;
}
判斷系統是否繁忙
/**
 * Reports whether the commit-log write currently in progress has been running longer
 * than the configured busy timeout.
 *
 * <p>The commit log resets its begin-time to 0 when no write is in progress; the
 * difference to "now" is then at least 10000000 and the method returns {@code false}.
 *
 * @return {@code true} when the elapsed time is below 10000000 and above
 *         {@code getOsPageCacheBusyTimeOutMills()}
 */
public boolean isOSPageCacheBusy() {
    final long writeStartedAt = this.getCommitLog().getBeginTimeInLock();
    final long elapsed = this.systemClock.now() - writeStartedAt;
    if (elapsed >= 10000000) {
        // Too large to be a real in-progress write (e.g. begin time was 0).
        return false;
    }
    return elapsed > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
如果當前沒有正在寫入的消息，則 begin = 0；此時 diff 等於當前時間戳，遠大於 10000000，因此方法返回 false。
如果當前有正在寫入的消息，則 begin 為該次寫入獲取鎖之後記錄的開始時間。
CommitLog.putMessage()
/**
 * Appends one message to the last mapped commit-log file while holding the put lock,
 * then updates statistics and invokes the disk-flush and HA handlers.
 *
 * <p>Messages with a positive delay level (non-transactional or commit type only) are
 * redirected to the schedule topic; their real topic and queue id are saved as
 * message properties first.
 *
 * @param msg the message to append; its store timestamp, body CRC and possibly its
 *            topic, queue id and properties string are modified by this method
 * @return {@code CREATE_MAPEDFILE_FAILED} when no mapped file could be obtained,
 *         {@code MESSAGE_ILLEGAL} when the append reports a size violation,
 *         {@code UNKNOWN_ERROR} for any other non-OK first append, otherwise
 *         {@code PUT_OK} wrapping the append result
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Provisional store timestamp; it is overwritten below once the put lock is held.
msg.setStoreTimestamp(System.currentTimeMillis());
// Checksum of the message body.
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// Delayed messages: only non-transactional and commit messages are eligible.
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (msg.getDelayTimeLevel() > 0) {
// Clamp a delay level above the schedule service's maximum down to that maximum.
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
// Map the delay level to the queue id of the corresponding delay queue.
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// Store the delayed message under the schedule topic / delay queue first.
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
// Fetch the last mapped file (the one currently being written) before taking the lock.
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// Appends are serialized by putMessageLock (per the original author, a spin lock or a
// reentrant lock depending on configuration - the lock type is not visible here).
putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Published so that other code can see how long the current write has been running;
// every exit path below resets it to 0.
this.beginTimeInLock = beginLockTimestamp;
// Re-stamp under the lock so that store timestamps follow append order.
msg.setStoreTimestamp(beginLockTimestamp);
// No file yet, or the last one is full: ask the queue for a file at offset 0
// (per the original author, this creates a file when none is usable).
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
// Append the message to the file.
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
// Remember the exhausted file so it can be unlocked after the put lock is released.
unlockMappedFile = mappedFile;
// Not enough room left for this message: get a new commit-log file and retry once.
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
// NOTE(review): the status of this second append is not inspected; the method
// falls through to the PUT_OK result below regardless - confirm this is intended.
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
// Warn when the time spent under the lock exceeded 500 ms.
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
// Unlock the file that hit END_OF_FILE, only when mapped-file warming is enabled.
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
// Build the result returned to the caller.
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Per-topic statistics: message count and bytes written.
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
// Disk flush handling (per the original author: synchronous or asynchronous flush).
handleDiskFlush(result, putMessageResult, msg);
// HA handling (per the original author: synchronous or asynchronous replication to the slave).
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
延時消息
如果發送的消息是延時消息，則把消息的 topic 和 queueId 修改為延時消息隊列的信息。
發送消息的時候通過 setDelayTimeLevel(int level) 方法設置延遲級別。這個級別目前只支持 1-18。
每一個級別對應一個固定的延時時間，代碼如下：
// Delay durations, one per delay level, separated by single spaces; the first entry is
// level 1. Each entry is a number followed by a one-character unit suffix.
// Presumably this is the string parseDelayLevel() reads via getMessageDelayLevel(),
// which recognises the suffixes s, m, h and d - confirm against the config class.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
/**
 * Parses the configured delay-level string into {@code delayLevelTable}
 * (level to delay in milliseconds) and raises {@code maxDelayLevel} as needed.
 *
 * <p>The string is split on single spaces; entry {@code i} (0-based) defines level
 * {@code i + 1}. Each entry is a number followed by a one-character unit:
 * {@code s}, {@code m}, {@code h} or {@code d}.
 *
 * @return {@code true} when every entry parsed; {@code false} as soon as any exception
 *         occurs (entries parsed before the failure stay in the table)
 */
public boolean parseDelayLevel() {
    final HashMap<String, Long> millisPerUnit = new HashMap<String, Long>();
    millisPerUnit.put("s", 1000L);
    millisPerUnit.put("m", 1000L * 60);
    millisPerUnit.put("h", 1000L * 60 * 60);
    millisPerUnit.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        final String[] entries = levelString.split(" ");
        for (int level = 1; level <= entries.length; level++) {
            final String entry = entries[level - 1];
            final int unitIndex = entry.length() - 1;
            // An unknown unit yields null here; unboxing it below throws and is
            // reported through the catch block like any other malformed entry.
            final Long unitMillis = millisPerUnit.get(entry.substring(unitIndex));

            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }

            final long amount = Long.parseLong(entry.substring(0, unitIndex));
            final long delayMillis = unitMillis * amount;
            this.delayLevelTable.put(level, delayMillis);
        }
        return true;
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }
}
可以看出，在默認配置下最大級別是 18，最大延時時間是 2 小時。
獲取要寫入的 CommitLog 文件
調用 this.mappedFileQueue.getLastMappedFile(0);
獲取要寫入的 CommitLog 文件。
因為 CommitLog 是順序寫的，最後一個 CommitLog 肯定是要寫入的文件。
如果獲取不到，或者最後一個 CommitLog 文件已經寫滿，則創建一個。
/**
 * Returns the mapped file that should receive the next write, creating a new one when
 * the queue is empty or its last file is full.
 *
 * @param startOffset offset used to derive the first file's starting offset when the
 *                    queue is empty (rounded down to a multiple of {@code mappedFileSize})
 * @param needCreate  whether a new file may be created
 * @return the existing last file when no new file is required or creation is not
 *         allowed (may be {@code null}); otherwise the newly created file, or
 *         {@code null} if creation failed
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    final MappedFile tail = getLastMappedFile();

    // Starting offset of the file to create (it is also the file's name); -1 = none needed.
    long newFileOffset = -1;
    if (tail == null) {
        newFileOffset = startOffset - (startOffset % this.mappedFileSize);
    } else if (tail.isFull()) {
        newFileOffset = tail.getFileFromOffset() + this.mappedFileSize;
    }

    if (newFileOffset == -1 || !needCreate) {
        return tail;
    }

    final String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(newFileOffset);
    final String nextNextFilePath = this.storePath + File.separator
        + UtilAll.offset2FileName(newFileOffset + this.mappedFileSize);

    MappedFile created = null;
    if (this.allocateMappedFileService == null) {
        // No allocation service available: create the file directly.
        try {
            created = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    } else {
        // Request the next file and the one after it from the allocation service.
        created = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
            nextNextFilePath, this.mappedFileSize);
    }

    // Register the new file in the queue before handing it to the caller.
    if (created != null) {
        if (this.mappedFiles.isEmpty()) {
            created.setFirstCreateInQueue(true);
        }
        this.mappedFiles.add(created);
    }
    return created;
}
- 1、獲取要寫入的 CommitLog 文件對應的 MappedFile。
- 2、如果 needCreate = false，或者最後一個文件存在且尚未寫滿，則直接返回 mappedFileLast。
- 3、如果 needCreate = true 且需要創建新文件：
3.1、如果 allocateMappedFileService 已創建，構建需要創建的 MappedFile 對應的文件路徑。這裡連續構建兩個 MappedFile 的路徑。
然後調用 AllocateMappedFileService.putRequestAndReturnMappedFile
去創建 MappedFile。
3.2、如果 allocateMappedFileService 未創建，則直接使用 new MappedFile()
創建 CommitLog 文件。
MappedFile 對應的是一個映射文件。
RocketMQ 中 CommitLog、ConsumerQueue、index 等都是使用 MappedFile 進行創建的。這裡分析的是 CommitLog 的創建過程，所以 MappedFile 對應的就是一個 CommitLog 文件。
putRequestAndReturnMappedFile
AllocateMappedFileService.putRequestAndReturnMappedFile
/**
 * Registers allocation requests for {@code nextFilePath} and (as pre-allocation)
 * {@code nextNextFilePath}, then waits up to {@code waitTimeOut} milliseconds for the
 * first one to be fulfilled.
 *
 * @param nextFilePath     path of the file needed now
 * @param nextNextFilePath path of the following file, requested ahead of time
 * @param fileSize         size used for both requests
 * @return the mapped file for {@code nextFilePath}, or {@code null} when the request
 *         could not be submitted, the service has recorded an exception, the wait
 *         timed out, or the waiting thread was interrupted
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// Number of requests this call may enqueue: 2 by default. With the transient store
// pool and fast-fail enabled (non-slave only) it is the number of remaining pool
// buffers minus the requests already queued.
int canSubmitRequests = 2;
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
}
}
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
// Register the request; putIfAbsent returning non-null means a request for this path
// is already registered, so this call must not enqueue another one.
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
// Newly registered: hand it to the allocation queue.
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
// Same again for the file after next; running out of budget here only skips the
// pre-allocation, it does not fail the call.
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
// Look the request up again: it may be one registered by an earlier call.
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
// On timeout the request is left in requestTable.
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored here - confirm callers do not rely on it.
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
創建下一個 CommitLog 文件和下下一個 CommitLog 文件。
1、構建 AllocateRequest nextReq 對象，並把該對象存放到 requestQueue 隊列中
2、構建 AllocateRequest nextNextReq 對象，並把該對象存放到 requestQueue 隊列中
然後在 AllocateMappedFileService.run() 方法中從 requestQueue 阻塞隊列中獲取需要創建的任務，並創建文件。
創建文件的優先級
AllocateRequest 對象實現了 Comparable 接口，把 AllocateRequest 保存到 requestQueue 隊列中，該隊列是 PriorityBlockingQueue 優先級隊列，可以根據優先級創建文件。
優先級的依據：先根據文件的大小進行排序，大的文件先創建；
如果文件大小相同，則根據文件的起始偏移量進行排序，偏移量小的優先創建。
為什麼大文件先創建呢？
因為大文件肯定是 CommitLog 文件。只要優先把 CommitLog 文件創建完成，就可以接受 Producer 端的請求，然後再根據 CommitLog 中的數據生成 ConsumerQueue 和 index 相關的數據。
創建 CommitLog 文件
AllocateMappedFileService.run() 方法
/**
 * Service loop: keeps processing allocation requests until the service is stopped or
 * {@code mmapOperation()} returns {@code false}.
 */
public void run() {
    log.info(this.getServiceName() + " service started");

    // One allocation request is handled per iteration.
    boolean keepRunning = true;
    while (keepRunning) {
        keepRunning = !this.isStopped() && this.mmapOperation();
    }

    log.info(this.getServiceName() + " service end");
}
/**
 * Takes one allocation request from the queue, creates (and optionally warms) its
 * mapped file, and releases the thread waiting on the request's latch.
 *
 * @return {@code false} only when the blocking take is interrupted (the service loop
 *         then ends); {@code true} in every other case, including failures
 */
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// Block until an allocation request is available.
req = this.requestQueue.take();
// The request must still be the one registered for its path; otherwise it is
// skipped (the log messages suggest the requester timed out).
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
// NOTE(review): when the request already carries a mapped file, isSuccess stays
// false and the latch is not counted down by this call - confirm this is intended.
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
// With the transient store pool enabled the mapped file is initialised with the pool
// (per the original author: data is written to direct memory first and committed to
// the FileChannel later - not visible in this method).
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
// Prefer a MappedFile implementation supplied through ServiceLoader; any
// RuntimeException (e.g. no provider found) falls back to the default class.
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// Plain memory-mapped file.
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// Warm-up: only for files at least as large as a commit-log file, and only when
// warming is enabled. Per the original author, warmMappedFile writes one byte per
// OS page so that the whole mapping is loaded into memory.
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
// Put the request back so that it is retried on a later iteration.
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
// Wake the waiting requester only after a successful allocation.
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
初始化 MappedFile
/**
 * Opens the backing file in read/write mode and memory-maps its first
 * {@code fileSize} bytes, updating the global mapped-memory and mapped-file counters.
 *
 * @param fileName path of the backing file; the file's simple name must be a decimal
 *                 number, which becomes {@code fileFromOffset}
 * @param fileSize number of bytes to map
 * @throws IOException if the file cannot be opened or mapped; the channel is closed
 *                     again before the exception propagates
 */
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name is the file's starting offset.
    this.fileFromOffset = Long.parseLong(this.file.getName());

    ensureDirOK(this.file.getParent());

    boolean mapped = false;
    try {
        final RandomAccessFile backingFile = new RandomAccessFile(this.file, "rw");
        this.fileChannel = backingFile.getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        mapped = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        // Do not leave a channel open when the mapping did not complete.
        if (this.fileChannel != null && !mapped) {
            this.fileChannel.close();
        }
    }
}
使用 RandomAccessFile 以 "rw" 模式打開文件，獲取可讀寫的 FileChannel，再通過 map 映射為 MappedByteBuffer 來讀寫消息。
核心存儲機制
mappedFile.appendMessage 方法
最終會調用到 DefaultAppendMessageCallback.doAppend(...)
進行消息存儲。
代碼如下：
/**
 * Serializes {@code msgInner} into the reusable {@code msgStoreItemMemory} buffer and
 * copies it into {@code byteBuffer} at its current position.
 *
 * @param fileFromOffset starting offset of the target file; added to the buffer
 *                       position to form the message's physical offset
 * @param byteBuffer     destination buffer of the target file
 * @param maxBlank       number of bytes still free in the target file
 * @param msgInner       the message to serialize
 * @return {@code PROPERTIES_SIZE_EXCEEDED} or {@code MESSAGE_SIZE_EXCEEDED} when a
 *         limit is violated (nothing is written); {@code END_OF_FILE} when the message
 *         does not fit (the remaining space is filled with a blank marker);
 *         otherwise {@code PUT_OK}
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
// The message id is built from the store host bytes and the physical offset.
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
// Record ConsumeQueue information
// Key "<topic>-<queueId>" indexes the per-queue offset counter.
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
// First message for this queue: start counting at 0.
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queue
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}
/**
* Serialize message
*/
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
// The encoded properties length is written as a short below, so it must fit in one.
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}
// Determines whether there is sufficient free space
// Not enough room (message plus the reserved end-of-file blank): fill the rest of the
// file with a blank record and report END_OF_FILE so that the caller switches files.
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
// NOTE(review): the UTF-8 byte length is narrowed to a single byte here, and this
// method does not itself check that it fits - confirm callers bound it.
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
// Only non-transactional and commit messages advance the per-queue offset counter.
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}
就是把消息寫入到 byteBuffer 中。
字段 | 字段大小(字節) | 字段含義 |
---|---|---|
msgSize | 4 | 代表這個消息的大小 |
MAGICCODE | 4 | MAGICCODE = daa320a7 |
BODY CRC | 4 | 消息體 BODY CRC |
queueId | 4 | queueId |
flag | 4 | 系統消息標誌（是否壓縮消息、是否有 tag、是否事務等） |
QUEUEOFFSET | 8 | 這個值是個自增值，不是真正的 consume queue 的偏移量，可以代表這個 consumeQueue 隊列或者 tranStateTable 隊列中消息的個數 |
PHYSICALOFFSET | 8 | 消息在 CommitLog 中的物理偏移量（fileFromOffset + byteBuffer.position()） |
SYSFLAG | 4 | 指明消息的事務狀態等消息特徵，二進制低四位從右往左數：當 4 位均為 0（值為 0）時表示非事務消息；當第 1 位為 1（值為 1）時表示消息是壓縮的（Compressed）；當第 2 位為 1（值為 2）時表示多標籤消息（MultiTags）；當第 3 位為 1（值為 4）時表示 prepared 消息；當第 4 位為 1（值為 8）時表示 commit 消息；當第 3/4 位均為 1（值為 12）時表示 rollback 消息；當第 3/4 位均為 0 時表示非事務消息； |
BORNTIMESTAMP | 8 | 消息產生端(producer)的時間戳 |
BORNHOST | 8 | 消息產生端(producer)地址(address:port) |
STORETIMESTAMP | 8 | 消息在broker存儲時間 |
STOREHOSTADDRESS | 8 | 消息存儲到broker的地址(address:port) |
RECONSUMETIMES | 4 | 消息被某個訂閱組重新消費了幾次（訂閱組之間獨立計數），因為重試消息發送到了 topic 名字為 %retry%groupName、queueId=0 的隊列中去了，成功消費一次記錄為 0； |
PreparedTransaction Offset | 8 | 表示是 prepared 狀態的事務消息 |
messagebodyLength | 4 | 消息體大小值 |
messagebody | bodyLength | 消息體內容 |
topicLength | 1 | topic 名稱內容大小 |
topic | topicLength | topic 的內容值 |
propertiesLength | 2 | 屬性值大小 |
properties | propertiesLength | propertiesLength 大小的屬性數據 |