MapReduce框架原理

image.png

InputFormat數(shù)據(jù)輸入

切片與MapTask并行度決定機(jī)制

  1. 問題引出
    MapTask的并行度決定Map階段的任務(wù)處理并發(fā)度,從而影響整個(gè)job的處理速度
    思考:1G的數(shù)據(jù)献酗,啟動(dòng)8個(gè)MapTask嘁信,可以提高集群的并發(fā)處理能力李根。那么1K的數(shù)據(jù)粪小,也啟動(dòng)8個(gè)MapTask椒楣,會(huì)提高集群性能嗎赴蝇?MapTask并行任務(wù)是不是越多越好呢恢总?哪些因素影響了MapTask的并行度迎罗?

  2. MapTask并行度決定機(jī)制
    數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊。數(shù)據(jù)塊是HDFS上數(shù)據(jù)存儲(chǔ)單位
    數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片离熏,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)佳谦。數(shù)據(jù)切片是MapReduce程序計(jì)算輸入數(shù)據(jù)的單位,一個(gè)切片會(huì)對(duì)應(yīng)啟動(dòng)一個(gè)MapTask

image.png

job提交流程源碼和切片源碼解析

  1. job提交流程源碼
        waitForCompletion()

        submit();

        // 1建立連接
        connect();
        // 1)創(chuàng)建提交Job的代理
        new Cluster(getConfiguration());
        // (1)判斷是本地運(yùn)行環(huán)境還是yarn集群運(yùn)行環(huán)境
        initialize(jobTrackAddr, conf);

        // 2 提交job
        submitter.submitJobInternal(Job.this, cluster)
        // 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

        // 2)獲取jobid 滋戳,并創(chuàng)建Job路徑
        JobID jobId = submitClient.getNewJobID();

        // 3)拷貝jar包到集群
        copyAndConfigureFiles(job, submitJobDir);
        rUploader.uploadFiles(job, jobSubmitDir);

        // 4)計(jì)算切片钻蔑,生成切片規(guī)劃文件
        writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

        // 5)向Stag路徑寫XML配置文件
        writeConf(conf, submitJobFile);
        conf.writeXml(out);

        // 6)提交Job,返回提交狀態(tài)
        status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
image.png

FileInputFormat切片源碼解析

  1. 程序先找到數(shù)據(jù)存儲(chǔ)的目錄
  2. 開始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
  3. 遍歷第一個(gè)文件
    (3.1)獲取文件大小,fs.sizeOf()
    (3.2)計(jì)算切片大小
    computeSplitSize(Math.max(minSize, Math.min(blockSize, maxSize))) = blockSize = 128M
    (3.3)默認(rèn)情況下奸鸯,切片大小為blockSize
    (3.4)開始切咪笑,形成第一個(gè)切片,0——128M娄涩,第二個(gè)切片窗怒,129——256M,第三個(gè)切片蓄拣,256——300M
    每次切片時(shí)扬虚,要判斷切完剩下的部分是否大于塊的1.1倍,如果不大于球恤,那么就切分為一塊
    (3.5)將切片信息寫到一個(gè)切片規(guī)劃文件里
    (3.6)整個(gè)切片核心過程在getSplit()方法中完成
    (3.7)InputSplit只記錄了切片的元數(shù)據(jù)信息辜昵,比如起始位置,長(zhǎng)度咽斧,以及所在的節(jié)點(diǎn)列表等
  4. 提交切面規(guī)劃文件到Y(jié)ARN上堪置,YARN的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)算開啟幾個(gè)MapTask了

FileInputFormat切片機(jī)制

  1. 切片機(jī)制
    (1)簡(jiǎn)單的按照文件的內(nèi)容長(zhǎng)度切分
    (2)切片大小躬存,默認(rèn)等于Block的大小
    (3)切片時(shí)不考慮數(shù)據(jù)集整體,而是針對(duì)每一個(gè)文件進(jìn)行切片
  2. 案例分析


    image.png

FileInputFormat切片大小的參數(shù)配置

  1. 源碼中計(jì)算切片大小的公式
    Math.max(minSize, Math.min(maxSize, blockSize));
    mapreduce.input.fileinputformat.split.minsize = 1
    mapreduce.input.fileinputformat.split.maxsize = Long.MAX_VALUE
    因此舀锨,默認(rèn)情況下岭洲,切片大小為blockSize

  2. 切片大小設(shè)置
    maxSize(切片最大值):參數(shù)如果調(diào)的比blockSize小,則會(huì)讓切片變小坎匿,而且就等于配置的這個(gè)參數(shù)的值
    minSize(切片最小值):參數(shù)調(diào)的比blockSize大盾剩,則可以讓切片變得比blockSize大

  3. 獲取切片信息API
    // 獲取切片的文件名稱
    String name = inputSplit.getPath().getName();
    // 根據(jù)文件類型獲取切片信息
    FileSplit inputSplit = (FileSplit)context.getInputSplit();

