大數(shù)據(jù)之Hadoop-MapReduce(1)

第1章 MapReduce概述

1.1 MapReduce定義

MapReduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開(kāi)發(fā)“基于Hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架朗伶。
MapReduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序好渠,并發(fā)運(yùn)行在一個(gè)Hadoop集群上昨稼。

1.2 MapReduce優(yōu)缺點(diǎn)

1.2.1 優(yōu)點(diǎn)

MapReduce 易于編程
它簡(jiǎn)單的實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序拳锚,這個(gè)分布式程序可以分布到大量廉價(jià)的PC機(jī)器上運(yùn)行假栓。也就是說(shuō)你寫一個(gè)分布式程序,跟寫一個(gè)簡(jiǎn)單的串行程序是一模一樣的霍掺。就是因?yàn)檫@個(gè)特點(diǎn)使得MapReduce編程變得非常流行匾荆。

良好的擴(kuò)展性
當(dāng)你的計(jì)算資源不能得到滿足的時(shí)候,你可以通過(guò)簡(jiǎn)單的增加機(jī)器來(lái)擴(kuò)展它的計(jì)算能力杆烁。

高容錯(cuò)性
MapReduce設(shè)計(jì)的初衷就是使程序能夠部署在廉價(jià)的PC機(jī)器上牙丽,這就要求它具有很高的容錯(cuò)性。比如其中一臺(tái)機(jī)器掛了兔魂,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另外一個(gè)節(jié)點(diǎn)上運(yùn)行烤芦,不至于這個(gè)任務(wù)運(yùn)行失敗,而且這個(gè)過(guò)程不需要人工參與析校,而完全是由Hadoop內(nèi)部完成的构罗。

大數(shù)據(jù)處理
可以實(shí)現(xiàn)上千臺(tái)服務(wù)器集群并發(fā)工作铜涉,提供數(shù)據(jù)處理能力。

1.2.2 缺點(diǎn)

不擅長(zhǎng)實(shí)時(shí)計(jì)算
MapReduce無(wú)法像MySQL一樣绰播,在毫秒或者秒級(jí)內(nèi)返回結(jié)果骄噪。

不擅長(zhǎng)流式計(jì)算
流式計(jì)算的輸入數(shù)據(jù)是動(dòng)態(tài)的,而MapReduce的輸入數(shù)據(jù)集是靜態(tài)的蠢箩,不能動(dòng)態(tài)變化链蕊。這是因?yàn)镸apReduce自身的設(shè)計(jì)特點(diǎn)決定了數(shù)據(jù)源必須是靜態(tài)的。

不擅長(zhǎng)DAG(有向圖)計(jì)算
多個(gè)應(yīng)用程序存在依賴關(guān)系谬泌,后一個(gè)應(yīng)用程序的輸入為前一個(gè)的輸出滔韵。在這種情況下,MapReduce并不是不能做掌实,而是使用后陪蜻,每個(gè)MapReduce作業(yè)的輸出結(jié)果都會(huì)寫入到磁盤,會(huì)造成大量的磁盤IO贱鼻,導(dǎo)致性能非常的低下宴卖。

1.3 MapReduce核心思想


1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段。
2)第一個(gè)階段的MapTask并發(fā)實(shí)例邻悬,完全并行運(yùn)行症昏,互不相干。
3)第二個(gè)階段的ReduceTask并發(fā)實(shí)例互不相干父丰,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有MapTask并發(fā)實(shí)例的輸出肝谭。
4)MapReduce編程模型只能包含一個(gè)Map階段和一個(gè)Reduce階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜蛾扇,那就只能多個(gè)MapReduce程序攘烛,串行運(yùn)行。

1.4 MapReduce進(jìn)程

一個(gè)完整的MapReduce程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:
1)MrAppMaster:負(fù)責(zé)整個(gè)程序的過(guò)程調(diào)度及狀態(tài)協(xié)調(diào)镀首。
2)MapTask:負(fù)責(zé)Map階段的整個(gè)數(shù)據(jù)處理流程坟漱。
3)ReduceTask:負(fù)責(zé)Reduce階段的整個(gè)數(shù)據(jù)處理流程。

1.5 常用數(shù)據(jù)序列化類型

Java類型 Hadoop Writable類型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable

1.6 MapReduce編程規(guī)范

用戶編寫的程序分成三個(gè)部分:Mapper更哄、Reducer和Driver靖秩。

1 Mapper階段
(1)用戶自定義的Mapper要繼承自己的父類
(2)Mapper的輸入數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
(3)Mapper中的業(yè)務(wù)邏輯寫在map()方法中
(4)Mapper的輸出數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
(5)map()方法(MapTask進(jìn)程)對(duì)每一個(gè)<K,V>調(diào)用一次

2 Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類
(2)Reducer的輸入數(shù)據(jù)類型對(duì)應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV
(3)Reducer的業(yè)務(wù)邏輯寫在reduce()方法中
(4)ReduceTask進(jìn)程對(duì)每一組相同k的<k,v>組調(diào)用一次reduce()方法

3 Driver階段
相當(dāng)于YARN集群的客戶端竖瘾,用于提交我們整個(gè)程序到Y(jié)ARN集群沟突,提交的是封裝了MapReduce程序相關(guān)運(yùn)行參數(shù)的job對(duì)象

1.7 WordCount案例實(shí)操

1 需求
在給定的文本文件中統(tǒng)計(jì)輸出每一個(gè)單詞出現(xiàn)的總次數(shù)
(1)輸入數(shù)據(jù):hello.txt

hello world
hello java beans
hello spark sql
scala
spark streaming
python spark

(2)期望輸出數(shù)據(jù)

beans   1
hello   3
java    1
python  1
scala   1
spark   3
sql 1
streaming   1
world   1

2 需求分析
按照MapReduce編程規(guī)范,分別編寫Mapper捕传,Reducer惠拭,Driver。

3 環(huán)境準(zhǔn)備
(1)創(chuàng)建maven工程模塊hadoop_mapreduce
(2)在pom.xml文件中添加如下依賴

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>

(2)在項(xiàng)目的src/main/resources目錄下,新建一個(gè)文件职辅,命名為“l(fā)og4j.properties”棒呛,寫入以下內(nèi)容。

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4 編寫程序
(1)編寫Mapper類

package com.jackyan.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 獲取一行數(shù)據(jù)
        String line = value.toString();

        // 按空格切割成一個(gè)一個(gè)的單詞
        String[] words = line.split(" ");

        // 遍歷處理每一個(gè)單詞域携,按(word, 1)的格式輸出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}

(2)編寫Reducer類

package com.jackyan.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WrodCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // 用來(lái)計(jì)數(shù)word的數(shù)量
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}

(3)編寫Driver驅(qū)動(dòng)類

package com.jackyan.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // 獲取配置信息及封裝任務(wù)
        Job job = Job.getInstance();

        // 設(shè)置jar加載路徑
        job.setJarByClass(WordCountDriver.class);

        // 設(shè)置map和reduce類
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WrodCountReducer.class);

        // 設(shè)置map輸出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設(shè)置最終輸出kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 設(shè)置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5 本地測(cè)試
直接在Idea中運(yùn)行

6 集群上測(cè)試
用maven打jar包簇秒,需要添加的打包插件依賴

<build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.jackyan.mapreduce.WordCountDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

將程序打成jar包,然后拷貝到Hadoop集群中


啟動(dòng)Hadoop集群秀鞭,將target下的hadoop_mapreduce-1.0-SNAPSHOT.jar拷貝到hadoop101機(jī)器上

[hadoop@hadoop101 ~]$ ls
hadoop_mapreduce-1.0-SNAPSHOT.jar  input

將hello.txt上傳到hdfs趋观,執(zhí)行WordCount程序

[hadoop@hadoop101 ~]$ hadoop jar hadoop_mapreduce-1.0-SNAPSHOT.jar com.jackyan.mapreduce.WordCountDriver /hello.txt /output

查看執(zhí)行結(jié)果

