Preparation
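- Read the Raft paper.
- Read my article "Raft Theory and Practice [1]: Theory".
- Read "Raft Theory and Practice [2]: lab2a".
- Read "Raft Theory and Practice [3]: lab2a Walkthrough".
- Because we need to simulate RPC remote calls, see my article "Simulating RPC Remote Procedure Calls".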
Applying log entries
- We need to execute the commands stored in the log, so the Make function starts a new goroutine: applyLogEntryDaemon()
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	...
	go rf.applyLogEntryDaemon() // start apply log
	DPrintf("[%d-%s]: newborn election(%s) heartbeat(%s) term(%d) voted(%d)\n",
		rf.me, rf, rf.electionTimeout, rf.heartbeatInterval, rf.CurrentTerm, rf.VotedFor)
	return rf
}
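- If rf.lastApplied == rf.commitIndex, every committed log entry has already been applied, so the goroutine waits on the condition variable rf.commitCond.
- Once it is signaled, there are commands to apply. Every entry after the last applied one, up to and including the last committed one, is sent to the applyCh channel for execution.
- Because this is a lab, the logic that processes these entries lives in the test code, so we can ignore it for now.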
// applyLogEntryDaemon exit when shutdown channel is closed
func (rf *Raft) applyLogEntryDaemon() {
	for {
		var logs []LogEntry
		// wait
		rf.mu.Lock()
		for rf.lastApplied == rf.commitIndex {
			rf.commitCond.Wait()
			select {
			case <-rf.shutdownCh:
				rf.mu.Unlock()
				DPrintf("[%d-%s]: peer %d is shutting down apply log entry to client daemon.\n", rf.me, rf, rf.me)
				close(rf.applyCh)
				return
			default:
			}
		}
		last, cur := rf.lastApplied, rf.commitIndex
		if last < cur {
			rf.lastApplied = rf.commitIndex
			logs = make([]LogEntry, cur-last)
			copy(logs, rf.Logs[last+1:cur+1])
		}
		rf.mu.Unlock()
		for i := 0; i < cur-last; i++ {
			// current command is replicated, ignore nil command
			reply := ApplyMsg{
				CommandIndex: last + i + 1,
				Command:      logs[i].Command,
				CommandValid: true,
			}
			// reply to outer service
			// DPrintf("[%d-%s]: peer %d apply %v to client.\n", rf.me, rf, rf.me)
			DPrintf("[%d-%s]: peer %d apply to client.\n", rf.me, rf, rf.me)
			// Note: must in the same goroutine, or may result in out of order apply
			rf.applyCh <- reply
		}
	}
}
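The wait-then-apply pattern can be tried in isolation. The following is only a minimal sketch under simplifying assumptions: the applier type, its commit method and applyDemo are hypothetical names that do not exist in the lab code, commands are plain strings, and shutdown handling is left out. It shows that copying the batch under the lock and sending it from a single goroutine keeps the delivery order equal to the log order.

```go
package main

import (
	"fmt"
	"sync"
)

// applier is a hypothetical, stripped-down stand-in for the Raft struct.
type applier struct {
	mu          sync.Mutex
	cond        *sync.Cond
	logs        []string // logs[0] is the placeholder entry at index 0
	commitIndex int
	lastApplied int
	applyCh     chan string
}

func newApplier(logs []string) *applier {
	a := &applier{logs: logs, applyCh: make(chan string)}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// commit raises commitIndex and wakes the apply loop.
func (a *applier) commit(index int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if index > a.commitIndex {
		a.commitIndex = index
		a.cond.Broadcast()
	}
}

// run delivers committed entries in index order from a single goroutine.
func (a *applier) run() {
	for {
		a.mu.Lock()
		for a.lastApplied == a.commitIndex {
			a.cond.Wait() // releases the lock while waiting
		}
		last, cur := a.lastApplied, a.commitIndex
		batch := make([]string, cur-last)
		copy(batch, a.logs[last+1:cur+1])
		a.lastApplied = cur
		a.mu.Unlock()
		for _, cmd := range batch {
			a.applyCh <- cmd // send without holding the lock
		}
	}
}

// applyDemo commits in two steps and returns the commands in delivery order.
func applyDemo() []string {
	a := newApplier([]string{"", "x=1", "y=2", "z=3"})
	go a.run()
	a.commit(2)
	a.commit(3)
	var got []string
	for i := 0; i < 3; i++ {
		got = append(got, <-a.applyCh)
	}
	return got
}

func main() {
	fmt.Println(applyDemo())
}
```

Whether the two commits end up in one batch or two depends on scheduling, but the order of delivery is the same either way.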
- Add the Start function, through which the leader accepts a command sent by a client.
- When a command arrives from a client, the leader first appends a new entry to its own log and updates its own nextIndex and matchIndex.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := 0
	isLeader := false
	// Your code here (2B).
	select {
	case <-rf.shutdownCh:
		return -1, 0, false
	default:
		rf.mu.Lock()
		defer rf.mu.Unlock()
		// Your code here (2B).
		if rf.state == Leader {
			log := LogEntry{rf.CurrentTerm, command}
			rf.Logs = append(rf.Logs, log)
			index = len(rf.Logs) - 1
			term = rf.CurrentTerm
			isLeader = true
			//DPrintf("[%d-%s]: client add new entry (%d-%v), logs: %v\n", rf.me, rf, index, command, rf.logs)
			DPrintf("[%d-%s]: client add new entry (%d)\n", rf.me, rf, index)
			//DPrintf("[%d-%s]: client add new entry (%d-%v)\n", rf.me, rf, index, command)
			// only update leader
			rf.nextIndex[rf.me] = index + 1
			rf.matchIndex[rf.me] = index
		}
	}
	return index, term, isLeader
}
- Next comes the most important part, log replication, which is implemented through AppendEntries. The leader periodically calls consistencyCheck(n) to run a consistency check.
- When checking node n, first take pre = rf.nextIndex[n], clamped to at least 1. pre is the index of the first log entry to send to node n.
- The PrevLogIndex and PrevLogTerm fields of AppendEntriesArgs therefore describe the entry at position pre - 1,
- meaning the leader assumes that the follower's log matches its own up to and including PrevLogIndex.
- The entries from pre onward are added to AppendEntriesArgs. The follower may hold different entries at these positions, or no entries at all.
func (rf *Raft) consistencyCheck(n int) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	// index of the first entry to send; index 0 is the placeholder entry
	pre := max(1, rf.nextIndex[n])
	var args = AppendEntriesArgs{
		Term:         rf.CurrentTerm,
		LeaderID:     rf.me,
		PrevLogIndex: pre - 1,
		PrevLogTerm:  rf.Logs[pre-1].Term,
		Entries:      nil,
		LeaderCommit: rf.commitIndex,
	}
	// use the clamped index in both the check and the slice
	if pre < len(rf.Logs) {
		args.Entries = append(args.Entries, rf.Logs[pre:]...)
	}
	// send the RPC in a separate goroutine so the lock is not held during the call
	go func() {
		DPrintf("[%d-%s]: consistency Check to peer %d.\n", rf.me, rf, n)
		var reply AppendEntriesReply
		if rf.sendAppendEntries(n, &args, &reply) {
			rf.consistencyCheckReplyHandler(n, &reply)
		}
	}()
}
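To make the relationship between nextIndex, PrevLogIndex and Entries concrete, here is a small sketch that builds the arguments as a pure function. The names entry, appendArgs and buildArgs are hypothetical and only mirror the fields used above; the upper clamp on pre is an extra defensive assumption that the original function does not contain.

```go
package main

import "fmt"

// entry and appendArgs are simplified, hypothetical versions of
// LogEntry and AppendEntriesArgs.
type entry struct {
	Term    int
	Command string
}

type appendArgs struct {
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []entry
}

// buildArgs computes what a leader would send to a follower whose
// nextIndex is given. logs[0] is the placeholder entry at index 0.
func buildArgs(logs []entry, nextIndex int) appendArgs {
	pre := nextIndex
	if pre < 1 {
		pre = 1 // never send the placeholder entry
	}
	if pre > len(logs) {
		pre = len(logs) // defensive clamp, an assumption of this sketch
	}
	args := appendArgs{
		PrevLogIndex: pre - 1,
		PrevLogTerm:  logs[pre-1].Term,
	}
	if pre < len(logs) {
		args.Entries = append(args.Entries, logs[pre:]...)
	}
	return args
}

func main() {
	logs := []entry{{0, ""}, {1, "a"}, {1, "b"}, {2, "c"}}
	fmt.Printf("%+v\n", buildArgs(logs, 2)) // follower is missing indexes 2 and 3
	fmt.Printf("%+v\n", buildArgs(logs, 4)) // follower is up to date: heartbeat
}
```

When nextIndex equals the length of the leader's log, Entries stays empty and the call acts as a plain heartbeat that still carries the consistency check.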
Next, look at how a follower responds when it handles AppendEntries.
The AppendEntries reply gains two new fields:
ConflictTerm is the term in which a conflict may have occurred.
FirstIndex is the first index at which a conflict may have occurred.
type AppendEntriesReply struct {
	CurrentTerm int  // currentTerm, for leader to update itself
	Success     bool // true if follower contained entry matching prevLogIndex and prevLogTerm
	// extra info for heartbeat from follower
	ConflictTerm int // term of the conflicting entry
	FirstIndex   int // the first index it stores for ConflictTerm
}
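If args.PrevLogIndex < len(rf.Logs), the follower's log is at least long enough to contain an entry at PrevLogIndex.
In that case preLogIdx is set to args.PrevLogIndex, and preLogTerm is the term of the follower's entry at preLogIdx.
If the terms are equal, the follower and the leader hold identical entries up to and including preLogIdx, so the request succeeds.
The follower then truncates its log after preLogIdx and appends the entries it received, which forces its log to match the leader's.
On success, the reply's ConflictTerm is the term of the follower's last log entry and FirstIndex is the index of that entry.
Otherwise the leader's and the follower's logs conflict. The possible causes are:
- The entry the leader assumes to match lies beyond the end of the follower's log, or the follower has no entries yet (apart from the entry at index 0, which every node has).
- At the same index, the leader's term and the follower's term differ.
In the first case, FirstIndex is len(rf.Logs) and ConflictTerm is the term of the follower's last entry.
In the second case, ConflictTerm is the term of the follower's conflicting entry, and FirstIndex is the index of the follower's first entry with that term.
So in the end AppendEntries returns the conflicting term and the first index that may conflict.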
// AppendEntries handler, including heartbeat, must backup quickly
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	...
	preLogIdx, preLogTerm := 0, 0
	if args.PrevLogIndex < len(rf.Logs) {
		preLogIdx = args.PrevLogIndex
		preLogTerm = rf.Logs[preLogIdx].Term
	}
	// last log is match
	if preLogIdx == args.PrevLogIndex && preLogTerm == args.PrevLogTerm {
		reply.Success = true
		// truncate to known match
		rf.Logs = rf.Logs[:preLogIdx+1]
		rf.Logs = append(rf.Logs, args.Entries...)
		var last = len(rf.Logs) - 1
		// min(leaderCommit, index of last new entry)
		if args.LeaderCommit > rf.commitIndex {
			rf.commitIndex = min(args.LeaderCommit, last)
			// signal possible update commit index
			go func() { rf.commitCond.Broadcast() }()
		}
		// tell leader to update matched index
		reply.ConflictTerm = rf.Logs[last].Term
		reply.FirstIndex = last
		if len(args.Entries) > 0 {
			DPrintf("[%d-%s]: AE success from leader %d (%d cmd @ %d), commit index: l->%d, f->%d.\n",
				rf.me, rf, args.LeaderID, len(args.Entries), preLogIdx+1, args.LeaderCommit, rf.commitIndex)
		} else {
			DPrintf("[%d-%s]: <heartbeat> current logs: %v\n", rf.me, rf, rf.Logs)
		}
	} else {
		reply.Success = false
		// extra info for restore missing entries quickly: from original paper and lecture note
		// if follower rejects, includes this in reply:
		//
		// the follower's term in the conflicting entry
		// the index of follower's first entry with that term
		//
		// if leader knows about the conflicting term:
		// move nextIndex[i] back to leader's last entry for the conflicting term
		// else:
		// move nextIndex[i] back to follower's first index
		var first = 1
		reply.ConflictTerm = preLogTerm
		if reply.ConflictTerm == 0 {
			// which means leader has more logs or follower has no log at all
			first = len(rf.Logs)
			reply.ConflictTerm = rf.Logs[first-1].Term
		} else {
			i := preLogIdx
			// find the first log entry of the conflicting term
			for ; i > 0; i-- {
				if rf.Logs[i].Term != preLogTerm {
					first = i + 1
					break
				}
			}
		}
		reply.FirstIndex = first
		if len(rf.Logs) <= args.PrevLogIndex {
			DPrintf("[%d-%s]: AE failed from leader %d, leader has more logs (%d > %d), reply: %d - %d.\n",
				rf.me, rf, args.LeaderID, args.PrevLogIndex, len(rf.Logs)-1, reply.ConflictTerm,
				reply.FirstIndex)
		} else {
			DPrintf("[%d-%s]: AE failed from leader %d, pre idx/term mismatch (%d != %d, %d != %d).\n",
				rf.me, rf, args.LeaderID, args.PrevLogIndex, preLogIdx, args.PrevLogTerm, preLogTerm)
		}
	}
}
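The rejection branch can be checked on small inputs when it is written as a pure function. This is a sketch only: conflictHint is a hypothetical helper, the follower's log is reduced to a slice of terms, and the function assumes it is called only after a mismatch has been detected (so prevLogIndex is at least 1).

```go
package main

import "fmt"

// conflictHint returns the (ConflictTerm, FirstIndex) pair a follower would
// put in a rejecting reply. logTerms[i] is the term of the follower's entry
// at index i; index 0 is the placeholder entry with term 0.
func conflictHint(logTerms []int, prevLogIndex int) (conflictTerm, firstIndex int) {
	if prevLogIndex >= len(logTerms) {
		// The follower's log is too short: point the leader at the end of
		// the log and report the term of the last entry.
		return logTerms[len(logTerms)-1], len(logTerms)
	}
	// Term mismatch at prevLogIndex: walk back to the first entry of that term.
	conflictTerm = logTerms[prevLogIndex]
	firstIndex = prevLogIndex
	for firstIndex > 1 && logTerms[firstIndex-1] == conflictTerm {
		firstIndex--
	}
	return conflictTerm, firstIndex
}

func main() {
	// Follower holds only index 0 and one term-1 entry; leader probes index 4.
	fmt.Println(conflictHint([]int{0, 1}, 4))
	// Follower holds terms 1,1,2,2; leader disagrees about index 4.
	fmt.Println(conflictHint([]int{0, 1, 1, 2, 2}, 4))
}
```

Reporting the first index of the whole conflicting term lets the leader skip that term in one round trip instead of backing up one entry per RPC.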
- After the leader's AppendEntries call returns, it runs the callback consistencyCheckReplyHandler.
- If the call succeeded, update matchIndex as usual; nextIndex, the next index to send, becomes matchIndex + 1.
- If the call failed, there is a conflict.
- If the conflicting term is 0, the follower has no entries yet apart from the index-0 entry that every node has, so the entry the leader assumed to match lies beyond the end of the follower's log.
- In that case simply set nextIndex to reply.FirstIndex.
- If the conflicting term is not 0, find the last log index of the conflicting term in the leader's log; nextIndex is then the minimum of that index and reply.FirstIndex. If the leader has no entry with that term, nextIndex is reply.FirstIndex.
- Taking the minimum is necessary:
- Suppose the logs look like this, with each entry written as term-index:
- s1: 0-0 1-1 1-2 1-3 1-4 1-5
- s2: 0-0 1-1 1-2 1-3 1-4 1-5
- s3: 0-0 1-1
- Here s1 is the leader and checks s3 starting from entry 1-5. Because the leader has more entries than s3, the check fails and s3 returns conflict term 1 and FirstIndex 2.
- If the leader only took the last log index of the conflicting term, nextIndex would point at 1-5 again and the check would loop forever.
func (rf *Raft) consistencyCheckReplyHandler(n int, reply *AppendEntriesReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.state != Leader {
		return
	}
	if reply.Success {
		// RPC and consistency check successful
		rf.matchIndex[n] = reply.FirstIndex
		rf.nextIndex[n] = rf.matchIndex[n] + 1
		rf.updateCommitIndex() // try to update commitIndex
	} else {
		// found a new leader? turn to follower
		if rf.state == Leader && reply.CurrentTerm > rf.CurrentTerm {
			rf.turnToFollow()
			rf.resetTimer <- struct{}{}
			DPrintf("[%d-%s]: leader %d found new term (heartbeat resp from peer %d), turn to follower.",
				rf.me, rf, rf.me, n)
			return
		}
		// Does leader know conflicting term?
		var know, lastIndex = false, 0
		if reply.ConflictTerm != 0 {
			for i := len(rf.Logs) - 1; i > 0; i-- {
				if rf.Logs[i].Term == reply.ConflictTerm {
					know = true
					lastIndex = i
					DPrintf("[%d-%s]: leader %d have entry %d is the last entry in term %d.",
						rf.me, rf, rf.me, i, reply.ConflictTerm)
					break
				}
			}
			if know {
				rf.nextIndex[n] = min(lastIndex, reply.FirstIndex)
			} else {
				rf.nextIndex[n] = reply.FirstIndex
			}
		} else {
			rf.nextIndex[n] = reply.FirstIndex
		}
		rf.nextIndex[n] = min(rf.nextIndex[n], len(rf.Logs))
		DPrintf("[%d-%s]: nextIndex for peer %d => %d.\n",
			rf.me, rf, n, rf.nextIndex[n])
	}
}
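The back-off rule, including the s1/s3 example, can be replayed with a small pure function. This is a sketch under assumptions: nextIndexAfterReject is a hypothetical helper that takes the leader's log as a slice of terms (index 0 is the placeholder entry) and ignores the term check and locking of the real handler.

```go
package main

import "fmt"

// nextIndexAfterReject computes the leader's new nextIndex for a follower
// that rejected AppendEntries with the given ConflictTerm and FirstIndex.
func nextIndexAfterReject(leaderTerms []int, conflictTerm, firstIndex int) int {
	next := firstIndex
	if conflictTerm != 0 {
		// Look for the leader's last entry of the conflicting term.
		for i := len(leaderTerms) - 1; i > 0; i-- {
			if leaderTerms[i] == conflictTerm {
				if i < next {
					next = i // take the smaller of the two candidates
				}
				break
			}
		}
	}
	if next > len(leaderTerms) {
		next = len(leaderTerms) // never point past the end of the leader's log
	}
	return next
}

func main() {
	// s1 holds five term-1 entries; s3 replied with conflict term 1, FirstIndex 2.
	s1 := []int{0, 1, 1, 1, 1, 1}
	fmt.Println(nextIndexAfterReject(s1, 1, 2))
}
```

In the s1/s3 case the leader's last term-1 entry is at index 5, so the minimum with FirstIndex 2 is what moves nextIndex back and lets the next check succeed.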
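- Once an AppendEntries call succeeds, the follower's log matches the leader's. The leader then works out which log entries are committed so that their commands can be applied.
- There is a neat trick here: sort matchIndex and take the middle value.
- matchIndex[i] is the highest log index known to be replicated on node i, so the middle value is an index that a majority of nodes have reached.
- Every log entry up to and including that index can therefore be committed.
- The condition rf.Logs[target].Term == rf.CurrentTerm is necessary:
- An entry that is present in the logs of a majority of nodes is not guaranteed to become committed. Consider the following situation, where each number is the term of a log entry:
S1: 1 2          S1: 1 2 4
S2: 1 2          S2: 1 2
S3: 1      -->   S3: 1 2
S4: 1            S4: 1
S5: 1            S5: 1 3
- s1 becomes leader in term 2, and only s1 and s2 append entry 2 (the term-2 entry at index 2).
- s5 becomes leader in term 3 and appends entry 3. Then s1 becomes leader in term 4 and continues to send entry 2 to s3.
- If s5 now becomes leader again, which it can do even without s1's vote, it replicates entry 3 and overwrites entry 2.
- So for an entry to become committed, one of the following must hold:
- 1. It is replicated to a majority during its own term.
- 2. A later entry is committed. In the example above, if s1 stays leader in term 4 and commits a term-4 entry, entry 2 becomes committed as well.
- This follows from two rules: in an election, the log whose last entry has the higher term counts as more up to date, and the leader forces its log onto the followers.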
// updateCommitIndex find new commit id, must be called when hold lock
func (rf *Raft) updateCommitIndex() {
	match := make([]int, len(rf.matchIndex))
	copy(match, rf.matchIndex)
	sort.Ints(match)
	DPrintf("[%d-%s]: leader %d try to update commit index: %v @ term %d.\n",
		rf.me, rf, rf.me, rf.matchIndex, rf.CurrentTerm)
	// largest index stored on a majority; (n-1)/2 equals n/2 for odd n
	// and stays a true majority when the number of peers is even
	target := match[(len(match)-1)/2]
	if rf.commitIndex < target {
		//fmt.Println("target:",target,match)
		if rf.Logs[target].Term == rf.CurrentTerm {
			//DPrintf("[%d-%s]: leader %d update commit index %d -> %d @ term %d command:%v\n",
			//	rf.me, rf, rf.me, rf.commitIndex, target, rf.CurrentTerm,rf.Logs[target].Command)
			DPrintf("[%d-%s]: leader %d update commit index %d -> %d @ term %d\n",
				rf.me, rf, rf.me, rf.commitIndex, target, rf.CurrentTerm)
			rf.commitIndex = target
			go func() { rf.commitCond.Broadcast() }()
		} else {
			DPrintf("[%d-%s]: leader %d update commit index %d failed (log term %d != current Term %d)\n",
				rf.me, rf, rf.me, rf.commitIndex, rf.Logs[target].Term, rf.CurrentTerm)
		}
	}
}
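The commit rule can be replayed on the five-server example with a pure function. This is only a sketch: majorityCommit is a hypothetical helper, the leader's log is reduced to a slice of terms with the placeholder entry at index 0, and the element at position (n-1)/2 of the sorted slice is used so that the chosen index is held by a majority for even cluster sizes as well.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityCommit returns the new commitIndex for a leader, given the
// matchIndex of every server (including the leader itself), the terms of
// the leader's log entries, the leader's current term and the old commitIndex.
func majorityCommit(matchIndex, logTerms []int, currentTerm, commitIndex int) int {
	sorted := append([]int(nil), matchIndex...) // copy, keep the input untouched
	sort.Ints(sorted)
	// At least a majority of servers have matchIndex >= sorted[(n-1)/2].
	target := sorted[(len(sorted)-1)/2]
	// Only entries from the leader's own term may be committed by counting.
	if target > commitIndex && logTerms[target] == currentTerm {
		return target
	}
	return commitIndex
}

func main() {
	s1Terms := []int{0, 1, 2, 4} // S1 as leader of term 4
	// Entry 2 (term 2) is on S1, S2, S3, but it is not from the current term.
	fmt.Println(majorityCommit([]int{3, 2, 2, 1, 1}, s1Terms, 4, 1))
	// Once the term-4 entry reaches S2 and S3, indexes 2 and 3 commit together.
	fmt.Println(majorityCommit([]int{3, 3, 3, 1, 1}, s1Terms, 4, 1))
}
```

The first call leaves commitIndex unchanged even though index 2 is on three of five servers, which is exactly the situation the term check is meant to guard against.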