hadoop學(xué)習(xí)二

九. MapReduce 案例

9.1 統(tǒng)計(jì)各個(gè)手機(jī)號的上傳和下載流量總和

數(shù)據(jù)展示:

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200

數(shù)據(jù)解釋:

# 每行數(shù)據(jù)的第二列數(shù)據(jù)是手機(jī)號,倒數(shù)第三列表示上行流量,倒數(shù)第二列表示下行流量

輸出格式要求:

# 手機(jī)號 上行流量    下行流量    總流量

最終統(tǒng)計(jì)結(jié)果為:

13726230503  上傳流量:4962  下載流量:49362  總數(shù)據(jù)流量:  54324
13826544101  上傳流量:528  下載流量:0  總數(shù)據(jù)流量:  528
13926251106  上傳流量:480  下載流量:0  總數(shù)據(jù)流量:  480
13926435656  上傳流量:264  下載流量:3024  總數(shù)據(jù)流量:  3288

創(chuàng)建數(shù)據(jù)文件上傳到HDFS文件系統(tǒng)中

[root@hadoop5 ~]# vim access.log
[root@hadoop5 ~]# cat access.log
                1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
                1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
                1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
                1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
                1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
                1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
                1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
                1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
[root@hadoop5 ~]# hdfs dfs -mkdir -p /accesslog
[root@hadoop5 ~]# hdfs dfs -put access.log /accesslog
image-20191221171759635
image-20191221171920552

編寫mapreduce的job作業(yè)完成統(tǒng)計(jì)

//統(tǒng)計(jì)手機(jī)流量
public class AccessLogJob extends Configured implements Tool {

    private static Logger logger = Logger.getLogger(AccessLogJob.class);

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogJob(),args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        //創(chuàng)建job作業(yè)
        Job job = Job.getInstance(getConf(), "access-log");
        job.setJarByClass(AccessLogJob.class);

        //設(shè)置InputFormate
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //設(shè)置map
        job.setMapperClass(AccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //shuffle  無須設(shè)置 自動(dòng)完成

        //設(shè)置reduce
        job.setReducerClass(AccessLogReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //設(shè)置Output Format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)) {
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job, res);

        //提交job作業(yè)
        boolean status = job.waitForCompletion(true);
        System.out.println("本次作業(yè)執(zhí)行狀態(tài) = " + status);

        return 0;
    }



    public static class AccessLogMap extends Mapper<LongWritable, Text,Text,Text>{

        @Override //參數(shù)1:行首字母偏移量  參數(shù)2:當(dāng)前row數(shù)據(jù) 參數(shù)3:map輸出上下文
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            //輸出key 為手機(jī)號  值為: 每個(gè)手機(jī)號"上傳-下載流量"格式文本
            context.write(new Text(values[1]),new Text(values[values.length-3]+"-"+values[values.length-2]));

            logger.info("手機(jī)號: "+values[1]+"  流量格式:"+values[values.length-3]+"-"+values[values.length-2]);
        }
    }
    //reduce
    public static class AccessLogReduce extends Reducer<Text,Text,Text,Text>{
        @Override //參數(shù)1:map的key  參數(shù)2:相當(dāng)key的數(shù)組   參數(shù)3:Reduce輸出的上下文
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int uploadData = 0; //保存上傳流量
            int downData = 0;   //保存下載流量
            for (Text value : values) {
                String[] datas = value.toString().split("-");
                uploadData+= Integer.valueOf(datas[0]);
                downData+= Integer.valueOf(datas[1]);
            }
            int total = uploadData + downData;//保存總流量

            //輸出
            context.write(key,new Text(" 上傳流量:"+uploadData+"  下載流量:"+downData+"  總數(shù)據(jù)流量:  "+total));
            logger.info("手機(jī)號: "+key+" 上傳流量:"+uploadData+"  下載流量:"+downData+"  總數(shù)據(jù)流量:  "+total);
        }
    }

}

運(yùn)行mapreduce

image-20191221183828609

查看結(jié)果

[root@hadoop5 ~]# hdfs dfs -text /accesslog/res/part-r-00000
13726230503  上傳流量:4962  下載流量:49362  總數(shù)據(jù)流量:  54324
13826544101  上傳流量:528  下載流量:0  總數(shù)據(jù)流量:  528
13926251106  上傳流量:480  下載流量:0  總數(shù)據(jù)流量:  480
13926435656  上傳流量:264  下載流量:3024  總數(shù)據(jù)流量:  3288
image-20191221184603660

9.2 自定義MapReduce中數(shù)據(jù)類型