[hadoop@hadoop101 ~]$ hdfs dfs -cat /output/*
beans   1
hello   3
java    1
python  1
scala   1
spark   3
sql 1
streaming   1
world   1

第2章 Hadoop序列化

2.1 序列化概述

2.1.1 什么是序列化

序列化就是把內(nèi)存中的對(duì)象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)到磁盤(持久化)和網(wǎng)絡(luò)傳輸锋边。
反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是磁盤的持久化數(shù)據(jù)皱坛,轉(zhuǎn)換成內(nèi)存中的對(duì)象。

2.1.2 為什么要序列化

一般來(lái)說(shuō)豆巨,“活的”對(duì)象只生存在內(nèi)存里剩辟,關(guān)機(jī)斷電就沒(méi)有了。而且“活的”對(duì)象只能由本地的進(jìn)程使用往扔,不能被發(fā)送到網(wǎng)絡(luò)上的另外一臺(tái)計(jì)算機(jī)贩猎。 然而序列化可以存儲(chǔ)“活的”對(duì)象,可以將“活的”對(duì)象發(fā)送到遠(yuǎn)程計(jì)算機(jī)萍膛。

2.1.3 為什么不用Java的序列化

Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable)吭服,一個(gè)對(duì)象被序列化后,會(huì)附帶很多額外的信息(各種校驗(yàn)信息卦羡,Header噪馏,繼承體系等)麦到,不便于在網(wǎng)絡(luò)中高效傳輸绿饵。所以,Hadoop自己開(kāi)發(fā)了一套序列化機(jī)制(Writable)瓶颠。

Hadoop序列化特點(diǎn):
(1)緊湊 :高效使用存儲(chǔ)空間拟赊。
(2)快速:讀寫數(shù)據(jù)的額外開(kāi)銷小。
(3)可擴(kuò)展:隨著通信協(xié)議的升級(jí)而可升級(jí)
(4)互操作:支持多語(yǔ)言的交互

2.2 Hadoop序列化方法

自定義bean對(duì)象實(shí)現(xiàn)序列化接口(Writable)
在企業(yè)開(kāi)發(fā)中往往常用的基本序列化類型不能滿足所有需求粹淋,比如在Hadoop框架內(nèi)部傳遞一個(gè)bean對(duì)象吸祟,那么該對(duì)象就需要實(shí)現(xiàn)序列化接口。
具體實(shí)現(xiàn)bean對(duì)象序列化步驟如下7步桃移。
(1)必須實(shí)現(xiàn)Writable接口
(2)反序列化時(shí)屋匕,需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造

public FlowBean() { 
    super();
}

(3)重寫序列化方法

@Overridepublic void write(DataOutput out) throws IOException { 
    out.writeLong(upFlow);  
    out.writeLong(downFlow);    
    out.writeLong(sumFlow);
}

(4)重寫反序列化方法

@Overridepublic void readFields(DataInput in) throws IOException {  
    upFlow = in.readLong(); 
    downFlow = in.readLong();   
    sumFlow = in.readLong();
}

(5)注意反序列化的順序和序列化的順序完全一致
(6)要想把結(jié)果顯示在文件中借杰,需要重寫toString()过吻,可用”\t”分開(kāi),方便后續(xù)用。
(7)如果需要將自定義的bean放在key中傳輸纤虽,則還需要實(shí)現(xiàn)Comparable接口乳绕,因?yàn)镸apReduce框中的Shuffle過(guò)程要求對(duì)key必須能排序。

@Overridepublic int compareTo(FlowBean o) { 
    // 倒序排列逼纸,從大到小    
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

2.3 序列化案例實(shí)操

1 需求:

統(tǒng)計(jì)每一個(gè)手機(jī)號(hào)耗費(fèi)的總上行流量洋措、下行流量、總流量
輸入數(shù)據(jù):phone_data.txt

1   13736230513 192.196.100.1   www.huawei.com  2481    24681   200
2   13846544121 192.196.100.2           264 0   200
3   13956435636 192.196.100.3           132 1512    200
4   13966251146 192.168.100.1           240 0   404
5   18271575951 192.168.100.2   www.huawei.com  1527    2106    200
6   84188413    192.168.100.3   www.huawei.com  4116    1432    200
7   13590439668 192.168.100.4           1116    954 200
8   15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9   13729199489 192.168.100.6           240 0   200
10  13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11  15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12  15959002129 192.168.100.9   www.huawei.com  1938    180 500
13  13560439638 192.168.100.10          918 4938    200
14  13470253144 192.168.100.11          180 180 200
15  13682846555 192.168.100.12  www.qq.com  1938    2910    200
16  13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17  13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18  18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19  13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20  13768778790 192.168.100.17          120 120 200
21  13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22  13568436656 192.168.100.19          1116    954 200

輸入數(shù)據(jù)格式:

7   13560436666 120.196.100.99   網(wǎng)址                1116      954        200
id  手機(jī)號(hào)碼      網(wǎng)絡(luò)ip           www.baidu.com      上行流量    下行流量     網(wǎng)絡(luò)狀態(tài)碼

輸出數(shù)據(jù)格式

13560436666         1116              954           2070
手機(jī)號(hào)碼            上行流量            下行流量        總流量

2 需求分析

Map階段
(1)讀取一行數(shù)據(jù)杰刽,切分字段:

7      13560436666    120.196.100.99   1116       954      200

(2)抽取手機(jī)號(hào)菠发、上行流量、下行流量

13560436666       1116                954           
手機(jī)號(hào)碼            上行流量            下行流量 

(3)以手機(jī)號(hào)為key专缠,bean對(duì)象為value輸出雷酪,即context.write(手機(jī)號(hào),bean);
(4)bean對(duì)象要想能夠傳輸,必須實(shí)現(xiàn)序列化接口

Reduce階段
(1)累加上行流量和下行流量得到總流量涝婉。

13560436666        1116       +         954     =      2070   
手機(jī)號(hào)碼            上行流量             下行流量           總流量 

3 編寫MapReduce程序

package com.jackyan.mapreduce.mapper;

import com.jackyan.mapreduce.beans.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    FlowBean v = new FlowBean();
    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();

        // 2 切割字段
        String[] fields = line.split("\t");

        // 3 封裝對(duì)象
        // 取出手機(jī)號(hào)碼
        String phoneNum = fields[1];

        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.set(downFlow, upFlow);

        // 4 寫出
        context.write(k, v);
    }
}

4 編寫Reducer類

package com.jackyan.mapreduce.reducer;

import com.jackyan.mapreduce.beans.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 遍歷所用bean哥力,將其中的上行流量,下行流量分別累加
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        // 2 封裝對(duì)象
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);

        // 3 寫出
        context.write(key, resultBean);
    }
}

5 編寫Driver驅(qū)動(dòng)類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.beans.FlowBean;
import com.jackyan.mapreduce.mapper.FlowCountMapper;
import com.jackyan.mapreduce.reducer.FlowCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumDriver {
    public static void main(String[] args) throws Exception{
        // 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
        args = new String[] { "h:/input", "h:/output1" };

        // 1 獲取配置信息,或者job對(duì)象實(shí)例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowSumDriver.class);

        // 2 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 將job中配置的相關(guān)參數(shù)咒彤,以及job所用的java類所在的jar包蜗搔, 提交給yarn去運(yùn)行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

6 測(cè)試結(jié)果

13470253144 180 180 360
13509468723 110349  7335    117684
13560439638 4938    918 5856
13568436656 25635   3597    29232
13590439668 954 1116    2070
13630577991 690 6960    7650
13682846555 2910    1938    4848
13729199489 0   240 240
13736230513 24681   2481    27162
13768778790 120 120 240
13846544121 0   264 264
13956435636 1512    132 1644
13966251146 0   240 240
13975057813 48243   11058   59301
13992314666 3720    3008    6728
15043685818 3538    3659    7197
15910133277 2936    3156    6092
15959002129 180 1938    2118
18271575951 2106    1527    3633
18390173782 2412    9531    11943
84188413    1432    4116    5548

第3章 MapReduce框架原理

3.1.1 切片與MapTask并行度決定機(jī)制

1 問(wèn)題引出

MapTask的并行度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)Job的處理速度锌钮。

思考:1G的數(shù)據(jù),啟動(dòng)8個(gè)MapTask引矩,可以提高集群的并發(fā)處理能力梁丘。那么1K的數(shù)據(jù),也啟動(dòng)8個(gè)MapTask旺韭,會(huì)提高集群性能嗎氛谜?MapTask并行任務(wù)是否越多越好呢?哪些因素影響了MapTask并行度区端?

2 MapTask并行度決定機(jī)制

數(shù)據(jù)塊:Block是HDFS物理上把數(shù)據(jù)分成一塊一塊值漫。

數(shù)據(jù)切片:數(shù)據(jù)切片只是在邏輯上對(duì)輸入進(jìn)行分片,并不會(huì)在磁盤上將其切分成片進(jìn)行存儲(chǔ)织盼。

1)一個(gè)Job的Map階段并行度由客戶端在提交Job時(shí)的切片數(shù)決定
2)每一個(gè)Split切片分配一個(gè)MapTask并行實(shí)例處理
3)默認(rèn)情況下杨何,切片大小=BlockSize
4)切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片

3.1.2 Job提交流程源碼和切片源碼詳解

Job提交流程源碼詳解

waitForCompletion()

submit();

// 1建立連接
connect();  
    // 1)創(chuàng)建提交Job的代理
    new Cluster(getConfiguration());
    // (1)判斷是本地yarn還是遠(yuǎn)程
        initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // 2)獲取jobid 沥邻,并創(chuàng)建Job路徑
    JobID jobId = submitClient.getNewJobID();

    // 3)拷貝jar包到集群
    copyAndConfigureFiles(job, submitJobDir);   
    rUploader.uploadFiles(job, jobSubmitDir);

    // 4)計(jì)算切片危虱,生成切片規(guī)劃文件
    writeSplits(job, submitJobDir);
    maps = writeNewSplits(job, jobSubmitDir);
    input.getSplits(job);

    // 5)向Stag路徑寫XML配置文件
    writeConf(conf, submitJobFile);
    conf.writeXml(out);

    // 6)提交Job,返回提交狀態(tài)
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

FileInputFormat切片源碼解析(input.getSplits(job))**

(1)程序先找到你數(shù)據(jù)存儲(chǔ)的目錄。
(2)開(kāi)始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
(3)遍歷第一個(gè)文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt)
b)計(jì)算切片大刑迫:computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默認(rèn)情況下埃跷,切片大小=blocksize
d)開(kāi)始切,形成第1個(gè)切片:ss.txt—0:128M --> 第2個(gè)切片ss.txt—128:256M --> 第3個(gè)切片ss.txt—256M:300M
每次切片時(shí),都要判斷切完剩下的部分是否大于塊的1.1倍捌蚊,不大于1.1倍就劃分一塊切片
e)將切片信息寫到一個(gè)切片規(guī)劃文件中
f)整個(gè)切片的核心過(guò)程在getSplit()方法中完成
g)InputSplit只記錄了切片的元數(shù)據(jù)信息集畅,比如起始位置、長(zhǎng)度以及所在的節(jié)點(diǎn)列表等缅糟。
(4)提交切片規(guī)劃文件到Y(jié)ARN上挺智,YARN上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)算開(kāi)啟MapTask個(gè)數(shù)。

3.1.3 FileInputFormat切片機(jī)制

1 切片機(jī)制
(1)簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片
(2)切片大小窗宦,默認(rèn)等于Block大小
(3)切片時(shí)不考慮數(shù)據(jù)集整體赦颇,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片

2 案例分析
(1)輸入數(shù)據(jù)有兩個(gè)文件:

file1.txt    320M
file2.txt    10M

(2)經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

3 FileInputFormat切片大小的參數(shù)配置
(1)源碼中計(jì)算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize)); 
mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認(rèn)值Long.MAXValue

因此赴涵,默認(rèn)情況下媒怯,切片大小=blocksize。

(2)切片大小設(shè)置
maxsize(切片最大值):參數(shù)如果調(diào)得比blockSize小髓窜,則會(huì)讓切片變小扇苞,而且就等于配置的這個(gè)參數(shù)的值。
minsize(切片最小值):參數(shù)調(diào)的比blockSize大寄纵,則可以讓切片變得比blockSize還大鳖敷。

(3)獲取切片信息API

// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();
// 根據(jù)文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

3.1.4 CombineTextInputFormat切片機(jī)制

框架默認(rèn)的TextInputFormat切片機(jī)制是對(duì)任務(wù)按文件規(guī)劃切片,不管文件多小程拭,都會(huì)是一個(gè)單獨(dú)的切片定踱,都會(huì)交給一個(gè)MapTask,這樣如果有大量小文件恃鞋,就會(huì)產(chǎn)生大量的MapTask崖媚,處理效率極其低下。

1 應(yīng)用場(chǎng)景:
CombineTextInputFormat用于小文件過(guò)多的場(chǎng)景恤浪,它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中畅哑,這樣,多個(gè)小文件就可以交給一個(gè)MapTask處理资锰。

2 虛擬存儲(chǔ)切片最大值設(shè)置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:虛擬存儲(chǔ)切片最大值設(shè)置最好根據(jù)實(shí)際的小文件大小情況來(lái)設(shè)置具體的值敢课。

3 切片機(jī)制
生成切片過(guò)程包括:虛擬存儲(chǔ)過(guò)程和切片過(guò)程二部分阶祭。
(1)虛擬存儲(chǔ)過(guò)程:
將輸入目錄下所有文件大小绷杜,依次和設(shè)置的setMaxInputSplitSize值比較,如果不大于設(shè)置的最大值濒募,邏輯上劃分一個(gè)塊鞭盟。如果輸入文件大于設(shè)置的最大值且大于兩倍,那么以最大值切割一塊瑰剃;當(dāng)剩余數(shù)據(jù)大小超過(guò)設(shè)置的最大值且不大于最大值2倍齿诉,此時(shí)將文件均分成2個(gè)虛擬存儲(chǔ)塊(防止出現(xiàn)太小切片)。

例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M粤剧,則先邏輯上分成一個(gè)4M歇竟。剩余的大小為4.02M,如果按照4M邏輯劃分抵恋,就會(huì)出現(xiàn)0.02M的小的虛擬存儲(chǔ)文件焕议,所以將剩余的4.02M文件切分成(2.01M和2.01M)兩個(gè)文件。

(2)切片過(guò)程:
(a)判斷虛擬存儲(chǔ)的文件大小是否大于setMaxInputSplitSize值弧关,大于等于則單獨(dú)形成一個(gè)切片盅安。
(b)如果不大于則跟下一個(gè)虛擬存儲(chǔ)文件進(jìn)行合并,共同形成一個(gè)切片世囊。
(c)測(cè)試舉例:有4個(gè)小文件大小分別為1.7M别瞭、5.1M、3.4M以及6.8M這四個(gè)小文件株憾,則虛擬存儲(chǔ)之后形成6個(gè)文件塊蝙寨,大小分別為:

1.7M,(2.55M嗤瞎、2.55M)籽慢,3.4M以及(3.4M、3.4M)

最終會(huì)形成3個(gè)切片猫胁,大小分別為:

(1.7+2.55)M箱亿,(2.55+3.4)M,(3.4+3.4)M

3.1.5 CombineTextInputFormat案例實(shí)操

1 需求
將輸入的大量小文件合并成一個(gè)切片統(tǒng)一處理弃秆。
(1)輸入數(shù)據(jù):準(zhǔn)備4個(gè)小文件
(2)期望
期望一個(gè)切片處理4個(gè)文件

2 實(shí)現(xiàn)過(guò)程
(1)不做任何處理届惋,運(yùn)行之前的WordCount案例程序,觀察切片個(gè)數(shù)為4菠赚。
(2)在WordcountDriver中增加如下代碼脑豹,運(yùn)行程序,并觀察運(yùn)行的切片個(gè)數(shù)為3衡查。
(a)驅(qū)動(dòng)類中添加代碼如下:

// 如果不設(shè)置InputFormat瘩欺,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虛擬存儲(chǔ)切片最大值設(shè)置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
(b)運(yùn)行如果為3個(gè)切片。

(3)在WordcountDriver中增加如下代碼拌牲,運(yùn)行程序俱饿,并觀察運(yùn)行的切片個(gè)數(shù)為1。
(a)驅(qū)動(dòng)中添加代碼如下:

// 如果不設(shè)置InputFormat塌忽,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虛擬存儲(chǔ)切片最大值設(shè)置20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
(b)運(yùn)行如果為1個(gè)切片拍埠。

3.1.6 FileInputFormat實(shí)現(xiàn)類

思考:在運(yùn)行MapReduce程序時(shí),輸入的文件格式包括:基于行的日志文件土居、二進(jìn)制格式文件枣购、數(shù)據(jù)庫(kù)表等嬉探。那么,針對(duì)不同的數(shù)據(jù)類型棉圈,MapReduce是如何讀取這些數(shù)據(jù)的呢涩堤?

FileInputFormat常見(jiàn)的接口實(shí)現(xiàn)類包括:TextInputFormat、KeyValueTextInputFormat分瘾、NLineInputFormat定躏、CombineTextInputFormat和自定義InputFormat等。

1 TextInputFormat
TextInputFormat是默認(rèn)的FileInputFormat實(shí)現(xiàn)類芹敌。按行讀取每條記錄痊远。鍵是存儲(chǔ)該行在整個(gè)文件中的起始字節(jié)偏移量, LongWritable類型氏捞。值是這行的內(nèi)容碧聪,不包括任何行終止符(換行符和回車符),Text類型液茎。

以下是一個(gè)示例逞姿,比如,一個(gè)分片包含了如下4條文本記錄捆等。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對(duì):

(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

2 KeyValueTextInputFormat
每一行均為一條記錄滞造,被分隔符分割為key,value栋烤≮搜可以通過(guò)在驅(qū)動(dòng)類中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");來(lái)設(shè)定分隔符。默認(rèn)分隔符是tab(\t)明郭。

以下是一個(gè)示例买窟,輸入是一個(gè)包含4條記錄的分片。其中——>表示一個(gè)(水平方向的)制表符薯定。

line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對(duì):

(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

此時(shí)的鍵是每行排在制表符之前的Text序列始绍。

3 NLineInputFormat
如果使用NlineInputFormat,代表每個(gè)map進(jìn)程處理的InputSplit不再按Block塊去劃分话侄,而是按NlineInputFormat指定的行數(shù)N來(lái)劃分亏推。即輸入文件的總行數(shù)/N=切片數(shù),如果不整除年堆,切片數(shù)=商+1吞杭。
以下是一個(gè)示例,仍然以上面的4行輸入為例嘀韧。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

例如篇亭,如果N是2缠捌,則每個(gè)輸入分片包含兩行锄贷。開(kāi)啟2個(gè)MapTask译蒂。

(0,Rich learning form)
(19,Intelligent learning engine)

另一個(gè) mapper 則收到后兩行:

(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

這里的鍵和值與TextInputFormat生成的一樣。

3.1.7 KeyValueTextInputFormat使用案例

1 需求
統(tǒng)計(jì)輸入文件中每一行的第一個(gè)單詞相同的行數(shù)谊却。
(1)輸入數(shù)據(jù)

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang

(2)期望結(jié)果數(shù)據(jù)

banzhang    2
xihuan  2

2 代碼實(shí)現(xiàn)
(1)編寫Mapper類

package com.jackyan.mapreduce.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class KVTextMapper extends Mapper<Text, Text, Text, LongWritable> {

    // 1 設(shè)置value
    LongWritable v = new LongWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {

        // banzhang ni hao
        // 2 寫出
        context.write(key, v);
    }
}

(2)編寫Reducer類

package com.jackyan.mapreduce.reducer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class KVTextReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0L;

        // 1 匯總統(tǒng)計(jì)
        for (LongWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        // 2 輸出
        context.write(key, v);
    }
}

(3)編寫Driver類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.mapper.KVTextMapper;
import com.jackyan.mapreduce.reducer.KVTextReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KVTextDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 設(shè)置切割符
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        // 1 獲取job對(duì)象
        Job job = Job.getInstance(conf);

        // 2 設(shè)置jar包位置柔昼,關(guān)聯(lián)mapper和reducer
        job.setJarByClass(KVTextDriver.class);
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        // 3 設(shè)置map輸出kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4 設(shè)置最終輸出kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 5 設(shè)置輸入輸出數(shù)據(jù)路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 設(shè)置輸入格式
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 6 設(shè)置輸出數(shù)據(jù)路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交job
        job.waitForCompletion(true);
    }
}

3.1.8 NLineInputFormat使用案例

1 需求
對(duì)每個(gè)單詞進(jìn)行個(gè)數(shù)統(tǒng)計(jì),要求根據(jù)每個(gè)輸入文件的行數(shù)來(lái)規(guī)定輸出多少個(gè)切片炎辨。此案例要求每三行放入一個(gè)切片中捕透。
(1)輸入數(shù)據(jù)nline.txt

banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang

(2)期望輸出數(shù)據(jù)

Number of splits:4

2 代碼實(shí)現(xiàn)
(1)編寫Mapper類

package com.jackyan.mapreduce.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class NLineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text k = new Text();
    private LongWritable v = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();

        // 2 切割
        String[] splited = line.split(" ");

        // 3 循環(huán)寫出
        for (int i = 0; i < splited.length; i++) {

            k.set(splited[i]);

            context.write(k, v);
        }
    }
}

(2)編寫Reducer類

package com.jackyan.mapreduce.reducer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class NLineReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    LongWritable v = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0l;

        // 1 匯總
        for (LongWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        // 2 輸出
        context.write(key, v);
    }
}

(3)編寫Driver類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.mapper.NLineMapper;
import com.jackyan.mapreduce.reducer.NLineReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class NLineDriver {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
        args = new String[]{"h:/input/nline.txt", "e:/output"};

        // 1 獲取job對(duì)象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 7設(shè)置每個(gè)切片InputSplit中劃分三條記錄
        NLineInputFormat.setNumLinesPerSplit(job, 3);

        // 8使用NLineInputFormat處理記錄數(shù)
        job.setInputFormatClass(NLineInputFormat.class);

        // 2設(shè)置jar包位置,關(guān)聯(lián)mapper和reducer
        job.setJarByClass(NLineDriver.class);
        job.setMapperClass(NLineMapper.class);
        job.setReducerClass(NLineReducer.class);

        // 3設(shè)置map輸出kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 4設(shè)置最終輸出kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 5設(shè)置輸入輸出數(shù)據(jù)路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6提交job
        job.waitForCompletion(true);
    }
}

(4)測(cè)試結(jié)果

3.1.9 自定義InputFormat

Hadoop框架自帶的InputFormat類型不能滿足所有應(yīng)用場(chǎng)景碴萧,需要自定義InputFormat來(lái)解決實(shí)際問(wèn)題乙嘀。
自定義InputFormat步驟如下:
(1)自定義一個(gè)類繼承FileInputFormat。
(2)改寫RecordReader破喻,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV虎谢。
(3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件。

3.1.10 自定義InputFormat案例實(shí)操

無(wú)論HDFS還是MapReduce曹质,在處理小文件時(shí)效率都非常低婴噩,但又難免面臨處理大量小文件的場(chǎng)景,此時(shí)羽德,就需要有相應(yīng)解決方案几莽。可以自定義InputFormat實(shí)現(xiàn)小文件的合并宅静。

1 需求
將多個(gè)小文件合并成一個(gè)SequenceFile文件(SequenceFile文件是Hadoop用來(lái)存儲(chǔ)二進(jìn)制形式的key-value對(duì)的文件格式)章蚣,SequenceFile里面存儲(chǔ)著多個(gè)文件,存儲(chǔ)的形式為文件路徑+名稱為key姨夹,文件內(nèi)容為value究驴。
(1)輸入數(shù)據(jù),1.txt匀伏,2.txt洒忧,3.txt,內(nèi)容如下:

# 1.txt
There are moments in life when you miss someone so much that you just want to pick them from your dreams and hug them for real! Dream what you want to dream;
go where you want to go;
be what you want to be,because you have only one life and one chance to do all the things you want to do.
# 2.txt
May you have enough happiness to make you sweet,
enough trials to make you strong,
enough sorrow to keep you human,enough hope to make you happy? 
Always put yourself in others’shoes.If you feel that it hurts you,
it probably hurts the other person, too.
# 3.txt
The happiest of people don’t necessarily have the best of everything;
they just make the most of everything that comes along their way.
Happiness lies for those who cry,those who hurt, 
those who have searched,and those who have tried,
for only they can appreciate the importance of people

2 需求分析
a 自定義一個(gè)類繼承FileInputFormat
(1)重寫isSplitable()方法够颠,返回false不可切割
(2)重寫createRecordReader()熙侍,創(chuàng)建自定義的RecordReader對(duì)象,并初始化
b 改寫RecordReader履磨,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV
(1)采用IO流一次讀取一個(gè)文件輸出到value中蛉抓,因?yàn)樵O(shè)置了不可切片,最終把所有文件都封裝到了value中
(2)獲取文件路徑信息 + 名稱剃诅,并設(shè)置key
c 設(shè)置Driver

// (1)設(shè)置輸入的inputFormat
job.setInputFormatClass(WholeFileInputformat.class);

// (2)設(shè)置輸出的outputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);

3 程序?qū)崿F(xiàn)
a 自定義InputFromat

package com.jackyan.mapreduce.inputformat;

import com.jackyan.mapreduce.recordreder.WholeFileRecordReader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        WholeFileRecordReader recordReader = new WholeFileRecordReader();
        recordReader.initialize(split, context);

        return recordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // 文件不進(jìn)行分片
        return false;
    }
}

b 自定義RecordReader類

package com.jackyan.mapreduce.recordreder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration configuration;
    private FileSplit split;

    private boolean isProgress= true;
    private BytesWritable value = new BytesWritable();
    private Text key = new Text();

    public WholeFileRecordReader() {
        super();
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        this.configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProgress) {

            // 1 定義緩存區(qū)
            byte[] contents = new byte[(int)split.getLength()];

            FileSystem fs = null;
            FSDataInputStream fis = null;

            try {
                // 2 獲取文件系統(tǒng)
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);

                // 3 讀取數(shù)據(jù)
                fis = fs.open(path);

                // 4 讀取文件內(nèi)容
                IOUtils.readFully(fis, contents, 0, contents.length);

                // 5 輸出文件內(nèi)容
                value.set(contents, 0, contents.length);

                // 6 獲取文件路徑及名稱
                String name = split.getPath().toString();

                // 7 設(shè)置輸出的key值
                key.set(name);

            } catch (Exception e) {

            }finally {
                IOUtils.closeStream(fis);
            }

            isProgress = false;

            return true;
        }

        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {

    }
}

c 編寫SequenceFileMapper

package com.jackyan.mapreduce.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {

        context.write(key, value);
    }
}

d 編寫SequenceFileReducer類

package com.jackyan.mapreduce.reducer;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {

        context.write(key, values.iterator().next());
    }
}

e 編寫SequenceFileDriver類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.inputformat.WholeFileInputformat;
import com.jackyan.mapreduce.mapper.SequenceFileMapper;
import com.jackyan.mapreduce.reducer.SequenceFileReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

import java.io.IOException;

public class SequenceFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
        args = new String[] { "h:/input/sequenceTest", "h:/output" };

        // 1 獲取job對(duì)象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 設(shè)置jar包存儲(chǔ)位置巷送、關(guān)聯(lián)自定義的mapper和reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

        // 7設(shè)置輸入的inputFormat
        job.setInputFormatClass(WholeFileInputformat.class);

        // 8設(shè)置輸出的outputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // 3 設(shè)置map輸出端的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 4 設(shè)置最終輸出端的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        // 5 設(shè)置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 6 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.2 MapReduce工作流程

流程詳解**
上面的流程是整個(gè)MapReduce最全工作流程,但是Shuffle過(guò)程只是從第7步開(kāi)始到第16步結(jié)束矛辕,具體Shuffle過(guò)程詳解笑跛,如下:
1)MapTask收集我們的map()方法輸出的kv對(duì)付魔,放到內(nèi)存緩沖區(qū)中
2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
3)多個(gè)溢出文件會(huì)被合并成大的溢出文件
4)在溢出過(guò)程及合并的過(guò)程中飞蹂,都要調(diào)用Partitioner進(jìn)行分區(qū)和針對(duì)key進(jìn)行排序
5)ReduceTask根據(jù)自己的分區(qū)號(hào)几苍,去各個(gè)MapTask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
6)ReduceTask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同MapTask的結(jié)果文件,ReduceTask會(huì)將這些文件再進(jìn)行合并(歸并排序)
7)合并成大文件后陈哑,Shuffle的過(guò)程也就結(jié)束了妻坝,后面進(jìn)入ReduceTask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)Group,調(diào)用用戶自定義的reduce()方法)

注意
Shuffle中的緩沖區(qū)大小會(huì)影響到MapReduce程序的執(zhí)行效率惊窖,原則上說(shuō)刽宪,緩沖區(qū)越大,磁盤io的次數(shù)越少界酒,執(zhí)行速度就越快纠屋。
緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整,參數(shù):io.sort.mb默認(rèn)100M盾计。

源碼解析流程

context.write(k, NullWritable.get());
    output.write(key, value);
        collector.collect(key, value,partitioner.getPartition(key, value, partitions));
            HashPartitioner();
        collect()
            close()
                collect.flush()
                    sortAndSpill()
                        sort()   QuickSort
                    mergeParts();
    
                collector.close();

3.3 Shuffle機(jī)制

3.3.1 Shuffle機(jī)制

Map方法之后售担,Reduce方法之前的數(shù)據(jù)處理過(guò)程稱之為Shuffle。

3.3.2 Partition分區(qū)

很多情況下署辉,要求將統(tǒng)計(jì)結(jié)果按照條件輸出到不同文件中(分區(qū))族铆。比如:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))

默認(rèn)Partitioner分區(qū)

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

默認(rèn)分區(qū)是根據(jù)key的hashCode對(duì)ReduceTasks個(gè)數(shù)取模得到的。用戶沒(méi)法控制哪個(gè)key存儲(chǔ)到哪個(gè)分區(qū)哭尝。

自定義Partitioner步驟:
(1)自定義類繼承Partitioner哥攘,重寫getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
          // 控制分區(qū)代碼邏輯
    … …
        return partition;
    }
}

(2)在Job驅(qū)動(dòng)中,設(shè)置自定義Partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定義Partition后材鹦,要根據(jù)自定義Partitioner的邏輯設(shè)置相應(yīng)數(shù)量的ReduceTask

job.setNumReduceTasks(5);

分區(qū)總結(jié):
(1)如果ReduceTask的數(shù)量> getPartition的結(jié)果數(shù)逝淹,則會(huì)多產(chǎn)生幾個(gè)空的輸出文件part-r-000xx;
(2)如果1<ReduceTask的數(shù)量<getPartition的結(jié)果數(shù)桶唐,則有一部分分區(qū)數(shù)據(jù)無(wú)處安放栅葡,會(huì)Exception;
(3)如果ReduceTask的數(shù)量=1尤泽,則不管MapTask端輸出多少個(gè)分區(qū)文件欣簇,最終結(jié)果都交給這一個(gè)ReduceTask,最終也就只會(huì)產(chǎn)生一個(gè)結(jié)果文件 part-r-00000坯约;
(4)分區(qū)號(hào)必須從零開(kāi)始熊咽,逐一累加。

案例分析:
例如:假設(shè)自定義分區(qū)數(shù)為5闹丐,則
(1)job.setNumReduceTasks(1); 會(huì)正常運(yùn)行横殴,只不過(guò)會(huì)產(chǎn)生一個(gè)輸出文件
(2)job.setNumReduceTasks(2); 會(huì)報(bào)錯(cuò)
(3)job.setNumReduceTasks(6); 大于5,程序會(huì)正常運(yùn)行卿拴,會(huì)產(chǎn)生空文件

3.3.3 Partition分區(qū)案例實(shí)操

1 需求
將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))
(1)輸入數(shù)據(jù)phone_data.txt

1   13736230513 192.196.100.1   www.huawei.com  2481    24681   200
2   13846544121 192.196.100.2           264 0   200
3   13956435636 192.196.100.3           132 1512    200
4   13966251146 192.168.100.1           240 0   404
5   18271575951 192.168.100.2   www.huawei.com  1527    2106    200
6   84188413    192.168.100.3   www.huawei.com  4116    1432    200
7   13590439668 192.168.100.4           1116    954 200
8   15910133277 192.168.100.5   www.hao123.com  3156    2936    200
9   13729199489 192.168.100.6           240 0   200
10  13630577991 192.168.100.7   www.shouhu.com  6960    690 200
11  15043685818 192.168.100.8   www.baidu.com   3659    3538    200
12  15959002129 192.168.100.9   www.huawei.com  1938    180 500
13  13560439638 192.168.100.10          918 4938    200
14  13470253144 192.168.100.11          180 180 200
15  13682846555 192.168.100.12  www.qq.com  1938    2910    200
16  13992314666 192.168.100.13  www.gaga.com    3008    3720    200
17  13509468723 192.168.100.14  www.qinghua.com 7335    110349  404
18  18390173782 192.168.100.15  www.sogou.com   9531    2412    200
19  13975057813 192.168.100.16  www.baidu.com   11058   48243   200
20  13768778790 192.168.100.17          120 120 200
21  13568436656 192.168.100.18  www.alibaba.com 2481    24681   200
22  13568436656 192.168.100.19          1116    954 200

(2)期望輸出數(shù)據(jù)
手機(jī)號(hào)136衫仑、137梨与、138、139開(kāi)頭都分別放到一個(gè)獨(dú)立的4個(gè)文件中惑畴,其他開(kāi)頭的放到一個(gè)文件中蛋欣。

2 在2.3 序列化案例實(shí)操的基礎(chǔ)上航徙,增加一個(gè)分區(qū)類

// 1 獲取電話號(hào)碼的前三位
        String preNum = key.toString().substring(0, 3);
        
        int partition = 4;
        
        // 2 判斷是哪個(gè)省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }

3 在驅(qū)動(dòng)函數(shù)中增加自定義數(shù)據(jù)分區(qū)設(shè)置和ReduceTask設(shè)置

        // 指定自定義數(shù)據(jù)分區(qū)
        job.setPartitionerClass(ProvincePartitioner.class);

        // 同時(shí)指定相應(yīng)數(shù)量的reduce task
        job.setNumReduceTasks(5);

3.3.4 WritableComparable排序

排序是MapReduce框架中最重要的操作之一如贷。
MapTask和ReduceTask均會(huì)對(duì)數(shù)據(jù)按照key進(jìn)行排序。該操作屬于Hadoop的默認(rèn)行為到踏。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序杠袱,而不管邏輯上是否需要。
默認(rèn)排序是按照字典順序排序窝稿,且實(shí)現(xiàn)該排序的方法是快速排序楣富。

對(duì)于MapTask,它會(huì)將處理的結(jié)果暫時(shí)放到環(huán)形緩沖區(qū)中伴榔,當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后纹蝴,再對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序,并將這些有序數(shù)據(jù)溢寫到磁盤上踪少,而當(dāng)數(shù)據(jù)處理完畢后塘安,它會(huì)對(duì)磁盤上所有文件進(jìn)行歸并排序。

對(duì)于ReduceTask援奢,它從每個(gè)MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件兼犯,如果文件大小超過(guò)一定閾值,則溢寫磁盤上集漾,否則存儲(chǔ)在內(nèi)存中切黔。如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次歸并排序以生成一個(gè)更大文件具篇;如果內(nèi)存中文件大小或者數(shù)目超過(guò)一定閾值纬霞,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后驱显,ReduceTask統(tǒng)一對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序险领。

排序分類

a 部分排序:
MapReduce根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)集排序。保證輸出的每個(gè)文件內(nèi)部有序秒紧。

b 全排序:
最終輸出結(jié)果只有一個(gè)文件绢陌,且文件內(nèi)部有序。實(shí)現(xiàn)方式是只設(shè)置一個(gè)ReduceTask熔恢。但該方法在處理大型文件時(shí)效率極低脐湾,因?yàn)橐慌_(tái)機(jī)器處理所有文件,完全喪失了MapReduce所提供的并行架構(gòu)叙淌。

c 輔助排序:(GroupingComparator分組):
在Reduce端對(duì)key進(jìn)行分組秤掌。應(yīng)用于:在接收的key為bean對(duì)象時(shí)愁铺,想讓一個(gè)或幾個(gè)字段相同(全部字段比較不相同)的key進(jìn)入到同一個(gè)reduce方法時(shí),可以采用分組排序闻鉴。

d 二次排序:
在自定義排序過(guò)程中茵乱,如果compareTo中的判斷條件為兩個(gè)即為二次排序。

自定義排序WritableComparable

原理分析
bean對(duì)象做為key傳輸孟岛,需要實(shí)現(xiàn)WritableComparable接口重寫compareTo方法瓶竭,就可以實(shí)現(xiàn)排序。

@Override
public int compareTo(FlowBean o) {

    int result;
        
    // 按照總流量大小渠羞,倒序排列
    if (sumFlow > bean.getSumFlow()) {
        result = -1;
    }else if (sumFlow < bean.getSumFlow()) {
        result = 1;
    }else {
        result = 0;
    }

    return result;
}

3.3.5 WritableComparable排序案例實(shí)操(全排序)

1 需求
根據(jù)案例2.3序列號(hào)案例實(shí)操產(chǎn)生的結(jié)果再次對(duì)總流量進(jìn)行排序斤贰。
(1)輸入數(shù)據(jù)phone_data-part-r-00000

13470253144 180 180 360
13509468723 110349  7335    117684
13560439638 4938    918 5856
13568436656 25635   3597    29232
13590439668 954 1116    2070
13630577991 690 6960    7650
13682846555 2910    1938    4848
13729199489 0   240 240
13736230513 24681   2481    27162
13768778790 120 120 240
13846544121 0   264 264
13956435636 1512    132 1644
13966251146 0   240 240
13975057813 48243   11058   59301
13992314666 3720    3008    6728
15043685818 3538    3659    7197
15910133277 2936    3156    6092
15959002129 180 1938    2118
18271575951 2106    1527    3633
18390173782 2412    9531    11943
84188413    1432    4116    5548

2 代碼實(shí)現(xiàn)
(1)FlowBean對(duì)象在在需求1基礎(chǔ)上增加了比較功能

package com.jackyan.mapreduce.beans;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 1 實(shí)現(xiàn)writable接口
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    //2  反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù)次询,所以必須有
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    //3  寫序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    //4 反序列化方法
    //5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // 6 編寫toString方法荧恍,方便后續(xù)打印到文本
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public int compareTo(FlowBean flowBean) {

        int result;

        // 按照總流量大小,倒序排列
        if (sumFlow > flowBean.getSumFlow()) {
            result = -1;
        }else if (sumFlow < flowBean.getSumFlow()) {
            result = 1;
        }else {
            result = 0;
        }

        return result;
    }
}

(2)編寫Mapper類

package com.jackyan.mapreduce.mapper;

import com.jackyan.mapreduce.beans.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * 以FlowBean為key
 */
