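1. CommitLog
RocketMQ uses memory-mapped files to improve IO performance. CommitLog and ConsumeQueue files alike have a fixed length; once a file is full, a new one is created. Each file is named after its starting offset: for CommitLog this is the global physical offset of the first message stored in it, zero-padded to 20 digits.
Logically, CommitLog is therefore one continuous byte space split into fixed-size files laid end to end, so a global physical offset identifies both the file and the position inside it.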
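To make the naming rule concrete, here is a minimal Java sketch that computes which file holds a global offset and what that file is called. The class and method names are hypothetical (not RocketMQ's). It assumes the default 1 GB CommitLog file size and file names that are the starting offset zero-padded to 20 digits, which is what RocketMQ's UtilAll.offset2FileName produces.

```java
// Sketch: mapping a global CommitLog offset to its file, assuming fixed-size
// files named by their zero-padded, 20-digit starting offset.
class CommitLogLayout {
    // Assumed file size: 1 GB, the usual CommitLog default (configurable).
    static final long FILE_SIZE = 1024L * 1024 * 1024;

    // Starting offset of the file that contains globalOffset.
    static long fileStartOffset(long globalOffset, long fileSize) {
        return globalOffset - (globalOffset % fileSize);
    }

    // File name for a file that starts at startOffset.
    static String fileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Position of globalOffset inside its own file.
    static long positionInFile(long globalOffset, long fileSize) {
        return globalOffset % fileSize;
    }

    public static void main(String[] args) {
        long offset = FILE_SIZE + 100; // 100 bytes into the second file
        long start = fileStartOffset(offset, FILE_SIZE);
        // prints "00000000001073741824 @ 100"
        System.out.println(fileName(start) + " @ " + positionInFile(offset, FILE_SIZE));
    }
}
```

With 1 GB files, the first two CommitLog files are therefore 00000000000000000000 and 00000000001073741824.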
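2. MappedFileQueue
(1) Core properties of MappedFileQueue
MappedFileQueue is the management container for MappedFile objects and wraps a storage directory. For example, CommitLog files are stored under ${ROCKET_HOME}/store/commitlog/, and that directory contains multiple memory-mapped files (MappedFile).
The core properties of MappedFileQueue are:
String storePath: storage directory.
int mappedFileSize: size of a single file.
CopyOnWriteArrayList<MappedFile> mappedFiles: the collection of MappedFile objects.
AllocateMappedFileService allocateMappedFileService: service that creates MappedFile objects.
long flushedWhere = 0: current flush pointer; all data before it has been persisted to disk.
long committedWhere = 0: current commit pointer, i.e. the write pointer of the in-memory ByteBuffer; it is always greater than or equal to flushedWhere.
(2) Finding a MappedFile by message storage timestamp
Starting from the first file in the MappedFile list, the lookup returns the first file whose last modification time is greater than or equal to the given timestamp. If no such file exists, it returns the last MappedFile.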
public MappedFile getMappedFileByTime(final long timestamp) {
    // Snapshot of the current files; null when the queue is empty
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs)
        return null;

    // First file whose last modification time is >= timestamp
    for (int i = 0; i < mfs.length; i++) {
        MappedFile mappedFile = (MappedFile) mfs[i];
        if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
            return mappedFile;
        }
    }
    // Not found: fall back to the last file
    return (MappedFile) mfs[mfs.length - 1];
}
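The search rule can be tried out without a broker. The sketch below replaces MappedFile with a hypothetical FileMeta class holding only the two values the rule needs (start offset and last-modified time); it is an illustration of the rule, not RocketMQ code.

```java
import java.util.List;

// Sketch of getMappedFileByTime's search rule on a hypothetical FileMeta type.
class TimeLookup {
    static final class FileMeta {
        final long fromOffset;   // plays the role of getFileFromOffset()
        final long lastModified; // plays the role of getLastModifiedTimestamp()

        FileMeta(long fromOffset, long lastModified) {
            this.fromOffset = fromOffset;
            this.lastModified = lastModified;
        }
    }

    // First file modified at or after timestamp; otherwise the last file; null if empty.
    static FileMeta findByTime(List<FileMeta> files, long timestamp) {
        if (files.isEmpty()) {
            return null;
        }
        for (FileMeta f : files) {
            if (f.lastModified >= timestamp) {
                return f;
            }
        }
        return files.get(files.size() - 1);
    }

    public static void main(String[] args) {
        List<FileMeta> files = List.of(
                new FileMeta(0L, 1_000L),
                new FileMeta(1_073_741_824L, 2_000L),
                new FileMeta(2_147_483_648L, 3_000L));
        System.out.println(findByTime(files, 1_500L).fromOffset); // 1073741824
        System.out.println(findByTime(files, 9_999L).fromOffset); // 2147483648 (fallback)
    }
}
```

Because files are appended in time order, a linear scan from the oldest file is enough. A timestamp newer than every file still yields a file (the last one) rather than null.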
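(3) Finding a file by offset
The index of the MappedFile that contains offset is computed as (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)). The first term is the number of the file counted from offset 0. Because expired files are deleted from the head of the queue, the list does not necessarily start at offset 0, so the number of the first file still in the list is subtracted.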
/**
* Finds a mapped file by offset.
*
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) {
            // Is the offset covered by any existing file?
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    firstMappedFile.getFileFromOffset(),
                    lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            } else {
                // Fast path: compute the list index directly
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                MappedFile targetFile = null;
                try {
                    targetFile = this.mappedFiles.get(index);
                } catch (Exception ignored) {
                }

                if (targetFile != null && offset >= targetFile.getFileFromOffset()
                    && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                    return targetFile;
                }

                // Fallback: linear scan when the computed index does not match
                for (MappedFile tmpMappedFile : this.mappedFiles) {
                    if (offset >= tmpMappedFile.getFileFromOffset()
                        && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                        return tmpMappedFile;
                    }
                }
            }

            if (returnFirstOnNotFound) {
                return firstMappedFile;
            }
        }
    } catch (Exception e) {
        log.error("findMappedFileByOffset Exception", e);
    }
    return null;
}
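A worked example of the index formula helps show why the first file's offset is subtracted. The sketch below assumes a hypothetical queue of 1 GB files in which the first two files (offsets 0 and 1 GB) have already been deleted; the helper name index is ours, not RocketMQ's.

```java
// Worked example of the index formula used in findMappedFileByOffset.
class OffsetIndex {
    // File number of offset, minus the file number of the first surviving file.
    static int index(long offset, long firstFileFromOffset, long fileSize) {
        return (int) ((offset / fileSize) - (firstFileFromOffset / fileSize));
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024;      // 1 GB per file
        long firstFileFromOffset = 2 * fileSize;  // files 0 and 1 were deleted
        long offset = 3 * fileSize + 100;         // inside the 4th file ever created
        System.out.println(index(offset, firstFileFromOffset, fileSize)); // 1
    }
}
```

Without the subtraction the result would be 3, which is out of range for a list that now holds files starting at 2 GB, 3 GB, and so on.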
(4) Getting the last MappedFile
getLastMappedFile(startOffset, needCreate) returns the last MappedFile in the queue. When there is no file yet or the last file is already full, and needCreate is true, it first creates the next file and returns that one instead:
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    MappedFile mappedFileLast = getLastMappedFile();
    // No file yet: start at the file boundary at or below startOffset
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    // Last file exists but is full: the next file starts right after it
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }
    // Create the MappedFile
    if (createOffset != -1 && needCreate) {
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;
        // Create it through AllocateMappedFileService (analyzed in a later section)
        if (this.allocateMappedFileService != null) {
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            // Otherwise create the MappedFile directly
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }
        return mappedFile;
    }
    return mappedFileLast;
}
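The decision of where a new file starts can be isolated into a pure function. In this sketch, lastFromOffset and lastIsFull are hypothetical inputs standing in for the state of the last MappedFile (null meaning "no file yet"), and -1 means "no new file needed", as in the original createOffset variable. A small file size of 1024 bytes is used only to keep the numbers readable.

```java
// Sketch of the createOffset decision in getLastMappedFile(startOffset, needCreate).
class NextFileOffset {
    static long createOffset(long startOffset, Long lastFromOffset, boolean lastIsFull, long fileSize) {
        if (lastFromOffset == null) {
            // No file yet: round startOffset down to a file boundary.
            return startOffset - (startOffset % fileSize);
        }
        if (lastIsFull) {
            // Last file is full: the next file starts right after it.
            return lastFromOffset + fileSize;
        }
        return -1; // last file still has room
    }

    public static void main(String[] args) {
        long fileSize = 1024;
        System.out.println(createOffset(3000, null, false, fileSize));  // 2048
        System.out.println(createOffset(0, 2048L, true, fileSize));     // 3072
        System.out.println(createOffset(0, 2048L, false, fileSize));    // -1
    }
}
```

Rounding down matters on startup: if the broker is asked to start writing at an offset in the middle of a file range, the new file still begins on a file boundary, so its name stays consistent with the fixed-size layout.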
3. MappedFile
(1) Core properties of MappedFile
MappedFile is RocketMQ's concrete implementation of a memory-mapped file. Its properties are:
int OS_PAGE_SIZE: operating-system page size, 4 KB.
AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY: total virtual memory mapped by all MappedFile instances in the current JVM.
AtomicInteger TOTAL_MAPPED_FILES: number of MappedFile objects in the current JVM.
AtomicInteger wrotePosition: write pointer of the file, starting from 0.
AtomicInteger committedPosition: commit pointer of the file. If transientStorePoolEnable is on, data is first stored in a buffer from TransientStorePool, then committed to the file through the FileChannel, and finally flushed to disk.
AtomicInteger flushedPosition: flush pointer; all data before it has been persisted to disk.
int fileSize: file size.
FileChannel fileChannel: file channel.
ByteBuffer writeBuffer: off-heap (direct) ByteBuffer taken from TransientStorePool. If it is not null, data is first written into this buffer and then committed to the file backing the MappedFile. It is set only when transientStorePoolEnable is true.
TransientStorePool transientStorePool: pool of off-heap buffers, used when transientStorePoolEnable is true.
String fileName: file name.
long fileFromOffset: starting offset of the file.
File file: the physical file.
MappedByteBuffer mappedByteBuffer: memory-mapped buffer of the physical file.
volatile long storeTimestamp = 0: time of the last write to the file.
boolean firstCreateInQueue: whether this is the first file created in the MappedFileQueue.
(2) MappedFile initialization
init derives fileFromOffset from the file name, opens the file in read-write mode and maps fileSize bytes of it into memory:
private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // The file name is the starting offset of the file
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    // Make sure the parent directory exists
    ensureDirOK(this.file.getParent());
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        // Map the whole file (fileSize bytes) into memory
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        // Close the channel if mapping failed
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
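The essential JDK calls in init can be run in isolation. This is a minimal sketch, not MappedFile itself: it uses a temporary directory and a 4 KB file instead of a real store path and 1 GB, and the names MapFileDemo and mapNewFile are hypothetical. Unlike MappedFile, it closes the channel after mapping, which the JDK allows because a mapping stays valid after its channel is closed; MappedFile keeps fileChannel open because commit and flush still need it.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the core of MappedFile.init: name -> offset, open "rw", map fileSize bytes.
class MapFileDemo {
    static MappedByteBuffer mapNewFile(Path dir, long fromOffset, int fileSize) throws IOException {
        File file = dir.resolve(String.format("%020d", fromOffset)).toFile();
        // Recover the starting offset from the file name, as init() does
        long parsed = Long.parseLong(file.getName());
        if (parsed != fromOffset) {
            throw new IllegalStateException("unexpected file name " + file.getName());
        }
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
             FileChannel channel = raf.getChannel()) {
            return channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commitlog");
        MappedByteBuffer buf = mapNewFile(dir, 4096, 4096);
        buf.put(0, (byte) 42); // write straight into the mapping
        buf.force();           // push the change to storage
        System.out.println(buf.capacity() + " " + buf.get(0)); // 4096 42
    }
}
```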
(3) MappedFile commit
Committing a memory-mapped file is implemented by MappedFile's commit method:
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}
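commitLeastPages is the minimum number of pages to commit in one call: if the uncommitted data does not reach commitLeastPages pages, no commit is performed. If writeBuffer is null, commit returns wrotePosition directly and does nothing else, which shows that what commit operates on is writeBuffer. Once the whole file has been committed (committedPosition equals fileSize), writeBuffer is returned to TransientStorePool and set to null.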
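The "enough pages?" test can be sketched as a pure function. This is our reading of isAbleToCommit, not its verified source: it assumes pages are counted in whole OS_PAGE_SIZE (4 KB) units between wrotePosition and committedPosition, that a full file is always committed, and that commitLeastPages <= 0 means "commit anything new". The name ableToCommit and its parameters are hypothetical; check the behavior against your RocketMQ version.

```java
// Sketch of a page-threshold check gating commit (assumed semantics, see lead-in).
class CommitThreshold {
    static final int OS_PAGE_SIZE = 1024 * 4;

    static boolean ableToCommit(int wrote, int committed, boolean full, int commitLeastPages) {
        if (full) {
            return true; // a full file is always committed
        }
        if (commitLeastPages > 0) {
            // Count whole pages between the two pointers.
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        return wrote > committed; // no threshold: commit any new data
    }

    public static void main(String[] args) {
        System.out.println(ableToCommit(12_000, 0, false, 4)); // false: only 2 whole pages
        System.out.println(ableToCommit(20_000, 0, false, 4)); // true: 4 whole pages
        System.out.println(ableToCommit(100, 0, false, 0));    // true: any new data
    }
}
```

Batching by pages trades a little latency for fewer, larger writes to the file channel.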
The actual commit is performed by commit0():
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
commit0 first creates a shared view of writeBuffer with slice(), moves the view's position back to the last committed position (committedPosition) and sets its limit to wrotePosition (the current end of valid data). It then writes the bytes between committedPosition and wrotePosition to the FileChannel and updates committedPosition to wrotePosition. In short, commit moves the data in MappedFile#writeBuffer into the FileChannel.
ByteBuffer tip: slice() creates a new buffer that shares its content with the original ByteBuffer but keeps its own independent position, limit and mark, so moving the view's pointers does not disturb writeBuffer.
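The slice-then-write pattern of commit0 can be reproduced against a temporary file. In this sketch, Commit0Demo, append and commit are hypothetical names; a direct ByteBuffer plays the role of writeBuffer, and the loop around channel.write is a defensive addition that commit0 does not have. Appends also go through a slice, roughly as RocketMQ's append path does, so writeBuffer's own position never moves.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of commit0: copy [committed, wrote) from writeBuffer to a FileChannel via slice().
class Commit0Demo {
    // Write data at pos through a slice, leaving writeBuffer's pointers untouched.
    static void append(ByteBuffer writeBuffer, int pos, byte[] data) {
        ByteBuffer view = writeBuffer.slice();
        view.position(pos);
        view.put(data);
    }

    // Returns the new committed position.
    static int commit(ByteBuffer writeBuffer, FileChannel channel, int committed, int wrote) throws IOException {
        if (wrote - committed <= 0) {
            return committed; // nothing new
        }
        ByteBuffer view = writeBuffer.slice(); // shares content, own position/limit/mark
        view.position(committed);
        view.limit(wrote);
        channel.position(committed);
        while (view.hasRemaining()) {
            channel.write(view);
        }
        return wrote;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer writeBuffer = ByteBuffer.allocateDirect(64);
        Path file = Files.createTempFile("commit", ".log");
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            append(writeBuffer, 0, "hello".getBytes(StandardCharsets.US_ASCII));
            int committed = commit(writeBuffer, ch, 0, 5);
            append(writeBuffer, 5, " world".getBytes(StandardCharsets.US_ASCII));
            committed = commit(writeBuffer, ch, committed, 11);
            System.out.println(committed + " " + writeBuffer.position()); // 11 0
        }
        System.out.println(Files.readString(file)); // hello world
    }
}
```

The second commit writes only " world": the slice's position starts at the old committed pointer, so already-committed bytes are not copied again.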
(4) MappedFile flush
MappedFile's flush method writes in-memory data to disk:
/**
 * @return The current flushed position
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
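flush calls force() on either fileChannel or mappedByteBuffer to persist the data to disk. The channel is forced when writeBuffer exists or data has already been written through the channel (fileChannel.position() != 0); otherwise the mapping is forced. flushedPosition is then set to value, the result of getReadPosition() captured before forcing:
If writeBuffer is not null, flushedPosition becomes the last commit pointer (committedPosition), because only committed data has been written into the FileChannel.
If writeBuffer is null, data was written directly into MappedByteBuffer and wrotePosition is the pointer within MappedByteBuffer, so flushedPosition is set to wrotePosition.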
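The two force paths correspond to two different JDK calls. This sketch (FlushPaths and its methods are hypothetical names) writes through a FileChannel and forces it with force(false), as in the writeBuffer path, and separately writes into a MappedByteBuffer and forces it with MappedByteBuffer.force(), as in the direct-mapping path. force(false) only requires the file content, not its metadata, to reach storage.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of the two flush paths used by MappedFile.flush.
class FlushPaths {
    // writeBuffer path: data reached the file through the channel.
    static void flushViaChannel(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(data), 0);
            ch.force(false); // content only; metadata not required
        }
    }

    // Direct path: data was written into the memory mapping.
    static void flushViaMapping(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer mbb = ch.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
            mbb.put(data);
            mbb.force();
        }
    }

    public static void main(String[] args) throws IOException {
        Path a = Files.createTempFile("channel", ".log");
        Path b = Files.createTempFile("mapped", ".log");
        flushViaChannel(a, "via channel".getBytes(StandardCharsets.US_ASCII));
        flushViaMapping(b, "via mapping".getBytes(StandardCharsets.US_ASCII));
        System.out.println(Files.readString(a) + " / " + Files.readString(b));
    }
}
```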
(5) Getting the maximum read position of a MappedFile
RocketMQ organizes its files as memory-mapped files: a contiguous, fixed-size region is allocated up front, so a set of pointers is needed to mark where the valid data currently ends. MappedFile's getReadPosition method returns the offset of the end of valid data:
/**
 * @return The max position which have valid data
 */
public int getReadPosition() {
    return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
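getReadPosition returns the maximum readable position of the file: if writeBuffer is null, it returns the current write pointer; otherwise it returns the last committed pointer. In the MappedFile design, only committed data (data that has already been written into MappedByteBuffer or FileChannel) is safe to read.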
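The interplay of the three pointers is easiest to see in a toy model. PointerModel below is hypothetical: it keeps plain integers, does no I/O, and ignores the page thresholds and reference counting of the real methods. It only mirrors how commit, getReadPosition and flush choose their values with and without writeBuffer.

```java
// Toy model of MappedFile's wrote/committed/flushed pointers (no I/O, no thresholds).
class PointerModel {
    final boolean hasWriteBuffer; // true when transientStorePoolEnable is on
    int wrotePosition;
    int committedPosition;
    int flushedPosition;

    PointerModel(boolean hasWriteBuffer) {
        this.hasWriteBuffer = hasWriteBuffer;
    }

    void append(int bytes) {
        wrotePosition += bytes;
    }

    // Without writeBuffer, commit only reports wrotePosition.
    int commit() {
        if (!hasWriteBuffer) {
            return wrotePosition;
        }
        committedPosition = wrotePosition;
        return committedPosition;
    }

    int getReadPosition() {
        return hasWriteBuffer ? committedPosition : wrotePosition;
    }

    int flush() {
        flushedPosition = getReadPosition();
        return flushedPosition;
    }

    public static void main(String[] args) {
        PointerModel pooled = new PointerModel(true);
        pooled.append(100);
        System.out.println(pooled.flush());  // 0: nothing committed yet
        pooled.commit();
        System.out.println(pooled.flush());  // 100

        PointerModel direct = new PointerModel(false);
        direct.append(100);
        System.out.println(direct.flush());  // 100: writes go straight to the mapping
    }
}
```

With writeBuffer enabled, data is readable and flushable only after a commit, which is why flushedPosition <= committedPosition <= wrotePosition holds in that mode.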
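4. AllocateMappedFileService
(1) Core properties of AllocateMappedFileService
ConcurrentMap<String, AllocateRequest> requestTable: key is the file path, value is the allocation request.
PriorityBlockingQueue<AllocateRequest> requestQueue: queue of allocation requests; note that it is a priority queue.
int waitTimeOut: how long a caller waits, after submitting a request, for the MappedFile to be created: 5 s.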
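Because requestQueue is a priority queue, the order in which requests are served depends on how AllocateRequest compares. In the RocketMQ 4.x sources we have read, larger files come first and, for equal sizes, the file with the smaller starting offset (parsed from the file name) comes first; treat that as an assumption and verify it against your version. The sketch uses a hypothetical AllocReq class and '/'-separated paths.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Sketch of a preallocation queue ordered by size (desc), then file offset (asc).
class AllocQueueDemo {
    static final class AllocReq implements Comparable<AllocReq> {
        final String filePath;
        final int fileSize;

        AllocReq(String filePath, int fileSize) {
            this.filePath = filePath;
            this.fileSize = fileSize;
        }

        // Starting offset parsed from the last path segment (the file name).
        long offset() {
            return Long.parseLong(filePath.substring(filePath.lastIndexOf('/') + 1));
        }

        @Override
        public int compareTo(AllocReq o) {
            if (fileSize != o.fileSize) {
                return Integer.compare(o.fileSize, fileSize); // bigger files first
            }
            return Long.compare(offset(), o.offset());        // then lower offsets first
        }
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<AllocReq> q = new PriorityBlockingQueue<>();
        q.offer(new AllocReq("/store/commitlog/00000000000000002048", 1024));
        q.offer(new AllocReq("/store/commitlog/00000000000000001024", 1024));
        System.out.println(q.poll().offset()); // 1024: the nearer file is created first
    }
}
```

Serving the lower offset first means the file a writer is about to need is created before the speculative next-next file.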
(2) Creating a MappedFile
putRequestAndReturnMappedFile puts MappedFile creation requests into the queue. It submits two requests, one for the next file (nextFilePath) and one for the file after it (nextNextFilePath) so that the latter is preallocated in the background, but it waits only for the first. When TransientStorePool is enabled and fastFailIfNoBufferInStorePool is set on a non-slave broker, the number of requests it may submit is limited by the buffers still free in the pool:
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    // At most two allocation requests: the next file and the next-next file
    int canSubmitRequests = 2;
    if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
            canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
        }
    }
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    // Wait (up to waitTimeOut ms) only for the next file to be created
    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }

    return null;
}
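The handshake between a caller and the allocation thread can be reduced to a map, a queue and a CountDownLatch. In this sketch every name is hypothetical: Request stands in for AllocateRequest, a String stands in for the created MappedFile, and a FIFO LinkedBlockingQueue replaces the priority queue for brevity. It keeps the key ideas: only the thread whose putIfAbsent succeeds enqueues a request, the caller waits with a timeout only for the next file, and the worker counts the latch down after filling in the result.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the request/worker handshake in AllocateMappedFileService.
class AllocHandshake {
    static final class Request {
        final String path;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String file; // stand-in for the created MappedFile

        Request(String path) {
            this.path = path;
        }
    }

    final ConcurrentMap<String, Request> table = new ConcurrentHashMap<>();
    final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    // Caller side: register next and next-next, but wait only for next.
    String putRequestAndWait(String next, String nextNext, long timeoutMs) throws InterruptedException {
        for (String p : new String[] {next, nextNext}) {
            Request r = new Request(p);
            if (table.putIfAbsent(p, r) == null) {
                queue.offer(r); // only the registering thread enqueues
            }
        }
        Request result = table.get(next);
        if (result == null || !result.latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            return null; // missing or timed out
        }
        table.remove(next);
        return result.file;
    }

    // Worker side: one iteration of the allocation loop.
    void serveOne() throws InterruptedException {
        Request req = queue.take();
        if (table.get(req.path) != req) {
            return; // expired or replaced request
        }
        req.file = "mapped:" + req.path; // real code creates (and may warm) a MappedFile
        req.latch.countDown();           // wakes the waiting caller
    }

    public static void main(String[] args) throws Exception {
        AllocHandshake svc = new AllocHandshake();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    svc.serveOne();
                }
            } catch (InterruptedException e) {
                // stop on interrupt, like a service shutdown
            }
        });
        worker.setDaemon(true);
        worker.start();
        System.out.println(svc.putRequestAndWait("/store/0", "/store/1024", 5_000));
        // "/store/1024" was already requested by the first call, so it is not re-enqueued.
        System.out.println(svc.putRequestAndWait("/store/1024", "/store/2048", 5_000));
        worker.interrupt();
    }
}
```

By the time a writer needs the next file, it has usually been created already in response to the previous call's next-next request, so the wait is short.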
AllocateMappedFileService is a service thread whose run loop keeps calling mmapOperation. Each call takes a request from the queue and creates the MappedFile asynchronously. On success, it counts down the request's CountDownLatch, which wakes the caller waiting in putRequestAndReturnMappedFile. On an IOException, the request is put back into the queue to be retried:
/**
 * Only interrupted by the external thread, will return false
 */
private boolean mmapOperation() {
    boolean isSuccess = false;
    AllocateRequest req = null;
    try {
        req = this.requestQueue.take();
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) {
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true;
        }
        if (expectedRequest != req) {
            log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        }

        if (req.getMappedFile() == null) {
            long beginTime = System.currentTimeMillis();

            MappedFile mappedFile;
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try {
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                }
            } else {
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }

            long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
            if (eclipseTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }

            // pre write mappedFile
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog()
                &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
            }

            req.setMappedFile(mappedFile);
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}
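The "pre write mappedFile" step above calls warmMappedFile for CommitLog-sized files when warmMapedFileEnable is on. The sketch below shows only the basic idea: touch one byte in every OS page so the pages are faulted in before real messages arrive. It is not RocketMQ's method body, which as far as we know also forces the mapping periodically under SYNC_FLUSH and locks the memory with mlock. A direct ByteBuffer stands in for the MappedByteBuffer, and WarmSketch and touchPages are hypothetical names.

```java
import java.nio.ByteBuffer;

// Sketch of page pre-touching, the core idea behind warming a mapped file.
class WarmSketch {
    static final int OS_PAGE_SIZE = 1024 * 4;

    // Write a zero byte at the start of every page; returns the number of pages touched.
    static int touchPages(ByteBuffer buffer) {
        int touched = 0;
        for (int i = 0; i < buffer.capacity(); i += OS_PAGE_SIZE) {
            buffer.put(i, (byte) 0);
            touched++;
        }
        return touched;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(10 * OS_PAGE_SIZE);
        System.out.println(touchPages(buf)); // 10
    }
}
```

Paying the page-fault cost on the allocation thread keeps it off the message write path, which is the point of doing this during preallocation.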