MapReduce的執(zhí)行過程,無論是map階段還是Reduce階段都會跨JVM,通過網(wǎng)絡(luò)通信傳遞數(shù)據(jù),索引對于傳遞數(shù)據(jù)必須實(shí)現(xiàn)序列化,為此Hadoop的MapReduce模型對現(xiàn)有的數(shù)據(jù)類型進(jìn)行了近一步的包裝,如之前用到的IntWriteableLongWritableText袁波、 DoubleWritable瓢捉、NullWritable煎殷。如果處理簡單計(jì)算有這些基礎(chǔ)類型就夠了,但是如果需要復(fù)雜結(jié)果是這些數(shù)據(jù)類型遠(yuǎn)遠(yuǎn)是不夠的,因此我們需要根據(jù)實(shí)際情況自定義數(shù)據(jù)類型!

9.2.1 查看提供已知數(shù)據(jù)類型類圖

image-20191225122724269

通過類圖得知hadoop提供的數(shù)據(jù)類型都間接實(shí)現(xiàn)了:WirtableComparable 罚屋。直接實(shí)現(xiàn)WritableComparable接口,因此我們自定義類型也需要實(shí)現(xiàn)相應(yīng)的接口

9.2.2 查看WriteComparable接口

image-20191221202600106

通過查看源碼得知自定義的數(shù)據(jù)類型需要實(shí)現(xiàn)類中 wirte璧瞬、readFilescompareTo麻裁、hashCodeequals箍镜、toString等相關(guān)方法源祈。

9.2.3 根據(jù)之前流量案例定義自定義類型

開發(fā)自定義Writable類型

//自定義Writable類型
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {
    private Integer upload;
    private Integer down;
    private Integer total;

    @Override
    public int compareTo(AccessLogWritable o) {
        return this.total-o.getTotal();
    }
        
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(down);
        out.writeInt(total);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.down = in.readInt();
        this.total = in.readInt();
    }

    @Override
    public String toString() {
        return "統(tǒng)計(jì)結(jié)果{" +
                "上傳流量=" + upload +
                ", 下載流量=" + down +
                ",  上傳下載總流量=" + total +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AccessLogWritable accessLogWritable = (AccessLogWritable) o;
        return Objects.equals(upload, accessLogWritable.upload) &&
                Objects.equals(down, accessLogWritable.down) &&
                Objects.equals(total, accessLogWritable.total);
    }

    @Override
    public int hashCode() {
        return Objects.hash(upload, down, total);
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDown() {
        return down;
    }

    public void setDown(Integer down) {
        this.down = down;
    }

    public Integer getTotal() {
        return total;
    }

    public void setTotal(Integer total) {
        this.total = total;
    }

    public AccessLogWritable() {
    }

    public AccessLogWritable(Integer upload, Integer down, Integer total) {
        this.upload = upload;
        this.down = down;
        this.total = total;
    }

    public AccessLogWritable(Integer upload, Integer down) {
        this.upload = upload;
        this.down = down;
    }
}

注意:write的順序和read的順序必須嚴(yán)格一致,讀的類型和寫的類型也必須完全一致

開發(fā)Job作業(yè)

public class AccessLogCustomerTypeJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AccessLogCustomerTypeJob(),args);
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "customer-type-job");
        job.setJarByClass(AccessLogCustomerTypeJob.class);

        //設(shè)置input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //設(shè)置map
        job.setMapperClass(AccessLogCustomerTypeMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AccessLogWritable.class);

        //shuffle 無須設(shè)置 自動(dòng)處理

        //設(shè)置reduce
        job.setReducerClass(AccessLogCustomerTypeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(AccessLogWritable.class);

        //設(shè)置Output Format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res2");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)){
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job, res);

        //提交作業(yè)
        boolean status = job.waitForCompletion(true);
        System.out.println("作業(yè)執(zhí)行狀態(tài):" + status);
        return 0;
    }


    public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            int upload = Integer.valueOf(values[values.length-3]);
            int down = Integer.valueOf(values[values.length-2]);
            context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
        }
    }

    public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
        @Override
        protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
            int upload =0;
            int down = 0;
            for (AccessLogWritable value : values) {
                upload += value.getUpload();
                down  += value.getDown();
            }
            context.write(key,new AccessLogWritable(upload,down,upload+down));
        }
    }
}

執(zhí)行job作業(yè)

