Writing Data in Cluster Mode
How the write path is implemented
- This section mainly analyzes the implementation of the WritePoints function in cluster/points_writer.go.
// WritePoints writes across multiple local and remote data nodes according to the consistency level.
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
	w.statMap.Add(statWriteReq, 1)
	w.statMap.Add(statPointWriteReq, int64(len(p.Points)))

	// 2.1 Resolve the RetentionPolicy first
	if p.RetentionPolicy == "" {
		db, err := w.MetaClient.Database(p.Database)
		if err != nil {
			return err
		} else if db == nil {
			return influxdb.ErrDatabaseNotFound(p.Database)
		}
		p.RetentionPolicy = db.DefaultRetentionPolicy
	}

	// 2.2 Build the shard mapping
	shardMappings, err := w.MapShards(p)
	if err != nil {
		return err
	}

	// Write each shard in its own goroutine and return as soon
	// as one fails.
	ch := make(chan error, len(shardMappings.Points))
	for shardID, points := range shardMappings.Points {
		// 2.3 Write the points to their shard
		go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
			ch <- w.writeToShard(shard, p.Database, p.RetentionPolicy, p.ConsistencyLevel, points)
		}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
	}

	// Send points to subscriptions if possible.
	ok := false
	// We need to lock just in case the channel is about to be nil'ed
	w.mu.RLock()
	select {
	case w.subPoints <- p:
		ok = true
	default:
	}
	w.mu.RUnlock()
	if ok {
		w.statMap.Add(statSubWriteOK, 1)
	} else {
		w.statMap.Add(statSubWriteDrop, 1)
	}

	// 2.4 Wait for the writes to finish
	for range shardMappings.Points {
		select {
		case <-w.closing:
			return ErrWriteFailed
		case err := <-ch:
			if err != nil {
				return err
			}
		}
	}
	return nil
}
- The function above breaks down into the following steps (a usage sketch follows the list):
2.1 Fetch the corresponding RetentionPolicy;
2.2 Build the ShardMapping, i.e. map each point to a Shard inside the appropriate ShardGroup; this step is the crucial one;
2.3 For each distinct shard ID, start a new goroutine that writes the points to that Shard; this may involve writing data to other DataNodes;
2.4 Wait for the writes to complete, or bail out.
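For orientation, here is a minimal sketch of how a caller might drive this function. The line-protocol sample, the database name and the import paths (the influxdata organization; older trees used github.com/influxdb/influxdb) are assumptions for illustration only; in the real server the PointsWriter is constructed and wired up during startup.

package example

import (
	"github.com/influxdata/influxdb/cluster"
	"github.com/influxdata/influxdb/models"
)

// writeSample drives the cluster write path once the PointsWriter has been
// set up by the server bootstrap code.
func writeSample(w *cluster.PointsWriter) error {
	points, err := models.ParsePointsString(`cpu,host=server01 value=0.64`)
	if err != nil {
		return err
	}
	return w.WritePoints(&cluster.WritePointsRequest{
		Database:         "mydb",
		RetentionPolicy:  "",                          // empty: the default RP is resolved in step 2.1
		ConsistencyLevel: cluster.ConsistencyLevelOne, // one successful replica write suffices
		Points:           points,
	})
}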
Building the ShardMapping
First, a word about the concept of a ShardGroup.
1.1 Every point written to InfluxDB carries a time value, and every ShardGroup has its own start and end times. That time span is determined by the ShardGroupDuration of the RetentionPolicy chosen for the write, so each written point necessarily belongs to exactly one ShardGroup. The mapping is implemented mainly in MapShards in cluster/points_writer.go:
func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {
	// holds the start time ranges for required shard groups
	timeRanges := map[time.Time]*meta.ShardGroupInfo{}

	rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
	if err != nil {
		return nil, err
	}
	if rp == nil {
		return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
	}

	for _, p := range wp.Points {
		timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
	}

	// holds all the shard groups and shards that are required for writes
	for t := range timeRanges {
		sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
		if err != nil {
			return nil, err
		}
		timeRanges[t] = sg
	}

	mapping := NewShardMapping()
	for _, p := range wp.Points {
		sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
		sh := sg.ShardFor(p.HashID())
		mapping.MapPoint(&sh, p)
	}
	return mapping, nil
}
- Let's break down the implementation of the function above.
3.1 Scan all the points and use their timestamps to decide which ShardGroups are needed (a small stand-alone example follows the snippet):
for _, p := range wp.Points {
	timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
}
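To make the Truncate step concrete, here is a small stand-alone example; the 24h ShardGroupDuration and the timestamps are made up for the illustration. Points whose timestamps truncate to the same value fall into the same ShardGroup.

package main

import (
	"fmt"
	"time"
)

func main() {
	const shardGroupDuration = 24 * time.Hour // assumed RP setting for the example

	t1 := time.Date(2016, 5, 12, 3, 20, 0, 0, time.UTC)
	t2 := time.Date(2016, 5, 12, 22, 45, 0, 0, time.UTC)
	t3 := time.Date(2016, 5, 13, 1, 0, 0, 0, time.UTC)

	// Truncate collapses every timestamp to the start time of its ShardGroup.
	fmt.Println(t1.Truncate(shardGroupDuration)) // 2016-05-12 00:00:00 +0000 UTC
	fmt.Println(t2.Truncate(shardGroupDuration)) // 2016-05-12 00:00:00 +0000 UTC (same group as t1)
	fmt.Println(t3.Truncate(shardGroupDuration)) // 2016-05-13 00:00:00 +0000 UTC (next group)
}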
3.2 Call w.MetaClient.CreateShardGroup. If the ShardGroup already exists, its info is returned directly; if not, it is created. Creation involves sending a CreateShardGroup request to the meta server and waiting until the new metadata has been applied locally;
sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
3.3 The ShardGroup allocation rule, in CreateShardGroup in services/meta/data.go (a worked example follows the code):
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
	...
	// Require at least one replica but no more replicas than nodes.
	// Clamp the replica count: it cannot exceed the total number of DataNodes.
	replicaN := rpi.ReplicaN
	if replicaN == 0 {
		replicaN = 1
	} else if replicaN > len(data.DataNodes) {
		replicaN = len(data.DataNodes)
	}

	// Determine shard count by node count divided by replication factor.
	// This will ensure nodes will get distributed across nodes evenly and
	// replicated the correct number of times.
	// The number of shards is derived from the replica count.
	shardN := len(data.DataNodes) / replicaN

	// Create the shard group.
	data.MaxShardGroupID++
	sgi := ShardGroupInfo{}
	sgi.ID = data.MaxShardGroupID
	sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
	sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()

	// Create shards on the group.
	sgi.Shards = make([]ShardInfo, shardN)
	for i := range sgi.Shards {
		data.MaxShardID++
		sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}
	}

	// Assign data nodes to shards via round robin.
	// Start from a repeatably "random" place in the node list.
	// ShardInfo.Owners records which DataNodes hold the replicas of this shard.
	// Every replica of every shard is assigned a DataNode here: data.Index is
	// used as the base to pick the starting DataNode, then assignment proceeds
	// round robin.
	// data.Index is bumped on every meta update, so it can be read as the
	// version number of the meta data.
	nodeIndex := int(data.Index % uint64(len(data.DataNodes)))
	for i := range sgi.Shards {
		si := &sgi.Shards[i]
		for j := 0; j < replicaN; j++ {
			nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID
			si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
			nodeIndex++
		}
	}

	// Retention policy has a new shard group, so update the policy. Shard
	// Groups must be stored in sorted order, as other parts of the system
	// assume this to be the case.
	rpi.ShardGroups = append(rpi.ShardGroups, sgi)
	sort.Sort(ShardGroupInfos(rpi.ShardGroups))

	return nil
}
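To see the allocation rule in action, the sketch below replays it outside the meta package for an assumed cluster of 4 DataNodes with ReplicaN=2 (so shardN=2) and an assumed meta index of 5; the node IDs and the index value are invented for the example.

package main

import "fmt"

func main() {
	// Assumed inputs for the example only.
	dataNodeIDs := []uint64{1, 2, 3, 4} // 4 data nodes
	replicaN := 2                       // replication factor
	index := uint64(5)                  // "version" of the meta data

	shardN := len(dataNodeIDs) / replicaN // 4 / 2 = 2 shards in the group

	nodeIndex := int(index % uint64(len(dataNodeIDs))) // 5 % 4 = 1
	for shard := 0; shard < shardN; shard++ {
		owners := make([]uint64, 0, replicaN)
		for j := 0; j < replicaN; j++ {
			owners = append(owners, dataNodeIDs[nodeIndex%len(dataNodeIDs)])
			nodeIndex++
		}
		fmt.Printf("shard %d -> owner nodes %v\n", shard, owners)
	}
	// Output:
	// shard 0 -> owner nodes [2 3]
	// shard 1 -> owner nodes [4 1]
}

Because the starting offset comes from data.Index, consecutive shard groups tend to start at different nodes, which spreads ownership more evenly across the cluster over time.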
3.4 Map each individual point to one Shard within its ShardGroup: take the point's HashID modulo the number of Shards, where HashID is the hash of the measurement + tag set (a sketch of the idea follows the snippet):
for _, p := range wp.Points {
	sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
	sh := sg.ShardFor(p.HashID())
	mapping.MapPoint(&sh, p)
}

...

func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {
	return sgi.Shards[hash%uint64(len(sgi.Shards))]
}
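The sketch below illustrates the idea behind HashID and ShardFor: hash the series key (measurement plus sorted tag set) and take it modulo the shard count, so every point of a given series always lands in the same shard of its ShardGroup. The exact key encoding used by models.Point.HashID differs in detail; this is only an approximation of the principle.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// seriesHash sketches the idea behind HashID: hash the series key, i.e. the
// measurement name plus the sorted tag set. The real key encoding differs in
// detail; this is only an illustration.
func seriesHash(measurement string, tags map[string]string) uint64 {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := fnv.New64a()
	h.Write([]byte(measurement))
	for _, k := range keys {
		h.Write([]byte("," + k + "=" + tags[k]))
	}
	return h.Sum64()
}

func main() {
	hash := seriesHash("cpu", map[string]string{"host": "server01", "region": "us-west"})
	shardCount := uint64(2)                      // number of shards in the ShardGroup
	fmt.Println("shard index:", hash%shardCount) // same series -> same shard, every time
}

The consequence is that, within one ShardGroup, all the data of a series is colocated on a single shard (and its replicas).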
Writing data according to the consistency level
- A brief walkthrough of the process
1.1 Based on the consistency level, determine how many replicas must be written successfully (a worked example follows the snippet):
switch consistency {
// For ConsistencyLevelAny and ConsistencyLevelOne, a single successful write
// already satisfies the consistency level, and the client gets a reply.
case ConsistencyLevelAny, ConsistencyLevelOne:
	required = 1
case ConsistencyLevelQuorum:
	required = required/2 + 1
}
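A small stand-alone replay of this rule, assuming a shard with 3 owners (ReplicaN=3). That required starts out as the number of owners is implied by the Quorum branch above; the constants here are redefined locally just for the illustration.

package main

import "fmt"

// Consistency levels mirroring the ones referenced in the snippet above;
// redefined locally for this stand-alone illustration.
type consistencyLevel int

const (
	levelAny consistencyLevel = iota
	levelOne
	levelQuorum
	levelAll // any other level keeps the default "all owners" requirement
)

// requiredAcks replays the rule: start with "all owners", then relax it
// according to the requested consistency level.
func requiredAcks(owners int, c consistencyLevel) int {
	required := owners
	switch c {
	case levelAny, levelOne:
		required = 1
	case levelQuorum:
		required = required/2 + 1
	}
	return required
}

func main() {
	// With 3 owners (ReplicaN = 3):
	fmt.Println(requiredAcks(3, levelAny))    // 1
	fmt.Println(requiredAcks(3, levelOne))    // 1
	fmt.Println(requiredAcks(3, levelQuorum)) // 2
	fmt.Println(requiredAcks(3, levelAll))    // 3
}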
1.2 For each DataNode listed in Shard.Owners, write the data to that node. If it is the local node, write directly via w.TSDBStore.WriteToShard; otherwise call err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);
1.3 When a remote write fails, the data is written to the local HintedHandoff on-disk queue and retried against the remote node until it expires and is purged. If the consistency level is ConsistencyLevelAny, a successful write to the local HintedHandoff already counts as a successful write;
w.statMap.Add(statWritePointReqHH, int64(len(points)))
hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
if hherr != nil {
	ch <- &AsyncWriteResult{owner, hherr}
	return
}

if hherr == nil && consistency == ConsistencyLevelAny {
	ch <- &AsyncWriteResult{owner, nil}
	return
}
1.4 Wait for the writes to complete or time out:
for range shard.Owners {
	select {
	case <-w.closing:
		return ErrWriteFailed
	case <-timeout:
		w.statMap.Add(statWriteTimeout, 1)
		// return timeout error to caller
		return ErrTimeout
	case result := <-ch:
		// If the write returned an error, continue to the next response
		if result.Err != nil {
			if writeError == nil {
				writeError = result.Err
			}
			continue
		}
		wrote++

		// Return as soon as the required consistency level has been reached.
		if wrote >= required {
			w.statMap.Add(statWriteOK, 1)
			return nil
		}
	}
}
The HintedHandoff service
It is defined in services/hh/service.go.
- Data written to HintedHandoff goes into a separate directory per NodeID. Each directory holds multiple files, each file being one segment; segments are named with monotonically increasing IDs, so the ID order is also the write order from oldest to newest;
- The HintedHandoff service creates a NodeProcessor for every remote DataNode; each one is responsible for writes to its own DataNode and runs in a separate goroutine. Each goroutine does two things: one is to periodically purge expired data (if purged data has not yet been written successfully to the remote node, it is lost); the other is to read data from the segment files and write it to the remote node (a stand-alone sketch of the backoff calculation follows the code):
func (n *NodeProcessor) run() {
	defer n.wg.Done()

	...

	for {
		select {
		case <-n.done:
			return

		case <-time.After(n.PurgeInterval):
			if err := n.queue.PurgeOlderThan(time.Now().Add(-n.MaxAge)); err != nil {
				n.Logger.Printf("failed to purge for node %d: %s", n.nodeID, err.Error())
			}

		case <-time.After(currInterval):
			limiter := NewRateLimiter(n.RetryRateLimit)
			for {
				c, err := n.SendWrite()
				if err != nil {
					if err == io.EOF {
						// No more data, return to configured interval
						currInterval = time.Duration(n.RetryInterval)
					} else {
						currInterval = currInterval * 2
						if currInterval > time.Duration(n.RetryMaxInterval) {
							currInterval = time.Duration(n.RetryMaxInterval)
						}
					}
					break
				}

				// Success! Ensure backoff is cancelled.
				currInterval = time.Duration(n.RetryInterval)

				// Update how many bytes we've sent
				limiter.Update(c)

				// Block to maintain the throughput rate
				time.Sleep(limiter.Delay())
			}
		}
	}
}
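The retry pacing in run() is a plain exponential backoff capped at RetryMaxInterval. A minimal stand-alone version of that calculation, with the base and maximum intervals picked arbitrarily for the example:

package main

import (
	"fmt"
	"time"
)

// nextInterval reproduces the backoff used in NodeProcessor.run(): on failure
// the wait doubles, capped at the maximum; on success (or io.EOF) run() resets
// it back to the configured retry interval.
func nextInterval(cur, maxInterval time.Duration) time.Duration {
	cur *= 2
	if cur > maxInterval {
		cur = maxInterval
	}
	return cur
}

func main() {
	// Assumed settings for the example.
	base := time.Second
	maxInterval := 30 * time.Second

	cur := base
	for i := 0; i < 7; i++ {
		fmt.Println(cur) // 1s 2s 4s 8s 16s 30s 30s
		cur = nextInterval(cur, maxInterval)
	}
}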
- Local storage and reads of the queued data
5.1 Defined in services/hh/queue.go. All segment files are organized in memory as a queue: reads come from the segment pointed to by head and writes go to the segment pointed to by tail. The last 8 bytes of each segment file record how far into that segment reads have progressed (see the sketch after this list).
5.2 Purging: once all of a segment file's contents have been sent, the file is deleted; each periodic purge pass only checks whether the segment currently pointed to by head should be removed.
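Assuming the layout described in 5.1 (the last 8 bytes of a segment file hold the current read position), the sketch below shows how such a footer could be read and updated with encoding/binary. It illustrates the idea only; the byte order and helper names are assumptions, not the actual services/hh/queue.go code.

package main

import (
	"encoding/binary"
	"fmt"
	"os"
)

// readPos returns the read position stored in the last 8 bytes of a segment
// file, following the layout described in 5.1.
func readPos(f *os.File) (int64, error) {
	fi, err := f.Stat()
	if err != nil {
		return 0, err
	}
	var buf [8]byte
	if _, err := f.ReadAt(buf[:], fi.Size()-8); err != nil {
		return 0, err
	}
	return int64(binary.BigEndian.Uint64(buf[:])), nil
}

// writePos overwrites the 8-byte footer with a new read position.
func writePos(f *os.File, pos int64) error {
	fi, err := f.Stat()
	if err != nil {
		return err
	}
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(pos))
	_, err = f.WriteAt(buf[:], fi.Size()-8)
	return err
}

func main() {
	f, err := os.CreateTemp("", "segment")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// A fake segment: 128 bytes of block data followed by the 8-byte footer.
	if _, err := f.Write(make([]byte, 128+8)); err != nil {
		panic(err)
	}
	if err := writePos(f, 64); err != nil { // "we have read up to offset 64"
		panic(err)
	}
	pos, err := readPos(f)
	if err != nil {
		panic(err)
	}
	fmt.Println("read position:", pos) // read position: 64
}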