8.MIT 6.824 LAB 4B(分布式shard database)

第一步 閱讀4B的文檔

弄清楚里面的每個(gè)段落

第二步 基于LAB3师倔,寫出可以通過(guò)單GROUP的代碼

Your first task is to pass the very first shardkv test. In this test, there is only a single assignment of shards, so your code should be very similar to that of your Lab 3 server. The biggest modification will be to have your server detect when a configuration happens and start accepting requests whose keys match shards that it now owns.

基于LAB3洞斯,把代碼可以抄的都抄過(guò)去


image.png

抄代碼就不展示了∠园荩基本大多數(shù)都是把代碼復(fù)制過(guò)去。

因?yàn)檫@邊的CLIENT 給了代碼 是需要看ERR這個(gè)屬性的爹袁。


image.png

所以和LAB3不同(那里我沒(méi)用這個(gè)REPLY ERR的屬性)远荠,需要加上返回值。


image.png

很快失息,測(cè)試1通過(guò)了


image.png

第三步 增加拉CONFIG譬淳,和拒絕不屬于自己的SHARD的代碼

來(lái)自HINT

Add code to server.go to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test.

image.png

image.png

image.png

image.png

測(cè)試之后,依然確表锞ぃ可以過(guò)第一個(gè)TEST

第四步 思考

思考 CONFIG變化后邻梆,如何轉(zhuǎn)移SHARD

我的思考是這樣的。如果一個(gè)REPLICA GROUP A得到一個(gè)SHARD 1蛤迎,對(duì)應(yīng)B 失去一個(gè)SHARD 1
如果是A檢測(cè)到多了确虱,去等待別人來(lái)發(fā)給我,會(huì)比較被動(dòng)替裆。因?yàn)椴恢酪榷嗑谩?br> 其次B還需要發(fā)現(xiàn)自己失去SHARD 1后校辩,要主動(dòng)去發(fā)給A窘问,這增加了B 的工作量。
因?yàn)槲覀冞@邊在做SHARD MIGRATION的時(shí)候宜咒,是不能響應(yīng)請(qǐng)求惠赫。但對(duì)B來(lái)說(shuō),他可以立刻更新CONFIG故黑,即使沒(méi)把SHARD 1發(fā)送出去儿咱,他也是可以響應(yīng)請(qǐng)求。
但對(duì)A來(lái)說(shuō)场晶,一定要拿到SHARD 1后混埠,它才可以繼續(xù)服務(wù)。
基于上述思考诗轻,決定讓A去問(wèn)B要SHARD钳宪,這樣會(huì)簡(jiǎn)化設(shè)計(jì)。因?yàn)檫@樣做B可以如果發(fā)現(xiàn)了新的CONFIG,可以直接更新讓它立刻生效扳炬。A需要等待PULL成功后吏颖,更新CONFIG讓它生效。

Reconfiguration 會(huì)影響到 PutAppend/Get恨樟,因此同樣需要利用 raft 保證 group 內(nèi)的一致性半醉,確保集群內(nèi)完成了之前的操作后同時(shí)進(jìn)行Reconfiguration;

要思考的第二個(gè)點(diǎn)劝术,是傳輸SHARD的RPC缩多。

首先SHARD DATA是一定要發(fā)過(guò)去的。
但是只是發(fā)SHARD DATA是不夠的养晋。
比如一個(gè)APPEND REQUEST 在向A SERVER發(fā)送的時(shí)候,TIMEOUT了瞧壮。這個(gè)時(shí)候A server 已經(jīng)做了這個(gè)更新操作。
在這個(gè)點(diǎn)之后匙握,Reconfiguration 發(fā)生,CLIENT 去問(wèn)B SERVER發(fā)送APPEND REQ陈轿。如果只是SHARD DATA過(guò)去圈纺。會(huì)造成APPEND2次。
所以我們還需要把去重的MAP也一起發(fā)過(guò)去麦射。

發(fā)過(guò)去的參數(shù)除了要訴說(shuō)我要哪個(gè)SHARD之外蛾娶,還需要加上CONFIG NUM,因?yàn)橛锌赡芪野l(fā)的CONFIG NUM比那邊還要大潜秋,說(shuō)明那邊的CONFIG還沒(méi)同步到蛔琅。
基于上述思路。我設(shè)計(jì)的RPC如下峻呛。


image.png

第五步 實(shí)現(xiàn)MIGRATE SHARD RPC HANDLER

這里有個(gè)很重要的思路罗售,每個(gè)RAFT GROUP都是由LEADER負(fù)責(zé)發(fā)送和接受RPC辜窑。FOLLOWER只負(fù)責(zé)從APPLY MSG里去和LEADER SYNC狀態(tài)。

