大家好搀捷,我是Iggi。
今天我給大家分享的是MapReduce2-3.1.1版本的Word Count Ver1.0實(shí)驗(yàn)示例默怨。
首先用一段文字簡介MapReduce:
MapReduce最早是由Google公司研究提出的一種面向大規(guī)模數(shù)據(jù)處理的并行計(jì)算模型和方法樟氢。Google公司設(shè)計(jì)MapReduce的初衷主要是為了解決其搜索引擎中大規(guī)模網(wǎng)頁數(shù)據(jù)的并行化處理。Google公司發(fā)明了MapReduce之后首先用其重新改寫了其搜索引擎中的Web文檔索引處理系統(tǒng)镇饺。但由于MapReduce可以普遍應(yīng)用于很多大規(guī)模數(shù)據(jù)的計(jì)算問題乎莉,因此自發(fā)明MapReduce以后,Google公司內(nèi)部進(jìn)一步將其廣泛應(yīng)用于很多大規(guī)模數(shù)據(jù)處理問題奸笤。到目前為止惋啃,Google公司內(nèi)有上萬個(gè)各種不同的算法問題和程序都使用MapReduce進(jìn)行處理。
2004年监右,開源項(xiàng)目Lucene(搜索索引程序庫)和Nutch(搜索引擎)的創(chuàng)始人Doug Cutting發(fā)現(xiàn)MapReduce正是其所需要的解決大規(guī)模Web數(shù)據(jù)處理的重要技術(shù)边灭,因而模仿Google MapReduce,基于Java設(shè)計(jì)開發(fā)了一個(gè)稱為Hadoop的開源MapReduce并行計(jì)算框架和系統(tǒng)健盒。自此绒瘦,Hadoop成為Apache開源組織下最重要的項(xiàng)目,自其推出后很快得到了全球?qū)W術(shù)界和工業(yè)界的普遍關(guān)注扣癣,并得到推廣和普及應(yīng)用惰帽。
MapReduce的推出給大數(shù)據(jù)并行處理帶來了巨大的革命性影響,使其已經(jīng)成為事實(shí)上的大數(shù)據(jù)處理的工業(yè)標(biāo)準(zhǔn)搏色。盡管MapReduce還有很多局限性善茎,但人們普遍公認(rèn),MapReduce是到目前為止最為成功频轿、最廣為接受和最易于使用的大數(shù)據(jù)并行處理技術(shù)垂涯。MapReduce的發(fā)展普及和帶來的巨大影響遠(yuǎn)遠(yuǎn)超出了發(fā)明者和開源社區(qū)當(dāng)初的意料烁焙,以至于馬里蘭大學(xué)教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一書的作者Jimmy Lin在書中提出:MapReduce改變了我們組織大規(guī)模計(jì)算的方式耕赘,它代表了第一個(gè)有別于馮·諾依曼結(jié)構(gòu)的計(jì)算模型骄蝇,是在集群規(guī)模而非單個(gè)機(jī)器上組織大規(guī)模計(jì)算的新的抽象模型上的第一個(gè)重大突破,是到目前為止所見到的最為成功的基于大規(guī)模計(jì)算資源的計(jì)算模型操骡。
MapReduce是面向大數(shù)據(jù)并行處理的計(jì)算模型九火、框架和平臺(tái),它隱含了以下三層含義:
1)MapReduce是一個(gè)基于集群的高性能并行計(jì)算平臺(tái)(Cluster Infrastructure)册招。它允許用市場上普通的商用服務(wù)器構(gòu)成一個(gè)包含數(shù)十岔激、數(shù)百至數(shù)千個(gè)節(jié)點(diǎn)的分布和并行計(jì)算集群。
2)MapReduce是一個(gè)并行計(jì)算與運(yùn)行軟件框架(Software Framework)是掰。它提供了一個(gè)龐大但設(shè)計(jì)精良的并行計(jì)算軟件框架虑鼎,能自動(dòng)完成計(jì)算任務(wù)的并行化處理,自動(dòng)劃分計(jì)算數(shù)據(jù)和計(jì)算任務(wù)键痛,在集群節(jié)點(diǎn)上自動(dòng)分配和執(zhí)行任務(wù)以及收集計(jì)算結(jié)果炫彩,將數(shù)據(jù)分布存儲(chǔ)、數(shù)據(jù)通信絮短、容錯(cuò)處理等并行計(jì)算涉及到的很多系統(tǒng)底層的復(fù)雜細(xì)節(jié)交由系統(tǒng)負(fù)責(zé)處理江兢,大大減少了軟件開發(fā)人員的負(fù)擔(dān)。
3)MapReduce是一個(gè)并行程序設(shè)計(jì)模型與方法(Programming Model & Methodology)丁频。它借助于函數(shù)式程序設(shè)計(jì)語言Lisp的設(shè)計(jì)思想杉允,提供了一種簡便的并行程序設(shè)計(jì)方法,用Map和Reduce兩個(gè)函數(shù)編程實(shí)現(xiàn)基本的并行計(jì)算任務(wù)席里,提供了抽象的操作和并行編程接口夺颤,以簡單方便地完成大規(guī)模數(shù)據(jù)的編程和計(jì)算處理。
好胁勺,下面進(jìn)入正題世澜。介紹Java操作MapReduce2組件完成Word Count Ver2.0的操作。
首先署穗,使用IDE建立Maven工程寥裂,建立工程時(shí)沒有特殊說明,按照向?qū)崾军c(diǎn)擊完成即可案疲。重要的是在pom.xml文件中添加依賴包封恰,內(nèi)容如下圖:
待系統(tǒng)下載好依賴的jar包后便可以編寫程序了。
展示實(shí)驗(yàn)代碼:
package linose.mapreduce;
import java.io.IOException;
import java.io.OutputStreamWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
//import org.apache.log4j.BasicConfigurator;
/**
* Hello MapReduce!
* Word Count V1.0
* 本示例演示如何使用MapReduce組件統(tǒng)計(jì)單詞出現(xiàn)的個(gè)數(shù)
* 關(guān)于示例中出現(xiàn)的API方法可以參考如下連接:http://hadoop.apache.org/docs/r3.1.1/api/index.html
*/
public class AppVer1
{
public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
{
/**
* 設(shè)定MapReduce示例擁有HDFS的操作權(quán)限
*/
System.setProperty("HADOOP_USER_NAME", "hdfs");
/**
* 為了清楚的看到輸出結(jié)果褐啡,暫將集群調(diào)試信息缺省诺舔。
* 如果想查閱集群調(diào)試信息,取消注釋即可。
*/
//BasicConfigurator.configure();
/**
* MapReude實(shí)驗(yàn)準(zhǔn)備階段:
* 定義HDFS文件路徑
*/
String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
String inputPath = defaultFS + "/index.dirs/input.txt";
String outputPath = defaultFS + "/index.dirs/output";
/**
* 生產(chǎn)配置低飒,并獲取HDFS對象
*/
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultFS);
FileSystem system = FileSystem.get(conf);
/**
* 定義輸入路徑许昨,輸出路徑
*/
Path inputHdfsPath = new Path(inputPath);
Path outputHdfsPath = new Path(outputPath);
/**
* 如果實(shí)驗(yàn)數(shù)據(jù)文件不存在則創(chuàng)建數(shù)據(jù)文件
*/
if (!system.exists(inputHdfsPath)) {
FSDataOutputStream inputStream = system.create(inputHdfsPath);
OutputStreamWriter outputStream = new OutputStreamWriter(inputStream);
outputStream.write("芒果 菠蘿 西瓜 橘子 草莓 \n");
outputStream.write("草莓 橘子 蘋果 荔枝 藍(lán)莓 \n");
outputStream.write("天天 菇娘 釋迦 軟棗子 癩瓜 蛇皮果 \n");
outputStream.write("香蕉 菠蘿 鴨梨 柚子 蘋果 \n");
outputStream.write("草莓 橘子 桂圓 荔枝 香蕉 \n");
outputStream.write("蘋果 菠蘿 草莓 彌猴桃 芒果 \n");
outputStream.write("蘋果 香蕉 提子 橘子 菠蘿 \n");
outputStream.write("西瓜 蘋果 香蕉 橙子 提子 \n");
outputStream.write("香蕉 鴨梨 西瓜 葡萄 芒果 \n");
outputStream.write("蘋果 櫻桃 香蕉 葡萄 橘子 \n");
outputStream.write("西瓜 葡萄 桃 車?yán)遄?香蕉 榴蓮 瓜 火龍果 荔枝 \n");
outputStream.close();
inputStream.close();
}
/**
* 如果實(shí)驗(yàn)結(jié)果目錄存在,遍歷文件內(nèi)容全部刪除
*/
if (system.exists(outputHdfsPath)) {
RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
LocatedFileStatus fileStatus;
while (fsIterator.hasNext()) {
fileStatus = fsIterator.next();
system.delete(fileStatus.getPath(), false);
}
system.delete(outputHdfsPath, false);
}
/**
* 創(chuàng)建MapReduce任務(wù)并設(shè)定Job名稱
*/
JobConf jobConf = new JobConf(conf, WordCountVer1.class);
jobConf.setJobName("Word Count Ver1.0:");
/**
* 指定輸入輸出的默認(rèn)格式類
*/
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setOutputFormat(TextOutputFormat.class);
/**
* 設(shè)置輸入文件與輸出文件
*/
FileInputFormat.setInputPaths(jobConf, inputHdfsPath);
FileOutputFormat.setOutputPath(jobConf, outputHdfsPath);
/**
* 指定Reduce類輸出類型Key類型與Value類型
*/
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(IntWritable.class);
/**
* 指定自定義Map類褥赊,Reduce類糕档,開啟Combiner函數(shù)。
*/
jobConf.setMapperClass(WordCountVer1.Map.class);
jobConf.setCombinerClass(WordCountVer1.Reduce.class);
jobConf.setReducerClass(WordCountVer1.Reduce.class);
/**
* 提交作業(yè)
*/
RunningJob run = JobClient.runJob(jobConf);
/**
* 然后輪詢進(jìn)度拌喉,直到作業(yè)完成速那。
*/
float progress = 0.0f;
do {
progress = run.setupProgress();
System.out.println("Word Count Ver1.0: 的當(dāng)前進(jìn)度:" + progress * 100);
Thread.sleep(1000);
} while (progress != 1.0f && !run.isComplete());
/**
* 如果成功,查看輸出文件內(nèi)容
*/
if (run.isSuccessful()) {
RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
LocatedFileStatus fileStatus;
while (fsIterator.hasNext()) {
fileStatus = fsIterator.next();
FSDataInputStream outputStream = system.open(fileStatus.getPath());
IOUtils.copyBytes(outputStream, System.out, conf, false);
outputStream.close();
System.out.println("--------------------------------------------");
}
}
}
}
展示MapReduce2-3.1.1組件編寫Word Count Ver1.0測試類:
package linose.mapreduce;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* WordCount示例
* @author Iggi
*
*/
public class WordCountVer1 {
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
int counter = 0;
while(values.hasNext()) {
counter += values.next().get();
}
output.collect(key, new IntWritable(counter));
}
}
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
String[] words = value.toString().split(" ");
for(String word: words){
output.collect(new Text(word), new IntWritable(1));
}
}
}
}
下圖為測試結(jié)果:
至此尿背,MapReduce2-3.1.1 Word Count Ver1.0 實(shí)驗(yàn)示例演示完畢端仰。