Hadoop InputFormat介紹
1 概述
我們在編寫MapReduce程序的時候密幔,在設(shè)置輸入格式的時候箱亿,會調(diào)用如下代碼:
job.setInputFormatClass(KeyVakueTextInputFormat.class)
通過上面的代碼來保證輸入的文件是按照我們想要的格式被讀取,所有的輸入格式都繼承于InputFormat,這是一個抽象類舱呻,其子類有專門用于讀取普通文件的FileInputFormatt,用于讀取數(shù)據(jù)庫文件的DBInputFromat,用于讀取HBase的TableInputFormat等等箱吕。如下圖是InputFormat的圖譜芥驳。
2 InputFormat方法
從類圖中可以看出,InputFormat抽象類僅有兩個抽象方法:
public abstract List<InputSplit> getSplits(JobContext context)
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)
getSplits()方法是邏輯上拆分作業(yè)的輸入文件集茬高,然后將每個InputSplit分配給一個單獨的Mapper進行處理
注意:拆分是按輸入文件的邏輯分割兆旬,而輸入文件不會被物理分割成塊。每個切片都是一個<input-file-path,start,offset>
的元組怎栽,InputFormat并創(chuàng)建相應(yīng)的RecordReader讀取這些切片丽猬。
createRecordReader()方法是為給定的切片創(chuàng)建一個記錄閱讀器。在切片被使用之前先調(diào)用RecordReader.initialize(InputSplit, TaskAttemptContext)
方法熏瞄。
通過InputFormat脚祟,MapReduce框架可以做到:
- 驗證作業(yè)輸入的正確性
- 將輸入的文件切割成邏輯分片(InputSplit),一個InputSplit將會分配給一個獨立的MapTask
- 提供RecordReader實現(xiàn),讀取InputSplit中的Kv對供Mapper使用强饮。
不同的InputFormat會各自實現(xiàn)不同的文件讀取方法以及分片方式由桌,每個輸入分片會被單獨的MapTask作為數(shù)據(jù)源。下面將介紹InputSplit和RecordReader胡陪。
3 InputSplit介紹
MapTask的輸入是一個輸入切片沥寥,稱為InputSplit。InputSplit也是一個抽象類柠座,它在邏輯上包含給處理這個InputSplit的Mapper的所有KV對邑雅。不同類型的輸入格式對應(yīng)不同類型的切片,下圖是InputSplit的類圖妈经。
3.1 InputSplit方法
// 獲取切片大小淮野,并且根據(jù)size對切片排序
public abstract long getLength()
// 獲取存儲該分片的數(shù)據(jù)所在的節(jié)點位置,其中的數(shù)據(jù)是本地的吹泡,位置信息不需要序列號
public abstract String[] getLocations()
// 獲取有關(guān)切片在那個節(jié)點上的信息骤星,以及它是如何存儲在每個位置的
public SplitLocationInfo[] getLocationInfo()
4 RecordReader
RecorderReader將讀入到Map的數(shù)據(jù)拆分成KV對。RecorderReader也是一個抽象類爆哑。下面是RecordReader的類圖:
接下來看一下RecordReader的源代碼:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
/**
* 由一個InputSplit初始化
*/
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* 讀取分片下一個KV
*/
public abstract
boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the current key
*/
public abstract
KEYIN getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
*/
public abstract
VALUEIN getCurrentValue() throws IOException, InterruptedException;
/**
* 跟蹤讀取分片的進度
*/
public abstract float getProgress() throws IOException, InterruptedException;
/**
* Close the record reader.
*/
public abstract void close() throws IOException;
}