public class FlowOrderMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    FlowBean flowBean = new FlowBean();
    Text value = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();

        // 2 切割字段
        String[] fields = line.split("\t");

        // 3 封裝對(duì)象
        String phoneNbr = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        flowBean.set(upFlow, downFlow);
        value.set(phoneNbr);
        // 4 寫出
        context.write(flowBean, value);
    }
}

(3)編寫Reducer類

package com.jackyan.mapreduce.reducer;

import com.jackyan.mapreduce.beans.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowOrderReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value, key);
        }
    }
}

(4)編寫Driver類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.beans.FlowBean;
import com.jackyan.mapreduce.mapper.FlowCountMapper;
import com.jackyan.mapreduce.mapper.FlowOrderMapper;
import com.jackyan.mapreduce.reducer.FlowCountReducer;
import com.jackyan.mapreduce.reducer.FlowOrderReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowOrderDriver {
    public static void main(String[] args) throws Exception{
        // 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
        args = new String[] { "h:/input/phone_data-part-r-00000", "h:/output" };

        // 獲取配置信息屯吊,或者job對(duì)象實(shí)例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 指定本程序的jar包所在的本地路徑
        job.setJarByClass(FlowOrderDriver.class);

//        // 指定自定義數(shù)據(jù)分區(qū)
//        job.setPartitionerClass(ProvincePartitioner.class);
//
//        // 同時(shí)指定相應(yīng)數(shù)量的reduce task
//        job.setNumReduceTasks(5);

        // 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
        job.setMapperClass(FlowOrderMapper.class);
        job.setReducerClass(FlowOrderReducer.class);

        // 指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 將job中配置的相關(guān)參數(shù)送巡,以及job所用的java類所在的jar包, 提交給yarn去運(yùn)行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.3.6 WritableComparable排序案例實(shí)操(區(qū)內(nèi)排序)

1 需求
要求每個(gè)省份手機(jī)號(hào)輸出的文件中按照總流量?jī)?nèi)部排序盒卸。
2 需求分析
基于前一個(gè)需求骗爆,增加自定義分區(qū)類,分區(qū)按照省份手機(jī)號(hào)設(shè)置世落。
3 代碼實(shí)現(xiàn)
(1)增加自定義分區(qū)類

