MapReduce(五):Shuffle機制

Shuffle機制

Map方法之后肋坚,Reduce方法之前的數(shù)據(jù)處理過程稱之為Shuffle涕癣。

2.3 Shuffle機制.png

Partition分區(qū)

如何按照條件輸出到不同文件(分區(qū))中,MapReduce提供了Partitioner功能。默認采用hash值的方式。

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

默認分區(qū)是根據(jù)key的hashCode對ReduceTasks個數(shù)取模得到的酷勺。用戶沒法控制那個key存儲在那個分區(qū)。

自定義Partitioner步驟

1)自定義類繼承Partitioner扳躬,重寫getPartition()方法

public class ProvincePartitioner extends Partitioner<Text,FlowBean> {

   @Override
   public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
       String substring = text.toString().substring(0, 2);
       if("135".equals(substring)){
           return 0;
       }
       return 1;
   }
}

2)在Job驅動中脆诉,設置自定義Partitioner

        job.setPartitionerClass(ProvincePartitioner.class);

3)自定義Partition后甚亭,需要根據(jù)自定義Partitioner的邏輯設置相應數(shù)量的ReduceTask。

        job.setNumReduceTasks(2);

分區(qū)總結

1)如果ReduceTask數(shù)量>getPartition的結果數(shù)击胜,則會多產(chǎn)生幾個空的輸出文件part-r-oooxx;

2)如果1<ReduceTask的數(shù)量<getPartition的結果數(shù)亏狰,則有一部分分區(qū)數(shù)據(jù)無法安放,會Exception

3)如果ReduceTask的數(shù)量=1偶摔,則不管MapTask端輸出多少個分區(qū)文件暇唾,最終結果都交給這一個ReduceTask,最終也就只會產(chǎn)生一個結果文件part-r-00000辰斋;

4)分區(qū)號必須從零開始策州,逐一累加。

代碼實戰(zhàn)

FlowBean.java

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
            "\t" + downFlow +
            "\t" + sumFlow;
    }
}

FlowMapper.java

public class FlowMapper extends Mapper<LongWritable, Text,Text, FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();
        // 2 切割
        String[] split = line.split(" ");
        System.out.println(split.length);
        // 3 抓取數(shù)據(jù)
        String phone = split[0];
        String upFlow = split[split.length-3];
        String downFlow = split[split.length-2];
        // 4 封裝
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(upFlow));
        outV.setDownFlow(Long.parseLong(downFlow));
        outV.setSumFlow();

        context.write(outK,outV);
    }
}

FlowReducer.java

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
        Reducer<Text, FlowBean, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1 遍歷集合類價值
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        // 3 封裝
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        // 4 寫出
        context.write(key,outV);

    }
}

ProvincePartitioner.java

public class ProvincePartitioner extends Partitioner<Text,FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String substring = text.toString().substring(0, 2);
        if("135".equals(substring)){
            return 0;
        }
        return 1;
    }
}

FlowDriver.java

public class FlowDriver {

    public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
        // 1 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 設置jar
        job.setJarByClass(FlowDriver.class);
        // 3 關聯(lián)mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 設置mapper輸出的key和value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5 設置最終輸出的key和value類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(2);
        // 6 設置數(shù)據(jù)的輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job,new Path(System.getProperty("user.dir")+"/input/partitioner2"));
        FileOutputFormat.setOutputPath(job,new Path(System.getProperty("user.dir")+"/output/partitioner2"));
        // 7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

WritableComparable排序

排序是MapReduce框架中最重要的操作之一亡呵。

MapTask和ReduceTask均會對數(shù)據(jù)按照Key進行排序抽活。該操作屬于Hadoop的默認行為硫戈。任何應用程序中的數(shù)據(jù)均會被排序锰什,而不管邏輯上是否需要。

默認排序是按照字典順序排序丁逝,且實現(xiàn)該排序的方法是快速排序汁胆。

排序概述

對于MapTask,它會將處理的結果暫時放到環(huán)形緩沖區(qū)中霜幼,當環(huán)形緩沖區(qū)使用率達到一定閾值后嫩码,再對緩沖區(qū)中的數(shù)據(jù)進行一次快速排序(內存完成),并將這些有序數(shù)據(jù)溢寫到磁盤上罪既,而當數(shù)據(jù)處理完畢后铸题,它會對磁盤上所有文件進行歸并排序。

對于ReduceTask琢感,它從每個MapTask上遠程拷貝相應的數(shù)據(jù)文件丢间,如果文件大小超過一定閾值,則溢寫磁盤上驹针,否則存儲在內存中烘挫。如果磁盤上文件數(shù)目達到一定閾值,則進行一次歸并排序以生成一個更大文件柬甥;如果內存中文件大小或者數(shù)目超過一定閾值饮六,則進行一次合并后將數(shù)據(jù)溢寫到磁盤上。當所有數(shù)據(jù)拷貝完畢后苛蒲,ReduceTask統(tǒng)一對內存和磁盤上的所有數(shù)據(jù)進行一次歸并排序卤橄。

排序分類

1)部分排序

MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內部有序臂外。

2)全排序

最終輸出結果只有一個文件虽风,且文件內部有序棒口。實現(xiàn)方式是只設置一個ReduceTask。但該方法在處理大型文件時效率極低辜膝,因為一臺機器處理所有文件无牵,完全喪失了MapReduce所提供的并行架構。

3)輔助排序

在Reduce端對key進行分組厂抖。應用于:在接收的key為bean對象時茎毁,想讓一個或幾個字段相同(全部字段比較不同)的key進入到同一個reduce方法時,可以采用分組排序忱辅。

