概述
IndexFile由幾個(gè)部分構(gòu)成:
IndexHeader 文件頭(40字節(jié))
SlotTable 槽表(4字節(jié)500w記錄)
Index Linked List 索引鏈表(20字節(jié)2000w記錄)
如下圖所示(參照refer)
因此一個(gè)IndexFile共計(jì)40 + 4500w+ 202000w個(gè)字節(jié)
每個(gè)部分占用空間分布如下
下面針對IndexHeader以及IndexFile兩個(gè)類谤祖,對各個(gè)部分進(jìn)行講解
IndexHeader:文件頭
索引文件頭信息40個(gè)字節(jié)的數(shù)據(jù)組成,如下所示
各字段意義如下
beginTimestamp 8位long類型慈格,索引文件構(gòu)建第一個(gè)索引的消息落在broker的時(shí)間
endTimestamp 8位long類型,索引文件構(gòu)建最后一個(gè)索引消息落broker時(shí)間
beginPhyOffset 8位long類型位衩,索引文件構(gòu)建第一個(gè)索引的消息commitLog偏移量
endPhyOffset 8位long類型,索引文件構(gòu)建最后一個(gè)索引消息commitLog偏移量
hashSlotCount 4位int類型,構(gòu)建索引占用的槽位數(shù)(這個(gè)值貌似沒有具體作用)
indexCount 4位int類型吻贿,索引文件中構(gòu)建的索引個(gè)數(shù)
源碼如下
public class IndexHeader {
public static final int INDEX_HEADER_SIZE = 40;//索引文件頭的大小
private static int beginTimestampIndex = 0;
private static int endTimestampIndex = 8;
private static int beginPhyoffsetIndex = 16;
private static int endPhyoffsetIndex = 24;
private static int hashSlotcountIndex = 32;
private static int indexCountIndex = 36;
private final ByteBuffer byteBuffer;
private AtomicLong beginTimestamp = new AtomicLong(0);//開始時(shí)間
private AtomicLong endTimestamp = new AtomicLong(0);//結(jié)束時(shí)間
private AtomicLong beginPhyOffset = new AtomicLong(0);//開始物理偏移
private AtomicLong endPhyOffset = new AtomicLong(0);//結(jié)束物理偏移
private AtomicInteger hashSlotCount = new AtomicInteger(0);
private AtomicInteger indexCount = new AtomicInteger(1);
public IndexHeader(final ByteBuffer byteBuffer) {
this.byteBuffer = byteBuffer;
}
//讀取byteBuffer的內(nèi)容到內(nèi)存中
public void load() {
this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
this.indexCount.set(byteBuffer.getInt(indexCountIndex));
if (this.indexCount.get() <= 0) {
this.indexCount.set(1);
}
}
/**
* 更新header的各個(gè)記錄
*/
public void updateByteBuffer() {
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
}
public long getBeginTimestamp() {
return beginTimestamp.get();
}
public void setBeginTimestamp(long beginTimestamp) {
this.beginTimestamp.set(beginTimestamp);
this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp);
}
public long getEndTimestamp() {
return endTimestamp.get();
}
public void setEndTimestamp(long endTimestamp) {
this.endTimestamp.set(endTimestamp);
this.byteBuffer.putLong(endTimestampIndex, endTimestamp);
}
public long getBeginPhyOffset() {
return beginPhyOffset.get();
}
public void setBeginPhyOffset(long beginPhyOffset) {
this.beginPhyOffset.set(beginPhyOffset);
this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset);
}
public long getEndPhyOffset() {
return endPhyOffset.get();
}
public void setEndPhyOffset(long endPhyOffset) {
this.endPhyOffset.set(endPhyOffset);
this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset);
}
public AtomicInteger getHashSlotCount() {
return hashSlotCount;
}
public void incHashSlotCount() {
int value = this.hashSlotCount.incrementAndGet();
this.byteBuffer.putInt(hashSlotcountIndex, value);
}
public int getIndexCount() {
return indexCount.get();
}
//對應(yīng)的索引文件記錄的索引個(gè)數(shù)+1
public void incIndexCount() {
int value = this.indexCount.incrementAndGet();
this.byteBuffer.putInt(indexCountIndex, value);
}
}
注意load,updateByteBuffer,incHashSlotCount,incIndexCount函數(shù)即可
***注意indexCount默認(rèn)是1,不是0鳍怨,下面講IndexFile會用###
IndexFile
這個(gè)類里面包含了SlotTable 以及 Index Linked List兩個(gè)部分呻右,先針對這兩個(gè)部分進(jìn)行講解
邏輯意義
SlotTable
默認(rèn)有500w個(gè)槽(可以理解為hash到這500w個(gè)槽中),每個(gè)槽占4個(gè)字節(jié)鞋喇,也就是一個(gè)int值,這個(gè)int值范圍是[1,2000w],2000w是默認(rèn)的索引文件數(shù)量声滥。
每個(gè)槽中記錄的值的是什么:這一個(gè)slot存放的最新的索引,下標(biāo)是幾(從1開始,到默認(rèn)的2000w)
比如說第5個(gè)槽,之前放了第20個(gè)記錄侦香,slotTable中第5個(gè)槽記錄的值就是20
之后第22條記錄也hash到了第5個(gè)槽落塑,那么slotTable第5個(gè)槽記錄的值就變成22
Index Linked List
索引鏈表,默認(rèn)2000w條記錄,每一條記錄占位20字節(jié)
20字節(jié)構(gòu)成如下:
keyHash: 4位罐韩,int值憾赁,key的hash值
phyOffset:8位,long值散吵,索引消息commitLog偏移量
timeDiff: 4位龙考,int值,索引存儲的時(shí)間與IndexHeader的beginTimestamp時(shí)間差(這是為了節(jié)省空間)
slotValue:4位矾睦,int值盐须,索引對應(yīng)slot的上一條索引的下標(biāo)(據(jù)此完成一條向老記錄的鏈表)
源碼講解
屬性
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int hashSlotSize = 4;//每一個(gè)槽信息含4字節(jié)狰晚,即該slot對應(yīng)的count數(shù)
private static int indexSize = 20;//每一個(gè)索引信息占據(jù)20字節(jié)
private static int invalidIndex = 0;//第0個(gè)索引信息是invalid的
private final int hashSlotNum;//hash槽的個(gè)數(shù)彻犁,默認(rèn)5million
private final int indexNum;//最多記錄的索引條數(shù),默認(rèn)2千萬
private final MappedFile mappedFile;
private final FileChannel fileChannel;//沒有用到
private final MappedByteBuffer mappedByteBuffer;
private final IndexHeader indexHeader;//索引頭
這里注意invalidIndex是0表示無效索引下標(biāo)绰姻,前面講IndexHeader#indexCount是1,這個(gè)用于判斷特定位置存放索引下標(biāo)時(shí)官紫,是否有效
方法
get,destroy相關(guān)函數(shù)省略
構(gòu)造函數(shù)
/**
* 創(chuàng)建一個(gè)IndexFile
* @param fileName 文件名
* @param hashSlotNum hash槽的個(gè)數(shù)肛宋,默認(rèn)5million
* @param indexNum 最多記錄的索引條數(shù),默認(rèn)2千萬
* @param endPhyOffset 開始物理偏移
* @param endTimestamp 開始的時(shí)間戳
* @throws IOException
*/
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
int fileTotalSize =
IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
this.mappedFile = new MappedFile(fileName, fileTotalSize);
this.fileChannel = this.mappedFile.getFileChannel();
this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
this.hashSlotNum = hashSlotNum;
this.indexNum = indexNum;
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.indexHeader = new IndexHeader(byteBuffer);
if (endPhyOffset > 0) {//設(shè)置物理偏移
this.indexHeader.setBeginPhyOffset(endPhyOffset);
this.indexHeader.setEndPhyOffset(endPhyOffset);
}
if (endTimestamp > 0) {//設(shè)置時(shí)間
this.indexHeader.setBeginTimestamp(endTimestamp);
this.indexHeader.setEndTimestamp(endTimestamp);
}
}
因?yàn)槭切聞?chuàng)建州藕,所以開始結(jié)束時(shí)間,開始結(jié)束物理偏移一起設(shè)置了
后續(xù)更新時(shí)酝陈,會更新結(jié)束的物理偏移床玻,時(shí)間
load
加載IndexHeader,讀取取byteBuffer的內(nèi)容到內(nèi)存中
public void load() {
this.indexHeader.load();
}
flush
將內(nèi)存中的數(shù)據(jù)更新ByteBuffer沉帮,持久化IndexHeader
public void flush() {
long beginTime = System.currentTimeMillis();
if (this.mappedFile.hold()) {//引用+1
this.indexHeader.updateByteBuffer();
this.mappedByteBuffer.force();//刷到磁盤
this.mappedFile.release();//釋放引用
log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime));
}
}
isWriteFull
是否寫滿了锈死,就是已經(jīng)寫的索引數(shù)是否超過了指定的索引數(shù)(2千萬)
public boolean isWriteFull() {
return this.indexHeader.getIndexCount() >= this.indexNum;
}
isTimeMatched
//[begin,end]時(shí)間與 [indexHeader.getBeginTimestamp(), this.indexHeader.getEndTimestamp()]時(shí)間有重疊
public boolean isTimeMatched(final long begin, final long end) {
boolean result = begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
result = result || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader.getEndTimestamp());
result = result || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader.getEndTimestamp());
return result;
}
indexKeyHashMethod
獲取key的hash值(int),轉(zhuǎn)成絕對值
public int indexKeyHashMethod(final String key) {
int keyHash = key.hashCode();
int keyHashPositive = Math.abs(keyHash);
if (keyHashPositive < 0)
keyHashPositive = 0;
return keyHashPositive;
}
putKey
存放一條索引消息,更新IndexHeader穆壕,slotTable待牵,Index Linked List三個(gè)記錄
/**
* 寫入索引消息
* @param key 是 topic-key值
* @param phyOffset 物理偏移
* @param storeTimestamp 時(shí)間戳
* @return
**/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {//還沒寫滿
int keyHash = indexKeyHashMethod(key);//key的hash值
int slotPos = keyHash % this.hashSlotNum;//放在哪個(gè)槽
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//槽對應(yīng)的信息在文件中的位置
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//獲取該槽記錄的key在該文件的第幾個(gè)字節(jié)
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();//timeDiff是相對header的beginTime的偏移
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;//索引該放在文件的哪個(gè)字節(jié)
/**
* 索引信息:
* 先是4字節(jié),keyHash值
* 再4字節(jié)喇勋,是commitLog的物理偏移
* 再8字節(jié)缨该,是時(shí)間戳
* 再4字節(jié),槽位信息川背,代表該槽位上一個(gè)key在文件中的位置(即向前的鏈表)
*/
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//記錄同一個(gè)槽位的上一個(gè)key的位置(向前的鏈表)
//槽位信息贰拿,記錄該槽最新key在所有索引信息中的下標(biāo)
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());//IndexCount默認(rèn)從1開始的,所以這里就是槽位表記錄最新key的位置
if (this.indexHeader.getIndexCount() <= 1) {//第一條索引熄云,記錄開始的物理偏移膨更,以及時(shí)間
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();//槽的數(shù)量+1,這個(gè)slotCount并沒有實(shí)際作用,隨便寫
this.indexHeader.incIndexCount();//索引文件中的索引個(gè)數(shù)+1
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
具體步驟見思考
selectPhyOffset
根據(jù)key找到slot對應(yīng)的整個(gè)列表缴允,找到keyHash一樣且存儲時(shí)間在給定的[begin,end]范圍內(nèi)的索引列表(最多maxNum條荚守,從新到舊),返回他們的物理偏移
/**
*
* @param phyOffsets 物理偏移列表,用于返回
* @param key 關(guān)鍵字
* @param maxNum phyOffsets最多保存這么多條記錄就返回
* @param begin 開始時(shí)間戳
* @param end 結(jié)束時(shí)間戳
* @param lock:沒有用
*/
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);//獲取對應(yīng)槽的最新的key的下標(biāo)
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {//key對應(yīng)的槽就沒有合理記錄
} else {
for (int nextIndexToRead = slotValue; ; ) {//遍歷鏈表一樣的結(jié)構(gòu)
if (phyOffsets.size() >= maxNum) {//最多紀(jì)錄maxNum個(gè)
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;//根據(jù)key的下標(biāo)找到在文件中的位置
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);//鏈表癌椿,找到該slot前一個(gè)key的下標(biāo)
if (timeDiff < 0) {//不斷地找更老的記錄健蕊,timeDiff會越來越小,
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;//根據(jù)基準(zhǔn)時(shí)間戳以及 timeDiff踢俄,得到索引存儲的時(shí)間
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);//存儲時(shí)間戳在[begin,end]內(nèi)
if (keyHash == keyHashRead && timeMatched) {//原有hash值匹配,
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}//鏈表循環(huán),已經(jīng)找到了最老的索引了晴及,可以break了
nextIndexToRead = prevIndexRead;//遍歷到同slot的前一個(gè)key中
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
思考
主要內(nèi)容
主要理解IndexFile的三個(gè)構(gòu)成部分
主要理解putKey函數(shù)以及selectPhyOffset函數(shù)
為什么說IndexHeader#hashSlotCount沒用
因?yàn)閕ncHashSlotCount和incIndexCount在IndexFile類中是一起調(diào)用的都办,并不是真正體現(xiàn)占有slot數(shù)量的
slotTable中500項(xiàng),每項(xiàng)4字節(jié)虑稼,這4字節(jié)存儲的值的意義是什么
該槽最新記錄的索引的下標(biāo)琳钉,從1到200w
IndexFile中的鏈表結(jié)構(gòu)的形成
上面已經(jīng)講過,這里再講一下蛛倦,
slotTable記錄該槽最新索引的下標(biāo)
在每一條索引的記錄中歌懒,記錄slotValue代表同一個(gè)槽的上一個(gè)索引的下標(biāo)。
這兩點(diǎn)溯壶,就能根據(jù)槽的值及皂,從最新向最老的索引進(jìn)行查找
putKey步驟總結(jié)
* 先根據(jù)key找到hash值再%hashSlotNum甫男, 得到應(yīng)該放入哪個(gè)槽
* 去slotTable找到槽的位置,獲取該槽最新的索引的下標(biāo),不合理就置0(invalidIndex)
* 記錄時(shí)間戳與 IndexHeader begin時(shí)間的差
* 找到這個(gè)索引應(yīng)該存在哪個(gè)位置验烧,依次寫下keyHash板驳,phyOffset,timeDiff碍拆,slotValue
* slotTable對應(yīng)槽位置更新值為最新的索引值
* 更新count若治,結(jié)束時(shí)間戳,結(jié)束物理偏移等值
代碼中各種偏移
IndexFile文件中感混,依次是
IndexHeader 文件頭(40字節(jié))
SlotTable 槽表(4字節(jié)500w記錄)
Index Linked List 索引鏈表(20字節(jié)2000w記錄)
再去看代碼端幼,就知道各種偏移是干嘛的了
index下標(biāo)是1開始的
IndexHeader#indexCount是1
所以IndexFile中的invalidIndex是0
問題
selectPhyOffset一定準(zhǔn)確嗎
代碼中,判斷的語句是
keyHash == keyHashRead && timeMatched
這里keyHash是String key的一個(gè)hash值
如果不同的key的hash值一樣弧满,然后后面時(shí)間戳也匹配婆跑,怎么辦?
refer
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39153195
http://blog.csdn.net/meilong_whpu/article/details/76921583
IndexFileTest 自帶測試類