package com.jackyan.mapreduce.partition;

import com.jackyan.mapreduce.beans.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean flowBean, int numPartitions) {
        // 1 獲取電話號(hào)碼的前三位
        String preNum = key.toString().substring(0, 3);

        int partition = 4;

        // 2 判斷是哪個(gè)省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}

(2)在驅(qū)動(dòng)類中添加分區(qū)類

// 加載自定義分區(qū)類
job.setPartitionerClass(ProvincePartitioner.class);

// 設(shè)置Reducetask個(gè)數(shù)
job.setNumReduceTasks(5);

3.3.7 Combiner合并

(1) Combiner是MR程序中Mapper和Reducer之外的一種組件淮腾。
(2) Combiner組件的父類就是Reducer。
(3) Combiner和Reducer的區(qū)別在于運(yùn)行的位置
Combiner是在每一個(gè)MapTask所在的節(jié)點(diǎn)運(yùn)行;
Reducer是接收全局所有Mapper的輸出結(jié)果屉佳;
(4) Combiner的意義就是對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總谷朝,以減小網(wǎng)絡(luò)傳輸量。
(5) Combiner能夠應(yīng)用的前提是不能影響最終的業(yè)務(wù)邏輯武花,而且圆凰,Combiner的輸出kv應(yīng)該跟Reducer的輸入kv類型要對(duì)應(yīng)起來(lái)。
(6) 自定義Combiner實(shí)現(xiàn)步驟
(a)自定義一個(gè)Combiner繼承Reducer体箕,重寫Reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {

        // 1 匯總操作
        int count = 0;
        for(IntWritable v :values){
            count += v.get();
        }

        // 2 寫出
        context.write(key, new IntWritable(count));
    }
}