[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 10:33:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 10:34:00 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 10:34:00 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 10:34:01 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 10:34:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0013
19/12/20 10:34:03 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0013
19/12/20 10:34:03 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0013/
19/12/20 10:34:03 INFO mapreduce.Job: Running job: job_1576780523481_0013
19/12/20 10:34:27 INFO mapreduce.Job: Job job_1576780523481_0013 running in uber mode : false
19/12/20 10:34:27 INFO mapreduce.Job:  map 0% reduce 0%
19/12/20 10:34:43 INFO mapreduce.Job:  map 100% reduce 0%
19/12/20 10:35:00 INFO mapreduce.Job:  map 100% reduce 100%
19/12/20 10:35:02 INFO mapreduce.Job: Job job_1576780523481_0013 completed successfully
image-20191221231217047

查看結(jié)果

[root@hadoop5 ~]#  hdfs dfs -text /accesslog/res2/part-r-00000
13726230503 統(tǒng)計(jì)結(jié)果{上傳流量=4962, 下載流量=49362,  上傳下載總流量=54324}
13826544101 統(tǒng)計(jì)結(jié)果{上傳流量=528, 下載流量=0,  上傳下載總流量=528}
13926251106 統(tǒng)計(jì)結(jié)果{上傳流量=480, 下載流量=0,  上傳下載總流量=480}
13926435656 統(tǒng)計(jì)結(jié)果{上傳流量=264, 下載流量=3024,  上傳下載總流量=3288}
image-20191221231328361

十. MapReduce的高級特性

10.1 MapRedcuce的數(shù)據(jù)清洗

10.1.1 數(shù)據(jù)清洗

所謂數(shù)據(jù)清洗指的是在復(fù)雜的數(shù)據(jù)格式中獲取我們需要的數(shù)據(jù)過程稱之為數(shù)據(jù)清洗,整個(gè)過程僅僅是將復(fù)雜數(shù)據(jù)中我們需要的數(shù)據(jù)清洗出來,不涉及任何的統(tǒng)計(jì)計(jì)算工作,如下圖展示過程就是數(shù)據(jù)清洗:

image-20191222105730220

10.1.2 數(shù)據(jù)清洗編程思路分析

image-20191222111700751

10.1.3 開發(fā)數(shù)據(jù)清洗

//開發(fā)數(shù)據(jù)清洗
public class DataCleanAccessLogJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DataCleanAccessLogJob(),args);
    }

    @Override
    public int run(String[] args) throws Exception {
        //創(chuàng)建job作業(yè)
        Job job = Job.getInstance(getConf(), "data-clean-access-log-job");
        job.setJarByClass(DataCleanAccessLogJob.class);
        //設(shè)置input format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/accesslog/access.log"));

        //設(shè)置map
        job.setMapperClass(DataCleanAccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //跳過shuffle 和 reduce
        job.setNumReduceTasks(0);

        //設(shè)置output format
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/cleandata");
        FileSystem fileSystem = FileSystem.get(getConf());
        if(fileSystem.exists(res)){
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job,res);


        //提交job
        boolean status = job.waitForCompletion(true);
        System.out.println("作業(yè)提交狀態(tài) = " + status);
        return 0;
    }

     //map階段
     public  static class DataCleanAccessLogMap extends Mapper<LongWritable, Text,Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            Text keyout = new Text(values[1]+"\t"+values[6] + "\t" + values[7]);
            context.write(keyout, NullWritable.get());
        }
    }
    //沒有reduce階段
}

注意: 設(shè)置job.setNumReduceTasks(0);這句話本次的mapreduce就跳過了reduce階段的執(zhí)行

10.1.4 運(yùn)行作業(yè)

