九. MapReduce 案例
9.1 統(tǒng)計(jì)各個(gè)手機(jī)號的上傳和下載流量總和
數(shù)據(jù)展示:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
數(shù)據(jù)解釋:
# 每行數(shù)據(jù)的第二列數(shù)據(jù)是手機(jī)號,倒數(shù)第三列表示上行流量,倒數(shù)第二列表示下行流量
輸出格式要求:
# 手機(jī)號 上行流量 下行流量 總流量
最終統(tǒng)計(jì)結(jié)果為:
13726230503 上傳流量:4962 下載流量:49362 總數(shù)據(jù)流量: 54324
13826544101 上傳流量:528 下載流量:0 總數(shù)據(jù)流量: 528
13926251106 上傳流量:480 下載流量:0 總數(shù)據(jù)流量: 480
13926435656 上傳流量:264 下載流量:3024 總數(shù)據(jù)流量: 3288
創(chuàng)建數(shù)據(jù)文件上傳到HDFS文件系統(tǒng)中
[root@hadoop5 ~]# vim access.log
[root@hadoop5 ~]# cat access.log
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
[root@hadoop5 ~]# hdfs dfs -mkdir -p /accesslog
[root@hadoop5 ~]# hdfs dfs -put access.log /accesslog
編寫mapreduce的job作業(yè)完成統(tǒng)計(jì)
//統(tǒng)計(jì)手機(jī)流量
/**
 * MapReduce job that sums the upload and download traffic of every phone number
 * found in {@code /accesslog/access.log} and writes one summary line per phone
 * number to {@code /accesslog/res}.
 *
 * <p>Each input line is split on tabs: field 1 is the phone number, the
 * third-from-last field is the upload traffic and the second-from-last field
 * is the download traffic.
 */
public class AccessLogJob extends Configured implements Tool {

    private static final Logger logger = Logger.getLogger(AccessLogJob.class);

    /**
     * Fewest tab-separated fields a line needs so that the phone number (index 1)
     * and the two traffic columns (length-3, length-2) are distinct fields.
     */
    private static final int MIN_FIELDS = 4;