(b)在Job驅(qū)動(dòng)類中設(shè)置:

job.setCombinerClass(WordcountCombiner.class);

3.3.8 Combiner合并案例實(shí)操

1 需求
統(tǒng)計(jì)過(guò)程中對(duì)每一個(gè)MapTask的輸出進(jìn)行局部匯總专钉,以減小網(wǎng)絡(luò)傳輸量即采用Combiner功能。
(1)數(shù)據(jù)輸入combine.txt

hello hadoop
hello java spark hadoop world
hello hadoop
hello java spark hadoop world
hello hadoop
hello java spark hadoop world
hello hadoop
hello java spark hadoop world

(2)期望輸出數(shù)據(jù)
期望:Combine輸入數(shù)據(jù)多累铅,輸出時(shí)經(jīng)過(guò)合并跃须,輸出數(shù)據(jù)降低。

2 案例實(shí)操-方案一:
1)增加一個(gè)WordcountCombiner類繼承Reducer

package com.jackyan.mapreduce.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 匯總
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        v.set(sum);

        context.write(key, v);
    }
}

2)在WordcountDriver驅(qū)動(dòng)類中指定Combiner

// 指定需要使用combiner娃兽,以及用哪個(gè)類作為combiner的邏輯
job.setCombinerClass(WordcountCombiner.class);

3 案例實(shí)操-方案二
1)將WordcountReducer作為Combiner在WordcountDriver驅(qū)動(dòng)類中指定

// 指定需要使用Combiner菇民,以及用哪個(gè)類作為Combiner的邏輯
job.setCombinerClass(WordcountReducer.class);

4 運(yùn)行結(jié)果
使用前后對(duì)比:

3.3.9 GroupingComparator分組(輔助排序)

對(duì)Reduce階段的數(shù)據(jù)根據(jù)某一個(gè)或幾個(gè)字段進(jìn)行分組。
分組排序步驟:
(1)自定義類繼承WritableComparator
(2)重寫compare()方法

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比較的業(yè)務(wù)邏輯
        return result;
}

(3)創(chuàng)建一個(gè)構(gòu)造將比較對(duì)象的類傳給父類

protected OrderGroupingComparator() {
        super(OrderBean.class, true);
}

3.3.10 GroupingComparator分組案例實(shí)操

1 需求
有如下訂單數(shù)據(jù)

訂單id 商品id 成交金額
0000001 Pdt_01 222.8
Pdt_02 33.8
0000002 Pdt_03 522.8
Pdt_04 122.4
Pdt_05 722.4
0000003 Pdt_06 232.8
Pdt_02 33.8

現(xiàn)在需要求出每一個(gè)訂單中最貴的商品。
(1)輸入數(shù)據(jù)groupingComparator.txt

