4. Implementing Raft Step by Step (2C)

Step 1: Read the paper from Section 6 onward

https://www.infoq.cn/article/raft-paper

Then move on to the 2C handout.
Task goal

If a Raft-based server reboots it should resume service where it left off. This requires that Raft keep persistent state that survives a reboot. The paper's Figure 2 mentions which state should be persistent, and raft.go contains examples of how to save and restore persistent state.

A “real” implementation would do this by writing Raft's persistent state to disk each time it changes, and reading the latest saved state from disk when restarting after a reboot. Your implementation won't use the disk; instead, it will save and restore persistent state from a Persister object (see persister.go). Whoever calls Raft.Make() supplies a Persister that initially holds Raft's most recently persisted state (if any). Raft should initialize its state from that Persister, and should use it to save its persistent state each time the state changes. Use the Persister's ReadRaftState() and SaveRaftState() methods.

Step 2: Implement persist() and readPersist()

Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or "serialize") the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder we provide to do this; see the comments in persist() and readPersist(). labgob is derived from Go's gob encoder; the only difference is that labgob prints error messages if you try to encode structures with lower-case field names.
因?yàn)槔佣荚谧⑨尷锝o你了姨裸,我把注釋解掉众弓,改一改寫(xiě)起來(lái)十分簡(jiǎn)單蚂踊。只要知道什么是要持久存儲(chǔ)的即可冈欢。

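The heart of both functions is just encoding and decoding the three persistent fields in a fixed order. Below is a rough, self-contained sketch using the standard library's encoding/gob (labgob is derived from it). In the real lab you would use labgob and the Persister instead, and Command is an interface{} rather than the int used here; LogEntry, encodeState, and decodeState are stand-in names of my own.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Simplified log entry; the lab's version holds an interface{} Command.
type LogEntry struct {
	Term    int
	Command int
}

// encodeState serializes the three persistent fields from Figure 2,
// mirroring what persist() hands to SaveRaftState.
func encodeState(currentTerm, votedFor int, log []LogEntry) []byte {
	w := new(bytes.Buffer)
	e := gob.NewEncoder(w)
	e.Encode(currentTerm)
	e.Encode(votedFor)
	e.Encode(log)
	return w.Bytes()
}

// decodeState restores the fields in the same order they were written,
// mirroring readPersist.
func decodeState(data []byte) (currentTerm, votedFor int, log []LogEntry) {
	r := bytes.NewBuffer(data)
	d := gob.NewDecoder(r)
	d.Decode(&currentTerm)
	d.Decode(&votedFor)
	d.Decode(&log)
	return
}

func main() {
	data := encodeState(3, 1, []LogEntry{{Term: 1, Command: 100}})
	term, vote, lg := decodeState(data)
	fmt.Println(term, vote, len(lg)) // → 3 1 1
}
```

Note that the decode order must match the encode order exactly; a mismatch there is the usual source of readPersist errors.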

Step 3: Decide when to call these two functions

You now need to determine at what points in the Raft protocol your servers are required to persist their state, and insert calls to persist() in those places. There is already a call to readPersist() in Raft.Make(). Once you've done this, you should pass the remaining tests. You may want to first try to pass the "basic persistence" test (go test -run 'TestPersist12C'), and then tackle the remaining ones (go test -run 2C).

The basic idea: whenever one of the three persistent values is modified, we need to persist; the locking question can be thought through at the end.
Search for every write to rf.currentTerm, rf.votedFor, and rf.log.
After going through the matches, I added persist() calls at several of those sites.
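The pattern at each of those call sites is the same: mutate the persistent state, then call persist() before the lock is released. A toy illustration of that pattern; node, becomeFollower, and saveCount are stand-ins of my own, with saveCount playing the role of Persister.SaveRaftState.

```go
package main

import (
	"fmt"
	"sync"
)

type node struct {
	mu          sync.Mutex
	currentTerm int
	votedFor    int
	saveCount   int
}

// persist stands in for encoding state and calling SaveRaftState.
func (n *node) persist() { n.saveCount++ }

// becomeFollower mutates persistent state, so it must persist
// before the lock is released.
func (n *node) becomeFollower(term int) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.currentTerm = term
	n.votedFor = -1
	n.persist() // still inside the lock, like the real call sites
}

func main() {
	n := &node{}
	n.becomeFollower(1)
	n.becomeFollower(2)
	fmt.Println(n.saveCount) // → 2
}
```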


readPersist is only needed after a machine crashes and restarts, so the existing call in Make() is enough; no extra call sites are needed.



Notice that persist() is always called while the lock is already held, so persist() itself does not need to lock.
readPersist(), however, does need to take the lock.



Step 4: From the hints

In order to pass some of the challenging tests towards the end, such as those marked "unreliable", you will need to implement the optimization to allow a follower to back up the leader's nextIndex by more than one entry at a time. See the description in the extended Raft paper starting at the bottom of page 7 and top of page 8 (marked by a gray line). The paper is vague about the details; you will need to fill in the gaps, perhaps with the help of the 6.824 Raft lectures.

From the paper:

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry.

Student Guide

We believe the protocol the authors probably want you to follow is:

  • If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
  • If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
  • Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
  • If it does not find an entry with that term, it should set nextIndex = conflictIndex.
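The leader-side half of that protocol can be sketched like this; nextIndexAfterConflict is a hypothetical helper name of my own, and conflictTerm == -1 stands in for the guide's "None".

```go
package main

import "fmt"

type LogEntry struct{ Term int }

// nextIndexAfterConflict picks the leader's new nextIndex from a
// follower's conflict response, per the student guide:
// if the leader has conflictTerm, jump just past its last entry of
// that term; otherwise fall back to conflictIndex.
func nextIndexAfterConflict(log []LogEntry, conflictIndex, conflictTerm int) int {
	if conflictTerm != -1 {
		// Scan from the back for the last entry with conflictTerm.
		for i := len(log) - 1; i >= 0; i-- {
			if log[i].Term == conflictTerm {
				return i + 1
			}
		}
	}
	// Term not found, or the follower's log was too short.
	return conflictIndex
}

func main() {
	log := []LogEntry{{1}, {1}, {2}, {2}, {3}}
	fmt.Println(nextIndexAfterConflict(log, 2, 2)) // → 4 (just past last term-2 entry)
	fmt.Println(nextIndexAfterConflict(log, 2, 5)) // → 2 (term 5 absent, use conflictIndex)
}
```

This is what makes the backoff skip a whole term per rejected AppendEntries instead of a single entry.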

Following the guide, the first step is to add ConflictIndex and ConflictTerm fields to the AppendEntries reply.



因?yàn)橹皇荝EPLY.SUCCESS = FALSE 的時(shí)候 需要設(shè)置conflictIndexconfictTerm
所以只要關(guān)注AppendEntries handler 的2個(gè)RETURN球昨, 結(jié)合上文的意思,修改代碼如下:


PASS 2C


Test race

zyx@zyx-virtual-machine:~/Desktop/mit6824/6.824/src/raft$ go test -race
Test (2A): initial election ...
  ... Passed --   3.1  3   38    0
Test (2A): election after network failure ...
  ... Passed --   4.5  3   88    0
Test (2B): basic agreement ...
  ... Passed --   1.3  5   32    3
Test (2B): agreement despite follower disconnection ...
  ... Passed --   6.6  3   97    8
Test (2B): no agreement if too many followers disconnect ...
  ... Passed --   3.9  5  148    3
Test (2B): concurrent Start()s ...
  ... Passed --   0.8  3   10    6
Test (2B): rejoin of partitioned leader ...
  ... Passed --   4.9  3  109    4
Test (2B): leader backs up quickly over incorrect follower logs ...
  ... Passed --  37.8  5 2198  102
Test (2B): RPC counts aren't too high ...
  ... Passed --   2.3  3   28   12
Test (2C): basic persistence ...
  ... Passed --   5.1  3  224    6
Test (2C): more persistence ...
  ... Passed --  18.7  5 2182   16
Test (2C): partitioned leader and one follower crash, leader restarts ...
  ... Passed --   2.5  3   51    4
Test (2C): Figure 8 ...
  ... Passed --  34.7  5 26854   37
Test (2C): unreliable agreement ...
  ... Passed --  12.4  5  326  246
Test (2C): Figure 8 (unreliable) ...
  ... Passed --  44.2  5 2680  461
Test (2C): churn ...
  ... Passed --  16.7  5  833  254
Test (2C): unreliable churn ...
  ... Passed --  19.1  5 1403  174
PASS
ok      raft    219.623s

But notice the Figure 8 test:
... Passed -- 34.7 5 26854 37
Its RPC count is very large, and go test -race sometimes fails, complaining that race mode's limit of 8192 goroutines was exceeded.

Reading the TestFigure82C code, I found that it crashes servers after a leader is elected.



Looking at how the crash is implemented,



I found that it calls Kill(), which I had not implemented.

Implementing Kill()

Kill() just means that once a node notices the signal, it shuts itself down directly.
I used a Go channel for this as well.



Re-run the tests



Test All with time


Test 100 times

BUG 1 TestFigure8Unreliable2C

Running the tests many times, TestFigure8Unreliable2C fails roughly one time in ten. The failure message:

Test (2C): Figure 8 (unreliable) ...
2019/05/01 09:49:14 apply error: commit index=235 server=0 7998 != server=2 4299
exit status 1
FAIL raft 28.516s

The two indexes disagree: two nodes ended up with different entries at the same log index, yet committed entries must be identical on every server.
With no better lead, I went through the student guide once more:
https://thesquareplanet.com/blog/students-guide-to-raft/

Make sure you fully understand every point it raises, and that your code actually acts on each one.

I found one gap: after receiving a reply, you must compare the current term with args.Term. That is, at both places that send RPCs, after the reply arrives there must be a check: if currentTerm no longer equals args.Term, return immediately and ignore the reply.
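The check itself is tiny. A sketch with illustrative names (raftNode, handleReply); the real code does this inline in each sender after Call returns, while holding the lock:

```go
package main

import (
	"fmt"
	"sync"
)

type raftNode struct {
	mu          sync.Mutex
	currentTerm int
}

// handleReply applies the student-guide rule: if the term has moved on
// since the RPC was sent, the reply is stale and must be ignored.
func (rf *raftNode) handleReply(argsTerm int) bool {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.currentTerm != argsTerm {
		return false // stale reply from an old term
	}
	// ... safe to process the reply here ...
	return true
}

func main() {
	rf := &raftNode{currentTerm: 2}
	fmt.Println(rf.handleReply(2)) // → true: term unchanged since sending
	fmt.Println(rf.handleReply(1)) // → false: we moved on, reply is stale
}
```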


I added this check to both sender functions.



Test script

#!/bin/bash

export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm res -rf
mkdir res

for ((i = 0; i < 10; i++))
do

    for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
    do                  #replace job name here
         (go test -run TestFigure8Unreliable2C ) &> ./res/$c & 
    done
    
    sleep 40
    
    grep -nr "FAIL.*raft.*" res

done

Before the fix, the script regularly turned up failing runs; after the fix, no failures appeared.

BUG 2

Test (2B): no agreement if too many followers disconnect ...
--- FAIL: TestFailNoAgree2B (2.35s)
config.go:465: one(10) failed to reach agreement

This bug shows up very rarely: sometimes around 10 failures in 1000 runs, sometimes none in 3000 runs.
After adding a lot of logging and spending a long time re-running the tests, I found the cause.

Test script

#!/bin/bash

export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm res -rf
mkdir res
# keep launching batches of 30 runs until a failure is found
for ((i = 0; ; i++))
do
    if (($((i % 10)) == 0)); then
        rm res -rf
        mkdir res
        echo $i
    fi
    for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
    do
         (go test -run TestFailNoAgree2B) &> ./res/$c &
    done

    sleep 7
    if grep -nr "FAIL.*raft.*" res; then
        exit 1
    fi


done

Output

2019/05/07 21:37:23.027238 SEND IN MSG (1), applyMsg:10
2019/05/07 21:37:23.389237 SEND IN MSG (2), applyMsg:10
2019/05/07 21:37:23.389254 SEND IN MSG (3), applyMsg:10
2019/05/07 21:37:23.389261 SEND IN MSG (4), applyMsg:10
2019/05/07 21:37:23.389472 0 at 1 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.406102 4 be follow so curterm++ now is 1
2019/05/07 21:37:23.406188 granted Vote (4)
2019/05/07 21:37:23.406633 1 be follow so curterm++ now is 1
2019/05/07 21:37:23.406695 granted Vote (1)
2019/05/07 21:37:23.406874 CANDIDATE: 0 receive enough vote and becoming a new leader
2019/05/07 21:37:23.425553 3 be follow so curterm++ now is 1
2019/05/07 21:37:23.425675 2 be follow so curterm++ now is 1
2019/05/07 21:37:23.425745 granted Vote (2)
2019/05/07 21:37:23.429599 3 at 2 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.429635 granted Vote (3)
2019/05/07 21:37:23.429815 1 be follow so curterm++ now is 2
2019/05/07 21:37:23.429839 granted Vote (1)
2019/05/07 21:37:23.430065 2 be follow so curterm++ now is 2
2019/05/07 21:37:23.430098 granted Vote (2)
2019/05/07 21:37:23.430163 CANDIDATE: 3 receive enough vote and becoming a new leader
2019/05/07 21:37:23.430172 be Ledaer (3)
2019/05/07 21:37:23.430177 send Hearbeat (3)
2019/05/07 21:37:23.430194 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430368 4 be follow so curterm++ now is 2
2019/05/07 21:37:23.430386 receive Hearbeat (4)
2019/05/07 21:37:23.430489 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430527 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430612 receive Hearbeat (4)
2019/05/07 21:37:23.430672 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430684 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.430786 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.430891 receive Hearbeat (1)
2019/05/07 21:37:23.430945 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430954 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431046 receive Hearbeat (2)
2019/05/07 21:37:23.431129 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431143 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.431277 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.431312 granted Vote (4)
2019/05/07 21:37:23.431444 receive Hearbeat (1)
2019/05/07 21:37:23.431459 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431566 receive Hearbeat (2)
2019/05/07 21:37:23.431620 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431695 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.433226 be Ledaer (0)
2019/05/07 21:37:23.433240 SEND IN MSG (0), applyMsg:10
2019/05/07 21:37:23.433277 SEND IN MSG SUCCESS (0), applyMsg:10
2019/05/07 21:37:23.433321 send Hearbeat from 0 to (4)
2019/05/07 21:37:23.433504 send Hearbeat from 0 to (1)
2019/05/07 21:37:23.433696 send Hearbeat from 0 to (2)
2019/05/07 21:37:23.433839 send Hearbeat from 0 to (3)
2019/05/07 21:37:23.433975 0 be follow so curterm++ now is 2
2019/05/07 21:37:23.434064 receive Hearbeat (0)
2019/05/07 21:37:23.434121 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530457 send Hearbeat (3)
2019/05/07 21:37:23.530501 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.530628 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.530647 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.530689 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.530822 receive Hearbeat (0)
2019/05/07 21:37:23.530831 receive Hearbeat (2)
2019/05/07 21:37:23.530822 receive Hearbeat (1)
2019/05/07 21:37:23.530911 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530924 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530955 receive Hearbeat (4)
2019/05/07 21:37:23.530992 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.531030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.630715 send Hearbeat (3)
2019/05/07 21:37:23.630745 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.630775 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.630827 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.630869 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.630914 receive Hearbeat (4)
2019/05/07 21:37:23.630928 receive Hearbeat (1)
2019/05/07 21:37:23.630961 receive Hearbeat (0)
2019/05/07 21:37:23.631001 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631040 receive Hearbeat (2)
2019/05/07 21:37:23.631052 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631118 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.730978 send Hearbeat (3)
2019/05/07 21:37:23.731032 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.731043 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.731036 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.731202 receive Hearbeat (4)
2019/05/07 21:37:23.731209 receive Hearbeat (1)
2019/05/07 21:37:23.731264 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731274 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.731391 receive Hearbeat (2)
2019/05/07 21:37:23.731498 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731544 receive Hearbeat (0)
2019/05/07 21:37:23.731801 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731869 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831101 send Hearbeat (3)
2019/05/07 21:37:23.831131 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.831294 receive Hearbeat (4)
2019/05/07 21:37:23.831356 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831374 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.831457 receive Hearbeat (0)
2019/05/07 21:37:23.831512 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831521 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.831619 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.831687 receive Hearbeat (1)
2019/05/07 21:37:23.831771 receive Hearbeat (2)
2019/05/07 21:37:23.831782 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831841 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931542 send Hearbeat (3)
2019/05/07 21:37:23.931608 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.931619 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.931809 receive Hearbeat (1)
2019/05/07 21:37:23.931810 receive Hearbeat (4)
2019/05/07 21:37:23.931836 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.931912 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931957 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.932056 receive Hearbeat (0)
2019/05/07 21:37:23.932139 receive Hearbeat (2)
2019/05/07 21:37:23.932170 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932301 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932326 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.031737 send Hearbeat (3)
2019/05/07 21:37:24.031778 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.031788 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.031847 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.032029 receive Hearbeat (1)
2019/05/07 21:37:24.032059 receive Hearbeat (0)
2019/05/07 21:37:24.032084 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.032204 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032324 receive Hearbeat (2)
2019/05/07 21:37:24.032363 receive Hearbeat (4)
2019/05/07 21:37:24.032427 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032525 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032573 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132327 send Hearbeat (3)
2019/05/07 21:37:24.132377 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.132603 receive Hearbeat (4)
2019/05/07 21:37:24.132715 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132734 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.132878 receive Hearbeat (0)
2019/05/07 21:37:24.132981 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132999 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.133252 receive Hearbeat (1)
2019/05/07 21:37:24.133277 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.133379 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.133436 receive Hearbeat (2)
2019/05/07 21:37:24.133602 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.232982 send Hearbeat (3)
2019/05/07 21:37:24.233076 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.233182 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.233387 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.233423 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.233534 receive Hearbeat (1)
2019/05/07 21:37:24.233651 receive Hearbeat (2)
2019/05/07 21:37:24.233680 receive Hearbeat (0)
2019/05/07 21:37:24.233699 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233795 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233887 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233927 receive Hearbeat (4)
2019/05/07 21:37:24.234070 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.333358 send Hearbeat (3)
2019/05/07 21:37:24.333395 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.333438 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.333444 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.333744 receive Hearbeat (1)
2019/05/07 21:37:24.333745 receive Hearbeat (2)
2019/05/07 21:37:24.333801 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.333862 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334212 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334231 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334237 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334243 receive Hearbeat (4)
2019/05/07 21:37:24.334251 receive Hearbeat (0)
2019/05/07 21:37:24.433686 send Hearbeat (3)
2019/05/07 21:37:24.433717 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.433911 receive Hearbeat (4)
2019/05/07 21:37:24.434052 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434075 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.434211 receive Hearbeat (0)
2019/05/07 21:37:24.434308 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434322 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.434446 receive Hearbeat (1)
2019/05/07 21:37:24.434549 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434569 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.434694 receive Hearbeat (2)
2019/05/07 21:37:24.434787 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534242 send Hearbeat (3)
2019/05/07 21:37:24.534276 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.534342 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.534430 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.534579 receive Hearbeat (4)
2019/05/07 21:37:24.534644 receive Hearbeat (1)
2019/05/07 21:37:24.534660 receive Hearbeat (2)
2019/05/07 21:37:24.534673 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.534686 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534793 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534873 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534934 receive Hearbeat (0)
2019/05/07 21:37:24.535063 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.634898 send Hearbeat (3)
2019/05/07 21:37:24.634944 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.635082 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.635106 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.635230 receive Hearbeat (0)
2019/05/07 21:37:24.635231 receive Hearbeat (1)
2019/05/07 21:37:24.635248 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.635309 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635343 receive Hearbeat (2)
2019/05/07 21:37:24.635386 receive Hearbeat (4)
2019/05/07 21:37:24.635420 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635445 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635475 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735184 send Hearbeat (3)
2019/05/07 21:37:24.735229 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.735543 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.735633 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735650 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.735670 receive Hearbeat (1)
2019/05/07 21:37:24.735767 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735777 receive Hearbeat (2)
... (the above repeats indefinitely until the test times out)

Let me roughly explain how this happens. There are 5 servers in total.
First, candidate 0 collects 3 votes and becomes leader; node 3's vote is not among them.
While handling the vote request, node 3 sees that candidate 0's current term is 1, so it first converts itself to follower and raises its own term to 1, and only then starts checking whether to grant candidate 0 its vote.

During that check, node 3's election timer fires, and it kicks off an election of its own.

Only afterwards does the timer goroutine learn that node 3 granted its vote to candidate 0 and reset the timer; by then node 3 has already started its own election.

When node 3 starts the election its current term is 1; after the increment it is 2, so node 3 solicits votes with term 2.
Naturally it can win. Meanwhile the tester has already observed that node 0 is leader and has submitted a command to leader 0, and leader 0 accepts the command, still believing it is leader, because at that point it has seen neither node 3's RequestVote nor its heartbeat.

After node 0 swallows the command and reports itself as leader, the tester starts waiting for the command to reach agreement on all nodes.

Then node 3's heartbeat arrives; node 0's term is smaller than node 3's, so node 0 steps down to follower, and the command is dropped.
The entry never shows up again: it is simply lost.


Conclusion: after discussing this with a colleague — the command was never committed, and Raft may legitimately drop uncommitted entries. So the real issue is the test's expectation: handing a command to a leader does not guarantee agreement before that command has been committed.

BUG 3 (found by inspection)

The sort must be in descending order (largest first); my original code sorted smallest first. Because the tests all use an odd number of servers, this never showed up.
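What the sort is for: copy matchIndex, sort it descending, and the element at position len/2 is the highest index that a majority of servers has replicated. A sketch with an illustrative helper name (majorityMatch is my own, not from the lab skeleton):

```go
package main

import (
	"fmt"
	"sort"
)

// majorityMatch returns the largest N such that a majority of the
// entries in matchIndex are >= N. With a DESCENDING sort, position
// len/2 is that value; with an ascending sort the same index picks a
// value only a minority may have, which is the bug described above
// (it happens to coincide for odd counts, hence the odd-server tests
// never caught it).
func majorityMatch(matchIndex []int) int {
	c := make([]int, len(matchIndex))
	copy(c, matchIndex)
	sort.Sort(sort.Reverse(sort.IntSlice(c)))
	return c[len(c)/2]
}

func main() {
	fmt.Println(majorityMatch([]int{5, 3, 4, 1, 2})) // → 3 (3 of 5 servers have >= 3)
	fmt.Println(majorityMatch([]int{4, 4, 1, 1}))    // → 1 (3 of 4 servers have >= 1)
}
```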



Run the tests 1000 times — test script

Estimated time: 12 hours

#!/bin/bash

export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm res -rf
mkdir res

for ((i = 0; i < 100; i++))
do

    for ((c = $((i*6)); c < $(( (i+1)*6)); c++))
    do
         (go test -race) &> ./res/$c &
         sleep 15
    done

    sleep 90

    if grep -nr "WARNING.*" res; then
        echo "WARNING: DATA RACE"
    fi
    if grep -nr "FAIL.*raft.*" res; then
        echo "found fail"
    fi

done

Concise version of the code

Finally, following the paper, I reorganized everything into a concise version: about 400 lines of core code excluding comments, with the key points of the paper woven into the code comments.
This version's locking scheme has one weakness. For every channel I created myself, I was careful to unlock before using it.
For applyCh, however, I did not manage that.
This means the application layer above must listen for applyCh messages on a dedicated goroutine, and that goroutine must avoid calling anything that takes the Raft server's lock, or any method that already holds it.

I discovered this while writing Lab 3B; see my Lab 3B guide for how it surfaced.

package raft

import (
    "bytes"
    "labgob"
    "log"
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"
    "time"
)
import "labrpc"

type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int
}

type State int
const (
    Follower State = iota // value --> 0
    Candidate             // value --> 1
    Leader                // value --> 2
)

const NULL int = -1

type Log struct {
    Term    int         "term when entry was received by leader"
    Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
    mu        sync.Mutex          // Lock to protect shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // Object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]

    // state a Raft server must maintain.
    state     State

    //Persistent state on all servers:(Updated on stable storage before responding to RPCs)
    currentTerm int    "latest term server has seen (initialized to 0 increases monotonically)"
    votedFor    int    "candidateId that received vote in current term (or null if none)"
    log         []Log  "log entries;(first index is 1)"

    //Volatile state on all servers:
    commitIndex int    "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
    lastApplied int    "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"

    //Volatile state on leaders:(Reinitialized after election)
    nextIndex   []int  "for each server,index of the next log entry to send to that server"
    matchIndex  []int  "for each server,index of highest log entry known to be replicated on server(initialized to 0, im)"

    //channel
    applyCh     chan ApplyMsg // from Make()
    killCh      chan bool //for Kill()
    //handle rpc
    voteCh      chan bool
    appendLogCh chan bool

}

// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
    var term int
    var isleader bool
    rf.mu.Lock()
    defer rf.mu.Unlock()
    term = rf.currentTerm
    isleader = (rf.state == Leader)
    return term, isleader
}

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    data := w.Bytes()
    rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state?
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm int
    var voteFor int
    var clog []Log
    if  d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil {
        log.Fatalf("readPersist ERROR for server %v", rf.me)
    } else {
        rf.mu.Lock()
        rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
        rf.mu.Unlock()
    }
}

// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
    Term            int "candidate’s term"
    CandidateId     int "candidate requesting vote"
    LastLogIndex    int "index of candidate’s last log entry (§5.4)"
    LastLogTerm     int "term of candidate’s last log entry (§5.4)"
}

// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
    Term        int  "currentTerm, for candidate to update itself"
    VoteGranted bool "true means candidate received vote"
}

//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if (args.Term > rf.currentTerm) {//all server rule 1 If RPC request or response contains term T > currentTerm:
        rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
    }
    reply.Term = rf.currentTerm
    reply.VoteGranted = false
    if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
        // Reply false if term < currentTerm (§5.1)  If votedFor is not null and not candidateId,
    } else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
        //If the logs have last entries with different terms, then the log with the later term is more up-to-date.
        // If the logs end with the same term, then whichever log is longer is more up-to-date.
        // Reply false if candidate’s log is at least as up-to-date as receiver’s log
    } else {
        //grant vote
        rf.votedFor = args.CandidateId
        reply.VoteGranted = true
        rf.state = Follower
        rf.persist()
        send(rf.voteCh) //because If election timeout elapses without receiving granting vote to candidate, so wake up

    }
}

// RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
    ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
    return ok
}

type AppendEntriesArgs struct {
    Term         int    "leader’s term"
    LeaderId     int    "so follower can redirect clients"
    PrevLogIndex int    "index of log entry immediately preceding new ones"
    PrevLogTerm  int    "term of prevLogIndex entry"
    Entries      []Log  "log entries to store (empty for heartbeat;may send more than one for efficiency)"
    LeaderCommit int    "leader’s commitIndex"
}

type AppendEntriesReply struct {
    Term          int   "currentTerm, for leader to update itself"
    Success       bool  "true if follower contained entry matching prevLogIndex and prevLogTerm"
    ConflictIndex int   "the first index it stores for that conflict term"
    ConflictTerm  int   "the term of the conflicting entry"
}

//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    defer send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
    if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
        rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
    }
    reply.Term = rf.currentTerm
    reply.Success = false
    reply.ConflictTerm = NULL
    reply.ConflictIndex = 0
    //1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
    prevLogIndexTerm := -1
    logSize := len(rf.log)
    if args.PrevLogIndex >= 0 && args.PrevLogIndex < len(rf.log) {
        prevLogIndexTerm = rf.log[args.PrevLogIndex].Term
    }
    if prevLogIndexTerm != args.PrevLogTerm {
        reply.ConflictIndex = logSize
        if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
            //it should return with conflictIndex = len(log) and conflictTerm = None.
        } else { //If a follower does have prevLogIndex in its log, but the term does not match
            reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
            i := 0
            for ; i < logSize; i++ {//and then search its log for
                if rf.log[i].Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
                    reply.ConflictIndex = i
                    break
                }
            }
        }
        return
    }
    //2. Reply false if term < currentTerm (§5.1)
    if args.Term < rf.currentTerm {return}

    index := args.PrevLogIndex
    for i := 0; i < len(args.Entries); i++ {
        index++
        if index < logSize {
            if rf.log[index].Term == args.Entries[i].Term {
                continue
            } else {//3. If an existing entry conflicts with a new one (same index but different terms),
                rf.log = rf.log[:index]//delete the existing entry and all that follow it (§5.3)
            }
        }
        rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
        rf.persist() //log changed: persist before replying (Figure 2 lists log[] as persistent state)
        break
    }
    //5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
        rf.updateLastApplied()
    }
    reply.Success = true
}
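The conflict back-off above can be isolated as a pure function over the log's terms, which makes its three cases easy to check in isolation. This is a sketch; `conflictHint` and the terms-only log representation are mine, not from the lab code:

```go
package main

import "fmt"

// conflictHint mirrors the follower-side logic: given the follower's log
// (terms only), and the leader's prevLogIndex/prevLogTerm, return
// (ok, conflictIndex, conflictTerm). conflictTerm == -1 plays the role
// of "None"/NULL.
func conflictHint(log []int, prevLogIndex, prevLogTerm int) (bool, int, int) {
	prevTerm := -1
	if prevLogIndex >= 0 && prevLogIndex < len(log) {
		prevTerm = log[prevLogIndex]
	}
	if prevTerm == prevLogTerm {
		return true, 0, -1 // consistency check passed
	}
	if prevTerm == -1 {
		// follower has no entry at prevLogIndex at all
		return false, len(log), -1
	}
	// follower has an entry there, but the term differs: report that term
	// and the first index holding it
	idx := len(log)
	for i := 0; i < len(log); i++ {
		if log[i] == prevTerm {
			idx = i
			break
		}
	}
	return false, idx, prevTerm
}

func main() {
	log := []int{0, 1, 1, 2, 2}          // follower log terms, indices 0..4
	fmt.Println(conflictHint(log, 4, 3)) // term mismatch: first index of term 2
	fmt.Println(conflictHint(log, 7, 3)) // follower too short: conflictIndex = len(log)
}
```

Raft logs have non-decreasing terms, so the linear scan for the first index of the conflicting term always finds the start of a single contiguous run.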

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
    ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    return ok
}


//Leader Section:
func (rf *Raft) startAppendLog() {
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go func(idx int) {
            for {
                rf.mu.Lock()
                if rf.state != Leader {
                    rf.mu.Unlock()
                    return
                } //send initial empty AppendEntries RPCs (heartbeat) to each server
                args := AppendEntriesArgs{
                    rf.currentTerm,
                    rf.me,
                    rf.getPrevLogIdx(idx),
                    rf.getPrevLogTerm(idx),
                    //If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
                    //if nextIndex > last log index, rf.log[rf.nextIndex[idx]:] is empty and this RPC degenerates into a heartbeat
                    append(make([]Log,0),rf.log[rf.nextIndex[idx]:]...),
                    rf.commitIndex,
                }
                rf.mu.Unlock()
                reply := &AppendEntriesReply{}
                ret := rf.sendAppendEntries(idx, &args, reply)
                rf.mu.Lock()
                if !ret || rf.state != Leader || rf.currentTerm != args.Term {
                    rf.mu.Unlock()
                    return
                }
                if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
                    rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
                    rf.mu.Unlock()
                    return
                }
                if reply.Success {//If successful:update nextIndex and matchIndex for follower
                    rf.matchIndex[idx] = args.PrevLogIndex + len(args.Entries)
                    rf.nextIndex[idx] = rf.matchIndex[idx] + 1
                    rf.updateCommitIndex()
                    rf.mu.Unlock()
                    return
                } else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
                    tarIndex := reply.ConflictIndex //If it does not find an entry with that term
                    if reply.ConflictTerm != NULL {
                        logSize := len(rf.log) //first search its log for conflictTerm
                        for i := 0; i < logSize; i++ {//if it finds an entry in its log with that term,
                            if rf.log[i].Term != reply.ConflictTerm { continue }
                            for i < logSize && rf.log[i].Term == reply.ConflictTerm { i++ }//set nextIndex to be the one
                            tarIndex = i //beyond the index of the last entry in that term in its log
                            break //terms are non-decreasing, so this run is the only one
                        }
                    }
                    rf.nextIndex[idx] = tarIndex
                    rf.mu.Unlock()
                }
            }
        }(i)
    }
}
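The leader-side half of the same optimization can be sketched the same way; `nextIndexAfterReject` is a name I made up, and the leader's log is again represented by its terms only:

```go
package main

import "fmt"

// nextIndexAfterReject mirrors the leader-side back-off: start from the
// follower's conflictIndex; if the follower also reported a conflictTerm
// and the leader's log contains that term, jump to one past the leader's
// last entry of that term instead. conflictTerm == -1 means "no term".
func nextIndexAfterReject(leaderLog []int, conflictIndex, conflictTerm int) int {
	tar := conflictIndex
	if conflictTerm != -1 {
		for i := 0; i < len(leaderLog); i++ {
			if leaderLog[i] != conflictTerm {
				continue
			}
			// skip past the contiguous run of conflictTerm
			for i < len(leaderLog) && leaderLog[i] == conflictTerm {
				i++
			}
			tar = i
			break
		}
	}
	return tar
}

func main() {
	leaderLog := []int{0, 1, 1, 3, 3}
	fmt.Println(nextIndexAfterReject(leaderLog, 3, 2)) // term 2 absent: keep conflictIndex 3
	fmt.Println(nextIndexAfterReject(leaderLog, 1, 1)) // term 1 ends at index 2: nextIndex 3
}
```

Either way the leader skips a whole term per rejected RPC instead of decrementing nextIndex one entry at a time, which is what lets the 2C "unreliable" tests finish in time.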

// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    index := -1
    term := rf.currentTerm
    isLeader := (rf.state == Leader)
    //If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
    if isLeader {
        index = rf.getLastLogIdx() + 1
        newLog := Log{
            rf.currentTerm,
            command,
        }
        rf.log = append(rf.log,newLog)
        rf.persist()
    }
    return index, term, isLeader
}

//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
    rf.matchIndex[rf.me] = len(rf.log) - 1
    copyMatchIndex := make([]int,len(rf.matchIndex))
    copy(copyMatchIndex,rf.matchIndex)
    sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
    N := copyMatchIndex[len(copyMatchIndex)/2]
    if N > rf.commitIndex && rf.log[N].Term == rf.currentTerm {
        rf.commitIndex = N
        rf.updateLastApplied()
    }
}
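The sort in updateCommitIndex is a median-of-matchIndex trick: after sorting a copy in descending order, the middle element is the largest N replicated on a majority of servers. A standalone sketch (`majorityMatchIndex` is my name for it):

```go
package main

import (
	"fmt"
	"sort"
)

// majorityMatchIndex returns the largest N such that a majority of
// servers have matchIndex >= N, via sort-descending-and-pick-middle.
func majorityMatchIndex(matchIndex []int) int {
	cp := make([]int, len(matchIndex))
	copy(cp, matchIndex)
	sort.Sort(sort.Reverse(sort.IntSlice(cp)))
	return cp[len(cp)/2]
}

func main() {
	// 5 servers replicated up to indices 4,1,3,2,4: servers 0,2,4 have >= 3
	fmt.Println(majorityMatchIndex([]int{4, 1, 3, 2, 4})) // 3
}
```

Remember the §5.4.2 restriction preserved in the code above: the leader may only advance commitIndex to N when rf.log[N].Term == rf.currentTerm.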

func (rf *Raft) beLeader() {
    if rf.state != Candidate {
        return
    }
    rf.state = Leader
    //initialize leader data
    rf.nextIndex = make([]int,len(rf.peers))
    rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
    for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
        rf.nextIndex[i] = rf.getLastLogIdx() + 1
    }
}
//end Leader section


//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower (handled in the AppendEntries RPC handler)
func (rf *Raft) beCandidate() { //resetting the election timer is done by the caller
    rf.state = Candidate
    rf.currentTerm++ //Increment currentTerm
    rf.votedFor = rf.me //vote myself first
    rf.persist()
    //ask for other's vote
    go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
    rf.mu.Lock()
    args := RequestVoteArgs{
        rf.currentTerm,
        rf.me,
        rf.getLastLogIdx(),
        rf.getLastLogTerm(),
    }
    rf.mu.Unlock()
    var votes int32 = 1
    for i := 0; i < len(rf.peers); i++ {
        if i == rf.me {
            continue
        }
        go func(idx int) {
            reply := &RequestVoteReply{}
            ret := rf.sendRequestVote(idx,&args,reply)

            if ret {
                rf.mu.Lock()
                defer rf.mu.Unlock()
                if reply.Term > rf.currentTerm {
                    rf.beFollower(reply.Term)
                    return
                }
                if rf.state != Candidate || rf.currentTerm != args.Term{
                    return
                }
                if reply.VoteGranted {
                    atomic.AddInt32(&votes,1)
                } //If votes received from majority of servers: become leader
                if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
                    rf.beLeader()
                    send(rf.voteCh) //after becoming leader, wake the main select loop so heartbeats go out immediately
                }
            }
        }(i)
    }
}
//end Candidate section

//Follower Section:
func (rf *Raft) beFollower(term int) {
    rf.state = Follower
    rf.votedFor = NULL
    rf.currentTerm = term
    rf.persist()
}
//end Follower section

//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
    for rf.lastApplied < rf.commitIndex {
        rf.lastApplied++
        curLog := rf.log[rf.lastApplied]
        applyMsg := ApplyMsg{
            true,
            curLog.Command,
            rf.lastApplied,
        }
        rf.applyCh <- applyMsg
    }
}


// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
    send(rf.killCh)
}

//Helper function
func send(ch chan bool) {
    select {
    case <-ch: //if already set, consume it then resent to avoid block
    default:
    }
    ch <- true
}
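This drain-then-send pattern on a 1-buffered channel is worth seeing in isolation: it coalesces repeated notifications into at most one pending signal, so notifiers never deadlock even when the main loop is busy. A minimal sketch:

```go
package main

import "fmt"

// send drains a possibly pending value from a 1-buffered channel and then
// sends, so repeated calls do not block even if nobody is receiving yet.
func send(ch chan bool) {
	select {
	case <-ch: // consume a stale signal so the buffered send below has room
	default:
	}
	ch <- true
}

func main() {
	ch := make(chan bool, 1)
	send(ch)
	send(ch) // a plain `ch <- true` here would block forever
	fmt.Println(len(ch)) // 1: only the latest signal is kept
}
```

With several goroutines calling send concurrently, one of them can still block briefly after another refills the buffer; in this code that merely delays the sender until the main loop's select consumes the signal.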

func (rf *Raft) getPrevLogIdx(i int) int {
    return rf.nextIndex[i] - 1
}

func (rf *Raft) getPrevLogTerm(i int) int {
    prevLogIdx := rf.getPrevLogIdx(i)
    if prevLogIdx < 0 {
        return -1
    }
    return rf.log[prevLogIdx].Term
}

func (rf *Raft) getLastLogIdx() int {
    return len(rf.log) - 1
}

func (rf *Raft) getLastLogTerm() int {
    idx := rf.getLastLogIdx()
    if idx < 0 {
        return -1
    }
    return rf.log[idx].Term
}

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    rf.state = Follower
    rf.currentTerm = 0
    rf.votedFor = NULL
    rf.log = make([]Log,1) //(first index is 1)

    rf.commitIndex = 0
    rf.lastApplied = 0

    rf.applyCh = applyCh
    //buffered (size 1) so notifiers never block when the main select loop is not currently receiving
    rf.voteCh = make(chan bool,1)
    rf.appendLogCh = make(chan bool,1)
    rf.killCh = make(chan bool,1)
    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())

    //from the hint: the tester requires that the leader send heartbeat RPCs no more than ten times per second
    heartbeatTime := time.Duration(100) * time.Millisecond

    //from hint :You'll need to write code that takes actions periodically or after delays in time.
    //  The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
    go func() {
        for {
            select {
            case <-rf.killCh:
                return
            default:
            }
            electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond

            rf.mu.Lock()
            state := rf.state
            rf.mu.Unlock()

            switch state {
            case Follower, Candidate:
                select {
                case <-rf.voteCh:
                case <-rf.appendLogCh:
                case <-time.After(electionTime):
                    rf.mu.Lock()
                    rf.beCandidate() //become candidate, reset election timer, then start election
                    rf.mu.Unlock()
                }
            case Leader:
                rf.startAppendLog()
                time.Sleep(heartbeatTime)
            }
        }
    }()
    return rf
}