Example 1: Data Sorting
1.1 Problem Description
Sorting data is often the first step of many real-world tasks, such as ranking student grades or building indexes over data.
Like the data-deduplication example, this example performs an initial pass over the raw data to lay the groundwork for further processing.
1.2 Example Description
Sort the numbers in the input files. Each line of every input file contains exactly one number. The output should contain two separated numbers per line: the first is the rank of the value within the whole data set, and the second is the original value.
INPUT
file1
==========
82
32342
65224
3332
415
742356
223
file2
==========
786
6788678
2342
55
5464
123
file3
==========
12
88
100
OUTPUT
1 12
2 55
3 82
4 88
5 100
6 123
7 223
8 415
9 786
10 2342
11 3332
12 5464
13 32342
14 65224
15 742356
16 6788678
1.3 Design Approach
This example only asks us to sort the input data. Readers familiar with the MapReduce process will recall that the framework already sorts intermediate data, so can we rely on that built-in sort instead of implementing our own? The answer is yes, but we first need to understand the default ordering: the framework sorts by key. If the key is an IntWritable (which wraps an int), MapReduce compares keys numerically; if the key is a Text (which wraps a String), it compares keys lexicographically.
Knowing this, we should use IntWritable keys: in map, parse each input line into an IntWritable and emit it as the key (the value can be anything). When reduce receives a <key, value-list> pair, it writes the key as the output value, once per element of value-list, so duplicate numbers each get their own rank. The output key (the variable line in the code below) is reducer-level state that tracks the rank of the current value. Note that no Combiner is configured for this job; map and reduce alone are enough. Also note that the global ranking relies on the job running with a single reduce task (the default), so that one reducer sees all keys in sorted order.
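To make the difference in key ordering concrete, here is a minimal standalone sketch (not part of the job; it assumes hadoop-common is on the classpath) comparing IntWritable's numeric ordering with Text's lexicographic ordering:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class KeyOrderDemo {
    public static void main(String[] args) {
        // IntWritable compares numerically: 88 < 100, so this prints a negative number
        System.out.println(new IntWritable(88).compareTo(new IntWritable(100)));
        // Text compares lexicographically: "88" > "100", so this prints a positive number
        System.out.println(new Text("88").compareTo(new Text("100")));
    }
}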
1.4 Code Implementation
package sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class DataSort {
public static class Map extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
// Map function: emit each input number as the key
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Convert the input text to a String and trim surrounding whitespace
String line = value.toString().trim();
// Emit the parsed number as the key and 1 as the value
if(!"".equals(line)) {
context.write(new IntWritable(Integer.parseInt(line)), new IntWritable(1));
}
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
IntWritable line = new IntWritable(1); // rank of the next value to emit (reducer-level state)
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// Iterate over the values; write one (rank, number) pair per occurrence of the key
for (IntWritable num:values) {
context.write(line, key);
line = new IntWritable(line.get()+1);
}
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "192.168.125.129:9000");
Job job = new Job(conf, "Data Sort");
job.setJarByClass(DataSort.class);
// Set the Mapper and Reducer classes
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// Set the output key/value types
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
// Set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
1.5 Exporting the JAR Package
Right-click the DataSort.java source file and choose Export; in the dialog that appears, select Java > JAR file and click Next. Choose the resources to export and the destination for the JAR file, then click Finish to complete the export.
1.6 Running the Hadoop/MapReduce Job
1.6.1 Uploading Files
Upload sort.jar together with file1.txt, file2.txt, and file3.txt to the /home/hadoop/tasks directory on the master host of the Hadoop cluster.
1.6.2 Starting the Hadoop Cluster
./sbin/start-all.sh
The cluster is now running.
1.6.3 Copying the Data into the input Directory
cd ~
cd /home/hadoop/tasks
cat file2.txt >> file1.txt
cat file3.txt >> file1.txt
cat file1.txt
Then run:
hadoop fs -mkdir -p input
hadoop fs -copyFromLocal file1.txt input
You should now see file1.txt under the /user/hadoop/input directory of the HDFS file system.
1.6.4 Running the MapReduce Job
./bin/hadoop jar /home/hadoop/tasks/sort.jar sort.DataSort input output
The ResourceManager web UI now shows the submitted job and its details. Once the job completes, the output can be viewed in the /user/hadoop/output directory; clicking Download retrieves the result file.
1.7 Output
1 12
2 55
3 82
4 88
5 100
6 123
7 223
8 415
9 786
10 2342
11 3332
12 5464
13 32342
14 65224
15 742356
16 6788678
1.8 Reference
https://blog.csdn.net/garychenqin/article/details/48223057
Example 2: Website Statistics
2.1 Task Description
1. For each IP, compute the average number of valid views of the site, where valid views = views + direct visits + indirect visits - bounces (a worked example follows the sample input below).
2. For each IP, find the maximum number of favorites.
2.2 Sample Input
ip      views    favorites    direct visits    indirect visits    bounces
ip1 3412344 2424 110 111 990
ip2 12332 25 12 456 230
ip2 535 33 10 3 61
ip1 23424 225 34 5 80
ip5 5677 2 9 76 90
ip6 113 768 435 89 120
ip4 63 133 34 23 21
ip3 5 2 89 56 10
ip3 111115 22 56 67 50
ip6 111 5 67 12 70
ip8 17 3 0 12 71
ip9 3455 0 1 81 90
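As a worked check of the formula, take the two ip1 rows above: their valid view counts are 3412344 + 110 + 111 - 990 = 3411575 and 23424 + 34 + 5 - 80 = 23383, so the average is (3411575 + 23383) / 2 = 1717479, and the maximum favorite count is max(2424, 225) = 2424. This matches the ip1 line of the output in Section 2.5 (1717479 2424).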
2.3 Code Implementation
2.3.1 File: FlowBean.java
package statistics;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable {
private String ipNum;
private long aveValidVisitNum;
private long sumValidLikeNum;
// The framework uses reflection during deserialization and needs a no-argument constructor, so one is defined explicitly
public FlowBean() {
}
// A parameterized constructor is added for convenient initialization of the object's fields
public FlowBean(String ipNum, long aveValidVisitNum, long sumValidLikeNum) {
this.ipNum = ipNum;
this.aveValidVisitNum = aveValidVisitNum;
this.sumValidLikeNum = sumValidLikeNum;
}
// Serialize the object's fields to the output stream
public void write(DataOutput out) throws IOException {
out.writeUTF(ipNum);
out.writeLong(aveValidVisitNum);
out.writeLong(sumValidLikeNum);
}
// Deserialize the object's fields from the input stream
// Fields must be read in exactly the same order they were written
public void readFields(DataInput in) throws IOException {
this.ipNum = in.readUTF();
this.aveValidVisitNum = in.readLong();
this.sumValidLikeNum = in.readLong();
}
public String getIPNum() {
return ipNum;
}
public void setIPNum(String ipNum) {
this.ipNum = ipNum;
}
public long getValidVisitNum() {
return aveValidVisitNum;
}
public void setValidVisitNum(long validVisitNum) {
this.aveValidVisitNum = validVisitNum;
}
public long getValidLikeNum() {
return sumValidLikeNum;
}
public void setValidLikeNum(long validLikeNum) {
this.sumValidLikeNum = validLikeNum;
}
public String toString() {
return "" + aveValidVisitNum + "\t" + sumValidLikeNum;
}
}
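As a minimal sketch of what the Writable contract provides, the following standalone snippet (hypothetical test code, not part of the job; it assumes the FlowBean class above is on the classpath) serializes a FlowBean to bytes and reads it back, much as the framework does when moving map output between tasks:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import statistics.FlowBean;
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("ip1", 1717479L, 2424L);
        // Serialize the bean's fields to a byte array via write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));
        // Deserialize into a fresh instance created through the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // prints: 1717479	2424
    }
}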
2.3.2 File: website.java
package statistics;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class website {
public static class Map extends Mapper<LongWritable, Text, Text, FlowBean> {
// Map function implementation
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
// Convert the input text data to a String
String line = value.toString();
// First split the input into lines
StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
// Process each line in turn
while (tokenizerArticle.hasMoreElements()) {
// Split each line on whitespace
StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
String strIPNum = tokenizerLine.nextToken(); // IP id
String strBrowseNum = tokenizerLine.nextToken(); // views
String strLikeNum = tokenizerLine.nextToken(); // favorites
String strDirectVisitNum = tokenizerLine.nextToken(); // direct visits
String strIndireVisitNum = tokenizerLine.nextToken(); // indirect visits
String strFlashNum = tokenizerLine.nextToken(); // bounces
String ipNum = strIPNum;
Long validVisitNum = Long.parseLong(strBrowseNum)+Long.parseLong(strDirectVisitNum)+Long.parseLong(strIndireVisitNum)-Long.parseLong(strFlashNum);
Long likeNum = Long.parseLong(strLikeNum);
// Wrap the IP, valid view count, and favorite count in a FlowBean and emit it keyed by IP
context.write(new Text(ipNum), new FlowBean(ipNum, validVisitNum, likeNum));
}
}
}
public static class Reduce extends Reducer<Text, FlowBean, Text, FlowBean> {
// The framework calls reduce() once per key group
// The reduce logic iterates over the values, accumulates the sums, and emits the result
protected void reduce(Text key, Iterable<FlowBean> values,
Reducer<Text, FlowBean, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
long max=0; // maximum favorite count seen so far
long sumValidVisitNum = 0;
long count = 0;
for (FlowBean value:values) {
sumValidVisitNum += value.getValidVisitNum();
max = max >= value.getValidLikeNum()? max:value.getValidLikeNum();
count++;
}
context.write(key, new FlowBean(key.toString(), sumValidVisitNum/count, max)); // integer division: the average is truncated
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "192.168.125.129:9000");
Job job = new Job(conf, "statistics");
job.setJarByClass(website.class);
// Set the Mapper and Reducer classes
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// Set the output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// Set the input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0:1);
}
}
2.4 Commands to Start Hadoop and Submit the Job
cd /usr/local/hadoop2
./sbin/start-all.sh
cd ~
cd /home/hadoop/tasks
hadoop fs -copyFromLocal website.txt input
cd /usr/local/hadoop2
./bin/hadoop jar /home/hadoop/tasks/web.jar statistics.website input output/web_output
2.5 Output
ip1 1717479 2424
ip2 6528 33
ip3 55664 22
ip4 99 133
ip5 5672 2
ip6 318 768
ip8 -42 3
ip9 3447 0