舉例理解MapReduce—排序

例子實現目標

該代碼實現的是在輸入的數據對中彪标,先以第一列由小到大排序仿便,如果第一列值相等谦去,以第二列由小到大排序憎蛤。即:
添加cp.txt文件到input文件夾

$vim cp.txt
$hadoop fs -put cp.txt /input/

5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5

輸出結果

1,2
1,3
1,4
2,3
2,5,
3,2
4,3
5,1

附圖:

image.png

實踐例子

1.終端執(zhí)行>start-all.sh
2.input文件夾下增加cp.txt文件
3.打開eclipse
4.新建mapreduce項目外傅,新建包(命名mr),新建類(命名MySortClass )類代碼如下:
5.右鍵俩檬,選擇run as hadoop
6.右鍵refresh一下hadoop文件萎胰,成功后output下會出現成功排序的結果文件

package mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.MyWordCount.MyMapper;
import mr.MyWordCount.MyReduce;

public class MySortClass {
    
    static class MySortMapper  extends  Mapper<LongWritable, Text, A, NullWritable>{  
        
         public void map(LongWritable k1, Text v1, Context context) 
                         throws java.io.IOException, java.lang.InterruptedException
         {
            String[]  lines= v1.toString().split(",");
             
                A  a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));
             
            context.write(a1, NullWritable.get());
            System.out.println("map......");
         }
        
    }
    
    static class  MySortReduce extends Reducer<A, NullWritable, Text, Text>{
         public void reduce(A k2, Iterable<NullWritable> v2, Context context) throws java.io.IOException, java.lang.InterruptedException
         {
              
              
             context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));    
             
             System.out.println("reduce......");
         }
            
    }
    
    private static class  A implements WritableComparable<A> {
        long firstNum;
        long secondNum;
 
        public A() {
        }
 
        public A(long first, long second) {
            firstNum = first;
            secondNum = second;
        }
 
       
        public void write(DataOutput out) throws IOException {
            out.writeLong(firstNum);
            out.writeLong(secondNum);
        }
    
        public void readFields(DataInput in) throws IOException {
            firstNum = in.readLong();
            secondNum = in.readLong();
        }
 
        /*
         * 當key進行排序時會調用以下這個compreTo方法
         */
        @Override
        public int compareTo(A anotherKey) {
            long min = firstNum - anotherKey.firstNum;
            if (min != 0) {
                // 說明第一列不相等,則返回兩數之間小的數
                return (int) min;
            } else {
                return (int) (secondNum - anotherKey.secondNum);
            }
        }
    }
    private static String INPUT_PATH="hdfs://master:9000/input/cp.txt";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws  Exception {
        Configuration  conf=new Configuration();
        FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
     
        if(fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH));
        
        Job  job=new Job(conf,"myjob");
        
        job.setJarByClass(MySortClass.class);
        job.setMapperClass(MySortMapper.class);
        job.setReducerClass(MySortReduce.class);
        //job.setCombinerClass(MySortReduce.class);
         
    
        job.setMapOutputKeyClass(A.class);
        job.setMapOutputValueClass(NullWritable.class); 
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

    /*如果map和reduce的<key,value>類型是一樣的棚辽,
則僅設置job.setOutputKeyClass();job.setOutputValueClass();即可*/

        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}

部分代碼理解

類A實現了WritableComparable技竟,設置了兩個屬性firstNum; secondNum;

String[] lines= v1.toString().split(",");

讀取一行(5,1)以逗號分隔,兩個元素(5)(1)存入數組lines

A a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));

Long.parseLong(lines[0])將string類型的“5”轉化為long類型屈藐,a1.firstNum=5;a1.secondNum=1;

context.write(a1, NullWritable.get());

寫入上下文榔组,設置map的輸出為<key,空>,不能使用new NullWritable()來定義,獲取空值只能NullWritable.get()來獲取

context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));

