Step 1: Read the parts of the paper after Section 6
https://www.infoq.cn/article/raft-paper
Then move on to the 2C handout.
Task objective:
If a Raft-based server reboots it should resume service where it left off. This requires that Raft keep persistent state that survives a reboot. The paper's Figure 2 mentions which state should be persistent, and raft.go contains examples of how to save and restore persistent state.
A “real” implementation would do this by writing Raft's persistent state to disk each time it changes, and reading the latest saved state from disk when restarting after a reboot. Your implementation won't use the disk; instead, it will save and restore persistent state from a Persister object (see persister.go). Whoever calls Raft.Make() supplies a Persister that initially holds Raft's most recently persisted state (if any). Raft should initialize its state from that Persister, and should use it to save its persistent state each time the state changes. Use the Persister's ReadRaftState() and SaveRaftState() methods.
Step 2: Implement persist() and readPersist()
Complete the functions persist() and readPersist() in raft.go by adding code to save and restore persistent state. You will need to encode (or "serialize") the state as an array of bytes in order to pass it to the Persister. Use the labgob encoder we provide to do this; see the comments in persist() and readPersist(). labgob is derived from Go's gob encoder; the only difference is that labgob prints error messages if you try to encode structures with lower-case field names.
Because the lab already gives you sample code in the comments, I just uncommented it and tweaked it; this part is very easy to write. The only thing you need to know is which state has to be persisted.
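After uncommenting and adjusting, the two functions come out like this (essentially the same code that appears in the full listing at the end of the post):

func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    rf.persister.SaveRaftState(w.Bytes())
}

func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // bootstrap without any state
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor int
    var clog []Log
    if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&clog) != nil {
        log.Fatalf("readPersist ERROR for server %v", rf.me)
    } else {
        rf.mu.Lock()
        rf.currentTerm, rf.votedFor, rf.log = currentTerm, votedFor, clog
        rf.mu.Unlock()
    }
}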
Step 3: Figure out when to call these two functions
You now need to determine at what points in the Raft protocol your servers are required to persist their state, and insert calls to persist() in those places. There is already a call to readPersist() in Raft.Make(). Once you've done this, you should pass the remaining tests. You may want to first try to pass the "basic persistence" test (go test -run 'TestPersist12C'), and then tackle the remaining ones (go test -run 2C).
The basic idea: whenever any of these three values is modified, we need to persist; after that, work out how the locking fits in.
We search for every place that touches rf.currentTerm, rf.votedFor, and rf.log.
After searching, I added persist() calls at the sites sketched below.
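The state-transition helpers are typical call sites: they mutate currentTerm/votedFor while rf.mu is already held and persist before returning (excerpted from the concise listing at the end of the post):

func (rf *Raft) beFollower(term int) {
    rf.state = Follower
    rf.votedFor = NULL
    rf.currentTerm = term
    rf.persist() // currentTerm and votedFor changed
}

func (rf *Raft) beCandidate() {
    rf.state = Candidate
    rf.currentTerm++    // currentTerm changed
    rf.votedFor = rf.me // votedFor changed
    rf.persist()
    go rf.startElection()
}

The remaining call sites are the vote grant in the RequestVote handler, the log append in Start(), and the log truncation/append in the AppendEntries handler.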
readPersist is only needed after a machine crashes and restarts, so the existing call in Make() is enough; no additional call sites are required.
Every call to persist() happens while the mutex is already held, so persist() itself does not need to take the lock.
readPersist(), however, does need to lock around the assignment of the decoded fields.
Step 4: From the hint
In order to pass some of the challenging tests towards the end, such as those marked "unreliable", you will need to implement the optimization to allow a follower to back up the leader's nextIndex by more than one entry at a time. See the description in the extended Raft paper starting at the bottom of page 7 and top of page 8 (marked by a gray line). The paper is vague about the details; you will need to fill in the gaps, perhaps with the help of the 6.824 Raft lectures.
The relevant passage from the paper:
If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry.
We believe the protocol the authors probably want you to follow is:
- If a follower does not have prevLogIndex in its log, it should return with conflictIndex = len(log) and conflictTerm = None.
- If a follower does have prevLogIndex in its log, but the term does not match, it should return conflictTerm = log[prevLogIndex].Term, and then search its log for the first index whose entry has term equal to conflictTerm.
- Upon receiving a conflict response, the leader should first search its log for conflictTerm. If it finds an entry in its log with that term, it should set nextIndex to be the one beyond the index of the last entry in that term in its log.
- If it does not find an entry with that term, it should set nextIndex = conflictIndex.
Based on the above, the first step is to add a ConflictIndex and a ConflictTerm field to the AppendEntries reply.
Since ConflictIndex and ConflictTerm only need to be set when reply.Success == false, we only have to look at the two early returns in the AppendEntries handler. Following the description above, the handler is modified as sketched below.
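Here is the follower-side change, condensed from the full listing at the end of the post (this is the first early return in the handler; reply.Success stays false on this path):

// 1. Reply false if the log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm
prevLogIndexTerm := -1
if args.PrevLogIndex >= 0 && args.PrevLogIndex < len(rf.log) {
    prevLogIndexTerm = rf.log[args.PrevLogIndex].Term
}
if prevLogIndexTerm != args.PrevLogTerm {
    reply.ConflictIndex = len(rf.log)
    if prevLogIndexTerm == -1 {
        // follower has no entry at prevLogIndex: conflictIndex = len(log), conflictTerm = None
    } else {
        // entry exists but the term differs: report that term and the first index stored for it
        reply.ConflictTerm = prevLogIndexTerm
        for i := 0; i < len(rf.log); i++ {
            if rf.log[i].Term == reply.ConflictTerm {
                reply.ConflictIndex = i
                break
            }
        }
    }
    return
}

On the leader side, when a reply comes back with Success == false, nextIndex[idx] is set either to the index just past the last entry of ConflictTerm in the leader's own log, or to ConflictIndex if the leader has no entry with that term (see startAppendLog in the full listing).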
PASS 2C
Test race
zyx@zyx-virtual-machine:~/Desktop/mit6824/6.824/src/raft$ go test -race
Test (2A): initial election ...
... Passed -- 3.1 3 38 0
Test (2A): election after network failure ...
... Passed -- 4.5 3 88 0
Test (2B): basic agreement ...
... Passed -- 1.3 5 32 3
Test (2B): agreement despite follower disconnection ...
... Passed -- 6.6 3 97 8
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.9 5 148 3
Test (2B): concurrent Start()s ...
... Passed -- 0.8 3 10 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 4.9 3 109 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 37.8 5 2198 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.3 3 28 12
Test (2C): basic persistence ...
... Passed -- 5.1 3 224 6
Test (2C): more persistence ...
... Passed -- 18.7 5 2182 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.5 3 51 4
Test (2C): Figure 8 ...
... Passed -- 34.7 5 26854 37
Test (2C): unreliable agreement ...
... Passed -- 12.4 5 326 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 44.2 5 2680 461
Test (2C): churn ...
... Passed -- 16.7 5 833 254
Test (2C): unreliable churn ...
... Passed -- 19.1 5 1403 174
PASS
ok raft 219.623s
But notice the line "Test (2C): Figure 8 ... Passed -- 34.7 5 26854 37": the number of RPCs sent is very large, and go test -race occasionally fails, complaining that the 8192-goroutine limit of race mode has been exceeded.
Reading the code of TestFigure82C, I found that it crashes servers after a leader has been elected.
Looking at how the crash is implemented, it calls Kill(), which I had not implemented yet.
Implementing Kill()
Kill() simply means that once a node notices the signal, it stops running. I use a Go channel for this again, as sketched below.
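A minimal sketch of how this looks, matching the killCh channel and send() helper in the full listing at the end: Kill() signals the channel, and the long-running loop started in Make() checks it on every iteration.

func (rf *Raft) Kill() {
    send(rf.killCh) // non-blocking notify via the send() helper
}

// inside the goroutine started by Make():
for {
    select {
    case <-rf.killCh:
        return // this instance has been killed; stop its main loop
    default:
    }
    // ... follower/candidate/leader handling as before ...
}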
Re-run the tests.
Test All with time
Test 100 times
BUG 1 TestFigure8Unreliable2C
Running the test repeatedly, TestFigure8Unreliable2C fails roughly 1 time in 10. The failure output:
Test (2C): Figure 8 (unreliable) ...
2019/05/01 09:49:14 apply error: commit index=235 server=0 7998 != server=2 4299
exit status 1
FAIL raft 28.516s
The two values disagree: at commit index 235, server 0 applied 7998 while server 2 applied 4299. Two nodes hold different log entries at the same index, yet committed entries must be identical.
With no immediate clue, the first thing to do is go through the Student's Guide again:
https://thesquareplanet.com/blog/students-guide-to-raft/
Make sure you fully understand every point it makes and that your code actually acts on each one.
I found one problem: after receiving a reply, you must compare the current term with args.Term. In other words, at both RPC send sites, after getting the reply you must check whether currentTerm still equals args.Term; if not, return immediately and ignore the reply.
So I add a check in both sender functions, as sketched below.
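The added check is the same idea in both senders: after re-acquiring the lock, drop the reply if our role or term has changed since the RPC was sent. Condensed from the AppendEntries sender in the full listing:

ret := rf.sendAppendEntries(idx, &args, reply)
rf.mu.Lock()
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
    // we are no longer the leader of args.Term: this reply is stale, ignore it
    rf.mu.Unlock()
    return
}

The RequestVote sender does the equivalent: after locking, it returns early when rf.state != Candidate || rf.currentTerm != args.Term.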
Test script:
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 10; i++))
do
for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
do #replace job name here
(go test -run TestFigure8Unreliable2C ) &> ./res/$c &
done
sleep 40
grep -nr "FAIL.*raft.*" res
done
Test results before the fix:
Test results after the fix:
BUG 2
Test (2B): no agreement if too many followers disconnect ...
--- FAIL: TestFailNoAgree2B (2.35s)
config.go:465: one(10) failed to reach agreement
This bug shows up very rarely: sometimes around 10 failures in 1000 runs, and sometimes not once in 3000 runs.
After adding a lot of logging and spending a long time running the test over and over, I found the cause.
Test script:
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
j=0
for ((i = 0; j < 25; i++))
do
if (($((i % 10)) == 0)); then
rm res -rf
mkdir res
echo $i
fi
for ((c = $((i*30)); c < $(( (i+1)*30)); c++))
do
(go test -run TestFailNoAgree2B) &> ./res/$c &
done
sleep 7
if grep -nr "FAIL.*raft.*" res; then
exit 1
fi
done
Output:
2019/05/07 21:37:23.027238 SEND IN MSG (1), applyMsg:10
2019/05/07 21:37:23.389237 SEND IN MSG (2), applyMsg:10
2019/05/07 21:37:23.389254 SEND IN MSG (3), applyMsg:10
2019/05/07 21:37:23.389261 SEND IN MSG (4), applyMsg:10
2019/05/07 21:37:23.389472 0 at 1 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.406102 4 be follow so curterm++ now is 1
2019/05/07 21:37:23.406188 granted Vote (4)
2019/05/07 21:37:23.406633 1 be follow so curterm++ now is 1
2019/05/07 21:37:23.406695 granted Vote (1)
2019/05/07 21:37:23.406874 CANDIDATE: 0 receive enough vote and becoming a new leader
2019/05/07 21:37:23.425553 3 be follow so curterm++ now is 1
2019/05/07 21:37:23.425675 2 be follow so curterm++ now is 1
2019/05/07 21:37:23.425745 granted Vote (2)
2019/05/07 21:37:23.429599 3 at 2 start election, last index 0 last term 0 last entry {0 <nil>}
2019/05/07 21:37:23.429635 granted Vote (3)
2019/05/07 21:37:23.429815 1 be follow so curterm++ now is 2
2019/05/07 21:37:23.429839 granted Vote (1)
2019/05/07 21:37:23.430065 2 be follow so curterm++ now is 2
2019/05/07 21:37:23.430098 granted Vote (2)
2019/05/07 21:37:23.430163 CANDIDATE: 3 receive enough vote and becoming a new leader
2019/05/07 21:37:23.430172 be Ledaer (3)
2019/05/07 21:37:23.430177 send Hearbeat (3)
2019/05/07 21:37:23.430194 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430368 4 be follow so curterm++ now is 2
2019/05/07 21:37:23.430386 receive Hearbeat (4)
2019/05/07 21:37:23.430489 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430527 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.430612 receive Hearbeat (4)
2019/05/07 21:37:23.430672 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430684 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.430786 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.430891 receive Hearbeat (1)
2019/05/07 21:37:23.430945 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.430954 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431046 receive Hearbeat (2)
2019/05/07 21:37:23.431129 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431143 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.431277 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.431312 granted Vote (4)
2019/05/07 21:37:23.431444 receive Hearbeat (1)
2019/05/07 21:37:23.431459 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.431566 receive Hearbeat (2)
2019/05/07 21:37:23.431620 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.431695 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.433226 be Ledaer (0)
2019/05/07 21:37:23.433240 SEND IN MSG (0), applyMsg:10
2019/05/07 21:37:23.433277 SEND IN MSG SUCCESS (0), applyMsg:10
2019/05/07 21:37:23.433321 send Hearbeat from 0 to (4)
2019/05/07 21:37:23.433504 send Hearbeat from 0 to (1)
2019/05/07 21:37:23.433696 send Hearbeat from 0 to (2)
2019/05/07 21:37:23.433839 send Hearbeat from 0 to (3)
2019/05/07 21:37:23.433975 0 be follow so curterm++ now is 2
2019/05/07 21:37:23.434064 receive Hearbeat (0)
2019/05/07 21:37:23.434121 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530457 send Hearbeat (3)
2019/05/07 21:37:23.530501 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.530628 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.530647 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.530689 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.530822 receive Hearbeat (0)
2019/05/07 21:37:23.530831 receive Hearbeat (2)
2019/05/07 21:37:23.530822 receive Hearbeat (1)
2019/05/07 21:37:23.530911 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530924 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.530955 receive Hearbeat (4)
2019/05/07 21:37:23.530992 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.531030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.630715 send Hearbeat (3)
2019/05/07 21:37:23.630745 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.630775 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.630827 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.630869 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.630914 receive Hearbeat (4)
2019/05/07 21:37:23.630928 receive Hearbeat (1)
2019/05/07 21:37:23.630961 receive Hearbeat (0)
2019/05/07 21:37:23.631001 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631030 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631040 receive Hearbeat (2)
2019/05/07 21:37:23.631052 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.631118 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.730978 send Hearbeat (3)
2019/05/07 21:37:23.731032 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.731043 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.731036 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.731202 receive Hearbeat (4)
2019/05/07 21:37:23.731209 receive Hearbeat (1)
2019/05/07 21:37:23.731264 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731274 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.731391 receive Hearbeat (2)
2019/05/07 21:37:23.731498 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731544 receive Hearbeat (0)
2019/05/07 21:37:23.731801 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.731869 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831101 send Hearbeat (3)
2019/05/07 21:37:23.831131 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.831294 receive Hearbeat (4)
2019/05/07 21:37:23.831356 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831374 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.831457 receive Hearbeat (0)
2019/05/07 21:37:23.831512 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831521 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.831619 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.831687 receive Hearbeat (1)
2019/05/07 21:37:23.831771 receive Hearbeat (2)
2019/05/07 21:37:23.831782 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.831841 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931542 send Hearbeat (3)
2019/05/07 21:37:23.931608 send Hearbeat from 3 to (4)
2019/05/07 21:37:23.931619 send Hearbeat from 3 to (1)
2019/05/07 21:37:23.931809 receive Hearbeat (1)
2019/05/07 21:37:23.931810 receive Hearbeat (4)
2019/05/07 21:37:23.931836 send Hearbeat from 3 to (0)
2019/05/07 21:37:23.931912 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.931957 send Hearbeat from 3 to (2)
2019/05/07 21:37:23.932056 receive Hearbeat (0)
2019/05/07 21:37:23.932139 receive Hearbeat (2)
2019/05/07 21:37:23.932170 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932301 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:23.932326 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.031737 send Hearbeat (3)
2019/05/07 21:37:24.031778 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.031788 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.031847 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.032029 receive Hearbeat (1)
2019/05/07 21:37:24.032059 receive Hearbeat (0)
2019/05/07 21:37:24.032084 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.032204 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032324 receive Hearbeat (2)
2019/05/07 21:37:24.032363 receive Hearbeat (4)
2019/05/07 21:37:24.032427 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032525 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.032573 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132327 send Hearbeat (3)
2019/05/07 21:37:24.132377 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.132603 receive Hearbeat (4)
2019/05/07 21:37:24.132715 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132734 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.132878 receive Hearbeat (0)
2019/05/07 21:37:24.132981 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.132999 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.133252 receive Hearbeat (1)
2019/05/07 21:37:24.133277 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.133379 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.133436 receive Hearbeat (2)
2019/05/07 21:37:24.133602 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.232982 send Hearbeat (3)
2019/05/07 21:37:24.233076 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.233182 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.233387 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.233423 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.233534 receive Hearbeat (1)
2019/05/07 21:37:24.233651 receive Hearbeat (2)
2019/05/07 21:37:24.233680 receive Hearbeat (0)
2019/05/07 21:37:24.233699 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233795 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233887 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.233927 receive Hearbeat (4)
2019/05/07 21:37:24.234070 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.333358 send Hearbeat (3)
2019/05/07 21:37:24.333395 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.333438 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.333444 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.333744 receive Hearbeat (1)
2019/05/07 21:37:24.333745 receive Hearbeat (2)
2019/05/07 21:37:24.333801 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.333862 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334212 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334231 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334237 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.334243 receive Hearbeat (4)
2019/05/07 21:37:24.334251 receive Hearbeat (0)
2019/05/07 21:37:24.433686 send Hearbeat (3)
2019/05/07 21:37:24.433717 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.433911 receive Hearbeat (4)
2019/05/07 21:37:24.434052 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434075 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.434211 receive Hearbeat (0)
2019/05/07 21:37:24.434308 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434322 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.434446 receive Hearbeat (1)
2019/05/07 21:37:24.434549 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.434569 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.434694 receive Hearbeat (2)
2019/05/07 21:37:24.434787 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534242 send Hearbeat (3)
2019/05/07 21:37:24.534276 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.534342 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.534430 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.534579 receive Hearbeat (4)
2019/05/07 21:37:24.534644 receive Hearbeat (1)
2019/05/07 21:37:24.534660 receive Hearbeat (2)
2019/05/07 21:37:24.534673 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.534686 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534793 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534873 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.534934 receive Hearbeat (0)
2019/05/07 21:37:24.535063 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.634898 send Hearbeat (3)
2019/05/07 21:37:24.634944 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.635082 send Hearbeat from 3 to (0)
2019/05/07 21:37:24.635106 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.635230 receive Hearbeat (0)
2019/05/07 21:37:24.635231 receive Hearbeat (1)
2019/05/07 21:37:24.635248 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.635309 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635343 receive Hearbeat (2)
2019/05/07 21:37:24.635386 receive Hearbeat (4)
2019/05/07 21:37:24.635420 reply.Success (3=>2), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635445 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.635475 reply.Success (3=>0), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735184 send Hearbeat (3)
2019/05/07 21:37:24.735229 send Hearbeat from 3 to (4)
2019/05/07 21:37:24.735543 send Hearbeat from 3 to (1)
2019/05/07 21:37:24.735633 reply.Success (3=>4), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735650 send Hearbeat from 3 to (2)
2019/05/07 21:37:24.735670 receive Hearbeat (1)
2019/05/07 21:37:24.735767 reply.Success (3=>1), args.PrevLogIndex:0, len:0
2019/05/07 21:37:24.735777 receive Hearbeat (2)
... the log keeps repeating like this until the test times out.
Let me roughly explain how this happens. There are 5 servers in total.
First, candidate 0 collects 3 votes and becomes leader; node 3's vote is not among them.
While handling the vote request, node 3 sees that candidate 0's term is 1, so it first turns itself into a follower and raises its own term to 1, and only then goes on to check whether it should grant the vote to candidate 0.
During that check, node 3's election timer also fires, and it kicks off an election of its own.
Only afterwards does the timer goroutine learn that node 3 granted its vote to candidate 0 and reset the timer; by then node 3 has already started its own election.
Because node 3's current term was 1 when it started the election, the increment brings it to 2, so node 3 campaigns with term 2.
Naturally it can win as well. Meanwhile the tester has already been told that node 0 is the leader and has handed it a command; leader 0 accepts the command and still believes it is leader, because at that point it has received neither node 3's RequestVote nor node 3's heartbeat.
After node 0 swallows the command and reports itself as leader, the tester starts waiting for that entry to reach agreement on all nodes.
Then node 3's heartbeat arrives; node 0's term is lower than node 3's, so node 0 steps down to follower, and the entry is dropped.
From that point on the entry simply no longer exists; it is lost.
Conclusion: after talking it over with a colleague, the entry was never committed, and Raft may legitimately drop uncommitted entries; the test should not assume that a command handed to a leader will necessarily reach agreement before it has been committed.
BUG 3, found by eyeballing the code
The sort over the copy of matchIndex needs to be in reverse (descending) order, i.e. largest first; my original code sorted smallest first. Because the tests always use an odd number of servers, this never surfaced in testing.
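The fix lives in updateCommitIndex(): sort a copy of matchIndex in descending order so that copyMatchIndex[len/2] is an index already replicated on a majority. This is the corrected version, as it appears in the full listing below:

func (rf *Raft) updateCommitIndex() {
    rf.matchIndex[rf.me] = len(rf.log) - 1
    copyMatchIndex := make([]int, len(rf.matchIndex))
    copy(copyMatchIndex, rf.matchIndex)
    sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex))) // descending: largest first
    N := copyMatchIndex[len(copyMatchIndex)/2] // a majority of servers have matchIndex >= N
    if N > rf.commitIndex && rf.log[N].Term == rf.currentTerm {
        rf.commitIndex = N
        rf.updateLastApplied()
    }
}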
Run the tests 1000 times; test script below. Estimated running time: 12 hours.
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 100; i++))
do
for ((c = $((i*6)); c < $(( (i+1)*6)); c++))
do
(go test -race) &> ./res/$c &
sleep 15
done
sleep 90
if grep -nr "WARNING.*" res; then
echo "WARNING: DATA RACE"
fi
if grep -nr "FAIL.*raft.*" res; then
echo "found fail"
fi
done
The concise version of the code
Finally, I reorganized everything into a concise version based on the paper: roughly 400 lines of core code excluding comments, with the key points of the paper woven into the code comments.
This version's locking scheme has one weakness. For the channels I created myself I was careful to unlock before using them,
but I did not manage that for applyCh: updateLastApplied() sends on applyCh while still holding the lock.
As a result, the application on top must use a dedicated goroutine to listen for messages on applyCh, and that goroutine must avoid calling anything that acquires the Raft server's lock (or any method that already holds it).
I ran into this while writing Lab 3B; see my Lab 3B guide for the details.
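For example, the service built on top would drain applyCh roughly like this (a hypothetical application-side sketch, not part of raft.go; applyToStateMachine is a made-up helper for illustration):

// Hypothetical service-side code: one dedicated goroutine drains applyCh.
// It must not call into methods that take the Raft server's lock, because
// updateLastApplied() sends on applyCh while still holding rf.mu.
go func() {
    for msg := range applyCh {
        if msg.CommandValid {
            applyToStateMachine(msg.Command, msg.CommandIndex)
        }
    }
}()

The full raft.go listing follows.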
package raft
import (
"bytes"
"labgob"
"log"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
)
import "labrpc"
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}
type State int
const (
Follower State = iota // value --> 0
Candidate // value --> 1
Leader // value --> 2
)
const NULL int = -1
type Log struct {
Term int "term when entry was received by leader"
Command interface{} "command for state machine,"
}
// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
// state a Raft server must maintain.
state State
//Persistent state on all servers:(Updated on stable storage before responding to RPCs)
currentTerm int "latest term server has seen (initialized to 0 increases monotonically)"
votedFor int "candidateId that received vote in current term (or null if none)"
log []Log "log entries;(first index is 1)"
//Volatile state on all servers:
commitIndex int "index of highest log entry known to be committed (initialized to 0, increases monotonically)"
lastApplied int "index of highest log entry applied to state machine (initialized to 0, increases monotonically)"
//Volatile state on leaders:(Reinitialized after election)
nextIndex []int "for each server,index of the next log entry to send to that server"
matchIndex []int "for each server,index of highest log entry known to be replicated on server(initialized to 0, increases monotonically)"
//channel
applyCh chan ApplyMsg // from Make()
killCh chan bool //for Kill()
//handle rpc
voteCh chan bool
appendLogCh chan bool
}
// return currentTerm and whether this server believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isleader = (rf.state == Leader)
return term, isleader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
data := w.Bytes()
rf.persister.SaveRaftState(data)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm int
var voteFor int
var clog []Log
if d.Decode(&currentTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&clog) != nil {
log.Fatalf("readPersist ERROR for server %v", rf.me)
} else {
rf.mu.Lock()
rf.currentTerm, rf.votedFor, rf.log = currentTerm, voteFor, clog
rf.mu.Unlock()
}
}
// RequestVote RPC arguments structure. field names must start with capital letters!
type RequestVoteArgs struct {
Term int "candidate’s term"
CandidateId int "candidate requesting vote"
LastLogIndex int "index of candidate’s last log entry (§5.4)"
LastLogTerm int "term of candidate’s last log entry (§5.4)"
}
// RequestVote RPC reply structure. field names must start with capital letters!
type RequestVoteReply struct {
Term int "currentTerm, for candidate to update itself"
VoteGranted bool "true means candidate received vote"
}
//RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if (args.Term > rf.currentTerm) {//all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.VoteGranted = false
if (args.Term < rf.currentTerm) || (rf.votedFor != NULL && rf.votedFor != args.CandidateId) {
// Reply false if term < currentTerm (§5.1) If votedFor is not null and not candidateId,
} else if args.LastLogTerm < rf.getLastLogTerm() || (args.LastLogTerm == rf.getLastLogTerm() && args.LastLogIndex < rf.getLastLogIdx()){
//If the logs have last entries with different terms, then the log with the later term is more up-to-date.
// If the logs end with the same term, then whichever log is longer is more up-to-date.
// Reply false if candidate’s log is at least as up-to-date as receiver’s log
} else {
//grant vote
rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.state = Follower
rf.persist()
send(rf.voteCh) //granting a vote resets the election timeout, so wake up the timer goroutine
}
}
// RequestVote RPC sender.
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
type AppendEntriesArgs struct {
Term int "leader’s term"
LeaderId int "so follower can redirect clients"
PrevLogIndex int "index of log entry immediately preceding new ones"
PrevLogTerm int "term of prevLogIndex entry"
Entries []Log "log entries to store (empty for heartbeat;may send more than one for efficiency)"
LeaderCommit int "leader’s commitIndex"
}
type AppendEntriesReply struct {
Term int "currentTerm, for leader to update itself"
Success bool "true if follower contained entry matching prevLogIndex and prevLogTerm"
ConflictIndex int "the first index it stores for that conflict term"
ConflictTerm int "the term of the conflicting entry"
}
//AppendEntries RPC handler.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer send(rf.appendLogCh) //If election timeout elapses without receiving AppendEntries RPC from current leader
if args.Term > rf.currentTerm { //all server rule 1 If RPC request or response contains term T > currentTerm:
rf.beFollower(args.Term) // set currentTerm = T, convert to follower (§5.1)
}
reply.Term = rf.currentTerm
reply.Success = false
reply.ConflictTerm = NULL
reply.ConflictIndex = 0
//1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
prevLogIndexTerm := -1
logSize := len(rf.log)
if args.PrevLogIndex >= 0 && args.PrevLogIndex < len(rf.log) {
prevLogIndexTerm = rf.log[args.PrevLogIndex].Term
}
if prevLogIndexTerm != args.PrevLogTerm {
reply.ConflictIndex = logSize
if prevLogIndexTerm == -1 {//If a follower does not have prevLogIndex in its log,
//it should return with conflictIndex = len(log) and conflictTerm = None.
} else { //If a follower does have prevLogIndex in its log, but the term does not match
reply.ConflictTerm = prevLogIndexTerm //it should return conflictTerm = log[prevLogIndex].Term,
i := 0
for ; i < logSize; i++ {//and then search its log for
if rf.log[i].Term == reply.ConflictTerm {//the first index whose entry has term equal to conflictTerm
reply.ConflictIndex = i
break
}
}
}
return
}
//2. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {return}
index := args.PrevLogIndex
for i := 0; i < len(args.Entries); i++ {
index++
if index < logSize {
if rf.log[index].Term == args.Entries[i].Term {
continue
} else {//3. If an existing entry conflicts with a new one (same index but different terms),
rf.log = rf.log[:index]//delete the existing entry and all that follow it (§5.3)
}
}
rf.log = append(rf.log,args.Entries[i:]...) //4. Append any new entries not already in the log
rf.persist()
break
}
//5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
if args.LeaderCommit > rf.commitIndex {
rf.commitIndex = Min(args.LeaderCommit ,rf.getLastLogIdx())
rf.updateLastApplied()
}
reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
//Leader Section:
func (rf *Raft) startAppendLog() {
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
for {
rf.mu.Lock()
if rf.state != Leader {
rf.mu.Unlock()
return
} //send initial empty AppendEntries RPCs (heartbeat) to each server
args := AppendEntriesArgs{
rf.currentTerm,
rf.me,
rf.getPrevLogIdx(idx),
rf.getPrevLogTerm(idx),
//If last log index ≥ nextIndex for a follower:send AppendEntries RPC with log entries starting at nextIndex
//if nextIndex > last log index, rf.log[rf.nextIndex[idx]:] is empty and the RPC acts as a heartbeat
append(make([]Log,0),rf.log[rf.nextIndex[idx]:]...),
rf.commitIndex,
}
rf.mu.Unlock()
reply := &AppendEntriesReply{}
ret := rf.sendAppendEntries(idx, &args, reply)
rf.mu.Lock()
if !ret || rf.state != Leader || rf.currentTerm != args.Term {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm {//all server rule 1 If RPC response contains term T > currentTerm:
rf.beFollower(reply.Term) // set currentTerm = T, convert to follower (§5.1)
rf.mu.Unlock()
return
}
if reply.Success {//If successful:update nextIndex and matchIndex for follower
rf.matchIndex[idx] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[idx] = rf.matchIndex[idx] + 1
rf.updateCommitIndex()
rf.mu.Unlock()
return
} else { //If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
tarIndex := reply.ConflictIndex //If it does not find an entry with that term
if reply.ConflictTerm != NULL {
logSize := len(rf.log) //first search its log for conflictTerm
for i := 0; i < logSize; i++ {//if it finds an entry in its log with that term,
if rf.log[i].Term != reply.ConflictTerm { continue }
for i < logSize && rf.log[i].Term == reply.ConflictTerm { i++ }//set nextIndex to be the one
tarIndex = i //beyond the index of the last entry in that term in its log
}
}
rf.nextIndex[idx] = tarIndex
rf.mu.Unlock()
}
}
}(i)
}
}
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
rf.mu.Lock()
defer rf.mu.Unlock()
index := -1
term := rf.currentTerm
isLeader := (rf.state == Leader)
//If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
if isLeader {
index = rf.getLastLogIdx() + 1
newLog := Log{
rf.currentTerm,
command,
}
rf.log = append(rf.log,newLog)
rf.persist()
}
return index, term, isLeader
}
//If there exists an N such that N > commitIndex,
// a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
func (rf *Raft) updateCommitIndex() {
rf.matchIndex[rf.me] = len(rf.log) - 1
copyMatchIndex := make([]int,len(rf.matchIndex))
copy(copyMatchIndex,rf.matchIndex)
sort.Sort(sort.Reverse(sort.IntSlice(copyMatchIndex)))
N := copyMatchIndex[len(copyMatchIndex)/2]
if N > rf.commitIndex && rf.log[N].Term == rf.currentTerm {
rf.commitIndex = N
rf.updateLastApplied()
}
}
func (rf *Raft) beLeader() {
if rf.state != Candidate {
return
}
rf.state = Leader
//initialize leader data
rf.nextIndex = make([]int,len(rf.peers))
rf.matchIndex = make([]int,len(rf.peers))//initialized to 0
for i := 0; i < len(rf.nextIndex); i++ {//(initialized to leader last log index + 1)
rf.nextIndex[i] = rf.getLastLogIdx() + 1
}
}
//end Leader section
//Candidate Section:
// If AppendEntries RPC received from new leader: convert to follower implemented in AppendEntries RPC Handler
func (rf *Raft) beCandidate() { //Reset election timer are finished in caller
rf.state = Candidate
rf.currentTerm++ //Increment currentTerm
rf.votedFor = rf.me //vote myself first
rf.persist()
//ask for other's vote
go rf.startElection() //Send RequestVote RPCs to all other servers
}
//If election timeout elapses: start new election handled in caller
func (rf *Raft) startElection() {
rf.mu.Lock()
args := RequestVoteArgs{
rf.currentTerm,
rf.me,
rf.getLastLogIdx(),
rf.getLastLogTerm(),
}
rf.mu.Unlock()
var votes int32 = 1
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(idx int) {
reply := &RequestVoteReply{}
ret := rf.sendRequestVote(idx,&args,reply)
if ret {
rf.mu.Lock()
defer rf.mu.Unlock()
if reply.Term > rf.currentTerm {
rf.beFollower(reply.Term)
return
}
if rf.state != Candidate || rf.currentTerm != args.Term{
return
}
if reply.VoteGranted {
atomic.AddInt32(&votes,1)
} //If votes received from majority of servers: become leader
if atomic.LoadInt32(&votes) > int32(len(rf.peers) / 2) {
rf.beLeader()
send(rf.voteCh) //after becoming leader, wake the main loop so heartbeats go out immediately
}
}
}(i)
}
}
//end Candidate section
//Follower Section:
func (rf *Raft) beFollower(term int) {
rf.state = Follower
rf.votedFor = NULL
rf.currentTerm = term
rf.persist()
}
//end Follower section
//all server rule : If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine
func (rf *Raft) updateLastApplied() {
for rf.lastApplied < rf.commitIndex {
rf.lastApplied++
curLog := rf.log[rf.lastApplied]
applyMsg := ApplyMsg{
true,
curLog.Command,
rf.lastApplied,
}
rf.applyCh <- applyMsg
}
}
// the tester calls Kill() when a Raft instance won't be needed again.
func (rf *Raft) Kill() {
send(rf.killCh)
}
//Helper function
func send(ch chan bool) {
select {
case <-ch: //if already set, consume it then resent to avoid block
default:
}
ch <- true
}
func (rf *Raft) getPrevLogIdx(i int) int {
return rf.nextIndex[i] - 1
}
func (rf *Raft) getPrevLogTerm(i int) int {
prevLogIdx := rf.getPrevLogIdx(i)
if prevLogIdx < 0 {
return -1
}
return rf.log[prevLogIdx].Term
}
func (rf *Raft) getLastLogIdx() int {
return len(rf.log) - 1
}
func (rf *Raft) getLastLogTerm() int {
idx := rf.getLastLogIdx()
if idx < 0 {
return -1
}
return rf.log[idx].Term
}
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.state = Follower
rf.currentTerm = 0
rf.votedFor = NULL
rf.log = make([]Log,1) //(first index is 1)
rf.commitIndex = 0
rf.lastApplied = 0
rf.applyCh = applyCh
//buffered with size 1: these channels only signal the select loop below, so one slot avoids blocking the sender
rf.voteCh = make(chan bool,1)
rf.appendLogCh = make(chan bool,1)
rf.killCh = make(chan bool,1)
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
//from the hint: the tester requires that the leader send heartbeat RPCs no more than ten times per second
heartbeatTime := time.Duration(100) * time.Millisecond
//from hint :You'll need to write code that takes actions periodically or after delays in time.
// The easiest way to do this is to create a goroutine with a loop that calls time.Sleep().
go func() {
for {
select {
case <-rf.killCh:
return
default:
}
electionTime := time.Duration(rand.Intn(200) + 300) * time.Millisecond
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
switch state {
case Follower, Candidate:
select {
case <-rf.voteCh:
case <-rf.appendLogCh:
case <-time.After(electionTime):
rf.mu.Lock()
rf.beCandidate() //becandidate, Reset election timer, then start election
rf.mu.Unlock()
}
case Leader:
rf.startAppendLog()
time.Sleep(heartbeatTime)
}
}
}()
return rf
}