MapReduce的運(yùn)行機(jī)制詳解

1 .MapReduce的運(yùn)行機(jī)制詳解

1.1:MapTask 工作機(jī)制

1561706243507.png

1561706253827.png

簡單概述:inputFile通過split被邏輯切分為多個(gè)split文件革砸,通過Record按行讀取內(nèi)容給map(用戶自己實(shí)現(xiàn)的)進(jìn)行處理蜻势,數(shù)據(jù)被map處理結(jié)束之后交給OutputCollector收集器譬圣,對(duì)其結(jié)果key進(jìn)行分區(qū)(默認(rèn)使用hash分區(qū)),然后寫入buffer超陆,每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果蟀俊,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并订雾,生成最終的正式輸出文件肢预,然后等待reduce task來拉數(shù)據(jù)

詳細(xì)步驟
  1. 讀取數(shù)據(jù)組件 InputFormat (默認(rèn) TextInputFormat) 會(huì)通過 getSplits 方法對(duì)輸入目錄中文件進(jìn)行邏輯切片規(guī)劃得到 block, 有多少個(gè) block就對(duì)應(yīng)啟動(dòng)多少個(gè) MapTask.

  2. 將輸入文件切分為 block 之后, 由 RecordReader 對(duì)象 (默認(rèn)是LineRecordReader) 進(jìn)行讀取, 以 \n 作為分隔符, 讀取一行數(shù)據(jù), 返回 <key,value>. Key 表示每行首字符偏移值, Value 表示這一行文本內(nèi)容

  3. 讀取 block 返回 <key,value>, 進(jìn)入用戶自己繼承的 Mapper 類中葬燎,執(zhí)行用戶重寫的 map 函數(shù), RecordReader 讀取一行這里調(diào)用一次

  4. Mapper 邏輯結(jié)束之后, 將 Mapper 的每條結(jié)果通過 context.write 進(jìn)行collect數(shù)據(jù)收集. 在 collect 中, 會(huì)先對(duì)其進(jìn)行分區(qū)處理误甚,默認(rèn)使用 HashPartitioner

    • MapReduce 提供 Partitioner 接口, 它的作用就是根據(jù) KeyValueReducer 的數(shù)量來決定當(dāng)前的這對(duì)輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè) Reduce task 處理, 默認(rèn)對(duì) Key Hash 后再以 Reducer 數(shù)量取模. 默認(rèn)的取模方式只是為了平均 Reducer 的處理能力, 如果用戶自己對(duì) Partitioner 有需求, 可以訂制并設(shè)置到 Job 上

  5. 接下來, 會(huì)將數(shù)據(jù)寫入內(nèi)存, 內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū), 緩沖區(qū)的作用是批量收集 Mapper 結(jié)果, 減少磁盤 IO 的影響. 我們的 Key/Value 對(duì)以及 Partition 的結(jié)果都會(huì)被寫入緩沖區(qū). 當(dāng)然, 寫入之前,Key 與 Value 值都會(huì)被序列化成字節(jié)數(shù)組

    • 環(huán)形緩沖區(qū)其實(shí)是一個(gè)數(shù)組, 數(shù)組中存放著 Key, Value 的序列化數(shù)據(jù)和 Key, Value 的元數(shù)據(jù)信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的長度. 環(huán)形結(jié)構(gòu)是一個(gè)抽象概念

    • 緩沖區(qū)是有大小限制, 默認(rèn)是 100MB. 當(dāng) Mapper 的輸出結(jié)果很多時(shí), 就可能會(huì)撐爆內(nèi)存, 所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫入磁盤, 然后重新利用這塊緩沖區(qū). 這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為 Spill, 中文可譯為溢寫. 這個(gè)溢寫是由單獨(dú)線程來完成, 不影響往緩沖區(qū)寫 Mapper 結(jié)果的線程. 溢寫線程啟動(dòng)時(shí)不應(yīng)該阻止 Mapper 的結(jié)果輸出, 所以整個(gè)緩沖區(qū)有個(gè)溢寫的比例 spill.percent. 這個(gè)比例默認(rèn)是 0.8, 也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值 buffer size * spill percent = 100MB * 0.8 = 80MB, 溢寫線程啟動(dòng), 鎖定這 80MB 的內(nèi)存, 執(zhí)行溢寫過程. Mapper 的輸出結(jié)果還可以往剩下的 20MB 內(nèi)存中寫, 互不影響

  6. 當(dāng)溢寫線程啟動(dòng)后, 需要對(duì)這 80MB 空間內(nèi)的 Key 做排序 (Sort). 排序是 MapReduce 模型默認(rèn)的行為, 這里的排序也是對(duì)序列化的字節(jié)做的排序

    • 如果 Job 設(shè)置過 Combiner, 那么現(xiàn)在就是使用 Combiner 的時(shí)候了. 將有相同 Key 的 Key/Value 對(duì)的 Value 加起來, 減少溢寫到磁盤的數(shù)據(jù)量. Combiner 會(huì)優(yōu)化 MapReduce 的中間結(jié)果, 所以它在整個(gè)模型中會(huì)多次使用

    • 那哪些場景才能使用 Combiner 呢? 從這里分析, Combiner 的輸出是 Reducer 的輸入, Combiner 絕不能改變最終的計(jì)算結(jié)果. Combiner 只應(yīng)該用于那種 Reduce 的輸入 Key/Value 與輸出 Key/Value 類型完全一致, 且不影響最終結(jié)果的場景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它對(duì) Job 執(zhí)行效率有幫助, 反之會(huì)影響 Reducer 的最終結(jié)果

  7. 合并溢寫文件, 每次溢寫會(huì)在磁盤上生成一個(gè)臨時(shí)文件 (寫之前判斷是否有 Combiner), 如果 Mapper 的輸出結(jié)果真的很大, 有多次這樣的溢寫發(fā)生, 磁盤上相應(yīng)的就會(huì)有多個(gè)臨時(shí)文件存在. 當(dāng)整個(gè)數(shù)據(jù)處理結(jié)束之后開始對(duì)磁盤中的臨時(shí)文件進(jìn)行 Merge 合并, 因?yàn)樽罱K的文件只有一個(gè), 寫入磁盤, 并且為這個(gè)文件提供了一個(gè)索引文件, 以記錄每個(gè)reduce對(duì)應(yīng)數(shù)據(jù)的偏移量

