Kafka Log Compaction 解析

最近查看Kafka文檔, 發(fā)現(xiàn) Kafka 有個(gè) Log Compaction 功能是我們之前沒(méi)有留意到的, 但是有著很高的潛在實(shí)用價(jià)值.
什么是Log Compaction
Kafka 中的每一條數(shù)據(jù)都有一對(duì) Key 和 Value, 數(shù)據(jù)存放在磁盤(pán)上, 一般不會(huì)被永久保留, 而是在到達(dá)一定的量或者時(shí)間后對(duì)最早寫(xiě)入的數(shù)據(jù)進(jìn)行刪除. Log Compaction 在默認(rèn)的刪除規(guī)則之外提供了另一種刪除過(guò)時(shí)數(shù)據(jù)(或者說(shuō)保留有價(jià)值的數(shù)據(jù))的方式, 就是對(duì)于有相同 Key 的不同數(shù)據(jù), 只保留最后一條, 前面的數(shù)據(jù)在合適的情況下刪除.
Log Compaction 的應(yīng)用場(chǎng)景
Log Compaction 特性, 就實(shí)時(shí)計(jì)算而言, 可以在災(zāi)難恢復(fù)方面有很好地應(yīng)用場(chǎng)景. 比如說(shuō)我們?cè)?Storm 里做計(jì)算時(shí), 需要長(zhǎng)期在內(nèi)存里維護(hù)一些數(shù)據(jù), 這些數(shù)據(jù)可能是通過(guò)聚合了一天或者一周的日志得到的, 這些數(shù)據(jù)一旦由于偶然的原因(磁盤(pán),網(wǎng)絡(luò)等)崩潰了, 從頭開(kāi)始計(jì)算需要漫長(zhǎng)的時(shí)間.一個(gè)可行的應(yīng)對(duì)方法是定時(shí)將內(nèi)存里的數(shù)據(jù)備份到外部存儲(chǔ)中, 比如 Redis 或者 Mysql 等, 當(dāng)崩潰發(fā)生的時(shí)候再?gòu)耐獠看鎯?chǔ)讀回來(lái)繼續(xù)計(jì)算.
使用 Log Compaction 來(lái)代替這些外部存儲(chǔ)有以下好處.
Kafka 既是數(shù)據(jù)源又是存儲(chǔ)工具, 可以簡(jiǎn)化技術(shù)棧, 降低維護(hù)成本.

使用 Mysql 或者 Redis 作為外部存儲(chǔ)的話, 需要將存儲(chǔ)的 Key 記錄下來(lái), 恢復(fù)時(shí)再用這些 Key 將數(shù)據(jù)取回, 實(shí)現(xiàn)起來(lái)有一定的工程復(fù)雜度. 用Log Compaction 特性的話只要把數(shù)據(jù)一股腦兒地寫(xiě)進(jìn) Kafka, 等災(zāi)難恢復(fù)的時(shí)候再讀回內(nèi)存就行了.

Kafka 針對(duì)磁盤(pán)讀寫(xiě)都有很高的順序性, 相對(duì)于 Mysql 沒(méi)有索引查詢(xún)等工作量的負(fù)擔(dān), 可以實(shí)現(xiàn)高性能, 相對(duì)于 Redis 而言, 它可以充分利用廉價(jià)的磁盤(pán)而對(duì)內(nèi)存要求很低, 在接近的性能下能實(shí)現(xiàn)非常高的性?xún)r(jià)比(僅僅針對(duì)災(zāi)難恢復(fù)這個(gè)場(chǎng)景而言).

實(shí)現(xiàn)方式的簡(jiǎn)要介紹
當(dāng) topic 的 cleanup.policy (默認(rèn)為delete) 設(shè)置為 compact 時(shí), Kafka 的后臺(tái)線程會(huì)定時(shí)把 topic 遍歷兩次, 第一次把每個(gè) key 的哈希值最后一次出現(xiàn)的 offset 都存下來(lái), 第二次檢查每個(gè) offset 對(duì)應(yīng)的 key 是否在更后面的日志中出現(xiàn)過(guò),如果出現(xiàn)了就刪除對(duì)應(yīng)的日志.


源碼解析
Log Compaction 的大部分功能由CleanerThread完成, 核心邏輯在 Cleaner 的 clean方法

/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 這里第一次遍歷所有offset將key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 這里第二次遍歷所有offset,刪除冗余的日志,并且將多個(gè)小的segment合并為一個(gè) // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }

log compaction 通過(guò)兩次遍歷所有數(shù)據(jù)來(lái)實(shí)現(xiàn), 兩次遍歷之間交流的媒介就是一個(gè)OffsetMap, 下面是OffsetMap的簽名
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long}

