MapReduce2-3.1.1 分布式計(jì)算 實(shí)驗(yàn)示例(二)單詞計(jì)數(shù) 加入停詞表功能

大家好,我是Iggi筹裕。

今天我給大家分享的是MapReduce2-3.1.1版本的Word Count Ver2.0實(shí)驗(yàn)窄驹。

關(guān)于MapReduce的一段文字簡(jiǎn)介請(qǐng)自行查閱我的上一篇實(shí)驗(yàn)示例:MapReduce2-3.1.1 實(shí)驗(yàn)示例 單詞計(jì)數(shù)(一)

好乐埠,下面進(jìn)入正題。介紹Java操作MapReduce2組件完成Word Count Ver2.0的操作瑞眼。

首先棵逊,使用IDE建立Maven工程,建立工程時(shí)沒有特殊說(shuō)明徒像,按照向?qū)崾军c(diǎn)擊完成即可蛙讥。重要的是在pom.xml文件中添加依賴包,內(nèi)容如下圖:

image.png

待系統(tǒng)下載好依賴的jar包后便可以編寫程序了谬墙。

展示實(shí)驗(yàn)代碼:

package linose.mapreduce;

import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.log4j.BasicConfigurator;

/**
 * Hello MapReduce!
 * Word Count V2.0
 * 本示例演示如何使用MapReduce組件,添加忽略詞文件來(lái)統(tǒng)計(jì)單詞出現(xiàn)的個(gè)數(shù)
 * 關(guān)于示例中出現(xiàn)的API方法可以參考如下連接:http://hadoop.apache.org/docs/r3.1.1/api/index.html
 */
public class AppVer2 
{

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
    {
        /**
         * 設(shè)定MapReduce示例擁有HDFS的操作權(quán)限
         */
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        
        /**
         * 為了清楚的看到輸出結(jié)果,暫將集群調(diào)試信息缺省侵蒙。
         * 如果想查閱集群調(diào)試信息,取消注釋即可算凿。
         */
        //BasicConfigurator.configure();
        
        /**
         * MapReude實(shí)驗(yàn)準(zhǔn)備階段:
         * 定義HDFS文件路徑
         */
        String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
        String inputPath = defaultFS + "/index.dirs/inputV2.txt";
        String outputPath = defaultFS + "/index.dirs/outputV2";
        String skipPath = defaultFS + "/index.dirs/patterns.txt";
        
        /**
         * 生產(chǎn)配置氓轰,并獲取HDFS對(duì)象
         */
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        FileSystem system = FileSystem.get(conf);
        
        /**
         * 定義輸入路徑,輸出路徑
         */
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);
        Path stopWordPath = new Path(skipPath);
        
        /**
         * 如果實(shí)驗(yàn)數(shù)據(jù)文件不存在則創(chuàng)建數(shù)據(jù)文件
         */
        if (!system.exists(inputHdfsPath)) {
            FSDataOutputStream outputStream = system.create(inputHdfsPath);
            OutputStreamWriter file = new OutputStreamWriter(outputStream);
            file.write("芒果 菠蘿  西瓜! 橘子, 草莓. \n");
            file.write("草莓 橘子  蘋果! 荔枝, 藍(lán)莓. \n");
            file.write("天天 菇娘  釋迦! 軟棗子, 癩瓜, 蛇皮果. \n");
            file.write("香蕉 菠蘿  鴨梨! 柚子, 蘋果. \n");
            file.write("草莓 橘子  桂圓! 荔枝, 香蕉. \n");
            file.write("蘋果 菠蘿  草莓! 彌猴桃, 芒果. \n");
            file.write("蘋果 香蕉  提子! 橘子, 菠蘿. \n");
            file.write("西瓜 蘋果  香蕉! 橙子, 提子. \n");
            file.write("香蕉 鴨梨  西瓜! 葡萄, 芒果. \n");
            file.write("蘋果 櫻桃  香蕉! 葡萄, 橘子. \n");
            file.write("西瓜 葡萄  桃! 車?yán)遄? 香蕉, 榴蓮, 瓜, 火龍果, 荔枝. \n");
            file.close();
            outputStream.close();
        }
    
