Kafka中Message存儲相關(guān)類大揭密

  • 我們使用Kafka, 最終都是要存,取消息,今天我們就來看下源碼中和消息相關(guān)的類;
  • 涉及到的類:
    1. Message
    2. Record
    3. MessageSet
    4. ByteBufferMessageSet
    5. BufferingOutputStream
    6. MessageWriter
    7. FileMessageSet

Message類:
  • 所在文件: core/src/main/scala/kafka/message/Message.scala
  • 作用: kafka系統(tǒng)單條message結(jié)構(gòu)組成
  • Message結(jié)構(gòu):
1.png
  • 這個類主要就是使用ByteBuffer來承載Message這個結(jié)構(gòu), 默認構(gòu)造函數(shù)封裝了ByteBuffer, 還提供了一系列的this構(gòu)造函數(shù),參數(shù)為Message結(jié)構(gòu)的若干個字段;
  • checksum的計算: checksum的計算從Magic字段開始, 計算結(jié)果寫入CRC32字段.
  • 提供了一系列便捷方法,來獲取Message結(jié)構(gòu)中各個字段和屬性:
  /**
   * The complete serialized size of this message in bytes (including crc, header attributes, etc)
   */
  def size: Int = buffer.limit

  /**
   * The length of the key in bytes
   */
  def keySize: Int = buffer.getInt(Message.KeySizeOffset)

 /**
   * The length of the message value in bytes
   */
  def payloadSize: Int = buffer.getInt(payloadSizeOffset)
 /**
   * The magic version of this message
   */
  def magic: Byte = buffer.get(MagicOffset)
  
  /**
   * The attributes stored with this message
   */
  def attributes: Byte = buffer.get(AttributesOffset)
  
  /**
   * The compression codec used with this message
   */
  def compressionCodec: CompressionCodec = 
    CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
  
  /**
   * A ByteBuffer containing the content of the message
   */
  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
Record類
  • 實際上kafka源碼中沒有這個類, kafka中的一條消息是上面我們講的一個Message, 但實際上記錄到log文件中的不是這個Message, 而是一條Record
  • Record的結(jié)構(gòu): 其實很簡單 [Offset MessageSize Message], 在一條Message前面加上8字節(jié)的Offset和4字節(jié)的MessageSize
  • 實際是多條Record就構(gòu)成了我們下面要說的一個MessageSet
MessageSet類
  • 所在文件: core/src/main/scala/kafka/message/MessageSet.scala
  • 作用: 存儲若干條Record, 官網(wǎng)上給出的結(jié)構(gòu):
    MessageSet => [Offset MessageSize Message]  => 這里就是我們上面說的Record
      Offset => int64
      MessageSize => int32
      Message
  • 定義:abstract class MessageSet extends Iterable[MessageAndOffset]
    從定義可以看出MessageSet是個抽象類, 且繼承了Iterable[MessageAndOffset],
  • 主要方法:
    1. def iterator: Iterator[MessageAndOffset]: 返回迭代器, 用于迭代所有的MessageAndOffset, 主要是因為它繼承了Iterable[MessageAndOffset];
    2. def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int:寫message到指定的Channel
  • Object Message里其實已經(jīng)定義了我們上面說的Record:
  val MessageSizeLength = 4
  val OffsetLength = 8
  val LogOverhead = MessageSizeLength + OffsetLength
 
 //這里的entry就是我們說的Record
 def entrySize(message: Message): Int = LogOverhead + message.size
  • 結(jié)構(gòu)示意圖:
2.jpg
ByteBufferMessageSet類
  • 所在文件: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  • 定義: class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging
    1. 繼承于MessageSet;
    2. 提供了ByteBufferMessageSet之間的相互轉(zhuǎn)換, MessageSet在內(nèi)存中的操作
  • 主要方法:
    1. override def iterator: Iterator[MessageAndOffset] = internalIterator(): 返回迭代器,用來遍歷包含的每條MessageAndOffset; 主要是用來從ByteBuffer里抽取Message
      1.1 實際上是通過internalIterator()方法返回;
      1.2 private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset],返回MessageAndOffset的迭代器 new IteratorTemplate[MessageAndOffset]
      1.3 真正干活的是IteratorTemplate[MessageAndOffset]override def makeNext(): MessageAndOffset, 實際上就是把上面介紹的MessageSet的結(jié)構(gòu)里的Record一條條解出來, 對于壓縮后的MessageSet涉及到一層遞歸,具體可以參見上面的 2.jpg
      1.4 放一段核心代碼:
       if(isShallow) { //是不是要作深層迭代需要迭代,就是我們上面2.jpg里的M1
          new MessageAndOffset(newMessage, offset) //直接返回一條MessageAndOffset
        } else { //需要迭代,就是我們上面2.jpg里的M2
          newMessage.compressionCodec match {//根據(jù)壓縮Codec決定作什么處理
            case NoCompressionCodec => //未壓縮,直接返回一條MessageAndOffset
              innerIter = null
              new MessageAndOffset(newMessage, offset)
            case _ => //壓縮了的MessageSet, 就再深入一層, 逐條解壓讀取
              innerIter = ByteBufferMessageSet.deepIterator(newMessage)
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }
  1. private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer: 用于從Message List到ByteBuffer的轉(zhuǎn)換, 實際上最后生成的ByteBuffer里就是上面說的一條Record
   if(messages.size == 0) {
      MessageSet.Empty.buffer
    } else if(compressionCodec == NoCompressionCodec) {
      // 非壓縮的
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for(message <- messages)
        writeMessage(buffer, message, offsetCounter.getAndIncrement)
      buffer.rewind()
      buffer
    } else {
     //壓縮的使用 MessageWriter類來寫
      var offset = -1L
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
        try {
          //逐條壓縮
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement
            output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      //寫入buffer作為一條Record
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  1. def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int: 寫MessageSet到GatheringByteChannel:
    // Ignore offset and size from input. We just want to write the whole buffer to the channel.
    buffer.mark()
    var written = 0
    while(written < sizeInBytes)
      written += channel.write(buffer)
    buffer.reset()
    written
  }
  1. Message驗證和Offset的重新賦值: 這是一個神奇的函數(shù),在broker把收到的producer request里的MessageSet append到Log之前瘟滨,以及consumer和follower獲取消息之后胜卤,都需要進行校驗, 這個函數(shù)就是這個驗證的一部分, 我把相應(yīng)的說明寫在源碼里,這個函數(shù)在后面講到處理log append和consumer時我們還會用到.
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
                                                      sourceCodec: CompressionCodec,
                                                      targetCodec: CompressionCodec,
                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // 非壓縮的Message
      // do in-place validation and offset assignment
      var messagePosition = 0
      buffer.mark()
      while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
        buffer.position(messagePosition)
       // 根據(jù)參數(shù)傳入的 offsetCountern 更新當前的Offset
        buffer.putLong(offsetCounter.getAndIncrement())
        val messageSize = buffer.getInt()
        val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
        // 如果是compact topic(比如__cosumer_offsets),  key是一定要有的, 這里檢查這個key的合法性
        if (compactedTopic && positionAfterKeySize < sizeInBytes) {
          buffer.position(buffer.position() + Message.KeySizeOffset)
          val keySize = buffer.getInt()
          if (keySize <= 0) {
            buffer.reset()
            throw new InvalidMessageException("Compacted topic cannot accept message without key.")
          }
        }
        messagePosition += MessageSet.LogOverhead + messageSize
      }
      buffer.reset()
      this
    } else {
      // 壓縮的Message,  下面源碼里的注釋已經(jīng)說得很清楚了
      // We need to deep-iterate over the message-set if any of these are true:
      // (i) messages are compressed
      // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
      // 深度迭代, 獲取所有的message
      val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
        if (compactedTopic && !messageAndOffset.message.hasKey)
          throw new InvalidMessageException("Compacted topic cannot accept message without key.")

        messageAndOffset.message
      })
      //使用targetCodec重新壓縮
      new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
    }
  }
BufferingOutputStream類
  • 所在文件: core/src/main/scala/kafka/message/MessageWriter.scala
  • 定義: class BufferingOutputStream(segmentSize: Int) extends OutputStream 繼承自OutputStream
  • 作用: 這個來接納寫入它的各種數(shù)據(jù)類型, 比如int, byte, byte array, 其內(nèi)部定義了 Segment類, Segment內(nèi)部使用Array[byte]來存儲數(shù)據(jù), 多個Segment連成一個鏈接, 鏈接可以自動擴展,來存儲寫入BufferingOutputStream的所有數(shù)據(jù)
  • 主要方法:
    1. 一組write函數(shù): 用于寫入不能類型的數(shù)據(jù);
    2. def reserve(len: Int): ReservedOutput: 從當前位置開始預(yù)留len長度存儲空間
    3. def writeTo(buffer: ByteBuffer): Unit: 將存儲在Segment鏈接中的數(shù)據(jù)全部拷貝到ByteBuffer中 .
MessageWriter
  • 所在文件: core/src/main/scala/kafka/message/MessageWriter.scala
  • 定義: class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize), 繼承自上面的BufferingOutputStream
  • 作用: 在ByteBufferMessageSet::create中用到, 將若干條Message構(gòu)造成多條對應(yīng)的壓縮后的Record, 將這個壓縮后的結(jié)果再次作為payload構(gòu)造成一條新的Message;
  • 主要方法:
    1. 構(gòu)造Message, 添加Crc, 寫入Magic, Attribete, key size, key.......
def write(key: Array[Byte] = null, codec: CompressionCodec)(writePayload: OutputStream => Unit): Unit = {
    withCrc32Prefix {
      write(CurrentMagicValue)
      var attributes: Byte = 0
      if (codec.codec > 0)
        attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
      write(attributes)
      // write the key
      if (key == null) {
        writeInt(-1)
      } else {
        writeInt(key.length)
        write(key, 0, key.length)
      }
      // write the payload with length prefix
      withLengthPrefix {
        writePayload(this)
      }
    }
  }