還有一個(gè)點(diǎn)寨躁,就是我們不能直接從DB里去取數(shù)據(jù)穆碎,如果我們沒(méi)有實(shí)現(xiàn)清理數(shù)據(jù)的前提下,因?yàn)閿?shù)據(jù)不清理职恳。所以我們會(huì)有多的數(shù)據(jù)所禀,想象一下。我們先接受SHARD1放钦,然后不接受色徘,再重新接受SHARD1,此時(shí)做遷移操禀,會(huì)是一個(gè)并集褂策。而我們只是希望是重新接受的那部分〈仓基于上述考慮辙培。我們需要基于每一個(gè)CONFIG,單獨(dú)把要遷移的數(shù)據(jù)給抽出來(lái)邢锯。這樣依據(jù)CONFIG來(lái)做遷移扬蕊。


image.png

第六步 思考難點(diǎn)

如何去PULL DATA?如果我們選擇讓LEADER去交互丹擎,我們必須要HANDLER RAFT Leader掛掉尾抑,得有新的LEADER來(lái)負(fù)責(zé)PULL DATA。
所以在所有節(jié)點(diǎn)上必須得存好要問(wèn)哪里去PULL DATA蒂培。如果PULL到再愈,我們需要確保LEADER會(huì)往RAFT里發(fā)CMD(這個(gè)CMD是讓節(jié)點(diǎn)同步數(shù)據(jù),同時(shí)刪掉那個(gè)維護(hù)的哪里去PULL DATA的地方)

而且我們必須額外開(kāi)一個(gè)后臺(tái)進(jìn)程與循環(huán)的做這件事护戳。不然LEADER轉(zhuǎn)移過(guò)去之后翎冲,就沒(méi)有人PULL DATA了。 因?yàn)镻ULL DATA 這件事是沒(méi)有CLIENT超時(shí)重試的媳荒。

因?yàn)橐笈_(tái)循環(huán)去PULL DATA抗悍,我們拿到DATA后,送進(jìn)RAFT钳枕,再進(jìn)入到APPLY CH缴渊,需要所有的節(jié)點(diǎn)都可以同步這個(gè)數(shù)據(jù)。一旦同步成功鱼炒,我們需要清理這個(gè)要等待的數(shù)據(jù)衔沼。這樣后臺(tái)線程可以少發(fā)很多無(wú)用的RPC。

同時(shí)我們?cè)谒饕獢?shù)據(jù)的時(shí)候也要知道往哪個(gè)REPLICA GROUP要。


image.png
image.png
image.png
image.png

第7步

目前我們已經(jīng)再LEADER端指蚁,把收到的新的CONFIG和拿到的MIGRATION DATA打給放進(jìn)RAFT的LOG去做線性一致的排序菩佑。

所以當(dāng)這個(gè)2個(gè)消息從APPLY MSG出來(lái)的時(shí)候,需要去做一些事情欣舵。
為此擎鸠,我單獨(dú)開(kāi)了一個(gè)函數(shù)去寫APPLY的邏輯


image.png

第8步 實(shí)現(xiàn)APPLY MSG 是MIGRATION DATA REPLY

這里的點(diǎn)(后面大量調(diào)試獲得的),因?yàn)镽EPLY 發(fā)到RAFT里面缘圈,雖然有順序劣光,但返回的時(shí)候順序可能是亂的。比如現(xiàn)在我的CONFIG已經(jīng)更新到9糟把,這個(gè)時(shí)候RAFT才把CONFIG的6 返回回來(lái)绢涡。我們應(yīng)該直接忽略這個(gè)版本。如果更新了遣疯,就會(huì)產(chǎn)生不一致雄可。

那么依據(jù)亂序思想,我們不得不CHECK 就是當(dāng)前REPLY的CONFIG版本號(hào)必須是當(dāng)前CONFIG版本號(hào)小一個(gè)缠犀。

為什么数苫?

這里我們?cè)谑盏紺ONFIG 變更,我們就會(huì)刷新CONFIG辨液。但是此時(shí)CONFIG刷新之后虐急,我們會(huì)更新COME IN SHARD,隨后后臺(tái)線程會(huì)去PULL滔迈。從更新COME IN SHARD到數(shù)據(jù)SHARD過(guò)來(lái)止吁,這段時(shí)間內(nèi),我們必須得拒絕掉所有的索要該SHARD的請(qǐng)求燎悍。所以我們不能直接從CONFIG來(lái)判斷是不是WRONG GROUP敬惦。

至此,我們需要額外再維護(hù)一個(gè)我現(xiàn)在能HANDLER哪些SHARD的數(shù)據(jù)結(jié)構(gòu)谈山。


image.png

那么發(fā)出去的SHARD徙硅,我可以直接從這個(gè)數(shù)據(jù)結(jié)構(gòu)里刪掉有勾。要進(jìn)來(lái)的話洲劣,等真的進(jìn)來(lái)了祖屏,再添加到這個(gè)數(shù)據(jù)結(jié)構(gòu)中捕仔。


image.png

判斷是不是WRONG GROUP呜袁,也依據(jù)這個(gè)數(shù)據(jù)結(jié)構(gòu)來(lái)看呆细。

第9步 實(shí)現(xiàn)updateInAndOutDataShard

