11.Join的MapReduce實現(xiàn)

Join在MapReduce中的實現(xiàn)

一宣羊、概述

tips: Hive: MapReduce/Spark巧用 explain 查看語法樹

  1. 常見的面試題:描述如何使用MapReduce來實現(xiàn)join功能:考察點

    1. MapReduce執(zhí)行流程
    2. join的底層執(zhí)行過程
    3. join的多種實現(xiàn)方式
      • ReduceJoin(有shuffle)
      • MapJoin(沒有ReduceJoin,也就是沒有Shuffle的過程)
  2. 其他常見面試題

    • Mapper的泛型里面有幾個參數(shù),各是啥意思(4 個,文件輸入的key/value液肌;寫入上下文的key/value)
    • map方法有幾個參數(shù)承疲,各是啥意思(3 個,文件輸入的key/value冲呢;上下文)
    • 字符串拼接為啥不建議使用 + 而是StringBuilder
    • Mapper/Reduce的生命周期
  • tips:簡歷:項目
    • 最新的項目寫在最前面
    • 寫的東西一定要真正會的(寫的一定要會舍败,會的不一定是自己寫的,哈哈)
    • 從自己寫的東西開始面起敬拓,然后逐步擴展=>基金/技術(shù)的一個功能鏈條
    • 想要高薪邻薯,得挖相關(guān)技術(shù)的祖墳!

二乘凸、原始數(shù)據(jù)

  1. emp.txt
7369    SMITH   CLERK   7902    1980-12-17  800.00      20
7499    ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  30
7521    WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  40
7566    JONES   MANAGER 7839    1981-4-2    2975.00     20
7654    MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 30
7698    BLAKE   MANAGER 7839    1981-5-1    2850.00     30
7782    CLARK   MANAGER 7839    1981-6-9    2450.00     10
7788    SCOTT   ANALYST 7566    1987-4-19   3000.00     20
7839    KING    PRESIDENT       1981-11-17  5000.00     10
7844    TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    30
7876    ADAMS   CLERK   7788    1987-5-23   1100.00     20
7900    JAMES   CLERK   7698    1981-12-3   950.00      30
7902    FORD    ANALYST 7566    1981-12-3   3000.00     20
7934    MILLER  CLERK   7782    1982-1-23   1300.00     40
8888    HIVE    PROGRAM 7839    1988-1-23   10300.00        
  1. dept.txt
10  ACCOUNTING  NEW YORK
20  RESEARCH    DALLAS
30  SALES   CHICAGO
40  OPERATIONS  BOSTON

三厕诡、ReduceJoin流程分析以及實現(xiàn)

  1. 數(shù)據(jù)通過Mapper加載過來,獲取當(dāng)前輸入的文件名营勤,確認(rèn)封裝的對象并在對象中標(biāo)記灵嫌,以join條件為key寫入上下文,然后經(jīng)過shuffle階段葛作,然后在Reduce端根據(jù)flag分發(fā)為不同的對象然后進行排列組合寫入文件寿羞。

  2. 代碼實現(xiàn)

import lombok.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
 * 前提:數(shù)據(jù)被ETL處理過,能被Hive直接使用
 * emp.txt
 *      empno   ename   sal
 * dept.txt
 *      deptno  dname
 *
 * emp.txt 與 dept.txt是多對一的關(guān)系
 * @author Liucheng
 * @since 2019-12-06
 */
public class ReduceJoinApp {

    public static void main(String[] args) throws Exception {

        // 配置類进鸠;默認(rèn)為本地文件系統(tǒng)
        Configuration configuration = new Configuration();

        // Job工作類
        Job job = Job.getInstance(configuration);

        // 設(shè)置住主類
        job.setJarByClass(ReduceJoinApp.class);

        // 配置Reducer Task任務(wù)個數(shù)
        // job.setNumReduceTasks(3);

        // 配置Mapper與Reducer
        job.setMapperClass(ReduceJoinMapper.class);
        job.setReducerClass(ReduceJoinReducer.class);

        // 告知Mapper的輸出類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Entity.class);

        // 告知Reducer的輸出類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 本地需要處理的文件
        Path emp = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\emp.txt");
        Path dept = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\dept.txt");

        Path outputPath = new Path("E:\\ImprovementWorkingSpace\\hadoop-learning\\src\\main\\resources\\join\\reduce-join");

        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.delete(outputPath, true);

        MultipleInputs.addInputPath(job, emp, TextInputFormat.class);
        MultipleInputs.addInputPath(job, dept, TextInputFormat.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);

    }
}

