MIT-6.824 Lab1: MapReduce-2018

MapReduce是由JeffreyDean提出的一種處理大數(shù)據(jù)的編程模型,作為在Go中編程和構(gòu)建容錯(cuò)分布式系統(tǒng)的入門。
集群中有一個(gè)master,其它的都是worker瞭亮,總共有M個(gè)map任務(wù)和R個(gè)reduce任務(wù)(M和R由用戶指定)。master負(fù)責(zé)將map和reduce任務(wù)分配給空閑的worker并處理worker的故障固棚。

Part I: Map/Reduce input and output

分別實(shí)現(xiàn) common_map.go统翩、common_reduce.go 中的 doMap()、doReduce() 方法此洲。

1.讀取一個(gè)輸入文件inFile厂汗。調(diào)用用戶定義函數(shù)mapF,將內(nèi)容轉(zhuǎn)換為鍵值對(duì)呜师。
2.新建nReduce工作數(shù)目相等的中間文件娶桦。使用reduceName(jobName, mapTask, r)生成的中間文件名。
3.根據(jù)key-value的分配規(guī)則(ihash(key) % nReduce)汁汗,將鍵值對(duì)存入新建的中間文件內(nèi)衷畦。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    data, err := ioutil.ReadFile(inFile)
    if err != nil {
        panic(err)
    }
    //新建中間文件
    outputFiles := make([] *os.File, nReduce)
    for i := 0; i < nReduce; i++ {
        fileName := reduceName(jobName, mapTask, i)
        outputFiles[i], err = os.Create(fileName)
        if err != nil {
            panic(err)
        }
    }
    //將輸入文件內(nèi)容轉(zhuǎn)為鍵值對(duì)
    keyValues := mapF(inFile, string(data))
    //根據(jù)hash規(guī)則將鍵值對(duì)存入中間文件
    for _, kv := range keyValues {
        index := ihash(kv.Key) % nReduce
        enc := json.NewEncoder(outputFiles[index])
        enc.Encode(kv)
    }
    for _, file := range outputFiles {
        file.Close()
    }
}

1.讀取map工作結(jié)果的中間文件的鍵值對(duì),并合并相同的key知牌。
2.對(duì)key排序祈争。
3.將reduceF的結(jié)果保存到mergeName()返回的文件中。

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    inputFiles := make([] *os.File, nMap)
    for i := 0; i < nMap; i++ {
        fileName := reduceName(jobName, i, reduceTask)//注意與中間文件名的創(chuàng)建保持一致
        inputFiles[i], _ = os.Open(fileName)
    }
    //讀取中間文件內(nèi)容
    keyValues := make(map[string][]string)
    for _, inputFile := range inputFiles {
        defer inputFile.Close()
        dec := json.NewDecoder(inputFile)
        for {
            var kv KeyValue
            err := dec.Decode(&kv)
            if err != nil {
                break
            }
            keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
        }
    }
    //排序
    keys := make([]string, 0, len(keyValues))
    for k := range keyValues {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    //新建結(jié)果文件角寸,將key的統(tǒng)計(jì)結(jié)果存入菩混。
    out, err := os.Create(outFile)
    if err != nil {
        log.Fatal("Error in creating file", outFile)
    }
    defer out.Close()

    enc := json.NewEncoder(out)
    for _, key := range keys {
        kv := KeyValue{key, reduceF(key, keyValues[key])}
        enc.Encode(kv)
    }
}

Part II: Single-worker word count

編寫main/wc.go 中的 mapF()忿墅、reduceF()方法。
mapF() 返回一個(gè)鍵/值對(duì)的切片;
reduceF() 返回這個(gè)key出現(xiàn)了多少次沮峡,即values的長度疚脐。

func mapF(filename string, contents string) []mapreduce.KeyValue {
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    res := make([]mapreduce.KeyValue, 0)
    for _, word := range words {
        res = append(res, mapreduce.KeyValue{word, ""})
    }
    return res
}

func reduceF(key string, values []string) string {
    return strconv.Itoa(len(values))
}

Part III: Distributing MapReduce tasks

