Compaction Operations in InfluxDB

Compaction Overview

  • InfluxDB's storage engine uses the TSM file format, which is essentially an LSM-Tree adapted to the characteristics of time-series data; like an LSM-Tree it has a MemTable, a WAL and SSTables.
  • Being LSM-Tree-like, it also needs compaction: persisting the in-memory MemTable to disk, and merging a number of on-disk files to reduce the file count and improve read efficiency.
  • A compaction in InfluxDB generally takes two steps (see the sketch below):
    1. Generate a compaction plan; put simply, produce groups of files that can be compacted in parallel;
    2. Run the compaction over each group of TSM files.
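
As a minimal sketch of how these two steps chain together (assuming a planner implementing CompactionPlanner and a *Compactor wired to the same FileStore; the exact CompactFull signature varies a little between InfluxDB versions):

groups := planner.Plan(lastWrite) // lastWrite: the shard's last write time
for _, group := range groups {
    // group is one CompactionGroup: the TSM file paths to merge together.
    if _, err := compactor.CompactFull(group); err != nil {
        log.Printf("compaction of %v failed: %v", group, err)
    }
}
// Let the planner hand these files out to future plans again.
planner.Release(groups)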

Generating the Compaction Plan

The CompactionPlanner interface
  • Many strategies could be used to generate a plan; a "plan" is simply a set of file groups, selected by specific rules, that can be compacted in parallel. InfluxDB therefore first defines an interface:
type CompactionPlanner interface {
    Plan(lastWrite time.Time) []CompactionGroup
    PlanLevel(level int) []CompactionGroup
    PlanOptimize() []CompactionGroup
    Release(group []CompactionGroup)
    FullyCompacted() bool

    // ForceFull causes the planner to return a full compaction plan the next
    // time Plan() is called if there are files that could be compacted.
    ForceFull()

    SetFileStore(fs *FileStore)
}

Plan, PlanLevel and PlanOptimize all return []CompactionGroup, which is really just [][]string: a list of groups of TSM file paths, where each group can be compacted in parallel with the others.
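
For reference, the underlying type is trivial: a plan is just a slice of groups, each group a slice of file paths (the paths below are illustrative):

// From the tsm1 package: one group of TSM file paths compacted together.
type CompactionGroup []string

// An example plan: two groups that could be compacted in parallel.
var plan = []CompactionGroup{
    {"000001-04.tsm", "000002-04.tsm", "000003-04.tsm"},
    {"000010-03.tsm", "000011-03.tsm"},
}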

The default CompactionPlanner implementation: DefaultPlanner
  • Before looking at DefaultPlanner, consider how a TSM file is named: in 000001-01.tsm, the leading 000001 is called the Generation and the trailing 01 is called the Sequence number, also known as the Level.
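
As a minimal sketch of this naming scheme (a hypothetical helper, not the library's actual ParseFileNameFunc):

import (
    "fmt"
    "path/filepath"
    "strconv"
    "strings"
)

// parseTSMFileName splits a TSM file name such as "000001-01.tsm" into its
// Generation (000001) and Sequence number / Level (01).
func parseTSMFileName(name string) (generation, sequence int, err error) {
    base := strings.TrimSuffix(filepath.Base(name), ".tsm")
    idx := strings.IndexByte(base, '-')
    if idx < 0 {
        return 0, 0, fmt.Errorf("unrecognized tsm file name: %q", name)
    }
    if generation, err = strconv.Atoi(base[:idx]); err != nil {
        return 0, 0, err
    }
    sequence, err = strconv.Atoi(base[idx+1:])
    return generation, sequence, err
}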

  • The tsmGeneration type: it wraps the TSM files that belong to one and the same Generation

type tsmGeneration struct {
    id            int               // the Generation
    files         []FileStat        // info about the contained TSM files, sorted by file name in ascending order
    parseFileName ParseFileNameFunc // parses the Generation and Sequence number out of a TSM file name
}

During compaction one Generation can correspond to several different sequences, e.g. 001-001.tsm and 001-002.tsm both belong to Generation 1; at the next compaction these two files can be viewed together as one large logical 001-001.tsm file. This is exactly why tsmGeneration exists (a grouping sketch follows below).
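
A sketch in the spirit of DefaultPlanner.findGenerations (groupByGeneration is my name, not the real implementation; it reuses parseTSMFileName from the sketch above):

// groupByGeneration buckets TSM file paths by their Generation id, so that
// e.g. "001-001.tsm" and "001-002.tsm" land in the same bucket.
func groupByGeneration(paths []string) map[int][]string {
    gens := make(map[int][]string)
    for _, p := range paths {
        gen, _, err := parseTSMFileName(p)
        if err != nil {
            continue // ignore files that don't look like TSM files
        }
        gens[gen] = append(gens[gen], p)
    }
    return gens
}

For example, groupByGeneration([]string{"001-001.tsm", "001-002.tsm", "002-001.tsm"}) maps generation 1 to two files and generation 2 to one, which matches the "one big logical file" view described above.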

  • PlanLevel: for a given level, extract groups of TSM files. The rough steps are:
  1. From all TSM files currently held by the FileStore, group files of the same generation together, producing tsmGenerations; this is done by fileGenerations;
  2. Split all the resulting tsmGenerations into groups by level; the members of the final grouping are ordered by tsmGenerations.level() from high to low;
  3. Filter the tsmGeneration groups by the level argument of PlanLevel(level int);
  4. Chunk the tsmGenerations inside each remaining group into piles of a given size; the TSM files in those piles can then be compacted in parallel, one pile per worker;
  5. The code is a bit long and not the most direct, but the general idea is as above:
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
    ... 
    
    // Determine the generations from all files on disk.  We need to treat
    // a generation conceptually as a single file even though it may be
    // split across several files in sequence.
    // Group TSM files that share a generation id together:
    // generations -> tsmGenerations
    generations := c.findGenerations(true)
    ...
    
    // Group the tsmGenerations by level.
    // After grouping, the tsmGenerations in groups are ordered by level from high to low.
    var currentGen tsmGenerations
    var groups []tsmGenerations
    for i := 0; i < len(generations); i++ {
        cur := generations[i]

        // See if this generation is orphan'd which would prevent it from being further
        // compacted until a final full compaction runs.
        if i < len(generations)-1 {
            if cur.level() < generations[i+1].level() {
                currentGen = append(currentGen, cur)
                continue
            }
        }

        if len(currentGen) == 0 || currentGen.level() == cur.level() {
            currentGen = append(currentGen, cur)
            continue
        }
        groups = append(groups, currentGen)
        currentGen = tsmGenerations{}
        currentGen = append(currentGen, cur)
    }
    if len(currentGen) > 0 {
        groups = append(groups, currentGen)
    }

    // Remove any groups in the wrong level
    // level是這個函數(shù)傳進來的參數(shù)退疫,指明要compact哪一level的file,這里作個過濾
    // cur.level()返回的是這個tmsGeneration中所有fileState中最小的level, 這樣作
    // 合適嗎焰情?
    var levelGroups []tsmGenerations
    for _, cur := range groups {
        if cur.level() == level {
            levelGroups = append(levelGroups, cur)
        }
    }

    minGenerations := 4
    if level == 1 {
        // level 1 needs at least 8 files before it is worth compacting
        minGenerations = 8
    }

    //type CompactionGroup []string
    var cGroups []CompactionGroup
    for _, group := range levelGroups {
        // Chunk the tsmGenerations within each group into piles of the given size.
        for _, chunk := range group.chunk(minGenerations) {
            var cGroup CompactionGroup
            var hasTombstones bool
            for _, gen := range chunk {
                if gen.hasTombstones() {
                    hasTombstones = true
                }
                for _, file := range gen.files {
                    // cGroup collects the file.Path of every file to be compacted in this group.
                    cGroup = append(cGroup, file.Path)
                }
            }

            // If this chunk holds fewer tsmGenerations than minGenerations,
            // carry them over so the next chunk can make up the count.
            // hasTombstones being true means there are tombstoned entries that
            // must be physically removed by a compaction, so don't skip those.
            if len(chunk) < minGenerations && !hasTombstones {
                continue
            }

            cGroups = append(cGroups, cGroup)
        }
    }

    if !c.acquire(cGroups) {
        return nil
    }
    return cGroups
}
  • Plan(lastWrite time.Time): produces groups of TSM files either for a full compaction or for generations with level >= 4:
  1. The code is, frankly, long and tangled; to be honest the rules are not entirely obvious from reading it;
  2. A full compaction only runs after a certain time interval has elapsed; once it has, a full compaction is planned, subject to a few exclusion rules;
  3. If no full compaction is due, a plan is generated only for generations with level >= 4;
  4. The code is below, with some comments inline:
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
    generations := c.findGenerations(true)

    c.mu.RLock()
    forceFull := c.forceFull
    c.mu.RUnlock()

    // first check if we should be doing a full compaction because nothing has been written in a long time
    // A full compaction only happens after a time interval; that is checked here.
    // This branch handles the full-compaction case.
    if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {

        // Reset the full schedule if we planned because of it.
        if forceFull {
            c.mu.Lock()
            c.forceFull = false
            c.mu.Unlock()
        }

        var tsmFiles []string
        var genCount int
        for i, group := range generations {
            var skip bool

            // Skip the file if it's over the max size and contains a full block and it does not have any tombstones
            if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) &&
                c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock &&
                !group.hasTombstones() {
                skip = true
            }

            // But don't skip if the next generation is still at level <= 3;
            // it may need to be combined with this generation.
            if i < len(generations)-1 {
                if generations[i+1].level() <= 3 {
                    skip = false
                }
            }

            if skip {
                continue
            }

            for _, f := range group.files {
                tsmFiles = append(tsmFiles, f.Path)
            }
            genCount += 1
        }
        sort.Strings(tsmFiles)

        // Make sure we have more than 1 file and more than 1 generation
        if len(tsmFiles) <= 1 || genCount <= 1 {
            return nil
        }

        group := []CompactionGroup{tsmFiles}
        if !c.acquire(group) {
            return nil
        }
        return group
    }

    // don't plan if nothing has changed in the filestore
    if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
        return nil
    }

    c.lastPlanCheck = time.Now()

    // If there is only one generation, return early to avoid re-compacting the same file
    // over and over again.
    if len(generations) <= 1 && !generations.hasTombstones() {
        return nil
    }

    // Need to find the ending point for level 4 files.  They will be the oldest files.  We scan
    // the generations and break once we see one below level 4.
    end := 0
    start := 0
    // Find the cut-off point for level >= 4 generations here.
    // Under the current plan rules, generations is sorted by generation id ascending;
    // the smaller the generation, the older its files, and compaction prefers those.
    // If the smallest generation's files are below level 4, the level 4 files either
    // don't exist yet or were only produced very recently.
    for i, g := range generations {
        if g.level() <= 3 {
            break
        }
        end = i + 1
    }

    // As compactions run, the oldest files get bigger.  We don't want to re-compact them during
    // this planning if they are maxed out so skip over any we see.
    var hasTombstones bool
    for i, g := range generations[:end] {
        if g.hasTombstones() {
            hasTombstones = true
        }

        if hasTombstones {
            continue
        }

        // This part mainly skips over TSM files that are already too large.
        // Skip the file if it's over the max size and contains a full block or the generation is split
        // over multiple files.  In the latter case, that would mean the data in the file spilled over
        // the 2GB limit.
        if g.size() > uint64(maxTSMFileSize) &&
            c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
            start = i + 1
        }

        // This is an edge case that can happen after multiple compactions run.  The files at the beginning
        // can become larger faster than ones after them.  We want to skip those really big ones and just
        // compact the smaller ones until they are closer in size.
        if i > 0 {
            if g.size()*2 < generations[i-1].size() {
                start = i
                break
            }
        }
    }

    // step is how many files to compact in a group.  We want to clamp it at 4 but also still
    // return groups smaller than 4.
    step := 4
    if step > end {
        step = end
    }

    // slice off the generations that we'll examine
    generations = generations[start:end]

    // The code below chunks the generations into piles, i.e. the TSM files are
    // grouped so that compactions can run in parallel.
    // Loop through the generations in groups of size step and see if we can compact all (or
    // some of them as group)
    groups := []tsmGenerations{}
    for i := 0; i < len(generations); i += step {
        var skipGroup bool
        startIndex := i

        for j := i; j < i+step && j < len(generations); j++ {
            gen := generations[j]
            lvl := gen.level()

            // Skip compacting this group if there happens to be any lower level files in the
            // middle.  These will get picked up by the level compactors.
            if lvl <= 3 {
                skipGroup = true
                break
            }

            // Skip the file if it's over the max size and it contains a full block
            if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
                startIndex++
                continue
            }
        }

        if skipGroup {
            continue
        }

        endIndex := i + step
        if endIndex > len(generations) {
            endIndex = len(generations)
        }
        if endIndex-startIndex > 0 {
            groups = append(groups, generations[startIndex:endIndex])
        }
    }

    if len(groups) == 0 {
        return nil
    }

    // With the groups, we need to evaluate whether the group as a whole can be compacted
    compactable := []tsmGenerations{}
    for _, group := range groups {
        //if we don't have enough generations to compact, skip it
        if len(group) < 4 && !group.hasTombstones() {
            continue
        }
        compactable = append(compactable, group)
    }

    // All the files to be compacted must be compacted in order.  We need to convert each
    // group to the actual set of files in that group to be compacted.
    var tsmFiles []CompactionGroup
    for _, c := range compactable {
        var cGroup CompactionGroup
        for _, group := range c {
            for _, f := range group.files {
                cGroup = append(cGroup, f.Path)
            }
        }
        sort.Strings(cGroup)
        tsmFiles = append(tsmFiles, cGroup)
    }

    if !c.acquire(tsmFiles) {
        return nil
    }
    return tsmFiles
}
  • To summarize these planning strategies, the diagram below illustrates the typical case; it does not cover every situation and is only a general reference:


    (figure: overview of the compaction planning strategy)

Executing the Compaction

Compactor: the executor of compactions

It has two roles:

  • persisting the in-memory Cache (the MemTable) to on-disk TSM files (the SSTables); InfluxDB calls this writing a snapshot
  • merging multiple TSM files on disk
Persisting the Cache to TSM files
Cache recap
  • 先回顧一下Cache的構(gòu)成稳摄,簡單說就是個Key-Value稚字,為了降低讀寫時鎖的競爭,又引入了partiton(桶)的概念秩命,每個partition里又是一個key-value的map;Key通過hash選擇一個partition

  • The key here is the series key + field; the value is the actual user data written into InfluxDB


    (figure: the Cache's partitioned key-value layout)
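
A minimal sketch of this layout, with assumed names and a simplified stand-in Value type (the real tsm1.Cache stores tsm1.Value interfaces and manages partition storage differently):

import (
    "hash/fnv"
    "sync"
)

// Value is a simplified stand-in for one timestamped point.
type Value struct {
    UnixNano int64
    V        float64
}

// partition is one bucket: its own lock plus its own key -> values map, so
// writers that hash to different partitions never contend on the same lock.
type partition struct {
    mu    sync.RWMutex
    store map[string][]Value
}

type cache struct {
    partitions []*partition
}

// partitionFor hashes the key (series key + field) to choose a partition.
func (c *cache) partitionFor(key []byte) *partition {
    h := fnv.New32a()
    h.Write(key)
    return c.partitions[int(h.Sum32())%len(c.partitions)]
}

// write appends a value under key, locking only that key's partition.
func (c *cache) write(key []byte, v Value) {
    p := c.partitionFor(key)
    p.mu.Lock()
    p.store[string(key)] = append(p.store[string(key)], v)
    p.mu.Unlock()
}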
  • Persisting means writing these key-value pairs to disk; before being written they must be encoded

  • In the usual style of the InfluxDB code base, the write to disk is driven by an iterator that walks all the key-value pairs

Iterating the Cache
  • All of the above is handled by cacheKeyIterator, which provides key-by-key iteration and column-encodes the Values (value plus timestamp) before the iteration starts:
    1. The encoding runs on multiple goroutines in parallel;
    2. The values of each key in the Cache are encoded separately, with the result recorded in c.blocks; c.blocks gets one entry per key in the Cache;
    3. Even the values of a single key are not all encoded into one block: each cacheBlock holds at most c.size values.
func (c *cacheKeyIterator) encode() {
    concurrency := runtime.GOMAXPROCS(0)
    n := len(c.ready)

    // Divide the keyset across each CPU
    chunkSize := 1
    idx := uint64(0)

    // Start several goroutines to do the encoding.
    for i := 0; i < concurrency; i++ {
        // Run one goroutine per CPU and encode a section of the key space concurrently
        go func() {
            // Fetch encoders for Time, Float, Boolean, Unsigned, String and Integer values.
            tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
            fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
            benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
            uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
            senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
            ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)

            defer putTimeEncoder(tenc)
            defer putFloatEncoder(fenc)
            defer putBooleanEncoder(benc)
            defer putUnsignedEncoder(uenc)
            defer putStringEncoder(senc)
            defer putIntegerEncoder(ienc)

            for {
                i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

                if i >= n {
                    break
                }

                key := c.order[i]
                values := c.cache.values(key)

                for len(values) > 0 {

                    // Encode at most c.size values at a time.
                    end := len(values)
                    if end > c.size {
                        end = c.size
                    }

                    minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
                    var b []byte
                    var err error

                    switch values[0].(type) {
                    case FloatValue:
                        b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
                    case IntegerValue:
                        b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
                    case UnsignedValue:
                        b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
                    case BooleanValue:
                        b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
                    case StringValue:
                        b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
                    default:
                        b, err = Values(values[:end]).Encode(nil)
                    }

                    // Advance values to the remaining, not-yet-encoded tail.
                    values = values[end:]

                    // 每個key對應(yīng)c.blocks中的一項鸠按,里面存儲的是cacheBlock
                    c.blocks[i] = append(c.blocks[i], cacheBlock{
                        k:       key,
                        minTime: minTime,
                        maxTime: maxTime,
                        b:       b,
                        err:     err,
                    })

                    if err != nil {
                        c.err = err
                    }
                }
                
                // Notify this key is fully encoded
                // Once a key is fully encoded, write to its channel to signal completion.
                c.ready[i] <- struct{}{}
            }
        }()
    }
}
  • 編碼結(jié)果的遍歷
  1. Next():
func (c *cacheKeyIterator) Next() bool {
   // c.i starts at -1; on the first call, or once c.blocks[c.i] is exhausted,
   // the if below is not entered.
   if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 {
       c.blocks[c.i] = c.blocks[c.i][1:]
       if len(c.blocks[c.i]) > 0 {
           return true
       }
   }
   c.i++

   if c.i >= len(c.ready) {
       return false
   }

   // Block here until the corresponding key has been fully encoded.
   <-c.ready[c.i]
   return true
}
  2. Read():
func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
   // See if snapshot compactions were disabled while we were running.
   select {
   case <-c.interrupt:
       c.err = errCompactionAborted{}
       return nil, 0, 0, nil, c.err
   default:
   }

   blk := c.blocks[c.i][0]
   return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}
  • The Cache compaction itself:
    1. First decide, based on the Cache's size and how fast it is filling, whether to throttle writes and what degree of compaction concurrency to use;
    2. Split the Cache into several smaller Caches according to that concurrency; each small Cache gets its own goroutine for compaction;
    3. Each compaction walks its cacheKeyIterator and writes files via c.writeNewFiles;
    4. Each concurrent c.writeNewFiles call is given its own Generation, and every one starts its Sequence number from 0.
// 將Cache的內(nèi)容寫入到 *.tsm.tmp文件中
// cache中value過多的話扎运,會將cache作split成多個cache,并行處理瑟曲,每個splited cache有自己的generation
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
    c.mu.RLock()
    enabled := c.snapshotsEnabled
    intC := c.snapshotsInterrupt
    c.mu.RUnlock()

    if !enabled {
        return nil, errSnapshotsDisabled
    }

    start := time.Now()
    // cache.Count() returns the total number of values in the cache.
    card := cache.Count()

    // Enable throttling if we have lower cardinality or snapshots are going fast.
    // 3e6 = 3,000,000.
    // Decide whether the compaction should be throttled.
    throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

    // Write snapshots concurrently if cardinality is relatively high.
    concurrency := card / 2e6
    if concurrency < 1 {
        concurrency = 1
    }

    // Special case very high cardinality, use max concurrency and don't throttle writes.
    if card >= 3e6 {
        concurrency = 4
        throttle = false
    }

    splits := cache.Split(concurrency)

    type res struct {
        files []string
        err   error
    }

    resC := make(chan res, concurrency)
    for i := 0; i < concurrency; i++ {
        go func(sp *Cache) {
            iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
            files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
            resC <- res{files: files, err: err}

        }(splits[i])
    }

    var err error
    files := make([]string, 0, concurrency)
    for i := 0; i < concurrency; i++ {
        result := <-resC
        if result.err != nil {
            err = result.err
        }
        files = append(files, result.files...)
    }

    ... 

    return files, err
}
  • writeNewFiles: walk the KeyIterator and write the encoded blocks into TSM files:
    1. It mainly calls the TSMWriter methods to do the writing;
    2. Each file is written blocks first, then the index;
    3. When the file's size or block count reaches its limit, roll over to the next file.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
    // These are the new TSM files written
    var files []string

    for {
        // sequence + 1; this sequence is effectively the level.
        sequence++

        // Files written here are named *.tsm.tmp;
        // they are renamed to *.tsm when the compaction completes.
        fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)

        // Write as much as possible to this file
        // c.write實現(xiàn)了實際的寫入操作
        err := c.write(fileName, iter, throttle)

        // We've hit the max file limit and there is more to write.  Create a new file
        // and continue.
        // When the file size or block count hits the limit, roll to the next file and bump sequence.
        if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
            files = append(files, fileName)
            continue
        } else if err == ErrNoValues {
            // ErrNoValues means there were no live values, only tombstoned entries, so no file is written.
            // If the file only contained tombstoned entries, then it would be a 0 length
            // file that we can drop.
            if err := os.RemoveAll(fileName); err != nil {
                return nil, err
            }
            break
        } else if _, ok := err.(errCompactionInProgress); ok {
            // Don't clean up the file as another compaction is using it.  This should not happen as the
            // planner keeps track of which files are assigned to compaction plans now.
            return nil, err
        } else if err != nil {
            // Remove any tmp files we already completed
            for _, f := range files {
                if err := os.RemoveAll(f); err != nil {
                    return nil, err
                }
            }
            // We hit an error and didn't finish the compaction.  Remove the temp file and abort.
            if err := os.RemoveAll(fileName); err != nil {
                return nil, err
            }
            return nil, err
        }

        files = append(files, fileName)
        break
    }

    return files, nil
}
Compacting multiple TSM files
Overview

Let's first walk through this compaction briefly: it resembles a merge sort. Within each TSM file the keys in its index are sorted in ascending order, so compaction merges the blocks that share the same key across multiple files and then generates a new index. Put that way it sounds simple, but InfluxDB's implementation adds some extra strategies for efficiency (a stripped-down merge sketch follows).
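
As a minimal, self-contained sketch of that merge (plain sorted key lists instead of blocks; the real iterator additionally batches blocks, applies tombstones and re-encodes):

// mergeSortedKeys merges several ascending key lists into one ascending,
// de-duplicated list: pick the smallest head key across all inputs, then
// advance every input currently positioned at that key. This is the skeleton
// of what tsmBatchKeyIterator.Next does with per-file blocks.
func mergeSortedKeys(inputs [][]string) []string {
    pos := make([]int, len(inputs)) // read position within each input
    var out []string
    for {
        var minKey string
        found := false
        for i, in := range inputs {
            if pos[i] >= len(in) {
                continue // this input is exhausted
            }
            if !found || in[pos[i]] < minKey {
                minKey, found = in[pos[i]], true
            }
        }
        if !found {
            return out // all inputs exhausted
        }
        out = append(out, minKey)
        for i, in := range inputs {
            if pos[i] < len(in) && in[pos[i]] == minKey {
                pos[i]++ // here the real code would merge this file's blocks
            }
        }
    }
}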

tsmBatchKeyIterator
  • Just as with the Cache compaction above, an iterator is needed here too: tsmBatchKeyIterator, which iterates several TSM files simultaneously. It is the heart of the compaction process
Iterating with tsmBatchKeyIterator
  1. First pull out the blocks for the first key of each TSM file, one per file;
  2. Scan all the keys obtained in step 1 and determine the current smallest key;
  3. From all the blocks obtained in step 1, extract those whose key equals the smallest key from step 2 and store them in k.blocks;
  4. Merge all the blocks from step 3, essentially by sorting on minTime; that basically completes one Next() call;
  5. The concrete code follows, with my comments added:
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
    // Any merged blocks pending?
    if len(k.merged) > 0 {
        k.merged = k.merged[1:]
        if len(k.merged) > 0 {
            return true
        }
    }

    // Any merged values pending?
    if k.hasMergedValues() {
        k.merge()
        if len(k.merged) > 0 || k.hasMergedValues() {
            return true
        }
    }

    // If we still have blocks from the last read, merge them
    if len(k.blocks) > 0 {
        k.merge()
        if len(k.merged) > 0 || k.hasMergedValues() {
            return true
        }
    }

    // Read the next block from each TSM iterator
    // Read each TSM file and stash its first group of blocks into k.buf,
    // evidently in preparation for a merge sort.
    // Each TSM file maps to one blocks slice, and like the TSM index itself,
    // that slice is sorted by key in ascending order.
    for i, v := range k.buf {
        if len(v) != 0 {
            continue
        }

        iter := k.iterators[i]
        if iter.Next() {
            key, minTime, maxTime, typ, _, b, err := iter.Read()
            if err != nil {
                k.err = err
            }

            // This block may have ranges of time removed from it that would
            // reduce the block min and max time.
            // tombstones here is a []TimeRange.
            tombstones := iter.r.TombstoneRange(key)

            var blk *block
            // k.buf[i] has type []blocks -> [][]block.
            // The logic below keeps appending new blocks to k.buf[i];
            // if k.buf[i] must grow, append doubles its capacity.
            if cap(k.buf[i]) > len(k.buf[i]) {
                k.buf[i] = k.buf[i][:len(k.buf[i])+1]
                blk = k.buf[i][len(k.buf[i])-1]
                if blk == nil {
                    blk = &block{}
                    k.buf[i][len(k.buf[i])-1] = blk
                }
            } else {
                blk = &block{}
                k.buf[i] = append(k.buf[i], blk)
            }

            blk.minTime = minTime
            blk.maxTime = maxTime
            blk.key = key
            blk.typ = typ
            blk.b = b
            blk.tombstones = tombstones
            blk.readMin = math.MaxInt64
            blk.readMax = math.MinInt64

            blockKey := key
            // If these two keys are equal, the current key's blocks are not yet exhausted.
            for bytes.Equal(iter.PeekNext(), blockKey) {
                iter.Next()
                key, minTime, maxTime, typ, _, b, err := iter.Read()
                if err != nil {
                    k.err = err
                }

                tombstones := iter.r.TombstoneRange(key)

                var blk *block
                if cap(k.buf[i]) > len(k.buf[i]) {
                    k.buf[i] = k.buf[i][:len(k.buf[i])+1]
                    blk = k.buf[i][len(k.buf[i])-1]
                    if blk == nil {
                        blk = &block{}
                        k.buf[i][len(k.buf[i])-1] = blk
                    }
                } else {
                    blk = &block{}
                    k.buf[i] = append(k.buf[i], blk)
                }

                blk.minTime = minTime
                blk.maxTime = maxTime
                blk.key = key
                blk.typ = typ
                blk.b = b
                blk.tombstones = tombstones
                blk.readMin = math.MaxInt64
                blk.readMax = math.MinInt64
            }
        }

        if iter.Err() != nil {
            k.err = iter.Err()
        }
    }

    // Each reader could have a different key that it's currently at, need to find
    // the next smallest one to keep the sort ordering.
    // Find the current smallest key (series key + field).
    // Every blocks slice in k.buf is already sorted by key ascending,
    // so only each blocks[0] needs to be examined.
    var minKey []byte
    var minType byte
    for _, b := range k.buf {
        // block could be nil if the iterator has been exhausted for that file
        if len(b) == 0 {
            continue
        }
        if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
            minKey = b[0].key
            minType = b[0].typ
        }
    }
    k.key = minKey
    k.typ = minType

    // Now we need to find all blocks that match the min key so we can combine and dedupe
    // the blocks if necessary
    // Move every block whose key equals the minKey found above into k.blocks.
    for i, b := range k.buf {
        if len(b) == 0 {
            continue
        }
        // b[0] is the current k.buf[i][0], a single block;
        // b itself is a []block.
        if bytes.Equal(b[0].key, k.key) {
            //k.blocks => []block
            // b => []block
            k.blocks = append(k.blocks, b...)
            // k.buf[i]'s length is reset to 0, i.e. its existing data is cleared.
            k.buf[i] = k.buf[i][:0]
        }
    }

    if len(k.blocks) == 0 {
        return false
    }

    k.merge()

    // After merging all the values for this key, we might not have any.  (e.g. they were all deleted
    // through many tombstones).  In this case, move on to the next key instead of ending iteration.
    if len(k.merged) == 0 {
        goto RETRY
    }

    return len(k.merged) > 0
}
Merging in tsmBatchKeyIterator
  1. The blocks currently needing a merge all sit in k.blocks; first sort them by block.minTime;
  2. Decide whether deduplication is needed: if the blocks in k.blocks overlap in their [minTime, maxTime] ranges, or any block carries tombstones, they must all be rebuilt, which means deduplicating, deleting and re-sorting; effectively all blocks are recombined and re-sorted by minTime;
  3. The key code is below, with some comments I added:
func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
    if dedup {
        //實現(xiàn)了按minTime來排序,去重
        for k.mergedFloatValues.Len() < k.size && len(k.blocks) > 0 {
            // 去除已經(jīng)讀取過的block
            for len(k.blocks) > 0 && k.blocks[0].read() {
                k.blocks = k.blocks[1:]
            }

            if len(k.blocks) == 0 {
                break
            }
            first := k.blocks[0]
            minTime := first.minTime
            maxTime := first.maxTime

            // Adjust the min time to the start of any overlapping blocks.
            // (Actually i could start from 1.)
            // To sort by minTime, first pin down a globally smallest [minTime, maxTime] window.
            for i := 0; i < len(k.blocks); i++ {
                if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
                    if k.blocks[i].minTime < minTime {
                        minTime = k.blocks[i].minTime
                    }

                    // Shrink the upper bound.
                    if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
                        maxTime = k.blocks[i].maxTime
                    }
                }
            }

            // We have some overlapping blocks so decode all, append in order and then dedup
            // Collect data from all blocks within the [minTime, maxTime] window fixed above.
            for i := 0; i < len(k.blocks); i++ {
                if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
                    continue
                }

                var v tsdb.FloatArray
                var err error
                if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
                    k.err = err
                    return nil
                }

                // Remove values we already read
                v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

                // Filter out only the values for overlapping block
                // (Could this Include call be skipped?)
                v.Include(minTime, maxTime)
                if v.Len() > 0 {
                    // Record that we read a subset of the block
                    k.blocks[i].markRead(v.MinTime(), v.MaxTime())
                }

                // Apply each tombstone to the block
                for _, ts := range k.blocks[i].tombstones {
                    v.Exclude(ts.Min, ts.Max)
                }

                k.mergedFloatValues.Merge(&v)
            }
        }

        // Since we combined multiple blocks, we could have more values than we should put into
        // a single block.  We need to chunk them up into groups and re-encode them.
        return k.chunkFloat(nil)
    }
    var i int

    for i < len(k.blocks) {

        // skip this block if it's values were already read
        if k.blocks[i].read() {
            i++
            continue
        }
        // If this block is already full, just add it as is.
        // We break at the first non-full block, but what if full blocks follow it?
        if BlockCount(k.blocks[i].b) >= k.size {
            k.merged = append(k.merged, k.blocks[i])
        } else {
            break
        }
        i++
    }

    if k.fast {
        for i < len(k.blocks) {
            // skip this block if it's values were already read
            if k.blocks[i].read() {
                i++
                continue
            }

            k.merged = append(k.merged, k.blocks[i])
            i++
        }
    }

    // If we only have 1 block left, just append it as is and avoid decoding/recoding
    if i == len(k.blocks)-1 {
        if !k.blocks[i].read() {
            k.merged = append(k.merged, k.blocks[i])
        }
        i++
    }

    // The remaining blocks can be combined and we know that they do not overlap and
    // so we can just append each, sort and re-encode.
    for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
        if k.blocks[i].read() {
            i++
            continue
        }

        var v tsdb.FloatArray
        if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
            k.err = err
            return nil
        }

        // Apply each tombstone to the block
        for _, ts := range k.blocks[i].tombstones {
            v.Exclude(ts.Min, ts.Max)
        }

        k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

        k.mergedFloatValues.Merge(&v)
        i++
    }

    k.blocks = k.blocks[i:]

    return k.chunkFloat(k.merged)
}