Hadoop分布式系統(tǒng)架構(gòu)-MapReduce-02

1、MapReduce介紹

MapReduce思想在生活中處處可見蜘渣」接茫或多或少都曾接觸過這種思想。MapReduce的思想核心是“分而治之”澜术,適用于大量復(fù)雜的任務(wù)處理場景(大規(guī)模數(shù)據(jù)處理場景)艺蝴。
?????? Map負責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來并行處理鸟废〔赂遥可以進行拆分的前提是這些小任務(wù)可以并行計算,彼此間幾乎沒有依賴關(guān)系盒延。
??????Reduce負責(zé)“合”缩擂,即對map階段的結(jié)果進行全局匯總。
MapReduce運行在yarn集群

  1. ResourceManager
  2. NodeManager

2兰英、MapReduce編程規(guī)范

MapReduce 的開發(fā)一共有八個步驟, 其中 Map 階段分為 2 個步驟撇叁,Shuffle 階段 4 個步驟,Reduce 階段分為 2 個步驟畦贸。

Map 階段 2 個步驟:

  1. 設(shè)置 InputFormat 類, 將數(shù)據(jù)切分為 Key-Value(K1和V1) 對陨闹,輸入到第二步
  2. 自定義 Map 邏輯, 將第一步的結(jié)果轉(zhuǎn)換成另外的 Key-Value(K2和V2) 對楞捂,輸出結(jié)果

Shle 階段 4 個步驟:

  1. 對輸出的 Key-Value 對進行分區(qū)
  2. 對不同分區(qū)的數(shù)據(jù)按照Key 排序
  3. (可選) 對分組過的數(shù)據(jù)初步規(guī)約, 降低數(shù)據(jù)的網(wǎng)絡(luò)拷貝
  4. 對數(shù)據(jù)進行分組,相同 Key 的 Value 放入一個集合中

Reduce 階段 2 個步驟:

  1. 對多個 Map 任務(wù)的結(jié)果進行排序以及合并, 編寫 Reduce 函數(shù)實現(xiàn)自己的邏輯, 對輸入的
    Key-Value 進行處理, 轉(zhuǎn)為新的 Key-Value(K3和V3)輸出
  2. 設(shè)置 OutputFormat 處理并保存 Reduce 輸出的 Key-Value 數(shù)據(jù)

2.2趋厉、單詞計數(shù)案例

自定義map階段代碼

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class wordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split(" ");
        for (String string : strings) {
            context.write(new Text(string),new LongWritable(1));
        }
    }
}

自定義Reduce階段代碼

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class wordCountReduce extends Reducer<Text, LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key,new LongWritable(count));
    }
}

編寫主方法
在編寫主方法的時候寨闹,時刻記住上面MapReduce編程規(guī)范中的八個階段的步驟(代碼中的注解說明);


/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class JobMain extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), JobMain.class.getSimpleName());
        //打包程序到集群上面運行的時候君账,需要指定出程序的main方法繁堡。
        job.setJarByClass(JobMain.class);
        // 第一步:讀取文件解析成key value對
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://master:8020/a.txt"));
        // 第二步:設(shè)置自定的Mapper類
        job.setMapperClass(wordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
       //第三步,第四步乡数,第五步椭蹄,第六步,省略
       //第七步:設(shè)置自定義的reduce類
        job.setReducerClass(wordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //第八步:設(shè)置輸出類以及輸出路徑
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://master:8020/wordCount_out"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool tool = new JobMain();
        int run = ToolRunner.run(configuration, tool, args);
        System.exit(run);
    }
}

2.3净赴、分區(qū)Partition

可以在自定義的Partition中定義一些分區(qū)的邏輯绳矩,然后在程序?qū)嶋H運行的過程中,符合分區(qū)邏輯的會被分到一個Reduce中玖翅。

public class Mypartition extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String number = text.toString().split("\t")[5];
        if (Integer.parseInt(number) > 15){
            return 1;
        } else {
            return 0;
        }
    }
}

