此次的實驗掂器,由一鍵搭建環(huán)境開始格嘁。
5.1 實驗?zāi)康?/h2>
基于MapReduce思想这橙,編寫WordCount程序。
5.2 實驗要求
1.理解MapReduce編程思想熬丧;
2.會編寫MapReduce版本W(wǎng)ordCount笋粟;
3.會執(zhí)行該程序;
4.自行分析執(zhí)行過程析蝴。
5.3 實驗原理
MapReduce是一種計算模型害捕,簡單的說就是將大批量的工作(數(shù)據(jù))分解(MAP)執(zhí)行,然后再將結(jié)果合并成最終結(jié)果(REDUCE)闷畸。這樣做的好處是可以在任務(wù)被分解后尝盼,可以通過大量機(jī)器進(jìn)行并行計算,減少整個操作的時間佑菩。
適用范圍:數(shù)據(jù)量大盾沫,但是數(shù)據(jù)種類小可以放入內(nèi)存裁赠。
基本原理及要點:將數(shù)據(jù)交給不同的機(jī)器去處理,數(shù)據(jù)劃分赴精,結(jié)果歸約佩捞。
理解MapReduce和Yarn:在新版Hadoop中,Yarn作為一個資源管理調(diào)度框架蕾哟,是Hadoop下MapReduce程序運(yùn)行的生存環(huán)境一忱。其實MapRuduce除了可以運(yùn)行Yarn框架下,也可以運(yùn)行在諸如Mesos谭确,Corona之類的調(diào)度框架上帘营,使用不同的調(diào)度框架,需要針對Hadoop做不同的適配逐哈。
一個完成的MapReduce程序在Yarn中執(zhí)行過程如下:
(1)ResourcManager JobClient向ResourcManager提交一個job仪吧。
(2)ResourcManager向Scheduler請求一個供MRAppMaster運(yùn)行的container,然后啟動它鞠眉。
(3)MRAppMaster啟動起來后向ResourcManager注冊。
(4)ResourcManagerJobClient向ResourcManager獲取到MRAppMaster相關(guān)的信息择诈,然后直接與MRAppMaster進(jìn)行通信械蹋。
(5)MRAppMaster算splits并為所有的map構(gòu)造資源請求。
(6)MRAppMaster做一些必要的MR OutputCommitter的準(zhǔn)備工作羞芍。
(7)MRAppMaster向RM(Scheduler)發(fā)起資源請求哗戈,得到一組供map/reduce task運(yùn)行的container,然后與NodeManager一起對每一個container執(zhí)行一些必要的任務(wù)荷科,包括資源本地化等唯咬。
(8)MRAppMaster 監(jiān)視運(yùn)行著的task 直到完成,當(dāng)task失敗時畏浆,申請新的container運(yùn)行失敗的task胆胰。
(9)當(dāng)每個map/reduce task完成后,MRAppMaster運(yùn)行MR OutputCommitter的cleanup 代碼刻获,也就是進(jìn)行一些收尾工作蜀涨。
(10)當(dāng)所有的map/reduce完成后,MRAppMaster運(yùn)行OutputCommitter的必要的job commit或者abort APIs蝎毡。
(11)MRAppMaster退出厚柳。
5.3.1 MapReduce編程
編寫在Hadoop中依賴Yarn框架執(zhí)行的MapReduce程序,并不需要自己開發(fā)MRAppMaster和YARNRunner沐兵,因為Hadoop已經(jīng)默認(rèn)提供通用的YARNRunner和MRAppMaster程序别垮, 大部分情況下只需要編寫相應(yīng)的Map處理和Reduce處理過程的業(yè)務(wù)程序即可。
編寫一個MapReduce程序并不復(fù)雜扎谎,關(guān)鍵點在于掌握分布式的編程思想和方法碳想,主要將計算過程分為以下五個步驟:
(1)迭代烧董。遍歷輸入數(shù)據(jù),并將之解析成key/value對移袍。
(2)將輸入key/value對映射(map)成另外一些key/value對解藻。
(3)依據(jù)key對中間數(shù)據(jù)進(jìn)行分組(grouping)。
(4)以組為單位對數(shù)據(jù)進(jìn)行歸約(reduce)葡盗。
(5)迭代螟左。將最終產(chǎn)生的key/value對保存到輸出文件中。
5.3.2 Java API解析
(1)InputFormat:用于描述輸入數(shù)據(jù)的格式觅够,常用的為TextInputFormat提供如下兩個功能:
數(shù)據(jù)切分: 按照某個策略將輸入數(shù)據(jù)切分成若干個split胶背,以便確定Map Task個數(shù)以及對應(yīng)的split。
為Mapper提供數(shù)據(jù):給定某個split喘先,能將其解析成一個個key/value對钳吟。
(2)OutputFormat:用于描述輸出數(shù)據(jù)的格式,它能夠?qū)⒂脩籼峁┑膋ey/value對寫入特定格式的文件中窘拯。
(3)Mapper/Reducer: Mapper/Reducer中封裝了應(yīng)用程序的數(shù)據(jù)處理邏輯红且。
(4)Writable:Hadoop自定義的序列化接口。實現(xiàn)該類的接口可以用作MapReduce過程中的value數(shù)據(jù)使用涤姊。
(5)WritableComparable:在Writable基礎(chǔ)上繼承了Comparable接口暇番,實現(xiàn)該類的接口可以用作MapReduce過程中的key數(shù)據(jù)使用。(因為key包含了比較排序的操作)思喊。
5.4 實驗步驟
本實驗主要分為壁酬,確認(rèn)前期準(zhǔn)備,編寫MapReduce程序恨课,打包提交代碼舆乔。查看運(yùn)行結(jié)果這幾個步驟,詳細(xì)如下:
在免密部分剂公,配置好client的就行希俩,其他的不再進(jìn)行操作了
5.4.1 啟動Hadoop
執(zhí)行命令啟動前面實驗部署好的Hadoop系統(tǒng)。
[root@master ~]# cd /usr/cstor/hadoop/
[root@master hadoop]# sbin/start-all.sh
5.4.2 驗證HDFS上沒有wordcount的文件夾
[root@master ~]# cd /usr/cstor/hadoop/
[root@master hadoop]# bin/hadoop fs -ls / #查看HDFS上根目錄文件 /
此時HDFS上應(yīng)該是沒有wordcount文件夾纲辽。
5.4.3 上傳數(shù)據(jù)文件到HDFS
[root@master ~]# cd /usr/cstor/hadoop/
[root@master hadoop]# bin/hadoop fs -put /root/data/5/word /
5.4.4 編寫MapReduce程序
主要編寫Map和Reduce類斜纪,其中Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,并重寫其map方法文兑;Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reduce類盒刚,并重寫其reduce方法。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import java.io.IOException;
import java.util.StringTokenizer;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
//map方法绿贞,劃分一行文本因块,讀一個單詞寫出一個<單詞,1>
public void map(Object key, Text value, Context context)throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);//寫出<單詞,1>
}
}
}
//定義reduce類,對相同的單詞籍铁,把它們中的VList值全部相加
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();//相當(dāng)于<Hello,1><Hello,1>涡上,將兩個1相加
}
result.set(sum);
context.write(key, result);//寫出這個單詞趾断,和這個單詞出現(xiàn)次數(shù)<單詞,單詞出現(xiàn)次數(shù)>
}
}
public static void main(String[] args) throws Exception {//主方法吩愧,函數(shù)入口
Configuration conf = new Configuration(); //實例化配置文件類
Job job = new Job(conf, "WordCount"); //實例化Job類
job.setInputFormatClass(TextInputFormat.class); //指定使用默認(rèn)輸入格式類
TextInputFormat.setInputPaths(job, args[0]); //設(shè)置待處理文件的位置
job.setJarByClass(WordCount.class); //設(shè)置主類名
job.setMapperClass(TokenizerMapper.class); //指定使用上述自定義Map類
job.setCombinerClass(IntSumReducer.class); //指定開啟Combiner函數(shù)
job.setMapOutputKeyClass(Text.class); //指定Map類輸出的芋酌,K類型
job.setMapOutputValueClass(IntWritable.class); //指定Map類輸出的,V類型
job.setPartitionerClass(HashPartitioner.class); //指定使用默認(rèn)的HashPartitioner類
job.setReducerClass(IntSumReducer.class); //指定使用上述自定義Reduce類
job.setNumReduceTasks(Integer.parseInt(args[2])); //指定Reduce個數(shù)
job.setOutputKeyClass(Text.class); //指定Reduce類輸出的,K類型
job.setOutputValueClass(Text.class); //指定Reduce類輸出的,V類型
job.setOutputFormatClass(TextOutputFormat.class); //指定使用默認(rèn)輸出格式類
TextOutputFormat.setOutputPath(job, new Path(args[1])); //設(shè)置輸出結(jié)果文件位置
System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任務(wù)并監(jiān)控任務(wù)狀態(tài)
}
}
5.4.5 使用Eclipse開發(fā)工具將該代碼打包
假定打包后的文件名為hdpAction.jar雁佳,主類WordCount位于包njupt下脐帝,則可使用如下命令向YARN集群提交本應(yīng)用。
[root@master ~]# yarn jar hdpAction.jar njupt.WordCount /word /wordcount 1
其中“yarn”為命令糖权,“jar”為命令參數(shù)堵腹,后面緊跟打包后的代碼地址,“njupt”為包名星澳,“WordCount”為主類名疚顷,“/word”為輸入文件在HDFS中的位置,/wordcount為輸出文件在HDFS中的位置禁偎。
5.5 實驗結(jié)果
5.5.1 程序運(yùn)行成功控制臺上的顯示內(nèi)容
如圖5-1所示:
圖5-1
5.5.2 在HDFS上查看結(jié)果
如圖5-2所示:
圖5-2
出現(xiàn)的錯誤:
不是在hadoop目錄下