從 dgraph-io/dgraph 了解 etcd/raft 的使用 0x01

本次代碼閱讀基于commit 189fdd3

1. raftwal

godoc

之前提到, etcd/raft提供了 MemoryStorage + wal 的方式 來(lái)對(duì) raft 中的 HardState, Snapshot 和 Entry 進(jìn)行持久化. wal 將數(shù)據(jù)直接寫入文件.

而對(duì)于 dgraph 來(lái)說(shuō), 它的一個(gè)物理節(jié)點(diǎn)上有多個(gè) raft group, 且 raft group 會(huì)自動(dòng)新建. 此時(shí), 所有 raft group 使用同一套底層存儲(chǔ)會(huì)相對(duì)簡(jiǎn)單一些.

本包中, dgraph 使用 badger 這個(gè)同屬 dgraph-io 出品的 kv 數(shù)據(jù)庫(kù)來(lái)保存所有 raft group 的日志.

1.1 Keys

既然不同 raft 的日志都存在同一個(gè) kv 數(shù)據(jù)庫(kù)中, 那么就需要對(duì)存儲(chǔ)的 key 進(jìn)行有效地區(qū)分.

對(duì)于一個(gè) raft node 來(lái)說(shuō), 它通過(guò)節(jié)點(diǎn) id RaftId(uint64) 和 組 id gid(uint32) 兩層 來(lái)標(biāo)識(shí)自己

相應(yīng)地, raftwal 中的三類 key 都包含這兩個(gè) id

  1. snapshotKey:

    func (w *Wal) snapshotKey(gid uint32) []byte {
     b := make([]byte, 14)
     binary.BigEndian.PutUint64(b[0:8], w.id)
     copy(b[8:10], []byte("ss"))
     binary.BigEndian.PutUint32(b[10:14], gid)
     return b
    }
    

    ?

  2. hardStateKey:

    func (w *Wal) hardStateKey(gid uint32) []byte {
     b := make([]byte, 14)
     binary.BigEndian.PutUint64(b[0:8], w.id)
     copy(b[8:10], []byte("hs"))
     binary.BigEndian.PutUint32(b[10:14], gid)
     return b
    }
    
  3. entryKey:

    func (w *Wal) entryKey(gid uint32, term, idx uint64) []byte {
     b := make([]byte, 28)
     binary.BigEndian.PutUint64(b[0:8], w.id)
     binary.BigEndian.PutUint32(b[8:12], gid)
     binary.BigEndian.PutUint64(b[12:20], term)
     binary.BigEndian.PutUint64(b[20:28], idx)
     return b
    }
    

1.2 Wal

Wal 提供 raft 數(shù)據(jù)的讀寫.

對(duì)于 raft 數(shù)據(jù)的持久化, 最重要的是保證數(shù)據(jù)的一致性.