在編寫完自定義的Partition之后翼馆,在Main方法中還需要添加setPartition方法以及setNumReduceTasks方法。

        job.setPartitionerClass(Mypartition.class);
        job.setNumReduceTasks(2);

2.4金度、MapReduce排序和序列化

1应媚、序列化 (Serialization) 是指把結(jié)構(gòu)化對象轉(zhuǎn)化為字節(jié)流
2、反序列化 (Deserialization) 是序列化的逆過程. 把字節(jié)流轉(zhuǎn)為結(jié)構(gòu)化對象. 當(dāng)要在進程間傳遞對象或持久化對象的時候, 就需要序列化對象成字節(jié)流, 反之當(dāng)要將接收到或從磁盤讀取的字節(jié)流轉(zhuǎn)換為對象, 就要進行反序列化猜极。
3中姜、Java 的序列化 (Serializable) 是一個重量級序列化框架, 一個對象被序列化后, 會附帶很多額外的信息 (各種校驗信息, header, 繼承體系等), 不便于在網(wǎng)絡(luò)中高效傳輸. 所以, Hadoop自己開發(fā)了一套序列化機制(Writable), 精簡高效. 不用像 Java 對象類一樣傳輸多層的父子關(guān)系, 需要哪個屬性就傳輸哪個屬性值, 大大的減少網(wǎng)絡(luò)傳輸?shù)拈_銷
4、Writable 是 Hadoop 的序列化格式, Hadoop 定義了這樣一個 Writable 接口. 一個類要支持可序列化只需實現(xiàn)這個接口即可
5魔吐、另外 Writable 有一個子接口是 WritableComparable, WritableComparable 是既可實現(xiàn)序列化, 也可以對key進行比較, 我們這里可以通過自定義 Key 實現(xiàn) WritableComparable 來實現(xiàn)我們的排序功能扎筒。

2.4.2、小案例

現(xiàn)有一數(shù)據(jù)集酬姆,描述如下:
a 1
a 9
b 3
a 7
b 8
b 10
a 5
要求:
1、第一列按照字典順序進行排列奥溺。
2辞色、第一列相同的時候,按照第二列升序進行排列浮定。

分析:
1相满、首先Map階段,讀取文件中數(shù)據(jù)K1是文本偏移量克婶,V1是讀取一行的內(nèi)容绊谭;
2脖苏、Map將<K1 ,V1>轉(zhuǎn)化成<K2,V2>,K2是PairWritable建蹄,V2是second碌更;
3、我們在PairWritable類中定義了比較器洞慎,所以在shuffle階段中的排序痛单,會自動對K2進行排序。
4劲腿、Reduce階段將<K2,V2>轉(zhuǎn)化成<K3,V3>輸出旭绒,這里面的K3,V3就是我們想要的 “字符串 數(shù)字”格式。

實現(xiàn):
Step1焦人、自定義類型和比較器

package MapReduce.SerializationTest;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;
    public PairWritable(){

    }
    public PairWritable(String first,int second){
        this.set(first,second);
    }
    public void set(String first,int second){
        this.first = first;
        this.second = second;
    }


    // 序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }


    //反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    @Override
    public int compareTo(PairWritable o) {
        
        int comp = this.first.compareTo(o.first);
        if (comp!=0){
            return comp;
        }else {
            return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.getSecond()));
        }
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public String toString() {
        return "PairWritable{" +
                "first='" + first + '\'' +
                ", second=" + second +
                '}';
    }
}

Step2挥吵、編寫Map方法

package MapReduce.SerializationTest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class SortMapperTest extends Mapper<LongWritable, Text, PairWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] s = value.toString().split("\t");

        context.write(new PairWritable(s[0], Integer.valueOf(s[1])), new IntWritable(Integer.valueOf(s[1])));
    }
}

Step3、編寫Reduce方法

