- Every message sent to Kafka is ultimately persisted to a log file on disk;
- Classes covered in this chapter:
- OffsetIndex;
- LogSegment;
The OffsetIndex class
- Source file: core/src/main/scala/kafka/log/OffsetIndex.scala
- Purpose: as we know, every message sent to Kafka is written to a local log file as a
Record
(see the earlier post on Kafka's message storage classes). What is written must also be read, and a read usually starts from a given offset. That offset is a logical offset and has to be translated into an actual file position. To speed up this translation, Kafka maintains an index file for every log file. The index is sparse: it records only some of the log-offset-to-file-position mappings, so after a lookup a short sequential scan of the log file is still needed to locate the exact Record
- Index file structure: the file stores a sequence of log-offset-to-file-position mappings. Each entry is 8 bytes: the first 4 bytes hold the offset (relative to the segment's base offset) and the last 4 bytes hold the file position. Each such mapping is called a slot
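As a sketch of this slot layout (the names here are illustrative, not Kafka's own), each 8-byte entry can be packed and unpacked with a plain ByteBuffer:

```scala
import java.nio.ByteBuffer

// Illustrative sketch of the 8-byte slot layout described above:
// 4 bytes of offset (relative to the segment's base offset) followed
// by 4 bytes of file position.
object IndexEntryLayout {
  val SlotSize = 8

  // Write one slot at the buffer's current position.
  def writeSlot(buf: ByteBuffer, relativeOffset: Int, position: Int): Unit = {
    buf.putInt(relativeOffset)
    buf.putInt(position)
  }

  // Read the n-th slot using absolute gets, without moving the position.
  def readSlot(buf: ByteBuffer, slot: Int): (Int, Int) =
    (buf.getInt(slot * SlotSize), buf.getInt(slot * SlotSize + 4))
}
```

Absolute reads are what make the memory-mapped index cheap to query: a slot can be fetched by arithmetic on its index alone.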
- Read/write path: to speed up reads and writes of the index file, it is memory-mapped:
/* initialize the memory mapping for this index */
private var mmap: MappedByteBuffer = {
  val newlyCreated = file.createNewFile()
  val raf = new RandomAccessFile(file, "rw")
  try {
    /* pre-allocate the file if necessary */
    if(newlyCreated) {
      if(maxIndexSize < 8)
        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
      raf.setLength(roundToExactMultiple(maxIndexSize, 8))
    }
    /* memory-map the file */
    val len = raf.length()
    val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
    /* set the position in the index for the next entry */
    if(newlyCreated)
      idx.position(0)
    else
      // if this is a pre-existing index, assume it is all valid and set position to last entry
      idx.position(roundToExactMultiple(idx.limit, 8))
    idx
  } finally {
    CoreUtils.swallow(raf.close())
  }
}
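The roundToExactMultiple helper used above rounds a number down to the nearest multiple of a factor, so the mapped region always holds a whole number of 8-byte slots. A minimal sketch of that behavior:

```scala
// Sketch of the rounding used above: the largest multiple of `factor`
// that is <= `number`. With factor = 8 this trims any trailing partial
// slot off the mapped region.
def roundToExactMultiple(number: Int, factor: Int): Int =
  factor * (number / factor)
```

For example, rounding 67 to a multiple of 8 gives 64, i.e. exactly 8 complete slots.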
- Key methods:
-
def lookup(targetOffset: Long): OffsetPosition
: finds the largest offset that is less than or equal to targetOffset
, together with its file position;
-
maybeLock(lock) {
  val idx = mmap.duplicate
  val slot = indexSlotFor(idx, targetOffset)
  if(slot == -1)
    OffsetPosition(baseOffset, 0)
  else
    OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
}
-
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int
: uses binary search to find the slot for targetOffset
in the index file
// binary search for the entry
var lo = 0
var hi = entries-1
while(lo < hi) {
  val mid = ceil(hi/2.0 + lo/2.0).toInt
  val found = relativeOffset(idx, mid)
  if(found == relOffset)
    return mid
  else if(found < relOffset)
    lo = mid
  else
    hi = mid - 1
}
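The same "largest entry <= target" search can be sketched standalone over a sorted array of relative offsets (illustrative code, not Kafka's own):

```scala
// Binary search for the slot holding the largest offset <= target,
// mirroring indexSlotFor above; returns -1 if every entry is larger.
// Note mid rounds *up*, so `lo = mid` always makes progress.
def slotFor(relOffsets: Array[Int], target: Int): Int = {
  if (relOffsets.isEmpty || relOffsets(0) > target) return -1
  var lo = 0
  var hi = relOffsets.length - 1
  while (lo < hi) {
    val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
    val found = relOffsets(mid)
    if (found == target) return mid
    else if (found < target) lo = mid
    else hi = mid - 1
  }
  lo
}
```

With slots at relative offsets [0, 10, 20, 30], a lookup for 15 lands on slot 1 (offset 10): the caller then scans the log forward from that position to find the exact record.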
-
def append(offset: Long, position: Int)
: appends one offset/position mapping to the index file;
-
def truncateTo(offset: Long)
: finds the slot for the given offset and truncates the index there;
-
def resize(newSize: Int)
: resizes the index file while keeping the current mmap position unchanged
inLock(lock) {
  val raf = new RandomAccessFile(file, "rw")
  val roundedNewSize = roundToExactMultiple(newSize, 8)
  val position = this.mmap.position
  /* Windows won't let us modify the file length while the file is mmapped :-( */
  if(Os.isWindows)
    forceUnmap(this.mmap)
  try {
    raf.setLength(roundedNewSize)
    this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
    this.maxEntries = this.mmap.limit / 8
    this.mmap.position(position)
  } finally {
    CoreUtils.swallow(raf.close())
  }
}
- An interesting note:
we said above that the index file is read through a memory-mapped MappedByteBuffer
, yet there is no corresponding unmap call anywhere (in fact the API offers no such method). Could that be a problem? A quick search turns up exactly that: "Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex".
We have hit this in practice too: for a long time we could not figure out why IO kept climbing.
The LogSegment class
- Source file: core/src/main/scala/kafka/log/LogSegment.scala
- Purpose: encapsulates all operations on a segment's on-disk log and index files
- Class definition:
class LogSegment(val log: FileMessageSet,
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging
As the definition shows, a FileMessageSet is used to operate on the log file, and an OffsetIndex
to operate on the index file
- Key methods:
-
def size: Long = log.sizeInBytes()
: returns the current size of the log file;
-
def append(offset: Long, messages: ByteBufferMessageSet)
: appends messages to the end of the log file, updating the index file when needed
-
if (messages.sizeInBytes > 0) {
  // append an entry to the index (if needed)
  // the index is sparse, so first check whether an entry is due
  if(bytesSinceLastIndexEntry > indexIntervalBytes) {
    index.append(offset, log.sizeInBytes())
    this.bytesSinceLastIndexEntry = 0
  }
  // append the messages to the end of the log file
  log.append(messages)
  this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
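The sparse-index rule above (only add an index entry after at least indexIntervalBytes of messages have accumulated since the last one) can be simulated with a small in-memory sketch; the class and field names here are illustrative:

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the sparse-index rule in append: an index entry is recorded
// only once more than indexIntervalBytes of messages have been written
// since the previous entry.
class SparseIndexSketch(indexIntervalBytes: Int) {
  val entries = ArrayBuffer.empty[(Long, Int)] // (offset, file position)
  private var bytesSinceLastIndexEntry = 0
  private var logSizeBytes = 0

  def append(offset: Long, messageSizeBytes: Int): Unit = {
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      entries += ((offset, logSizeBytes)) // position BEFORE this message
      bytesSinceLastIndexEntry = 0
    }
    logSizeBytes += messageSizeBytes
    bytesSinceLastIndexEntry += messageSizeBytes
  }
}
```

For example, with a 100-byte interval and 60-byte messages, only every other message gets an index entry; the entries in between are recovered by scanning the log forward from the nearest preceding slot.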
-
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo
: reads messages and offset information according to the given offset and size limits, returning them as a FetchDataInfo
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// if the size is zero, still return a log segment but with zero size
if(maxSize == 0)
  return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
  maxOffset match {
    case None =>
      // no max offset, just read until the max position
      min((maxPosition - startPosition.position).toInt, maxSize)
    case Some(offset) => {
      // there is a max offset, translate it to a file position and use that to calculate the max read size
      if(offset < startOffset)
        throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
      val mapping = translateOffset(offset, startPosition.position)
      val endPosition =
        if(mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position
      min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
    }
  }
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
The actual reading is ultimately delegated to FileMessageSet
's read
method
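The length calculation above boils down to a couple of min operations. A simplified sketch, under the assumption that the max offset has already been translated to a file position (so the translateOffset step is factored out):

```scala
// Simplified sketch of the read-length calculation above: how many bytes
// to read starting at startPosition, bounded by maxSize, maxPosition and,
// if given, the file position corresponding to the max offset.
def readLength(startPosition: Int,
               maxSize: Int,
               maxPosition: Long,
               maxOffsetPosition: Option[Long]): Int =
  maxOffsetPosition match {
    case None =>
      // no max offset: read up to maxPosition, capped by maxSize
      math.min((maxPosition - startPosition).toInt, maxSize)
    case Some(endPosition) =>
      // max offset given: stop at whichever bound comes first
      math.min(math.min(maxPosition, endPosition) - startPosition, maxSize.toLong).toInt
  }
```

So a fetch starting at position 100 with a max-offset position of 300 reads exactly 200 bytes, regardless of how large maxSize is.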
-
def recover(maxMessageSize: Int): Int
: reads the current contents of the log file and rebuilds the index file from them
// iterate over the messages in the log one by one, rebuilding the index
val iter = log.iterator(maxMessageSize)
try {
  while(iter.hasNext) {
    val entry = iter.next
    entry.message.ensureValid()
    if(validBytes - lastIndexEntry > indexIntervalBytes) {
      // we need to decompress the message, if required, to get the offset of the first uncompressed message
      val startOffset =
        entry.message.compressionCodec match {
          case NoCompressionCodec =>
            entry.offset
          case _ =>
            ByteBufferMessageSet.deepIterator(entry.message).next().offset
        }
      index.append(startOffset, validBytes)
      lastIndexEntry = validBytes
    }
    validBytes += MessageSet.entrySize(entry.message)
  }
} catch {
  case e: InvalidMessageException =>
    logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
}
-
def truncateTo(offset: Long): Int
: truncates both the log and the index file at the given offset
val mapping = translateOffset(offset)
if(mapping == null)
  return 0
index.truncateTo(offset)
// after truncation, reset and allocate more space for the (new currently active) index
index.resize(index.maxIndexSize)
val bytesTruncated = log.truncateTo(mapping.position)
if(log.sizeInBytes == 0)
  created = time.milliseconds
bytesSinceLastIndexEntry = 0
bytesTruncated
-
def nextOffset(): Long
: returns the next offset, i.e. the current largest offset + 1
val ms = read(index.lastOffset, None, log.sizeInBytes)
if(ms == null) {
  baseOffset
} else {
  ms.messageSet.lastOption match {
    case None => baseOffset
    case Some(last) => last.nextOffset
  }
}