這邊我們會(huì)根據(jù)新的CONFIG來(lái)棒坏,判斷自己要送出去的數(shù)據(jù)是哪些妨猩,自己要接受進(jìn)來(lái)的數(shù)組是哪些潜叛。在我的設(shè)計(jì)里這2個(gè)數(shù)據(jù)結(jié)構(gòu)必要性在第5,6步討論過(guò)了。


image.png

第十步 判斷WRONG GROUP的時(shí)機(jī)

在前面的版本中威兜,我們是在SERVER端接受到請(qǐng)求的時(shí)候销斟,就直接去依據(jù)CONFIG判斷WRONG GROUP。現(xiàn)在我們改成依據(jù)MYSHARD來(lái)看椒舵,但是這還是不足的蚂踊。

還記得我們?cè)谧鯨AB 3的時(shí)候,判斷去重笔宿,必須得再消息回來(lái)的時(shí)候再看一次犁钟。 因?yàn)榭赡茉谡?qǐng)求發(fā)送的時(shí)候,數(shù)據(jù)還在REPLICA GROUP 1泼橘±远可是到消息從RAFT返回來(lái)的時(shí)候,當(dāng)中發(fā)生過(guò)更新CONFIG炬灭。數(shù)據(jù)不再GROUP 1了醋粟。所以要把判斷WRONG GROUP的邏輯,加在數(shù)據(jù)返回層重归。

同時(shí)因?yàn)閿?shù)據(jù)會(huì)在APPLY CH收到新的CONFIG米愿,一部分要TO OUT的數(shù)據(jù)就會(huì)從DB里DELETE掉。為了確保NOTIFY CH的傳輸過(guò)程中鼻吮,這個(gè)DB的更改不會(huì)影響到實(shí)際的GET的返回值育苟。我們需要在接到APPLY CH的時(shí)候就把結(jié)果給注入到OP里。不然等OP發(fā)過(guò)去再?gòu)腄B拿狈网,有一定概率此時(shí)另一個(gè)線程已經(jīng)再DELETE DB了宙搬。


image.png

image.png

image.png

同時(shí)根據(jù)這個(gè)思路,我把SHARD MASTER的QUERY 也加在返回層來(lái)做拓哺。


image.png

第11步 初始化新加的屬性

image.png

第12步 更新POLL NEW CONFIG代碼勇垛,需要一個(gè)個(gè)更新

來(lái)自HINT

Process re-configurations one at a time, in order.

同時(shí)注意如果當(dāng)前CONFIG,那些需要轉(zhuǎn)移的SHARD還沒(méi)做完士鸥。不要立刻去拿下一個(gè)CONFIG闲孤。


image.png

image.png

第13步 測(cè)試JOIN AND LEAVE

發(fā)現(xiàn)有時(shí)可以過(guò),有時(shí)過(guò)不了會(huì)阻塞烤礁。

BUG 1 死鎖

通過(guò)幾小時(shí)的調(diào)試發(fā)現(xiàn)讼积,是一個(gè)3維死鎖。首先RAFT里面拿了RAFT的鎖脚仔,阻塞在APPLY CH那勤众。APPLY CH的后臺(tái)線程阻塞在KV SERVER的鎖上。 還有一個(gè)PULL CONFIG的線程鲤脏,持有了KV SERVER的鎖们颜,阻塞在RAFT的鎖上吕朵。

image.png

FIX方法,交換代碼順序窥突。


image.png

測(cè)試通過(guò)

但是寫了這么多代碼努溃,很多地方都沒(méi)注意保護(hù)共享變量。所以用TEST DATA RACE的時(shí)候會(huì)出問(wèn)題阻问。

檢查思路梧税,先看3個(gè)后臺(tái)進(jìn)程,隨后看幾個(gè)RPC handler
這邊就自己加一下鎖吧称近。
GO TEST RACE OK之后第队,會(huì)在第三個(gè)測(cè)試敗掉。是SNAPSHOT

我們需要存儲(chǔ)更多的狀態(tài)進(jìn)SNAPSHOT煌茬。

第14步 實(shí)現(xiàn)新的SNAPSHOT

image.png

image.png

image.png

再直接測(cè)試斥铺,發(fā)現(xiàn)阻塞在UNRELIABLE 3


image.png

BUG 2 一處地方?jīng)]有釋放鎖

image.png

修復(fù)后重新對(duì)這個(gè)CASE單獨(dú)測(cè)試100次通過(guò),測(cè)全集坛善。只剩下CHANLLEGE 1晾蜘,DELETE的TASK了


image.png

第15步 思考如何刪除不必要的狀態(tài)

在上面的實(shí)現(xiàn)里,我們開(kāi)了3個(gè)數(shù)據(jù)結(jié)構(gòu)眠屎,一個(gè)是TO OUT剔交,一個(gè)是COME IN,一個(gè)是MY SHARD改衩;
第三個(gè)是固定大小的岖常。不用考慮
第二個(gè),我們已經(jīng)再接受到DATA之后會(huì)去刪除它葫督。
唯一沒(méi)有回收的就是第一個(gè)竭鞍。