[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 13:22:03 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 13:22:07 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 13:22:07 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 13:22:08 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 13:22:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0018
19/12/20 13:22:10 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0018
19/12/20 13:22:10 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0018/
19/12/20 13:22:10 INFO mapreduce.Job: Running job: job_1576780523481_0018
19/12/20 13:22:36 INFO mapreduce.Job: Job job_1576780523481_0018 running in uber mode : false
19/12/20 13:22:36 INFO mapreduce.Job:  map 0% reduce 0%
19/12/20 13:22:51 INFO mapreduce.Job:  map 100% reduce 0%
19/12/20 13:22:53 INFO mapreduce.Job: Job job_1576780523481_0018 completed successfully
image-20191222113540419

10.1.5 查看運(yùn)行結(jié)果

image-20191222113701804
[root@hadoop5 ~]# hdfs dfs -text /accesslog/cleandata/part*
13726230503 2481    24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
13726230503 2481    24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
image-20191222113759032

十一. MapReduce的高級特性

11.1 MapReduce中Map的數(shù)量

# MapReduce運(yùn)行過程中Map的數(shù)量是由block所決定的: 
        也就是一個(gè)文件分為幾個(gè)block就是幾個(gè)map

注意:map的數(shù)量由block塊決定,也就意味著一旦文件確定根據(jù)默認(rèn)配置劃分block也將確定,所以我們沒有辦法在程序中手動(dòng)干預(yù)map的執(zhí)行數(shù)量

11.2 MapReduce中Reduce的數(shù)量

# Reduce的數(shù)量是可以在程序中手動(dòng)指定
        默認(rèn)數(shù)量為:  1個(gè) Reduce
        可以通過:    job.setNumReduceTasks(0);  0 就是沒有   數(shù)字是幾就是幾個(gè)

11.2.1 通過修改word count案例reduce數(shù)量比較不同

使用默認(rèn)reduce數(shù)量也就是1的執(zhí)行結(jié)果:

image-20191222121049721
image-20191222121133848

將reduce數(shù)量修改為多個(gè)這里修改為2個(gè),運(yùn)行查看結(jié)果

                //省略............
                //設(shè)置reduce 階段
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //設(shè)置reduce數(shù)量
        job.setNumReduceTasks(2);
                //省略..........

運(yùn)行job作業(yè)

image-20191222121658125

查看結(jié)果

image-20191222121732060
image-20191222121849990

通過這里可以總結(jié)出:有幾個(gè)reduce就會生成幾個(gè)結(jié)果文件,多個(gè)reduce同時(shí)處理數(shù)據(jù)將原來一個(gè)reduce處理結(jié)果,分到了不同的reduce處理文件中,因此如果日后需要將所有結(jié)果數(shù)據(jù)匯總在一起之能設(shè)置一個(gè)reduce,如果想要將結(jié)果劃分到多個(gè)文件中可以設(shè)置多個(gè)reduce數(shù)量

11.2.2 為什么要設(shè)置多個(gè)reduce數(shù)量?

# 1.提高M(jìn)R的運(yùn)行效率,從而快速統(tǒng)計(jì)計(jì)算工作

11.2.3 多個(gè)Reduce如何去分配map中數(shù)據(jù)?

一旦設(shè)置了多個(gè)reduce,如何讓多個(gè)reduce均分map中統(tǒng)計(jì)的數(shù)據(jù),這里面還有一個(gè)分區(qū)(Partition)概念,一個(gè)Reduce會形成一個(gè)分區(qū),默認(rèn)使用的是HashPartitioner會根據(jù)map輸出key做hash運(yùn)算去決定map中輸出數(shù)據(jù)交給那個(gè)reduce處理

@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

image-20191222130057898

11.2.4 如何自定義分區(qū)(Partitoner)

自定義分區(qū):可以根據(jù)業(yè)務(wù)規(guī)則將統(tǒng)計(jì)結(jié)果劃分到不同分區(qū)中

數(shù)據(jù)格式

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200
1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4    4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99  2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4    4   0   240 0   200

要求

# 統(tǒng)計(jì)流量,并將不同省份數(shù)據(jù)的統(tǒng)計(jì)結(jié)果放在不同文件中

自定義分區(qū)

//自定義分區(qū) 輸入數(shù)據(jù)map端結(jié)果
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {
    //根據(jù)業(yè)務(wù)規(guī)則將不同省份結(jié)果劃分到不同分區(qū)
    private static  HashMap<String,Integer> provincePartitioners =  new HashMap<>();
    static{
        provincePartitioners.put("136",0);
        provincePartitioners.put("137",1);
        provincePartitioners.put("138",2);
        provincePartitioners.put("139",3);
    }

    // 返回分區(qū)號給那個(gè)reduce
    @Override
    public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
        String keyPrefix = key.toString().substring(0, 3);
        Integer partionId = provincePartitioners.get(keyPrefix);
        return partionId ==null?4: partionId;
    }
}

在job作業(yè)中指定分區(qū)

//設(shè)置分區(qū)
job.setPartitionerClass(ProvincePartitioner.class);
//設(shè)置reduce數(shù)量
job.setNumReduceTasks(5);

運(yùn)行job作業(yè)

image-20191222140811595

查看結(jié)果

image-20191222140714981
image-20191222140953746

11.3 計(jì)數(shù)器(Counter)

計(jì)數(shù)器:顧名思義就是用來對map運(yùn)行數(shù)量和reduce運(yùn)行數(shù)量進(jìn)行統(tǒng)計(jì)的

11.3.1 在map中使用計(jì)數(shù)器

public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //計(jì)數(shù)器
            Counter access_map = context.getCounter("map-group", "access_map");
            access_map.increment(1);
            String[] values = value.toString().split("\t");
            int upload = Integer.valueOf(values[values.length-3]);
            int down = Integer.valueOf(values[values.length-2]);
            context.write(new Text(values[1]),new AccessLogWritable(upload,down,0));
        }
    }

11.3.2 在reduce中使用計(jì)數(shù)器

public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
        @Override
        protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
            //計(jì)數(shù)器
            Counter access_map = context.getCounter("reduce-group", "access_reduce");
            access_map.increment(1);
            int upload =0;
            int down = 0;
            for (AccessLogWritable value : values) {
                upload += value.getUpload();
                down  += value.getDown();
            }
            context.write(key,new AccessLogWritable(upload,down,upload+down));
        }
    }
image-20191222155105711

11.4 Combiner 合并

