進行MapReduce編程之wordcount

統(tǒng)計一個文件中相同單詞的個數(shù)

使用工具:IDEA+Maven

實驗基礎(chǔ):必須在服務(wù)器上搭建好hadoop的hdfs環(huán)境和yarn環(huán)境逸月,具體搭建方式:http://www.reibang.com/p/59cb748bddec

運行環(huán)境:VM虛擬機上的一個linux系統(tǒng)運行hadoop吃挑,在window10下的idea中進行開發(fā)

開發(fā)步驟:

1茴丰,在pom.xml文件中導(dǎo)入hadoop的依賴包

<properties>
        <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
</properties>
<repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
</repositories>
<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
</dependencies>

2卫旱,新建一個WordCountApp這個類望伦,用來統(tǒng)計一個文件中相同單詞的個數(shù)纳猫,并將結(jié)果保存到一個文件中

package com.zhx.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @program: hdfs
 * @description: this is a MAPREDUCE‘s test project
 * @author: zhx
 * @create: 2021-03-09 18:09
 **/
public class WordCountApp {

    /**
     * map讀取輸入文件
     * keyInput:LongWritable蓖扑,文件的偏移量,開始為0霞幅,后面的key是每一行的字符量的累加
     * valueInput:Text漠吻,每一行的值
     * keyOutput:Text,輸出是每個單詞的字符串
     * valueOutput:LongWritable司恳,是每個單詞的個數(shù)
     */
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            LongWritable one = new LongWritable(1);
            //獲取一行數(shù)據(jù)
            String line = value.toString();
            //每一行按照空格進行分割開來
            String[] words = line.split(" ");
            for (String word :  words) {
                context.write(new Text(word), one);
            }
        }
    }

    /**
     * reduce:歸并操作
     * map的輸出就是reduce的輸入途乃,所以reduce的
     * keyInput:Text
     * valueInput:LongWritable
     * keyOutput:Text
     * valueOutput:LongWritable
     */
    public static class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum=0;
            for (LongWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new LongWritable(sum));
        }
    }

    /**
    *  Driver,封裝了MapReduce作業(yè)的所有信息
    */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //創(chuàng)建configuration
        Configuration configuration = new Configuration();
        //創(chuàng)建job
        Job job = Job.getInstance(configuration, "wordcount");
        //設(shè)置job處理類
        job.setJarByClass(WordCountApp.class);
        //設(shè)置作業(yè)處理輸入路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //設(shè)置map相關(guān)系數(shù)
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //設(shè)置reduce的相關(guān)參數(shù)
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //設(shè)置作業(yè)處理輸出路徑
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交作業(yè)
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

3,需要把這個文件打包上傳到服務(wù)器中進行運行扔傅,具體操作

1耍共,進入這個項目的根目錄烫饼,打開shell窗口,執(zhí)行mvn clean package -DskipTests试读,這個命令杠纵,然后可以看到在target目錄下有一個"項目名-1.0.jar"這個文件,說明第一步成了
2钩骇,繼續(xù)執(zhí)行scp .\target\hdfs-1.0.jar root@zhx:/home/zhx/lib/這個命令比藻,lib目錄是你的服務(wù)器上要存放這個文件的位置,把第一步生成的.jar文件上傳到服務(wù)器上
3倘屹,執(zhí)行命令hadoop jar /home/zhx/lib/hdfs-1.0.jar com.zhx.hadoop.mapreduce.WordCountApp hdfs://zhx:8020/mapreduce_test1.txt hdfs://zhx:8020/output/wc/银亲,如果你的hdfs上沒有mapreduce_test1.txt這個文件,可以先把這個文件上傳到hdfs中纽匙,上傳命令hadoop fs -put mapreduce_test1.txt在服務(wù)器上的的路徑 /务蝠,/是hdfs上根路徑,直接上傳到hdfs的根路徑就可以了烛缔,為了以后方便直接把上面的命令保存到wordcountapp.sh這個文件中馏段,然后賦予這個文件執(zhí)行的權(quán)限chmod u+x wordcountapp.sh,到時候可以直接./wordcountapp.sh践瓷,就可以執(zhí)行了
4院喜,mapreduce_test1.txt的文件內(nèi)容

hello mapreduce
hdfs mapreduce yarn
hello is a hdfs
yarn mapreduce

5,查看是否成功晕翠,執(zhí)行hadoop fs -ls /output/wc這個命令够坐,看是否存在part-r-00000這個文件,然后查看這個文件hadoop fs -cat /output/wc/part-r-00000崖面,內(nèi)容如下:

a       1
hdfs    2
hello   2
is      1
mapreduce       3
yarn    2

4,上述3步已經(jīng)基本上開發(fā)完成了梯影,但是還是存在一些問題的

