Main references
- The book 《etcd技术内幕》 (etcd Internals)
- The Geek Time (极客时间) course 《etcd实战课》 (etcd in Practice)
- blog https://gohalo.me/post/theme-database-etcd.html
Main purpose
- Sort out some of the finer details of how etcd is implemented
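1. Overview
The core of etcd is its implementation of the Raft protocol. The etcd raft library implements only the overall framework and the core logic (log append, leader election, snapshot handling, membership changes, and so on); it does not implement the WAL, snapshot persistence, storage, serialization, or networking (sending and receiving messages).
As a result, etcd has many modules and a lot of communication between them, and some modules even share a name. This makes the code hard to implement and hard to read.
The core types are:
- raft.Node: the raft node, which carries the main protocol logic. Its raftLog is kept in memory; persistence relies on the WAL and snapshots
- storage: stores the data after log entries have been applied
- wal: the write-ahead log, used for persistence
- snapshot: snapshots
- network transport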
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
raft.Node
type Node interface {
Tick() // triggers one tick, which drives the node's heartbeat or election
Campaign(ctx context.Context) error // triggers an election
Propose(ctx context.Context, data []byte) error // submits a proposal
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error // submits a membership-change proposal
Step(ctx context.Context, msg pb.Message) error // handles a msg
Ready() <-chan Ready
Advance()
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
TransferLeadership(ctx context.Context, lead, transferee uint64)
ReadIndex(ctx context.Context, rctx []byte) error
Status() Status
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status SnapshotStatus)
Stop()
}
raft.Storage
type Storage interface {
InitialState() (pb.HardState, pb.ConfState, error)
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
Term(i uint64) (uint64, error)
LastIndex() (uint64, error)
FirstIndex() (uint64, error)
Snapshot() (pb.Snapshot, error)
}
2. ready / advance
For an IO- and network-intensive application like this, the most effective way to raise throughput is batching, and the core abstraction etcd provides for it is the Ready struct.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
// raftLog.unstableEntries() reads these from raftLog.unstable.entries
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
// Includes all entries that have been persisted to the log but not yet applied to the state machine
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
// Contains the data to send to peers, read directly from the messages buffered in raft.msgs[]
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
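- When reads can be served: ReadStates supports linearizable reads.
- State that must be persisted: HardState and Entries must be persisted before any messages are sent.
- Snapshot: the snapshot data that needs to be saved.
- CommittedEntries: entries that are already committed and can be applied to the state machine.
- Messages: messages to send to other nodes. They must be handled after the persistent data has been handled.
The application has to handle a Ready as follows:
- persist HardState, Entries, and Snapshot to storage;
- send Messages to the other peers without blocking;
- apply CommittedEntries (log entries that are committed but not yet applied) to the state machine;
- if CommittedEntries contains a membership-change entry, call the node's ApplyConfChange() so the node learns about it;
- call Node.Advance() to tell the raft node that this batch of updates has been processed, the state has advanced, and the next Ready can be handed over.
One question here: what is the difference between entries and messages?
An entry is a log entry; messages are the msgs to be sent to peers. Handling a single incoming message may produce both entries and msgs.
In raftexample, a Ready is handled like this: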
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot(applyDoneC)
rc.node.Advance()
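As this shows, entries are saved to the WAL and then appended to raftStorage, while msgs are sent out through the transport.
Why is rc.raftStorage.Append(rd.Entries) called? This comes down to the raftLog data structure: persisting the raftLog is the job of the upper layer and raft.Node cannot do it, because the persistence and snapshot code is separate from the raft core module.
A newly received Entry is first stored in unstable. Following the Raft protocol, the Entry records buffered in unstable are then handed to the upper layer, which sends them to the other nodes in the cluster or saves them (writes them to Storage). Afterwards the upper layer calls Advance() to tell the underlying etcd-raft module to delete the corresponding Entry records from unstable, since they have already been saved to Storage. The Entry records held in unstable have not been persisted and may be lost if the node fails, which is why it is called unstable.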
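The life cycle described above can be illustrated with a small, self-contained model. This is only a sketch: `unstableLog`, `add`, and `stableTo` are hypothetical names chosen for the example, and the real etcd-raft structure carries more state (for instance a pending snapshot).

```go
package main

import "fmt"

// Entry is a simplified stand-in for pb.Entry (illustrative only).
type Entry struct {
	Index uint64
	Data  string
}

// unstableLog is a toy model of the in-memory buffer of entries that have
// not been persisted yet. It is not etcd's actual type.
type unstableLog struct {
	offset  uint64 // log index of entries[0]
	entries []Entry
}

// add buffers newly received entries at the tail.
func (u *unstableLog) add(ents ...Entry) {
	u.entries = append(u.entries, ents...)
}

// stableTo drops every buffered entry with Index <= i. This models what
// Advance() triggers once the application has persisted those entries.
func (u *unstableLog) stableTo(i uint64) {
	if len(u.entries) == 0 || i < u.offset {
		return
	}
	n := i - u.offset + 1
	if n > uint64(len(u.entries)) {
		n = uint64(len(u.entries))
	}
	u.entries = u.entries[n:]
	u.offset += n
}

func main() {
	u := &unstableLog{offset: 1}
	var storage []Entry // stands in for the application's stable Storage

	u.add(Entry{Index: 1, Data: "a"}, Entry{Index: 2, Data: "b"}, Entry{Index: 3, Data: "c"})

	// The application persists what it was handed, then acknowledges it.
	storage = append(storage, u.entries...)
	u.stableTo(storage[len(storage)-1].Index)

	fmt.Println(len(storage), len(u.entries), u.offset)
}
```

The point of the model is the ordering: entries leave the unstable buffer only after the application has stored them, so a crash before that step loses nothing that was already acknowledged as stable.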
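Another point to note is rc.wal.Save(rd.HardState, rd.Entries): every time a Ready is handled, the rd.HardState of that moment has to be saved as well.
How a Ready is generated
When should a Ready be processed?
- The easy cases to understand: there are pending msgs, unstableEntries, or committedEntries.
- softState or hardState has changed since the previous Ready (HasReady compares them with prevSoftSt and prevHardSt).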
func (rn *RawNode) HasReady() bool {
r := rn.raft
if !r.softState().equal(rn.prevSoftSt) {
return true
}
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
return true
}
if r.raftLog.hasPendingSnapshot() {
return true
}
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
return true
}
if len(r.readStates) != 0 {
return true
}
return false
}
After a Ready has been processed, Advance is called to update the relevant index positions. See the code for details; there is nothing special about it.
3. How membership changes are handled
Reference documentation
The key function is raft.applyConfChange.
At startup, applyConfChange is called once for each initial peer:
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
When a user explicitly requests a change:
node.ProposeConfChange
-> node.Step(Prop msg)
-> write the msg to propc
In node.run:
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
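This calls r.Step. A follower (stepFollower) forwards the msg to the leader. On the leader, the alreadyPending check ensures that only one membership change is in flight at any time; the leader then writes the membership-change entries to unstable and calls r.bcastAppend(), which generates the MsgApp messages to send to the peers and leaves them for the upper layer to handle.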
for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
if cc != nil {
alreadyPending := r.pendingConfIndex > r.raftLog.applied
alreadyJoint := len(r.prs.Config.Voters[1]) > 0
wantsLeaveJoint := len(cc.AsV2().Changes) == 0
var refused string
if alreadyPending {
refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
} else if alreadyJoint && !wantsLeaveJoint {
refused = "must transition out of joint config first"
} else if !alreadyJoint && wantsLeaveJoint {
refused = "not in joint state; refusing empty conf change"
}
if refused != "" {
r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
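The three refusal rules in the snippet above can be pulled out into a pure function, which makes them easier to reason about in isolation. This is a sketch only: `confChangeRefusal` and `nextPendingConfIndex` are hypothetical helper names, not part of etcd-raft, and the joint-configuration flags are passed in as plain booleans.

```go
package main

import "fmt"

// confChangeRefusal returns a non-empty reason when a leader should ignore a
// proposed configuration change, or "" when it may be appended.
// The function and parameter names are illustrative, not etcd's API.
func confChangeRefusal(pendingConfIndex, applied uint64, alreadyJoint, wantsLeaveJoint bool) string {
	switch {
	case pendingConfIndex > applied:
		// An earlier conf change may still be unapplied.
		return "possible unapplied conf change"
	case alreadyJoint && !wantsLeaveJoint:
		return "must transition out of joint config first"
	case !alreadyJoint && wantsLeaveJoint:
		return "not in joint state; refusing empty conf change"
	default:
		return ""
	}
}

// nextPendingConfIndex is the log index the i-th proposed entry will occupy
// once it is appended after lastIndex.
func nextPendingConfIndex(lastIndex uint64, i int) uint64 {
	return lastIndex + uint64(i) + 1
}

func main() {
	var pending, applied, lastIndex uint64 = 0, 7, 9

	// First proposal: nothing is pending, so it is accepted and recorded.
	if reason := confChangeRefusal(pending, applied, false, false); reason == "" {
		pending = nextPendingConfIndex(lastIndex, 0)
	}
	fmt.Println("pendingConfIndex:", pending)

	// A second proposal arrives before that index is applied: refused.
	fmt.Println("refused:", confChangeRefusal(pending, applied, false, false))
}
```

Note that a refused change is not rejected outright in the original code; its entry is replaced with an empty EntryNormal so that the log indices of the other proposed entries stay unchanged.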
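All pending entries and msgs are packaged into a Ready struct and handed to the upper layer. Once the corresponding entries have been committed, they can be applied. There are two main steps: call the raft node's ApplyConfChange, then update the transport's peer set accordingly (AddPeer or RemovePeer):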
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
data = append(data, s)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
rc.confState = *rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) {
log.Println("I've been removed from the cluster! Shutting down.")
return nil, false
}
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
}
As shown, the function that is ultimately invoked is raft.applyConfChange:
func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc.AsV2():
case <-n.done:
}
select {
case cs = <-n.confstatec:
case <-n.done:
}
return &cs
}
------------------------
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
// a member of the group without knowing this (when they're catching
// up on the log and don't have the latest config) and we don't want
// to block the proposal channel in that case.
//
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
outer:
for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
if id == r.id {
found = true
break outer
}
}
}
if !found {
propc = nil
}
}
select {
case n.confstatec <- cs:
case <-n.done:
}
raft.applyConfChange
When at most one member changes at a time, the main work is to create the progress record for the new peer and set its Next and Match fields.
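As a rough illustration of that bookkeeping, the sketch below models the leader's per-peer progress as a plain map. `Progress`, `progressMap`, `addNode`, and `removeNode` are simplified, hypothetical stand-ins; in particular, starting Next at lastIndex+1 is an assumption made for this example, and the exact initial value differs between etcd versions.

```go
package main

import "fmt"

// Progress is a simplified stand-in for the leader's view of one follower.
type Progress struct {
	Match uint64 // highest index known to be replicated on the peer
	Next  uint64 // index of the next entry to send to the peer
}

// progressMap is a toy replacement for the leader's progress tracker.
type progressMap map[uint64]*Progress

// addNode registers a new peer. Starting Next at lastIndex+1 is an
// assumption of this sketch; the real initial value depends on the version.
func (p progressMap) addNode(id, lastIndex uint64) {
	if _, ok := p[id]; ok {
		return // already a member, nothing to do
	}
	p[id] = &Progress{Match: 0, Next: lastIndex + 1}
}

// removeNode forgets a peer so that no further entries are sent to it.
func (p progressMap) removeNode(id uint64) {
	delete(p, id)
}

func main() {
	prs := progressMap{1: {Match: 9, Next: 10}}

	prs.addNode(2, 9) // peer 2 joins while the leader's last index is 9
	fmt.Println(*prs[2])

	prs.removeNode(2)
	fmt.Println(len(prs))
}
```

Match starts at zero because the leader knows nothing yet about what the new peer has stored; the first append response from the peer corrects both fields.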
4. How the upper layer wires the modules together
Startup
There are two ways to start a node: StartNode or RestartNode.
oldwal := wal.Exist(rc.waldir)
if oldwal || rc.join {
rc.node = raft.RestartNode(c)
} else {
rc.node = raft.StartNode(c, rpeers)
}
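Whether to restart or start is decided by whether a WAL file exists (or, as the condition shows, whether the node is joining an existing cluster). StartNode additionally calls Bootstrap to handle the membership information.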
// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
n := newNode(rn)
go n.run()
return &n
}
A restart reuses the WAL and the snapshot data. So how is the snapshot used to restore the etcd node?
if !fileutil.Exist(rc.snapdir) {
if err := os.Mkdir(rc.snapdir, 0750); err != nil {
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
// signal replay has finished
rc.snapshotterReady <- rc.snapshotter
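Once the snapshotter object has been created, it is sent over the channel to notify the snapshot module to restore the storage from the snapshot. One thing to consider is that several snapshots may exist, in which case the newest undamaged one is used. The choice is mainly based on the snapshot file name: fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)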
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
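Because the file name encodes term and index as fixed-width hexadecimal, picking the newest snapshot can be done with a plain string sort. The sketch below shows the idea under stated assumptions: `snapName` and `newestFirst` are hypothetical helpers, the ".snap" suffix is assumed for the example, and the check for damaged files is left out.

```go
package main

import (
	"fmt"
	"sort"
)

// snapSuffix is the file suffix assumed for this sketch.
const snapSuffix = ".snap"

// snapName builds a file name with the same format string as quoted in the
// text: zero-padded hexadecimal term, then index, then the suffix.
func snapName(term, index uint64) string {
	return fmt.Sprintf("%016x-%016x%s", term, index, snapSuffix)
}

// newestFirst returns a sorted copy with the newest snapshot first.
// Fixed-width hex makes lexicographic order equal to (term, index) order.
func newestFirst(names []string) []string {
	out := append([]string(nil), names...)
	sort.Sort(sort.Reverse(sort.StringSlice(out)))
	return out
}

func main() {
	names := []string{snapName(2, 10), snapName(3, 5), snapName(2, 255)}
	for _, n := range newestFirst(names) {
		fmt.Println(n)
	}
}
```

A loader built on this would walk the sorted list and return the first file that reads back without a checksum error, which yields the "newest undamaged snapshot" behavior.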
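Then the WAL module runs replayWAL:
The WAL currently supports 5 record types: file metadata records, log entry records, state records, CRC records, and snapshot records:
- A file metadata record contains the node ID and the cluster ID; it is written when the WAL file is created.
- A log entry record contains Raft log data, such as the content of a put proposal.
- A state record contains the cluster term, the node's vote, and so on. A log file can hold several of them, and the last one wins.
- A CRC record contains the final CRC (cyclic redundancy check) value of the previous WAL file. It is written as the first record of a new WAL file when the file is created or cut, and is used to verify the integrity and accuracy of the data files.
- A snapshot record contains the snapshot's term and log index, and is used to check the accuracy of the snapshot file.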
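The "last state record wins" rule can be shown with a toy replay loop. Everything here is a simplified model: the `record` and `hardState` types and the `replay` function are hypothetical, the constant names are only modelled on the five record kinds listed above, and the CRC and snapshot consistency checks of a real WAL are skipped.

```go
package main

import "fmt"

// recordType enumerates the five record kinds described in the text.
type recordType int64

const (
	metadataType recordType = iota + 1
	entryType
	stateType
	crcType
	snapshotType
)

// record is a flattened, illustrative WAL record.
type record struct {
	Type   recordType
	Index  uint64 // used by entry and snapshot records
	Term   uint64 // used by entry, state and snapshot records
	Commit uint64 // used by state records
}

// hardState is a reduced stand-in for pb.HardState.
type hardState struct{ Term, Commit uint64 }

// replay walks the records in order. Entry indexes are collected; for state
// records only the last one seen is kept.
func replay(recs []record) (hardState, []uint64) {
	var st hardState
	var entries []uint64
	for _, r := range recs {
		switch r.Type {
		case entryType:
			entries = append(entries, r.Index)
		case stateType:
			st = hardState{Term: r.Term, Commit: r.Commit}
		case metadataType, crcType, snapshotType:
			// A real WAL validates these; this sketch ignores them.
		}
	}
	return st, entries
}

func main() {
	recs := []record{
		{Type: crcType},
		{Type: metadataType},
		{Type: entryType, Index: 1, Term: 1},
		{Type: stateType, Term: 1, Commit: 1},
		{Type: entryType, Index: 2, Term: 2},
		{Type: stateType, Term: 2, Commit: 2},
	}
	st, ents := replay(recs)
	fmt.Println(st, ents)
}
```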
func (rc *raftNode) replayWAL() *wal.WAL {
log.Printf("replaying WAL of member %d", rc.id)
snapshot := rc.loadSnapshot()
w := rc.openWAL(snapshot)
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
rc.raftStorage.SetHardState(st)
// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)
return w
}
First, using the snapshot records and the state record in the WAL, all snapshots whose index is no greater than state.Commit are selected; then the newest available snapshot that is in walSnaps is loaded.
// filter out any snaps that are newer than the committed hardstate
n := 0
for _, s := range snaps {
if s.Index <= state.Commit {
snaps[n] = s
n++
}
}
snaps = snaps[:n:n]
return snaps, nil
Then, based on the snapshot, all WAL entries with an index greater than snap.Index are read, and all of those ents are appended to the raftLog.
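Put together, the recovery plan is: choose the newest snapshot that the committed state covers, then replay only the entries that come after it. The following sketch models both steps with hypothetical types and helpers (`snapRecord`, `Entry`, `newestUsable`, `entriesAfter`); it does not read real WAL or snapshot files.

```go
package main

import "fmt"

// snapRecord mirrors the term and index stored in a WAL snapshot record.
type snapRecord struct{ Term, Index uint64 }

// Entry is a reduced stand-in for pb.Entry.
type Entry struct{ Term, Index uint64 }

// newestUsable returns the snapshot with the highest index that does not
// exceed commit; ok is false when no snapshot qualifies.
func newestUsable(snaps []snapRecord, commit uint64) (best snapRecord, ok bool) {
	for _, s := range snaps {
		if s.Index <= commit && (!ok || s.Index > best.Index) {
			best, ok = s, true
		}
	}
	return best, ok
}

// entriesAfter keeps only the entries the snapshot does not already cover.
func entriesAfter(ents []Entry, snapIndex uint64) []Entry {
	var out []Entry
	for _, e := range ents {
		if e.Index > snapIndex {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	snaps := []snapRecord{{Term: 1, Index: 3}, {Term: 2, Index: 6}, {Term: 2, Index: 9}}
	ents := []Entry{{2, 5}, {2, 6}, {2, 7}, {2, 8}}

	// Commit is 8, so the snapshot at index 9 is newer than the committed
	// state and must be skipped.
	snap, ok := newestUsable(snaps, 8)
	fmt.Println(snap, ok)
	fmt.Println(entriesAfter(ents, snap.Index))
}
```

Skipping snapshots beyond the commit index matters because such a snapshot may have been written just before a crash, without the matching state ever reaching the WAL.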