配置
配置 默認(rèn)值 解釋
mapreduce.task.io.sort.mb 100 設(shè)置環(huán)型緩沖區(qū)的內(nèi)存值大小
mapreduce.map.sort.spill.percent 0.8 設(shè)置溢寫的比例
mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local 溢寫數(shù)據(jù)目錄
mapreduce.task.io.sort.factor 10 設(shè)置一次合并多少個(gè)溢寫文件

1.2 :ReduceTask 工作機(jī)制

1561706306005.png
1561706306005.png

Reduce 大致分為 copy、sort、reduce 三個(gè)階段垢油,重點(diǎn)在前兩個(gè)階段。copy 階段包含一個(gè) eventFetcher 來獲取已完成的 map 列表冈钦,由 Fetcher 線程去 copy 數(shù)據(jù),在此過程中會(huì)啟動(dòng)兩個(gè) merge 線程李请,分別為 inMemoryMerger 和 onDiskMerger瞧筛,分別將內(nèi)存中的數(shù)據(jù) merge 到磁盤和將磁盤中的數(shù)據(jù)進(jìn)行 merge。待數(shù)據(jù) copy 完成之后导盅,copy 階段就完成了较幌,開始進(jìn)行 sort 階段,sort 階段主要是執(zhí)行 finalMerge 操作白翻,純粹的 sort 階段乍炉,完成之后就是 reduce 階段,調(diào)用用戶定義的 reduce 函數(shù)進(jìn)行處理

