圖解mapreduce原理全剖析

mapreduce原理全剖析--段氏六脈神劍.png

1常熙、mapTask調(diào)用InputFormat再調(diào)用RecourReader的read()方法來讀取數(shù)據(jù),獲得key墓贿、value,mapreduce通過InputFormat來解耦
2舱馅、read()方法依靠一次讀取一行的邏輯來讀取原始文件的數(shù)據(jù),返回key干毅、value,mapTask會(huì)將其交給自定義的Mapper
3渠鸽、map方法我們會(huì)調(diào)用context.write方法來輸出數(shù)據(jù)到OutPutCollector類,OutPutCollector會(huì)將數(shù)據(jù)放到(內(nèi)存中存放 默認(rèn)MR.SORT.MB:100MB可以自己配置,一般不會(huì)放滿默認(rèn)80%那婉,這里面還要留有空間排序默認(rèn)20%)環(huán)形緩沖區(qū)(其實(shí)就是一個(gè)bite()數(shù)組,如果寫滿了,那么就會(huì)一邊寫一邊將開始的數(shù)據(jù)回收呻率,然后繼續(xù)寫到回收后的位置上逻悠,形成了環(huán)形緩沖區(qū))
4童谒、環(huán)形緩沖區(qū)的溢出的數(shù)據(jù)溢出之前會(huì)通過Hashpartioner進(jìn)行分區(qū)蔫饰、排序(默認(rèn)是快速排序法key.compareTO),會(huì)通過spiller寫入到mapTask工作目錄的本地文件(所有溢寫文件分區(qū)且區(qū)內(nèi)有序)
5、所有溢出的文件會(huì)做歸并排序形成mapTask的最終結(jié)果文件,一個(gè)mapTask對(duì)應(yīng)一個(gè)最終結(jié)果文件盛嘿,形成幾個(gè)分區(qū)就會(huì)有對(duì)應(yīng)幾個(gè)reduceTask。reduceTask的個(gè)數(shù)由配置文件或者參數(shù)設(shè)置,只要不設(shè)置自定義partitioner,那么這里的分區(qū)會(huì)動(dòng)態(tài)適配reduceTask個(gè)數(shù)砰琢。如果設(shè)置了自定義partitioner,那么就需要提前設(shè)置對(duì)應(yīng)的reduceTask的個(gè)數(shù)
6、每個(gè)reduceTask都會(huì)到每一個(gè)mapTask的節(jié)點(diǎn)去下載分區(qū)文件到reduceTask的本地磁盤工作目錄
7歧强、為了保證最后的結(jié)果有序肤京,reduceTask任務(wù)A需要再次從所有mapTask下載到的對(duì)應(yīng)文件重新進(jìn)行歸并排序
8、reduceTask的內(nèi)部邏輯寫在reducer的reduce(key,values)方法,通過調(diào)用GroupingComparaor(key,netxtk)或者自定義GroupingComparaor來判斷哪些key是一組熬荆,形成key和values舟山。
9、reducer的reduce方法最后通過context.writer(key卤恳,v)寫到輸出文件(所有reduceTask的輸出文件都有序)累盗,輸出路徑由提交任務(wù)時(shí)的參數(shù)決定,默認(rèn)文件名part-r-00000
10突琳、如果設(shè)置了combiner若债,那么溢寫排序文件會(huì)調(diào)用,歸并排序時(shí)也會(huì)combiner拆融,將加快shluffer的效率蠢琳,但是一般情況下不建議使用,如果符合條件下一定要使用,也可以直接指定reducer為combiner镜豹,沒必要重復(fù)寫代碼

package cn.itcast.bigdata.mr.wcdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
  * 輸入為map的輸出
 */
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count=0;
        for(IntWritable v: values){
            
            count += v.get();
        }
        
        context.write(key, new IntWritable(count));
    }
}

***紅色方框的類和方法都可以由程序員自定義
mapper類

package cn.itcast.bigdata.mr.wcdemo;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * KEYIN: 默認(rèn)情況下傲须,是mr框架所讀到的一行文本的起始偏移量,Long,
 * 但是在hadoop中有自己的更精簡(jiǎn)的序列化接口趟脂,所以不直接用Long泰讽,而用LongWritable
 * 
 * VALUEIN:默認(rèn)情況下,是mr框架所讀到的一行文本的內(nèi)容昔期,String已卸,同上,用Text
 * 
 * KEYOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的key硼一,在此處是單詞累澡,String,同上般贼,用Text
 * VALUEOUT:是用戶自定義邏輯處理完成之后輸出數(shù)據(jù)中的value永乌,在此處是單詞次數(shù),Integer具伍,同上翅雏,用IntWritable
 * 
 * @author
 *
 */

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

    /**
     * map階段的業(yè)務(wù)邏輯就寫在自定義的map()方法中
     * maptask會(huì)對(duì)每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        
        //將maptask傳給我們的文本內(nèi)容先轉(zhuǎn)換成String
        String line = value.toString();
        //根據(jù)空格將這一行切分成單詞
        String[] words = line.split(" ");
        
        //將單詞輸出為<單詞,1>
        for(String word:words){
            //將單詞作為key人芽,將次數(shù)1作為value望几,以便于后續(xù)的數(shù)據(jù)分發(fā),可以根據(jù)單詞分發(fā)萤厅,以便于相同單詞會(huì)到相同的reduce task
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

reducer類

package cn.itcast.bigdata.mr.wcdemo;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * KEYIN, VALUEIN 對(duì)應(yīng)  mapper輸出的KEYOUT,VALUEOUT類型對(duì)應(yīng)
 * 
 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類型
 * KEYOUT是單詞
 * VLAUEOUT是總次數(shù)
 * @author
 *
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    /**
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * 入?yún)ey橄抹,是一組相同單詞kv對(duì)的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;
        /*Iterator<IntWritable> iterator = values.iterator();
        while(iterator.hasNext()){
            count += iterator.next().get();
        }*/
        for(IntWritable value:values){
        
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

Driver類

package cn.itcast.bigdata.mr.wcdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 相當(dāng)于一個(gè)yarn集群的客戶端
 * 需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù)靴迫,指定jar包
 * 最后提交給yarn
 * @author
 *
 */
