多個MapReduce之間的嵌套

多個MapReduce之間的嵌套

在很多實際工作中赔嚎,單個MR不能滿足邏輯需求旭贬,而是需要多個MR之間的相互嵌套。很多場景下锐涯,一個MR的輸入依賴于另一個MR的輸出。結(jié)合案例實現(xiàn)一下兩個MR的嵌套填物。
** Tip:如果只關(guān)心多個MR嵌套的實現(xiàn)纹腌,可以直接跳到下面《多個MR嵌套源碼》章節(jié)查看 **

案例描述

根據(jù)log日志計算log中不同的IP地址數(shù)量是多少。測試數(shù)據(jù)如下圖所示:



該日志中每個字段都是用Tab建分割的滞磺。

案例分析

本次任務(wù)的目的是計算該日志不同的IP地址一共有多少升薯。實現(xiàn)這個目的的方式有很多種,但是本文的目的是借助改案例對兩個MapReduce之間的嵌套進(jìn)行總結(jié)的击困。

實現(xiàn)方法

該任務(wù)分為兩個MR過程涎劈,第一個MR(命名為MR1)負(fù)責(zé)將重復(fù)的ip地址去掉,然后將無重復(fù)的ip地址進(jìn)行輸出。第二個MR(命名為MR2)負(fù)責(zé)將MR1輸出的ip地址文件進(jìn)行匯總责语,然后將計算總數(shù)輸出炮障。

MR1階段


map過程

public class IpFilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splits = line .split("\t");
        String ip = splits[3];
        context.write(new Text(ip), NullWritable.get());
    }
}

輸入的key和value是文本的行號和每行的內(nèi)容。
輸出的key是ip地址坤候,輸出的value為空類型胁赢。

shuffle過程

主要是針對map階段輸出的key進(jìn)行排序和分組,將相同的key分為一組白筹,并且將相同key的value放到同一個集合里面智末,所以不同的組絕對不會出現(xiàn)相同的ip地址,分好組之后將值傳遞給reduce徒河。注:該階段是hadoop系統(tǒng)自動完成的系馆,不需要程序員編程

reduce過程

 public class IpFilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) 
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
} 

由于經(jīng)過shuffle階段之后所有輸入的key都是不同的,也就是ip地址是無重復(fù)的顽照,所以可以直接輸出由蘑。

MR2階段


map過程

public class IpCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        //輸出的key為字符串"ip",這個可以隨便設(shè)置,只要保證每次輸出的key都一樣就行
        //目的是為了在shuffle階段分組
        context.write(new Text("ip"), NullWritable.get());
    }
}

shuffle過程

按照相同的key進(jìn)行分組代兵,由于map階段所有的key都一樣尼酿,所以最后只有一組。

reduce過程

public class IpCountReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values,
            Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        //用于存放ip地址總數(shù)量
        int count = 0;
        for (NullWritable v : values) {
            count ++;
        }
        context.write(new Text(count+""), NullWritable.get());
    }
}

流程圖

源碼

MR1 map源碼

//MR1 map源碼
package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IpFilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] splits = line .split("\t");
        String ip = splits[3];
        context.write(new Text(ip), NullWritable.get());
    }
}

MR1 reduce源碼

package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IpFilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) 
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

MR2 map源碼

package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IpCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
            throws IOException, InterruptedException {
        context.write(new Text("ip"), NullWritable.get());
    }
}

MR2 reduce源碼

package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IpCountReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values,
            Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (NullWritable v : values) {
            count ++;
        }
        context.write(new Text(count+""), NullWritable.get());
    }
}

多個MR嵌套源碼

package com.ipcount.mrmr;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(Driver.class);
        
        //job1設(shè)置
        Job job1 = new Job(conf, "job1");
        job1.setJarByClass(Driver.class);
        job1.setMapperClass(IpFilterMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        
        job1.setReducerClass(IpFilterReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        
        //job1加入控制器
        ControlledJob ctrlJob1 = new ControlledJob(conf);
        ctrlJob1.setJob(job1);
        
        //job2設(shè)置
        Job job2 = new Job(conf, "job2");
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(IpCountMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(NullWritable.class);
        
        job2.setReducerClass(IpCountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        
        //job2加入控制器
        ControlledJob ctrlJob2 = new ControlledJob(conf);
        ctrlJob2.setJob(job2);
        
        //設(shè)置作業(yè)之間的以來關(guān)系植影,job2的輸入以來job1的輸出
        ctrlJob2.addDependingJob(ctrlJob1);
        
        //設(shè)置主控制器裳擎,控制job1和job2兩個作業(yè)
        JobControl jobCtrl = new JobControl("myCtrl");
        //添加到總的JobControl里,進(jìn)行控制
        jobCtrl.addJob(ctrlJob1);
        jobCtrl.addJob(ctrlJob2);
        
        
        //在線程中啟動思币,記住一定要有這個
        Thread thread = new Thread(jobCtrl);
        thread.start();
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }
        
    }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鹿响,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子谷饿,更是在濱河造成了極大的恐慌惶我,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件博投,死亡現(xiàn)場離奇詭異指孤,居然都是意外死亡,警方通過查閱死者的電腦和手機贬堵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來结洼,“玉大人黎做,你說我怎么就攤上這事∷扇蹋” “怎么了蒸殿?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我宏所,道長酥艳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任爬骤,我火速辦了婚禮充石,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘霞玄。我一直安慰自己骤铃,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布坷剧。 她就那樣靜靜地躺著惰爬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪惫企。 梳的紋絲不亂的頭發(fā)上撕瞧,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天,我揣著相機與錄音狞尔,去河邊找鬼丛版。 笑死,一個胖子當(dāng)著我的面吹牛沪么,可吹牛的內(nèi)容都是我干的硼婿。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼禽车,長吁一口氣:“原來是場噩夢啊……” “哼寇漫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起殉摔,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤州胳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后逸月,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體栓撞,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年碗硬,在試婚紗的時候發(fā)現(xiàn)自己被綠了瓤湘。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡恩尾,死狀恐怖弛说,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情翰意,我是刑警寧澤木人,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布信柿,位于F島的核電站,受9級特大地震影響醒第,放射性物質(zhì)發(fā)生泄漏渔嚷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一稠曼、第九天 我趴在偏房一處隱蔽的房頂上張望形病。 院中可真熱鬧,春花似錦蒲列、人聲如沸窒朋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽侥猩。三九已至,卻和暖如春抵赢,著一層夾襖步出監(jiān)牢的瞬間欺劳,已是汗流浹背铅鲤。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留邢享,地道東北人鹏往。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓伊履,卻偏偏與公主長得像,于是被迫代替她去往敵國和親款违。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容