本文主要聚焦 kafka 的日志存儲以及日志清理相關训唱。
日志存儲結構
首先我們來看一張 kafak 的存儲結構圖。
如上圖所示挚冤、kafka 中消息是以主題 topic 為基本單位進行歸類的雪情,這里的 topic 是邏輯上的概念,實際上在磁盤存儲是根據(jù)分區(qū)存儲的你辣,每個主題可以分為多個分區(qū)巡通、分區(qū)的數(shù)量可以在主題創(chuàng)建的時候進行指定。例如下面 kafka 命令創(chuàng)建了一個 topic 為 test 的主題舍哄、該主題下有 4 個分區(qū)宴凉、每個分區(qū)有兩個副本保證高可用。
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 4 --topic test
分區(qū)的修改除了在創(chuàng)建的時候指定表悬。還可以動態(tài)的修改弥锄。如下將 kafka 的 test 主題分區(qū)數(shù)修改為 12 個
./kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --topic test --partitions 12
分區(qū)內每條消息都會被分配一個唯一的消息 id,也就是我們通常所說的 offset, 因此 kafak 只能保證每一個分區(qū)內部有序性,不能保證全局有序性。
如果分區(qū)設置的合理,那么所有的消息都可以均勻的分布到不同的分區(qū)中去籽暇,這樣可以實現(xiàn)水平擴展温治。不考慮多副本的情況下,一個分區(qū)對應一個 log 日志戒悠、如上圖所示熬荆。為了防止 log 日志過大,kafka 又引入了日志分段(LogSegment)的概念绸狐,將 log 切分為多個 LogSegement卤恳,相當于一個巨型文件被平均分配為相對較小的文件,這樣也便于消息的維護和清理寒矿。事實上突琳,Log 和 LogSegement 也不是純粹物理意義上的概念,Log 在物理上只是以文件夾的形式存儲符相,而每個 LogSegement 對應于磁盤上的一個日志文件和兩個索引文件拆融,以及可能的其他文件(比如以".txindex"為后綴的事務索引文件)。
kafak 中的 Log 對應了一個命名為<topic>-<partition> 的文件夾啊终。舉個例子镜豹、假如有一個 test 主題,此主題下游 3 個分區(qū)孕索,那么在實際物理上的存儲就是 "test-0","test-1","test-2" 這三個文件夾逛艰。
向 Log 中寫入消息是順序寫入的。只有最后一個 LogSegement 才能執(zhí)行寫入操作搞旭,在此之前的所有 LogSegement 都不能執(zhí)行寫入操作散怖。為了方便描述,我們將最后一個 LogSegement 成為"ActiveSegement"肄渗,即表示當前活躍的日志分段镇眷。隨著消息的不斷寫入,當 ActiveSegement 滿足一定的條件時翎嫡,就需要創(chuàng)建新的 activeSegement欠动,之后在追加的消息寫入新的 activeSegement。
為了便于消息的檢索惑申,每個 LogSegement 中的日志文件(以".log" 為文件后綴)都有對應的兩個文件索引:偏移量索引文件(以".index" 為文件后綴)和時間戳索引文件(以".timeindex"為文件后綴)具伍。每個 LogSegement 都有一個“基準偏移量” baseOffset,用來標識當前 LogSegement 中第一條消息的 offset圈驼。偏移量是一個 64 位的長整形人芽。日志文件和兩個索引文件都是根據(jù)基準偏移量(baseOffset)命名的,名稱固定為 20 位數(shù)字绩脆,沒有達到的位數(shù)則用 0 填充萤厅。比如第一個 LogSegment 的基準偏移量為 0橄抹,對應的日志文件為 00000000000000000000.log
示例中第 2 個 LogSegment 對應的基準位移是 256,也說明了該 LogSegment 中的第一條消息的偏移量為 256惕味,同時可以反映出第一個 LogSegment 中共有 256 條消息(偏移量從 0 至 254 的消息)楼誓。
注意每個 LogSegment 中不只包含“.log”“.index”“.timeindex”這 3 種文件,還可能包含“.deleted”“.cleaned”“.swap”等臨時文件,以及可能的“.snapshot”“.txnindex”“l(fā)eader-epoch-checkpoint”等文件。
日志清理機制
由于 kafak 是把消息存儲 在磁盤上真朗,為了控制消息的不斷增加我們就必須對消息做一定的清理和壓縮。kakfa 中的每一個分區(qū)副本都對應的一個 log 日志文件阁猜。而 Log 又分為多個 LogSegement 日志分段丸逸。這樣也便于日志清理蹋艺。kafka 內部提供了兩種日志清理策略。
日志刪除
按照一定的保留策略直接刪除不符合條件的日志分段黄刚。
1.基于時間
我們可以通過 broker 端參數(shù) log.cleanup.policy 來設置日志清理策略捎谨,此參數(shù)的默認值為“delete”,即采用日志刪除的清理策略憔维。如果要采用日志壓縮的清理策略涛救,就需要將 log.cleanup.policy 設置為“compact”,并且還需要將 log.cleaner.enable(默認值為 true)設定為 true业扒。通過將 log.cleanup.policy 參數(shù)設置為“delete检吆,compact”,還可以同時支持日志刪除和日志壓縮兩種策略程储。日志清理的粒度可以控制到主題級別蹭沛,比如與 log.cleanup.policy 對應的主題級別的參數(shù)為 cleanup.policy,為了簡化說明章鲤,本文只采用 broker 端參數(shù)做陳述摊灭。
日志刪除任務會檢查當前日志文件中是否有保留時間超過設定的閾值(retentionMs)來尋找可刪除的日志分段文件集合(deletableSegments),如圖下圖所示败徊。retentionMs 可以通過 broker 端參數(shù) log.retention.hours帚呼、log.retention.minutes 和 log.retention.ms 來配置,其中 log.retention.ms 的優(yōu)先級最高皱蹦,log.retention.minutes 次之煤杀,log.retention.hours 最低。默認情況下只配置了 log.retention.hours 參數(shù)沪哺,其值為 168沈自,故默認情況下日志分段文件的保留時間為 7 天。
查找過期的日志分段文件凤粗,并不是簡單地根據(jù)日志分段的最近修改時間 lastModifiedTime 來計算的酥泛,而是根據(jù)日志分段中最大的時間戳 largestTimeStamp 來計算的今豆。因為日志分段的 lastModifiedTime 可以被有意或無意地修改,比如執(zhí)行了 touch 操作柔袁,或者分區(qū)副本進行了重新分配呆躲,lastModifiedTime 并不能真實地反映出日志分段在磁盤的保留時間。要獲取日志分段中的最大時間戳 largestTimeStamp 的值捶索,首先要查詢該日志分段所對應的時間戳索引文件插掂,查找時間戳索引文件中最后一條索引項,若最后一條索引項的時間戳字段值大于 0腥例,則取其值辅甥,否則才設置為最近修改時間 lastModifiedTime.
若待刪除的日志分段的總數(shù)等于該日志文件中所有的日志分段的數(shù)量,那么說明所有的日志分段都已過期燎竖,但該日志文件中還要有一個日志分段用于接收消息的寫入璃弄,即必須要保證有一個活躍的日志分段 activeSegment,在此種情況下构回,會先切分出一個新的日志分段作為 activeSegment夏块,然后執(zhí)行刪除操作。
刪除日志分段時纤掸,首先會從 Log 對象中所維護日志分段的跳躍表中移除待刪除的日志分段脐供,以保證沒有線程對這些日志分段進行讀取操作。然后將日志分段所對應的所有文件添加上“.deleted”的后綴(當然也包括對應的索引文件)借跪。最后交由一個以“delete-file”命名的延遲任務來刪除這些以“.deleted”為后綴的文件政己,這個任務的延遲執(zhí)行時間可以通過 file.delete.delay.ms 參數(shù)來調配,此參數(shù)的默認值為 60000掏愁,即 1 分鐘歇由。
2.基于日志大小
日志刪除任務會檢查當前日志的大小是否超過設定的閾值(retentionSize)來尋找可刪除的日志分段的文件集合(deletableSegments),如下圖所示托猩。retentionSize 可以通過 broker 端參數(shù) log.retention.bytes 來配置印蓖,默認值為-1,表示無窮大京腥。注意 log.retention.bytes 配置的是 Log 中所有日志文件的總大小赦肃,而不是單個日志分段(確切地說應該為.log 日志文件)的大小。單個日志分段的大小由 broker 端參數(shù) log.segment.bytes 來限制公浪,默認值為 1073741824他宛,即 1GB。
基于日志大小的保留策略與基于時間的保留策略類似欠气,首先計算日志文件的總大小 size 和 retentionSize 的差值 diff厅各,即計算需要刪除的日志總大小,然后從日志文件中的第一個日志分段開始進行查找可刪除的日志分段的文件集合 deletableSegments预柒。查找出 deletableSegments 之后就執(zhí)行刪除操作队塘,這個刪除操作和基于時間的保留策略的刪除操作相同