Kafka Source Code Reading - LogSegment (Part 1)

These are mainly study notes excerpted from the Geek Time (極客時(shí)間) Kafka source code course.

Kafka source code reading

Part 1: The Log

  • Log organization
[Figure: kafka日志組成.jpg (how a Kafka log is composed of segments)]

A Kafka log (Log) object is made up of multiple log segment (LogSegment) objects. Each segment consists of a message log file (.log), an offset index file (.index), a timestamp index file (.timeindex), and an index file for aborted transactions (.txnindex).

LogSegment (kafka.log.LogSegment)

Several important parameters in the constructor:

@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) 
  • log (FileRecords): the object that actually stores the Kafka messages

  • lazyOffsetIndex: the offset index file

  • lazyTimeIndex: the timestamp index file

  • txnIndex: the index file for aborted transactions

  • indexIntervalBytes: this is the value of the broker-side parameter log.index.interval.bytes, and it controls how often the log segment adds index entries. By default, at least 4KB of new message data must be written to the segment before a new index entry is added. rollJitterMs is a "jitter" applied to the countdown for rolling a new segment. Because the roll countdown is currently a broker-wide setting, many segments could otherwise be created at the same moment, which would put heavy pressure on physical disk I/O. With the rollJitterMs perturbation, newly created segments are staggered by a small interval, which eases the disk I/O bottleneck.

  • baseOffset: each log segment stores its own base (starting) offset. Once the object is created, this value is fixed and can never be modified (see the naming sketch below).
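
Below is a minimal standalone sketch (my own illustration, not Kafka's actual code) of one practical consequence of baseOffset: Kafka names a segment's files by zero-padding the base offset to 20 digits, so the segment whose first message has offset 170 lives in 00000000000000000170.log together with matching .index / .timeindex / .txnindex files.

// Standalone sketch: how a segment's file names are derived from its base offset.
// The padding width (20 digits) matches what Kafka uses on disk.
object SegmentNamingSketch {
  def segmentFileName(baseOffset: Long, suffix: String): String =
    f"$baseOffset%020d$suffix"   // zero-pad the base offset, then append the suffix

  def main(args: Array[String]): Unit = {
    List(".log", ".index", ".timeindex", ".txnindex")
      .foreach(suffix => println(segmentFileName(170L, suffix)))
    // prints 00000000000000000170.log, 00000000000000000170.index, ...
  }
}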

The append method

/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param largestOffset The last offset in the message set (the largest offset to append)
 * @param largestTimestamp The largest timestamp in the message set
 * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append
 * @param records The log entries to append (the message set actually being written)
 * @return the physical position in the file of the appended records
 * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
 */
@nonthreadsafe
def append(largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
   // Only write if the message set is non-empty; if the segment is currently empty, record the largest timestamp of this write as the basis for the time-based segment-roll countdown
  if (records.sizeInBytes > 0) {
    trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
          s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)
// Make sure the largest offset is valid: largestOffset - baseOffset must fall within [0, Int.MaxValue]; overflow here is a known, common problem
    ensureOffsetInRange(largestOffset)
    // The actual write: append the in-memory message set to the OS page cache
    // append the messages 
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
    // Update the in-memory max timestamp and its corresponding offset: each log segment keeps track of the largest timestamp seen so far and the offset of the message carrying it
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
    }
    // Append an entry to the index if needed: once more than indexIntervalBytes (4KB by default) of data has been written since the last index entry, append adds a new entry to the offset and time indexes and resets the byte counter
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      offsetIndex.append(largestOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
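
To make the sparse-indexing bookkeeping at the end of append easier to see, here is a simplified standalone model (not the real LogSegment): an index entry is only added once more than indexIntervalBytes of message data has accumulated since the previous entry, and the counter is then reset.

// Simplified model of the bytesSinceLastIndexEntry logic in append.
object SparseIndexSketch {
  def main(args: Array[String]): Unit = {
    val indexIntervalBytes = 4096          // default of log.index.interval.bytes
    var bytesSinceLastIndexEntry = 0
    var indexEntries = 0

    // pretend we append 20 message batches of roughly 1 KB each
    for (batchSize <- List.fill(20)(1024)) {
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        indexEntries += 1                  // offsetIndex.append(...) in the real code
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += batchSize
    }
    println(s"index entries added: $indexEntries")   // roughly one entry per 4 KB appended
  }
}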

The read method

/**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read (the offset of the first message to read)
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxPosition The maximum position in the log segment that should be exposed for read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists); this guarantees that a consumer is never starved by a single oversized message
   *
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this log
   */
  @threadsafe
  def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
    // Locate the starting file position: Kafka uses the index to translate the offset into a physical file position before reading any messages
    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    // For example, suppose maxSize=100, maxPosition=300, startPosition=250. Then read can return at most 50 bytes, because maxPosition - startPosition = 50; the final fetch size is the minimum of that and maxSize.
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
    // Read a message set of the computed size starting at the given position
    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  }
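
The fetch-size arithmetic above is easy to check by hand. The following standalone sketch (my own example, not the real read method) reproduces it: the 50-byte case from the comment, and the minOneMessage case where an oversized first message is still returned in full.

// Standalone sketch of how read caps the number of bytes it returns.
object FetchSizeSketch {
  def fetchSize(maxSize: Int, maxPosition: Long, startPosition: Int,
                firstMessageSize: Int, minOneMessage: Boolean): Int = {
    // allow at least one full message when minOneMessage is set
    val adjustedMaxSize = if (minOneMessage) math.max(maxSize, firstMessageSize) else maxSize
    // never read past maxPosition
    math.min((maxPosition - startPosition).toInt, adjustedMaxSize)
  }

  def main(args: Array[String]): Unit = {
    // the example from the comment: only 50 bytes remain before maxPosition
    println(fetchSize(maxSize = 100, maxPosition = 300, startPosition = 250,
                      firstMessageSize = 40, minOneMessage = false))  // 50
    // an oversized first message is still returned in full when minOneMessage = true
    println(fetchSize(maxSize = 100, maxPosition = 2000, startPosition = 250,
                      firstMessageSize = 600, minOneMessage = true))  // 600
  }
}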

The recover method

/**
 * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
 * from the end of the log and index.
 *   When the broker starts, it loads every log segment from disk into memory and creates the corresponding LogSegment instances; as part of this process it must run a series of recovery operations, which is what this method does.
 * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
 *                             the transaction index.
 * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
 * @return The number of bytes truncated from the log
 * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
 */
@nonthreadsafe
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
  offsetIndex.reset()
  timeIndex.reset()
  txnIndex.reset()
  var validBytes = 0
  var lastIndexEntry = 0
  maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
  try {
    for (batch <- log.batches.asScala) {
      batch.ensureValid()
      ensureOffsetInRange(batch.lastOffset)

      // The max timestamp is exposed at the batch level, so no need to iterate the records
      if (batch.maxTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = batch.maxTimestamp
        offsetOfMaxTimestampSoFar = batch.lastOffset
      }

      // Build offset index
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(batch.lastOffset, validBytes)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        lastIndexEntry = validBytes
      }
      validBytes += batch.sizeInBytes()

      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
        leaderEpochCache.foreach { cache =>
          if (batch.partitionLeaderEpoch >= 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
            cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
        }
        updateProducerState(producerStateManager, batch)
      }
    }
  } catch {
    case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
      warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
        .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
  }
  val truncated = log.sizeInBytes - validBytes
  if (truncated > 0)
    debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

  log.truncateTo(validBytes)
  offsetIndex.trimToValidSize()
  // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
  timeIndex.trimToValidSize()
  truncated
}
Note: at the start of recover, the code first calls the reset method of each index object to clear the index files, then iterates over every record batch (RecordBatch) in the segment. For each batch it reads, the segment must verify that the batch is valid, in two respects: the messages must conform to Kafka's binary message format, and the last offset of the batch must not overflow, i.e. its difference from the segment's base offset must fall within [0, Int.MaxValue].
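
That offset-bound check (the same condition ensureOffsetInRange enforces in append) comes down to one inequality: the distance from the segment's base offset must fit in a non-negative Int, because the offset index stores 4-byte relative offsets. A minimal sketch of that condition, under that assumption:

// Standalone sketch of the offset-overflow condition checked during append and recover.
object OffsetRangeSketch {
  def offsetFitsInSegment(baseOffset: Long, offset: Long): Boolean = {
    val delta = offset - baseOffset
    delta >= 0 && delta <= Int.MaxValue   // must be representable as a relative Int offset
  }

  def main(args: Array[String]): Unit = {
    println(offsetFitsInSegment(0L, 100L))                     // true
    println(offsetFitsInSegment(0L, Int.MaxValue.toLong + 1))  // false -> LogSegmentOffsetOverflowException in Kafka
  }
}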