詳細(xì)步驟
  1. Copy階段滤馍,簡單地拉取數(shù)據(jù)岛琼。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher),通過HTTP方式請(qǐng)求maptask獲取屬于自己的文件巢株。
  2. Merge階段槐瑞。這里的merge如map端的merge動(dòng)作,只是數(shù)組中存放的是不同map端copy來的數(shù)值阁苞。Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中困檩,這里的緩沖區(qū)大小要比map端的更為靈活。merge有三種形式:內(nèi)存到內(nèi)存那槽;內(nèi)存到磁盤窗看;磁盤到磁盤。默認(rèn)情況下第一種形式不啟用倦炒。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤的merge软瞎。與map 端類似逢唤,這也是溢寫的過程拉讯,這個(gè)過程中如果你設(shè)置有Combiner,也是會(huì)啟用的鳖藕,然后在磁盤中生成了眾多的溢寫文件魔慷。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束著恩,然后啟動(dòng)第三種磁盤到磁盤的merge方式生成最終的文件院尔。
  3. 合并排序。把分散的數(shù)據(jù)合并成一個(gè)大的數(shù)據(jù)后喉誊,還會(huì)再對(duì)合并后的數(shù)據(jù)排序邀摆。
  4. 對(duì)排序后的鍵值對(duì)調(diào)用reduce方法,鍵相等的鍵值對(duì)調(diào)用一次reduce方法伍茄,每次調(diào)用會(huì)產(chǎn)生零個(gè)或者多個(gè)鍵值對(duì)栋盹,最后把這些輸出的鍵值對(duì)寫入到HDFS文件中。

1.3:Shuffle 過程

map 階段處理的數(shù)據(jù)如何傳遞給 reduce 階段敷矫,是 MapReduce 框架中最關(guān)鍵的一個(gè)流程例获,這個(gè)流程就叫 shuffle
shuffle: 洗牌、發(fā)牌 ——(核心機(jī)制:數(shù)據(jù)分區(qū)曹仗,排序榨汤,分組,規(guī)約怎茫,合并等過程)

1561706306005.png

shuffle 是 Mapreduce 的核心收壕,它分布在 Mapreduce 的 map 階段和 reduce 階段。一般把從 Map 產(chǎn)生輸出開始到 Reduce 取得數(shù)據(jù)作為輸入之前的過程稱作 shuffle遭居。

  1. Collect階段:將 MapTask 的結(jié)果輸出到默認(rèn)大小為 100M 的環(huán)形緩沖區(qū)啼器,保存的是 key/value,Partition 分區(qū)信息等俱萍。
  2. Spill階段:當(dāng)內(nèi)存中的數(shù)據(jù)量達(dá)到一定的閥值的時(shí)候端壳,就會(huì)將數(shù)據(jù)寫入本地磁盤,在將數(shù)據(jù)寫入磁盤之前需要對(duì)數(shù)據(jù)進(jìn)行一次排序的操作枪蘑,如果配置了 combiner损谦,還會(huì)將有相同分區(qū)號(hào)和 key 的數(shù)據(jù)進(jìn)行排序。
  3. Merge階段:把所有溢出的臨時(shí)文件進(jìn)行一次合并操作岳颇,以確保一個(gè) MapTask 最終只產(chǎn)生一個(gè)中間數(shù)據(jù)文件照捡。
  4. Copy階段:ReduceTask 啟動(dòng) Fetcher 線程到已經(jīng)完成 MapTask 的節(jié)點(diǎn)上復(fù)制一份屬于自己的數(shù)據(jù),這些數(shù)據(jù)默認(rèn)會(huì)保存在內(nèi)存的緩沖區(qū)中话侧,當(dāng)內(nèi)存的緩沖區(qū)達(dá)到一定的閥值的時(shí)候栗精,就會(huì)將數(shù)據(jù)寫到磁盤之上。
  5. Merge階段:在 ReduceTask 遠(yuǎn)程復(fù)制數(shù)據(jù)的同時(shí),會(huì)在后臺(tái)開啟兩個(gè)線程對(duì)內(nèi)存到本地的數(shù)據(jù)文件進(jìn)行合并操作悲立。
  6. Sort階段:在對(duì)數(shù)據(jù)進(jìn)行合并的同時(shí)鹿寨,會(huì)進(jìn)行排序操作,由于 MapTask 階段已經(jīng)對(duì)數(shù)據(jù)進(jìn)行了局部的排序薪夕,ReduceTask 只需保證 Copy 的數(shù)據(jù)的最終整體有效性即可脚草。
    Shuffle 中的緩沖區(qū)大小會(huì)影響到 mapreduce 程序的執(zhí)行效率,原則上說原献,緩沖區(qū)越大馏慨,磁盤io的次數(shù)越少,執(zhí)行速度就越快
    緩沖區(qū)的大小可以通過參數(shù)調(diào)整, 參數(shù):mapreduce.task.io.sort.mb 默認(rèn)100M

