etcd學習筆記(一): raft-example

主要參考資料

  1. 《etcd技術(shù)內(nèi)幕》
  2. 極客時間《etcd實戰(zhàn)課》
  3. blog https://gohalo.me/post/theme-database-etcd.html

主要目的

  1. 理順一些etcd實現(xiàn)上的細節(jié)問題

1. 概述

etcd的核心是raft協(xié)議的實現(xiàn)掏愁。整體來說,etcd的raft lib只實現(xiàn)了一個整體框架和核心邏輯(append log赫模、選主邏輯、snapshot汹想、成員變更等)高氮;但該庫沒有實現(xiàn)WAL、SnapShot撒汉、存儲轿钠、序列化巢钓、網(wǎng)絡(luò)(消息傳輸和接收)等。

所以整體上etcd中模塊疗垛、模塊之間的通信比較多症汹,甚至有一些同名的模塊,導致整體上代碼實現(xiàn)難度大贷腕,閱讀難度大烈菌。

比較核心的類有:

  • raft.Node:raft node,raft協(xié)議的主要邏輯花履,其中 raftlog是存儲在內(nèi)存中的,持久化依賴于wal和snapshot
  • storge:日志應(yīng)用后的數(shù)據(jù)存儲
  • wal:write ahead log用于處理持久化
  • snapshot:快照
  • 網(wǎng)絡(luò)傳輸
    node        raft.Node
    raftStorage *raft.MemoryStorage
    wal         *wal.WAL

    snapshotter      *snap.Snapshotter
    snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
    snapCount uint64
    transport *rafthttp.Transport

raft.Node

type Node interface {
    Tick() -> 觸發(fā)一次Tick挚赊,會觸發(fā)Node心跳或者選舉
    Campaign(ctx context.Context) error -> 觸發(fā)一次選舉
    Propose(ctx context.Context, data []byte) error -> 進行一次提案
    ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error -> 成員變更提案
    Step(ctx context.Context, msg pb.Message) error -> 處理msg

    Ready() <-chan Ready
    Advance()

    ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
    TransferLeadership(ctx context.Context, lead, transferee uint64)
    ReadIndex(ctx context.Context, rctx []byte) error

    Status() Status
    ReportUnreachable(id uint64)
    ReportSnapshot(id uint64, status SnapshotStatus)
    Stop()
}

raft.Storage

type Storage interface {
    InitialState() (pb.HardState, pb.ConfState, error)
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    Term(i uint64) (uint64, error)

    LastIndex() (uint64, error)
    FirstIndex() (uint64, error)
    Snapshot() (pb.Snapshot, error)
}

2. ready / advance

對于這種 IO 網(wǎng)絡(luò)密集型的應(yīng)用诡壁,提高吞吐最好的手段就是批量操作,ETCD 與之相關(guān)的核心抽象就是 Ready 結(jié)構(gòu)體荠割。

