思考問題
Mapper類
Mapper類
org.apache.hadoop.mapreduce.Mapper<KEYIN犀变、VALUEIN惠桃、KEYOUT弟灼、VALUEOUT>
四個泛型,分別是KEYIN嗽上、VALUEIN办绝、KEYOUT伊约、VALUEOUT,
前面兩個KEYIN、VALUEIN 指的是map 函數(shù)輸入的參數(shù)key孕蝉、value 的類型屡律;
后面兩個KEYOUT、VALUEOUT 指的是map 函數(shù)輸出的key降淮、value 的類型超埋。
Mapper有setup(),map(),cleanup()和run()四個方法霍殴。
其中setup()一般是用來進(jìn)行一些map()前的準(zhǔn)備工作媒惕,
map()則一般承擔(dān)主要的處理工作,
cleanup()則是收尾工作如關(guān)閉文件或者執(zhí)行map()后的K-V分發(fā)等来庭。run()方法提供了setup->map->cleanup()的執(zhí)行模板妒蔚。
在MapReduce中,Mapper從一個輸入分片中讀取數(shù)據(jù)月弛,然后經(jīng)過Shuffle and Sort階段肴盏,分發(fā)數(shù)據(jù)給Reducer,在Map端和Reduce端我們可能使用設(shè)置的Combiner進(jìn)行合并帽衙,這在Reduce前進(jìn)行菜皂。Partitioner控制每個K-V(鍵值)對應(yīng)該被分發(fā)到哪個reducer(我們的Job可能有多個reducer),Hadoop默認(rèn)使用HashPartitioner佛寿,HashPartitioner使用key的hashCode對reducer的數(shù)量取模得來幌墓。
run()方法
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
可以得出,K/V對是從傳入的Context(上下文)獲取的冀泻。
map()方法
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
也看得出輸出結(jié)果K/V對也是通過Context來完成的
作為map方法輸入的鍵值對常侣,其value值存儲的是文本文件中的一行(以回車符為行結(jié)束標(biāo)記),而key值為該行的首字母相對于文本文件的首地址的偏移量弹渔。將<K1,V1>作為map方法的結(jié)果輸出胳施,其余的工作都交有 MapReduce框架 處理。
這里輸入?yún)?shù)key肢专、value 的類型就是KEYIN舞肆、VALUEIN,每一個鍵值對都會調(diào)用一次map 函數(shù)博杖。在這里,map 函數(shù)沒有處理輸入的key椿胯、value,直接通過context.write(…)方法輸出了剃根,輸出的key哩盲、value 的類型就是KEYOUT、VALUEOUT狈醉。這是默認(rèn)實現(xiàn)廉油,通常是需要我們根據(jù)業(yè)務(wù)邏輯覆蓋的。
當(dāng)調(diào)用到map時苗傅,通常會先執(zhí)行一個setup函數(shù)抒线,最后會執(zhí)行一個cleanup函數(shù)。而默認(rèn)情況下渣慕,這兩個函數(shù)的內(nèi)容都是nothing嘶炭。因此抱慌,當(dāng)map方法不符合應(yīng)用要求時,可以試著通過增加setup和cleanup的內(nèi)容來滿足應(yīng)用的需求旱物。
Reducer類
Reducer類
org.apache.hadoop.mapreduce.Reducer<KEYIN遥缕、VALUEIN、KEYOUT宵呛、VALUEOUT>
四個泛型,分別是KEYIN单匣、VALUEIN、KEYOUT宝穗、VALUEOUT,
前面兩個KEYIN户秤、VALUEIN 指的是map 函數(shù)輸出的參數(shù),即reduce 函數(shù)輸入的key逮矛、value 的類型鸡号;
后面兩個KEYOUT、VALUEOUT 指的是reduce 函數(shù)輸出的key须鼎、value 的類型鲸伴。
Reducer有3個主要的函數(shù),分別是:setup()晋控,clearup()汞窗,reduce(),run()赡译。
reducer()
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
Context context ) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
run()
@SuppressWarnings("unchecked")
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
((ReduceContext.ValueIterator)
(context.getValues().iterator())).resetBackupStore();
}
cleanup(context);
}
}
當(dāng)調(diào)用到reduce時仲吏,通常會先執(zhí)行一個setup函數(shù),最后會執(zhí)行一個cleanup函數(shù)蝌焚。而默認(rèn)情況下裹唆,這兩個函數(shù)的內(nèi)容都是nothing。因此只洒,當(dāng)reduce不符合應(yīng)用要求時许帐,可以試著通過增加setup和cleanup的內(nèi)容來滿足應(yīng)用的需求。
InputFormat類
平時我們寫MapReduce程序的時候毕谴,在設(shè)置輸入格式的時候成畦,總會調(diào)用形如job.setInputFormatClass(KeyValueTextInputFormat.class);來保證輸入文件按照我們想要的格式被讀取。
所有的輸入格式都繼承于InputFormat析珊,這是一個抽象類,其子類有專門用于讀取普通文件的FileInputFormat蔑穴,用來讀取數(shù)據(jù)庫的DBInputFormat等等忠寻。
其實,一個輸入格式InputFormat存和,主要無非就是要解決如何將數(shù)據(jù)分割成分片(比如多少行為一個分片)奕剃,以及如何讀取分片中的數(shù)據(jù)(比如按行讀戎月谩)。前者由getSplits()完成纵朋,后者由RecordReader完成柿顶。這些方法的實現(xiàn)都在子類中。
1 public abstract class InputFormat<K, V> {
2
3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
4
5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
6
7 }
類InputFomat 是負(fù)責(zé)把HDFS 中的文件經(jīng)過一系列處理變成map 函數(shù)的輸入部分的操软。這個類做了三件事情:
第一嘁锯, 驗證輸入信息的合法性,包括輸入路徑是否存在等聂薪;
第二家乘,把HDFS 中的文件按照一定規(guī)則拆分成InputSplit,每個InputSplit 由一個Mapper執(zhí)行藏澳;
第三仁锯,提供RecordReader,把InputSplit 中的每一行解析出來供map 函數(shù)處理翔悠;
MapReduce應(yīng)用開發(fā)人員并不需要直接處理InputSplit业崖,因為它是由InputFormat創(chuàng)建。InputFormat負(fù)責(zé)產(chǎn)生輸入分片并將它們分隔成記錄蓄愁。
InputSplit
我們知道Mappers的輸入是一個一個的輸入分片双炕,稱InputSplit。InputSplit是一個抽象類涝登,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對雄家。
getLength()用來獲取InputSplit的大小,以支持對InputSplits進(jìn)行排序胀滚,而getLocations()則用來獲取存儲分片的位置列表趟济。
public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
InputSplit是hadoop定義的用來傳送給每個單獨的map的數(shù)據(jù),InputSplit存儲的并非數(shù)據(jù)本身咽笼,而是一個分片長度和一個記錄數(shù)據(jù)位置的數(shù)組顷编。生成InputSplit的方法可以通過InputFormat()來設(shè)置。
當(dāng)數(shù)據(jù)傳送給map時剑刑,map會將輸入分片傳送到InputFormat媳纬,InputFormat則調(diào)用方法getRecordReader()生成RecordReader,RecordReader再通過creatKey()施掏、creatValue()方法創(chuàng)建可供map處理的一個一個的<key,value>對钮惠。
簡而言之,InputFormat()方法是用來生成可供map處理的<key,value>對的七芭。
FileinputFormat類
FileinputFormat類是所有使用文件作為其數(shù)據(jù)源的InputFormat實現(xiàn)的基類素挽。
它提供了兩個功能:
- 定義哪些文件包含在一個作業(yè)的輸出中
- 輸入文件生成分片的實現(xiàn)。
并把分片分隔成記錄的作業(yè)由其子類來完成狸驳。
FileinputFormat類的輸入路徑
作業(yè)的輸入被設(shè)定為一組路徑预明,這對限定作業(yè)輸入提供了很大的靈活性缩赛。
如果需要排除特定文件可以使用FileInPutFormat的SetInputPathFilter()方法設(shè)置一個過濾器:
FileInPutFormat類的輸入分片
給定一組文件旨袒,F(xiàn)ileInPutFormat是如何把它們轉(zhuǎn)換為輸入分片的?
FileInPutFormat只分割大文件遮斥。這里的大是值超過HDFS塊的大小峦失。而分片通常與HDFS塊大小一樣,也可以設(shè)置不同的Hadoop屬性來改變术吗。
下面是該類對getSplits 方法的實現(xiàn)
利用FileInputFormat 的getSplits方法尉辑,我們就計算出了我們的作業(yè)的所有輸入分片了
注意:每一個輸入分片啟動一個Mapper 任務(wù)。
public List<InputSplit> getSplits(JobContext job
2 ) throws IOException {
3 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
4 long maxSize = getMaxSplitSize(job);
5
6 // generate splits
7 List<InputSplit> splits = new ArrayList<InputSplit>();
8 List<FileStatus>files = listStatus(job);
9 for (FileStatus file: files) {
10 Path path = file.getPath();
11 FileSystem fs = path.getFileSystem(job.getConfiguration());
12 long length = file.getLen();
13 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
14 if ((length != 0) && isSplitable(job, path)) {
15 long blockSize = file.getBlockSize();
16 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
17
18 long bytesRemaining = length;
19 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
20 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
21 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
22 blkLocations[blkIndex].getHosts()));
23 bytesRemaining -= splitSize;
24 }
25
26 if (bytesRemaining != 0) {
27 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
28 blkLocations[blkLocations.length-1].getHosts()));
29 }
30 } else if (length != 0) {
31 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
32 } else {
33 //Create empty hosts array for zero length files
34 splits.add(new FileSplit(path, 0, length, new String[0]));
35 }
36 }
37
38 // Save the number of input files in the job-conf
39 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
40
41 LOG.debug("Total # of splits: " + splits.size());
42 return splits;
43 }
那這些計算出來的分片是怎么被map讀取出來的呢较屿?就是InputFormat中的另一個方法createRecordReader(),FileInputFormat并沒有對這個方法做具體的要求隧魄,而是交給子類自行去實現(xiàn)它。
- RecordReader:
RecordReader是用來從一個輸入分片中讀取一個一個的K -V 對的抽象類隘蝎,我們可以將其看作是在InputSplit上的迭代器购啄。我們從類圖中可以看到它的一些方法,最主要的方法就是nextKeyvalue()方法嘱么,由它獲取分片上的下一個K-V 對狮含。 - 我們再深入看看上面提到的RecordReader的一個子類: Lin eRecordReader。
LineRecordReader由一個FileSplit構(gòu)造出來曼振,start是這個FileSplit的起始位置几迄,pos是當(dāng)前讀取分片的位置,end是分片結(jié)束位置冰评,in是打開的一個讀取這個分片的輸入流映胁,它是使用這個FileSplit對應(yīng)的文件名來打開的。key和value則分別是每次讀取的K-V對甲雅。然后我們還看到可以利用getProgress()來跟蹤讀取分片的進(jìn)度解孙,這個函數(shù)就是根據(jù)已經(jīng)讀取的K-V對占總K-V對的比例來顯示進(jìn)度的。
其他輸入類
CombineFileInputFormat類 能夠很好的處理小文件
WholeFileInputFormat類 使用RecordReader將整個文件讀為一條記錄抛人。TestInputFormat類 Hadoop默認(rèn)的輸入方法弛姜,每條記錄是一行輸入。
鍵是LongWritable類型妖枚,存儲該行在整個文件中的字節(jié)偏移量廷臼。
值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符),是Text類型的中剩。
SequenceFileInputFormat類 順序文件格式存儲二進(jìn)制的鍵值對的序列作為MapReduce的輸入時使用。
MultipleInputs類 能妥善處理多種格式輸入問題抒寂。
DBInputFormat 這種輸入格式用于使用JDBC從關(guān)系數(shù)據(jù)庫中讀取數(shù)據(jù)结啼。