2. 案例: Reduce 端實(shí)現(xiàn) JOIN

2.1. 需求

假如數(shù)據(jù)量巨大姑隅,兩表的數(shù)據(jù)是以文件的形式存儲(chǔ)在 HDFS 中, 需要用 MapReduce 程序來實(shí)現(xiàn)以下 SQL 查詢運(yùn)算

select  a.id,a.date,b.name,b.category_id,b.price from t_order a left join t_product b on a.pid = b.id
商品表
id pname category_id price
P0001 小米5 1000 2000
P0002 錘子T1 1000 3000
訂單數(shù)據(jù)表
id date pid amount
1001 20150710 P0001 2
1002 20150710 P0002 3

2.2 實(shí)現(xiàn)步驟

通過將關(guān)聯(lián)的條件作為map輸出的key写隶,將兩表滿足join條件的數(shù)據(jù)并攜帶數(shù)據(jù)所來源的文件信息,發(fā)往同一個(gè)reduce task粤策,在reduce中進(jìn)行數(shù)據(jù)的串聯(lián)

Step 1: 定義 Mapper

public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:判斷數(shù)據(jù)來自哪個(gè)文件
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        if(fileName.equals("product.txt")){
            //數(shù)據(jù)來自商品表
            //2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
            String[] split = value.toString().split(",");
            String productId = split[0];

            context.write(new Text(productId), value);

        }else{
            //數(shù)據(jù)來自訂單表
            //2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
            String[] split = value.toString().split(",");
            String productId = split[2];

            context.write(new Text(productId), value);

        }

    }
}

Step 2: 定義 Reducer

public class ReduceJoinMapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:判斷數(shù)據(jù)來自哪個(gè)文件

        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        if(fileName.equals("product.txt")){
            //數(shù)據(jù)來自商品表
            //2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
            String[] split = value.toString().split(",");
            String productId = split[0];

            context.write(new Text(productId), value);

        }else{
            //數(shù)據(jù)來自訂單表
            //2:將K1和V1轉(zhuǎn)為K2和V2,寫入上下文中
            String[] split = value.toString().split(",");
            String productId = split[2];

            context.write(new Text(productId), value);

        }



    }
}

Step 3: 定義主類

public class ReduceJoinReducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
       //1:遍歷集合,獲取V3 (first +second)
        String first = "";
        String second = "";
        for (Text value : values) {
            if(value.toString().startsWith("p")){
                first = value.toString();
            }else{
                second += value.toString();
            }

        }
        //2:將K3和V3寫入上下文中
        context.write(key, new Text(first+"\t"+second));
    }
}

3. 案例: Map端實(shí)現(xiàn) JOIN

3.1 概述

? 適用于關(guān)聯(lián)表中有小表的情形.

? 使用分布式緩存,可以將小表分發(fā)到所有的map節(jié)點(diǎn)樟澜,這樣,map節(jié)點(diǎn)就可以在本地對(duì)自己所讀到的大表數(shù)據(jù)進(jìn)行join并輸出最終結(jié)果叮盘,可以大大提高join操作的并發(fā)度秩贰,加快處理速度

3.2 實(shí)現(xiàn)步驟

先在mapper類中預(yù)先定義好小表,進(jìn)行join

引入實(shí)際場景中的解決方案:一次加載數(shù)據(jù)庫或者用

Step 1:定義Mapper
public class MapJoinMapper extends Mapper<LongWritable,Text,Text,Text>{
    private HashMap<String, String> map = new HashMap<>();

    //第一件事情:將分布式緩存的小表數(shù)據(jù)讀取到本地Map集合(只需要做一次)

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //1:獲取分布式緩存文件列表
        URI[] cacheFiles =  context.getCacheFiles();

        //2:獲取指定的分布式緩存文件的文件系統(tǒng)(FileSystem)
        FileSystem fileSystem = FileSystem.get(cacheFiles[0], context.getConfiguration());

        //3:獲取文件的輸入流
        FSDataInputStream inputStream = fileSystem.open(new Path(cacheFiles[0]));

