Hadoop-MapReduce示例

示例一:數(shù)據(jù)排序

1.1 問題描述

“數(shù)據(jù)排序”是許多實(shí)際任務(wù)執(zhí)行時(shí)要完成的第一項(xiàng)工作浸须,比如學(xué)生成績評比惨寿、數(shù)據(jù)建立索引等。
這個(gè)實(shí)例和數(shù)據(jù)去重類似删窒,都是先對原始數(shù)據(jù)進(jìn)行初步處理裂垦,為進(jìn)一步的數(shù)據(jù)操作打好基礎(chǔ)。

1.2 實(shí)例描述

對輸入文件中數(shù)據(jù)進(jìn)行排序肌索。輸入文件中的每行內(nèi)容均為一個(gè)數(shù)字蕉拢,即一個(gè)數(shù)據(jù)。要求在輸出中每行有兩個(gè)間隔的數(shù)字诚亚,其中晕换,第一個(gè)數(shù)字代表原始數(shù)據(jù)在原始數(shù)據(jù)集中的位次,第二個(gè)數(shù)字代表原始數(shù)據(jù)站宗。

INPUT
file1
==========
82
32342
65224
3332
415
742356
223
file2
==========
786
6788678
2342
55
5464
123
file3
==========
12
88
100

OUTPUT
1        12
2        55
3        82
4        88
5       100
6       123
7       223
8       415
9       786
10      2342
11      3332
12      5464
13     32342
14     65224
15     742356
16    6788678

1.3 設(shè)計(jì)思路

這個(gè)實(shí)例僅僅要求對輸入數(shù)據(jù)進(jìn)行排序闸准,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個(gè)默認(rèn)的排序梢灭,而不需要自己再實(shí)現(xiàn)具體的排序呢夷家?答案是肯定的。但是在使用之前首先需要了解它的默認(rèn)排序規(guī)則敏释。它是按照key值進(jìn)行排序的库快,如果key為封裝int的IntWritable類型,那么MapReduce按照數(shù)字大小對key排序钥顽,如果key為封裝為String的Text類型义屏,那么MapReduce按照字典順序對字符串排序。
了解了這個(gè)細(xì)節(jié),我們就知道應(yīng)該使用封裝int的IntWritable型數(shù)據(jù)結(jié)構(gòu)了闽铐。也就是在map中將讀入的數(shù)據(jù)轉(zhuǎn)化成IntWritable型膀曾,然后作為key值輸出(value任意)。reduce拿到<key阳啥,value-list>之后添谊,將輸入的key作為value輸出,并根據(jù)value-list中元素的個(gè)數(shù)決定輸出的次數(shù)察迟。輸出的key(即代碼中的linenum)是一個(gè)全局變量斩狱,它統(tǒng)計(jì)當(dāng)前key的位次。需要注意的是這個(gè)程序中沒有配置Combiner扎瓶,也就是在MapReduce過程中不使用Combiner所踊。這主要是因?yàn)槭褂胢ap和reduce就已經(jīng)能夠完成任務(wù)了。

1.4 代碼實(shí)現(xiàn)

package sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class DataSort {
    public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        //實(shí)現(xiàn)Map函數(shù)
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //將輸入的純文本文件的數(shù)據(jù)轉(zhuǎn)化成String,并去掉前后空白
            String line = value.toString().trim();
            //context.write(line, 1) 輸出以line為key,1為value的數(shù)據(jù) 
            if(!"".equals(line)) {
                context.write(new IntWritable(Integer.parseInt(line)), new IntWritable(1));
            }
        }
    }
    
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        IntWritable line = new IntWritable(1);
                
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
             //遍歷概荷,取出每一個(gè)元素
            for (IntWritable num:values) {
                context.write(line, key);
                line = new IntWritable(line.get()+1);
            }
        }
    }
    
    @SuppressWarnings("deprecation")
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        conf.set("mapred.job.tracker", "192.168.125.129:9000");
        
        Job job = new Job(conf, "Data Sort");
        job.setJarByClass(DataSort.class);
        
        //設(shè)置Map和Reduce處理類
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        
        //設(shè)置輸出類型
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        
        //設(shè)置輸入和輸出目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0:1);
    }

}

1.5 導(dǎo)出JRE包

sort.java文件上點(diǎn)擊右鍵秕岛,選擇導(dǎo)出,彈出界面

選擇Java/JAR文件误证,點(diǎn)擊下一步


選擇要導(dǎo)出的資源和導(dǎo)出的文件位置继薛,點(diǎn)擊完成,導(dǎo)出成功愈捅。

1.6 執(zhí)行Hadoop/MapReduce

1.6.1 上傳文件

sort.jarfile1.txt遏考、file2.txtfile3.txt一起上傳到Hadoop集群的master主機(jī)的/home/hadoop/tasks目錄下