4)二次排序

在自定義排序過程中七蜘,如果compareTo中的判斷條件為兩個即為二次排序。

自定義排序WritableComparable原理分析

bean對象作為key傳輸墙懂,需要實現(xiàn)WritableComparable接口重寫compareTo方法橡卤,就可以實現(xiàn)排序。

FlowBean.java

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
            "\t" + downFlow +
            "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}

FlowMapper.java

public class FlowMapper extends Mapper<LongWritable, Text,FlowBean, Text> {
    private FlowBean  outK= new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, FlowBean, Text>.Context context)
        throws IOException, InterruptedException {
        // 獲取1行
        String line = value.toString();
        // 切割
        String[] split = line.split(" ");
        // 封裝
        outV.set(split[0]);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        // 寫出
        context.write(outK,outV);
    }
}

FlowReducer.java

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values,
        Reducer<FlowBean, Text, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }

    }
}

FlowDriver.java

public class FlowDriver {

    public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
        // 1 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2 設置jar
        job.setJarByClass(FlowDriver.class);
        // 3 關聯(lián)mapper和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4 設置mapper輸出的key和value類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5 設置最終輸出的key和value類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6 設置數(shù)據(jù)的輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/writeableComparable"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/writeableComparable"));
        // 7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Combiner

Combiner

1)Combiner是MR程序中Mapper和Reducer之外的一種組件损搬。

2)Combiner組件的父類就是Reducer碧库。

3)Combiner和Reducer的區(qū)別在于運行的位置。

Combiner是在每一個MapTask所在的節(jié)點運行巧勤;

Reducer是接受全局所有Mapper的輸出結果嵌灰;

4)Combiner的意義就是對每一個MapTask的輸出進行局部匯總,以減少網(wǎng)絡流量颅悉。

5)Combiner能夠應用的前提是不能影響最終的業(yè)務邏輯沽瞭,而且,Combiner的輸出kv能夠跟Reducer的輸入kv類型要對應起來剩瓶。

6)因為Combiner代碼和Reducer代碼一致驹溃,可以直接設置Reducer代碼為Combiner代碼

案例

WordCountMapper.java

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outKey = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        // 1 獲取一行
        String lineStr = value.toString();
        // 2 切割
        String[] words = lineStr.split(" ");
        // 3 循環(huán)寫出
        for (String word : words) {
            // 封裝outKey
            outKey.set(word);
            // 寫出
            context.write(outKey, outV);
        }
    }
}

WordCountReducer.java

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable outV = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        // 寫出
        context.write(key,outV);
    }
}

WordCountCombiner.java

public class WordCountCombiner extends Reducer<Text, IntWritable,Text, IntWritable> {

    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
        Reducer<Text, IntWritable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outV.set(sum);
        context.write(key,outV);

    }
}

WordCountDriver.java

public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        //1 獲取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2 設置jar包路徑
        job.setJarByClass(WordCountDriver.class);
        //3 關聯(lián)mapper、reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //4 設置mapper輸出的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //5 設置最終輸出的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setCombinerClass(WordCountCombiner.class);
        // 可以直接將Reducer設置為Combiner延曙,因為這兩處代碼邏輯一致
        // job.setCombinerClass(WordCountReducer.class);
        //6 設置輸入路徑和輸出路徑
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/combiner"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/combiner"));
        //7 提交job
        Boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

小結

本小節(jié)是重點M愫住!搂鲫!描述了Shuffle機制(在mapper之后reducer之前傍药,如果沒有reducer那么combiner將不執(zhí)行)。詳細描述了分區(qū)魂仍、排序以及聚合拐辽,多理解。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末擦酌,一起剝皮案震驚了整個濱河市俱诸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌赊舶,老刑警劉巖睁搭,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赶诊,死亡現(xiàn)場離奇詭異,居然都是意外死亡园骆,警方通過查閱死者的電腦和手機舔痪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來锌唾,“玉大人锄码,你說我怎么就攤上這事∩翁椋” “怎么了滋捶?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長余黎。 經(jīng)常有香客問我重窟,道長,這世上最難降的妖魔是什么惧财? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任巡扇,我火速辦了婚禮,結果婚禮上可缚,老公的妹妹穿的比我還像新娘霎迫。我一直安慰自己斋枢,他們只是感情好帘靡,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瓤帚,像睡著了一般描姚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上戈次,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天轩勘,我揣著相機與錄音,去河邊找鬼怯邪。 笑死绊寻,一個胖子當著我的面吹牛,可吹牛的內容都是我干的悬秉。 我是一名探鬼主播澄步,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼和泌!你這毒婦竟也來了村缸?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤武氓,失蹤者是張志新(化名)和其女友劉穎梯皿,沒想到半個月后仇箱,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡东羹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年剂桥,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片属提。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡渊额,死狀恐怖,靈堂內的尸體忽然破棺而出垒拢,到底是詐尸還是另有隱情旬迹,我是刑警寧澤,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布求类,位于F島的核電站奔垦,受9級特大地震影響,放射性物質發(fā)生泄漏尸疆。R本人自食惡果不足惜椿猎,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望寿弱。 院中可真熱鬧犯眠,春花似錦、人聲如沸症革。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽噪矛。三九已至分预,卻和暖如春暂殖,著一層夾襖步出監(jiān)牢的瞬間朝刊,已是汗流浹背负懦。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留缩滨,地道東北人势就。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像脉漏,于是被迫代替她去往敵國和親苞冯。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內容