The previous articles covered the individual packages under prometheus/tsdb; this article analyzes the implementation of prometheus/tsdb itself.
tombstones.go
Stone
A Stone marks data as deleted.
// Stone holds the information on the posting and time-range
// that is deleted.
type Stone struct {
ref uint64
intervals Intervals
}
Interval, Intervals
These record time ranges:
// Interval represents a single time-interval.
type Interval struct {
Mint, Maxt int64
}
func (tr Interval) inBounds(t int64) bool {
return t >= tr.Mint && t <= tr.Maxt
}
func (tr Interval) isSubrange(dranges Intervals) bool {
for _, r := range dranges {
if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) {
return true
}
}
return false
}
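A quick sketch of how these helpers behave (hypothetical values; Intervals is a slice of Interval in the source, and this snippet assumes the types above are in scope):
tr := Interval{Mint: 10, Maxt: 20}
fmt.Println(tr.inBounds(15))                   // true: 10 <= 15 <= 20
fmt.Println(tr.isSubrange(Intervals{{0, 30}})) // true: [10, 20] lies inside a single interval
fmt.Println(tr.isSubrange(Intervals{{0, 15}})) // false: no single interval covers all of [10, 20]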
TombstoneReader
// TombstoneReader gives access to tombstone intervals by series reference.
type TombstoneReader interface {
// Get returns deletion intervals for the series with the given reference.
Get(ref uint64) (Intervals, error)
// Iter calls the given function for each encountered interval.
Iter(func(uint64, Intervals) error) error
// Close any underlying resources
Close() error
}
An in-memory implementation is provided:
type memTombstones map[uint64]Intervals
var emptyTombstoneReader = memTombstones{}
// EmptyTombstoneReader returns a TombstoneReader that is always empty.
func EmptyTombstoneReader() TombstoneReader {
return emptyTombstoneReader
}
func (t memTombstones) Get(ref uint64) (Intervals, error) {
return t[ref], nil
}
func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
for ref, ivs := range t {
if err := f(ref, ivs); err != nil {
return err
}
}
return nil
}
func (t memTombstones) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv)
}
func (memTombstones) Close() error {
return nil
}
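A minimal usage sketch of memTombstones (hypothetical series ref and intervals):
t := memTombstones{}
t.add(1, Interval{Mint: 100, Maxt: 200})
t.add(1, Interval{Mint: 300, Maxt: 400})
ivs, _ := t.Get(1) // all deletion intervals recorded for series 1
fmt.Println(ivs)
_ = t.Iter(func(ref uint64, ivs Intervals) error {
	fmt.Println(ref, ivs) // visits every series' deletion intervals
	return nil
})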
The contents of a TombstoneReader can be written to a file and read back from one.
func writeTombstoneFile(dir string, tr TombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp"
// ...
return renameFile(tmp, path)
}
func readTombstones(dir string) (memTombstones, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
// ...
stonesMap := memTombstones{}
for d.len() > 0 {
// ...
stonesMap.add(k, Interval{mint, maxt})
}
return stonesMap, nil
}
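The two functions form a simple round trip (a sketch; "some/block/dir" is a placeholder for a block directory):
stones := memTombstones{}
stones.add(42, Interval{Mint: 0, Maxt: 1000})
if err := writeTombstoneFile("some/block/dir", stones); err != nil {
	// handle error
}
if restored, err := readTombstones("some/block/dir"); err == nil {
	fmt.Println(restored[42]) // the same interval written above
}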
wal.go
prometheus/tsdb first writes several kinds of data to WAL (write-ahead log) files.
// WALEntryType indicates what data a WAL entry contains.
type WALEntryType uint8
// Entry types in a segment file.
const (
WALEntrySymbols WALEntryType = 1
WALEntrySeries WALEntryType = 2
WALEntrySamples WALEntryType = 3
WALEntryDeletes WALEntryType = 4
)
// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Reader() WALReader
LogSeries([]RefSeries) error
LogSamples([]RefSample) error
LogDeletes([]Stone) error
Truncate(mint int64, keep func(uint64) bool) error
Close() error
}
// WALReader reads entries from a WAL.
type WALReader interface {
Read(
seriesf func([]RefSeries),
samplesf func([]RefSample),
deletesf func([]Stone),
) error
}
The related data structures are defined as follows:
// RefSeries is the series labels with the series ID.
type RefSeries struct {
Ref uint64
Labels labels.Labels
}
// RefSample is a timestamp/value pair associated with a reference to a series.
type RefSample struct {
Ref uint64
T int64
V float64
// In-memory series data; analyzed in detail later.
series *memSeries
}
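Typical WAL usage then looks roughly like this (a hedged sketch; error handling is elided, and the OpenSegmentWAL arguments — directory, logger, flush interval, metrics registerer — are assumptions based on this era of the code):
w, _ := OpenSegmentWAL("wal", nil, 5*time.Second, nil)
defer w.Close()
// A WAL must be read completely before new entries are logged.
_ = w.Reader().Read(
	func(series []RefSeries) { /* rebuild in-memory series */ },
	func(samples []RefSample) { /* replay samples */ },
	func(deletes []Stone) { /* replay deletion intervals */ },
)
_ = w.LogSeries([]RefSeries{{Ref: 1, Labels: labels.FromStrings("job", "demo")}})
_ = w.LogSamples([]RefSample{{Ref: 1, T: 1000, V: 0.5}})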
SegmentWAL
This is an implementation of WAL that stores data in 256MB segments, organized much like chunks.
Accordingly, the file-handling code is also quite similar.
// segmentFile wraps a file object of a segment and tracks the highest timestamp
// it contains. During WAL truncating, all segments with no higher timestamp than
// the truncation threshold can be compacted.
type segmentFile struct {
*os.File
maxTime int64 // highest tombstone or sample timestamp in segment
minSeries uint64 // lowest series ID in segment
}
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics
dirFile *os.File
files []*segmentFile
logger log.Logger
flushInterval time.Duration
segmentSize int64
crc32 hash.Hash32
cur *bufio.Writer
curN int64
// signal channels
stopc chan struct{}
donec chan struct{}
// operations executed in the background
actorc chan func() error // sequentialized background operations
buffers sync.Pool
}
LogXXXX
LogSeries, LogSamples, and LogDeletes each encode their respective data and write it to the WAL.
Truncate
// Truncate deletes the values prior to mint and the series which the keep function
// does not indicate to preserve.
// Used to drop data that is no longer needed.
func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
// ...
return nil
}
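For example, once the samples up to mint have been persisted into a block, the WAL can drop them (a sketch; w, mint, and the activeSeries set are hypothetical):
_ = w.Truncate(mint, func(ref uint64) bool {
	return activeSeries[ref] // keep only series still live in the Head
})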
run
When a SegmentWAL is opened via OpenSegmentWAL, the run function is started in a separate goroutine to handle the background operations passed through actorc.
Currently the only operation sent over actorc is cutting the segment files:
// cut finishes the currently active segments and opens the next one.
// The encoder is reset to point to the new segment.
func (w *SegmentWAL) cut() error {
// Sync current head to disk and close.
if hf := w.head(); hf != nil {
if err := w.flush(); err != nil {
return err
}
// Finish last segment asynchronously to not block the WAL moving along
// in the new segment.
// Finish the current segment file.
go func() {
w.actorc <- func() error {
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Truncate(off); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Sync(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
if err := hf.Close(); err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
return nil
}
}()
}
// Initialize a new segment file for writing.
// ...
return nil
}
compact.go
Implementation of compaction of the underlying storage.
// Compactor provides compaction against an underlying storage
// of time series data.
type Compactor interface {
// Plan returns a set of non-overlapping directories that can
// be compacted concurrently.
// Results returned when compactions are in progress are undefined.
Plan(dir string) ([]string, error)
// Write persists a Block into a directory.
Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
Compact(dest string, dirs ...string) (ulid.ULID, error)
}
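A plausible driver loop over this interface (a sketch; compactor and dbDir are placeholders):
for {
	dirs, err := compactor.Plan(dbDir)
	if err != nil || len(dirs) == 0 {
		break // nothing (more) to compact, or Plan failed
	}
	if _, err := compactor.Compact(dbDir, dirs...); err != nil {
		break // handle/log the error
	}
}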
LeveledCompactor
LeveledCompactor is the implementation of Compactor.
Plan
// Plan returns a list of compactable blocks in the provided directory.
func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
dirs, err := blockDirs(dir)
// ...
var dms []dirMeta
for _, dir := range dirs {
// Read each BlockMeta; it is the basis for deciding whether compaction is possible.
meta, err := readMetaFile(dir)
// ...
}
return c.plan(dms)
}
populateBlock
Both LeveledCompactor.Write and LeveledCompactor.Compact use LeveledCompactor.write, and LeveledCompactor.populateBlock carries the core logic of that write method: it merges the data of a set of Blocks and writes the result out through the IndexWriter and ChunkWriter.
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
var (
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
)
defer func() { closeAll(closers...) }()
// Iterate over the old blocks' data.
for i, b := range blocks {
indexr, err := b.Index()
// ...
chunkr, err := b.Chunks()
// ...
tombsr, err := b.Tombstones()
// ...
symbols, err := indexr.Symbols()
// ...
all, err := indexr.Postings(index.AllPostingsKey())
if err != nil {
return err
}
all = indexr.SortedPostings(all)
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
// ...
// Merge with the set from the previous iteration to form a new merger.
set, err = newCompactionMerger(set, s)
if err != nil {
return err
}
}
// We fully rebuild the postings list index from merged series.
// ...
// Iterate over the merged set.
for set.Next() {
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
// Skip the series with all deleted chunks.
// ...
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, chks...); err != nil {
return errors.Wrap(err, "add series")
}
// ...
}
// ...
s := make([]string, 0, 256)
for n, v := range values {
// ...
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
return errors.Wrap(err, "write label index")
}
}
for _, l := range postings.SortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings")
}
}
return nil
}
block.go
Block
Delete
// Delete matching series between mint and maxt in the block.
// As mentioned earlier, Delete first marks data with tombstones; this is that implementation.
func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
// ...
err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _, iv := range ivs {
stones.add(id, iv)
pb.meta.Stats.NumTombstones++
}
return nil
})
if err != nil {
return err
}
pb.tombstones = stones
if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
return err
}
return writeMetaFile(pb.dir, &pb.meta)
}
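Calling it might look like this (a sketch; the matcher and time range are hypothetical, and NewEqualMatcher comes from the tsdb labels package):
// Mark all samples of {job="demo"} in [0, 1000] as deleted.
if err := pb.Delete(0, 1000, labels.NewEqualMatcher("job", "demo")); err != nil {
	// handle error
}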
CleanTombstones
// CleanTombstones will rewrite the block if there are any tombstones, in order to remove
// them, and reports whether a rewrite happened.
func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
numStones := 0
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for range ivs {
numStones++
}
return nil
})
if numStones == 0 {
return false, nil
}
if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
return false, err
}
return true, nil
}
Snapshot
A question: this only creates hardlinks to the target directory and the files within it, so how is it guaranteed that the contents do not change? Presumably because persisted block files are immutable: compaction always writes new directories and then deletes old ones rather than modifying files in place, so a hardlinked snapshot stays intact.
head.go
Head
The Head is exposed to callers for reading and writing data within a certain time window.
It handles both the data still in the WAL and the data already persisted.
The Head can be thought of as the current Block.
All other Blocks are immutable; once the Head's writable period has passed, it is converted into a Block and persisted.
Appender
// Appender returns a new Appender on the database.
// The concrete Appender instance returned depends on the situation.
// There are two kinds of Appender:
// initAppender initializes the Head's start time when it receives the first data point.
// headAppender has comparatively simple logic.
func (h *Head) Appender() Appender {
h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if h.MinTime() == math.MinInt64 {
return &initAppender{head: h}
}
return h.appender()
}
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
samples: h.getAppendBuffer(),
highTimestamp: math.MinInt64,
}
}
querier.go
Query capability is exposed to callers through the following three interfaces.
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(...labels.Matcher) (SeriesSet, error)
// LabelValues returns all potential values for a label name.
LabelValues(string) ([]string, error)
// LabelValuesFor returns all potential values for a label name
// under the constraint of another label.
LabelValuesFor(string, labels.Label) ([]string, error)
// Close releases the resources of the Querier.
Close() error
}
// Series exposes a single time series.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() labels.Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
At() Series
Err() error
}
querier, blockQuerier
blockQuerier is a Querier over a single block.
querier is an aggregation of blockQueriers.
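End to end, a query runs roughly like this (a sketch; the matcher and time range are hypothetical, db.Querier is shown later in db.go, and SeriesIterator.At returns a timestamp/value pair):
q, _ := db.Querier(mint, maxt)
defer q.Close()
set, _ := q.Select(labels.NewEqualMatcher("job", "demo"))
for set.Next() {
	s := set.At()
	it := s.Iterator()
	for it.Next() {
		t, v := it.At() // one timestamp/value pair of the series
		fmt.Println(s.Labels(), t, v)
	}
}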
db.go
Appender
Appender is the write interface; *Head implements Appender.
// Appender allows appending a batch of data. It must be completed with a
// call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
type Appender interface {
// Add adds a sample pair for the given series. A reference number is
// returned which can be used to add further samples in the same or later
// transactions.
// Returned reference numbers are ephemeral and may be rejected in calls
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
// If the returned reference is 0, it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (uint64, error)
// Add adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set.
AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
Rollback() error
}
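Using the interface is straightforward (a sketch; the label set and values are hypothetical):
app := db.Appender()
ref, err := app.Add(labels.FromStrings("job", "demo"), 1000, 0.5)
if err == nil {
	_ = app.AddFast(ref, 2000, 0.6) // reuse the reference for later samples of the same series
}
_ = app.Commit() // or app.Rollback() to discard the batch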
DB
DB is the primary structure exposed to callers.
// DB handles reads and writes of time series falling into
// a hashed partition of a series DB.
type DB struct {
dir string
lockf *lockfile.Lockfile
logger log.Logger
metrics *dbMetrics
opts *Options
chunkPool chunkenc.Pool
compactor Compactor
// Mutex that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []*Block
head *Head
compactc chan struct{}
donec chan struct{}
stopc chan struct{}
// cmtx is used to control compactions and deletions.
cmtx sync.Mutex
compactionsEnabled bool
}
reload
// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
// a list of block directories which should be deleted during reload.
func (db *DB) reload(deleteable ...string) (err error) {
// ...
// Read all current block directories.
dirs, err := blockDirs(db.dir)
// ...
var (
blocks []*Block
exist = map[ulid.ULID]struct{}{}
)
for _, dir := range dirs {
meta, err := readMetaFile(dir)
// ...
// Try to get the Block for the directory: first from memory, then from disk.
b, ok := db.getBlock(meta.ULID)
if !ok {
b, err = OpenBlock(dir, db.chunkPool)
// ...
}
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
}
// Validate the sequence of Blocks ordered by the time ranges they cover.
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
}
// ...
// Remove Block files that are no longer needed.
for _, b := range oldBlocks {
if _, ok := exist[b.Meta().ULID]; ok {
continue
}
if err := b.Close(); err != nil {
level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
}
if err := os.RemoveAll(b.Dir()); err != nil {
level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
}
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return nil
}
maxt := blocks[len(blocks)-1].Meta().MaxTime
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
run
The run method is called from Open and executes in a separate goroutine; its main job is to periodically compact data to save space.
func (db *DB) run() {
defer close(db.donec)
backoff := time.Duration(0)
for {
select {
case <-db.stopc:
return
case <-time.After(backoff):
}
select {
case <-time.After(1 * time.Minute):
select {
case db.compactc <- struct{}{}:
default:
}
case <-db.compactc:
// compaction code runs here
case <-db.stopc:
return
}
}
}
Appender
Returns the wrapped result, dbAppender, which is analyzed separately below.
Querier
Returns an aggregation over all Blocks in the given time range.
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(mint, maxt int64) (Querier, error) {
var blocks []BlockReader
db.mtx.RLock()
defer db.mtx.RUnlock()
for _, b := range db.blocks {
m := b.Meta()
// Find the blocks that overlap the queried time range.
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
blocks = append(blocks, b)
}
}
// As noted earlier, the Head can be regarded as the current Block.
if maxt >= db.head.MinTime() {
blocks = append(blocks, db.head)
}
// Aggregate the Blocks.
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
}
for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
if err == nil {
sq.blocks = append(sq.blocks, q)
continue
}
// If we fail, all previously opened queriers must be closed.
for _, q := range sq.blocks {
q.Close()
}
return nil, errors.Wrapf(err, "open querier for block %s", b)
}
return sq, nil
}
Delete
This actually dispatches the Delete operation to every affected Block.
CleanTombstones
As mentioned earlier, the logic inside each Block's Delete only writes the WAL and tombstone files.
Here every current Block is actually cleaned up, after which the reload method is called.
dbAppender
A wrapper around *headAppender that triggers a compaction on Commit.
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
}
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {
Appender
db *DB
}
func (a dbAppender) Commit() error {
err := a.Appender.Commit()
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
return err
}
Summary
The structs in prometheus/tsdb (hereafter ptsdb) can be roughly layered as follows:
- DB: the core object exposed to callers
  - Block: persisted time series data covering a time range. A Block consists of:
    - Index: index data over the labels
    - Chunk: the timestamp/sample-value data
  - Head: ptsdb requires data to be written in increasing time order, and persisted Blocks cannot be written to again, so at any moment only one Block accepts writes: the Head. The Head also takes on recording deletions.
  - WAL: inserts, deletions, and modifications first go into the WAL, for recovery later
  - Tombstone: marks deletions; the marked data is cleaned up together at compaction time
  - Compactor: compacts the files. The organization of Block data borrows from LSM, so the Compactor implementation resembles that of LSM-based KV databases.
For a more high-level discussion of ptsdb, see the article 時(shí)間序列數(shù)據(jù)的存儲(chǔ)和計(jì)算 - 開(kāi)源時(shí)序數(shù)據(jù)庫(kù)解析(四).