The Shuffle Mechanism
The data-processing stage after the map method and before the reduce method is called Shuffle.
Partitioner (Partitioning)
To send output to different files (partitions) according to some condition, MapReduce provides the Partitioner component. The default implementation partitions by the key's hash value:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

    public void configure(JobConf job) {}

    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K2 key, V2 value,
                            int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
The default partitioner takes the key's hashCode modulo the number of ReduceTasks, so the user has no control over which key lands in which partition.
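As a minimal standalone sketch of that formula (the phone-number keys below are hypothetical, chosen only to exercise the expression):

public class HashPartitionDemo {
    public static void main(String[] args) {
        String[] keys = {"13568436656", "13736230513", "18271575951"}; // hypothetical keys
        int numReduceTasks = 2;
        for (String key : keys) {
            // Same expression as HashPartitioner: clear the sign bit, then take the modulus
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}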
Steps to implement a custom Partitioner
1) Create a class that extends Partitioner and override the getPartition() method.
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // Take the first three characters of the phone number
        String prefix = text.toString().substring(0, 3);
        if ("135".equals(prefix)) {
            return 0;
        }
        return 1;
    }
}
2) Register the custom Partitioner in the Job driver.
job.setPartitionerClass(ProvincePartitioner.class);
3) After defining a custom Partitioner, set a number of ReduceTasks that matches the partitioner's logic.
job.setNumReduceTasks(2);
Partitioning summary
1) If the number of ReduceTasks is greater than the number of partitions getPartition can return, a few extra empty output files named part-r-000xx are produced;
2) If 1 < number of ReduceTasks < number of partitions, some partitioned data has nowhere to go and the job throws an Exception;
3) If the number of ReduceTasks = 1, then no matter how many partition files the MapTask side produces, all of them go to that single ReduceTask, and only one result file, part-r-00000, is produced;
4) Partition numbers must start at zero and increase one by one.
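To make cases 1) and 3) concrete, here is a hedged driver-level sketch against the two-way ProvincePartitioner above (case 2 needs at least three partitions, so it is only described in a comment):

// Case 1: more ReduceTasks (3) than partitions (2)
// -> the job runs, but part-r-00002 is created empty
job.setNumReduceTasks(3);

// Case 2: 1 < ReduceTasks < number of partitions
// -> records routed to a partition with no ReduceTask make the job fail
//    (e.g. a hypothetical five-way partitioner with job.setNumReduceTasks(3))

// Case 3: a single ReduceTask ignores the partitioner entirely
// -> one output file, part-r-00000
job.setNumReduceTasks(1);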
Hands-on code
FlowBean.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A Writable must expose a no-argument constructor for deserialization
    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
FlowMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // 1 Read one line
        String line = value.toString();
        // 2 Split it into fields
        String[] split = line.split(" ");
        // 3 Pick out the fields we need
        String phone = split[0];
        String upFlow = split[split.length - 3];
        String downFlow = split[split.length - 2];
        // 4 Wrap them into the output key/value and emit
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(upFlow));
        outV.setDownFlow(Long.parseLong(downFlow));
        outV.setSumFlow();
        context.write(outK, outV);
    }
}
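For reference, a hypothetical input line consistent with this parsing logic (space-separated; phone number first, upstream and downstream traffic in the third- and second-to-last columns):

13568436656 192.196.100.1 2481 24681 200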
FlowReducer.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
                          Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // 1 Traverse the values and accumulate the totals
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        // 2 Wrap the totals into the output value
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        // 3 Emit the result
        context.write(key, outV);
    }
}
ProvincePartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        // Route phone numbers starting with "135" to partition 0, all others to partition 1
        String prefix = text.toString().substring(0, 3);
        if ("135".equals(prefix)) {
            return 0;
        }
        return 1;
    }
}
FlowDriver.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // 1 Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3 Attach the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 Set the mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5 Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Register the custom partitioner and a matching number of ReduceTasks
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(2);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir") + "/input/partitioner2"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir") + "/output/partitioner2"));
        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
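With two ReduceTasks and ProvincePartitioner registered, this job writes two files: part-r-00000 with the totals for phone numbers starting with 135, and part-r-00001 with everything else.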
WritableComparable Sorting
Sorting is one of the most important operations in the MapReduce framework.
Both MapTask and ReduceTask sort data by key. This is default Hadoop behavior: data in every application gets sorted, whether or not the logic requires it.
The default sort order is dictionary (lexicographic) order, and it is implemented with quicksort.
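A small standalone reminder of what dictionary order means for Text keys (Hadoop's Text compares byte by byte, so numeric strings do not sort numerically):

import org.apache.hadoop.io.Text;

public class TextOrderDemo {
    public static void main(String[] args) {
        // Byte-wise comparison: "10" sorts before "2" even though 10 > 2
        System.out.println(new Text("10").compareTo(new Text("2")) < 0);  // true
        System.out.println(new Text("2").compareTo(new Text("10")) > 0);  // true
    }
}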
Sorting overview
On the MapTask side, intermediate results are first placed in a circular in-memory buffer. When the buffer's usage reaches a threshold, its contents are quick-sorted in memory and spilled to disk as a sorted file. Once all input has been processed, the MapTask merge-sorts all spill files on disk.
On the ReduceTask side, the task copies the relevant data file from every MapTask over the network. A copied file that exceeds a size threshold is spilled to disk; otherwise it stays in memory. Whenever the number of on-disk files reaches a threshold, they are merge-sorted into one larger file; whenever the size or number of in-memory files exceeds a threshold, they are merged and spilled to disk. After all data has been copied, the ReduceTask performs one final merge sort over everything in memory and on disk.
Sorting categories
1) Partial sort
MapReduce sorts the dataset by the input records' keys, guaranteeing that each output file is internally sorted.
2) Total sort
The final output is a single file that is sorted overall. The usual way to get this is to configure a single ReduceTask, but that is extremely inefficient for large inputs: one machine processes all the data, and the parallelism MapReduce provides is lost entirely.
3) Grouping (auxiliary) sort
Group keys on the Reduce side. Use this when the key is a bean object and you want keys that agree on one or several fields (though they differ when all fields are compared) to reach the same reduce call; see the sketch after this list.
4) Secondary sort
If the compareTo method of a custom sort compares on two criteria, that is a secondary sort.
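A minimal sketch of the grouping comparator used for category 3), assuming a hypothetical OrderBean key with a long orderId field that should define the reduce-side groups (this bean is not part of the flow example below):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // true: instantiate key objects so compare() receives real OrderBean instances
        super(OrderBean.class, true);  // OrderBean is hypothetical
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by orderId: keys that differ in other fields still share one reduce() call
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        return Long.compare(aBean.getOrderId(), bBean.getOrderId());
    }
}

It would be registered in the driver with job.setGroupingComparatorClass(OrderGroupingComparator.class).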
How custom sorting with WritableComparable works
When a bean object travels as the key, it must implement the WritableComparable interface and override the compareTo method; that is all it takes to get it sorted.
FlowBean.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A Writable must expose a no-argument constructor for deserialization
    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in exactly the order they were written
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Primary criterion: total flow, descending
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            // Secondary criterion: upstream flow, ascending
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}
FlowMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, FlowBean, Text>.Context context)
            throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Split it into fields
        String[] split = line.split(" ");
        // Wrap the fields: the bean becomes the key so that the framework sorts by it
        outV.set(split[0]);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow(); // compute the total so compareTo has something to sort on
        // Emit
        context.write(outK, outV);
    }
}
FlowReducer.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values,
                          Reducer<FlowBean, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Phone numbers with identical flow figures share one key; emit each of them,
        // swapping key and value back to (phone, bean)
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
FlowDriver.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // 1 Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3 Attach the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 Set the mapper output key and value types (the bean is now the key)
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5 Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir") + "/input/writeableComparable"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir") + "/output/writeableComparable"));
        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
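Because FlowBean is now the map output key, the shuffle's sort applies its compareTo automatically: the final output is ordered by sumFlow descending, with ties broken by upFlow ascending, without any sorting code in the reducer.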
Combiner
1) A Combiner is a component of an MR program besides the Mapper and Reducer.
2) The Combiner's parent class is Reducer.
3) Combiner and Reducer differ in where they run:
the Combiner runs on the node of each individual MapTask;
the Reducer receives the output of all Mappers globally.
4) The point of a Combiner is to pre-aggregate each MapTask's output locally so as to reduce network traffic.
5) A Combiner may only be used when it cannot affect the final business logic, and its output kv types must match the Reducer's input kv types (see the average example after this list).
6) Since the Combiner code here is identical to the Reducer code, the Reducer class can simply be registered as the Combiner.
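A standard illustration of rule 5): averaging cannot reuse the Reducer as a Combiner. If one MapTask emits the values 3, 5, 7 and another emits 2, 6, averaging locally and then averaging the results gives (5 + 4) / 2 = 4.5, while the true average is (3 + 5 + 7 + 2 + 6) / 5 = 4.6. Summation, as in the word count below, is safe because addition is associative.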
Example
WordCountMapper.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // 1 Read one line
        String lineStr = value.toString();
        // 2 Split it into words
        String[] words = lineStr.split(" ");
        // 3 Emit each word with a count of 1
        for (String word : words) {
            outKey.set(word);
            context.write(outKey, outV);
        }
    }
}
WordCountReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Accumulate the counts for this word
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        // Emit the total
        context.write(key, outV);
    }
}
WordCountCombiner.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Identical logic to WordCountReducer: local summation per MapTask is safe
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key, outV);
    }
}
WordCountDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 Set the jar path
        job.setJarByClass(WordCountDriver.class);
        // 3 Attach the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4 Set the mapper output kv types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5 Set the final output kv types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setCombinerClass(WordCountCombiner.class);
        // The Reducer can be registered as the Combiner directly, since the logic is identical
        // job.setCombinerClass(WordCountReducer.class);
        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir") + "/input/combiner"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir") + "/output/combiner"));
        // 7 Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
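To confirm the combiner actually ran, check the counters printed by waitForCompletion(true): under Map-Reduce Framework, Combine input records and Combine output records should be non-zero, and Reduce shuffle bytes should drop compared with a run without the combiner.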
Summary
This subsection is a key one!!! It described the Shuffle mechanism (which sits after the mapper and before the reducer; if there is no reducer, the combiner does not run) and walked through partitioning, sorting, and local aggregation in detail. Study it carefully.