Chaining Multiple MapReduce Jobs
In many real-world tasks a single MapReduce job cannot express the required logic; several jobs have to be chained together, and in many scenarios one job's input depends on another job's output. This article works through a concrete case that chains two MapReduce jobs.
** Tip: if you only care about how the chaining itself is implemented, skip straight to the "Chained MR source code" section below. **
Case description
Count how many distinct IP addresses appear in a log file. The test data is shown in the figure below:
Every field in the log is separated by a Tab character.
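The original screenshot is not reproduced here. As a purely illustrative sketch of the expected shape (the field values below are made up, and the tabs are rendered as runs of spaces), each line holds several tab-separated fields with the IP address in the fourth field, i.e. index 3 after splitting on "\t":
20160512    0001    /index.html    10.0.0.1    200
20160512    0002    /list.html     10.0.0.2    200
20160512    0003    /index.html    10.0.0.1    404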
Case analysis
The goal of the task is to compute how many distinct IP addresses the log contains. There are many ways to achieve this, but the point of this article is to use the case to summarize how two MapReduce jobs are chained together.
Implementation
The task is split into two MR jobs. The first job (MR1) removes duplicate IP addresses and outputs the distinct ones. The second job (MR2) takes MR1's output, counts the IP addresses, and writes the total.
MR1 stage
Map phase
public class IpFilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splits = line.split("\t");
        String ip = splits[3];
        context.write(new Text(ip), NullWritable.get());
    }
}
The input key is the byte offset of the line within the file and the input value is the line itself.
The output key is the IP address and the output value is NullWritable (empty).
Shuffle phase
The framework sorts and groups the map output by key: records with the same key are put into one group and their values are collected into one iterable, so no two groups can contain the same IP address. The grouped data is then handed to the reducers. Note: this phase is performed automatically by Hadoop and requires no user code.
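As a small illustration with made-up addresses: if the map phase emits (10.0.0.1, null), (10.0.0.2, null), (10.0.0.1, null), the reduce phase receives the two groups (10.0.0.1, [null, null]) and (10.0.0.2, [null]); each distinct IP address appears exactly once as a reduce key.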
Reduce phase
public class IpFilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
After the shuffle, every key the reducer receives is distinct, i.e. the IP addresses are already de-duplicated, so the reducer simply writes each key straight out.
MR2 stage
Map phase
public class IpCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // The output key is the literal string "ip"; any constant would do,
        // as long as every record uses the same key so that the shuffle
        // collects all records into a single group
        context.write(new Text("ip"), NullWritable.get());
    }
}
Shuffle phase
Records are grouped by key. Since every map output uses the same key, the shuffle produces a single group.
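For example, if MR1 produced three distinct addresses, the MR2 mappers emit ("ip", null) three times and the single reduce call receives ("ip", [null, null, null]); the reducer below only has to count the values.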
Reduce phase
public class IpCountReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values,
            Reducer<Text, NullWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Holds the total number of distinct IP addresses
        int count = 0;
        for (NullWritable v : values) {
            count++;
        }
        context.write(new Text(count + ""), NullWritable.get());
    }
}
Flow diagram
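The original flow diagram is not reproduced here; as a rough textual sketch of what it shows:
raw log -> MR1 (IpFilterMapper -> shuffle -> IpFilterReducer): distinct IP list -> MR2 (IpCountMapper -> shuffle -> IpCountReducer): total count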
Source code
MR1 map source code
package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IpFilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // The log is tab-separated; the IP address is the fourth field (index 3)
        String[] splits = line.split("\t");
        String ip = splits[3];
        context.write(new Text(ip), NullWritable.get());
    }
}
MR1 reduce source code
package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IpFilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each key is already a distinct IP address, so write it straight out
        context.write(key, NullWritable.get());
    }
}
MR2 map source code
package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IpCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // Emit the constant key "ip" for every input line (one line per distinct IP),
        // so the shuffle puts all records into a single group
        context.write(new Text("ip"), NullWritable.get());
    }
}
MR2 reduce source code
package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IpCountReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values,
            Reducer<Text, NullWritable, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        // One value per distinct IP address; count them
        int count = 0;
        for (NullWritable v : values) {
            count++;
        }
        context.write(new Text(count + ""), NullWritable.get());
    }
}
Chained MR source code
package com.ipcount.mrmr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // job1 setup: de-duplicate the IP addresses
        Job job1 = new Job(conf, "job1");
        job1.setJarByClass(Driver.class);
        job1.setMapperClass(IpFilterMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setReducerClass(IpFilterReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Wrap job1 in a controlled job
        ControlledJob ctrlJob1 = new ControlledJob(conf);
        ctrlJob1.setJob(job1);

        // job2 setup: count the distinct IP addresses produced by job1
        Job job2 = new Job(conf, "job2");
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(IpCountMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setReducerClass(IpCountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap job2 in a controlled job
        ControlledJob ctrlJob2 = new ControlledJob(conf);
        ctrlJob2.setJob(job2);

        // Declare the dependency between the jobs: job2's input depends on job1's output
        ctrlJob2.addDependingJob(ctrlJob1);

        // Master controller that manages job1 and job2
        JobControl jobCtrl = new JobControl("myCtrl");
        jobCtrl.addJob(ctrlJob1);
        jobCtrl.addJob(ctrlJob2);

        // JobControl is a Runnable, so it must be started in its own thread
        Thread thread = new Thread(jobCtrl);
        thread.start();

        // Poll until both jobs have finished, then stop the controller
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
            Thread.sleep(500);
        }
    }
}
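To run the chained jobs, package the classes into a jar and submit it with three paths: the raw log, an intermediate directory for MR1's distinct-IP output, and the final output directory. The jar name and paths below are placeholders for illustration, not from the original article:
hadoop jar ip-count.jar com.ipcount.mrmr.Driver /input/access.log /tmp/ip-distinct /output/ip-count
Because ctrlJob2.addDependingJob(ctrlJob1) declares the dependency, JobControl will not start job2 until job1 has finished successfully, which is exactly what the shared path args[1] requires.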