This time we try out the partitioner.
Many kinds of ID numbers encode information in their first few characters, such as the province of issue. Phone numbers share this trait: the prefix tells you whether the carrier is China Mobile, China Telecom, or China Unicom. Routing records to different output files based on their prefixes is exactly what a partitioner is for. As in the previous post, we start by creating a pom.xml and a FlowBean class that holds each record's upload and download traffic.
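For readers who skipped the previous post, here is a minimal sketch of what FlowBean might look like. The exact fields are an assumption (the sumFlow field is inferred from the three-number output shown at the end); the rest of the code only relies on the two-argument constructor, getUpFlow(), getdFlow(), and toString():

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // Sketch only; the FlowBean from the previous post may differ in detail.
    public class FlowBean implements Writable {
        private long upFlow;   // upload traffic
        private long dFlow;    // download traffic
        private long sumFlow;  // total traffic (assumed field)

        // a no-arg constructor is required so Hadoop can deserialize the bean
        public FlowBean() {}

        public FlowBean(long upFlow, long dFlow) {
            this.upFlow = upFlow;
            this.dFlow = dFlow;
            this.sumFlow = upFlow + dFlow;
        }

        public long getUpFlow() { return upFlow; }
        public long getdFlow() { return dFlow; }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            dFlow = in.readLong();
            sumFlow = in.readLong();
        }

        @Override
        public String toString() {
            // matches the "(up down sum)" format seen in the job output below
            return "(" + upFlow + " " + dFlow + " " + sumFlow + ")";
        }
    }

What we do next is different from last time: we define our own partitioner: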
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    import java.util.HashMap;

    /**
     * Custom partitioner: routes each phone number to a partition based on its 3-digit prefix.
     */
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

        // prefix -> partition number; any prefix not listed here goes to partition 4
        public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();

        static {
            proviceDict.put("137", 0);
            proviceDict.put("133", 1);
            proviceDict.put("138", 2);
            proviceDict.put("135", 3);
        }

        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            String prefix = key.toString().substring(0, 3);
            Integer province = proviceDict.get(prefix);
            return province == null ? 4 : province;
        }
    }
A partitioner like this is usually driven by a HashMap. Note that we end up with 5 partitions rather than 4: a number whose prefix is not in the map (say 186) falls into the fifth, catch-all partition. With the partitioner in place we can write the MapReduce driver:
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowCount {

        static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // each input line is tab-separated: phone \t upFlow \t downFlow
                String line = value.toString();
                String[] fields = line.split("\t");
                String phone = fields[0];
                long upFlow = Long.parseLong(fields[1]);
                long dFlow = Long.parseLong(fields[2]);
                context.write(new Text(phone), new FlowBean(upFlow, dFlow));
            }
        }

        static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
                long sum_upFlow = 0;
                long sum_dFlow = 0;
                for (FlowBean bean : values) {
                    sum_upFlow += bean.getUpFlow();
                    sum_dFlow += bean.getdFlow();
                }
                context.write(key, new Text(new FlowBean(sum_upFlow, sum_dFlow).toString()));
            }
        }

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(FlowCount.class);
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
            // register the custom partitioner
            job.setPartitionerClass(ProvincePartitioner.class);
            // one reduce task per partition, so 5 output files
            job.setNumReduceTasks(5);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            // the reducer emits Text values, so declare Text (not FlowBean) as the output value class
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
Note that the driver registers the custom partitioner and sets the number of reduce tasks to match the number of partitions.
Now package the jar, upload it, and run the job to see how it turns out.
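The exact commands depend on your setup; a typical sequence might look like the following, where the jar name and the HDFS input path are placeholders:

    mvn clean package
    ubuntu@hadoop1:~/text$ hadoop jar flowcount.jar FlowCount /input/flow /output/partitioner1

Once the job finishes, list the output directory: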
ubuntu@hadoop1:~/text$ hdfs dfs -ls /output/partitioner1
Found 6 items
-rw-r--r-- 3 ubuntu supergroup 0 2019-07-31 20:06 /output/partitioner1/_SUCCESS
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00000
-rw-r--r-- 3 ubuntu supergroup 84 2019-07-31 20:06 /output/partitioner1/part-r-00001
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00002
-rw-r--r-- 3 ubuntu supergroup 28 2019-07-31 20:06 /output/partitioner1/part-r-00003
-rw-r--r-- 3 ubuntu supergroup 80 2019-07-31 20:06 /output/partitioner1/part-r-00004
The output went from a single file to five, so the partitioning worked. Looking at one of the files confirms that it only contains the records belonging to its partition:
ubuntu@hadoop1:~/text$ hdfs dfs -cat /output/partitioner1/part-r-00004
14838244322 (900 500 1400)
18273723427 (300 800 1100)
19283413241 (500 200 700)