TextInputFormat

  1. FileInputFormat實(shí)現(xiàn)類
    思考:在運(yùn)行MapReduce程序時(shí),輸入的文件格式包括:基于行的日志文件替蔬、二進(jìn)制格式文件彪腔、數(shù)據(jù)庫表等。那么进栽,針對(duì)不同的數(shù)據(jù)類型,MapReduce是如何讀取這些數(shù)據(jù)的呢恭垦?
    FileInputFormat常見的接口實(shí)現(xiàn)類包括:TextInputFormat快毛、KeyValueTextInputFormat、NLineIntpuFormat番挺、CombineTextInputFormat和自定義的InputFormat

  2. TextInputFormat
    TextInputFormat是默認(rèn)的FileInputFormat實(shí)現(xiàn)類唠帝。按行讀取每條記錄。鍵是存儲(chǔ)該行在整個(gè)文件中的起始字節(jié)偏移量玄柏,LongWritable類型襟衰。值是這行的內(nèi)容,不包括任何行終止符(回車粪摘、換行)瀑晒,Text類型

CombineTextInputFormat切片機(jī)制

框架默認(rèn)的TextInputFormat切片機(jī)制是對(duì)任務(wù)按文件規(guī)劃切片,不管文件多小徘意,都會(huì)是一個(gè)單獨(dú)的切片苔悦,都會(huì)交給一個(gè)MapTask,這樣如果有大量的小文件椎咧,就會(huì)產(chǎn)生大量的MapTask玖详,處理效率極其低下

  1. 應(yīng)用場(chǎng)景
    CombineTextInputFormat用于小文件過多的場(chǎng)景,它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中勤讽,這樣蟋座,多個(gè)小文件就可以交給一個(gè)MapTask處理
  2. 虛擬存儲(chǔ)切片最大值設(shè)置
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
    注意:虛擬存儲(chǔ)切片最大值設(shè)置最好根據(jù)實(shí)際的小文件大小情況來設(shè)置具體的值
  3. 切片機(jī)制
    生成切片過程包括:虛擬存儲(chǔ)過程和切片過程兩部分


    image.png

(1)虛擬存儲(chǔ)過程:
將輸入目錄下所有文件大小,一次和設(shè)置的setMaxInputSplitSize值比較脚牍,如果不大于設(shè)置的最大值向臀,邏輯上劃分一個(gè)塊。如果輸入文件大于設(shè)置的最大值且大于兩倍莫矗,那么以最大值切割一塊飒硅;當(dāng)剩余數(shù)據(jù)大小超過設(shè)置的最大值且不大于最大值兩倍砂缩,此時(shí)將文件均分為2個(gè)虛擬存儲(chǔ)塊(防止出現(xiàn)太小切片)
例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M三娩,則先邏輯上分成一個(gè)4M庵芭。剩余的大小為4.02M,如果按照4M邏輯劃分雀监,就會(huì)出現(xiàn)0.02M的小虛擬存儲(chǔ)塊双吆,所以將剩余的4.02M切分成2.01M和2.01M的兩個(gè)文件

(2)切片過程:
(a)判斷虛擬存儲(chǔ)的文件大小是否大于setMaxInputSplitSize值,大于等于則單獨(dú)形成一個(gè)切片
(b)如果不大于則跟下一個(gè)虛擬存儲(chǔ)文件進(jìn)行合并会前,共同形成一個(gè)切片
(c)測(cè)試舉例:有4個(gè)小文件好乐,大小分別為:1.7M、5.1M瓦宜、3.4M以及6.8M蔚万,這四個(gè)小文件,則虛擬存儲(chǔ)之后會(huì)形成6個(gè)文件塊临庇,大小分別為1.7M反璃、(2.55M、2.55M)假夺、3.4M淮蜈、(3.4M、3.4M)已卷,最終形成3個(gè)切片梧田,大小分別是(1.7+2.55)M、(2.55+3.4)M侧蘸、(3.4+3.4)M

MapReduce工作流程

image.png
image.png

Shuffle機(jī)制

Shuffle機(jī)制

Map之后裁眯,Reduce之前的數(shù)據(jù)處理過程稱為Shuffle


image.png

Partition分區(qū)

  1. 問題引出
    要求將統(tǒng)計(jì)結(jié)果按照條件輸出到不同文件中(分區(qū))。比如:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))

  2. 默認(rèn)Partition分區(qū)

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

默認(rèn)分區(qū)是根據(jù)key的hashCode()對(duì)ReduceTasks個(gè)數(shù)取模得到的闺魏,用戶沒法控制哪個(gè)key存儲(chǔ)到哪個(gè)分區(qū)

  1. 自定義Partitioner步驟
    (1)自定義類繼承Partitioner未状,重寫getPartition()方法
public class ClustomPartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions){
                // 控制分區(qū)代碼邏輯
                return partition;
        }
}

