8_Big Data_MapReduce_3

Multiple Join Applications

1??Reduce Join

On the Map side, tag each record with the file it came from, use the join field as the output key, and emit the remaining fields as the value. On the Reduce side, records that share a join key arrive in the same group; separate them by their source tag and merge them.

2??Reduce Join Case Study
1. Requirement

//order.txt
1001   01  1
1002   02  2
1003   03  3
1004   01  4
1005   02  5
1006   03  6
//pd.txt
01 小米
02 華為
03 格力

Merge the product information table into the order table by product pid.
2. Requirement Analysis
Use the join condition (pid) as the Map output key, so that records from both tables that satisfy the join condition, each tagged with the name of its source file, are sent to the same ReduceTask; the records are then stitched together in the Reducer.
3. Implementation
1) Create the Bean for the merged order and product record

package com.xxx.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
   private String id;
   private String pid;
   private int amount;
   private String pname;

   @Override
   public String toString() {
       return id + "\t" + pname + "\t" + amount;
   }

   public String getId() {
       return id;
   }

   public void setId(String id) {
       this.id = id;
   }

   public String getPid() {
       return pid;
   }

   public void setPid(String pid) {
       this.pid = pid;
   }

   public int getAmount() {
       return amount;
   }

   public void setAmount(int amount) {
       this.amount = amount;
   }

   public String getPname() {
       return pname;
   }

   public void setPname(String pname) {
       this.pname = pname;
   }

   //Sort by pid; within a pid, order by pname descending so the record carrying a pname (from pd.txt) comes first
   @Override
   public int compareTo(OrderBean o) {
       int compare = this.pid.compareTo(o.pid);
       if (compare == 0) {
           return o.getPname().compareTo(this.getPname());
       } else {
           return compare;
       }
   }

   @Override
   public void write(DataOutput out) throws IOException {
       out.writeUTF(id);
       out.writeUTF(pid);
       out.writeInt(amount);
       out.writeUTF(pname);
   }

   @Override
   public void readFields(DataInput in) throws IOException {
       id = in.readUTF();
       pid = in.readUTF();
       amount = in.readInt();
       pname = in.readUTF();
   }
}

2) Write the Mapper (OrderMapper)

package com.xxx.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

   private String filename;

   private OrderBean order = new OrderBean();

   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
       
       //Get the name of the file backing this input split
       FileSplit fs = (FileSplit) context.getInputSplit();
       filename = fs.getPath().getName();
   }

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       String[] fields = value.toString().split("\t");
       
       //Handle the two data sources differently
       if ("order.txt".equals(filename)) {
           order.setId(fields[0]);
           order.setPid(fields[1]);
           order.setAmount(Integer.parseInt(fields[2]));
           order.setPname("");
       } else {
           order.setPid(fields[0]);
           order.setPname(fields[1]);
           order.setAmount(0);
           order.setId("");
       }

       context.write(order, NullWritable.get());
   }
}

3) Write the Reducer (OrderReducer)

package com.xxx.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

   @Override
   protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
       
       //Within each group, the first record comes from pd.txt; the rest come from order.txt
       Iterator<NullWritable> iterator = values.iterator();
       
       //Take pname from the first record
       iterator.next();
       String pname = key.getPname();
       
       //Iterate over the remaining records, fill in pname, and write them out
       while (iterator.hasNext()) {
           iterator.next();
           key.setPname(pname);
           context.write(key,NullWritable.get());
       }
   }


}

4) Write the Driver (OrderDriver)

package com.xxx.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {
   public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
       Job job = Job.getInstance(new Configuration());
       job.setJarByClass(OrderDriver.class);

       job.setMapperClass(OrderMapper.class);
       job.setReducerClass(OrderReducer.class);
       job.setGroupingComparatorClass(OrderComparator.class);

       job.setMapOutputKeyClass(OrderBean.class);
       job.setMapOutputValueClass(NullWritable.class);

       job.setOutputKeyClass(OrderBean.class);
       job.setOutputValueClass(NullWritable.class);

       FileInputFormat.setInputPaths(job, new Path("d:\\input"));
       FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

       boolean b = job.waitForCompletion(true);

       System.exit(b ? 0 : 1);

   }
}
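
The driver above registers an OrderComparator as the grouping comparator, but this post does not include its source. A minimal sketch, assuming it only needs to group keys by pid (the sort order defined in OrderBean already places the record carrying pname, i.e. the pd.txt record, first in each group):

package com.xxx.reducejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderComparator extends WritableComparator {

   protected OrderComparator() {
       // true: instantiate keys so compare() receives deserialized OrderBeans
       super(OrderBean.class, true);
   }

   @Override
   public int compare(WritableComparable a, WritableComparable b) {
       // Group by pid only; records with equal pid go to the same reduce() call
       OrderBean oa = (OrderBean) a;
       OrderBean ob = (OrderBean) b;
       return oa.getPid().compareTo(ob.getPid());
   }
}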

4. Test output

1001    小米  1   
1001    小米  1   
1002    華為  2   
1002    華為  2   
1003    格力  3   
1003    格力  3

5. Summary

With Reduce Join, all of the merging happens in the Reducer, so the Reduce side carries most of the load while the Map tasks stay lightly used, and skewed join keys easily cause data skew. The remedy is to do the join on the Map side instead, which is the subject of Map Join below.

3??Map Join

  1. Use case: Map Join suits scenarios where one table is very small and the other is very large.
  2. Advantage:
    Think about it: joining too many records on the Reduce side very easily produces data skew. What can be done?
    Cache the small table(s) on the Map side and apply the join logic there; this moves work to the Map tasks, reduces the pressure on the Reduce side, and minimizes data skew.

3. Concrete approach: use the DistributedCache.
(1) In the Mapper's setup() method, read the cached file into an in-memory collection.
(2) In the driver, load the file into the cache:
  // Cache an ordinary file on the nodes where the tasks run
  job.addCacheFile(new URI("file://e:/cache/pd.txt"));
4??Map Join Case Study

  1. Requirement
// order.txt
1001   01  1
1002   02  2
1003   03  3
1004   01  4
1005   02  5
1006   03  6
// pd.txt
01 小米
02 華為
03 格力

Merge the product information table into the order table by product pid.

// Expected final output
id     pname   amount
1001   小米        1
1004   小米        4
1002   華為        2
1005   華為        5
1003   格力        3
1006   格力        6

2. Requirement analysis: Map Join applies when one of the joined tables is small enough to cache in memory.

3. Implementation
(1) Add the cache file in the driver

package com.xxx.mapjoin;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

  public static void main(String[] args) throws Exception {
      
       // 0 Adjust these paths to match your own machine
       args = new String[]{"e:/input/inputtable2", "e:/output1"};

       // 1 Get the job instance
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

       // 2 Set the jar by class
      job.setJarByClass(DistributedCacheDriver.class);

       // 3 Set the Mapper
       job.setMapperClass(MjMapper.class);

       // 4 Set the final output key/value types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

       // 5 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 Add the small table to the distributed cache
      job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));
      
       // 7 A map-side join needs no Reduce phase, so set the number of ReduceTasks to 0
      job.setNumReduceTasks(0);

       // 8 Submit the job
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

(2) Read the cached file data in the Mapper

package com.xxx.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MjMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

   //In-memory cache of the pd table
   private Map<String, String> pMap = new HashMap<>();

   private Text line = new Text();

   //Before the task starts, load the pd data into pMap
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
       
       //Locate pd.txt among the cache files
       URI[] cacheFiles = context.getCacheFiles();
       Path path = new Path(cacheFiles[0]);

       //Get the file system and open an input stream
       FileSystem fileSystem = FileSystem.get(context.getConfiguration());
       FSDataInputStream fsDataInputStream = fileSystem.open(path);

       //Wrap the stream in a reader
       BufferedReader bufferedReader = new BufferedReader(
               new InputStreamReader(fsDataInputStream, "utf-8"));

       //Read and process the file line by line
       String line;
       while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
           String[] fields = line.split("\t");
           pMap.put(fields[0], fields[1]);
       }

       //Close the stream
       IOUtils.closeStream(bufferedReader);

   }

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       String[] fields = value.toString().split("\t");

       String pname = pMap.get(fields[1]);

       line.set(fields[0] + "\t" + pname + "\t" + fields[2]);

       context.write(line, NullWritable.get());

   }
}

II Counter Application
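
Hadoop maintains built-in counters (records read, bytes written, and so on) and also lets user code define its own; the totals are printed with the job statistics when the job finishes. The ETL example in the next section uses the string group/name form, context.getCounter("map", "true"). A minimal sketch of both counter styles inside a Mapper, with illustrative class and counter names:

package com.xxx.counter;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterDemoMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

   // Enum-based counters are grouped under the enum's class name
   enum LineQuality { VALID, INVALID }

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       if (value.toString().split(" ").length > 11) {
           context.getCounter(LineQuality.VALID).increment(1);     // enum form
           context.write(value, NullWritable.get());
       } else {
           context.getCounter("etl", "invalid").increment(1);      // group/name string form
       }
   }
}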


III Data Cleaning (ETL)

  Before running the core business MapReduce job, the data usually has to be cleaned first, removing records that do not meet the requirements. Cleaning usually needs only a Mapper, with no Reducer.
① ETL Case Study - Simple Parsing Version

  1. Requirement: remove log lines that have 11 or fewer fields.
    (1) Input data
//web.log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

(2) Expected output: every remaining line has more than 11 fields.
2. Requirement analysis: filter the input data in the Map phase according to this rule.
3. Implementation
(1) Write LogMapper

package com.xxx.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  
  Text k = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
       // 1 Get one line
      String line = value.toString();
      
       // 2 Parse the log line
      boolean result = parseLog(line,context);
      
       // 3 Skip invalid lines
      if (!result) {
          return;
      }
      
       // 4 Set the key
      k.set(line);
      
       // 5 Write out
      context.write(k, NullWritable.get());
  }

   // Parse a log line and count valid/invalid records
  private boolean parseLog(String line, Context context) {

       // 1 Split into fields
      String[] fields = line.split(" ");
      
       // 2 Lines with more than 11 fields are valid
      if (fields.length > 11) {

           // Custom counter (shows up in the job statistics)
          context.getCounter("map", "true").increment(1);
          return true;
      }else {
          context.getCounter("map", "false").increment(1);
          return false;
      }
  }
}

(2) Write LogDriver

package com.xxx.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

  public static void main(String[] args) throws Exception {

       // Set the input and output paths to match your own machine
       args = new String[] { "e:/input/inputlog", "e:/output1" };

       // 1 Get the job instance
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

       // 2 Set the jar by class
      job.setJarByClass(LogDriver.class);

       // 3 Set the Mapper
      job.setMapperClass(LogMapper.class);

       // 4 Set the final output types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

       // Set the number of ReduceTasks to 0
      job.setNumReduceTasks(0);

       // 5 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 Submit
      job.waitForCompletion(true);
  }
}

② ETL Case Study - Complex Parsing Version
1. Requirement: identify and split the individual fields of the web access log, drop invalid records, and output the filtered data according to the cleaning rules.
(1) Input data

//web.log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

(2) Expected output: only valid records remain.
2. Implementation
(1) Define a bean that holds the fields of a log record

package com.xxx.mapreduce.log;

public class LogBean {
  private String remote_addr;// Client IP address
  private String remote_user;// Client user name; "-" means not recorded
  private String time_local;// Access time and time zone
  private String request;// Request URL and HTTP protocol
  private String status;// Response status code; 200 means success
  private String body_bytes_sent;// Size of the response body sent to the client
  private String http_referer;// The page the request was linked from (referer)
  private String http_user_agent;// Client browser (user agent) information

  private boolean valid = true;// Whether the record is valid

  public String getRemote_addr() {
      return remote_addr;
  }

  public void setRemote_addr(String remote_addr) {
      this.remote_addr = remote_addr;
  }

  public String getRemote_user() {
      return remote_user;
  }

  public void setRemote_user(String remote_user) {
      this.remote_user = remote_user;
  }

  public String getTime_local() {
      return time_local;
  }

  public void setTime_local(String time_local) {
      this.time_local = time_local;
  }

  public String getRequest() {
      return request;
  }

  public void setRequest(String request) {
      this.request = request;
  }

  public String getStatus() {
      return status;
  }

  public void setStatus(String status) {
      this.status = status;
  }

  public String getBody_bytes_sent() {
      return body_bytes_sent;
  }

  public void setBody_bytes_sent(String body_bytes_sent) {
      this.body_bytes_sent = body_bytes_sent;
  }

  public String getHttp_referer() {
      return http_referer;
  }

  public void setHttp_referer(String http_referer) {
      this.http_referer = http_referer;
  }

  public String getHttp_user_agent() {
      return http_user_agent;
  }

  public void setHttp_user_agent(String http_user_agent) {
      this.http_user_agent = http_user_agent;
  }

  public boolean isValid() {
      return valid;
  }

  public void setValid(boolean valid) {
      this.valid = valid;
  }

  @Override
  public String toString() {

      StringBuilder sb = new StringBuilder();
      sb.append(this.valid);
      sb.append("\001").append(this.remote_addr);
      sb.append("\001").append(this.remote_user);
      sb.append("\001").append(this.time_local);
      sb.append("\001").append(this.request);
      sb.append("\001").append(this.status);
      sb.append("\001").append(this.body_bytes_sent);
      sb.append("\001").append(this.http_referer);
      sb.append("\001").append(this.http_user_agent);
      
      return sb.toString();
  }
}

(2) Write LogMapper

package com.xxx.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  Text k = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

       // 1 Get one line
      String line = value.toString();
      
       // 2 Parse the line and check whether it is valid
      LogBean bean = parseLog(line);
      
      if (!bean.isValid()) {
          return;
      }
      
      k.set(bean.toString());
      
       // 3 Write out
      context.write(k, NullWritable.get());
  }

   // Parse a log line into a LogBean
  private LogBean parseLog(String line) {

      LogBean logBean = new LogBean();
      
       // 1 Split into fields
      String[] fields = line.split(" ");
      
      if (fields.length > 11) {

           // 2 Populate the bean
          logBean.setRemote_addr(fields[0]);
          logBean.setRemote_user(fields[1]);
          logBean.setTime_local(fields[3].substring(1));
          logBean.setRequest(fields[6]);
          logBean.setStatus(fields[8]);
          logBean.setBody_bytes_sent(fields[9]);
          logBean.setHttp_referer(fields[10]);
          
          if (fields.length > 12) {
              logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
          }else {
              logBean.setHttp_user_agent(fields[11]);
          }
          
           // A status code of 400 or above indicates an HTTP error
          if (Integer.parseInt(logBean.getStatus()) >= 400) {
              logBean.setValid(false);
          }
      }else {
          logBean.setValid(false);
      }
      
      return logBean;
  }
}

(3) Write LogDriver

package com.xxx.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {
  public static void main(String[] args) throws Exception {
      
       // 1 Get the job instance
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

       // 2 Set the jar by class
      job.setJarByClass(LogDriver.class);

       // 3 Set the Mapper
      job.setMapperClass(LogMapper.class);

       // 4 Set the final output types
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

       // 5 Set the input and output paths
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 Submit
      job.waitForCompletion(true);
  }
}

MapReduce Development Summary


Hadoop Data Compression

1?? Overview

Compressing data reduces disk I/O and the amount of data transferred over the network during shuffle, at the cost of extra CPU time; it pays off most for I/O-bound jobs with large data volumes and is less attractive for CPU-bound jobs.

2??Compression codecs supported by MapReduce: DEFLATE (.deflate), Gzip (.gz), Bzip2 (.bz2), LZO (.lzo), and Snappy (.snappy); of these, only Bzip2 and indexed LZO produce splittable output.
3??Choosing a compression codec
Gzip: good compression ratio and speed, but not splittable; suited to files no larger than about one block.
Bzip2: very high compression ratio and splittable, but slow; suited to cold data where speed matters little.
Lzo: fast and splittable once an index is built, but must be installed separately; suited to very large files.
Snappy: very fast compression and decompression with a moderate ratio, not splittable; well suited to Map output and to data passed between chained MapReduce jobs.
4??Where to compress: compression can be enabled at any stage of a MapReduce job: the job input, the Map output (shuffle data), and the final Reduce output.
5??Compression parameter configuration
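
A hedged sketch of the standard Hadoop property names (the same map-output and final-output settings that the driver examples below apply programmatically), set directly on a Configuration; the codec choices here are only illustrative:

package com.xxx.mapreduce.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class CompressionConfigSketch {

   public static void main(String[] args) {
       Configuration conf = new Configuration();

       // Compress the intermediate Map output (shuffle data); Snappy needs the native library
       conf.setBoolean("mapreduce.map.output.compress", true);
       conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);

       // Compress the final Reduce output
       conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
       conf.setClass("mapreduce.output.fileoutputformat.compress.codec", BZip2Codec.class, CompressionCodec.class);

       // Job input needs no property: the codec is chosen from the file name extension
       System.out.println(conf.get("mapreduce.output.fileoutputformat.compress.codec"));
   }
}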
6?? Compression Case Studies

  1. Compressing and decompressing a data stream
package com.xxx.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

  public static void main(String[] args) throws Exception {
      compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
//     decompress("e:/hello.txt.bz2");
  }

  // 1遏考、壓縮
  private static void compress(String filename, String method) throws Exception {
      
       // (1) Open the input stream
      FileInputStream fis = new FileInputStream(new File(filename));
      
      Class codecClass = Class.forName(method);
      
      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
      
       // (2) Open the compressed output stream
      FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
      CompressionOutputStream cos = codec.createOutputStream(fos);
      
       // (3) Copy the stream
      IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
      
       // (4) Close resources
       cos.close();
       fos.close();
       fis.close();
  }

  // 2慈鸠、解壓縮
  private static void decompress(String filename) throws FileNotFoundException, IOException {
      
       // (0) Check whether a codec can be found for this file
      CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

      CompressionCodec codec = factory.getCodec(new Path(filename));
      
      if (codec == null) {
          System.out.println("cannot find codec for file " + filename);
          return;
      }
      
       // (1) Open the compressed input stream
      CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
      
       // (2) Open the output stream
      FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
      
       // (3) Copy the stream
      IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
      
       // (4) Close resources
      cis.close();
      fos.close();
  }
}
  2. Compressing the Map output: even if the job's input and output files are uncompressed, you can still compress the intermediate Map output. Because that data is written to disk and transferred over the network to the Reduce nodes, compressing it can improve performance considerably, and it only takes setting two properties:
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;   
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

      Configuration configuration = new Configuration();

       // Enable compression of the map output
       configuration.setBoolean("mapreduce.map.output.compress", true);
       // Set the map output compression codec
       configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

      Job job = Job.getInstance(configuration);

      job.setJarByClass(WordCountDriver.class);

      job.setMapperClass(WordCountMapper.class);
      job.setReducerClass(WordCountReducer.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean result = job.waitForCompletion(true);

       System.exit(result ? 0 : 1);
  }
}

The Mapper stays unchanged:

package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

   Text k = new Text();
   IntWritable v = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

       // 1 Get one line
      String line = value.toString();

       // 2 Split into words
      String[] words = line.split(" ");

       // 3 Write out each word
       for(String word:words){
           k.set(word);
          context.write(k, v);
      }
  }
}
  The Reducer stays unchanged:
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

  IntWritable v = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
          Context context) throws IOException, InterruptedException {
      
      int sum = 0;

       // 1 Sum the counts
      for(IntWritable value:values){
          sum += value.get();
      }
      
       v.set(sum);

        // 2 Write out
      context.write(key, v);
  }
}
  3. Compressing the Reduce output
    (1) Modify the driver:
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      
      Configuration configuration = new Configuration();
      
      Job job = Job.getInstance(configuration);
      
      job.setJarByClass(WordCountDriver.class);
      
      job.setMapperClass(WordCountMapper.class);
      job.setReducerClass(WordCountReducer.class);
      
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
       // Enable compression of the reduce output
      FileOutputFormat.setCompressOutput(job, true);
      
       // Set the compression codec
      FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//     FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//     FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 
      
      boolean result = job.waitForCompletion(true);
      
       System.exit(result ? 0 : 1);
  }
}

(2) The Mapper and Reducer stay unchanged (the same classes as in the Map-output compression example above).