public class WordcountDriver {
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        //是否運(yùn)行為本地模式,就是看這個(gè)參數(shù)值是否為local楼誓,默認(rèn)就是local
        /*conf.set("mapreduce.framework.name", "local");*/
        
        //本地模式運(yùn)行mr程序時(shí)玉锌,輸入輸出的數(shù)據(jù)可以在本地,也可以在hdfs上
        //到底在哪里疟羹,就看以下兩行配置你用哪行主守,默認(rèn)就是file:///
        /*conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
        /*conf.set("fs.defaultFS", "file:///");*/
        
        
        
        //運(yùn)行集群模式,就是把程序提交到y(tǒng)arn中去運(yùn)行
        //要想運(yùn)行為集群模式榄融,以下3個(gè)參數(shù)要指定為集群上的值
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");
        conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
        Job job = Job.getInstance(conf);
        
        job.setJar("c:/wc.jar");
        //指定本程序的jar包所在的本地路徑
        /*job.setJarByClass(WordcountDriver.class);*/
        
        //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        
        //指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        //指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //指定需要使用combiner参淫,以及用哪個(gè)類作為combiner的邏輯
        /*job.setCombinerClass(WordcountCombiner.class);*/
        job.setCombinerClass(WordcountReducer.class);
        
        //如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputformat.class--CombineTextInputFormat是hdfs中要處理的文件都是大量小文件的情況下---主要是更改了getsplits()方法
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);//最大切片大小,如果超過最大值愧杯,會(huì)被切分開
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);//最小切片大小涎才,那么切下來實(shí)際上到底有多大?要考慮具體情況力九,本節(jié)點(diǎn)文件耍铜、跨機(jī)架文件、跨機(jī)房文件
        
        //指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結(jié)果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //將job中配置的相關(guān)參數(shù)跌前,以及job所用的java類所在的jar包业扒,提交給yarn去運(yùn)行
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
        
    }
    

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市舒萎,隨后出現(xiàn)的幾起案子程储,更是在濱河造成了極大的恐慌,老刑警劉巖臂寝,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件章鲤,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡咆贬,警方通過查閱死者的電腦和手機(jī)败徊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來掏缎,“玉大人皱蹦,你說我怎么就攤上這事【祢冢” “怎么了沪哺?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)酌儒。 經(jīng)常有香客問我辜妓,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任籍滴,我火速辦了婚禮酪夷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘孽惰。我一直安慰自己晚岭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布勋功。 她就那樣靜靜地躺著坦报,像睡著了一般。 火紅的嫁衣襯著肌膚如雪酝润。 梳的紋絲不亂的頭發(fā)上燎竖,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天璃弄,我揣著相機(jī)與錄音要销,去河邊找鬼。 笑死夏块,一個(gè)胖子當(dāng)著我的面吹牛疏咐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脐供,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼浑塞,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了政己?” 一聲冷哼從身側(cè)響起酌壕,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎歇由,沒想到半個(gè)月后卵牍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沦泌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年糊昙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谢谦。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡释牺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出回挽,到底是詐尸還是另有隱情没咙,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布千劈,位于F島的核電站镜撩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜袁梗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一宜鸯、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧遮怜,春花似錦淋袖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至陌凳,卻和暖如春剥懒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背合敦。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來泰國打工初橘, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人充岛。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓保檐,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親崔梗。 傳聞我的和親對(duì)象是個(gè)殘疾皇子夜只,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1. mapreduce 的運(yùn)行機(jī)制(Hadoop 2) 首先看下 mapreduce 在 yarn 中的執(zhí)行流程...
    Java旅行者閱讀 668評(píng)論 0 3
  • MapReduce計(jì)算模型 一、MR執(zhí)行流程 最簡(jiǎn)單過程:map --> reduce 定制了Partitione...
    yanzhelee閱讀 868評(píng)論 0 0
  • 思考問題 MapReduce總結(jié) MapReduce MapReduce的定義MapReduce是一種編程模型蒜魄, ...
    Sakura_P閱讀 936評(píng)論 0 1
  • wordcount 關(guān)于切片劃分maptask任務(wù)扔亥,由客戶端(提交job)完成,寫入文件交給mr appmaste...
    pamperxg閱讀 1,106評(píng)論 0 0
  • 文/歡謔 進(jìn)入大學(xué)已經(jīng)兩個(gè)月谈为,遇見了許多沒有見過卻能...
    歡謔閱讀 211評(píng)論 0 1