MapReduce是由JeffreyDean提出的一種處理大數(shù)據(jù)的編程模型,作為在Go中編程和構(gòu)建容錯(cuò)分布式系統(tǒng)的入門。
集群中有一個(gè)master,其它的都是worker瞭亮,總共有M個(gè)map任務(wù)和R個(gè)reduce任務(wù)(M和R由用戶指定)。master負(fù)責(zé)將map和reduce任務(wù)分配給空閑的worker并處理worker的故障固棚。
Part I: Map/Reduce input and output
分別實(shí)現(xiàn) common_map.go统翩、common_reduce.go 中的 doMap()、doReduce() 方法此洲。
1.讀取一個(gè)輸入文件inFile厂汗。調(diào)用用戶定義函數(shù)mapF,將內(nèi)容轉(zhuǎn)換為鍵值對(duì)呜师。
2.新建nReduce工作數(shù)目相等的中間文件娶桦。使用reduceName(jobName, mapTask, r)生成的中間文件名。
3.根據(jù)key-value的分配規(guī)則(ihash(key) % nReduce)汁汗,將鍵值對(duì)存入新建的中間文件內(nèi)衷畦。
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
data, err := ioutil.ReadFile(inFile)
if err != nil {
panic(err)
}
//新建中間文件
outputFiles := make([] *os.File, nReduce)
for i := 0; i < nReduce; i++ {
fileName := reduceName(jobName, mapTask, i)
outputFiles[i], err = os.Create(fileName)
if err != nil {
panic(err)
}
}
//將輸入文件內(nèi)容轉(zhuǎn)為鍵值對(duì)
keyValues := mapF(inFile, string(data))
//根據(jù)hash規(guī)則將鍵值對(duì)存入中間文件
for _, kv := range keyValues {
index := ihash(kv.Key) % nReduce
enc := json.NewEncoder(outputFiles[index])
enc.Encode(kv)
}
for _, file := range outputFiles {
file.Close()
}
}
1.讀取map工作結(jié)果的中間文件的鍵值對(duì),并合并相同的key知牌。
2.對(duì)key排序祈争。
3.將reduceF的結(jié)果保存到mergeName()返回的文件中。
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
inputFiles := make([] *os.File, nMap)
for i := 0; i < nMap; i++ {
fileName := reduceName(jobName, i, reduceTask)//注意與中間文件名的創(chuàng)建保持一致
inputFiles[i], _ = os.Open(fileName)
}
//讀取中間文件內(nèi)容
keyValues := make(map[string][]string)
for _, inputFile := range inputFiles {
defer inputFile.Close()
dec := json.NewDecoder(inputFile)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
}
}
//排序
keys := make([]string, 0, len(keyValues))
for k := range keyValues {
keys = append(keys, k)
}
sort.Strings(keys)
//新建結(jié)果文件角寸,將key的統(tǒng)計(jì)結(jié)果存入菩混。
out, err := os.Create(outFile)
if err != nil {
log.Fatal("Error in creating file", outFile)
}
defer out.Close()
enc := json.NewEncoder(out)
for _, key := range keys {
kv := KeyValue{key, reduceF(key, keyValues[key])}
enc.Encode(kv)
}
}
Part II: Single-worker word count
編寫main/wc.go 中的 mapF()忿墅、reduceF()方法。
mapF() 返回一個(gè)鍵/值對(duì)的切片;
reduceF() 返回這個(gè)key出現(xiàn)了多少次沮峡,即values的長度疚脐。
func mapF(filename string, contents string) []mapreduce.KeyValue {
words := strings.FieldsFunc(contents, func(r rune) bool {
return !unicode.IsLetter(r)
})
res := make([]mapreduce.KeyValue, 0)
for _, word := range words {
res = append(res, mapreduce.KeyValue{word, ""})
}
return res
}
func reduceF(key string, values []string) string {
return strconv.Itoa(len(values))
}
Part III: Distributing MapReduce tasks
編寫schedule.go中的 schedule()方法。
1.等待所有任務(wù)完成邢疙。
2.從registerChan中取出worker的地址棍弄,將任務(wù)分配給它。
注:該通道為每個(gè)工作者生成一個(gè)字符串秘症,其中包含工作者的RPC地址照卦。有些worker可能在調(diào)用schedule()之前存在式矫,有些可能在schedule()運(yùn)行時(shí)啟動(dòng);所有這些都將出現(xiàn)在registerChan上乡摹。schedule()應(yīng)該使用所有的worker,包括在它啟動(dòng)后出現(xiàn)的那些采转。
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
wg := sync.WaitGroup{}
wg.Add(ntasks)
//將任務(wù)放入隊(duì)列
taskCh := make(chan int,ntasks)
for idx:=0;idx<ntasks;idx++ {
taskCh <- idx
}
//所有任務(wù)完成聪廉,關(guān)閉任務(wù)channel,退出
go func() {
wg.Wait()
close(taskCh)
}()
for idx := range taskCh{
arg := DoTaskArgs{
JobName: jobName,
File: mapFiles[idx],
Phase: phase,
TaskNumber: idx,
NumOtherPhase: n_other,
}
worker := <- registerChan
go func(worker string,arg DoTaskArgs,idx int) {
if call(worker,"Worker.DoTask",arg,nil){
wg.Done()
}else { //call失敗故慈,將任務(wù)重新放回隊(duì)列
taskCh <- idx
}
//任務(wù)結(jié)束板熊,歸還工作線程
registerChan <- worker
}(worker,arg,idx)
}
fmt.Printf("Schedule: %v done\n", phase)
}
Part IV: Handling worker failures
worker 工作失敗的例子
RPC失敗的原因:1.請(qǐng)求沒有達(dá)到,工作進(jìn)程沒有執(zhí)行任務(wù)察绷;2.工作進(jìn)程可能已經(jīng)執(zhí)行了它干签,但是應(yīng)答丟失;3.工作進(jìn)程可能仍然在執(zhí)行拆撼,但是主進(jìn)程的RPC超時(shí)了容劳。
代碼實(shí)現(xiàn)參考part 3;
Part V: Inverted index generation (optional, does not count in grade)
生成倒排索引
func mapF(document string, value string) (res []mapreduce.KeyValue) {
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
s := make(map[string]bool)
for _, word := range words {
lower := strings.ToLower(word)
upper := strings.ToUpper(word)
_, hasUpper := s[lower]
_, hasLower := s[upper]
if !hasLower && !hasUpper {
if lower == word {
s[lower] = true
} else if upper == word {
s[upper] = true
}
}
}
for k, _ := range s {
res = append(res, mapreduce.KeyValue{k, document})
}
return
}
func reduceF(key string, values []string) string {
sort.Strings(values)
return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}