type Ready struct {
    // The current volatile state of a Node.
    // SoftState will be nil if there is no update.
    // It is not required to consume or store SoftState.
    *SoftState 

    // The current state of a Node to be saved to stable storage BEFORE
    // Messages are sent.
    // HardState will be equal to empty state if there is no update.
    pb.HardState

    // ReadStates can be used for node to serve linearizable read requests locally
    // when its applied index is greater than the index in ReadState.
    // Note that the readState will be returned when raft receives msgReadIndex.
    // The returned is only valid for the request that requested to read.
    ReadStates []ReadState

    // Entries specifies entries to be saved to stable storage BEFORE
    // Messages are sent.
  // 通過raftLog.unstableEntries()讀取的是raftLog.unstable.entries中的數(shù)據(jù)
    Entries []pb.Entry

    // Snapshot specifies the snapshot to be saved to stable storage.
    Snapshot pb.Snapshot

    // CommittedEntries specifies entries to be committed to a
    // store/state-machine. These have previously been committed to stable
    // store.
// 包括了所有已經(jīng)持久化到日志但是還沒有應(yīng)用到狀態(tài)機的數(shù)據(jù)
    CommittedEntries []pb.Entry

    // Messages specifies outbound messages to be sent AFTER Entries are
    // committed to stable storage.
    // If it contains a MsgSnap message, the application MUST report back to raft
    // when the snapshot has been received or has failed by calling ReportSnapshot.
 // 包含了應(yīng)該發(fā)送給對端的數(shù)據(jù)妹卿,也就是直接讀取的raft.msgs[]中緩存的數(shù)據(jù)
    Messages []pb.Message

    // MustSync indicates whether the HardState and Entries must be synchronously
    // written to disk or if an asynchronous write is permissible.
    MustSync bool
}
  • 什么時候可以讀。ReadState 用來支持 Linearizable Read蔑鹦。
  • 需要持久化的狀態(tài)夺克。HardState、Entries 需要在正式發(fā)送數(shù)據(jù)之前持久化嚎朽。
  • Snapshot 需要執(zhí)行SnapShot的數(shù)據(jù)铺纽。
  • CommittedEntries 已經(jīng)提交的數(shù)據(jù),可以應(yīng)用到狀態(tài)機哟忍。
  • Messages 需要發(fā)送到其它機器的消息狡门。需要在處理完持久化數(shù)據(jù)之后處理。

應(yīng)用需要對 Ready 的處理包括:

  1. 將 HardState锅很、Entries其馏、Snapshot 持久化到 storage;
  2. 將 Messages 非阻塞的廣播給其他 peers爆安;
  3. 將 CommittedEntries (已經(jīng)提交但是還沒有應(yīng)用的日志) 應(yīng)用到狀態(tài)機叛复;
  4. 如果發(fā)現(xiàn) CommittedEntries 中有成員變更類型的 entry,則調(diào)用 node 的 ApplyConfChange() 方法讓 node 知道;
  5. 調(diào)用 Node.Advance() 告訴 raft node 這批狀態(tài)更新處理完褐奥,狀態(tài)已經(jīng)演進了咖耘,可以給我下一批 Ready 讓我處理了。

這里的一個問題是entries和messages的區(qū)別是什么抖僵?

entry是日志條目鲤看,messages是需要發(fā)送到peer的msg。當接受到消息的時候耍群,可能會同時產(chǎn)生entry和msg义桂。

在raft-example中,ready的處理:

        // store raft entries to wal, then publish over commit channel
        case rd := <-rc.node.Ready():
            rc.wal.Save(rd.HardState, rd.Entries)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            rc.raftStorage.Append(rd.Entries)
            rc.transport.Send(rd.Messages)
            applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
            if !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot(applyDoneC)
            rc.node.Advance()

可以看出來蹈垢,對entries的處理主要是save到wal慷吊,然后append到raftStorage;對msg的處理是通過transport發(fā)送出去曹抬。

為什么會調(diào)用rc.raftStorage.Append(rd.Entries)溉瓶,這又牽扯到raftLog的數(shù)據(jù)結(jié)構(gòu),raftLog的持久化是由上層來做的谤民,raft.Node做不了堰酿,因為持久化、快照的代碼模塊與raft核心模塊是分離的张足。

image.png