(2)在job驅(qū)動(dòng)中,設(shè)置自定義Partitioner
job.setPartitionerClass(ClustomPartitioner.class);
(3)自定義Partitioner后析桥,要根據(jù)自定義的Partitioner的邏輯設(shè)置響應(yīng)數(shù)量的ReduceTask
job.setReduceTasks(5);

Partition分區(qū)案例實(shí)操

  1. 需求
    將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中
    (1)輸入數(shù)據(jù)
1   13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
2   13846544121 192.196.100.2           264 0   200
3   13956435636 192.196.100.3           132 1512    200
4   13966251146 192.168.100.1           240 0   404
5   18271575951 192.168.100.2   www.atguigu.com 1527    2106    200
6   84188413    192.168.100.3   www.atguigu.com 4116    1432    200
7   13590439668 192.168.100.4           1116    954 200
8   15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9   13729199489 192.168.100.6           240 0   200
10  13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11  15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12  15959002129 192.168.100.9   www.atguigu.com 1938    180 500
13  13560439638 192.168.100.10          918 4938    200
14  13470253144 192.168.100.11          180 180 200
15  13682846555 192.168.100.12  www.qq.com  1938    2910    200
16  13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17  13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18  18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19  13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20  13768778790 192.168.100.17          120 120 200
21  13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22  13568436656 192.168.100.19          1116    954 200

(2)期望輸出數(shù)據(jù)
手機(jī)號(hào)136司草、137、138泡仗、139開頭都分別放到一個(gè)獨(dú)立的文件中埋虹,其他開頭的放到一個(gè)文件中

  1. 分析


    image.png
  2. 編碼
    Partitioner

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        String phone = key.toString();
        String prePhone = phone.substring(0, 3);

        int partition;
        switch (prePhone){
            case "136":
                partition = 0;
                break;
            case "137":
                partition = 1;
                break;
            case "138":
                partition = 2;
                break;
            case "139":
                partition = 3;
                break;
            default:
                partition = 4;
        }

        return partition;
    }
}

修改Driver

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 設(shè)置jar路徑
        job.setJarByClass(FlowCountDriver.class);
        // 關(guān)聯(lián)mapper、reducer
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 設(shè)置map階段輸出的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 設(shè)置最終輸出的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 綁定Partitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // 設(shè)置reduce任務(wù)數(shù)娩怎,根據(jù)自定義Partitioner確定
        job.setNumReduceTasks(5);
        // 設(shè)置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("data/phone_data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("phone_result2"));
        // 提交job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
  1. 分區(qū)總結(jié)
    (1)如果ReduceTask的數(shù)量 > getPartition()的結(jié)果數(shù)搔课,則會(huì)多產(chǎn)生幾個(gè)空的輸出文件
    (2)如果1 < ReduceTask < getPartition()的結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放截亦,拋異常
    (3)如果ReduceTask的數(shù)量 = 1袍啡,則不管MapTask段輸出多少個(gè)分區(qū)文件踩官,最終結(jié)果都會(huì)交給這一個(gè)ReduceTask,最終也就只會(huì)產(chǎn)生一個(gè)結(jié)果文件
    (4)分區(qū)號(hào)必須從0開始境输,逐一累加

  2. 案例分析
    例如:自定義分區(qū)數(shù)為5蔗牡,則
    (1)job.setNumReduceTasks(1); 正常運(yùn)行,只產(chǎn)生一個(gè)輸出文件
    (2)job.setNumReduceTasks(2); 拋異常
    (3)job.setNumReduceTasks(5); 與預(yù)期一樣
    (4)job.setNumReduceTasks(6); 程序正常運(yùn)行,但會(huì)產(chǎn)生一個(gè)空文件

WritableComparable排序

概述

排序是MapReduce框架中最重要的操作之一
MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按照key進(jìn)行排序信粮,該操作屬于Hadoop的默認(rèn)行為黔攒。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要
默認(rèn)排序是按照字段順序排序欺旧,且實(shí)現(xiàn)該排序的方法是快速排序

對(duì)于MapTask,它會(huì)將處理的結(jié)果暫時(shí)放到環(huán)形緩沖區(qū)中蛤签,當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后辞友,在對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序,并將這些有序數(shù)據(jù)溢寫到磁盤上震肮,而當(dāng)數(shù)據(jù)處理完畢后称龙,他會(huì)對(duì)磁盤上所有文件進(jìn)行歸并排序
對(duì)于ReduceTask,他從每個(gè)MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件戳晌,如果文件大小超過一定閾值鲫尊,則溢寫到磁盤上,否則存儲(chǔ)在內(nèi)存中沦偎,如果磁盤上文件數(shù)目達(dá)到一定閾值疫向,則進(jìn)行一次歸并排序以生成一個(gè)更大的文件;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值豪嚎,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上搔驼。當(dāng)所有數(shù)據(jù)拷貝完畢后,ReduceTask同意對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序侈询。

