Data Writes in an InfluxDB Cluster


How data writes are implemented
  1. The analysis below focuses on the implementation of the WritePoints function in cluster/points_writer.go:
// WritePoints writes across multiple local and remote data nodes according to the consistency level.
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
    w.statMap.Add(statWriteReq, 1)
    w.statMap.Add(statPointWriteReq, int64(len(p.Points)))

    // 2.1 Resolve the RetentionPolicy first
    if p.RetentionPolicy == "" {
        db, err := w.MetaClient.Database(p.Database)
        if err != nil {
            return err
        } else if db == nil {
            return influxdb.ErrDatabaseNotFound(p.Database)
        }
        p.RetentionPolicy = db.DefaultRetentionPolicy
    }

    // 2.2 Build the shard mapping
    shardMappings, err := w.MapShards(p)
    if err != nil {
        return err
    }

    // Write each shard in its own goroutine and return as soon
    // as one fails.
    ch := make(chan error, len(shardMappings.Points))
    for shardID, points := range shardMappings.Points {
    
        // 2.3 Write the points to the shard
        go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            ch <- w.writeToShard(shard, p.Database, p.RetentionPolicy, p.ConsistencyLevel, points)
        }(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
    }

    // Send points to subscriptions if possible.
    ok := false
    // We need to lock just in case the channel is about to be nil'ed
    w.mu.RLock()
    select {
    case w.subPoints <- p:
        ok = true
    default:
    }
    w.mu.RUnlock()
    if ok {
        w.statMap.Add(statSubWriteOK, 1)
    } else {
        w.statMap.Add(statSubWriteDrop, 1)
    }

    // 2.4 Wait for the writes to finish
    for range shardMappings.Points {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case err := <-ch:
            if err != nil {
                return err
            }
        }
    }
    return nil
}
  2. The function above breaks down into the following steps:
    2.1 Resolve the matching RetentionPolicy; if the request names none, the database's default is used.
    2.2 Build the ShardMapping, assigning each point to a shard within the right ShardGroup; this is the key step.
    2.3 For each distinct shard ID, spawn a new goroutine that writes the points to that shard, which may involve writing the data to other DataNodes.
    2.4 Wait for the writes to complete or for shutdown.
Generating the ShardMapping
  1. First, the ShardGroup concept:
    1.1 Every point written to InfluxDB carries a time value, and every ShardGroup has its own start and end time. That time span is the ShardGroupDuration of the RetentionPolicy selected at write time, so every written point belongs to exactly one ShardGroup (see the sketch below).
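
To make the time-to-group mapping concrete, here is a minimal runnable sketch; the 24h duration is only an assumed example, the real value comes from the retention policy:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Assume a retention policy whose ShardGroupDuration is 24h.
    shardGroupDuration := 24 * time.Hour

    // Two points half an hour on either side of a day boundary.
    t1 := time.Date(2016, 3, 1, 23, 30, 0, 0, time.UTC)
    t2 := time.Date(2016, 3, 2, 0, 30, 0, 0, time.UTC)

    // Truncating a timestamp yields the StartTime of its owning ShardGroup.
    fmt.Println(t1.Truncate(shardGroupDuration)) // 2016-03-01 00:00:00 +0000 UTC
    fmt.Println(t2.Truncate(shardGroupDuration)) // 2016-03-02 00:00:00 +0000 UTC
    // Different truncated times, therefore two different ShardGroups.
}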

  2. The core of this lives in MapShards in cluster/points_writer.go:

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {

    // holds the start time ranges for required shard groups
    timeRanges := map[time.Time]*meta.ShardGroupInfo{}

    rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
    if err != nil {
        return nil, err
    }
    if rp == nil {
        return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
    }

    for _, p := range wp.Points {
        timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
    }

    // holds all the shard groups and shards that are required for writes
    for t := range timeRanges {
        sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
        if err != nil {
            return nil, err
        }
        timeRanges[t] = sg
    }

    mapping := NewShardMapping()
    for _, p := range wp.Points {
        sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
        sh := sg.ShardFor(p.HashID())
        mapping.MapPoint(&sh, p)
    }
    return mapping, nil
}
  3. Let's break down the function above:
    3.1 Scan all the points and, from their timestamps, determine which ShardGroups are needed:

    for _, p := range wp.Points {
        timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
    }

    3.2 Call w.MetaClient.CreateShardGroup. If the ShardGroup already exists, its meta information is returned directly; if it does not, it is created, which involves sending a CreateShardGroup request to the meta server and waiting for the local meta data to catch up to the new version:

sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
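
The idempotence described above can be illustrated with a small self-contained toy; every type and field name below is a simplified stand-in, not the real meta client:

package main

import (
    "fmt"
    "time"
)

// shardGroup and metaClient are hypothetical, pared-down stand-ins used only
// to show the check-then-create behavior of CreateShardGroup.
type shardGroup struct {
    ID                 uint64
    StartTime, EndTime time.Time
}

type metaClient struct {
    duration time.Duration
    groups   map[time.Time]*shardGroup // keyed by truncated start time
    nextID   uint64
}

// CreateShardGroup returns the existing group covering timestamp if there is
// one; otherwise it creates it (the real client does this via a round trip
// to the meta server followed by a wait for the updated meta data).
func (c *metaClient) CreateShardGroup(timestamp time.Time) *shardGroup {
    start := timestamp.Truncate(c.duration)
    if sg, ok := c.groups[start]; ok {
        return sg // idempotent: a second caller gets the same group back
    }
    c.nextID++
    sg := &shardGroup{ID: c.nextID, StartTime: start, EndTime: start.Add(c.duration)}
    c.groups[start] = sg
    return sg
}

func main() {
    c := &metaClient{duration: 24 * time.Hour, groups: map[time.Time]*shardGroup{}}
    t := time.Date(2016, 3, 1, 10, 0, 0, 0, time.UTC)
    fmt.Println(c.CreateShardGroup(t).ID)                   // 1: created
    fmt.Println(c.CreateShardGroup(t.Add(time.Hour)).ID)    // 1: reused
    fmt.Println(c.CreateShardGroup(t.Add(30*time.Hour)).ID) // 2: next window
}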

    3.3 The allocation rules for a ShardGroup live in CreateShardGroup in services/meta/data.go:

func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
    ...

    // Require at least one replica but no more replicas than nodes.
    replicaN := rpi.ReplicaN
    if replicaN == 0 {
        replicaN = 1
    } else if replicaN > len(data.DataNodes) {
        replicaN = len(data.DataNodes)
    }

    // Determine shard count by node count divided by replication factor.
    // This ensures shards get distributed across nodes evenly and
    // replicated the correct number of times.
    shardN := len(data.DataNodes) / replicaN

    // Create the shard group.
    data.MaxShardGroupID++
    sgi := ShardGroupInfo{}
    sgi.ID = data.MaxShardGroupID
    sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
    sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()

    // Create shards on the group.
    sgi.Shards = make([]ShardInfo, shardN)
    for i := range sgi.Shards {
        data.MaxShardID++
        sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}
    }

    // Assign data nodes to shards via round robin.
    // Start from a repeatably "random" place in the node list.
    // ShardInfo.Owners records, for every shard, the data nodes holding its replicas.
    // data.Index is bumped on every meta-data update (think of it as the meta-data
    // version number); using it as the starting offset keeps the "random" start
    // repeatable while rotating the assignment across shard groups.
    nodeIndex := int(data.Index % uint64(len(data.DataNodes)))
    for i := range sgi.Shards {
        si := &sgi.Shards[i]
        for j := 0; j < replicaN; j++ {
            nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID
            si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
            nodeIndex++
        }
    }

    // Retention policy has a new shard group, so update the policy. Shard
    // Groups must be stored in sorted order, as other parts of the system
    // assume this to be the case.
    rpi.ShardGroups = append(rpi.ShardGroups, sgi)
    sort.Sort(ShardGroupInfos(rpi.ShardGroups))

    return nil
}
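
To make the round-robin concrete, here is a runnable simulation under assumed values (5 data nodes, ReplicaN=2, meta index 7; none of these numbers come from the source):

package main

import "fmt"

func main() {
    // Hypothetical cluster: 5 data nodes, replication factor 2.
    nodeIDs := []uint64{1, 2, 3, 4, 5}
    replicaN := 2
    shardN := len(nodeIDs) / replicaN // 2 shards in this group

    // data.Index stands in for the meta-data version; it fixes the
    // starting node in a repeatable but group-varying way.
    index := uint64(7)
    nodeIndex := int(index % uint64(len(nodeIDs)))

    for shard := 0; shard < shardN; shard++ {
        var owners []uint64
        for j := 0; j < replicaN; j++ {
            owners = append(owners, nodeIDs[nodeIndex%len(nodeIDs)])
            nodeIndex++
        }
        fmt.Printf("shard %d -> owner nodes %v\n", shard, owners)
    }
    // Output:
    // shard 0 -> owner nodes [3 4]
    // shard 1 -> owner nodes [5 1]
    // Note node 2 owns nothing in this group; as data.Index advances,
    // later shard groups start elsewhere, evening out the load.
}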

    3.4 Map each concrete point to one shard inside its ShardGroup: take the point's HashID, the hash of measurement + tag set, modulo the number of shards:

    for _, p := range wp.Points {
        sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
        sh := sg.ShardFor(p.HashID())
        mapping.MapPoint(&sh, p)
    }

    ...

func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {
    return sgi.Shards[hash%uint64(len(sgi.Shards))]
}
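
HashID is, in the models package of this era, an FNV-64a hash over the series key (measurement plus sorted tag set); treat the sketch below as an illustration of that idea rather than the exact implementation:

package main

import (
    "fmt"
    "hash/fnv"
)

// seriesKeyHash mirrors the idea behind models.Point.HashID:
// hash the series key with FNV-64a.
func seriesKeyHash(seriesKey string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(seriesKey))
    return h.Sum64()
}

func main() {
    shardN := uint64(2)
    key := "cpu,host=server01,region=us-west" // hypothetical series key
    hash := seriesKeyHash(key)
    fmt.Printf("hash=%d -> shard index %d of %d\n", hash, hash%shardN, shardN)
    // The same series key always hashes to the same shard, so one series
    // never straddles two shards inside a single ShardGroup.
}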
Writing data at the requested consistency level
  1. The process in brief:
    1.1 Determine, from the consistency level, how many replicas must be written successfully. required starts out as the number of shard owners, so with three replicas a Quorum write needs two successes:

    switch consistency {
    // For ConsistencyLevelAny and ConsistencyLevelOne a single successful
    // write satisfies the consistency requirement and we can answer the client
    case ConsistencyLevelAny, ConsistencyLevelOne:
        required = 1
    case ConsistencyLevelQuorum:
        required = required/2 + 1
    }

    1.2 Write the data to every DataNode listed in Shard.Owners: if the owner is the local node, call w.TSDBStore.WriteToShard directly; if it is remote, call err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points). A sketch of this dispatch follows.
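
The following runnable toy shows the shape of that per-owner dispatch; the types and method bodies are simplified stand-ins, only the control flow mirrors writeToShard:

package main

import "fmt"

type shardOwner struct{ NodeID uint64 }

// pointsWriter is a pared-down stand-in for cluster.PointsWriter.
type pointsWriter struct {
    localNodeID uint64
}

func (w *pointsWriter) writeLocal(shardID uint64) error {
    fmt.Printf("shard %d: local write via TSDBStore.WriteToShard\n", shardID)
    return nil
}

func (w *pointsWriter) writeRemote(shardID, nodeID uint64) error {
    fmt.Printf("shard %d: remote write to node %d via ShardWriter.WriteShard\n", shardID, nodeID)
    return nil
}

func (w *pointsWriter) writeToShard(shardID uint64, owners []shardOwner) {
    ch := make(chan error, len(owners))
    for _, owner := range owners {
        go func(o shardOwner) {
            // Local owner -> straight into the local TSDB store;
            // remote owner -> through the cluster shard writer
            // (with HintedHandoff as the fallback, omitted here).
            if o.NodeID == w.localNodeID {
                ch <- w.writeLocal(shardID)
            } else {
                ch <- w.writeRemote(shardID, o.NodeID)
            }
        }(owner)
    }
    for range owners {
        <-ch // the real code counts successes against `required` here
    }
}

func main() {
    w := &pointsWriter{localNodeID: 1}
    w.writeToShard(42, []shardOwner{{NodeID: 1}, {NodeID: 2}})
}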
    1.3 When a write to a remote node fails, the data goes into the local HintedHandoff on-disk queue and is retried against the remote node until it succeeds or the data expires and is purged. When the consistency level is ConsistencyLevelAny, a successful write into the local HintedHandoff already counts as a successful write:

            w.statMap.Add(statWritePointReqHH, int64(len(points)))
            hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
            if hherr != nil {
                ch <- &AsyncWriteResult{owner, hherr}
                return
            }

            if hherr == nil && consistency == ConsistencyLevelAny {
                ch <- &AsyncWriteResult{owner, nil}
                return
            }

    1.4 Wait for the writes to complete or time out:

for range shard.Owners {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case <-timeout:
            w.statMap.Add(statWriteTimeout, 1)
            // return timeout error to caller
            return ErrTimeout
        case result := <-ch:
            // If the write returned an error, continue to the next response
            if result.Err != nil {
                if writeError == nil {
                    writeError = result.Err
                }
                continue
            }

            wrote++

            // Return as soon as the consistency requirement has been met
            if wrote >= required {
                w.statMap.Add(statWriteOK, 1)
                return nil
            }
        }
    }
The HintedHandoff service
  1. The service is defined in services/hh/service.go.

  2. Data written into HintedHandoff is split by NodeID into a separate directory per node. Each directory holds multiple files, each file being one segment. Segments are named with a monotonically increasing id, so id order is also write order, from oldest to newest. A hypothetical on-disk layout is sketched below.
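
    hh/
    ├── 2/            # one directory per remote node ID (IDs here are made up)
    │   ├── 1         # segment files; ids strictly increase with write time
    │   ├── 2
    │   └── 3
    └── 3/
        ├── 1
        └── 2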


    (figure: hitnedhandoff.png — HintedHandoff segment files laid out per node)
  3. The HintedHandoff service creates one NodeProcessor per remote DataNode; each is responsible for the writes to its own DataNode and runs in a separate goroutine.

  4. Each goroutine does two things: first, it periodically purges expired data (if purged data never made it to the remote node, it is lost); second, it reads data from the segment files and writes it to the remote node:

func (n *NodeProcessor) run() {
    defer n.wg.Done()

    ...

    for {
        select {
        case <-n.done:
            return

        case <-time.After(n.PurgeInterval):
            if err := n.queue.PurgeOlderThan(time.Now().Add(-n.MaxAge)); err != nil {
                n.Logger.Printf("failed to purge for node %d: %s", n.nodeID, err.Error())
            }

        case <-time.After(currInterval):
            limiter := NewRateLimiter(n.RetryRateLimit)
            for {
                c, err := n.SendWrite()
                if err != nil {
                    if err == io.EOF {
                        // No more data, return to configured interval
                        currInterval = time.Duration(n.RetryInterval)
                    } else {
                        currInterval = currInterval * 2
                        if currInterval > time.Duration(n.RetryMaxInterval) {
                            currInterval = time.Duration(n.RetryMaxInterval)
                        }
                    }
                    break
                }

                // Success! Ensure backoff is cancelled.
                currInterval = time.Duration(n.RetryInterval)

                // Update how many bytes we've sent
                limiter.Update(c)

                // Block to maintain the throughput rate
                time.Sleep(limiter.Delay())
            }
        }
    }
}
  5. Local storage and reads of the queued data:
    5.1 Defined in services/hh/queue.go. All segment files are organized in memory as a queue: reads come from the segment at head, writes go to the segment at tail, and the last 8 bytes of each segment file record how far into that file reads have progressed (a toy sketch of that footer follows this list).
    5.2 Purging: once a segment's contents have all been sent, the file is deleted; each periodic purge pass only checks whether the segment currently at head should be removed.
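
A minimal runnable sketch of the footer idea, assuming a made-up layout (this is not the real hh segment format): the final 8 bytes hold the read offset, so a restart can resume sending where it left off.

package main

import (
    "encoding/binary"
    "fmt"
    "os"
)

// writeSegment appends an 8-byte read-position footer after the payload.
func writeSegment(path string, payload []byte, readPos uint64) error {
    buf := make([]byte, len(payload)+8)
    copy(buf, payload)
    binary.BigEndian.PutUint64(buf[len(payload):], readPos)
    return os.WriteFile(path, buf, 0o644)
}

// readPos recovers the read offset from the last 8 bytes of the file.
func readPos(path string) (uint64, error) {
    b, err := os.ReadFile(path)
    if err != nil {
        return 0, err
    }
    if len(b) < 8 {
        return 0, fmt.Errorf("segment too short")
    }
    return binary.BigEndian.Uint64(b[len(b)-8:]), nil
}

func main() {
    path := "segment-demo"
    defer os.Remove(path)
    if err := writeSegment(path, []byte("some queued writes"), 7); err != nil {
        panic(err)
    }
    pos, err := readPos(path)
    if err != nil {
        panic(err)
    }
    fmt.Println("resume reading at offset", pos) // resume reading at offset 7
}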