MappedFileQueue
?前面已經(jīng)介紹了RocketMQ跟存儲(chǔ)交互的底層封裝對(duì)象mappedFile
茬祷。而跟CommitLog悼凑,ConsumeQueue進(jìn)行交互的并不是mappedFile
虏缸,而是對(duì)其進(jìn)一步封裝的MappedFileQueue
類。
屬性介紹
//文件的存儲(chǔ)路徑
private final String storePath;
//映射文件大小,指的是單個(gè)文件的大小整以,比如CommitLog大小為1G
private final int mappedFileSize;
//并發(fā)線程安全隊(duì)列存儲(chǔ)映射文件
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private final AllocateMappedFileService allocateMappedFileService;
//刷新完的位置
private long flushedWhere = 0;
//提交完成的位置
private long committedWhere = 0;
//存儲(chǔ)時(shí)間
private volatile long storeTimestamp = 0;
?MappedFileQueue
這個(gè)類的屬性相對(duì)來(lái)說比較少,其中需要說的是,AllocateMappedFileService
類型的字段践付,這個(gè)對(duì)象的作用是根據(jù)情況來(lái)決定是否需要提前創(chuàng)建好MappedFile
對(duì)象供后續(xù)的直接使用。而這個(gè)參數(shù)是在構(gòu)造MappedFileQueue
對(duì)象的時(shí)候的一個(gè)參數(shù)缺厉。只有在CommitLog
中構(gòu)造時(shí)才會(huì)傳入AllocateMappedFileService
永高,在ConsumeQueue
并沒有傳入。
方法介紹
構(gòu)造方法
?MappedFileQueue
只有一個(gè)全參構(gòu)造器提针,分別是傳入文件的存儲(chǔ)路徑storePath
命爬,單個(gè)存儲(chǔ)文件的大小mappedFileSize
和提前創(chuàng)建MappedFile
對(duì)象的allocateMappedFileService
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
//指定文件的存儲(chǔ)路徑
this.storePath = storePath;
//指定單個(gè)文件的大小
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
}
檢查文件是否完整checkSelf
/**
* 檢查文件的是否完整,檢查的方式辐脖。上一個(gè)文件的起始偏移量減去當(dāng)前文件的起始偏移量饲宛,如果差值=mappedFileSize那么說明文件是完整的,否則有損壞
*/
public void checkSelf() {
//檢查文件組是否為空
if (!this.mappedFiles.isEmpty()) {
//對(duì)文件進(jìn)行迭代嗜价,一個(gè)一個(gè)進(jìn)行檢查
Iterator<MappedFile> iterator = mappedFiles.iterator();
MappedFile pre = null;
while (iterator.hasNext()) {
MappedFile cur = iterator.next();
if (pre != null) {
//用當(dāng)前文件的其實(shí)偏移量-上一個(gè)文件的其實(shí)偏移量 正常情況下應(yīng)該等于一個(gè)文件的大小艇抠。如果不相等,說明文件存在問題
if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
pre.getFileName(), cur.getFileName());
}
}
pre = cur;
}
}
}
?這里檢查文件是否被破壞的原理久锥,就是檢查文件的大小是不是等于前一個(gè)文件的起始偏移量和后一個(gè)文件的起始偏移量是不是等于文件大小家淤。而這里的起始偏移量又是在MappedFile
進(jìn)行獲取的fileFromOffset
,而這個(gè)值就是我們?cè)跇?gòu)造MappedFile
的時(shí)候傳入的文件名轉(zhuǎn)化得到的
private void init(final String fileName, final int fileSize) throws IOException {
//根據(jù)文件的名稱計(jì)算文件其實(shí)的偏移量
this.fileFromOffset = Long.parseLong(this.file.getName());
}
加載文件load
public boolean load() {
/**
*System.getProperty("user.home") + File.separator + "store" + File.separator + 文件名
* 根據(jù)傳入的文件保存路徑storePath 來(lái)獲取文件
*/
File dir = new File(this.storePath);
File[] files = dir.listFiles();
//文件列表不為空則進(jìn)行加載
if (files != null) {
// ascending order
//對(duì)文件進(jìn)行排序
Arrays.sort(files);
for (File file : files) {
//隊(duì)列映射文件的大小不等于設(shè)置的文件類型的大小瑟由,說明加載到了最后的一個(gè)文件 比如 如果是commitLog那么對(duì)于的大小應(yīng)該為1G
if (file.length() != this.mappedFileSize) {
log.warn(file + "\t" + file.length()
+ " length not matched message store config value, ignore it");
return true;
}
try {
//根據(jù)文件的路徑和文件大小絮重,創(chuàng)建對(duì)應(yīng)的文件映射,然后加入到映射列表中
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
mappedFile.setCommittedPosition(this.mappedFileSize);
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
} catch (IOException e) {
log.error("load file " + file + " error", e);
return false;
}
}
}
return true;
}
?這里的邏輯比較簡(jiǎn)單,就是根據(jù)傳入的文件路徑青伤,加載對(duì)應(yīng)的文件夾下面的文件督怜,并創(chuàng)建文件映射,并加入到文件映射列表中去狠角。這個(gè)方法在RocketMQ啟動(dòng)的時(shí)候回調(diào)用号杠,用來(lái)加載系統(tǒng)中已經(jīng)存在的消息日志文件。
根據(jù)時(shí)間戳獲取文件getMappedFileByTime
public MappedFile getMappedFileByTime(final long timestamp) {
//獲取所有的文件映射對(duì)象MappedFile
Object[] mfs = this.copyMappedFiles(0);
//為null說明 mappedFiles 中沒有MappedFile
if (null == mfs)
return null;
for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
//如果文件的最后修改時(shí)間大于等于參數(shù)時(shí)間丰歌,說文件在當(dāng)前傳入的時(shí)間之后進(jìn)行修改了究流,就是需要尋找的文件
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}
//如果沒有找到合適的MappedFile 就用最后一個(gè)
return (MappedFile) mfs[mfs.length - 1];
}
?這個(gè)方法主要使用的位置在ConsumeQueue
中,在通過時(shí)間戳來(lái)找文件中的消息的偏移量动遭。
根據(jù)偏移量獲取文件findMappedFileByOffset
public MappedFile findMappedFileByOffset(final long offset) {
//根據(jù)偏移量來(lái)找映射文件芬探,如果沒有找到文件的情況下不返回映射文件列表第一個(gè)映射文件
return findMappedFileByOffset(offset, false);
}
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
//獲取隊(duì)列中第一個(gè)映射文件
MappedFile firstMappedFile = this.getFirstMappedFile();
//獲取隊(duì)列中最后一個(gè)映射文件
MappedFile lastMappedFile = this.getLastMappedFile();
//如果不存在文件則直接返回null
if (firstMappedFile != null && lastMappedFile != null) {
//如果要查找的偏移量offset不在所有的文件偏移量范圍內(nèi),則打印錯(cuò)誤日志
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
//(指定Offset-第一個(gè)文件的其實(shí)偏移量)/文件大小=第幾個(gè)文件夾
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
//獲取指定的映射文件
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}
//offset在指定的映射文件中厘惦,則直接返回對(duì)應(yīng)的映射文件
if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
return targetFile;
}
//如果按索引在隊(duì)列中找不到映射文件就遍歷隊(duì)列查找映射文件
for (MappedFile tmpMappedFile : this.mappedFiles) {
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}
//如果指定了沒有找到文件就返回第一個(gè)映射文件偷仿,則直接返回第一個(gè)映射文件
if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}
return null;
}
?如上代碼分析中有兩個(gè)根據(jù)偏移量獲取映射文件的方法,其中有兩個(gè)參數(shù)的方法是在知道偏移量所指的信息在第一個(gè)映射文件中的時(shí)候才調(diào)用宵蕉,而調(diào)用的這個(gè)方法的基本就是寫入信息或者刷新信息的時(shí)候調(diào)用酝静。關(guān)于文件的刷新和提交可以看上一篇對(duì)MappedFile
分析的文章
根據(jù)偏移量截?cái)辔募?code>truncateDirtyFiles
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
/**
* 如果 文件的起始偏移量>指定截?cái)嗥屏縪ffset 那么整個(gè)文件需要?jiǎng)h除
* 如果 文件的起始偏移量<指定截?cái)嗥屏縪ffset<文件的最大偏移量 那么文件中的部分記錄需要清除
* 如果 文件的最大偏移量<指定截?cái)嗥屏縪ffset 那么這個(gè)文件不需要進(jìn)行處理
*/
for (MappedFile file : this.mappedFiles) {
//文件的開始偏移量+文件大小= 文件尾offset
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
//當(dāng)前文件的最大偏移量大于 指定截?cái)辔恢玫钠屏浚f明需要截?cái)嗟奈恢镁褪窃谶@個(gè)文件中
if (fileTailOffset > offset) {
//如果文件初始偏移量小于指定的偏移量羡玛,說明只需要截?cái)辔募械囊徊糠? if (offset >= file.getFileFromOffset()) {
//設(shè)置映射文件寫的位置
file.setWrotePosition((int) (offset % this.mappedFileSize));
//設(shè)置文件commit的位置
file.setCommittedPosition((int) (offset % this.mappedFileSize));
//設(shè)置文件刷新的位置
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//如果文件的起始偏移量也比指定的偏移量大别智,則說明這個(gè)文件整個(gè)需要丟棄
file.destroy(1000);
//需要?jiǎng)h除的文件加上這個(gè)文件
willRemoveFiles.add(file);
}
}
}
//刪除映射的文件
this.deleteExpiredFile(willRemoveFiles);
}
?截?cái)辔募姆椒ǎ?code>load方法一樣稼稿,在RocketMQ啟動(dòng)的時(shí)候會(huì)使用到薄榛,用來(lái)刪除那些無(wú)效的或者損壞的需要?jiǎng)h除的消息。
獲取最后一個(gè)文件getLastMappedFile
public MappedFile getLastMappedFile() {
MappedFile mappedFileLast = null;
//如果文件隊(duì)列不為空則獲取最后一個(gè)文件
while (!this.mappedFiles.isEmpty()) {
try {
//直接獲取最后一個(gè)映射文件
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
} catch (IndexOutOfBoundsException e) {
//continue;
} catch (Exception e) {
log.error("getLastMappedFile has exception.", e);
break;
}
}
return mappedFileLast;
}
?這個(gè)方法的作用基本就是獲取最后一個(gè)映射文件让歼,然后進(jìn)行消息的插入敞恋,或者獲取最大消息偏移量等信息。
根據(jù)時(shí)間刪除過期文件deleteExpiredFileByTime
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
//獲取映射文件列表
Object[] mfs = this.copyMappedFiles(0);
//如果映射文件列表為空直接返回
if (null == mfs){
return 0;
}
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
//對(duì)映射文件進(jìn)行遍歷
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
//文件最后的修改時(shí)間+過期時(shí)間= 文件最終能夠存活的時(shí)間
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
//如果當(dāng)前時(shí)間大于文件能夠存活的最大時(shí)間谋右,比如 當(dāng)前是2021-03-18 12:00:00 硬猫,而文件最大存活時(shí)間2021-03-18 11:00:00 就需要?jiǎng)h除「闹矗或者調(diào)用方法的時(shí)候指定了馬上刪除
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//刪除文件啸蜜,就是解除對(duì)文件的引用
if (mappedFile.destroy(intervalForcibly)) {
//要?jiǎng)h除的的文件加入到要?jiǎng)h除的集合中
files.add(mappedFile);
//增加計(jì)數(shù)
deleteCount++;
//一次性最多刪除的人為10
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
//如果刪除時(shí)間間隔大于0,并且沒有循環(huán)玩辈挂,則睡眠指定的刪除間隔時(shí)長(zhǎng)后在殺出
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
//從文件映射隊(duì)列中刪除對(duì)應(yīng)的文件映射
deleteExpiredFile(files);
//返回刪除的文件個(gè)數(shù)
return deleteCount;
}
?這個(gè)方法被用在定期刪除過去的CommitLog
文件衬横,來(lái)保證內(nèi)存空間。
根據(jù)偏移量刪除文件deleteExpiredFileByOffset
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
//unitSize是一個(gè)文件格式占用的長(zhǎng)度 比如ConsumeQueue中一條記錄長(zhǎng)度為20byte 這里是獲取一個(gè)文件中最后一條記錄的起始偏移量呢岗,
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
//獲取文件中最后一條記錄的偏移量
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
result.release();
//如果最大偏移量 < 指定的偏移量冕香,則需要?jiǎng)h除
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
//刪除文件
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
// 刪除映射文件隊(duì)列中的映射文件=》
deleteExpiredFile(files);
return deleteCount;
}
?按照偏移量刪除文件用于刪除過期的ConsumeQueue
文件,因?yàn)?code>ConsumeQueue文件中信息的記錄是定長(zhǎng)的20byte后豫,如果偏移量小于指定的偏移量表示都是之前的消息悉尾,可以直接刪除。
其他跟MappedFile
有關(guān)聯(lián)的方法
MappedFile |
MappedFileQueue |
---|---|
flush |
flush |
commit |
commit |
destroy |
destroy |
getFileFromOffset 獲取文件的初始偏移量 |
getMinOffset 獲取文件的最小偏移量挫酿,就是獲取映射文件隊(duì)列的第一個(gè)文件构眯,然后調(diào)用getFileFromOffset
|
getFileFromOffset +getReadPosition
|
getMaxOffset 獲取文件最大偏移量,就是獲取最后一個(gè)映射文件的起始偏移量+文件的寫入的位置 |
getFileFromOffset +getWrotePosition
|
getMaxWrotePosition 獲取文件最大偏移量早龟,就是獲取最后一個(gè)映射文件的起始偏移量+文件的寫入的位置 |
remainHowManyDataToCommit 獲取文件尚未提交的長(zhǎng)度 |
|
remainHowManyDataToFlush 獲取文件尚未刷新的長(zhǎng)度 |
在CommitLog
和ConsumeQueue
中的使用
方法 | CommitLog | ConsumeQueue |
---|---|---|
checkSelf | 定時(shí)檢查文件是否完整 | 定時(shí)檢查文件是否完整 |
load | MQ啟動(dòng)時(shí)加載CommitLog | MQ啟動(dòng)時(shí)加載ConsumeQueue |
getMappedFileByTime | 無(wú) | 根據(jù)時(shí)間戳查找特定的topic和queue中的消息 |
findMappedFileByOffset | 無(wú) | 根據(jù)index獲取消息 |
truncateDirtyFiles | MQ啟動(dòng)時(shí)截?cái)酂o(wú)用日志 | MQ啟動(dòng)時(shí)截?cái)酂o(wú)用日志 |
getLastMappedFile | 保存消息時(shí)獲取文件 | 保存消息時(shí)獲取文件 |
deleteExpiredFileByTime | 定時(shí)刪除過期的文件 | 無(wú) |
deleteExpiredFileByOffset | 無(wú) | 定時(shí)刪除過期的文件 |
?可以看到惫霸,MappedFileQueue
類中的方法基本是操作MappedFile
組成的集合,間接的操作MappedFile
達(dá)到對(duì)日志文件組的增刪改的操作葱弟,都是一些提供給CommitLog
和ConsumeQueue
用來(lái)對(duì)日志文件進(jìn)行查找壹店,刪除的基礎(chǔ)方法。