對于剛剛接收到的 Entry記錄首先都會被存儲在unstable中 触创。然后按照Raft協(xié)議將unstable中緩存的這些 Entry 記錄交給上層模塊進行處理,上層模塊會將這些 Entry 記錄發(fā)送到集群其他節(jié)點或進行保存( 寫入 Storage
中)
为牍。之后哼绑,上層模塊會調(diào)用 Advance()方法通知底層的 etcd-raft模塊將 unstable 中對應(yīng)的 Entry
記錄刪除(因為己經(jīng)保存到了 Storage 中。正因為 unstable 中保存的 Entry 記錄并未進行持久化碉咆,
可能會因節(jié)點故障而意外丟失抖韩,所以被稱為unstable。

另一個需要注意的點是rc.wal.Save(rd.HardState, rd.Entries)疫铜,每次ready的處理都需要保存一下當時的rd.HardState茂浮。

如何生成ready

什么時候應(yīng)該處理ready?

  1. 比較容易理解的地方是有待處理的msgs壳咕、unstableEntries励稳、committedEntries
  2. softState和hardState發(fā)生變化?囱井?驹尼??庞呕?
func (rn *RawNode) HasReady() bool {
    r := rn.raft
    if !r.softState().equal(rn.prevSoftSt) {
        return true
    }
    if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
        return true
    }
    if r.raftLog.hasPendingSnapshot() {
        return true
    }
    if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
        return true
    }
    if len(r.readStates) != 0 {
        return true
    }
    return false
}

ready處理完之后調(diào)用advance更新相關(guān)的索引位置新翎,具體可以看代碼程帕,沒有特別的地方。

3. 如何處理成員變更

參考文檔

  1. https://zhuanlan.zhihu.com/p/27908888
  2. https://segmentfault.com/a/1190000022796386

關(guān)鍵函數(shù)是raft.applyConfChange

啟動時地啰,會調(diào)用apply

    for _, peer := range peers {
        rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
    }

用戶主動請求變更的時候

node.ProposeConfChange
-> node.Step(Prop msg)
  -> 向propc中寫msg

node.Run中:

        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

調(diào)用r.step愁拭,對于stepFollow會轉(zhuǎn)發(fā)msg給leader,對于leader來說亏吝,alreadyPending來保證同一時刻僅有一個成員變更請求岭埠,然后將成員變更的entries寫到unstable中,同時調(diào)用r.bcastAppend()蔚鸥,生成send到peer的MsgApp-msg 惜论,等待上層處理。

        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                alreadyPending := r.pendingConfIndex > r.raftLog.applied
                alreadyJoint := len(r.prs.Config.Voters[1]) > 0
                wantsLeaveJoint := len(cc.AsV2().Changes) == 0

                var refused string
                if alreadyPending {
                    refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
                } else if alreadyJoint && !wantsLeaveJoint {
                    refused = "must transition out of joint config first"
                } else if !alreadyJoint && wantsLeaveJoint {
                    refused = "not in joint state; refusing empty conf change"
                }

                if refused != "" {
                    r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                } else {
                    r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
                }
            }
        }

        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()

整個待處理的entries和msg包裝成ready struct止喷,交給上層處理馆类,等到對應(yīng)的entries被committed,則可以處理弹谁,主要的操作有兩個:調(diào)用raftNode的ApplyConfChange乾巧,然后transport也去除對應(yīng)的peer:

    for i := range ents {
        switch ents[i].Type {
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                // ignore empty messages
                break
            }
            s := string(ents[i].Data)
            data = append(data, s)
        case raftpb.EntryConfChange:
            var cc raftpb.ConfChange
            cc.Unmarshal(ents[i].Data)
            rc.confState = *rc.node.ApplyConfChange(cc)
            switch cc.Type {
            case raftpb.ConfChangeAddNode:
                if len(cc.Context) > 0 {
                    rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
                }
            case raftpb.ConfChangeRemoveNode:
                if cc.NodeID == uint64(rc.id) {
                    log.Println("I've been removed from the cluster! Shutting down.")
                    return nil, false
                }
                rc.transport.RemovePeer(types.ID(cc.NodeID))
            }
        }
    }

可以看到主要最終調(diào)用的函數(shù)是raft.applyConfChange

