第一個(gè)MapReduce程序——WordCount
[TOC]
一烈菌、MapReduce簡(jiǎn)介
1.1 MapReduce編程模型
MapReduce采用”分而治之”的思想碘饼,把對(duì)大規(guī)模數(shù)據(jù)集的操作,分發(fā)給一個(gè)主節(jié)點(diǎn)管理下的各個(gè)分節(jié)點(diǎn)共同完成努隙,然后通過(guò)整合各個(gè)節(jié)點(diǎn)的中間結(jié)果,得到最終結(jié)果。簡(jiǎn)單地說(shuō)癌别,MapReduce就是”任務(wù)的分解與結(jié)果的匯總”。
在Hadoop中蹋笼,用于執(zhí)行MapReduce任務(wù)的機(jī)器角色有兩個(gè):
- JobTracker用于調(diào)度工作的展姐,一個(gè)Hadoop集群中只有一個(gè)JobTracker,位于master剖毯。
- TaskTracker用于執(zhí)行工作圾笨,位于各slave上。
在分布式計(jì)算中逊谋,MapReduce框架負(fù)責(zé)處理了并行編程中分布式存儲(chǔ)擂达、工作調(diào)度、負(fù)載均衡涣狗、容錯(cuò)均衡谍婉、容錯(cuò)處理以及網(wǎng)絡(luò)通信等復(fù)雜問(wèn)題舒憾,把處理過(guò)程高度抽象為兩個(gè)函數(shù):map和reduce,map負(fù)責(zé)把任務(wù)分解成多個(gè)任務(wù)穗熬,reduce負(fù)責(zé)把分解后多任務(wù)處理的結(jié)果匯總起來(lái)镀迂。
需要注意的是,用MapReduce來(lái)處理的數(shù)據(jù)集(或任務(wù))必須具備這樣的特點(diǎn):待處理的數(shù)據(jù)集可以分解成許多小的數(shù)據(jù)集唤蔗,而且每一個(gè)小數(shù)據(jù)集都可以完全并行地進(jìn)行處理探遵。
1.2 MapReduce工作過(guò)程
對(duì)于一個(gè)MR任務(wù),它的輸入妓柜、輸出以及中間結(jié)果都是<key, value>
鍵值對(duì):
- Map:
<k1, v1>
——>list(<k2, v2>)
- Reduce:
<k2, list(v2)>
——>list(<k3, v3>)
MR程序的執(zhí)行過(guò)程主要分為三步:Map階段箱季、Shuffle階段、Reduce階段棍掐,如下圖:
-
Map階段
- 分片(Split):map階段的輸入通常是HDFS上文件藏雏,在運(yùn)行Mapper前,F(xiàn)ileInputFormat會(huì)將輸入文件分割成多個(gè)split ——1個(gè)split至少包含1個(gè)HDFS的Block(默認(rèn)為64M)作煌;然后每一個(gè)分片運(yùn)行一個(gè)map進(jìn)行處理掘殴。
- 執(zhí)行(Map):對(duì)輸入分片中的每個(gè)鍵值對(duì)調(diào)用
map()
函數(shù)進(jìn)行運(yùn)算,然后輸出一個(gè)結(jié)果鍵值對(duì)粟誓。- Partitioner:對(duì)
map()
的輸出進(jìn)行partition奏寨,即根據(jù)key或value及reduce的數(shù)量來(lái)決定當(dāng)前的這對(duì)鍵值對(duì)最終應(yīng)該交由哪個(gè)reduce處理。默認(rèn)是對(duì)key哈希后再以reduce task數(shù)量取模鹰服,默認(rèn)的取模方式只是為了避免數(shù)據(jù)傾斜病瞳。然后該key/value對(duì)以及partitionIdx的結(jié)果都會(huì)被寫(xiě)入環(huán)形緩沖區(qū)。
- Partitioner:對(duì)
- 溢寫(xiě)(Spill):map輸出寫(xiě)在內(nèi)存中的環(huán)形緩沖區(qū)悲酷,默認(rèn)當(dāng)緩沖區(qū)滿80%套菜,啟動(dòng)溢寫(xiě)線程,將緩沖的數(shù)據(jù)寫(xiě)出到磁盤(pán)舔涎。
- Sort:在溢寫(xiě)到磁盤(pán)之前笼踩,使用快排對(duì)緩沖區(qū)數(shù)據(jù)按照partitionIdx, key排序。(每個(gè)partitionIdx表示一個(gè)分區(qū)亡嫌,一個(gè)分區(qū)對(duì)應(yīng)一個(gè)reduce)
- Combiner:如果設(shè)置了Combiner嚎于,那么在Sort之后,還會(huì)對(duì)具有相同key的鍵值對(duì)進(jìn)行合并挟冠,減少溢寫(xiě)到磁盤(pán)的數(shù)據(jù)量于购。
- 合并(Merge):溢寫(xiě)可能會(huì)生成多個(gè)文件,這時(shí)需要將多個(gè)文件合并成一個(gè)文件知染。合并的過(guò)程中會(huì)不斷地進(jìn)行 sort & combine 操作肋僧,最后合并成了一個(gè)已分區(qū)且已排序的文件。
-
Shuffle階段:廣義上Shuffle階段橫跨Map端和Reduce端,在Map端包括Spill過(guò)程嫌吠,在Reduce端包括copy和merge/sort過(guò)程止潘。通常認(rèn)為Shuffle階段就是將map的輸出作為reduce的輸入的過(guò)程
- Copy過(guò)程:Reduce端啟動(dòng)一些copy線程,通過(guò)HTTP方式將map端輸出文件中屬于自己的部分拉取到本地辫诅。Reduce會(huì)從多個(gè)map端拉取數(shù)據(jù)凭戴,并且每個(gè)map的數(shù)據(jù)都是有序的。
- Merge過(guò)程:Copy過(guò)來(lái)的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中炕矮,這里的緩沖區(qū)比較大么夫;當(dāng)緩沖區(qū)數(shù)據(jù)量達(dá)到一定閾值時(shí),將數(shù)據(jù)溢寫(xiě)到磁盤(pán)(與map端類似肤视,溢寫(xiě)過(guò)程會(huì)執(zhí)行 sort & combine)档痪。如果生成了多個(gè)溢寫(xiě)文件,它們會(huì)被merge成一個(gè)有序的最終文件邢滑。這個(gè)過(guò)程也會(huì)不停地執(zhí)行 sort & combine 操作腐螟。
-
Reduce階段:Shuffle階段最終生成了一個(gè)有序的文件作為Reduce的輸入,對(duì)于該文件中的每一個(gè)鍵值對(duì)調(diào)用
reduce()
方法困后,并將結(jié)果寫(xiě)到HDFS遭垛。
二、運(yùn)行WordCount程序
在運(yùn)行程序之前操灿,需要先搭建好Hadoop集群環(huán)境,參考《Hadoop+HBase+ZooKeeper分布式集群環(huán)境搭建》泵督。
2.1 源代碼
WordCount可以說(shuō)是最簡(jiǎn)單的MapReduce程序了趾盐,只包含三個(gè)文件:一個(gè) Map 的 Java 文件,一個(gè) Reduce 的 Java 文件小腊,一個(gè)負(fù)責(zé)調(diào)用的主程序 Java 文件救鲤。
我們?cè)诋?dāng)前用戶的主文件夾下創(chuàng)建wordcount_01/
目錄,在該目錄下再創(chuàng)建src/
和classes/
秩冈。 src 目錄存放 Java 的源代碼本缠,classes 目錄存放編譯結(jié)果。
TokenizerMapper.java
package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
IntSumReducer.java
package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for(IntWritable val:values) {
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
WordCount.java
package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
以上三個(gè).java源文件均置于 src 目錄下入问。
2.2 編譯
Hadoop 2.x 版本中jar不再集中在一個(gè) hadoop-core-*.jar 中丹锹,而是分成多個(gè) jar。編譯WordCount程序需要如下三個(gè) jar:
$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar
2.3 打包
2.4 執(zhí)行
執(zhí)行hadoop程序的時(shí)候芬失,輸入文件必須先放入hdfs文件系統(tǒng)中楣黍,不能是本地文件。
1 . 先查看hdfs文件系統(tǒng)的根目錄:
2 . 然后利用put將輸入文件(多個(gè)輸入文件位于input文件夾下)復(fù)制到hdfs文件系統(tǒng)中:
3 . 運(yùn)行wordcount程序
4 . 查看運(yùn)行結(jié)果
三棱烂、WordCount程序分析
3.1 Hadoop數(shù)據(jù)類型
Hadoop MapReduce操作的是鍵值對(duì)租漂,但這些鍵值對(duì)并不是Integer、String等標(biāo)準(zhǔn)的Java類型。為了讓鍵值對(duì)可以在集群上移動(dòng)哩治,Hadoop提供了一些實(shí)現(xiàn)了WritableComparable
接口的基本數(shù)據(jù)類型秃踩,以便用這些類型定義的數(shù)據(jù)可以被序列化進(jìn)行網(wǎng)絡(luò)傳輸、文件存儲(chǔ)與大小比較业筏。
- 值:僅會(huì)被簡(jiǎn)單的傳遞憔杨,必須實(shí)現(xiàn)
Writable
或WritableComparable
接口。 - 鍵:在Reduce階段排序時(shí)需要進(jìn)行比較驾孔,故只能實(shí)現(xiàn)
WritableComparable
接口芍秆。
下面是8個(gè)預(yù)定義的Hadoop基本數(shù)據(jù)類型,它們均實(shí)現(xiàn)了WritableComparable
接口:
類 | 描述 |
---|---|
BooleanWritable | 標(biāo)準(zhǔn)布爾型數(shù)值 |
ByteWritable | 單字節(jié)數(shù)值 |
DoubleWritable | 雙字節(jié)數(shù) |
FloatWritable | 浮點(diǎn)數(shù) |
IntWritable | 整型數(shù) |
LongWritable | 長(zhǎng)整型數(shù) |
Text | 使用UTF8格式存儲(chǔ)的文本 |
NullWritable | 當(dāng)<key,value> 中的key或value為空時(shí)使用 |
3.2 源代碼分析
3.2.1 Map過(guò)程
package com.lisong.hadoop;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Map過(guò)程需要繼承org.apache.hadoop.mapreduce
包中 Mapper 類翠勉,并重寫(xiě)其map方法妖啥。
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>11
其中的模板參數(shù):第一個(gè)Object表示輸入key的類型;第二個(gè)Text表示輸入value的類型对碌;第三個(gè)Text表示表示輸出鍵的類型荆虱;第四個(gè)IntWritable表示輸出值的類型。
作為map方法輸入的鍵值對(duì)朽们,其value值存儲(chǔ)的是文本文件中的一行(以回車符為行結(jié)束標(biāo)記)怀读,而key值為該行的首字母相對(duì)于文本文件的首地址的偏移量。然后StringTokenizer類將每一行拆分成為一個(gè)個(gè)的單詞骑脱,并將<word,1>
作為map方法的結(jié)果輸出菜枷,其余的工作都交有 MapReduce框架 處理。
注:StringTokenizer
是Java工具包中的一個(gè)類叁丧,用于將字符串進(jìn)行拆分——默認(rèn)情況下使用空格作為分隔符進(jìn)行分割啤誊。
3.2.2 Reduce過(guò)程
package com.lisong.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
int sum = 0;
for(IntWritable val:values) {
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
Reduce過(guò)程需要繼承org.apache.hadoop.mapreduce
包中 Reducer 類,并 重寫(xiě) reduce方法拥娄。
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>11
其中模板參數(shù)同Map一樣蚊锹,依次表示是輸入鍵類型,輸入值類型稚瘾,輸出鍵類型牡昆,輸出值類型。
public void reduce(Text key, Iterable<IntWritable> values, Context context)11
reduce 方法的輸入?yún)?shù) key 為單個(gè)單詞摊欠,而 values 是由各Mapper上對(duì)應(yīng)單詞的計(jì)數(shù)值所組成的列表(一個(gè)實(shí)現(xiàn)了 Iterable 接口的變量丢烘,可以理解成 values 里包含若干個(gè) IntWritable 整數(shù),可以通過(guò)迭代的方式遍歷所有的值)些椒,所以只要遍歷 values 并求和铅协,即可得到某個(gè)單詞出現(xiàn)的總次數(shù)。
3.2.3 執(zhí)行作業(yè)
package com.lisong.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
在MapReduce中摊沉,由Job對(duì)象負(fù)責(zé)管理和運(yùn)行一個(gè)計(jì)算任務(wù)狐史,并通過(guò)Job的一些方法對(duì)任務(wù)的參數(shù)進(jìn)行相關(guān)的設(shè)置,此處:
- 設(shè)置了使用
TokenizerMapper.class
完成Map過(guò)程中的處理,使用IntSumReducer.class
完成Combine和Reduce過(guò)程中的處理骏全。 - 還設(shè)置了Map過(guò)程和Reduce過(guò)程的輸出類型:key的類型為T(mén)ext苍柏,value的類型為IntWritable。
- 任務(wù)的輸出和輸入路徑則由命令行參數(shù)指定姜贡,并由FileInputFormat和FileOutputFormat分別設(shè)定试吁。
- FileInputFormat類的很重要的作用就是將文件進(jìn)行切分 split,并將 split 進(jìn)一步拆分成key/value對(duì)
- FileOutputFormat類的作用是將處理結(jié)果寫(xiě)入輸出文件楼咳。
- 完成相應(yīng)任務(wù)的參數(shù)設(shè)定后熄捍,即可調(diào)用
job.waitForCompletion()
方法執(zhí)行任務(wù)。
3.2.4 WordCount流程
1)將文件拆分成splits母怜,由于測(cè)試用的文件較小余耽,所以每個(gè)文件為一個(gè)split,并將文件按行分割形成<key,value>
對(duì)苹熏,key為偏移量(包括了回車符)碟贾,value為文本行。這一步由MapReduce框架自動(dòng)完成轨域,如下圖:
2)將分割好的<key,value>
對(duì)交給用戶定義的map方法進(jìn)行處理袱耽,生成新的<key,value>
對(duì),如下圖所示:
3)得到map方法輸出的<key,value>
對(duì)后干发,Mapper會(huì)將它們按照key值進(jìn)行排序朱巨,并執(zhí)行Combine過(guò)程,將key值相同的value值累加枉长,得到Mapper的最終輸出結(jié)果蔬崩。如下圖:
4)Reducer先對(duì)從Mapper接收的數(shù)據(jù)進(jìn)行排序,再交由用戶自定義的reduce方法進(jìn)行處理搀暑,得到新的<key,value>
對(duì),并作為WordCount的輸出結(jié)果跨琳,如下圖: