一 Multiple Applications of Join
1?? Reduce Join
2?? Reduce Join Case Study
1. Requirement
```
// order.txt  (id  pid  amount)
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

// pd.txt  (pid  pname)
01	小米
02	華為
03	格力
```
Merge the product information from the product table into the order table according to the product pid.
2. Analysis
Use the join condition (pid) as the key of the Map output, and send the records from both tables that satisfy the join condition, each tagged with the name of the file it comes from, to the same ReduceTask; the actual join is then performed in the Reduce phase.
3. Code
1) Create the Bean class for the merged product and order record:
```java
package com.xxx.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String id;
    private String pid;
    private int amount;
    private String pname;

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getPid() { return pid; }
    public void setPid(String pid) { this.pid = pid; }
    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }
    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }

    // Order by pid; within the same pid, sort so that the record carrying a pname
    // (the one from pd.txt) comes first
    @Override
    public int compareTo(OrderBean o) {
        int compare = this.pid.compareTo(o.pid);
        if (compare == 0) {
            return o.getPname().compareTo(this.getPname());
        } else {
            return compare;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
    }
}
```
2) Write the Mapper class (OrderMapper):
```java
package com.xxx.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    private String filename;
    private OrderBean order = new OrderBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the name of the file backing this split
        FileSplit fs = (FileSplit) context.getInputSplit();
        filename = fs.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        // Handle the two data sources differently
        if ("order.txt".equals(filename)) {
            order.setId(fields[0]);
            order.setPid(fields[1]);
            order.setAmount(Integer.parseInt(fields[2]));
            order.setPname("");
        } else {
            order.setPid(fields[0]);
            order.setPname(fields[1]);
            order.setAmount(0);
            order.setId("");
        }

        context.write(order, NullWritable.get());
    }
}
```
3) Write the Reducer class (OrderReducer):
```java
package com.xxx.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Within each pid group the first record comes from pd.txt, all the rest from order.txt
        Iterator<NullWritable> iterator = values.iterator();

        // Read the first record to obtain the pname
        iterator.next();
        String pname = key.getPname();

        // Iterate over the remaining records, fill in the pname and write them out
        while (iterator.hasNext()) {
            iterator.next();
            key.setPname(pname);
            context.write(key, NullWritable.get());
        }
    }
}
```
4) Write the Driver class (OrderDriver):
```java
package com.xxx.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OrderDriver.class);
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        job.setGroupingComparatorClass(OrderComparator.class);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```
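The driver above registers a grouping comparator, `OrderComparator`, whose listing is not shown here. A minimal sketch that groups OrderBean keys by pid only (an assumption consistent with the bean's compareTo and the reducer's logic, not the author's original listing):
```java
package com.xxx.reducejoin;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator: treat all OrderBean keys with the same pid as one
// group, so the pd record and all order records for a pid reach the same reduce() call.
public class OrderComparator extends WritableComparator {

    protected OrderComparator() {
        // true: instantiate keys so the object-based compare() below is used
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;
        return oa.getPid().compareTo(ob.getPid());
    }
}
```
Because the keys are also sorted so that the pname-carrying record comes first within each pid, the reducer can take the pname from the first record and apply it to the orders that follow.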
4. Test
```
1001	小米	1
1004	小米	4
1002	華為	2
1005	華為	5
1003	格力	3
1006	格力	6
```
5. Summary: with a Reduce Join the merging is done entirely on the Reduce side, which carries most of the load and is prone to data skew; the Map Join below moves that work to the Map side.
3?? Map Join
- Use case: Map Join is suitable when one of the joined tables is very small and the other is very large.
- Advantage: consider what happens when too many tables have to be combined on the Reduce side: data skew arises very easily. What can be done? Cache the small table(s) on the Map side and finish the business logic there. This adds work to the Map side, relieves the pressure on the Reduce side, and reduces data skew as much as possible.
- Approach: use DistributedCache.
(1) In the Mapper's setup phase, read the cached file into an in-memory collection (the case-study mapper below does exactly this in its setup() method).
(2) Load the cache in the driver:
```java
// Cache an ordinary file onto the task nodes
job.addCacheFile(new URI("file://e:/cache/pd.txt"));
```
4?? Map Join Case Study
1. Requirement
```
// order.txt  (id  pid  amount)
1001	01	1
1002	02	2
1003	03	3
1004	01	4
1005	02	5
1006	03	6

// pd.txt  (pid  pname)
01	小米
02	華為
03	格力
```
Merge the product information from the product table into the order table according to the product pid.
```
// Expected output:  id  pname  amount
1001	小米	1
1004	小米	4
1002	華為	2
1005	華為	5
1003	格力	3
1006	格力	6
```
2. Analysis: MapJoin is suitable when one of the joined tables is small.
3. Code
(1) First add the cache file in the driver:
```java
package com.xxx.mapjoin;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

    public static void main(String[] args) throws Exception {
        // 0 Adjust these paths to your own machine
        args = new String[]{"e:/input/inputtable2", "e:/output1"};

        // 1 Get job information
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 Set the jar load path
        job.setJarByClass(DistributedCacheDriver.class);

        // 3 Attach the mapper
        job.setMapperClass(MjMapper.class);

        // 4 Set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Load the cached data
        job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));

        // 7 A Map-side join needs no Reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        // 8 Submit
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
(2) Read the cached file data in the Mapper:
```java
package com.xxx.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MjMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // In-memory cache of the pd table
    private Map<String, String> pMap = new HashMap<>();
    private Text line = new Text();

    // Before the task starts, load the pd data into pMap
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Locate pd.txt among the cached files
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);

        // Get the file system and open a stream
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());
        FSDataInputStream fsDataInputStream = fileSystem.open(path);

        // Wrap the stream in a reader
        BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(fsDataInputStream, "utf-8"));

        // Read and process the file line by line
        String line;
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] fields = line.split("\t");
            pMap.put(fields[0], fields[1]);
        }

        // Close the stream
        IOUtils.closeStream(bufferedReader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        String pname = pMap.get(fields[1]);
        line.set(fields[0] + "\t" + pname + "\t" + fields[2]);
        context.write(line, NullWritable.get());
    }
}
```
二 Using Counters
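The ETL mapper in the next section counts valid and invalid log lines with the string-based form `context.getCounter("map", "true")`. As a minimal sketch of the two flavors of the counter API (the enum and the group/counter names here are illustrative, not from the original):
```java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CounterDemoMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Enum-based counters: the enum's class name becomes the counter group
    enum LineCounter { TOTAL, EMPTY }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.getCounter(LineCounter.TOTAL).increment(1);

        if (value.toString().isEmpty()) {
            // String-based counters: group name + counter name, created on first use
            context.getCounter("demo", "emptyLines").increment(1);
            context.getCounter(LineCounter.EMPTY).increment(1);
            return;
        }
        context.write(value, NullWritable.get());
    }
}
```
Counter totals are printed together with the built-in framework counters when the job finishes, and can also be read in the driver via `job.getCounters()`.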
三 Data Cleaning (ETL)
Before running the core business MapReduce program, the data usually has to be cleaned first to remove records that do not meet the requirements. The cleaning process typically only needs to run a Mapper; no Reducer is required.
① Data Cleaning Case Study: Simple Parser
1. Requirement: remove log lines that have 11 or fewer fields.
(1) Input data
```
// web.log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
```
(2) Expected output: every remaining line has more than 11 fields.
2. Analysis: filter the input records in the Map phase according to the rule.
3. Code
(1) Write the LogMapper class:
```java
package com.xxx.mapreduce.weblog;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get one line
        String line = value.toString();

        // 2 Parse the log line
        boolean result = parseLog(line, context);

        // 3 Skip invalid lines
        if (!result) {
            return;
        }

        // 4 Set the key
        k.set(line);

        // 5 Write out
        context.write(k, NullWritable.get());
    }

    // Parse a log line
    private boolean parseLog(String line, Context context) {
        // 1 Split on spaces
        String[] fields = line.split(" ");

        // 2 A line with more than 11 fields is valid
        if (fields.length > 11) {
            // Custom counters
            context.getCounter("map", "true").increment(1);
            return true;
        } else {
            context.getCounter("map", "false").increment(1);
            return false;
        }
    }
}
```
(2) Write the LogDriver class:
```java
package com.xxx.mapreduce.weblog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

    public static void main(String[] args) throws Exception {
        // Adjust the input and output paths to your own machine
        args = new String[] { "e:/input/inputlog", "e:/output1" };

        // 1 Get job information
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Load the jar
        job.setJarByClass(LogDriver.class);

        // 3 Attach the mapper
        job.setMapperClass(LogMapper.class);

        // 4 Set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // No reduce phase is needed
        job.setNumReduceTasks(0);

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Submit
        job.waitForCompletion(true);
    }
}
```
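If you want to read the cleaning statistics programmatically instead of from the console summary, the counters written by LogMapper can be fetched after the job finishes. A minimal sketch (the group and counter names match the mapper above; this fragment would go at the end of main(), after waitForCompletion):
```java
// After job.waitForCompletion(true) in the driver:
long kept = job.getCounters().findCounter("map", "true").getValue();
long dropped = job.getCounters().findCounter("map", "false").getValue();
System.out.println("valid lines: " + kept + ", dropped lines: " + dropped);
```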
② Data Cleaning Case Study: Complex Parser
1. Requirement: split each web access log line into its individual fields and remove invalid records, outputting only the data that passes the cleaning rules.
(1) Input data: the same web.log as in the simple version above.
(2) Expected output: only valid records remain.
2. Code
(1) Define a bean that holds the individual fields of a log record:
```java
package com.xxx.mapreduce.log;

public class LogBean {
    private String remote_addr;      // client IP address
    private String remote_user;      // client user name; "-" when absent
    private String time_local;       // access time and time zone
    private String request;          // requested URL and HTTP protocol
    private String status;           // request status; 200 on success
    private String body_bytes_sent;  // size of the body sent to the client
    private String http_referer;     // page the request came from
    private String http_user_agent;  // client browser information
    private boolean valid = true;    // whether the record is valid

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }
    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }
    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}
```
(2) Write the LogMapper class:
```java
package com.xxx.mapreduce.log;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get one line
        String line = value.toString();

        // 2 Parse the line and check whether it is valid
        LogBean bean = parseLog(line);
        if (!bean.isValid()) {
            return;
        }

        k.set(bean.toString());

        // 3 Write out
        context.write(k, NullWritable.get());
    }

    // Parse a log line into a LogBean
    private LogBean parseLog(String line) {
        LogBean logBean = new LogBean();

        // 1 Split on spaces
        String[] fields = line.split(" ");

        if (fields.length > 11) {
            // 2 Fill in the fields
            logBean.setRemote_addr(fields[0]);
            logBean.setRemote_user(fields[1]);
            logBean.setTime_local(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBody_bytes_sent(fields[9]);
            logBean.setHttp_referer(fields[10]);

            if (fields.length > 12) {
                logBean.setHttp_user_agent(fields[11] + " " + fields[12]);
            } else {
                logBean.setHttp_user_agent(fields[11]);
            }

            // Status codes of 400 and above are HTTP errors
            if (Integer.parseInt(logBean.getStatus()) >= 400) {
                logBean.setValid(false);
            }
        } else {
            logBean.setValid(false);
        }

        return logBean;
    }
}
```
(3) Write the LogDriver class:
```java
package com.xxx.mapreduce.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

    public static void main(String[] args) throws Exception {
        // 1 Get job information
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 Load the jar
        job.setJarByClass(LogDriver.class);

        // 3 Attach the mapper
        job.setMapperClass(LogMapper.class);

        // 4 Set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 Submit
        job.waitForCompletion(true);
    }
}
```
四 MapReduce Development Summary
五 Hadoop Data Compression
1?? Overview
2?? Compression codecs supported by MR
3?? Choosing a compression codec
① Gzip compression
② Bzip2 compression
③ Lzo compression
④ Snappy compression
4?? Where to compress: compression can be enabled at any stage of a MapReduce job, that is, on the input data, on the map output, or on the reduce output.
5?? Compression parameter configuration: the relevant properties can be set on the job's Configuration or in mapred-site.xml, as sketched below.
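A minimal sketch using the Hadoop 2.x property names (the BZip2 codec choice here is only an example; the case-study drivers below set the map-output properties in exactly this way):
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;

public class CompressionConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Input files are decompressed automatically based on their extension
        // (the available codecs are listed by io.compression.codecs).

        // Compress the intermediate map output
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        // Compress the final output written by FileOutputFormat
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.setClass("mapreduce.output.fileoutputformat.compress.codec", BZip2Codec.class, CompressionCodec.class);

        // For SequenceFile output: NONE, RECORD, or BLOCK
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    }
}
```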
6?? Compression Case Studies
- Compressing and decompressing a data stream:
```java
package com.xxx.mapreduce.compress;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

    public static void main(String[] args) throws Exception {
        compress("e:/hello.txt", "org.apache.hadoop.io.compress.BZip2Codec");
        // decompress("e:/hello.txt.bz2");
    }

    // 1 Compress
    private static void compress(String filename, String method) throws Exception {
        // (1) Get the input stream
        FileInputStream fis = new FileInputStream(new File(filename));

        Class codecClass = Class.forName(method);
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());

        // (2) Get the output stream
        FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
        CompressionOutputStream cos = codec.createOutputStream(fos);

        // (3) Copy the stream
        IOUtils.copyBytes(fis, cos, 1024 * 1024 * 5, false);

        // (4) Close the resources
        cos.close();
        fos.close();
        fis.close();
    }

    // 2 Decompress
    private static void decompress(String filename) throws FileNotFoundException, IOException {
        // (0) Check whether the file can be decompressed
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodec(new Path(filename));
        if (codec == null) {
            System.out.println("cannot find codec for file " + filename);
            return;
        }

        // (1) Get the input stream
        CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));

        // (2) Get the output stream
        FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));

        // (3) Copy the stream
        IOUtils.copyBytes(cis, fos, 1024 * 1024 * 5, false);

        // (4) Close the resources
        cis.close();
        fos.close();
    }
}
```
- Compressing the Map output: even if the input and output files of your MapReduce job are uncompressed, you can still compress the intermediate output of the Map tasks. Because that output is written to disk and transferred over the network to the Reduce nodes, compressing it can bring a large performance gain, and it only requires setting two properties:
```java
package com.xxx.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();

        // Enable compression of the map output
        configuration.setBoolean("mapreduce.map.output.compress", true);
        // Set the codec for the map output
        configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
1. The Mapper stays unchanged:
```java
package com.xxx.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 Get one line
        String line = value.toString();

        // 2 Split into words
        String[] words = line.split(" ");

        // 3 Write out each word
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
```
The Reducer stays unchanged:
```java
package com.xxx.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // 1 Sum the counts
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);

        // 2 Write out
        context.write(key, v);
    }
}
```
- Compressing the Reduce output
1. Modify the driver:
```java
package com.xxx.mapreduce.compress;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Enable compression of the reduce output
        FileOutputFormat.setCompressOutput(job, true);

        // Set the compression codec
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
2. The Mapper and Reducer stay unchanged (see 6.2).