FlowBean
public class FlowBean implements WritableComparable<FlowBean> {
? ? private long upFlow;? ? //上行流量
? ? private long downFlow;? //下行流量
? ? private long sumFlow;? //總流量
? ? public FlowBean() {
? ? ? ? super();
? ? }
? ? public FlowBean(long upFlow, long downFlow) {
? ? ? ? super();
? ? ? ? this.upFlow = upFlow;
? ? ? ? this.downFlow = downFlow;
? ? ? ? sumFlow = upFlow + downFlow;
? ? }
? ? //比較
? ? @Override
? ? public int compareTo(FlowBean bean) {
? ? ? ? int result;
? ? ? ? //核心比較條件判斷
? ? ? ? if (sumFlow > bean.getSumFlow()) {
? ? ? ? ? ? result = -1;
? ? ? ? } else if (sumFlow < bean.getSumFlow()) {
? ? ? ? ? ? result = 1;
? ? ? ? } else {
? ? ? ? ? ? result = 0;
? ? ? ? }
? ? ? ? return result;
? ? }
? ? //序列化
? ? @Override
? ? public void write(DataOutput out) throws IOException {
? ? ? ? out.writeLong(upFlow);
? ? ? ? out.writeLong(downFlow);
? ? ? ? out.writeLong(sumFlow);
? ? }
? ? //反序列化
? ? @Override
? ? public void readFields(DataInput in) throws IOException {
? ? ? ? upFlow = in.readLong();
? ? ? ? downFlow = in.readLong();
? ? ? ? sumFlow = in.readLong();
? ? }
? ? public long getUpFlow() {
? ? ? ? return upFlow;
? ? }
? ? public void setUpFlow(long upFlow) {
? ? ? ? this.upFlow = upFlow;
? ? }
? ? public long getDownFlow() {
? ? ? ? return downFlow;
? ? }
? ? public void setDownFlow(long downFlow) {
? ? ? ? this.downFlow = downFlow;
? ? }
? ? public long getSumFlow() {
? ? ? ? return sumFlow;
? ? }
? ? public void setSumFlow(long sumFlow) {
? ? ? ? this.sumFlow = sumFlow;
? ? }
? ? @Override
? ? public String toString() {
? ? ? ? return upFlow + "\t" + downFlow + "\t" + sumFlow;
? ? }
}
FlowCountSortMapper
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
? ? FlowBean k = new FlowBean();
? ? Text v = new Text();
? ? @Override
? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? // 1 獲取一行
? ? ? ? String line = value.toString();
? ? ? ? // 2 切割
? ? ? ? String[] fields = line.split("\t");
? ? ? ? // 3 封裝對(duì)象
? ? ? ? String phoneNum = fields[0];
? ? ? ? long upFlow = Long.parseLong(fields[1]);
? ? ? ? long downFlow = Long.parseLong(fields[2]);
? ? ? ? long sumFlow = Long.parseLong(fields[3]);
? ? ? ? k.setDownFlow(downFlow);
? ? ? ? k.setUpFlow(upFlow);
? ? ? ? k.setSumFlow(sumFlow);
? ? ? ? v.set(phoneNum);
? ? ? ? // 4 寫出
? ? ? ? context.write(k, v);
? ? }
}
FlowCountSortReducer
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
? ? @Override
? ? protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
? ? ? ? for (Text value:values) {
? ? ? ? ? ? context.write(value, key);
? ? ? ? }
? ? }
}
FlowCountSortDriver
public class FlowCountSortDriver {
? ? public static void main(String[] args) throws Exception {
? ? ? ? //輸入輸出路徑需要根據(jù)自己電腦上的實(shí)際的輸入輸出路徑設(shè)置
? ? ? ? args = new String[]{"e:/input/output1", "e:/output1"};
? ? ? ? // 1 獲取job對(duì)象
? ? ? ? Configuration configuration = new Configuration();
? ? ? ? Job job = Job.getInstance(configuration);
? ? ? ? // 6 設(shè)置jar包存儲(chǔ)位置 關(guān)聯(lián)自定義的mapper和reducer
? ? ? ? job.setJarByClass(FlowCountSortDriver.class);
? ? ? ? // 2 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
? ? ? ? job.setMapperClass(FlowCountSortMapper.class);
? ? ? ? job.setReducerClass(FlowCountSortReducer.class);
? ? ? ? // 3 指定mapper輸出數(shù)據(jù)的kv類型
? ? ? ? job.setMapOutputKeyClass(FlowBean.class);
? ? ? ? job.setMapOutputValueClass(Text.class);
? ? ? ? // 4 指定最終輸出的數(shù)據(jù)的kv類型
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(FlowBean.class);
? ? ? ? // 5 指定job的輸入原始文件類型
? ? ? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? ? ? // 6 提交job
? ? ? ? boolean result = job.waitForCompletion(true);
? ? ? ? System.exit(result ? 0 : 1);
? ? }
}