MapReduce數(shù)據(jù)處理模型:
map和reduce函數(shù)的輸入和輸出時鍵值對薪棒。
MapReduce的類型
Hadoop的 MapReduce中诈茧,map函數(shù)和reduce函數(shù)遵循如下格式:
- map:(k1,v1) --->list(k2,v2)
- combiner:(k2,list(v2))---> lsit(k2,v2)
- reduce:(k2,list(v2)) --->list(k3,v3)
默認的MapReduce作業(yè)
1别威、默認mapper是Mapper類母谎,將輸入的鍵和值原封不動地寫到輸出中:
public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
protected void map(KEYIN key,VALUEIN value,
Context context) throws IOException,InterruptedException{
context.write((KEYOUT)key,(VALUE)value);
}
}
Mapper是一個泛型類型(generic type)埂蕊,可以接受任何鍵或值的類型揪利。map任務(wù)的數(shù)量等于輸入文件被劃分成的分塊數(shù)歉提,取決于輸入文件的大小及文件塊的大小笛坦。
2、默認的partitioner(分區(qū)索引)是HashPartitioner苔巨,它對每條記錄的鍵進行哈希操作版扩,以絕對該記錄應(yīng)該屬于哪個分區(qū):
public class HashPartitioner<K,V>extends Partitioner<K,V>{
public int getPartitioner(k key,V value,int numPartitions){
return (key.hashCode()& Integer.MAX_VALUE) % numPartitions;
}
}
鍵的哈希碼被轉(zhuǎn)換成一個非負整數(shù),它由哈希值與最大的整型值做一次按位與操作而獲得侄泽,然后用分區(qū)數(shù)礁芦,進行取模操作,以決定該記錄屬于哪個分區(qū)索引。
每個分區(qū)由一個reduce任務(wù)處理柿扣,所以分區(qū)數(shù)等于作業(yè)的reduce任務(wù)個數(shù)肖方,默認情況下,只有一個reducer未状,即只有一個分區(qū)俯画,該情況下想,所有數(shù)據(jù)都放在同一分區(qū)司草,partitioner操作變得無關(guān)緊要艰垂,如果有多個reduce任務(wù),這是HashPartitioner的作用便體現(xiàn)出來翻伺。
3材泄、默認reducer是Reducer類型沮焕,也是一個泛型類型吨岭,把所有輸入寫到輸出中:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
protected void reduce(KEYIN key,Iterable<VALUEIN> values,
Context context)throws IOException,InterruptedException{
for(VALUEIN value:values){
context.write((KEYOUT)key,(VALUEOUT)value);
}
}
}
輸入格式
輸入分片與記錄
一個輸入分片(split)是一個由單個map操作來處理的輸入塊,每個map操作只處理一個輸入分片峦树。每個分片被劃分為若干條記錄辣辫,每條記錄就是一個鍵值對,map一個接一個地處理記錄魁巩。
輸入分片在包org.apache.hadoop.mapreduce下急灭,用InputSplit接口表示
public abstract class InputSplit{
public abstract long getLength() throws IOException,InterruptedException;
public abstract String[] getLocations()throws IOException,InterruptedException;
}
InputSplit包含一個以字節(jié)為單位的長度和一組存儲位置(一組主機名),分片并不包含數(shù)據(jù)本身谷遂,而是指向數(shù)據(jù)的引用葬馋,存儲位置用于供MapReduce系統(tǒng)使用以便將map任務(wù)盡量放在分片數(shù)據(jù)附近,而分片大小用來排序分片肾扰,以便優(yōu)先處理最大的分片畴嘶,從而最小優(yōu)化作業(yè)運行時間(貪心算法)。
MapReduce開發(fā)不用直接處理InputSplit集晚,它由InputFormat創(chuàng)建的(在MapReduce驅(qū)動程序中窗悯,InputFormat負責創(chuàng)建輸入分片并將它們分割成記錄),InputFormat類定義如下:
public abstract class InputFormat<K,V>{
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException,InterruptedException;
public abstract RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,InterruptedException;
}
- 運行作業(yè)的客戶端(驅(qū)動程序)通過getSplits()計算分片偷拔,然后將它們發(fā)送到application master
- application master使用其存儲位置信息(InputSplit中的字段)來調(diào)度map任務(wù)蒋院,從而在集群上處理這些分片數(shù)據(jù)。
- map任務(wù)把輸入分片傳給InputFormat的createRecordReader()方法來獲取這個分片的RecordReader莲绰。RecordReader就像記錄上的迭代器欺旧,map任務(wù)用一個RecordReader來生成記錄的鍵值對,然后再傳遞給map函數(shù)蛤签。
查看Mapper的run方法辞友,如下:
public void run(Context context)throws IOException,InterruptedException{
setup(context);
while(context.nextKeyalue()){
map(context.getCurrentKey(),context.getCurrentValue(),context);
}
cleanup(context);
}
- 首先運行setup(),重復調(diào)用Context的nextKeyValue()為mapper產(chǎn)生鍵值對象顷啼。
- 通過Context踏枣,鍵值從RecordReader中被檢索出并傳遞給map()方法
- 當reader讀到stream的結(jié)尾時昌屉,nextKeyValue()方法返回false,map任務(wù)運行其cleanup()方法茵瀑,然后結(jié)束间驮。
Mapper的run方法是公共的 ,可以由用戶定制马昨。MultithreadedMapRunner是另一個MapRunnable接口的實現(xiàn)竞帽,可配置指定個數(shù)的線程來并發(fā)運行多個mapper。
FileInputFormat類
FileInputFormat是所有使用文件作為其數(shù)據(jù)源的InputFormat實現(xiàn)的基類鸿捧,它提供兩個功能:
- 用于指出作業(yè)的輸入文件位置
- 為輸入文件生成分片的代碼實現(xiàn)屹篓。
FileInputFormat類的輸入路徑:
作業(yè)的輸入被設(shè)定成一組路徑,F(xiàn)ileInputFormat提供4種靜態(tài)方法來設(shè)定Job
public static void addInputPath(Job job,Path path);
public static void addInputPaths(Job job,String commaSeperatedPaths);
public static void setInputPaths(Job job,Path inputPaths);
public static void setInputPaths(Job job,String commaSeperatedPaths);
FileInputFormat類的輸入分片:
FileInputFormat負責把大文件(超過HDFS塊的大谐着)分割成小文件(HDFS塊大卸亚伞),分片大小在默認情況下:
max(minimumSize, min(maximumSize,blockSize))
避免切分:
兩種實現(xiàn)方法:
- 增加最小分片大小泼菌,設(shè)置為最大文件大小谍肤,即long.MAX_VALUE
- 使用FileInputFormat具體子類,重寫isSplitable()方法哗伯,把返回值設(shè)置為false,如下
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat{
@Override
protected boolean isSplitable(JobContext context,Path file){
return false;
}
}
文本輸入
TextInputFormat是默認的InputFormat荒揣,每天記錄時一行輸入,鍵是LongWritable類型焊刹,存儲該行在整個文件中的字節(jié)偏移量系任。值是這行內(nèi)容,不包括任何終止符虐块,被打包成一個Text對象俩滥。如下:
//如下文本
On the top of the Crumpetty Tre
The Quangle Wangle sat,
But his face you could not see,
On accout of his Beaver Hat.
//每條記錄值,鍵值對
(0,On the top of the Crumpetty Tre )
(33,The Quangle Wangle sat,)
(57,But his face you could not see,)
(89,On accout of his Beaver Hat.)
其他類:KeyValueTextInputFormat、NLineInputFormat
二進制輸入
SequenceFileInputFormat類非凌、SequenceFileAsTextInputFormat類举农、SequenceFileAsBinaryInputFormat類、FileLengthInputFormat類敞嗡。
數(shù)據(jù)庫輸入/輸出
在關(guān)系數(shù)據(jù)庫與HDFS之間移動數(shù)據(jù)的方法是:使用Sqoop
其他方式:Hbase的TableInputFormat
輸出格式
類圖:
參考資料:《Hadoop權(quán)威指南》