kafka 日志清理機制——LogCompact(七)

圖片展示不了請到我的CSDN下看
https://blog.csdn.net/u013332124/article/details/82793381

一. 日志清理是干什么的稽莉?

kafka的日志清理機制主要用于縮減日志的大小慈迈,它并不是指通過壓縮算法對日志文件進(jìn)行壓縮,而是對重復(fù)的日志進(jìn)行清理來達(dá)到目的剧劝。在日志清理過程中扛门,會清理重復(fù)的key烦衣,最后只會保留最后一條key,可以理解為map的put方法勤众。在清理完后舆绎,一些segment的文件大小就會變小,這時候们颜,kafka會將那些小的文件再合并成一個大的segment文件吕朵。

另外,通過日志清理功能窥突,我們可以做到刪除某個key的功能努溃。推送value為null的key到kafka,kafka在做日志清理時就會將這條key從日志中刪去阻问。

[圖片上傳失敗...(image-30d677-1537448233809)]

二. 清理相關(guān)原理

對于每一個kafka partition的日志梧税,以segment為單位,都會被分為兩部分,已清理未清理的部分第队。同時哮塞,未清理的那部分又分為可以清理的不可清理的

[圖片上傳失敗...(image-156a98-1537448233809)]

每個日志目錄下都會有一個文件cleaner-offset-checkpoint來記錄當(dāng)前清理到哪里了凳谦,這時候kafka就知道哪部分是已經(jīng)清理的忆畅,哪部分是未清理的。

接著尸执,在未清理的segment中邻眷,找出可以清理的那部分segment。首先剔交,active segment肯定是不能清理的肆饶。接著kafka會根據(jù)min.compaction.lag.ms配置找出不能清理的segment,規(guī)則是根據(jù)segment最后的一條記錄的插入時間是否已經(jīng)超過最小保留時間岖常,如果沒有驯镊,這個segment就不能清理。這是為了保證日志至少存留多長時間才會被清理竭鞍。

找出可以清理的segment后板惑,kafka會構(gòu)建一個SkimpyOffsetMap對象,這個對象是一個key與offset的映射關(guān)系的哈希表偎快。接著會遍歷可以清理那部分的segment的每一條日志冯乘,然后將key和offset存到SkimpyOffsetMap中。

之后晒夹,再遍歷已清理部分可以清理部分的segment的每一條日志裆馒,根據(jù)SkimpyOffsetMap來判斷是否保留。假設(shè)一條日志key的offset是1丐怯,但是在SkimpyOffsetMap中對應(yīng)key的offset是100喷好,那么這條日志就可以清楚掉了。

最后读跷,再兩次遍歷后梗搅,可清理部分的segment已變已清理的segment了。同時cleaner checkpoint會執(zhí)行已經(jīng)清理的segment的最后一條offset效览。

三无切、墓碑消息(tombstone)

對于value為null的日志,kafka稱這種日志為tombstone丐枉,也就是墓碑消息哆键。在執(zhí)行日志清理時,會刪除到期的墓碑消息矛洞。墓碑消息的存放時間和broker的配置log.cleaner.delete.retention.ms有關(guān)洼哎,它的默認(rèn)值是24小時。

kafka做日志清理時沼本,會根據(jù)一些規(guī)則判斷是否要保留墓碑消息噩峦。判斷規(guī)則如下:

所在LogSegment的lastModifiedTime + deleteRetionMs > 可清理部分中最后一個LogSegment的lastModifiedTime

所以,墓碑消息的保留時間和已清理部分的最后一個segment有關(guān)系抽兆。

四识补、日志segment合并

再經(jīng)過一次次清理后,各個segment大小會慢慢變小辫红。為了避免日志目錄下有過多的小文件凭涂,kafka在每次日志清理后會進(jìn)行小文件日志合并。kafka會保證合并后的segment大小不超過segmentSize(通過log.segments.bytes設(shè)置贴妻,默認(rèn)值是1G)切油,且對應(yīng)的索引文件占用大小之和不超過maxIndexSize(可以通過broker端參數(shù)log.index.interval.bytes設(shè)置,默認(rèn)值為10MB)名惩。

下面是日志合并的示意圖:

[圖片上傳失敗...(image-56adee-1537448233809)]

五澎胡、清理線程的啟動

kafka日志清理是交給LogCleaner組件來完成的。

kafka在啟動LogManager時娩鹉,如果日志清理機制開啟的話攻谁,就會啟動LogCleaner組件開始定時的清理日志。是否開啟日志清理是由broker的log.cleaner.enable來決定的弯予,默認(rèn)是開啟的戚宦。

LogCleaner啟動后,會注冊n個線程CleanerThread锈嫩,開始不斷的檢查日志并清理受楼。這個線程數(shù)量和broker的配置log.cleaner.threads有關(guān)系,默認(rèn)值是1呼寸。當(dāng)清理線程啟動后那槽,就開始檢查是否有日志需要清理,接著清理完再檢查是否有日志需要清理等舔。如果發(fā)現(xiàn)沒有需要清理的日志骚灸,這個線程會進(jìn)入休眠,休眠時間根據(jù)broker的log.cleaner.backoff.ms來決定慌植,默認(rèn)值是15000甚牲。