最NAIVE的實(shí)現(xiàn)是當(dāng)我們把數(shù)據(jù)當(dāng)做REPLY發(fā)過(guò)去的時(shí)候,就直接刪掉橄镜。這是危險(xiǎn)的偎快。因?yàn)楹苡锌赡苓@個(gè)消息會(huì)丟失,被那邊服務(wù)器拒絕洽胶,造成這個(gè)數(shù)據(jù)就永遠(yuǎn)不會(huì)被回收晒夹。

正確的做法是等到對(duì)方服務(wù)器,成功接收了DATA姊氓,然后刪除了對(duì)應(yīng)的COME IN丐怯,這個(gè)時(shí)候應(yīng)該發(fā)REQUEST告訴TO OUT一方,你可以安全的把TO OUT里的這塊DATA給回收了翔横。

但是依然存在RPC會(huì)丟失的情況读跷。和PULL的思想一樣。(用一個(gè)COME IN LIST+ 后臺(tái)線程禾唁,來(lái)不斷重試舔亭,成功時(shí)候刪除COME IN LIST內(nèi)容些膨,就不再去PULL直到有新的COME IN來(lái)。失敗的話钦铺,因?yàn)镃OME IN 內(nèi)容還在,就會(huì)自動(dòng)重試肢预,不怕網(wǎng)絡(luò)不穩(wěn)定)

那么我針對(duì)這個(gè)CASE矛洞,用相同的套路。后臺(tái)GC線程+Garbage List.

具體思路就是當(dāng)COME IN 的DATA收到后烫映,我們要把這塊數(shù)據(jù)標(biāo)記進(jìn)Garbage List沼本。 后臺(tái)GC線程發(fā)現(xiàn)Garbage List有內(nèi)容,就會(huì)往對(duì)應(yīng)的GROUP發(fā)送GC RPC锭沟。對(duì)應(yīng)的GROUP清理成功后抽兆,REPLY告知。我們把Garbage List對(duì)應(yīng)的內(nèi)容刪除族淮。

同樣我們依然只和LEADER交互辫红,并且利用RAFT LOG,來(lái)確保所有節(jié)點(diǎn)都成功刪除GARBAGE祝辣,再RPC回復(fù)SUCCESS

第16步 寫GC RPC HANDLER贴妻,抽一個(gè)TEMPLATE

發(fā)現(xiàn)可以用ERR 里面加一個(gè)WRONGLEADER來(lái)代表LEADER不對(duì)。就可以去掉一個(gè)參數(shù)蝙斜。

當(dāng)OP TYPE是GC的時(shí)候名惩,KEY 是CONFIG NUM,SEQNUM是SHARD孕荠。


image.png
image.png
image.png

第17步 實(shí)現(xiàn)GC

image.png

image.png

第18步 實(shí)現(xiàn)GC后臺(tái)進(jìn)程

image.png
image.png

第19步 往GARBAGE里添加值

image.png

第20步娩鹉,更新SNAPSHOT

這里小伙伴自行更新吧

測(cè)試通過(guò)

image.png

第21步

因?yàn)槲业腞EPLICA GROUP里會(huì)往MASTER 發(fā)送QUERY請(qǐng)求,這個(gè)時(shí)候可能會(huì)造成LAST LEADER的DATA RACE稚伍。
所以我用原子方法改寫弯予。


image.png

第22步 CONCISE代碼

1.我把WRONDLEADER給去掉了。同時(shí)用ERR 的WRONGLEADER來(lái)表示槐瑞。


image.png

2.把幾個(gè)RPC HANDLER用TEMPLATE 提取公有邏輯


image.png

最終430 行代碼

GO TEST 測(cè)試200次

這個(gè)測(cè)試不適合并行熙涤,因?yàn)闀?huì)大量開(kāi)線程在做。并行測(cè)試會(huì)造成有些CASE跑的巨慢困檩。所以串行測(cè)試了祠挫。
同時(shí)CONFIG里因?yàn)镸ASTER的資源沒(méi)有回收。越到后面TEST 跑的越慢悼沿,我加了如下代碼來(lái)提速測(cè)試


image.png

基本跑完一整套是2分鐘


image.png

測(cè)試200次的結(jié)果 是

BUG 3

TestChallenge2Unaffected 會(huì)有1/50的概率阻塞等舔。

經(jīng)過(guò)打LOG 發(fā)現(xiàn),是還沒(méi)來(lái)得及把所有DATA SHARD完糟趾,超過(guò)了1秒慌植,之后就有數(shù)據(jù)再也MIGRATE不過(guò)來(lái)甚牲。造成拿不到而阻塞。

這里分享一個(gè)打LOG的技巧蝶柿,避免淹沒(méi)在茫茫LOG海里丈钙。就是出了問(wèn)題,再打LOG


image.png

image.png
image.png
image.png

原因如下:


image.png

下圖這個(gè)4號(hào)數(shù)據(jù)塊 在測(cè)試?yán)飳儆?01的OWN交汤,但是還沒(méi)來(lái)得及拿到雏赦,100的網(wǎng)就斷了。再也取不到了芙扎。


