圖片展示不了請到我的CSDN下看
https://blog.csdn.net/u013332124/article/details/82793381
一. 日志清理是干什么的稽莉?
kafka的日志清理機制主要用于縮減日志的大小慈迈,它并不是指通過壓縮算法對日志文件進(jìn)行壓縮,而是對重復(fù)的日志進(jìn)行清理來達(dá)到目的剧劝。在日志清理過程中扛门,會清理重復(fù)的key烦衣,最后只會保留最后一條key,可以理解為map的put方法勤众。在清理完后舆绎,一些segment的文件大小就會變小,這時候们颜,kafka會將那些小的文件再合并成一個大的segment文件吕朵。
另外,通過日志清理功能窥突,我們可以做到刪除某個key的功能努溃。推送value為null的key到kafka,kafka在做日志清理時就會將這條key從日志中刪去阻问。
[圖片上傳失敗...(image-30d677-1537448233809)]
二. 清理相關(guān)原理
對于每一個kafka partition的日志梧税,以segment為單位,都會被分為兩部分,已清理和未清理的部分第队。同時哮塞,未清理的那部分又分為可以清理的和不可清理的。
[圖片上傳失敗...(image-156a98-1537448233809)]
每個日志目錄下都會有一個文件cleaner-offset-checkpoint
來記錄當(dāng)前清理到哪里了凳谦,這時候kafka就知道哪部分是已經(jīng)清理的忆畅,哪部分是未清理的。
接著尸执,在未清理的segment中邻眷,找出可以清理的那部分segment。首先剔交,active segment肯定是不能清理的肆饶。接著kafka會根據(jù)min.compaction.lag.ms
配置找出不能清理的segment,規(guī)則是根據(jù)segment最后的一條記錄的插入時間是否已經(jīng)超過最小保留時間岖常,如果沒有驯镊,這個segment就不能清理。這是為了保證日志至少存留多長時間才會被清理竭鞍。
找出可以清理的segment后板惑,kafka會構(gòu)建一個SkimpyOffsetMap對象,這個對象是一個key與offset的映射關(guān)系的哈希表偎快。接著會遍歷可以清理那部分的segment的每一條日志冯乘,然后將key和offset存到SkimpyOffsetMap中。
之后晒夹,再遍歷已清理部分和可以清理部分的segment的每一條日志裆馒,根據(jù)SkimpyOffsetMap來判斷是否保留。假設(shè)一條日志key的offset是1丐怯,但是在SkimpyOffsetMap中對應(yīng)key的offset是100喷好,那么這條日志就可以清楚掉了。
最后读跷,再兩次遍歷后梗搅,可清理部分的segment已變已清理的segment了。同時cleaner checkpoint會執(zhí)行已經(jīng)清理的segment的最后一條offset效览。
三无切、墓碑消息(tombstone)
對于value為null的日志,kafka稱這種日志為tombstone丐枉,也就是墓碑消息哆键。在執(zhí)行日志清理時,會刪除到期的墓碑消息矛洞。墓碑消息的存放時間和broker的配置log.cleaner.delete.retention.ms
有關(guān)洼哎,它的默認(rèn)值是24小時。
kafka做日志清理時沼本,會根據(jù)一些規(guī)則判斷是否要保留墓碑消息噩峦。判斷規(guī)則如下:
所在LogSegment的lastModifiedTime + deleteRetionMs > 可清理部分中最后一個LogSegment的lastModifiedTime
所以,墓碑消息的保留時間和已清理部分的最后一個segment有關(guān)系抽兆。
四识补、日志segment合并
再經(jīng)過一次次清理后,各個segment大小會慢慢變小辫红。為了避免日志目錄下有過多的小文件凭涂,kafka在每次日志清理后會進(jìn)行小文件日志合并。kafka會保證合并后的segment大小不超過segmentSize(通過log.segments.bytes設(shè)置贴妻,默認(rèn)值是1G)切油,且對應(yīng)的索引文件占用大小之和不超過maxIndexSize(可以通過broker端參數(shù)log.index.interval.bytes設(shè)置,默認(rèn)值為10MB)名惩。
下面是日志合并的示意圖:
[圖片上傳失敗...(image-56adee-1537448233809)]
五澎胡、清理線程的啟動
kafka日志清理是交給LogCleaner組件來完成的。
kafka在啟動LogManager時娩鹉,如果日志清理機制開啟的話攻谁,就會啟動LogCleaner組件開始定時的清理日志。是否開啟日志清理是由broker的log.cleaner.enable
來決定的弯予,默認(rèn)是開啟的戚宦。
LogCleaner啟動后,會注冊n個線程CleanerThread锈嫩,開始不斷的檢查日志并清理受楼。這個線程數(shù)量和broker的配置log.cleaner.threads
有關(guān)系,默認(rèn)值是1呼寸。當(dāng)清理線程啟動后那槽,就開始檢查是否有日志需要清理,接著清理完再檢查是否有日志需要清理等舔。如果發(fā)現(xiàn)沒有需要清理的日志骚灸,這個線程會進(jìn)入休眠,休眠時間根據(jù)broker的log.cleaner.backoff.ms
來決定慌植,默認(rèn)值是15000甚牲。
//LogCleaner.scala
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
def startup() {
info("Starting the log cleaner")
cleaners.foreach(_.start())
}
//CleanerThread.scala
override def doWork() {
cleanOrSleep()
}
private def cleanOrSleep() {
//獲取哪些日志可以清理,grabFilthiestCompactedLog方法只會返回一個partition的日志
val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
case None =>
false
case Some(cleanable) =>
//這里拿到要清理的日志
var endOffset = cleanable.firstDirtyOffset
try {
//開始清理日志
val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
endOffset = nextDirtyOffset
} catch {
case _: LogCleaningAbortedException => // task can be aborted, let it go.
} finally {
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
true
}
//刪除一些舊的日志
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
deletable.foreach{
case (topicPartition, log) =>
try {
log.deleteOldSegments()
} finally {
cleanerManager.doneDeleting(topicPartition)
}
}
//如果沒有要清理的日志,就進(jìn)入休眠
if (!cleaned)
backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
}
六蝶柿、通過dirtyRatio獲取要清理的partition日志
在cleanerManager.grabFilthiestCompactedLog
方法中丈钙,在這里,kafka會遍歷該broker上所有partition目錄交汤,判斷這些partition是否可以清理雏赦,然后從可以清理的那些partition中找出dirtyRatio最高的日志劫笙,開始清理。
//CleanerManager.scala
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
inLock(lock) {
val now = time.milliseconds
this.timeOfLastRun = now
val lastClean = allCleanerCheckpoints
val dirtyLogs = logs.filter {
//判斷這個partition log是否可以清理
case (_, log) => log.config.compact // match logs that are marked as compacted
}.filterNot {
//可能其他線程在清理這個partition log了
case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
}.map {
case (topicPartition, log) => // create a LogToClean instance for each
//獲取可清理部分的第一條offset和不可清理部分的第一條offset
val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
lastClean, now)
LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
}.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// 獲取dirtyRatio最高的partiton log
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
if(cleanableLogs.isEmpty) {
None
} else {
val filthiest = cleanableLogs.max
inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
Some(filthiest)
}
}
}
def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)
// 找出之前清理到哪個offset了星岗,從而找到未清理部分的第一條offset
val logStartOffset = log.logSegments.head.baseOffset
val firstDirtyOffset = {
val offset = lastCleanOffset.getOrElse(logStartOffset)
if (offset < logStartOffset) {
// don't bother with the warning if compact and delete are enabled.
if (!isCompactAndDelete(log))
warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
logStartOffset
} else {
offset
}
}
// 先把active segment排除出去
val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)
val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
//找出不可清理部分的第一條offset填大,其中active segment
//再通過compactionLagMs過濾掉那些不能清理的segment
val firstUncleanableDirtyOffset: Long = Seq (
Option(log.activeSegment.baseOffset),
if (compactionLagMs > 0) {
dirtyNonActiveSegments.find {
s =>
val isUncleanable = s.largestTimestamp > now - compactionLagMs
isUncleanable
} map(_.baseOffset)
} else None
).flatten.min
(firstDirtyOffset, firstUncleanableDirtyOffset)
}
注意以下幾點:
- 是否開啟topic的日志清理機制和broker的log.cleanup.policy有關(guān)。這個配置的默認(rèn)值是[delete]俏橘,也就是沒有開啟允华。但是并不是所有的partition log都會根據(jù)這個配置來判斷是否開啟日志清理。因為每個topic在創(chuàng)建的時候寥掐,也會指定是否開啟日志清理(會覆蓋broker的那個配置)靴寂。所以需要遍歷所有的partiton,排除掉那些不用清理的partition召耘。
- dirtyRatio的計算規(guī)則為
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
百炬。其中dirtyBytes表示可清理部分的日志大小,cleanBytes表示已清理部分的日志大小污它。