Hadoop 學習系列(四)之 MapReduce 原理講解

本篇文章將會介紹 Hadoop 重要的計算框架 MapReduce

完整的 MapReduce 框架包含兩部分:

  1. 算法邏輯層面心俗,即 mapshuffle 以及 reduce 三個重要算法組成部分峰尝,本篇文章將會介紹這個層面儒旬;
  2. 實際運行層面,即算法邏輯作業(yè)在分布式主機中是以什么形式和什么流程運行的秸滴,因為自 MapReduce version2 以后武契,作業(yè)都是提交給 YARN 進行管理,所以本文將不會介紹此部分荡含。

系列其他文章有:

一咒唆、What is MapReduce?

MapReduce是一個基于 java 的并行分布式計算框架,使用它來編寫的數(shù)據(jù)處理應(yīng)用可以運行在大型的商用硬件集群上來處理大型數(shù)據(jù)集中的可并行化問題释液,數(shù)據(jù)處理可以發(fā)生在存儲在文件系統(tǒng)(非結(jié)構(gòu)化)或數(shù)據(jù)庫(結(jié)構(gòu)化)中的數(shù)據(jù)上全释。MapReduce 可以利用數(shù)據(jù)的位置,在存儲的位置附近處理數(shù)據(jù)误债,以最大限度地減少通信開銷浸船。

MapReduce 框架通過編組分布式服務(wù)器,并行運行各種任務(wù)寝蹈,管理系統(tǒng)各部分之間的所有通信和數(shù)據(jù)傳輸李命;其還能自動完成計算任務(wù)的并行化處理,自動劃分計算數(shù)據(jù)和計算任務(wù)箫老,在集群節(jié)點上自動分配和執(zhí)行任務(wù)以及收集計算結(jié)果封字,將數(shù)據(jù)分布存儲、數(shù)據(jù)通信槽惫、容錯處理等并行計算涉及到的很多系統(tǒng)底層的復雜細節(jié)交由系統(tǒng)負責處理周叮,減少開發(fā)人員的負擔。

MapReduce 還是一個并行程序設(shè)計模型與方法(Programming Model & Methodology)界斜。它借助于函數(shù)式程序設(shè)計語言Lisp的設(shè)計思想仿耽,提供了一種簡便的并行程序設(shè)計方法,將復雜的各薇、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce项贺,用Map和Reduce兩個函數(shù)編程實現(xiàn)基本的并行計算任務(wù),提供了抽象的操作和并行編程接口峭判,以簡單方便地完成大規(guī)模數(shù)據(jù)的編程和計算處理开缎。

二、The Algorithm

MapReduce框架通常由三個操作(或步驟)組成:

  1. Map:每個工作節(jié)點將 map 函數(shù)應(yīng)用于本地數(shù)據(jù)林螃,并將輸出寫入臨時存儲奕删。主節(jié)點確保僅處理冗余輸入數(shù)據(jù)的一個副本。
  2. Shuffle:工作節(jié)點根據(jù)輸出鍵(由 map 函數(shù)生成)重新分配數(shù)據(jù)疗认,對數(shù)據(jù)映射排序完残、分組伏钠、拷貝,目的是屬于一個鍵的所有數(shù)據(jù)都位于同一個工作節(jié)點上谨设。
  3. Reduce:工作節(jié)點現(xiàn)在并行處理每個鍵的每組輸出數(shù)據(jù)熟掂。

MapReduce 流程圖:


image

MapReduce 允許分布式運行 Map 操作,只要每個 Map 操作獨立于其他 Map 操作就可以并行執(zhí)行扎拣。

另一種更詳細的赴肚,將 MapReduce 分為5個步驟的理解是:

  1. Prepare the Map() inputMapReduce 框架先指定 Map 處理器,然后給其分配將要處理的輸入數(shù)據(jù) -- 鍵值對 K1二蓝,并為該處理器提供與該鍵值相關(guān)的所有輸入數(shù)據(jù)誉券;
  2. Run the user-provided Map() codeMap()K1 鍵值對上運行一次,生成由 K2 指定的鍵值對的輸出侣夷;
  3. Shuffle the Map output to the Reduce processors:將先前生成的 K2 鍵值對横朋,根據(jù)『鍵』是否相同移至相同的工作節(jié)點;
  4. Run the user-provided Reduce() code:對于每個工作節(jié)點上的 K2 鍵值對進行 Reduce() 操作百拓;
  5. Produce the final outputMapReduce 框架收集所有 Reduce 輸出琴锭,并按 K2 對其進行排序以產(chǎn)生最終結(jié)果進行輸出。

[圖片上傳失敗...(image-8859b-1538542155610)]

