MapReducer之Map輸入


在整個(gè)MapReducer階段中,Map輸入的文件管嬉,Reducer輸出的文件都是存儲(chǔ)在分布式文件系統(tǒng)中童本,但是Map任務(wù)處理的中間結(jié)果需要保存在本地磁盤,所以Map階段需要考慮數(shù)據(jù)的局限性(即計(jì)算向數(shù)據(jù)靠攏)嫡锌。

讀源碼

  • InputFormat

    MapReducer框架使用InputFormat作為數(shù)據(jù)的預(yù)處理模塊

public abstract class InputFormat<K, V> {
    public InputFormat() {
    }
  // 對(duì)數(shù)據(jù)進(jìn)行邏輯分片,得到一個(gè)InputSplit的列表
    public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
// 讀取分片琳钉,轉(zhuǎn)換成key/value形式
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

  • InputSpli(輸入分片)

    InputSplit是對(duì)文件進(jìn)行預(yù)處理的輸入單位,是邏輯切分蛛倦,只是記錄了要處理數(shù)據(jù)的位置和長(zhǎng)度
    根據(jù)輸入文件計(jì)算分片大小歌懒,每個(gè)分片任務(wù)對(duì)應(yīng)著一個(gè)Map
    分片的大小范圍可以在mapred-site.xml中設(shè)置,那在每次任務(wù)中分片大小又是多大呢溯壶?
    在FileInputFormat中首先比較文件大小和最大分塊大屑霸怼(maxSize),得到一個(gè)最小值且改,然后和最小分塊大醒樯铡(minSize)進(jìn)行比較得到一個(gè)最大值,就是分塊大小又跛。

public abstract class InputSplit {
    public InputSplit() {
    }
    //獲得當(dāng)前spllit長(zhǎng)度
    public abstract long getLength() throws IOException, InterruptedException;
    // 獲取節(jié)點(diǎn)地址列表(每個(gè)split的儲(chǔ)存地址不同)
    public abstract String[] getLocations() throws IOException, InterruptedException;
    // 如果是空值碍拆,則全部存儲(chǔ)在磁盤上
    @Evolving
    public SplitLocationInfo[] getLocationInfo() throws IOException {
        return null;
    }
}
  • RecordReader

    RecordReader對(duì)InputSplit中的數(shù)據(jù)進(jìn)行處理,加載數(shù)據(jù)并且轉(zhuǎn)換成適合Map任務(wù)讀取的鍵值形式

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public RecordReader() {
    }
    // 初始化
    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
    // 判斷下一個(gè)key/value的存在
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    // 獲得當(dāng)前key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
    // 獲得當(dāng)前value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
    // 獲得當(dāng)前處理進(jìn)度(0.0~1.0)
    public abstract float getProgress() throws IOException, InterruptedException;
    // 關(guān)閉RecordReader
    public abstract void close() throws IOException;
}

  • Mapper

    根據(jù)用戶定義的映射規(guī)則慨蓝,輸出一系列<key,value>作為中間結(jié)果感混,輸入的key和value類需要支持序列化操作,即繼承Writable礼烈,key的類同時(shí)也必須實(shí)現(xiàn)WritableComparable
    默認(rèn)輸入key類型 LongWritable 記錄數(shù)據(jù)分片的偏移位置

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }
//預(yù)處理模塊
    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }
//重寫模塊以滿足業(yè)務(wù)需求
    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        context.write(key, value);
    }
// 掃尾工作
    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }
//驅(qū)動(dòng)
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);
        // 對(duì)輸入的每一對(duì)key/value調(diào)用Map方法
        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }
// 設(shè)置Context
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

改寫LongWrite為Text

想要改寫首先我們要看Map默認(rèn)的輸入類型設(shè)置

job.setInputFormatClass(TextInputFormat.class);

將輸入key改變成Text我們要設(shè)置

job.setInputFormatClass(KeyValueTextInputFormat.class);

我們先來(lái)看一下TextInputFormat的源碼

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    public TextInputFormat() {
    }
    // 定義文本的讀取方式弧满,返回一個(gè)RecoredReader <LongWritable,Text>
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        // 返回一個(gè)LineRecordReader(繼承自RecordReader)
        return new LineRecordReader(recordDelimiterBytes);
    }
    // 判斷是否分片,true進(jìn)行分片
    protected boolean isSplitable(JobContext context, Path file) {
      // 判斷是否加入壓縮
        CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
        return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }
}

