This code reading is based on commit 189fdd3.
1. raftwal
As mentioned earlier, etcd/raft ships a MemoryStorage + wal combination for persisting a node's HardState, Snapshot and Entries; the wal writes this data directly to files.
dgraph, however, runs multiple raft groups on a single physical node, and new raft groups are created automatically. In that setting it is simpler to back all raft groups with the same underlying store.
In this package, dgraph stores the logs of all raft groups in badger, a key-value database also built by dgraph-io.
1.1 Keys
Since logs from different raft groups live in the same key-value database, the stored keys need to distinguish them reliably.
A raft node identifies itself by two levels of ids: the node id RaftId (uint64) and the group id gid (uint32).
Accordingly, all three kinds of keys in raftwal embed both ids.
-
snapshotKey:
func (w *Wal) snapshotKey(gid uint32) []byte {
	b := make([]byte, 14)
	binary.BigEndian.PutUint64(b[0:8], w.id)
	copy(b[8:10], []byte("ss"))
	binary.BigEndian.PutUint32(b[10:14], gid)
	return b
}
-
hardStateKey:
func (w *Wal) hardStateKey(gid uint32) []byte {
	b := make([]byte, 14)
	binary.BigEndian.PutUint64(b[0:8], w.id)
	copy(b[8:10], []byte("hs"))
	binary.BigEndian.PutUint32(b[10:14], gid)
	return b
}
-
entryKey:
func (w *Wal) entryKey(gid uint32, term, idx uint64) []byte {
	b := make([]byte, 28)
	binary.BigEndian.PutUint64(b[0:8], w.id)
	binary.BigEndian.PutUint32(b[8:12], gid)
	binary.BigEndian.PutUint64(b[12:20], term)
	binary.BigEndian.PutUint64(b[20:28], idx)
	return b
}
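Since all fields are written big-endian at fixed offsets, an entry key can be decoded with the same layout, and keys for the same (id, gid) sort by term, then by index, which is exactly the order the iterators below rely on. A minimal sketch (parseEntryKey is a hypothetical helper, not part of raftwal):
package raftwal

import "encoding/binary"

// parseEntryKey is a hypothetical helper illustrating the 28-byte entryKey
// layout: [0:8] raft id | [8:12] group id | [12:20] term | [20:28] index.
func parseEntryKey(b []byte) (id uint64, gid uint32, term, idx uint64) {
	id = binary.BigEndian.Uint64(b[0:8])
	gid = binary.BigEndian.Uint32(b[8:12])
	term = binary.BigEndian.Uint64(b[12:20])
	idx = binary.BigEndian.Uint64(b[20:28])
	return
}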
1.2 Wal
Wal provides reads and writes of the raft data.
When persisting raft data, the most important requirement is keeping the data consistent.
StoreSnapshot
func (w *Wal) StoreSnapshot(gid uint32, s raftpb.Snapshot) error {
txn := w.wals.NewTransactionAt(1, true)
defer txn.Discard()
// ...
if err := txn.Set(w.snapshotKey(gid), data); err != nil {
return err
}
// ...
// Delete all entries before this snapshot to save disk space.
start := w.entryKey(gid, 0, 0)
last := w.entryKey(gid, s.Metadata.Term, s.Metadata.Index)
// This uses a badger feature: iterate over keys only, avoiding the cost of fetching values.
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
itr := txn.NewIterator(opt)
defer itr.Close()
// Delete the entries that are no longer needed, one by one.
for itr.Seek(start); itr.Valid(); itr.Next() {
// ...
}
// Failure to delete entries is not a fatal error, so should be
// ok to ignore
if err := txn.CommitAt(1, nil); err != nil {
x.Printf("Error while storing snapshot %v\n", err)
return err
}
return nil
}
Store
// Store stores the hardstate and entries for a given RAFT group.
func (w *Wal) Store(gid uint32, h raftpb.HardState, es []raftpb.Entry) error {
txn := w.wals.NewTransactionAt(1, true)
var t, i uint64
// Save the entries one by one.
for _, e := range es {
t, i = e.Term, e.Index
// ...
}
// Save the HardState if necessary.
if !raft.IsEmptyHardState(h) {
// ...
}
// If we get no entries, then the default value of t and i would be zero. That would
// end up deleting all the previous valid raft entry logs. This check avoids that.
if t > 0 || i > 0 {
// When writing an Entry with Index i, any previously-persisted entries
// with Index >= i must be discarded.
// Ideally we should be deleting entries from previous term with index >= i,
// but to avoid complexity we remove them during reading from wal.
// After some point in time the log may diverge, for example because of a network partition.
// To restore consistency once the network heals, every batch of entries must remove any entries that logically come after it.
start := w.entryKey(gid, t, i+1)
prefix := w.prefix(gid)
// ...
// Delete them one by one.
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
// ...
}
}
if err := txn.CommitAt(1, nil); err != nil {
return err
}
return nil
}
Reading
func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
// ...
}
func (w *Wal) HardState(gid uint32) (hd raftpb.HardState, rerr error) {
// ...
}
func (w *Wal) Entries(gid uint32, fromTerm, fromIndex uint64) (es []raftpb.Entry, rerr error) {
// ...
}
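The readers mirror the writers: each one opens a read-only transaction at the same timestamp and looks up the corresponding key. Below is a minimal sketch of how Snapshot could be implemented; it assumes a badger version where Item.ValueCopy is available and is not the actual dgraph code.
func (w *Wal) Snapshot(gid uint32) (snap raftpb.Snapshot, rerr error) {
	// Read-only transaction at the same timestamp used by the writers.
	txn := w.wals.NewTransactionAt(1, false)
	defer txn.Discard()

	item, err := txn.Get(w.snapshotKey(gid))
	if err == badger.ErrKeyNotFound {
		// No snapshot stored yet; return the zero value.
		return snap, nil
	}
	if err != nil {
		return snap, err
	}
	val, err := item.ValueCopy(nil) // assumes a badger API with ValueCopy
	if err != nil {
		return snap, err
	}
	rerr = snap.Unmarshal(val)
	return snap, rerr
}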
1.3 About badger
badger is based on the paper WiscKey: Separating Keys from Values in SSD-conscious Storage.
The few comments on Zhihu are not particularly flattering: 如何評(píng)價(jià) Badger (fast key-value storage)?
Either way, it really is fast in some situations, and it may well suit dgraph's use case.
2. conn
conn acts as the network transport layer for etcd/raft, synchronizing messages between raft nodes over gRPC.
2.1 Pool
The name suggests a connection pool, but in practice it holds a single *grpc.ClientConn that is reused.
Once created, it pings the peer every 10 seconds and uses the result to decide whether the connection is still healthy.
// "Pool" is used to manage the grpc client connection(s) for communicating with other
// worker instances. Right now it just holds one of them.
type Pool struct {
sync.RWMutex
// The comment below explains why a single *grpc.ClientConn can be reused.
// A "pool" now consists of one connection. gRPC uses HTTP2 transport to combine
// messages in the same TCP stream.
conn *grpc.ClientConn
// Time of the last successful ping.
lastEcho time.Time
// Address of the target node.
Addr string
// Ticker that drives the periodic ping.
ticker *time.Ticker
}
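The health check itself is just a loop over the ticker: every tick the pool issues an echo-style RPC and records when it last succeeded. The sketch below builds on the Pool struct above and uses a hypothetical ping function in place of whatever RPC the real code issues; it is an illustration, not the actual conn implementation.
// MonitorHealth is a hypothetical sketch of the per-pool health loop.
// ping stands in for the real echo RPC sent to the peer.
func (p *Pool) MonitorHealth(ping func(*grpc.ClientConn) error) {
	for range p.ticker.C { // fires roughly every 10 seconds
		if err := ping(p.conn); err != nil {
			continue // leave lastEcho unchanged; the pool is considered unhealthy
		}
		p.Lock()
		p.lastEcho = time.Now()
		p.Unlock()
	}
}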
2.2 Pools
Pools keeps one Pool per address.
It is a package-level singleton.
type Pools struct {
sync.RWMutex
all map[string]*Pool
}
var pi *Pools
func init() {
pi = new(Pools)
pi.all = make(map[string]*Pool)
}
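Lookups on the singleton are then just a read-locked map access, which is what the later call Get().Get(addr) relies on. A minimal sketch of that read path, under the assumption that the real Get also performs health checks (the error value here is hypothetical):
// Get returns the package-level singleton.
func Get() *Pools {
	return pi
}

// Get returns the Pool registered for addr, if any.
func (p *Pools) Get(addr string) (*Pool, error) {
	p.RLock()
	defer p.RUnlock()
	pool, ok := p.all[addr]
	if !ok {
		return nil, errors.New("no connection exists for " + addr) // hypothetical error
	}
	return pool, nil
}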
2.3 Node
Node maintains the raft group membership and carries messages between the members. Its responsibilities include:
Initializing / reading the current raft.Node
// SetRaft would set the provided raft.Node to this node.
// It would check fail if the node is already set.
func (n *Node) SetRaft(r raft.Node) {
// ...
}
// Raft would return back the raft.Node stored in the node.
func (n *Node) Raft() raft.Node {
// ...
}
Maintaining the ConfState
i.e. the list of node ids
// SetConfState would store the latest ConfState generated by ApplyConfChange.
func (n *Node) SetConfState(cs *raftpb.ConfState) {
// ...
}
// ConfState would return the latest ConfState stored in node.
func (n *Node) ConfState() *raftpb.ConfState {
// ...
}
Maintaining the node id to address mapping
func (n *Node) Peer(pid uint64) (string, bool) {
// ...
}
// addr must not be empty.
func (n *Node) SetPeer(pid uint64, addr string) {
// ...
}
func (n *Node) DeletePeer(pid uint64) {
// ...
}
// Connects the node and makes its peerPool refer to the constructed pool and address
// (possibly updating ourselves from the old address.) (Unless pid is ourselves, in which
// case this does nothing.)
func (n *Node) Connect(pid uint64, addr string) {
// ..
}
Adding and removing nodes
-
Adding a node
// This function can be thought of as a callback for AddToCluster.
// The caller invokes it once a ConfChange has been applied successfully.
// Everywhere dgraph calls this method, err is passed as nil.
func (n *Node) DoneConfChange(id uint64, err error) {
	n.Lock()
	defer n.Unlock()
	ch, has := n.confChanges[id]
	if !has {
		return
	}
	delete(n.confChanges, id)
	ch <- err
}

func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
	addr, ok := n.Peer(pid)
	// ...
	rcBytes, err := rc.Marshal()
	// ...
	ch := make(chan error, 1)
	// storeConfChange maps the channel to a randomly generated id,
	// which is then carried in the ConfChange proposed to the other nodes.
	id := n.storeConfChange(ch)
	err = n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
		ID:      id,
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  pid,
		Context: rcBytes,
	})
	if err != nil {
		return err
	}
	// Wait for the callback that fires once the ConfChange has been applied
	// (see the apply-side sketch after this list).
	err = <-ch
	return err
}
-
Removing a node
func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
	// ...
	// As in AddToCluster, we have to wait for the ConfChange to complete.
	ch := make(chan error, 1)
	pid := n.storeConfChange(ch)
	err := n.Raft().ProposeConfChange(ctx, raftpb.ConfChange{
		ID:     pid,
		Type:   raftpb.ConfChangeRemoveNode,
		NodeID: id,
	})
	// ...
	return err
}
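To see how the callback closes the loop: whoever drains Ready() and applies the committed ConfChange is expected to call DoneConfChange with the same ID, which unblocks the proposer waiting on <-ch in AddToCluster or ProposePeerRemoval. The apply-side sketch below is illustrative only; the function name is made up and the real dgraph apply loop does more.
// Illustrative apply-side handling of a committed ConfChange entry.
func (n *Node) applyConfChange(e raftpb.Entry) {
	var cc raftpb.ConfChange
	x.Check(cc.Unmarshal(e.Data))

	// Apply the change to the raft state machine and remember the new ConfState.
	cs := n.Raft().ApplyConfChange(cc)
	n.SetConfState(cs)

	// Wake up the blocked proposer; dgraph passes nil here.
	n.DoneConfChange(cc.ID, nil)
}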
Passing messages between nodes
func (n *Node) Send(m raftpb.Message) {
// ...
select {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
// Why does the default branch ignore the message... just to avoid blocking the caller?
default:
// ignore
}
}
// Messages sent through n.messages accumulate up to a point and are then sent out together.
// Messages bound for the same node are also combined.
func (n *Node) BatchAndSendMessages() {
// A single *bytes.Buffer is reused for each destination id.
batches := make(map[uint64]*bytes.Buffer)
for {
totalSize := 0
sm := <-n.messages
slurp_loop:
for {
// Initialize the *bytes.Buffer if needed.
var buf *bytes.Buffer
// ...
// Write the length of the data first, which acts as the delimiter between messages,
// then write the data itself.
// Each message therefore takes up 4 + len(sm.data) bytes.
totalSize += 4 + len(sm.data)
x.Check(binary.Write(buf, binary.LittleEndian, uint32(len(sm.data))))
x.Check2(buf.Write(sm.data))
// If enough data has accumulated, stop collecting and send.
if totalSize > messageBatchSoftLimit {
// We limit the batch size, but we aren't pushing back on
// n.messages, because the loop below spawns a goroutine
// to do its dirty work. This is good because right now
// (*node).send fails(!) if the channel is full.
break
}
// Likewise, if no new message is pending, stop collecting and send.
select {
case sm = <-n.messages:
default:
break slurp_loop
}
}
// Send the batches.
for to, buf := range batches {
if buf.Len() == 0 {
continue
}
data := make([]byte, buf.Len())
copy(data, buf.Bytes())
go n.doSendMessage(to, data)
// Reset buf so it can be reused in the next collection round.
buf.Reset()
}
}
}
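On the receiving end, the RaftMessage handler has to undo this framing: repeatedly read a 4-byte little-endian length, then that many bytes of marshalled raftpb.Message. The decoder below is a self-contained sketch of that inverse operation, not the actual handler in conn.
// decodeBatch is an illustrative inverse of the framing used above: each
// message is a 4-byte little-endian length followed by the message bytes.
func decodeBatch(data []byte) ([]raftpb.Message, error) {
	var msgs []raftpb.Message
	r := bytes.NewReader(data)
	for r.Len() > 0 {
		var sz uint32
		if err := binary.Read(r, binary.LittleEndian, &sz); err != nil {
			return nil, err
		}
		buf := make([]byte, sz)
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
		var m raftpb.Message
		if err := m.Unmarshal(buf); err != nil {
			return nil, err
		}
		msgs = append(msgs, m)
	}
	return msgs, nil
}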
func (n *Node) doSendMessage(to uint64, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Get the connection to the target node.
addr, has := n.Peer(to)
pool, err := Get().Get(addr)
if !has || err != nil {
x.Printf("No healthy connection found to node Id: %d, err: %v\n", to, err)
// No such peer exists or we got handed a bogus config (bad addr), so we
// can't send messages to this peer.
return
}
client := pool.Get()
// ...
ch := make(chan error, 1)
go func() {
_, err = c.RaftMessage(ctx, p)
if err != nil {
x.Printf("Error while sending message to node Id: %d, err: %v\n", to, err)
}
ch <- err
}()
// Either the send completes or the context times out.
select {
case <-ctx.Done():
return
case <-ch:
// We don't need to do anything if we receive any error while sending message.
// RAFT would automatically retry.
return
}
}
Saving / restoring raft data
func (n *Node) SaveSnapshot(s raftpb.Snapshot) {
// ...
}
func (n *Node) SaveToStorage(h raftpb.HardState, es []raftpb.Entry) {
// ...
}
func (n *Node) InitFromWal(wal *raftwal.Wal) (idx uint64, restart bool, rerr error) {
// ...
}
WaitForMinProposal
This appears to be related to linearizable reads: it waits until the proposal a read depends on has been applied.
func (n *Node) WaitForMinProposal(ctx context.Context, read *api.LinRead) error {
if read == nil || read.Ids == nil {
return nil
}
gid := n.RaftContext.Group
min := read.Ids[gid]
return n.Applied.WaitForMark(ctx, min)
}
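In other words, a read path would call it before touching local state, so the group is guaranteed to have caught up to the proposal recorded in the LinRead. A rough usage sketch (serveRead is illustrative, not an actual dgraph function):
// Illustrative read path: block until this group has applied at least the
// proposal id carried in the LinRead, then serve the read locally.
func (n *Node) serveRead(ctx context.Context, read *api.LinRead) error {
	if err := n.WaitForMinProposal(ctx, read); err != nil {
		return err // context cancelled or timed out before catching up
	}
	// ... safe to read local state now ...
	return nil
}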
2.4 RaftServer
RaftServer is the implementation of the gRPC service Raft; internally it simply operates on Node.