FileMessageSet類
  • 所在文件:core/src/main/scala/kafka/log/FileMessageSet.scala
  • 定義: class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] val start: Int, private[log] val end: Int, isSlice: Boolean) extends MessageSet with Logging
  • 作用:用于MessageSet與磁盤文件之前的讀取
  • 主要方法:
  1. def iterator(maxMessageSize: Int): Iterator[MessageAndOffset]: 返回一個迭代器宝穗,用于獲取對應(yīng)本地log文件里的每一條Record, 寫入到文件里是不是Message,而是Record
override def makeNext(): MessageAndOffset = {
        if(location >= end)
          return allDone()
          
        // read the size of the item
        sizeOffsetBuffer.rewind()
        // 先讀Record的頭部蚤告,Offset + MessageSize , 共12字節(jié)
        channel.read(sizeOffsetBuffer, location)
        if(sizeOffsetBuffer.hasRemaining)
          return allDone()
        
        sizeOffsetBuffer.rewind()
        val offset = sizeOffsetBuffer.getLong()
        val size = sizeOffsetBuffer.getInt()
        if(size < Message.MinHeaderSize)
          return allDone()
        if(size > maxMessageSize)
          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
        
        // read the item itself 
       //  根所MessageSize讀Message
        val buffer = ByteBuffer.allocate(size)
        channel.read(buffer, location + 12)
        if(buffer.hasRemaining)
          return allDone()
        buffer.rewind()
        
        // increment the location and return the item
        location += size + 1
        new MessageAndOffset(new Message(buffer), offset)
      }
  1. def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) _size.getAndAdd(written) } :將多條Record`由內(nèi)存落地到本地Log文件
  2. def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int: 將本地Log文件中的Message發(fā)送到批定的Channel
 val newSize = math.min(channel.size().toInt, end) - start
    if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
        .format(file.getAbsolutePath, _size.get(), newSize))
    }
    val position = start + writePosition
    val count = math.min(size, sizeInBytes)
    val bytesTransferred = (destChannel match {
      // 利用sendFile系統(tǒng)調(diào)用已零拷貝方式發(fā)送給客戶端
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt
    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
    bytesTransferred
總結(jié)
  • 我們看到ByteBufferMessageSetFileMessageSet都是繼承于MessageSet, 也就是說一條Record的結(jié)構(gòu)在內(nèi)存和本地文件中的存儲格式是完全一樣的,在Message的讀寫時不用作多余的轉(zhuǎn)換。

Kafka源碼分析-匯總

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末毅臊,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子黑界,更是在濱河造成了極大的恐慌管嬉,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朗鸠,死亡現(xiàn)場離奇詭異蚯撩,居然都是意外死亡,警方通過查閱死者的電腦和手機烛占,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門胎挎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人忆家,你說我怎么就攤上這事犹菇。” “怎么了芽卿?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵揭芍,是天一觀的道長。 經(jīng)常有香客問我卸例,道長称杨,這世上最難降的妖魔是什么流酬? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮列另,結(jié)果婚禮上芽腾,老公的妹妹穿的比我還像新娘。我一直安慰自己页衙,他們只是感情好摊滔,可當我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著店乐,像睡著了一般艰躺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上眨八,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天腺兴,我揣著相機與錄音,去河邊找鬼廉侧。 笑死页响,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的段誊。 我是一名探鬼主播闰蚕,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼连舍!你這毒婦竟也來了没陡?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤索赏,失蹤者是張志新(化名)和其女友劉穎盼玄,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體潜腻,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡埃儿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了砾赔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蝌箍。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖暴心,靈堂內(nèi)的尸體忽然破棺而出妓盲,到底是詐尸還是另有隱情,我是刑警寧澤专普,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布悯衬,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏筋粗。R本人自食惡果不足惜策橘,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望娜亿。 院中可真熱鬧丽已,春花似錦、人聲如沸买决。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽督赤。三九已至嘁灯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間躲舌,已是汗流浹背丑婿。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留没卸,地道東北人羹奉。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像办悟,于是被迫代替她去往敵國和親尘奏。 傳聞我的和親對象是個殘疾皇子滩褥,可洞房花燭夜當晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理病蛉,服務(wù)發(fā)現(xiàn),斷路器瑰煎,智...
    卡卡羅2017閱讀 134,628評論 18 139
  • ** 今天看了一下kafka官網(wǎng)铺然,嘗試著在自己電腦上安裝和配置,然后學(xué)一下官方document酒甸。** Introd...
    RainChang閱讀 4,993評論 1 30
  • kafka的定義:是一個分布式消息系統(tǒng)魄健,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,309評論 1 15
  • 發(fā)送到Kafka的消息最終都是要落盤存儲到磁盤上; 本章涉及到的類:OffsetIndex;LogSegment;...
    掃帚的影子閱讀 3,410評論 0 1
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,812評論 4 54