StoreSnapshot
func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
    txn := w.wals.NewTransactionAt(1, true)
    defer txn.Discard()
    
    // ...
    
    if err := txn.Set(w.snapshotKey(gid), data); err != nil {
        return err
    }
    
    // ...
    
    // 清除 snapshot 數(shù)據(jù)之前的所有 entry
    // Delete all entries before this snapshot to save disk space.
    start := w.entryKey(gid, 0, 0)
    last := w.entryKey(gid, s.Metadata.Term, s.Metadata.Index)
    
    // 這里利用了 badger 的特性, 在遍歷的時(shí)候僅讀取 key 數(shù)據(jù), 減少了讀取 value 帶來(lái)的開(kāi)銷
    opt := badger.DefaultIteratorOptions
    opt.PrefetchValues = false
    itr := txn.NewIterator(opt)
    defer itr.Close()

    // 逐一刪除不再需要的 entry
    for itr.Seek(start); itr.Valid(); itr.Next() {
        // ...
    }

    // Failure to delete entries is not a fatal error, so should be
    // ok to ignore
    if err := txn.CommitAt(1, nil); err != nil {
        x.Printf("Error while storing snapshot %v\n", err)
        return err
    }
    return nil
}
Store
// Store stores the hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, h raftpb.HardState, es []raftpb.Entry) error {
    txn := w.wals.NewTransactionAt(1, true)

    var t, i uint64
    // 逐一保存 entry
    for _, e := range es {
        t, i = e.Term, e.Index
        
        // ...
    }

    // 如果有必要, 保存 HardState
    if !raft.IsEmptyHardState(h) {
        // ...
    }

    // If we get no entries, then the default value of t and i would be zero. That would
    // end up deleting all the previous valid raft entry logs. This check avoids that.
    if t > 0 || i > 0 {
        // When writing an Entry with Index i, any previously-persisted entries
        // with Index >= i must be discarded.
        // Ideally we should be deleting entries from previous term with index >= i,
        // but to avoid complexity we remove them during reading from wal.
        // 有可能出現(xiàn)某個(gè)時(shí)間點(diǎn)之后, 由于網(wǎng)絡(luò)原因, 數(shù)據(jù)分叉的情形.
        // 為了在網(wǎng)絡(luò)恢復(fù)之后保證數(shù)據(jù)一致性, 對(duì)于每一批 entry, 需要清除邏輯上排在這批數(shù)據(jù)之后的 entry.
        start := w.entryKey(gid, t, i+1)
        prefix := w.prefix(gid)
        // ...
        
        // 逐一清除
        for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
            // ...
        }
    }
    if err := txn.CommitAt(1, nil); err != nil {
        return err
    }
    return nil
}
讀取
func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
    // ...
}
func (w *Wal) HardState(gid uint32) (hd raftpb.HardState, rerr error) {
    // ...
}
func (w *Wal) Entries(gid uint32, fromTerm, fromIndex uint64) (es []raftpb.Entry, rerr error) {
    // ...
}

1.3 關(guān)于badger

badger 來(lái)源于這篇論文 WiscKey: Separating Keys from Values in SSD-conscious Storage. .

知乎上僅有的評(píng)論里, 對(duì)它的評(píng)價(jià)不甚高 如何評(píng)價(jià) Badger (fast key-value storage) ??.

但不論怎樣, 它在一些情況下確實(shí)比較 , 也可能非常適合 dgraph 的使用場(chǎng)景.

2. conn

godoc

conn 充當(dāng)了 etcd/raft 的網(wǎng)絡(luò)傳輸層, 基于 gRPC 在 raft 節(jié)點(diǎn)之間同步信息.

2.1 Pool

看名字是個(gè)連接池, 實(shí)際上其中的 *grpc.ClienctConn 是復(fù)用的.

一旦創(chuàng)建, 會(huì)每隔 10 秒嘗試 ping 一下, 根據(jù)結(jié)果判斷當(dāng)前連接是否可用.

// "Pool" is used to manage the grpc client connection(s) for communicating with other
// worker instances.  Right now it just holds one of them.
type Pool struct {
    sync.RWMutex
    
    // 這段注釋說(shuō)明了 *grpc.ClientConn 可以服用的原因
    // A "pool" now consists of one connection.  gRPC uses HTTP2 transport to combine
    // messages in the same TCP stream.
    conn *grpc.ClientConn

    // 上一次 ping 請(qǐng)求成功的時(shí)間
    lastEcho time.Time
    
    // 目標(biāo)節(jié)點(diǎn)的地址
    Addr     string
    
    // 發(fā)起 ping 請(qǐng)求的 ticker
    ticker   *time.Ticker
}

2.2 Pools

Pools 維護(hù)了不同地址的 Pool.

這里是一個(gè)單例.

type Pools struct {
    sync.RWMutex
    all map[string]*Pool
}

var pi *Pools

func init() {
    pi = new(Pools)
    pi.all = make(map[string]*Pool)
}

2.3 Node

Node 的用于維護(hù) raft 成員節(jié)點(diǎn), 以及在各節(jié)點(diǎn)之間傳輸信息.職責(zé)包括:

初始化 / 讀取 當(dāng)前的 raft.Node
// SetRaft would set the provided raft.Node to this node.
// It would check fail if the node is already set.
func (n *Node) SetRaft(r raft.Node) {
    // ...
}

// Raft would return back the raft.Node stored in the node.
func (n *Node) Raft() raft.Node {
    // ...
}
維護(hù) ConfState 即節(jié)點(diǎn) id 列表
// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
    // ...
}