func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
    var cs pb.ConfState
    select {
    case n.confc <- cc.AsV2():
    case <-n.done:
    }
    select {
    case cs = <-n.confstatec:
    case <-n.done:
    }
    return &cs
}
------------------------


    case cc := <-n.confc:
            _, okBefore := r.prs.Progress[r.id]
            cs := r.applyConfChange(cc)
            // If the node was removed, block incoming proposals. Note that we
            // only do this if the node was in the config before. Nodes may be
            // a member of the group without knowing this (when they're catching
            // up on the log and don't have the latest config) and we don't want
            // to block the proposal channel in that case.
            //
            // NB: propc is reset when the leader changes, which, if we learn
            // about it, sort of implies that we got readded, maybe? This isn't
            // very sound and likely has bugs.
            if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
                var found bool
            outer:
                for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
                    for _, id := range sl {
                        if id == r.id {
                            found = true
                            break outer
                        }
                    }
                }
                if !found {
                    propc = nil
                }
            }
            select {
            case n.confstatec <- cs:
            case <-n.done:
            }

raft.applyConfChange

對于一次最多一個成員變更的情況下,主要做的操作是生成新的peer预愤,設(shè)置其next和matched字段沟于。

3. 上層是如何把多個模塊組合協(xié)作起來的

啟動

有兩種啟動方式,start或者Restart植康。

oldwal := wal.Exist(rc.waldir)
    if oldwal || rc.join {
        rc.node = raft.RestartNode(c)
    } else {
        rc.node = raft.StartNode(c, rpeers)
    }

根據(jù)是否存在wal文件來判斷restart還是start社裆,start會多調(diào)用bootstrap來處理成員變更信息。

// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
    rn, err := NewRawNode(c)
    if err != nil {
        panic(err)
    }
    n := newNode(rn)
    go n.run()
    return &n
}

restart會復用wal和快照信息向图,那么如何使用快照信息來恢復etcd-node?

    if !fileutil.Exist(rc.snapdir) {
        if err := os.Mkdir(rc.snapdir, 0750); err != nil {
            log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
        }
    }
    rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)

    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()

    // signal replay has finished
    rc.snapshotterReady <- rc.snapshotter

snapshotter對象生成之后标沪,會發(fā)送channel榄攀,通知snapshot模塊根據(jù)snap恢復storage;這里需要考慮的問題是存在多個snap的情況下金句,選擇最新的未損壞snap來使用檩赢。這里主要是根據(jù)snap的名字來判斷fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

    snapshot, err := s.loadSnapshot()
    if err != nil {
        log.Panic(err)
    }
    if snapshot != nil {
        log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
        if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
            log.Panic(err)
        }
    }

然后wal模塊replayWAL:

WAL 記錄類型目前支持 5 種,分別是文件元數(shù)據(jù)記錄违寞、日志條目記錄贞瞒、狀態(tài)信息記錄、CRC 記錄趁曼、快照記錄:

  • 文件元數(shù)據(jù)記錄包含節(jié)點 ID军浆、集群 ID 信息,它在 WAL 文件創(chuàng)建的時候?qū)懭耄?/li>
  • 日志條目記錄包含 Raft 日志信息挡闰,如 put 提案內(nèi)容乒融;
  • 狀態(tài)信息記錄掰盘,包含集群的任期號、節(jié)點投票信息等赞季,一個日志文件中會有多條愧捕,以最后的記錄為準;
  • CRC 記錄包含上一個 WAL 文件的最后的 CRC(循環(huán)冗余校驗碼)信息申钩, 在創(chuàng)建次绘、切割 WAL 文件時,作為第一條記錄寫入到新的 WAL 文件撒遣, 用于校驗數(shù)據(jù)文件的完整性邮偎、準確性等;
  • 快照記錄包含快照的任期號愉舔、日志索引信息钢猛,用于檢查快照文件的準確性。