分類排序
  1. 部分排序
    MapReduce根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)排序舌涨。保證輸出的每個(gè)文件內(nèi)部有序
  2. 全排序
    最終輸出結(jié)果只有一個(gè)文件,且文件內(nèi)部有序扔字。實(shí)現(xiàn)方式是只設(shè)置一個(gè)ReduceTask囊嘉。但該方法在處理大型文件時(shí)效率極低温技,因?yàn)橐慌_(tái)機(jī)器處理所有文件,完全喪失了MapReduce所提供的并行架構(gòu)
  3. 輔助排序
    在Reduce端對(duì)key進(jìn)行分組扭粱。應(yīng)用于:在接收的key為bean對(duì)象時(shí)舵鳞,想讓一個(gè)或幾個(gè)字段相同(全部字段不同)的key進(jìn)入到同一個(gè)reduce方法,可以采用分組排序
  4. 二次排序
    在自定義排序過程中焊刹,如果compareTo中的判斷條件為兩個(gè)即為二次排序
自定義WritableComparable原理分析

bean對(duì)象作為key傳輸系任,需要實(shí)現(xiàn)WriableComparable接口,重寫compareTo方法虐块,就可以實(shí)現(xiàn)排序

@Override
public int compareTo(FlowBean bean) {
        int result;
        if(this.sumFlow > bean.getSumFlow()) {
                result = -1;
        } else if(this.sumFlow < bean.getSumFlow()) {
                result = 1;
        } else {
                result = 0;
        }

        return resule;
}

WritableComparable排序案例實(shí)操(全排序)

  1. 需求
    根據(jù)手機(jī)號(hào)的案例俩滥,將結(jié)果根據(jù)總流量倒序排序

  2. 需求分析


    image.png
  3. 編碼
    FlowBean
    在原有基礎(chǔ)上,改為實(shí)現(xiàn)WriableComparable接口贺奠,并重寫compareTo方法

public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // 空參構(gòu)造
    public FlowBean() {
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean bean) {
        int result;

        if (this.sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if(this.sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }

        return result;
    }
}

Mapper

public class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private Text outValue = new Text();
    private FlowBean outKey = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 獲取一行
        String line = value.toString();

        // 切分
        String[] split = line.split("\t");

        // 封裝
        String phone = split[0];
        outValue.set(phone);

        long up = Long.parseLong(split[1]);
        long down = Long.parseLong(split[2]);
        long sum = Long.parseLong(split[3]);

        outKey.setUpFlow(up);
        outKey.setDownFlow(down);
        outKey.setSumFlow(sum);

        // 寫出
        context.write(outKey, outValue);
    }
}

Reducer

public class FlowCountReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    private FlowBean outValue = new FlowBean();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // 循環(huán)遍歷霜旧,防止不同手機(jī)號(hào)流量相同,循環(huán)過程中寫出
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

Driver
注意修改Mapper的輸出類型儡率,并將之前案例的輸出作為輸入

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 設(shè)置jar路徑
        job.setJarByClass(FlowCountDriver.class);
        // 關(guān)聯(lián)mapper挂据、reducer
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // 設(shè)置map階段輸出的類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 設(shè)置最終輸出的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 設(shè)置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("phone_result"));
        FileOutputFormat.setOutputPath(job, new Path("phone_compare_result"));
        // 提交job
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

Combiner合并

  1. Combiner是MapReduce程序中Mapper和Reducer之外的一種組件

  2. Combiner組件的父類就是Reducer

  3. Combiner和Reducer的區(qū)別在于運(yùn)行的位置
    Combiner是在每一個(gè)MapTask所在的節(jié)點(diǎn)運(yùn)行
    Reducer是接收全局所有Mapper的輸出結(jié)果

  4. Combiner的意義就是對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總,以減小網(wǎng)絡(luò)傳輸量

  5. Combiner能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯儿普,而且崎逃,Combiner的輸出KV應(yīng)該和Reducer的輸入KV類型對(duì)應(yīng)起來

  6. 自定義Combiner實(shí)現(xiàn)步驟
    (1)自定義一個(gè)Combiner繼承Reducer,重寫reduce方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                        sum += value.get();
                }

                outValue.set(sum);

                context.write(key, outValue);
        }
}

(2)在驅(qū)動(dòng)類中進(jìn)行設(shè)置
job.setCombinerClass(WordCountCombiner.class);

另一種實(shí)現(xiàn)方式:
我們發(fā)現(xiàn)新編寫的Combiner和原始的Reducer內(nèi)容一致眉孩,而Combiner和Reducer都繼承自Reducer个绍,所以我們可以將原有的Reducer作為Combiner使用
job.setCombinerClass(WordCountReducer.class)

OutputFormat數(shù)據(jù)輸出

OutputFormat接口實(shí)現(xiàn)類