image.png

解決方案星岗,加快PULL的頻率


image.png

加快QUERY CONFIG的速度。思路是如果是拿已知的CONFIG戒洼,因?yàn)镃ONFIG的APPEND不會(huì)修改俏橘,所以可以直接返回。


image.png

縮短MASTER CLIENT的睡眠時(shí)間圈浇。


image.png

測(cè)試200次后無(wú)阻塞寥掐。

SHARD KV 測(cè)試200次通過(guò)

測(cè)試腳本

#!/bin/bash

export GOPATH="/home/zyx/Desktop/mit6.824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm res -rf
mkdir res
for ((i = 0; i < 200; i++))
do
echo $i
(go test) > ./res/$i
grep -nr "FAIL.*" res
done
image.png

回歸測(cè)試SHARD MASTER500次通過(guò)

image.png

回歸測(cè)試RAFT 300次通過(guò)

有2次時(shí)之前說(shuō)的不是代碼問(wèn)題的KNOWN ISSUE,具體參考文集的2C部分


image.png

回歸測(cè)試KVRAFT 210次通過(guò)

image.png

CONCISE SERVER

package shardkv

import (
    "bytes"
    "labrpc"
    "log"
    "shardmaster"
    "strconv"
    "time"
)
import "raft"
import "sync"
import "labgob"

type Op struct {
    OpType  string  "operation type(eg. put/append/gc/get)"
    Key     string  "key for normal, config num for gc"
    Value   string
    Cid     int64   "cid for put/append, operation uid for get/gc"
    SeqNum  int     "seqnum for put/append, shard for gc"
}

type ShardKV struct {
    mu           sync.Mutex
    me           int
    rf           *raft.Raft
    applyCh      chan raft.ApplyMsg

    make_end     func(string) *labrpc.ClientEnd
    gid          int
    masters      []*labrpc.ClientEnd
    maxraftstate int // snapshot if log grows this big
    // Your definitions here.
    mck             *shardmaster.Clerk
    cfg             shardmaster.Config
    persist         *raft.Persister
    db              map[string]string
    chMap           map[int]chan Op
    cid2Seq         map[int64]int

    toOutShards     map[int]map[int]map[string]string "cfg num -> (shard -> db)"
    comeInShards    map[int]int     "shard->config number"
    myShards        map[int]bool    "to record which shard i can offer service"
    garbages        map[int]map[int]bool              "cfg number -> shards"

    killCh      chan bool
}

func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
    originOp := Op{"Get",args.Key,"",Nrand(),0}
    reply.Err,reply.Value = kv.templateStart(originOp)
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
    reply.Err,_ = kv.templateStart(originOp)
}
func (kv *ShardKV) templateStart(originOp Op) (Err, string) {
    index,_,isLeader := kv.rf.Start(originOp)
    if isLeader {
        ch := kv.put(index, true)
        op := kv.beNotified(ch, index)
        if equalOp(originOp, op) { return OK, op.Value }
        if op.OpType == ErrWrongGroup { return ErrWrongGroup, "" }
    }
    return ErrWrongLeader,""
}
func (kv *ShardKV) GarbageCollection(args *MigrateArgs, reply *MigrateReply) {
    reply.Err = ErrWrongLeader
    if _, isLeader := kv.rf.GetState(); !isLeader {return}
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _,ok := kv.toOutShards[args.ConfigNum]; !ok {return}
    if _,ok := kv.toOutShards[args.ConfigNum][args.Shard]; !ok {return}
    originOp := Op{"GC",strconv.Itoa(args.ConfigNum),"",Nrand(),args.Shard}
    kv.mu.Unlock()
    reply.Err,_ = kv.templateStart(originOp)
    kv.mu.Lock()
}

func (kv *ShardKV) ShardMigration(args *MigrateArgs, reply *MigrateReply) {
    reply.Err, reply.Shard, reply.ConfigNum = ErrWrongLeader, args.Shard, args.ConfigNum
    if _,isLeader := kv.rf.GetState(); !isLeader {return}
    kv.mu.Lock()
    defer kv.mu.Unlock()
    reply.Err = ErrWrongGroup
    if args.ConfigNum >= kv.cfg.Num {return}
    reply.Err,reply.ConfigNum, reply.Shard = OK, args.ConfigNum, args.Shard
    reply.DB, reply.Cid2Seq = kv.deepCopyDBAndDedupMap(args.ConfigNum,args.Shard)
}
func (kv *ShardKV) deepCopyDBAndDedupMap(config int,shard int) (map[string]string, map[int64]int) {
    db2 := make(map[string]string)
    cid2Seq2 := make(map[int64]int)
    for k, v := range kv.toOutShards[config][shard] {
        db2[k] = v
    }
    for k, v := range kv.cid2Seq {
        cid2Seq2[k] = v
    }
    return db2, cid2Seq2
}