// ConfState would return the latest ConfState stored in node.
func (n *Node) ConfState() *raftpb.ConfState {
    // ...
}
維護(hù) 節(jié)點(diǎn) id - 地址 的對(duì)應(yīng)關(guān)系
func (n *Node) Peer(pid uint64) (string, bool) {
    // ...
}

// addr must not be empty.
func (n *Node) SetPeer(pid uint64, addr string) {
    // ...
}

func (n *Node) DeletePeer(pid uint64) {
    // ...
}

// Connects the node and makes its peerPool refer to the constructed pool and address
// (possibly updating ourselves from the old address.)  (Unless pid is ourselves, in which
// case this does nothing.)
func (n *Node) Connect(pid uint64, addr string) {
    // ..
}
加入和移除節(jié)點(diǎn)
  • 加入節(jié)點(diǎn)

    // 這個(gè)函數(shù)是可以認(rèn)為是 AddCluster 的回調(diào)
    // 由使用者在收到 ConfChange 成功 apply 時(shí)主動(dòng)調(diào)用
    // dgraph 中調(diào)用這個(gè)方法的地方 err 都傳入了 nil
    func (n *Node) DoneConfChange(id uint64, err error) {
      n.Lock()
      defer n.Unlock()
      ch, has := n.confChanges[id]
      if !has {
          return
      }
      delete(n.confChanges, id)
      ch <- err
    }
    
    func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
      addr, ok := n.Peer(pid)
      // ...
      rcBytes, err := rc.Marshal()
      // ...
    
      ch := make(chan error, 1)
      // 這個(gè)函數(shù)中, 將 channel 和一個(gè)隨機(jī)生成的 id 映射起來(lái)
      // 并在向其他節(jié)點(diǎn)同步的信息中帶上這個(gè) id
      id := n.storeConfChange(ch)
      err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
          ID:      id,
          Type:    raftpb.ConfChangeAddNode,
          NodeID:  pid,
          Context: rcBytes,
      })
      if err != nil {
          return err
      }
      
      // 等待 ConfChange apply 成功的回調(diào)
      err = <-ch
      return err
    }
    
    
  • 移除節(jié)點(diǎn)

    func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
      // ...
      
      // 和 AddToCluster 類似, 這里需要等待 ConfChange 完成
      ch := make(chan error, 1)
      pid := n.storeConfChange(ch)
      err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
          ID:     pid,
          Type:   raftpb.ConfChangeRemoveNode,
          NodeID: id,
      })
      
      // ...
    
      return err
    }
    

    ?

節(jié)點(diǎn)間同步信息
func (n *Node) Send(m raftpb.Message) {
    // ...
    select {
    case n.messages <- sendmsg{to: m.To, data: data}:
        // pass
        
    // - -0 為什么這邊會(huì)有 ignore... 僅僅是為了不阻塞調(diào)用者么?
    default:
        // ignore
    }
}

// 所有通過(guò) n.messages 傳遞的信息都會(huì)積累到一定程度后一起發(fā)送
// 發(fā)往同一個(gè)節(jié)點(diǎn)的信息也會(huì)整合
func (n *Node) BatchAndSendMessages() {
    // 對(duì)同一個(gè)目標(biāo) id, 始終復(fù)用一個(gè) *bytes.Buffer
    batches := make(map[uint64]*bytes.Buffer)
    for {
        totalSize := 0
        sm := <-n.messages
    slurp_loop:
        for {
            // 如有必要, 初始化 *bytes.Buffer
            var buf *bytes.Buffer
            
            // ...
            
            // 先將當(dāng)前 data 的長(zhǎng)度寫入, 用做 message 之間的分隔.
            // 再寫入 data 本體
            // 因此每條 message 占用 4 + len(sm.data)
            totalSize += 4 + len(sm.data)
            x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
            x.Check2(buf.Write(sm.data))

            // 如果累積的數(shù)據(jù)量足夠大, 中斷此次匯集, 執(zhí)行發(fā)送
            if totalSize > messageBatchSoftLimit {
                // We limit the batch size, but we aren't pushing back on
                // n.messages, because the loop below spawns a goroutine
                // to do its dirty work.  This is good because right now
                // (*node).send fails(!) if the channel is full.
                break
            }

            // 如果沒(méi)有新的 message 傳入, 同樣中斷匯集執(zhí)行發(fā)送
            select {
            case sm = <-n.messages:
            default:
                break slurp_loop
            }
        }

        // 執(zhí)行發(fā)送
        for to, buf := range batches {
            if buf.Len() == 0 {
                continue
            }
            data := make([]byte, buf.Len())
            copy(data, buf.Bytes())
            go n.doSendMessage(to, data)
            
            // 重置 buf 供下一輪 message 匯集循環(huán)使用
            buf.Reset()
        }
    }
}

