Kafka消息的磁盤(pán)存儲(chǔ)

  • 發(fā)送到Kafka的消息最終都是要落盤(pán)存儲(chǔ)到磁盤(pán)上;
  • 本章涉及到的類(lèi):
    1. OffsetIndex;
    2. LogSegment;

OffsetIndex類(lèi)
  • 所在文件: core/src/main/scala/kafka/log/OffsetIndex.scala
  • 作用: 我們知道所有發(fā)送到kafka的消息都是以Record的結(jié)構(gòu)(Kafka中Message存儲(chǔ)相關(guān)類(lèi)大揭密)寫(xiě)入到本地文件, 有寫(xiě)就要有讀,讀取時(shí)一般是從給定的offset開(kāi)始讀取,這個(gè)offset是邏輯offset, 需要轉(zhuǎn)換成文件的實(shí)際偏移量, 為了加速這個(gè)轉(zhuǎn)換, kafka針對(duì)每個(gè)log文件,提供了index文件, index文件采用稀疏索引的方式, 只記錄部分log offset到file position的轉(zhuǎn)換, 然后還需要在log文件中進(jìn)行少量的順序遍歷, 來(lái)精確定位到需要的Record;
  • index文件結(jié)構(gòu): 文件里存的是一條條的log offset與file position的映射, 每條記錄8個(gè)字節(jié),前4個(gè)字節(jié)是log offset, 后4個(gè)字節(jié)是file position, 這樣的每一條映射信息我們可以稱(chēng)為是一個(gè)slot
  • 讀寫(xiě)方式: 為了加速index文件的讀寫(xiě), 采用了文件內(nèi)存映射的方式:
    /* initialize the memory mapping for this index */
    private var mmap: MappedByteBuffer = 
    {
      val newlyCreated = file.createNewFile()
      val raf = new RandomAccessFile(file, "rw")
      try {
        /* pre-allocate the file if necessary */
        if(newlyCreated) {
          if(maxIndexSize < 8)
            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
        }
          
        /* memory-map the file */
        val len = raf.length()
        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
          
        /* set the position in the index for the next entry */
        if(newlyCreated)
          idx.position(0)
        else
          // if this is a pre-existing index, assume it is all valid and set position to last entry
          idx.position(roundToExactMultiple(idx.limit, 8))
        idx
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }

  • 主要方法:
    1. def lookup(targetOffset: Long): OffsetPosition: 查找小于或等于targetOffset的最大Offset;
 maybeLock(lock) {
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, targetOffset)
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
      }
  1. private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int:采用二分法查找到對(duì)于targetOffset在index文件中的slot
   // binary search for the entry 二分查找法
    var lo = 0
    var hi = entries-1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if(found == relOffset)
        return mid
      else if(found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
  1. def append(offset: Long, position: Int): 向index文件中追加一個(gè)offset/location的映射信息
  2. def truncateTo(offset: Long): 按給定的offset,找到對(duì)應(yīng)的slot, 然后截?cái)?/li>
  3. def resize(newSize: Int): 重新設(shè)置index文件size, 但保持當(dāng)前mmap的position不變
      inLock(lock) {
      val raf = new RandomAccessFile(file, "rw")
      val roundedNewSize = roundToExactMultiple(newSize, 8)
      val position = this.mmap.position
      
      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if(Os.isWindows)
        forceUnmap(this.mmap)
      try {
        raf.setLength(roundedNewSize)
        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        this.maxEntries = this.mmap.limit / 8
        this.mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
  • 有意思的一件事:
    上面我們說(shuō)過(guò)這個(gè)index文件的讀取是使用了內(nèi)存文件映射MappedByteBuffer, 然后并沒(méi)有找到相應(yīng)的unmap(實(shí)際上是沒(méi)有這方法)的調(diào)用, 這個(gè)會(huì)不會(huì)有問(wèn)題呢?遇到google了一下, 果然有發(fā)現(xiàn): Long GC pause harming broker performance which is caused by mmap objects created for OffsetIndex,
    在實(shí)際應(yīng)用中確實(shí)遇到了這樣的問(wèn)題,一直沒(méi)搞明白為什么IO會(huì)升高.
LogSegment
  • 所在文件: core/src/main/scala/kafka/log/LogSegment.scala
  • 作用: 封裝對(duì)消息落地后的log和index文件的所有操作
  • 類(lèi)定義:
      class LogSegment(val log: FileMessageSet, 
                 val index: OffsetIndex, 
                 val baseOffset: Long, 
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Loggin

可以看到使用FileMessageSet來(lái)操作Log文件, 使用OffsetIndex來(lái)操作Index文件

  • 主要方法:
    1. def size: Long = log.sizeInBytes() : 返回當(dāng)前l(fā)og文件的大小
    2. def append(offset: Long, messages: ByteBufferMessageSet):追加msg到log文件尾部,必要時(shí)更新index文件
 if (messages.sizeInBytes > 0) {
      // append an entry to the index (if needed)
      // index采用的是稀疏索引, 所以先判斷是否需要寫(xiě)入
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)  //追加msg到log文件尾部
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  1. def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo: 根據(jù)給定的offset信息等讀取相應(yīng)的msg 和offset信息,構(gòu)成FetchDataInfo返回
 val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)
      return FetchDataInfo(offsetMetadata, MessageSet.Empty)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length = 
      maxOffset match {
        case None =>
          // no max offset, just read until the max position
          min((maxPosition - startPosition.position).toInt, maxSize)
        case Some(offset) => {
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          val mapping = translateOffset(offset, startPosition.position)
          val endPosition = 
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
        }
      }
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))

