MapReduce2-3.1.1 分布式計(jì)算 實(shí)驗(yàn)示例(一)單詞計(jì)數(shù)

大家好搀捷,我是Iggi。

今天我給大家分享的是MapReduce2-3.1.1版本的Word Count Ver1.0實(shí)驗(yàn)示例默怨。

首先用一段文字簡介MapReduce:

MapReduce最早是由Google公司研究提出的一種面向大規(guī)模數(shù)據(jù)處理的并行計(jì)算模型和方法樟氢。Google公司設(shè)計(jì)MapReduce的初衷主要是為了解決其搜索引擎中大規(guī)模網(wǎng)頁數(shù)據(jù)的并行化處理。Google公司發(fā)明了MapReduce之后首先用其重新改寫了其搜索引擎中的Web文檔索引處理系統(tǒng)镇饺。但由于MapReduce可以普遍應(yīng)用于很多大規(guī)模數(shù)據(jù)的計(jì)算問題乎莉,因此自發(fā)明MapReduce以后,Google公司內(nèi)部進(jìn)一步將其廣泛應(yīng)用于很多大規(guī)模數(shù)據(jù)處理問題奸笤。到目前為止惋啃,Google公司內(nèi)有上萬個(gè)各種不同的算法問題和程序都使用MapReduce進(jìn)行處理。

2004年监右,開源項(xiàng)目Lucene(搜索索引程序庫)和Nutch(搜索引擎)的創(chuàng)始人Doug Cutting發(fā)現(xiàn)MapReduce正是其所需要的解決大規(guī)模Web數(shù)據(jù)處理的重要技術(shù)边灭,因而模仿Google MapReduce,基于Java設(shè)計(jì)開發(fā)了一個(gè)稱為Hadoop的開源MapReduce并行計(jì)算框架和系統(tǒng)健盒。自此绒瘦,Hadoop成為Apache開源組織下最重要的項(xiàng)目,自其推出后很快得到了全球?qū)W術(shù)界和工業(yè)界的普遍關(guān)注扣癣,并得到推廣和普及應(yīng)用惰帽。

MapReduce的推出給大數(shù)據(jù)并行處理帶來了巨大的革命性影響,使其已經(jīng)成為事實(shí)上的大數(shù)據(jù)處理的工業(yè)標(biāo)準(zhǔn)搏色。盡管MapReduce還有很多局限性善茎,但人們普遍公認(rèn),MapReduce是到目前為止最為成功频轿、最廣為接受和最易于使用的大數(shù)據(jù)并行處理技術(shù)垂涯。MapReduce的發(fā)展普及和帶來的巨大影響遠(yuǎn)遠(yuǎn)超出了發(fā)明者和開源社區(qū)當(dāng)初的意料烁焙,以至于馬里蘭大學(xué)教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一書的作者Jimmy Lin在書中提出:MapReduce改變了我們組織大規(guī)模計(jì)算的方式耕赘,它代表了第一個(gè)有別于馮·諾依曼結(jié)構(gòu)的計(jì)算模型骄蝇,是在集群規(guī)模而非單個(gè)機(jī)器上組織大規(guī)模計(jì)算的新的抽象模型上的第一個(gè)重大突破,是到目前為止所見到的最為成功的基于大規(guī)模計(jì)算資源的計(jì)算模型操骡。

MapReduce是面向大數(shù)據(jù)并行處理的計(jì)算模型九火、框架和平臺(tái),它隱含了以下三層含義:

1)MapReduce是一個(gè)基于集群的高性能并行計(jì)算平臺(tái)(Cluster Infrastructure)册招。它允許用市場上普通的商用服務(wù)器構(gòu)成一個(gè)包含數(shù)十岔激、數(shù)百至數(shù)千個(gè)節(jié)點(diǎn)的分布和并行計(jì)算集群。

2)MapReduce是一個(gè)并行計(jì)算與運(yùn)行軟件框架(Software Framework)是掰。它提供了一個(gè)龐大但設(shè)計(jì)精良的并行計(jì)算軟件框架虑鼎,能自動(dòng)完成計(jì)算任務(wù)的并行化處理,自動(dòng)劃分計(jì)算數(shù)據(jù)和計(jì)算任務(wù)键痛,在集群節(jié)點(diǎn)上自動(dòng)分配和執(zhí)行任務(wù)以及收集計(jì)算結(jié)果炫彩,將數(shù)據(jù)分布存儲(chǔ)、數(shù)據(jù)通信絮短、容錯(cuò)處理等并行計(jì)算涉及到的很多系統(tǒng)底層的復(fù)雜細(xì)節(jié)交由系統(tǒng)負(fù)責(zé)處理江兢,大大減少了軟件開發(fā)人員的負(fù)擔(dān)。