        //4:讀取文件內(nèi)容, 并將數(shù)據(jù)存入Map集合
           //4.1 將字節(jié)輸入流轉(zhuǎn)為字符緩沖流FSDataInputStream --->BufferedReader
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
           //4.2 讀取小表文件內(nèi)容,以行位單位,并將讀取的數(shù)據(jù)存入map集合


        String line = null;
        while((line = bufferedReader.readLine()) != null){
            String[] split = line.split(",");

            map.put(split[0], line);

        }


        //5:關(guān)閉流
        bufferedReader.close();
        fileSystem.close();


    }

    //第二件事情:對(duì)大表的處理業(yè)務(wù)邏輯,而且要實(shí)現(xiàn)大表和小表的join操作

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1:從行文本數(shù)據(jù)中獲取商品的id: p0001 , p0002  得到了K2
            String[] split = value.toString().split(",");
            String productId = split[2];  //K2

            //2:在Map集合中,將商品的id作為鍵,獲取值(商品的行文本數(shù)據(jù)) ,將value和值拼接,得到V2
            String productLine = map.get(productId);
            String valueLine = productLine+"\t"+value.toString(); //V2
            //3:將K2和V2寫入上下文中
            context.write(new Text(productId), new Text(valueLine));
    }
}

Step 2:定義主類
public class JobMain  extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
        //1:獲取job對(duì)象
        Job job = Job.getInstance(super.getConf(), "map_join_job");

        //2:設(shè)置job對(duì)象(將小表放在分布式緩存中)
            //將小表放在分布式緩存中
           // DistributedCache.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"), super.getConf());
           job.addCacheFile(new URI("hdfs://node01:8020/cache_file/product.txt"));

           //第一步:設(shè)置輸入類和輸入的路徑
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\map_join_input"));
            //第二步:設(shè)置Mapper類和數(shù)據(jù)類型
            job.setMapperClass(MapJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //第八步:設(shè)置輸出類和輸出路徑
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\map_join_out"));


        //3:等待任務(wù)結(jié)束
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 :1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //啟動(dòng)job任務(wù)
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}

4. 案例:求共同好友

4.1 需求分析

以下是qq的好友列表數(shù)據(jù)柔吼,冒號(hào)前是一個(gè)用戶毒费,冒號(hào)后是該用戶的所有好友(數(shù)據(jù)中的好友關(guān)系是單向的)

A:B,C,D,F,E,O
B:A,C,E,K
C:A,B,D,E,I 
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰愈魏?

4.2 實(shí)現(xiàn)步驟

第一步:代碼實(shí)現(xiàn)

Mapper類

public class Step1Mapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         //1:以冒號(hào)拆分行文本數(shù)據(jù): 冒號(hào)左邊就是V2
        String[] split = value.toString().split(":");
        String userStr = split[0];

        //2:將冒號(hào)右邊的字符串以逗號(hào)拆分,每個(gè)成員就是K2
        String[] split1 = split[1].split(",");
        for (String s : split1) {
            //3:將K2和v2寫入上下文中
            context.write(new Text(s), new Text(userStr));
        }
    }
}

Reducer類:

public class Step1Reducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1:遍歷集合,并將每一個(gè)元素拼接,得到K3
        StringBuffer buffer = new StringBuffer();

        for (Text value : values) {
            buffer.append(value.toString()).append("-");
        }
        //2:K2就是V3
        //3:將K3和V3寫入上下文中
        context.write(new Text(buffer.toString()), key);
    }
}

JobMain:

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1:獲取Job對(duì)象
        Job job = Job.getInstance(super.getConf(), "common_friends_step1_job");

        //2:設(shè)置job任務(wù)
            //第一步:設(shè)置輸入類和輸入路徑
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\input\\common_friends_step1_input"));

            //第二步:設(shè)置Mapper類和數(shù)據(jù)類型
            job.setMapperClass(Step1Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //第三,四,五,六

            //第七步:設(shè)置Reducer類和數(shù)據(jù)類型
            job.setReducerClass(Step1Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //第八步:設(shè)置輸出類和輸出的路徑
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

        //3:等待job任務(wù)結(jié)束
        boolean bl = job.waitForCompletion(true);


        return bl ? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        //啟動(dòng)job任務(wù)
        int run = ToolRunner.run(configuration, new JobMain(), args);

        System.exit(run);
    }
}
第二步:代碼實(shí)現(xiàn)