class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Entity> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 獲取文件名
        String filename = getFileName(context);

        String[] datas = value.toString().split("\t");

        if (filename.contains("emp.txt")) {
            if (datas.length < 8) {
                return;
            }

            // 員工表處理邏輯
            String deptno = datas[7];
            // 第一個屬性flag表示標(biāo)識來源
            Entity emp = new Entity("emp", datas[0], datas[1], datas[5], deptno);
            context.write(new Text(deptno), emp);
        } else {
            // 部門表處理邏輯
            if (datas.length < 3) {
                return;
            }
            String deptno = datas[0];
            Entity dept = new Entity("dept", datas[1]);
            context.write(new Text(deptno), dept);
        }
    }

    /**
     * 獲取文件名的方法
     */

    public String getFileName(Context context) {

        // 獲取記錄對應(yīng)的文件信息
        InputSplit inputSplit = context.getInputSplit();
        Class<? extends InputSplit> splitClass = inputSplit.getClass();
        FileSplit fileSplit = null;
        if (splitClass.equals(FileSplit.class)) {
            fileSplit = (FileSplit) inputSplit;
        } else if (splitClass.getName().equals("org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
            try {
                Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                getInputSplitMethod.setAccessible(true);
                fileSplit = (FileSplit) getInputSplitMethod.invoke(inputSplit);
            } catch (Exception e) {
                System.out.println(e);
                throw new RuntimeException(e);
            }
        }

        // 獲取文件名
        String fileName = fileSplit.getPath().getName();
        // 獲取文件所在的路徑名
        String filePath = fileSplit.getPath().getParent().toUri().getPath();

        return fileName;
    }
}


class ReduceJoinReducer extends Reducer<Text, Entity, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Entity> values, Context context) throws IOException, InterruptedException {

        List<Entity> emps = new ArrayList<>();
        Entity[] dept = new Entity[1];

        values.forEach(entity -> {
            if ("emp".equals(entity.getFlag())) {
                emps.add(new Entity(entity));
                System.out.println("剛?cè)〕鰜頃r" + entity);
            } else if ("dept".equals(entity.getFlag())) {
                dept[0] = new Entity(entity);
                System.out.println("剛?cè)〕鰜頃r" + entity);
            }
        });

