FilterMapper
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
? ? @Override
? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? context.write(value, NullWritable.get());
? ? }
}
FilterReducer
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
? ? Text k = new Text();
? ? @Override
? ? protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? String line = key.toString();
? ? ? ? line = line + "\r\n";
? ? ? ? k.set(line);
? ? ? ? //防止有重復的數(shù)據(jù)
? ? ? ? for (NullWritable nullWritable : values) {
? ? ? ? ? ? context.write(key, NullWritable.get());
? ? ? ? }
? ? }
}
FilterOutputFormat
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
? ? @Override
? ? public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
? ? ? ? return new FRecordWriter(job);
? ? }
}
FRecordWriter
public class FRecordWriter extends RecordWriter<Text, NullWritable> {
? ? FSDataOutputStream fosatguigu;
? ? FSDataOutputStream fosother;
? ? public FRecordWriter(TaskAttemptContext job) {
? ? ? ? try {
? ? ? ? ? ? // 1 獲取文件系統(tǒng)
? ? ? ? ? ? FileSystem fs = FileSystem.get(job.getConfiguration());
? ? ? ? ? ? // 2 創(chuàng)建輸出到atguigu.log的輸出流
? ? ? ? ? ? fosatguigu = fs.create(new Path("e:/atguigu.log"));
? ? ? ? ? ? // 3 創(chuàng)建輸出到other.log
? ? ? ? ? ? fosother = fs.create(new Path("e:/other.log"));
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
? ? @Override
? ? public void write(Text key, NullWritable value) throws IOException, InterruptedException {
? ? ? ? //判斷key當中是否有atguigu,如果有寫出到atguigu.log,如果沒有寫出到other.log
? ? ? ? if(key.toString().contains("atguigu")){
? ? ? ? ? ? //atguigu輸出流
? ? ? ? ? ? fosatguigu.write(key.toString().getBytes());
? ? ? ? }else{
? ? ? ? ? ? //other輸出流
? ? ? ? ? ? fosother.write(key.toString().getBytes());
? ? ? ? }
? ? }
? ? @Override
? ? public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
? ? ? ? IOUtils.closeStream(fosatguigu);
? ? ? ? IOUtils.closeStream(fosother);
? ? }
}
FilterDirver
public class FilterDirver {
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //輸入輸出路徑需要根據(jù)自己電腦上的實際的輸入輸出路徑設(shè)置
? ? ? ? args = new String[]{"e:/input/inputoutputformat", "e:/output2"};
? ? ? ? // 1 獲取配置信息
? ? ? ? Configuration conf = new Configuration();
? ? ? ? Job job = Job.getInstance(conf);
? ? ? ? // 2 設(shè)置jar包加載路徑
? ? ? ? job.setJarByClass(FilterDirver.class);
? ? ? ? // 3 加載map/reduce類
? ? ? ? job.setMapperClass(FilterMapper.class);
? ? ? ? job.setReducerClass(FilterReducer.class);
? ? ? ? // 4 設(shè)置map輸出數(shù)據(jù)kv類型
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(NullWritable.class);
? ? ? ? // 5 設(shè)置最終輸出數(shù)據(jù)的kv類型
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(NullWritable.class);
? ? ? ? //要將自定義的輸出格式組件設(shè)置到j(luò)ob中
? ? ? ? job.setOutputFormatClass(FilterOutputFormat.class);
? ? ? ? // 6 設(shè)置輸入數(shù)據(jù)和輸出數(shù)據(jù)路徑
? ? ? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? ? ? // 8 設(shè)置reduce端的分組
? ? ? ? job.setGroupingComparatorClass(OrderGroupingComparator.class);
? ? ? ? // 7 提交job
? ? ? ? boolean result = job.waitForCompletion(true);
? ? ? ? System.exit(result ? 0 : 1);
? ? }
}