    /**
     * Entry point. The process exit code mirrors the job result so that shell
     * scripts and schedulers can detect a failed job.
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AccessLogJob(), args));
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments left over after generic option parsing (unused)
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(getConf(), "access-log");
        job.setJarByClass(AccessLogJob.class);

        // Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/accesslog/access.log"));

        // Map phase
        job.setMapperClass(AccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Shuffle needs no configuration; the framework handles it.

        // Reduce phase
        job.setReducerClass(AccessLogReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output format; remove a stale output directory, otherwise submission fails
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(res)) {
            fileSystem.delete(res, true);
        }
        TextOutputFormat.setOutputPath(job, res);

        // Submit and block until completion
        boolean status = job.waitForCompletion(true);
        System.out.println("本次作業(yè)執(zhí)行狀態(tài) = " + status);
        // Propagate failure instead of always reporting success.
        return status ? 0 : 1;
    }

    /**
     * Emits {@code phone -> "upload-download"} for every well-formed input line.
     */
    public static class AccessLogMap extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * @param key     byte offset of the line within the input
         * @param value   the current line
         * @param context map output context
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            if (values.length < MIN_FIELDS) {
                // A blank or truncated line would otherwise fail the whole task with
                // ArrayIndexOutOfBoundsException.
                logger.warn("Skipping malformed line at offset " + key + ": " + value);
                return;
            }
            String phone = values[1];
            String upload = values[values.length - 3];
            String down = values[values.length - 2];
            // Key: phone number; value: "upload-download" text
            context.write(new Text(phone), new Text(upload + "-" + down));
            logger.info("手機(jī)號: " + phone + " 流量格式:" + upload + "-" + down);
        }
    }

    /**
     * Sums the upload and download traffic of one phone number and writes the totals.
     */
    public static class AccessLogReduce extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key     phone number emitted by the mapper
         * @param values  all "upload-download" strings emitted for that phone number
         * @param context reduce output context
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // long accumulators: summed traffic can exceed Integer.MAX_VALUE
            long uploadData = 0;
            long downData = 0;
            for (Text value : values) {
                String[] datas = value.toString().split("-");
                uploadData += Long.parseLong(datas[0]);
                downData += Long.parseLong(datas[1]);
            }
            long total = uploadData + downData;
            context.write(key, new Text(" 上傳流量:" + uploadData + " 下載流量:" + downData + " 總數(shù)據(jù)流量: " + total));
            logger.info("手機(jī)號: " + key + " 上傳流量:" + uploadData + " 下載流量:" + downData + " 總數(shù)據(jù)流量: " + total);
        }
    }
}
運(yùn)行mapreduce
查看結(jié)果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/res/part-r-00000
13726230503 上傳流量:4962 下載流量:49362 總數(shù)據(jù)流量: 54324
13826544101 上傳流量:528 下載流量:0 總數(shù)據(jù)流量: 528
13926251106 上傳流量:480 下載流量:0 總數(shù)據(jù)流量: 480
13926435656 上傳流量:264 下載流量:3024 總數(shù)據(jù)流量: 3288
9.2 自定義MapReduce中數(shù)據(jù)類型
MapReduce的執行過程,無論是map階段還是Reduce階段都會跨JVM,通過網絡通信傳遞數據,所以對于傳遞的數據必須實現序列化。為此Hadoop的MapReduce模型對現有的數據類型進行了進一步的包裝,如之前用到的IntWritable、LongWritable、Text、DoubleWritable、NullWritable。如果只處理簡單計算,有這些基礎類型就夠了;但是如果需要復雜的結果,這些數據類型是遠遠不夠的,因此我們需要根據實際情況自定義數據類型!
9.2.1 查看提供已知數(shù)據(jù)類型類圖
通過類圖得知hadoop提供的數據類型都間接實現了Writable、Comparable,直接實現了WritableComparable接口,因此我們自定義類型也需要實現相應的接口。
9.2.2 查看WritableComparable接口
通過查看源碼得知,自定義的數據類型需要實現write、readFields、compareTo、hashCode、equals和toString等相關方法。
9.2.3 根據(jù)之前流量案例定義自定義類型
開發(fā)自定義Writable類型
//自定義Writable類型
/**
 * Custom Hadoop value type carrying the upload, download and total traffic of
 * one phone number.
 *
 * <p>Serialization writes and reads the three fields as ints in the fixed order
 * upload, down, total; {@link #write} and {@link #readFields} must stay in sync.
 *
 * <p>Ordering ({@link #compareTo}) uses {@code total} only, whereas
 * {@link #equals} compares all three fields, so two instances may compare as 0
 * without being equal.
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private Integer upload;
    private Integer down;
    private Integer total;

    /** No-arg constructor; fields stay unset until {@link #readFields} or the setters fill them. */
    public AccessLogWritable() {
    }

    /** Creates an instance with all three values given explicitly. */
    public AccessLogWritable(Integer upload, Integer down, Integer total) {
        this.upload = upload;
        this.down = down;
        this.total = total;
    }

    /**
     * Creates an instance from upload and download traffic; the total is derived
     * from them so the instance can be serialized right away (a null total would
     * make {@link #write} fail on unboxing).
     */
    public AccessLogWritable(Integer upload, Integer down) {
        this.upload = upload;
        this.down = down;
        this.total = (upload != null && down != null) ? Integer.valueOf(upload + down) : null;
    }

    /**
     * Orders instances by ascending total traffic.
     *
     * @param o the instance to compare with
     * @return negative, zero or positive as this total is less than, equal to
     *         or greater than the other total
     */
    @Override
    public int compareTo(AccessLogWritable o) {
        // Integer.compare avoids the overflow that "a - b" has for operands of opposite sign.
        return Integer.compare(this.total, o.getTotal());
    }

    /** Serializes upload, down and total, in that order, as ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(down);
        out.writeInt(total);
    }

    /** Deserializes the fields in exactly the order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.down = in.readInt();
        this.total = in.readInt();
    }

    /** Text form used by TextOutputFormat when this type is the output value. */
    @Override
    public String toString() {
        return "統(tǒng)計(jì)結(jié)果{" +
                "上傳流量=" + upload +
                ", 下載流量=" + down +
                ", 上傳下載總流量=" + total +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        AccessLogWritable accessLogWritable = (AccessLogWritable) o;
        return Objects.equals(upload, accessLogWritable.upload) &&
                Objects.equals(down, accessLogWritable.down) &&
                Objects.equals(total, accessLogWritable.total);
    }

    @Override
    public int hashCode() {
        return Objects.hash(upload, down, total);
    }

    public Integer getUpload() {
        return upload;
    }

    public void setUpload(Integer upload) {
        this.upload = upload;
    }

    public Integer getDown() {
        return down;
    }

    public void setDown(Integer down) {
        this.down = down;
    }

    public Integer getTotal() {
        return total;
    }

    public void setTotal(Integer total) {
        this.total = total;
    }
}
注意:write的順序和read的順序必須嚴(yán)格一致,讀的類型和寫的類型也必須完全一致
開發(fā)Job作業(yè)
/**
 * Same traffic statistic as the plain-text job, but using the custom
 * {@code AccessLogWritable} value type between map and reduce and as the
 * final output value. Reads {@code /accesslog/access.log}, writes
 * {@code /accesslog/res2}.
 */
public class AccessLogCustomerTypeJob extends Configured implements Tool {

    /**
     * Fewest tab-separated fields a line needs so that the phone number (index 1)
     * and the two traffic columns (length-3, length-2) are distinct fields.
     */
    private static final int MIN_FIELDS = 4;

