We have already seen the simple handling of Propose in raft-example, but the real etcd handles Propose in a considerably more involved way. The main points are:
- consistent index: used to keep boltdb and the raft log idempotent with respect to each other, so that an entry is never applied to boltdb twice.
- synchronous return: raft log replication is asynchronous, so how does the server return a result to the client synchronously?
When boltdb is used as the state machine, the WAL and boltdb are two separate entities, and they can easily end up inconsistent with each other. So etcd stores a record in boltdb, consistent-index, which records the log index that has already been successfully applied to boltdb. When recovering boltdb from the WAL, this makes it possible to tell whether a given log index has already been applied.
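To make the idea concrete, here is a minimal sketch of how the consistent-index makes WAL replay idempotent. This is not etcd's actual recovery code; Entry, Backend, and replayWAL are hypothetical stand-ins for etcd's boltdb-backed types:

// Hypothetical minimal types standing in for etcd's boltdb-backed state machine.
type Entry struct{ Index, Term uint64 }

type Backend struct{ ci, term uint64 }

func (b *Backend) ConsistentIndex() uint64        { return b.ci }
func (b *Backend) SetConsistentIndex(i, t uint64) { b.ci, b.term = i, t }
func (b *Backend) apply(e Entry)                  { /* mutate boltdb here */ }

// replayWAL replays WAL entries onto the backend, skipping anything at or
// below the persisted consistent-index so that replay is idempotent.
func replayWAL(b *Backend, entries []Entry) {
	ci := b.ConsistentIndex()
	for _, e := range entries {
		if e.Index <= ci {
			continue // already applied to boltdb before the restart
		}
		b.apply(e)
		// In etcd the new consistent-index is written in the same boltdb
		// transaction as the apply itself, so the two advance atomically.
		b.SetConsistentIndex(e.Index, e.Term)
	}
}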
Processing flow
In etcd-server, a propose goes through the following steps.
First, every request is assigned a unique request ID; the server registers that ID and then waits until the request has been processed:
func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
	// back-pressure: refuse new proposals if apply lags too far behind commit
	ai := s.getAppliedIndex()
	ci := s.getCommittedIndex()
	if ci > ai+maxGapBetweenApplyAndCommitIndex {
		return nil, ErrTooManyRequests
	}

	r.Header = &pb.RequestHeader{
		ID: s.reqIDGen.Next(),
	}

	data, err := r.Marshal()
	if err != nil {
		return nil, err
	}

	id := r.ID
	if id == 0 {
		id = r.Header.ID
	}
	// register the request ID and wait on the returned channel until the
	// entry has been applied (or the request times out)
	ch := s.w.Register(id)

	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
	defer cancel()

	start := time.Now()
	err = s.r.Propose(cctx, data)
	if err != nil {
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, err
	}

	select {
	case x := <-ch:
		return x.(*applyResult), nil
	case <-cctx.Done():
		proposalsFailed.Inc()
		s.w.Trigger(id, nil) // GC wait
		return nil, s.parseProposeCtxErr(cctx.Err(), start)
	case <-s.done:
		return nil, ErrStopped
	}
}
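The "register and wait" half of the synchronous return lives in s.w, etcd's pkg/wait registry. Below is a minimal sketch of the same idea, simplified from etcd's implementation (no duplicate-registration check, a single mutex-protected map):

import "sync"

// wait hands out a channel keyed by request ID; the apply path later calls
// Trigger with the same ID to deliver the result and unblock the proposer.
type wait struct {
	mu sync.Mutex
	m  map[uint64]chan interface{}
}

func newWait() *wait { return &wait{m: make(map[uint64]chan interface{})} }

func (w *wait) Register(id uint64) <-chan interface{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan interface{}, 1) // buffered: Trigger never blocks
	w.m[id] = ch
	return ch
}

func (w *wait) Trigger(id uint64, x interface{}) {
	w.mu.Lock()
	ch := w.m[id]
	delete(w.m, id)
	w.mu.Unlock()
	if ch != nil {
		ch <- x
		close(ch)
	}
}

Note how the timeout path above calls s.w.Trigger(id, nil): that removes the channel from the map so that abandoned requests do not leak, which is what the "GC wait" comment refers to.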
The request is then forwarded to the raft node for processing.
From what we covered earlier, after the raft node has done its processing it hands the results to the upper layer through the Ready struct: the proposal message is first forwarded to the leader, then the leader calls processMessages, and the main operation here is replicating the log to the followers.
ap := apply{
	entries:  rd.CommittedEntries,
	snapshot: rd.Snapshot,
	notifyc:  notifyc,
}

updateCommittedIndex(&ap, rh)

// hand the committed entries off to the apply goroutine
select {
case r.applyc <- ap:
case <-r.stopped:
	return
}

// the leader can write to its disk in parallel with replicating to the
// followers, so it sends its messages before persisting
if islead {
	r.transport.Send(r.processMessages(rd.Messages))
}

// Must save the snapshot file and WAL snapshot entry before saving any other
// entries or hardstate to ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
	if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
		r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
	}
}

// persist the hard state and new log entries to the WAL
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
	r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
if !raft.IsEmptyHardState(rd.HardState) {
	proposalsCommitted.Set(float64(rd.HardState.Commit))
}

if !raft.IsEmptySnap(rd.Snapshot) {
	// Force WAL to fsync its hard state before Release() releases
	// old data from the WAL. Otherwise could get an error like:
	// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
	// See https://github.com/etcd-io/etcd/issues/10219 for more details.
	if err := r.storage.Sync(); err != nil {
		r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
	}

	// etcdserver now claims the snapshot has been persisted onto the disk
	notifyc <- struct{}{}

	// gofail: var raftBeforeApplySnap struct{}
	r.raftStorage.ApplySnapshot(rd.Snapshot)
	r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
	// gofail: var raftAfterApplySnap struct{}

	if err := r.storage.Release(rd.Snapshot); err != nil {
		r.lg.Fatal("failed to release Raft wal", zap.Error(err))
	}
	// gofail: var raftAfterWALRelease struct{}
}

// append the entries to the in-memory raft storage
r.raftStorage.Append(rd.Entries)

if !islead {
	// a follower must persist entries before sending its responses,
	// so its messages go out only after the WAL save above
	msgs := r.processMessages(rd.Messages)

	// unblock applyAll, which waits for the Raft log disk writes
	notifyc <- struct{}{}

	r.transport.Send(msgs)
} else {
	// leader already processed 'MsgSnap' and signaled
	notifyc <- struct{}{}
}

r.Advance()
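One small piece referenced above: updateCommittedIndex pushes the highest committed index visible in this Ready up to the server through the raftReadyHandler rh, so that the back-pressure check in processInternalRaftRequestOnce sees fresh values. Roughly like this sketch (it follows etcd 3.5's code as I understand it; treat the details as approximate):

func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	// the last committed entry, if any, carries the newest index
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	// an incoming snapshot may be even newer
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}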
On the server side, what ultimately runs is apply:
case ap := <-s.r.apply():
	f := func(context.Context) { s.applyAll(&ep, &ap) }
	sched.Schedule(f)
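sched here is a FIFO scheduler, so applies run outside the raft loop (keeping that loop responsive) while still executing strictly in log order. A minimal sketch of the pattern, assuming a plain channel-plus-goroutine design rather than etcd's actual pkg/schedule implementation:

import "context"

// fifoScheduler runs jobs sequentially, in arrival order, on one goroutine.
type fifoScheduler struct {
	jobs chan func(context.Context)
	ctx  context.Context
}

func newFIFOScheduler(ctx context.Context) *fifoScheduler {
	s := &fifoScheduler{jobs: make(chan func(context.Context), 1024), ctx: ctx}
	go func() {
		for {
			select {
			case job := <-s.jobs:
				job(s.ctx) // one job at a time preserves apply order
			case <-ctx.Done():
				return // pending jobs are dropped on shutdown in this sketch
			}
		}
	}()
	return s
}

func (s *fifoScheduler) Schedule(job func(context.Context)) { s.jobs <- job }

applyAll then walks the committed entries and ends up in s.apply: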
func (s *EtcdServer) apply(
	es []raftpb.Entry,
	confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
	s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
	for i := range es {
		e := es[i]
		switch e.Type {
		case raftpb.EntryNormal:
			s.applyEntryNormal(&e)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)

		case raftpb.EntryConfChange:
			// We need to apply all WAL entries on top of v2store
			// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
			shouldApplyV3 := membership.ApplyV2storeOnly

			// set the consistent index of current executing entry
			if e.Index > s.consistIndex.ConsistentIndex() {
				s.consistIndex.SetConsistentIndex(e.Index, e.Term)
				shouldApplyV3 = membership.ApplyBoth
			}

			var cc raftpb.ConfChange
			pbutil.MustUnmarshal(&cc, e.Data)
			removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
			s.setAppliedIndex(e.Index)
			s.setTerm(e.Term)
			shouldStop = shouldStop || removedSelf
			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
		}
		appliedi, appliedt = e.Index, e.Term
	}
	return appliedt, appliedi, shouldStop
}
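For EntryNormal entries, applyEntryNormal is where the synchronous return from the first step completes: once the request has mutated the state machine, the apply path triggers the channel registered under the request ID, waking the goroutine blocked in processInternalRaftRequestOnce. Schematically, the tail of applyEntryNormal looks like this (a sketch only, with the applyResult fields abbreviated):

// Sketch: deliver the apply result to the proposer waiting on s.w.
// The ID selection mirrors the Register call in processInternalRaftRequestOnce.
id := raftReq.ID
if id == 0 {
	id = raftReq.Header.ID
}
s.w.Trigger(id, &applyResult{resp: resp, err: err})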