        emps.forEach(entity -> {
            StringBuilder sb = new StringBuilder();
            sb.append(entity.getEmpnno()).append("\t")
                    .append(entity.getEname()).append("\t")
                    .append(entity.getSal()).append("\t")
                    .append(entity.getDeptno()).append("\t")
                    .append(dept[0].getDname());

            try {
                context.write(new Text(sb.toString()), NullWritable.get());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
class Entity implements Writable {

    private String flag;
    private String empnno;
    private String ename;
    private String sal;
    private String deptno;
    private String dname;

    public Entity(String flag, String empnno, String ename, String sal, String deptno) {
        this.flag = flag;
        this.empnno = empnno;
        this.ename = ename;
        this.sal = sal;
        this.deptno = deptno;
    }

    public Entity(String flag, String dname) {
        this.flag = flag;
        this.dname = dname;
    }

    public Entity(Entity newEntity) {
        this.flag = newEntity.getFlag();
        this.empnno = newEntity.getEmpnno();
        this.ename = newEntity.getEname();
        this.sal = newEntity.getSal();
        this.deptno = newEntity.getDeptno();
        this.dname = newEntity.getDname();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(flag);
        out.writeUTF(empnno != null ? empnno : "");
        out.writeUTF(ename != null ? ename : "");
        out.writeUTF(sal != null ? sal : "");
        out.writeUTF(deptno != null ? deptno : "");
        out.writeUTF(dname != null ? dname : "");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.flag = in.readUTF();
        this.empnno = in.readUTF();
        this.ename = in.readUTF();
        this.sal = in.readUTF();
        this.deptno = in.readUTF();
        this.dname = in.readUTF();
    }

    @Override
    public String toString() {
        return this.hashCode() + "--Entity{" +
                "flag='" + flag + '\'' +
                ", empnno='" + empnno + '\'' +
                ", ename='" + ename + '\'' +
                ", sal='" + sal + '\'' +
                ", deptno='" + deptno + '\'' +
                ", dname='" + dname + '\'' +
                '}';
    }
}

四稠曼、MapJoin流程分析

  1. 適合數(shù)據(jù)小,是否有必要全部
  2. shuffle是整個大數(shù)據(jù)處理過程中非常耗時,非常損耗性能的地方
  3. 能規(guī)避shuffle的地方就不要使用shuffle【調(diào)優(yōu)霞幅,有些情況下進行shuffle是更加優(yōu)化漠吻,這種情況比較少】
  4. 將小文件的內(nèi)容寫入緩存,讀取比較大文件司恳,然后在緩存中根據(jù)join條件查找寫入緩存即可途乃。
  5. 代碼實現(xiàn)
package com.hahadasheng.bigdata.hadooplearning.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * @author Liucheng
 * @since 2019-12-07
 */
public class MapperJoinApp {

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapperJoinApp.class);

        // 配置Reducer Task任務(wù)個數(shù)為0
        job.setNumReduceTasks(0);

        job.setMapperClass(MapperJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 小文件 hdfs文件
        URI dept = new URI("/mapjoin/dept.txt");
        // 將小文件加到分布式緩存中
        job.addCacheFile(dept);

        // 大文件 hdfs文件
        Path emp = new Path("/mapjoin/emp.txt");
        // 寫入大文件
        FileInputFormat.setInputPaths(job, emp);

        Path outputPath = new Path("/mapjoin/map-join");
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.delete(outputPath, true);
        // 文件輸出
        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);

    }
}

class MapperJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> deptCatch = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        String filePath = context.getCacheFiles()[0].toString();
        //String filePath = "E:/ImprovementWorkingSpace/hadoop-learning/src/main/resources/join/dept.txt";
        
        // 如下這個方法是在本地讀取文件,在HDFS上會報錯扔傅!
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while ((line = br.readLine()) != null) {
            String[] datas = line.split("\t");
            // 部門表處理邏輯
            if (datas.length < 3) {
                return;
            }
            String deptno = datas[0];
            String dname = datas[1];
            deptCatch.put(deptno, dname);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split("\t");

        if (datas.length < 8) {
            return;
        }
        // 員工表處理邏輯
        String empnno = datas[0];
        String ename = datas[1];
        String sal = datas[5];
        String deptno = datas[7];

        StringBuilder sb = new StringBuilder();
        sb.append(empnno).append("\t")
                .append(empnno).append("\t")
                .append(ename).append("\t")
                .append(sal).append("\t")
                .append(deptno).append("\t")
                .append(deptCatch.get(deptno));

        context.write(new Text(sb.toString()), NullWritable.get());
    }
}

win10環(huán)境下有問題耍共,建議在Linux環(huán)境下測試。

  • 修改文件core-site.xml添加如下內(nèi)容
<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>
  • 問題解決
1. 每個節(jié)點的core-site.xml要添加配置

<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

2. main方法中的

job.addCacheFile(new URI(xxx))

這里會在hdfs上找對應(yīng)的文件猎塞,如果沒有則會報錯

3. Mapper中的setup

String filePath = context.getCacheFiles()[0].toString();
BufferedReader br = new BufferedReader(new FileReader(filePath));

這個是在本地文件系統(tǒng)找文件试读,因為調(diào)用的是Java標(biāo)準(zhǔn)庫的API,沒有使用Hadoop相關(guān)的流荠耽,我嘗試過使用完成的路徑钩骇,加上hdfs://hadoop:8020也會報錯;

3. 最終铝量,我在hdfs上創(chuàng)建 了目錄倘屹,并把dept.txt文件上傳到該目錄,然后在Linux本機同樣創(chuàng)建相同的目錄慢叨,將dept.txt文件也拷貝一份纽匙,終于運行成功了

五、小文件

Hadoop存儲TB甚至更大級別的數(shù)據(jù)集
file ==> block ==> 3
       ==> DN directory
元數(shù)據(jù)信息存放在 NN NameNode 內(nèi)存中
100M vs 1k 的信息都會存放在NN中拍谐,
如果小文件越多烛缔,NN的壓力越大,就算是有備份的NN赠尾,也無濟于事

什么是小文件:按照自己忍受的程度決定
    CDH blocksize 128M 默認(rèn)力穗,可以自定制
    blocks的大小設(shè)置決定了 元數(shù)據(jù)信息 大小
    NN的內(nèi)存多少就決定存儲多少
    monitor 監(jiān)控 小文件的問題
    
小文件是怎么產(chǎn)生的?
    故障:解決 ==> 為什么會產(chǎn)生這個故障气嫁? ==> 解決或者規(guī)避故障
    1) 某種手段把數(shù)據(jù)采集過來
        Flume 如果使用不當(dāng)当窗,采集到HDFS的數(shù)據(jù)會有很多的小文件 raw 源數(shù)據(jù) (僅僅按照官網(wǎng)配置,沒有優(yōu)化手段)
        Logstash
        從 WebServer 采集到HDFS
    2) MR<進程>/Hive/Spark(Core/SQL/Streaming)<線程>
        ETL預(yù)處理 產(chǎn)生很多小文件
        Stat統(tǒng)計分析 數(shù)據(jù)倉庫 分好幾層 又是一對小文件

