Hadoop MapReduce Demo

Example:
First, let's look at a Map/Reduce application example to get an initial sense of how it works.
This demo counts the number of requests from each browser type in an access log.
Before running the demo, we need the following preparation:
I: Prepare the data and upload it to HDFS
1. First create the input and output directories on HDFS.
Working directory: /hadoop-2.7.3
Create them as follows:
① Create the input directory
$ hadoop fs -mkdir /input
② Upload the local file into input
First download the access.20120104.log file from https://git.oschina.net/jeetpan/Hadoop

Copy the downloaded log file into the input directory you just created:
$ hadoop fs -put xxx/access.20120104.log /input
③ List the files under input
$ hadoop fs -ls /input
④ Create the output directory
$ hadoop fs -mkdir /output
Note: FileOutputFormat normally refuses to start a job whose output directory already exists, so this last step only works if the driver (BaseDriver here) deletes /output before submitting; otherwise skip it and let the job create the directory itself.
II: Demo code


1. KpiBrowser — main program

/**
 * Counts the client programs (browsers) recorded in the log.
 */
public class KpiBrowser {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Matches the fs.defaultFS URL configured in core-site.xml
        String[] inPath = new String[]{"hdfs://localhost:9000/input/*"};
        String outPath = "hdfs://localhost:9000/output";

        Configuration conf = new Configuration();
        String jobName = "browser-pv";

        JobInitModel job = new JobInitModel(inPath, outPath, conf, null, jobName
                , KpiBrowser.class, null, Mapper.class, Text.class, IntWritable.class, null, null, Reducer.class
                , Text.class, IntWritable.class);

        JobInitModel sortJob = new JobInitModel(new String[]{outPath + "/part-*"}, outPath + "/sort", conf, null
                , jobName + "sort", KpiBrowser.class, null, Mapper.class, Text.class, IntWritable.class, null, null, null, null, null);

        BaseDriver.initJob(new JobInitModel[]{job, sortJob});
    }
}
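
JobInitModel and BaseDriver appear to be helper classes from the demo repository that package up the usual job wiring. For reference, here is a minimal sketch of the first job written against the stock org.apache.hadoop.mapreduce.Job API — an equivalent sketch under that assumption, not the repository's actual driver code (paths and class names are taken from the demo):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KpiBrowserPlain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "browser-pv");
        job.setJarByClass(KpiBrowserPlain.class);

        job.setMapperClass(Mapper.class);       // the demo's Mapper shown below
        job.setReducerClass(Reducer.class);     // the demo's Reducer shown below
        // job.setCombinerClass(Reducer.class); // optional: pre-sum map-side to shrink the shuffle
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}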

2. Mapper

/**
 * Reads each record from the log file and emits a (browser, 1) pair.
 */
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    Text browser = new Text();
    IntWritable one = new IntWritable(1);
    Kpi kpi;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        kpi = Kpi.parse(value.toString());
        if (kpi.getIs_validate()) {
            browser.set(kpi.getUser_agent());
            context.write(browser, one); // emit the browser type with a count of 1
        }
    }
}
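
The Kpi class lives in the demo repository and is not shown here. Purely as a hypothetical sketch of what its parse logic might look like — the field layout and parsing rule are assumptions; only the method names come from the Mapper above:

/** Hypothetical sketch of the Kpi log-record model (the real class is in the repo). */
public class Kpi {
    private String user_agent;
    private boolean is_validate;

    public String getUser_agent() { return user_agent; }
    public boolean getIs_validate() { return is_validate; }

    public static Kpi parse(String line) {
        Kpi kpi = new Kpi();
        // Assumption: space-separated combined log format, with the quoted
        // user agent starting at field 11. Splitting on plain spaces keeps the
        // opening quote and only the first token, which would match keys like
        // "Mozilla/4.0 in the output below.
        String[] fields = line.split(" ");
        kpi.is_validate = fields.length > 11;
        if (kpi.is_validate) {
            kpi.user_agent = fields[11];
        }
        return kpi;
    }
}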

3. Reducer

/**
 * Sums the per-browser counts emitted by the Mapper.
 */
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable resCount = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0; // a plain int avoids autoboxing on every addition
        for (IntWritable i : values) {
            sum += i.get();
        }
        resCount.set(sum);
        context.write(key, resCount);
    }
}
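
To make the grouping concrete: for the key "Mozilla/4.0 in this run, the framework hands reduce() an iterator over 668,020 ones collected from all six map tasks, and the method emits a single ("Mozilla/4.0, 668020) pair — the line visible in part-r-00000 below.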

III: Run results
Output from running KpiBrowser:

/Library/Java/JavaVirtualMachines/jdk1.8.0_111.jdk/Contents/Home/bin/java -Dfile.encoding=UTF-8 -classpath "...(project classes plus the Hadoop 2.7.3 and dependency jars from the local Maven repository)..." com.intellij.rt.execution.application.AppMain com.jeet.hadoop.mapreduce.kpi.browser.KpiBrowser
16/12/30 15:12:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/30 15:12:08 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/12/30 15:12:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/12/30 15:12:08 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/12/30 15:12:08 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
16/12/30 15:12:08 INFO input.FileInputFormat: Total input paths to process : 1
16/12/30 15:12:09 INFO mapreduce.JobSubmitter: number of splits:6
16/12/30 15:12:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1040540662_0001
16/12/30 15:12:09 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/12/30 15:12:09 INFO mapreduce.Job: Running job: job_local1040540662_0001
16/12/30 15:12:09 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/12/30 15:12:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:09 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
16/12/30 15:12:09 INFO mapred.LocalJobRunner: Waiting for map tasks
16/12/30 15:12:09 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000000_0
16/12/30 15:12:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:09 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:09 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
16/12/30 15:12:09 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:0+134217728
16/12/30 15:12:09 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:09 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:09 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:09 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:09 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:09 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:10 INFO mapreduce.Job: Job job_local1040540662_0001 running in uber mode : false
16/12/30 15:12:10 INFO mapreduce.Job:  map 0% reduce 0%
16/12/30 15:12:11 INFO mapred.LocalJobRunner: 
16/12/30 15:12:11 INFO mapred.MapTask: Starting flush of map output
16/12/30 15:12:11 INFO mapred.MapTask: Spilling map output
16/12/30 15:12:11 INFO mapred.MapTask: bufstart = 0; bufend = 2092378; bufvoid = 104857600
16/12/30 15:12:11 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25724936(102899744); length = 489461/6553600
16/12/30 15:12:12 INFO mapred.MapTask: Finished spill 0
16/12/30 15:12:12 INFO mapred.Task: Task:attempt_local1040540662_0001_m_000000_0 is done. And is in the process of committing
16/12/30 15:12:12 INFO mapred.LocalJobRunner: map
16/12/30 15:12:12 INFO mapred.Task: Task 'attempt_local1040540662_0001_m_000000_0' done.
16/12/30 15:12:12 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_m_000000_0
16/12/30 15:12:12 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000001_0
16/12/30 15:12:12 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:12 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:12 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
16/12/30 15:12:12 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:134217728+134217728
16/12/30 15:12:12 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:12 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:12 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:12 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:12 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:12 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:12 INFO mapreduce.Job:  map 100% reduce 0%
16/12/30 15:12:13 INFO mapred.LocalJobRunner: 
16/12/30 15:12:13 INFO mapred.MapTask: Starting flush of map output
16/12/30 15:12:13 INFO mapred.MapTask: Spilling map output
16/12/30 15:12:13 INFO mapred.MapTask: bufstart = 0; bufend = 2657832; bufvoid = 104857600
16/12/30 15:12:13 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 25592192(102368768); length = 622205/6553600
16/12/30 15:12:13 INFO mapred.MapTask: Finished spill 0
16/12/30 15:12:13 INFO mapred.Task: Task:attempt_local1040540662_0001_m_000001_0 is done. And is in the process of committing
16/12/30 15:12:13 INFO mapred.LocalJobRunner: map
16/12/30 15:12:13 INFO mapred.Task: Task 'attempt_local1040540662_0001_m_000001_0' done.
16/12/30 15:12:13 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_m_000001_0
16/12/30 15:12:13 INFO mapred.LocalJobRunner: Starting task: attempt_local1040540662_0001_m_000002_0
16/12/30 15:12:13 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/12/30 15:12:13 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
16/12/30 15:12:13 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
16/12/30 15:12:13 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/input/access.20120104.log:268435456+134217728
16/12/30 15:12:13 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/12/30 15:12:13 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/12/30 15:12:13 INFO mapred.MapTask: soft limit at 83886080
16/12/30 15:12:13 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/12/30 15:12:13 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/12/30 15:12:13 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/12/30 15:12:15 INFO mapred.LocalJobRunner: 
.....
.....
.....
16/12/30 15:12:18 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: finalMerge called with 6 in-memory map-outputs and 0 on-disk map-outputs
16/12/30 15:12:18 INFO mapred.Merger: Merging 6 sorted segments
16/12/30 15:12:18 INFO mapred.Merger: Down to the last merge-pass, with 6 segments left of total size: 14852536 bytes
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merged 6 segments, 14852761 bytes to disk to satisfy reduce memory limit
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merging 1 files, 14852755 bytes from disk
16/12/30 15:12:18 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/12/30 15:12:18 INFO mapred.Merger: Merging 1 sorted segments
16/12/30 15:12:18 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 14852748 bytes
16/12/30 15:12:18 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:18 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/12/30 15:12:19 INFO mapred.Task: Task:attempt_local1040540662_0001_r_000000_0 is done. And is in the process of committing
16/12/30 15:12:19 INFO mapred.LocalJobRunner: 6 / 6 copied.
16/12/30 15:12:19 INFO mapred.Task: Task attempt_local1040540662_0001_r_000000_0 is allowed to commit now
16/12/30 15:12:19 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1040540662_0001_r_000000_0' to hdfs://localhost:9000/output/_temporary/0/task_local1040540662_0001_r_000000
16/12/30 15:12:19 INFO mapred.LocalJobRunner: reduce > reduce
16/12/30 15:12:19 INFO mapred.Task: Task 'attempt_local1040540662_0001_r_000000_0' done.
16/12/30 15:12:19 INFO mapred.LocalJobRunner: Finishing task: attempt_local1040540662_0001_r_000000_0
16/12/30 15:12:19 INFO mapred.LocalJobRunner: reduce task executor complete.
16/12/30 15:12:20 INFO mapreduce.Job:  map 100% reduce 100%
16/12/30 15:12:20 INFO mapreduce.Job: Job job_local1040540662_0001 completed successfully
16/12/30 15:12:20 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=29724390
        FILE: Number of bytes written=88981738
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3389790430
        HDFS: Number of bytes written=934
        HDFS: Number of read operations=92
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=16
    Map-Reduce Framework
        Map input records=2929645
        Map output records=778116
        Map output bytes=13296517
        Map output materialized bytes=14852785
        Input split bytes=672
        Combine input records=0
        Combine output records=0
        Reduce input groups=44
        Reduce shuffle bytes=14852785
        Reduce input records=778116
        Reduce output records=44
        Spilled Records=1556232
        Shuffled Maps =6
        Failed Shuffles=0
        Merged Map outputs=6
        GC time elapsed (ms)=453
        Total committed heap usage (bytes)=11341922304
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=688231535
    File Output Format Counters 
        Bytes Written=934
16/12/30 15:12:20 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/12/30 15:12:20 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/12/30 15:12:20 WARN mapreduce.JobResourceUploader: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
.....
.....
.....
16/12/30 15:12:21 INFO mapreduce.Job: Job job_local558846411_0002 running in uber mode : false
16/12/30 15:12:21 INFO mapreduce.Job:  map 100% reduce 100%
16/12/30 15:12:21 INFO mapreduce.Job: Job job_local558846411_0002 completed successfully
16/12/30 15:12:21 INFO mapreduce.Job: Counters: 35
    File System Counters
        FILE: Number of bytes read=59419410
        FILE: Number of bytes written=60554086
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1376464938
        HDFS: Number of bytes written=1868
        HDFS: Number of read operations=63
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=18
    Map-Reduce Framework
        Map input records=44
        Map output records=0
        Map output bytes=0
        Map output materialized bytes=6
        Input split bytes=106
        Combine input records=0
        Combine output records=0
        Reduce input groups=0
        Reduce shuffle bytes=6
        Reduce input records=0
        Reduce output records=0
        Spilled Records=0
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=3979345920
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=934
    File Output Format Counters 
        Bytes Written=0

Process finished with exit code 0

Visit http://localhost:50070/ to browse the results in the HDFS web UI.

Download part-r-00000 to view the per-browser counts.
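One way to fetch it (assuming the default output path used by the driver above):
$ hadoop fs -get /output/part-r-00000 .
or print it in place:
$ hadoop fs -cat /output/part-r-00000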

    6
"HTCT9188_TD/1.0    3
"HuaweiSymantecSpider/1.0+DSE-support@huaweisymantec.com+(compatible;   1200
"MQQBrowser/1.9.1   4
"MQQBrowser/2.1 8
"MQQBrowser/28  13
"MQQBrowser/29  4
"Microsoft  80
"Mozilla/4.0    668020
"Mozilla/5.0    108539
"Nokia5230/21.0.004 4
"Nokia5233/40.1.003 2
"Nokia5233/50.1.001 1
"Nokia5800w/51.0.006    1
"Nokia6120c/3.83.2  2
"Nokia6120c/4.21    15
"Nokia6120ci/7.10   1
"NokiaC5-03/20.0.024    1
"NokiaC7-00/022.014 1
"NokiaC7-00/CU/022.014  15
"NokiaE63/200.21.005    2
"NokiaE72-1/031.023 1
"NokiaE72-1/081.003 1
"NokiaE72/031.023   3
.......
.......

IV: Core concepts
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods; these form the core of the job.

Mapper
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records.
A given input pair may map to zero or many output pairs.
The Hadoop Map/Reduce framework spawns one map task for each InputSplit, and the InputSplits are generated by the job's InputFormat.
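The run above makes this concrete: the input file is 688,231,535 bytes ("Bytes Read" in the counters) and the split size is 134,217,728 bytes (128 MB, visible in the split offsets in the log), so FileInputFormat produces ceil(688231535 / 134217728) = 6 splits — matching "number of splits:6" and the six map task attempts.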

Reducer
Reducer reduces the set of intermediate values that share a key to a smaller set of values.
The number of reduce tasks for a job is set by the user via JobConf.setNumReduceTasks(int).
In outline: a Reducer implementation overrides JobConfigurable.configure(JobConf), which receives the job's JobConf and performs the Reducer's initialization.
The framework then calls reduce(WritableComparable, Iterator, OutputCollector, Reporter) once for each <key, (list of values)> pair in the grouped inputs.
Afterwards, the application can override Closeable.close() to perform any required cleanup.
Reducer has three primary phases: shuffle, sort, and reduce.
(Note: this section describes the classic org.apache.hadoop.mapred API; the demo above uses the newer org.apache.hadoop.mapreduce API, whose equivalents are setup(Context), reduce(key, Iterable, Context), and cleanup(Context).)
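A skeleton of that classic-API lifecycle, as a sketch only (the demo's Reducer above uses the newer API):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ClassicReducerSkeleton extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void configure(JobConf job) {
        // JobConfigurable.configure: one-time initialization with the job's config.
    }

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> out, Reporter reporter)
            throws IOException {
        // Called once per <key, (list of values)> group.
        int sum = 0;
        while (values.hasNext()) sum += values.next().get();
        out.collect(key, new IntWritable(sum));
    }

    @Override
    public void close() throws IOException {
        // Closeable.close: cleanup after all groups are processed.
    }
}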

Shuffle
The input to the Reducer is the sorted output of the Mappers. In this phase the framework fetches, via HTTP, the partition of every Mapper's output that is relevant to each Reducer.
Sort
In this phase the framework groups the Reducer inputs by key (since different Mappers may have emitted the same key).
The shuffle and sort phases occur simultaneously: map outputs are merged as they are fetched.
Secondary Sort
If the rules for grouping the intermediate keys are required to differ from those for grouping keys before reduction, a Comparator can be specified via JobConf.setOutputValueGroupingComparator(Class). Since JobConf.setOutputKeyComparatorClass(Class) controls how the intermediate keys are sorted, the two can be combined to simulate a secondary sort on values.
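
A sketch of the wiring. The composite-key layout ("natural key" and "value part" joined by a tab inside a Text key) is an assumption for illustration; only the two JobConf calls come from the text above:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;

public class SecondarySortWiring {

    /** Sorts by the full composite key, i.e. natural key then value part. */
    public static class FullKeyComparator extends Text.Comparator {}

    /** Groups records reaching one reduce() call by the natural key only. */
    public static class NaturalKeyComparator extends WritableComparator {
        public NaturalKeyComparator() { super(Text.class, true); }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            String ka = a.toString().split("\t", 2)[0];
            String kb = b.toString().split("\t", 2)[0];
            return ka.compareTo(kb);
        }
    }

    public static void wire(JobConf conf) {
        conf.setOutputKeyComparatorClass(FullKeyComparator.class);
        conf.setOutputValueGroupingComparator(NaturalKeyComparator.class);
    }
}
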
Reduce
In this phase the framework calls reduce(WritableComparable, Iterator, OutputCollector, Reporter) once for each <key, (list of values)> pair in the grouped inputs.
The output of a reduce task is typically written to the FileSystem via OutputCollector.collect(WritableComparable, Writable).
Applications can use the Reporter to report progress, set application-level status messages, update Counters, or simply indicate that they are alive.
The output of the Reducer is not sorted.
How many Reduces?
The recommended number of reduces is 0.95 or 1.75 multiplied by (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum).
With 0.95, all of the reduces can launch immediately and start transferring map outputs as soon as the maps finish. With 1.75, the faster nodes finish their first round of reduces and launch a second round, which gives much better load balancing.
Increasing the number of reduces increases framework overhead, but improves load balancing and lowers the cost of failures.
The scaling factors are slightly less than whole numbers in order to reserve a few reduce slots for speculative tasks and failed tasks.
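For instance, on a hypothetical 10-node cluster with mapred.tasktracker.reduce.tasks.maximum=2, the two factors give 0.95 × 10 × 2 = 19 and 1.75 × 10 × 2 = 35 reduce tasks respectively.
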
No Reducer
It is legal to set the number of reduce tasks to zero if no reduction is desired.
In this case, the outputs of the map tasks are written directly to the output path specified by setOutputPath(Path). The framework does not sort the map outputs before writing them to the FileSystem.
Partitioner
Partitioner partitions the key space.
Partitioner controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of it) is used to derive the partition, typically by a hash function. The total number of partitions equals the number of reduce tasks for the job, so the Partitioner controls which of the m reduce tasks an intermediate key (and hence the record) is sent to for reduction.
HashPartitioner is the default Partitioner.
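
The default's logic is a single line; this sketch is equivalent to what the stock HashPartitioner boils down to:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

/** Reference equivalent of the default HashPartitioner (classic API). */
public class HashLikePartitioner<K, V> implements Partitioner<K, V> {
    @Override
    public void configure(JobConf job) {}

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the modulo result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
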
Reporter
Reporter is the mechanism by which Map/Reduce applications report progress, set application-level status messages, and update Counters.
Mapper and Reducer implementations can use the Reporter to report progress or simply indicate that they are alive. In applications that take significant time to process individual key/value pairs this is crucial, since otherwise the framework may decide the task has timed out and kill it. Another way to avoid this is to set the configuration parameter mapred.task.timeout to a sufficiently high value (or to zero, which disables the timeout entirely).
Applications can also update Counters through the Reporter.
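
A hypothetical classic-API mapper showing the Reporter calls in context (the class itself is an illustration, not part of the demo):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SlowRecordMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> out, Reporter reporter)
            throws IOException {
        reporter.progress();                          // "still alive" heartbeat
        reporter.setStatus("processing offset " + key.get());
        reporter.incrCounter("demo", "records", 1);   // (group, counter) increment
        out.collect(new Text(value.toString()), new IntWritable(1));
    }
}
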
OutputCollector
OutputCollector is the generic facility the Map/Reduce framework provides for collecting data output by the Mapper or the Reducer (both intermediate outputs and the job's final output).
The Hadoop Map/Reduce framework ships with a library of generally useful mappers, reducers, and partitioners.
Job configuration
JobConf represents a Map/Reduce job configuration.
JobConf is the primary interface by which a user describes a Map/Reduce job to the Hadoop framework for execution. The framework tries to faithfully execute the job as described by JobConf; however:
Some configuration parameters may have been marked final by administrators, meaning they cannot be altered.
While some job parameters are straightforward to set (e.g. setNumReduceTasks(int)), others interact subtly with the rest of the framework and/or the job configuration and are more complex to set (e.g. setNumMapTasks(int)).

JobConf is typically used to specify the Mapper, Combiner (if any), Partitioner, Reducer, InputFormat, and OutputFormat implementations. JobConf also indicates the set of input files (setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path) and setInputPaths(JobConf, String) / addInputPaths(JobConf, String)) and where the output files should be written (setOutputPath(Path)).
Optionally, JobConf is used to specify other advanced facets of the job, such as: the Comparator to be used; files to be put in the DistributedCache; whether intermediate and/or job outputs are to be compressed, and how; debugging via user-provided scripts (setMapDebugScript(String) / setReduceDebugScript(String)); whether speculative execution of tasks is permitted (setMapSpeculativeExecution(boolean) / setReduceSpeculativeExecution(boolean)); the maximum number of attempts per task (setMaxMapAttempts(int) / setMaxReduceAttempts(int)); the percentage of task failures the job can tolerate (setMaxMapTaskFailuresPercent(int) / setMaxReduceTaskFailuresPercent(int)); and so on.
Of course, users can use set(String, String) / get(String, String) to set and get arbitrary parameters their applications need. For large amounts of read-only data, however, use the DistributedCache.
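
Putting the pieces of this section together, a minimal classic-API configuration sketch. The identity mapper/reducer are stand-ins so the sketch runs as written; the paths and tuning values are illustrative only:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class ClassicJobConfigSketch {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ClassicJobConfigSketch.class);
        conf.setJobName("jobconf-sketch");

        // Identity classes pass records straight through; plug in real ones.
        conf.setMapperClass(IdentityMapper.class);
        conf.setCombinerClass(IdentityReducer.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setOutputKeyClass(LongWritable.class); // TextInputFormat key type
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path("/input"));
        FileOutputFormat.setOutputPath(conf, new Path("/output-sketch"));

        conf.setNumReduceTasks(1);
        conf.setMaxMapAttempts(4);              // retry budget per map task
        conf.setMapSpeculativeExecution(true);  // allow speculative map attempts

        JobClient.runJob(conf);
    }
}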

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末卜录,一起剝皮案震驚了整個濱河市戈擒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌艰毒,老刑警劉巖筐高,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異丑瞧,居然都是意外死亡柑土,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門绊汹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來稽屏,“玉大人,你說我怎么就攤上這事西乖『疲” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵浴栽,是天一觀的道長荒叼。 經(jīng)常有香客問我,道長典鸡,這世上最難降的妖魔是什么被廓? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮萝玷,結(jié)果婚禮上嫁乘,老公的妹妹穿的比我還像新娘昆婿。我一直安慰自己,他們只是感情好蜓斧,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布仓蛆。 她就那樣靜靜地躺著,像睡著了一般挎春。 火紅的嫁衣襯著肌膚如雪看疙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天直奋,我揣著相機與錄音能庆,去河邊找鬼。 笑死脚线,一個胖子當著我的面吹牛搁胆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播邮绿,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼渠旁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了船逮?” 一聲冷哼從身側(cè)響起顾腊,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎傻唾,沒想到半個月后投慈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡冠骄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年伪煤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凛辣。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡抱既,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扁誓,到底是詐尸還是另有隱情防泵,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布蝗敢,位于F島的核電站捷泞,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏寿谴。R本人自食惡果不足惜锁右,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧咏瑟,春花似錦拂到、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至余寥,卻和暖如春领铐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背劈狐。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工罐孝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人肥缔。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像汹来,于是被迫代替她去往敵國和親续膳。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354

推薦閱讀更多精彩內(nèi)容