package MapReduce.SerializationTest;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class SortReducerTest extends Reducer<PairWritable, IntWritable, Text,IntWritable> {
    @Override
    protected void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        for (IntWritable value : values) {
            context.write(new Text(key.getFirst()),value);
        }
    }
}

Step4花椭、編寫主方法

package MapReduce.SerializationTest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class SortMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        conf.set("mapreduce.framework.name","local");
        Job job = Job.getInstance(conf, SortMain.class.getSimpleName());
        job.setJarByClass(SortMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("D:\\input\\Sort"));

        job.setMapperClass(SortMapperTest.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(SortReducerTest.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("D:\\input\\SortResult"));

        boolean b = job.waitForCompletion(true);

        return b?0:1;

    }



    public static void main(String[] args) {
        try {
            int run = ToolRunner.run(new Configuration(), new SortMain(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

2.5忽匈、Partition分區(qū)

為了數(shù)據(jù)的統(tǒng)計, 可以把一批類似的數(shù)據(jù)發(fā)送到同一個 Reduce 當(dāng)中, 在同一個 Reduce 當(dāng)中統(tǒng)計相同類型的數(shù)據(jù), 就可以實現(xiàn)類似的數(shù)據(jù)分區(qū)和統(tǒng)計等其實就是相同類型的數(shù)據(jù), 有共性的數(shù)據(jù), 送到一起去處理。其中控制如何將相同類型的數(shù)據(jù)發(fā)給同一Reduce个从,由Partition來控制邏輯脉幢。
Reduce 當(dāng)中默認的分區(qū)只有一個。

案例:
需求:
將partition.csv 這個文本文件中第六個字段也就是開獎結(jié)果數(shù)值嗦锐,現(xiàn)在需求將15以上的結(jié)果以及15以下的結(jié)果進行分開成兩個文件進行保存嫌松。

實現(xiàn):
Step 1、定義Mapper

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class ParitionMapper extends Mapper<LongWritable, Text, IntWritable,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        context.write(new IntWritable(Integer.valueOf(split[5])),value);
    }
}

Step 2奕污、定義Partitioner

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class MyPartition extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable intWritable, Text text, int i) {
        if (intWritable.get() > 15) return 1;
        else return 0;
    }
}

Step 1萎羔、定義Reducer

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class PartitionReducer extends Reducer<IntWritable, Text,Text, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,NullWritable.get());
        }

    }
}

Step 1、定義Main

/**
 * @author : HaiLiang Huang
 * @author : Always Best Sign X
 */
public class PartitionMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(), "partition");

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("file:///D:\\input\\partition"));

        job.setMapperClass(ParitionMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setPartitionerClass(MyPartition.class);
        job.setNumReduceTasks(2);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("file:///D:\\output_out\\partition22"));

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;

    }
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        PartitionMain partitionMain = new PartitionMain();
        int run = ToolRunner.run(configuration, partitionMain, args);
        System.exit(run);
    }
}

2.6碳默、規(guī)約Combiner

2.6.1贾陷、概念

每一個 map 都可能會產(chǎn)生大量的本地輸出,Combiner 的作用就是對 map 端的輸出先做一次合并嘱根,以減少在 map 和 reduce 節(jié)點之間的數(shù)據(jù)傳輸量髓废,以提高網(wǎng)絡(luò)IO 性能,是 MapReduce的一種優(yōu)化手段之一该抒。

(1)combiner是 MR 程序中 Mapper 和 Reducer 之外的一種組件
(2)combiner 組件的父類就是 Reducer
(3)combiner 和 reducer 的區(qū)別在于運行的位置
    (3.1)Combiner 是在每一個 maptask 所在的節(jié)點運行
    (3.2)Reducer 是接收全局所有 Mapper 的輸出結(jié)果
(4)combiner 的意義就是對每一個 maptask 的輸出進行局部匯總慌洪,以減小網(wǎng)絡(luò)傳輸量

2.6.2、實現(xiàn)

(1)自定義一個 combiner 繼承 Reducer凑保,重寫 reduce 方法
(2)在 job 中設(shè)置 job.setCombinerClass(CustomCombiner.class)

3冈爹、MapReduce運行機制

3.1、MapTask工作機制

image.png

整個Map階段流程大體:

(1)讀取數(shù)據(jù)組件 InputFormat (默認 TextInputFormat) 會通過 getSplits 方法對輸入目錄中文件進行邏輯切片規(guī)劃得到 block , 有多少個 block 就對應(yīng)啟動多少個 MapTask .
(2)將輸入文件切分為 block 之后, 由 RecordReader 對象 (默認是LineRecordReader) 進行讀取, 以 \n 作為分隔符, 讀取一行數(shù)據(jù), 返回 <key欧引,value> . Key 表示每行首字符偏移值, Value 表示這一行文本內(nèi)容
(3)讀取 block 返回 <key,value> , 進入用戶自己繼承的 Mapper 類中频伤,執(zhí)行用戶重寫的 map 函數(shù),RecordReader 讀取一行這里調(diào)用一次
(4)Mapper 邏輯結(jié)束之后, 將 Mapper 的每條結(jié)果通過 context.write 進行collect數(shù)據(jù)收集. 在 collect 中, 會先對其進行分區(qū)處理,默認使用 HashPartitioner芝此。

MapReduce 提供 Partitioner 接口, 它的作用就是根據(jù) Key 或 Value 及Reducer 的數(shù)量來決定當(dāng)前的這對輸出數(shù)據(jù)最終應(yīng)該交由哪個 Reducetask 處理, 默認對 Key Hash 后再以 Reducer 數(shù)量取模. 默認的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對 Partitioner 有需求, 可以訂制并設(shè)置到 Job 上憋肖。

(5)接下來, 會將數(shù)據(jù)寫入內(nèi)存, 內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū), 緩沖區(qū)的作用是批量收集Mapper 結(jié)果, 減少磁盤 IO 的影響. 我們的 Key/Value 對以及 Partition 的結(jié)果都會被寫入緩沖區(qū). 當(dāng)然, 寫入之前因痛,Key 與 Value 值都會被序列化成字節(jié)數(shù)組。

1瞬哼、環(huán)形緩沖區(qū)其實是一個數(shù)組, 數(shù)組中存放著 Key, Value 的序列化數(shù)據(jù)和 Key,Value 的元數(shù)據(jù)信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value的長度. 環(huán)形結(jié)構(gòu)是一個抽象概念婚肆。
2、緩沖區(qū)是有大小限制, 默認是 100MB. 當(dāng) Mapper 的輸出結(jié)果很多時, 就可能會撐爆內(nèi)存, 所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時寫入磁盤, 然后重新利用這塊緩沖區(qū). 這個從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為 Spill, 中文可譯為溢寫. 這個溢寫是由單獨線程來完成, 不影響往緩沖區(qū)寫 Mapper 結(jié)果的線程. 溢寫線程啟動時不應(yīng)該阻止 Mapper 的結(jié)果輸出, 所以整個緩沖區(qū)有個溢寫的比例spill.percent . 這個比例默認是 0.8, 也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達到閾值buffer size * spill percent = 100MB * 0.8 = 80MB , 溢寫線程啟動,鎖定這80MB 的內(nèi)存, 執(zhí)行溢寫過程. Mapper 的輸出結(jié)果還可以往剩下的 20MB內(nèi)存中寫, 互不影響坐慰。

(6)當(dāng)溢寫線程啟動后, 需要對這 80MB 空間內(nèi)的 Key 做排序 (Sort). 排序是 MapReduce 模型默認的行為, 這里的排序也是對序列化的字節(jié)做的排序