Combiner合并:又稱之為map端的reduce,主要是通過對map局部的數(shù)據(jù)先進(jìn)行一次reduce,從而來減少map端輸出數(shù)據(jù)頻繁發(fā)送給Reduce處理時(shí)所帶來的網(wǎng)絡(luò)壓力問題。通過這種提前對map輸出做一次局部reduce,這樣既可以減輕網(wǎng)絡(luò)壓力,又能提高效率色迂。在mapreduce編程模型中默認(rèn)是關(guān)閉的香缺。

11.4.1 開啟Combiner

.....................               
//設(shè)置map
job.setMapperClass(AccessLogCustomerTypeMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AccessLogWritable.class);

//shuffle 無須設(shè)置 自動(dòng)處理

//設(shè)置Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);

//設(shè)置分區(qū)
job.setPartitionerClass(ProvincePartitioner.class);
...............

11.4.2 運(yùn)行設(shè)置Combiner的job

沒有設(shè)置Combiner時(shí):

image-20191222160857054

設(shè)置Combiner時(shí):

image-20191222160936568

十二. Job作業(yè)原理分析

12.1 Input Format 原理解析

12.1.1 類圖和源碼

查看Text InputFormat的類圖

image-20191222163038681

注意:通過類圖發(fā)現(xiàn)最頂層父類為Input Format這個(gè)類

查看Input Formt 源碼:

image-20191222163205033
/** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;
  • getSplits方法:是用來根據(jù)block計(jì)算邏輯上的切片,每一個(gè)邏輯切片對應(yīng)一個(gè)map操作
  • createRecordReader方法:用來將切片數(shù)據(jù)封裝成一行(LongWritable,Text)

12.1.2 Input Format切片的計(jì)算方式

image-20191222165455715
image-20191228151116890

說明:

? 1. 當(dāng)一個(gè)文件小于128M時(shí)一定會被分成一個(gè)邏輯切片,Block塊與Split(切片)一一對應(yīng)。

? 2.當(dāng)一個(gè)文件大于128M,剩余大小大于切片的1.1倍,Block塊與Split(切片)一一對應(yīng)歇僧。反之如果一個(gè)文件大于128M,剩余大小小于切片的1.1倍,此時(shí)將劃分為一個(gè)切片赫悄。

image-20191222170938132

12.1.3 將數(shù)據(jù)封裝成key和value

image-20191222172100874

image-20191222172239717


12.2 Map源碼原理分析

這里只給出了核心方法:

 /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
image-20191222172609418

12.3 Reduce源碼分析

查看reduce的源碼,這里只羅列核心代碼:

**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
image-20191228141029002

12.4 OutputFormat 源碼分析

查看源類圖的結(jié)構(gòu):

image-20191228141731357

注意:和 InputFormat基本是類似的,最頂層父類為Output Format:

TextOutputFormat部分源碼如下:

/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
  protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }
  }
image-20191228142300615

通過源碼得知默認(rèn)的key和value輸出分割符為"tab鍵"也就是這里的"\t"

12.5 Shuffle的分析

shuffle階段是一個(gè)整體叫法: 其實(shí)又分為Map端的shuffleReduce端的Shuffle

12.5.1 Map端shuffle

image-20191228200603195

12.5.2 Reduce端的shuffle

image-20191228200805109

12.6 MapReduce整體運(yùn)行原理

image-20191228201000469
#1.計(jì)算切片
    有幾個(gè)切片就有幾個(gè)map task
#2.環(huán)形緩存區(qū)
    經(jīng)過map函數(shù)的邏輯處理后的數(shù)據(jù)輸出之后,會通過OutPutCollector收集器將數(shù)據(jù)收集到環(huán)形緩存區(qū)保存馏慨。
    環(huán)形緩存區(qū)的大小默認(rèn)為100M,當(dāng)保存的數(shù)據(jù)達(dá)到80%時(shí),就將緩存區(qū)的數(shù)據(jù)溢出到磁盤上保存埂淮。
#3.溢出
    環(huán)形緩存區(qū)的數(shù)據(jù)達(dá)到其容量的80%時(shí)就會溢出到磁盤上進(jìn)行保存,在此過程中,程序會對數(shù)據(jù)進(jìn)行分區(qū)(默認(rèn)HashPartition)和排序(默認(rèn)根據(jù)key進(jìn)行快排)
    緩存區(qū)不斷溢出的數(shù)據(jù)形成多個(gè)小文件
#4.合并
    溢出的多個(gè)小文件各個(gè)區(qū)合并在一起(0區(qū)和0區(qū)合并成一個(gè)0區(qū)),形成大文件
    通過歸并排序保證區(qū)內(nèi)的數(shù)據(jù)有序
#5.shuffle
    從過程2到過程7之間,即map任務(wù)和reduce任務(wù)之間的數(shù)據(jù)流稱為shuffle(混洗),而過程5最能體現(xiàn)出混洗這一概念。一般情況下写隶,一個(gè)reduce任務(wù)的輸入數(shù)據(jù)來自與多個(gè)map任務(wù)倔撞,多個(gè)reduce任務(wù)的情況下就會出現(xiàn)如過程5所示的,
    每個(gè)reduce任務(wù)從map的輸出數(shù)據(jù)中獲取屬于自己的那個(gè)分區(qū)的數(shù)據(jù)慕趴。
#6.合并
    運(yùn)行reducetask的節(jié)點(diǎn)通過過程5痪蝇,將來自多個(gè)map任務(wù)的屬于自己的分區(qū)數(shù)據(jù)下載到本地磁盤工作目錄。這多個(gè)分區(qū)文件通過歸并排序合并成大文件冕房,并根據(jù)key值分好組(key值相同的躏啰,value值會以迭代器的形式組在一起)。
#7.reducetask
    reducetask從本地工作目錄獲取已經(jīng)分好組并且排好序的數(shù)據(jù)耙册,將數(shù)據(jù)進(jìn)行reduce函數(shù)中的邏輯處理给僵。
#8.輸出
    每個(gè)reducetask輸出一個(gè)結(jié)果文件。

十三. MapReduce與Yarn

13.1 Job作業(yè)提交過程

img
客戶端的配置信息mapreduce.framework.name為yarn時(shí)详拙,客戶端會啟動(dòng)YarnRunner(yarn的客戶端程序)帝际,并將mapreduce作業(yè)提交給yarn平臺處理。

1.向ResourceManager請求運(yùn)行一個(gè)mapreduce程序饶辙。

2.ResourceManager返回hdfs地址蹲诀,告訴客戶端將作業(yè)運(yùn)行相關(guān)的資源文件上傳到hdfs。

3.客戶端提交mr程序運(yùn)行所需的文件(包括作業(yè)的jar包弃揽,作業(yè)的配置文件脯爪,分片信息等)到hdfs上。

4.作業(yè)相關(guān)信息提交完成后矿微,客戶端用過調(diào)用ResourcrManager的submitApplication()方法提交作業(yè)痕慢。

5.ResourceManager將作業(yè)傳遞給調(diào)度器,調(diào)度器的默認(rèn)調(diào)度策略是先進(jìn)先出冷冗。

6.調(diào)度器尋找一臺空閑的節(jié)點(diǎn)守屉,并在該節(jié)點(diǎn)隔離出一個(gè)容器(container),容器中分配了cpu蒿辙,內(nèi)存等資源拇泛,并啟動(dòng)MRAppmaster進(jìn)程滨巴。

7.MRAppmaster根據(jù)需要運(yùn)行多少個(gè)map任務(wù),多少個(gè)reduce任務(wù)向ResourceManager請求資源俺叭。

8.ResourceManager分配相應(yīng)數(shù)量的容器恭取,并告知MRAppmaster容器在哪。

9.MRAppmaster啟動(dòng)maptask熄守。

10.maptask從HDFS獲取分片數(shù)據(jù)執(zhí)行map邏輯蜈垮。

11.map邏輯執(zhí)行結(jié)束后,MRAppmaster啟動(dòng)reducetask裕照。

12.reducetask從maptask獲取屬于自己的分區(qū)數(shù)據(jù)執(zhí)行reduce邏輯攒发。

13.reduce邏輯結(jié)束后將結(jié)果數(shù)據(jù)保存到HDFS上。

14.mapreduce作業(yè)結(jié)束后晋南,MRAppmaster通知ResourceManager結(jié)束自己惠猿,讓ResourceManager回收所有資源。

十四. HA的hadoop集群搭建

14.1 集群規(guī)劃

# 集群規(guī)劃
    10.15.0.20    zk     zknodes  通過一個(gè)節(jié)點(diǎn)充當(dāng)整個(gè)集群
    
    10.15.0.22    hadoop22    NameNode (active)  & ZKFC
    10.15.0.23    hadoop23    NameNode (standby) & ZKFC
    10.15.0.24    hadoop24    ResourceManager(active) 
    10.15.0.25    hadoop25    ResourceManager(standby)
    10.15.0.26    hadoop26    DataNode & JournalNode  & NodeManager
    10.15.0.27    hadoop27    DataNode & JournalNode  & NodeManager
    10.15.0.28    hadoop28    DataNode & JournalNode  & NodeManager

# 克隆機(jī)器做準(zhǔn)備工作:
    0.修改ip地址為上述ip
    1.修改主機(jī)名/etc/hostsname為上述對應(yīng)主機(jī)名  修改完必須重新啟動(dòng)
    2.配置主機(jī)名ip地址映射/etc/hosts文件并同步所有節(jié)點(diǎn)
    10.15.0.20 zk
    10.15.0.22 hadoop22
    10.15.0.23 hadoop23
    10.15.0.24 hadoop24
    10.15.0.25 hadoop25
    10.15.0.26 hadoop26
    10.15.0.27 hadoop27
    10.15.0.28 hadoop28
    3.所有節(jié)點(diǎn)安裝jdk并配置環(huán)境變量
    4.關(guān)閉所有機(jī)器的網(wǎng)絡(luò)防火墻配置
        systemctl stop firewalld
        systemctl disable firewalld
    5.所有節(jié)點(diǎn)安裝centos7.x搭建集群的依賴
        yum install psmisc -y
    6.配置ssh免密登錄
        hadoop22 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
        hadoop23 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
        hadoop24 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
        hadoop25 生成ssh-keygen  然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)

14.2 搭建zk集群

# 1.安裝zk安裝包
    [root@zk ~]# tar -zxvf zookeeper-3.4.12.tar.gz

# 2.準(zhǔn)備zk的數(shù)據(jù)文件夾
    [root@zk ~]# mkdir zkdata1 zkdata2 zkdata3

# 3.在每個(gè)數(shù)據(jù)文件夾中準(zhǔn)備集群唯一標(biāo)識文件myid
    [root@zk ~]# echo "1" >> zkdata1/myid
    [root@zk ~]# echo "2" >> zkdata2/myid 
    [root@zk ~]# echo "3" >> zkdata3/myid 

# 4.在每個(gè)數(shù)據(jù)文件夾中準(zhǔn)備zk的配置文件zoo.cfg
 
    [root@zk ~]# vim /root/zkdata1/zoo.cfg  
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata1
    clientPort=3001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

  [root@zk ~]# vim /root/zkdata2/zoo.cfg  
      tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata2
    clientPort=4001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

    [root@zk ~]# vim /root/zkdata3/zoo.cfg  
      tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/root/zkdata3
    clientPort=5001
    server.1=zk:3002:3003
    server.2=zk:4002:4003
    server.3=zk:5002:5003

# 5.進(jìn)入zk安裝目錄bin目錄執(zhí)行如下命令啟動(dòng)zk集群:
    [root@zk bin]# ./zkServer.sh start /root/zkdata1/zoo.cfg
    [root@zk bin]# ./zkServer.sh start /root/zkdata2/zoo.cfg
    [root@zk bin]# ./zkServer.sh start /root/zkdata3/zoo.cfg

# 6.進(jìn)入zk安裝目錄bin目錄執(zhí)行如下命令查看集群狀態(tài)
        ./zkServer.sh status /root/zkdata1/zoo.cfg
        ./zkServer.sh status /root/zkdata2/zoo.cfg
        ./zkServer.sh status /root/zkdata3/zoo.cfg

14.3 搭建hadoop的HA集群

# 1.在hadoop22--hadoop28上安裝hadoop安裝包
       tar -zxf hadoop-2.9.2.tar.gz 
# 2.在hadoop22--hadoop28機(jī)器上配置hadoop環(huán)境變量

# 3.在hadoop22節(jié)點(diǎn)上配置hadoop-env.sh文件
        export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
# 4.在hadoop22節(jié)點(diǎn)上配置core-site.xml文件
        [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/core-site.xml 
<!--hdfs主要入口不再是一個(gè)具體機(jī)器而是一個(gè)虛擬的名稱 -->
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://ns</value>
</property>
<!-- hadoop臨時(shí)目錄位置 -->
<property>
  <name>hadoop.tmp.dir</name>
  <value>/root/hadoop-2.9.2/data</value>
</property>
<!--zk集群的所有節(jié)點(diǎn)-->
<property>
    <name>ha.zookeeper.quorum</name>
  <value>zk:3001,zk:4001,zk:5001</value>
</property>
# 5.在hadoop2節(jié)點(diǎn)上配置hdfs-site.xml文件 
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/hdfs-site.xml 
<!--指定hdfs的nameservice為ns负间,需要和core-site.xml中的保持一致 -->
      <property>
          <name>dfs.nameservices</name>
          <value>ns</value>
      </property>
      <!-- ns下面有兩個(gè)NameNode偶妖,分別是nn1,nn2 -->
      <property>
          <name>dfs.ha.namenodes.ns</name>
          <value>nn1,nn2</value>
      </property>
        <!-- nn1的RPC通信地址 -->
      <property>
          <name>dfs.namenode.rpc-address.ns.nn1</name>
          <value>hadoop22:9000</value>
      </property>
      <!-- nn1的http通信地址 -->
      <property>
          <name>dfs.namenode.http-address.ns.nn1</name>
          <value>hadoop22:50070</value>
      </property>
    <!-- nn2的RPC通信地址 -->
      <property>
          <name>dfs.namenode.rpc-address.ns.nn2</name>
          <value>hadoop23:9000</value>
      </property>
      <!-- nn2的http通信地址 -->
      <property>
          <name>dfs.namenode.http-address.ns.nn2</name>
          <value>hadoop23:50070</value>
      </property>

    <!-- 指定NameNode的元數(shù)據(jù)在JournalNode上的存放位置 -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hadoop26:8485;hadoop27:8485;hadoop28:8485/ns</value>
    </property>
<!-- 指定JournalNode在本地磁盤存放數(shù)據(jù)的位置 -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/root/journal</value>
    </property>
    <!-- 開啟NameNode故障時(shí)自動(dòng)切換 -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <!-- 配置失敗自動(dòng)切換實(shí)現(xiàn)方式 -->
    <property>
        <name>dfs.client.failover.proxy.provider.ns</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 配置隔離機(jī)制政溃,如果ssh是默認(rèn)22端口趾访,value直接寫sshfence即可 -->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>
    <!-- 使用隔離機(jī)制時(shí)需要ssh免登陸 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>
# 6.在hadoop2節(jié)點(diǎn)上配置yarn-site.xml文件
<!-- 開啟RM高可用 -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<!-- 分別指定RM的地址 -->
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop24</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop25</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop24:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop25:8088</value>
</property>
<!-- 指定zk集群地址 -->
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk:3001,zk:4001,zk:5001</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
# 7.在hadoop2節(jié)點(diǎn)上配置mapred-site.xml文件,默認(rèn)不存在需要復(fù)制
    [root@hadoop22 ~]# cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template  hadoop-2.9.2/etc/hadoop/mapred-site.xml
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/mapred-site.xml
<!-- 指定mr框架為yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

# 8.在hadoop2節(jié)點(diǎn)上配置slaves文件
    [root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/slaves 
    hadoop26
    hadoop27
    hadoop28

# 9.同步集群配置文件
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop23:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop24:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop25:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop26:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop27:/root/hadoop-2.9.2/etc/
    scp -r hadoop-2.9.2/etc/hadoop/ hadoop28:/root/hadoop-2.9.2/etc/

# 10.啟動(dòng)HDFS的高可用
    1.在任意NameNode上格式化ZK
        hdfs zkfc -formatZK

    2.在hadoop26 hadoop27 hadoop28啟動(dòng)journal node
        [root@hadoop26 ~]# hadoop-daemon.sh start journalnode
        [root@hadoop27 ~]# hadoop-daemon.sh start journalnode
        [root@hadoop28 ~]# hadoop-daemon.sh start journalnode

    3.在活躍的NameNode節(jié)點(diǎn)上執(zhí)行格式化
        [root@hadoop22 ~]# hdfs namenode -format ns

    4.在NameNode上啟動(dòng)hdfs集群
        [root@hadoop22 ~]# start-dfs.sh

    5.在standby的NameNode上執(zhí)行
        [root@hadoop23 ~]# hdfs namenode -bootstrapStandby

    6.在standby的NameNode執(zhí)行
        [root@hadoop23 ~]# hadoop-daemon.sh start namenode

# 11.在活躍節(jié)點(diǎn)上啟動(dòng)yarn集群

    1.在活躍的resourcemang節(jié)點(diǎn)上執(zhí)行
            [root@hadoop24 ~]# start-yarn.sh
    2.在standby的節(jié)點(diǎn)上執(zhí)行
        [root@hadoop25 ~]# yarn-daemon.sh start resourcemanager

# 12.測試集群
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市董虱,隨后出現(xiàn)的幾起案子扼鞋,更是在濱河造成了極大的恐慌,老刑警劉巖空扎,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藏鹊,死亡現(xiàn)場離奇詭異,居然都是意外死亡转锈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門楚殿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來撮慨,“玉大人,你說我怎么就攤上這事脆粥∑瞿纾” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵变隔,是天一觀的道長规伐。 經(jīng)常有香客問我,道長匣缘,這世上最難降的妖魔是什么猖闪? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任鲜棠,我火速辦了婚禮,結(jié)果婚禮上培慌,老公的妹妹穿的比我還像新娘豁陆。我一直安慰自己,他們只是感情好吵护,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布盒音。 她就那樣靜靜地躺著,像睡著了一般馅而。 火紅的嫁衣襯著肌膚如雪祥诽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天瓮恭,我揣著相機(jī)與錄音原押,去河邊找鬼。 笑死偎血,一個(gè)胖子當(dāng)著我的面吹牛诸衔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颇玷,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼笨农,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了帖渠?” 一聲冷哼從身側(cè)響起谒亦,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎空郊,沒想到半個(gè)月后份招,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡狞甚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年锁摔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哼审。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谐腰,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出涩盾,到底是詐尸還是另有隱情十气,我是刑警寧澤,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布春霍,位于F島的核電站砸西,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜芹枷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一衅疙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧杖狼,春花似錦炼蛤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至绿聘,卻和暖如春嗽上,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背熄攘。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工兽愤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人挪圾。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓浅萧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親哲思。 傳聞我的和親對象是個(gè)殘疾皇子洼畅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355