OutputFormat是MapReduce輸出的基類,所有實(shí)現(xiàn)MapReduce輸出都實(shí)現(xiàn)了OutputFormat接口浪汪。下面我們介紹幾種常見的OutputForamt實(shí)現(xiàn)類

  1. OutputFormat實(shí)現(xiàn)類


    image.png
  1. 默認(rèn)輸出格式TextOutputFormat
  2. 自定義OutputFormat
    應(yīng)用場(chǎng)景:輸出數(shù)據(jù)到MySQL/HBASE/ElasticSearch等存儲(chǔ)中
    步驟:
    (1) 自定義一個(gè)類繼承OutputFormat
    (2)改寫RecordWriter巴柿,具體改寫輸出數(shù)據(jù)的方法write()

自定義OutputFormat案例

  1. 需求
    過濾輸入的log日志,包含atguigu的網(wǎng)站輸出到atguigu.log死遭,不包含的輸出的other.log
    數(shù)據(jù)
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.taobao.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
  1. 需求分析


    image.png
  2. 編碼
    Mapper

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 不作任何處理
        context.write(value, NullWritable.get());
    }
}

Reducer

public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 相同key的數(shù)據(jù)會(huì)被聚合广恢,循環(huán)寫出,防止丟失
        for (NullWritable value : values) {
            context.write(key, value);
        }
    }
}

OutputFormat

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;
    }
}

RecordWriter

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        // 創(chuàng)建兩條流
        try {
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());

            atguiguOut = fileSystem.create(new Path("log/atguigu.log"));
            otherOut = fileSystem.create(new Path("log/other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 具體怎么寫
        String line = key.toString();
        if (line.contains("atguigu")) {
            atguiguOut.writeBytes(line + "\n");
        } else {
            otherOut.writeBytes(line + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStreams(atguiguOut, otherOut);
    }
}

Driver

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 設(shè)置jar路徑
        job.setJarByClass(LogDriver.class);
        // 關(guān)聯(lián)mapper呀潭、reducer
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        // 設(shè)置map輸出的KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 設(shè)置最終輸出的KV類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 配置自定義OutputFormat
        job.setOutputFormatClass(LogOutputFormat.class);
        // 設(shè)置輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("data/log.txt"));
        /*
        雖然我們?cè)O(shè)置了自定義的OutputFormat钉迷,但是自定義的OutputFormat繼承自FileOutputFormat
        而FileOutputFormat要輸出一個(gè)_SUCCESS文件,所以還是要指定輸出路徑
         */
        FileOutputFormat.setOutputPath(job, new Path("log"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

MapReduce內(nèi)核源碼解析

MapTask工作機(jī)制

image.png
  1. Read階段:MapTask通過用戶編寫的RecordReader钠署,從輸入InputSplit中解析出一個(gè)個(gè)key/value篷牌。
  2. Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理,并產(chǎn)生一系列新的key/value踏幻。
  3. Collect收集階段:在用戶編寫map()函數(shù)中枷颊,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部夭苗,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner)信卡,并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
  4. Spill階段:即“溢寫”题造,當(dāng)環(huán)形緩沖區(qū)滿后傍菇,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上,生成一個(gè)臨時(shí)文件界赔。需要注意的是丢习,將數(shù)據(jù)寫入本地磁盤之前,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序淮悼,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并咐低、壓縮等操作。

ReduceTask工作機(jī)制

image.png
  1. Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù)袜腥,并針對(duì)某一片數(shù)據(jù)见擦,如果其大小超過一定閾值,則寫到磁盤上羹令,否則直接放到內(nèi)存中
  2. Sort階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)鲤屡,ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過多或磁盤上文件過多福侈。按照MapReduce語義酒来,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起肪凛,Hadoop采用了基于排序的策略役首。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此显拜,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
  3. Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上

ReduceTask并行度決定機(jī)制

回顧:MapTask并行度有切片個(gè)數(shù)決定爹袁,切片個(gè)數(shù)由輸入文件和切片規(guī)則決定远荠。
思考:ReduceTask并行度由誰決定

  1. 設(shè)置ReduceTask并行度個(gè)數(shù)
    ReduceTask的并行度同樣影響整個(gè)Job的執(zhí)行并發(fā)讀和執(zhí)行效率,但與MapTask的并發(fā)數(shù)由切片數(shù)決定不同失息,ReduceTask數(shù)量的決定是可以直接手動(dòng)設(shè)置:
// 默認(rèn)值為1譬淳,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);
  1. 實(shí)驗(yàn):測(cè)試ReduceTask多少合適
    (1)實(shí)驗(yàn)環(huán)境
    1個(gè)Master節(jié)點(diǎn),16個(gè)Slave節(jié)點(diǎn)盹兢,CPU:8GHZ邻梆,內(nèi)存2G
    (2)實(shí)驗(yàn)結(jié)果


    image.png
  2. 注意事項(xiàng)
    (1)ReduceTask=0,表示沒有Reduce階段绎秒,輸出文件個(gè)數(shù)和Map個(gè)數(shù)一致浦妄。
    (2)ReduceTask默認(rèn)值就是1,所以輸出文件個(gè)數(shù)為1
    (3)如果數(shù)據(jù)分布不均勻,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜
    (4)ReduceTask數(shù)量并不是任意設(shè)置剂娄,還要考慮業(yè)務(wù)邏輯需求蠢涝,有些情況下,需要計(jì)算全局匯總結(jié)果阅懦,就只能有一個(gè)ReduceTask
    (5)具體多少個(gè)ReduceTask和二,需要根據(jù)集群性能而定
    (6)如果分區(qū)數(shù)不是1,但是ReduceTask為1耳胎,不執(zhí)行分區(qū)過程惯吕,因?yàn)樵贛apTask源碼中,執(zhí)行分區(qū)的前提是先判斷ReduceTask個(gè)數(shù)是否為1怕午,不大于1肯定不執(zhí)行

