map-reduce 是 hadoop 的核心概念之一凶异,hadoop 權(quán)威指南中以一個天氣數(shù)據(jù)處理的例子說明了 map-reduce 的好處:
- map 階段將工作劃分為可以并行的部分并進(jìn)行調(diào)度;
- 在 reduce 階段提供了方便的數(shù)據(jù)整合方式
- 可以將任務(wù)分散到多個機(jī)器上并行執(zhí)行,加快任務(wù)執(zhí)行速度
1 map-reduce 的過程
先分別讀入數(shù)據(jù),得到一個局部的解,然后通過 shuffle隔箍,將 key 相同的數(shù)據(jù)整合起來,最終在 reduce 階段合并起來脚乡,輸出數(shù)據(jù)蜒滩。
map 示例:
public class MaxTemperatureMapper extends
Mapper<LongWritable, //keyin
Text, // valuein
Text, // keyout
IntWritable //valueout
> {
public static final int MISSING = 9999;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
reduce 代碼,注意 reduce 的輸入類型與 map 的輸出類型要保持一致:
public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxvalue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxvalue = Math.max(maxvalue, value.get());
}
context.write(key, new IntWritable(maxvalue));
}
}
最后將 map-reduce 結(jié)合起來:
public class MaxTemperature {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2) {
System.out.println("Usage: MaxTemperature <input path> <out path>");
System.exit(-1);
}
Job job = Job.getInstance();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max Temperature");
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
//combiner 其實(shí)也是一個reducer
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
2 數(shù)據(jù)流
Job 是 hadoop 運(yùn)行任務(wù)的基礎(chǔ)單位,job 會被分為 task 來運(yùn)行帮掉,task 會由 YARN 調(diào)度在集群的節(jié)點(diǎn)上運(yùn)行,通常每個 task 處理的任務(wù)大小最好和 hdfs 的 block 大小相同窒典,防止由于 task 所需數(shù)據(jù)分布在不同節(jié)點(diǎn)所引起的數(shù)據(jù)交換開銷蟆炊。
map 完成后,數(shù)據(jù)被寫入本地硬盤瀑志,而后被發(fā)送給 reducer涩搓。reducer 可以有一個,也可以有多個劈猪,同一個 key 對應(yīng)的數(shù)據(jù)將會被發(fā)送到同一個 reducer 處理昧甘。同時,對于沒有必要進(jìn)行 reduce 操作的 job战得,也可以沒有 reducer充边。
3 combiner Functions
有些情況下,map 輸出的數(shù)據(jù)可以先進(jìn)行預(yù)先處理常侦,以減少向 reducer 傳遞的數(shù)據(jù)浇冰。例如在統(tǒng)計每年的最高溫度時,同一個 split 輸出了若干 key相同的記錄:(1950, 0)聋亡,(1950,20)肘习,(1950,10) ,則可以先在 split 內(nèi)統(tǒng)計出最大值(1950,20)坡倔,從而減少了數(shù)據(jù)的傳輸漂佩。注意,combiner 不保證會被執(zhí)行罪塔,所以一定要保證是否存在 combiner 輸出的結(jié)果都不會有變化投蝉。
//通過該方法可以設(shè)置 combiner,combiner 其實(shí)也是一個 reducer
job.setCombinerClass(MaxTemperatureReducer.class);
4 hadoop streaming
hadoop streaming 提供了一種用其他語言寫 map-reduce 的 api征堪,主要是使用了輸入輸出重定向墓拜,個人感覺意義不大。