先知道是什么搞隐,再去了解為什么
MapReduce入門概述
MapReduce定義
MapReduce是一個基于Hadoop的分布式運算程序的編程框架
它的核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶的組件組合成為一個完整的分布式運算程序,并發(fā)的運行在Hadoop集群上做祝。
MapReduce優(yōu)點
- MapReduce易于編程:簡單的實現(xiàn)一些接口就可以實現(xiàn)分布式程序近尚,并且這個分布式程序可以分布到大量廉價的PC機器上執(zhí)行巫俺。
- 良好的擴展性:加機器就可以增加計算能力
- 高容錯性:所謂容錯就是當(dāng)系統(tǒng)中一臺機器故障時候,有一種機制可以將任務(wù)分配到新機器上然后繼續(xù)運行肿男,這個過程是不需要人工干涉的
- 適合PB級上數(shù)據(jù)的離線處理:大數(shù)據(jù)的穩(wěn)定處理
MapReduce缺點
- 不擅長實時計算:MapReduce不能像Mysql,在毫秒級或秒級返回結(jié)果
- 不擅長流式計算:流式計算輸入數(shù)據(jù)是動態(tài)的却嗡,連續(xù)不斷的舶沛,但是MR處理的數(shù)據(jù)一定是靜態(tài)的,這是由設(shè)計決定的
- 不擅長DAG計算:多個任務(wù)具有依賴關(guān)系窗价,后者輸入依賴前者輸出如庭,這種活MR不擅長,讀寫磁盤太多性能下降
MapReduce統(tǒng)計單詞過程
[圖片上傳失敗...(image-51ec82-1644239453599)]
默認(rèn)是按照128M進行數(shù)據(jù)切塊哦
在上圖進程一共有三種:
APPMaster:負責(zé)整個程序的過程調(diào)度和狀態(tài)協(xié)調(diào)
MapTask:負責(zé)Map階段的整個數(shù)據(jù)處理流程
ReduceTask:負責(zé)Reduce階段的整個數(shù)據(jù)處理流程
MapReduce編程套路
我們編寫的部分基本分為三個:Mapper撼港,Reducer和Driver
Map階段
(1)用戶自定義Mapper要繼承的父類
(2)Mapper的輸入數(shù)據(jù)格式是KV對
(3)Mapper中業(yè)務(wù)邏輯寫在map()方法中【map()對每個KV對調(diào)用一次】
(4)Mapper的輸出數(shù)據(jù)格式也是KV對
Reducer階段
(1)用戶自定義Reducer繼承自己的父類
(2)Reducer的輸入數(shù)據(jù)類型對應(yīng)Mapper的輸出數(shù)據(jù)類型坪它,也是KV
(3)Reduce業(yè)務(wù)邏輯寫在reduce()方法中【reduce()對每個KV對調(diào)用一次】
Driver階段
相當(dāng)于YARN集群的客戶端,等等程序?qū)懲甑勰担枰ㄟ^它把整個程序提交到Y(jié)ARN集群上
HELLO WORLD案例
需求
統(tǒng)計文件中單詞的出現(xiàn)的詞頻
Mapper代碼
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 獲取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 輸出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
Reducer階段
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 輸出
v.set(sum);
context.write(key,v);
}
}
Driver驅(qū)動類
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 輸入輸出路徑需要根據(jù)自己電腦上實際的輸入輸出路徑設(shè)置
// 注意這里是在win下跑往毡,如果放到集群上路徑需要更改
args = new String[] { "e:/input/inputword", "e:/output1" };
// 1 獲取配置信息以及封裝任務(wù)
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 設(shè)置jar加載路徑
job.setJarByClass(WordcountDriver.class);
// 3 設(shè)置map和reduce類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 4 設(shè)置map輸出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 設(shè)置Reduce輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 設(shè)置輸入和輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}