1. 簡介
Hadoop Mapreduce是一個易于編程并且能在大型集群(上千節(jié)點)快速地并行得處理大量數(shù)據(jù)的軟件框架兢卵,以可靠休蟹,容錯的方式部署在商用機(jī)器上幕侠。
MapReduce作業(yè)通常將輸入數(shù)據(jù)集分成獨立的塊油挥,由map任務(wù)以完全平行的方式進(jìn)行處理顾患∽悦洌框架對map的輸出進(jìn)行排序用踩,然后輸入到reduce任務(wù)。 通常椒功,作業(yè)的輸入和輸出都存儲在文件系統(tǒng)中捶箱。 該框架負(fù)責(zé)調(diào)度任務(wù),監(jiān)控它們并重新執(zhí)行失敗的任務(wù)动漾。
通常丁屎,計算節(jié)點和存儲節(jié)點是相同的,即MapReduce框架和Hadoop分布式文件系統(tǒng)在同一組節(jié)點上運行旱眯。 該配置允許框架在數(shù)據(jù)已經(jīng)存在的節(jié)點上有效地調(diào)度任務(wù)晨川,從而在整個集群中產(chǎn)生非常高的聚合帶寬。
MapReduce框架由單個主ResourceManager删豺,每個集群節(jié)點的一個從屬NodeManager和每個應(yīng)用程序的MRAppMaster組成共虑。
Hadoop 客戶端提交Job和配置信息給ResourceManger,它將負(fù)責(zé)把配置信息分配給從屬節(jié)點呀页,調(diào)度任務(wù)并且監(jiān)控它們妈拌,把狀態(tài)信息和診斷信息傳輸給客戶端。
2. Inputs and Outputs
MapReduce 框架只操作鍵值對蓬蝶,MapReduce 將job的不同類型輸入當(dāng)做鍵值對來處理并且生成一組鍵值對作為輸出尘分。
Key和Value類必須通過實現(xiàn)Writable接口來實現(xiàn)序列化。此外丸氛,Key類必須實現(xiàn)WritableComparable 來使得排序更簡單培愁。
MapReduce作業(yè)的輸入和輸出類型:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
3. Mapper、Reducer缓窜、Partitioner定续、Counter谍咆、Combiner
應(yīng)用通常實現(xiàn)Mapper和Reducer接口提供map和reduce方法。這是Job的核心代碼私股。
- Mapper
Mappers將輸入的鍵值對轉(zhuǎn)換成中間鍵值對摹察。
Maps是多個單獨執(zhí)行的任務(wù)將輸入轉(zhuǎn)換成中間記錄。那些被轉(zhuǎn)換的中間記錄不一定要和輸入的記錄為相同類型倡鲸。輸入鍵值對可以在map后輸出0或者更多的鍵值對港粱。MapReduce 會根據(jù) InputFormat 切分成的各個 InputSplit 都創(chuàng)建一個map任務(wù)。所有的中間值都會按照Key進(jìn)行排序旦签,然后傳輸給一個特定的Reducer做最后確定的輸出。maps的數(shù)量通常依賴于輸入數(shù)據(jù)的總長度寸宏,也就是宁炫,輸入文檔的總block數(shù)。每個節(jié)點map的正常并行度應(yīng)該在10-100之間氮凝。 - Reducer
Reduce處理一系列相同key的中間記錄羔巢。Reducer有3個主要階段:混洗(Shuffle)、排序(Sort)和reduce罩阵。
Shuffle - 輸出到Reducer的數(shù)據(jù)都在Mapper階段經(jīng)過排序的竿秆。在這個階段框架將通過HTTP從恰當(dāng)?shù)腗apper的分區(qū)中取得數(shù)據(jù)。
Sort - 這個階段框架將對輸入到的 Reducer 的數(shù)據(jù)通過key(不同的 Mapper 可能輸出相同的key)進(jìn)行分組稿壁∮母郑混洗和排序階段是同時進(jìn)行;map的輸出數(shù)據(jù)被獲取時會進(jìn)行合并傅是。
Reduce - 在這個階段reduce方法將會被調(diào)用來處理每個已經(jīng)分好的組鍵值對匪燕。Reducer 輸出的數(shù)據(jù)是不經(jīng)過排序的。
合適的 reduce 總數(shù)應(yīng)該在 節(jié)點數(shù)每個節(jié)點的容器數(shù)0.95 至 節(jié)點數(shù)每個節(jié)點的容器數(shù)1.75 之間喧笔。當(dāng)設(shè)定值為0.95時帽驯,map任務(wù)結(jié)束后所有的 reduce 將會立刻啟動并且開始轉(zhuǎn)移數(shù)據(jù),當(dāng)設(shè)定值為1.75時书闸,處理更多任務(wù)的時候?qū)焖俚匾惠営忠惠喌剡\行 reduce 達(dá)到負(fù)載均衡尼变。Reduce 的數(shù)目的增加將會增加框架的負(fù)擔(dān),但是會提高負(fù)載均衡和降低失敗率浆劲。當(dāng)沒有 reduction 需求的時候可以將 reduce-task 的數(shù)目設(shè)置為0嫌术,是允許的。 - Partitioner
Partitioner對key進(jìn)行分區(qū)梳侨。Partitioner 對 map 輸出的中間值的 key(Reducer之前)進(jìn)行分區(qū)蛉威。分區(qū)采用的默認(rèn)方法是對 key 取 hashcode。分區(qū)數(shù)等于 job 的 reduce 任務(wù)數(shù)走哺。因此這會根據(jù)中間值的key 將數(shù)據(jù)傳輸?shù)綄?yīng)的 reduce蚯嫌。HashPartitioner 是默認(rèn)的的分區(qū)器哲虾。 - Counter
計數(shù)器是一個工具用于報告 Mapreduce 應(yīng)用的統(tǒng)計。
Mapper 和 Reducer 實現(xiàn)類可使用計數(shù)器來報告統(tǒng)計值择示。
Hadoop Mapreduce 是普遍的可用的 Mappers束凑、Reducers 和 Partitioners 組成的一個庫。 - Combiner
在Mapper和Reducer之間有一個非常重要的組件栅盲,它就是Combiner汪诉。并不是所有的job都適用combiner,只有操作滿足結(jié)合律的才可設(shè)置combiner谈秫。每一個map都可能會產(chǎn)生大量的本地輸出扒寄,Combiner的作用就是對map端的輸出先做一次合并,以減少在map和reduce節(jié)點之間的數(shù)據(jù)傳輸量拟烫,以提高網(wǎng)絡(luò)IO性能该编,是MapReduce的一種優(yōu)化手段之一。
4. WordCount
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}