全部代碼在我的GitHub妓盲,本文只做分析。
簡(jiǎn)介
該部分主要是要求完成 server 選舉的相關(guān)功能专普,暫時(shí)不牽涉到 log悯衬。重點(diǎn)閱讀論文的 5.1 以及 5.2,結(jié)合 Figure 2 食用脆诉。
首先強(qiáng)調(diào)一下整體架構(gòu)甚亭。在課程給出的框架體系中,Raft
這個(gè)結(jié)構(gòu)體是每個(gè) server 持有一個(gè)的击胜,作為狀態(tài)機(jī)的存在亏狰。每個(gè) server 功能是完全一樣的,只是狀態(tài)不同而已偶摔。我們定義了三種狀態(tài):
- Follower
- Candidate
- Leader
整個(gè)運(yùn)行中暇唾,只有兩種 RPC:
- 請(qǐng)求投票的
RequestVote
- 修改log(也作為心跳包)的
AppendEntries
。
他們都是通過(guò) sendXXX()
來(lái)調(diào)用其他 server 的 XXX()
方法辰斋。
除了上述兩個(gè) RPC策州,我們定義了兩個(gè)需要周期性檢查的 timer。
- 在 timeout 時(shí)重新選舉 Leader 的
electionTimer
- 在 timeout 時(shí)發(fā)送心跳包的
heartbeatTimer
Raft 結(jié)構(gòu)分析
以下內(nèi)容非常重要宫仗,一個(gè)好的設(shè)計(jì)直接決定了后續(xù)的代碼是否順利够挂。我也是在修改了好幾個(gè)版本(都能正確運(yùn)行)之后才選出了個(gè)人覺得最優(yōu)雅的設(shè)計(jì)。Raft Structure Advice 可以作為參考但是我個(gè)人覺得其實(shí)都是一些正確的廢話藕夫。
對(duì)于每一個(gè)不同狀態(tài)的 Raft 節(jié)點(diǎn)孽糖,他們可能有如下并發(fā)的事件:
-
Follower
- 處理
AppendEntries
- 處理
RequestVote
-
electionTimer
超時(shí)
- 處理
-
Follower
- 處理
AppendEntries
- 處理
RequestVote
-
electionTimer
超時(shí) - 發(fā)起投票
sendRequestVote
- 處理
-
Leader
- 處理
AppendEntries
- 處理
RequestVote
-
electionTimer
超時(shí) -
heartbeatTimer
超時(shí) - 發(fā)送心跳包
sendAppendEntries
- 處理
如何組織這些事件,并且避免 race condition 成為了最具有難度的部分毅贮。
首先办悟,labrpc
的特性決定了 AppendEntries
和 RequestVote
都是在一個(gè)新的 goroutine 中處理的。我們無(wú)需關(guān)心如何去給他們安排線程滩褥,只需知道這兩個(gè)函數(shù)都需要使用 mutex 保護(hù)起來(lái)病蛉。
至于兩個(gè) timer electionTimer
和 heartbeatTimer
,我們可以在構(gòu)造 Raft 實(shí)例時(shí) kickoff 一個(gè) goroutine瑰煎,在其中利用 select
不斷處理這兩個(gè) timer 的 timeout 事件铺然。在處理的時(shí)候 并行 地調(diào)用 sendAppendEntries
和 sendRequestVote
。
最好寫一個(gè)專門的狀態(tài)轉(zhuǎn)換函數(shù)來(lái)處理 Raft 節(jié)點(diǎn)三種狀態(tài)的轉(zhuǎn)換丢间。
代碼分析
個(gè)人做了一個(gè)最小實(shí)現(xiàn)探熔,暫不引入用不到的 log 相關(guān)內(nèi)容。
這個(gè)實(shí)現(xiàn)強(qiáng)調(diào)的是容易理解烘挫。總共 220 行代碼。
首先為 Raft 增加了必要的 field饮六。在 lab2 A 中我們僅需要新加 5 個(gè) fields 就足夠了其垄。不需要任何 log 相關(guān)的東西。需要注意的是在讀寫 currentTerm
, votedFor
, state
時(shí)都需要加鎖保護(hù)卤橄。
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int // 2A
votedFor int // 2A
electionTimer *time.Timer // 2A
heartbeatTimer *time.Timer // 2A
state NodeState // 2A
}
在構(gòu)造 Raft 時(shí)绿满,kickoff 一個(gè) goroutine 來(lái)處理 timer 相關(guān)的事件,注意加鎖窟扑。
go func(node *Raft) {
for {
select {
case <-rf.electionTimer.C:
rf.mu.Lock()
if rf.state == Follower {
// rf.startElection() is called in conversion to Candidate
rf.convertTo(Candidate)
} else {
rf.startElection()
}
rf.mu.Unlock()
case <-rf.heartbeatTimer.C:
rf.mu.Lock()
if rf.state == Leader {
rf.broadcastHeartbeat()
rf.heartbeatTimer.Reset(HeartbeatInterval)
}
rf.mu.Unlock()
}
}
}(rf)
狀態(tài)轉(zhuǎn)換函數(shù)喇颁,注意最好由 caller 來(lái)加鎖避免錯(cuò)誤導(dǎo)致死鎖。該函數(shù)也非常簡(jiǎn)明嚎货,需要注意這里對(duì)兩個(gè) timer 的處理橘霎。
// should be called with a lock
func (rf *Raft) convertTo(s NodeState) {
if s == rf.state {
return
}
DPrintf("Term %d: server %d convert from %v to %v\n",
rf.currentTerm, rf.me, rf.state, s)
rf.state = s
switch s {
case Follower:
rf.heartbeatTimer.Stop()
rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
rf.votedFor = -1
case Candidate:
rf.startElection()
case Leader:
rf.electionTimer.Stop()
rf.broadcastHeartbeat()
rf.heartbeatTimer.Reset(HeartbeatInterval)
}
}
另外的業(yè)務(wù)邏輯我覺得需要著重強(qiáng)調(diào)的不多。就是以選舉為例講下如何并行 RPC 調(diào)用吧殖属。
// should be called with lock
func (rf *Raft) startElection() {
rf.currentTerm += 1
rf.electionTimer.Reset(randTimeDuration(ElectionTimeoutLower, ElectionTimeoutUpper))
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
}
var voteCount int32
for i := range rf.peers {
if i == rf.me {
rf.votedFor = rf.me
atomic.AddInt32(&voteCount, 1)
continue
}
go func(server int) {
var reply RequestVoteReply
if rf.sendRequestVote(server, &args, &reply) {
rf.mu.Lock()
if reply.VoteGranted && rf.state == Candidate {
atomic.AddInt32(&voteCount, 1)
if atomic.LoadInt32(&voteCount) > int32(len(rf.peers)/2) {
rf.convertTo(Leader)
}
} else {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.convertTo(Follower)
}
}
rf.mu.Unlock()
} else {
DPrintf("%v send request vote to %d failed", rf, server)
}
}(i)
}
}
需要注意的是以下幾點(diǎn):
- 保證在外界加鎖后調(diào)用
startElection()
- 對(duì)其他每個(gè)節(jié)點(diǎn)開啟一個(gè) goroutine 調(diào)用
sendRequestVote()
姐叁,并在處理返回值時(shí)候加鎖 - 注意我們維護(hù)了一個(gè)記錄投票數(shù)的
int32
并在處理請(qǐng)求返回值時(shí)候用原子操作進(jìn)行讀寫判斷,以決定是否升級(jí)為 Leader洗显。
總結(jié)
以上就是 lab2 part A 的內(nèi)容外潜,雖然邏輯簡(jiǎn)單,但是 lab2 part A 是最值得用心推敲的挠唆,因?yàn)榇a結(jié)構(gòu)是在這時(shí)候決定的处窥。我上一個(gè)陣亡的 6.824 就是因?yàn)殚_始代碼結(jié)構(gòu)不好,導(dǎo)致以后越來(lái)越難改玄组。希望大家引以為鑒滔驾。