func (n *Node) doSendMessage(to uint64, data []byte) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // 獲取到指定節(jié)點(diǎn)的連接
    addr, has := n.Peer(to)
    pool, err := Get().Get(addr)
    if !has || err != nil {
        x.Printf("No healthy connection found to node Id: %d, err: %v\n", to, err)
        // No such peer exists or we got handed a bogus config (bad addr), so we
        // can't send messages to this peer.
        return
    }
    client := pool.Get()

    // ...

    ch := make(chan error, 1)
    go func() {
        _, err = c.RaftMessage(ctx, p)
        if err != nil {
            x.Printf("Error while sending message to node Id: %d, err: %v\n", to, err)
        }
        ch <- err
    }()

    // 超時(shí)或發(fā)送完成
    select {
    case <-ctx.Done():
        return
    case <-ch:
        // We don't need to do anything if we receive any error while sending message.
        // RAFT would automatically retry.
        return
    }
}
保存/恢復(fù) raft 數(shù)據(jù)
func (n *Node) SaveSnapshot(s raftpb.Snapshot) {
    // ...
}

func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry) {
    // ...
}

func (n *Node) InitFromWal(wal *raftwal.Wal) (idx uint64, restart bool, rerr error) {
    // ...
}

WaitForMinProposal

這里應(yīng)該是 LinearRead 相關(guān), 用來(lái)確認(rèn) Read 對(duì)應(yīng)的 message 已經(jīng) apply

func (n *Node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
    if read == nil || read.Ids == nil {
        return nil
    }
    gid := n.RaftContext.Group
    min := read.Ids[gid]
    return n.Applied.WaitForMark(ctx, min)
}

2.4 RaftServer

RaftServer 是 gRPC service Raft 的實(shí)現(xiàn), 內(nèi)部是對(duì) Node 的操作.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末昂拂,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子鼻听,更是在濱河造成了極大的恐慌精算,老刑警劉巖碎连,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件廉嚼,死亡現(xiàn)場(chǎng)離奇詭異倒戏,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)傍念,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門憋槐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)阳仔,“玉大人近范,你說(shuō)我怎么就攤上這事延蟹。” “怎么了稚照?”我有些...
    開(kāi)封第一講書人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵上枕,是天一觀的道長(zhǎng)弱恒。 經(jīng)常有香客問(wèn)我返弹,道長(zhǎng),這世上最難降的妖魔是什么拉背? 我笑而不...
    開(kāi)封第一講書人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任默终,我火速辦了婚禮齐蔽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘诱渤。我一直安慰自己谈况,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布励烦。 她就那樣靜靜地躺著,像睡著了一般赊锚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上耸袜,一...
    開(kāi)封第一講書人閱讀 51,165評(píng)論 1 299
  • 那天堤框,我揣著相機(jī)與錄音,去河邊找鬼启绰。 笑死委可,一個(gè)胖子當(dāng)著我的面吹牛腊嗡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卡者,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼崇决,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼嗽桩!你這毒婦竟也來(lái)了凄敢?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤扑庞,失蹤者是張志新(化名)和其女友劉穎罐氨,沒(méi)想到半個(gè)月后滩援,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年泣棋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了畔塔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鸯屿。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡把敢,死狀恐怖技竟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情熙尉,我是刑警寧澤检痰,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布铅歼,位于F島的核電站换可,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏慨飘。R本人自食惡果不足惜译荞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一吞歼、第九天 我趴在偏房一處隱蔽的房頂上張望篙骡。 院中可真熱鬧,春花似錦糯俗、人聲如沸叶骨。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)今膊。三九已至伞剑,卻和暖如春黎泣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背褐着。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工含蓉, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留馅扣,地道東北人着降。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓鹊碍,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親公罕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子耀销,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容