1较性、如果 Job 設(shè)置過 Combiner, 那么現(xiàn)在就是使用 Combiner 的時候了. 將有相同Key 的 Key/Value 對的 Value 加起來, 減少溢寫到磁盤的數(shù)據(jù)量. Combiner 會優(yōu)化MapReduce 的中間結(jié)果, 所以它在整個模型中會多次使用
2、那哪些場景才能使用 Combiner 呢? 從這里分析, Combiner 的輸出是 Reducer 的輸入, Combiner 絕不能改變最終的計算結(jié)果. Combiner 只應(yīng)該用于那種 Reduce的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結(jié)果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對 Job 執(zhí)行效率有幫助, 反之會影響 Reducer 的最終結(jié)果

(7)合并溢寫文件, 每次溢寫會在磁盤上生成一個臨時文件 (寫之前判斷是否有 Combiner), 如果 Mapper 的輸出結(jié)果真的很大, 有多次這樣的溢寫發(fā)生, 磁盤上相應(yīng)的就會有多個臨時文件存在. 當(dāng)整個數(shù)據(jù)處理結(jié)束之后開始對磁盤中的臨時文件進行 Merge 合并, 因為最終的文件只有一個, 寫入磁盤, 并且為這個文件提供了一個索引文件, 以記錄每個reduce對應(yīng)數(shù)據(jù)的偏移量结胀。

3.2赞咙、ReduceTask工作機制

ReduceTask.png

(1)Copy階段 ,簡單地拉取數(shù)據(jù)糟港。Reduce進程啟動一些數(shù)據(jù)copy線程(Fetcher)攀操,通過HTTP方式請求maptask獲取屬于自己的文件。
(2)Merge階段 秸抚。這里的merge如map端的merge動作速和,只是數(shù)組中存放的是不同map端copy來的數(shù)值。Copy過來的數(shù)據(jù)會先放入內(nèi)存緩沖區(qū)中剥汤,這里的緩沖區(qū)大小要比map端的更為靈活颠放。merge有三種形式:內(nèi)存到內(nèi)存;內(nèi)存到磁盤吭敢;磁盤到磁盤碰凶。默認情況下第一種形式不啟用,當(dāng)內(nèi)存中的數(shù)據(jù)量到達一定閾值鹿驼,就啟動內(nèi)存到磁盤的merge欲低。與map端類似,這也是溢寫的過程畜晰,這個過程中如果你設(shè)置有Combiner砾莱,也是會啟用的,然后在磁盤中生成了眾多的溢寫文件凄鼻。第二種merge方式一直在運行恤磷,直到?jīng)]有map端的數(shù)據(jù)時才結(jié)束,然后啟動第三種磁盤到磁盤的merge方式生成最終的文件野宜。
(3)合并排序 。把分散的數(shù)據(jù)合并成一個大的數(shù)據(jù)后魔策,還會再對合并后的數(shù)據(jù)排序匈子。
(4)對排序后的鍵值對調(diào)用reduce方法 ,鍵相等的鍵值對調(diào)用一次reduce方法闯袒,每次調(diào)用會產(chǎn)生零個或者多個鍵值對虎敦,最后把這些輸出的鍵值對寫入到HDFS文件中游岳。

3.3、Shuffle過程

Shuffle過程是整個MapReduce的核心其徙,Shuffle過程的核心操作有:數(shù)據(jù)分區(qū)胚迫,排序,分組唾那,規(guī)約等過程访锻。MapReduce三個階段八個步驟