這基本就是個(gè)普通的mutable map, 在 Kafka 項(xiàng)目中,它的實(shí)現(xiàn)只有一個(gè), 叫做SkimpyOffsetMap
put方法
put 方法會(huì)為每個(gè) key 生成一份摘要,默認(rèn)使用 md5 方法生成一個(gè) 16byte 的摘要, 根據(jù)這個(gè)摘要在 bytes
中哈希的到一個(gè)下標(biāo), 如果這個(gè)下標(biāo)已經(jīng)被別的摘要占據(jù), 則線性查找到下個(gè)空余的下標(biāo)為止, 然后在對(duì)應(yīng)位置插入該 key 對(duì)應(yīng)的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1}

get方法
get 方法使用和 put 同樣的摘要算法獲得 key 的摘要, 通過(guò)摘要獲得 offset 的存儲(chǔ)位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }

可能的空間問(wèn)題 性能問(wèn)題 沖突問(wèn)題
空間問(wèn)題
默認(rèn)情況下, Kafka 用 16 個(gè) byte 存放key的摘要, 用 8 個(gè) byte 存放摘要對(duì)應(yīng)的 offset, 1GB 的空間可以保存 1024* 1024*1024 / 24 = 44,739,242.666...
個(gè) key 對(duì)應(yīng)的數(shù)據(jù).
性能問(wèn)題
這個(gè) log compaction 的原理挺簡(jiǎn)單, 就是定期把所有日志讀兩遍,寫(xiě)一遍, cpu 的速度超過(guò)磁盤(pán)完全不是問(wèn)題, 只要日志的量對(duì)應(yīng)的讀兩遍寫(xiě)一遍的時(shí)間在可接受的范圍內(nèi), 它的性能就是可以接受的.
沖突問(wèn)題
現(xiàn)在的 OffsetMap 唯一的實(shí)現(xiàn)名字叫做 SkimpyOffsetMap, 相信你們已經(jīng)從這個(gè)名字里看出端倪, 最初的作者本身也認(rèn)為這樣的實(shí)現(xiàn)不夠嚴(yán)謹(jǐn). 這個(gè)算法在兩個(gè) key 的 md5 值相同的情況下就判斷 key 是相同的, 如果遇到了 key 不同而 md5 值相同的情況, 那兩個(gè) key 中其中一個(gè)的消息就丟失了. 雖然 md5 值相同的概率很低, 但如果真的碰上了, 那就是100%, 概率值再低也沒(méi)用, 而且從網(wǎng)上的反映看似乎沖突還不少見(jiàn).
我個(gè)人目前想到的處理方案是, 大部分的 key 總長(zhǎng)度并不算長(zhǎng), 可以把這類(lèi) key 所有可能的情況都md5一遍看一下是否有沖突, 如果沒(méi)有的話就放心用.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市黔衡,隨后出現(xiàn)的幾起案子镐侯,更是在濱河造成了極大的恐慌,老刑警劉巖禁谦,帶你破解...
    沈念sama閱讀 221,888評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件率寡,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡捧挺,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)尿瞭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)闽烙,“玉大人,你說(shuō)我怎么就攤上這事声搁『诰海” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,386評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵疏旨,是天一觀的道長(zhǎng)很魂。 經(jīng)常有香客問(wèn)我,道長(zhǎng)檐涝,這世上最難降的妖魔是什么遏匆? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,726評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮谁榜,結(jié)果婚禮上幅聘,老公的妹妹穿的比我還像新娘。我一直安慰自己窃植,他們只是感情好帝蒿,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著巷怜,像睡著了一般葛超。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上延塑,一...
    開(kāi)封第一講書(shū)人閱讀 52,337評(píng)論 1 310
  • 那天绣张,我揣著相機(jī)與錄音,去河邊找鬼页畦。 笑死胖替,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播独令,決...
    沈念sama閱讀 40,902評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼端朵,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了燃箭?” 一聲冷哼從身側(cè)響起冲呢,我...
    開(kāi)封第一講書(shū)人閱讀 39,807評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎招狸,沒(méi)想到半個(gè)月后敬拓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡裙戏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評(píng)論 3 340
  • 正文 我和宋清朗相戀三年乘凸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片累榜。...
    茶點(diǎn)故事閱讀 40,567評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡营勤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出壹罚,到底是詐尸還是另有隱情葛作,我是刑警寧澤,帶...
    沈念sama閱讀 36,242評(píng)論 5 350
  • 正文 年R本政府宣布猖凛,位于F島的核電站赂蠢,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏辨泳。R本人自食惡果不足惜虱岂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望漠吻。 院中可真熱鬧量瓜,春花似錦、人聲如沸途乃。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,420評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)耍共。三九已至烫饼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間试读,已是汗流浹背杠纵。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,531評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留钩骇,地道東北人比藻。 一個(gè)月前我還...
    沈念sama閱讀 48,995評(píng)論 3 377
  • 正文 我出身青樓铝量,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親银亲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子慢叨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容