Mapper類

public class Step2Mapper extends Mapper<LongWritable,Text,Text,Text> {
    /*
     K1           V1

     0            A-F-C-J-E-    B
    ----------------------------------

     K2             V2
     A-C            B
     A-E            B
     A-F            B
     C-E            B

     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1:拆分行文本數(shù)據(jù),結(jié)果的第二部分可以得到V2
        String[] split = value.toString().split("\t");
        String   friendStr =split[1];

        //2:繼續(xù)以'-'為分隔符拆分行文本數(shù)據(jù)第一部分,得到數(shù)組
        String[] userArray = split[0].split("-");

        //3:對(duì)數(shù)組做一個(gè)排序
        Arrays.sort(userArray);

        //4:對(duì)數(shù)組中的元素進(jìn)行兩兩組合,得到K2
        /*
          A-E-C ----->  A  C  E

          A  C  E
            A  C  E

         */
        for (int i = 0; i <userArray.length -1 ; i++) {
            for (int j = i+1; j  < userArray.length ; j++) {
                //5:將K2和V2寫入上下文中
                context.write(new Text(userArray[i] +"-"+userArray[j]), new Text(friendStr));
            }

        }

    }
}

Reducer類:

public class Step2Reducer extends Reducer<Text,Text,Text,Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //1:原來的K2就是K3
        //2:將集合進(jìn)行遍歷,將集合中的元素拼接,得到V3
        StringBuffer buffer = new StringBuffer();
        for (Text value : values) {
            buffer.append(value.toString()).append("-");
            
        }
        //3:將K3和V3寫入上下文中
        context.write(key, new Text(buffer.toString()));
    }
}

JobMain:

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1:獲取Job對(duì)象
        Job job = Job.getInstance(super.getConf(), "common_friends_step2_job");

        //2:設(shè)置job任務(wù)
            //第一步:設(shè)置輸入類和輸入路徑
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("file:///D:\\out\\common_friends_step1_out"));

            //第二步:設(shè)置Mapper類和數(shù)據(jù)類型
            job.setMapperClass(Step2Mapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            //第三,四,五,六

            //第七步:設(shè)置Reducer類和數(shù)據(jù)類型
            job.setReducerClass(Step2Reducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //第八步:設(shè)置輸出類和輸出的路徑
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("file:///D:\\out\\common_friends_step2_out"));

        //3:等待job任務(wù)結(jié)束
        boolean bl = job.waitForCompletion(true);
        return bl ? 0: 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        //啟動(dòng)job任務(wù)
        int run = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(run);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末觅玻,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子培漏,更是在濱河造成了極大的恐慌溪厘,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牌柄,死亡現(xiàn)場離奇詭異畸悬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)珊佣,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門蹋宦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人咒锻,你說我怎么就攤上這事冷冗。” “怎么了惑艇?”我有些...
    開封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵蒿辙,是天一觀的道長。 經(jīng)常有香客問我,道長须板,這世上最難降的妖魔是什么碰镜? 我笑而不...
    開封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮习瑰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘秽荤。我一直安慰自己甜奄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開白布窃款。 她就那樣靜靜地躺著课兄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪晨继。 梳的紋絲不亂的頭發(fā)上烟阐,一...
    開封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音紊扬,去河邊找鬼蜒茄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛餐屎,可吹牛的內(nèi)容都是我干的檀葛。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼腹缩,長吁一口氣:“原來是場噩夢啊……” “哼屿聋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起藏鹊,我...
    開封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤润讥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后盘寡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體楚殿,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年宴抚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了勒魔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡菇曲,死狀恐怖冠绢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情常潮,我是刑警寧澤弟胀,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站,受9級(jí)特大地震影響孵户,放射性物質(zhì)發(fā)生泄漏萧朝。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一夏哭、第九天 我趴在偏房一處隱蔽的房頂上張望检柬。 院中可真熱鬧,春花似錦竖配、人聲如沸何址。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽用爪。三九已至,卻和暖如春胁镐,著一層夾襖步出監(jiān)牢的瞬間偎血,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來泰國打工盯漂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留颇玷,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓宠能,卻偏偏與公主長得像亚隙,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子违崇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349