第一步 閱讀4B的文檔
弄清楚里面的每個(gè)段落
第二步 基于LAB3师倔,寫出可以通過(guò)單GROUP的代碼
Your first task is to pass the very first shardkv test. In this test, there is only a single assignment of shards, so your code should be very similar to that of your Lab 3 server. The biggest modification will be to have your server detect when a configuration happens and start accepting requests whose keys match shards that it now owns.
基于LAB3洞斯,把代碼可以抄的都抄過(guò)去
抄代碼就不展示了∠园荩基本大多數(shù)都是把代碼復(fù)制過(guò)去。
因?yàn)檫@邊的CLIENT 給了代碼 是需要看ERR這個(gè)屬性的爹袁。
所以和LAB3不同(那里我沒(méi)用這個(gè)REPLY ERR的屬性)远荠,需要加上返回值。
很快失息,測(cè)試1通過(guò)了
第三步 增加拉CONFIG譬淳,和拒絕不屬于自己的SHARD的代碼
來(lái)自HINT
Add code to server.go to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test.
測(cè)試之后,依然確表锞ぃ可以過(guò)第一個(gè)TEST
第四步 思考
思考 CONFIG變化后邻梆,如何轉(zhuǎn)移SHARD
我的思考是這樣的。如果一個(gè)REPLICA GROUP A得到一個(gè)SHARD 1蛤迎,對(duì)應(yīng)B 失去一個(gè)SHARD 1
如果是A檢測(cè)到多了确虱,去等待別人來(lái)發(fā)給我,會(huì)比較被動(dòng)替裆。因?yàn)椴恢酪榷嗑谩?br>
其次B還需要發(fā)現(xiàn)自己失去SHARD 1后校辩,要主動(dòng)去發(fā)給A窘问,這增加了B 的工作量。
因?yàn)槲覀冞@邊在做SHARD MIGRATION的時(shí)候宜咒,是不能響應(yīng)請(qǐng)求惠赫。但對(duì)B來(lái)說(shuō),他可以立刻更新CONFIG故黑,即使沒(méi)把SHARD 1發(fā)送出去儿咱,他也是可以響應(yīng)請(qǐng)求。
但對(duì)A來(lái)說(shuō)场晶,一定要拿到SHARD 1后混埠,它才可以繼續(xù)服務(wù)。
基于上述思考诗轻,決定讓A去問(wèn)B要SHARD钳宪,這樣會(huì)簡(jiǎn)化設(shè)計(jì)。因?yàn)檫@樣做B可以如果發(fā)現(xiàn)了新的CONFIG,可以直接更新讓它立刻生效扳炬。A需要等待PULL成功后吏颖,更新CONFIG讓它生效。
Reconfiguration 會(huì)影響到 PutAppend/Get恨樟,因此同樣需要利用 raft 保證 group 內(nèi)的一致性半醉,確保集群內(nèi)完成了之前的操作后同時(shí)進(jìn)行Reconfiguration;
要思考的第二個(gè)點(diǎn)劝术,是傳輸SHARD的RPC缩多。
首先SHARD DATA是一定要發(fā)過(guò)去的。
但是只是發(fā)SHARD DATA是不夠的养晋。
比如一個(gè)APPEND REQUEST 在向A SERVER發(fā)送的時(shí)候,TIMEOUT了瞧壮。這個(gè)時(shí)候A server 已經(jīng)做了這個(gè)更新操作。
在這個(gè)點(diǎn)之后匙握,Reconfiguration 發(fā)生,CLIENT 去問(wèn)B SERVER發(fā)送APPEND REQ陈轿。如果只是SHARD DATA過(guò)去圈纺。會(huì)造成APPEND2次。
所以我們還需要把去重的MAP也一起發(fā)過(guò)去麦射。
發(fā)過(guò)去的參數(shù)除了要訴說(shuō)我要哪個(gè)SHARD之外蛾娶,還需要加上CONFIG NUM,因?yàn)橛锌赡芪野l(fā)的CONFIG NUM比那邊還要大潜秋,說(shuō)明那邊的CONFIG還沒(méi)同步到蛔琅。
基于上述思路。我設(shè)計(jì)的RPC如下峻呛。
第五步 實(shí)現(xiàn)MIGRATE SHARD RPC HANDLER
這里有個(gè)很重要的思路罗售,每個(gè)RAFT GROUP都是由LEADER負(fù)責(zé)發(fā)送和接受RPC辜窑。FOLLOWER只負(fù)責(zé)從APPLY MSG里去和LEADER SYNC狀態(tài)。
還有一個(gè)點(diǎn)寨躁,就是我們不能直接從DB里去取數(shù)據(jù)穆碎,如果我們沒(méi)有實(shí)現(xiàn)清理數(shù)據(jù)的前提下,因?yàn)閿?shù)據(jù)不清理职恳。所以我們會(huì)有多的數(shù)據(jù)所禀,想象一下。我們先接受SHARD1放钦,然后不接受色徘,再重新接受SHARD1,此時(shí)做遷移操禀,會(huì)是一個(gè)并集褂策。而我們只是希望是重新接受的那部分〈仓基于上述考慮辙培。我們需要基于每一個(gè)CONFIG,單獨(dú)把要遷移的數(shù)據(jù)給抽出來(lái)邢锯。這樣依據(jù)CONFIG來(lái)做遷移扬蕊。
第六步 思考難點(diǎn)
如何去PULL DATA?如果我們選擇讓LEADER去交互丹擎,我們必須要HANDLER RAFT Leader掛掉尾抑,得有新的LEADER來(lái)負(fù)責(zé)PULL DATA。
所以在所有節(jié)點(diǎn)上必須得存好要問(wèn)哪里去PULL DATA蒂培。如果PULL到再愈,我們需要確保LEADER會(huì)往RAFT里發(fā)CMD(這個(gè)CMD是讓節(jié)點(diǎn)同步數(shù)據(jù),同時(shí)刪掉那個(gè)維護(hù)的哪里去PULL DATA的地方)
而且我們必須額外開(kāi)一個(gè)后臺(tái)進(jìn)程與循環(huán)的做這件事护戳。不然LEADER轉(zhuǎn)移過(guò)去之后翎冲,就沒(méi)有人PULL DATA了。 因?yàn)镻ULL DATA 這件事是沒(méi)有CLIENT超時(shí)重試的媳荒。
因?yàn)橐笈_(tái)循環(huán)去PULL DATA抗悍,我們拿到DATA后,送進(jìn)RAFT钳枕,再進(jìn)入到APPLY CH缴渊,需要所有的節(jié)點(diǎn)都可以同步這個(gè)數(shù)據(jù)。一旦同步成功鱼炒,我們需要清理這個(gè)要等待的數(shù)據(jù)衔沼。這樣后臺(tái)線程可以少發(fā)很多無(wú)用的RPC。
同時(shí)我們?cè)谒饕獢?shù)據(jù)的時(shí)候也要知道往哪個(gè)REPLICA GROUP要。
第7步
目前我們已經(jīng)再LEADER端指蚁,把收到的新的CONFIG和拿到的MIGRATION DATA打給放進(jìn)RAFT的LOG去做線性一致的排序菩佑。
所以當(dāng)這個(gè)2個(gè)消息從APPLY MSG出來(lái)的時(shí)候,需要去做一些事情欣舵。
為此擎鸠,我單獨(dú)開(kāi)了一個(gè)函數(shù)去寫APPLY的邏輯
第8步 實(shí)現(xiàn)APPLY MSG 是MIGRATION DATA REPLY
這里的點(diǎn)(后面大量調(diào)試獲得的),因?yàn)镽EPLY 發(fā)到RAFT里面缘圈,雖然有順序劣光,但返回的時(shí)候順序可能是亂的。比如現(xiàn)在我的CONFIG已經(jīng)更新到9糟把,這個(gè)時(shí)候RAFT才把CONFIG的6 返回回來(lái)绢涡。我們應(yīng)該直接忽略這個(gè)版本。如果更新了遣疯,就會(huì)產(chǎn)生不一致雄可。
那么依據(jù)亂序思想,我們不得不CHECK 就是當(dāng)前REPLY的CONFIG版本號(hào)必須是當(dāng)前CONFIG版本號(hào)小一個(gè)缠犀。
為什么数苫?
這里我們?cè)谑盏紺ONFIG 變更,我們就會(huì)刷新CONFIG辨液。但是此時(shí)CONFIG刷新之后虐急,我們會(huì)更新COME IN SHARD,隨后后臺(tái)線程會(huì)去PULL滔迈。從更新COME IN SHARD到數(shù)據(jù)SHARD過(guò)來(lái)止吁,這段時(shí)間內(nèi),我們必須得拒絕掉所有的索要該SHARD的請(qǐng)求燎悍。所以我們不能直接從CONFIG來(lái)判斷是不是WRONG GROUP敬惦。
至此,我們需要額外再維護(hù)一個(gè)我現(xiàn)在能HANDLER哪些SHARD的數(shù)據(jù)結(jié)構(gòu)谈山。
那么發(fā)出去的SHARD徙硅,我可以直接從這個(gè)數(shù)據(jù)結(jié)構(gòu)里刪掉有勾。要進(jìn)來(lái)的話洲劣,等真的進(jìn)來(lái)了祖屏,再添加到這個(gè)數(shù)據(jù)結(jié)構(gòu)中捕仔。
判斷是不是WRONG GROUP呜袁,也依據(jù)這個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)看呆细。
第9步 實(shí)現(xiàn)updateInAndOutDataShard
這邊我們會(huì)根據(jù)新的CONFIG來(lái)棒坏,判斷自己要送出去的數(shù)據(jù)是哪些妨猩,自己要接受進(jìn)來(lái)的數(shù)組是哪些潜叛。在我的設(shè)計(jì)里這2個(gè)數(shù)據(jù)結(jié)構(gòu)必要性在第5,6步討論過(guò)了。
第十步 判斷WRONG GROUP的時(shí)機(jī)
在前面的版本中威兜,我們是在SERVER端接受到請(qǐng)求的時(shí)候销斟,就直接去依據(jù)CONFIG判斷WRONG GROUP。現(xiàn)在我們改成依據(jù)MYSHARD來(lái)看椒舵,但是這還是不足的蚂踊。
還記得我們?cè)谧鯨AB 3的時(shí)候,判斷去重笔宿,必須得再消息回來(lái)的時(shí)候再看一次犁钟。 因?yàn)榭赡茉谡?qǐng)求發(fā)送的時(shí)候,數(shù)據(jù)還在REPLICA GROUP 1泼橘±远可是到消息從RAFT返回來(lái)的時(shí)候,當(dāng)中發(fā)生過(guò)更新CONFIG炬灭。數(shù)據(jù)不再GROUP 1了醋粟。所以要把判斷WRONG GROUP的邏輯,加在數(shù)據(jù)返回層重归。
同時(shí)因?yàn)閿?shù)據(jù)會(huì)在APPLY CH收到新的CONFIG米愿,一部分要TO OUT的數(shù)據(jù)就會(huì)從DB里DELETE掉。為了確保NOTIFY CH的傳輸過(guò)程中鼻吮,這個(gè)DB的更改不會(huì)影響到實(shí)際的GET的返回值育苟。我們需要在接到APPLY CH的時(shí)候就把結(jié)果給注入到OP里。不然等OP發(fā)過(guò)去再?gòu)腄B拿狈网,有一定概率此時(shí)另一個(gè)線程已經(jīng)再DELETE DB了宙搬。
同時(shí)根據(jù)這個(gè)思路,我把SHARD MASTER的QUERY 也加在返回層來(lái)做拓哺。
第11步 初始化新加的屬性
第12步 更新POLL NEW CONFIG代碼勇垛,需要一個(gè)個(gè)更新
來(lái)自HINT
Process re-configurations one at a time, in order.
同時(shí)注意如果當(dāng)前CONFIG,那些需要轉(zhuǎn)移的SHARD還沒(méi)做完士鸥。不要立刻去拿下一個(gè)CONFIG闲孤。
第13步 測(cè)試JOIN AND LEAVE
發(fā)現(xiàn)有時(shí)可以過(guò),有時(shí)過(guò)不了會(huì)阻塞烤礁。
BUG 1 死鎖
通過(guò)幾小時(shí)的調(diào)試發(fā)現(xiàn)讼积,是一個(gè)3維死鎖。首先RAFT里面拿了RAFT的鎖脚仔,阻塞在APPLY CH那勤众。APPLY CH的后臺(tái)線程阻塞在KV SERVER的鎖上。 還有一個(gè)PULL CONFIG的線程鲤脏,持有了KV SERVER的鎖们颜,阻塞在RAFT的鎖上吕朵。
FIX方法,交換代碼順序窥突。
測(cè)試通過(guò)
但是寫了這么多代碼努溃,很多地方都沒(méi)注意保護(hù)共享變量。所以用TEST DATA RACE的時(shí)候會(huì)出問(wèn)題阻问。
檢查思路梧税,先看3個(gè)后臺(tái)進(jìn)程,隨后看幾個(gè)RPC handler
這邊就自己加一下鎖吧称近。
GO TEST RACE OK之后第队,會(huì)在第三個(gè)測(cè)試敗掉。是SNAPSHOT
我們需要存儲(chǔ)更多的狀態(tài)進(jìn)SNAPSHOT煌茬。
第14步 實(shí)現(xiàn)新的SNAPSHOT
再直接測(cè)試斥铺,發(fā)現(xiàn)阻塞在UNRELIABLE 3
BUG 2 一處地方?jīng)]有釋放鎖
修復(fù)后重新對(duì)這個(gè)CASE單獨(dú)測(cè)試100次通過(guò),測(cè)全集坛善。只剩下CHANLLEGE 1晾蜘,DELETE的TASK了
第15步 思考如何刪除不必要的狀態(tài)
在上面的實(shí)現(xiàn)里,我們開(kāi)了3個(gè)數(shù)據(jù)結(jié)構(gòu)眠屎,一個(gè)是TO OUT剔交,一個(gè)是COME IN,一個(gè)是MY SHARD改衩;
第三個(gè)是固定大小的岖常。不用考慮
第二個(gè),我們已經(jīng)再接受到DATA之后會(huì)去刪除它葫督。
唯一沒(méi)有回收的就是第一個(gè)竭鞍。
最NAIVE的實(shí)現(xiàn)是當(dāng)我們把數(shù)據(jù)當(dāng)做REPLY發(fā)過(guò)去的時(shí)候,就直接刪掉橄镜。這是危險(xiǎn)的偎快。因?yàn)楹苡锌赡苓@個(gè)消息會(huì)丟失,被那邊服務(wù)器拒絕洽胶,造成這個(gè)數(shù)據(jù)就永遠(yuǎn)不會(huì)被回收晒夹。
正確的做法是等到對(duì)方服務(wù)器,成功接收了DATA姊氓,然后刪除了對(duì)應(yīng)的COME IN丐怯,這個(gè)時(shí)候應(yīng)該發(fā)REQUEST告訴TO OUT一方,你可以安全的把TO OUT里的這塊DATA給回收了翔横。
但是依然存在RPC會(huì)丟失的情況读跷。和PULL的思想一樣。(用一個(gè)COME IN LIST+ 后臺(tái)線程禾唁,來(lái)不斷重試舔亭,成功時(shí)候刪除COME IN LIST內(nèi)容些膨,就不再去PULL直到有新的COME IN來(lái)。失敗的話钦铺,因?yàn)镃OME IN 內(nèi)容還在,就會(huì)自動(dòng)重試肢预,不怕網(wǎng)絡(luò)不穩(wěn)定)
那么我針對(duì)這個(gè)CASE矛洞,用相同的套路。后臺(tái)GC線程+Garbage List.
具體思路就是當(dāng)COME IN 的DATA收到后烫映,我們要把這塊數(shù)據(jù)標(biāo)記進(jìn)Garbage List沼本。 后臺(tái)GC線程發(fā)現(xiàn)Garbage List有內(nèi)容,就會(huì)往對(duì)應(yīng)的GROUP發(fā)送GC RPC锭沟。對(duì)應(yīng)的GROUP清理成功后抽兆,REPLY告知。我們把Garbage List對(duì)應(yīng)的內(nèi)容刪除族淮。
同樣我們依然只和LEADER交互辫红,并且利用RAFT LOG,來(lái)確保所有節(jié)點(diǎn)都成功刪除GARBAGE祝辣,再RPC回復(fù)SUCCESS
第16步 寫GC RPC HANDLER贴妻,抽一個(gè)TEMPLATE
發(fā)現(xiàn)可以用ERR 里面加一個(gè)WRONGLEADER來(lái)代表LEADER不對(duì)。就可以去掉一個(gè)參數(shù)蝙斜。
當(dāng)OP TYPE是GC的時(shí)候名惩,KEY 是CONFIG NUM,SEQNUM是SHARD孕荠。
第17步 實(shí)現(xiàn)GC
第18步 實(shí)現(xiàn)GC后臺(tái)進(jìn)程
第19步 往GARBAGE里添加值
第20步娩鹉,更新SNAPSHOT
這里小伙伴自行更新吧
測(cè)試通過(guò)
第21步
因?yàn)槲业腞EPLICA GROUP里會(huì)往MASTER 發(fā)送QUERY請(qǐng)求,這個(gè)時(shí)候可能會(huì)造成LAST LEADER的DATA RACE稚伍。
所以我用原子方法改寫弯予。
第22步 CONCISE代碼
1.我把WRONDLEADER給去掉了。同時(shí)用ERR 的WRONGLEADER來(lái)表示槐瑞。
2.把幾個(gè)RPC HANDLER用TEMPLATE 提取公有邏輯
最終430 行代碼
GO TEST 測(cè)試200次
這個(gè)測(cè)試不適合并行熙涤,因?yàn)闀?huì)大量開(kāi)線程在做。并行測(cè)試會(huì)造成有些CASE跑的巨慢困檩。所以串行測(cè)試了祠挫。
同時(shí)CONFIG里因?yàn)镸ASTER的資源沒(méi)有回收。越到后面TEST 跑的越慢悼沿,我加了如下代碼來(lái)提速測(cè)試
基本跑完一整套是2分鐘
測(cè)試200次的結(jié)果 是
BUG 3
TestChallenge2Unaffected 會(huì)有1/50的概率阻塞等舔。
經(jīng)過(guò)打LOG 發(fā)現(xiàn),是還沒(méi)來(lái)得及把所有DATA SHARD完糟趾,超過(guò)了1秒慌植,之后就有數(shù)據(jù)再也MIGRATE不過(guò)來(lái)甚牲。造成拿不到而阻塞。
這里分享一個(gè)打LOG的技巧蝶柿,避免淹沒(méi)在茫茫LOG海里丈钙。就是出了問(wèn)題,再打LOG
原因如下:
下圖這個(gè)4號(hào)數(shù)據(jù)塊 在測(cè)試?yán)飳儆?01的OWN交汤,但是還沒(méi)來(lái)得及拿到雏赦,100的網(wǎng)就斷了。再也取不到了芙扎。
解決方案星岗,加快PULL的頻率
加快QUERY CONFIG的速度。思路是如果是拿已知的CONFIG戒洼,因?yàn)镃ONFIG的APPEND不會(huì)修改俏橘,所以可以直接返回。
縮短MASTER CLIENT的睡眠時(shí)間圈浇。
測(cè)試200次后無(wú)阻塞寥掐。
SHARD KV 測(cè)試200次通過(guò)
測(cè)試腳本
#!/bin/bash
export GOPATH="/home/zyx/Desktop/mit6.824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"
rm res -rf
mkdir res
for ((i = 0; i < 200; i++))
do
echo $i
(go test) > ./res/$i
grep -nr "FAIL.*" res
done
回歸測(cè)試SHARD MASTER500次通過(guò)
回歸測(cè)試RAFT 300次通過(guò)
有2次時(shí)之前說(shuō)的不是代碼問(wèn)題的KNOWN ISSUE,具體參考文集的2C部分
回歸測(cè)試KVRAFT 210次通過(guò)
CONCISE SERVER
package shardkv
import (
"bytes"
"labrpc"
"log"
"shardmaster"
"strconv"
"time"
)
import "raft"
import "sync"
import "labgob"
type Op struct {
OpType string "operation type(eg. put/append/gc/get)"
Key string "key for normal, config num for gc"
Value string
Cid int64 "cid for put/append, operation uid for get/gc"
SeqNum int "seqnum for put/append, shard for gc"
}
type ShardKV struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
make_end func(string) *labrpc.ClientEnd
gid int
masters []*labrpc.ClientEnd
maxraftstate int // snapshot if log grows this big
// Your definitions here.
mck *shardmaster.Clerk
cfg shardmaster.Config
persist *raft.Persister
db map[string]string
chMap map[int]chan Op
cid2Seq map[int64]int
toOutShards map[int]map[int]map[string]string "cfg num -> (shard -> db)"
comeInShards map[int]int "shard->config number"
myShards map[int]bool "to record which shard i can offer service"
garbages map[int]map[int]bool "cfg number -> shards"
killCh chan bool
}
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
originOp := Op{"Get",args.Key,"",Nrand(),0}
reply.Err,reply.Value = kv.templateStart(originOp)
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
reply.Err,_ = kv.templateStart(originOp)
}
func (kv *ShardKV) templateStart(originOp Op) (Err, string) {
index,_,isLeader := kv.rf.Start(originOp)
if isLeader {
ch := kv.put(index, true)
op := kv.beNotified(ch, index)
if equalOp(originOp, op) { return OK, op.Value }
if op.OpType == ErrWrongGroup { return ErrWrongGroup, "" }
}
return ErrWrongLeader,""
}
func (kv *ShardKV) GarbageCollection(args *MigrateArgs, reply *MigrateReply) {
reply.Err = ErrWrongLeader
if _, isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
if _,ok := kv.toOutShards[args.ConfigNum]; !ok {return}
if _,ok := kv.toOutShards[args.ConfigNum][args.Shard]; !ok {return}
originOp := Op{"GC",strconv.Itoa(args.ConfigNum),"",Nrand(),args.Shard}
kv.mu.Unlock()
reply.Err,_ = kv.templateStart(originOp)
kv.mu.Lock()
}
func (kv *ShardKV) ShardMigration(args *MigrateArgs, reply *MigrateReply) {
reply.Err, reply.Shard, reply.ConfigNum = ErrWrongLeader, args.Shard, args.ConfigNum
if _,isLeader := kv.rf.GetState(); !isLeader {return}
kv.mu.Lock()
defer kv.mu.Unlock()
reply.Err = ErrWrongGroup
if args.ConfigNum >= kv.cfg.Num {return}
reply.Err,reply.ConfigNum, reply.Shard = OK, args.ConfigNum, args.Shard
reply.DB, reply.Cid2Seq = kv.deepCopyDBAndDedupMap(args.ConfigNum,args.Shard)
}
func (kv *ShardKV) deepCopyDBAndDedupMap(config int,shard int) (map[string]string, map[int64]int) {
db2 := make(map[string]string)
cid2Seq2 := make(map[int64]int)
for k, v := range kv.toOutShards[config][shard] {
db2[k] = v
}
for k, v := range kv.cid2Seq {
cid2Seq2[k] = v
}
return db2, cid2Seq2
}
func (kv *ShardKV) beNotified(ch chan Op,index int) Op{
select {
case notifyArg,ok := <- ch :
if ok {
close(ch)
}
kv.mu.Lock()
delete(kv.chMap,index)
kv.mu.Unlock()
return notifyArg
case <- time.After(time.Duration(1000)*time.Millisecond):
return Op{}
}
}
func (kv *ShardKV) put(idx int,createIfNotExists bool) chan Op{
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.chMap[idx]; !ok {
if !createIfNotExists {return nil}
kv.chMap[idx] = make(chan Op,1)
}
return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
return a.Key == b.Key && a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}
func (kv *ShardKV) Kill() {
kv.rf.Kill()
select{
case <-kv.killCh:
default:
}
kv.killCh <- true
}
func (kv *ShardKV) readSnapShot(snapshot []byte) {
kv.mu.Lock()
defer kv.mu.Unlock()
if snapshot == nil || len(snapshot) < 1 {return}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
var db map[string]string
var cid2Seq map[int64]int
var toOutShards map[int]map[int]map[string]string
var comeInShards map[int]int
var myShards map[int]bool
var garbages map[int]map[int]bool
var cfg shardmaster.Config
if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil || d.Decode(&comeInShards) != nil ||
d.Decode(&toOutShards) != nil || d.Decode(&myShards) != nil || d.Decode(&cfg) != nil ||
d.Decode(&garbages) != nil {
log.Fatal("readSnapShot ERROR for server %v",kv.me)
} else {
kv.db, kv.cid2Seq, kv.cfg = db, cid2Seq, cfg
kv.toOutShards, kv.comeInShards, kv.myShards, kv.garbages = toOutShards,comeInShards,myShards,garbages
}
}
func (kv *ShardKV) needSnapShot() bool {
kv.mu.Lock()
defer kv.mu.Unlock()
threshold := 10
return kv.maxraftstate > 0 &&
kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}
func (kv *ShardKV) doSnapShot(index int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
kv.mu.Lock()
e.Encode(kv.db)
e.Encode(kv.cid2Seq)
e.Encode(kv.comeInShards)
e.Encode(kv.toOutShards)
e.Encode(kv.myShards)
e.Encode(kv.cfg)
e.Encode(kv.garbages)
kv.mu.Unlock()
kv.rf.DoSnapShot(index,w.Bytes())
}
func (kv *ShardKV) tryPollNewCfg() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) > 0{
kv.mu.Unlock()
return
}
next := kv.cfg.Num + 1
kv.mu.Unlock()
cfg := kv.mck.Query(next)
if cfg.Num == next {
kv.rf.Start(cfg) //sync follower with new cfg
}
}
func (kv *ShardKV) tryGC() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.garbages) == 0{
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for cfgNum, shards := range kv.garbages {
for shard := range shards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
kv.mu.Lock()
defer kv.mu.Unlock()
delete(kv.garbages[cfgNum], shard)
if len(kv.garbages[cfgNum]) == 0 {
delete(kv.garbages, cfgNum)
}
}
}
}(shard, kv.mck.Query(cfgNum))
}
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) tryPullShard() {
_, isLeader := kv.rf.GetState();
kv.mu.Lock()
if !isLeader || len(kv.comeInShards) == 0 {
kv.mu.Unlock()
return
}
var wait sync.WaitGroup
for shard, idx := range kv.comeInShards {
wait.Add(1)
go func(shard int, cfg shardmaster.Config) {
defer wait.Done()
args := MigrateArgs{shard, cfg.Num}
gid := cfg.Shards[shard]
for _, server := range cfg.Groups[gid] {
srv := kv.make_end(server)
reply := MigrateReply{}
if ok := srv.Call("ShardKV.ShardMigration", &args, &reply); ok && reply.Err == OK {
kv.rf.Start(reply)
}
}
}(shard, kv.mck.Query(idx))
}
kv.mu.Unlock()
wait.Wait()
}
func (kv *ShardKV) daemon(do func(), sleepMS int) {
for {
select {
case <-kv.killCh:
return
default:
do()
}
time.Sleep(time.Duration(sleepMS) * time.Millisecond)
}
}
func (kv *ShardKV) apply(applyMsg raft.ApplyMsg) {
if cfg, ok := applyMsg.Command.(shardmaster.Config); ok {
kv.updateInAndOutDataShard(cfg)
} else if migrationData, ok := applyMsg.Command.(MigrateReply); ok{
kv.updateDBWithMigrateData(migrationData)
}else {
op := applyMsg.Command.(Op)
if op.OpType == "GC" {
cfgNum,_ := strconv.Atoi(op.Key)
kv.gc(cfgNum,op.SeqNum);
} else {
kv.normal(&op)
}
if notifyCh := kv.put(applyMsg.CommandIndex,false); notifyCh != nil {
send(notifyCh,op)
}
}
if kv.needSnapShot() {
go kv.doSnapShot(applyMsg.CommandIndex)
}
}
func (kv *ShardKV) gc(cfgNum int, shard int) {
kv.mu.Lock()
defer kv.mu.Unlock()
if _, ok := kv.toOutShards[cfgNum]; ok {
delete(kv.toOutShards[cfgNum], shard)
if len(kv.toOutShards[cfgNum]) == 0 {
delete(kv.toOutShards, cfgNum)
}
}
}
func (kv *ShardKV) updateInAndOutDataShard(cfg shardmaster.Config) {
kv.mu.Lock()
defer kv.mu.Unlock()
if cfg.Num <= kv.cfg.Num { //only consider newer config
return
}
oldCfg, toOutShard := kv.cfg, kv.myShards
kv.myShards, kv.cfg = make(map[int]bool), cfg
for shard, gid := range cfg.Shards {
if gid != kv.gid {continue}
if _, ok := toOutShard[shard]; ok || oldCfg.Num == 0 {
kv.myShards[shard] = true
delete(toOutShard, shard)
} else {
kv.comeInShards[shard] = oldCfg.Num
}
}
if len(toOutShard) > 0 { // prepare data that needed migration
kv.toOutShards[oldCfg.Num] = make(map[int]map[string]string)
for shard := range toOutShard {
outDb := make(map[string]string)
for k, v := range kv.db {
if key2shard(k) == shard {
outDb[k] = v
delete(kv.db, k)
}
}
kv.toOutShards[oldCfg.Num][shard] = outDb
}
}
}
func (kv *ShardKV) updateDBWithMigrateData(migrationData MigrateReply) {
kv.mu.Lock()
defer kv.mu.Unlock()
if migrationData.ConfigNum != kv.cfg.Num-1 {return}
delete(kv.comeInShards, migrationData.Shard)
//this check is necessary, to avoid use kv.cfg.Num-1 to update kv.cfg.Num's shard
if _, ok := kv.myShards[migrationData.Shard]; !ok {
kv.myShards[migrationData.Shard] = true
for k, v := range migrationData.DB {
kv.db[k] = v
}
for k, v := range migrationData.Cid2Seq {
kv.cid2Seq[k] = Max(v,kv.cid2Seq[k])
}
if _, ok := kv.garbages[migrationData.ConfigNum]; !ok {
kv.garbages[migrationData.ConfigNum] = make(map[int]bool)
}
kv.garbages[migrationData.ConfigNum][migrationData.Shard] = true
}
}
func (kv *ShardKV) normal(op *Op) {
shard := key2shard(op.Key)
kv.mu.Lock()
if _, ok := kv.myShards[shard]; !ok {
op.OpType = ErrWrongGroup
} else {
maxSeq,found := kv.cid2Seq[op.Cid]
if !found || op.SeqNum > maxSeq {
if op.OpType == "Put" {
kv.db[op.Key] = op.Value
} else if op.OpType == "Append" {
kv.db[op.Key] += op.Value
}
kv.cid2Seq[op.Cid] = op.SeqNum
}
if op.OpType == "Get" {
op.Value = kv.db[op.Key]
}
}
kv.mu.Unlock()
}
func send(notifyCh chan Op,op Op) {
select{
case <-notifyCh:
default:
}
notifyCh <- op
}
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
labgob.Register(MigrateArgs{})
labgob.Register(MigrateReply{})
labgob.Register(shardmaster.Config{})
kv := new(ShardKV)
kv.me = me
kv.maxraftstate = maxraftstate
kv.make_end = make_end
kv.gid = gid
kv.masters = masters
// Your initialization code here.
kv.persist = persister
// Use something like this to talk to the shardmaster:
kv.mck = shardmaster.MakeClerk(kv.masters)
kv.cfg = shardmaster.Config{}
kv.db = make(map[string]string)
kv.chMap = make(map[int]chan Op)
kv.cid2Seq = make(map[int64]int)
kv.toOutShards = make(map[int]map[int]map[string]string)
kv.comeInShards = make(map[int]int)
kv.myShards = make(map[int]bool)
kv.garbages = make(map[int]map[int]bool)
kv.readSnapShot(kv.persist.ReadSnapshot())
kv.applyCh = make(chan raft.ApplyMsg)
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
kv.killCh = make(chan bool,1)
go kv.daemon(kv.tryPollNewCfg,50)
go kv.daemon(kv.tryPullShard,80)
go kv.daemon(kv.tryGC,100)
go func() {
for {
select {
case <- kv.killCh:
return
case applyMsg := <- kv.applyCh:
if !applyMsg.CommandValid {
kv.readSnapShot(applyMsg.SnapShot)
continue
}
kv.apply(applyMsg)
}
}
}()
return kv
}
CONCISE CLIENT
package shardkv
//
// client code to talk to a sharded key/value service.
//
// the client first talks to the shardmaster to find out
// the assignment of shards (keys) to groups, and then
// talks to the group that holds the key's shard.
//
import (
"labrpc"
)
import "crypto/rand"
import "math/big"
import "shardmaster"
import "time"
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
shard %= shardmaster.NShards
return shard
}
func Nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}
type Clerk struct {
sm *shardmaster.Clerk
config shardmaster.Config
make_end func(string) *labrpc.ClientEnd
// You will have to modify this struct.
lastLeader int
id int64
seqNum int
}
func MakeClerk(masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.sm = shardmaster.MakeClerk(masters)
ck.make_end = make_end
// You'll have to add code here.
ck.id = Nrand()//give each client a unique identifier, and then have them
ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
ck.lastLeader = 0
return ck
}
//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
args := GetArgs{}
args.Key = key
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
// try each server for the shard.
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply GetReply
ok := srv.Call("ShardKV.Get", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si;
return reply.Value
}
if ok && (reply.Err == ErrWrongGroup) {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
//
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
args := PutAppendArgs{key,value,op,ck.id,ck.seqNum}
ck.seqNum++
for {
shard := key2shard(key)
gid := ck.config.Shards[shard]
if servers, ok := ck.config.Groups[gid]; ok {
for i := 0; i < len(servers); i++ {
si := (i + ck.lastLeader) % len(servers)
srv := ck.make_end(servers[si])
var reply PutAppendReply
ok := srv.Call("ShardKV.PutAppend", &args, &reply)
if ok && reply.Err == OK {
ck.lastLeader = si
return
}
if ok && reply.Err == ErrWrongGroup {
break
}
}
}
time.Sleep(100 * time.Millisecond)
// ask master for the latest configuration.
ck.config = ck.sm.Query(-1)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
CONCISE COMMON
package shardkv
//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//
const (
OK = "OK"
ErrWrongLeader = "ErrWrongLeader"
ErrWrongGroup = "ErrWrongGroup"
)
type Err string
// Put or Append
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
Cid int64 "client unique id"
SeqNum int "each request with a monotonically increasing sequence number"
}
type PutAppendReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Err Err
Value string
}
type MigrateArgs struct {
Shard int
ConfigNum int
}
type MigrateReply struct {
Err Err
ConfigNum int
Shard int
DB map[string]string
Cid2Seq map[int64]int
}
func Max(x, y int) int {
if x > y {
return x
}
return y
}
最后再把全部代碼提交進(jìn)GITHUB汉额。待我把MAP REDUCE寫了一起吧曹仗。