MapTask & ReduceTask源碼跟蹤

MapTask

context.write(k, NullWritable.get()); //自定義的 map 方法的寫出废登,進(jìn)入
    output.write(key, value); 
        //MapTask727 行,收集方法诗轻,進(jìn)入兩次
        collector.collect(key, value,partitioner.getPartition(key, value, partitions));
            HashPartitioner(); //默認(rèn)分區(qū)器
        collect() //MapTask1082 行 map 端所有的 kv 全部寫出后會(huì)走下面的 close 方法
            close() //MapTask732 行
                collector.flush() // 溢出刷寫方法钳宪,MapTask735 行,提前打個(gè)斷點(diǎn)扳炬,進(jìn)入
                    sortAndSpill() //溢寫排序吏颖,MapTask1505 行,進(jìn)入
                        sorter.sort() QuickSort //溢寫排序方法恨樟,MapTask1625 行半醉,進(jìn)入
                    mergeParts(); //合并文件,MapTask1527 行劝术,進(jìn)入
                collector.close(); //MapTask739 行,收集器關(guān)閉,即將進(jìn)入 ReduceTask

ReduceTask

if (isMapOrReduce()) //reduceTask324 行缩多,提前打斷點(diǎn)
    initialize() // reduceTask333 行,進(jìn)入
    init(shuffleContext); // reduceTask375 行,走到這需要先給下面的打斷點(diǎn)
         totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl 第 120 行,提前打斷點(diǎn)
         merger = createMergeManager(context); //合并方法养晋,Shuffle 第 80 行
            // MergeManagerImpl 第 232 235 行衬吆,提前打斷點(diǎn)
            this.inMemoryMerger = createInMemoryMerger(); //內(nèi)存合并
            this.onDiskMerger = new OnDiskMerger(this); //磁盤合并
    rIter = shuffleConsumerPlugin.run();
        eventFetcher.start(); //開始抓取數(shù)據(jù),Shuffle 第 107 行绳泉,提前打斷點(diǎn)
        eventFetcher.shutDown(); //抓取結(jié)束逊抡,Shuffle 第 141 行,提前打斷點(diǎn)
        copyPhase.complete(); //copy 階段完成零酪,Shuffle 第 151 行
        taskStatus.setPhase(TaskStatus.Phase.SORT); //開始排序階段冒嫡,Shuffle 第 152 行
    sortPhase.complete(); //排序階段完成,即將進(jìn)入 reduce 階段 reduceTask382 行
reduce(); //reduce 階段調(diào)用的就是我們自定義的 reduce 方法四苇,會(huì)被調(diào)用多次
    cleanup(context); //reduce 完成之前孝凌,會(huì)最后調(diào)用一次 Reducer 里面的 cleanup 方法

Join多種應(yīng)用

Reduce Join

Map端的主要工作:為來自不同表或文件的KV對(duì),打標(biāo)簽以區(qū)別不同來源的記錄月腋。然后用連接字段作為key蟀架,其余部分和新加的標(biāo)志作為value瓣赂,最后進(jìn)行輸出
Reduce端的主要工作:在Reduce端以連接字段作為key的分組已經(jīng)完成,我們只需要在每一個(gè)分組當(dāng)中將那些來源于不同文件的記錄(在Map階段已經(jīng)打標(biāo)志)分開辜窑,最后進(jìn)行合并就可以了

Reduce Join案例

  1. 需求
    輸入數(shù)據(jù)


    image.png

order.txt

1001    01  1
1002    02  2
1003    03  3
1004    01  4
1005    02  5
1006    03  6

pd.txt

01  小米
02  華為
03  格力

期望輸出


image.png
  1. 需求分析
    通過將關(guān)聯(lián)條件作為Map輸出的key钩述,將兩表滿足join條件的數(shù)據(jù)并攜帶數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)ReduceTask穆碎,在Reduce中進(jìn)行數(shù)據(jù)的串聯(lián)


    image.png
  2. 編碼
    TableBean

public class TableBean implements Writable {
    // 訂單id
    private String id;
    // 商品id
    private String pid;
    // 商品數(shù)量
    private int amount;
    // 商品名稱
    private String pname;
    // 標(biāo)志字段 order pd
    private String flag;