0000001 Pdt_01  222.8
0000002 Pdt_05  722.4
0000001 Pdt_02  33.8
0000003 Pdt_06  232.8
0000003 Pdt_02  33.8
0000002 Pdt_03  522.8
0000002 Pdt_04  122.4

(2)期望輸出數(shù)據(jù)

1   222.8
2   722.4
3   232.8

2 需求分析
(1)利用“訂單id和成交金額”作為key,可以將Map階段讀取到的所有訂單數(shù)據(jù)按照id升序排序,如果id相同再按照金額降序排序蓄氧,發(fā)送到Reduce。
(2)在Reduce端利用groupingComparator將訂單id相同的kv聚合成組仿吞,然后取第一個(gè)即是該訂單中最貴商品。

3 代碼實(shí)現(xiàn)
(1)定義訂單信息OrderBean類

package com.jackyan.mapreduce.beans;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

    private int order_id; // 訂單id號(hào)
    private double price; // 價(jià)格

    public OrderBean() {
        super();
    }

    public OrderBean(int order_id, double price) {
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean orderBean) {

        int res;

        if(order_id > orderBean.order_id) {
            res = 1;
        } else if (order_id < orderBean.order_id) {
            res = -1;
        } else {
            res = price > orderBean.price ? -1 : 1;
        }

        return res;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.price = in.readDouble();
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "order_id=" + order_id +
                ", price=" + price +
                '}';
    }
}

(2)編寫OrderSortMapper類

package com.jackyan.mapreduce.mapper;

import com.jackyan.mapreduce.beans.OrderBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1 獲取一行
        String line = value.toString();

        // 2 截取
        String[] fields = line.split("\t");

        // 3 封裝對(duì)象
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));

        // 4 寫出
        context.write(k, NullWritable.get());
    }
}

