MapReduce定義
MapReduce是一個(gè)分布式計(jì)算的框架,是用戶開發(fā)機(jī)遇hadoop的數(shù)據(jù)分析應(yīng)用的核心框架饱亿。
MapReduce的優(yōu)缺點(diǎn)
- 優(yōu)點(diǎn)
- 易于編程 只要實(shí)現(xiàn)一些簡(jiǎn)單的接口即可實(shí)現(xiàn)功能,且編寫程序類似串行
- 良好的擴(kuò)展性 支持?jǐn)U展計(jì)算服務(wù)器的數(shù)量
- 高容錯(cuò)性 可以在價(jià)格低廉的機(jī)器上運(yùn)行承璃,即便集群中某些節(jié)點(diǎn)宕機(jī)愈涩,也可以正常使用
- 適合PB級(jí)離線計(jì)算
- 缺點(diǎn)
不擅長(zhǎng)實(shí)時(shí)計(jì)算、流式計(jì)算肺蔚、DAT計(jì)算
MapReduce的編程思想
MapReduce主要包括兩個(gè)部分 Map階段 + Reduce階段煌妈,每個(gè)階段中的輸入輸出都是key-value的形式存在
已文本詞數(shù)統(tǒng)計(jì)為例,兩個(gè)階段的流程如下:
- Map階段讀取Hadoop分片的數(shù)據(jù)宣羊,按行讀取自動(dòng)進(jìn)行一次map操作璧诵,得到輸入key-value對(duì)應(yīng)為 “偏移量-本行數(shù)據(jù)”。
偏移量實(shí)際是該行起始的數(shù)據(jù)長(zhǎng)度索引仇冯,可以理解為行號(hào)之宿,例如第一行偏移量為0,數(shù)據(jù)10byte苛坚,則第二行偏移量為11比被。 - Map階段第二步執(zhí)行我們實(shí)現(xiàn)的接口算法,并將結(jié)果的key-value(單詞-每行的詞頻 如 java - 1 2 1 4 1)輸出都磁盤上泼舱。整個(gè)Map階段都是完全并行執(zhí)行的等缀。
- Reduce階段讀取Map的結(jié)果,執(zhí)行實(shí)現(xiàn)的接口娇昙,對(duì)每個(gè)分片的結(jié)果進(jìn)行初次的匯總
- Reduce階段對(duì)每個(gè)分片的結(jié)果再次進(jìn)行匯總成為一個(gè)最終結(jié)果
注:通常一個(gè)分片對(duì)應(yīng)hadoop中存儲(chǔ)的一個(gè)塊尺迂,即128M,這也可以避免載入內(nèi)存文件過大涯贞,撐爆內(nèi)存
實(shí)現(xiàn)wordcount編碼
Map類枪狂,繼承Mapper,重寫其map方法實(shí)現(xiàn)對(duì)每個(gè)單詞的統(tǒng)計(jì)宋渔,范型是根據(jù)自己業(yè)務(wù)需要定義的類型
package com.irving.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* map執(zhí)行類
* 四個(gè)范型是map輸入和輸出的類型
* @LongWritable 字符偏移量
* @Author yuanyc
* @Date 15:17 2019-07-11
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 重寫map方法
* @Author yuanyc
* @Date 15:19 2019-07-11
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 每行起始的偏移量
System.out.println("-------");
System.out.println("偏移量" + key.get());
// 按行讀取的數(shù)據(jù)
String line = value.toString();
// 根據(jù)空格切分str
String[] arr = line.split(" ");
// 對(duì)字符傳標(biāo)記1
for (String str : arr) {
context.write(new Text(str), new IntWritable(1));
}
}
}
Reduce類州疾,繼承Reducer類,重寫reduce方法皇拣,實(shí)現(xiàn)對(duì)map結(jié)果的匯總
package com.irving.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reduce類
* @Author yuanyc
* @Date 11:17 2019-07-14
*/
public class WordCountRecuder extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// values 形如 1严蓖, 2薄嫡,2,1颗胡,1
// 對(duì)詞頻進(jìn)行統(tǒng)計(jì)
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
編寫啟動(dòng)類毫深,執(zhí)行mapreduce算法
注:輸入輸出路徑應(yīng)當(dāng)為hdfs的目錄,但是本地調(diào)試階段可以使用linux文件系統(tǒng)目錄
package com.irving.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 啟動(dòng)類
* @Author yuanyc
* @Date 15:39 2019-07-11
*/
public class WordCountMain {
public static void main(String[] args) {
Configuration configuration = new Configuration();
// args = new String[]{"/Users/yuanyc/Documents/workspace/hdfs/test.txt", "/Users/yuanyc/Documents/workspace/hdfs/out"};
try {
// 創(chuàng)建job
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountMain.class);
// 指定map類
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定reduce
job.setReducerClass(WordCountRecuder.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任務(wù)
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
測(cè)試數(shù)據(jù)
本地執(zhí)行結(jié)果
注:輸出目錄不能重復(fù)存在毒姨,要重新執(zhí)行時(shí)需要?jiǎng)h除現(xiàn)有目錄
Hadoop的序列化
通過上面代碼可以看出哑蔫,MR在編碼過程中使用的輸入輸出對(duì)象類型都是不是自定義的類型。
使用的這些類型是Hadoop定義的基礎(chǔ)類型弧呐,由于mapreduce過程中伴隨大量的IO操作闸迷,因此需要針對(duì)序列化進(jìn)行性能優(yōu)化。
Java常用類型與Hadoop序列化類型的對(duì)照表
JDK的類型 | Hadoop序列化類型 |
---|---|
int | IntWritable |
long | LongWritable |
float | FloatWritable |
double | DoubleWritable |
byte | ByteWritable |
boolean | BooleanWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
自定義Java對(duì)象的序列化
自定義的Bean需要實(shí)現(xiàn)writable接口俘枫,重寫序列化和反序列化的方法
package com.irving.wordcount;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定義bean的hadoop序列化
* @Author yuanyc
* @Date 12:37 2019-07-14
*/
public class BeanWritable implements Writable, Comparable {
private String name;
private int age;
/**
* 序列化方法
* @Author yuanyc
* @Date 12:53 2019-07-14
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeChars(name);
dataOutput.writeInt(age);
}
/**
* 反序列化腥沽,順序要與序列化一致
* @Author yuanyc
* @Date 12:53 2019-07-14
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
name = dataInput.readUTF();
age = dataInput.readInt();
}
/**
* 自定義對(duì)象用key時(shí),需要重寫compareto方法鸠蚪,用于shuffle階段的排序
* @Author yuanyc
* @Date 12:52 2019-07-14
*/
@Override
public int compareTo(Object o) {
return 0;
}
}