3)MapReduce是一個(gè)并行程序設(shè)計(jì)模型與方法(Programming Model & Methodology)丁频。它借助于函數(shù)式程序設(shè)計(jì)語言Lisp的設(shè)計(jì)思想杉允,提供了一種簡便的并行程序設(shè)計(jì)方法,用Map和Reduce兩個(gè)函數(shù)編程實(shí)現(xiàn)基本的并行計(jì)算任務(wù)席里,提供了抽象的操作和并行編程接口夺颤,以簡單方便地完成大規(guī)模數(shù)據(jù)的編程和計(jì)算處理。

image.png

好胁勺,下面進(jìn)入正題世澜。介紹Java操作MapReduce2組件完成Word Count Ver2.0的操作。

首先署穗,使用IDE建立Maven工程寥裂,建立工程時(shí)沒有特殊說明,按照向?qū)崾军c(diǎn)擊完成即可案疲。重要的是在pom.xml文件中添加依賴包封恰,內(nèi)容如下圖:

image.png

待系統(tǒng)下載好依賴的jar包后便可以編寫程序了。

展示實(shí)驗(yàn)代碼:

package linose.mapreduce;

import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
//import org.apache.log4j.BasicConfigurator;


/**
 * Hello MapReduce!
 * Word Count V1.0
 * 本示例演示如何使用MapReduce組件統(tǒng)計(jì)單詞出現(xiàn)的個(gè)數(shù)
 * 關(guān)于示例中出現(xiàn)的API方法可以參考如下連接:http://hadoop.apache.org/docs/r3.1.1/api/index.html
 */
public class AppVer1 
{ 
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
    {   
        /**
         * 設(shè)定MapReduce示例擁有HDFS的操作權(quán)限
         */
        System.setProperty("HADOOP_USER_NAME", "hdfs"); 
        
        /**
         * 為了清楚的看到輸出結(jié)果褐啡,暫將集群調(diào)試信息缺省诺舔。
         * 如果想查閱集群調(diào)試信息,取消注釋即可。
         */
        //BasicConfigurator.configure();
        
        /**
         * MapReude實(shí)驗(yàn)準(zhǔn)備階段:
         * 定義HDFS文件路徑
         */
        String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
        String inputPath = defaultFS + "/index.dirs/input.txt";
        String outputPath = defaultFS + "/index.dirs/output";
        
        /**
         * 生產(chǎn)配置低飒,并獲取HDFS對象
         */
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        FileSystem system = FileSystem.get(conf);
        
        /**
         * 定義輸入路徑许昨,輸出路徑
         */
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);
        
        /**
         * 如果實(shí)驗(yàn)數(shù)據(jù)文件不存在則創(chuàng)建數(shù)據(jù)文件
         */
        if (!system.exists(inputHdfsPath)) {
            
            FSDataOutputStream inputStream = system.create(inputHdfsPath);
            OutputStreamWriter outputStream = new OutputStreamWriter(inputStream);
            
            outputStream.write("芒果 菠蘿 西瓜 橘子 草莓 \n");
            outputStream.write("草莓 橘子 蘋果 荔枝 藍(lán)莓 \n");
            outputStream.write("天天 菇娘 釋迦 軟棗子 癩瓜 蛇皮果 \n");
            outputStream.write("香蕉 菠蘿 鴨梨 柚子 蘋果 \n");
            outputStream.write("草莓 橘子 桂圓 荔枝 香蕉 \n");
            outputStream.write("蘋果 菠蘿 草莓 彌猴桃 芒果 \n");
            outputStream.write("蘋果 香蕉 提子 橘子 菠蘿 \n");
            outputStream.write("西瓜 蘋果 香蕉 橙子 提子 \n");
            outputStream.write("香蕉 鴨梨 西瓜 葡萄 芒果 \n");
            outputStream.write("蘋果 櫻桃 香蕉 葡萄 橘子 \n");
            outputStream.write("西瓜 葡萄 桃 車?yán)遄?香蕉 榴蓮 瓜 火龍果 荔枝 \n");
            
            outputStream.close();
            inputStream.close();
        }
    
