在整個(gè)MapReducer階段中,Map輸入的文件管嬉,Reducer輸出的文件都是存儲(chǔ)在分布式文件系統(tǒng)中童本,但是Map任務(wù)處理的中間結(jié)果需要保存在本地磁盤,所以Map階段需要考慮數(shù)據(jù)的局限性(即計(jì)算向數(shù)據(jù)靠攏)嫡锌。
讀源碼
-
InputFormat
MapReducer框架使用InputFormat作為數(shù)據(jù)的預(yù)處理模塊
public abstract class InputFormat<K, V> {
public InputFormat() {
}
// 對(duì)數(shù)據(jù)進(jìn)行邏輯分片,得到一個(gè)InputSplit的列表
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
// 讀取分片琳钉,轉(zhuǎn)換成key/value形式
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
-
InputSpli(輸入分片)
InputSplit是對(duì)文件進(jìn)行預(yù)處理的輸入單位,是邏輯切分蛛倦,只是記錄了要處理數(shù)據(jù)的位置和長(zhǎng)度
根據(jù)輸入文件計(jì)算分片大小歌懒,每個(gè)分片任務(wù)對(duì)應(yīng)著一個(gè)Map
分片的大小范圍可以在mapred-site.xml中設(shè)置,那在每次任務(wù)中分片大小又是多大呢溯壶?
在FileInputFormat中首先比較文件大小和最大分塊大屑霸怼(maxSize),得到一個(gè)最小值且改,然后和最小分塊大醒樯铡(minSize)進(jìn)行比較得到一個(gè)最大值,就是分塊大小又跛。
public abstract class InputSplit {
public InputSplit() {
}
//獲得當(dāng)前spllit長(zhǎng)度
public abstract long getLength() throws IOException, InterruptedException;
// 獲取節(jié)點(diǎn)地址列表(每個(gè)split的儲(chǔ)存地址不同)
public abstract String[] getLocations() throws IOException, InterruptedException;
// 如果是空值碍拆,則全部存儲(chǔ)在磁盤上
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
-
RecordReader
RecordReader對(duì)InputSplit中的數(shù)據(jù)進(jìn)行處理,加載數(shù)據(jù)并且轉(zhuǎn)換成適合Map任務(wù)讀取的鍵值形式
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
// 初始化
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
// 判斷下一個(gè)key/value的存在
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
// 獲得當(dāng)前key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
// 獲得當(dāng)前value
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
// 獲得當(dāng)前處理進(jìn)度(0.0~1.0)
public abstract float getProgress() throws IOException, InterruptedException;
// 關(guān)閉RecordReader
public abstract void close() throws IOException;
}
-
Mapper
根據(jù)用戶定義的映射規(guī)則慨蓝,輸出一系列<key,value>作為中間結(jié)果感混,輸入的key和value類需要支持序列化操作,即繼承Writable礼烈,key的類同時(shí)也必須實(shí)現(xiàn)WritableComparable
默認(rèn)輸入key類型 LongWritable 記錄數(shù)據(jù)分片的偏移位置
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Mapper() {
}
//預(yù)處理模塊
protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
//重寫模塊以滿足業(yè)務(wù)需求
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
context.write(key, value);
}
// 掃尾工作
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
//驅(qū)動(dòng)
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
// 對(duì)輸入的每一對(duì)key/value調(diào)用Map方法
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
// 設(shè)置Context
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
}
改寫LongWrite為Text
想要改寫首先我們要看Map默認(rèn)的輸入類型設(shè)置
job.setInputFormatClass(TextInputFormat.class);
將輸入key改變成Text我們要設(shè)置
job.setInputFormatClass(KeyValueTextInputFormat.class);
我們先來(lái)看一下TextInputFormat的源碼
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
public TextInputFormat() {
}
// 定義文本的讀取方式弧满,返回一個(gè)RecoredReader <LongWritable,Text>
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
// 返回一個(gè)LineRecordReader(繼承自RecordReader)
return new LineRecordReader(recordDelimiterBytes);
}
// 判斷是否分片,true進(jìn)行分片
protected boolean isSplitable(JobContext context, Path file) {
// 判斷是否加入壓縮
CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
}
在TextInputFormat中重寫了FileInputFormat類中的isSplitable()并進(jìn)行了壓縮判斷
protected boolean isSplitable(JobContext context, Path filename) {
return true;
}
我們?cè)倏匆幌翶eyValueTextInputFormat的源碼
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
public KeyValueTextInputFormat() {
}
// 判斷是否分片此熬,并進(jìn)行壓縮判斷
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file);
return null == codec ? true : codec instanceof SplittableCompressionCodec;
}
// 定義文本的讀取方式庭呜,返回一個(gè)RecoredReader <Text,Text>
public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
context.setStatus(genericSplit.toString());
return new KeyValueLineRecordReader(context.getConfiguration());
}
}
總結(jié):FileInputFormat是所以使用文件作為其數(shù)據(jù)源的InputFormat的子類,他有6個(gè)子類分別是
- TextInputFormat 每次讀入一行數(shù)據(jù) key:該行的偏移量 value: 行內(nèi)容
- KeyValueTextInputFormat 按照分隔符分割為key和value犀忱,如果不存在key則是一行數(shù)據(jù)募谎,value為空
分隔符設(shè)置,默認(rèn)為“\t”conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");//設(shè)置“峡碉,”為分隔符
- NLineInputFormat 可以實(shí)現(xiàn)Mapper收到固定行數(shù)
NLineInputFormat.setNumLinesPerSplit(job, 20); // 設(shè)置行數(shù)
- SequenceFileInputFormat 為Hadoop順序文件設(shè)計(jì)近哟,存儲(chǔ)二進(jìn)制<key,value>
- CombineFileInputFormat 為小文件設(shè)計(jì),可以將多個(gè)文件打包到一個(gè)分片中
- FixedLengthInputFormat
多路徑輸入
我們可以通過(guò)設(shè)置setInputPaths()設(shè)置多個(gè)路徑
FileInputFormat.setInputPaths(new Path(),new Path(),new Path());
也可以通過(guò)設(shè)置MultipleInputs來(lái)實(shí)現(xiàn)多個(gè)文件指定不同的Mapper
MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapA.class);
MultipleInputs.addInputPath(job, new Path(""), TextInputFormat.class,MapB.class);
小文件處理
- 文件壓縮
-
自定義分片
- 繼承FileInputFormat類
- 重寫里面的isSplitable 改成返回false 取消默認(rèn)分片規(guī)則
- 重寫createRecordReader 方法 指定新的規(guī)則
- 編寫一個(gè)類繼承RecordReader
- 重寫里面的方法
- 設(shè)置你定義的InputFormat