- We use Kafka, ultimately, to store and fetch messages, so in this post we walk through the message-related classes in the source code;
- Classes involved:
- Message
- Record
- MessageSet
- ByteBufferMessageSet
- BufferingOutputStream
- MessageWriter
- FileMessageSet
The Message class:
- Source file: core/src/main/scala/kafka/message/Message.scala
- Purpose: defines how a single message in Kafka is laid out
- Message layout: [CRC32(4 bytes) Magic(1) Attributes(1) KeySize(4) Key PayloadSize(4) Payload]
- The class essentially uses a ByteBuffer to carry this structure: the primary constructor wraps a ByteBuffer, and a series of `this` auxiliary constructors take the individual fields of the Message structure as parameters;
- Checksum computation: the checksum is computed over everything from the `Magic` field onward, and the result is written into the `CRC32` field.
- A set of convenience accessors exposes the individual fields and attributes of the Message structure:
/**
* The complete serialized size of this message in bytes (including crc, header attributes, etc)
*/
def size: Int = buffer.limit
/**
* The length of the key in bytes
*/
def keySize: Int = buffer.getInt(Message.KeySizeOffset)
/**
* The length of the message value in bytes
*/
def payloadSize: Int = buffer.getInt(payloadSizeOffset)
/**
* The magic version of this message
*/
def magic: Byte = buffer.get(MagicOffset)
/**
* The attributes stored with this message
*/
def attributes: Byte = buffer.get(AttributesOffset)
/**
* The compression codec used with this message
*/
def compressionCodec: CompressionCodec =
CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
/**
* A ByteBuffer containing the content of the message
*/
def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
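- A minimal usage sketch of these accessors (this assumes the (value, key) auxiliary constructor available in the 0.8.x/0.9.x code this post follows; values are just for illustration):
import kafka.message.Message

// build an uncompressed Message from a value and a key
val msg = new Message("some-value".getBytes("UTF-8"), "some-key".getBytes("UTF-8"))

println(msg.size)              // total serialized size: header + key + value
println(msg.keySize)           // 8  -> length of "some-key"
println(msg.payloadSize)       // 10 -> length of "some-value"
println(msg.magic)             // magic byte of the current message format
println(msg.compressionCodec)  // NoCompressionCodec

// payload is a ByteBuffer view of the value
val p = msg.payload
val value = new Array[Byte](p.remaining)
p.get(value)
println(new String(value, "UTF-8"))  // "some-value"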
The Record class
- There is actually no such class in the Kafka source. A single message in Kafka is the `Message` described above, but what is actually recorded in the log file is not that `Message` by itself; it is a Record
- The Record structure is quite simple: [Offset MessageSize Message], i.e. an 8-byte Offset and a 4-byte MessageSize prepended to a `Message`
- In practice a sequence of such `Record`s makes up the MessageSet we discuss next
The MessageSet class
- Source file: core/src/main/scala/kafka/message/MessageSet.scala
- Purpose: stores a sequence of `Record`s. The structure given in the official docs:
MessageSet => [Offset MessageSize Message] => this is the Record we described above
Offset => int64
MessageSize => int32
Message
- Definition:
abstract class MessageSet extends Iterable[MessageAndOffset]
The definition shows that `MessageSet` is an abstract class and that it extends `Iterable[MessageAndOffset]`.
- Key methods:
- `def iterator: Iterator[MessageAndOffset]`: returns an iterator over all the contained MessageAndOffset entries; this is required mainly because the class extends Iterable[MessageAndOffset];
- `def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int`: writes the messages to the specified Channel
- The companion object MessageSet in fact already defines the `Record` layout we described above:
val MessageSizeLength = 4
val OffsetLength = 8
val LogOverhead = MessageSizeLength + OffsetLength
// an entry here is what we have been calling a Record
def entrySize(message: Message): Int = LogOverhead + message.size
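- As a sketch of that framing: writing one entry (Record) into a ByteBuffer looks roughly like this. The helper `writeRecord` is my own name for illustration; the layout (8-byte offset, 4-byte size, then the serialized Message) follows the constants above:
import java.nio.ByteBuffer
import kafka.message.{Message, MessageSet}

// frame a single Record: [Offset(8) MessageSize(4) Message]
def writeRecord(buffer: ByteBuffer, message: Message, offset: Long): Unit = {
  buffer.putLong(offset)                  // 8-byte logical offset
  buffer.putInt(message.size)             // 4-byte size of the serialized Message
  buffer.put(message.buffer.duplicate())  // the Message bytes themselves
}

val msg = new Message("hello".getBytes("UTF-8"))
val buf = ByteBuffer.allocate(MessageSet.entrySize(msg))  // LogOverhead + message.size
writeRecord(buf, msg, 0L)
buf.rewind()  // buf now holds exactly one Record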
The ByteBufferMessageSet class
- Source file: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
- Definition:
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging
- Extends `MessageSet`;
- Provides the conversion back and forth between a `ByteBuffer` and a `MessageSet`, i.e. the in-memory handling of a `MessageSet`
- Key methods:
- `override def iterator: Iterator[MessageAndOffset] = internalIterator()`: returns an iterator used to walk every contained MessageAndOffset; this is the main way `Message`s are extracted from the underlying `ByteBuffer` (a short usage sketch follows the code below)
1.1 The result actually comes from the internalIterator() method;
1.2 `private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset]` returns an iterator over MessageAndOffset, a new IteratorTemplate[MessageAndOffset]
1.3 The real work is done by `override def makeNext(): MessageAndOffset` of that IteratorTemplate[MessageAndOffset]: it unpacks the `Record`s of the MessageSet structure described above one by one; for a compressed MessageSet this involves one level of recursion, see 2.jpg above
1.4 The core of the code:
if(isShallow) { // shallow iteration only: this is M1 in 2.jpg above
new MessageAndOffset(newMessage, offset) // return a single MessageAndOffset directly
} else { // deep iteration may be needed: this is M2 in 2.jpg above
newMessage.compressionCodec match { // decide what to do based on the compression codec
case NoCompressionCodec => // not compressed: return a single MessageAndOffset directly
innerIter = null
new MessageAndOffset(newMessage, offset)
case _ => // a compressed MessageSet: go one level deeper and read it record by record, decompressing
innerIter = ByteBufferMessageSet.deepIterator(newMessage)
if(!innerIter.hasNext)
innerIter = null
makeNext()
}
}
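- A short usage sketch of the iterator (the constructor that takes a codec and a list of messages is covered under create() below; names and values here are only for illustration):
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

val set = new ByteBufferMessageSet(
  NoCompressionCodec,
  new Message("m1".getBytes), new Message("m2".getBytes), new Message("m3".getBytes))

// iterator == internalIterator(isShallow = false); for an uncompressed set each
// Record maps to exactly one MessageAndOffset
for (entry <- set)
  println(s"offset=${entry.offset} payloadSize=${entry.message.payloadSize}")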
- `private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer`: converts a list of `Message`s into a `ByteBuffer`; in the compressed branch the resulting `ByteBuffer` holds exactly one `Record` as described above, wrapping the compressed set (a short usage sketch follows the code below)
if(messages.size == 0) {
MessageSet.Empty.buffer
} else if(compressionCodec == NoCompressionCodec) {
// not compressed
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for(message <- messages)
writeMessage(buffer, message, offsetCounter.getAndIncrement)
buffer.rewind()
buffer
} else {
// compressed: written via the MessageWriter class
var offset = -1L
val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
messageWriter.write(codec = compressionCodec) { outputStream =>
val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
try {
// compress message by message
for (message <- messages) {
offset = offsetCounter.getAndIncrement
output.writeLong(offset)
output.writeInt(message.size)
output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
}
} finally {
output.close()
}
}
// write the result into the buffer as a single Record
val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
writeMessage(buffer, messageWriter, offset)
buffer.rewind()
buffer
}
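- A quick way to see that the compressed path of create() really produces a single wrapper Record is to compare the shallow and deep views (this assumes `shallowIterator`, which simply calls `internalIterator(true)`, is accessible in the version you are reading):
import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

val msgs = Seq(new Message("a".getBytes), new Message("b".getBytes), new Message("c".getBytes))
val compressed = new ByteBufferMessageSet(GZIPCompressionCodec, msgs: _*)

println(compressed.shallowIterator.size)  // 1 -> one wrapper Record holding the compressed set
println(compressed.iterator.size)         // 3 -> deep iteration decompresses and unpacks it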
- `def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int`: writes the MessageSet to a GatheringByteChannel:
// Ignore offset and size from input. We just want to write the whole buffer to the channel.
buffer.mark()
var written = 0
while(written < sizeInBytes)
written += channel.write(buffer)
buffer.reset()
written
- Message validation and offset re-assignment: this is an interesting function. Before the broker appends the MessageSet from a producer request to the Log, and after a consumer or follower fetches messages, validation is required, and this function is part of that validation. I have put the explanation in the source comments below; we will come back to this function when we cover log append and the consumer.
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
compactedTopic: Boolean = false): ByteBufferMessageSet = {
if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // uncompressed Messages
// do in-place validation and offset assignment
var messagePosition = 0
buffer.mark()
while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
buffer.position(messagePosition)
// re-assign the current Offset from the offsetCounter passed in as a parameter
buffer.putLong(offsetCounter.getAndIncrement())
val messageSize = buffer.getInt()
val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
// for a compacted topic (e.g. __consumer_offsets) the key is mandatory; check that the key is valid here
if (compactedTopic && positionAfterKeySize < sizeInBytes) {
buffer.position(buffer.position() + Message.KeySizeOffset)
val keySize = buffer.getInt()
if (keySize <= 0) {
buffer.reset()
throw new InvalidMessageException("Compacted topic cannot accept message without key.")
}
}
messagePosition += MessageSet.LogOverhead + messageSize
}
buffer.reset()
this
} else {
// compressed Messages; the original comments below already explain this clearly
// We need to deep-iterate over the message-set if any of these are true:
// (i) messages are compressed
// (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
// deep-iterate to get all the messages
val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
if (compactedTopic && !messageAndOffset.message.hasKey)
throw new InvalidMessageException("Compacted topic cannot accept message without key.")
messageAndOffset.message
})
// re-compress using targetCodec
new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
}
}
The BufferingOutputStream class
- Source file: core/src/main/scala/kafka/message/MessageWriter.scala
- Definition:
class BufferingOutputStream(segmentSize: Int) extends OutputStream
, i.e. it extends OutputStream
- Purpose: accepts whatever is written into it (int, byte, byte array, ...). Internally it defines a `Segment` class that stores its data in an `Array[Byte]`; multiple `Segment`s are chained into a linked list that grows automatically, so it can hold all the data written into the `BufferingOutputStream`
- Key methods:
- A family of `write` methods: used to write data of different types;
- `def reserve(len: Int): ReservedOutput`: reserves `len` bytes of storage starting at the current position, to be filled in later
- `def writeTo(buffer: ByteBuffer): Unit`: copies all the data stored in the `Segment` list into the given `ByteBuffer`.
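- A rough usage sketch (the segment size of 16 is deliberately tiny so the internal list has to grow; this only exercises the API listed above):
import java.nio.ByteBuffer
import kafka.message.BufferingOutputStream

val out = new BufferingOutputStream(16)                     // 16-byte segments

out.write("hello kafka message writer".getBytes("UTF-8"))   // spans several segments
println(out.size)                                           // total bytes written so far

// copy everything from the segment list into a single ByteBuffer
val buf = ByteBuffer.allocate(out.size)
out.writeTo(buf)
buf.rewind()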
The MessageWriter class
- Source file: core/src/main/scala/kafka/message/MessageWriter.scala
- Definition:
class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize)
, i.e. it extends the BufferingOutputStream described above
- Purpose: used in `ByteBufferMessageSet::create`; it turns a number of `Message`s into the corresponding `Record`s, compresses them, and uses the compressed result as the payload of a single new `Message`;
- Key methods:
- Building the `Message`: adds the Crc, then writes the Magic, Attributes, key size, key, ...
def write(key: Array[Byte] = null, codec: CompressionCodec)(writePayload: OutputStream => Unit): Unit = {
withCrc32Prefix {
write(CurrentMagicValue)
var attributes: Byte = 0
if (codec.codec > 0)
attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
write(attributes)
// write the key
if (key == null) {
writeInt(-1)
} else {
writeInt(key.length)
write(key, 0, key.length)
}
// write the payload with length prefix
withLengthPrefix {
writePayload(this)
}
}
}
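- The reason MessageWriter builds on BufferingOutputStream is exactly this prefixing: the CRC and the payload length are only known after the (possibly compressed) payload has been streamed out, so `withCrc32Prefix` / `withLengthPrefix` reserve the slot first and back-fill it later. A simplified illustration of that pattern (not the actual source; it uses the standard `java.util.zip.CRC32` and the `reserve()` API described above):
import java.nio.ByteBuffer
import java.util.zip.CRC32
import kafka.message.BufferingOutputStream

val out = new BufferingOutputStream(1024)

// 1. reserve 4 bytes for the CRC before the data is known
val crcSlot: java.io.OutputStream = out.reserve(4)

// 2. stream the body of the message (magic, attributes, key, payload, ...) afterwards
val body = "magic|attributes|key|payload".getBytes("UTF-8")
out.write(body)

// 3. back-fill the reserved slot with the CRC of what was just written
//    (the real code walks the segments instead of keeping `body` around)
val crc = new CRC32()
crc.update(body)
val v = crc.getValue
crcSlot.write(Array[Byte]((v >>> 24).toByte, (v >>> 16).toByte, (v >>> 8).toByte, v.toByte))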
The FileMessageSet class
- Source file: core/src/main/scala/kafka/log/FileMessageSet.scala
- Definition:
class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] val start: Int, private[log] val end: Int, isSlice: Boolean) extends MessageSet with Logging
- Purpose: moving a `MessageSet` between memory and the log file on disk
- Key methods:
- `def iterator(maxMessageSize: Int): Iterator[MessageAndOffset]`: returns an iterator used to read every `Record` from the corresponding local log file; note that what is written to the file is not a bare `Message` but a `Record`
override def makeNext(): MessageAndOffset = {
if(location >= end)
return allDone()
// read the size of the item
sizeOffsetBuffer.rewind()
// first read the Record header: Offset + MessageSize, 12 bytes in total
channel.read(sizeOffsetBuffer, location)
if(sizeOffsetBuffer.hasRemaining)
return allDone()
sizeOffsetBuffer.rewind()
val offset = sizeOffsetBuffer.getLong()
val size = sizeOffsetBuffer.getInt()
if(size < Message.MinHeaderSize)
return allDone()
if(size > maxMessageSize)
throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
// read the item itself
// read the Message according to MessageSize
val buffer = ByteBuffer.allocate(size)
channel.read(buffer, location + 12)
if(buffer.hasRemaining)
return allDone()
buffer.rewind()
// increment the location and return the item
location += size + 12
new MessageAndOffset(new Message(buffer), offset)
}
- `def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes); _size.getAndAdd(written) }`: flushes a batch of `Record`s from memory to the local log file (see the sketch after this list)
- `def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int`: sends Messages from the local log file to the specified Channel
val newSize = math.min(channel.size().toInt, end) - start
if (newSize < _size.get()) {
throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
.format(file.getAbsolutePath, _size.get(), newSize))
}
val position = start + writePosition
val count = math.min(size, sizeInBytes)
val bytesTransferred = (destChannel match {
// use the sendfile system call for a zero-copy transfer to the client
case tl: TransportLayer => tl.transferFrom(channel, position, count)
case dc => channel.transferTo(position, count, dc)
}).toInt
trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
+ " bytes requested for transfer : " + math.min(size, sizeInBytes))
bytesTransferred
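- As promised above, a rough end-to-end sketch tying FileMessageSet and ByteBufferMessageSet together: build a set in memory, append it to a log file, then read it back record by record (the file name is arbitrary; the single-File constructor is assumed to open or create the underlying FileChannel, as it does in the versions this post follows):
import java.io.File
import kafka.log.FileMessageSet
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

val logFile = new File("/tmp/demo-00000000000000000000.log")
val fileSet = new FileMessageSet(logFile)    // opens (or creates) the file and its FileChannel

// append: the in-memory Records are written to disk unchanged (same layout on both sides)
val inMemory = new ByteBufferMessageSet(
  NoCompressionCodec, new Message("m1".getBytes), new Message("m2".getBytes))
fileSet.append(inMemory)
fileSet.flush()

// iterate: every entry on disk is a Record -> (offset, Message)
for (entry <- fileSet)
  println(s"offset=${entry.offset} size=${entry.message.size}")

fileSet.close()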
Summary
- We have seen that both `ByteBufferMessageSet` and `FileMessageSet` extend `MessageSet`, which means a `Record` is stored in exactly the same format in memory and in the local log file, so no extra conversion is needed when messages are read and written.