MapReduce 分組 TopN(二次排序)

在Hadoop中配椭,排序是MapReduce的靈魂,MapTask和ReduceTask均會對數(shù)據(jù)按Key排序雹姊,這個操作是MR框架的默認(rèn)行為股缸,不管你的業(yè)務(wù)邏輯上是否需要這一操作。

這篇主要講 如何實現(xiàn):當(dāng)需要根據(jù)某一字段進(jìn)行分組吱雏,并對每個分組求 排在前 N 個的值敦姻。而上篇沒有涉及到分組,這是兩篇最大的不同歧杏。

需求

如下面的數(shù)據(jù):

grade   score  
A       10  
A       40  
B       30  
C       20  
B       10  
D       40  
A       30  
C       20  
B       10  
D       40  
C       30  
D       20  
...

grade 是年級镰惦,主要包括 A、B犬绒、C陨献、D 四個年級;
score 是對應(yīng)年級的分?jǐn)?shù)懂更。

我們需要根據(jù)年級進(jìn)行分組眨业,并統(tǒng)計每個年級下排在前 N 個的分?jǐn)?shù)。

分析:

  1. 我們這篇是要利用 MapReduce 自帶的排序功能沮协,即根據(jù) key 進(jìn)行排序龄捡,那么我們把 grade、score 封裝到一個對象中慷暂, compareTo 中根據(jù) score 指定排序規(guī)則聘殖;
  2. 然后要自定義 partitioner,根據(jù) grade 進(jìn)行分區(qū)行瑞。這樣每個相同 grade 的對象 就會分到 同一 reducer 中奸腺。
  3. 要自定義實現(xiàn) groupingcomparator

下面對 GroupingComparator 做下介紹

GroupingComparator

在hadoop的mapreduce編程模型中,當(dāng)在 map 端處理完成輸出 key-value對時血久,reduce端只會將key相同的到同一個reduce函數(shù)中去執(zhí)行突照,如果現(xiàn)在map端輸出的key是一個對象 TextPair,那這樣每個 map 端到 reduce 都會變成如下形式 (因為每個對象都不一樣氧吐,所以不能聚合到一起):

<textPair01讹蘑,1>
<textPair02,1>
<textPair03筑舅,1>
<textPair04座慰,1>
...

但是我們又有這樣的需求:根據(jù) TextPair 的某一個成員A,所有具有相同 A 的TextPair 都放到一個 reducer 函數(shù)中處理翠拣,這個A 就相當(dāng)于 “相同的Key”版仔。我們可以通過 GroupingComparator 實現(xiàn)此功能。

套用我們的例子,即所有相同的 grade 的對象蛮粮,都放到同一個 reducer 中處理背桐,并取前 N 個值。

這里和 partitioner 做下區(qū)分蝉揍,有些人可能混淆。
partitioner 是把 所有相同 grade 的對象放到一個 Reducer Task 中畦娄,但聚合還是要根據(jù)相同 key 的又沾,而我們 每個對象都不一樣,所以沒辦法聚合熙卡,所以要使用 GroupingComparator 杖刷。

上代碼:

定義成績信息bean

public class ScoreBean implements WritableComparable<ScoreBean>{  
    private Text grade;  
    private DoubleWritable score;  
  
    public ScoreBean() {  
    }  
    public ScoreBean(Text grade, DoubleWritable score) {  
        set(grade, score);  
    }  
  
    public void set(Text grade, DoubleWritable score) {  
  
        this.grade = grade;  
        this.score = score;  
  
    }  
  
    public Text getGrade() {  
        return grade;  
    }  
  
    public DoubleWritable getScore() {  
        return score;  
    }  
  
    @Override  
    public int compareTo(ScoreBean o) {  
        int cmp = this.grade.compareTo(o.getGrade());  
        if (cmp == 0) {  
  
            cmp = -this.score.compareTo(o.getScore());  
        }  
        return cmp;  
    }  
  
    @Override  
    public void write(DataOutput out) throws IOException {  
        out.writeUTF(grade.toString());  
        out.writeDouble(score.get());  
          
    }  
  
    @Override  
    public void readFields(DataInput in) throws IOException {  
        String readUTF = in.readUTF();  
        double readDouble = in.readDouble();  
          
        this.grade = new Text(readUTF);  
        this.score= new DoubleWritable(readDouble);  
    }  
  
  
    @Override  
    public String toString() {  
        return grade.toString() + "\t" + score.get();  
    }  
}  