1.6.2 啟動Hadoop集群

./sbin/start-all.sh


啟動完成

1.6.3 將文件上傳到input

cd ~
cd /home/hadoop/tasks
cat file2.txt >> file1.txt
cat file3.txt >> file1.txt
cat file1.txt

再輸入:
hadoop fs -mkdir -p input
hadoop fs -copyFromLocal word.txt input

可看到在文件系統(tǒng)的/user/hadoop/input目錄下出現(xiàn)了file1.txt

1.6.4 執(zhí)行Hadoop/MapReduce

./bin/hadoop jar /home/hadoop/tasks/sort.jar sort.DataSort input output


此時(shí)的資源管理界面為
submitted applications

該任務(wù)詳情

可在/user/hadoop/output文件夾中查看到輸出結(jié)果


點(diǎn)擊Download即可啟動下載輸出結(jié)果

1.7 輸出結(jié)果

1   12
2   55
3   82
4   88
5   100
6   123
7   223
8   415
9   786
10  2342
11  3332
12  5464
13  32342
14  65224
15  742356
16  6788678

1.8 參考文檔

https://blog.csdn.net/garychenqin/article/details/48223057

示例二:網(wǎng)站統(tǒng)計(jì)

2.1 任務(wù)描述

1.統(tǒng)計(jì)所有IP對網(wǎng)站的有效瀏覽數(shù)平均數(shù):有效瀏覽數(shù)=瀏覽數(shù)+直接訪問訪問數(shù)+間接訪問數(shù)-閃退數(shù)
2.統(tǒng)計(jì)每個(gè)IP最大有效收藏?cái)?shù)

2.2 輸入樣例

ip編號        瀏覽數(shù)      收藏?cái)?shù)    直接訪問數(shù)    間接訪問數(shù)     閃退數(shù)
ip1          3412344     2424        110           111         990
ip2          12332       25          12            456         230
ip2          535         33          10            3           61
ip1          23424       225         34            5           80
ip5          5677        2           9             76          90
ip6          113         768         435           89          120
ip4          63          133         34            23          21
ip3          5           2           89            56          10
ip3          111115      22          56            67          50
ip6          111         5           67            12          70
ip8          17          3           0             12          71
ip9          3455        0           1             81          90

2.3 代碼實(shí)現(xiàn)

2.3.1 File:FlowBean.java

package statistics;

import java.io.IOException;
import java.io.DataInput;  
import java.io.DataOutput;  


import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private String ipNum;
    private long aveValidVisitNum;
    private long sumValidLikeNum;
    
     // 在反序列化時(shí),反射機(jī)制需要調(diào)用空參構(gòu)造函數(shù)蓝谨,所以顯示定義了一個(gè)空參構(gòu)造函數(shù)  
    public FlowBean() {  
    }  
    
    // 為了對象數(shù)據(jù)的初始化方便灌具,加入一個(gè)帶參的構(gòu)造函數(shù)  
    public FlowBean(String ipNum, long aveValidVisitNum, long sumValidLikeNum) {  
        this.ipNum = ipNum;  
        this.aveValidVisitNum = aveValidVisitNum;  
        this.sumValidLikeNum = sumValidLikeNum;    
    }
    
    // 將對象的數(shù)據(jù)序列化到流中 
    public void write(DataOutput out) throws IOException {
        out.writeUTF(ipNum);
        out.writeLong(aveValidVisitNum);
        out.writeLong(sumValidLikeNum);
    }
    
    //從流中反序列化對象的數(shù)據(jù)
    //從數(shù)據(jù)流中讀出對象字段時(shí),必須跟序列化的順序保持一致
    public void readFields(DataInput in) throws IOException {
        this.ipNum = in.readUTF();
        this.aveValidVisitNum = in.readLong();
        this.sumValidLikeNum = in.readLong();
    }
    
    
    public String getIPNum() {
        return ipNum;       
    }
    
    public void setIPNum(String ipNum) {
        this.ipNum = ipNum;
    }
    
    public long getValidVisitNum() {
        return aveValidVisitNum;
    }
    
    public void setValidVisitNum(long validVisitNum) {
        this.aveValidVisitNum = validVisitNum;
    }
    
    public long getValidLikeNum() {
        return sumValidLikeNum;
    }
    
    public void setValidLikeNum(long validLikeNum) {
        this.sumValidLikeNum = validLikeNum;
    }
    
    public String toString() {
        return "" + aveValidVisitNum + "\t" + sumValidLikeNum;
    }
}

2.3.2 File:website.java

