本篇文章將會介紹 Hadoop
重要的計算框架 MapReduce
。
完整的 MapReduce
框架包含兩部分:
- 算法邏輯層面心俗,即
map
、shuffle
以及reduce
三個重要算法組成部分峰尝,本篇文章將會介紹這個層面儒旬; - 實際運行層面,即算法邏輯作業(yè)在分布式主機中是以什么形式和什么流程運行的秸滴,因為自
MapReduce version2
以后武契,作業(yè)都是提交給YARN
進行管理,所以本文將不會介紹此部分荡含。
系列其他文章有:
一咒唆、What is MapReduce?
MapReduce
是一個基于 java 的并行分布式計算框架,使用它來編寫的數(shù)據(jù)處理應(yīng)用可以運行在大型的商用硬件集群上來處理大型數(shù)據(jù)集中的可并行化問題释液,數(shù)據(jù)處理可以發(fā)生在存儲在文件系統(tǒng)(非結(jié)構(gòu)化)或數(shù)據(jù)庫(結(jié)構(gòu)化)中的數(shù)據(jù)上全释。MapReduce
可以利用數(shù)據(jù)的位置,在存儲的位置附近處理數(shù)據(jù)误债,以最大限度地減少通信開銷浸船。
MapReduce
框架通過編組分布式服務(wù)器,并行運行各種任務(wù)寝蹈,管理系統(tǒng)各部分之間的所有通信和數(shù)據(jù)傳輸李命;其還能自動完成計算任務(wù)的并行化處理,自動劃分計算數(shù)據(jù)和計算任務(wù)箫老,在集群節(jié)點上自動分配和執(zhí)行任務(wù)以及收集計算結(jié)果封字,將數(shù)據(jù)分布存儲、數(shù)據(jù)通信槽惫、容錯處理等并行計算涉及到的很多系統(tǒng)底層的復雜細節(jié)交由系統(tǒng)負責處理周叮,減少開發(fā)人員的負擔。
MapReduce
還是一個并行程序設(shè)計模型與方法(Programming Model & Methodology)界斜。它借助于函數(shù)式程序設(shè)計語言Lisp的設(shè)計思想仿耽,提供了一種簡便的并行程序設(shè)計方法,將復雜的各薇、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce项贺,用Map和Reduce兩個函數(shù)編程實現(xiàn)基本的并行計算任務(wù),提供了抽象的操作和并行編程接口峭判,以簡單方便地完成大規(guī)模數(shù)據(jù)的編程和計算處理开缎。
二、The Algorithm
MapReduce框架通常由三個操作(或步驟)組成:
-
Map
:每個工作節(jié)點將map
函數(shù)應(yīng)用于本地數(shù)據(jù)林螃,并將輸出寫入臨時存儲奕删。主節(jié)點確保僅處理冗余輸入數(shù)據(jù)的一個副本。 -
Shuffle
:工作節(jié)點根據(jù)輸出鍵(由map
函數(shù)生成)重新分配數(shù)據(jù)疗认,對數(shù)據(jù)映射排序完残、分組伏钠、拷貝,目的是屬于一個鍵的所有數(shù)據(jù)都位于同一個工作節(jié)點上谨设。 -
Reduce
:工作節(jié)點現(xiàn)在并行處理每個鍵的每組輸出數(shù)據(jù)熟掂。
MapReduce 流程圖:
MapReduce
允許分布式運行 Map
操作,只要每個 Map
操作獨立于其他 Map
操作就可以并行執(zhí)行扎拣。
另一種更詳細的赴肚,將 MapReduce
分為5個步驟的理解是:
-
Prepare the Map() input:
MapReduce
框架先指定Map
處理器,然后給其分配將要處理的輸入數(shù)據(jù) -- 鍵值對K1
二蓝,并為該處理器提供與該鍵值相關(guān)的所有輸入數(shù)據(jù)誉券; -
Run the user-provided Map() code:
Map()
在K1
鍵值對上運行一次,生成由K2
指定的鍵值對的輸出侣夷; -
Shuffle the Map output to the Reduce processors:將先前生成的
K2
鍵值對横朋,根據(jù)『鍵』是否相同移至相同的工作節(jié)點; -
Run the user-provided Reduce() code:對于每個工作節(jié)點上的
K2
鍵值對進行Reduce()
操作百拓; -
Produce the final output:
MapReduce
框架收集所有Reduce
輸出琴锭,并按K2
對其進行排序以產(chǎn)生最終結(jié)果進行輸出。
[圖片上傳失敗...(image-8859b-1538542155610)]
實際生產(chǎn)環(huán)境中衙传,數(shù)據(jù)很有可能是分散在各個服務(wù)器上决帖,對于原先的大數(shù)據(jù)處理方法,則是將數(shù)據(jù)發(fā)送至代碼所在的地方進行處理蓖捶,這樣非常低效且占用了大量的帶寬地回,為應(yīng)對這種情況,MapReduce
框架的處理方法是俊鱼,將 Map()
操作或者 Reduce()
發(fā)送至數(shù)據(jù)所在的服務(wù)器上刻像,以『移動計算替代移動數(shù)據(jù)』,來加速整個框架的運行速度并闲,大多數(shù)計算都發(fā)生在具有本地磁盤上數(shù)據(jù)的節(jié)點上细睡,從而減少了網(wǎng)絡(luò)流量。
Mapper
一個 Map
函數(shù)就是對一些獨立元素組成的概念上的列表的每一個元素進行指定的操作帝火,所以每個元素都是被獨立操作的溜徙,而原始列表沒有被更改,因為這里創(chuàng)建了一個新的列表來保存新的答案犀填。這就是說蠢壹,Map
操作是可以高度并行的
MapReduce
框架的 Map
和 Reduce
函數(shù)都是根據(jù) (key, value)
形式的數(shù)據(jù)結(jié)構(gòu)定義的。 Map
在一個數(shù)據(jù)域(Data Domain)中獲取一個鍵值對九巡,然后返回一個鍵值對的列表:
Map(k1,v1) → list(k2,v2)
Map
函數(shù)會被并行調(diào)用图贸,應(yīng)用于輸入數(shù)據(jù)集中的每個鍵值對(keyed by K1)。然后每個調(diào)用返回一個鍵值對(keyed by K2)列表。之后求妹,MapReduce
框架從所有列表中收集具有相同 key
(這里是 k2)的所有鍵值對乏盐,并將它們組合在一起佳窑,為每個 key
創(chuàng)建一個組制恍。
Reducer
而 Reduce
是對一個列表的元素進行適當?shù)暮喜ⅰkm然不如 Map
函數(shù)那么并行神凑,但是因為化簡總是有一個簡單的答案净神,大規(guī)模的運算相對獨立,所以化簡函數(shù)在高度并行環(huán)境下也很有用溉委。Reduce
函數(shù)并行應(yīng)用于每個組鹃唯,從而在同一個數(shù)據(jù)域中生成一組值:
Reduce(k2, list (v2)) → list(v3)
Reduce
端接收到不同任務(wù)傳來的有序數(shù)據(jù)組。此時 Reduce()
會根據(jù)程序猿編寫的代碼邏輯進行相應(yīng)的 reduce
操作瓣喊,例如根據(jù)同一個鍵值對進行計數(shù)加和等坡慌。如果Reduce
端接受的數(shù)據(jù)量相當小,則直接存儲在內(nèi)存中藻三,如果數(shù)據(jù)量超過了該緩沖區(qū)大小的一定比例洪橘,則對數(shù)據(jù)合并后溢寫到磁盤中。
Partitioner
前面提到過棵帽,Map
階段有一個分割成組的操作熄求,這個劃分數(shù)據(jù)的過程就是 Partition
,而負責分區(qū)的 java 類就是 Partitioner
逗概。
Partitioner
組件可以讓 Map
對 Key
進行分區(qū)弟晚,從而將不同分區(qū)的 Key
交由不同的 Reduce
處理,由此逾苫,Partitioner
數(shù)量等同于 Reducer
的數(shù)量卿城,一個 Partitioner
對應(yīng)一個 Reduce
作業(yè),可認為其就是 Reduce
的輸入分片铅搓,可根據(jù)實際業(yè)務(wù)情況編程控制瑟押,提高 Reduce
效率或進行負載均衡。MapReduce
的內(nèi)置分區(qū)是HashPartition
狸吞。
具有多個分割總是有好處的勉耀,因為與處理整個輸入所花費的時間相比,處理分割所花費的時間很短蹋偏。當分割較小時便斥,可以更好的處理負載平衡,但是分割也不宜太小威始,如果過小枢纠,則會使得管理拆分和任務(wù)加載的時間在總運行時間中占過高的比重。
下圖是 map
任務(wù)和 reduce
任務(wù)的示意圖:
三黎棠、WordCount Example
這里給出一個統(tǒng)計詞頻案例的 Java 代碼:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// 繼承 Mapper 類晋渺,實現(xiàn)自己的 map 功能
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// map 功能必須實現(xiàn)的函數(shù)
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// 繼承 Reducer 類镰绎,實現(xiàn)自己的 reduce 功能
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 初始化Configuration,讀取mapreduce系統(tǒng)配置信息
Configuration conf = new Configuration();
// 構(gòu)建 Job 并且加載計算程序 WordCount.class
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
//指定 Mapper木西、Combiner畴栖、Reducer,也就是我們自己繼承實現(xiàn)的類
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 設(shè)置輸入輸出數(shù)據(jù)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
上述代碼會發(fā)現(xiàn)在指定 Mapper
以及 Reducer
時八千,還指定了 Combiner
類吗讶,Combiner
是一個本地化的 reduce
操作(因此我們看見 WordCount
類里是用 reduce
進行加載的),它是 map
運算的后續(xù)操作恋捆,與 map
在同一個主機上進行照皆,主要是在 map
計算出中間文件前做一個簡單的合并重復key值的操作,減少中間文件的大小沸停,這樣在后續(xù)進行到 Shuffle
時膜毁,可以降低網(wǎng)絡(luò)傳輸成本,提高網(wǎng)絡(luò)傳輸效率愤钾。
提交 MR
作業(yè)的命令:
hadoop jar {程序的 jar 包} {任務(wù)名稱} {數(shù)據(jù)輸入路徑} {數(shù)據(jù)輸出路徑}
例如:
hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output
上述代碼示意圖:
Map -> Shuffle -> Reduce 的中間結(jié)果瘟滨,包括最后的輸出都是存儲在本地磁盤上。
四绰垂、Advantage & Shortcoming of MapReduce
MapReduce
的兩大優(yōu)勢是:
1 ) 并行處理:
在 MapReduce
中室奏,我們將作業(yè)劃分為多個節(jié)點,每個節(jié)點同時處理作業(yè)的一部分劲装。因此胧沫,MapReduce
基于Divide and Conquer范例,它幫助我們使用不同的機器處理數(shù)據(jù)占业。由于數(shù)據(jù)由多臺機器而不是單臺機器并行處理绒怨,因此處理數(shù)據(jù)所需的時間會減少很多。
2 ) 數(shù)據(jù)位置:
我們將計算移動到 MapReduce
框架中的數(shù)據(jù)谦疾,而不是將數(shù)據(jù)移動到計算部分南蹂。數(shù)據(jù)分布在多個節(jié)點中,其中每個節(jié)點處理駐留在其上的數(shù)據(jù)部分念恍。
這使得具有以下優(yōu)勢:
- 將處理單元移動到數(shù)據(jù)所在位置可以降低網(wǎng)絡(luò)成本六剥;
- 由于所有節(jié)點并行處理其部分數(shù)據(jù),因此處理時間縮短峰伙;
- 每個節(jié)點都會獲取要處理的數(shù)據(jù)的一部分疗疟,因此節(jié)點不會出現(xiàn)負擔過重的可能性。
但是瞳氓,MapReduce 也有其限制:
- 不能進行流式計算和實時計算策彤,只能計算離線數(shù)據(jù);
- 中間結(jié)果存儲在磁盤上,加大了磁盤的 I/O 負載店诗,且讀取速度比較慢裹刮;
- 開發(fā)麻煩,例如
wordcount
功能就需要很多的設(shè)置和代碼量庞瘸,而Spark
將會非常簡單捧弃。