I have recently been working on AWS cost savings. For our Kafka message clusters, the plan is to compress messages so that they take up less storage and therefore cost less. Drawing on the source code, this article summarizes Kafka's whole message compression mechanism: the compression types Kafka supports, when compression is worth it, how to enable it, where decompression happens, and how compression and decompression actually work. All source code referenced in this article comes from Kafka's latest 3.3.0-SNAPSHOT version at the time of writing.
Compression types supported by Kafka
What is message compression in Kafka
Before discussing compression types, let's first look at how Kafka defines message compression.
The Kafka official documentation offers this explanation:
This is end-to-end block compression in Kafka. If enabled, data is compressed by the producer, written to the server in compressed format, and decompressed by the consumer. Compression improves consumer throughput at the cost of some decompression overhead. This is particularly useful when mirroring data across data centers.
In other words, Kafka message compression means that messages are compressed with a specific compression algorithm before being stored, and decompressed when consumed.
We know that compression trades time for space. Its basic idea is built on repetition: repeated fragments are encoded into a dictionary whose keys are the fragments and whose values are shorter codes, such as sequence numbers; the fragments in the original content are then replaced by their codes, which shortens the content. The compressed output consists of the dictionary plus the code sequence, and decompression losslessly restores the original content from those two parts. Note: lossy compression is outside the scope of this discussion.
Generally speaking, the more repetition, the better the compression. Take JSON, a serialization format commonly used for Kafka messages: a single message may not contain many repeated fragments, but a batch of messages contains lots of repeated field names, and the more messages in the batch, the more repetition. This is also why Kafka prefers block compression over compressing individual messages.
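To make the batching effect concrete, here is a minimal, self-contained sketch in plain Java (using only java.util.zip; the JSON payload is made up for illustration) that compares the gzip-compressed size of a single JSON record with that of 100 concatenated records:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;
public class BatchCompressionDemo {
    // Gzip the given bytes and return the compressed length.
    static int gzipSize(byte[] input) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(input);
        }
        return bos.size();
    }
    public static void main(String[] args) throws Exception {
        // A hypothetical JSON message; field names repeat across records in a batch
        String one = "{\"orderId\":12345,\"userId\":678,\"status\":\"CREATED\",\"amount\":99.9}";
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < 100; i++) batch.append(one);
        int singleCompressed = gzipSize(one.getBytes(StandardCharsets.UTF_8));
        int batchCompressed = gzipSize(batch.toString().getBytes(StandardCharsets.UTF_8));
        // The single record barely shrinks (gzip adds ~20 bytes of overhead), while the
        // 100-record batch compresses to far less than 100x the single compressed size.
        System.out.printf("single: %d -> %d bytes%n", one.length(), singleCompressed);
        System.out.printf("batch : %d -> %d bytes%n", batch.length(), batchCompressed);
    }
}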
Compression types
Kafka currently supports four main compression types: Gzip, Snappy, Lz4 and Zstd. Their characteristics are summarized below:
Compression type | Compression ratio | CPU usage | Compression speed | Network bandwidth usage |
---|---|---|---|---|
Gzip | Highest | Highest | Slowest | Lowest |
Snappy | Medium | Moderate | Moderate | Medium |
Lz4 | Low | Lowest | Fastest | Highest |
Zstd | Medium | Moderate | Moderate | Medium |
As the table shows, Snappy strikes a good balance between CPU usage, compression ratio, compression speed and network bandwidth usage, and it is the type we eventually chose for our compression pilot. It is worth mentioning that Zstd is a newer compression algorithm open-sourced by Facebook in 2016; it delivers good compression ratio and performance, with characteristics similar to Snappy (a Google creation), and Kafka only added support for it in version 2.1.0.
As for the raw performance of these algorithms, the official Zstd GitHub repository publishes the following benchmark results:
Compressor name | Ratio | Compression | Decompress. |
---|---|---|---|
zstd 1.5.1 -1 | 2.887 | 530 MB/s | 1700 MB/s |
zlib 1.2.11 -1 | 2.743 | 95 MB/s | 400 MB/s |
brotli 1.0.9 -0 | 2.702 | 395 MB/s | 450 MB/s |
zstd 1.5.1 --fast=1 | 2.437 | 600 MB/s | 2150 MB/s |
zstd 1.5.1 --fast=3 | 2.239 | 670 MB/s | 2250 MB/s |
quicklz 1.5.0 -1 | 2.238 | 540 MB/s | 760 MB/s |
zstd 1.5.1 --fast=4 | 2.148 | 710 MB/s | 2300 MB/s |
lzo1x 2.10 -1 | 2.106 | 660 MB/s | 845 MB/s |
lz4 1.9.3 | 2.101 | 740 MB/s | 4500 MB/s |
lzf 3.6 -1 | 2.077 | 410 MB/s | 830 MB/s |
snappy 1.1.9 | 2.073 | 550 MB/s | 1750 MB/s |
As you can see, Zstd can obtain a higher compression ratio at the cost of compression speed, and the trade-off between the two can be tuned flexibly via the --fast parameter.
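If you want to get a feel for this trade-off yourself, the zstd command-line tool exposes it directly; this is just an illustrative run, assuming zstd is installed and sample.json is a placeholder file:

# benchmark compression levels 1..19 in memory on a sample file
zstd -b1 -e19 sample.json
# trade compression ratio for speed, as in the --fast rows of the table above
zstd --fast=3 sample.json -o sample.fast.zst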
When to compress
Compression costs extra CPU and adds some delay to message delivery, so think carefully about whether it is necessary before turning it on. In my view, the following aspects should be considered:
- The disk space and bandwidth saved by compression far outweigh the extra CPU cost; only then is compression worthwhile.
- The data volume is large enough and the data is repetitive. Message compression works on batches, and a low-throughput stream may never fill a batch, which hurts the compression ratio. The more repetitive the data, the better the compression, e.g. structured data such as JSON or XML; if the data has no repetition, e.g. payloads that are all unique md5 hashes or UUIDs, the repetition premise of compression is violated and the results may be disappointing.
- The system has no strict requirement on message delivery latency and can tolerate a slight increase.
How to enable compression
Kafka controls compression through the compression.type configuration property. This property exists on both the producer side and the broker side, which means we can choose to enable compression on the producer or on the broker, and the two options suit different scenarios.
Enabling compression on the broker
The compression.type property
The broker-side compression.type property defaults to producer, which means the broker simply inherits whatever compression the incoming messages from the producer use: whether they are compressed or not, and with whichever codec, the broker stores them as-is. This can be seen in the following code snippet:
class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
...
private def analyzeAndValidateRecords(records: MemoryRecords,
origin: AppendOrigin,
ignoreRecordSize: Boolean,
leaderEpoch: Int): LogAppendInfo = {
records.batches.forEach { batch =>
...
val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
if (messageCodec != NoCompressionCodec)
sourceCodec = messageCodec
}
// Apply broker-side compression if any
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec);
}
}
object BrokerCompressionCodec {
val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions: List[String] = brokerCompressionCodecs.map(codec => codec.name)
def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))
def getCompressionCodec(compressionType: String): CompressionCodec = {
compressionType.toLowerCase(Locale.ROOT) match {
case UncompressedCodec.name => NoCompressionCodec
case _ => CompressionCodec.getCompressionCodec(compressionType)
}
}
def getTargetCompressionCodec(compressionType: String, producerCompression: CompressionCodec): CompressionCodec = {
if (ProducerCompressionCodec.name.equals(compressionType))
producerCompression
else
getCompressionCodec(compressionType)
}
}
sourceCodec is the codec recorded on the recordBatch, i.e. the codec of the batch of messages sent by the producer, and targetCodec is the compression codec configured on the broker. The getTargetCompressionCodec function shows that the codec finally used to store the messages is determined by combining the broker-side compressionType with the producer-side producerCompression: when compressionType is producer, the producer's producerCompression is used directly; otherwise the broker's own compressionType setting wins. From the values in brokerCompressionCodecs we can see that the valid options for compression.type are [uncompressed, zstd, lz4, snappy, gzip, producer], where uncompressed is equivalent to none, producer needs no explanation, and the remaining four are the standard compression types.
Broker level and topic level
Broker-side compression can be configured at two levels: the global broker level and the local topic level. As the names suggest, a broker-level setting applies to every topic in the Kafka cluster, but if a topic configures its own compression type, it overrides the broker-wide setting and the topic's own value takes precedence.
Broker level
To configure the broker-level compression type, change the compression.type setting with the configs command. Whether the change requires a broker restart to take effect depends on the Kafka version: before 1.1.0, any configuration change required restarting the broker; starting with 1.1.0, Kafka introduced dynamic broker configuration, which divides settings into three categories: read-only, per-broker and cluster-wide. The first category still requires a restart, while the other two take effect dynamically and differ only in scope (Kafka's dynamic configuration deserves a separate post). The official documentation shows that compression.type is cluster-wide, so on version 1.1.0 and later no broker restart is needed.
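For example, on 1.1.0 and later the cluster-wide default can be changed at runtime with the kafka-configs tool; this is a sketch assuming a broker reachable at localhost:9092 (the change applies to all brokers, no restart needed):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config compression.type=snappy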
Topic level
Topic configuration comes in two parts: settings that are specific to the topic, such as partitions, and settings that default to the broker configuration but can be overridden. To define topic-level compression, override the compression.type setting with the --config option when creating the topic:
sh bin/kafka-topics.sh --create --topic my-topic --replication-factor 1 --partitions 1 --config compression.type=snappy
You can of course also change a topic's compression.type with the configs command:
bin/kafka-configs.sh --entity-type topics --entity-name my-topic --alter --add-config compression.type=snappy
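To confirm that the topic-level override took effect, you can describe the topic's dynamic configs; the bootstrap address below is a placeholder:

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe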
Enabling compression on the producer
The compression.type property
As on the broker, the producer-side compression property is still compression.type, only with a different default and different valid values. The default is none, meaning no compression, and the valid values are the names of the constants defined in the CompressionType enum.
How to enable it
Simply change the producer config in code, as shown below. Note that after changing the config, the producer application must be restarted for compression to take effect.
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public KafkaTemplate<byte[], byte[]> kafkaTemplate() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
config.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
config.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "3000");
config.put(ProducerConfig.LINGER_MS_CONFIG, "1");
...
// Enable Snappy compression
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.SNAPPY.name);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(config));
}
}
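If you are not using Spring, the same property works on a plain KafkaProducer; this is only a minimal sketch with a placeholder broker address and topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class CompressedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The value is a plain string: "none", "gzip", "snappy", "lz4" or "zstd"
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}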
Where compression and decompression happen
Where compression can happen
There are two places where compression can occur: the producer and the broker.
Producer side
The only condition for compression on the producer side is that the compression.type property is set to a valid compression type other than none. In that case, the producer compresses messages before sending them to any of the topics it produces to.
Broker side
On the broker side, the situations in which compression occurs are much more complex. It depends not only on whether the broker's own codec targetCodec is a compressing type, but also on whether targetCodec matches the producer-side sourceCodec, and additionally on the magic version of the message format. Looking at the code, broker-side message reads and writes are handled by UnifiedLog, and the core entry point for persisting messages is the append method:
class UnifiedLog(...) extends Logging with KafkaMetricsGroup {
...
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
...
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
// return if we have no valid messages or if this is a duplicate of the last appended entry
if (appendInfo.shallowCount == 0) appendInfo
else {
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validRecords = trimInvalidBytes(records, appendInfo)
// they are valid, insert them in the log
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
localLog.checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
// assign offsets to the message set
val offset = new LongRef(localLog.logEndOffset)
appendInfo.firstOffset = Some(LogOffsetMetadata(offset.value))
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
LogValidator.validateMessagesAndAssignOffsets(validRecords,
topicPartition,
offset,
time,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
origin,
interBrokerProtocolVersion,
brokerTopicStats,
requestLocal.getOrElse(throw new IllegalArgumentException(
"requestLocal should be defined if assignOffsets is true")))
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
}
...
} else {
// we are taking the offsets we are given
...
}
...
maybeDuplicate match {
case Some(duplicate) =>
...
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
updateHighWatermarkWithLogEndOffset()
...
trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
s"first offset: ${appendInfo.firstOffset}, " +
s"next offset: ${localLog.logEndOffset}, " +
s"and messages: $validRecords")
if (localLog.unflushedMessages >= config.flushInterval) flush(false)
}
appendInfo
}
}
}
}
}
As you can see, analyzeAndValidateRecords first validates the batch as a whole at the recordBatch level, checking things like CRC and size; it does not go down to individual records, so no decompression is involved here. Once that passes, LogValidator.validateMessagesAndAssignOffsets performs further validation on the recordBatch and on individual records and assigns offsets, a step that may involve decompression. After that, localLog.append appends the messages to the local log, which is where they actually hit disk. Let's keep our focus on the LogValidator part where decompression may occur:
private[log] object LogValidator extends Logging {
private[log] def validateMessagesAndAssignOffsets(records: MemoryRecords,
topicPartition: TopicPartition,
offsetCounter: LongRef,
time: Time,
now: Long,
sourceCodec: CompressionCodec,
targetCodec: CompressionCodec,
compactedTopic: Boolean,
magic: Byte,
timestampType: TimestampType,
timestampDiffMaxMs: Long,
partitionLeaderEpoch: Int,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
brokerTopicStats: BrokerTopicStats,
requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
convertAndAssignOffsetsNonCompressed(records, topicPartition, offsetCounter, compactedTopic, time, now, timestampType,
timestampDiffMaxMs, magic, partitionLeaderEpoch, origin, brokerTopicStats)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, topicPartition, offsetCounter, now, compactedTopic, timestampType, timestampDiffMaxMs,
partitionLeaderEpoch, origin, magic, brokerTopicStats)
} else {
validateMessagesAndAssignOffsetsCompressed(records, topicPartition, offsetCounter, time, now, sourceCodec,
targetCodec, compactedTopic, magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, origin,
interBrokerProtocolVersion, brokerTopicStats, requestLocal)
}
}
...
}
From the above: when the broker-side targetCodec and the incoming batch's sourceCodec are both none, i.e. no compression is involved, the message format version is checked next. If it differs from the version configured on the broker, the original batch is first converted into a new batch in the target magic format and offsets are assigned within the new batch; otherwise offsets are assigned in place within the original batch. Neither path involves decompression. A brief note on offset assignment: in Kafka, the offset is the unique identifier of each message within a partition, and consumers track their progress by offset; offsets are generated and written on the broker side, which is exactly the assignment mentioned here. In theory the broker would need to assign an offset to every message, but in practice, because messages arrive as a recordBatch in which records are ordered and the total record count is known, and because the recordBatch carries a baseOffset, it is usually enough to set the lastOffset. The only exception is when a new recordBatch has to be built due to format conversion or decompression, in which case memoryRecordsBuilder's appendWithOffset method is called to assign an offset to every record.

When at least one of targetCodec and sourceCodec is not none, i.e. compression is involved, things get more complicated; the logic lives in validateMessagesAndAssignOffsetsCompressed:
private[log] object LogValidator extends Logging {
...
def validateMessagesAndAssignOffsetsCompressed(...): ValidationAndOffsetAssignResult = {
...
// No in place assignment situation 1
var inPlaceAssignment = sourceCodec == targetCodec
var maxTimestamp = RecordBatch.NO_TIMESTAMP
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
var uncompressedSizeInBytes = 0
// Assume there's only one batch with compressed memory records; otherwise, return InvalidRecordException
// One exception though is that with format smaller than v2, if sourceCodec is noCompression, then each batch is actually
// a single record so we'd need to special handle it by creating a single wrapper batch that includes all the records
val firstBatch = getFirstBatchAndMaybeValidateNoMoreBatches(records, sourceCodec)
// No in place assignment situation 2 and 3: we only need to check for the first batch because:
// 1. For most cases (compressed records, v2, for example), there's only one batch anyways.
// 2. For cases that there may be multiple batches, all batches' magic should be the same.
if (firstBatch.magic != toMagic || toMagic == RecordBatch.MAGIC_VALUE_V0)
inPlaceAssignment = false
// Do not compress control records unless they are written compressed
if (sourceCodec == NoCompressionCodec && firstBatch.isControlBatch)
inPlaceAssignment = true
records.batches.forEach { batch =>
validateBatch(topicPartition, firstBatch, batch, origin, toMagic, brokerTopicStats)
uncompressedSizeInBytes += AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
// if we are on version 2 and beyond, and we know we are going for in place assignment,
// then we can optimize the iterator to skip key / value / headers since they would not be used at all
val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
batch.skipKeyValueIterator(requestLocal.bufferSupplier)
else
batch.streamingIterator(requestLocal.bufferSupplier)
try {
val recordErrors = new ArrayBuffer[ApiRecordError](0)
// this is a hot path and we want to avoid any unnecessary allocations.
var batchIndex = 0
recordsIterator.forEachRemaining { record =>
val expectedOffset = expectedInnerOffset.getAndIncrement()
val recordError = validateRecordCompression(batchIndex, record).orElse {
validateRecord(batch, topicPartition, record, batchIndex, now,
timestampType, timestampDiffMaxMs, compactedTopic, brokerTopicStats).orElse {
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
if (record.timestamp > maxTimestamp)
maxTimestamp = record.timestamp
// Some older clients do not implement the V1 internal offsets correctly.
// Historically the broker handled this by rewriting the batches rather
// than rejecting the request. We must continue this handling here to avoid
// breaking these clients.
if (record.offset != expectedOffset)
inPlaceAssignment = false
}
None
}
}
recordError match {
case Some(e) => recordErrors += e
case None =>
uncompressedSizeInBytes += record.sizeInBytes()
validatedRecords += record
}
batchIndex += 1
}
processRecordErrors(recordErrors)
} finally {
recordsIterator.close()
}
}
if (!inPlaceAssignment) {
val (producerId, producerEpoch, sequence, isTransactional) = {
// note that we only reassign offsets for requests coming straight from a producer. For records with magic V2,
// there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
// with older magic versions, there will never be a producer id, etc.
val first = records.batches.asScala.head
(first.producerId, first.producerEpoch, first.baseSequence, first.isTransactional)
}
buildRecordsAndAssignOffsets(toMagic, offsetCounter, time, timestampType, CompressionType.forId(targetCodec.codec),
now, validatedRecords, producerId, producerEpoch, sequence, isTransactional, partitionLeaderEpoch,
uncompressedSizeInBytes)
} else {
// we can update the batch only and write the compressed payload as is;
// again we assume only one record batch within the compressed set
val batch = records.batches.iterator.next()
val lastOffset = offsetCounter.addAndGet(validatedRecords.size) - 1
batch.setLastOffset(lastOffset)
if (timestampType == TimestampType.LOG_APPEND_TIME)
maxTimestamp = now
if (toMagic >= RecordBatch.MAGIC_VALUE_V1)
batch.setMaxTimestamp(timestampType, maxTimestamp)
if (toMagic >= RecordBatch.MAGIC_VALUE_V2)
batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
val recordConversionStats = new RecordConversionStats(uncompressedSizeInBytes, 0, 0)
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
messageSizeMaybeChanged = false,
recordConversionStats = recordConversionStats)
}
}
...
}
As you can see, inPlaceAssignment indicates whether offsets can be assigned by modifying the recordBatch in place. There are three cases in which in-place modification is not possible:

- sourceCodec and targetCodec differ. This one is easy to understand: with different codecs, the original recordBatch naturally cannot be reused when building the target payload.
- The target message format version magic differs from the magic of the recordBatch the broker received. A format conversion is required and a new recordBatch has to be built, so just as in the first case the original recordBatch cannot be reused.
- The target message format version is V0. The old V0 format requires reassigning an absolute offset to every message, so the original recordBatch cannot be reused.
In these cases inPlaceAssignment is false and the code goes straight down the buildRecordsAndAssignOffsets path to build a new recordBatch. Whether compression happens here depends on targetCodec: if it is not none, the records are compressed with the targetCodec codec.
In all other cases in-place modification is possible: the original recordBatch is reused directly as the payload of the target messages, and no compression is performed.
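Condensing the three cases above, plus the control-batch exception seen in the code, into one helper makes the decision easier to read at a glance. This is only an illustrative sketch in Java mirroring the Scala logic, not Kafka's actual code; it omits the per-record check that rewrites batches from old clients with broken V1 inner offsets:

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.RecordBatch;
public final class InPlaceCheck {
    static boolean canAssignOffsetsInPlace(CompressionType sourceCodec,
                                           CompressionType targetCodec,
                                           byte batchMagic,
                                           byte targetMagic,
                                           boolean isControlBatch) {
        boolean inPlace = sourceCodec == targetCodec;              // case 1: codecs must match
        if (batchMagic != targetMagic
                || targetMagic == RecordBatch.MAGIC_VALUE_V0)      // cases 2 and 3: conversion needed, or V0 target
            inPlace = false;
        if (sourceCodec == CompressionType.NONE && isControlBatch) // control records are not recompressed
            inPlace = true;
        return inPlace;
    }
}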
Where decompression can happen
Likewise, there are two places where decompression can occur: the consumer and the broker.
Consumer side
The only condition for decompression on the consumer side is that the messages fetched from the broker are compressed. The consumer then decompresses them according to the compressionType on the recordBatch; the details are covered in the source code analysis later on.
Broker side
Whether decompression happens on the broker depends on whether the recordBatch sent by the producer is compressed: if the producer enabled compression, decompression will occur; otherwise it will not. The reason, briefly, is that before persisting messages the broker runs various validations, which inevitably iterate over the recordBatch, and during iteration the message byte stream is processed directly with the batch's compressionType, so whether decompression happens depends on whether that compressionType is a compressing type. This can be seen in LogValidator's validateMessagesAndAssignOffsets implementation: all three branches, convertAndAssignOffsetsNonCompressed, assignOffsetsNonCompressed and validateMessagesAndAssignOffsetsCompressed, contain a records.batches.forEach {...} loop, and the source analysis below will show that the recordBatch iterator reads the message byte stream directly through the compressionType's decompression logic. In other words, if a recordBatch is compressed, merely iterating over it triggers decompression.
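This behaviour is easy to observe with the record classes shipped in kafka-clients (MemoryRecords, SimpleRecord, CompressionType); the sketch below is only an illustration against the client library of roughly this version. It builds a snappy-compressed batch in memory and then iterates it, which is exactly the point at which decompression happens:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.Utils;
public class IterateCompressedBatchDemo {
    public static void main(String[] args) {
        // Build a snappy-compressed batch of three records in memory.
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.SNAPPY,
                new SimpleRecord("a".getBytes(StandardCharsets.UTF_8)),
                new SimpleRecord("b".getBytes(StandardCharsets.UTF_8)),
                new SimpleRecord("c".getBytes(StandardCharsets.UTF_8)));
        // Simply iterating the records forces the batch to be decompressed, because the
        // iterator wraps the payload with compressionType().wrapForInput(...) under the hood.
        for (Record record : records.records())
            System.out.println(Utils.utf8(record.value()));
    }
}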
How compression and decompression work
Compression and decompression involve a few key classes: CompressionType, MemoryRecordsBuilder, DefaultRecordBatch and AbstractLegacyRecordBatch. CompressionType is the compression enum, combining the definition of each compression type with its implementation. MemoryRecordsBuilder is responsible for writing new message data into an in-memory buffer, i.e. it calls CompressionType's compression hook wrapForOutput to write messages. DefaultRecordBatch and AbstractLegacyRecordBatch are responsible for reading message data, i.e. they call CompressionType's decompression hook wrapForInput to restore messages to uncompressed form. The difference between the two is that the former handles messages in the new format (magic >= 2), while the latter handles the old formats (magic 0 or 1).
CompressionType
Before looking at CompressionType, let's first look at the CompressionCodec Scala file.
Part of the CompressionCodec source is shown below:
...
case object GZIPCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 1
val name = "gzip"
}
case object SnappyCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 2
val name = "snappy"
}
case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 3
val name = "lz4"
}
case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 4
val name = "zstd"
}
case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 0
val name = "none"
}
case object UncompressedCodec extends BrokerCompressionCodec {
val name = "uncompressed"
}
case object ProducerCompressionCodec extends BrokerCompressionCodec {
val name = "producer"
}
This file defines seven case objects such as GZIPCompressionCodec, which can be thought of as Java enums. Their name values together cover exactly the valid options of the compression.type property discussed earlier, on both the producer and the broker side. The codec bound to each name is the compression code that is actually written into the message body; name exists only for readability. As shown above, the valid codec values are 0 to 4, corresponding to none, gzip, snappy, lz4 and zstd, which are exactly the five enum constants defined in CompressionType.
So CompressionCodec is oriented towards the valid values of the compression.type property and maps the numeric compression code codec to a human-readable name, while CompressionType defines the enum constants corresponding to the compression codes; the two are linked through name.
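The link through name and codec id is also visible from the public lookup helpers on CompressionType; a tiny sketch:

import org.apache.kafka.common.record.CompressionType;
public class CodecLookupDemo {
    public static void main(String[] args) {
        // name -> enum constant -> numeric id, the value actually written into the record batch
        CompressionType snappy = CompressionType.forName("snappy");
        System.out.println(snappy.id);                     // 2, matching SnappyCompressionCodec.codec
        // id -> enum constant -> readable name
        System.out.println(CompressionType.forId(4).name); // "zstd"
    }
}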
CompressionType source
CompressionType defines the five compression-type enum constants corresponding to the compression codes, and binds each type to its implementation through two abstract methods: wrapForOutput for compression and wrapForInput for decompression. This avoids the usual if-else dispatch and fully contains both the definition and the implementation of compression inside CompressionType, in line with the single responsibility principle. This kind of elegant design shows up in the JDK as well, for example in TimeUnit. Straight to the source:
public enum CompressionType {
...
GZIP(1, "gzip", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
try {
return new BufferedOutputStream(new GZIPOutputStream(buffer, 8 * 1024), 16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
try {
// Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
// 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
// number of bytes (potentially a single byte)
return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024),
16 * 1024);
} catch (Exception e) {
throw new KafkaException(e);
}
}
},
...
ZSTD(4, "zstd", 1.0f) {
@Override
public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
return ZstdFactory.wrapForOutput(buffer);
}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
}
};
...
// Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
public abstract OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
// Wrap buffer with an InputStream that will decompress data with this CompressionType.
public abstract InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier);
...
}
The concrete implementations of wrapForOutput and wrapForInput for each compression type already explain clearly how compression and decompression are done; interested readers can step in from this entry point and explore, so I won't go into further detail here. Of course, this is only the smallest building block of compression handling; to figure out where Kafka uses it we still need to look at the other core classes.

Before that, and setting the main topic aside for a moment, there are a few details in the code above that I find worth learning from:
- Snappy and Zstd both use XXXFactory static methods to build the stream objects, while the others, such as Lz4, create their objects directly with new. Stepping in a bit further shows why: for Snappy and Zstd, Kafka depends on third-party libraries, while the others are implemented in the JDK or in Kafka itself. To limit the side effects of third-party libraries, this pushes the lazy loading of their classes to the extreme, reflecting the authors' solid grasp of Java class-loading timing; a very refined touch.
- In Gzip's wrapForInput implementation, the KAFKA-6430 improvement increased the input buffer from 0.5 KB to 8 KB so that more bytes can be handled in a single Gzip pass, for higher performance. At least according to the commit description, throughput can double.
- The last parameter of the abstract wrapForInput method, decompressionBufferSupplier of type BufferSupplier: as the parameter documentation says, for relatively small batches, allocating a new buffer inside wrapForInput on every call could cost more than the decompression itself, so this parameter offers a choice: allocate the memory outside and reuse it inside the method. In my own day-to-day coding, for space needed inside a loop I also often weigh allocating fresh each time against allocating once outside and reusing it in the loop; it's case by case (see the sketch after this list).
MemoryRecordsBuilder
public class MemoryRecordsBuilder implements AutoCloseable {
...
// Used to append records, may compress data on the fly
private DataOutputStream appendStream;
...
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit,
long deleteHorizonMs) {
if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE)
throw new IllegalArgumentException("TimestampType must be set for magic > 0");
if (magic < RecordBatch.MAGIC_VALUE_V2) {
if (isTransactional)
throw new IllegalArgumentException("Transactional records are not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not supported for magic " + magic);
if (compressionType == CompressionType.ZSTD)
throw new IllegalArgumentException("ZStandard compression is not supported for magic " + magic);
if (deleteHorizonMs != RecordBatch.NO_TIMESTAMP)
throw new IllegalArgumentException("Delete horizon timestamp is not supported for magic " + magic);
}
...
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
...
}
public void close() {
...
if (numRecords == 0L) {
buffer().position(initialPosition);
builtRecords = MemoryRecords.EMPTY;
} else {
if (magic > RecordBatch.MAGIC_VALUE_V1)
this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
else if (compressionType != CompressionType.NONE)
this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
ByteBuffer buffer = buffer().duplicate();
buffer.flip();
buffer.position(initialPosition);
builtRecords = MemoryRecords.readableRecords(buffer.slice());
}
}
...
private int writeDefaultBatchHeader() {
...
DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType,
baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch,
hasDeleteHorizonMs(), partitionLeaderEpoch, numRecords);
buffer.position(pos);
return writtenCompressed;
}
private int writeLegacyCompressedWrapperHeader() {
...
int wrapperSize = pos - initialPosition - Records.LOG_OVERHEAD;
int writtenCompressed = wrapperSize - LegacyRecord.recordOverhead(magic);
AbstractLegacyRecordBatch.writeHeader(buffer, lastOffset, wrapperSize);
long timestamp = timestampType == TimestampType.LOG_APPEND_TIME ? logAppendTime : maxTimestamp;
LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType);
buffer.position(pos);
return writtenCompressed;
}
}
As you can see, appendStream, which is used to append messages to the in-memory buffer, is built directly from compressionType's compression logic; if compressionType here is a valid compression type other than none, compression takes place. In addition, the magic checks above show that the message timestamp type is supported from format version V1 onward, while transactional messages, control messages, Zstd compression and deleteHorizonMs are only supported from V2. V1 and V2 here refer to the message format version: V1 was introduced in 0.10.0 (everything before that is V0), and V2 was introduced in 0.11.0 and is still the latest format today.
The close() method shows that when MemoryRecordsBuilder builds the memoryRecords, it writes different headers depending on the message format version. For new-format messages, writeDefaultBatchHeader calls DefaultRecordBatch.writeHeader(...) directly to write the new-format header; for old-format messages, writeLegacyCompressedWrapperHeader calls AbstractLegacyRecordBatch.writeHeader and LegacyRecord.writeCompressedRecordHeader to write the legacy header. Although the two header layouts differ, compressionType appears in both, which shows that Kafka allows messages of multiple format versions, and compressed and uncompressed messages, to coexist, because this information is stored on the recordBatch, at the batch level.
DefaultRecordBatch
public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch {
...
@Override
public Iterator<Record> iterator() {
if (count() == 0)
return Collections.emptyIterator();
if (!isCompressed())
return uncompressedIterator();
// for a normal iterator, we cannot ensure that the underlying compression stream is closed,
// so we decompress the full record set here. Use cases which call for a lower memory footprint
// can use `streamingIterator` at the cost of additional complexity
try (CloseableIterator<Record> iterator = compressedIterator(BufferSupplier.NO_CACHING, false)) {
List<Record> records = new ArrayList<>(count());
while (iterator.hasNext())
records.add(iterator.next());
return records.iterator();
}
}
...
}
RecordBatch is the interface representing a batch of messages. For old-format messages (versions V0 and V1), an uncompressed batch contains only a single record, while a compressed one may contain several; new-format batches (V2 and above) usually contain multiple records whether compressed or not. The interface has a compressionType() method identifying the batch's compression type, which serves as the basis for deciding whether to decompress when reading. DefaultRecordBatch above is the default implementation of this interface for new-format messages. It also implements the Iterable<Record> interface, so iterator() is the core logic for accessing the messages in a batch: when compressionType() returns none, there is no compression and the uncompressed iterator is returned directly (skipped here); when compression is present, the compressed iterator is used, implemented as follows:
public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
final ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
}
private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
final DataInputStream inputStream = recordInputStream(bufferSupplier);
if (skipKeyValue) {
// this buffer is used to skip length delimited fields like key, value, headers
byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
return new StreamRecordIterator(inputStream) {
...
}
} else {
...
}
}
We can see that compressedIterator() calls recordInputStream(...) before constructing the streaming iterator, and that method decompresses the raw data through compressionType's decompression logic.
AbstractLegacyRecordBatch
public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch implements Record {
...
CloseableIterator<Record> iterator(BufferSupplier bufferSupplier) {
if (isCompressed())
return new DeepRecordsIterator(this, false, Integer.MAX_VALUE, bufferSupplier);
return new CloseableIterator<Record>() {
private boolean hasNext = true;
@Override
public void close() {}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public Record next() {
if (!hasNext)
throw new NoSuchElementException();
hasNext = false;
return AbstractLegacyRecordBatch.this;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
...
private static class DeepRecordsIterator extends AbstractIterator<Record> implements CloseableIterator<Record> {
private DeepRecordsIterator(AbstractLegacyRecordBatch wrapperEntry,
boolean ensureMatchingMagic,
int maxMessageSize,
BufferSupplier bufferSupplier) {
LegacyRecord wrapperRecord = wrapperEntry.outerRecord();
this.wrapperMagic = wrapperRecord.magic();
if (wrapperMagic != RecordBatch.MAGIC_VALUE_V0 && wrapperMagic != RecordBatch.MAGIC_VALUE_V1)
throw new InvalidRecordException("Invalid wrapper magic found in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
if (compressionType == CompressionType.ZSTD)
throw new InvalidRecordException("Invalid wrapper compressionType found in legacy deep record iterator " + wrapperMagic);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed record set with null value (magic = " +
wrapperMagic + ")");
InputStream stream = compressionType.wrapForInput(wrapperValue, wrapperRecord.magic(), bufferSupplier);
...
}
}
}
AbstractLegacyRecordBatch is much the same as DefaultRecordBatch: iterator() is again the entry point, and when compression is enabled it returns the compressed iterator DeepRecordsIterator, just under a different name; inside the iterator the data stream is likewise decompressed directly through compressionType's decompression logic.
Originally published at: https://www.yangbing.fun/2022/04/30/compression-mechanism-of-the-Kafka-message/