Join在MapReduce中的實現(xiàn)
一宣羊、概述
tips: Hive: MapReduce/Spark巧用 explain 查看語法樹
-
常見的面試題:描述如何使用MapReduce來實現(xiàn)join功能:考察點
- MapReduce執(zhí)行流程
- join的底層執(zhí)行過程
- join的多種實現(xiàn)方式
- ReduceJoin(有shuffle)
- MapJoin(沒有ReduceJoin,也就是沒有Shuffle的過程)
-
其他常見面試題
- Mapper的泛型里面有幾個參數(shù),各是啥意思(4 個,文件輸入的key/value液肌;寫入上下文的key/value)
- map方法有幾個參數(shù)承疲,各是啥意思(3 個,文件輸入的key/value冲呢;上下文)
- 字符串拼接為啥不建議使用 + 而是StringBuilder
- Mapper/Reduce的生命周期
- tips:簡歷:項目
- 最新的項目寫在最前面
- 寫的東西一定要真正會的(寫的一定要會舍败,會的不一定是自己寫的,哈哈)
- 從自己寫的東西開始面起敬拓,然后逐步擴展=>基金/技術(shù)的一個功能鏈條
- 想要高薪邻薯,得挖相關(guān)技術(shù)的祖墳!
二乘凸、原始數(shù)據(jù)
- emp.txt
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 40
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 40
8888 HIVE PROGRAM 7839 1988-1-23 10300.00
- dept.txt
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
三厕诡、ReduceJoin流程分析以及實現(xiàn)
數(shù)據(jù)通過Mapper加載過來,獲取當(dāng)前輸入的文件名营勤,確認(rèn)封裝的對象并在對象中標(biāo)記灵嫌,以join條件為key寫入上下文,然后經(jīng)過shuffle階段葛作,然后在Reduce端根據(jù)flag分發(fā)為不同的對象然后進行排列組合寫入文件寿羞。
代碼實現(xiàn)
import lombok.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
/**
* 前提:數(shù)據(jù)被ETL處理過,能被Hive直接使用
* emp.txt
* empno ename sal
* dept.txt
* deptno dname
*
* emp.txt 與 dept.txt是多對一的關(guān)系
* @author Liucheng
* @since 2019-12-06
*/
public class ReduceJoinApp {
public static void main(String[] args) throws Exception {
// 配置類进鸠;默認(rèn)為本地文件系統(tǒng)
Configuration configuration = new Configuration();
// Job工作類
Job job = Job.getInstance(configuration);
// 設(shè)置住主類
job.setJarByClass(ReduceJoinApp.class);
// 配置Reducer Task任務(wù)個數(shù)
// job.setNumReduceTasks(3);
// 配置Mapper與Reducer
job.setMapperClass(ReduceJoinMapper.class);
job.setReducerClass(ReduceJoinReducer.class);
// 告知Mapper的輸出類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Entity.class);
// 告知Reducer的輸出類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 本地需要處理的文件
Path emp = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\emp.txt");
Path dept = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\dept.txt");
Path outputPath = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\reduce-join");
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.delete(outputPath, true);
MultipleInputs.addInputPath(job, emp, TextInputFormat.class);
MultipleInputs.addInputPath(job, dept, TextInputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
}
class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Entity> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 獲取文件名
String filename = getFileName(context);
String[] datas = value.toString().split("\t");
if (filename.contains("emp.txt")) {
if (datas.length < 8) {
return;
}
// 員工表處理邏輯
String deptno = datas[7];
// 第一個屬性flag表示標(biāo)識來源
Entity emp = new Entity("emp", datas[0], datas[1], datas[5], deptno);
context.write(new Text(deptno), emp);
} else {
// 部門表處理邏輯
if (datas.length < 3) {
return;
}
String deptno = datas[0];
Entity dept = new Entity("dept", datas[1]);
context.write(new Text(deptno), dept);
}
}
/**
* 獲取文件名的方法
*/
public String getFileName(Context context) {
// 獲取記錄對應(yīng)的文件信息
InputSplit inputSplit = context.getInputSplit();
Class<? extends InputSplit> splitClass = inputSplit.getClass();
FileSplit fileSplit = null;
if (splitClass.equals(FileSplit.class)) {
fileSplit = (FileSplit) inputSplit;
} else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
try {
Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
getInputSplitMethod.setAccessible(true);
fileSplit = (FileSplit) getInputSplitMethod.invoke(inputSplit);
} catch (Exception e) {
System.out.println(e);
throw new RuntimeException(e);
}
}
// 獲取文件名
String fileName = fileSplit.getPath().getName();
// 獲取文件所在的路徑名
String filePath = fileSplit.getPath().getParent().toUri().getPath();
return fileName;
}
}
class ReduceJoinReducer extends Reducer<Text, Entity, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Entity> values, Context context) throws IOException, InterruptedException {
List<Entity> emps = new ArrayList<>();
Entity[] dept = new Entity[1];
values.forEach(entity -> {
if ("emp".equals(entity.getFlag())) {
emps.add(new Entity(entity));
System.out.println("剛?cè)〕鰜頃r" + entity);
} else if ("dept".equals(entity.getFlag())) {
dept[0] = new Entity(entity);
System.out.println("剛?cè)〕鰜頃r" + entity);
}
});
emps.forEach(entity -> {
StringBuilder sb = new StringBuilder();
sb.append(entity.getEmpnno()).append("\t")
.append(entity.getEname()).append("\t")
.append(entity.getSal()).append("\t")
.append(entity.getDeptno()).append("\t")
.append(dept[0].getDname());
try {
context.write(new Text(sb.toString()), NullWritable.get());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
class Entity implements Writable {
private String flag;
private String empnno;
private String ename;
private String sal;
private String deptno;
private String dname;
public Entity(String flag, String empnno, String ename, String sal, String deptno) {
this.flag = flag;
this.empnno = empnno;
this.ename = ename;
this.sal = sal;
this.deptno = deptno;
}
public Entity(String flag, String dname) {
this.flag = flag;
this.dname = dname;
}
public Entity(Entity newEntity) {
this.flag = newEntity.getFlag();
this.empnno = newEntity.getEmpnno();
this.ename = newEntity.getEname();
this.sal = newEntity.getSal();
this.deptno = newEntity.getDeptno();
this.dname = newEntity.getDname();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(flag);
out.writeUTF(empnno != null ? empnno : "");
out.writeUTF(ename != null ? ename : "");
out.writeUTF(sal != null ? sal : "");
out.writeUTF(deptno != null ? deptno : "");
out.writeUTF(dname != null ? dname : "");
}
@Override
public void readFields(DataInput in) throws IOException {
this.flag = in.readUTF();
this.empnno = in.readUTF();
this.ename = in.readUTF();
this.sal = in.readUTF();
this.deptno = in.readUTF();
this.dname = in.readUTF();
}
@Override
public String toString() {
return this.hashCode() + "--Entity{" +
"flag='" + flag + '\'' +
", empnno='" + empnno + '\'' +
", ename='" + ename + '\'' +
", sal='" + sal + '\'' +
", deptno='" + deptno + '\'' +
", dname='" + dname + '\'' +
'}';
}
}
四稠曼、MapJoin流程分析
- 適合數(shù)據(jù)小,是否有必要全部
- shuffle是整個大數(shù)據(jù)處理過程中非常耗時,非常損耗性能的地方
- 能規(guī)避shuffle的地方就不要使用shuffle【調(diào)優(yōu)霞幅,有些情況下進行shuffle是更加優(yōu)化漠吻,這種情況比較少】
- 將小文件的內(nèi)容寫入緩存,讀取比較大文件司恳,然后在緩存中根據(jù)join條件查找寫入緩存即可途乃。
- 代碼實現(xiàn)
package com.hahadasheng.bigdata.hadooplearning.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* @author Liucheng
* @since 2019-12-07
*/
public class MapperJoinApp {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(MapperJoinApp.class);
// 配置Reducer Task任務(wù)個數(shù)為0
job.setNumReduceTasks(0);
job.setMapperClass(MapperJoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 小文件 hdfs文件
URI dept = new URI("/mapjoin/dept.txt");
// 將小文件加到分布式緩存中
job.addCacheFile(dept);
// 大文件 hdfs文件
Path emp = new Path("/mapjoin/emp.txt");
// 寫入大文件
FileInputFormat.setInputPaths(job, emp);
Path outputPath = new Path("/mapjoin/map-join");
FileSystem fileSystem = FileSystem.get(configuration);
fileSystem.delete(outputPath, true);
// 文件輸出
FileOutputFormat.setOutputPath(job, outputPath);
job.waitForCompletion(true);
}
}
class MapperJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> deptCatch = new HashMap<>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
String filePath = context.getCacheFiles()[0].toString();
//String filePath = "E:/ImprovementWorkingSpace/hadoop-learning/src/main/resources/join/dept.txt";
// 如下這個方法是在本地讀取文件,在HDFS上會報錯扔傅!
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while ((line = br.readLine()) != null) {
String[] datas = line.split("\t");
// 部門表處理邏輯
if (datas.length < 3) {
return;
}
String deptno = datas[0];
String dname = datas[1];
deptCatch.put(deptno, dname);
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] datas = value.toString().split("\t");
if (datas.length < 8) {
return;
}
// 員工表處理邏輯
String empnno = datas[0];
String ename = datas[1];
String sal = datas[5];
String deptno = datas[7];
StringBuilder sb = new StringBuilder();
sb.append(empnno).append("\t")
.append(empnno).append("\t")
.append(ename).append("\t")
.append(sal).append("\t")
.append(deptno).append("\t")
.append(deptCatch.get(deptno));
context.write(new Text(sb.toString()), NullWritable.get());
}
}
win10環(huán)境下有問題耍共,建議在Linux環(huán)境下測試。
- 修改文件core-site.xml添加如下內(nèi)容
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
- 問題解決
1. 每個節(jié)點的core-site.xml要添加配置
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
2. main方法中的
job.addCacheFile(new URI(xxx))
這里會在hdfs上找對應(yīng)的文件猎塞,如果沒有則會報錯
3. Mapper中的setup
String filePath = context.getCacheFiles()[0].toString();
BufferedReader br = new BufferedReader(new FileReader(filePath));
這個是在本地文件系統(tǒng)找文件试读,因為調(diào)用的是Java標(biāo)準(zhǔn)庫的API,沒有使用Hadoop相關(guān)的流荠耽,我嘗試過使用完成的路徑钩骇,加上hdfs://hadoop:8020也會報錯;
3. 最終铝量,我在hdfs上創(chuàng)建 了目錄倘屹,并把dept.txt文件上傳到該目錄,然后在Linux本機同樣創(chuàng)建相同的目錄慢叨,將dept.txt文件也拷貝一份纽匙,終于運行成功了
五、小文件
Hadoop存儲TB甚至更大級別的數(shù)據(jù)集
file ==> block ==> 3
==> DN directory
元數(shù)據(jù)信息存放在 NN NameNode 內(nèi)存中
100M vs 1k 的信息都會存放在NN中拍谐,
如果小文件越多烛缔,NN的壓力越大,就算是有備份的NN赠尾,也無濟于事
什么是小文件:按照自己忍受的程度決定
CDH blocksize 128M 默認(rèn)力穗,可以自定制
blocks的大小設(shè)置決定了 元數(shù)據(jù)信息 大小
NN的內(nèi)存多少就決定存儲多少
monitor 監(jiān)控 小文件的問題
小文件是怎么產(chǎn)生的?
故障:解決 ==> 為什么會產(chǎn)生這個故障气嫁? ==> 解決或者規(guī)避故障
1) 某種手段把數(shù)據(jù)采集過來
Flume 如果使用不當(dāng)当窗,采集到HDFS的數(shù)據(jù)會有很多的小文件 raw 源數(shù)據(jù) (僅僅按照官網(wǎng)配置,沒有優(yōu)化手段)
Logstash
從 WebServer 采集到HDFS
2) MR<進程>/Hive/Spark(Core/SQL/Streaming)<線程>
ETL預(yù)處理 產(chǎn)生很多小文件
Stat統(tǒng)計分析 數(shù)據(jù)倉庫 分好幾層 又是一對小文件
小文件解決方案
刪寸宵?
原始數(shù)據(jù)處理完可以崖面,
ETL: 時間 比如1年 2年之后 ==> 遷移(discp 工具)
Stat統(tǒng)計: 可以? 不可以 梯影?
合巫员?
標(biāo)題黨推薦的SequenceFile優(yōu)點、缺點甲棍?简识,自己要有辨識度
CombineFileInputFormat 文本、列式(ORC/Parquet)
Hive合并小文件的參數(shù)控制: 性能不咋樣 假如沒有使用Hive咋辦
Hadoop追加:可以? 不可以七扰?
離線處理 batch 假如為 ETL ==> 目錄
假如數(shù)據(jù)是錯的 ==> 重跑 append 數(shù)據(jù)不對了
HBase: 假如生產(chǎn)上沒有使用HBase咋辦
如果生成上使用了Spark/Flink:一個task處理多少數(shù)據(jù)
reduce個數(shù)決定了 ==> 文件輸出的個數(shù) 根據(jù)環(huán)境
多 ==> 快 但是files多
少 ==> skew歪斜
SQL:
學(xué)大數(shù)據(jù)要學(xué)會造數(shù)據(jù)奢赂。
學(xué)會看英文文檔【弊撸看一手的東西膳灶。愿意去讀,長期積累立由。技術(shù)更新迭代很快轧钓,只有學(xué)會看一手文檔才能保持自身的知識迭代。
大數(shù)據(jù)調(diào)優(yōu)锐膜,要根據(jù)線上的環(huán)境找合適的方式解決毕箍,沒有一勞永逸的方案。
官網(wǎng) DistCp 工具 是一個MapReduce作業(yè)枣耀,但是沒有Reduce工程
面試題(查看官網(wǎng)霉晕!)
hadoop2有哪些新特性,那個版本添加進來的
hadoop3有哪些新特性捞奕,那個版本添加進來的
MapReduce僅有map,不需要reduce的場景
MapJoin ETL Sqoop
WebHDFS
Federation
Snapshots
Quotas and HDFS
Short Circuit Local Reads
Centralized Cache Management
版本信息介紹
Cloudera: CDH 手工搭建 CM WebUI搭建
Hortonworks: HDP
MapR: