mapreduce實(shí)現(xiàn)流量匯總排序程序

流量匯總程序開(kāi)發(fā),利用生成好的匯總過(guò)的文件接著來(lái)進(jìn)行按照總流量由高到低排序并淋。

因?yàn)閙aptask的最終生成文件中的數(shù)據(jù)是已經(jīng)排序過(guò)的,默認(rèn)就是按照key 歸并排序,所以在傳給reduce task的時(shí)候也就是排序過(guò)的欺抗。所以我們可以將輸出bean作為key,電話號(hào)碼作為value來(lái)輸出强重。既然需要對(duì)bean根據(jù)總流量來(lái)進(jìn)行排序绞呈,那么可以讓FlowBean來(lái)實(shí)現(xiàn)WritableComparable接口而不是Writable接口,重寫(xiě)compareTo方法间景。

public class FlowBean implements WritableComparable<FlowBean>{

    private long upFlow;//上行流量

    private long downFlow;//下行流量

    private long totalFlow;//總流量

    //按照總流量倒序排
    public int compareTo(FlowBean bean) {
        return bean.totalFlow>this.totalFlow?1:-1;
    }

    //序列化時(shí)需要無(wú)參構(gòu)造方法
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }
    public void setFlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }
    //序列化方法 hadoop的序列化很簡(jiǎn)單佃声,要傳遞的數(shù)據(jù)寫(xiě)出去即可
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }
    //反序列化方法 注意:反序列化的順序跟序列化的順序完全一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
    //重寫(xiě)toString以便展示
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
    get,set方法
}
public class FlowCountSort {
    /**
     * KEYIN:默認(rèn)情況下倘要,是mr框架所讀到的一行文本的起始偏移量圾亏,Long,但是在hadoop中有自己的
     * 更精簡(jiǎn)的序列化接口(Seria會(huì)將類(lèi)結(jié)構(gòu)都序列化,而實(shí)際我們只需要序列化數(shù)據(jù))志鹃,所以不直接用Long父晶,而用LongWritable
     * VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容弄跌,String,同上甲喝,用Text
     * KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key
     * VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value
     * @author 12706
     *
     */
    static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
        FlowBean flowBean = new FlowBean();
        Text text = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] infos = line.split("\t");
            //獲取手機(jī)號(hào)
            String phoneNum = infos[0];
            //獲取上行流量,下行流量
            String upFlow = infos[1];
            String downFlow = infos[2];
            //設(shè)置總流量
            text.set(phoneNum);
            flowBean.setFlowBean(new Long(upFlow), new Long(downFlow));
            //根據(jù)key進(jìn)行了排序铛只,所以需要FlowBean實(shí)現(xiàn)WritableComparable接口
            context.write(flowBean, text);
        }
    }
    /**
     * KEYIN VALUEIN對(duì)應(yīng)mapper輸出的KEYOUT KEYOUT類(lèi)型對(duì)應(yīng)
     * KEYOUT,VALUEOUT:是自定義reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類(lèi)型
     * KEYOUT
     * VALUEOUT
     * @author 12706
     *
     */
    static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            //直接寫(xiě)出去
            context.write(values.iterator().next(), key);
        }
    }
    /**
     * 相當(dāng)于一個(gè)yarn集群的客戶端
     * 需要在此封裝mr程序的相關(guān)運(yùn)行參數(shù)埠胖,指定jar包
     * 最后提交給yarn
     * @author 12706
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSort.class);
        //指定本業(yè)務(wù)job要使用的mapper,reducer業(yè)務(wù)類(lèi)
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);
        //雖然指定了泛型,以防框架使用第三方的類(lèi)型
        //指定mapper輸出數(shù)據(jù)的kv類(lèi)型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //指定最終輸出的數(shù)據(jù)的kv類(lèi)型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //指定job輸入原始文件所在位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job輸入原始文件所在位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //將job中配置的相關(guān)參數(shù)以及job所用的java類(lèi)所在的jar包淳玩,提交給yarn去運(yùn)行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

測(cè)試:
將工程打成jar包(flowcount.jar)上傳到linux,啟動(dòng)hadoop集群直撤。

在/flowcount/output下有匯總過(guò)的文件

[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00000
13480253104     180     180     360
13502468823     7335    110349  117684
13560436666     1116    954     2070
13560439658     2034    5892    7926
13602846565     1938    2910    4848
13660577991     6960    690     7650
13719199419     240     0       240
13726230503     2481    24681   27162
13726238888     2481    24681   27162
13760778710     120     120     240
13826544101     264     0       264
13922314466     3008    3720    6728
13925057413     11058   48243   59301
13926251106     240     0       240
13926435656     132     1512    1644
15013685858     3659    3538    7197
15920133257     3156    2936    6092
15989002119     1938    180     2118
18211575961     1527    2106    3633
18320173382     9531    2412    11943
84138413        4116    1432    5548
[root@mini2 ~]# hadoop jar flowcount.jar com.scu.hadoop.mr.FlowCountSort /flowcount/output /flowcount/sortoutput
...
[root@mini2 ~]# hadoop fs -ls /flowcount/sortoutput
Found 2 items
-rw-r--r--   2 root supergroup          0 2017-10-13 04:45 /flowcount/sortoutput/_SUCCESS
-rw-r--r--   2 root supergroup        551 2017-10-13 04:45 /flowcount/sortoutput/part-r-00000
[root@mini2 ~]# hadoop fs -cat /flowcount/sortoutput/part-r-00000
13502468823     7335    110349  117684
13925057413     11058   48243   59301
13726230503     2481    24681   27162
13726238888     2481    24681   27162
18320173382     9531    2412    11943
13560439658     2034    5892    7926
13660577991     6960    690     7650
15013685858     3659    3538    7197
13922314466     3008    3720    6728
15920133257     3156    2936    6092
84138413        4116    1432    5548
13602846565     1938    2910    4848
18211575961     1527    2106    3633
15989002119     1938    180     2118
13560436666     1116    954     2070
13926435656     132     1512    1644
13480253104     180     180     360
13826544101     264     0       264
13719199419     240     0       240
13760778710     120     120     240
13926251106     240     0       240

輸出文件/flowcount/sortoutput/part-r-00000中看到了記錄就是按照總流量由高到低排序。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蜕着,一起剝皮案震驚了整個(gè)濱河市谋竖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌承匣,老刑警劉巖蓖乘,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異韧骗,居然都是意外死亡嘉抒,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)袍暴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)些侍,“玉大人,你說(shuō)我怎么就攤上這事政模「谛” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵淋样,是天一觀的道長(zhǎng)耗式。 經(jīng)常有香客問(wèn)我,道長(zhǎng)习蓬,這世上最難降的妖魔是什么纽什? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮躲叼,結(jié)果婚禮上芦缰,老公的妹妹穿的比我還像新娘。我一直安慰自己枫慷,他們只是感情好让蕾,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布浪规。 她就那樣靜靜地躺著,像睡著了一般探孝。 火紅的嫁衣襯著肌膚如雪笋婿。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天顿颅,我揣著相機(jī)與錄音缸濒,去河邊找鬼。 笑死粱腻,一個(gè)胖子當(dāng)著我的面吹牛庇配,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绍些,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼捞慌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了柬批?” 一聲冷哼從身側(cè)響起啸澡,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎氮帐,沒(méi)想到半個(gè)月后嗅虏,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡揪漩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年旋恼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片奄容。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖产徊,靈堂內(nèi)的尸體忽然破棺而出昂勒,到底是詐尸還是另有隱情,我是刑警寧澤舟铜,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布戈盈,位于F島的核電站,受9級(jí)特大地震影響谆刨,放射性物質(zhì)發(fā)生泄漏塘娶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一痊夭、第九天 我趴在偏房一處隱蔽的房頂上張望刁岸。 院中可真熱鬧,春花似錦她我、人聲如沸虹曙。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)酝碳。三九已至矾踱,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間疏哗,已是汗流浹背呛讲。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留返奉,地道東北人贝搁。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像衡瓶,于是被迫代替她去往敵國(guó)和親徘公。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容