實際生產(chǎn)環(huán)境中衙传,數(shù)據(jù)很有可能是分散在各個服務(wù)器上决帖,對于原先的大數(shù)據(jù)處理方法,則是將數(shù)據(jù)發(fā)送至代碼所在的地方進行處理蓖捶,這樣非常低效且占用了大量的帶寬地回,為應(yīng)對這種情況,MapReduce 框架的處理方法是俊鱼,將 Map() 操作或者 Reduce() 發(fā)送至數(shù)據(jù)所在的服務(wù)器上刻像,以『移動計算替代移動數(shù)據(jù)』,來加速整個框架的運行速度并闲,大多數(shù)計算都發(fā)生在具有本地磁盤上數(shù)據(jù)的節(jié)點上细睡,從而減少了網(wǎng)絡(luò)流量。

Mapper

一個 Map 函數(shù)就是對一些獨立元素組成的概念上的列表的每一個元素進行指定的操作帝火,所以每個元素都是被獨立操作的溜徙,而原始列表沒有被更改,因為這里創(chuàng)建了一個新的列表來保存新的答案犀填。這就是說蠢壹,Map 操作是可以高度并行的

MapReduce 框架的 MapReduce 函數(shù)都是根據(jù) (key, value) 形式的數(shù)據(jù)結(jié)構(gòu)定義的。 Map 在一個數(shù)據(jù)域(Data Domain)中獲取一個鍵值對九巡,然后返回一個鍵值對的列表:

Map(k1,v1) → list(k2,v2)

Map 函數(shù)會被并行調(diào)用图贸,應(yīng)用于輸入數(shù)據(jù)集中的每個鍵值對(keyed by K1)。然后每個調(diào)用返回一個鍵值對(keyed by K2)列表。之后求妹,MapReduce 框架從所有列表中收集具有相同 key(這里是 k2)的所有鍵值對乏盐,并將它們組合在一起佳窑,為每個 key 創(chuàng)建一個組制恍。

Reducer

Reduce 是對一個列表的元素進行適當?shù)暮喜ⅰkm然不如 Map 函數(shù)那么并行神凑,但是因為化簡總是有一個簡單的答案净神,大規(guī)模的運算相對獨立,所以化簡函數(shù)在高度并行環(huán)境下也很有用溉委。Reduce 函數(shù)并行應(yīng)用于每個組鹃唯,從而在同一個數(shù)據(jù)域中生成一組值:

Reduce(k2, list (v2)) → list(v3)

Reduce 端接收到不同任務(wù)傳來的有序數(shù)據(jù)組。此時 Reduce() 會根據(jù)程序猿編寫的代碼邏輯進行相應(yīng)的 reduce 操作瓣喊,例如根據(jù)同一個鍵值對進行計數(shù)加和等坡慌。如果Reduce 端接受的數(shù)據(jù)量相當小,則直接存儲在內(nèi)存中藻三,如果數(shù)據(jù)量超過了該緩沖區(qū)大小的一定比例洪橘,則對數(shù)據(jù)合并后溢寫到磁盤中。

Partitioner

前面提到過棵帽,Map 階段有一個分割成組的操作熄求,這個劃分數(shù)據(jù)的過程就是 Partition,而負責分區(qū)的 java 類就是 Partitioner逗概。

Partitioner 組件可以讓 MapKey 進行分區(qū)弟晚,從而將不同分區(qū)的 Key 交由不同的 Reduce 處理,由此逾苫,Partitioner 數(shù)量等同于 Reducer 的數(shù)量卿城,一個 Partitioner 對應(yīng)一個 Reduce 作業(yè),可認為其就是 Reduce 的輸入分片铅搓,可根據(jù)實際業(yè)務(wù)情況編程控制瑟押,提高 Reduce 效率或進行負載均衡。MapReduce 的內(nèi)置分區(qū)是HashPartition狸吞。

具有多個分割總是有好處的勉耀,因為與處理整個輸入所花費的時間相比,處理分割所花費的時間很短蹋偏。當分割較小時便斥,可以更好的處理負載平衡,但是分割也不宜太小威始,如果過小枢纠,則會使得管理拆分和任務(wù)加載的時間在總運行時間中占過高的比重。

下圖是 map 任務(wù)和 reduce 任務(wù)的示意圖:

image

三黎棠、WordCount Example

這里給出一個統(tǒng)計詞頻案例的 Java 代碼:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // 繼承 Mapper 類晋渺,實現(xiàn)自己的 map 功能
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // map 功能必須實現(xiàn)的函數(shù)
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // 繼承 Reducer 類镰绎,實現(xiàn)自己的 reduce 功能
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // 初始化Configuration,讀取mapreduce系統(tǒng)配置信息
    Configuration conf = new Configuration();

    // 構(gòu)建 Job 并且加載計算程序 WordCount.class
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);

    //指定 Mapper木西、Combiner畴栖、Reducer,也就是我們自己繼承實現(xiàn)的類
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // 設(shè)置輸入輸出數(shù)據(jù)
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