//LogCleaner.scala
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
def startup() {
    info("Starting the log cleaner")
    cleaners.foreach(_.start())
}
//CleanerThread.scala
override def doWork() {
   cleanOrSleep()
}
private def cleanOrSleep() {
    //獲取哪些日志可以清理,grabFilthiestCompactedLog方法只會返回一個partition的日志
      val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
        case None =>
          false
        case Some(cleanable) =>
          //這里拿到要清理的日志
          var endOffset = cleanable.firstDirtyOffset
          try {
              //開始清理日志
            val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
            endOffset = nextDirtyOffset
          } catch {
            case _: LogCleaningAbortedException => // task can be aborted, let it go.
          } finally {
            cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
          }
          true
      }
        //刪除一些舊的日志
      val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
      deletable.foreach{
        case (topicPartition, log) =>
          try {
            log.deleteOldSegments()
          } finally {
            cleanerManager.doneDeleting(topicPartition)
          }
      }
      //如果沒有要清理的日志,就進(jìn)入休眠
      if (!cleaned)
        backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
}

六蝶柿、通過dirtyRatio獲取要清理的partition日志

cleanerManager.grabFilthiestCompactedLog方法中丈钙,在這里,kafka會遍歷該broker上所有partition目錄交汤,判斷這些partition是否可以清理雏赦,然后從可以清理的那些partition中找出dirtyRatio最高的日志劫笙,開始清理。

//CleanerManager.scala
def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
    inLock(lock) {
      val now = time.milliseconds
      this.timeOfLastRun = now
      val lastClean = allCleanerCheckpoints
      val dirtyLogs = logs.filter {
          //判斷這個partition log是否可以清理
        case (_, log) => log.config.compact  // match logs that are marked as compacted
      }.filterNot {
          //可能其他線程在清理這個partition log了
        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
      }.map {
        case (topicPartition, log) => // create a LogToClean instance for each
          //獲取可清理部分的第一條offset和不可清理部分的第一條offset
          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicPartition,
            lastClean, now)
          LogToClean(topicPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
      }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
      this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
      // 獲取dirtyRatio最高的partiton log
      val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio)
      if(cleanableLogs.isEmpty) {
        None
      } else {
        val filthiest = cleanableLogs.max
        inProgress.put(filthiest.topicPartition, LogCleaningInProgress)
        Some(filthiest)
      }
    }
  }

  def cleanableOffsets(log: Log, topicPartition: TopicPartition, lastClean: immutable.Map[TopicPartition, Long], now: Long): (Long, Long) = {
    val lastCleanOffset: Option[Long] = lastClean.get(topicPartition)

    // 找出之前清理到哪個offset了星岗,從而找到未清理部分的第一條offset
    val logStartOffset = log.logSegments.head.baseOffset
    val firstDirtyOffset = {
      val offset = lastCleanOffset.getOrElse(logStartOffset)
      if (offset < logStartOffset) {
        // don't bother with the warning if compact and delete are enabled.
        if (!isCompactAndDelete(log))
          warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
        logStartOffset
      } else {
        offset
      }
    }

    // 先把active segment排除出去
    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset)

    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)

    //找出不可清理部分的第一條offset填大,其中active segment
      //再通過compactionLagMs過濾掉那些不能清理的segment
    val firstUncleanableDirtyOffset: Long = Seq (

        Option(log.activeSegment.baseOffset),
        if (compactionLagMs > 0) {
          dirtyNonActiveSegments.find {
            s =>
              val isUncleanable = s.largestTimestamp > now - compactionLagMs
              isUncleanable
          } map(_.baseOffset)
        } else None
      ).flatten.min

    (firstDirtyOffset, firstUncleanableDirtyOffset)
  }

注意以下幾點:

  1. 是否開啟topic的日志清理機制和broker的log.cleanup.policy有關(guān)。這個配置的默認(rèn)值是[delete]俏橘,也就是沒有開啟允华。但是并不是所有的partition log都會根據(jù)這個配置來判斷是否開啟日志清理。因為每個topic在創(chuàng)建的時候寥掐,也會指定是否開啟日志清理(會覆蓋broker的那個配置)靴寂。所以需要遍歷所有的partiton,排除掉那些不用清理的partition召耘。
  2. dirtyRatio的計算規(guī)則為dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)百炬。其中dirtyBytes表示可清理部分的日志大小,cleanBytes表示已清理部分的日志大小污它。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末收壕,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子轨蛤,更是在濱河造成了極大的恐慌蜜宪,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祥山,死亡現(xiàn)場離奇詭異圃验,居然都是意外死亡,警方通過查閱死者的電腦和手機缝呕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門澳窑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人供常,你說我怎么就攤上這事摊聋。” “怎么了栈暇?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵麻裁,是天一觀的道長。 經(jīng)常有香客問我源祈,道長煎源,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任香缺,我火速辦了婚禮手销,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘图张。我一直安慰自己锋拖,他們只是感情好诈悍,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著兽埃,像睡著了一般侥钳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上讲仰,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天慕趴,我揣著相機與錄音痪蝇,去河邊找鬼鄙陡。 笑死,一個胖子當(dāng)著我的面吹牛躏啰,可吹牛的內(nèi)容都是我干的趁矾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼给僵,長吁一口氣:“原來是場噩夢啊……” “哼毫捣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起帝际,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蔓同,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蹲诀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斑粱,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年脯爪,在試婚紗的時候發(fā)現(xiàn)自己被綠了则北。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡痕慢,死狀恐怖尚揣,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情掖举,我是刑警寧澤快骗,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站塔次,受9級特大地震影響滨巴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜俺叭,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一恭取、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧熄守,春花似錦蜈垮、人聲如沸耗跛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽调塌。三九已至,卻和暖如春惠猿,著一層夾襖步出監(jiān)牢的瞬間羔砾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工偶妖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留姜凄,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓趾访,卻偏偏與公主長得像态秧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子扼鞋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容