Compaction in InfluxDB
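Compaction overview
- InfluxDB's storage engine uses the TSM file structure. TSM is essentially an LSM-Tree adapted to the characteristics of time-series data, so, like an LSM-Tree, it has a MemTable, a WAL and SSTables;
- Because it is LSM-Tree-like, it also needs compaction. Compaction persists the in-memory MemTable data to disk and merges files on disk, which reduces the number of files and improves read efficiency;
- InfluxDB's compaction generally takes two steps:
- Generate a compaction plan, which in short is a list of file groups that can be compacted in parallel;
- Compact each group of tsm files;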
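Generating the compaction plan
The CompactionPlanner interface
- Many strategies could be used to generate this plan. A plan is simply a set of file groups, selected by specific rules, that can be compacted in parallel. InfluxDB therefore starts by defining an interface: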
type CompactionPlanner interface {
	Plan(lastWrite time.Time) []CompactionGroup
	PlanLevel(level int) []CompactionGroup
	PlanOptimize() []CompactionGroup
	Release(group []CompactionGroup)
	FullyCompacted() bool

	// ForceFull causes the planner to return a full compaction plan the next
	// time Plan() is called if there are files that could be compacted.
	ForceFull()

	SetFileStore(fs *FileStore)
}
Plan
,PlanLevel
,PlanOptimize
返回的都是[]CompactionGroup
, 它的類型其實是 [][]string
, 即一組可以并行執(zhí)行Compaction操作的tsm文件路徑的列表;
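The default CompactionPlanner implementation: DefaultPlanner
Before covering DefaultPlanner, let's look at how a tsm file is named, for example 000001-01.tsm. The leading 000001 is called the Generation. The trailing 01 is called the Sequence number, also known as the Level.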
The tsmGeneration type: it wraps the multiple TSM files that belong to the same Generation
type tsmGeneration struct {
	id            int               // the Generation
	files         []FileStat        // the TSM files in this generation, sorted by file name in ascending order
	parseFileName ParseFileNameFunc // parses the Generation and Sequence number out of a TSM file name
}
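During compaction, one Generation can end up with several different sequences. For example, 001-001.tsm and 001-002.tsm both belong to the same Generation. At the next compaction, these two files can be treated as one large logical file. That is why tsmGeneration is needed.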
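- PlanLevel: for a given level, extracts groups of tsm files. The rough steps are:
- Take all tsm files currently in the file_store and put files with the same generation together, producing tsmGenerations. This is done by findGenerations;
- Group all the resulting tsmGenerations by level. The resulting groups are ordered from higher to lower level (tsmGenerations.level());
- Filter those groups using the level argument of PlanLevel(level int);
- Split the tsmGenerations in each remaining group into chunks of a given size. The tsm files of different chunks can be compacted in parallel;
- The code is fairly long and not very intuitive, but this is the general idea: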
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
	...
	// Determine the generations from all files on disk. We need to treat
	// a generation conceptually as a single file even though it may be
	// split across several files in sequence.
	// TSM files with the same generation id are put together;
	// generations is a tsmGenerations.
	generations := c.findGenerations(true)
	...
	// Group the tsmGenerations by level.
	// After grouping, the tsmGenerations in groups run from higher to lower level.
	var currentGen tsmGenerations
	var groups []tsmGenerations
	for i := 0; i < len(generations); i++ {
		cur := generations[i]

		// See if this generation is orphaned, which would prevent it from being further
		// compacted until a final full compaction runs.
		if i < len(generations)-1 {
			if cur.level() < generations[i+1].level() {
				currentGen = append(currentGen, cur)
				continue
			}
		}

		if len(currentGen) == 0 || currentGen.level() == cur.level() {
			currentGen = append(currentGen, cur)
			continue
		}
		groups = append(groups, currentGen)

		currentGen = tsmGenerations{}
		currentGen = append(currentGen, cur)
	}

	if len(currentGen) > 0 {
		groups = append(groups, currentGen)
	}

	// Remove any groups in the wrong level.
	// level is this function's argument and says which level to compact, so filter on it here.
	// cur.level() collapses a whole group into a single level value; is filtering on it this way appropriate?
	var levelGroups []tsmGenerations
	for _, cur := range groups {
		if cur.level() == level {
			levelGroups = append(levelGroups, cur)
		}
	}

	minGenerations := 4
	if level == 1 {
		// Level 1 needs at least 8 generations before it is compacted.
		minGenerations = 8
	}

	// type CompactionGroup []string
	var cGroups []CompactionGroup
	for _, group := range levelGroups {
		// Split the tsmGenerations in each group into chunks of the given size.
		for _, chunk := range group.chunk(minGenerations) {
			var cGroup CompactionGroup
			var hasTombstones bool
			for _, gen := range chunk {
				if gen.hasTombstones() {
					hasTombstones = true
				}
				for _, file := range gen.files {
					// cGroup holds the file.Path values to be compacted together.
					cGroup = append(cGroup, file.Path)
				}
			}

			// A chunk with fewer than minGenerations generations is skipped for now,
			// unless hasTombstones is true: tombstones mark deleted data, and only a
			// compaction physically removes it.
			if len(chunk) < minGenerations && !hasTombstones {
				continue
			}

			cGroups = append(cGroups, cGroup)
		}
	}

	if !c.acquire(cGroups) {
		return nil
	}

	return cGroups
}
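- Plan(lastWrite time.Time): produces groups of tsm files, either for a full compaction or for generations at level >= 4.
- Frankly, the code is long and messy, and I don't fully understand all of its rules;
- A full compaction only runs after the shard has gone without writes for longer than compactFullWriteColdDuration, or when ForceFull was requested. Even then, some generations are excluded by extra conditions;
- If no full compaction is due, a compaction plan is generated only for generations with level >= 4;
- Here is the code, with some comments added: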
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
	generations := c.findGenerations(true)

	c.mu.RLock()
	forceFull := c.forceFull
	c.mu.RUnlock()

	// first check if we should be doing a full compaction because nothing has been written in a long time
	// A full compaction is gated on how long the shard has gone without writes; that check is made here.
	// This branch handles the full-compaction case.
	if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {

		// Reset the full schedule if we planned because of it.
		if forceFull {
			c.mu.Lock()
			c.forceFull = false
			c.mu.Unlock()
		}

		var tsmFiles []string
		var genCount int
		for i, group := range generations {
			var skip bool

			// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
			if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) &&
				c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock &&
				!group.hasTombstones() {
				skip = true
			}

			// Look at the level of the next generation: if it is <= 3 it may need to be
			// combined with this one, and it would not be picked up on its own if this
			// generation were skipped, so don't skip.
			if i < len(generations)-1 {
				if generations[i+1].level() <= 3 {
					skip = false
				}
			}

			if skip {
				continue
			}

			for _, f := range group.files {
				tsmFiles = append(tsmFiles, f.Path)
			}
			genCount += 1
		}
		sort.Strings(tsmFiles)

		// Make sure we have more than 1 file and more than 1 generation
		if len(tsmFiles) <= 1 || genCount <= 1 {
			return nil
		}

		group := []CompactionGroup{tsmFiles}
		if !c.acquire(group) {
			return nil
		}
		return group
	}

	// don't plan if nothing has changed in the filestore
	if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
		return nil
	}

	c.lastPlanCheck = time.Now()

	// If there is only one generation, return early to avoid re-compacting the same file
	// over and over again.
	if len(generations) <= 1 && !generations.hasTombstones() {
		return nil
	}

	// Need to find the ending point for level 4 files. They will be the oldest files. We scan
	// each generation in order and break once we see a file below level 4.
	end := 0
	start := 0
	// Find where the run of level >= 4 generations ends.
	// Under the current planning rules, generations is sorted by generation id in ascending order.
	// Smaller generations hold older files, and compaction prefers those.
	// If the smallest generation is below level 4, level 4 files either don't exist yet or were only just produced.
	for i, g := range generations {
		if g.level() <= 3 {
			break
		}
		end = i + 1
	}

	// As compactions run, the oldest files get bigger. We don't want to re-compact them during
	// this planning if they are maxed out so skip over any we see.
	var hasTombstones bool
	for i, g := range generations[:end] {
		if g.hasTombstones() {
			hasTombstones = true
		}

		if hasTombstones {
			continue
		}

		// This part mainly skips over oversized TSM files.
		// Skip the file if it's over the max size and contains a full block or the generation is split
		// over multiple files. In the latter case, that would mean the data in the file spilled over
		// the 2GB limit.
		if g.size() > uint64(maxTSMFileSize) &&
			c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
			start = i + 1
		}

		// This is an edge case that can happen after multiple compactions run. The files at the beginning
		// can become larger faster than ones after them. We want to skip those really big ones and just
		// compact the smaller ones until they are closer in size.
		if i > 0 {
			if g.size()*2 < generations[i-1].size() {
				start = i
				break
			}
		}
	}

	// step is how many files to compact in a group. We want to clamp it at 4 but also still
	// return groups smaller than 4.
	step := 4
	if step > end {
		step = end
	}

	// slice off the generations that we'll examine
	generations = generations[start:end]

	// The code below splits generations into groups, which ultimately splits the TSM files
	// into batches that can be compacted in parallel.
	// Loop through the generations in groups of size step and see if we can compact all (or
	// some of them as group)
	groups := []tsmGenerations{}
	for i := 0; i < len(generations); i += step {
		var skipGroup bool
		startIndex := i

		for j := i; j < i+step && j < len(generations); j++ {
			gen := generations[j]
			lvl := gen.level()

			// Skip compacting this group if there happens to be any lower level files in the
			// middle. These will get picked up by the level compactors.
			if lvl <= 3 {
				skipGroup = true
				break
			}

			// Skip the file if it's over the max size and it contains a full block
			if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
				startIndex++
				continue
			}
		}

		if skipGroup {
			continue
		}

		endIndex := i + step
		if endIndex > len(generations) {
			endIndex = len(generations)
		}
		if endIndex-startIndex > 0 {
			groups = append(groups, generations[startIndex:endIndex])
		}
	}

	if len(groups) == 0 {
		return nil
	}

	// With the groups, we need to evaluate whether the group as a whole can be compacted
	compactable := []tsmGenerations{}
	for _, group := range groups {
		// if we don't have enough generations to compact, skip it
		if len(group) < 4 && !group.hasTombstones() {
			continue
		}
		compactable = append(compactable, group)
	}

	// All the files to be compacted must be compacted in order. We need to convert each
	// group to the actual set of files in that group to be compacted.
	var tsmFiles []CompactionGroup
	for _, c := range compactable {
		var cGroup CompactionGroup
		for _, group := range c {
			for _, f := range group.files {
				cGroup = append(cGroup, f.Path)
			}
		}
		sort.Strings(cGroup)
		tsmFiles = append(tsmFiles, cGroup)
	}

	if !c.acquire(tsmFiles) {
		return nil
	}
	return tsmFiles
}
For these compaction strategies, I have drawn a diagram of the common case. It does not cover every situation and is only meant as a general reference:
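Executing compaction
Compactor: the executor of compaction
It has two jobs:
- Persisting the in-memory Cache (MemTable) to TSM files (SSTables) on disk, which InfluxDB calls writing a snapshot
- Merging multiple TSM files on disk
Persisting the Cache to TSM files
Cache recap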
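Let's first recap how the Cache is built. In short, it is a key-value store. To reduce lock contention on reads and writes, it introduces partitions (buckets). Each partition is its own key-value map, and a key is hashed to choose its partition.
Here the key is the series key + field, and the value is the user data actually written into InfluxDB.
Persisting means storing these key-value pairs on disk, and they are encoded before being stored.
As usual throughout the InfluxDB code base, writing them to disk goes through an iterator that walks all the key-value pairs.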
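Iterating over the Cache
- All of the above is handled by cacheKeyIterator. It iterates key by key, and each key's Values (values plus timestamps) are column-encoded before the iterator hands them out;
- The encoding runs in parallel on multiple goroutines;
- The values of each key in the Cache are encoded separately, and the results are recorded in c.blocks. c.blocks therefore has as many entries as the Cache has keys;
- The values of a single key are not all encoded into one block either: each cacheBlock holds at most c.size values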
func (c *cacheKeyIterator) encode() {
	concurrency := runtime.GOMAXPROCS(0)
	n := len(c.ready)

	// Divide the keyset across each CPU
	chunkSize := 1
	idx := uint64(0)

	// Start several goroutines to do the encoding.
	for i := 0; i < concurrency; i++ {
		// Run one goroutine per CPU and encode a section of the key space concurrently
		go func() {
			// Get the encoders for Time, Float, Boolean, Unsigned, String and Integer values.
			tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
			fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
			benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
			uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
			senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
			ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)

			defer putTimeEncoder(tenc)
			defer putFloatEncoder(fenc)
			defer putBooleanEncoder(benc)
			defer putUnsignedEncoder(uenc)
			defer putStringEncoder(senc)
			defer putIntegerEncoder(ienc)

			for {
				// Atomically claim the next key index.
				i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

				if i >= n {
					break
				}

				key := c.order[i]
				values := c.cache.values(key)

				for len(values) > 0 {
					// Encode at most c.size values per block.
					end := len(values)
					if end > c.size {
						end = c.size
					}

					minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
					var b []byte
					var err error

					switch values[0].(type) {
					case FloatValue:
						b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
					case IntegerValue:
						b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
					case UnsignedValue:
						b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
					case BooleanValue:
						b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
					case StringValue:
						b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
					default:
						b, err = Values(values[:end]).Encode(nil)
					}

					// Keep only the values that have not been encoded yet.
					values = values[end:]

					// Each key has one entry in c.blocks, which holds its cacheBlocks.
					c.blocks[i] = append(c.blocks[i], cacheBlock{
						k:       key,
						minTime: minTime,
						maxTime: maxTime,
						b:       b,
						err:     err,
					})

					if err != nil {
						c.err = err
					}
				}
				// Notify this key is fully encoded:
				// once all of a key's values are encoded, signal on that key's channel.
				c.ready[i] <- struct{}{}
			}
		}()
	}
}
- Iterating over the encoded results:
- Next():
func (c *cacheKeyIterator) Next() bool {
	// c.i starts at -1, so the first call skips this if. Otherwise drop the block that
	// was just read, and stay on the current key if it still has blocks left.
	if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 {
		c.blocks[c.i] = c.blocks[c.i][1:]
		if len(c.blocks[c.i]) > 0 {
			return true
		}
	}
	c.i++

	if c.i >= len(c.ready) {
		return false
	}

	// Block until the next key has been fully encoded.
	<-c.ready[c.i]
	return true
}
- Read():
func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	// See if snapshot compactions were disabled while we were running.
	select {
	case <-c.interrupt:
		c.err = errCompactionAborted{}
		return nil, 0, 0, nil, c.err
	default:
	}

	blk := c.blocks[c.i][0]
	return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}
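- The Cache compaction itself:
- First, decide from the size of the cache and the recent snapshot latency whether to throttle, and how many goroutines to use;
- Split the Cache into that many smaller Caches, each compacted by its own goroutine;
- Compaction walks the corresponding cacheKeyIterator and writes files via c.writeNewFiles;
- Each concurrent c.writeNewFiles call gets a different Generation, and its sequence number starts from 0. writeNewFiles increments the sequence before writing, so the first file is sequence 1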
// Write the contents of the Cache to *.tsm.tmp files.
// If the cache's cardinality is high, it is split into several caches that are written
// in parallel, and each split cache gets its own generation.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
	c.mu.RLock()
	enabled := c.snapshotsEnabled
	intC := c.snapshotsInterrupt
	c.mu.RUnlock()

	if !enabled {
		return nil, errSnapshotsDisabled
	}

	start := time.Now()
	// cache.Count() is used as the cache's cardinality (hence the name card).
	card := cache.Count()

	// Enable throttling if we have lower cardinality and snapshots are going fast.
	// 3e6 = 3 x 10^6
	// This decides whether the compaction is throttled.
	throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

	// Write snapshots concurrently if cardinality is relatively high.
	concurrency := card / 2e6
	if concurrency < 1 {
		concurrency = 1
	}

	// Special case very high cardinality, use max concurrency and don't throttle writes.
	if card >= 3e6 {
		concurrency = 4
		throttle = false
	}

	splits := cache.Split(concurrency)

	type res struct {
		files []string
		err   error
	}

	resC := make(chan res, concurrency)
	for i := 0; i < concurrency; i++ {
		go func(sp *Cache) {
			iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
			resC <- res{files: files, err: err}
		}(splits[i])
	}

	var err error
	files := make([]string, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		result := <-resC
		if result.err != nil {
			err = result.err
		}
		files = append(files, result.files...)
	}
	...
	return files, err
}
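- writeNewFiles: iterates over the keyIterator and writes the encoded blocks into tsm files
- Mostly it just calls tsmWriter methods to write the file
- The blocks are written first, then the index
- When the file size or the block count reaches its limit, it moves on to the next file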
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
	// These are the new TSM files written
	var files []string

	for {
		// sequence + 1; this sequence is effectively the level.
		sequence++

		// The file written here is named *.tsm.tmp; it is renamed to *.tsm
		// once the compaction has fully completed.
		fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)

		// Write as much as possible to this file.
		// c.write does the actual writing.
		err := c.write(fileName, iter, throttle)

		// We've hit the max file limit and there is more to write. Create a new file
		// and continue.
		// When the file size or block count hits its limit, move on to a new file (sequence + 1).
		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
			files = append(files, fileName)
			continue
		} else if err == ErrNoValues {
			// ErrNoValues means there were no live values, only tombstoned entries,
			// so no file is kept.
			// If the file only contained tombstoned entries, then it would be a 0 length
			// file that we can drop.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			}
			break
		} else if _, ok := err.(errCompactionInProgress); ok {
			// Don't clean up the file as another compaction is using it. This should not happen as the
			// planner keeps track of which files are assigned to compaction plans now.
			return nil, err
		} else if err != nil {
			// Remove any tmp files we already completed
			for _, f := range files {
				if err := os.RemoveAll(f); err != nil {
					return nil, err
				}
			}
			// We hit an error and didn't finish the compaction. Remove the temp file and abort.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			}
			return nil, err
		}

		files = append(files, fileName)
		break
	}

	return files, nil
}
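Compacting multiple tsm files
Overview
Let's first outline this compaction. It resembles the merge step of a merge sort. The keys in each tsm file's index are sorted in ascending order, and compaction merges the blocks of the same key from several files and then builds a new index. Described like that it is simple, but for efficiency InfluxDB adds some extra strategies in the implementation;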
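tsmBatchKeyIterator
- As with the Cache compaction above, an iterator is needed here too: tsmBatchKeyIterator. It walks several tsm files at the same time, and it is the heart of the compaction process.
Iterating with tsmBatchKeyIterator
- First, take the blocks of the first key from each tsm file;
- Scan every key obtained in the first step and determine the current smallest key;
- From all the blocks obtained in the first step, pick those whose key equals that smallest key, and store them in k.blocks;
- Merge the blocks picked in the previous step, mainly by sorting them on minTime. This essentially completes one Next
- The code follows, with my comments added: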
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if k.hasMergedValues() {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// Read the next block from each TSM iterator.
	// For every TSM file whose buffer is empty, read the blocks of its next key into k.buf,
	// in preparation for a merge sort.
	// Each TSM file has its own slot in k.buf, and, as in the TSM index,
	// keys come out in ascending order.
	for i, v := range k.buf {
		if len(v) != 0 {
			continue
		}

		iter := k.iterators[i]
		if iter.Next() {
			key, minTime, maxTime, typ, _, b, err := iter.Read()
			if err != nil {
				k.err = err
			}

			// This block may have ranges of time removed from it that would
			// reduce the block min and max time.
			// tombstones is a []TimeRange.
			tombstones := iter.r.TombstoneRange(key)

			var blk *block
			// k.buf is a []blocks, so k.buf[i] is a blocks, i.e. a []*block.
			// If k.buf[i] has spare capacity, reslice it and reuse the *block already in that
			// slot (allocating one only if it is nil); otherwise allocate a new block and let
			// append grow the slice.
			if cap(k.buf[i]) > len(k.buf[i]) {
				k.buf[i] = k.buf[i][:len(k.buf[i])+1]
				blk = k.buf[i][len(k.buf[i])-1]
				if blk == nil {
					blk = &block{}
					k.buf[i][len(k.buf[i])-1] = blk
				}
			} else {
				blk = &block{}
				k.buf[i] = append(k.buf[i], blk)
			}
			blk.minTime = minTime
			blk.maxTime = maxTime
			blk.key = key
			blk.typ = typ
			blk.b = b
			blk.tombstones = tombstones
			blk.readMin = math.MaxInt64
			blk.readMax = math.MinInt64

			blockKey := key
			// While the next key in this file equals blockKey, the file holds more blocks
			// for the same key; read them into k.buf[i] as well.
			for bytes.Equal(iter.PeekNext(), blockKey) {
				iter.Next()
				key, minTime, maxTime, typ, _, b, err := iter.Read()
				if err != nil {
					k.err = err
				}

				tombstones := iter.r.TombstoneRange(key)

				var blk *block
				if cap(k.buf[i]) > len(k.buf[i]) {
					k.buf[i] = k.buf[i][:len(k.buf[i])+1]
					blk = k.buf[i][len(k.buf[i])-1]
					if blk == nil {
						blk = &block{}
						k.buf[i][len(k.buf[i])-1] = blk
					}
				} else {
					blk = &block{}
					k.buf[i] = append(k.buf[i], blk)
				}

				blk.minTime = minTime
				blk.maxTime = maxTime
				blk.key = key
				blk.typ = typ
				blk.b = b
				blk.tombstones = tombstones
				blk.readMin = math.MaxInt64
				blk.readMax = math.MinInt64
			}
		}

		if iter.Err() != nil {
			k.err = iter.Err()
		}
	}

	// Each reader could have a different key that it's currently at, need to find
	// the next smallest one to keep the sort ordering.
	// Find the current smallest key (series key + field). Because each file's blocks
	// arrive in ascending key order, only the head block b[0] of each buffer needs checking.
	var minKey []byte
	var minType byte
	for _, b := range k.buf {
		// block could be nil if the iterator has been exhausted for that file
		if len(b) == 0 {
			continue
		}
		if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
			minKey = b[0].key
			minType = b[0].typ
		}
	}
	k.key = minKey
	k.typ = minType

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary.
	// Move every block whose key equals minKey into k.blocks.
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		// b is k.buf[i]; b[0] is the head block read from file i.
		if bytes.Equal(b[0].key, k.key) {
			// Append all of file i's blocks for this key.
			k.blocks = append(k.blocks, b...)
			// Reset k.buf[i] to length 0 so this file is refilled on the next read.
			k.buf[i] = k.buf[i][:0]
		}
	}

	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	// After merging all the values for this key, we might not have any. (e.g. they were all deleted
	// through many tombstones). In this case, move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return len(k.merged) > 0
}
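Merging in tsmBatchKeyIterator
- All the blocks to be merged are in k.blocks, and they are first sorted by block.minTime;
- Next, decide whether dedup is needed. If the blocks in k.blocks overlap in their [minTime, maxTime] ranges, or any block has tombstones, the blocks must be rebuilt: duplicates removed, tombstoned data deleted and the values re-sorted. In effect, all the blocks are recombined in minTime order;
- Here is the key code, with some comments I added: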
func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
	if dedup {
		// Process the blocks in minTime order, dropping duplicates.
		for k.mergedFloatValues.Len() < k.size && len(k.blocks) > 0 {
			// Drop blocks at the front that have already been read.
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the min time to the start of any overlapping blocks.
			// This works out the window [minTime, maxTime] for this pass: minTime moves back to the
			// earliest start among overlapping blocks, and maxTime shrinks to their earliest end.
			// (i could actually start at 1, since block 0 is first itself.)
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					// Shrink maxTime.
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks so decode all, append in order and then dedup.
			// Pull the values inside [minTime, maxTime] out of every unread block.
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				var v tsdb.FloatArray
				var err error
				if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for overlapping block.
				// Include keeps only this window's values; the rest of the block stays unread
				// for a later pass, which is why markRead records just this subrange.
				v.Include(minTime, maxTime)
				if v.Len() > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v.MinTime(), v.MaxTime())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v.Exclude(ts.Min, ts.Max)
				}

				k.mergedFloatValues.Merge(&v)
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunkFloat(nil)
	}

	// No dedup needed: full blocks can be passed through as-is.
	var i int

	for i < len(k.blocks) {
		// skip this block if its values were already read
		if k.blocks[i].read() {
			i++
			continue
		}
		// If this block is already full, just add it as is.
		// This stops at the first block that is not full; what about full blocks after it?
		if BlockCount(k.blocks[i].b) >= k.size {
			k.merged = append(k.merged, k.blocks[i])
		} else {
			break
		}
		i++
	}

	if k.fast {
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}

			k.merged = append(k.merged, k.blocks[i])
			i++
		}
	}

	// If we only have 1 block left, just append it as is and avoid decoding/recoding
	if i == len(k.blocks)-1 {
		if !k.blocks[i].read() {
			k.merged = append(k.merged, k.blocks[i])
		}
		i++
	}

	// The remaining blocks can be combined and we know that they do not overlap and
	// so we can just append each, sort and re-encode.
	for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
		if k.blocks[i].read() {
			i++
			continue
		}

		var v tsdb.FloatArray
		if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
			k.err = err
			return nil
		}

		// Apply each tombstone to the block
		for _, ts := range k.blocks[i].tombstones {
			v.Exclude(ts.Min, ts.Max)
		}

		k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

		k.mergedFloatValues.Merge(&v)
		i++
	}

	k.blocks = k.blocks[i:]

	return k.chunkFloat(k.merged)
}