Leader
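Assume a leader has now been elected and is about to start replicating its log to the other members.
Before it can act as a real Leader, it has some preparation to do:
- Turning from Candidate into Leader takes more than a change of name.
- Once the transition is complete, it starts replicating the log to the other members.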
becomeLeader
```go
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	// Followers enter replicate mode when they've been successfully probed
	// (perhaps after having received a snapshot as a result). The leader is
	// trivially in this state. Note that r.reset() has initialized this
	// progress with the last index already.
	r.prs[r.id].becomeReplicate()

	// Conservatively set the pendingConfIndex to the last index in the
	// log. There may or may not be a pending config change, but it's
	// safe to delay any future proposals until we commit all our
	// pending log entries, and scanning the entire tail of the log
	// could be expensive.
	r.pendingConfIndex = r.raftLog.lastIndex()

	emptyEnt := pb.Entry{Data: nil}
	if !r.appendEntry(emptyEnt) {
		// This won't happen because we just called reset() above.
		r.logger.Panic("empty entry was dropped")
	}
	// As a special case, don't count the initial empty entry towards the
	// uncommitted log quota. This is because we want to preserve the
	// behavior of allowing one entry larger than quota if the current
	// usage is zero.
	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
```
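- The previous state must not be follower: a follower -> leader transition panics.
- From now on, Step hands messages to stepLeader, and tick is switched to tickHeartbeat.
- After reset(r.Term), the leader marks its own Progress as ProgressStateReplicate, with Next = Match+1.
- Finally it appends an empty entry in its new term. Raft only commits entries of the current term by counting replicas, so once this empty entry is committed, everything before it is committed too.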
reset(r.Term)
```go
func (r *raft) reset(term uint64) {
	if r.Term != term {
		r.Term = term
		r.Vote = None
	}
	r.lead = None

	r.electionElapsed = 0
	r.heartbeatElapsed = 0
	r.resetRandomizedElectionTimeout()

	r.abortLeaderTransfer()

	r.votes = make(map[uint64]bool)
	r.forEachProgress(func(id uint64, pr *Progress) {
		*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
		if id == r.id {
			pr.Match = r.raftLog.lastIndex()
		}
	})

	r.pendingConfIndex = 0
	r.uncommittedSize = 0
	r.readOnly = newReadOnly(r.readOnly.option)
}
```
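- If the term has changed, update it and clear Vote. (In becomeLeader the term is unchanged, so the vote is kept.)
- Clear lead, electionElapsed, heartbeatElapsed and votes, re-randomize the election timeout, and abort any in-progress leader transfer. The following are reset as well:
  - pendingConfIndex (becomeLeader then sets it to the last index)
  - readOnly (recreated with the same option)
  - uncommittedSize (the size of uncommitted entries, back to 0)
- Reset the locally stored progress of every node:
  - Note that every peer's Next is set to r.raftLog.lastIndex() + 1, the same as the leader's own: the leader assumes everyone's log matches its own until told otherwise.
  - Only the leader's own Progress gets Match = lastIndex; everyone else's Match starts at 0.
  - Every Progress starts in Probe state.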
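The progress bookkeeping that reset() and becomeLeader() perform can be sketched in a few lines of Go. This is a minimal model under stated assumptions: progress, progressState and resetProgress are hypothetical simplified stand-ins, not etcd's Progress type, and the inflights, learner and snapshot fields are left out.

```go
package main

// progressState and progress are hypothetical cut-down versions of etcd's types.
type progressState int

const (
	stateProbe progressState = iota // zero value, like ProgressStateProbe
	stateReplicate
)

type progress struct {
	Match, Next uint64
	State       progressState
}

// resetProgress models the forEachProgress loop in reset() followed by
// becomeLeader's r.prs[r.id].becomeReplicate().
func resetProgress(ids []uint64, self, lastIndex uint64) map[uint64]*progress {
	prs := make(map[uint64]*progress, len(ids))
	for _, id := range ids {
		// Every peer: Next = lastIndex+1, Match = 0, State = probe (zero value).
		pr := &progress{Next: lastIndex + 1}
		if id == self {
			pr.Match = lastIndex
			// becomeReplicate: switch state and set Next = Match + 1.
			pr.State = stateReplicate
			pr.Next = pr.Match + 1
		}
		prs[id] = pr
	}
	return prs
}
```

Calling resetProgress([]uint64{1, 2, 3}, 1, 10) leaves the leader (node 1) at Match 10, Next 11 in replicate state, while nodes 2 and 3 start at Match 0, Next 11 in probe state.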
tickHeartbeat
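```go
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++

	if r.electionElapsed >= r.electionTimeout {
		r.electionElapsed = 0
		if r.checkQuorum {
			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
		}
		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
		if r.state == StateLeader && r.leadTransferee != None {
			r.abortLeaderTransfer()
		}
	}

	if r.state != StateLeader {
		return
	}

	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}
```
On every tick the leader advances both counters. Every electionTimeout ticks it resets the election counter, steps a MsgCheckQuorum if checkQuorum is enabled, and aborts a leader transfer that has not finished in time. Every heartbeatTimeout ticks it steps a MsgBeat, which stepLeader turns into heartbeats to all peers.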
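bcastAppend
bcastAppend calls sendAppend for every peer other than the leader itself, and sendAppend is simply maybeSendAppend(to, true). The real work happens in maybeSendAppend: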
```go
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.getProgress(to)
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to

	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		if !pr.RecentActive {
			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
			return false
		}

		m.Type = pb.MsgSnap
		snapshot, err := r.raftLog.snapshot()
		if err != nil {
			if err == ErrSnapshotTemporarilyUnavailable {
				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
				return false
			}
			panic(err) // TODO(bdarnell)
		}
		if IsEmptySnap(snapshot) {
			panic("need non-empty snapshot")
		}
		m.Snapshot = snapshot
		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
		pr.becomeSnapshot(sindex)
		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
	} else {
		m.Type = pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in ProgressStateReplicate
			case ProgressStateReplicate:
				last := m.Entries[n-1].Index
				pr.optimisticUpdate(last)
				pr.ins.add(last)
			case ProgressStateProbe:
				pr.pause()
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m)
	return true
}
```
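- Fetch the peer's Progress, i.e. how far the leader believes it has replicated. If the progress is paused (IsPaused), send nothing.
- Collect the entries starting at the peer's Next (bounded by maxMsgSize).
- Include the (term, index) of Next-1, which the receiver uses for its consistency check.
- Include the leader's own committed index.
- Build a MsgApp message for the peer.
- If the message carries entries:
  - If the peer is in ProgressStateReplicate:
    - Optimistically advance the peer's Next to last+1, where last is the index of the last entry sent, without waiting for the response.
    - Add last to ins; note that ins is a ring-buffer-like queue.
  - If the peer is in ProgressStateProbe, pause its progress so that only one probe is in flight.
- The snapshot case:
  - If the term at Next-1 or the entries from Next onward cannot be found, they have been compacted into a snapshot. (A peer that is not RecentActive is skipped.)
  - Fetch the current node's snapshot, which may be in unstable or in storage.
  - Set the peer's Progress to ProgressStateSnapshot and its PendingSnapshot to the snapshot's index.
  - Build a MsgSnap message for the peer.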
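The MsgApp branch of maybeSendAppend can be modeled on a plain slice. A rough sketch, assuming a log that was never compacted (so the snapshot branch never triggers) and no maxMsgSize limit; entry, progress, appendMsg and buildAppend are hypothetical names, and a plain slice stands in for the ins ring buffer.

```go
package main

// entry, progress and appendMsg are hypothetical simplified types.
type entry struct{ Index, Term uint64 }

type progressState int

const (
	stateProbe progressState = iota // zero value, like ProgressStateProbe
	stateReplicate
)

type progress struct {
	Match, Next uint64
	State       progressState
	Paused      bool
	Inflight    []uint64 // stand-in for the ins ring buffer
}

type appendMsg struct {
	Index, LogTerm, Commit uint64 // Index/LogTerm describe entry Next-1
	Entries                []entry
}

// buildAppend models the MsgApp branch of maybeSendAppend.
// log[i] holds the entry with Index i+1 (no compaction assumed).
func buildAppend(log []entry, committed uint64, pr *progress) (appendMsg, bool) {
	if pr.Paused {
		return appendMsg{}, false // like pr.IsPaused()
	}
	prev := pr.Next - 1
	var prevTerm uint64 // term 0 stands for "before the first entry"
	if prev > 0 {
		prevTerm = log[prev-1].Term
	}
	m := appendMsg{
		Index:   prev,
		LogTerm: prevTerm,
		Commit:  committed,
		Entries: log[prev:], // entries from Next onward
	}
	if n := len(m.Entries); n != 0 {
		last := m.Entries[n-1].Index
		switch pr.State {
		case stateReplicate:
			pr.Next = last + 1 // optimistic: do not wait for the response
			pr.Inflight = append(pr.Inflight, last)
		case stateProbe:
			pr.Paused = true // only one probe in flight at a time
		}
	}
	return m, true
}
```

With four entries and a replicating peer at Next 3, the message carries Index 2 / LogTerm 1 and entries 3..4, and the peer's Next jumps to 5 before any response arrives. A probing peer that is sent entries is paused instead, so a second call sends nothing until it answers.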
Follower
```go
// in stepFollower
case pb.MsgApp:
	r.electionElapsed = 0
	r.lead = m.From
	r.handleAppendEntries(m)
```
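- The Follower assumes only a Leader sends this kind of message, so whoever sends it is accepted as the Leader (messages from stale terms have already been filtered out in Step).
- It also resets the election timer.
- The real logic lives in handleAppendEntries.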
handleAppendEntries
```go
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...)
		}
		l.commitTo(min(committed, lastnewi))
		return lastnewi, true
	}
	return 0, false
}
```
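If m.Index is below the Follower's committed index, the message is stale: those entries are already committed on the Follower, so it simply replies with its own committed index and lets the Leader continue from there.
In maybeAppend, the Follower checks whether its own entry at m.Index has the term m.LogTerm, i.e. whether the (term, index) pair matches. If it doesn't, the Follower tells the Leader: the entry at the index you gave me doesn't match my term. Maybe I don't have that entry at all, or I have something entirely different there. So I reject your append and attach my current last index, RejectHint: r.raftLog.lastIndex(); the conflict point is the m.Index the Leader sent.
The last index in the rejection lets the Leader back off faster: in maybeDecrTo, a probing Leader sets Next = min(rejected, last+1), so if the Follower's log is shorter than the rejected index, Next jumps straight to the Follower's last index + 1 instead of moving back one entry at a time.
In Raft, if two logs contain an entry with the same (term, index), then all entries up to and including that index are identical (the Log Matching Property).
So if the check passes, the two logs agree up to the position just before the insertion point, and the Follower can safely append after it.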
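- First compute the last index after the append, lastnewi = index + len(ents).
- findConflict returns the index of the first entry in ents that either conflicts with a local entry (same index, different term) or does not exist locally yet, or 0 if every entry is already present.
  - In the best case the new entries line up exactly with the end of the local log, so there is nothing to conflict with and they are simply appended. (If every entry is already present, ci is 0 and nothing is appended.)
  - The Follower may also hold more entries than the Leader assumed, some of them different. Then it scans from front to back for the first conflict, discards everything from that point on, and takes the Leader's entries instead. A conflict at or below committed would be a bug and panics.
- Then advance the local commit index to min(committed sent by the Leader, lastnewi): the Follower cannot commit past the entries it has just verified.
- Finally report to the Leader that everything it asked for is done, attaching the new last index (lastnewi).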
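The follower side (handleAppendEntries plus maybeAppend) can be sketched on an in-memory slice. This is a simplified model, not raftLog: followerLog, appResp and handleAppend are hypothetical, entries are assumed to start at index 1 with no compaction and no unstable/storage split, and a missing index is treated as term 0.

```go
package main

// entry, followerLog and appResp are hypothetical simplified types.
type entry struct{ Index, Term uint64 }

// followerLog: ents[i] has Index i+1.
type followerLog struct {
	ents      []entry
	committed uint64
}

type appResp struct {
	Index, RejectHint uint64
	Reject            bool
}

func (l *followerLog) lastIndex() uint64 { return uint64(len(l.ents)) }

// term returns 0 for index 0 and for indexes the follower does not have.
func (l *followerLog) term(i uint64) uint64 {
	if i == 0 || i > l.lastIndex() {
		return 0
	}
	return l.ents[i-1].Term
}

// findConflict: index of the first entry that is missing or has another term; 0 if none.
func (l *followerLog) findConflict(ents []entry) uint64 {
	for _, e := range ents {
		if l.term(e.Index) != e.Term {
			return e.Index
		}
	}
	return 0
}

func (l *followerLog) maybeAppend(index, logTerm, committed uint64, ents []entry) (uint64, bool) {
	if l.term(index) != logTerm { // the (term, index) check
		return 0, false
	}
	lastnewi := index + uint64(len(ents))
	switch ci := l.findConflict(ents); {
	case ci == 0: // every entry is already present
	case ci <= l.committed:
		panic("entry conflicts with a committed entry")
	default:
		// keep entries before ci, drop the rest, append the leader's entries from ci on
		l.ents = append(l.ents[:ci-1], ents[ci-index-1:]...)
	}
	toCommit := committed
	if lastnewi < toCommit {
		toCommit = lastnewi // never commit past what was just verified
	}
	if toCommit > l.committed {
		l.committed = toCommit // like commitTo: only moves forward
	}
	return lastnewi, true
}

func (l *followerLog) handleAppend(index, logTerm, committed uint64, ents []entry) appResp {
	if index < l.committed {
		return appResp{Index: l.committed} // stale: report committed
	}
	if last, ok := l.maybeAppend(index, logTerm, committed, ents); ok {
		return appResp{Index: last}
	}
	return appResp{Index: index, Reject: true, RejectHint: l.lastIndex()}
}
```

For a follower holding terms 1, 1, 2, 2 with committed 2, an append at index 2 / term 1 carrying one term-3 entry for index 3 truncates indexes 3..4, appends the new entry and commits up to 3; a later append at index 5 is rejected with RejectHint 3.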
There is also another case, the Leader's probe request, where the Follower …
Leader
Now let's dissect what the Leader does when it receives a member's append response (MsgAppResp).
```go
// in stepLeader; pr is the Progress of m.From
case pb.MsgAppResp:
	pr.RecentActive = true

	if m.Reject {
		r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
			r.id, m.RejectHint, m.From, m.Index)
		if pr.maybeDecrTo(m.Index, m.RejectHint) {
			r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
			if pr.State == ProgressStateReplicate {
				pr.becomeProbe()
			}
			r.sendAppend(m.From)
		}
	} else {
		oldPaused := pr.IsPaused()
		if pr.maybeUpdate(m.Index) {
			switch {
			case pr.State == ProgressStateProbe:
				pr.becomeReplicate()
			case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
				r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
				// Transition back to replicating state via probing state
				// (which takes the snapshot into account). If we didn't
				// move to replicating state, that would only happen with
				// the next round of appends (but there may not be a next
				// round for a while, exposing an inconsistent RaftStatus).
				pr.becomeProbe()
				pr.becomeReplicate()
			case pr.State == ProgressStateReplicate:
				pr.ins.freeTo(m.Index)
			}

			if r.maybeCommit() {
				r.bcastAppend()
			} else if oldPaused {
				// If we were paused before, this node may be missing the
				// latest commit index, so send it.
				r.sendAppend(m.From)
			}
			// We've updated flow control information above, which may
			// allow us to send multiple (size-limited) in-flight messages
			// at once (such as when transitioning from probe to
			// replicate, or when freeTo() covers multiple messages). If
			// we have more entries to send, send as many messages as we
			// can (without sending empty messages for the commit index)
			for r.maybeSendAppend(m.From, false) {
			}
			// Transfer leadership is in progress.
			if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
				r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
				r.sendTimeoutNow(m.From)
			}
		}
	}
```
agree
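When the member accepts the Leader's append request
Remember that when the Leader took office, every peer was in Probe state. Once an append is accepted, a peer in Probe state switches to ProgressStateReplicate, and its Next becomes Match+1.
- ProgressStateSnapshot: see EtcdRaft Source Code Analysis (Snapshot Replication).
- ProgressStateReplicate:
  - Reaching this point means the peer has accepted the replicated entries, so every entry in ins with an index less than or equal to this response's index is freed.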
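The ins queue is a fixed-size ring buffer holding the last index of each in-flight MsgApp. A minimal sketch of the idea: the names inflights, add, freeTo and full echo etcd's, but this is a simplified reimplementation, not its code. In etcd, a peer in ProgressStateReplicate counts as paused while ins is full, which is what bounds the number of unacknowledged messages.

```go
package main

// inflights is a simplified ring buffer of the last indexes of in-flight messages.
type inflights struct {
	start, count int // position of the oldest item, number of items
	buffer       []uint64
}

func newInflights(size int) *inflights { return &inflights{buffer: make([]uint64, size)} }

func (in *inflights) full() bool { return in.count == len(in.buffer) }

// add records the last index of a message that was just sent.
func (in *inflights) add(index uint64) {
	if in.full() {
		panic("cannot add into a full inflights")
	}
	next := (in.start + in.count) % len(in.buffer) // wrap around
	in.buffer[next] = index
	in.count++
}

// freeTo drops every in-flight index <= to, i.e. every message the follower
// has acknowledged. Indexes are added in increasing order, so the oldest
// items are always the smallest.
func (in *inflights) freeTo(to uint64) {
	for in.count > 0 && in.buffer[in.start] <= to {
		in.start = (in.start + 1) % len(in.buffer)
		in.count--
	}
}
```

With capacity 3, after sending messages ending at 5, 8 and 12 the buffer is full; an acknowledgement of index 8 frees the first two slots, and new entries then wrap around into them.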
maybeUpdate
```go
func (pr *Progress) maybeUpdate(n uint64) bool {
	var updated bool
	if pr.Match < n {
		pr.Match = n
		updated = true
		pr.resume()
	}
	if pr.Next < n+1 {
		pr.Next = n + 1
	}
	return updated
}
```
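- maybeUpdate: as the analysis above shows, no rejection means the two logs agree at least up to some point, and the Index the peer sends back says how far the data the Leader sent has been replicated. So the first thing to do is record the peer's progress. Match only moves forward (a stale or duplicate response changes nothing), the progress is resumed whenever Match advances, and Next is raised to at least n+1.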
maybeCommit
```go
func (r *raft) maybeCommit() bool {
	// Preserving matchBuf across calls is an optimization
	// used to avoid allocating a new slice on each call.
	if cap(r.matchBuf) < len(r.prs) {
		r.matchBuf = make(uint64Slice, len(r.prs))
	}
	mis := r.matchBuf[:len(r.prs)]
	idx := 0
	for _, p := range r.prs {
		mis[idx] = p.Match
		idx++
	}
	sort.Sort(mis)
	mci := mis[len(mis)-r.quorum()]
	return r.raftLog.maybeCommit(mci, r.Term)
}
```
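- maybeCommit tallies every member's Match. After sorting, mis[len(mis)-r.quorum()] is the highest index that a majority of members (the Leader included) have already replicated; only once a majority has gone past the Leader's committed position can the Leader safely commit its local entries up to that index. raftLog.maybeCommit additionally requires the entry at that index to be from the current term, which is why a new Leader appends an empty entry as soon as it takes office.
- Finally, if the commit index advanced, bcastAppend sends the new commit index to the members so they can commit too.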
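The quorum computation in maybeCommit boils down to a sort and an index. A sketch with hypothetical helper names (quorumIndex, maybeCommit as a free function, and termOf standing in for raftLog.term), assuming every voter's Match is in the slice, the leader's included:

```go
package main

import "sort"

// quorumIndex mirrors the core of maybeCommit: sort all voters' Match values
// and take the element at len-quorum, the highest index a majority holds.
func quorumIndex(matches []uint64) uint64 {
	mis := append([]uint64(nil), matches...) // copy so the caller's slice is untouched
	sort.Slice(mis, func(i, j int) bool { return mis[i] < mis[j] })
	quorum := len(mis)/2 + 1
	return mis[len(mis)-quorum]
}

// maybeCommit is a free-function stand-in for raft.maybeCommit plus
// raftLog.maybeCommit: it only moves committed forward, and only to an
// index whose entry is from the current term. termOf is hypothetical.
func maybeCommit(committed uint64, matches []uint64, currentTerm uint64, termOf func(uint64) uint64) uint64 {
	mci := quorumIndex(matches)
	if mci > committed && termOf(mci) == currentTerm {
		return mci
	}
	return committed
}
```

With five voters at Match 10, 4, 7, 9, 3, the sorted slice is 3, 4, 7, 9, 10 and the quorum index is 7: three voters hold index 7 or later. The term check is the reason for the Leader's empty entry: if a quorum only holds index 5 from term 1, a term-2 Leader commits nothing until an index from term 2 reaches a quorum.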
reject
If the peer rejects the request:
```go
// in stepLeader, case pb.MsgAppResp
if m.Reject {
	r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
		r.id, m.RejectHint, m.From, m.Index)
	if pr.maybeDecrTo(m.Index, m.RejectHint) {
		r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
		if pr.State == ProgressStateReplicate {
			pr.becomeProbe()
		}
		r.sendAppend(m.From)
	}
}
```
maybeDecrTo
```go
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
	if pr.State == ProgressStateReplicate {
		// the rejection must be stale if the progress has matched and "rejected"
		// is smaller than "match".
		if rejected <= pr.Match {
			return false
		}
		// directly decrease next to match + 1
		pr.Next = pr.Match + 1
		return true
	}

	// the rejection must be stale if "rejected" does not match next - 1
	if pr.Next-1 != rejected {
		return false
	}

	if pr.Next = min(rejected, last+1); pr.Next < 1 {
		pr.Next = 1
	}
	pr.resume()
	return true
}
```
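- If the peer's progress is in ProgressStateReplicate and the rejected index is at or below Match, the rejection must be stale (the peer has already acknowledged entries up to Match), so it is ignored.
- Otherwise Next jumps straight back to Match+1, effectively discarding everything sent after Match and restarting replication from there. Simple, direct and blunt.
- In Probe state, pr.Next-1 should normally equal rejected: rejected is the position just before the insertion point that the follower checked, and pr.Next-1 is exactly that position too. If the two differ, the rejection is stale and is ignored.
- Next is then moved back to rejected, i.e. back by one, or further to last+1 if the follower's log is shorter (never below 1), and resume() unpauses the progress so the next probe can go out. This is the probing: step back, send to the Follower to see whether it still conflicts, and if it does, step back again, and so on, until a matching position is found.
- Even when maybeDecrTo succeeds, the Leader cannot be sure the peer will accept the new position, so a peer in ProgressStateReplicate is first switched to ProgressStateProbe.
- With Next moved back, the Leader sends the latest entries from the new position to the peer again.
- In short, a rejection in ProgressStateReplicate sends Next straight back to Match+1 and immediately drops the peer into ProgressStateProbe; from then on, each rejection in ProgressStateProbe backs off by one entry (or straight to the follower's last+1) and tries again.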
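The back-off behavior described above can be simulated end to end. A sketch, assuming logs are given simply as per-index terms, no snapshots, and one probe in flight at a time; progress, probe and the simplified becomeProbe are hypothetical, while maybeDecrTo follows the logic of the function above.

```go
package main

type progressState int

const (
	stateProbe progressState = iota
	stateReplicate
)

// progress is a hypothetical cut-down Progress.
type progress struct {
	Match, Next uint64
	State       progressState
	Paused      bool
}

// maybeDecrTo follows the logic of Progress.maybeDecrTo: rejected is the
// m.Index the follower rejected, last is its RejectHint.
func (pr *progress) maybeDecrTo(rejected, last uint64) bool {
	if pr.State == stateReplicate {
		if rejected <= pr.Match {
			return false // stale rejection
		}
		pr.Next = pr.Match + 1
		return true
	}
	if pr.Next-1 != rejected {
		return false // stale: not the answer to the current probe
	}
	pr.Next = rejected // back off by one ...
	if last+1 < pr.Next {
		pr.Next = last + 1 // ... or straight past the follower's last entry
	}
	if pr.Next < 1 {
		pr.Next = 1
	}
	pr.Paused = false // resume()
	return true
}

// becomeProbe is simplified (no snapshot case): restart probing after Match.
func (pr *progress) becomeProbe() {
	pr.State = stateProbe
	pr.Paused = false
	pr.Next = pr.Match + 1
}

// probe replays the reject/back-off loop. leaderTerms[i] and followerTerms[i]
// are the terms at index i+1. It returns the Next the follower finally
// accepts (0 if a rejection was ignored as stale) and the number of rejections.
func probe(pr *progress, leaderTerms, followerTerms []uint64) (uint64, int) {
	termAt := func(ts []uint64, i uint64) uint64 {
		if i == 0 || i > uint64(len(ts)) {
			return 0 // before the first entry, or missing
		}
		return ts[i-1]
	}
	rejections := 0
	for {
		prev := pr.Next - 1 // m.Index of the MsgApp
		if termAt(followerTerms, prev) == termAt(leaderTerms, prev) {
			return pr.Next, rejections // follower accepts
		}
		rejections++
		// Leader side of a rejected MsgAppResp.
		if !pr.maybeDecrTo(prev, uint64(len(followerTerms))) {
			return 0, rejections // ignored as stale; nothing is resent
		}
		if pr.State == stateReplicate {
			pr.becomeProbe()
		}
	}
}
```

Against a follower whose log is only two entries long, one rejection is enough: RejectHint lets Next jump from 6 straight to 3. Against a follower of the same length that diverged at index 4, the Leader backs off one entry per rejection and settles at Next 4. A peer in replicate state with Match 2 falls straight back to Next 3 and switches to probe.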
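Summary
From here, the exchange keeps going back and forth between Leader and Follower, but the overall logic is what has been described above. The algorithm is really quite elegant. I hope this walkthrough made sense.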