MapReduce定義
MapReduce是一個分布式運算程序的編程框架,是用戶開發(fā)“基于Hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架
MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發(fā)運行在一個Hadoop集群上
MapReduce的優(yōu)缺點
優(yōu)點
- 易于編程,用戶只關(guān)心業(yè)務(wù)邏輯胆数,實現(xiàn)框架的接口
- 良好的擴展性峻村,可以動態(tài)的增加服務(wù)器,解決計算資源不夠的問題
- 高容錯性,任何一臺掛掉,可以將任務(wù)轉(zhuǎn)移到其他節(jié)點
- 適合海量數(shù)據(jù)計算(TB/PB級別)绳矩,幾千臺服務(wù)器共同計算
缺點
- 不擅長實時計算
- 不擅長流式計算
- 不擅長DAG有向無環(huán)圖計算
MapReduce核心思想
image.png
MapReduce的進程
一個完整的MapReduce程序在分布式環(huán)境下有三類實例進程
- MrAppMaster:負責整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
- MapTask:負責Map階段整個數(shù)據(jù)處理流程
- ReduceTask:負責Reduce階段整個數(shù)據(jù)處理流程
官方WordCount源碼
采用反編譯工具反編譯源碼驮捍,發(fā)現(xiàn)WordCount案例有Map類疟呐、Reduce類、驅(qū)動類厌漂。且數(shù)據(jù)類型是Hadoop封裝的序列化類型
image.png
image.png
常用數(shù)據(jù)序列化類型
image.png
MapReduce編程規(guī)范
用戶編寫的程序分為三個部分Map萨醒、Reduce、Driver
Map階段
- 用戶自定義的Mapper要繼承父類
- Mapper的輸入數(shù)據(jù)是KV對的形式
- Mapper中的業(yè)務(wù)邏輯寫在map方法里
- Mapper的輸出數(shù)據(jù)是KV對的形式
- map方法(MapTask進程)對每個KV對調(diào)用一次
Reduce階段
- 用戶自定義的Reducer要繼承父類
- Reducer的輸入要對應(yīng)Mapper的輸出苇倡,KV對
- Reducer的業(yè)務(wù)邏輯寫在reduce方法里
- reduce方法(ReduceTask進程)對每一組相同K的KV對調(diào)用一次
Driver階段
相當于YARN集群的客戶端富纸,用于提交我們整個程序到Y(jié)ARN集群,提交的是封裝了MapReduce程序相關(guān)運行參數(shù)的job對象
模擬官網(wǎng)WordCount案例
測試數(shù)據(jù)
a b c
s d
c
c
r
a
d s
r t
h e
Mapper
/**
* KEYIN, map階段輸入的key的類型 long
* VALUEIN, map階段輸入的value的類型 text
* KEYOUT, map階段輸出的key的類型 text
* VALUEOUT, map階段輸出的value的類型 int
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 獲取一行數(shù)據(jù)
String line = value.toString();
// 切分
String[] words = line.split(" ");
// 循環(huán)寫出
for (String word : words) {
// 封裝outKey
outKey.set(word);
// 寫出
context.write(outKey, outValue);
}
}
}
Reducer
/**
* KEYIN, reduce階段輸入的key的類型 long
* VALUEIN, reduce階段輸入的value的類型 text
* KEYOUT, reduce階段輸出的key的類型 text
* VALUEOUT, reduce階段輸出的value的類型 int
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outValue = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 累加
for (IntWritable value : values) {
sum += value.get();
}
outValue.set(sum);
context.write(key, outValue);
}
}
Driver
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 獲取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 設(shè)置jar路徑
job.setJarByClass(WordCountDriver.class);
// 關(guān)聯(lián)mapper旨椒、reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 設(shè)置map輸出的KV類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 設(shè)置最終輸出的KV類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 設(shè)置輸入路徑和輸出路徑
FileInputFormat.setInputPaths(job, new Path("test.txt"));
FileOutputFormat.setOutputPath(job, new Path("result.txt"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}