自定義partation分片:

public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{  
  
    @Override  
    public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {  
        //相同grade的成績bean,會發(fā)往相同的partition  
        //而且驳癌,產(chǎn)生的分區(qū)數(shù)滑燃,是會跟用戶設(shè)置的reduce task數(shù)保持一致  
        return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;    
    }  
}  

自定義groupingcomparator

public class GradeGroupingComparator extends WritableComparator {  
  
    protected GradeGroupingComparator() {  
  
        super(ScoreBean.class, true);  
    }  
      
    @Override  
    public int compare(WritableComparable a, WritableComparable b) {  
        ScoreBean abean = (ScoreBean) a;  
        ScoreBean bbean = (ScoreBean) b;  
          
        //將grade相同的bean都視為相同,從而聚合為一組  
        return abean.getGrade().compareTo(bbean.getGrade());  
    }  
} 

編寫mapreduce處理流程

public class SecondarySort {  
      
    static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{  
          
        ScoreBean bean = new ScoreBean();  
          
        @Override  
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  
            String line = value.toString();  
            String[] fields = StringUtils.split(line, "\t");  
              
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));  
              
            context.write(bean, NullWritable.get());  
              
        }  
          
    }  
      
    static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{  
          
          
        //在設(shè)置了groupingcomparator以后颓鲜,這里收到的kv數(shù)據(jù) 就是:  <1001 87.6>,null  <1001 76.5>,null  ....   
        //此時表窘,reduce方法中的參數(shù)key就是上述kv組中的第一個kv的key:<1001 87.6>  
        //要輸出同一個grade的所有成績中最大金額的那一個,就只要輸出這個key  
        @Override  
        protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {  
            context.write(key, NullWritable.get());  
        }  
    }  
      
      
    public static void main(String[] args) throws Exception {  
          
        Configuration conf = new Configuration();  
        Job job = Job.getInstance(conf);  
          
        job.setJarByClass(SecondarySort.class);  
          
        job.setMapperClass(SecondarySortMapper.class);  
        job.setReducerClass(SecondarySortReducer.class);  
          
          
        job.setOutputKeyClass(ScoreBean.class);  
        job.setOutputValueClass(NullWritable.class);  
          
        FileInputFormat.setInputPaths(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        //指定shuffle所使用的GroupingComparator類  
        job.setGroupingComparatorClass(GradeGroupingComparator.class);  
        //指定shuffle所使用的partitioner類  
        job.setPartitionerClass(GradePartitioner.class);  
          
        job.setNumReduceTasks(3);  
          
        job.waitForCompletion(true);  
    }  
}  
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末甜滨,一起剝皮案震驚了整個濱河市乐严,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌衣摩,老刑警劉巖昂验,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異艾扮,居然都是意外死亡既琴,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進(jìn)店門泡嘴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甫恩,“玉大人,你說我怎么就攤上這事酌予√钗铮” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵霎终,是天一觀的道長滞磺。 經(jīng)常有香客問我,道長莱褒,這世上最難降的妖魔是什么击困? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上阅茶,老公的妹妹穿的比我還像新娘蛛枚。我一直安慰自己,他們只是感情好脸哀,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布蹦浦。 她就那樣靜靜地躺著,像睡著了一般撞蜂。 火紅的嫁衣襯著肌膚如雪盲镶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天蝌诡,我揣著相機(jī)與錄音溉贿,去河邊找鬼。 笑死浦旱,一個胖子當(dāng)著我的面吹牛宇色,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颁湖,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼宣蠕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了甥捺?” 一聲冷哼從身側(cè)響起植影,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涎永,沒想到半個月后思币,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡羡微,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年谷饿,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妈倔。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡博投,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出盯蝴,到底是詐尸還是另有隱情毅哗,我是刑警寧澤,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布捧挺,位于F島的核電站虑绵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏闽烙。R本人自食惡果不足惜翅睛,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧捕发,春花似錦疏旨、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至法挨,卻和暖如春谁榜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背坷剧。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留喊暖,地道東北人惫企。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像陵叽,于是被迫代替她去往敵國和親狞尔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容