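Today we close out the EtcdRaft source code analysis series with configuration changes. Horizontal scalability is a key measure of a distributed system, and it depends on whether cluster membership can be changed easily, quickly, effectively, and promptly. Let's look at how EtcdRaft implements this.
Interface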
type Node interface {
...
// ProposeConfChange proposes config change.
// At most one ConfChange can be in the process of going through consensus.
// Application needs to call ApplyConfChange when applying EntryConfChange type entry.
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
...
// ApplyConfChange applies config change to the local node.
// Returns an opaque ConfState protobuf which must be recorded
// in snapshots. Will never return nil; it returns a pointer only
// to match MemoryStorage.Compact.
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
...
}
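As you can see, two methods relate to configuration changes. Readers of the earlier posts will recognize the usual way the outside world interacts with Raft: first propose, then wait for the cluster to reach agreement internally, then apply.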
struct
type ConfChange struct {
ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"`
Type ConfChangeType `protobuf:"varint,2,opt,name=Type,enum=raftpb.ConfChangeType" json:"Type"`
NodeID uint64 `protobuf:"varint,3,opt,name=NodeID" json:"NodeID"`
Context []byte `protobuf:"bytes,4,opt,name=Context" json:"Context,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
const (
ConfChangeAddNode ConfChangeType = 0
ConfChangeRemoveNode ConfChangeType = 1
ConfChangeUpdateNode ConfChangeType = 2
ConfChangeAddLearnerNode ConfChangeType = 3
)
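That is the content of a proposal. It is straightforward, but note one thing: only one node can be changed at a time. For the reasoning behind this, see the Raft paper if you are interested.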
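The one-node-at-a-time rule can be illustrated with a little quorum arithmetic. The sketch below is not etcd code; `quorum` and `disjointMajoritiesPossible` are hypothetical helpers, and it assumes a pure add or pure remove, so one configuration is a subset of the other. It checks whether a majority of the old configuration and a majority of the new one could share no member, which is the situation that would allow two leaders to be elected in the same term.

```go
package main

import "fmt"

// quorum returns the majority size for a cluster of n voters.
func quorum(n int) int { return n/2 + 1 }

// disjointMajoritiesPossible reports whether a majority of the old
// configuration and a majority of the new one could have no member in
// common. It assumes one configuration is a subset of the other
// (only adds, or only removes), so the larger one contains every node.
func disjointMajoritiesPossible(oldN, newN int) bool {
	larger := oldN
	if newN > larger {
		larger = newN
	}
	// Two disjoint groups fit only if their sizes add up to at most
	// the number of nodes in the larger configuration.
	return quorum(oldN)+quorum(newN) <= larger
}

func main() {
	for _, c := range [][2]int{{3, 4}, {4, 5}, {3, 5}, {5, 3}} {
		fmt.Printf("%d -> %d voters: disjoint majorities possible = %v\n",
			c[0], c[1], disjointMajoritiesPossible(c[0], c[1]))
	}
}
```

With a single-node step the two majorities always overlap, so at least one node sees both decisions. Jumping by two nodes (3 to 5, or 5 to 3) loses that guarantee.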
Propose
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}
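- This essentially follows the normal data proposal path. The only thing to note is that the entry is tagged pb.EntryConfChange to distinguish it from other proposals.
- Next, let's walk through the data proposal flow once more.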
Leader
...
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}
...
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
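- If an earlier configuration change has not been applied yet, the new change is dropped and replaced with an empty normal entry.
- Otherwise, the log index this configuration change will occupy is recorded in pendingConfIndex.
- The rest is the same as for any proposal: append to the local log and broadcast to the other members.
- This raises a question: once the change has been replicated to the members, how do we know that a quorum has it and that apply can begin? As you might guess, the application layer is notified through Ready, which carries the committed entries (CommittedEntries).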
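The gating on pendingConfIndex described above can be sketched in isolation. This is a toy model, not the etcd implementation: `toyLeader`, `entry`, and `propose` are hypothetical names, and the log is just an in-memory slice whose element i has index i+1.

```go
package main

import "fmt"

type entryType int

const (
	entryNormal entryType = iota
	entryConfChange
)

type entry struct {
	Type entryType
	Data string
}

// toyLeader keeps only the fields needed for the gating logic.
type toyLeader struct {
	log              []entry // log[i] holds the entry with index i+1
	applied          uint64  // highest index the application has applied
	pendingConfIndex uint64  // index of the latest accepted conf change
}

func (l *toyLeader) lastIndex() uint64 { return uint64(len(l.log)) }

// propose mirrors the loop in the leader code: a conf change is accepted
// only if the previous one has already been applied; otherwise it is
// replaced by an empty normal entry. Everything is then appended.
func (l *toyLeader) propose(ents ...entry) {
	for i := range ents {
		if ents[i].Type != entryConfChange {
			continue
		}
		if l.pendingConfIndex > l.applied {
			ents[i] = entry{Type: entryNormal}
		} else {
			l.pendingConfIndex = l.lastIndex() + uint64(i) + 1
		}
	}
	l.log = append(l.log, ents...)
}

func main() {
	l := &toyLeader{}
	l.propose(entry{entryConfChange, "add 4"}) // accepted at index 1
	l.propose(entry{entryConfChange, "add 5"}) // index 1 not applied: neutralized
	fmt.Println(l.log[0].Type == entryConfChange, l.log[1].Type == entryNormal)

	l.applied = 1 // the application has now applied the first change
	l.propose(entry{entryConfChange, "add 5"})
	fmt.Println(l.log[2].Type == entryConfChange, l.pendingConfIndex)
}
```

Note that the rejected change still consumes a log slot as an empty entry, so the caller has to re-propose it after the earlier change is applied.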
apply
Application layer
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
if err := cc.Unmarshal(ents[i].Data); err != nil {
c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
continue
}
c.confState = *c.Node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Applied config change to add node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Applied config change to remove node %d, current nodes in channel: %+v", cc.NodeID, c.confState.Nodes)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
// This ConfChange was introduced by a previously committed config block,
// we can now unblock submitC to accept envelopes.
if c.confChangeInProgress != nil &&
c.confChangeInProgress.NodeID == cc.NodeID &&
c.confChangeInProgress.Type == cc.Type {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
// calling goroutine, since otherwise it will be blocked
// trying to write into haltC
go c.Halt()
}
}
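- The example here comes from Fabric; focus only on the key flow.
- On receiving a ConfChange from Raft, the first thing to do is call Node.ApplyConfChange(cc).
- Raft's transport layer is managed by the application, so the job is not finished once Raft has applied the configuration change.
- Fabric has to set up its gRPC connections according to the latest cluster membership.
- If a node was removed, that member also has to be stopped (in the code above, a node halts itself when it is the one being removed). More on this below.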
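Because the application owns the transport, it has to work out which peers to connect to and which to drop after each change. The following is a minimal sketch of that bookkeeping, assuming membership is available as plain lists of node IDs; `diffMembers` is a hypothetical helper and does not reflect how Fabric's configureComm is actually written.

```go
package main

import (
	"fmt"
	"sort"
)

// diffMembers compares the previous and current membership and returns
// the node IDs that need a new connection and those whose connection
// should be closed. Both results are sorted for stable output.
func diffMembers(old, cur []uint64) (connect, disconnect []uint64) {
	oldSet := make(map[uint64]bool, len(old))
	for _, id := range old {
		oldSet[id] = true
	}
	curSet := make(map[uint64]bool, len(cur))
	for _, id := range cur {
		curSet[id] = true
	}
	for id := range curSet {
		if !oldSet[id] {
			connect = append(connect, id)
		}
	}
	for id := range oldSet {
		if !curSet[id] {
			disconnect = append(disconnect, id)
		}
	}
	sort.Slice(connect, func(i, j int) bool { return connect[i] < connect[j] })
	sort.Slice(disconnect, func(i, j int) bool { return disconnect[i] < disconnect[j] })
	return connect, disconnect
}

func main() {
	connect, disconnect := diffMembers([]uint64{1, 2, 3}, []uint64{2, 3, 4})
	fmt.Println("connect:", connect, "disconnect:", disconnect)
}
```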
Raft
ApplyConfChange
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
break
}
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddLearnerNode:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
default:
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
case <-n.done:
}
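- First, a calling trick: if the NodeID passed in is None, nothing is changed and the call simply returns the current Raft membership.
- Next, let's look at what each of these change types actually does.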
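The confc/confstatec pair forms a request/response handoff with the goroutine that owns the Raft state. The toy model below illustrates that pattern and the None query. All names (`toyNode`, `confChange`, `applyConfChange`) are hypothetical, membership is reduced to a set of IDs, and `none` stands in for raft.None, which is 0.

```go
package main

import (
	"fmt"
	"sort"
)

const none uint64 = 0 // stands in for raft.None

type changeType int

const (
	addNode changeType = iota
	removeNode
)

type confChange struct {
	Type   changeType
	NodeID uint64
}

type toyNode struct {
	confc      chan confChange
	confstatec chan []uint64
	done       chan struct{}
}

func newToyNode() *toyNode {
	n := &toyNode{
		confc:      make(chan confChange),
		confstatec: make(chan []uint64),
		done:       make(chan struct{}),
	}
	go n.run()
	return n
}

// run owns the membership; all changes are serialized through confc.
func (n *toyNode) run() {
	members := map[uint64]bool{}
	for {
		select {
		case cc := <-n.confc:
			if cc.NodeID != none { // NodeID == none is a pure query
				switch cc.Type {
				case addNode:
					members[cc.NodeID] = true
				case removeNode:
					delete(members, cc.NodeID)
				}
			}
			ids := make([]uint64, 0, len(members))
			for id := range members {
				ids = append(ids, id)
			}
			sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
			select {
			case n.confstatec <- ids:
			case <-n.done:
				return
			}
		case <-n.done:
			return
		}
	}
}

// applyConfChange sends the change and waits for the resulting membership.
func (n *toyNode) applyConfChange(cc confChange) []uint64 {
	var ids []uint64
	select {
	case n.confc <- cc:
	case <-n.done:
	}
	select {
	case ids = <-n.confstatec:
	case <-n.done:
	}
	return ids
}

func main() {
	n := newToyNode()
	n.applyConfChange(confChange{Type: addNode, NodeID: 1})
	n.applyConfChange(confChange{Type: addNode, NodeID: 2})
	fmt.Println("query:", n.applyConfChange(confChange{NodeID: none}))
	fmt.Println("after remove:", n.applyConfChange(confChange{Type: removeNode, NodeID: 1}))
	close(n.done)
}
```

Keeping the state inside one goroutine means no lock is needed; the caller only ever sees a snapshot of the membership that was produced after its own change was processed.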
addNode&addLearner
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
pr := r.getProgress(id)
if pr == nil {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
} else {
if isLearner && !pr.IsLearner {
// can only change Learner to Voter
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
return
}
if isLearner == pr.IsLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
// change Learner to Voter, use origin Learner progress
delete(r.learnerPrs, id)
pr.IsLearner = false
r.prs[id] = pr
}
if r.id == id {
r.isLearner = isLearner
}
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
pr = r.getProgress(id)
pr.RecentActive = true
}
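- For a brand-new node, a Progress is initialized with match=0 and next=r.raftLog.lastIndex()+1.
- If the node was previously a learner, it is moved from the learner set into the regular (voting) set, keeping its existing Progress.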
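The cases handled by addNodeOrLearnerNode can be exercised with a stripped-down model. This is a sketch under simplifying assumptions: `toyRaft` and `progress` are hypothetical types holding only the fields discussed here, and the RecentActive and r.isLearner bookkeeping is left out.

```go
package main

import "fmt"

type progress struct {
	Match, Next uint64
	IsLearner   bool
}

type toyRaft struct {
	lastIndex  uint64
	prs        map[uint64]*progress // voters
	learnerPrs map[uint64]*progress // learners
}

// get looks the node up in both sets; it returns nil if unknown.
func (r *toyRaft) get(id uint64) *progress {
	if pr, ok := r.prs[id]; ok {
		return pr
	}
	return r.learnerPrs[id]
}

// add follows the same four cases as addNodeOrLearnerNode.
func (r *toyRaft) add(id uint64, isLearner bool) {
	pr := r.get(id)
	switch {
	case pr == nil:
		// Brand-new node: match=0, next=lastIndex+1.
		pr = &progress{Match: 0, Next: r.lastIndex + 1, IsLearner: isLearner}
		if isLearner {
			r.learnerPrs[id] = pr
		} else {
			r.prs[id] = pr
		}
	case isLearner && !pr.IsLearner:
		// Voter to learner is not supported: ignore.
	case isLearner == pr.IsLearner:
		// Redundant call: ignore.
	default:
		// Learner to voter: move it over and keep its progress.
		delete(r.learnerPrs, id)
		pr.IsLearner = false
		r.prs[id] = pr
	}
}

func main() {
	r := &toyRaft{
		lastIndex:  10,
		prs:        map[uint64]*progress{},
		learnerPrs: map[uint64]*progress{},
	}
	r.add(2, true) // joins as learner
	r.learnerPrs[2].Match = 7
	r.add(2, false) // promoted, progress preserved
	fmt.Printf("voter 2: %+v, learners left: %d\n", *r.prs[2], len(r.learnerPrs))
	r.add(2, true) // demotion attempt is ignored
	fmt.Println("still a voter:", !r.prs[2].IsLearner)
}
```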
removeNode
func (r *raft) removeNode(id uint64) {
r.delProgress(id)
// do not try to commit or abort transferring if there is no nodes in the cluster.
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
return
}
// The quorum size is now smaller, so see if any pending entries can
// be committed.
if r.maybeCommit() {
r.bcastAppend()
}
// If the removed node is the leadTransferee, then abort the leadership transferring.
if r.state == StateLeader && r.leadTransferee == id {
r.abortLeaderTransfer()
}
}
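The comment about the smaller quorum is worth a worked example: removing a lagging voter can let entries commit that could not before. The sketch below is not etcd's maybeCommit; `commitIndex` is a hypothetical helper that derives the commit index purely from each voter's match index.

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndex returns the highest log index that is replicated on a
// majority of voters, given each voter's match index.
func commitIndex(match map[uint64]uint64) uint64 {
	if len(match) == 0 {
		return 0
	}
	ms := make([]uint64, 0, len(match))
	for _, m := range match {
		ms = append(ms, m)
	}
	// Sort descending; the entry at position quorum-1 (= n/2) is the
	// largest index that at least a quorum of voters has reached.
	sort.Slice(ms, func(i, j int) bool { return ms[i] > ms[j] })
	return ms[len(ms)/2]
}

func main() {
	// Four voters: two are caught up to index 5, two lag at index 3.
	match := map[uint64]uint64{1: 5, 2: 5, 3: 3, 4: 3}
	fmt.Println("commit with 4 voters:", commitIndex(match))

	// Removing lagging voter 4 shrinks the quorum from 3 to 2.
	delete(match, 4)
	fmt.Println("commit with 3 voters:", commitIndex(match))
}
```

This is why removeNode calls maybeCommit right after deleting the Progress and broadcasts if the commit index moved.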
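- Is deleting the Progress really enough? Think about what keeps the whole system running.
- Remember that the application layer drives the ticks? The leader relies on them to send heartbeats, and non-leaders rely on them to detect election timeouts.
- Let's look again at what else the application layer does.
Application layer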
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
// calling goroutine, since otherwise it will be blocked
// trying to write into haltC
go c.Halt()
}
case <-n.chain.haltC:
ticker.Stop()
n.Stop()
n.storage.Close()
n.logger.Infof("Raft node stopped")
close(n.chain.doneC) // close after all the artifacts are closed
return
}
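- As shown, an entry that removes the current node eventually leads to ticker.Stop on that node. With no more ticks, the node stops taking part in heartbeats and elections, which is what finally takes it out of the Raft group.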
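The halt path can be sketched with plain channels. This is a simplified model rather than Fabric's code: `run` is a hypothetical loop, a manual `ticks` channel replaces the real time.Ticker so the behavior is deterministic, and the `onTick` callback stands in for Node.Tick.

```go
package main

import "fmt"

// run handles ticks until haltC is closed, then closes doneC to signal
// that the loop has exited and nothing will consume ticks any more.
func run(ticks <-chan struct{}, haltC <-chan struct{}, doneC chan<- struct{}, onTick func()) {
	defer close(doneC)
	for {
		select {
		case <-ticks:
			onTick()
		case <-haltC:
			return
		}
	}
}

func main() {
	ticks := make(chan struct{})
	haltC := make(chan struct{})
	doneC := make(chan struct{})

	count := 0
	go run(ticks, haltC, doneC, func() { count++ })

	for i := 0; i < 3; i++ {
		ticks <- struct{}{} // each send is consumed by the loop
	}
	close(haltC) // what Halt() triggers in this model
	<-doneC      // the loop has exited; reading count is now safe

	fmt.Println("ticks handled before halt:", count)

	// After the halt nobody receives ticks, so a send would block.
	select {
	case ticks <- struct{}{}:
		fmt.Println("tick delivered")
	default:
		fmt.Println("tick dropped: node is no longer running")
	}
}
```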