Step 1: Read the relevant material
https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html
specifically Part B: Key/value service with log compaction,
and Chapter 7 of the Raft paper.
Step 2: Think it through
You should spend some time figuring out what the interface will be between your Raft library and your service so that your Raft library can discard log entries. Think about how your Raft will operate while storing only the tail of the log, and how it will discard old log entries.
The hint below gives the key clue:
You should compare maxraftstate to persister.RaftStateSize(). Whenever your key/value server detects that the Raft state size is approaching this threshold, it should save a snapshot, and tell the Raft library that it has snapshotted, so that Raft can discard old log entries. If maxraftstate is -1, you do not have to snapshot.
So the log-compaction flow is roughly as follows (the interface it implies is sketched right after the list):
- When the server side sees the current Raft state size approaching maxraftstate, it triggers DoSnapShot.
- DoSnapShot truncates the log up to that index, updates lastIncludedIndex, and persists the Raft state.
- The server's snapshot is persisted into the snapshot storage at the same time.
- The Raft code is updated so the index arithmetic stays correct (the log has been truncated, so an original index minus lastIncludedIndex gives the position in the new log slice).
- On startup, the KV server needs to load the snapshot.
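The interface this implies is small; here is a sketch in Go (the full definitions appear in the final code at the end of this post): ApplyMsg grows a snapshot field, and Raft exposes one new entry point.
type ApplyMsg struct {
    CommandValid bool        // false means "this message carries a snapshot, not a command"
    Command      interface{}
    CommandIndex int
    SnapShot     []byte      // the kvserver state to install when CommandValid is false
}
// The kvserver calls rf.DoSnapShot(appliedIndex, snapshotBytes) once its own state has
// been serialized; Raft then discards entries up through appliedIndex and persists
// state and snapshot together via persister.SaveStateAndSnapshot().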
Step 3: Implement Task 1 (DoSnapShot)
The task goal:
Your raft.go probably keeps the entire log in a Go slice. Modify it so that it can be given a log index, discard the entries before that index, and continue operating while storing only log entries after that index. Make sure you pass all the Raft tests after making these changes.
First, the KV side has to hand its state over to Raft. Because new entries may arrive while the DoSnapShot request is in flight, the index this snapshot covers must be passed along as well. When Raft receives it, it should:
- build a new log slice that keeps only the entries from that index onward;
- point the log field at the new slice;
- persist the new Raft state (currentTerm, votedFor, log[]);
- persist the kvserver's snapshot at the same time, recording lastIncludedIndex and the term at that index.
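The Raft-side entry point, DoSnapShot, ends up looking like this (same as in the final Raft code below):
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if curIdx <= rf.lastIncludedIndex {
        return // stale request: this prefix has already been compacted
    }
    // keep the entry at curIdx as the new base, drop everything before it
    rf.log = append(make([]Log, 0), rf.log[curIdx-rf.lastIncludedIndex:]...)
    rf.lastIncludedIndex = curIdx
    rf.lastIncludedTerm = rf.getLog(curIdx).Term
    // persist Raft state and the kvserver snapshot together
    rf.persistWithSnapShot(snapshot)
}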
Step 4: Implement Task 2 (the kvserver-side call)
Modify your kvserver so that it detects when the persisted Raft state grows too large, and then hands a snapshot to Raft and tells Raft that it can discard old log entries. Raft should save each snapshot with persister.SaveStateAndSnapshot() (don't use files). A kvserver instance should restore the snapshot from the persister when it re-starts.
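A sketch of the kvserver side, matching needSnapShot/doSnapShot in the final server code (the one-tenth headroom threshold is just a chosen value):
func (kv *KVServer) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    // snapshot once the remaining headroom drops below 1/10 of maxraftstate
    return kv.maxraftstate > 0 &&
        kv.maxraftstate-kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *KVServer) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    e.Encode(kv.db)      // the key/value map
    e.Encode(kv.cid2Seq) // per-client dedup state has to go into the snapshot too
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index, w.Bytes())
}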
Step 5: Fix up the index arithmetic
The original code needs updating in many places: every rf.log[i] now goes through rf.getLog(i), and every len(rf.log) becomes rf.logLen(). Slicing expressions also need attention, since the offset has to be added by hand, and some hard-coded 0s have to become rf.lastIncludedIndex.
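The two helpers that make this mechanical (identical to the ones in the final Raft code):
// getLog maps a global log index to a position in the truncated slice.
func (rf *Raft) getLog(i int) Log {
    return rf.log[i-rf.lastIncludedIndex]
}
// logLen is the "virtual" length: compacted prefix plus what is still in memory.
func (rf *Raft) logLen() int {
    return len(rf.log) + rf.lastIncludedIndex
}
func (rf *Raft) getLastLogIdx() int {
    return rf.logLen() - 1
}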
Step 6: Testing
After the changes, rerun all of the Raft tests.
In the kvserver 3A tests, define a const and set every maxraftstate value to 150.
The first test passes; add a log line to watch it work.
But the second test, the concurrent one, blocks.
Further probing shows that the background goroutine blocks as soon as it takes Raft's lock.
There is only one lock here, and at first I could not see how it could block, since a single lock cannot deadlock by itself.
So I wrapped the locking logic and logged every place the lock is taken:
func (rf *Raft) Unlock() {
    if rf.me == 0 {
        log.Println(rf.me, " unlock ", MyCaller())
    }
    rf.mu.Unlock()
}
func (rf *Raft) Lock(tag ...string) {
    tg := ""
    if len(tag) > 0 {
        tg = tag[0]
    }
    if rf.me == 0 {
        log.Println(rf.me, " want lock", MyCaller(), tg)
    }
    rf.mu.Lock()
    if rf.me == 0 {
        log.Println(rf.me, " lock", MyCaller(), tg)
    }
}
func getFrame(skipFrames int) runtime.Frame {
    // We need the frame at index skipFrames+2, since we never want runtime.Callers and getFrame
    targetFrameIndex := skipFrames + 2
    // Set size to targetFrameIndex+2 to ensure we have room for one more caller than we need
    programCounters := make([]uintptr, targetFrameIndex+2)
    n := runtime.Callers(0, programCounters)
    frame := runtime.Frame{Function: "unknown"}
    if n > 0 {
        frames := runtime.CallersFrames(programCounters[:n])
        for more, frameIndex := true, 0; more && frameIndex <= targetFrameIndex; frameIndex++ {
            var frameCandidate runtime.Frame
            frameCandidate, more = frames.Next()
            if frameIndex == targetFrameIndex {
                frame = frameCandidate
            }
        }
    }
    return frame
}
// MyCaller returns the caller of the function that called it :)
func MyCaller() string {
    // Skip GetCallerFunctionName and the function to get the caller of
    return getFrame(2).Function
}
That pinpointed it: the leader takes the lock while appending log entries and never gives it back.
To make sure Unlock always runs, I hoisted the Lock/Unlock pair to the top level and wrote it with defer.
It still blocked, so I added a few numbered prints to see which step it never got past.
According to the log, step 4 prints but step 6 never does,
so the suspicion falls on the success branch, and that is where the problem is.
The cause: rf.updateCommitIndex() calls rf.updateLastApplied(), which sends on the apply channel, and that send blocks if nobody is draining the channel on the other side.
So the end result is a deadlock built out of a blocking channel plus a single lock.
There are two possible fixes. The first is to change Raft's locking so that the applyCh send happens outside the lock.
The second is that, at the application layer, the goroutine receiving from applyCh must hand anything that needs Raft's lock to its own goroutine, so that receiving apply messages never blocks.
I chose the first. (Later, after rerunning the Raft tests 1000 times, it produced one run where log entries were applied out of order.)
So I switched to the second.
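Concretely, in the kvserver's applyCh reader goroutine (a fragment of the final server code; the point is only that anything touching Raft's lock runs in its own goroutine):
// inside the goroutine that drains kv.applyCh
if kv.needSnapShot() {
    // doSnapShot ends up taking Raft's lock via rf.DoSnapShot, so run it in its own
    // goroutine; the applyCh reader itself must never block on Raft's lock
    go kv.doSnapShot(applyMsg.CommandIndex)
}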
With that change, the run reaches the third test, which fails:
an index out of range during AppendEntries. Add a log line and look.
Roughly what happens: the leader takes a snapshot while it is still sending log entries to a follower, then updates lastIncludedIndex and the log, so the old entries are compacted away before they have been sent over.
This is exactly the case the InstallSnapshot RPC exists for.
Step 7: Implement Task 3 (InstallSnapshot RPC struct and sender)
Modify your Raft leader code to send an InstallSnapshot RPC to a follower when the leader has discarded the log entries the follower needs. When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg. Your solution is complete when it passes all of the Lab 3 tests.
Reread Chapter 7 of the paper and Figure 13,
together with the hint:
You should send the entire snapshot in a single InstallSnapshot RPC. You do not have to implement Figure 13's offset mechanism for splitting up the snapshot.
Step 8: Implement Task 3 (InstallSnapshot RPC handler)
If the snapshot needs to be installed, the handler has to talk to the kvserver through an ApplyMsg, so define an ApplyMsg variant that carries a snapshot. When the kvserver sees one, it pulls the snapshot out and rebuilds its db and cid2Seq from it.
When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg.
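On the kvserver side this is one extra branch in the applyCh reader (a fragment of the final server code; CommandValid == false marks a snapshot message):
case applyMsg := <-kv.applyCh:
    if !applyMsg.CommandValid {
        // not a client command: this message carries a snapshot from InstallSnapshot
        kv.readSnapShot(applyMsg.SnapShot) // rebuild db and cid2Seq
        continue
    }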
Step 9: Take a different branch when the needed log entries have been compacted
When the leader, while sending a heartbeat, finds that the point where its log and the follower's log diverge no longer exists in its own log (it has been folded into the snapshot), it must turn the AppendEntries RPC into an InstallSnapshot RPC, which also doubles as the heartbeat.
However, an exceptionally slow follower or a new server joining the cluster (Section 6) would not. The way to bring such a follower up-to-date is for the leader to send it a snapshot over the network. The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are too far behind;
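In the leader's replication loop this is a single check before building the AppendEntries arguments (as in startAppendLog in the final code below):
if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 {
    // the entries this follower needs are gone from the log:
    // fall back to InstallSnapshot, which also acts as the heartbeat
    rf.sendSnapshot(idx) // sendSnapshot releases and reacquires rf.mu internally
    return
}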
Step 10: Add the kvserver logic for installing a snapshot
Step 11: Testing
Following the hint, start with:
Make sure you pass TestSnapshotRPC before moving on to the other Snapshot tests.
The unreliable tests that used to fail now pass as well.
BUG 1: the first 3B test blocks.
After another round of log-printing (it consumed four hours), the problem surfaced.
The paper says:
Raft also includes a small amount of metadata in the snapshot: the last included index is the index of the last entry in the log that the snapshot replaces (the last entry the state machine had applied), and the last included term is the term of this entry. These are preserved to support the AppendEntries consistency check for the first log entry following the snapshot, since that entry needs a previous log index and term.
So the term at the snapshot boundary must not be blanked out; the last included term has to be recorded too. Otherwise the prevLogTerm the leader sends for the first entry after the snapshot is always 0, and the follower keeps replying false because of rule 1: "Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)".
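The correct version keeps the term in the dummy base entry of the truncated log; in the InstallSnapshot handler of the final code that looks like:
// when the snapshot covers the whole log, discard it entirely, but the dummy
// base entry must carry lastIncludedTerm rather than a zero term, or the next
// AppendEntries consistency check will always fail
rf.log = []Log{{args.LastIncludedTerm, nil}}
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm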
After the first 3B RPC test passes, run the whole 3B suite.
An index out of range shows up; go check it.
BUG 2
The suspicion: somewhere lastApplied is not being updated.
This test uses a single client but crashes servers, so the guess is that after a snapshot is taken and a server crashes and recovers, lastApplied and commitIndex are not restored.
A global search for everywhere they are updated shows
that the update was only added in the InstallSnapshot RPC handler,
not in readPersist.
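The missing lines in readPersist (as they appear in the final Raft code):
rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex
// everything up to lastIncludedIndex is already inside the snapshot the kvserver
// reloads on restart, so start applying from there rather than from 0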
This time all the tests pass.
But the lab also requires:
A reasonable amount of time to take for the Lab 3 tests is 400 seconds of real time and 700 seconds of CPU time. Further, go test -run TestSnapshotSize should take less than 20 seconds of real time.
Step 12: Optimize performance
The slowest one is the second test, so dig into why it is so slow.
I added a lot of logging inside the test itself.
The loop runs 200 times and only manages about 3 iterations per second, so it takes over a minute, while in the sample output on the course page this case finishes within about one second.
The operation inside the loop is Put, and Put blocks: it cannot return until the leader has committed the entry and applied it. So the key question is when commit happens.
Looking closely, commit happens only after a majority of nodes have appended the entry to their logs and reported back; only then does the leader advance commitIndex and apply.
And that only happens when the leader fires an AppendEntries round, which in my Raft code is every 100 ms. That is the whole reason.
To make Put as fast as possible, the leader should trigger startAppendLog the moment a new entry is appended, instead of waiting for the next heartbeat.
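In Start() this is one extra call (matching the final code):
if isLeader {
    index = rf.getLastLogIdx() + 1
    rf.log = append(rf.log, Log{rf.currentTerm, command})
    rf.persist()
    rf.startAppendLog() // replicate right away instead of waiting for the next heartbeat
}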
With that, the first two tests finish almost instantly.
The fourth test throws an index out of range. Logging around the offending line shows lastApplied dropping below lastIncludedIndex. The guess is that with AppendEntries firing more often, this path also runs more often. Add a boundary guard first: lastIncludedIndex is the base point of the log, so clamping to it does not break correctness (the guard is sketched below).
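The guard sits at the top of updateLastApplied (same as in the final code):
func (rf *Raft) updateLastApplied() {
    // never fall behind the snapshot base: those entries are already applied
    rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
    rf.commitIndex = Max(rf.commitIndex, rf.lastIncludedIndex)
    for rf.lastApplied < rf.commitIndex {
        rf.lastApplied++
        curLog := rf.getLog(rf.lastApplied)
        rf.applyCh <- ApplyMsg{true, curLog.Command, rf.lastApplied, nil}
    }
}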
BUG 3: the fifth test blocks.
Again the lock/unlock logging trick for hunting deadlocks: the deadlock shows up either in the AppendEntries RPC handler or in the InstallSnapshot RPC handler.
Both handlers finish by sending on applyCh, so once again the suspicion is that the background goroutine consuming those messages is stuck.
Reading the code, the suspect is the per-index notification channel the kvserver uses to wake up waiting Get/PutAppend handlers.
The student guide points at exactly this:
Re-appearing indices: Say that your Raft library has some method Start() that takes a command, and return the index at which that command was placed in the log (so that you know when to return to the client, as discussed above). You might assume that you will never see Start() return the same index twice, or at the very least, that if you see the same index again, the command that first returned that index must have failed. It turns out that neither of these things are true, even if no servers crash.
The fix: a non-blocking send in which the new value replaces whatever is still sitting in the channel.
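The helper, identical to the send used in the final server code:
func send(notifyCh chan Op, op Op) {
    // drain any stale value left over from a reused log index, then send,
    // so the sender never blocks even if nobody consumed the old value
    select {
    case <-notifyCh:
    default:
    }
    notifyCh <- op
}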
go test -race passes:
zyx@zyx-virtual-machine:~/Desktop/mit6824/mit6.824/src/kvraft$ go test -race
Test: snapshots, one client (3A) ...
... Passed -- 15.0 5 18675 3525
Test: snapshots, many clients (3A) ...
... Passed -- 15.1 5 21955 4067
Test: unreliable net, snapshots, many clients (3A) ...
... Passed -- 16.3 5 10669 1461
Test: concurrent append to same key, unreliable (3A) ...
... Passed -- 1.0 3 276 52
Test: progress in majority (3A) ...
... Passed -- 0.4 5 54 2
Test: no progress in minority (3A) ...
... Passed -- 1.0 5 127 3
Test: completion after heal (3A) ...
... Passed -- 1.0 5 60 3
Test: partitions, one client (3A) ...
... Passed -- 23.3 5 5696 775
Test: partitions, many clients (3A) ...
... Passed -- 22.8 5 10947 1005
Test: restarts, one client (3A) ...
... Passed -- 20.7 5 6857 952
Test: restarts, many clients (3A) ...
... Passed -- 20.2 5 11353 1263
Test: unreliable net, restarts, many clients (3A) ...
... Passed -- 21.4 5 8034 969
Test: restarts, partitions, many clients (3A) ...
... Passed -- 27.4 5 11132 1028
Test: unreliable net, restarts, partitions, many clients (3A) ...
... Passed -- 28.3 5 6883 695
Test: unreliable net, restarts, partitions, many clients, linearizability checks (3A) ...
... Passed -- 25.9 7 12984 649
Test: InstallSnapshot RPC (3B) ...
... Passed -- 2.8 3 704 63
Test: snapshot size is reasonable (3B) ...
... Passed -- 3.2 3 2869 800
Test: restarts, snapshots, one client (3B) ...
... Passed -- 19.1 5 18039 3209
Test: restarts, snapshots, many clients (3B) ...
... Passed -- 19.8 5 19701 2804
Test: unreliable net, snapshots, many clients (3B) ...
... Passed -- 15.8 5 11034 1533
Test: unreliable net, restarts, snapshots, many clients (3B) ...
... Passed -- 20.1 5 11658 1490
Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
... Passed -- 27.5 5 8853 1021
Test: unreliable net, restarts, partitions, snapshots, many clients, linearizability checks (3B) ...
... Passed -- 25.9 7 24348 1862
PASS
ok kvraft 375.658s
Stress testing
Running the suite 500 times surfaces a few bugs that do not show up on every run.
BUG 1
Logging at the places where the nextIndex array is updated pinpoints it: nextIndex sometimes runs past the length of the log.
The fix:
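A clamp when retrying after a conflict (as in startAppendLog in the final code):
// never let nextIndex run past the end of the (virtual) log
rf.nextIndex[idx] = Min(rf.logLen(), tarIndex)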
BUG 2
This one comes from the 3A tests.
The debugging reasoning: the new code had already survived 1000 runs of the Raft regression tests, so Raft itself is ruled out,
and 3A never exercises InstallSnapshot (the Lab 2 tests do not cover the snapshot path either).
So most likely something on the client/server side is wrong.
Going over the code step by step and adding some logging showed that the Op comparison does not use every field: for a Get, two different Gets on the same key easily compare as equal.
So every Get operation now carries a unique tag generated with the client-side Nrand function,
and equalOp is updated to match (see below).
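Both pieces, as they appear in the final server code:
// Get: stash a random tag in the Value field so two Gets on the same key
// never compare equal by accident
originOp := Op{"Get", args.Key, strconv.FormatInt(Nrand(), 10), 0, 0}
// equalOp must compare every field, not just Key/Value/OpType
func equalOp(a Op, b Op) bool {
    return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType &&
        a.SeqNum == b.SeqNum && a.Cid == b.Cid
}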
Rerunning just that failing case 500 times: no more failures.
Running the whole 3A suite 200 times: no anomalies.
BUG 3
Testing all of Lab 3, a failure shows up in 3B.
It comes from the 3B tests.
This failure already tells us the client/server code is fine (otherwise 3A would fail) and the Raft core is fine.
If 3B fails, the bug is most likely in the InstallSnapshot changes.
What breaks is linearizability, meaning the applied log is no longer consistent across servers. So the cause has to be either concurrent (and therefore nondeterministically ordered) application of log entries, or a bug in how the two variables lastApplied and commitIndex are updated.
Reading the code with that in mind rules out the former: every place I apply log entries is strictly protected by the lock.
Searching for every occurrence of
rf.commitIndex =
shows the clamp added earlier in updateLastApplied,
so the InstallSnapshot RPC handler was changed to do the same thing; logging confirms that commitIndex really can be larger than lastIncludedIndex at that point.
Yet the linearizability check still failed, and it took a long time to track down.
The realization: if lastApplied is already past lastIncludedIndex, the kvserver's db is newer than the snapshot being installed,
so the following code is needed as well:
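The tail of the InstallSnapshot handler in the final code:
rf.commitIndex = Max(rf.commitIndex, rf.lastIncludedIndex)
rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
if rf.lastApplied > rf.lastIncludedIndex {
    // the kvserver's state is already newer than this snapshot;
    // do not hand the stale snapshot up through applyCh
    return
}
rf.applyCh <- applyMsg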
The test shell script:
export GOPATH="/home/zyx/Desktop/mit6824/mit6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 25; i++))
do
for ((c = $((i*10)); c < $(( (i+1)*10)); c++))
do
(go test -run TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable3B) &> ./res/$c &
done
sleep 40
if grep -nr "FAIL.*raft.*" res; then
echo "fail"
fi
done
500 runs pass.
Finally, a pass over the code to tidy it up.
Client
package raftkv
import (
"labrpc"
"time"
)
import "crypto/rand"
import "math/big"
type Clerk struct {
servers []*labrpc.ClientEnd
lastLeader int
id int64
seqNum int
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
ck.id = Nrand()//give each client a unique identifier, and then have them
ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
ck.lastLeader = 0
return ck
}
func (ck *Clerk) Get(key string) string {
index := ck.lastLeader
for {
args := GetArgs{key}
reply := GetReply{}
ok := ck.servers[index].Call("KVServer.Get", &args, &reply)
if ok && !reply.WrongLeader {
ck.lastLeader = index
return reply.Value
}
index = (index + 1) % len(ck.servers)
time.Sleep(time.Duration(100)*time.Millisecond)
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
index := ck.lastLeader
args := PutAppendArgs{key, value, op, ck.id, ck.seqNum}
ck.seqNum++
for {
reply := PutAppendReply{}
ok := ck.servers[index].Call("KVServer.PutAppend", &args, &reply)
if ok && !reply.WrongLeader {
ck.lastLeader = index
return
}
index = (index + 1) % len(ck.servers)
time.Sleep(time.Duration(100)*time.Millisecond)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
Server
package raftkv
import (
"bytes"
"labgob"
"labrpc"
"log"
"raft"
"strconv"
"sync"
"time"
)
type Op struct {
OpType string "operation type(eg. put/append)"
Key string
Value string
Cid int64
SeqNum int
}
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
maxraftstate int // snapshot if log grows this big
timeout time.Duration
persist *raft.Persister
db map[string]string
chMap map[int]chan Op
cid2Seq map[int64]int
killCh chan bool
}
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
//from hint: A simple solution is to enter every Get() (as well as each Put() and Append()) in the Raft log.
originOp := Op{"Get",args.Key,strconv.FormatInt(Nrand(),10),0,0}
reply.WrongLeader = true
index,_,isLeader := kv.rf.Start(originOp)
if !isLeader {return}
ch := kv.putIfAbsent(index)
op := beNotified(ch)
if equalOp(op,originOp) {
reply.WrongLeader = false
kv.mu.Lock()
reply.Value = kv.db[op.Key]
kv.mu.Unlock()
}
}
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
reply.WrongLeader = true
index,_,isLeader := kv.rf.Start(originOp)
if !isLeader {return}
ch := kv.putIfAbsent(index)
op := beNotified(ch)
if equalOp(originOp,op) {
reply.WrongLeader = false
}
}
func beNotified(ch chan Op) Op{
select {
case notifyArg := <- ch :
return notifyArg
case <- time.After(time.Duration(600)*time.Millisecond):
return Op{}
}
}
func (kv *KVServer) putIfAbsent(idx int) chan Op{
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.chMap[idx]; !ok {
kv.chMap[idx] = make(chan Op,1)
}
return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}
func (kv *KVServer) Kill() {
kv.rf.Kill()
kv.killCh <- true
}
func (kv *KVServer) readSnapShot(snapshot []byte) {
kv.mu.Lock()
defer kv.mu.Unlock()
if snapshot == nil || len(snapshot) < 1 {return}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var db map[string]string
var cid2Seq map[int64]int
if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil {
log.Fatalf("readSnapShot ERROR for server %v",kv.me)
} else {
kv.db, kv.cid2Seq = db, cid2Seq
}
}
func (kv *KVServer) needSnapShot() bool {
kv.mu.Lock()
defer kv.mu.Unlock()
threshold := 10
return kv.maxraftstate > 0 &&
kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *KVServer) doSnapShot(index int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
kv.mu.Lock()
e.Encode(kv.db)
e.Encode(kv.cid2Seq)
kv.mu.Unlock()
kv.rf.DoSnapShot(index,w.Bytes())
}
func send(notifyCh chan Op,op Op) {
select{
case <-notifyCh:
default:
}
notifyCh <- op
}
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(KVServer)
kv.me = me
kv.maxraftstate = maxraftstate
kv.persist = persister
// You may need initialization code here.
kv.db = make(map[string]string)
kv.chMap = make(map[int]chan Op)
kv.cid2Seq = make(map[int64]int)
kv.readSnapShot(kv.persist.ReadSnapshot())
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.killCh = make(chan bool,1)
go func() {
for {
select {
case <- kv.killCh:
return
case applyMsg := <- kv.applyCh:
if !applyMsg.CommandValid {
kv.readSnapShot(applyMsg.SnapShot)
continue
}
op := applyMsg.Command.(Op)
kv.mu.Lock()
maxSeq,found := kv.cid2Seq[op.Cid]
if !found || op.SeqNum > maxSeq {
switch op.OpType {
case "Put":
kv.db[op.Key] = op.Value
case "Append":
kv.db[op.Key] += op.Value
}
kv.cid2Seq[op.Cid] = op.SeqNum
}
kv.mu.Unlock()
notifyCh := kv.putIfAbsent(applyMsg.CommandIndex)
if kv.needSnapShot() {
go kv.doSnapShot(applyMsg.CommandIndex)
}
send(notifyCh,op)
}
}
}()
return kv
}
The new Raft code
package raft
import (
"bytes"
"labgob"
"log"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
)
import "labrpc"
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
SnapShot []byte
}
type State int
const (
Follower State = iota // value --> 0
Candidate // value --> 1
Leader // value --> 2
)
const NULL int = -1
type Log struct {
Term int "term when entry was received by leader"
Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// state a Raft server must maintain.
state State
//Persistent state on all servers:(Updated on stable storage before responding to RPCs)
currentTerm int "latest term server has seen (initialized to 0 increases monotonically)"
votedFor int "candidateId that received vote in current term (or null if none)"
log []Log "log entries;(first index is 1)"
//log compaction
lastIncludedIndex int "the snapshot replaces all entries up through and including this index"
lastIncludedTerm int "term of lastIncludedIndex"
//Volatile state on all servers:
commitIndex int "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
lastApplied int "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"
//Volatile state on leaders:(Reinitialized after election)
nextIndex []int "for each server,index of the next log entry to send to that server"
matchIndex []int "for each server,index of highest log entry known to be replicated on server(initialized to 0, im)"
//channel
applyCh chan ApplyMsg // from Make()
killCh chan bool //for Kill()
//handle rpc
voteCh chan bool
appendLogCh chan bool
}
// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isleader = (rf.state == Leader)
return term, isleader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
rf.persister.SaveRaftState(rf.encodeRaftState())
}
func (rf *Raft) encodeRaftState() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
return w.Bytes()
}
func (rf *Raft) persistWithSnapShot(snapshot []byte) {
rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(),snapshot)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var voteFor int
var clog []Log
var lastIncludedIndex int
var lastIncludedTerm int
if d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil ||
d.Decode(&lastIncludedIndex) != nil || d.Decode(&lastIncludedTerm) != nil{
log.Fatalf("readPersist ERROR for server %v",rf.me)
} else {
rf.mu.Lock()
rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
rf.lastIncludedTerm, rf.lastIncludedIndex = lastIncludedTerm, lastIncludedIndex
rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex
rf.mu.Unlock()
}
}
// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
Term int "candidate’s term"
CandidateId int "candidate requesting vote"
LastLogIndex int "index of candidate’s last log entry (§5.4)"
LastLogTerm int "term of candidate’s last log entry (§5.4)"
}
// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
Term int "currentTerm, for candidate to update itself"
VoteGranted bool "true means candidate received vote"
}
//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term > rf.currentTerm {//all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.VoteGranted = false
if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
// Reply false if term < currentTerm (§5.1) If votedFor is not null and not candidateId,
} else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
//If the logs have last entries with different terms, then the log with the later term is more up-to-date.
// If the logs end with the same term, then whichever log is longer is more up-to-date.
// Reply false if candidate’s log is at least as up-to-date as receiver’s log
} else {
//grant vote
rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.state = Follower
rf.persist()
send(rf.voteCh) //because If election timeout elapses without receiving granting vote to candidate, so wake up
}
}
////RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
type AppendEntriesArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
PrevLogIndex int "index of log entry immediately preceding new ones"
PrevLogTerm int "term of prevLogIndex entry"
Entries []Log "log entries to store (empty for heartbeat;may send more than one for efficiency)"
LeaderCommit int "leader’s commitIndex"
}
type AppendEntriesReply struct {
Term int "currentTerm, for leader to update itself"
Success bool "true if follower contained entry matching prevLogIndex and prevLogTerm"
ConflictIndex int "the first index it stores for that conflict term"
ConflictTerm int "the term of the conflicting entry"
}
//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {//now only for heartbeat
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
reply.Term = rf.currentTerm
reply.Success = false
reply.ConflictTerm = NULL
reply.ConflictIndex = 0
//1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
prevLogIndexTerm := -1
logSize := rf.logLen()
if args.PrevLogIndex >= rf.lastIncludedIndex && args.PrevLogIndex < rf.logLen() {
prevLogIndexTerm = rf.getLog(args.PrevLogIndex).Term
}
if prevLogIndexTerm != args.PrevLogTerm {
reply.ConflictIndex = logSize
if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
//it should return with conflictIndex = len(log) and conflictTerm = None.
} else { //If a follower does have prevLogIndex in its log, but the term does not match
reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
i := rf.lastIncludedIndex
for ; i < logSize; i++ {//and then search its log for
if rf.getLog(i).Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
reply.ConflictIndex = i
break
}
}
}
return
}
//2. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {return}
index := args.PrevLogIndex
for i := 0; i < len(args.Entries); i++ {
index++
if index < logSize {
if rf.getLog(index).Term == args.Entries[i].Term {
continue
} else {//3. If an existing entry conflicts with a new one (same index but different terms),
rf.log = rf.log[:index - rf.lastIncludedIndex]//delete the existing entry and all that follow it (§5.3)
}
}
rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
rf.persist()
break;
}
//5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
rf.updateLastApplied()
}
reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
//InstallSnapshot RPC
type InstallSnapshotArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
LastIncludedIndex int "the snapshot replaces all entries up through and including this index"
LastIncludedTerm int "term of lastIncludedIndex"
Data []byte "raw bytes of the snapshot chunk, starting at offset"
}
type InstallSnapshotReply struct {
Term int "currentTerm, for leader to update itself"
}
func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
return ok
}
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm { //Reply immediately if term < currentTerm
return
}
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
if args.LastIncludedIndex <= rf.lastIncludedIndex {// discard any existing or partial snapshot with a smaller index
return
}
applyMsg := ApplyMsg{CommandValid: false, SnapShot: args.Data}
//If existing log entry has same index and term as snapshot’s last included entry,retain log entries following it and reply
if args.LastIncludedIndex < rf.logLen()-1 {
rf.log = append(make([]Log,0),rf.log[args.LastIncludedIndex -rf.lastIncludedIndex:]...)
}else {//7. Discard the entire log
rf.log = []Log{{args.LastIncludedTerm, nil},}
}
//Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm
rf.persistWithSnapShot(args.Data)
rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
if rf.lastApplied > rf.lastIncludedIndex {return} //snapshot is older than kvserver's db, so reply immediately
rf.applyCh <- applyMsg
}
func (rf *Raft) sendSnapshot(server int) {
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LastIncludedIndex: rf.lastIncludedIndex,
LastIncludedTerm: rf.lastIncludedTerm,
LeaderId: rf.me,
Data: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()
reply := InstallSnapshotReply{}
ret := rf.sendInstallSnapshot(server,&args,&reply)
rf.mu.Lock();
defer rf.mu.Unlock()
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
return
}
if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
return
}
rf.updateNextMatchIdx(server,rf.lastIncludedIndex)
}
//Leader Section:
func (rf *Raft) startAppendLog() {
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
for {
rf.mu.Lock();
if rf.state != Leader {
rf.mu.Unlock()
return
} //send initial empty AppendEntries RPCs (heartbeat) to each server
if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 { //The leader uses a new RPC called InstallSnapshot to
rf.sendSnapshot(idx) // followers that are too far behind
return
}
args := AppendEntriesArgs{
rf.currentTerm,
rf.me,
rf.getPrevLogIdx(idx),
rf.getPrevLogTerm(idx),
//If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
//nextIndex > last log index, rf.log[rf.nextIndex[idx]:] will be empty then like a heartbeat
append(make([]Log, 0), rf.log[rf.nextIndex[idx]-rf.lastIncludedIndex:]...),
rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
ret := rf.sendAppendEntries(idx, &args, reply)
rf.mu.Lock();
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm { //all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
rf.mu.Unlock()
return
}
if reply.Success { //If successful:update nextIndex and matchIndex for follower
rf.updateNextMatchIdx(idx, args.PrevLogIndex+len(args.Entries))
rf.mu.Unlock()
return
} else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
tarIndex := reply.ConflictIndex //If it does not find an entry with that term
if reply.ConflictTerm != NULL {
logSize := rf.logLen() //first search its log for conflictTerm
for i := rf.lastIncludedIndex; i < logSize; i++ { //if it finds an entry in its log with that term,
if rf.getLog(i).Term != reply.ConflictTerm {
continue
}
for i < logSize && rf.getLog(i).Term == reply.ConflictTerm {
i++
} //set nextIndex to be the one
tarIndex = i //beyond the index of the last entry in that term in its log
}
}
rf.nextIndex[idx] = Min(rf.logLen(),tarIndex);
rf.mu.Unlock()
}
}
}(i)
}
}
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
index := -1
term := rf.currentTerm
isLeader := (rf.state == Leader)
//If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
if isLeader {
index = rf.getLastLogIdx() + 1
newLog := Log{
rf.currentTerm,
command,
}
rf.log = append(rf.log,newLog)
rf.persist()
rf.startAppendLog()
}
return index, term, isLeader
}
//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
rf.matchIndex[rf.me] = rf.logLen() - 1
copyMatchIndex := make([]int,len(rf.matchIndex))
copy(copyMatchIndex,rf.matchIndex)
sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
N := copyMatchIndex[len(copyMatchIndex)/2]
if N > rf.commitIndex && rf.getLog(N).Term == rf.currentTerm {
rf.commitIndex = N
rf.updateLastApplied()
}
}
func (rf *Raft) beLeader() {
if rf.state != Candidate {
return
}
rf.state = Leader
//initialize leader data
rf.nextIndex = make([]int,len(rf.peers))
rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
rf.nextIndex[i] = rf.getLastLogIdx() + 1
}
}
//end Leader section
//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower implemented in AppendEntries RPC Handler
func (rf *Raft) beCandidate() { //Reset election timer are finished in caller
rf.state = Candidate
rf.currentTerm++ //Increment currentTerm
rf.votedFor = rf.me //vote myself first
rf.persist()
//ask for other's vote
go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
rf.mu.Lock()
args := RequestVoteArgs{
rf.currentTerm,
rf.me,
rf.getLastLogIdx(),
rf.getLastLogTerm(),
};
rf.mu.Unlock()
var votes int32 = 1;
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
reply := &RequestVoteReply{}
ret := rf.sendRequestVote(idx,&args,reply)
if ret {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.beFollower(reply.Term)
return
}
if rf.state != Candidate || rf.currentTerm != args.Term{
return
}
if reply.VoteGranted {
atomic.AddInt32(&votes,1)
} //If votes received from majority of servers: become leader
if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
rf.beLeader()
rf.startAppendLog()
send(rf.voteCh) //after be leader, then notify 'select' goroutine will sending out heartbeats immediately
}
}
}(i)
}
}
//end Candidate section
//Follower Section:
func (rf *Raft) beFollower(term int) {
rf.state = Follower
rf.votedFor = NULL
rf.currentTerm = term
rf.persist()
}
//end Follower section
//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
rf.lastApplied = Max(rf.lastApplied,rf.lastIncludedIndex)
rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
curLog := rf.getLog(rf.lastApplied)
applyMsg := ApplyMsg{true, curLog.Command, rf.lastApplied, nil,}
rf.applyCh <- applyMsg
}
}
//log compaction:
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
rf.mu.Lock()
defer rf.mu.Unlock()
if curIdx <= rf.lastIncludedIndex {return}
//update last included index & term
rf.log = append(make([]Log,0), rf.log[curIdx-rf.lastIncludedIndex:]...)
rf.lastIncludedIndex = curIdx
rf.lastIncludedTerm = rf.getLog(curIdx).Term
rf.persistWithSnapShot(snapshot)
}
// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
send(rf.killCh)
}
//Helper function
func send(ch chan bool) {
select {
case <-ch: //if already set, consume it then resent to avoid block
default:
}
ch <- true
}
func (rf *Raft) getLog(i int) Log {
return rf.log[i - rf.lastIncludedIndex]
}
func (rf *Raft) logLen() int {
return len(rf.log) + rf.lastIncludedIndex
}
func (rf *Raft) getPrevLogIdx(i int) int {
return rf.nextIndex[i] - 1
}
func (rf *Raft) getPrevLogTerm(i int) int {
prevLogIdx := rf.getPrevLogIdx(i)
if prevLogIdx < rf.lastIncludedIndex {
return -1
}
return rf.getLog(prevLogIdx).Term
}
func (rf *Raft) getLastLogIdx() int {
return rf.logLen() - 1
}
func (rf *Raft) getLastLogTerm() int {
idx := rf.getLastLogIdx()
if idx < rf.lastIncludedIndex {
return -1
}
return rf.getLog(idx).Term
}
func (rf *Raft) updateNextMatchIdx(server int, matchIdx int) {
rf.matchIndex[server] = matchIdx
rf.nextIndex[server] = matchIdx + 1
rf.updateCommitIndex()
}
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = NULL
rf.log = make([]Log,1) //(first index is 1)
rf.commitIndex = 0
rf.lastApplied = 0
rf.applyCh = applyCh
//because gorountne only send the chan to below goroutine,to avoid block, need 1 buffer
rf.voteCh = make(chan bool,1)
rf.appendLogCh = make(chan bool,1)
rf.killCh = make(chan bool,1)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
//because from hint The tester requires that the leader send heartbeat RPCs no more than ten times per second.
heartbeatTime := time.Duration(100) * time.Millisecond
//from hint :You'll need to write code that takes actions periodically or after delays in time.
// The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
go func() {
for {
select {
case <-rf.killCh:
return
default:
}
electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
switch state {
case Follower, Candidate:
select {
case <-rf.voteCh:
case <-rf.appendLogCh:
case <-time.After(electionTime):
rf.mu.Lock()
rf.beCandidate() //becandidate, Reset election timer, then start election
rf.mu.Unlock()
}
case Leader:
time.Sleep(heartbeatTime)
rf.startAppendLog()
}
}
}()
return rf
}