實(shí)際上最終是調(diào)用FileMessageSetread方法讀取

  1. def recover(maxMessageSize: Int): Int :讀取當(dāng)前的log文件內(nèi)容,重新構(gòu)建index文件
//逐條讀取log里的msg, 然后構(gòu)建index文件
val iter = log.iterator(maxMessageSize)
   try {
     while(iter.hasNext) {
       val entry = iter.next
       entry.message.ensureValid()
       if(validBytes - lastIndexEntry > indexIntervalBytes) {
         // we need to decompress the message, if required, to get the offset of the first uncompressed message
         val startOffset =
           entry.message.compressionCodec match {
             case NoCompressionCodec =>
               entry.offset
             case _ =>
               ByteBufferMessageSet.deepIterator(entry.message).next().offset
         }
         index.append(startOffset, validBytes)
         lastIndexEntry = validBytes
       }
       validBytes += MessageSet.entrySize(entry.message)
     }
   } catch {
     case e: InvalidMessageException => 
       logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
   }
  1. def truncateTo(offset: Long): Int: 根據(jù)給定的offset截?cái)鄉(xiāng)og和index文件
 val mapping = translateOffset(offset)
    if(mapping == null)
      return 0
    index.truncateTo(offset)
    // after truncation, reset and allocate more space for the (new currently  active) index
    index.resize(index.maxIndexSize)
    val bytesTruncated = log.truncateTo(mapping.position)
    if(log.sizeInBytes == 0)
      created = time.milliseconds
    bytesSinceLastIndexEntry = 0
    bytesTruncated
  1. def nextOffset(): Long : 獲取下一個(gè)offset值, 其實(shí)就是當(dāng)前最大的offset + 1
val ms = read(index.lastOffset, None, log.sizeInBytes)
    if(ms == null) {
      baseOffset
    } else {
      ms.messageSet.lastOption match {
        case None => baseOffset
        case Some(last) => last.nextOffset
      }
    }

Kafka源碼分析-匯總

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌僻他,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件栽惶,死亡現(xiàn)場(chǎng)離奇詭異克滴,居然都是意外死亡雄卷,警方通過(guò)查閱死者的電腦和手機(jī)儡蔓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門(mén)郭蕉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人喂江,你說(shuō)我怎么就攤上這事召锈。” “怎么了获询?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,815評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵涨岁,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我吉嚣,道長(zhǎng)梢薪,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,537評(píng)論 1 296
  • 正文 為了忘掉前任尝哆,我火速辦了婚禮沮尿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘较解。我一直安慰自己,他們只是感情好赴邻,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,536評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布印衔。 她就那樣靜靜地躺著,像睡著了一般姥敛。 火紅的嫁衣襯著肌膚如雪奸焙。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,184評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音与帆,去河邊找鬼了赌。 笑死,一個(gè)胖子當(dāng)著我的面吹牛玄糟,可吹牛的內(nèi)容都是我干的勿她。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼阵翎,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼逢并!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起郭卫,我...
    開(kāi)封第一講書(shū)人閱讀 39,668評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤砍聊,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后贰军,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體玻蝌,經(jīng)...
    沈念sama閱讀 46,212評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,299評(píng)論 3 340
  • 正文 我和宋清朗相戀三年词疼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了俯树。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,438評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡寒跳,死狀恐怖聘萨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情童太,我是刑警寧澤米辐,帶...
    沈念sama閱讀 36,128評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站书释,受9級(jí)特大地震影響翘贮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜爆惧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,807評(píng)論 3 333
  • 文/蒙蒙 一狸页、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扯再,春花似錦芍耘、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,279評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至秃殉,卻和暖如春坝初,著一層夾襖步出監(jiān)牢的瞬間浸剩,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,395評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工鳄袍, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留绢要,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,827評(píng)論 3 376
  • 正文 我出身青樓拗小,卻偏偏與公主長(zhǎng)得像重罪,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子十籍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,446評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容