在HttpKVAPI中kvstore的集群增加一個節(jié)點請求處理如下:
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
// As above, optimistic that raft will apply the conf change
w.WriteHeader(http.StatusNoContent)
處理邏輯是向confChangeC通道寫入增加節(jié)點消息熊尉,下面看下raftNode的routine中對該通道事件的處理:
//集群變更事件
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
調(diào)用了node的ProposeConfChange方法:
func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}
調(diào)用了node的Step方法校套,然后調(diào)用step方法骡湖,觸發(fā)MsgProp事件:
func (n *node) step(ctx context.Context, m pb.Message) error {
ch := n.recvc
if m.Type == pb.MsgProp {
//當(dāng)是追加配置請求時ch為n.propc
ch = n.propc
}
select {
//當(dāng)是追加配置請求時會向ch(即n.propc)通道寫入數(shù)據(jù)(消息類型和數(shù)據(jù))
case ch <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
由此可見對于集群變更的處理是將集群變更信息寫入n.propc通道梆暖,下面看下leader角色的node對于該通道數(shù)據(jù)的處理妖爷,在raft的stepLeader方法中:
case pb.MsgProp:
//配置追加請求
if len(m.Entries) == 0 {
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
if _, ok := r.prs[r.id]; !ok {
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return
}
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
}
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
if r.pendingConf {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
}
r.pendingConf = true
}
}
r.appendEntry(m.Entries...)
r.bcastAppend()
return
有以上代碼可知是將該集群變更作為日志追加到本地r.appendEntry(m.Entries...)咐旧,然后向其他follower發(fā)送附加日志rpc:r.bcastAppend()驶鹉。
當(dāng)該集群變更日志復(fù)制到過半數(shù)server后,raftNode提交日志的處理邏輯如下:
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
rc.confState = *rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) {
log.Println("I've been removed from the cluster! Shutting down.")
return false
}
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
對于添加一個節(jié)點的處理铣墨,首先是更新集群變更的狀態(tài)信息室埋,如下:
func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc:
case <-n.done:
}
select {
case cs = <-n.confstatec:
case <-n.done:
}
return &cs
}
會向n.confc通道寫入集群變更消息,下面看node的處理:
case cc := <-n.confc:
if cc.NodeID == None {
r.resetPendingConf()
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
}
break
}
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
if cc.NodeID == r.id {
propc = nil
}
r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default:
panic("unexpected conf type")
}
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
}
r.addNode的代碼如下:
func (r *raft) addNode(id uint64) {
r.pendingConf = false
if _, ok := r.prs[id]; ok {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
}
就是設(shè)置下該peer的發(fā)送日志進(jìn)度信息伊约。再回到raftNode中對于可提交的集群變更日志的處理姚淆,在更新完集群信息后執(zhí)行了rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)}),即把該節(jié)點加入到當(dāng)前集群屡律,建立與該節(jié)點的通信腌逢,這塊代碼之前分析過了不再贅述。當(dāng)該日志在其他follower也提交時超埋,其他follower也會同樣處理把這個新節(jié)點加入集群搏讶。
因此,只有集群變更日志在當(dāng)前server被提交完成后霍殴,當(dāng)前server才建立與新節(jié)點的通信媒惕,才知道集群的最新規(guī)模,在復(fù)制集群變更日志的過程中他們依然不知道集群的最新規(guī)模来庭。但對于新節(jié)點來說妒蔚,在啟動式會知道老集群的節(jié)點信息,因此新節(jié)點啟動后就知道了集群的最新規(guī)模。
總結(jié)如下肴盏,在ectd的raft實現(xiàn)中科盛,處理集群變更的方案是:每次變更只能添加或刪除一個節(jié)點,不能一次變更多個節(jié)點菜皂,因為每次變更一個節(jié)點能保證不會有多個leader同時產(chǎn)生土涝,下面以下圖為例分析下原因。
最初節(jié)點個數(shù)為3幌墓,即server1但壮、server2、server3常侣,最初leader為server3蜡饵,如果有2個節(jié)點要加入到集群,那么在原來的3個節(jié)點收到集群變更請求前認(rèn)為集群中有3個節(jié)點胳施,確切的說是集群變更日志提交前溯祸,假如server3作為leader,把集群變更日志發(fā)送到了server4舞肆、server5焦辅,這樣可以提交該集群變更日志了,因此server3椿胯、server4筷登、server5的集群變更日志提交后他們知道當(dāng)前集群有5個節(jié)點了。而server1和server2還沒收到集群變更日志或者收到了集群變更日志但沒有提交哩盲,那么server1和server2認(rèn)為集群中還是有3個節(jié)點前方。假設(shè)此時server3因為網(wǎng)絡(luò)原因重新發(fā)起選舉,server1也同時發(fā)起選舉廉油,server1在收到server2的投票贊成響應(yīng)而成為leader惠险,而server3可以通過server4和server5也可以成為leader,這時出現(xiàn)了兩個leader抒线,是不安全且不允許的班巩。
但如果每次只添加或減少1個節(jié)點,假設(shè)最初有3個節(jié)點嘶炭,有1個節(jié)點要加入抱慌。最初leader為server1,在server1的集群變更日志提交前旱物,server1遥缕、server2、server3認(rèn)為集群中有3個節(jié)點宵呛,只有server4認(rèn)為集群中有4個節(jié)點,如果leader在server1夕凝、server2宝穗、server3中產(chǎn)生户秤,那么必然需要2個server,而server4只能收到server1逮矛、server2鸡号、server3中1個server的響應(yīng),是無法成為leader的须鼎,因為server4認(rèn)為集群中有4個節(jié)點鲸伴,需要3個節(jié)點同意才能成為leader。如果在server1是leader時該集群變更日志提交了晋控,那么集群中至少有2個server有該集群變更日志了汞窗,假如server1和server2都有該集群變更日志了,server3和server4還沒有赡译,那么server3和server4不可能被選為leader仲吏,因為他們的日志不夠新。對于server4來說需要3個server同時同意才能成為leader蝌焚,而server1和server2的日志比他新裹唆,不會同意他成為leader。對于server3來說只洒,在集群變更日志提交前他認(rèn)為集群中只有3個server许帐,因此只會把投票請求發(fā)送給server1和server2,而server1和server2因為日志比他新不會同意毕谴;如果server3的集群變更日志也提交了舞吭,那么他人為集群中有4個節(jié)點,這時與server4一樣析珊,需要3個server同時同意才能成為leader羡鸥,如果server1通過server2成為leader了,那么server1和server2都不會參與投票了忠寻。
因此每次一個節(jié)點的加入不管在集群變更日志提交前惧浴、提交過程中還是提交后都不會出現(xiàn)兩個leader的情況。
提交前是因為原來的節(jié)點不知道這個新的節(jié)點奕剃,不會發(fā)送投票給他衷旅,也不會處理新節(jié)點的投票請求;
提交后是因為大家都知道集群的最新規(guī)模了纵朋,不會產(chǎn)生兩個大多數(shù)的投票柿顶;
提交過程中是因為沒有這條集群變更日志的server由于日志不夠新也不能成為leader,比如最初集群規(guī)模是2n+1操软,現(xiàn)在有1個新節(jié)點加入嘁锯,如果集群變更日志復(fù)制到了過半數(shù)server,因為之前的leader是老的集群的,因此過半數(shù)是n+1家乘,假如這個n+1個server中產(chǎn)生了一個leader蝗羊,那么對于新的節(jié)點來說,因為集群變更日志還沒有應(yīng)用到狀態(tài)機所以只有這個新節(jié)點認(rèn)為集群中有2n+2個server仁锯,因此需要n+2個server同意投票他才能成為leader耀找,但這是不可能的,因為已經(jīng)有n+1個節(jié)點已經(jīng)投過票了业崖,而對于老集群中的剩下的沒有投票的n個節(jié)點中野芒,他們?nèi)魏我粋€server都需要n+1個server同意才能成為leader,而他們因為還沒有把集群變更日志真正提交即應(yīng)用到狀態(tài)機双炕,還不知道新節(jié)點的存在狞悲,也就不能收到n+1個server投票,最多只能收到n個節(jié)點的投票雄家,因此也不能成為leader效诅,保證了只能有一個leader被選出來。