OrderBean
public class OrderBean implements WritableComparable<OrderBean> {
? ? private int order_id;? //訂單id
? ? private double price;? //價格
? ? public OrderBean() {
? ? ? ? super();
? ? }
? ? public OrderBean(int order_id, double price) {
? ? ? ? super();
? ? ? ? this.order_id = order_id;
? ? ? ? this.price = price;
? ? }
? ? @Override
? ? public int compareTo(OrderBean bean) {
? ? ? ? //先按照id升序排序暴构,如果相同按照價格降序排序
? ? ? ? int result;
? ? ? ? if (order_id > bean.getOrder_id()) {
? ? ? ? ? ? result = 1;
? ? ? ? } else if (order_id < bean.getOrder_id()) {
? ? ? ? ? ? result = -1;
? ? ? ? } else {
? ? ? ? ? ? if (price > bean.getPrice()) {
? ? ? ? ? ? ? ? result = -1;
? ? ? ? ? ? } else if (price < bean.getPrice()) {
? ? ? ? ? ? ? ? result = 1;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? result = 0;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return result;
? ? }
? ? @Override
? ? public void write(DataOutput out) throws IOException {
? ? ? ? out.writeInt(order_id);
? ? ? ? out.writeDouble(price);
? ? }
? ? @Override
? ? public void readFields(DataInput in) throws IOException {
? ? ? ? order_id = in.readInt();
? ? ? ? price = in.readDouble();
? ? }
? ? public int getOrder_id() {
? ? ? ? return order_id;
? ? }
? ? public void setOrder_id(int order_id) {
? ? ? ? this.order_id = order_id;
? ? }
? ? public double getPrice() {
? ? ? ? return price;
? ? }
? ? public void setPrice(double price) {
? ? ? ? this.price = price;
? ? }
? ? @Override
? ? public String toString() {
? ? ? ? return order_id + "\t" + price;
? ? }
}
OrderMapper
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
? ? OrderBean k = new OrderBean();
? ? @Override
? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? // 1 獲取一行
? ? ? ? String line = value.toString();
? ? ? ? // 2 切割
? ? ? ? String[] fields = line.split(" ");
? ? ? ? // 3 封裝對象
? ? ? ? k.setOrder_id(Integer.parseInt(fields[0]));
? ? ? ? k.setPrice(Double.parseDouble(fields[2]));
? ? ? ? // 4 寫出
? ? ? ? context.write(k, NullWritable.get());
? ? }
}
OrderReducer
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
? ? @Override
? ? protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? context.write(key, NullWritable.get());
? ? }
}
OrderDirver
public class OrderDirver {
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? //輸入輸出路徑需要根據(jù)自己電腦上的實際的輸入輸出路徑設(shè)置
? ? ? ? args = new String[]{"e:/input/inputorder", "e:/output1"};
? ? ? ? // 1 獲取配置信息
? ? ? ? Configuration conf = new Configuration();
? ? ? ? Job job = Job.getInstance(conf);
? ? ? ? // 2 設(shè)置jar包加載路徑
? ? ? ? job.setJarByClass(OrderDirver.class);
? ? ? ? // 3 加載map/reduce類
? ? ? ? job.setMapperClass(OrderMapper.class);
? ? ? ? job.setReducerClass(OrderReducer.class);
? ? ? ? // 4 設(shè)置map輸出數(shù)據(jù)kv類型
? ? ? ? job.setMapOutputKeyClass(OrderBean.class);
? ? ? ? job.setMapOutputValueClass(NullWritable.class);
? ? ? ? // 5 設(shè)置最終輸出數(shù)據(jù)的kv類型
? ? ? ? job.setOutputKeyClass(OrderBean.class);
? ? ? ? job.setOutputValueClass(NullWritable.class);
? ? ? ? // 6 設(shè)置輸入數(shù)據(jù)和輸出數(shù)據(jù)路徑
? ? ? ? FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? ? ? FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? ? ? // 8 設(shè)置reduce端的分組
? ? ? ? job.setGroupingComparatorClass(OrderGroupingComparator.class);
? ? ? ? // 7 提交job
? ? ? ? boolean result = job.waitForCompletion(true);
? ? ? ? System.exit(result ? 0 : 1);
? ? }
}
OrderGroupingComparator
public class OrderGroupingComparator extends WritableComparator {
? ? protected OrderGroupingComparator() {
? ? ? ? super(OrderBean.class, true);
? ? }
? ? @Override
? ? public int compare(WritableComparable a, WritableComparable b) {
? ? ? ? //要求只要id相同跪呈,就認為是相同的key
? ? ? ? OrderBean aBean = (OrderBean) a;
? ? ? ? OrderBean bBean = (OrderBean) b;
? ? ? ? int result;
? ? ? ? if (aBean.getOrder_id() > bBean.getOrder_id()) {
? ? ? ? ? ? result = 1;
? ? ? ? } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
? ? ? ? ? ? result = -1;
? ? ? ? } else {
? ? ? ? ? ? result = 0;
? ? ? ? }
? ? ? ? return result;
? ? }
}
PS:如果要顯示top3可修改Reducer
OrderReducer
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
? ? @Override
? ? protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
? ? ? ? //循環(huán)的時候設(shè)置循環(huán)次數(shù)為3即可
? ? ? ? for (NullWritable nullWritable : values) {
? ? ? ? ? ? context.write(key, NullWritable.get());
? ? ? ? }
? ? }
}