mapreduce實現(xiàn)流量匯總排序程序

流量匯總程序開發(fā),利用生成好的匯總過的文件接著來進(jìn)行按照總流量由高到低排序纲熏。

因為maptask的最終生成文件中的數(shù)據(jù)是已經(jīng)排序過的妆丘,默認(rèn)就是按照key 歸并排序,所以在傳給reduce task的時候也就是排序過的局劲。所以我們可以將輸出bean作為key勺拣,電話號碼作為value來輸出。既然需要對bean根據(jù)總流量來進(jìn)行排序鱼填,那么可以讓FlowBean來實現(xiàn)WritableComparable接口而不是Writable接口药有,重寫compareTo方法。

public class FlowBean implements WritableComparable<FlowBean>{

    private long upFlow;//上行流量

    private long downFlow;//下行流量

    private long totalFlow;//總流量

    //按照總流量倒序排
    public int compareTo(FlowBean bean) {
        return bean.totalFlow>this.totalFlow?1:-1;
    }

    //序列化時需要無參構(gòu)造方法
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }
    public void setFlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }
    //序列化方法 hadoop的序列化很簡單苹丸,要傳遞的數(shù)據(jù)寫出去即可
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }
    //反序列化方法 注意:反序列化的順序跟序列化的順序完全一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
    //重寫toString以便展示
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
    get愤惰,set方法
}

public class FlowCountSort {
    /**
     * KEYIN:默認(rèn)情況下,是mr框架所讀到的一行文本的起始偏移量赘理,Long宦言,但是在hadoop中有自己的
     * 更精簡的序列化接口(Seria會將類結(jié)構(gòu)都序列化,而實際我們只需要序列化數(shù)據(jù))商模,所以不直接用Long奠旺,而用LongWritable
     * VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容施流,String,同上响疚,用Text
     * KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key
     * VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value
     * @author 12706
     *
     */
    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
        FlowBean flowBean = new FlowBean();
        Text text = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] infos = line.split("\t");
            //獲取手機(jī)號
            String phoneNum = infos[0];
            //獲取上行流量,下行流量
            String upFlow = infos[1];
            String downFlow = infos[2];
            //設(shè)置總流量
            text.set(phoneNum);
            flowBean.setFlowBean(new Long(upFlow), new Long(downFlow));
            //根據(jù)key進(jìn)行了排序瞪醋,所以需要FlowBean實現(xiàn)WritableComparable接口
            context.write(flowBean, text);
        }
    }
    /**
     * KEYIN VALUEIN對應(yīng)mapper輸出的KEYOUT KEYOUT類型對應(yīng)
     * KEYOUT,VALUEOUT:是自定義reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類型
     * KEYOUT
     * VALUEOUT
     * @author 12706
     *
     */
    static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            //直接寫出去
            context.write(values.iterator().next(), key);
        }
    }
    /**
     * 相當(dāng)于一個yarn集群的客戶端
     * 需要在此封裝mr程序的相關(guān)運(yùn)行參數(shù)稽寒,指定jar包
     * 最后提交給yarn
     * @author 12706
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSort.class);
        //指定本業(yè)務(wù)job要使用的mapper,reducer業(yè)務(wù)類
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);
        //雖然指定了泛型,以防框架使用第三方的類型
        //指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //指定job輸入原始文件所在位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job輸入原始文件所在位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //將job中配置的相關(guān)參數(shù)以及job所用的java類所在的jar包趟章,提交給yarn去運(yùn)行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

測試:
將工程打成jar包(flowcount.jar)上傳到linux,啟動hadoop集群。

在/flowcount/output下有匯總過的文件

[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00000
13480253104     180     180     360
13502468823     7335    110349  117684
13560436666     1116    954     2070
13560439658     2034    5892    7926
13602846565     1938    2910    4848
13660577991     6960    690     7650
13719199419     240     0       240
13726230503     2481    24681   27162
13726238888     2481    24681   27162
13760778710     120     120     240
13826544101     264     0       264
13922314466     3008    3720    6728
13925057413     11058   48243   59301
13926251106     240     0       240
13926435656     132     1512    1644
15013685858     3659    3538    7197
15920133257     3156    2936    6092
15989002119     1938    180     2118
18211575961     1527    2106    3633
18320173382     9531    2412    11943
84138413        4116    1432    5548

[root@mini2 ~]# hadoop jar flowcount.jar com.scu.hadoop.mr.FlowCountSort /flowcount/output /flowcount/sortoutput
...
[root@mini2 ~]# hadoop fs -ls /flowcount/sortoutput
Found 2 items
-rw-r--r--   2 root supergroup          0 2017-10-13 04:45 /flowcount/sortoutput/_SUCCESS
-rw-r--r--   2 root supergroup        551 2017-10-13 04:45 /flowcount/sortoutput/part-r-00000
[root@mini2 ~]# hadoop fs -cat /flowcount/sortoutput/part-r-00000
13502468823     7335    110349  117684
13925057413     11058   48243   59301
13726230503     2481    24681   27162
13726238888     2481    24681   27162
18320173382     9531    2412    11943
13560439658     2034    5892    7926
13660577991     6960    690     7650
15013685858     3659    3538    7197
13922314466     3008    3720    6728
15920133257     3156    2936    6092
84138413        4116    1432    5548
13602846565     1938    2910    4848
18211575961     1527    2106    3633
15989002119     1938    180     2118
13560436666     1116    954     2070
13926435656     132     1512    1644
13480253104     180     180     360
13826544101     264     0       264
13719199419     240     0       240
13760778710     120     120     240
13926251106     240     0       240

輸出文件/flowcount/sortoutput/part-r-00000中看到了記錄就是按照總流量由高到低排序慎王。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蚓土,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子赖淤,更是在濱河造成了極大的恐慌蜀漆,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件咱旱,死亡現(xiàn)場離奇詭異确丢,居然都是意外死亡绷耍,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門鲜侥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來褂始,“玉大人,你說我怎么就攤上這事描函∑槊纾” “怎么了?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵舀寓,是天一觀的道長胆数。 經(jīng)常有香客問我,道長互墓,這世上最難降的妖魔是什么必尼? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮篡撵,結(jié)果婚禮上判莉,老公的妹妹穿的比我還像新娘。我一直安慰自己酸休,他們只是感情好骂租,可當(dāng)我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著斑司,像睡著了一般渗饮。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宿刮,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天互站,我揣著相機(jī)與錄音,去河邊找鬼僵缺。 笑死胡桃,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的磕潮。 我是一名探鬼主播翠胰,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼自脯!你這毒婦竟也來了之景?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤膏潮,失蹤者是張志新(化名)和其女友劉穎锻狗,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡轻纪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年油额,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刻帚。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡潦嘶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出我擂,到底是詐尸還是另有隱情衬以,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布校摩,位于F島的核電站看峻,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏衙吩。R本人自食惡果不足惜互妓,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坤塞。 院中可真熱鬧冯勉,春花似錦、人聲如沸摹芙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽浮禾。三九已至交胚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間盈电,已是汗流浹背蝴簇。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留匆帚,地道東北人熬词。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像吸重,于是被迫代替她去往敵國和親互拾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容