fileutil
fileutil
├── dir_unix.go
├── dir_windows.go
├── fileutil.go
├── mmap.go
├── mmap_unix.go
├── mmap_windows.go
├── preallocate.go
├── preallocate_darwin.go
├── preallocate_linux.go
├── preallocate_other.go
├── sync.go
├── sync_darwin.go
└── sync_linux.go
fileutil 提供了一些操作文件/目录的函数, 处理了不同平台 (主要是 win) 的兼容性问题.
除非遇到类似的场景, 需要参考相关的处理思路,
否则直接阅读 godoc 即可.
index
index
├── encoding_helpers.go
├── index.go
├── index_test.go
├── postings.go
└── postings_test.go
index 实现针对 labels 的索引.
在 prometheus/tsdb 中, 认为 labels + timestamp + value 是一个完整的数据点.
chunks 相关的代码用于存储 timestamp + value, 而 index 则是对于 labels 的处理.
encbuf, decbuf
作为 buffer 在 index 数据编码/解码时进行复用.
实际上这里也定义了一些数据格式如何进行存储.
// putUvarintStr writes a string to the buffer prefixed by its length, which
// is emitted through putUvarint. The length is counted in bytes, not runes.
func (e *encbuf) putUvarintStr(s string) {
	// len on a string already yields its byte count, so there is no need to
	// reinterpret the string header as a slice header through unsafe.
	e.putUvarint(len(s))
	e.putString(s)
}
// uvarintStr is the decoding counterpart of putUvarintStr: it first reads the
// string length via uvarint64 and then consumes that many bytes from the
// front of the buffer.
// It returns "" when a decoding error is recorded in d.e, or when the buffer
// holds fewer bytes than the decoded length; in the latter case d.e is set to
// errInvalidSize.
func (d *decbuf) uvarintStr() string {
	l := d.uvarint64()
	if d.e != nil {
		return ""
	}
	// Compare in the unsigned domain: converting a huge length to int can
	// produce a negative number that slips past the check and makes the
	// slice expression below panic.
	if uint64(len(d.b)) < l {
		d.e = errInvalidSize
		return ""
	}
	s := string(d.b[:l])
	d.b = d.b[l:]
	return s
}
indexWriterSeries
// indexWriterSeries groups what the index writer keeps for a single series:
// its label set, its chunk metadata and an offset into the index file.
type indexWriterSeries struct {
// The actual label content, i.e. the key/value pairs.
labels labels.Labels
// NOTE(review): the part that matters here is presumably Meta.Ref, i.e. the
// file/start position of each chunk — confirm against chunks.Meta.
chunks []chunks.Meta // series file offset of chunks
// Offset of the labels data within the file.
offset uint32 // index file offset of series reference
}
indexTOC
index table of contents, 记录 index 中不同类型数据的位置
// indexTOC is the index table of contents. Each field holds the position of
// one kind of index data.
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
}
Writer
实现 IndexWriter, 基于文件的 index 存储.
index 的文件格式要比 chunk 复杂得多, 可以参考 Documentation/format/index.md
每个 index 文件的写入分为 5 个阶段, 顺序执行.
// indexWriterStage identifies a stage of the index write process.
type indexWriterStage uint8
// The write stages, numbered in increasing order via iota starting at
// idxStageNone.
const (
idxStageNone indexWriterStage = iota
idxStageSymbols
idxStageSeries
idxStageLabelIndex
idxStagePostings
idxStageDone
)
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
// It returns nil immediately when the writer is already in stage s and an
// error when s lies before the current stage. Part of the switch is elided in
// this excerpt.
func (w *Writer) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
// A stage that precedes the current one can no longer be entered.
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
// Mark start of sections in table of contents.
switch s {
// ...
// On entering the done stage the trailing bookkeeping data is written
// automatically: the label index offset table, the postings offset table
// and finally the TOC.
case idxStageDone:
w.toc.labelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
if err := w.writeTOC(); err != nil {
return err
}
}
w.stage = s
return nil
}
AddSymbols
// AddSymbols receives the set of symbols to record. It first moves the writer
// into the symbols stage; the rest of the body is elided in this excerpt and
// ends by wrapping its error with "write symbols".
func (w *Writer) AddSymbols(sym map[string]struct{}) error {
if err := w.ensureStage(idxStageSymbols); err != nil {
return err
}
// ...
return errors.Wrap(err, "write symbols")
}
label 中的每一个键或值都是一个 symbol.
通过 "使用对 symbol 的引用" 的方式, 来缩减后续索引文件中的空间占用.
AddSeries
// AddSeries encodes one series into buf2: the number of labels, a pair of
// symbol lookups per label, and then the chunk metadata.
// It returns an error when the writer cannot enter the series stage, when
// lset does not sort strictly after the previously added series, or when ref
// was already added. Parts of the body are elided in this excerpt.
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}
if labels.Compare(lset, w.lastSeries) <= 0 {
return errors.Errorf("out-of-order series added with label set %q", lset)
}
// Remember the position of every series.
if _, ok := w.seriesOffsets[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
w.seriesOffsets[ref] = w.pos
w.buf2.reset()
w.buf2.putUvarint(len(lset))
// For every label, look up where its name and its value live in the index
// file (via the symbol table).
for _, l := range lset {
offset, ok := w.symbols[l.Name]
// ...
offset, ok = w.symbols[l.Value]
// ...
}
w.buf2.putUvarint(len(chunks))
// For the chunk data, record the time range each chunk covers and its
// storage reference. Every chunk except the first one is recorded as a
// delta against its predecessor.
if len(chunks) > 0 {
c := chunks[0]
w.buf2.putVarint64(c.MinTime)
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
w.buf2.putUvarint64(c.Ref)
t0 := c.MaxTime
ref0 := int64(c.Ref)
for _, c := range chunks[1:] {
w.buf2.putUvarint64(uint64(c.MinTime - t0))
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
t0 = c.MaxTime
w.buf2.putVarint64(int64(c.Ref) - ref0)
ref0 = int64(c.Ref)
}
}
// ...
return nil
}
WriteLabelIndex
// The arguments passed in can be thought of as the structure below, where
// every group of values is one combination of values for names:
// type Label struct {
// names []string
// values [][]string
// }
// WriteLabelIndex rejects a values list whose length is not a multiple of
// len(names), moves the writer into the label index stage and records a hash
// entry for names at the current position. Parts of the body are elided in
// this excerpt.
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
if len(values)%len(names) != 0 {
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
}
if err := w.ensureStage(idxStageLabelIndex); err != nil {
return errors.Wrap(err, "ensure stage")
}
// ...
// All hash entries are written together in a later stage.
w.labelIndexes = append(w.labelIndexes, hashEntry{
keys: names,
offset: w.pos,
})
// ...
// For every value only its symbol reference is written.
for _, v := range valt.s {
offset, ok := w.symbols[v]
// ...
}
// ...
err = w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write label index")
}
WritePostings
// WritePostings records, for one label (a name/value pair), the entries that
// label maps to, so they can be looked up later. Every value yielded by it is
// resolved through w.seriesOffsets and the resulting offsets are sorted before
// being written. Parts of the body are elided in this excerpt.
func (w *Writer) WritePostings(name, value string, it Postings) error {
// ...
// Position of the data belonging to this name/value pair.
w.postings = append(w.postings, hashEntry{
keys: []string{name, value},
offset: w.pos,
})
// Order of the references in the postings list does not imply order
// of the series references within the persisted block they are mapped to.
// We have to sort the new references again.
refs := w.uint32s[:0]
for it.Next() {
offset, ok := w.seriesOffsets[it.At()]
// ...
refs = append(refs, uint32(offset))
}
if err := it.Err(); err != nil {
return err
}
sort.Sort(uint32slice(refs))
// ...
err := w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write postings")
}
Close
// Close finalizes the index file. It advances the writer to the done stage,
// flushes the buffered writer, fsyncs the file and finally closes it. The
// first step that fails aborts the sequence and its error is returned.
func (w *Writer) Close() error {
	// Entering the done stage writes the label index table, the postings
	// table and the TOC.
	err := w.ensureStage(idxStageDone)
	if err != nil {
		return err
	}

	// Persist everything to disk before the file handle is released.
	if err = w.fbuf.Flush(); err != nil {
		return err
	}
	if err = fileutil.Fsync(w.f); err != nil {
		return err
	}

	return w.f.Close()
}
// writeOffsetTable serializes the given hash entries as an offset table.
// The payload (entry count, then for each entry its key count, keys and
// offset) is assembled in buf2. buf1 carries the payload length, which is
// taken before the checksum is appended to buf2. Both buffers are then
// handed to w.write.
func (w *Writer) writeOffsetTable(entries []hashEntry) error {
	w.buf2.reset()
	w.buf2.putBE32int(len(entries))

	for i := range entries {
		keys := entries[i].keys
		w.buf2.putUvarint(len(keys))
		for j := range keys {
			w.buf2.putUvarintStr(keys[j])
		}
		w.buf2.putUvarint64(entries[i].offset)
	}

	// The length prefix covers the payload only, so capture it before the
	// hash is appended.
	payloadLen := w.buf2.len()
	w.buf1.reset()
	w.buf1.putBE32int(payloadLen)
	w.buf2.putHash(w.crc32)

	return w.write(w.buf1.get(), w.buf2.get())
}
// writeTOC encodes the table of contents into buf1: the six section offsets
// as big-endian 64-bit values in a fixed order, followed by a hash. The
// result is then handed to w.write.
func (w *Writer) writeTOC() error {
	w.buf1.reset()

	// Section offsets in the order they are laid out in the TOC.
	offsets := [...]uint64{
		w.toc.symbols,
		w.toc.series,
		w.toc.labelIndices,
		w.toc.labelIndicesTable,
		w.toc.postings,
		w.toc.postingsTable,
	}
	for _, off := range offsets {
		w.buf1.putBE64(off)
	}

	w.buf1.putHash(w.crc32)
	return w.write(w.buf1.get())
}
Reader
实现了 IndexReader
// newReader builds a Reader over b. It reads the TOC, then the symbols, then
// the label index offset table and the postings offset table, wrapping any
// failure with the name of the step. Parts of the body are elided in this
// excerpt.
func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &Reader{
// ...
}
// Verify magic number.
// ...
// The TOC sits at the end of the file and has a fixed length, so it can be
// read directly.
if err := r.readTOC(); err != nil {
return nil, errors.Wrap(err, "read TOC")
}
if err := r.readSymbols(int(r.toc.symbols)); err != nil {
return nil, errors.Wrap(err, "read symbols")
}
var err error
err = r.readOffsetTable(r.toc.labelIndicesTable, func(key []string, off uint32) error {
// NOTE(review): unclear why the key length is forced to be 1 here.
// Judging by Writer.WriteLabelIndex, multiple names are clearly supported.
// Verified in practice: writing with multiple names works, but reading
// them back fails right here.
// To be revisited once the related code has been read.
if len(key) != 1 {
return errors.Errorf("unexpected key length %d", len(key))
}
r.labels[key[0]] = off
return nil
})
if err != nil {
return nil, errors.Wrap(err, "read label index table")
}
err = r.readOffsetTable(r.toc.postingsTable, func(key []string, off uint32) error {
// ...
return nil
})
if err != nil {
return nil, errors.Wrap(err, "read postings table")
}
r.dec = &DecoderV1{symbols: r.symbols}
return r, nil
}
Postings
Postings 及其实现的具体作用, 待阅读剩余部分的代码后再回过头来确认.
这是一个 Iterator.
// Postings provides iterative access over a postings list.
// It is an iterator over uint64 values: Next and Seek move it forward, At
// reads the current value and Err reports the last error.
type Postings interface {
// Next advances the iterator and returns true if another value was found.
Next() bool
// Seek advances the iterator to value v or greater and returns
// true if a value was found.
Seek(v uint64) bool
// At returns the value at the current iterator position.
At() uint64
// Err returns the last error of the iterator.
Err() error
}
-
给出了 Postings 的交集, 并集, 以及差集实现
// Intersect returns a new postings list over the intersection of the // input postings. func Intersect(its ...Postings) Postings { if len(its) == 0 { return emptyPostings } if len(its) == 1 { return its[0] } l := len(its) / 2 return newIntersectPostings(Intersect(its[:l]...), Intersect(its[l:]...)) } type intersectPostings struct { a, b Postings aok, bok bool cur uint64 }
// Merge returns a new iterator over the union of the input iterators. func Merge(its ...Postings) Postings { if len(its) == 0 { return EmptyPostings() } if len(its) == 1 { return its[0] } l := len(its) / 2 return newMergedPostings(Merge(its[:l]...), Merge(its[l:]...)) } type mergedPostings struct { a, b Postings initialized bool aok, bok bool cur uint64 }
// Without returns a new postings list that contains all elements from the full list that // are not in the drop list func Without(full, drop Postings) Postings { return newRemovedPostings(full, drop) } type removedPostings struct { full, remove Postings cur uint64 initialized bool fok, rok bool }
?
-
给出了几种特定类型的 Postings
// EmptyPostings returns a postings list that's always empty. func EmptyPostings() Postings { return emptyPostings }
// ErrPostings returns new postings that immediately error. func ErrPostings(err error) Postings { return errPostings{err} }
// listPostings implements the Postings interface over a plain list.
type listPostings struct {
	list []uint64
	cur  uint64
}
// bigEndianPostings implements the Postings interface over a byte stream of
// big endian numbers.
type bigEndianPostings struct {
	list []byte
	cur  uint32
}
?
MemPostings
label - posting idx 的映射记录器
// MemPostings holds postings list for series ID per label pair. They may be written
// to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup.
type MemPostings struct {
mtx sync.RWMutex
// Association between a label and its posting IDs.
m map[labels.Label][]uint64
// Set to true once EnsureOrder has completed successfully.
ordered bool
}
labels
labels
├── labels.go
├── labels_test.go
└── selector.go
labels 是标签, 对应 influxdb 中的 tags, 即一组键值对.
在 prometheus/tsdb 中, timestamp 和 value 之外的所有信息都放在 labels 中.
这个 pkg 的核心就是 Label, Labels, 以及 Labels 的 Matcher.
Label
// Label is a key/value pair of strings: Name is the key and Value the value.
type Label struct {
Name, Value string
}
就是键值对
Labels
// Labels is a sorted set of labels. Order has to be guaranteed upon
// instantiation; the type itself is a plain slice of Label.
type Labels []Label
在实际使用中, Labels 都应该是排好序的. 因此 Labels 首先实现了 sort.Interface.
同时, Labels 之间也是可以进行比较的.
// Compare orders two label sets.
// It returns 0 if a==b, a negative value if a < b and a positive value if a > b.
func Compare(a, b Labels) int {
	// Only the positions present in both sets can be compared pairwise.
	shared := len(b)
	if len(a) < shared {
		shared = len(a)
	}

	// Walk the shared prefix label by label; the first difference in name,
	// then in value, decides the order lexicographically.
	for i, la := range a[:shared] {
		lb := b[i]
		if c := strings.Compare(la.Name, lb.Name); c != 0 {
			return c
		}
		if c := strings.Compare(la.Value, lb.Value); c != 0 {
			return c
		}
	}

	// The shared prefix is identical, so the set with fewer labels comes first.
	return len(a) - len(b)
}
Slice
Slice 是 Labels 的切片.
因为 Labels 可比较, 因此 Slice 也实现了 sort.Interface.
Matcher
// Matcher specifies a constraint for the value of a label.
// A matcher is bound to one label name and decides per value whether it is
// accepted.
type Matcher interface {
// Name returns the label name the matcher should apply to.
Name() string
// Matches checks whether a value fulfills the constraints.
Matches(v string) bool
}
Matcher 用来筛选 Labels.
这里提供了 equal, prefix, regexp, not 四种基本的 Matcher.
tsdbutil
tsdbutil
├── buffer.go
└── buffer_test.go
tsdbutil 目前只提供了一个 BufferedSeriesIterator
sampleRing
// sample is a single data point: t is its timestamp and v its value.
type sample struct {
t int64
v float64
}
// sampleRing is a ring buffer of samples. Because buf is used circularly, the
// helper fields i, f and l track where the data currently lives.
type sampleRing struct {
// delta is the look-back range: add drops head samples whose timestamp is
// below the newly added timestamp minus delta.
delta int64
buf []sample // lookback buffer
i int // position of most recent element in ring buffer
f int // position of first element in ring buffer
l int // number of elements in buffer
}
sampleRing 用来处理数据点的采样
// add adds a sample to the ring buffer and frees all samples that fall
// out of the delta range.
//
// The buffer doubles in size when it is full. Afterwards every sample at the
// head whose timestamp is below t-r.delta is dropped; this assumes samples
// are added in timestamp order.
func (r *sampleRing) add(t int64, v float64) {
	l := len(r.buf)
	// Grow the ring buffer if it fits no more elements.
	if l == r.l {
		buf := make([]sample, 2*l)
		// Move the segment [f, l) to the end of the new buffer and keep
		// [0, f) in place, which opens a gap of l free slots starting at f.
		copy(buf[l+r.f:], r.buf[r.f:])
		copy(buf, r.buf[:r.f])

		r.buf = buf
		r.i = r.f
		r.f += l
		// The wrap-around check in the eviction loop below must use the new
		// capacity; with the old one r.f would wrap too early and valid
		// samples would be dropped.
		l = 2 * l
	} else {
		r.i++
		if r.i >= l {
			r.i -= l
		}
	}

	r.buf[r.i] = sample{t: t, v: v}
	r.l++

	// Free head of the buffer of samples that just fell out of the range.
	for r.buf[r.f].t < t-r.delta {
		r.f++
		if r.f >= l {
			r.f -= l
		}
		r.l--
	}
}
sampleRingIterator
// sampleRingIterator iterates over the sampleRing r.
// NOTE(review): i is presumably the current iteration position — confirm
// against the iterator's methods, which are not part of this excerpt.
type sampleRingIterator struct {
r *sampleRing
i int
}
sampleRingIterator 是 SeriesIterator 的实现
BufferedSeriesIterator
BufferedSeriesIterator 同样也实现了 SeriesIterator, 它将一部分数据点通过 sampleRing 缓存下来, 具体效果待阅读其他代码后确认.
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
// it is the wrapped iterator.
it tsdb.SeriesIterator
// buf is the ring buffer that holds the look-back samples.
buf *sampleRing
// lastTime starts at math.MinInt64 in NewBuffer and is set by Seek to the
// timestamp returned by At.
lastTime int64
}
// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
// It wraps the given iterator: samples spanning at most delta are kept in a
// sampleRing created with an initial size of 16.
func NewBuffer(it tsdb.SeriesIterator, delta int64) *BufferedSeriesIterator {
	bit := new(BufferedSeriesIterator)
	bit.it = it
	bit.buf = newSampleRing(delta, 16)
	// Start below every possible timestamp so that nothing counts as read yet.
	bit.lastTime = math.MinInt64
	return bit
}
Seek
// Seek advances the iterator to the element at time t or greater.
// The position only ever moves forward, never backward. It returns false
// when the underlying iterator runs out before such an element is reached.
func (b *BufferedSeriesIterator) Seek(t int64) bool {
	// Earliest timestamp that belongs in the look-back window for t.
	windowStart := t - b.buf.delta

	// Only jump when the window starts past the last element read. In that
	// case everything buffered so far is outside the window, so the buffer is
	// reset. Otherwise the buffer is preserved and regular advancement below
	// keeps filling it on the way.
	if windowStart > b.lastTime {
		b.buf.reset()
		if !b.it.Seek(windowStart) {
			return false
		}
		b.lastTime, _ = b.At()
	}

	// Step forward until the last element read is at or past t.
	for b.lastTime < t {
		if !b.Next() {
			return false
		}
	}
	return true
}