存儲(chǔ)在Influxdb中的數(shù)據(jù)類型
存儲(chǔ)每條數(shù)據(jù)時(shí)的時(shí)間戳類型
- time
Field字段的類型
- interger - int64
- unsigned - uint64
- float64
- boolean
- string
Field字段的類型在源碼中對(duì)應(yīng)類型
對(duì)應(yīng)的類型是Value
,這是個(gè)interface,定義在tsdb/engine/tsm1/encoding.go
中
- IntegerValue
- UnsignedValue
- FloatValue
- BooleanValue
- StringValue
上面的每個(gè)類型都包括一個(gè)時(shí)間戳银觅,這個(gè)時(shí)間戳就是這個(gè)數(shù)據(jù)被寫入時(shí)的時(shí)間戳像捶,我們看一下FloadValue
的定義:
type FloatValue struct {
unixnano int64
value float64
}
編解碼
每種類型在存儲(chǔ)時(shí)都需要作編碼垛玻,盡可能地作壓縮,所有針對(duì)各個(gè)類型均提供了Encoder和Decoder酸纲。
這些Encoder負(fù)責(zé)將一組相同類型的Value
作壓縮編碼兔毒,具體的編碼算法我們這里不再展開嫉父。
我們針對(duì)FloatValue
作一下分析encodeFloatBlockUsing
參數(shù)中values []Value就是一系列的
FloatValue`,不僅包括Float值,還包括對(duì)應(yīng)的時(shí)間戳眼刃,都需要被編碼
func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) {
tsenc.Reset()
venc.Reset()
for _, v := range values {
vv := v.(FloatValue)
tsenc.Write(vv.unixnano) //使用TimeEncoder編碼每個(gè)時(shí)間戳
venc.Write(vv.value) //使用FloatEncoder編碼每個(gè)Float值
}
venc.Flush()
// Encoded timestamp values
tb, err := tsenc.Bytes()
if err != nil {
return nil, err
}
// Encoded float values
vb, err := venc.Bytes()
if err != nil {
return nil, err
}
// Prepend the first timestamp of the block in the first 8 bytes and the block
// in the next byte, followed by the block
// 將這一組FloatValue打包到一個(gè)Block
return packBlock(buf, BlockFloat64, tb, vb), nil
}
打包到DataBlock
DataBlock是寫入和讀取TSM文件的最小單位,每個(gè)DataBlock里存儲(chǔ)的都是同樣類型的Value,每個(gè)DataBlock里的Value對(duì)應(yīng)都是同一個(gè)寫入的Key,這個(gè)Key是series key + field
;
Influxdb算是列存儲(chǔ)摇肌,在這里所有的Value是連續(xù)存在一起擂红,這些Value對(duì)應(yīng)的時(shí)間戳也是連續(xù)存在一起,這樣更有利于作壓縮
這個(gè)結(jié)構(gòu)中并沒有記錄Values部分的長度围小,這是因?yàn)槲覀冇涗浟藭r(shí)間戳部分的總長昵骤,在解析時(shí)間戳部分時(shí)候我們可以得知有幾個(gè)時(shí)間戳,也就知道了有幾個(gè)Value肯适。
我們來看一下打包過程,結(jié)合上面的結(jié)構(gòu)圖变秦,這個(gè)過程就很簡單了:
func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte {
// We encode the length of the timestamp block using a variable byte encoding.
// This allows small byte slices to take up 1 byte while larger ones use 2 or more.
sz := 1 + binary.MaxVarintLen64 + len(ts) + len(values)
if cap(buf) < sz {
buf = make([]byte, sz)
}
b := buf[:sz]
b[0] = typ
i := binary.PutUvarint(b[1:1+binary.MaxVarintLen64], uint64(len(ts)))
i += 1
// block is <len timestamp bytes>, <ts bytes>, <value bytes>
copy(b[i:], ts)
// We don't encode the value length because we know it's the rest of the block after
// the timestamp block.
copy(b[i+len(ts):], values)
return b[:i+len(ts)+len(values)]
}
解包DataBlock
我們還以FloatValue
為例
func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) {
// Block type is the next block, make sure we actually have a float block
blockType := block[0]
if blockType != BlockFloat64 {
return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType)
}
// 跳過1字節(jié)的block type
block = block[1:]
tb, vb, err := unpackBlock(block)
if err != nil {
return nil, err
}
//計(jì)算有多少組Value
sz := CountTimestamps(tb)
if cap(*a) < sz {
*a = make([]FloatValue, sz)
} else {
*a = (*a)[:sz]
}
tdec := timeDecoderPool.Get(0).(*TimeDecoder)
vdec := floatDecoderPool.Get(0).(*FloatDecoder)
var i int
err = func(a []FloatValue) error {
// Setup our timestamp and value decoders
tdec.Init(tb)
err = vdec.SetBytes(vb)
if err != nil {
return err
}
// Decode both a timestamp and value
j := 0
for j < len(a) && tdec.Next() && vdec.Next() {
a[j] = FloatValue{unixnano: tdec.Read(), value: vdec.Values()}
j++
}
i = j
// Did timestamp decoding have an error?
err = tdec.Error()
if err != nil {
return err
}
// Did float decoding have an error?
return vdec.Error()
}(*a)
timeDecoderPool.Put(tdec)
floatDecoderPool.Put(vdec)
return (*a)[:i], err
Dabablock的其他操作
-
BlockType
:取block byte buffer的第一個(gè)字節(jié)
func BlockType(block []byte) (byte, error) {
blockType := block[0]
switch blockType {
case BlockFloat64, BlockInteger, BlockUnsigned, BlockBoolean, BlockString:
return blockType, nil
default:
return 0, fmt.Errorf("unknown block type: %d", blockType)
}
}
-
BlockCount
: 獲取一個(gè)DabaBlock中包含的Value數(shù)量
func BlockCount(block []byte) int {
if len(block) <= encodedBlockHeaderSize {
panic(fmt.Sprintf("count of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
}
// first byte is the block type
tb, _, err := unpackBlock(block[1:])
if err != nil {
panic(fmt.Sprintf("BlockCount: error unpacking block: %s", err.Error()))
}
return CountTimestamps(tb)
}
-
DecodeBlock
: 解碼一個(gè)DabaBlock,根據(jù)BlockType的不同調(diào)用不同的Decode方法
func DecodeBlock(block []byte, vals []Value) ([]Value, error) {
if len(block) <= encodedBlockHeaderSize {
panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize))
}
blockType, err := BlockType(block)
if err != nil {
return nil, err
}
switch blockType {
case BlockFloat64:
var buf []FloatValue
decoded, err := DecodeFloatBlock(block, &buf)
if len(vals) < len(decoded) {
vals = make([]Value, len(decoded))
}
for i := range decoded {
vals[i] = decoded[i]
}
return vals[:len(decoded)], err
case BlockInteger:
...
case BlockUnsigned:
...
case BlockBoolean:
...
case BlockString:
...
default:
panic(fmt.Sprintf("unknown block type: %d", blockType))
}
}
WALEntry
- WAL在寫入TSM文件時(shí)用作預(yù)寫日志。
- 每個(gè)DB的每個(gè)RetentionPolicy下面的每個(gè)Shard下都有自己的一個(gè)單獨(dú)的WAL文件目錄框舔,Influxdb在啟動(dòng)的配置文件中需設(shè)置單獨(dú)的WAL目錄蹦玫,來存儲(chǔ)所有Shard的WAL文件。
- 每個(gè)Shard都對(duì)應(yīng)一個(gè)WAL目錄刘绣,目錄下有多個(gè)wal文件樱溉,每個(gè)稱作一個(gè)
WALSegment
,默認(rèn)大小是10M。文件命名規(guī)則是纬凤,以_
開頭福贞,中間是ID,擴(kuò)展名是wal
, 比如_00001.wal
- 每次寫入WAL的內(nèi)容稱為一個(gè)
WALEntry
, 在寫入和讀取這個(gè)Entry時(shí)需要序列化和反序列化,我們先來看一下其定義:
type WALEntry interface {
Type() WalEntryType // Entry的類型: WriteWALEntry停士, DeleteWALEntry, DeleteRangeWALEntry
Encode(dst []byte) ([]byte, error)
MarshalBinary() ([]byte, error) //使用上面的Encode方法作序列化
UnmarshalBinary(b []byte) error //反序列化
MarshalSize() int
}
我們下面來分析一下具體的三種WALEntry
WriteWALEntry
- 一組
point
組成一個(gè)WriteEALEntry
,然后寫入WALSegment
;
point
是一個(gè)series對(duì)應(yīng)的一些field的集合挖帘,每個(gè)point
被唯一的series
+timestamp
標(biāo)識(shí)完丽,可以簡單將point
理解為就是一個(gè)insert
語句寫入的內(nèi)容。 - 定義:
type WriteWALEntry struct {
Values map[string][]Value
sz int
}
其中Valuse
是個(gè)map,它的key
是series key + field, 它的value
是具有相同的key的所有field value;其實(shí)就是把多個(gè)point按series key + field作了合并
-
結(jié)構(gòu)圖
序列化
Encode
:完全按照上面的結(jié)構(gòu)圖來寫入,比較清晰明了
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
// 計(jì)算總大小拇舀,欲分配內(nèi)存
encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key
// allocate or re-slice to correct size
if len(dst) < encLen {
dst = make([]byte, encLen)
} else {
dst = dst[:encLen]
}
// Finally, encode the entry
var n int
var curType byte
// 遍歷Values,逐一編碼
for k, v := range w.Values {
// 確定field的類型
switch v[0].(type) {
case FloatValue:
curType = float64EntryType
case IntegerValue:
curType = integerEntryType
case UnsignedValue:
curType = unsignedEntryType
case BooleanValue:
curType = booleanEntryType
case StringValue:
curType = stringEntryType
default:
return nil, fmt.Errorf("unsupported value type: %T", v[0])
}
// 寫入類型
dst[n] = curType
n++
// 寫入key長度逻族,key = series key + field
binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
n += 2
// 寫入 key
n += copy(dst[n:], k)
// 寫入 value個(gè)數(shù)
binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
n += 4
// 逐一寫入合部的value
for _, vv := range v {
binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
n += 8
switch vv := vv.(type) {
case FloatValue:
if curType != float64EntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))
n += 8
case IntegerValue:
if curType != integerEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
n += 8
case UnsignedValue:
if curType != unsignedEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))
n += 8
case BooleanValue:
if curType != booleanEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
if vv.value {
dst[n] = 1
} else {
dst[n] = 0
}
n++
case StringValue:
if curType != stringEntryType {
return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
}
binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))
n += 4
n += copy(dst[n:], vv.value)
default:
return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
}
}
}
return dst[:n], nil
}
DeleteWALEntry
- 刪除Series的WALEntry
- 定義:
type DeleteWALEntry struct {
Keys [][]byte
sz int
}
-
結(jié)構(gòu)圖
編碼
Encode
, 各個(gè)key間以\n
分隔
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
sz := w.MarshalSize()
if len(dst) < sz {
dst = make([]byte, sz)
}
var n int
for _, k := range w.Keys {
n += copy(dst[n:], k)
n += copy(dst[n:], "\n")
}
// We return n-1 to strip off the last newline so that unmarshalling the value
// does not produce an empty string
return []byte(dst[:n-1]), nil
}
DeleteRangeWALEntry
- 刪除某個(gè)時(shí)間范圍內(nèi)的series的WALEntry
- 定義:
type DeleteRangeWALEntry struct {
Keys [][]byte
Min, Max int64 // 開始時(shí)間戳和結(jié)束時(shí)間戳
sz int
}
-
結(jié)構(gòu)圖
- 編碼
Encode
func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) {
sz := w.MarshalSize()
if len(b) < sz {
b = make([]byte, sz)
}
// 寫入開始和結(jié)束時(shí)間戳
binary.BigEndian.PutUint64(b[:8], uint64(w.Min))
binary.BigEndian.PutUint64(b[8:16], uint64(w.Max))
i := 16
// 逐一寫入key
for _, k := range w.Keys {
binary.BigEndian.PutUint32(b[i:i+4], uint32(len(k)))
i += 4
i += copy(b[i:], k)
}
return b[:i], nil
}
WALEntry的寫入
- 上面我們介紹了三種WALEntry,在序列化后就可以被寫入到WALSegment文件中了,在寫之前可能還需要作壓縮
- 寫入時(shí)候?yàn)榱俗x取時(shí)便于解析你稚,還需要按一定格式寫入
- 先寫入 1字節(jié) 的 WALEntry類型
- 再寫入 4字節(jié) 的 序列化后且作了壓縮的WALEntry的長度
- 最后寫入 序列化后且作了壓縮的WALEntry的具體內(nèi)容
- 使用
WALSegmentWriter
類來寫入:
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
var buf [5]byte
// 寫入類型和具體內(nèi)容的長度
buf[0] = byte(entryType)
binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))
if _, err := w.bw.Write(buf[:]); err != nil {
return err
}
// 寫入具體內(nèi)容
if _, err := w.bw.Write(compressed); err != nil {
return err
}
w.size += len(buf) + len(compressed)
return nil
}
WAL
WAL封裝了一個(gè)預(yù)寫日志的所有操作瓷耙,正如前面提到了,一個(gè)Shard對(duì)應(yīng)一個(gè)WAL,一個(gè)WAL在寫入時(shí)又會(huì)產(chǎn)生多個(gè)WALSegment刁赖。
我們來分析一下一些主要的方法:
Open
操作
遍歷一個(gè)Shard目錄下的所有Segment文件搁痛,這些文件按id從小到大排序,作初始化操作
func (l *WAL) Open() error {
l.mu.Lock()
defer l.mu.Unlock()
..
if err := os.MkdirAll(l.path, 0777); err != nil {
return err
}
// 獲取所有segment 文件列表宇弛,按id從小到大排序鸡典,最后一個(gè)就是當(dāng)前正寫入的文件
segments, err := segmentFileNames(l.path)
if err != nil {
return err
}
if len(segments) > 0 {
// 最后一個(gè)就是當(dāng)前正寫入的文件
lastSegment := segments[len(segments)-1]
// 獲取最新的segment id
id, err := idFromFileName(lastSegment)
if err != nil {
return err
}
// 初始化當(dāng)前的segment id
l.currentSegmentID = id
stat, err := os.Stat(lastSegment)
if err != nil {
return err
}
if stat.Size() == 0 {
// 如果文件大小為0, 刪除
os.Remove(lastSegment)
segments = segments[:len(segments)-1]
} else {
//為寫入,打開該文件
fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666)
if err != nil {
return err
}
if _, err := fd.Seek(0, io.SeekEnd); err != nil {
return err
}
// 初始化當(dāng)前的SegmentWriter
l.currentSegmentWriter = NewWALSegmentWriter(fd)
// Reset the current segment size stat
atomic.StoreInt64(&l.stats.CurrentBytes, stat.Size())
}
}
...
l.closing = make(chan struct{})
return nil
}
writeToLog
寫入操作
func (l *WAL) writeToLog(entry WALEntry) (int, error) {
// 從buytesPool獲取byte slice, 避免反復(fù)重新分配內(nèi)存
bytes := bytesPool.Get(entry.MarshalSize())
// 將entry作編碼枪芒,前面已經(jīng)介紹過
b, err := entry.Encode(bytes)
if err != nil {
bytesPool.Put(bytes)
return -1, err
}
// 使用snappy壓縮強(qiáng)詞編碼后的entry內(nèi)容
encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))
compressed := snappy.Encode(encBuf, b)
bytesPool.Put(bytes)
syncErr := make(chan error)
segID, err := func() (int, error) {
l.mu.Lock()
defer l.mu.Unlock()
// Make sure the log has not been closed
select {
case <-l.closing:
return -1, ErrWALClosed
default:
}
// roll the segment file if needed
if err := l.rollSegment(); err != nil {
return -1, fmt.Errorf("error rolling WAL segment: %v", err)
}
// write and sync
// 使用SegmentWriter來寫入entry內(nèi)容
if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
return -1, fmt.Errorf("error writing WAL entry: %v", err)
}
select {
case l.syncWaiters <- syncErr:
default:
return -1, fmt.Errorf("error syncing wal")
}
// 將執(zhí)行file sync操作彻况,刷到磁盤文件
l.scheduleSync()
// Update stats for current segment size
atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))
l.lastWriteTime = time.Now().UTC()
return l.currentSegmentID, nil
}()
bytesPool.Put(encBuf)
if err != nil {
return segID, err
}
// schedule an fsync and wait for it to complete
return segID, <-syncErr
}
Cache
- 時(shí)序數(shù)據(jù)在寫入時(shí),會(huì)先寫入到上面介紹的WAL,然后寫入到Cache,最后按照一定的策略Flush到磁盤文件【俗伲現(xiàn)在我們來介紹這個(gè)Cache纽甘。
- 這個(gè)Cache里緩存的是什么呢?
- 這個(gè)Cache用什么結(jié)構(gòu)來作內(nèi)存存儲(chǔ)抽碌?
我們下面來一一解答這些問題:
Entry
- 既然是Cache悍赢,那肯定是key-value結(jié)構(gòu),其中的key是
series key + field name
, 對(duì)應(yīng)的value就是這個(gè)key所對(duì)應(yīng)的若干個(gè)field value的集合货徙,也就組合成了一個(gè)entry,我們來看下entry的定義:
type entry struct {
mu sync.RWMutex
values Values // All stored values.
// The type of values stored. Read only so doesn't need to be protected by
// mu.
vtype byte
}
由這個(gè)定義我們可知左权,同一個(gè)entry里面的所有value
的類型都是相同的,都是這個(gè) vtype
里所保存的類型痴颊。
-
Entry
的創(chuàng)建:使用[]Value
來創(chuàng)建赏迟,比較簡單,但需要先判斷這組value的類型是否一致
func newEntryValues(values []Value) (*entry, error) {
e := &entry{}
e.values = make(Values, 0, len(values))
e.values = append(e.values, values...)
// No values, don't check types and ordering
if len(values) == 0 {
return e, nil
}
// 個(gè)人感覺應(yīng)該先校驗(yàn)這組value的類型是否一致蠢棱,不一致就不要作上面的make, append了锌杀。
et := valueType(values[0])
for _, v := range values {
// Make sure all the values are the same type
if et != valueType(v) {
return nil, tsdb.ErrFieldTypeConflict
}
}
// Set the type of values stored.
e.vtype = et
return e, nil
}
-
Entry
的add操作:和上面的創(chuàng)建類似,需要先判斷這組value的類型是否一致 -
Entry
的去重操作泻仙,去掉Values
中時(shí)間戳相同的value,只保留其中的一個(gè)
func (e *entry) deduplicate() {
e.mu.Lock()
defer e.mu.Unlock()
if len(e.values) <= 1 {
return
}
e.values = e.values.Deduplicate()
}
實(shí)際上是調(diào)用了Values.Deduplicate
抛丽,這個(gè)Values
提供了若干實(shí)用的方法,比如去掉饰豺,過濾等亿鲜。
-
Entry
過濾:過濾掉在給定時(shí)間戳范圍內(nèi)的Value
func (e *entry) filter(min, max int64) {
e.mu.Lock()
if len(e.values) > 1 {
e.values = e.values.Deduplicate()
}
e.values = e.values.Exclude(min, max)
e.mu.Unlock()
}
實(shí)際上是調(diào)用了Values.Exclude
storer
- 上面解決了Cache存什么的問題,下面我們來解決怎么存的問題。這個(gè)存儲(chǔ)器是
storer
,它是個(gè)interface,之前是實(shí)現(xiàn)了這個(gè)interface的struct都可以用來存Cache里的entry,我們先來看一下這個(gè)interface
type storer interface {
entry(key []byte) *entry // Get an entry by its key.
write(key []byte, values Values) (bool, error) // Write an entry to the store.
add(key []byte, entry *entry) // Add a new entry to the store.
remove(key []byte) // Remove an entry from the store.
keys(sorted bool) [][]byte // Return an optionally sorted slice of entry keys.
apply(f func([]byte, *entry) error) error // Apply f to all entries in the store in parallel.
applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial.
reset() // Reset the store to an initial unused state.
split(n int) []storer // Split splits the store into n stores
count() int // Count returns the number of keys in the store
}
注釋很清晰蒿柳,我們這里不累述饶套。
- 那實(shí)際是用什么來存的呢? influxdb里實(shí)現(xiàn)了
ring
,它實(shí)現(xiàn)了這個(gè)storer
的所有接口,定義在tsdb/engine/tsm1/ring.go
中垒探。簡單來說妓蛮,一個(gè)ring內(nèi)部分為若干個(gè)固定數(shù)量的桶,這里叫partition
, 一個(gè)key進(jìn)來后圾叼,按key作hash蛤克,對(duì)桶的數(shù)量取模,確定好要存在哪個(gè)桶里夷蚊,然后每個(gè)桶其實(shí)又是一個(gè)map,最后也就是將key-value存在這個(gè)桶的map里构挤。因?yàn)閏ache的添加,讀取可能很頻繁且都需要加鎖惕鼓,分桶后筋现,各個(gè)桶單獨(dú)加鎖,提升性能箱歧。代碼很簡單矾飞,這里不詳述了。