編寫schedule.go中的 schedule()方法。
1.等待所有任務(wù)完成邢疙。
2.從registerChan中取出worker的地址棍弄,將任務(wù)分配給它。
注:該通道為每個(gè)工作者生成一個(gè)字符串秘症,其中包含工作者的RPC地址照卦。有些worker可能在調(diào)用schedule()之前存在式矫,有些可能在schedule()運(yùn)行時(shí)啟動(dòng);所有這些都將出現(xiàn)在registerChan上乡摹。schedule()應(yīng)該使用所有的worker,包括在它啟動(dòng)后出現(xiàn)的那些采转。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
    wg := sync.WaitGroup{}
    wg.Add(ntasks)

    //將任務(wù)放入隊(duì)列
    taskCh := make(chan int,ntasks)
    for idx:=0;idx<ntasks;idx++ {
        taskCh <- idx
    }

    //所有任務(wù)完成聪廉,關(guān)閉任務(wù)channel,退出
    go func() {
        wg.Wait()
        close(taskCh)
    }()


    for idx := range taskCh{
        arg := DoTaskArgs{
            JobName:       jobName,
            File:          mapFiles[idx],
            Phase:         phase,
            TaskNumber:    idx,
            NumOtherPhase: n_other,
        }

        worker := <- registerChan
        go func(worker string,arg DoTaskArgs,idx int) {
            if call(worker,"Worker.DoTask",arg,nil){
                wg.Done()
            }else { //call失敗故慈,將任務(wù)重新放回隊(duì)列
                taskCh <- idx
            }
            //任務(wù)結(jié)束板熊,歸還工作線程
            registerChan <- worker
        }(worker,arg,idx)

    }
    fmt.Printf("Schedule: %v done\n", phase)
}

Part IV: Handling worker failures

worker 工作失敗的例子
RPC失敗的原因:1.請(qǐng)求沒有達(dá)到,工作進(jìn)程沒有執(zhí)行任務(wù)察绷;2.工作進(jìn)程可能已經(jīng)執(zhí)行了它干签,但是應(yīng)答丟失;3.工作進(jìn)程可能仍然在執(zhí)行拆撼,但是主進(jìn)程的RPC超時(shí)了容劳。
代碼實(shí)現(xiàn)參考part 3;

Part V: Inverted index generation (optional, does not count in grade)

生成倒排索引

func mapF(document string, value string) (res []mapreduce.KeyValue) {
    words := strings.FieldsFunc(value, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    s := make(map[string]bool)
    for _, word := range words {
        lower := strings.ToLower(word)
        upper := strings.ToUpper(word)
        _, hasUpper := s[lower]
        _, hasLower := s[upper]
        if !hasLower && !hasUpper {
            if lower == word {
                s[lower] = true
            } else if upper == word {
                s[upper] = true
            }
        }
    }

    for k, _ := range s {
        res = append(res, mapreduce.KeyValue{k, document})
    }
    return
}

func reduceF(key string, values []string) string {
    sort.Strings(values)
    return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闸度,一起剝皮案震驚了整個(gè)濱河市竭贩,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌莺禁,老刑警劉巖留量,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異哟冬,居然都是意外死亡楼熄,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門浩峡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來可岂,“玉大人,你說我怎么就攤上這事红符∏啾” “怎么了伐债?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長致开。 經(jīng)常有香客問我峰锁,道長,這世上最難降的妖魔是什么双戳? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任虹蒋,我火速辦了婚禮,結(jié)果婚禮上飒货,老公的妹妹穿的比我還像新娘魄衅。我一直安慰自己,他們只是感情好塘辅,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布晃虫。 她就那樣靜靜地躺著,像睡著了一般扣墩。 火紅的嫁衣襯著肌膚如雪哲银。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天呻惕,我揣著相機(jī)與錄音荆责,去河邊找鬼。 笑死亚脆,一個(gè)胖子當(dāng)著我的面吹牛做院,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播濒持,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼键耕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了弥喉?” 一聲冷哼從身側(cè)響起郁竟,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎由境,沒想到半個(gè)月后棚亩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡虏杰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年讥蟆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纺阔。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瘸彤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出笛钝,到底是詐尸還是另有隱情质况,我是刑警寧澤愕宋,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站结榄,受9級(jí)特大地震影響中贝,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜臼朗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一邻寿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧视哑,春花似錦绣否、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至慷嗜,卻和暖如春淀弹,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背庆械。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留菌赖,地道東北人缭乘。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像琉用,于是被迫代替她去往敵國和親堕绩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容