(1)Collect階段 :將 MapTask 的結(jié)果輸出到默認大小為 100M 的環(huán)形緩沖區(qū),保存的是key/value闹获,Partition 分區(qū)信息等期犬。
(2)Spill階段 :當(dāng)內(nèi)存中的數(shù)據(jù)量達到一定的閥值的時候,就會將數(shù)據(jù)寫入本地磁盤避诽,在將數(shù)據(jù)寫入磁盤之前需要對數(shù)據(jù)進行一次排序的操作龟虎,如果配置了 combiner,還會將有相同分區(qū)號和 key 的數(shù)據(jù)進行排序沙庐。
(3)Merge階段 :把所有溢出的臨時文件進行一次合并操作鲤妥,以確保一個 MapTask 最終只產(chǎn)生一個中間數(shù)據(jù)文件。
(4)Copy階段 :ReduceTask 啟動 Fetcher 線程到已經(jīng)完成 MapTask 的節(jié)點上復(fù)制一份屬于自己的數(shù)據(jù)拱雏,這些數(shù)據(jù)默認會保存在內(nèi)存的緩沖區(qū)中棉安,當(dāng)內(nèi)存的緩沖區(qū)達到一定的閥值的時候,就會將數(shù)據(jù)寫到磁盤之上古涧。
(5) Merge階段 :在 ReduceTask 遠程復(fù)制數(shù)據(jù)的同時垂券,會在后臺開啟兩個線程對內(nèi)存到本地的數(shù)據(jù)文件進行合并操作。
(6)Sort階段 :在對數(shù)據(jù)進行合并的同時羡滑,會進行排序操作菇爪,由于 MapTask 階段已經(jīng)對數(shù)據(jù)進行了局部的排序,ReduceTask 只需保證 Copy 的數(shù)據(jù)的最終整體有效性即可柒昏。Shuffle 中的緩沖區(qū)大小會影響到mapreduce 程序的執(zhí)行效率凳宙,原則上說,緩沖區(qū)越大职祷,磁盤io的次數(shù)越少氏涩,執(zhí)行速度就越快。

4有梆、MapReduce案例

以下各種案例說明以及代碼都在github中(hailiang9615/big-data-Classic-contact-case-: 分享大數(shù)據(jù)計算框架的各種經(jīng)典小型案例 (github.com))

4.1是尖、MapReduce案例---流量統(tǒng)計案

(1)、需求一:統(tǒng)計求和

統(tǒng)計每個手機號的上行數(shù)據(jù)包總和泥耀,下行數(shù)據(jù)包總和饺汹,上行總流量之和,下行總流量之和痰催。

(2)兜辞、需求二:上行流量倒序排序(遞減排序)

基于上面分析的結(jié)果進行上行流量遞減排序迎瞧。

(3)、需求三:根據(jù)手機號碼分區(qū)

基于需求一的結(jié)果逸吵,給手機號進行分區(qū)凶硅。

4.2、Reduce端實現(xiàn)SQL語句的Join操作

4.3扫皱、Map端實現(xiàn)SQL語句的Join操作

4.4足绅、尋找共同好友

4.5、自定義InputFormat

4.6啸罢、自定義outputFormat

4.7编检、自定義分組求取topN

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市扰才,隨后出現(xiàn)的幾起案子允懂,更是在濱河造成了極大的恐慌,老刑警劉巖衩匣,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蕾总,死亡現(xiàn)場離奇詭異,居然都是意外死亡琅捏,警方通過查閱死者的電腦和手機生百,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來柄延,“玉大人蚀浆,你說我怎么就攤上這事∷寻桑” “怎么了市俊?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長滤奈。 經(jīng)常有香客問我摆昧,道長,這世上最難降的妖魔是什么蜒程? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任绅你,我火速辦了婚禮,結(jié)果婚禮上昭躺,老公的妹妹穿的比我還像新娘忌锯。我一直安慰自己,他們只是感情好领炫,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布汉规。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪针史。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天碟狞,我揣著相機與錄音啄枕,去河邊找鬼。 笑死族沃,一個胖子當(dāng)著我的面吹牛频祝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脆淹,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼常空,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了盖溺?” 一聲冷哼從身側(cè)響起漓糙,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎烘嘱,沒想到半個月后昆禽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡蝇庭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年醉鳖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哮内。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡盗棵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出北发,到底是詐尸還是另有隱情纹因,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布鲫竞,位于F島的核電站辐怕,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏从绘。R本人自食惡果不足惜寄疏,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望僵井。 院中可真熱鬧陕截,春花似錦、人聲如沸批什。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至乳规,卻和暖如春形葬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背暮的。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工笙以, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人冻辩。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓猖腕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親恨闪。 傳聞我的和親對象是個殘疾皇子倘感,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容