在Hadoop中配椭,排序是MapReduce的靈魂,MapTask和ReduceTask均會對數(shù)據(jù)按Key排序雹姊,這個操作是MR框架的默認(rèn)行為股缸,不管你的業(yè)務(wù)邏輯上是否需要這一操作。
這篇主要講 如何實現(xiàn):當(dāng)需要根據(jù)某一字段進(jìn)行分組吱雏,并對每個分組求 排在前 N 個的值敦姻。而上篇沒有涉及到分組,這是兩篇最大的不同歧杏。
需求
如下面的數(shù)據(jù):
grade score
A 10
A 40
B 30
C 20
B 10
D 40
A 30
C 20
B 10
D 40
C 30
D 20
...
grade 是年級镰惦,主要包括 A、B犬绒、C陨献、D 四個年級;
score 是對應(yīng)年級的分?jǐn)?shù)懂更。
我們需要根據(jù)年級進(jìn)行分組眨业,并統(tǒng)計每個年級下排在前 N 個的分?jǐn)?shù)。
分析:
- 我們這篇是要利用 MapReduce 自帶的排序功能沮协,即根據(jù) key 進(jìn)行排序龄捡,那么我們把 grade、score 封裝到一個對象中慷暂, compareTo 中根據(jù) score 指定排序規(guī)則聘殖;
- 然后要自定義 partitioner,根據(jù) grade 進(jìn)行分區(qū)行瑞。這樣每個相同 grade 的對象 就會分到 同一 reducer 中奸腺。
- 要自定義實現(xiàn) groupingcomparator
下面對 GroupingComparator 做下介紹
GroupingComparator
在hadoop的mapreduce編程模型中,當(dāng)在 map 端處理完成輸出 key-value對時血久,reduce端只會將key相同的到同一個reduce函數(shù)中去執(zhí)行突照,如果現(xiàn)在map端輸出的key是一個對象 TextPair,那這樣每個 map 端到 reduce 都會變成如下形式 (因為每個對象都不一樣氧吐,所以不能聚合到一起):
<textPair01讹蘑,1>
<textPair02,1>
<textPair03筑舅,1>
<textPair04座慰,1>
...
但是我們又有這樣的需求:根據(jù) TextPair 的某一個成員A,所有具有相同 A 的TextPair 都放到一個 reducer 函數(shù)中處理翠拣,這個A 就相當(dāng)于 “相同的Key”版仔。我們可以通過 GroupingComparator 實現(xiàn)此功能。
套用我們的例子,即所有相同的 grade 的對象蛮粮,都放到同一個 reducer 中處理背桐,并取前 N 個值。
這里和 partitioner 做下區(qū)分蝉揍,有些人可能混淆。
partitioner 是把 所有相同 grade 的對象放到一個 Reducer Task 中畦娄,但聚合還是要根據(jù)相同 key 的又沾,而我們 每個對象都不一樣,所以沒辦法聚合熙卡,所以要使用 GroupingComparator 杖刷。
上代碼:
定義成績信息bean
public class ScoreBean implements WritableComparable<ScoreBean>{
private Text grade;
private DoubleWritable score;
public ScoreBean() {
}
public ScoreBean(Text grade, DoubleWritable score) {
set(grade, score);
}
public void set(Text grade, DoubleWritable score) {
this.grade = grade;
this.score = score;
}
public Text getGrade() {
return grade;
}
public DoubleWritable getScore() {
return score;
}
@Override
public int compareTo(ScoreBean o) {
int cmp = this.grade.compareTo(o.getGrade());
if (cmp == 0) {
cmp = -this.score.compareTo(o.getScore());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(grade.toString());
out.writeDouble(score.get());
}
@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();
this.grade = new Text(readUTF);
this.score= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return grade.toString() + "\t" + score.get();
}
}
自定義partation分片:
public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{
@Override
public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {
//相同grade的成績bean,會發(fā)往相同的partition
//而且驳癌,產(chǎn)生的分區(qū)數(shù)滑燃,是會跟用戶設(shè)置的reduce task數(shù)保持一致
return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定義groupingcomparator
public class GradeGroupingComparator extends WritableComparator {
protected GradeGroupingComparator() {
super(ScoreBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ScoreBean abean = (ScoreBean) a;
ScoreBean bbean = (ScoreBean) b;
//將grade相同的bean都視為相同,從而聚合為一組
return abean.getGrade().compareTo(bbean.getGrade());
}
}
編寫mapreduce處理流程
public class SecondarySort {
static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{
ScoreBean bean = new ScoreBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
context.write(bean, NullWritable.get());
}
}
static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{
//在設(shè)置了groupingcomparator以后颓鲜,這里收到的kv數(shù)據(jù) 就是: <1001 87.6>,null <1001 76.5>,null ....
//此時表窘,reduce方法中的參數(shù)key就是上述kv組中的第一個kv的key:<1001 87.6>
//要輸出同一個grade的所有成績中最大金額的那一個,就只要輸出這個key
@Override
protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(ScoreBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定shuffle所使用的GroupingComparator類
job.setGroupingComparatorClass(GradeGroupingComparator.class);
//指定shuffle所使用的partitioner類
job.setPartitionerClass(GradePartitioner.class);
job.setNumReduceTasks(3);
job.waitForCompletion(true);
}
}