Hadoop之MapReduce

Hadoop大數(shù)據(jù)技術(shù)體系

框架

參考http://www.reibang.com/p/17bee8316848

MapReduce

框圖

從wordcount開始

參考:wordcount實(shí)例

  • Map: for each (k,v) ---> produce new set of (k,v) pairs
  • Reduce: produce one (k,v) for each distinct key
    比如wordcount中:


    map過程

    reduce過程

    代碼:

package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
 * 
 * 描述:WordCount explains by Felix
 * @author Hadoop Dev Group
 */
public class WordCount
{
    /**
     * MapReduceBase類:實(shí)現(xiàn)了Mapper和Reducer接口的基類(其中的方法只是實(shí)現(xiàn)接口忙迁,而未作任何事情)
     * Mapper接口:
     * WritableComparable接口:實(shí)現(xiàn)WritableComparable的類可以相互比較。所有被用作key的類應(yīng)該實(shí)現(xiàn)此接口秽荤。
     * Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用控漠。 
     * 
     */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable>
    {
        /**
         * LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類无拗,這些類實(shí)現(xiàn)了WritableComparable接口,
         * 都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換遣总,你可以將它們分別視為long,int,String 的替代品昌抠。
         */
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        /**
         * Mapper接口中的map方法:
         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
         * 映射一個(gè)單個(gè)的輸入k/v對(duì)到一個(gè)中間的k/v對(duì)
         * 輸出對(duì)不需要和輸入對(duì)是相同的類型患朱,輸入對(duì)可以映射到0個(gè)或多個(gè)輸出對(duì)。
         * OutputCollector接口:收集Mapper和Reducer輸出的<k,v>對(duì)扰魂。
         * OutputCollector接口的collect(k, v)方法:增加一個(gè)(k,v)對(duì)到output
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException
        {
            int sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }
    public static void main(String[] args) throws Exception
    {
        /**
         * JobConf:map/reduce的job配置類麦乞,向hadoop框架描述map-reduce執(zhí)行的工作
         * 構(gòu)造方法:JobConf()蕴茴、JobConf(Class exampleClass)、JobConf(Configuration conf)等
         */
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");           //設(shè)置一個(gè)用戶定義的job名稱
        conf.setOutputKeyClass(Text.class);    //為job的輸出數(shù)據(jù)設(shè)置Key類
        conf.setOutputValueClass(IntWritable.class);   //為job輸出設(shè)置value類
        conf.setMapperClass(Map.class);         //為job設(shè)置Mapper類
        conf.setCombinerClass(Reduce.class);      //為job設(shè)置Combiner類
        conf.setReducerClass(Reduce.class);        //為job設(shè)置Reduce類
        conf.setInputFormat(TextInputFormat.class);    //為map-reduce任務(wù)設(shè)置InputFormat實(shí)現(xiàn)類
        conf.setOutputFormat(TextOutputFormat.class);  //為map-reduce任務(wù)設(shè)置OutputFormat實(shí)現(xiàn)類
        /**
         * InputFormat描述map-reduce中對(duì)job的輸入定義
         * setInputPaths():為map-reduce job設(shè)置路徑數(shù)組作為輸入列表
         * setInputPath():為map-reduce job設(shè)置路徑數(shù)組作為輸出列表
         */
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);         //運(yùn)行一個(gè)job
    }
}

MapReduce框架組成

參考:MapReduce框架組成

MapReduce主要包括JobClient姐直、JobTracker倦淀、TaskTracker、HDFS四個(gè)部分声畏。


