6. A Fault-Tolerant Key/Value Service on Raft (3B)

Step 1: Read the requirements

https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html
Part B: Key/value service with log compaction

Also re-read Section 7 of the Raft paper.

Step 2: Think it through

You should spend some time figuring out what the interface will be between your Raft library and your service so that your Raft library can discard log entries. Think about how your Raft will operate while storing only the tail of the log, and how it will discard old log entries.

The handout gives some hints:

You should compare maxraftstate to persister.RaftStateSize(). Whenever your key/value server detects that the Raft state size is approaching this threshold, it should save a snapshot, and tell the Raft library that it has snapshotted, so that Raft can discard old log entries. If maxraftstate is -1, you do not have to snapshot.

(screenshot omitted)

So the log-compaction flow is roughly as follows (a code sketch of the server-side trigger follows the list):

  1. When the server notices that the current Raft state size is approaching maxraftstate, it triggers DoSnapShot.
  2. DoSnapShot truncates the log before that index, updates lastIncludedIndex, and persists the new Raft state.
  3. The server-side snapshot is persisted into the snapshot storage.
  4. Update the Raft code so the indexing stays correct: because the log has been truncated, an original index must have lastIncludedIndex subtracted from it to get the position in the new log slice.
  5. When the KV server starts up, it has to load the snapshot.
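A minimal sketch of the server-side trigger, assuming the KVServer fields and the Raft.DoSnapShot entry point shown in the full code at the end of this post (the 10% margin is an arbitrary choice):

func (kv *KVServer) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    // snapshot once less than 10% of the maxraftstate budget is left
    return kv.maxraftstate > 0 &&
        kv.maxraftstate-kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}

func (kv *KVServer) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    e.Encode(kv.db)      // the key/value map
    e.Encode(kv.cid2Seq) // per-client duplicate-detection state
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index, w.Bytes()) // hand the snapshot and its log index to Raft
}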

Step 3: Implement Task 1 (DoSnapShot)

Task goal:

Your raft.go probably keeps the entire log in a Go slice. Modify it so that it can be given a log index, discard the entries before that index, and continue operating while storing only log entries after that index. Make sure you pass all the Raft tests after making these changes.

First the KV side has to hand its state over to Raft, and because new log entries may arrive while the DoSnapShot request is in flight, it also has to pass along the index the snapshot corresponds to. When Raft receives it (a sketch follows the list):

  1. Build a new log slice that keeps only the entries from that index onward.

  2. Point the log field at the new slice.

  3. Persist the new Raft state (currentTerm, votedFor, log[]).

  4. Persist the kvserver's snapshot as well, together with the last included index and that index's term.
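A sketch of the Raft-side entry point, identical in substance to the DoSnapShot in the full Raft code at the end of this post:

//log compaction: truncate the log, record lastIncludedIndex/Term, persist state and snapshot together
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if curIdx <= rf.lastIncludedIndex {
        return // an equal or newer snapshot already covers this index
    }
    // keep the entry at curIdx as the new dummy head, drop everything before it
    rf.log = append(make([]Log, 0), rf.log[curIdx-rf.lastIncludedIndex:]...)
    rf.lastIncludedIndex = curIdx
    rf.lastIncludedTerm = rf.getLog(curIdx).Term
    rf.persistWithSnapShot(snapshot) // SaveStateAndSnapshot under the hood
}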


(screenshots omitted)

Step 4: Implement Task 2 (the KV server-side call)

Modify your kvserver so that it detects when the persisted Raft state grows too large, and then hands a snapshot to Raft and tells Raft that it can discard old log entries. Raft should save each snapshot with persister.SaveStateAndSnapshot() (don't use files). A kvserver instance should restore the snapshot from the persister when it re-starts.

(screenshots omitted)

Step 5: Fix up the indexing

Quite a few places in the existing code need to change: every rf.log[i] becomes rf.getLog(i), every len(rf.log) becomes rf.logLen(), and any place that slices rf.log has to apply the offset by hand. The two helpers are sketched below.
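The index-translation helpers (the same as the ones in the full Raft code below): a "global" log index i lives at slice position i - rf.lastIncludedIndex.

func (rf *Raft) getLog(i int) Log {
    return rf.log[i-rf.lastIncludedIndex] // translate a global index into a slice position
}

func (rf *Raft) logLen() int {
    return len(rf.log) + rf.lastIncludedIndex // global length, counting compacted entries
}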

(screenshots omitted)

Note that some hard-coded 0s have to become rf.lastIncludedIndex, as in the screenshot below.

(screenshot omitted)

Step 6: Test

After the changes, re-run all of the Raft tests.

In the kvserver 3A tests, define a const and change every maxraftstate value to 150.

(screenshot omitted)

The first test passes; add a log line to see the effect.


(screenshots omitted)

But the second, concurrent test blocks.
Digging further, it turns out that as soon as this background goroutine takes Raft's lock, everything blocks.


(screenshot omitted)

But there is only one lock here, and at first I couldn't see how a single lock could deadlock.
So I wrapped the locking logic and logged every place the lock is taken:

func (rf *Raft)Unlock() {
    if rf.me == 0 {
        log.Println(rf.me, " unlock ", MyCaller())
    }
    rf.mu.Unlock()
}

func (rf *Raft)Lock(tag... string) {
    tg := ""
    if len(tag) > 0{
        tg = tag[0]
    }
    if rf.me == 0 {
        log.Println(rf.me, " want lock", MyCaller(), tg)
    }
    rf.mu.Lock()
    if rf.me == 0 {
        log.Println(rf.me, " lock", MyCaller(),tg)
    }
}

func getFrame(skipFrames int) runtime.Frame {
    // We need the frame at index skipFrames+2, since we never want runtime.Callers and getFrame
    targetFrameIndex := skipFrames + 2

    // Set size to targetFrameIndex+2 to ensure we have room for one more caller than we need
    programCounters := make([]uintptr, targetFrameIndex+2)
    n := runtime.Callers(0, programCounters)

    frame := runtime.Frame{Function: "unknown"}
    if n > 0 {
        frames := runtime.CallersFrames(programCounters[:n])
        for more, frameIndex := true, 0; more && frameIndex <= targetFrameIndex; frameIndex++ {
            var frameCandidate runtime.Frame
            frameCandidate, more = frames.Next()
            if frameIndex == targetFrameIndex {
                frame = frameCandidate
            }
        }
    }

    return frame
}

// MyCaller returns the caller of the function that called it :)
func MyCaller() string {
    // Skip GetCallerFunctionName and the function to get the caller of
    return getFrame(2).Function
}

That pinned it down: when the leader appends log entries, it takes the lock and never gives it back.


(screenshot omitted)

To guarantee the unlock always happens, I hoisted the Lock/Unlock to the top of the function with defer.
It still blocked, so as a last resort I added some prints to see which step it got past before hanging.

(screenshots omitted)

The log shows print 4 but never print 6, so the suspect is the success branch, and that is where the problem is:
rf.updateCommitIndex() calls rf.updateLastApplied(), which sends on the apply channel, and that send blocks if nobody is reading on the other side.
(screenshot omitted)

So the end result is a deadlock caused by a blocking channel plus a single lock.

There are two possible fixes. The first is to change Raft's locking so that the applyCh send happens outside the lock.

The second is for the application-layer goroutine that receives from applyCh to spawn a separate goroutine whenever it needs Raft's lock, so it never stops draining apply messages.

I went with the first approach at first. (Later, after re-running the Raft tests 1000 times, it produced one run where log entries were applied out of order.)


(screenshots omitted)

So I switched to the second approach, sketched below.
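A simplified sketch of that approach, matching the apply loop in the full server code below (the real loop also handles the kill signal and snapshot messages):

go func() {
    for applyMsg := range kv.applyCh {
        // ... apply the command to kv.db under kv.mu ...
        if kv.needSnapShot() {
            // DoSnapShot takes Raft's lock; run it in its own goroutine so the
            // applyCh reader itself never blocks on rf.mu.
            go kv.doSnapShot(applyMsg.CommandIndex)
        }
    }
}()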


(screenshot omitted)

After that change, it gets to the third test and fails.


(screenshot omitted)

There is an index-out-of-range during AppendEntries; add a log line to look.


(screenshots omitted)

Roughly what happened: the leader took a snapshot while it was still sending log entries to a follower, then updated lastIncludedIndex and the log, so the old entries were compacted away before they were ever delivered.

This is exactly what the InstallSnapshot RPC is for.

Step 7: Implement Task 3 (the InstallSnapshot RPC struct and sender)

Modify your Raft leader code to send an InstallSnapshot RPC to a follower when the leader has discarded the log entries the follower needs. When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg. Your solution is complete when it passes all of the Lab 3 tests.

Re-read Section 7 of the paper and Figure 13, together with the hint:


(screenshot omitted)

You should send the entire snapshot in a single InstallSnapshot RPC. You do not have to implement Figure 13's offset mechanism for splitting up the snapshot.
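A sketch of the RPC types, the same as in the full Raft code below; since the whole snapshot goes in one RPC, Figure 13's offset and done fields are dropped:

type InstallSnapshotArgs struct {
    Term              int    // leader's term
    LeaderId          int    // so follower can redirect clients
    LastIncludedIndex int    // the snapshot replaces all entries up through and including this index
    LastIncludedTerm  int    // term of LastIncludedIndex
    Data              []byte // raw bytes of the entire snapshot
}

type InstallSnapshotReply struct {
    Term int // currentTerm, for leader to update itself
}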

(screenshot omitted)

Step 8: Implement Task 3 (the InstallSnapshot RPC handler)

If the snapshot needs to be installed, Raft has to hand it to the KV server through an apply message, so we define a snapshot flavor of ApplyMsg. When the kvserver sees one, it pulls the snapshot out of it and rebuilds its db and cid2Seq.

When a follower receives an InstallSnapshot RPC, your Raft code will need to send the included snapshot to its kvserver. You can use the applyCh for this purpose, by adding new fields to ApplyMsg.
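A sketch of the plumbing, consistent with ApplyMsg and the apply loop in the full code below: CommandValid == false marks a snapshot message.

type ApplyMsg struct {
    CommandValid bool        // false means this message carries a snapshot, not a command
    Command      interface{}
    CommandIndex int
    SnapShot     []byte      // raw snapshot bytes for the kvserver to load
}

// in the kvserver's apply loop:
//   if !applyMsg.CommandValid {
//       kv.readSnapShot(applyMsg.SnapShot) // rebuild db and cid2Seq
//       continue
//   }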

(screenshots omitted)

Step 9: Take a different branch when the needed log entries have been compacted

When the Raft leader, while sending heartbeats, finds that the point where it diverges from a follower is no longer present in its own log (it has been folded into the snapshot), it has to switch from an AppendEntries RPC to an InstallSnapshot RPC, which also doubles as a heartbeat.

However, an exceptionally slow follower or a new server joining the cluster (Section 6) would not. The way to bring such a follower up-to-date is for the leader to send it a snapshot over the network. The leader uses a new RPC called InstallSnapshot to send snapshots to followers that are too far behind.
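The branch in the leader's replication loop (this mirrors startAppendLog in the full Raft code below, inside the per-follower goroutine):

if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 {
    // the entries this follower needs have been compacted away:
    // fall back to InstallSnapshot, which also acts as a heartbeat
    rf.sendSnapshot(idx)
    return
}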

(screenshots omitted)

Step 10: Add the kvserver-side logic for loading a snapshot

(screenshot omitted)

Step 11: Test

Following the hint, start with:

Make sure you pass TestSnapshotRPC before moving on to the other Snapshot tests.

(screenshot omitted)

The unreliable test that used to fail now passes as well.

(screenshot omitted)

BUG 1: the first 3B test blocks

After another round of log-based digging (four hours of it), I found the problem.


(screenshot omitted)

The paper says:

Raft also includes a small amount of metadata in the snapshot: the last included index is the index of the last entry in the log that the snapshot replaces (the last entry the state machine had applied), and the last included term is the term of this entry. These are preserved to support the AppendEntries consistency check for the first log entry following the snapshot, since that entry needs a previous log index and term.

So I must not wipe the last entry's term and leave it empty; the last included term has to be recorded as well. Otherwise the prevLogTerm the leader sends later is always 0 and the follower keeps replying false, because of rule 1: Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3).

The correct version is sketched below.
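A sketch of the fix, consistent with the InstallSnapshot handler and DoSnapShot in the full Raft code below: the dummy head retained at lastIncludedIndex must carry the real term, never a zero one.

// follower side, inside the InstallSnapshot handler:
if args.LastIncludedIndex < rf.logLen()-1 {
    // keep the entries that follow the snapshot point
    rf.log = append(make([]Log, 0), rf.log[args.LastIncludedIndex-rf.lastIncludedIndex:]...)
} else {
    // discard the entire log, but the new dummy head keeps the last included term
    rf.log = []Log{{Term: args.LastIncludedTerm, Command: nil}}
}
rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm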


(screenshot omitted)

After the first 3B RPC test passes, run the whole 3B suite.


(screenshot omitted)

There is an index-out-of-range; go check it.


(screenshot omitted)

BUG 2

The suspicion is that somewhere lastApplied is not being updated.


(screenshot omitted)

Looking at this test's properties: a single client, but with crashes. So the guess is that after a snapshot is taken and the server crashes and comes back, lastApplied and commitIndex are not restored.
Searching globally for where they are updated shows that I only added the update in the InstallSnapshot RPC handler, and not in readPersist.
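The missing piece is the tail of readPersist (this matches the full Raft code below): after a restart, commitIndex and lastApplied must start from lastIncludedIndex, because everything up to that point now lives only in the snapshot.

rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
rf.lastIncludedTerm, rf.lastIncludedIndex = lastIncludedTerm, lastIncludedIndex
// the log now starts at lastIncludedIndex; nothing below it can be re-applied
rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex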


(screenshot omitted)

This time every test passes.


(screenshot omitted)

But the lab requires:

A reasonable amount of time to take for the Lab 3 tests is 400 seconds of real time and 700 seconds of CPU time. Further, go test -run TestSnapshotSize should take less than 20 seconds of real time.

Step 12: Performance tuning

The slowest one is the second test, so let's find out why it is so slow.
I added a lot of logging inside the test.


(screenshot omitted)

This loop runs 200 times and we only get through about 3 iterations per second, so it takes over a minute, whereas in the sample output on the course site this case finishes within a second.


(screenshots omitted)

The operation inside the loop is Put, and Put blocks: it can only return after the leader has committed the entry and applied it. So the key question is when an entry gets committed.

Looking closer, an entry is committed once a majority of nodes have appended it to their logs; the leader then advances its commitIndex and applies the entry.

That only happens when the leader sends out AppendEntries RPCs, and in this Raft code those fire every 100 ms. That is the whole reason.

To make Put as fast as possible, the leader should kick off startAppendLog immediately after appending a new entry, as sketched below.
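The change is in Start (a condensed version of the Start in the full Raft code below): right after appending the entry, kick off a replication round instead of waiting for the next 100 ms heartbeat tick.

if isLeader {
    index = rf.getLastLogIdx() + 1
    rf.log = append(rf.log, Log{rf.currentTerm, command})
    rf.persist()
    rf.startAppendLog() // replicate immediately; don't wait for the heartbeat timer
}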

(screenshot omitted)

With that, the first two tests finish very quickly.


(screenshot omitted)

The fourth test throws an index-out-of-range from the line below.

(screenshot omitted)

Logging shows lastApplied dropping below lastIncludedIndex.
My guess is that with AppendEntries now firing much more often, this path also runs more often, so first add a boundary guard; since lastIncludedIndex is the base of the log, the change does not hurt correctness. A sketch follows.
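The guard lives in updateLastApplied (the same as in the full Raft code below): lastApplied and commitIndex are clamped up to lastIncludedIndex, because entries below it are only reachable through the snapshot.

func (rf *Raft) updateLastApplied() {
    rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex) // never re-apply compacted entries
    rf.commitIndex = Max(rf.commitIndex, rf.lastIncludedIndex)
    for rf.lastApplied < rf.commitIndex {
        rf.lastApplied++
        curLog := rf.getLog(rf.lastApplied)
        rf.applyCh <- ApplyMsg{true, curLog.Command, rf.lastApplied, nil}
    }
}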
(screenshot omitted)

BUG 3: the fifth test blocks

Same deadlock hunt as before, logging every Lock and Unlock: the deadlock shows up either in the AppendEntries RPC handler or in the InstallSnapshot RPC handler.

Both handlers end by sending on the apply channel, so once again the suspicion is that the background consumer is stuck.
Reading the code, the suspect is the snippet below.

(screenshots omitted)

The students' guide makes exactly this point:

Re-appearing indices: Say that your Raft library has some method Start() that takes a command, and return the index at which that command was placed in the log (so that you know when to return to the client, as discussed above). You might assume that you will never see Start() return the same index twice, or at the very least, that if you see the same index again, the command that first returned that index must have failed. It turns out that neither of these things are true, even if no servers crash.

The fix: a new notification replaces the old pending one in the channel.
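This is the same send helper as in the full server code below: drain any stale value before sending, so a re-appearing index can never block the apply loop.

func send(notifyCh chan Op, op Op) {
    select {
    case <-notifyCh: // drop a stale, unconsumed notification if one is pending
    default:
    }
    notifyCh <- op
}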

(screenshot omitted)

go test -race passes:

zyx@zyx-virtual-machine:~/Desktop/mit6824/mit6.824/src/kvraft$ go test -race
Test: snapshots, one client (3A) ...
  ... Passed --  15.0  5 18675 3525
Test: snapshots, many clients (3A) ...
  ... Passed --  15.1  5 21955 4067
Test: unreliable net, snapshots, many clients (3A) ...
  ... Passed --  16.3  5 10669 1461
Test: concurrent append to same key, unreliable (3A) ...
  ... Passed --   1.0  3   276   52
Test: progress in majority (3A) ...
  ... Passed --   0.4  5    54    2
Test: no progress in minority (3A) ...
  ... Passed --   1.0  5   127    3
Test: completion after heal (3A) ...
  ... Passed --   1.0  5    60    3
Test: partitions, one client (3A) ...
  ... Passed --  23.3  5  5696  775
Test: partitions, many clients (3A) ...
  ... Passed --  22.8  5 10947 1005
Test: restarts, one client (3A) ...
  ... Passed --  20.7  5  6857  952
Test: restarts, many clients (3A) ...
  ... Passed --  20.2  5 11353 1263
Test: unreliable net, restarts, many clients (3A) ...
  ... Passed --  21.4  5  8034  969
Test: restarts, partitions, many clients (3A) ...
  ... Passed --  27.4  5 11132 1028
Test: unreliable net, restarts, partitions, many clients (3A) ...
  ... Passed --  28.3  5  6883  695
Test: unreliable net, restarts, partitions, many clients, linearizability checks (3A) ...
  ... Passed --  25.9  7 12984  649
Test: InstallSnapshot RPC (3B) ...
  ... Passed --   2.8  3   704   63
Test: snapshot size is reasonable (3B) ...
  ... Passed --   3.2  3  2869  800
Test: restarts, snapshots, one client (3B) ...
  ... Passed --  19.1  5 18039 3209
Test: restarts, snapshots, many clients (3B) ...
  ... Passed --  19.8  5 19701 2804
Test: unreliable net, snapshots, many clients (3B) ...
  ... Passed --  15.8  5 11034 1533
Test: unreliable net, restarts, snapshots, many clients (3B) ...
  ... Passed --  20.1  5 11658 1490
Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
  ... Passed --  27.5  5  8853 1021
Test: unreliable net, restarts, partitions, snapshots, many clients, linearizability checks (3B) ...
  ... Passed --  25.9  7 24348 1862
PASS
ok      kvraft  375.658s

Timing of the tests:

(screenshot omitted)

Running the suite 500 times surfaces some bugs that do not show up on every run.

BUG 1

(screenshot omitted)

Logging every place the nextIndex array is updated shows it can sometimes go past the log length.
The fix:


(screenshot omitted)

BUG 2

It comes from the 3A series.


(screenshot omitted)

The debugging reasoning: the new code has already survived 1000 runs of the Raft regression tests, which rules Raft out, and the 3A tests never touch InstallSnapshot (the Lab 2 tests don't cover snapshots), so most likely something on the client/server side is wrong.
Going over the code step by step and adding some logging shows that the Op comparison does not use every field; for a Get, two Ops compare equal as long as the key is the same.
So every Get operation is tagged with a unique token generated by the client-side Nrand function, as sketched below.
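A sketch of the change, matching the Get handler and equalOp in the full server code below; the random token is stashed in the otherwise unused Value field:

// in KVServer.Get: tag each Get with a fresh random token so two Gets on the
// same key can no longer be confused with each other
originOp := Op{"Get", args.Key, strconv.FormatInt(Nrand(), 10), 0, 0}

// equalOp now compares every field of the Op
func equalOp(a Op, b Op) bool {
    return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType &&
        a.SeqNum == b.SeqNum && a.Cid == b.Cid
}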


(screenshot omitted)

equalOp is updated at the same time (see the sketch above).


(screenshot omitted)

Running just that failing case on its own 500 times shows no more problems.
Running the full 3A test set on its own 200 times shows nothing abnormal either.

BUG 3

Testing all of Lab 3, a problem shows up in 3B.
It comes from the 3B series.

(screenshot omitted)

This failure tells us the client/server code is fine (otherwise 3A would have failed) and the Raft basics are fine.
With 3B failing, the bug is most likely in the InstallSnapshot-related changes.
What breaks is linearizability, i.e. the applied log diverges between servers, so the cause has to be either applying log entries in parallel (which would be random) or a bug in how lastApplied or commitIndex are updated.
Reading the code with that in mind rules out the former, because every apply in my code is strictly protected by the lock.
Searching for every assignment to rf.commitIndex turns up the guard we added earlier:
(screenshot omitted)

So the code in the InstallSnapshot RPC handler is changed to do the same thing, and logging confirms that commitIndex really can be larger than lastIncludedIndex there.
Yet the linearizability check still failed, and it took a long while to track down: if lastApplied is already beyond lastIncludedIndex, the kvserver's db is newer than the snapshot.
So the following guard is needed as well (sketch below).
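This is the tail of the InstallSnapshot handler, as in the full Raft code below: clamp commitIndex and lastApplied, and if the state machine is already ahead of this snapshot, do not push it onto applyCh.

rf.commitIndex = Max(rf.commitIndex, rf.lastIncludedIndex)
rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
if rf.lastApplied > rf.lastIncludedIndex {
    return // the kvserver's db is newer than this snapshot; don't overwrite it
}
rf.applyCh <- applyMsg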
(screenshot omitted)

Test shell script:

export GOPATH="/home/zyx/Desktop/mit6824/mit6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm -rf res
mkdir res
for ((i = 0; i < 25; i++))
do
    # launch 10 test runs in parallel, each writing its output to its own file
    for ((c = i * 10; c < (i + 1) * 10; c++))
    do
         (go test -run TestSnapshotUnreliableRecoverConcurrentPartitionLinearizable3B) &> ./res/$c &
    done

    sleep 40

    if grep -nr "FAIL.*raft.*" res; then
        echo "fail"
    fi

done

500 runs pass.

(screenshot omitted)

Finally, here is the code, tidied up a little.

Client

package raftkv

import (
    "crypto/rand"
    "math/big"
    "time"

    "labrpc"
)


type Clerk struct {
    servers []*labrpc.ClientEnd
    lastLeader  int
    id          int64
    seqNum      int
}

func Nrand() int64 {
    max := big.NewInt(int64(1) << 62)
    bigx, _ := rand.Int(rand.Reader, max)
    x := bigx.Int64()
    return x
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    ck.id = Nrand()//give each client a unique identifier, and then have them
    ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
    ck.lastLeader = 0
    return ck
}

func (ck *Clerk) Get(key string) string {
    index := ck.lastLeader
    for {
        args := GetArgs{key}
        reply := GetReply{}
        ok := ck.servers[index].Call("KVServer.Get", &args, &reply)
        if ok && !reply.WrongLeader {
            ck.lastLeader = index
            return reply.Value
        }
        index = (index + 1) % len(ck.servers)
        time.Sleep(time.Duration(100)*time.Millisecond)
    }
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
    index := ck.lastLeader

    args := PutAppendArgs{key, value, op, ck.id, ck.seqNum}
    ck.seqNum++
    for {
        reply := PutAppendReply{}
        ok := ck.servers[index].Call("KVServer.PutAppend", &args, &reply)
        if ok && !reply.WrongLeader {
            ck.lastLeader = index
            return
        }
        index = (index + 1) % len(ck.servers)
        time.Sleep(time.Duration(100)*time.Millisecond)
    }
}
func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}

Server

package raftkv

import (
    "bytes"
    "labgob"
    "labrpc"
    "log"
    "raft"
    "strconv"
    "sync"
    "time"
)

type Op struct {
    OpType  string "operation type(eg. put/append)"
    Key     string
    Value   string
    Cid     int64
    SeqNum  int
}

type KVServer struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    maxraftstate int // snapshot if log grows this big
    timeout      time.Duration
    persist *raft.Persister
    db      map[string]string
    chMap   map[int]chan Op
    cid2Seq map[int64]int
    killCh  chan bool
}

func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    //from hint: A simple solution is to enter every Get() (as well as each Put() and Append()) in the Raft log.
    originOp := Op{"Get",args.Key,strconv.FormatInt(Nrand(),10),0,0}
    reply.WrongLeader = true
    index,_,isLeader := kv.rf.Start(originOp)
    if !isLeader {return}
    ch := kv.putIfAbsent(index)
    op := beNotified(ch)
    if equalOp(op,originOp) {
        reply.WrongLeader = false
        kv.mu.Lock()
        reply.Value = kv.db[op.Key]
        kv.mu.Unlock()
    }
}

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
    reply.WrongLeader = true
    index,_,isLeader := kv.rf.Start(originOp)
    if !isLeader {return}
    ch := kv.putIfAbsent(index)
    op := beNotified(ch)
    if equalOp(originOp,op) {
        reply.WrongLeader = false
    }
}
func beNotified(ch chan Op) Op{
    select {
    case notifyArg := <- ch :
        return notifyArg
    case <- time.After(time.Duration(600)*time.Millisecond):
        return Op{}
    }
}
func (kv *KVServer) putIfAbsent(idx int) chan Op{
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _, ok := kv.chMap[idx]; !ok {
        kv.chMap[idx] = make(chan Op,1)
    }
    return kv.chMap[idx]

}
func equalOp(a Op, b Op) bool{
    return a.Key == b.Key && a.Value == b.Value && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}

func (kv *KVServer) Kill() {
    kv.rf.Kill()
    kv.killCh <- true
}

func (kv *KVServer) readSnapShot(snapshot []byte) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if snapshot == nil || len(snapshot) < 1 {return}
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    var db map[string]string
    var cid2Seq map[int64]int
    if  d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil {
        log.Fatal("readSnapShot ERROR for server %v",kv.me)
    } else {
        kv.db, kv.cid2Seq = db, cid2Seq
    }
}

func (kv *KVServer) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    return kv.maxraftstate > 0 &&
        kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}

func (kv *KVServer) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    e.Encode(kv.db)
    e.Encode(kv.cid2Seq)
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index,w.Bytes())
}
func send(notifyCh chan Op,op Op) {
    select{
    case  <-notifyCh:
    default:
    }
    notifyCh <- op
}
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // call labgob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    labgob.Register(Op{})
    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.persist = persister
    // You may need initialization code here.
    kv.db = make(map[string]string)
    kv.chMap = make(map[int]chan Op)
    kv.cid2Seq = make(map[int64]int)
    kv.readSnapShot(kv.persist.ReadSnapshot())
    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.killCh = make(chan bool,1)
    go func() {
        for {
            select {
            case <- kv.killCh:
                return
            case applyMsg := <- kv.applyCh:
                if !applyMsg.CommandValid {
                    kv.readSnapShot(applyMsg.SnapShot)
                    continue
                }
                op := applyMsg.Command.(Op)
                kv.mu.Lock()
                maxSeq,found := kv.cid2Seq[op.Cid]
                if !found || op.SeqNum > maxSeq {
                    switch op.OpType {
                    case "Put":
                        kv.db[op.Key] = op.Value
                    case "Append":
                        kv.db[op.Key] += op.Value
                    }
                    kv.cid2Seq[op.Cid] = op.SeqNum
                }
                kv.mu.Unlock()
                notifyCh := kv.putIfAbsent(applyMsg.CommandIndex)
                if kv.needSnapShot() {
                    go kv.doSnapShot(applyMsg.CommandIndex)
                }
                send(notifyCh,op)

            }
        }
    }()
    return kv
}

The new Raft code

package raft

import (
    "bytes"
    "labgob"
    "log"
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"
    "time"
)
import "labrpc"

type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int
    SnapShot     []byte
}
type State int
const (
    Follower State = iota // value --> 0
    Candidate             // value --> 1
    Leader                // value --> 2
)
const NULL int = -1

type Log struct {
    Term    int         "term when entry was received by leader"
    Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    // state a Raft server must maintain.
    state     State
    //Persistent state on all servers:(Updated on stable storage before responding to RPCs)
    currentTerm int    "latest term server has seen (initialized to 0 increases monotonically)"
    votedFor    int    "candidateId that received vote in current term (or null if none)"
    log         []Log  "log entries;(first index is 1)"
    //log compaction
    lastIncludedIndex   int "the snapshot replaces all entries up through and including this index"
    lastIncludedTerm    int "term of lastIncludedIndex"

    //Volatile state on all servers:
    commitIndex int    "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
    lastApplied int    "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"

    //Volatile state on leaders:(Reinitialized after election)
    nextIndex   []int  "for each server,index of the next log entry to send to that server"
    matchIndex  []int  "for each server,index of highest log entry known to be replicated on server(initialized to 0, im)"

    //channel
    applyCh     chan ApplyMsg // from Make()
    killCh      chan bool //for Kill()
    //handle rpc
    voteCh      chan bool
    appendLogCh chan bool

}

// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
    var term int
    var isleader bool
    rf.mu.Lock()
    defer rf.mu.Unlock()
    term = rf.currentTerm
    isleader = (rf.state == Leader)
    return term, isleader
}

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
    rf.persister.SaveRaftState(rf.encodeRaftState())
}
func (rf *Raft) encodeRaftState() []byte {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    e.Encode(rf.lastIncludedIndex)
    e.Encode(rf.lastIncludedTerm)
    return w.Bytes()
}
func (rf *Raft) persistWithSnapShot(snapshot []byte) {
    rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(),snapshot)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm int
    var voteFor int
    var clog []Log
    var lastIncludedIndex int
    var lastIncludedTerm int
    if  d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil ||
        d.Decode(&lastIncludedIndex) != nil || d.Decode(&lastIncludedTerm) != nil{
        log.Fatal("readPersist ERROR for server %v",rf.me)
    } else {
        rf.mu.Lock()
        rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
        rf.lastIncludedTerm, rf.lastIncludedIndex = lastIncludedTerm, lastIncludedIndex
        rf.commitIndex, rf.lastApplied = rf.lastIncludedIndex, rf.lastIncludedIndex
        rf.mu.Unlock()
    }
}

// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
    Term            int "candidate’s term"
    CandidateId     int "candidate requesting vote"
    LastLogIndex    int "index of candidate’s last log entry (§5.4)"
    LastLogTerm     int "term of candidate’s last log entry (§5.4)"
}

// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
    Term        int  "currentTerm, for candidate to update itself"
    VoteGranted bool "true means candidate received vote"
}

//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if args.Term > rf.currentTerm {//all server rule 1 If RPC request or response contains term T > currentTerm:
        rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
    }
    reply.Term = rf.currentTerm
    reply.VoteGranted = false
    if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
        // Reply false if term < currentTerm (§5.1)  If votedFor is not null and not candidateId,
    } else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
        //If the logs have last entries with different terms, then the log with the later term is more up-to-date.
        // If the logs end with the same term, then whichever log is longer is more up-to-date.
        // Reply false if candidate’s log is at least as up-to-date as receiver’s log
    } else {
        //grant vote
        rf.votedFor = args.CandidateId
        reply.VoteGranted = true
        rf.state = Follower
        rf.persist()
        send(rf.voteCh) //because If election timeout elapses without receiving granting vote to candidate, so wake up

    }
}

////RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
    return ok
}

type AppendEntriesArgs struct {
    Term         int    "leader’s term"
    LeaderId     int    "so follower can redirect clients"
    PrevLogIndex int    "index of log entry immediately preceding new ones"
    PrevLogTerm  int    "term of prevLogIndex entry"
    Entries      []Log  "log entries to store (empty for heartbeat;may send more than one for efficiency)"
    LeaderCommit int    "leader’s commitIndex"
}

type AppendEntriesReply struct {
    Term          int   "currentTerm, for leader to update itself"
    Success       bool  "true if follower contained entry matching prevLogIndex and prevLogTerm"
    ConflictIndex int   "the first index it stores for that conflict term"
    ConflictTerm  int   "the term of the conflicting entry"
}

//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
        rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
    }
    send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader

    reply.Term = rf.currentTerm
    reply.Success = false
    reply.ConflictTerm = NULL
    reply.ConflictIndex = 0
    //1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
    prevLogIndexTerm := -1
    logSize := rf.logLen()
    if args.PrevLogIndex >= rf.lastIncludedIndex && args.PrevLogIndex < rf.logLen() {
        prevLogIndexTerm = rf.getLog(args.PrevLogIndex).Term
    }
    if prevLogIndexTerm != args.PrevLogTerm {
        reply.ConflictIndex = logSize
        if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
            //it should return with conflictIndex = len(log) and conflictTerm = None.
        } else { //If a follower does have prevLogIndex in its log, but the term does not match
            reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
            i := rf.lastIncludedIndex
            for ; i < logSize; i++ {//and then search its log for
                if rf.getLog(i).Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
                    reply.ConflictIndex = i
                    break
                }
            }
        }
        return
    }
    //2. Reply false if term < currentTerm (§5.1)
    if args.Term < rf.currentTerm {return}

    index := args.PrevLogIndex
    for i := 0; i < len(args.Entries); i++ {
        index++
        if index < logSize {
            if rf.getLog(index).Term == args.Entries[i].Term {
                continue
            } else {//3. If an existing entry conflicts with a new one (same index but different terms),
                rf.log = rf.log[:index - rf.lastIncludedIndex]//delete the existing entry and all that follow it (§5.3)
            }
        }
        rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
        rf.persist()
        break
    }
    //5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
        rf.updateLastApplied()
    }
    reply.Success = true
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    return ok
}
//InstallSnapshot RPC
type InstallSnapshotArgs struct {
    Term              int       "leader’s term"
    LeaderId          int       "so follower can redirect clients"
    LastIncludedIndex int       "the snapshot replaces all entries up through and including this index"
    LastIncludedTerm  int       "term of lastIncludedIndex"
    Data              []byte    "raw bytes of the snapshot chunk, starting at offset"
}

type InstallSnapshotReply struct {
    Term    int "currentTerm, for leader to update itself"
}

func (rf *Raft) sendInstallSnapshot(server int, args *InstallSnapshotArgs, reply *InstallSnapshotReply) bool {
    ok := rf.peers[server].Call("Raft.InstallSnapshot", args, reply)
    return ok
}

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Term = rf.currentTerm
    if args.Term < rf.currentTerm { //Reply immediately if term < currentTerm
        return
    }
    if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
        rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
    }
    send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
    if args.LastIncludedIndex <= rf.lastIncludedIndex {// discard any existing or partial snapshot with a smaller index
        return
    }
    applyMsg := ApplyMsg{CommandValid: false, SnapShot: args.Data}
    //If existing log entry has same index and term as snapshot’s last included entry,retain log entries following it and reply
    if args.LastIncludedIndex < rf.logLen()-1 {
        rf.log = append(make([]Log,0),rf.log[args.LastIncludedIndex -rf.lastIncludedIndex:]...)
    }else {//7. Discard the entire log
        rf.log = []Log{{args.LastIncludedTerm, nil},}
    }
    //Reset state machine using snapshot contents (and load snapshot’s cluster configuration)
    rf.lastIncludedIndex, rf.lastIncludedTerm = args.LastIncludedIndex, args.LastIncludedTerm
    rf.persistWithSnapShot(args.Data)
    rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
    rf.lastApplied = Max(rf.lastApplied, rf.lastIncludedIndex)
    if rf.lastApplied > rf.lastIncludedIndex {return} //snapshot is older than kvserver's db, so reply immediately
    rf.applyCh <- applyMsg

}

func (rf *Raft) sendSnapshot(server int) {
    args := InstallSnapshotArgs{
        Term:              rf.currentTerm,
        LastIncludedIndex: rf.lastIncludedIndex,
        LastIncludedTerm:  rf.lastIncludedTerm,
        LeaderId:          rf.me,
        Data:              rf.persister.ReadSnapshot(),
    }
    rf.mu.Unlock()
    reply := InstallSnapshotReply{}
    ret := rf.sendInstallSnapshot(server,&args,&reply)
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if !ret || rf.state != Leader || rf.currentTerm != args.Term {
        return
    }
    if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
        rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
        return
    }
    rf.updateNextMatchIdx(server,rf.lastIncludedIndex)
}

//Leader Section:
func (rf *Raft) startAppendLog() {
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go func(idx int) {
            for {
                rf.mu.Lock()
                if rf.state != Leader {
                    rf.mu.Unlock()
                    return
                } //send initial empty AppendEntries RPCs (heartbeat) to each server
                if rf.nextIndex[idx]-rf.lastIncludedIndex < 1 { //The leader uses a new RPC called InstallSnapshot to
                    rf.sendSnapshot(idx) // followers that are too far behind
                    return
                }
                args := AppendEntriesArgs{
                    rf.currentTerm,
                    rf.me,
                    rf.getPrevLogIdx(idx),
                    rf.getPrevLogTerm(idx),
                    //If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
                    //nextIndex > last log index, rf.log[rf.nextIndex[idx]:] will be empty then like a heartbeat
                    append(make([]Log, 0), rf.log[rf.nextIndex[idx]-rf.lastIncludedIndex:]...),
                    rf.commitIndex,
                }
                rf.mu.Unlock()
                reply := &AppendEntriesReply{}
                ret := rf.sendAppendEntries(idx, &args, reply)
                rf.mu.Lock()
                if !ret || rf.state != Leader || rf.currentTerm != args.Term {
                    rf.mu.Unlock()
                    return
                }
                if reply.Term > rf.currentTerm { //all server rule 1 If RPC response contains term T > currentTerm:
                    rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
                    rf.mu.Unlock()
                    return
                }
                if reply.Success { //If successful:update nextIndex and matchIndex for follower
                    rf.updateNextMatchIdx(idx, args.PrevLogIndex+len(args.Entries))
                    rf.mu.Unlock()
                    return
                } else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
                    tarIndex := reply.ConflictIndex //If it does not find an entry with that term
                    if reply.ConflictTerm != NULL {
                        logSize := rf.logLen() //first search its log for conflictTerm
                        for i := rf.lastIncludedIndex; i < logSize; i++ { //if it finds an entry in its log with that term,
                            if rf.getLog(i).Term != reply.ConflictTerm {
                                continue
                            }
                            for i < logSize && rf.getLog(i).Term == reply.ConflictTerm {
                                i++
                            }            //set nextIndex to be the one
                            tarIndex = i //beyond the index of the last entry in that term in its log
                        }
                    }
                    rf.nextIndex[idx] = Min(rf.logLen(), tarIndex)
                    rf.mu.Unlock()
                }
            }
        }(i)
    }
}

// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    index := -1
    term := rf.currentTerm
    isLeader := (rf.state == Leader)
    //If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
    if isLeader {
        index = rf.getLastLogIdx() + 1
        newLog := Log{
            rf.currentTerm,
            command,
        }
        rf.log = append(rf.log,newLog)
        rf.persist()
        rf.startAppendLog()
    }
    return index, term, isLeader
}

//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
    rf.matchIndex[rf.me] = rf.logLen() - 1
    copyMatchIndex := make([]int,len(rf.matchIndex))
    copy(copyMatchIndex,rf.matchIndex)
    sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
    N := copyMatchIndex[len(copyMatchIndex)/2]
    if N > rf.commitIndex && rf.getLog(N).Term == rf.currentTerm {
        rf.commitIndex = N
        rf.updateLastApplied()
    }
}

func (rf *Raft) beLeader() {
    if rf.state != Candidate {
        return
    }
    rf.state = Leader
    //initialize leader data
    rf.nextIndex = make([]int,len(rf.peers))
    rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
    for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
        rf.nextIndex[i] = rf.getLastLogIdx() + 1
    }
}
//end Leader section


//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower implemented in AppendEntries RPC Handler
func (rf *Raft) beCandidate() { //Reset election timer are finished in caller
    rf.state = Candidate
    rf.currentTerm++ //Increment currentTerm
    rf.votedFor = rf.me //vote myself first
    rf.persist()
    //ask for other's vote
    go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
    rf.mu.Lock()
    args := RequestVoteArgs{
        rf.currentTerm,
        rf.me,
        rf.getLastLogIdx(),
        rf.getLastLogTerm(),

    }
    rf.mu.Unlock()
    var votes int32 = 1
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go func(idx int) {
            reply := &RequestVoteReply{}
            ret := rf.sendRequestVote(idx,&args,reply)

            if ret {
                rf.mu.Lock()
                defer rf.mu.Unlock()
                if reply.Term > rf.currentTerm {
                    rf.beFollower(reply.Term)
                    return
                }
                if rf.state != Candidate || rf.currentTerm != args.Term{
                    return
                }
                if reply.VoteGranted {
                    atomic.AddInt32(&votes,1)
                } //If votes received from majority of servers: become leader
                if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
                    rf.beLeader()
                    rf.startAppendLog()
                    send(rf.voteCh) //after be leader, then notify 'select' goroutine will sending out heartbeats immediately
                }
            }
        }(i)
    }
}
//end Candidate section

//Follower Section:
func (rf *Raft) beFollower(term int) {
    rf.state = Follower
    rf.votedFor = NULL
    rf.currentTerm = term
    rf.persist()
}
//end Follower section

//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
    rf.lastApplied = Max(rf.lastApplied,rf.lastIncludedIndex)
    rf.commitIndex = Max(rf.commitIndex,rf.lastIncludedIndex)
    for rf.lastApplied < rf.commitIndex {
        rf.lastApplied++
        curLog := rf.getLog(rf.lastApplied)
        applyMsg := ApplyMsg{true, curLog.Command, rf.lastApplied, nil,}
        rf.applyCh <- applyMsg
    }
}

//log compaction:
func (rf *Raft) DoSnapShot(curIdx int, snapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if curIdx <= rf.lastIncludedIndex {return}
    //update last included index & term
    rf.log = append(make([]Log,0), rf.log[curIdx-rf.lastIncludedIndex:]...)
    rf.lastIncludedIndex = curIdx
    rf.lastIncludedTerm = rf.getLog(curIdx).Term
    rf.persistWithSnapShot(snapshot)
}

// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
    send(rf.killCh)
}

//Helper function
func send(ch chan bool) {
    select {
    case <-ch: //if already set, consume it then resent to avoid block
    default:
    }
    ch <- true
}

func (rf *Raft) getLog(i int) Log {
    return rf.log[i - rf.lastIncludedIndex]
}

func (rf *Raft) logLen() int {
    return len(rf.log) + rf.lastIncludedIndex
}

func (rf *Raft) getPrevLogIdx(i int) int {
    return rf.nextIndex[i] - 1
}

func (rf *Raft) getPrevLogTerm(i int) int {
    prevLogIdx := rf.getPrevLogIdx(i)
    if prevLogIdx < rf.lastIncludedIndex {
        return -1
    }
    return rf.getLog(prevLogIdx).Term
}

func (rf *Raft) getLastLogIdx() int {
    return rf.logLen() - 1
}

func (rf *Raft) getLastLogTerm() int {
    idx := rf.getLastLogIdx()
    if idx < rf.lastIncludedIndex {
        return -1
    }
    return rf.getLog(idx).Term
}

func (rf *Raft) updateNextMatchIdx(server int, matchIdx int) {
    rf.matchIndex[server] = matchIdx
    rf.nextIndex[server] = matchIdx + 1
    rf.updateCommitIndex()
}

func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    rf.state = Follower
    rf.currentTerm = 0
    rf.votedFor = NULL
    rf.log = make([]Log,1) //(first index is 1)

    rf.commitIndex = 0
    rf.lastApplied = 0

    rf.applyCh = applyCh
    //these channels are only consumed by the goroutine below; a buffer of 1 keeps sends from blocking
    rf.voteCh = make(chan bool,1)
    rf.appendLogCh = make(chan bool,1)
    rf.killCh = make(chan bool,1)
    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    //because from hint The tester requires that the leader send heartbeat RPCs no more than ten times per second.
    heartbeatTime := time.Duration(100) * time.Millisecond

    //from hint :You'll need to write code that takes actions periodically or after delays in time.
    //  The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
    go func() {
        for {
            select {
            case <-rf.killCh:
                return
            default:
            }
            electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond

            rf.mu.Lock()
            state := rf.state
            rf.mu.Unlock()

            switch state {
            case Follower, Candidate:
                select {
                case <-rf.voteCh:
                case <-rf.appendLogCh:
                case <-time.After(electionTime):
                    rf.mu.Lock()
                    rf.beCandidate() //becandidate, Reset election timer, then start election
                    rf.mu.Unlock()
                }
            case Leader:
                time.Sleep(heartbeatTime)
                rf.startAppendLog()
            }
        }
    }()
    return rf
}