    public TableBean() {

    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getPid() {
        return pid;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public int getAmount() {
        return amount;
    }

    public void setAmount(int amount) {
        this.amount = amount;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(pid);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        pid = in.readUTF();
        amount = in.readInt();
        pname = in.readUTF();
        flag = in.readUTF();
    }

    @Override
    public String toString() {
        return id + "\t" + pname + "\t" + amount;
    }
}

Mapper

public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    private String fileName;
    private Text outKey = new Text();
    private TableBean outValue = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 初始化order pd
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        fileName = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 獲取一行
        String line = value.toString();
        System.out.println(line);


        // 判斷屬于哪個(gè)文件
        if (fileName.contains("order")) {
            // 切分牙勘, 防止多個(gè)文件切分邏輯不同
            String[] split = line.split("\t");
            // 訂單
            /*
            1001    01  1
            1002    02  2
             */
            // 封裝
            System.out.println(Arrays.toString(split));
            String id = split[0];
            String pid = split[1];
            int amount = Integer.parseInt(split[2]);

            outKey.set(pid);

            outValue.setId(id);
            outValue.setPid(pid);
            outValue.setAmount(amount);
            outValue.setPname("");
            outValue.setFlag("order");
        } else {
            // 切分
            String[] split = line.split("\t");
            // 產(chǎn)品
            /*
            01  小米
            02  華為
             */
            // 封裝
            String pid = split[0];
            String pname = split[1];

            outKey.set(pid);
            outValue.setId("");
            outValue.setPid(pid);
            outValue.setAmount(-1);
            outValue.setPname(pname);
            outValue.setFlag("pd");
        }

        // 寫出
        context.write(outKey, outValue);
    }
}

Reducer