func (kv *ShardKV) beNotified(ch chan Op,index int) Op{
    select {
    case notifyArg,ok := <- ch :
        if ok {
            close(ch)
        }
        kv.mu.Lock()
        delete(kv.chMap,index)
        kv.mu.Unlock()
        return notifyArg
    case <- time.After(time.Duration(1000)*time.Millisecond):
        return Op{}
    }
}
func (kv *ShardKV) put(idx int,createIfNotExists bool) chan Op{
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _, ok := kv.chMap[idx]; !ok {
        if !createIfNotExists {return nil}
        kv.chMap[idx] = make(chan Op,1)
    }
    return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
    return a.Key == b.Key &&  a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}

func (kv *ShardKV) Kill() {
    kv.rf.Kill()
    select{
    case  <-kv.killCh:
    default:
    }
    kv.killCh <- true
}

func (kv *ShardKV) readSnapShot(snapshot []byte) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if snapshot == nil || len(snapshot) < 1 {return}
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    var db map[string]string
    var cid2Seq map[int64]int
    var toOutShards map[int]map[int]map[string]string
    var comeInShards map[int]int
    var myShards    map[int]bool
    var garbages    map[int]map[int]bool
    var cfg shardmaster.Config
    if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil || d.Decode(&comeInShards) != nil ||
        d.Decode(&toOutShards) != nil || d.Decode(&myShards) != nil || d.Decode(&cfg) != nil ||
        d.Decode(&garbages) != nil {
        log.Fatal("readSnapShot ERROR for server %v",kv.me)
    } else {
        kv.db, kv.cid2Seq, kv.cfg = db, cid2Seq, cfg
        kv.toOutShards, kv.comeInShards, kv.myShards, kv.garbages = toOutShards,comeInShards,myShards,garbages
    }
}

func (kv *ShardKV) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    return kv.maxraftstate > 0 &&
        kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}

func (kv *ShardKV) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    e.Encode(kv.db)
    e.Encode(kv.cid2Seq)
    e.Encode(kv.comeInShards)
    e.Encode(kv.toOutShards)
    e.Encode(kv.myShards)
    e.Encode(kv.cfg)
    e.Encode(kv.garbages)
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index,w.Bytes())
}



func (kv *ShardKV) tryPollNewCfg() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if !isLeader || len(kv.comeInShards) > 0{
        kv.mu.Unlock()
        return
    }
    next := kv.cfg.Num + 1
    kv.mu.Unlock()
    cfg := kv.mck.Query(next)
    if cfg.Num == next {
        kv.rf.Start(cfg) //sync follower with new cfg
    }
}
func (kv *ShardKV) tryGC() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if !isLeader || len(kv.garbages) == 0{
        kv.mu.Unlock()
        return
    }
    var wait sync.WaitGroup
    for cfgNum, shards := range kv.garbages {
        for shard := range shards {
            wait.Add(1)
            go func(shard int, cfg shardmaster.Config) {
                defer wait.Done()
                args := MigrateArgs{shard, cfg.Num}
                gid := cfg.Shards[shard]
                for _, server := range cfg.Groups[gid] {
                    srv := kv.make_end(server)
                    reply := MigrateReply{}
                    if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
                        kv.mu.Lock()
                        defer kv.mu.Unlock()
                        delete(kv.garbages[cfgNum], shard)
                        if len(kv.garbages[cfgNum]) == 0 {
                            delete(kv.garbages, cfgNum)
                        }
                    }
                }
            }(shard, kv.mck.Query(cfgNum))
        }
    }
    kv.mu.Unlock()
    wait.Wait()
}
func (kv *ShardKV) tryPullShard() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if  !isLeader || len(kv.comeInShards) == 0 {
        kv.mu.Unlock()
        return
    }
    var wait sync.WaitGroup
    for shard, idx := range kv.comeInShards {
        wait.Add(1)
        go func(shard int, cfg shardmaster.Config) {
            defer wait.Done()
            args := MigrateArgs{shard, cfg.Num}
            gid := cfg.Shards[shard]
            for _, server := range cfg.Groups[gid] {
                srv := kv.make_end(server)
                reply := MigrateReply{}
                if ok := srv.Call("ShardKV.ShardMigration", &args, &reply); ok && reply.Err == OK {
                    kv.rf.Start(reply)
                }

            }
        }(shard, kv.mck.Query(idx))
    }
    kv.mu.Unlock()
    wait.Wait()
}

func (kv *ShardKV) daemon(do func(), sleepMS int) {
    for {
        select {
        case <-kv.killCh:
            return
        default:
            do()
        }
        time.Sleep(time.Duration(sleepMS) * time.Millisecond)
    }
}

func (kv *ShardKV) apply(applyMsg raft.ApplyMsg) {
    if cfg, ok := applyMsg.Command.(shardmaster.Config); ok {
        kv.updateInAndOutDataShard(cfg)
    } else if migrationData, ok := applyMsg.Command.(MigrateReply); ok{
        kv.updateDBWithMigrateData(migrationData)
    }else {
        op := applyMsg.Command.(Op)
        if op.OpType == "GC" {
            cfgNum,_ := strconv.Atoi(op.Key)
            kv.gc(cfgNum,op.SeqNum);
        } else {
            kv.normal(&op)
        }
        if notifyCh := kv.put(applyMsg.CommandIndex,false); notifyCh != nil {
            send(notifyCh,op)
        }
    }
    if kv.needSnapShot() {
        go kv.doSnapShot(applyMsg.CommandIndex)
    }

}

