Study notes, excerpted mainly from a 極客時(shí)間 (Geek Time) course.
Kafka source code reading
Part 1: The log
- Log organization
A Kafka log object is made up of multiple log segment objects. Each segment consists of a message log file (.log), an offset index file (.index), a timestamp index file (.timeindex), and an index file for aborted transactions (.txnindex).
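As a rough illustration, the on-disk layout of one partition directory might look like the following (the topic name, base offsets and the presence of a .txnindex file are hypothetical; file names are the 20-digit, zero-padded base offset of each segment):

web_log-0/
  00000000000000000000.log        # message data of the segment with base offset 0
  00000000000000000000.index      # offset index
  00000000000000000000.timeindex  # timestamp index
  00000000000000000000.txnindex   # aborted-transaction index
  00000000000000005120.log        # next segment, base offset 5120
  00000000000000005120.index
  00000000000000005120.timeindex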
LogSegment (kafka.log.LogSegment)
Important constructor parameters:
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time)
log (FileRecords): the object that actually stores the Kafka messages
lazyOffsetIndex: the offset index file
lazyTimeIndex: the timestamp index file
txnIndex: the aborted-transaction index file
indexIntervalBytes is simply the broker-side parameter log.index.interval.bytes; it controls how often the log segment adds an index entry. By default, at least 4 KB of message data must be written to the segment before a new index entry is added. rollJitterMs is a "jitter" applied to the countdown for rolling a new log segment. Because the roll countdown is currently a broker-wide setting, many segments could otherwise become due for creation at the same moment, which would put heavy pressure on physical disk I/O. With rollJitterMs, newly created segments are staggered by a small interval, easing the disk I/O bottleneck; a minimal sketch of how the jitter shifts the roll deadline follows these parameter notes.
baseOffset: the starting offset that each log segment keeps for itself. It is fixed once the segment object is created and can never be modified afterwards.
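A minimal sketch of the jitter idea, not Kafka's actual code (the method and parameter names here are made up for illustration): subtracting a per-segment random jitter from the roll deadline makes segments that were created at the same time roll at slightly different moments.

// Hypothetical sketch: decide whether a segment is due for a time-based roll.
def shouldRollByTime(segmentCreatedMs: Long,
                     nowMs: Long,
                     maxSegmentMs: Long,   // broker-wide time-based roll setting
                     rollJitterMs: Long): Boolean =
  nowMs - segmentCreatedMs > maxSegmentMs - rollJitterMs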
append method
/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param largestOffset The last offset in the message set
 * @param largestTimestamp The largest timestamp in the message set
 * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append
 * @param records The log entries to append
 * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
 */
@nonthreadsafe
def append(largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  // Only non-empty record sets are appended. If the segment itself is still empty,
  // the largest timestamp of the incoming records is recorded and later used as the
  // basis for the time-based roll countdown of a future segment.
  if (records.sizeInBytes > 0) {
    trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
      s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
    // Ensure the largest offset is valid: largestOffset - baseOffset must fall within
    // [0, Int.MaxValue]; overflowing this range is a known historical issue.
    ensureOffsetInRange(largestOffset)
    // The actual write: append the in-memory records to the OS page cache.
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
    // Update the in-memory max timestamp and corresponding offset. Every log segment
    // tracks the largest timestamp seen so far and the offset of the message carrying it.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
    }
    // Append an entry to the index if needed: once more than indexIntervalBytes (4 KB by
    // default) have accumulated since the last index entry, add a new entry to the offset
    // and time indexes and reset the byte counter.
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      offsetIndex.append(largestOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
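A minimal usage sketch of append, assuming `segment` is an open LogSegment with baseOffset 100; the offsets, timestamp and payload below are made up for illustration:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

val now = System.currentTimeMillis()
// Build an in-memory batch containing a single record at offset 100.
val records = MemoryRecords.withRecords(
  100L,                                 // offset assigned to the record below
  CompressionType.NONE,
  new SimpleRecord(now, "key".getBytes, "value".getBytes))

segment.append(
  largestOffset = 100L,                 // last offset in the batch
  largestTimestamp = now,               // largest timestamp in the batch
  shallowOffsetOfMaxTimestamp = 100L,   // offset of the record carrying that timestamp
  records = records)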
read method
/**
 * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
 * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
 *
 * @param startOffset A lower bound on the first offset to include in the message set we read
 * @param maxSize The maximum number of bytes to include in the message set we read
 * @param maxPosition The maximum position in the log segment that should be exposed for read
 * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists),
 *                      which guarantees that an oversized message cannot starve a consumer
 *
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
@threadsafe
def read(startOffset: Long,
         maxSize: Int,
         maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
  // Locate the starting file position: Kafka consults the index to translate the
  // logical startOffset into a physical file position before reading any messages.
  val startOffsetAndSize = translateOffset(startOffset)
  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null
  val startPosition = startOffsetAndSize.position
  val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize
  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
  // Calculate how many bytes to read. For example, with maxSize = 100, maxPosition = 300
  // and startPosition = 250, at most 50 bytes can be read, because
  // maxPosition - startPosition = 50; the smaller of that and adjustedMaxSize wins.
  val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
  // Read a message set of fetchSize bytes starting at startPosition.
  FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
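A minimal usage sketch of read, assuming `segment` is the LogSegment shown above; the offset and size are made up for illustration:

// Read at most 1000 bytes starting from the first offset >= 150; minOneMessage
// guarantees at least one record even if it is larger than 1000 bytes.
val fetch = segment.read(startOffset = 150L, maxSize = 1000, minOneMessage = true)
if (fetch != null) {
  fetch.records.batches.forEach { batch =>
    println(s"read batch: baseOffset=${batch.baseOffset}, lastOffset=${batch.lastOffset}")
  }
}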
recover method
/**
 * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
 * from the end of the log and index.
 *
 * When the broker starts up, it loads every log segment from disk into memory and creates the
 * corresponding LogSegment instances; recovery is run as part of that process.
 *
 * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
 *                             the transaction index.
 * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
 * @return The number of bytes truncated from the log
 * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
 */
@nonthreadsafe
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
  // Clear the offset, time and transaction indexes before rebuilding them.
  offsetIndex.reset()
  timeIndex.reset()
  txnIndex.reset()
  var validBytes = 0
  var lastIndexEntry = 0
  maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
  try {
    for (batch <- log.batches.asScala) {
      // Validate each batch: it must match Kafka's binary format and its last offset
      // must stay within range of this segment's base offset.
      batch.ensureValid()
      ensureOffsetInRange(batch.lastOffset)

      // The max timestamp is exposed at the batch level, so no need to iterate the records
      if (batch.maxTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = batch.maxTimestamp
        offsetOfMaxTimestampSoFar = batch.lastOffset
      }

      // Build offset index
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(batch.lastOffset, validBytes)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        lastIndexEntry = validBytes
      }
      validBytes += batch.sizeInBytes()

      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
        leaderEpochCache.foreach { cache =>
          if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
            cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
        }
        updateProducerState(producerStateManager, batch)
      }
    }
  } catch {
    case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
      warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
        .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
  }
  // Everything after the last valid byte is considered corrupt and truncated away.
  val truncated = log.sizeInBytes - validBytes
  if (truncated > 0)
    debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
  log.truncateTo(validBytes)
  offsetIndex.trimToValidSize()
  // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
  timeIndex.trimToValidSize()
  truncated
}
Note:
At the start of recover, the code calls reset on each index object to clear the index files, and then iterates over every record batch (RecordBatch) in the segment. For each batch it reads, the segment must make sure the batch is valid, in two respects: the records must conform to Kafka's binary message format, and the last offset of the batch must not be out of range, i.e. its difference from the segment's base offset must fall within [0, Int.MaxValue]. A sketch of this offset-range check follows.
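A minimal sketch of the offset-range constraint behind ensureOffsetInRange (the helper name below is hypothetical): the offset index stores offsets relative to baseOffset as 4-byte integers, so the difference must fit in an Int.

// Hypothetical helper, not Kafka's actual code.
def offsetWithinRange(offset: Long, baseOffset: Long): Boolean = {
  val relative = offset - baseOffset
  relative >= 0 && relative <= Int.MaxValue
}

// e.g. offsetWithinRange(offset = 5170L, baseOffset = 5120L) == true
// while offsetWithinRange(offset = 5119L, baseOffset = 5120L) == false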