        /**
         * 如果實(shí)驗(yàn)結(jié)果目錄存在案糙,遍歷文件內(nèi)容全部刪除
         */
        if (system.exists(outputHdfsPath)) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                system.delete(fileStatus.getPath(), false);
            }
            system.delete(outputHdfsPath, false);
        }
        
        /**
         * 創(chuàng)建忽略單詞文件时捌,除了要過濾標(biāo)點(diǎn)符號(hào)外炉抒,我希望過濾掉:天天、菇娘拿诸、釋迦蛤奥、軟棗子僚稿、癩瓜蚀同、蛇皮果這幾個(gè)水果
         */
        system.delete(stopWordPath, false);
        if (!system.exists(stopWordPath)) {
            FSDataOutputStream outputStream = system.create(stopWordPath);
            OutputStreamWriter file = new OutputStreamWriter(outputStream);
            file.write("\\,\n");
            file.write("\\.\n");
            file.write("\\!\n");
            file.write("天天\n");
            file.write("菇娘\n");
            file.write("釋迦\n");
            file.write("軟棗子\n");
            file.write("癩瓜\n");
            file.write("蛇皮果\n");
            file.close();
            outputStream.close();
        }
        
        /**
         * 創(chuàng)建MapReduce任務(wù)并設(shè)定Job名稱
         */
        Job job = Job.getInstance(conf, "Word Count Ver2:");
        job.setJarByClass(WordCountVer2.class);
        
        /**
         * 設(shè)置輸入文件、輸出文件衰猛、緩存文件
         */
        FileInputFormat.addInputPath(job, inputHdfsPath);
        FileOutputFormat.setOutputPath(job, outputHdfsPath);
        job.addCacheFile(stopWordPath.toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
        
        /**
         * 指定Reduce類輸出類型Key類型與Value類型
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
      
        /**
         * 指定自定義Map類啡省,Reduce類髓霞,開啟Combiner函數(shù)。
         */
        job.setMapperClass(WordCountVer2.TokenizerMapper.class);
        job.setCombinerClass(WordCountVer2.IntSumReducer.class);
        job.setReducerClass(WordCountVer2.IntSumReducer.class);
      
        /**
         * 提交作業(yè)
         */
        job.waitForCompletion(true);
        
        /**
         * 然后輪詢進(jìn)度结序,直到作業(yè)完成纵潦。
         */
        float progress = 0.0f;
        do {
            progress = job.setupProgress();
            System.out.println("Word Count Ver2: 的當(dāng)前進(jìn)度:" + progress * 100);
            Thread.sleep(1000);
        } while (progress != 1.0f && !job.isComplete());
        
        /**
         * 如果成功垃环,查看輸出文件內(nèi)容
         */
        if (job.isSuccessful()) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                FSDataInputStream outputStream = system.open(fileStatus.getPath());
                IOUtils.copyBytes(outputStream, System.out, conf, false);
                outputStream.close();
                System.out.println("--------------------------------------------");
            }
        }
    }
}

展示MapReduce2-3.1.1組件編寫Word Count Ver2.0測(cè)試類:

package linose.mapreduce;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

public class WordCountVer2 {
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        
        static enum CountersEnum { INPUT_WORDS }
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();

        private Configuration conf;
        private BufferedReader fis;
        
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
            if (conf.getBoolean("wordcount.skip.patterns", false)) {
                URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                for (URI patternsURI : patternsURIs) {
                    Path patternsPath = new Path(patternsURI.getPath());
                    String patternsFileName = patternsPath.getName().toString();
                    parseSkipFile(patternsFileName);
                }
            }
        }

        private void parseSkipFile(String fileName) {
            try {
                fis = new BufferedReader(new FileReader(fileName));
                String pattern = null;
                while ((pattern = fis.readLine()) != null) {
                    patternsToSkip.add(pattern);
                }
            } catch (IOException ioe) {
                System.err.println("Caught exception while parsing the cached file '" + StringUtils.stringifyException(ioe));
            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            
            String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
            for (String pattern : patternsToSkip) {
                line = line.replaceAll(pattern, "");
            }
            
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
                Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1);
            }
        }
      }

      public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        
          private IntWritable result = new IntWritable();

          public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable value : values) {
                sum += value.get();
              }
              result.set(sum);
              context.write(key, result);
          }
      }
}

下圖為測(cè)試結(jié)果:


image.png

至此涧团,MapReduce2-3.1.1 Word Count Ver2.0 實(shí)驗(yàn)示例演示完畢经磅。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市阿迈,隨后出現(xiàn)的幾起案子轧叽,更是在濱河造成了極大的恐慌,老刑警劉巖待逞,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件识樱,死亡現(xiàn)場(chǎng)離奇詭異震束,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)垢村,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門嘉栓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人麻昼,你說(shuō)我怎么就攤上這事趣钱。” “怎么了燕垃?”我有些...
    開封第一講書人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)您旁。 經(jīng)常有香客問我轴捎,道長(zhǎng),這世上最難降的妖魔是什么侦锯? 我笑而不...
    開封第一講書人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任秦驯,我火速辦了婚禮,結(jié)果婚禮上亲桥,老公的妹妹穿的比我還像新娘固耘。我一直安慰自己,他們只是感情好番枚,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開白布户辫。 她就那樣靜靜地躺著嗤锉,像睡著了一般墓塌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上苫幢,一...
    開封第一講書人閱讀 49,749評(píng)論 1 289
  • 那天韩肝,我揣著相機(jī)與錄音,去河邊找鬼涡相。 笑死,一個(gè)胖子當(dāng)著我的面吹牛催蝗,可吹牛的內(nèi)容都是我干的丙号。 我是一名探鬼主播,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼喳魏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼怀薛!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起迂苛,我...
    開封第一講書人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤鼓择,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后念搬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體摆出,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡偎漫,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了温亲。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片杯矩。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡史隆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鬓照,我是刑警寧澤相艇,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布坛芽,位于F島的核電站,受9級(jí)特大地震影響咙轩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜丐膝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一钾菊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧浑此,春花似錦滞详、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至篇裁,卻和暖如春赡若,著一層夾襖步出監(jiān)牢的瞬間团甲,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工身腻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人脐区。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓她按,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親媒佣。 傳聞我的和親對(duì)象是個(gè)殘疾皇子陵刹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容