package statistics;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class website {
    FlowBean flowBean = new FlowBean();
    public static class Map extends Mapper<LongWritable, Text, Text, FlowBean> {
        //實(shí)現(xiàn)Map函數(shù)
        protected void map(LongWritable key, Text value, 
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            //將輸入的純文本文件的數(shù)據(jù)轉(zhuǎn)化成String
            String line = value.toString();
            //將輸入的數(shù)據(jù)首先按行進(jìn)行分割
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
            //分別對每行進(jìn)行處理
            while (tokenizerArticle.hasMoreElements()) {
                //每行按空格劃分
                StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
                String strIPNum = tokenizerLine.nextToken(); //ip地址部分
                String strBrowseNum = tokenizerLine.nextToken(); //瀏覽數(shù)
                String strLikeNum = tokenizerLine.nextToken(); //收藏?cái)?shù)
                String strDirectVisitNum = tokenizerLine.nextToken(); //直接訪問數(shù)
                String strIndireVisitNum = tokenizerLine.nextToken(); //間接訪問數(shù)
                String strFlashNum = tokenizerLine.nextToken(); //閃退數(shù)
                
                String ipNum = strIPNum;
                Long validVisitNum = Long.parseLong(strBrowseNum)+Long.parseLong(strDirectVisitNum)+Long.parseLong(strIndireVisitNum)-Long.parseLong(strFlashNum);
                Long likeNum = Long.parseLong(strLikeNum);
                
                //封裝數(shù)據(jù)并輸出ip地址譬巫、有效訪問數(shù)和收藏?cái)?shù)
                context.write(new Text(ipNum), new FlowBean(ipNum, validVisitNum, likeNum));    
            }
        }       
    }
    
    public static class Reduce extends Reducer<Text, FlowBean, Text, FlowBean> {
        //框架每傳遞一組數(shù)據(jù)調(diào)用一次reduce方法
        //reduce中的業(yè)務(wù)邏輯就是遍歷values咖楣,然后進(jìn)行累加求和再輸出
        
        protected void reduce(Text key, Iterable<FlowBean> values, 
                Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            long max=0; //記錄收藏?cái)?shù)最大值
            long sumValidVisitNum = 0;
            long count = 0;
            
            for (FlowBean value:values) {
                sumValidVisitNum += value.getValidVisitNum();
                max = max >= value.getValidLikeNum()? max:value.getValidLikeNum();
                count++;
                }
            
            context.write(key, new FlowBean(key.toString(), sumValidVisitNum/count, max));
        }
    }
    
    @SuppressWarnings("deprecation")
                
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            
            conf.set("mapred.job.tracker", "192.168.125.129:9000");
            
            Job job = new Job(conf, "statistics");
            job.setJarByClass(website.class);
            
            //設(shè)置Map和Reduce處理類
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            
            //設(shè)置輸出類型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            
            //設(shè)置輸入和輸出目錄
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0:1);
            }
    }

2.4 Hadoop啟動核心代碼

cd /usr/local/hadoop2
./sbin/start-all.sh
cd ~
cd /home/hadoop/tasks
hadoop fs -copyFromLocal website.txt input
cd /usr/local/hadoop2
./bin/hadoop jar /home/hadoop/tasks/web.jar statistics.website input output/web_ouput

2.5 輸出數(shù)據(jù)

ip1   1717479       2424
ip2   6528          33
ip3   55664         22
ip4   99            133
ip5   5672          2
ip6   318           768
ip8   -42           3
ip9   3447          0

2.6 參考文檔

https://blog.csdn.net/xw_classmate/article/details/50639848

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市芦昔,隨后出現(xiàn)的幾起案子诱贿,更是在濱河造成了極大的恐慌,老刑警劉巖烟零,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瘪松,死亡現(xiàn)場離奇詭異咸作,居然都是意外死亡锨阿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門记罚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來墅诡,“玉大人,你說我怎么就攤上這事∧┰纾” “怎么了烟馅?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長然磷。 經(jīng)常有香客問我郑趁,道長,這世上最難降的妖魔是什么姿搜? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任寡润,我火速辦了婚禮,結(jié)果婚禮上舅柜,老公的妹妹穿的比我還像新娘梭纹。我一直安慰自己,他們只是感情好致份,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布变抽。 她就那樣靜靜地躺著,像睡著了一般氮块。 火紅的嫁衣襯著肌膚如雪绍载。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天滔蝉,我揣著相機(jī)與錄音逛钻,去河邊找鬼。 笑死锰提,一個(gè)胖子當(dāng)著我的面吹牛曙痘,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播立肘,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼边坤,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了谅年?” 一聲冷哼從身側(cè)響起茧痒,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎融蹂,沒想到半個(gè)月后旺订,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡超燃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年区拳,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片意乓。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡樱调,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情笆凌,我是刑警寧澤圣猎,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站乞而,受9級特大地震影響送悔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜爪模,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一放祟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呻右,春花似錦跪妥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至落塑,卻和暖如春纽疟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背憾赁。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工污朽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人龙考。 一個(gè)月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓蟆肆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親晦款。 傳聞我的和親對象是個(gè)殘疾皇子炎功,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容