func (kv *ShardKV) gc(cfgNum int, shard int) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _, ok := kv.toOutShards[cfgNum]; ok {
        delete(kv.toOutShards[cfgNum], shard)
        if len(kv.toOutShards[cfgNum]) == 0 {
            delete(kv.toOutShards, cfgNum)
        }
    }
}

func (kv *ShardKV) updateInAndOutDataShard(cfg shardmaster.Config) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if cfg.Num <= kv.cfg.Num { //only consider newer config
        return
    }
    oldCfg, toOutShard := kv.cfg, kv.myShards
    kv.myShards, kv.cfg = make(map[int]bool), cfg
    for shard, gid := range cfg.Shards {
        if gid != kv.gid {continue}
        if _, ok := toOutShard[shard]; ok || oldCfg.Num == 0 {
            kv.myShards[shard] = true
            delete(toOutShard, shard)
        } else {
            kv.comeInShards[shard] = oldCfg.Num
        }
    }
    if len(toOutShard) > 0 { // prepare data that needed migration
        kv.toOutShards[oldCfg.Num] = make(map[int]map[string]string)
        for shard := range toOutShard {
            outDb := make(map[string]string)
            for k, v := range kv.db {
                if key2shard(k) == shard {
                    outDb[k] = v
                    delete(kv.db, k)
                }
            }
            kv.toOutShards[oldCfg.Num][shard] = outDb
        }
    }
}

func (kv *ShardKV) updateDBWithMigrateData(migrationData MigrateReply) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if migrationData.ConfigNum != kv.cfg.Num-1 {return}
    delete(kv.comeInShards, migrationData.Shard)
    //this check is necessary, to avoid use  kv.cfg.Num-1 to update kv.cfg.Num's shard
    if _, ok := kv.myShards[migrationData.Shard]; !ok {
        kv.myShards[migrationData.Shard] = true
        for k, v := range migrationData.DB {
            kv.db[k] = v
        }
        for k, v := range migrationData.Cid2Seq {
            kv.cid2Seq[k] = Max(v,kv.cid2Seq[k])
        }
        if _, ok := kv.garbages[migrationData.ConfigNum]; !ok {
            kv.garbages[migrationData.ConfigNum] = make(map[int]bool)
        }
        kv.garbages[migrationData.ConfigNum][migrationData.Shard] = true
    }
}

func (kv *ShardKV) normal(op *Op) {
    shard := key2shard(op.Key)
    kv.mu.Lock()
    if _, ok := kv.myShards[shard]; !ok {
        op.OpType = ErrWrongGroup
    } else {
        maxSeq,found := kv.cid2Seq[op.Cid]
        if !found || op.SeqNum > maxSeq {
            if op.OpType == "Put" {
                kv.db[op.Key] = op.Value
            } else if op.OpType == "Append" {
                kv.db[op.Key] += op.Value
            }
            kv.cid2Seq[op.Cid] = op.SeqNum
        }
        if op.OpType == "Get" {
            op.Value = kv.db[op.Key]
        }
    }
    kv.mu.Unlock()
}

func send(notifyCh chan Op,op Op) {
    select{
    case  <-notifyCh:
    default:
    }
    notifyCh <- op
}

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
    // call labgob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    labgob.Register(Op{})
    labgob.Register(MigrateArgs{})
    labgob.Register(MigrateReply{})
    labgob.Register(shardmaster.Config{})

    kv := new(ShardKV)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.make_end = make_end
    kv.gid = gid
    kv.masters = masters
    // Your initialization code here.
    kv.persist = persister
    // Use something like this to talk to the shardmaster:
    kv.mck = shardmaster.MakeClerk(kv.masters)
    kv.cfg = shardmaster.Config{}

    kv.db = make(map[string]string)
    kv.chMap = make(map[int]chan Op)
    kv.cid2Seq = make(map[int64]int)

    kv.toOutShards = make(map[int]map[int]map[string]string)
    kv.comeInShards = make(map[int]int)
    kv.myShards = make(map[int]bool)
    kv.garbages = make(map[int]map[int]bool)

    kv.readSnapShot(kv.persist.ReadSnapshot())

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.killCh = make(chan bool,1)
    go kv.daemon(kv.tryPollNewCfg,50)
    go kv.daemon(kv.tryPullShard,80)
    go kv.daemon(kv.tryGC,100)
    go func() {
        for {
            select {
            case <- kv.killCh:
                return
            case applyMsg := <- kv.applyCh:
                if !applyMsg.CommandValid {
                    kv.readSnapShot(applyMsg.SnapShot)
                    continue
                }
                kv.apply(applyMsg)
            }
        }
    }()
    return kv
}

CONCISE CLIENT

package shardkv

