一 The Shuffle Mechanism
① Shuffle mechanism: the data-processing stage that runs after the map() method and before the reduce() method is called Shuffle.
② Partitioning
③ Partitioning: hands-on case
1. Requirement
Write the statistics to different output files (partitions) according to the province a phone number belongs to.
(1) Input data
1 13736230513 192.196.100.1 www.xxx.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.xxx.com 1527 2106 200
6 84188413 192.168.100.3 www.xxx.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.xxx.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
(2) Expected output
Phone numbers starting with 136, 137, 138 and 139 each go into their own file (4 files in total); numbers with any other prefix go into one additional file.
2. Requirement analysis
3. Building on case 2.4, add a partitioner class

package com.xxx.mapreduce.flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {

		// 1 Get the first three digits of the phone number
		String preNum = key.toString().substring(0, 3);

		int partition = 4;

		// 2 Decide which province (partition) the record belongs to
		if ("136".equals(preNum)) {
			partition = 0;
		} else if ("137".equals(preNum)) {
			partition = 1;
		} else if ("138".equals(preNum)) {
			partition = 2;
		} else if ("139".equals(preNum)) {
			partition = 3;
		}

		return partition;
	}
}
4. In the driver, register the custom partitioner and set the number of ReduceTasks

package com.xxx.mapreduce.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

		// The input and output paths must match the actual paths on your machine
		args = new String[]{"e:/output1", "e:/output2"};

		// 1 Get the configuration information and the job instance
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 Specify the local path of this program's jar
		job.setJarByClass(FlowsumDriver.class);

		// 3 Specify the Mapper/Reducer classes used by this job
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);

		// 4 Specify the key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);

		// 5 Specify the key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 8 Specify the custom partitioner
		job.setPartitionerClass(ProvincePartitioner.class);

		// 9 Set the matching number of reduce tasks
		job.setNumReduceTasks(5);

		// 6 Specify the directory of the job's input files
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 Submit the job configuration and the jar containing the job's classes to YARN
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
④ WritableComparable sorting
1. Types of sorting
2. Custom sorting with WritableComparable
(1) How it works
When a bean object is transferred as the key, implement the WritableComparable interface and override the compareTo method to get sorting.

	@Override
	public int compareTo(FlowBean o) {

		int result;

		// Sort by total flow, descending
		if (sumFlow > o.getSumFlow()) {
			result = -1;
		} else if (sumFlow < o.getSumFlow()) {
			result = 1;
		} else {
			result = 0;
		}

		return result;
	}
⑤ WritableComparable sorting: hands-on case (total sort)
1. Requirement
Sort the results produced by case 2.3 again, this time by total flow.
(1) Input data: the raw data
1 13736230513 192.196.100.1 www.xxx.com 2481 24681 200
2 13846544121 192.196.100.2 264 0 200
3 13956435636 192.196.100.3 132 1512 200
4 13966251146 192.168.100.1 240 0 404
5 18271575951 192.168.100.2 www.xxx.com 1527 2106 200
6 84188413 192.168.100.3 www.xxx.com 4116 1432 200
7 13590439668 192.168.100.4 1116 954 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
9 13729199489 192.168.100.6 240 0 200
10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
12 15959002129 192.168.100.9 www.xxx.com 1938 180 500
13 13560439638 192.168.100.10 918 4938 200
14 13470253144 192.168.100.11 180 180 200
15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
20 13768778790 192.168.100.17 120 120 200
21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
22 13568436656 192.168.100.19 1116 954 200
(2) Data after the first pass
13470253144 180 180 360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548
(3) Expected output data
13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
...
2. Requirement analysis
3. Implementation
(1) The FlowBean object from requirement 1, extended with comparison support

package com.xxx.mapreduce.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

	private long upFlow;
	private long downFlow;
	private long sumFlow;

	// The no-arg constructor is required: deserialization creates the object via reflection
	public FlowBean() {
		super();
	}

	public FlowBean(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	public void set(long upFlow, long downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	/**
	 * Serialization method
	 * @param out
	 * @throws IOException
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	/**
	 * Deserialization method. Note: fields must be read in exactly the order they were written
	 * @param in
	 * @throws IOException
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	@Override
	public int compareTo(FlowBean o) {

		int result;

		// Sort by total flow, descending
		if (sumFlow > o.getSumFlow()) {
			result = -1;
		} else if (sumFlow < o.getSumFlow()) {
			result = 1;
		} else {
			result = 0;
		}

		return result;
	}
}
(2) Write the Mapper class

package com.xxx.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

	FlowBean bean = new FlowBean();
	Text v = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		// 1 Get one line
		String line = value.toString();

		// 2 Split it
		String[] fields = line.split("\t");

		// 3 Fill the output objects
		String phoneNbr = fields[0];
		long upFlow = Long.parseLong(fields[1]);
		long downFlow = Long.parseLong(fields[2]);

		bean.set(upFlow, downFlow);
		v.set(phoneNbr);

		// 4 Write out
		context.write(bean, v);
	}
}
(3) Write the Reducer class

package com.xxx.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

	@Override
	protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

		// Loop over all values so that phone numbers with the same total flow are all written out
		for (Text text : values) {
			context.write(text, key);
		}
	}
}
(4) Write the Driver class

package com.xxx.mapreduce.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver {

	public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

		// The input and output paths must match the actual paths on your machine
		args = new String[]{"e:/output1", "e:/output2"};

		// 1 Get the configuration information and the job instance
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);

		// 2 Specify the local path of this program's jar
		job.setJarByClass(FlowCountSortDriver.class);

		// 3 Specify the Mapper/Reducer classes used by this job
		job.setMapperClass(FlowCountSortMapper.class);
		job.setReducerClass(FlowCountSortReducer.class);

		// 4 Specify the key/value types of the mapper output
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);

		// 5 Specify the key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		// 6 Specify the directory of the job's input files
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 7 Submit the job configuration and the jar containing the job's classes to YARN
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
⑥ WritableComparable sorting: hands-on case (sort within each partition)
1. Requirement: in the output file for each province, records must be sorted internally by total flow.
2. Requirement analysis: building on the previous requirement, add a custom partitioner class that partitions by the province prefix of the phone number.
3. Hands-on implementation
(1) Add the custom partitioner class

package com.xxx.mapreduce.sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

	@Override
	public int getPartition(FlowBean key, Text value, int numPartitions) {

		// 1 Get the first three digits of the phone number
		String preNum = value.toString().substring(0, 3);

		int partition = 4;

		// 2 Set the partition according to the phone number's prefix
		if ("136".equals(preNum)) {
			partition = 0;
		} else if ("137".equals(preNum)) {
			partition = 1;
		} else if ("138".equals(preNum)) {
			partition = 2;
		} else if ("139".equals(preNum)) {
			partition = 3;
		}

		return partition;
	}
}
(2) Register the partitioner in the driver class

	// Load the custom partitioner
	job.setPartitionerClass(ProvincePartitioner.class);

	// Set the number of ReduceTasks
	job.setNumReduceTasks(5);
⑦ Combiner merging
Steps to implement a custom Combiner:
(a) Define a Combiner class that extends Reducer and override the reduce method

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

		// 1 Aggregate
		int count = 0;
		for (IntWritable v : values) {
			count += v.get();
		}

		// 2 Write out
		context.write(key, new IntWritable(count));
	}
}

(b) Register it in the Job driver class:

	job.setCombinerClass(WordcountCombiner.class);
⑧ Combiner merging: hands-on case
1. Requirement
During the word count, locally aggregate the output of each MapTask to reduce the amount of data shuffled over the network, i.e. use a Combiner.
(1) Input data
banzhang ni hao xihuan hadoop banzhang banzhang ni hao xihuan hadoop banzhang
(2) Expected output
The Combiner receives many input records and, after merging, emits far fewer records.
2. Requirement analysis
3. Hands-on implementation, option 1
1) Add a WordcountCombiner class that extends Reducer

package com.xxx.mr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

	IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

		// 1 Aggregate
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		v.set(sum);

		// 2 Write out
		context.write(key, v);
	}
}
2) Specify the Combiner in the WordcountDriver class

	// Enable the combiner and specify which class supplies the combiner logic
	job.setCombinerClass(WordcountCombiner.class);
4. Hands-on implementation, option 2
1) Use WordcountReducer itself as the Combiner by registering it in the WordcountDriver class

	// Enable the combiner and specify which class supplies the combiner logic
	job.setCombinerClass(WordcountReducer.class);

- Run the program; the results are shown in the figure below.
⑨ GroupingComparator grouping (secondary sort)
Groups the data arriving at the Reduce stage by one or several fields.
Steps for grouping and sorting:
(1) Define a class that extends WritableComparator
(2) Override the compare() method

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		// comparison logic goes here
		return result;
	}

(3) Add a constructor that passes the class of the objects being compared to the parent class

	protected OrderGroupingComparator() {
		super(OrderBean.class, true);
	}
⑩ GroupingComparator grouping: hands-on case
1. Requirement: given the order data below, find the most expensive item in each order.
(1) Input data
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
(2) Expected output
1 222.8
2 722.4
3 232.8
2. Requirement analysis
(1) Use the pair "order id and amount" as the key. All order records read in the Map stage can then be sorted by id in ascending order and, for equal ids, by amount in descending order before being sent to Reduce.
(2) On the Reduce side, use a GroupingComparator to group the key/value pairs with the same order id; the first record of each group is then the most expensive item of that order, as shown in the figure below.
3. Implementation
(1) Define the order-information class OrderBean

package com.xxx.mapreduce.order;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

	private int order_id;  // order id
	private double price;  // price

	public OrderBean() {
		super();
	}

	public OrderBean(int order_id, double price) {
		super();
		this.order_id = order_id;
		this.price = price;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(order_id);
		out.writeDouble(price);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		order_id = in.readInt();
		price = in.readDouble();
	}

	@Override
	public String toString() {
		return order_id + "\t" + price;
	}

	public int getOrder_id() {
		return order_id;
	}

	public void setOrder_id(int order_id) {
		this.order_id = order_id;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	// Secondary sort
	@Override
	public int compareTo(OrderBean o) {

		int result;

		if (order_id > o.getOrder_id()) {
			result = 1;
		} else if (order_id < o.getOrder_id()) {
			result = -1;
		} else {
			// Same order id: sort by price, descending
			result = price > o.getPrice() ? -1 : 1;
		}

		return result;
	}
}
(2) Write the OrderMapper class

package com.xxx.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

	OrderBean k = new OrderBean();

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		// 1 Get one line
		String line = value.toString();

		// 2 Split it
		String[] fields = line.split("\t");

		// 3 Fill the key object
		k.setOrder_id(Integer.parseInt(fields[0]));
		k.setPrice(Double.parseDouble(fields[2]));

		// 4 Write out
		context.write(k, NullWritable.get());
	}
}
(3) Write the OrderGroupingComparator class

package com.xxx.mapreduce.order;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

	protected OrderGroupingComparator() {
		super(OrderBean.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {

		// Records with the same order_id are treated as one group
		OrderBean aBean = (OrderBean) a;
		OrderBean bBean = (OrderBean) b;

		int result;
		if (aBean.getOrder_id() > bBean.getOrder_id()) {
			result = 1;
		} else if (aBean.getOrder_id() < bBean.getOrder_id()) {
			result = -1;
		} else {
			result = 0;
		}

		return result;
	}
}
(4) Write the OrderReducer class

package com.xxx.mapreduce.order;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

	@Override
	protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

		// Within each group the first key is the most expensive item of the order
		context.write(key, NullWritable.get());
	}
}
(5) Write the OrderDriver class

package com.xxx.mapreduce.order;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

	public static void main(String[] args) throws Exception {

		// The input and output paths must match the actual paths on your machine
		args = new String[]{"e:/input/inputorder", "e:/output1"};

		// 1 Get the configuration information
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		// 2 Set the jar load path
		job.setJarByClass(OrderDriver.class);

		// 3 Load the map/reduce classes
		job.setMapperClass(OrderMapper.class);
		job.setReducerClass(OrderReducer.class);

		// 4 Set the key and value types of the map output
		job.setMapOutputKeyClass(OrderBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// 5 Set the key and value types of the final output
		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);

		// 6 Set the input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// 8 Set the reduce-side grouping comparator
		job.setGroupingComparatorClass(OrderGroupingComparator.class);

		// 7 Submit
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
二 How MapTask Works
(1) Read phase: the MapTask uses the user-supplied RecordReader to parse individual key/value pairs out of the input InputSplit.
(2) Map phase: the parsed key/value pairs are handed to the user-written map() function, which produces a new series of key/value pairs.
(3) Collect phase: inside the user-written map() function, once a record has been processed, OutputCollector.collect() is normally called to emit the result. Internally, this partitions the generated key/value pair (by calling the Partitioner) and writes it into a circular in-memory buffer.
(4) Spill phase: when the circular buffer fills up, MapReduce spills the data to local disk, producing a temporary file. Note that before the data is written to disk it is first sorted locally and, if necessary, combined and compressed.
Details of the spill phase:
Step 1: sort the data in the buffer with quicksort, first by partition number and then by key within each partition. After sorting, the data is clustered by partition, and all data within a partition is ordered by key.
Step 2: write the data of each partition, in ascending order of partition number, to a temporary file output/spillN.out under the task's working directory (N is the current spill count). If the user has configured a Combiner, the data of each partition is aggregated once before being written to the file.
Step 3: write the partition metadata to the in-memory index structure SpillRecord; for each partition this includes its offset in the temporary file, its uncompressed size, and its compressed size. If the in-memory index grows beyond 1MB, it is written to the file output/spillN.out.index.
(5) Combine phase: once all data has been processed, the MapTask merges all temporary files so that only a single data file is finally produced.
When all data has been processed, the MapTask merges the temporary files into one large file saved as output/file.out, together with the corresponding index file output/file.out.index.
Merging is done partition by partition. For a given partition, the MapTask merges recursively in multiple rounds: each round merges io.sort.factor files (10 by default), adds the resulting file back to the list of files awaiting merging, sorts the list, and repeats until a single large file remains.
Having each MapTask produce only one data file avoids the overhead of opening many files at once and the random reads caused by reading many small files at the same time.
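The buffer size, spill threshold, and merge factor described above are all configurable. The following is only an illustrative sketch, assuming the standard Hadoop 2.x/3.x property names (mapreduce.task.io.sort.mb, mapreduce.map.sort.spill.percent, and mapreduce.task.io.sort.factor, the successor of io.sort.factor); the values shown are examples, not recommendations:

	// Sketch: tuning the map-side sort/spill/merge parameters on a job's Configuration
	import org.apache.hadoop.conf.Configuration;
	import org.apache.hadoop.mapreduce.Job;

	public class MapSortTuning {
		public static void main(String[] args) throws Exception {
			Configuration conf = new Configuration();

			// Size of the circular in-memory buffer for map output, in MB (default 100)
			conf.setInt("mapreduce.task.io.sort.mb", 200);
			// Fraction of the buffer at which a spill is triggered (default 0.80)
			conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);
			// Number of spill files merged per merge round (default 10)
			conf.setInt("mapreduce.task.io.sort.factor", 10);

			Job job = Job.getInstance(conf);
			// ... the rest of the driver is configured as in the examples above ...
		}
	}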
三 How ReduceTask Works
(1) Copy phase: the ReduceTask remotely copies a piece of data from each MapTask; if a piece exceeds a certain threshold it is written to disk, otherwise it is kept in memory.
(2) Merge phase: while the remote copying is in progress, the ReduceTask starts two background threads that merge the files in memory and on disk, to prevent excessive memory usage or too many files on disk.
(3) Sort phase: by MapReduce semantics, the input to the user-written reduce() function is a group of data clustered by key. To bring records with the same key together, Hadoop uses a sort-based strategy. Since every MapTask has already locally sorted its own output, the ReduceTask only needs to perform one merge sort over all the data.
(4) Reduce phase: the reduce() function writes its results to HDFS.
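The copy threshold and the in-memory merge behaviour of the Copy and Merge phases are likewise configurable. A minimal sketch, assuming the standard property names mapreduce.reduce.shuffle.parallelcopies, mapreduce.reduce.shuffle.input.buffer.percent, and mapreduce.reduce.shuffle.merge.percent; the values are placeholders and would be set on the driver's Configuration before the Job is created:

	// Sketch: reduce-side shuffle tuning, placed before Job.getInstance(conf) in a driver like the ones above
	// Number of parallel fetcher threads copying map output (default 5)
	conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
	// Fraction of reducer heap used to buffer map output during the shuffle (default 0.70)
	conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);
	// Usage level of that buffer at which an in-memory merge starts (default 0.66)
	conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);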
1. Setting the ReduceTask parallelism (count)
The ReduceTask parallelism also affects the overall concurrency and efficiency of the Job. Unlike the number of MapTasks, which is determined by the number of input splits, the number of ReduceTasks can be set manually:

	job.setNumReduceTasks(4); // the default is 1; here it is set to 4
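For reference, a minimal sketch of the equivalent configuration-based form, assuming the standard property name mapreduce.job.reduces behind setNumReduceTasks:

	// Equivalent to job.setNumReduceTasks(4); must be set before the Job is created
	Configuration conf = new Configuration();
	conf.setInt("mapreduce.job.reduces", 4);
	Job job = Job.getInstance(conf);

When the driver uses ToolRunner/GenericOptionsParser, the same property can also be passed on the command line as -D mapreduce.job.reduces=4.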
2. Experiment: how many ReduceTasks are appropriate?
(1) Test environment: 1 Master node and 16 Slave nodes, each with an 8 GHz CPU and 2 GB of memory.
(2) Test conclusion (with 1 GB of data):
3. Notes
四 OutputFormat Data Output
① Implementations of the OutputFormat interface
② Custom OutputFormat
③ Custom OutputFormat: hands-on case
1. Requirement
Filter the input log: lines containing "xxx" go to e:/xxx.log, lines not containing "xxx" go to e:/other.log.
(1) Input data
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.xxx.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
(2) Expected output
xxx.log:
http://www.xxx.com
other.log:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
2. Requirement analysis
3. Hands-on implementation
(1) Write the FilterMapper class

package com.xxx.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		// Write the whole line out as the key
		context.write(value, NullWritable.get());
	}
}
(2) Write the FilterReducer class

package com.xxx.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	Text k = new Text();

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

		// 1 Get the line
		String line = key.toString();

		// 2 Append a line break
		line = line + "\r\n";

		// 3 Set the key
		k.set(line);

		// 4 Write out
		context.write(k, NullWritable.get());
	}
}
(3) Write a custom OutputFormat class

package com.xxx.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {

		// Create a RecordWriter
		return new FilterRecordWriter(job);
	}
}
(4) Write the RecordWriter class

package com.xxx.mapreduce.outputformat;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

	FSDataOutputStream xxxOut = null;
	FSDataOutputStream otherOut = null;

	public FilterRecordWriter(TaskAttemptContext job) {

		// 1 Get the file system
		FileSystem fs;

		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 Create the output file paths
			Path xxxPath = new Path("e:/xxx.log");
			Path otherPath = new Path("e:/other.log");

			// 3 Create the output streams
			xxxOut = fs.create(xxxPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {

		// Write to a different file depending on whether the record contains "xxx"
		if (key.toString().contains("xxx")) {
			xxxOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {

		// Close the streams
		IOUtils.closeStream(xxxOut);
		IOUtils.closeStream(otherOut);
	}
}
(5) Write the FilterDriver class

package com.xxx.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

	public static void main(String[] args) throws Exception {

		// The input and output paths must match the actual paths on your machine
		args = new String[]{"e:/input/inputoutputformat", "e:/output2"};

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// Register the custom output format with the job
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Although we use a custom OutputFormat, it extends FileOutputFormat,
		// and FileOutputFormat writes a _SUCCESS file, so an output directory is still required
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}