func (rc *raftNode) replayWAL() *wal.WAL {
    log.Printf("replaying WAL of member %d", rc.id)
    snapshot := rc.loadSnapshot()
    w := rc.openWAL(snapshot)
    _, st, ents, err := w.ReadAll()
    if err != nil {
        log.Fatalf("raftexample: failed to read WAL (%v)", err)
    }
    rc.raftStorage = raft.NewMemoryStorage()
    if snapshot != nil {
        rc.raftStorage.ApplySnapshot(*snapshot)
    }
    rc.raftStorage.SetHardState(st)

    // append to storage so raft starts at the right place in log
    rc.raftStorage.Append(ents)

    return w
}

首先是獲取根據(jù)wal中的快照記錄和state記錄轩缤,找出所有小于state.commit的快照命迈,然后 loads the newest snapshot available that is in walSnaps。


    // filter out any snaps that are newer than the committed hardstate
    n := 0
    for _, s := range snaps {
        if s.Index <= state.Commit {
            snaps[n] = s
            n++
        }
    }
    snaps = snaps[:n:n]
    return snaps, nil

然后根據(jù)快照數(shù)據(jù)火的,找出比snap.Index的所有wal entries壶愤,然后將所有的ents append到raftLog中。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末馏鹤,一起剝皮案震驚了整個濱河市征椒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌湃累,老刑警劉巖勃救,帶你破解...
    沈念sama閱讀 212,332評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異治力,居然都是意外死亡蒙秒,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,508評論 3 385
  • 文/潘曉璐 我一進店門宵统,熙熙樓的掌柜王于貴愁眉苦臉地迎上來晕讲,“玉大人,你說我怎么就攤上這事马澈「剩” “怎么了锈锤?”我有些...
    開封第一講書人閱讀 157,812評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我骨望,道長屠升,這世上最難降的妖魔是什么靴寂? 我笑而不...
    開封第一講書人閱讀 56,607評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮荆永,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘国章。我一直安慰自己具钥,他們只是感情好,可當我...
    茶點故事閱讀 65,728評論 6 386
  • 文/花漫 我一把揭開白布液兽。 她就那樣靜靜地躺著骂删,像睡著了一般。 火紅的嫁衣襯著肌膚如雪四啰。 梳的紋絲不亂的頭發(fā)上宁玫,一...
    開封第一講書人閱讀 49,919評論 1 290
  • 那天,我揣著相機與錄音柑晒,去河邊找鬼欧瘪。 笑死,一個胖子當著我的面吹牛匙赞,可吹牛的內(nèi)容都是我干的佛掖。 我是一名探鬼主播,決...
    沈念sama閱讀 39,071評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼涌庭,長吁一口氣:“原來是場噩夢啊……” “哼芥被!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起坐榆,我...
    開封第一講書人閱讀 37,802評論 0 268
  • 序言:老撾萬榮一對情侶失蹤拴魄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后席镀,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體匹中,經(jīng)...
    沈念sama閱讀 44,256評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,576評論 2 327
  • 正文 我和宋清朗相戀三年豪诲,在試婚紗的時候發(fā)現(xiàn)自己被綠了顶捷。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,712評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡跛溉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扮授,到底是詐尸還是另有隱情芳室,我是刑警寧澤,帶...
    沈念sama閱讀 34,389評論 4 332
  • 正文 年R本政府宣布刹勃,位于F島的核電站堪侯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏荔仁。R本人自食惡果不足惜伍宦,卻給世界環(huán)境...
    茶點故事閱讀 40,032評論 3 316
  • 文/蒙蒙 一芽死、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧次洼,春花似錦关贵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至亥啦,卻和暖如春炭剪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背翔脱。 一陣腳步聲響...
    開封第一講書人閱讀 32,026評論 1 266
  • 我被黑心中介騙來泰國打工奴拦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人届吁。 一個月前我還...
    沈念sama閱讀 46,473評論 2 360
  • 正文 我出身青樓错妖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瓷产。 傳聞我的和親對象是個殘疾皇子站玄,可洞房花燭夜當晚...
    茶點故事閱讀 43,606評論 2 350

推薦閱讀更多精彩內(nèi)容