reduce生成新的鍵值對联逻,如:將<(5,1),null>轉化為<5,1>

以map->reduce集群處理流程理解該例子(假設文件龐大)

1.首先對輸入文件分片(inputSplit)搓扯,假設分片大小為三行,那么分為三片:

5,1
3,2
1,3

4,3
2,3
1,4

1,2
2,5

2.三片交由三個map進程處理包归,生成鍵值對<a1,null>锨推,為減少帶寬負荷,在本地節(jié)點上做了排序公壤,分區(qū)(partitioner换可,數據做了分區(qū)標記)輸出結果:
(如果有需要在分區(qū)之前還可以進行combiner(本地reduce操作,詳情請見文章《了解MapReduce》底部對combiner的解釋)厦幅,這里分區(qū)之前不需要combiner)

<(1,3)锦担,null>
<(3,2),null>
<(5,1)慨削,null>

<(1,4)洞渔,null>
<(2,3)套媚,null>
<(4,3),null>

<(1,2)磁椒,null>
<(2,5)堤瘤,null>

  1. 然后就是所有節(jié)點洗牌(shuffle),將各個節(jié)點上同個分區(qū)的數據放置到一個節(jié)點中浆熔,放置過去后做了排序:

<(1,2)本辐,null>
<(1,3),null>
<(1,4)医增,null>

<(2,3)慎皱,null>
<(2,5),null>

<(3,2)叶骨,null>

<(4,3)茫多,null>

<(5,1),null>

  1. 最后就是reduce忽刽,生成新鍵值對并生成最后排序結果

(1,2)
(1,3)
(1,4)
(2,3)
(2,5)
(3,2)
(4,3)
(5,1)

總的來說就是:map(本地)->combiner(本地)->partitioner(本地)->shuffle(集群)->reduce(新本地)天揖,各部分又還有細節(jié)操作,combiner和partitioner屬于map階段的跪帝,shuffle屬于reduce階段的今膊。
附圖理解:

Paste_Image.png
因為對hadoop源碼還不是很熟悉,所以不能很好地解釋代碼伞剑,歡迎大家建議和指導斑唬。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市黎泣,隨后出現的幾起案子恕刘,更是在濱河造成了極大的恐慌,老刑警劉巖聘裁,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雪营,死亡現場離奇詭異,居然都是意外死亡衡便,警方通過查閱死者的電腦和手機献起,發(fā)現死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镣陕,“玉大人谴餐,你說我怎么就攤上這事〈粢郑” “怎么了岂嗓?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鹊碍。 經常有香客問我厌殉,道長食绿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任公罕,我火速辦了婚禮器紧,結果婚禮上,老公的妹妹穿的比我還像新娘楼眷。我一直安慰自己铲汪,他們只是感情好,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布罐柳。 她就那樣靜靜地躺著掌腰,像睡著了一般。 火紅的嫁衣襯著肌膚如雪张吉。 梳的紋絲不亂的頭發(fā)上齿梁,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音芦拿,去河邊找鬼士飒。 笑死查邢,一個胖子當著我的面吹牛蔗崎,可吹牛的內容都是我干的。 我是一名探鬼主播扰藕,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼缓苛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了邓深?” 一聲冷哼從身側響起未桥,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎芥备,沒想到半個月后冬耿,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡萌壳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年亦镶,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袱瓮。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡缤骨,死狀恐怖,靈堂內的尸體忽然破棺而出尺借,到底是詐尸還是另有隱情绊起,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布燎斩,位于F島的核電站虱歪,受9級特大地震影響蜂绎,放射性物質發(fā)生泄漏。R本人自食惡果不足惜笋鄙,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一荡碾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧局装,春花似錦坛吁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至宣增,卻和暖如春玫膀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爹脾。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工帖旨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人灵妨。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓解阅,卻偏偏與公主長得像,于是被迫代替她去往敵國和親泌霍。 傳聞我的和親對象是個殘疾皇子货抄,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內容