小文件解決方案
    刪寸宵? 
        原始數(shù)據(jù)處理完可以崖面,
        ETL: 時間 比如1年 2年之后 ==> 遷移(discp 工具)
        Stat統(tǒng)計: 可以? 不可以 梯影? 
    合巫员?
        標(biāo)題黨推薦的SequenceFile優(yōu)點、缺點甲棍?简识,自己要有辨識度
        CombineFileInputFormat 文本、列式(ORC/Parquet)
        Hive合并小文件的參數(shù)控制: 性能不咋樣 假如沒有使用Hive咋辦
        Hadoop追加:可以? 不可以七扰?
            離線處理 batch 假如為 ETL ==> 目錄
            假如數(shù)據(jù)是錯的 ==> 重跑 append 數(shù)據(jù)不對了
        HBase: 假如生產(chǎn)上沒有使用HBase咋辦
        如果生成上使用了Spark/Flink:一個task處理多少數(shù)據(jù)
        reduce個數(shù)決定了 ==> 文件輸出的個數(shù) 根據(jù)環(huán)境
            多  ==> 快 但是files多
            少  ==> skew歪斜
        SQL:
        

學(xué)大數(shù)據(jù)要學(xué)會造數(shù)據(jù)奢赂。
學(xué)會看英文文檔【弊撸看一手的東西膳灶。愿意去讀,長期積累立由。技術(shù)更新迭代很快轧钓,只有學(xué)會看一手文檔才能保持自身的知識迭代。
大數(shù)據(jù)調(diào)優(yōu)锐膜,要根據(jù)線上的環(huán)境找合適的方式解決毕箍,沒有一勞永逸的方案。


官網(wǎng) DistCp 工具 是一個MapReduce作業(yè)枣耀,但是沒有Reduce工程

面試題(查看官網(wǎng)霉晕!)
    hadoop2有哪些新特性,那個版本添加進來的
    hadoop3有哪些新特性捞奕,那個版本添加進來的
    MapReduce僅有map,不需要reduce的場景
        MapJoin ETL Sqoop
    WebHDFS
    Federation
    Snapshots
    Quotas and HDFS
    Short Circuit Local Reads
    Centralized Cache Management

版本信息介紹

Cloudera: CDH 手工搭建 CM WebUI搭建
Hortonworks: HDP
MapR:
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市拄轻,隨后出現(xiàn)的幾起案子颅围,更是在濱河造成了極大的恐慌,老刑警劉巖恨搓,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件院促,死亡現(xiàn)場離奇詭異,居然都是意外死亡斧抱,警方通過查閱死者的電腦和手機常拓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辉浦,“玉大人弄抬,你說我怎么就攤上這事∠芙迹” “怎么了掂恕?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長弛槐。 經(jīng)常有香客問我懊亡,道長,這世上最難降的妖魔是什么乎串? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任店枣,我火速辦了婚禮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鸯两。我一直安慰自己闷旧,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布甩卓。 她就那樣靜靜地躺著鸠匀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逾柿。 梳的紋絲不亂的頭發(fā)上缀棍,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天,我揣著相機與錄音机错,去河邊找鬼爬范。 笑死,一個胖子當(dāng)著我的面吹牛弱匪,可吹牛的內(nèi)容都是我干的青瀑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼萧诫,長吁一口氣:“原來是場噩夢啊……” “哼斥难!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起帘饶,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤哑诊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后及刻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體镀裤,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年缴饭,在試婚紗的時候發(fā)現(xiàn)自己被綠了暑劝。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡颗搂,死狀恐怖担猛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情峭火,我是刑警寧澤毁习,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站卖丸,受9級特大地震影響纺且,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜稍浆,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一载碌、第九天 我趴在偏房一處隱蔽的房頂上張望猜嘱。 院中可真熱鬧,春花似錦嫁艇、人聲如沸朗伶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽论皆。三九已至,卻和暖如春猾漫,著一層夾襖步出監(jiān)牢的瞬間点晴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工悯周, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留粒督,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓禽翼,卻偏偏與公主長得像屠橄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子闰挡,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容