    /** Entry point; exits with the job's status code so callers can detect failure. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new AccessLogCustomerTypeJob(), args));
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments left over after generic option parsing (unused)
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "customer-type-job");
        job.setJarByClass(AccessLogCustomerTypeJob.class);

        // Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/accesslog/access.log"));

        // Map phase
        job.setMapperClass(AccessLogCustomerTypeMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AccessLogWritable.class);

        // Shuffle needs no configuration; the framework handles it.

        // Reduce phase
        job.setReducerClass(AccessLogCustomerTypeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(AccessLogWritable.class);

        // Output format; remove a stale output directory, otherwise submission fails
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/res2");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(res)) {
            fileSystem.delete(res, true);
        }
        TextOutputFormat.setOutputPath(job, res);

        // Submit and block until completion
        boolean status = job.waitForCompletion(true);
        System.out.println("作業(yè)執(zhí)行狀態(tài):" + status);
        // Propagate failure instead of always reporting success.
        return status ? 0 : 1;
    }

    /** Emits {@code phone -> AccessLogWritable(upload, down, 0)} for every well-formed line. */
    public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text, Text, AccessLogWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            if (values.length < MIN_FIELDS) {
                // A blank or truncated line would otherwise fail the whole task with
                // ArrayIndexOutOfBoundsException.
                return;
            }
            int upload = Integer.parseInt(values[values.length - 3]);
            int down = Integer.parseInt(values[values.length - 2]);
            // The total is filled in by the reducer, so 0 is a placeholder here.
            context.write(new Text(values[1]), new AccessLogWritable(upload, down, 0));
        }
    }

    /** Sums upload and download traffic per phone number and emits the totals. */
    public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable, Text, AccessLogWritable> {
        @Override
        protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
            int upload = 0;
            int down = 0;
            for (AccessLogWritable value : values) {
                upload += value.getUpload();
                down += value.getDown();
            }
            context.write(key, new AccessLogWritable(upload, down, upload + down));
        }
    }
}
執(zhí)行job作業(yè)
[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 10:33:57 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 10:34:00 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 10:34:00 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 10:34:01 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 10:34:02 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0013
19/12/20 10:34:03 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0013
19/12/20 10:34:03 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0013/
19/12/20 10:34:03 INFO mapreduce.Job: Running job: job_1576780523481_0013
19/12/20 10:34:27 INFO mapreduce.Job: Job job_1576780523481_0013 running in uber mode : false
19/12/20 10:34:27 INFO mapreduce.Job: map 0% reduce 0%
19/12/20 10:34:43 INFO mapreduce.Job: map 100% reduce 0%
19/12/20 10:35:00 INFO mapreduce.Job: map 100% reduce 100%
19/12/20 10:35:02 INFO mapreduce.Job: Job job_1576780523481_0013 completed successfully
查看結(jié)果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/res2/part-r-00000
13726230503 統(tǒng)計(jì)結(jié)果{上傳流量=4962, 下載流量=49362, 上傳下載總流量=54324}
13826544101 統(tǒng)計(jì)結(jié)果{上傳流量=528, 下載流量=0, 上傳下載總流量=528}
13926251106 統(tǒng)計(jì)結(jié)果{上傳流量=480, 下載流量=0, 上傳下載總流量=480}
13926435656 統(tǒng)計(jì)結(jié)果{上傳流量=264, 下載流量=3024, 上傳下載總流量=3288}
十. MapReduce的高級特性
10.1 MapReduce的數據清洗
10.1.1 數(shù)據(jù)清洗
所謂數(shù)據(jù)清洗
指的是在復(fù)雜的數(shù)據(jù)格式中獲取我們需要的數(shù)據(jù)過程稱之為數(shù)據(jù)清洗,整個(gè)過程僅僅是將復(fù)雜數(shù)據(jù)中我們需要的數(shù)據(jù)清洗出來,不涉及任何的統(tǒng)計(jì)計(jì)算工作,如下圖展示過程就是數(shù)據(jù)清洗:
10.1.2 數(shù)據(jù)清洗編程思路分析
10.1.3 開發(fā)數(shù)據(jù)清洗
//開發(fā)數(shù)據(jù)清洗
/**
 * Map-only data-cleaning job: keeps the phone number, upload traffic and
 * download traffic columns of {@code /accesslog/access.log} and writes them,
 * tab-separated, to {@code /accesslog/cleandata}. No aggregation is done.
 */
public class DataCleanAccessLogJob extends Configured implements Tool {

    /** Highest field index the mapper reads (index 7), plus one. */
    private static final int REQUIRED_FIELDS = 8;

    /** Entry point; exits with the job's status code so callers can detect failure. */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new DataCleanAccessLogJob(), args));
    }

    /**
     * Configures and submits the job, then waits for it to finish.
     *
     * @param args command-line arguments left over after generic option parsing (unused)
     * @return 0 when the job succeeded, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(getConf(), "data-clean-access-log-job");
        job.setJarByClass(DataCleanAccessLogJob.class);

        // Input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("/accesslog/access.log"));

        // Map phase
        job.setMapperClass(DataCleanAccessLogMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Zero reduce tasks: shuffle and reduce are skipped
        job.setNumReduceTasks(0);

        // Output format; remove a stale output directory, otherwise submission fails
        job.setOutputFormatClass(TextOutputFormat.class);
        Path res = new Path("/accesslog/cleandata");
        FileSystem fileSystem = FileSystem.get(getConf());
        if (fileSystem.exists(res)) {
            fileSystem.delete(res, true);
        }
        TextOutputFormat.setOutputPath(job, res);

        // Submit and block until completion
        boolean status = job.waitForCompletion(true);
        System.out.println("作業(yè)提交狀態(tài) = " + status);
        // Propagate failure instead of always reporting success.
        return status ? 0 : 1;
    }

    /** Map phase: projects fields 1, 6 and 7 of each line into the output key. */
    public static class DataCleanAccessLogMap extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] values = value.toString().split("\t");
            if (values.length < REQUIRED_FIELDS) {
                // Cleaning should drop records it cannot parse rather than fail the
                // task with ArrayIndexOutOfBoundsException.
                return;
            }
            Text keyout = new Text(values[1] + "\t" + values[6] + "\t" + values[7]);
            context.write(keyout, NullWritable.get());
        }
    }
    // There is no reduce phase.
}
注意: 設(shè)置job.setNumReduceTasks(0);這句話本次的mapreduce就跳過了reduce階段的執(zhí)行
10.1.4 運(yùn)行作業(yè)
[root@hadoop5 ~]# yarn jar hadoop_wordcount-1.0-SNAPSHOT.jar
19/12/20 13:22:03 INFO client.RMProxy: Connecting to ResourceManager at hadoop6/10.15.0.6:8032
19/12/20 13:22:07 INFO input.FileInputFormat: Total input files to process : 1
19/12/20 13:22:07 INFO mapreduce.JobSubmitter: number of splits:1
19/12/20 13:22:08 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/12/20 13:22:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1576780523481_0018
19/12/20 13:22:10 INFO impl.YarnClientImpl: Submitted application application_1576780523481_0018
19/12/20 13:22:10 INFO mapreduce.Job: The url to track the job: http://hadoop6:8088/proxy/application_1576780523481_0018/
19/12/20 13:22:10 INFO mapreduce.Job: Running job: job_1576780523481_0018
19/12/20 13:22:36 INFO mapreduce.Job: Job job_1576780523481_0018 running in uber mode : false
19/12/20 13:22:36 INFO mapreduce.Job: map 0% reduce 0%
19/12/20 13:22:51 INFO mapreduce.Job: map 100% reduce 0%
19/12/20 13:22:53 INFO mapreduce.Job: Job job_1576780523481_0018 completed successfully
10.1.5 查看運(yùn)行結(jié)果
[root@hadoop5 ~]# hdfs dfs -text /accesslog/cleandata/part*
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
13726230503 2481 24681
13826544101 264 0
13926435656 132 1512
13926251106 240 0
十一. MapReduce的高級特性
11.1 MapReduce中Map的數(shù)量
# MapReduce運行過程中Map的數量是由輸入切片(split)所決定的:
默認情況下切片大小等于block大小,因此一個文件分為幾個block通常就是幾個map
注意:默認配置下map的數量由block塊決定,也就意味著一旦文件確定,根據默認配置劃分的block也將確定,所以通常不在程序中直接指定map的執行數量(只能通過調整切片大小間接影響)
11.2 MapReduce中Reduce的數(shù)量
# Reduce的數(shù)量是可以在程序中手動(dòng)指定
默認(rèn)數(shù)量為: 1個(gè) Reduce
可以通過: job.setNumReduceTasks(0); 0 就是沒有 數(shù)字是幾就是幾個(gè)
11.2.1 通過修改word count案例reduce數(shù)量比較不同
使用默認(rèn)reduce數(shù)量也就是1的執(zhí)行結(jié)果:
將reduce數(shù)量修改為多個(gè)這里修改為2個(gè),運(yùn)行查看結(jié)果
// ... earlier job configuration omitted ...
// Configure the reduce phase: reducer class and the final output key/value types.
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Request two reduce tasks.
job.setNumReduceTasks(2);
// ... remaining job configuration omitted ...
運(yùn)行job作業(yè)
查看結(jié)果
通過這里可以總結出:有幾個reduce就會生成幾個結果文件,多個reduce同時處理數據,將原來一個reduce處理的結果分到了不同的reduce結果文件中。因此如果日后需要將所有結果數據匯總在一起,只能設置一個reduce;如果想要將結果劃分到多個文件中,可以設置多個reduce。
11.2.2 為什么要設(shè)置多個(gè)reduce數(shù)量?
# 1.提高M(jìn)R的運(yùn)行效率,從而快速統(tǒng)計(jì)計(jì)算工作
11.2.3 多個(gè)Reduce如何去分配map中數(shù)據(jù)?
一旦設(shè)置了多個(gè)reduce,如何讓多個(gè)reduce均分map中統(tǒng)計(jì)的數(shù)據(jù),這里面還有一個(gè)分區(qū)(Partition)概念,一個(gè)Reduce會形成一個(gè)分區(qū),默認(rèn)使用的是HashPartitioner會根據(jù)map輸出key做hash運(yùn)算去決定map中輸出數(shù)據(jù)交給那個(gè)reduce處理
// Partitioner that picks the reduce task from the hash code of the key.
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
// Intentionally empty: this partitioner reads nothing from the job configuration.
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
// "& Integer.MAX_VALUE" clears the sign bit so the remainder is never negative;
// the result therefore lies in [0, numReduceTasks).
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
11.2.4 如何自定義分區(Partitioner)
自定義分區(qū):可以根據(jù)業(yè)務(wù)規(guī)則將統(tǒng)計(jì)結(jié)果劃分到不同分區(qū)中
數(shù)據(jù)格式
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
要求
# 統(tǒng)計(jì)流量,并將不同省份數(shù)據(jù)的統(tǒng)計(jì)結(jié)果放在不同文件中
自定義分區(qū)
//自定義分區(qū) 輸入數(shù)據(jù)map端結(jié)果
/**
 * Custom partitioner that routes map output to a reduce task according to the
 * first three digits of the phone number used as the key.
 *
 * <p>Prefixes 136, 137, 138 and 139 go to partitions 0-3; every other key goes
 * to partition 4. The job must therefore be configured with at least five
 * reduce tasks, since this class does not clamp the result to
 * {@code numPartitions}.
 */
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {

    /** Number of leading characters of the key that select the partition. */
    private static final int PREFIX_LENGTH = 3;

    /** Partition used for every key whose prefix is not in the table. */
    private static final int DEFAULT_PARTITION = 4;

    /** Business rule: phone-number prefix -> partition id. Populated once, never modified. */
    private static final HashMap<String,Integer> provincePartitioners = new HashMap<>();
    static {
        provincePartitioners.put("136",0);
        provincePartitioners.put("137",1);
        provincePartitioners.put("138",2);
        provincePartitioners.put("139",3);
    }

    /**
     * Returns the partition (reduce task index) for one map output record.
     *
     * @param key               map output key, expected to be a phone number
     * @param accessLogWritable map output value (not used for partitioning)
     * @param numPartitions     number of reduce tasks (not used; see class comment)
     * @return 0-3 for a known prefix, otherwise 4
     */
    @Override
    public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
        String phone = key.toString();
        if (phone.length() < PREFIX_LENGTH) {
            // substring(0, 3) would throw StringIndexOutOfBoundsException on a short key;
            // such keys cannot match any prefix, so they share the default partition.
            return DEFAULT_PARTITION;
        }
        Integer partitionId = provincePartitioners.get(phone.substring(0, PREFIX_LENGTH));
        return partitionId == null ? DEFAULT_PARTITION : partitionId;
    }
}
在job作業(yè)中指定分區(qū)
// Use the custom partitioner to route map output to reduce tasks.
job.setPartitionerClass(ProvincePartitioner.class);
// Five reduce tasks: ProvincePartitioner returns partition ids 0 through 4.
job.setNumReduceTasks(5);
運(yùn)行job作業(yè)
查看結(jié)果
11.3 計(jì)數(shù)器(Counter)
計(jì)數(shù)器:顧名思義就是用來對map運(yùn)行數(shù)量和reduce運(yùn)行數(shù)量進(jìn)行統(tǒng)計(jì)的
11.3.1 在map中使用計(jì)數(shù)器
public static class AccessLogCustomerTypeMap extends Mapper<LongWritable, Text,Text, AccessLogWritable>{
    /**
     * Counts the call in the "map-group"/"access_map" counter, then emits the
     * phone number (field 1) with its upload (third-from-last field) and download
     * (second-from-last field) traffic; the total is left at 0.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // One increment per map() invocation.
        context.getCounter("map-group", "access_map").increment(1);

        final String[] fields = value.toString().split("\t");
        final int fieldCount = fields.length;
        final int uploadTraffic = Integer.parseInt(fields[fieldCount - 3]);
        final int downTraffic = Integer.parseInt(fields[fieldCount - 2]);

        final Text phone = new Text(fields[1]);
        context.write(phone, new AccessLogWritable(uploadTraffic, downTraffic, 0));
    }
}
11.3.2 在reduce中使用計(jì)數(shù)器
public static class AccessLogCustomerTypeReduce extends Reducer<Text, AccessLogWritable,Text, AccessLogWritable>{
    /**
     * Counts the call in the "reduce-group"/"access_reduce" counter, then sums the
     * upload and download traffic of all values for the key and emits the sums
     * together with their total.
     */
    @Override
    protected void reduce(Text key, Iterable<AccessLogWritable> values, Context context) throws IOException, InterruptedException {
        // One increment per reduce() invocation, i.e. per distinct key.
        context.getCounter("reduce-group", "access_reduce").increment(1);

        int uploadSum = 0;
        int downSum = 0;
        for (AccessLogWritable record : values) {
            uploadSum = uploadSum + record.getUpload();
            downSum = downSum + record.getDown();
        }

        final AccessLogWritable summary = new AccessLogWritable(uploadSum, downSum, uploadSum + downSum);
        context.write(key, summary);
    }
}
11.4 Combiner 合并
Combiner合并:又稱之為map端的reduce,主要是通過對map局部的數據先進行一次reduce,從而減少map端輸出數據頻繁發送給Reduce處理時所帶來的網絡壓力問題。通過這種提前對map輸出做一次局部reduce,這樣既可以減輕網絡壓力,又能提高效率。在mapreduce編程模型中默認是關閉的。
11.4.1 開啟Combiner
.....................
// Configure the map phase: mapper class and intermediate key/value types.
job.setMapperClass(AccessLogCustomerTypeMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AccessLogWritable.class);
// Shuffle needs no configuration; the framework handles it.
// Enable the combiner by reusing the reducer class; its input and output types
// are both (Text, AccessLogWritable), matching the map output types set above.
job.setCombinerClass(AccessLogCustomerTypeReduce.class);
// Use the custom partitioner.
job.setPartitionerClass(ProvincePartitioner.class);
...............
11.4.2 運(yùn)行設(shè)置Combiner的job
沒有設(shè)置Combiner時(shí):
設(shè)置Combiner時(shí):
十二. Job作業(yè)原理分析
12.1 Input Format 原理解析
12.1.1 類圖和源碼
查看Text InputFormat的類圖
注意:通過類圖發(fā)現(xiàn)最頂層父類為Input Format這個(gè)類
查看InputFormat源碼:
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return a list of {@link InputSplit}s for the job.
* @throws IOException declared by the signature; conditions depend on the implementation
* @throws InterruptedException declared by the signature; conditions depend on the implementation
*/
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
/**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
*
* <p>Declared abstract, so a concrete input format has to supply the reader.</p>
*
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
* @throws InterruptedException
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
-
getSplits方法:
是用來根據(jù)block計(jì)算邏輯上的切片,每一個(gè)邏輯切片對應(yīng)一個(gè)map操作 -
createRecordReader方法:
用來將切片數(shù)據(jù)封裝成一行(LongWritable,Text)
12.1.2 Input Format切片的計(jì)算方式
說明:
1. 當一個文件小于128M時一定會被分成一個邏輯切片,Block塊與Split(切片)一一對應。
2. 當一個文件大于128M,剩余大小大于切片的1.1倍時,Block塊與Split(切片)一一對應。反之如果一個文件大于128M,剩余大小小于切片的1.1倍,此時剩余部分將劃分為一個切片。
12.1.3 將數(shù)據(jù)封裝成key和value
image-20191222172100874
image-20191222172239717
12.2 Map源碼原理分析
這里只給出了核心方法:
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*
* @param key the input key
* @param value the input value
* @param context the context the output pair is written to
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
// Identity mapping: the input pair is written out unchanged. The casts are
// unchecked, which is why the warning is suppressed on this method.
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*
* <p>The default implementation does nothing; run(Context) invokes it from a
* finally block.</p>
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
*
* <p>Default flow: setup once, map once per key/value pair while the context
* has more input, then cleanup.</p>
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
// setup(...) is not part of this excerpt.
setup(context);
try {
// One map() call per record the context delivers.
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
// Runs even when map() throws.
cleanup(context);
}
}
12.3 Reduce源碼分析
查看reduce的源碼,這里只羅列核心代碼:
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
// Identity reduce: every value of the key is written out unchanged under the
// same key. The casts are unchecked, hence the suppressed warning.
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*
* <p>The default implementation does nothing; run(Context) invokes it from a
* finally block.</p>
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*
* <p>Default flow: setup once, reduce once per key while the context has more
* keys, then cleanup.</p>
*/
public void run(Context context) throws IOException, InterruptedException {
// setup(...) is not part of this excerpt.
setup(context);
try {
// One reduce() call per key, with that key's values.
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
// Runs even when reduce() throws.
cleanup(context);
}
}
}
12.4 OutputFormat 源碼分析
查看源類圖的結(jié)構(gòu):
注意:和 InputFormat基本是類似的,最頂層父類為Output Format:
TextOutputFormat部分源碼如下:
/** An {@link OutputFormat} that writes plain text files. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
// Record writer nested in TextOutputFormat. This excerpt shows only its fields,
// static initializer and constructors.
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
// Charset name used for the newline and separator bytes.
private static final String utf8 = "UTF-8";
// Bytes of "\n" in UTF-8, computed once in the static initializer below.
private static final byte[] newline;
static {
try {
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
// Destination stream.
protected DataOutputStream out;
// UTF-8 bytes of the key/value separator.
private final byte[] keyValueSeparator;
// Creates a writer that uses the given key/value separator.
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try {
this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
// Creates a writer with the default separator, a tab character.
public LineRecordWriter(DataOutputStream out) {
this(out, "\t");
}
}
通過源碼得知默認(rèn)的key和value輸出分割符為"tab鍵"也就是這里的"\t"
12.5 Shuffle的分析
shuffle階段是一個(gè)整體叫法: 其實(shí)又分為
Map端的shuffle
和Reduce端的Shuffle
12.5.1 Map端shuffle
12.5.2 Reduce端的shuffle
12.6 MapReduce整體運(yùn)行原理
#1.計(jì)算切片
有幾個(gè)切片就有幾個(gè)map task
#2.環(huán)形緩存區(qū)
經過map函數的邏輯處理后的數據輸出之后,會通過OutPutCollector收集器將數據收集到環形緩存區保存。
環形緩存區的大小默認為100M,當保存的數據達到80%時,就將緩存區的數據溢出到磁盤上保存。
#3.溢出
環(huán)形緩存區(qū)的數(shù)據(jù)達(dá)到其容量的80%時(shí)就會溢出到磁盤上進(jìn)行保存,在此過程中,程序會對數(shù)據(jù)進(jìn)行分區(qū)(默認(rèn)HashPartition)和排序(默認(rèn)根據(jù)key進(jìn)行快排)
緩存區(qū)不斷溢出的數(shù)據(jù)形成多個(gè)小文件
#4.合并
溢出的多個(gè)小文件各個(gè)區(qū)合并在一起(0區(qū)和0區(qū)合并成一個(gè)0區(qū)),形成大文件
通過歸并排序保證區(qū)內(nèi)的數(shù)據(jù)有序
#5.shuffle
從過程2到過程7之間,即map任務和reduce任務之間的數據流稱為shuffle(混洗),而過程5最能體現出混洗這一概念。一般情況下,一個reduce任務的輸入數據來自于多個map任務,多個reduce任務的情況下就會出現如過程5所示的,
每個reduce任務從map的輸出數據中獲取屬于自己的那個分區的數據。
#6.合并
運行reducetask的節點通過過程5,將來自多個map任務的屬于自己的分區數據下載到本地磁盤工作目錄。這多個分區文件通過歸并排序合并成大文件,并根據key值分好組(key值相同的,value值會以迭代器的形式組在一起)。
#7.reducetask
reducetask從本地工作目錄獲取已經分好組并且排好序的數據,將數據進行reduce函數中的邏輯處理。
#8.輸出
每個reducetask輸出一個結果文件。
十三. MapReduce與Yarn
13.1 Job作業(yè)提交過程
客戶端的配置信息mapreduce.framework.name為yarn時,客戶端會啟動YarnRunner(yarn的客戶端程序),并將mapreduce作業提交給yarn平臺處理。
1.向ResourceManager請求運行一個mapreduce程序。
2.ResourceManager返回hdfs地址,告訴客戶端將作業運行相關的資源文件上傳到hdfs。
3.客戶端提交mr程序運行所需的文件(包括作業的jar包,作業的配置文件,分片信息等)到hdfs上。
4.作業相關信息提交完成后,客戶端通過調用ResourceManager的submitApplication()方法提交作業。
5.ResourceManager將作業傳遞給調度器,調度器的默認調度策略是先進先出。
6.調度器尋找一臺空閑的節點,并在該節點隔離出一個容器(container),容器中分配了cpu,內存等資源,并啟動MRAppmaster進程。
7.MRAppmaster根據需要運行多少個map任務,多少個reduce任務向ResourceManager請求資源。
8.ResourceManager分配相應數量的容器,并告知MRAppmaster容器在哪。
9.MRAppmaster啟動maptask。
10.maptask從HDFS獲取分片數據執行map邏輯。
11.map邏輯執行結束后,MRAppmaster啟動reducetask。
12.reducetask從maptask獲取屬于自己的分區數據執行reduce邏輯。
13.reduce邏輯結束后將結果數據保存到HDFS上。
14.mapreduce作業結束后,MRAppmaster通知ResourceManager結束自己,讓ResourceManager回收所有資源。
十四. HA的hadoop集群搭建
14.1 集群規(guī)劃
# 集群規(guī)劃
10.15.0.20 zk zknodes 通過一個(gè)節(jié)點(diǎn)充當(dāng)整個(gè)集群
10.15.0.22 hadoop22 NameNode (active) & ZKFC
10.15.0.23 hadoop23 NameNode (standby) & ZKFC
10.15.0.24 hadoop24 ResourceManager(active)
10.15.0.25 hadoop25 ResourceManager(standby)
10.15.0.26 hadoop26 DataNode & JournalNode & NodeManager
10.15.0.27 hadoop27 DataNode & JournalNode & NodeManager
10.15.0.28 hadoop28 DataNode & JournalNode & NodeManager
# 克隆機(jī)器做準(zhǔn)備工作:
0.修改ip地址為上述ip
1.修改主機名/etc/hostname為上述對應主機名 修改完必須重新啟動
2.配置主機(jī)名ip地址映射/etc/hosts文件并同步所有節(jié)點(diǎn)
10.15.0.20 zk
10.15.0.22 hadoop22
10.15.0.23 hadoop23
10.15.0.24 hadoop24
10.15.0.25 hadoop25
10.15.0.26 hadoop26
10.15.0.27 hadoop27
10.15.0.28 hadoop28
3.所有節(jié)點(diǎn)安裝jdk并配置環(huán)境變量
4.關(guān)閉所有機(jī)器的網(wǎng)絡(luò)防火墻配置
systemctl stop firewalld
systemctl disable firewalld
5.所有節(jié)點(diǎn)安裝centos7.x搭建集群的依賴
yum install psmisc -y
6.配置ssh免密登錄
hadoop22 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
hadoop23 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
hadoop24 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
hadoop25 生成ssh-keygen 然后ssh-copy-id 到 hadoop22~~hadoop28 上每一個(gè)節(jié)點(diǎn)
14.2 搭建zk集群
# 1.安裝zk安裝包
[root@zk ~]# tar -zxvf zookeeper-3.4.12.tar.gz
# 2.準(zhǔn)備zk的數(shù)據(jù)文件夾
[root@zk ~]# mkdir zkdata1 zkdata2 zkdata3
# 3.在每個(gè)數(shù)據(jù)文件夾中準(zhǔn)備集群唯一標(biāo)識文件myid
[root@zk ~]# echo "1" >> zkdata1/myid
[root@zk ~]# echo "2" >> zkdata2/myid
[root@zk ~]# echo "3" >> zkdata3/myid
# 4.在每個(gè)數(shù)據(jù)文件夾中準(zhǔn)備zk的配置文件zoo.cfg
[root@zk ~]# vim /root/zkdata1/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata1
clientPort=3001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
[root@zk ~]# vim /root/zkdata2/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata2
clientPort=4001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
[root@zk ~]# vim /root/zkdata3/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata3
clientPort=5001
server.1=zk:3002:3003
server.2=zk:4002:4003
server.3=zk:5002:5003
# 5.進(jìn)入zk安裝目錄bin目錄執(zhí)行如下命令啟動(dòng)zk集群:
[root@zk bin]# ./zkServer.sh start /root/zkdata1/zoo.cfg
[root@zk bin]# ./zkServer.sh start /root/zkdata2/zoo.cfg
[root@zk bin]# ./zkServer.sh start /root/zkdata3/zoo.cfg
# 6.進(jìn)入zk安裝目錄bin目錄執(zhí)行如下命令查看集群狀態(tài)
./zkServer.sh status /root/zkdata1/zoo.cfg
./zkServer.sh status /root/zkdata2/zoo.cfg
./zkServer.sh status /root/zkdata3/zoo.cfg
14.3 搭建hadoop的HA集群
# 1.在hadoop22--hadoop28上安裝hadoop安裝包
tar -zxf hadoop-2.9.2.tar.gz
# 2.在hadoop22--hadoop28機(jī)器上配置hadoop環(huán)境變量
# 3.在hadoop22節(jié)點(diǎn)上配置hadoop-env.sh文件
export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64
# 4.在hadoop22節(jié)點(diǎn)上配置core-site.xml文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/core-site.xml
<!--hdfs主要入口不再是一個(gè)具體機(jī)器而是一個(gè)虛擬的名稱 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns</value>
</property>
<!-- hadoop臨時(shí)目錄位置 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-2.9.2/data</value>
</property>
<!--zk集群的所有節(jié)點(diǎn)-->
<property>
<name>ha.zookeeper.quorum</name>
<value>zk:3001,zk:4001,zk:5001</value>
</property>
# 5.在hadoop22節點上配置hdfs-site.xml文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/hdfs-site.xml
<!--指定hdfs的nameservice為ns,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns</value>
</property>
<!-- ns下面有兩個NameNode,分別是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn1</name>
<value>hadoop22:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn1</name>
<value>hadoop22:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn2</name>
<value>hadoop23:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns.nn2</name>
<value>hadoop23:50070</value>
</property>
<!-- 指定NameNode的元數(shù)據(jù)在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop26:8485;hadoop27:8485;hadoop28:8485/ns</value>
</property>
<!-- 指定JournalNode在本地磁盤存放數(shù)據(jù)的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/journal</value>
</property>
<!-- 開啟NameNode故障時(shí)自動(dòng)切換 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失敗自動(dòng)切換實(shí)現(xiàn)方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔離機制,如果ssh是默認22端口,value直接寫sshfence即可 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 使用隔離機(jī)制時(shí)需要ssh免登陸 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
# 6.在hadoop22節點上配置yarn-site.xml文件
<!-- 開啟RM高可用 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 分別指定RM的地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop24</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop25</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>hadoop24:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>hadoop25:8088</value>
</property>
<!-- 指定zk集群地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>zk:3001,zk:4001,zk:5001</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
# 7.在hadoop22節點上配置mapred-site.xml文件,默認不存在需要復制
[root@hadoop22 ~]# cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/mapred-site.xml
<!-- 指定mr框架為yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
# 8.在hadoop22節點上配置slaves文件
[root@hadoop22 ~]# vim hadoop-2.9.2/etc/hadoop/slaves
hadoop26
hadoop27
hadoop28
# 9.同步集群配置文件
scp -r hadoop-2.9.2/etc/hadoop/ hadoop23:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop24:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop25:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop26:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop27:/root/hadoop-2.9.2/etc/
scp -r hadoop-2.9.2/etc/hadoop/ hadoop28:/root/hadoop-2.9.2/etc/
# 10.啟動(dòng)HDFS的高可用
1.在任意NameNode上格式化ZK
hdfs zkfc -formatZK
2.在hadoop26 hadoop27 hadoop28啟動(dòng)journal node
[root@hadoop26 ~]# hadoop-daemon.sh start journalnode
[root@hadoop27 ~]# hadoop-daemon.sh start journalnode
[root@hadoop28 ~]# hadoop-daemon.sh start journalnode
3.在活躍的NameNode節(jié)點(diǎn)上執(zhí)行格式化
[root@hadoop22 ~]# hdfs namenode -format ns
4.在NameNode上啟動(dòng)hdfs集群
[root@hadoop22 ~]# start-dfs.sh
5.在standby的NameNode上執(zhí)行
[root@hadoop23 ~]# hdfs namenode -bootstrapStandby
6.在standby的NameNode執(zhí)行
[root@hadoop23 ~]# hadoop-daemon.sh start namenode
# 11.在活躍節(jié)點(diǎn)上啟動(dòng)yarn集群
1.在活躍的resourcemanager節點上執行
[root@hadoop24 ~]# start-yarn.sh
2.在standby的節(jié)點(diǎn)上執(zhí)行
[root@hadoop25 ~]# yarn-daemon.sh start resourcemanager
# 12.測試集群