(3)編寫OrderSortGroupingComparator類

package com.jackyan.mapreduce.comparator;

import com.jackyan.mapreduce.beans.OrderBean;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderSortGroupingComparator extends WritableComparator {

    protected OrderSortGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;

        int res;

        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            res = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            res = -1;
        } else {
            res = 0;
        }
        return  res;
    }
}

(4)編寫OrderSortReducer類

package com.jackyan.mapreduce.reducer;

import com.jackyan.mapreduce.beans.OrderBean;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

(5)編寫OrderSortDriver類

package com.jackyan.mapreduce.driver;

import com.jackyan.mapreduce.beans.OrderBean;
import com.jackyan.mapreduce.comparator.OrderSortGroupingComparator;
import com.jackyan.mapreduce.mapper.OrderSortMapper;
import com.jackyan.mapreduce.reducer.OrderSortReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderSortDriver {
    public static void main(String[] args) throws Exception, IOException {

// 輸入輸出路徑需要根據(jù)自己電腦上實(shí)際的輸入輸出路徑設(shè)置
        args  = new String[]{"h:/input/groupingComparator.txt" , "h:/output"};

        // 1 獲取配置信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 設(shè)置jar包加載路徑
        job.setJarByClass(OrderSortDriver.class);

        // 3 加載map/reduce類
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReducer.class);

        // 4 設(shè)置map輸出數(shù)據(jù)key和value類型
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 5 設(shè)置最終輸出數(shù)據(jù)的key和value類型
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        // 6 設(shè)置輸入數(shù)據(jù)和輸出數(shù)據(jù)路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8 設(shè)置reduce端的分組
        job.setGroupingComparatorClass(OrderSortGroupingComparator.class);

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

3.4 MapTask工作機(jī)制

(1)Read階段:MapTask通過(guò)用戶編寫的RecordReader,從輸入InputSplit中解析出一個(gè)個(gè)key/value。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理下梢,并產(chǎn)生一系列新的key/value。
(3)Collect收集階段:在用戶編寫map()函數(shù)中志秃,當(dāng)數(shù)據(jù)處理完成后怔球,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果嚼酝。在該函數(shù)內(nèi)部浮还,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中闽巩。
(4)Spill階段:即“溢寫”钧舌,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上涎跨,生成一個(gè)臨時(shí)文件洼冻。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前隅很,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序撞牢,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作叔营。

溢寫階段詳情:
步驟1:利用快速排序算法對(duì)緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序屋彪,排序方式是,先按照分區(qū)編號(hào)Partition進(jìn)行排序绒尊,然后按照key進(jìn)行排序畜挥。這樣,經(jīng)過(guò)排序后婴谱,數(shù)據(jù)以分區(qū)為單位聚集在一起蟹但,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中谭羔。如果用戶設(shè)置了Combiner华糖,則寫入文件之前,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作瘟裸。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中客叉,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)前內(nèi)存索引大小超過(guò)1MB十办,則將內(nèi)存索引寫到文件output/spillN.out.index中秀撇。

(5)Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對(duì)所有臨時(shí)文件進(jìn)行一次合并向族,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件呵燕。

當(dāng)所有數(shù)據(jù)處理完后,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件件相,并保存到文件output/file.out中再扭,同時(shí)生成相應(yīng)的索引文件output/file.out.index。在進(jìn)行文件合并過(guò)程中夜矗,MapTask以分區(qū)為單位進(jìn)行合并泛范。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式紊撕。每輪合并io.sort.factor(默認(rèn)10)個(gè)文件罢荡,并將產(chǎn)生的文件重新加入待合并列表中,對(duì)文件排序后对扶,重復(fù)以上過(guò)程区赵,直到最終得到一個(gè)大文件。讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件浪南,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來(lái)的開(kāi)銷笼才。

3.5 ReduceTask工作機(jī)制

(1)Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù)络凿,如果其大小超過(guò)一定閾值骡送,則寫到磁盤上,否則直接放到內(nèi)存中絮记。
(2)Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)摔踱,ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過(guò)多或磁盤上文件過(guò)多到千。
(3)Sort階段:按照MapReduce語(yǔ)義昌渤,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起憔四,Hadoop采用了基于排序的策略膀息。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此了赵,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可潜支。
(4)Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上。

設(shè)置ReduceTask并行度(個(gè)數(shù))
ReduceTask的并行度同樣影響整個(gè)Job的執(zhí)行并發(fā)度和執(zhí)行效率柿汛,但與MapTask的并發(fā)數(shù)由切片數(shù)決定不同冗酿,ReduceTask數(shù)量的決定是可以直接手動(dòng)設(shè)置:

// 默認(rèn)值是1埠对,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);

實(shí)驗(yàn):測(cè)試ReduceTask多少合適
(1)實(shí)驗(yàn)環(huán)境:1個(gè)Master節(jié)點(diǎn),16個(gè)Slave節(jié)點(diǎn):CPU:8GHZ裁替,內(nèi)存: 2G
(2)實(shí)驗(yàn)結(jié)論:

MapTask =16
ReduceTask 1 5 10 15 16 20 25 30 45 60
總時(shí)間 892 146 110 92 88 100 128 101 145 104

注意事項(xiàng)
(1)ReduceTask=0项玛,表示沒(méi)有Reduce階段,輸出文件個(gè)數(shù)和Map個(gè)數(shù)一致弱判。
(2)ReduceTask默認(rèn)值就是1襟沮,所以輸出文件個(gè)數(shù)為一個(gè)。
(3)如果數(shù)據(jù)分布不均勻昌腰,就有可能在Reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)ReduceTask數(shù)量并不是任意設(shè)置开伏,還要考慮業(yè)務(wù)邏輯需求,有些情況下遭商,需要計(jì)算全局匯總結(jié)果固灵,就只能有1個(gè)ReduceTask。
(5)具體多少個(gè)ReduceTask劫流,需要根據(jù)集群性能而定巫玻。
(6)如果分區(qū)數(shù)不是1,但是ReduceTask為1困介,是否執(zhí)行分區(qū)過(guò)程大审。答案是:不執(zhí)行分區(qū)過(guò)程蘸际。因?yàn)樵贛apTask的源碼中座哩,執(zhí)行分區(qū)的前提是先判斷ReduceNum個(gè)數(shù)是否大于1。不大于1肯定不執(zhí)行粮彤。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末根穷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子导坟,更是在濱河造成了極大的恐慌屿良,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惫周,死亡現(xiàn)場(chǎng)離奇詭異尘惧,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)递递,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門喷橙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人登舞,你說(shuō)我怎么就攤上這事贰逾。” “怎么了菠秒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵疙剑,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我,道長(zhǎng)言缤,這世上最難降的妖魔是什么嚼蚀? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮管挟,結(jié)果婚禮上驰坊,老公的妹妹穿的比我還像新娘。我一直安慰自己哮独,他們只是感情好拳芙,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著皮璧,像睡著了一般舟扎。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上悴务,一...
    開(kāi)封第一講書(shū)人閱讀 51,125評(píng)論 1 297
  • 那天睹限,我揣著相機(jī)與錄音,去河邊找鬼讯檐。 笑死羡疗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的别洪。 我是一名探鬼主播叨恨,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼挖垛!你這毒婦竟也來(lái)了痒钝?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤痢毒,失蹤者是張志新(化名)和其女友劉穎送矩,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體哪替,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡栋荸,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了凭舶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晌块。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖库快,靈堂內(nèi)的尸體忽然破棺而出摸袁,到底是詐尸還是另有隱情,我是刑警寧澤义屏,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布靠汁,位于F島的核電站蜂大,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蝶怔。R本人自食惡果不足惜奶浦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望踢星。 院中可真熱鬧澳叉,春花似錦、人聲如沸沐悦。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)藏否。三九已至瓶殃,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間副签,已是汗流浹背遥椿。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留淆储,地道東北人冠场。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像本砰,于是被迫代替她去往敵國(guó)和親碴裙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容