//
// client code to talk to a sharded key/value service.
//
// the client first talks to the shardmaster to find out
// the assignment of shards (keys) to groups, and then
// talks to the group that holds the key's shard.
//

import (
    "labrpc"
)
import "crypto/rand"
import "math/big"
import "shardmaster"
import "time"


func key2shard(key string) int {
    shard := 0
    if len(key) > 0 {
        shard = int(key[0])
    }
    shard %= shardmaster.NShards
    return shard
}

func Nrand() int64 {
    max := big.NewInt(int64(1) << 62)
    bigx, _ := rand.Int(rand.Reader, max)
    x := bigx.Int64()
    return x
}

type Clerk struct {
    sm       *shardmaster.Clerk
    config   shardmaster.Config
    make_end func(string) *labrpc.ClientEnd
    // You will have to modify this struct.
    lastLeader  int
    id          int64
    seqNum      int
}


func MakeClerk(masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.sm = shardmaster.MakeClerk(masters)
    ck.make_end = make_end
    // You'll have to add code here.
    ck.id = Nrand()//give each client a unique identifier, and then have them
    ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
    ck.lastLeader = 0
    return ck
}

//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
    args := GetArgs{}
    args.Key = key
    for {
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            // try each server for the shard.
            for i := 0; i < len(servers); i++ {
                si := (i + ck.lastLeader) % len(servers)
                srv := ck.make_end(servers[si])
                var reply GetReply
                ok := srv.Call("ShardKV.Get", &args, &reply)
                if ok && reply.Err == OK {
                    ck.lastLeader = si;
                    return reply.Value
                }
                if ok && (reply.Err == ErrWrongGroup) {
                    break
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        // ask master for the latest configuration.
        ck.config = ck.sm.Query(-1)
    }

}

//
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
    args := PutAppendArgs{key,value,op,ck.id,ck.seqNum}
    ck.seqNum++
    for {
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            for i := 0; i < len(servers); i++ {
                si := (i + ck.lastLeader) % len(servers)
                srv := ck.make_end(servers[si])
                var reply PutAppendReply
                ok := srv.Call("ShardKV.PutAppend", &args, &reply)
                if ok && reply.Err == OK {
                    ck.lastLeader = si
                    return
                }
                if ok && reply.Err == ErrWrongGroup {
                    break
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        // ask master for the latest configuration.
        ck.config = ck.sm.Query(-1)
    }
}

func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}

CONCISE COMMON

package shardkv

//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//

const (
    OK             = "OK"
    ErrWrongLeader = "ErrWrongLeader"
    ErrWrongGroup  = "ErrWrongGroup"
)

type Err string

// Put or Append
type PutAppendArgs struct {
    Key   string
    Value string
    Op    string // "Put" or "Append"
    Cid    int64 "client unique id"
    SeqNum int   "each request with a monotonically increasing sequence number"
}

type PutAppendReply struct {
    Err         Err
}

type GetArgs struct {
    Key string
}

type GetReply struct {
    Err         Err
    Value       string
}

type MigrateArgs struct {
    Shard     int
    ConfigNum int
}

type MigrateReply struct {
    Err         Err
    ConfigNum   int
    Shard       int
    DB          map[string]string
    Cid2Seq     map[int64]int
}

func Max(x, y int) int {
    if x > y {
        return x
    }
    return y
}

最后再把全部代碼提交進(jìn)GITHUB汉额。待我把MAP REDUCE寫了一起吧曹仗。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蠕搜,隨后出現(xiàn)的幾起案子怎茫,更是在濱河造成了極大的恐慌,老刑警劉巖妓灌,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件轨蛤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡虫埂,警方通過(guò)查閱死者的電腦和手機(jī)祥山,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)掉伏,“玉大人缝呕,你說(shuō)我怎么就攤上這事「ⅲ” “怎么了供常?”我有些...
    開(kāi)封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)鸡捐。 經(jīng)常有香客問(wèn)我栈暇,道長(zhǎng),這世上最難降的妖魔是什么箍镜? 我笑而不...
    開(kāi)封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任源祈,我火速辦了婚禮煎源,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘香缺。我一直安慰自己手销,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布图张。 她就那樣靜靜地躺著原献,像睡著了一般。 火紅的嫁衣襯著肌膚如雪埂淮。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天写隶,我揣著相機(jī)與錄音倔撞,去河邊找鬼。 笑死慕趴,一個(gè)胖子當(dāng)著我的面吹牛痪蝇,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播冕房,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼躏啰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了耙册?” 一聲冷哼從身側(cè)響起给僵,我...
    開(kāi)封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎详拙,沒(méi)想到半個(gè)月后帝际,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡饶辙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年蹲诀,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弃揽。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡脯爪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出矿微,到底是詐尸還是另有隱情痕慢,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布冷冗,位于F島的核電站守屉,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蒿辙。R本人自食惡果不足惜拇泛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一滨巴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧俺叭,春花似錦恭取、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至裕照,卻和暖如春攒发,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背晋南。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工惠猿, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人负间。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓偶妖,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親政溃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子趾访,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容