        /**
         * 如果實(shí)驗(yàn)結(jié)果目錄存在,遍歷文件內(nèi)容全部刪除
         */
        if (system.exists(outputHdfsPath)) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                system.delete(fileStatus.getPath(), false);
            }
            system.delete(outputHdfsPath, false);
        }

        /**
         * 創(chuàng)建MapReduce任務(wù)并設(shè)定Job名稱
         */
        JobConf jobConf = new JobConf(conf, WordCountVer1.class);
        jobConf.setJobName("Word Count Ver1.0:");
    
        /**
         * 指定輸入輸出的默認(rèn)格式類
         */
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);

        /**
         * 設(shè)置輸入文件與輸出文件
         */
        FileInputFormat.setInputPaths(jobConf, inputHdfsPath);
        FileOutputFormat.setOutputPath(jobConf, outputHdfsPath);
        
        /**
         * 指定Reduce類輸出類型Key類型與Value類型
         */
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);

        /**
         * 指定自定義Map類褥赊,Reduce類糕档,開啟Combiner函數(shù)。
         */
        jobConf.setMapperClass(WordCountVer1.Map.class);
        jobConf.setCombinerClass(WordCountVer1.Reduce.class);
        jobConf.setReducerClass(WordCountVer1.Reduce.class);
        
        /**
         * 提交作業(yè)
         */
        RunningJob run = JobClient.runJob(jobConf);

        /**
         * 然后輪詢進(jìn)度拌喉,直到作業(yè)完成速那。
         */
        float progress = 0.0f;
        do {
            progress = run.setupProgress();
            System.out.println("Word Count Ver1.0: 的當(dāng)前進(jìn)度:" + progress * 100);
            Thread.sleep(1000);
        } while (progress != 1.0f && !run.isComplete());
        
        /**
         * 如果成功,查看輸出文件內(nèi)容
         */
        if (run.isSuccessful()) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                FSDataInputStream outputStream = system.open(fileStatus.getPath());
                IOUtils.copyBytes(outputStream, System.out, conf, false);
                outputStream.close();
                System.out.println("--------------------------------------------");
            }
        }
    }
}

展示MapReduce2-3.1.1組件編寫Word Count Ver1.0測試類:

package linose.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * WordCount示例
 * @author Iggi
 *
 */
public class WordCountVer1 {

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
            int counter = 0;
            while(values.hasNext()) {
                counter += values.next().get();
            }
            output.collect(key, new IntWritable(counter));
        }
    }
    
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
            
            String[] words = value.toString().split(" ");
            for(String word: words){
                output.collect(new Text(word), new IntWritable(1));
            }
        }
    }
}

下圖為測試結(jié)果:


image.png

至此尿背,MapReduce2-3.1.1 Word Count Ver1.0 實(shí)驗(yàn)示例演示完畢端仰。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市田藐,隨后出現(xiàn)的幾起案子榆俺,更是在濱河造成了極大的恐慌,老刑警劉巖坞淮,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異陪捷,居然都是意外死亡回窘,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門市袖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來啡直,“玉大人,你說我怎么就攤上這事苍碟【泼伲” “怎么了?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵微峰,是天一觀的道長舷丹。 經(jīng)常有香客問我,道長蜓肆,這世上最難降的妖魔是什么颜凯? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮仗扬,結(jié)果婚禮上症概,老公的妹妹穿的比我還像新娘。我一直安慰自己早芭,他們只是感情好彼城,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般募壕。 火紅的嫁衣襯著肌膚如雪调炬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天司抱,我揣著相機(jī)與錄音筐眷,去河邊找鬼。 笑死习柠,一個(gè)胖子當(dāng)著我的面吹牛邦蜜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播衷敌,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼鳖悠,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了溶锭?” 一聲冷哼從身側(cè)響起宝恶,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎趴捅,沒想到半個(gè)月后垫毙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡拱绑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年综芥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猎拨。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡膀藐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出红省,到底是詐尸還是另有隱情额各,我是刑警寧澤,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布吧恃,位于F島的核電站虾啦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏痕寓。R本人自食惡果不足惜缸逃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望厂抽。 院中可真熱鬧需频,春花似錦、人聲如沸筷凤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至挪丢,卻和暖如春蹂风,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背乾蓬。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國打工惠啄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人任内。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓撵渡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親死嗦。 傳聞我的和親對象是個(gè)殘疾皇子趋距,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359