mapreduce框架
  1. JobClient:配置參數(shù)Configuration撞叽,并打包成jar文件存儲(chǔ)在HDFS上,將文件路徑提交給JobTracker的master服務(wù)插龄,然后由master創(chuàng)建每個(gè)task將它們分發(fā)到各個(gè)TaskTracker服務(wù)中去執(zhí)行愿棋。
  2. JobTracker:這是一個(gè)master服務(wù),程序啟動(dòng)后均牢,JobTracker負(fù)責(zé)資源監(jiān)控和作業(yè)調(diào)度糠雨。JobTracker監(jiān)控所有的TaskTracker和job的健康狀況,一旦發(fā)生失敗徘跪,即將之轉(zhuǎn)移到其他節(jié)點(diǎn)上甘邀,同時(shí)JobTracker會(huì)跟蹤任務(wù)的執(zhí)行進(jìn)度、資源使用量等信息垮庐,并將這些信息告訴任務(wù)調(diào)度器松邪,而調(diào)度器會(huì)在資源出現(xiàn)空閑時(shí),選擇合適的任務(wù)使用這些資源哨查。在Hadoop 中逗抑,任務(wù)調(diào)度器是一個(gè)可插拔的模塊,用戶可以根據(jù)自己的需要設(shè)計(jì)相應(yīng)的調(diào)度器寒亥。
  3. TaskTracker:運(yùn)行在多個(gè)節(jié)點(diǎn)上的slaver服務(wù)邮府。TaskTracker主動(dòng)與JobTracker通信接受作業(yè),并負(fù)責(zé)直接執(zhí)行每個(gè)任務(wù)护盈。TaskTracker 會(huì)周期性地通過Heartbeat 將本節(jié)點(diǎn)上資源的使用情況和任務(wù)的運(yùn)行進(jìn)度匯報(bào)給JobTracker挟纱,同時(shí)接收J(rèn)obTracker 發(fā)送過來的命令并執(zhí)行相應(yīng)的操作(如啟動(dòng)新任務(wù)、殺死任務(wù)等)腐宋。TaskTracker 使用“slot”等量劃分本節(jié)點(diǎn)上的資源量√垂欤“slot”代表計(jì)算資源(CPU胸竞、內(nèi)存等)。一個(gè)Task 獲取到一個(gè)slot 后才有機(jī)會(huì)運(yùn)行参萄,而Hadoop 調(diào)度器的作用就是將各個(gè)TaskTracker 上的空閑slot 分配給Task 使用卫枝。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用讹挎。TaskTracker 通過slot 數(shù)目(可配置參數(shù))限定Task 的并發(fā)度校赤。
    Task分為Map Task和Reduce Task兩種吆玖,均由TaskTracker啟動(dòng)。
    邏輯角度分析作業(yè)運(yùn)行順序:輸入分片(input split)马篮、map階段沾乘、combiner階段、shuffle階段浑测、reduce階段翅阵。

邏輯流程

  • input split:在map計(jì)算之前,程序會(huì)根據(jù)輸入文件計(jì)算split迁央,每個(gè)input split針對(duì)一個(gè)map任務(wù)掷匠。input split存儲(chǔ)的并非是數(shù)據(jù)本身,而是一個(gè)分片長度和一個(gè)記錄數(shù)據(jù)的位置的數(shù)組岖圈。
  • map階段:即執(zhí)行map函數(shù)讹语。
  • combiner階段:這是一個(gè)可選擇的函數(shù),實(shí)質(zhì)上是一種reduce操作蜂科。combiner是map的后續(xù)操作顽决,主要是在map計(jì)算出中間文件前做一個(gè)簡單的合并重復(fù)key值的操作。
  • shuffle階段:指從map輸出開始崇摄,包括系統(tǒng)執(zhí)行排序即傳送map輸出到reduce作為輸入的過程擎值。另外針對(duì)map輸出的key進(jìn)行排序又叫sort階段。map端shuffle逐抑,簡單來說就是利用combiner對(duì)數(shù)據(jù)進(jìn)行預(yù)排序鸠儿,利用內(nèi)存緩沖區(qū)來完成。reduce端的shuffle包括復(fù)制數(shù)據(jù)和歸并數(shù)據(jù)厕氨,最終產(chǎn)生一個(gè)reduce輸入文件进每。shuffle過程有許多可調(diào)優(yōu)的參數(shù)來提高M(jìn)apReduce的性能,其總原則就是給shuffle過程盡量多的內(nèi)存空間命斧。
  • reduce階段:即執(zhí)行reduce函數(shù)并存到hdfs文件系統(tǒng)中田晚。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市国葬,隨后出現(xiàn)的幾起案子贤徒,更是在濱河造成了極大的恐慌,老刑警劉巖汇四,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件接奈,死亡現(xiàn)場離奇詭異,居然都是意外死亡通孽,警方通過查閱死者的電腦和手機(jī)序宦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來背苦,“玉大人互捌,你說我怎么就攤上這事潘明。” “怎么了秕噪?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵钳降,是天一觀的道長。 經(jīng)常有香客問我巢价,道長牲阁,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任壤躲,我火速辦了婚禮城菊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘碉克。我一直安慰自己凌唬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布漏麦。 她就那樣靜靜地躺著客税,像睡著了一般。 火紅的嫁衣襯著肌膚如雪撕贞。 梳的紋絲不亂的頭發(fā)上更耻,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音捏膨,去河邊找鬼秧均。 笑死,一個(gè)胖子當(dāng)著我的面吹牛号涯,可吹牛的內(nèi)容都是我干的目胡。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼链快,長吁一口氣:“原來是場噩夢啊……” “哼誉己!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起域蜗,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤巨双,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后霉祸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體炉峰,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年脉执,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片戒劫。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡半夷,死狀恐怖婆廊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情巫橄,我是刑警寧澤淘邻,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站湘换,受9級(jí)特大地震影響宾舅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜彩倚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一筹我、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧帆离,春花似錦蔬蕊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至们妥,卻和暖如春猜扮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背监婶。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來泰國打工旅赢, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人压储。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓鲜漩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親集惋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子孕似,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容