當(dāng)server啟動(dòng)后辛孵,初始狀態(tài)是follower固惯,然后如果在集群中第一個(gè)觸發(fā)選舉超時(shí)薇正,則變?yōu)閏andicate柑晒,然后向其他server發(fā)起投票欧瘪,當(dāng)收到過(guò)半數(shù)的贊成票后變?yōu)閘eader,開始每隔一個(gè)心跳時(shí)間向其他server發(fā)送心跳匙赞。啟動(dòng)流程如下:
在server正常工作前佛掖,需要進(jìn)行l(wèi)eader選舉,server啟動(dòng)時(shí)為follower罚屋,在raftNode的一個(gè)routine中苦囱,會(huì)監(jiān)聽ticker.C這個(gè)通道的事件,這個(gè)通道對(duì)應(yīng)一個(gè)定時(shí)器脾猛,每隔100ms觸發(fā)一次撕彤,每次觸發(fā)該100ms定時(shí)事件,raftNode會(huì)向node的n.tickc通道寫入事件猛拴,這樣相當(dāng)于node有了一個(gè)每隔100ms觸發(fā)一次的時(shí)鐘信號(hào)羹铅,node在自己的一個(gè)routine中會(huì)根據(jù)該時(shí)鐘信號(hào)判斷是否達(dá)到了選舉超時(shí)時(shí)間,到達(dá)選舉超時(shí)時(shí)間后愉昆,會(huì)變?yōu)閏andicate狀態(tài)职员,然后對(duì)集群中其他每個(gè)server(在peer中保存該server與其他server的關(guān)聯(lián)數(shù)據(jù)),構(gòu)造選舉請(qǐng)求消息跛溉,追加到raft算法模塊的msgs中焊切,而在node的一個(gè)routine中會(huì)監(jiān)聽r.msgs是否有消息要處理,有的話會(huì)將r.msgs封裝成Ready寫入到node的n.readyc通道芳室,raftNode當(dāng)監(jiān)聽到n.readyc通道有消息時(shí)會(huì)將消息寫入到cw.msgc通道专肪,streamWriter會(huì)把cw.msgc通道的消息發(fā)送到對(duì)應(yīng)的server。
當(dāng)對(duì)應(yīng)peer的streamReader監(jiān)聽到p.recvc通道有事件并且事件為投票響應(yīng)消息時(shí)會(huì)將消息寫入到node的n.recvc通道堪侯,node根據(jù)投票響應(yīng)結(jié)果統(tǒng)計(jì)投贊成票的server個(gè)數(shù)嚎尤,如果超過(guò)半數(shù)則變?yōu)閘eader,開始向其他server發(fā)送心跳伍宦。
下面看下follower在收到選舉請(qǐng)求時(shí)如何處理芽死,在raft的Step方法中:
case pb.MsgVote, pb.MsgPreVote:
// The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
// always equal r.Term.
if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})
}
投票消息為pb.MsgVote,直接看投贊成票的條件r.raftLog.isUpToDate(m.Index, m.LogTerm)次洼,方法如下:
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}
m.Index為candicate的最新日志的索引位置关贵,即參數(shù)中的lasti,m.LogTerm為candicate最新日志的任期號(hào)卖毁,即參數(shù)中的term揖曾。贊成條件為candicate的最新日志的任期號(hào)比f(wàn)ollower的最新日志的任期號(hào)大term > l.lastTerm(),或者在雙方最新日志任期號(hào)相同的情況下,candicate最新日志的索引位置要比f(wàn)ollower的最新日志索引位置大翩肌,即比f(wàn)ollower的日志更新 (term == l.lastTerm() && lasti >= l.lastIndex()模暗。
下面看下candicate對(duì)于投票響應(yīng)請(qǐng)求的處理,在raft的stepCandicate方法中:
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
switch r.quorum() {
case gr:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case len(r.votes) - gr:
r.becomeFollower(r.Term, None)
}
關(guān)鍵語(yǔ)句在這一行g(shù)r := r.poll(m.From, m.Type, !m.Reject)念祭,下面看下這個(gè)方法:
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
if v {
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
} else {
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
}
if _, ok := r.votes[id]; !ok {
r.votes[id] = v
}
for _, vv := range r.votes {
if vv {
granted++
}
}
return granted
}
m.From為投票響應(yīng)請(qǐng)求來(lái)源follower的id兑宇,!m.Reject是該follower是否同意投票,首先設(shè)置follower的投票結(jié)果:
r.votes[id] = v
v就是!m.Reject粱坤,然后統(tǒng)計(jì)所有server的贊成票數(shù):
for _, vv := range r.votes {
if vv {
granted++
}
}
回到raft的stepCandicate方法隶糕,gr就是poll返回的贊成票數(shù)granted,當(dāng)贊成票數(shù)達(dá)到r.quorum()站玄,即過(guò)半數(shù)枚驻,如下:
func (r *raft) quorum() int {
return len(r.prs)/2 + 1
}
當(dāng)贊成票達(dá)到過(guò)半數(shù)時(shí),成為leader株旷,并向其他follower發(fā)送附加日志rpc再登。