Commbiner相當于本地的Reducer計算模式想暗,但是并不是所有場合都適合嚷量,總結(jié)一下都是什么場合適合用镇饮。
作用
因為Map產(chǎn)生了太多的輸出取逾,為了減少RPC傳輸耗绿,在本地進行一次類似于Reduce操作,進行累加砾隅,再將累加的值傳給Reduce误阻。
注意:因為Combiner是可插拔的,所以添加Combiner不能影響最終的計算機過,Combiner應(yīng)該適用于那些究反,Reduce輸入和輸出key/value類型完全一致的寻定,且不影響最終結(jié)果的。
WordCount實例
public class TestCombinerForAvgMR {
//Map對不同文件不同月份進行統(tǒng)計
? ? public static class ForMapextends Mapper {
Textokey =new Text();
AvgEntityavgEntity =new AvgEntity();
@Override
? ? ? ? protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
String []line = value.toString().split(" ");
okey.set(line[0]);
avgEntity.setCount(1);
avgEntity.setSum(Integer.parseInt(line[1]));
context.write(okey,avgEntity);
}
}
//Combiner對每個月份的進行累加
? ? public static class ForCombinextends Reducer {
@Override
? ? ? ? protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {
int sum =0;
int count =0;
for (AvgEntity avgEntity:values){
sum += avgEntity.getSum();
count += avgEntity.getCount();
}
AvgEntity avgEntity =new AvgEntity();
avgEntity.setSum(sum);
avgEntity.setCount(count);
context.write(key,avgEntity);
}
}
//將月份合并進行累加,做除法
? ? public static class ForReduceextends Reducer{
@Override
? ? ? ? protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException {
int sum =0;
int count =0;
for(AvgEntity avgEntity : values){
sum += avgEntity.getCount();
count += avgEntity.getSum();
}
context.write(key,new IntWritable(sum/count));
}
}
public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setMapperClass(ForMap.class);
job.setReducerClass(ForReduce.class);
job.setCombinerClass(ForCombin.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AvgEntity.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job,new Path("目錄"));
FileOutputFormat.setOutputPath(job,new Path("目錄"));
job.waitForCompletion(true);
? ? }
問題總結(jié)
1.為什么需要在Mapper端進行歸約處理
? ??因為在Mapper進行歸約后精耐,數(shù)據(jù)量變小了狼速,這樣再通過網(wǎng)絡(luò)傳輸時,傳輸時間就變短了卦停,減少了整個作業(yè)的運行時間向胡。
2.為什么可以在Mapper端進行歸約處理
? ??因為Reducer端接收的數(shù)據(jù)就是來自于Mapper端。我們在Mapper進行歸約處理惊完,無非就是把歸約操作提前到Mapper端做而已僵芹。
3.既然在Mapper端進行了歸約處理,為什么還要在Reducer端進行處理小槐。
????因為Mapper端只處理了本節(jié)點的數(shù)據(jù)拇派,而Reduce端處理的是來自多個Mapper端的數(shù)據(jù),因此有些在Mapper端不能歸約的數(shù)據(jù)凿跳,在Reducer端可以進行歸約件豌。
4.求平均數(shù)(SVG)的非關(guān)聯(lián)操作場景如何減少I/O傳輸量
? ? 更改Mapper端使其輸出兩列數(shù)據(jù)分別是數(shù)值個數(shù)count和平均數(shù)avg,這樣在Reducer端累加count作為總的數(shù)值個數(shù)拄显,輸出計數(shù)和平均值苟径。