7_Big Data_MapReduce_2

The Shuffle Mechanism

1?? Shuffle mechanism: the data-processing stage that runs after the map() method and before the reduce() method is called Shuffle.

2?? Partition (partitioning)
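If no Partitioner is configured, MapReduce falls back to hash partitioning: the key's hash code, spread evenly over the number of ReduceTasks, decides which partition each record lands in. The sketch below mirrors that default behavior (an illustration of the idea rather than the exact Hadoop source); the case study that follows replaces it with business-specific rules.

import org.apache.hadoop.mapreduce.Partitioner;

public class HashLikePartitioner<K, V> extends Partitioner<K, V> {

  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
      // Mask off the sign bit so the result is non-negative, then spread keys over the ReduceTasks
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
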
3?? Partition case study
1. Requirement
Output the statistics to different files (partitions) according to the province that each phone number belongs to.
(1) Input data

1  13736230513 192.196.100.1   www.xxx.com 2481    24681   200
2  13846544121 192.196.100.2           264 0   200
3  13956435636 192.196.100.3           132 1512    200
4  13966251146 192.168.100.1           240 0   404
5  18271575951 192.168.100.2   www.xxx.com 1527    2106    200
6  84188413    192.168.100.3   www.xxx.com 4116    1432    200
7  13590439668 192.168.100.4           1116    954 200
8  15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9  13729199489 192.168.100.6           240 0   200
10 13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11 15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12 15959002129 192.168.100.9   www.xxx.com 1938    180 500
13 13560439638 192.168.100.10          918 4938    200
14 13470253144 192.168.100.11          180 180 200
15 13682846555 192.168.100.12  www.qq.com  1938    2910    200
16 13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17 13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18 18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19 13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20 13768778790 192.168.100.17          120 120 200
21 13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22 13568436656 192.168.100.19          1116    954 200

(2) Expected output
Phone numbers starting with 136, 137, 138 and 139 each go to their own file (four separate files); numbers with any other prefix go to one additional file.
2. Analysis
Take the first three digits of each phone number and map 136 → partition 0, 137 → 1, 138 → 2, 139 → 3, everything else → 4; run the job with 5 ReduceTasks so each partition goes to its own output file.

3. Building on case 2.4, add a partitioner class

package com.xxx.mapreduce.flowsum;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

  @Override
  public int getPartition(Text key, FlowBean value, int numPartitions) {

      // 1 Get the first three digits of the phone number
      String preNum = key.toString().substring(0, 3);
      
      int partition = 4;
      
      // 2 Decide which partition (province) the prefix maps to
      if ("136".equals(preNum)) {
          partition = 0;
      }else if ("137".equals(preNum)) {
          partition = 1;
      }else if ("138".equals(preNum)) {
          partition = 2;
      }else if ("139".equals(preNum)) {
          partition = 3;
      }

      return partition;
  }
}

4. In the driver, register the custom partitioner and set the number of ReduceTasks

package com.xxx.mapreduce.flowsum;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowsumDriver {

  public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

      // Set the input and output paths to the actual paths on your machine
      args = new String[]{"e:/output1","e:/output2"};

      // 1 Get the configuration and create the Job instance
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

      // 2 Specify the local path of the jar containing this program
      job.setJarByClass(FlowsumDriver.class);

      // 3 Specify the Mapper/Reducer classes used by this job
      job.setMapperClass(FlowCountMapper.class);
      job.setReducerClass(FlowCountReducer.class);

      // 4 Specify the key/value types of the mapper output
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(FlowBean.class);

      // 5 Specify the key/value types of the final output
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(FlowBean.class);

      // 8 Register the custom partitioner
      job.setPartitionerClass(ProvincePartitioner.class);

      // 9 Set a matching number of ReduceTasks
      job.setNumReduceTasks(5);
      
      // 6 Specify the job's input and output directories
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 7 Submit the job configuration and the jar containing the job's classes to YARN
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

4?? WritableComparable sorting

1. Types of sorting
2. Custom sorting with WritableComparable
(1) How it works
When a bean object is transferred as the key, it must implement the WritableComparable interface and override compareTo(); the framework then sorts records by that comparison.

@Override
public int compareTo(FlowBean o) {

  int result;
      
  // Sort by total flow, in descending order
  if (sumFlow > o.getSumFlow()) {
      result = -1;
  } else if (sumFlow < o.getSumFlow()) {
      result = 1;
  } else {
      result = 0;
  }

  return result;
}
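
The same descending comparison can be written more compactly with Long.compare; the one-liner below is an equivalent sketch, not part of the original code:

@Override
public int compareTo(FlowBean o) {
    // Descending by total flow: the larger sumFlow sorts first
    return Long.compare(o.getSumFlow(), this.sumFlow);
}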

5?? WritableComparable sorting case study (full sort)
1. Requirement
Sort the output produced by case 2.3 once more, this time by total flow.
(1) Input data: the raw data

1  13736230513 192.196.100.1   www.xxx.com 2481    24681   200
2  13846544121 192.196.100.2           264 0   200
3  13956435636 192.196.100.3           132 1512    200
4  13966251146 192.168.100.1           240 0   404
5  18271575951 192.168.100.2   www.xxx.com 1527    2106    200
6  84188413    192.168.100.3   www.xxx.com 4116    1432    200
7  13590439668 192.168.100.4           1116    954 200
8  15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9  13729199489 192.168.100.6           240 0   200
10 13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11 15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12 15959002129 192.168.100.9   www.xxx.com 1938    180 500
13 13560439638 192.168.100.10          918 4938    200
14 13470253144 192.168.100.11          180 180 200
15 13682846555 192.168.100.12  www.qq.com  1938    2910    200
16 13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17 13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18 18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19 13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20 13768778790 192.168.100.17          120 120 200
21 13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22 13568436656 192.168.100.19          1116    954 200

(2) Data after the first pass

13470253144    180 180 360
13509468723    7335    110349  117684
13560439638    918 4938    5856
13568436656    3597    25635   29232
13590439668    1116    954 2070
13630577991    6960    690 7650
13682846555    1938    2910    4848
13729199489    240 0   240
13736230513    2481    24681   27162
13768778790    120 120 240
13846544121    264 0   264
13956435636    132 1512    1644
13966251146    240 0   240
13975057813    11058   48243   59301
13992314666    3008    3720    6728
15043685818    3659    3538    7197
15910133277    3156    2936    6092
15959002129    1938    180 2118
18271575951    1527    2106    3633
18390173782    9531    2412    11943
84188413   4116    1432    5548

(3) Expected output

13509468723    7335    110349  117684
13736230513    2481    24681   27162
13956435636    132     1512    1644
13846544121    264     0       264
... ...

2. Analysis
Use FlowBean as the map output key (it implements WritableComparable, so records are sorted by total flow in descending order) and the phone number as the value; the Reducer swaps them back when writing the result.

3. Implementation
(1) FlowBean adds comparison support on top of requirement 1

package com.xxx.mapreduce.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

  private long upFlow;
  private long downFlow;
  private long sumFlow;

  // Deserialization instantiates the class via reflection, so a no-arg constructor is required
  public FlowBean() {
      super();
  }

  public FlowBean(long upFlow, long downFlow) {
      super();
      this.upFlow = upFlow;
      this.downFlow = downFlow;
      this.sumFlow = upFlow + downFlow;
  }

  public void set(long upFlow, long downFlow) {
      this.upFlow = upFlow;
      this.downFlow = downFlow;
      this.sumFlow = upFlow + downFlow;
  }

  public long getSumFlow() {
      return sumFlow;
  }

  public void setSumFlow(long sumFlow) {
      this.sumFlow = sumFlow;
  }   

  public long getUpFlow() {
      return upFlow;
  }

  public void setUpFlow(long upFlow) {
      this.upFlow = upFlow;
  }

  public long getDownFlow() {
      return downFlow;
  }

  public void setDownFlow(long downFlow) {
      this.downFlow = downFlow;
  }

  /**
   * Serialization method
   * @param out
   * @throws IOException
   */
  @Override
  public void write(DataOutput out) throws IOException {
      out.writeLong(upFlow);
      out.writeLong(downFlow);
      out.writeLong(sumFlow);
  }

  /**
   * Deserialization method; fields must be read in exactly the same order they were written
   * @param in
   * @throws IOException
   */
  @Override
  public void readFields(DataInput in) throws IOException {
      upFlow = in.readLong();
      downFlow = in.readLong();
      sumFlow = in.readLong();
  }

  @Override
  public String toString() {
      return upFlow + "\t" + downFlow + "\t" + sumFlow;
  }

  @Override
  public int compareTo(FlowBean o) {
      
      int result;
      
      // Sort by total flow, in descending order
      if (sumFlow > o.getSumFlow()) {
          result = -1;
      } else if (sumFlow < o.getSumFlow()) {
          result = 1;
      } else {
          result = 0;
      }

      return result;
  }
}

(2) Write the Mapper

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{

  FlowBean bean = new FlowBean();
  Text v = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

      // 1 Read one line
      String line = value.toString();
      
      // 2 Split the fields
      String[] fields = line.split("\t");
      
      // 3 Populate the key and value
      String phoneNbr = fields[0];
      long upFlow = Long.parseLong(fields[1]);
      long downFlow = Long.parseLong(fields[2]);
      
      bean.set(upFlow, downFlow);
      v.set(phoneNbr);
      
      // 4 Write out
      context.write(bean, v);
  }
}

(3) Write the Reducer

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{

  @Override
  protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      
      // Loop over the values so that records with the same total flow are all written out
      for (Text text : values) {
          context.write(text, key);
      }
  }
}

(4) Write the Driver

package com.xxx.mapreduce.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSortDriver {

  public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

      // Set the input and output paths to the actual paths on your machine
      args = new String[]{"e:/output1","e:/output2"};

      // 1 Get the configuration and create the Job instance
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

      // 2 Specify the local path of the jar containing this program
      job.setJarByClass(FlowCountSortDriver.class);

      // 3 Specify the Mapper/Reducer classes used by this job
      job.setMapperClass(FlowCountSortMapper.class);
      job.setReducerClass(FlowCountSortReducer.class);

      // 4 Specify the key/value types of the mapper output
      job.setMapOutputKeyClass(FlowBean.class);
      job.setMapOutputValueClass(Text.class);

      // 5 Specify the key/value types of the final output
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(FlowBean.class);

      // 6 Specify the job's input and output directories
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
      // 7 Submit the job configuration and the jar containing the job's classes to YARN
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

6?? WritableComparable sorting case study (sorting within partitions)
1. Requirement: within each province's output file, records must be sorted by total flow.
2. Analysis: building on the previous requirement, add a custom partitioner class that partitions by the province prefix of the phone number.

3. Implementation
(1) Add a custom partitioner class

package com.xxx.mapreduce.sort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

  @Override
  public int getPartition(FlowBean key, Text value, int numPartitions) {
      
      // 1 Get the first three digits of the phone number
      String preNum = value.toString().substring(0, 3);
      
      int partition = 4;
      
      // 2 Choose the partition according to where the number belongs
      if ("136".equals(preNum)) {
          partition = 0;
      }else if ("137".equals(preNum)) {
          partition = 1;
      }else if ("138".equals(preNum)) {
          partition = 2;
      }else if ("139".equals(preNum)) {
          partition = 3;
      }

      return partition;
  }
}

(2) Register the partitioner class in the driver

// Load the custom partitioner class
job.setPartitionerClass(ProvincePartitioner.class);

// Set the number of ReduceTasks
job.setNumReduceTasks(5);

7?? Combiner (local aggregation)

A Combiner runs on each MapTask's output and pre-aggregates it locally; it may only be used when that does not change the final result (summing is fine, averaging is not).
Steps to implement a custom Combiner:
(a) Define a Combiner class that extends Reducer and override its reduce() method

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

      // 1 Aggregate the counts
      int count = 0;
      for(IntWritable v : values){
          count += v.get();
      }

      // 2 Write out
      context.write(key, new IntWritable(count));
  }
}

(b) Register it in the job driver class:

job.setCombinerClass(WordcountCombiner.class);

8?? Combiner case study
1. Requirement
During the word count, aggregate each MapTask's output locally with a Combiner to reduce the amount of data transferred over the network.
(1) Input data

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2) Expected output
Expectation: the Combiner receives many input records and, after local merging, emits far fewer records.
2. Analysis
Insert a Combiner between the Mapper and Reducer that sums the counts produced by each MapTask before they are shuffled.

3. Implementation - option 1
1) Add a WordcountCombiner class that extends Reducer

package com.xxx.mr.combiner;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

  IntWritable v = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

      // 1 Aggregate the counts
      int sum = 0;

      for(IntWritable value :values){
          sum += value.get();
      }

      v.set(sum);

      // 2 Write out
      context.write(key, v);
  }
}

2) Specify the Combiner in the WordcountDriver class

// Enable the combiner and specify which class provides its logic
job.setCombinerClass(WordcountCombiner.class);

4. Implementation - option 2
1) Register WordcountReducer itself as the Combiner in the WordcountDriver class

// Enable the Combiner and specify which class provides its logic
job.setCombinerClass(WordcountReducer.class);
2) Run the program; the result is shown in the figure below.

9?? GroupingComparator grouping (secondary sort)
Groups the data entering the Reduce phase by one or several fields.
Steps for grouping and sorting:
(1) Define a class that extends WritableComparator
(2) Override the compare() method
@Override
public int compare(WritableComparable a, WritableComparable b) {
      // comparison logic goes here
      return result;
}

(3) Create a constructor that passes the class being compared to the parent class

protected OrderGroupingComparator() {
      super(OrderBean.class, true);
}

?? GroupingComparator case study
1. Requirement: given the following order data

Find the most expensive item in each order.
(1) Input data

0000001    Pdt_01  222.8
0000002    Pdt_05  722.4
0000001    Pdt_02  33.8
0000003    Pdt_06  232.8
0000003    Pdt_02  33.8
0000002    Pdt_03  522.8
0000002    Pdt_04  122.4

(2) Expected output

1  222.8
2  722.4
3  232.8

2. Analysis
(1) Use "order id + amount" as the key: the Map phase output can then be sorted by order id in ascending order and, for equal ids, by amount in descending order before it is sent to Reduce.
(2) On the Reduce side, use a GroupingComparator to group key/value pairs with the same order id into one call; the first record of each group is then the most expensive item of that order, as shown in the figure below.

3. Implementation
(1) Define the order bean, OrderBean

package com.xxx.mapreduce.order;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {

  private int order_id; // order id
  private double price; // price

  public OrderBean() {
      super();
  }

  public OrderBean(int order_id, double price) {
      super();
      this.order_id = order_id;
      this.price = price;
  }

  @Override
  public void write(DataOutput out) throws IOException {
      out.writeInt(order_id);
      out.writeDouble(price);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
      order_id = in.readInt();
      price = in.readDouble();
  }

  @Override
  public String toString() {
      return order_id + "\t" + price;
  }

  public int getOrder_id() {
      return order_id;
  }

  public void setOrder_id(int order_id) {
      this.order_id = order_id;
  }

  public double getPrice() {
      return price;
  }

  public void setPrice(double price) {
      this.price = price;
  }

  // Secondary sort: order id ascending, then price descending
  @Override
  public int compareTo(OrderBean o) {

      int result;

      if (order_id > o.getOrder_id()) {
          result = 1;
      } else if (order_id < o.getOrder_id()) {
          result = -1;
      } else {
          // Sort by price in descending order
          result = price > o.getPrice() ? -1 : 1;
      }

      return result;
  }
}

(2) Write the Mapper (OrderMapper)

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

  OrderBean k = new OrderBean();
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
      // 1 Read one line
      String line = value.toString();
      
      // 2 Split the fields
      String[] fields = line.split("\t");
      
      // 3 Populate the bean
      k.setOrder_id(Integer.parseInt(fields[0]));
      k.setPrice(Double.parseDouble(fields[2]));
      
      // 4 Write out
      context.write(k, NullWritable.get());
  }
}

(3) Write the grouping comparator (OrderGroupingComparator)

package com.xxx.mapreduce.order;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {

  protected OrderGroupingComparator() {
      super(OrderBean.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {

      OrderBean aBean = (OrderBean) a;
      OrderBean bBean = (OrderBean) b;

      int result;
      if (aBean.getOrder_id() > bBean.getOrder_id()) {
          result = 1;
      } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
          result = -1;
      } else {
          result = 0;
      }

      return result;
  }
}

(4) Write the Reducer (OrderReducer)

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

  @Override
  protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)        throws IOException, InterruptedException {
      
      context.write(key, NullWritable.get());
  }
}

(5) Write the Driver (OrderDriver)

package com.xxx.mapreduce.order;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {

  public static void main(String[] args) throws Exception, IOException {

      // Set the input and output paths to the actual paths on your machine
      args = new String[]{"e:/input/inputorder", "e:/output1"};

      // 1 Get the configuration
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      // 2 Set the jar load path
      job.setJarByClass(OrderDriver.class);

      // 3 Register the map/reduce classes
      job.setMapperClass(OrderMapper.class);
      job.setReducerClass(OrderReducer.class);

      // 4 Set the key and value types of the map output
      job.setMapOutputKeyClass(OrderBean.class);
      job.setMapOutputValueClass(NullWritable.class);

      // 5 Set the key and value types of the final output
      job.setOutputKeyClass(OrderBean.class);
      job.setOutputValueClass(NullWritable.class);

      // 6 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 8 Set the reduce-side grouping comparator
      job.setGroupingComparatorClass(OrderGroupingComparator.class);

      // 7 Submit
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

MapTask Working Mechanism

(1) Read phase: the MapTask uses the user-supplied RecordReader to parse individual key/value pairs out of the input InputSplit.
(2) Map phase: each parsed key/value pair is handed to the user's map() function, which produces a series of new key/value pairs.
(3) Collect phase: when the user's map() function finishes processing a record, it normally calls OutputCollector.collect() to emit the result. Inside that call, the generated key/value pair is partitioned (by calling the Partitioner) and written into a circular in-memory buffer.
(4) Spill phase: when the circular buffer fills up, MapReduce writes the data to local disk, producing a temporary file. Note that before the data is written to disk, it is first sorted locally and, if necessary, combined and compressed.
Details of the spill phase:
Step 1: the data in the buffer is sorted with quicksort, first by partition number and then by key. After sorting, the data is clustered by partition, and within each partition all records are ordered by key.
Step 2: the data of each partition is written, in ascending partition order, to a temporary file output/spillN.out in the task's working directory (N is the current spill count). If the user configured a Combiner, the data of each partition is aggregated once before being written.
Step 3: metadata about each partition is recorded in the in-memory index structure SpillRecord, including the offset within the temporary file and the data size before and after compression. If the in-memory index grows beyond 1 MB, it is written to the file output/spillN.out.index.
(5) Combine (merge) phase: when all data has been processed, the MapTask merges all temporary files once so that only a single data file remains.
After all data is processed, the MapTask merges the temporary files into one large file saved as output/file.out, together with the corresponding index file output/file.out.index.
Merging is done partition by partition. For each partition, the MapTask uses multiple rounds of recursive merging: each round merges io.sort.factor files (10 by default), adds the resulting file back to the list of files to merge, sorts the list, and repeats until a single large file remains.
Having each MapTask produce only one data file avoids the overhead of opening many files at once and of the random reads caused by reading many small files at the same time.
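
The buffer and merge behavior described above can be tuned from the driver's Configuration. A minimal sketch, assuming the Hadoop 2.x property names (check your version's defaults before changing anything):

Configuration conf = new Configuration();
// Size in MB of the circular in-memory buffer used by the Collect/Spill phases (default 100)
conf.set("mapreduce.task.io.sort.mb", "200");
// Start spilling to disk once the buffer reaches this fill ratio (default 0.80)
conf.set("mapreduce.map.sort.spill.percent", "0.80");
// Number of spill files merged per round during the final merge (default 10)
conf.set("mapreduce.task.io.sort.factor", "10");
Job job = Job.getInstance(conf);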


ReduceTask Working Mechanism

(1) Copy phase: the ReduceTask remotely copies a slice of data from each MapTask; if a slice exceeds a certain threshold it is written to disk, otherwise it is kept in memory.
(2) Merge phase: while data is being copied, the ReduceTask runs two background threads that merge the files in memory and on disk, to prevent excessive memory use or too many files on disk.
(3) Sort phase: by the MapReduce contract, the input to the user's reduce() function is a group of data aggregated by key. To bring records with the same key together, Hadoop uses a sort-based strategy. Since each MapTask has already sorted its own output locally, the ReduceTask only needs to perform one merge sort over all the data.
(4) Reduce phase: the reduce() function writes its results to HDFS.
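
The Copy and Merge phases can be tuned in a similar way; again a minimal sketch with assumed Hadoop 2.x property names:

Configuration conf = new Configuration();
// Number of parallel fetcher threads a ReduceTask uses to copy map output (default 5)
conf.set("mapreduce.reduce.shuffle.parallelcopies", "10");
// Fraction of the ReduceTask heap used to buffer copied map output in memory (default 0.70)
conf.set("mapreduce.reduce.shuffle.input.buffer.percent", "0.70");
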
1. Setting ReduceTask parallelism (count)
The ReduceTask parallelism also affects the overall concurrency and efficiency of a job. Unlike the number of MapTasks, which is determined by the number of input splits, the number of ReduceTasks can be set directly by hand: job.setNumReduceTasks(4); // the default is 1, here set to 4 (see the sketch after this list).
2. Experiment: how many ReduceTasks are appropriate?
(1) Environment: 1 master node and 16 slave nodes; each node: CPU 8 GHz, memory 2 GB.
(2) Conclusion: (data size 1 GB)
3. Notes
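
As a concrete illustration of item 1 above (a minimal sketch, independent of the experiment):

// The default is 1 ReduceTask; asking for 4 also yields 4 output files
job.setNumReduceTasks(4);

Setting the count to 0 turns the job into a map-only job: the Shuffle and Reduce phases are skipped and the map output is written directly to the output directory.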


OutputFormat Data Output

1?? Implementations of the OutputFormat interface

2?? Custom OutputFormat
3?? Custom OutputFormat case study
1. Requirement
Filter the input log of URLs: lines containing "xxx" go to e:/xxx.log, all other lines go to e:/other.log.
(1) Input data

http://www.baidu.com
http://www.google.com
http://cn.bing.com 
http://www.xxx.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

(2) Expected output
xxx.log : http://www.xxx.com
other.log :

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

2. Analysis
Subclass FileOutputFormat and return a custom RecordWriter that opens two output streams and routes each record according to whether it contains "xxx".

3. Implementation
(1) Write the FilterMapper

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

      // Write out
      context.write(value, NullWritable.get());
  }
}

(2) Write the FilterReducer

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

  Text k = new Text();

  @Override
  protected void reduce(Text key, Iterable<NullWritable> values, Context context)     throws IOException, InterruptedException {

      // 1 Get the line (the incoming key)
      String line = key.toString();

      // 2 Append a line break
      line = line + "\r\n";

      // 3 Set the key
      k.set(line);

      // 4 Write out
      context.write(k, NullWritable.get());
  }
}

(3) Define a custom OutputFormat

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

  @Override
  public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)         throws IOException, InterruptedException {

      // Create a RecordWriter
      return new FilterRecordWriter(job);
  }
}

(4) Write the RecordWriter

package com.xxx.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

  FSDataOutputStream xxxOut = null;
  FSDataOutputStream otherOut = null;

  public FilterRecordWriter(TaskAttemptContext job) {

      // 1 Get the file system
      FileSystem fs;

      try {
          fs = FileSystem.get(job.getConfiguration());

          // 2 Create the output file paths
          Path xxxPath = new Path("e:/xxx.log");
          Path otherPath = new Path("e:/other.log");

          // 3 Create the output streams
          xxxOut = fs.create(xxxPath);
          otherOut = fs.create(otherPath);
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

  @Override
  public void write(Text key, NullWritable value) throws IOException, InterruptedException {

      // Route the record to one of the two files depending on whether it contains "xxx"
      if (key.toString().contains("xxx")) {
          xxxOut.write(key.toString().getBytes());
      } else {
          otherOut.write(key.toString().getBytes());
      }
  }

  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {

      // Close the streams
      IOUtils.closeStream(xxxOut);
      IOUtils.closeStream(otherOut);
  }
}

(5) Write the FilterDriver

package com.xxx.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {

  public static void main(String[] args) throws Exception {

      // Set the input and output paths to the actual paths on your machine
      args = new String[] { "e:/input/inputoutputformat", "e:/output2" };

      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      job.setJarByClass(FilterDriver.class);
      job.setMapperClass(FilterMapper.class);
      job.setReducerClass(FilterReducer.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(NullWritable.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

      // Register the custom output format with the job
      job.setOutputFormatClass(FilterOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path(args[0]));

      // Although we defined a custom OutputFormat, it extends FileOutputFormat,
      // and FileOutputFormat still needs to write a _SUCCESS file, so an output directory must be specified here
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}