統(tǒng)計一個文件中相同單詞的個數(shù)
使用工具:IDEA+Maven
實驗基礎(chǔ):必須在服務(wù)器上搭建好hadoop的hdfs環(huán)境和yarn環(huán)境逸月,具體搭建方式:http://www.reibang.com/p/59cb748bddec
運行環(huán)境:VM虛擬機上的一個linux系統(tǒng)運行hadoop吃挑,在window10下的idea中進行開發(fā)
開發(fā)步驟:
1茴丰,在pom.xml文件中導(dǎo)入hadoop的依賴包
<properties>
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
2卫旱,新建一個WordCountApp這個類望伦,用來統(tǒng)計一個文件中相同單詞的個數(shù)纳猫,并將結(jié)果保存到一個文件中
package com.zhx.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @program: hdfs
* @description: this is a MAPREDUCE‘s test project
* @author: zhx
* @create: 2021-03-09 18:09
**/
public class WordCountApp {
/**
* map讀取輸入文件
* keyInput:LongWritable蓖扑,文件的偏移量,開始為0霞幅,后面的key是每一行的字符量的累加
* valueInput:Text漠吻,每一行的值
* keyOutput:Text,輸出是每個單詞的字符串
* valueOutput:LongWritable司恳,是每個單詞的個數(shù)
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
LongWritable one = new LongWritable(1);
//獲取一行數(shù)據(jù)
String line = value.toString();
//每一行按照空格進行分割開來
String[] words = line.split(" ");
for (String word : words) {
context.write(new Text(word), one);
}
}
}
/**
* reduce:歸并操作
* map的輸出就是reduce的輸入途乃,所以reduce的
* keyInput:Text
* valueInput:LongWritable
* keyOutput:Text
* valueOutput:LongWritable
*/
public static class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable value : values) {
sum+=value.get();
}
context.write(key,new LongWritable(sum));
}
}
/**
* Driver,封裝了MapReduce作業(yè)的所有信息
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//創(chuàng)建configuration
Configuration configuration = new Configuration();
//創(chuàng)建job
Job job = Job.getInstance(configuration, "wordcount");
//設(shè)置job處理類
job.setJarByClass(WordCountApp.class);
//設(shè)置作業(yè)處理輸入路徑
FileInputFormat.setInputPaths(job,new Path(args[0]));
//設(shè)置map相關(guān)系數(shù)
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//設(shè)置reduce的相關(guān)參數(shù)
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設(shè)置作業(yè)處理輸出路徑
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作業(yè)
System.exit(job.waitForCompletion(true)?0:1);
}
}
3,需要把這個文件打包上傳到服務(wù)器中進行運行扔傅,具體操作
1耍共,進入這個項目的根目錄烫饼,打開shell窗口,執(zhí)行
mvn clean package -DskipTests
试读,這個命令杠纵,然后可以看到在target目錄下有一個"項目名-1.0.jar"這個文件,說明第一步成了
2钩骇,繼續(xù)執(zhí)行scp .\target\hdfs-1.0.jar root@zhx:/home/zhx/lib/
這個命令比藻,lib目錄是你的服務(wù)器上要存放這個文件的位置,把第一步生成的.jar文件上傳到服務(wù)器上
3倘屹,執(zhí)行命令hadoop jar /home/zhx/lib/hdfs-1.0.jar com.zhx.hadoop.mapreduce.WordCountApp hdfs://zhx:8020/mapreduce_test1.txt hdfs://zhx:8020/output/wc/
银亲,如果你的hdfs上沒有mapreduce_test1.txt這個文件,可以先把這個文件上傳到hdfs中纽匙,上傳命令hadoop fs -put mapreduce_test1.txt在服務(wù)器上的的路徑 /
务蝠,/是hdfs上根路徑,直接上傳到hdfs的根路徑就可以了烛缔,為了以后方便直接把上面的命令保存到wordcountapp.sh這個文件中馏段,然后賦予這個文件執(zhí)行的權(quán)限chmod u+x wordcountapp.sh
,到時候可以直接./wordcountapp.sh践瓷,就可以執(zhí)行了
4院喜,mapreduce_test1.txt的文件內(nèi)容hello mapreduce hdfs mapreduce yarn hello is a hdfs yarn mapreduce
5,查看是否成功晕翠,執(zhí)行
hadoop fs -ls /output/wc
這個命令够坐,看是否存在part-r-00000這個文件,然后查看這個文件hadoop fs -cat /output/wc/part-r-00000
崖面,內(nèi)容如下:a 1 hdfs 2 hello 2 is 1 mapreduce 3 yarn 2
4,上述3步已經(jīng)基本上開發(fā)完成了梯影,但是還是存在一些問題的
如果此刻繼續(xù)執(zhí)行wordcountapp.sh這個文件會出現(xiàn)錯誤巫员,這個錯誤是說hdfs上已經(jīng)存在了輸出文件夾了,在MapReduce中輸出文件夾不能事先存在
錯誤日志
因此有兩種解決辦法:1甲棍,是手動在shell中將輸出文件目錄刪除简识,執(zhí)行hadoop fs -rm -R /output
2,是在代碼中完成刪除功能//創(chuàng)建configuration Configuration configuration = new Configuration(); //準備清理已存在的輸出目錄 Path outpath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if (fileSystem.exists(outpath)) { fileSystem.delete(outpath, true); System.out.println("已經(jīng)刪除了存在的目錄"); }
把這段代碼加入到wordcountapp中的main函數(shù)中感猛,然后重新打包上傳七扰,執(zhí)行wordcountapp.sh這個文件,看是否出錯
5陪白,繼續(xù)優(yōu)化颈走,引入combiner,這個可以本地reduce咱士,從而可以減少MapTask輸出的數(shù)據(jù)量以及網(wǎng)絡(luò)的傳輸量立由,他可以先把每個Mapper上的詞先和合并一波轧钓,然后把合并后的傳給reduce
combiner.png
使用場景:求和,次數(shù)锐膜,加法這些可以毕箍,但是平均數(shù)就不行
把這個job.setCombinerClass(MyReduce.class);
加入到main中//設(shè)置map相關(guān)系數(shù) job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //通過job設(shè)置combine處理類,其實邏輯上與reduce一致 job.setCombinerClass(MyReduce.class);
然后執(zhí)行打包上傳道盏,執(zhí)行.sh文件而柑,查看執(zhí)行過程中的日志
combiner執(zhí)行結(jié)果
只要這兩個不為0說明combiner起作用了
6,partitioner分區(qū)器
shuffle是通過分區(qū)partitioner 分配給Reduce的荷逞,一個Reducer對應(yīng)一個記錄文件媒咳,Partitioner是shuffle的一部分,partitioner執(zhí)行時機:在mapper執(zhí)行完成颅围,Reducer還沒有執(zhí)行的時候伟葫,mapper的輸出就是partitioner的輸入 即<k2,v2>partitioner 分區(qū)主要是用來提高效率的 ,例如從全國基站的數(shù)據(jù)中查找北京基站的數(shù)據(jù)院促,如果計算時不分區(qū)全國的數(shù)據(jù)都放在一起筏养,查詢的時候就相當于全表掃描 效率非常低,如果在第一次進行Mapreducer計算的時候按照省市進行分區(qū)常拓,每個城市的基站數(shù)據(jù)都存儲在對應(yīng)的每個文件渐溶,那么下次再進行查詢的時候直接從北京分區(qū)里直接查找 效率很高。分區(qū)的依據(jù)是具體業(yè)務(wù)需求弄抬,可以按照省市分區(qū)茎辐,時間進行分區(qū)等。如果不手動進行分區(qū)掂恕,Hadoop有一個默認的分區(qū)規(guī)則Partitioner是partitioner的基類拖陆,如果需要定制partitioner也需要繼承該類。HashPartitioner是mapreduce的默認partitioner懊亡。計算方法是which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks依啰,得到當前的目的reducer。
新建一個PartitonerApp這個類
package com.zhx.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @program: hdfs
* @description: this is a MAPREDUCE‘s test project
* @author: zhx
* @create: 2021-03-09 18:09
**/
public class PartitonerApp {
/**
* map讀取輸入文件
* keyInput:LongWritable店枣,文件的偏移量速警,開始為0,后面的key是每一行的字符量的累加
* valueInput:Text鸯两,每一行的值
* keyOutput:Text闷旧,輸出是每個單詞的字符串
* valueOutput:LongWritable,是每個單詞對應(yīng)的value值
* 這是測試 數(shù)據(jù)钧唐,一個文件中忙灼,第一列式手機品牌,第二列是銷售數(shù)逾柿,統(tǒng)計每種手機總的銷售數(shù)缀棍,這里輸出key是品牌字符串宅此,value是銷售數(shù)
* huawei 100
xiaomi 100
huawei 200
xiaomi 300
nojiya 100
iphone 200
huawei 200
iphone 200
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
LongWritable one = new LongWritable(1);
//獲取一行數(shù)據(jù)
String line = value.toString();
//每一行按照空格進行分割開來
String[] words = line.split(" ");
context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
}
}
/**
* reduce:歸并操作
* map的輸出就是reduce的輸入,所以reduce的
* keyInput:Text
* valueInput:LongWritable
* keyOutput:Text
* valueOutput:LongWritable
*/
public static class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable value : values) {
sum+=value.get();
}
context.write(key,new LongWritable(sum));
}
}
public static class MyPartitioner extends Partitioner<Text,LongWritable>{
@Override
public int getPartition(Text key, LongWritable value, int i) {
if(key.toString().equals("huawei")){
return 0;
}
if(key.toString().equals("xiaomi")){
return 1;
}
if(key.toString().equals("iphone")){
return 2;
}
return 3;
}
}
/**
* Driver,封裝了MapReduce作業(yè)的所有信息
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//創(chuàng)建configuration
Configuration configuration = new Configuration();
//準備清理已存在的輸出目錄
Path outpath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outpath)) {
fileSystem.delete(outpath, true);
System.out.println("已經(jīng)刪除了存在的目錄");
}
//創(chuàng)建job
Job job = Job.getInstance(configuration, "wordcount");
//設(shè)置job處理類
job.setJarByClass(PartitonerApp.class);
//設(shè)置作業(yè)處理輸入路徑
FileInputFormat.setInputPaths(job,new Path(args[0]));
//設(shè)置map相關(guān)系數(shù)
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//通過job設(shè)置combine處理類爬范,其實邏輯上與reduce一致
job.setCombinerClass(MyReduce.class);
//設(shè)置reduce的相關(guān)參數(shù)
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//設(shè)置job的partitioner的處理類
job.setPartitionerClass(MyPartitioner.class);
//設(shè)置4個reducer父腕,每個分區(qū)一個
job.setNumReduceTasks(4);
//設(shè)置作業(yè)處理輸出路徑
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交作業(yè)
System.exit(job.waitForCompletion(true)?0:1);
}
}
然后打包上傳,執(zhí)行partitonerapp.sh文件青瀑,這個文件內(nèi)容hadoop jar /home/zhx/lib/hdfs-1.0.jar com.zhx.hadoop.mapreduce.PartitonerApp hdfs://zhx:8020/mapreduce_partitioner.txt hdfs://zhx:8020/output/wc/
璧亮,此時輸出文件夾下會有4個文件,分別是
配置jobhistory
記錄已運行完的MapReduce信息到指定的hdfs目錄下斥难,默認是不開啟的枝嘶,可以http://zhx:8088/打開yarn的網(wǎng)址,然后運行一個pi的job哑诊,可以查看Tracking UI這列下的history群扶,是否可以打開,打不開說明需要配置
mapreduce-site.xml
<property>
<name>mapreduce.jobhistory.address</name>
<value>zhx:10020</value>
<description>mapreduce jobhistory server ip(host:port)</description>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>zhx:19888</value>
<description>mapreduce jobhistory server web ui host:port</description>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>
yarn-site.xml
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
然后停掉yarn镀裤,在重啟yarn竞阐,在啟動sbin目錄下的
./mr-jobhistory-daemon.sh start historyserver
然后jps,看是否有 JobHistoryServer這個進程
最后可以在yarn的瀏覽器中查看執(zhí)行的日志了
image.png