public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
        /*
        01  1001    1   order
        01  1004    4   order
        01  小米  pd
         */
        // 初始化存儲(chǔ)
        List<TableBean> orderBeans = new ArrayList<>();
        TableBean pdBean = new TableBean();

        // 遍歷
        for (TableBean value : values) {
            if ("order".equals(value.getFlag())) {
                // 訂單
                TableBean tempBean = new TableBean();

                try {
                    BeanUtils.copyProperties(tempBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }

                orderBeans.add(tempBean);
            } else {
                // 商品
                try {
                    BeanUtils.copyProperties(pdBean, value);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // 遍歷orderBeans,為每個(gè)設(shè)置產(chǎn)品名
        for (TableBean orderBean : orderBeans) {
            orderBean.setPname(pdBean.getPname());
            context.write(orderBean, NullWritable.get());
        }
    }
}

Driver

public class TableDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 設(shè)置jar路徑
        job.setJarByClass(TableDriver.class);
        // 關(guān)聯(lián)mapper所禀、reducer
        job.setMapperClass(TableMapper.class);
        job.setReducerClass(TableReducer.class);
        // 設(shè)置map輸出的KV類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        // 設(shè)置最終輸出的KV類型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 設(shè)置輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("data/join"));
        FileOutputFormat.setOutputPath(job, new Path("join_result"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  1. 總結(jié)
    缺點(diǎn):這種方式方面,合并的操作在Reduce階段完成,Reduce端的處理壓力太大色徘,Map節(jié)點(diǎn)的運(yùn)算負(fù)載很低恭金,資源利用率不高,而且在Reduce階段會(huì)產(chǎn)生數(shù)據(jù)傾斜
    解決方式:使用MapJoin

Map Join

  1. 使用場(chǎng)景
    Map Join適用于一張表很大褂策,一張表很小的場(chǎng)景

  2. 優(yōu)點(diǎn)
    思考:在Reduce端處理過多的表横腿,非常容易產(chǎn)生數(shù)據(jù)傾斜,怎么辦斤寂?
    在Map端緩存多張表耿焊,提前處理業(yè)務(wù)邏輯,這樣增加Map端業(yè)務(wù)遍搞,減少Reduce端數(shù)據(jù)的壓力罗侯,盡可能的減少數(shù)據(jù)傾斜

  3. 具體辦法:采用DistributedCache
    (1)在Mapper的setup階段,將文件讀取到緩存集合中
    (2)在Driver驅(qū)動(dòng)類中加載緩存

// 緩存普通文件到Task運(yùn)行的節(jié)點(diǎn)
job.addCacheFile(new URI("file://cache/xxx.txt"));
// 集群環(huán)境運(yùn)行需要設(shè)置HDFS路徑
job.addCacheFile(new URI("hdfs://ip:port/cache/xxx.txt"))

Map Join案例

  1. 需求同Reduce Join

  2. 需求分析


    image.png
  3. 編碼
    Driver

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 設(shè)置jar路徑
        job.setJarByClass(MapJoinDriver.class);
        // 關(guān)聯(lián)mapper溪猿,不需要reducer
        job.setMapperClass(MapJoinMapper.class);
        job.setNumReduceTasks(0);
        // 設(shè)置緩存
        job.addCacheFile(new URI("cache/pd.txt"));
        // 設(shè)置map輸出的KV類型
        job.setMapOutputKeyClass(TableBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 設(shè)置最終輸出的KV類型
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 設(shè)置輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path("data/join"));
        FileOutputFormat.setOutputPath(job, new Path("map_join_result"));
        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Mapper

public class MapJoinMapper extends Mapper<LongWritable, Text, TableBean, NullWritable> {
    private Map<String, String> cache = new HashMap<>();
    private TableBean outKey = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // 讀取緩存
        URI[] cacheFiles = context.getCacheFiles();

        FileSystem fs = FileSystem.get(context.getConfiguration());

        FSDataInputStream fsDataInputStream = fs.open(new Path(cacheFiles[0]));

        // 從流中讀取數(shù)據(jù)
        List<String> lines = IOUtils.readLines(fsDataInputStream, StandardCharsets.UTF_8);
        for (String line : lines) {
            String[] split = line.split("\t");
            cache.put(split[0], split[1]);
        }

        // 關(guān)流
        org.apache.hadoop.io.IOUtils.closeStreams(fsDataInputStream, fs);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 處理order.txt
        // 獲取一行
        String line = value.toString();

        // 切分
        String[] split = line.split("\t");

        /*
            1001    01  1
            1002    02  2
         */
        // 根據(jù)pid獲取緩存中的pname
        // 封裝
        outKey.setId(split[0]);
        outKey.setPname(cache.get(split[1]));
        outKey.setAmount(Integer.parseInt(split[2]));

        // 寫出
        context.write(outKey, NullWritable.get());
    }
}

MapReduce開發(fā)總結(jié)

  1. 輸入數(shù)據(jù)接口:InputFormat
    (1)默認(rèn)使用的實(shí)現(xiàn)類是:TextInputFormat
    (2)TextInputFormat的功能邏輯是:一次讀取一行文本钩杰,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回
    (3)CombineTextInputFormat可以把多個(gè)小文件合并成一個(gè)切片處理诊县,提高處理效率
  2. 邏輯處理接口:Mapper
    用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中的三個(gè)方法:map()讲弄、setup()、cleanup()
  3. Partition分區(qū)
    (1)有默認(rèn)實(shí)現(xiàn)HashPartitioner依痊,邏輯是根據(jù)key的哈希值和numReduce來返回一個(gè)分區(qū)號(hào):key.hashCode() & Integer.MAX_VALUE % numReduces
    (2)如果業(yè)務(wù)上有特別的需求避除,可以自定義分區(qū)
  4. Comparable排序
    (1)當(dāng)我們用自定義的對(duì)象作為key來輸出時(shí),就必須要實(shí)現(xiàn)WritableComparable接口抗悍,重寫其中的compareTo方法
    (2)部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序
    (3)全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce
    (4)二次排序:排序的條件有兩個(gè)
  5. Combiner
    不影響最終的業(yè)務(wù)邏輯
    提前聚合map钳枕,解決數(shù)據(jù)傾斜
  6. Reducer
    用戶的業(yè)務(wù)邏輯
    setup()缴渊、reduce()、clearup()
  7. OutputFormat
    (1)默認(rèn)使用的實(shí)現(xiàn)類是:TextOutputFormat
    (2)TextOutputFormat的功能邏輯:將每個(gè)KV對(duì)項(xiàng)目表文件中寫一行
    (3)自定義OutputFormat
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鱼炒,一起剝皮案震驚了整個(gè)濱河市衔沼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖指蚁,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件菩佑,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡凝化,警方通過查閱死者的電腦和手機(jī)稍坯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來搓劫,“玉大人瞧哟,你說我怎么就攤上這事∏瓜颍” “怎么了勤揩?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)秘蛔。 經(jīng)常有香客問我陨亡,道長(zhǎng),這世上最難降的妖魔是什么深员? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任负蠕,我火速辦了婚禮,結(jié)果婚禮上辨液,老公的妹妹穿的比我還像新娘虐急。我一直安慰自己,他們只是感情好滔迈,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布止吁。 她就那樣靜靜地躺著,像睡著了一般燎悍。 火紅的嫁衣襯著肌膚如雪敬惦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天谈山,我揣著相機(jī)與錄音俄删,去河邊找鬼。 笑死奏路,一個(gè)胖子當(dāng)著我的面吹牛畴椰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播鸽粉,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼斜脂,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了触机?” 一聲冷哼從身側(cè)響起帚戳,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤玷或,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后片任,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體偏友,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年对供,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了位他。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡犁钟,死狀恐怖棱诱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情涝动,我是刑警寧澤迈勋,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站醋粟,受9級(jí)特大地震影響靡菇,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜米愿,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一厦凤、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧育苟,春花似錦较鼓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至漱竖,卻和暖如春禽篱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背馍惹。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國打工躺率, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留芬探,地道東北人寝贡。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像辆影,于是被迫代替她去往敵國和親良狈。 傳聞我的和親對(duì)象是個(gè)殘疾皇子后添,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容