在TextInputFormat中重寫了FileInputFormat類中的isSplitable()并進(jìn)行了壓縮判斷

protected boolean isSplitable(JobContext context, Path filename) {
     return true;
  }

我們?cè)倏匆幌翶eyValueTextInputFormat的源碼

public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
    // 判斷是否分片此熬,并進(jìn)行壓縮判斷
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
        return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }
    //  定義文本的讀取方式庭呜,返回一個(gè)RecoredReader <Text,Text>
    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        context.setStatus(genericSplit.toString());
        return new KeyValueLineRecordReader(context.getConfiguration());
    }
}

總結(jié):FileInputFormat是所以使用文件作為其數(shù)據(jù)源的InputFormat的子類,他有6個(gè)子類分別是

  • TextInputFormat 每次讀入一行數(shù)據(jù) key:該行的偏移量 value: 行內(nèi)容
  • KeyValueTextInputFormat 按照分隔符分割為key和value犀忱,如果不存在key則是一行數(shù)據(jù)募谎,value為空
    分隔符設(shè)置,默認(rèn)為“\t”

    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//設(shè)置“峡碉,”為分隔符

  • NLineInputFormat 可以實(shí)現(xiàn)Mapper收到固定行數(shù)

    NLineInputFormat.setNumLinesPerSplit(job, 20); // 設(shè)置行數(shù)

  • SequenceFileInputFormat 為Hadoop順序文件設(shè)計(jì)近哟,存儲(chǔ)二進(jìn)制<key,value>
  • CombineFileInputFormat 為小文件設(shè)計(jì),可以將多個(gè)文件打包到一個(gè)分片中
  • FixedLengthInputFormat

多路徑輸入

我們可以通過(guò)設(shè)置setInputPaths()設(shè)置多個(gè)路徑

FileInputFormat.setInputPaths(new Path(),new Path(),new Path());

也可以通過(guò)設(shè)置MultipleInputs來(lái)實(shí)現(xiàn)多個(gè)文件指定不同的Mapper

MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapA.class);
MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapB.class);

小文件處理

  • 文件壓縮
  • 自定義分片
    1. 繼承FileInputFormat類
    2. 重寫里面的isSplitable 改成返回false 取消默認(rèn)分片規(guī)則
    3. 重寫createRecordReader 方法 指定新的規(guī)則
    4. 編寫一個(gè)類繼承RecordReader
    5. 重寫里面的方法
    6. 設(shè)置你定義的InputFormat
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末鲫寄,一起剝皮案震驚了整個(gè)濱河市吉执,隨后出現(xiàn)的幾起案子疯淫,更是在濱河造成了極大的恐慌,老刑警劉巖戳玫,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熙掺,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡咕宿,警方通過(guò)查閱死者的電腦和手機(jī)币绩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)府阀,“玉大人缆镣,你說(shuō)我怎么就攤上這事∈哉悖” “怎么了董瞻?”我有些...
    開封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)田巴。 經(jīng)常有香客問(wèn)我钠糊,道長(zhǎng),這世上最難降的妖魔是什么壹哺? 我笑而不...
    開封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任抄伍,我火速辦了婚禮,結(jié)果婚禮上管宵,老公的妹妹穿的比我還像新娘截珍。我一直安慰自己,他們只是感情好啄糙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開白布笛臣。 她就那樣靜靜地躺著,像睡著了一般隧饼。 火紅的嫁衣襯著肌膚如雪沈堡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天燕雁,我揣著相機(jī)與錄音诞丽,去河邊找鬼。 笑死拐格,一個(gè)胖子當(dāng)著我的面吹牛僧免,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播捏浊,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼懂衩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起浊洞,我...
    開封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤牵敷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后法希,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體枷餐,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年苫亦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了毛肋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡屋剑,死狀恐怖润匙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情唉匾,我是刑警寧澤趁桃,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站肄鸽,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏油啤。R本人自食惡果不足惜典徘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望益咬。 院中可真熱鬧逮诲,春花似錦、人聲如沸幽告。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)冗锁。三九已至齐唆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冻河,已是汗流浹背箍邮。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留叨叙,地道東北人锭弊。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像擂错,于是被迫代替她去往敵國(guó)和親味滞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容