上述代碼會發(fā)現(xiàn)在指定 Mapper 以及 Reducer 時八千,還指定了 Combiner 類吗讶,Combiner 是一個本地化的 reduce 操作(因此我們看見 WordCount 類里是用 reduce 進行加載的),它是 map 運算的后續(xù)操作恋捆,與 map 在同一個主機上進行照皆,主要是在 map 計算出中間文件前做一個簡單的合并重復key值的操作,減少中間文件的大小沸停,這樣在后續(xù)進行到 Shuffle 時膜毁,可以降低網(wǎng)絡(luò)傳輸成本,提高網(wǎng)絡(luò)傳輸效率愤钾。

提交 MR 作業(yè)的命令:

hadoop jar {程序的 jar 包} {任務(wù)名稱} {數(shù)據(jù)輸入路徑} {數(shù)據(jù)輸出路徑}

例如:

hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output

上述代碼示意圖:

image

Map -> Shuffle -> Reduce 的中間結(jié)果瘟滨,包括最后的輸出都是存儲在本地磁盤上。

四绰垂、Advantage & Shortcoming of MapReduce

MapReduce 的兩大優(yōu)勢是:

1 ) 并行處理:

MapReduce 中室奏,我們將作業(yè)劃分為多個節(jié)點,每個節(jié)點同時處理作業(yè)的一部分劲装。因此胧沫,MapReduce 基于Divide and Conquer范例,它幫助我們使用不同的機器處理數(shù)據(jù)占业。由于數(shù)據(jù)由多臺機器而不是單臺機器并行處理绒怨,因此處理數(shù)據(jù)所需的時間會減少很多。

2 ) 數(shù)據(jù)位置:

我們將計算移動到 MapReduce 框架中的數(shù)據(jù)谦疾,而不是將數(shù)據(jù)移動到計算部分南蹂。數(shù)據(jù)分布在多個節(jié)點中,其中每個節(jié)點處理駐留在其上的數(shù)據(jù)部分念恍。

這使得具有以下優(yōu)勢:

  • 將處理單元移動到數(shù)據(jù)所在位置可以降低網(wǎng)絡(luò)成本六剥;
  • 由于所有節(jié)點并行處理其部分數(shù)據(jù),因此處理時間縮短峰伙;
  • 每個節(jié)點都會獲取要處理的數(shù)據(jù)的一部分疗疟,因此節(jié)點不會出現(xiàn)負擔過重的可能性。

但是瞳氓,MapReduce 也有其限制:

  1. 不能進行流式計算和實時計算策彤,只能計算離線數(shù)據(jù);
  2. 中間結(jié)果存儲在磁盤上,加大了磁盤的 I/O 負載店诗,且讀取速度比較慢裹刮;
  3. 開發(fā)麻煩,例如 wordcount 功能就需要很多的設(shè)置和代碼量庞瘸,而 Spark 將會非常簡單捧弃。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市恕洲,隨后出現(xiàn)的幾起案子塔橡,更是在濱河造成了極大的恐慌,老刑警劉巖霜第,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異户辞,居然都是意外死亡泌类,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門底燎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刃榨,“玉大人,你說我怎么就攤上這事双仍∈嘞#” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵朱沃,是天一觀的道長苞轿。 經(jīng)常有香客問我,道長逗物,這世上最難降的妖魔是什么搬卒? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮翎卓,結(jié)果婚禮上契邀,老公的妹妹穿的比我還像新娘。我一直安慰自己失暴,他們只是感情好坯门,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著逗扒,像睡著了一般古戴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缴阎,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天允瞧,我揣著相機與錄音,去河邊找鬼。 笑死述暂,一個胖子當著我的面吹牛痹升,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播畦韭,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼疼蛾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了艺配?” 一聲冷哼從身側(cè)響起察郁,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎转唉,沒想到半個月后皮钠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡赠法,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年肖爵,在試婚紗的時候發(fā)現(xiàn)自己被綠了怜珍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖驻呐,靈堂內(nèi)的尸體忽然破棺而出潜的,到底是詐尸還是另有隱情嘉栓,我是刑警寧澤碍讨,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站眶熬,受9級特大地震影響妹笆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜聋涨,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一晾浴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧牍白,春花似錦脊凰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至最岗,卻和暖如春帕胆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背般渡。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工懒豹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留芙盘,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓脸秽,卻偏偏與公主長得像儒老,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子记餐,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容