最近查看Kafka文檔, 發(fā)現(xiàn) Kafka 有個(gè) Log Compaction 功能是我們之前沒(méi)有留意到的, 但是有著很高的潛在實(shí)用價(jià)值.
什么是Log Compaction
Kafka 中的每一條數(shù)據(jù)都有一對(duì) Key 和 Value, 數(shù)據(jù)存放在磁盤(pán)上, 一般不會(huì)被永久保留, 而是在到達(dá)一定的量或者時(shí)間后對(duì)最早寫(xiě)入的數(shù)據(jù)進(jìn)行刪除. Log Compaction 在默認(rèn)的刪除規(guī)則之外提供了另一種刪除過(guò)時(shí)數(shù)據(jù)(或者說(shuō)保留有價(jià)值的數(shù)據(jù))的方式, 就是對(duì)于有相同 Key 的不同數(shù)據(jù), 只保留最后一條, 前面的數(shù)據(jù)在合適的情況下刪除.
Log Compaction 的應(yīng)用場(chǎng)景
Log Compaction 特性, 就實(shí)時(shí)計(jì)算而言, 可以在災(zāi)難恢復(fù)方面有很好地應(yīng)用場(chǎng)景. 比如說(shuō)我們?cè)?Storm 里做計(jì)算時(shí), 需要長(zhǎng)期在內(nèi)存里維護(hù)一些數(shù)據(jù), 這些數(shù)據(jù)可能是通過(guò)聚合了一天或者一周的日志得到的, 這些數(shù)據(jù)一旦由于偶然的原因(磁盤(pán),網(wǎng)絡(luò)等)崩潰了, 從頭開(kāi)始計(jì)算需要漫長(zhǎng)的時(shí)間.一個(gè)可行的應(yīng)對(duì)方法是定時(shí)將內(nèi)存里的數(shù)據(jù)備份到外部存儲(chǔ)中, 比如 Redis 或者 Mysql 等, 當(dāng)崩潰發(fā)生的時(shí)候再?gòu)耐獠看鎯?chǔ)讀回來(lái)繼續(xù)計(jì)算.
使用 Log Compaction 來(lái)代替這些外部存儲(chǔ)有以下好處.
Kafka 既是數(shù)據(jù)源又是存儲(chǔ)工具, 可以簡(jiǎn)化技術(shù)棧, 降低維護(hù)成本.
使用 Mysql 或者 Redis 作為外部存儲(chǔ)的話, 需要將存儲(chǔ)的 Key 記錄下來(lái), 恢復(fù)時(shí)再用這些 Key 將數(shù)據(jù)取回, 實(shí)現(xiàn)起來(lái)有一定的工程復(fù)雜度. 用Log Compaction 特性的話只要把數(shù)據(jù)一股腦兒地寫(xiě)進(jìn) Kafka, 等災(zāi)難恢復(fù)的時(shí)候再讀回內(nèi)存就行了.
Kafka 針對(duì)磁盤(pán)讀寫(xiě)都有很高的順序性, 相對(duì)于 Mysql 沒(méi)有索引查詢(xún)等工作量的負(fù)擔(dān), 可以實(shí)現(xiàn)高性能, 相對(duì)于 Redis 而言, 它可以充分利用廉價(jià)的磁盤(pán)而對(duì)內(nèi)存要求很低, 在接近的性能下能實(shí)現(xiàn)非常高的性?xún)r(jià)比(僅僅針對(duì)災(zāi)難恢復(fù)這個(gè)場(chǎng)景而言).
實(shí)現(xiàn)方式的簡(jiǎn)要介紹
當(dāng) topic 的 cleanup.policy (默認(rèn)為delete) 設(shè)置為 compact 時(shí), Kafka 的后臺(tái)線程會(huì)定時(shí)把 topic 遍歷兩次, 第一次把每個(gè) key 的哈希值最后一次出現(xiàn)的 offset 都存下來(lái), 第二次檢查每個(gè) offset 對(duì)應(yīng)的 key 是否在更后面的日志中出現(xiàn)過(guò),如果出現(xiàn)了就刪除對(duì)應(yīng)的日志.
源碼解析
Log Compaction 的大部分功能由CleanerThread完成, 核心邏輯在 Cleaner 的 clean方法
/** * Clean the given log * * @param cleanable The log to be cleaned * * @return The first offset not cleaned and the statistics for this round of cleaning */ private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = { val stats = new CleanerStats() info("Beginning cleaning of log %s.".format(cleanable.log.name)) val log = cleanable.log // build the offset map info("Building offset map for %s...".format(cleanable.log.name)) val upperBoundOffset = cleanable.firstUncleanableOffset buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats) // <----- 這里第一次遍歷所有offset將key索引 val endOffset = offsetMap.latestOffset + 1 stats.indexDone() // figure out the timestamp below which it is safe to remove delete tombstones // this position is defined to be a configurable time beneath the last modified time of the last clean segment val deleteHorizonMs = log.logSegments(0, cleanable.firstDirtyOffset).lastOption match { case None => 0L case Some(seg) => seg.lastModified - log.config.deleteRetentionMs } // determine the timestamp up to which the log will be cleaned // this is the lower of the last active segment and the compaction lag val cleanableHorizonMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.lastModified).getOrElse(0L) // group the segments and clean the groups info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizonMs), new Date(deleteHorizonMs))) for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) cleanSegments(log, group, offsetMap, deleteHorizonMs, stats) // <-- 這里第二次遍歷所有offset,刪除冗余的日志,并且將多個(gè)小的segment合并為一個(gè) // record buffer utilization stats.bufferUtilization = offsetMap.utilization stats.allDone() (endOffset, stats) }
log compaction 通過(guò)兩次遍歷所有數(shù)據(jù)來(lái)實(shí)現(xiàn), 兩次遍歷之間交流的媒介就是一個(gè)OffsetMap, 下面是OffsetMap的簽名
trait OffsetMap { def slots: Int def put(key: ByteBuffer, offset: Long) def get(key: ByteBuffer): Long def clear() def size: Int def utilization: Double = size.toDouble / slots def latestOffset: Long}
這基本就是個(gè)普通的mutable map, 在 Kafka 項(xiàng)目中,它的實(shí)現(xiàn)只有一個(gè), 叫做SkimpyOffsetMap
put方法
put 方法會(huì)為每個(gè) key 生成一份摘要,默認(rèn)使用 md5 方法生成一個(gè) 16byte 的摘要, 根據(jù)這個(gè)摘要在 bytes
中哈希的到一個(gè)下標(biāo), 如果這個(gè)下標(biāo)已經(jīng)被別的摘要占據(jù), 則線性查找到下個(gè)空余的下標(biāo)為止, 然后在對(duì)應(yīng)位置插入該 key 對(duì)應(yīng)的 offset
/** * Associate this offset to the given key. * @param key The key * @param offset The offset */override def put(key: ByteBuffer, offset: Long) { require(entries < slots, "Attempt to add a new entry to a full offset map.") lookups += 1 hashInto(key, hash1) // probe until we find the first empty slot var attempt = 0 var pos = positionOf(hash1, attempt) while(!isEmpty(pos)) { bytes.position(pos) bytes.get(hash2) if(Arrays.equals(hash1, hash2)) { // we found an existing entry, overwrite it and return (size does not change) bytes.putLong(offset) lastOffset = offset return } attempt += 1 pos = positionOf(hash1, attempt) } // found an empty slot, update it--size grows by 1 bytes.position(pos) bytes.put(hash1) bytes.putLong(offset) lastOffset = offset entries += 1}
get方法
get 方法使用和 put 同樣的摘要算法獲得 key 的摘要, 通過(guò)摘要獲得 offset 的存儲(chǔ)位置
/** * Get the offset associated with this key. * @param key The key * @return The offset associated with this key or -1 if the key is not found */ override def get(key: ByteBuffer): Long = { lookups += 1 hashInto(key, hash1) // search for the hash of this key by repeated probing until we find the hash we are looking for or we find an empty slot var attempt = 0 var pos = 0 //we need to guard against attempt integer overflow if the map is full //limit attempt to number of slots once positionOf(..) enters linear search mode val maxAttempts = slots + hashSize - 4 do { if(attempt >= maxAttempts) return -1L pos = positionOf(hash1, attempt) bytes.position(pos) if(isEmpty(pos)) return -1L bytes.get(hash2) attempt += 1 } while(!Arrays.equals(hash1, hash2)) bytes.getLong() }
可能的空間問(wèn)題 性能問(wèn)題 沖突問(wèn)題
空間問(wèn)題
默認(rèn)情況下, Kafka 用 16 個(gè) byte 存放key的摘要, 用 8 個(gè) byte 存放摘要對(duì)應(yīng)的 offset, 1GB 的空間可以保存 1024* 1024*1024 / 24 = 44,739,242.666...
個(gè) key 對(duì)應(yīng)的數(shù)據(jù).
性能問(wèn)題
這個(gè) log compaction 的原理挺簡(jiǎn)單, 就是定期把所有日志讀兩遍,寫(xiě)一遍, cpu 的速度超過(guò)磁盤(pán)完全不是問(wèn)題, 只要日志的量對(duì)應(yīng)的讀兩遍寫(xiě)一遍的時(shí)間在可接受的范圍內(nèi), 它的性能就是可以接受的.
沖突問(wèn)題
現(xiàn)在的 OffsetMap 唯一的實(shí)現(xiàn)名字叫做 SkimpyOffsetMap, 相信你們已經(jīng)從這個(gè)名字里看出端倪, 最初的作者本身也認(rèn)為這樣的實(shí)現(xiàn)不夠嚴(yán)謹(jǐn). 這個(gè)算法在兩個(gè) key 的 md5 值相同的情況下就判斷 key 是相同的, 如果遇到了 key 不同而 md5 值相同的情況, 那兩個(gè) key 中其中一個(gè)的消息就丟失了. 雖然 md5 值相同的概率很低, 但如果真的碰上了, 那就是100%, 概率值再低也沒(méi)用, 而且從網(wǎng)上的反映看似乎沖突還不少見(jiàn).
我個(gè)人目前想到的處理方案是, 大部分的 key 總長(zhǎng)度并不算長(zhǎng), 可以把這類(lèi) key 所有可能的情況都md5一遍看一下是否有沖突, 如果沒(méi)有的話就放心用.