本次嘗試自定義輸出類型
手機(jī)流量分為上傳流量和下載流量买羞,統(tǒng)計(jì)的時(shí)候需要得到的結(jié)果表示為(手機(jī)號(hào) 上傳流量 下載流量 總流量)例如(13333333333 200 400 600)片林,數(shù)據(jù)集中包含(手機(jī)號(hào) 上傳流量 下載流量)。因此需要自定義輸出的類型板乙。還是先新建maven項(xiàng)目砸脊,pom.xml和wordcount的一樣具篇。
1、先新建一個(gè)FlowBean類
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 用來保存對(duì)應(yīng)號(hào)碼上下行流量的對(duì)象
*/
public class FlowBean implements Writable{
private long upFlow;
private long dFlow;
private long sumFlow;
public FlowBean(){
}
public FlowBean(long upFlow, long dFlow){
this.upFlow = upFlow;
this.dFlow = dFlow;
this.sumFlow = upFlow + dFlow;
}
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dFlow);
out.writeLong(sumFlow);
}
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
dFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getdFlow() {
return dFlow;
}
public void setdFlow(long dFlow) {
this.dFlow = dFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
2凌埂、新建流量統(tǒng)計(jì)類(包含map和reduce)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCount {
static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
//將一行內(nèi)容轉(zhuǎn)成String
String line = value.toString();
//切分字段
String[] fields = line.split("\t");
//取出手機(jī)號(hào)
String phoneNum = fields[0];
//取出上傳下載流量
long upFlow = Long.parseLong(fields[1]);
long dFlow = Long.parseLong(fields[2]);
context.write(new Text(phoneNum), new FlowBean(upFlow, dFlow));
}
}
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_dFlow = 0;
//遍歷bean
for(FlowBean bean:values){
sum_upFlow += bean.getUpFlow();
sum_dFlow += bean.getdFlow();
}
FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
context.write(key, resultBean);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定本程序的jar包所在的路徑
job.setJarByClass(FlowCount.class);
//指定map
job.setMapperClass(FlowCountMapper.class);
//指定reduce
job.setReducerClass(FlowCountReducer.class);
//指定map輸出
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定reduce輸出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的輸出原始文件所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//提交job
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
將map和reduce作為內(nèi)部類驱显,代碼結(jié)構(gòu)更加簡單
然后打包,上傳到hadoop服務(wù)器瞳抓,運(yùn)行成功~檢查輸出結(jié)果如下:
查看輸出
統(tǒng)計(jì)應(yīng)該是成功了埃疫,但是顯示出來的是地址而不是值,修改一下代碼孩哑,給FlowBean重寫toString栓霜,然后reduce部分改成輸出Text:
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text>{
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_dFlow = 0;
//遍歷bean
for(FlowBean bean:values){
sum_upFlow += bean.getUpFlow();
sum_dFlow += bean.getdFlow();
}
FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
context.write(key, new Text(resultBean.toString()));
}
}
然后重新打包上傳運(yùn)行,結(jié)果變成我們想要的了:
新的結(jié)果