如果此刻繼續(xù)執(zhí)行wordcountapp.sh這個文件會出現(xiàn)錯誤巫员,這個錯誤是說hdfs上已經(jīng)存在了輸出文件夾了,在MapReduce中輸出文件夾不能事先存在

錯誤日志

因此有兩種解決辦法:1甲棍,是手動在shell中將輸出文件目錄刪除简识,執(zhí)行hadoop fs -rm -R /output 2,是在代碼中完成刪除功能

       //創(chuàng)建configuration
       Configuration configuration = new Configuration();
       //準備清理已存在的輸出目錄
       Path outpath = new Path(args[1]);
       FileSystem fileSystem = FileSystem.get(configuration);
       if (fileSystem.exists(outpath)) {
          fileSystem.delete(outpath, true);
           System.out.println("已經(jīng)刪除了存在的目錄");
       }

把這段代碼加入到wordcountapp中的main函數(shù)中感猛,然后重新打包上傳七扰,執(zhí)行wordcountapp.sh這個文件,看是否出錯

5陪白,繼續(xù)優(yōu)化颈走,引入combiner,這個可以本地reduce咱士,從而可以減少MapTask輸出的數(shù)據(jù)量以及網(wǎng)絡(luò)的傳輸量立由,他可以先把每個Mapper上的詞先和合并一波轧钓,然后把合并后的傳給reduce

combiner.png

使用場景:求和,次數(shù)锐膜,加法這些可以毕箍,但是平均數(shù)就不行
把這個job.setCombinerClass(MyReduce.class);加入到main中

       //設(shè)置map相關(guān)系數(shù)
       job.setMapperClass(MyMapper.class);
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(LongWritable.class);
       //通過job設(shè)置combine處理類,其實邏輯上與reduce一致
       job.setCombinerClass(MyReduce.class);

然后執(zhí)行打包上傳道盏,執(zhí)行.sh文件而柑,查看執(zhí)行過程中的日志


combiner執(zhí)行結(jié)果

只要這兩個不為0說明combiner起作用了

6,partitioner分區(qū)器
shuffle是通過分區(qū)partitioner 分配給Reduce的荷逞,一個Reducer對應(yīng)一個記錄文件媒咳,Partitioner是shuffle的一部分,partitioner執(zhí)行時機:在mapper執(zhí)行完成颅围,Reducer還沒有執(zhí)行的時候伟葫,mapper的輸出就是partitioner的輸入 即<k2,v2>partitioner 分區(qū)主要是用來提高效率的 ,例如從全國基站的數(shù)據(jù)中查找北京基站的數(shù)據(jù)院促,如果計算時不分區(qū)全國的數(shù)據(jù)都放在一起筏养,查詢的時候就相當于全表掃描 效率非常低,如果在第一次進行Mapreducer計算的時候按照省市進行分區(qū)常拓,每個城市的基站數(shù)據(jù)都存儲在對應(yīng)的每個文件渐溶,那么下次再進行查詢的時候直接從北京分區(qū)里直接查找 效率很高。分區(qū)的依據(jù)是具體業(yè)務(wù)需求弄抬,可以按照省市分區(qū)茎辐,時間進行分區(qū)等。如果不手動進行分區(qū)掂恕,Hadoop有一個默認的分區(qū)規(guī)則Partitioner是partitioner的基類拖陆,如果需要定制partitioner也需要繼承該類。HashPartitioner是mapreduce的默認partitioner懊亡。計算方法是which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks依啰,得到當前的目的reducer。

新建一個PartitonerApp這個類

package com.zhx.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @program: hdfs
 * @description: this is a MAPREDUCE‘s test project
 * @author: zhx
 * @create: 2021-03-09 18:09
 **/
public class PartitonerApp {

    /**
     * map讀取輸入文件
     * keyInput:LongWritable店枣,文件的偏移量速警,開始為0,后面的key是每一行的字符量的累加
     * valueInput:Text鸯两,每一行的值
     * keyOutput:Text闷旧,輸出是每個單詞的字符串
     * valueOutput:LongWritable,是每個單詞對應(yīng)的value值
     * 這是測試 數(shù)據(jù)钧唐,一個文件中忙灼,第一列式手機品牌,第二列是銷售數(shù)逾柿,統(tǒng)計每種手機總的銷售數(shù)缀棍,這里輸出key是品牌字符串宅此,value是銷售數(shù)
     *   huawei 100
         xiaomi 100
         huawei 200
         xiaomi 300
         nojiya 100
         iphone 200
         huawei 200
         iphone 200
     */
    public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            LongWritable one = new LongWritable(1);
            //獲取一行數(shù)據(jù)
            String line = value.toString();
            //每一行按照空格進行分割開來
            String[] words = line.split(" ");
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }

    /**
     * reduce:歸并操作
     * map的輸出就是reduce的輸入,所以reduce的
     * keyInput:Text
     * valueInput:LongWritable
     * keyOutput:Text
     * valueOutput:LongWritable
     */
    public static class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum=0;
            for (LongWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new LongWritable(sum));
        }
    }

    public static class MyPartitioner extends Partitioner<Text,LongWritable>{

        @Override
        public int getPartition(Text key, LongWritable value, int i) {
            if(key.toString().equals("huawei")){
                return 0;
            }
            if(key.toString().equals("xiaomi")){
                return 1;
            }
            if(key.toString().equals("iphone")){
                return 2;
            }
            return 3;
        }
    }

    /**
    *  Driver,封裝了MapReduce作業(yè)的所有信息
    */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //創(chuàng)建configuration
        Configuration configuration = new Configuration();
        //準備清理已存在的輸出目錄
        Path outpath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
            System.out.println("已經(jīng)刪除了存在的目錄");
        }
        //創(chuàng)建job
        Job job = Job.getInstance(configuration, "wordcount");
        //設(shè)置job處理類
        job.setJarByClass(PartitonerApp.class);
        //設(shè)置作業(yè)處理輸入路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //設(shè)置map相關(guān)系數(shù)
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        //通過job設(shè)置combine處理類爬范,其實邏輯上與reduce一致
        job.setCombinerClass(MyReduce.class);
        //設(shè)置reduce的相關(guān)參數(shù)
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //設(shè)置job的partitioner的處理類
        job.setPartitionerClass(MyPartitioner.class);
        //設(shè)置4個reducer父腕,每個分區(qū)一個
        job.setNumReduceTasks(4);
        //設(shè)置作業(yè)處理輸出路徑
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //提交作業(yè)
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

然后打包上傳,執(zhí)行partitonerapp.sh文件青瀑,這個文件內(nèi)容hadoop jar /home/zhx/lib/hdfs-1.0.jar com.zhx.hadoop.mapreduce.PartitonerApp hdfs://zhx:8020/mapreduce_partitioner.txt hdfs://zhx:8020/output/wc/璧亮,此時輸出文件夾下會有4個文件,分別是

image.png

image.png
配置jobhistory

記錄已運行完的MapReduce信息到指定的hdfs目錄下斥难,默認是不開啟的枝嘶,可以http://zhx:8088/打開yarn的網(wǎng)址,然后運行一個pi的job哑诊,可以查看Tracking UI這列下的history群扶,是否可以打開,打不開說明需要配置

mapreduce-site.xml

<property>
        <name>mapreduce.jobhistory.address</name>
        <value>zhx:10020</value>
        <description>mapreduce jobhistory server ip(host:port)</description>
</property>
<property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>zhx:19888</value>
        <description>mapreduce jobhistory server web ui host:port</description>
</property>
<property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/history</value>
</property>
<property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/history/done_intermediate</value>
</property>

yarn-site.xml

<property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
</property>

然后停掉yarn镀裤,在重啟yarn竞阐,在啟動sbin目錄下的./mr-jobhistory-daemon.sh start historyserver
然后jps,看是否有 JobHistoryServer這個進程
最后可以在yarn的瀏覽器中查看執(zhí)行的日志了

image.png

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末暑劝,一起剝皮案震驚了整個濱河市骆莹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌担猛,老刑警劉巖幕垦,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異傅联,居然都是意外死亡先改,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胀茵,你說我怎么就攤上這事试和。” “怎么了衅枫?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵嫁艇,是天一觀的道長。 經(jīng)常有香客問我弦撩,道長步咪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任益楼,我火速辦了婚禮猾漫,結(jié)果婚禮上点晴,老公的妹妹穿的比我還像新娘。我一直安慰自己悯周,他們只是感情好粒督,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著禽翼,像睡著了一般屠橄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上闰挡,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天锐墙,我揣著相機與錄音,去河邊找鬼长酗。 笑死溪北,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的夺脾。 我是一名探鬼主播之拨,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼劳翰!你這毒婦竟也來了敦锌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤佳簸,失蹤者是張志新(化名)和其女友劉穎乙墙,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體生均,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡听想,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了马胧。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汉买。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖佩脊,靈堂內(nèi)的尸體忽然破棺而出蛙粘,到底是詐尸還是另有隱情,我是刑警寧澤威彰,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布出牧,位于F島的核電站,受9級特大地震影響歇盼,放射性物質(zhì)發(fā)生泄漏舔痕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望伯复。 院中可真熱鬧慨代,春花似錦、人聲如沸啸如。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽组底。三九已至丈积,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間债鸡,已是汗流浹背江滨。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留厌均,地道東北人唬滑。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像棺弊,于是被迫代替她去往敵國和親晶密。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容