數(shù)據(jù)處理總流程
MapReduce計算框架體現(xiàn)的是一個分治的思想。及將待處理的數(shù)據(jù)分片在每個數(shù)據(jù)分片上并行運行相同邏輯的map()函數(shù)惫皱,然后將每一個數(shù)據(jù)分片的處理結(jié)果匯集到reduce()函數(shù)進(jìn)行規(guī)約整理难礼,最后輸出結(jié)果彭沼。
總體上來說MapReduce的處理流程從邏輯上看并不復(fù)雜廉侧。對于應(yīng)用Hadoop進(jìn)行數(shù)據(jù)分析的開發(fā)人員來說肖方,只需實現(xiàn)map()方法和reduce()方法就能完成大部分的工作。正是因為Hadoop邏輯上和開發(fā)上都不復(fù)雜使它被廣泛的應(yīng)用于各行各業(yè)非洲。
Map階段
Map階段更為詳細(xì)的處理過程如圖所示:
一般情況下用戶需要處理分析的數(shù)據(jù)都在HDFS上鸭限。因此,MapReduce計算框架會是使用InputFormat(org.apache.hadoop.mapreduce)的子類將輸入數(shù)據(jù)分片(InputSplit)两踏。分片后的數(shù)據(jù)將作為MapTask的輸入里覆,MapTask會根據(jù)map()中的程序邏輯將數(shù)據(jù)分為K-V鍵值對。
為了更好的理解數(shù)據(jù)分片的過程和實現(xiàn)的邏輯缆瓣,本文以InputFormat的一個子類FileInputFormat為例研究數(shù)據(jù)分片的過程。
FileInputFormat類將數(shù)據(jù)分片虹统,然而這里所說的分片并不是將數(shù)據(jù)物理上分成多個數(shù)據(jù)塊而是邏輯分片弓坞。
PS:并不是所有文件都可以分片,比如gzip车荔,snappy壓縮的文件就無法分割 .
數(shù)據(jù)邏輯分片的核心方法是getSplits():
public List<InputSplit> getSplits(JobContext job) throws IOException {
渡冻。。忧便。族吻。。。
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
超歌。砍艾。。巍举。脆荷。。
return splits;
}
其流程圖如下所示:
getSplits()中的BlockLocation類保存待處理文件的數(shù)據(jù)塊信息懊悯,它包含了數(shù)據(jù)塊所在DataNode的hostname蜓谋,帶有緩存副本的數(shù)據(jù)塊所在的節(jié)點的hostname,訪問數(shù)據(jù)塊所在DataNode的IP:端口號炭分,在拓?fù)渚W(wǎng)絡(luò)中的絕對路徑名桃焕,數(shù)據(jù)塊在整個數(shù)據(jù)文件中的偏移量,數(shù)據(jù)塊長度捧毛,是否是壞塊观堂。getSplits()會依據(jù)這些信息創(chuàng)建一個FileSplit完成一個邏輯分片,然后將所有的邏輯分片信息保存到List中岖妄。List中的InputSplit包含四個內(nèi)容型将,文件的路徑,文件開始的位置荐虐,文件結(jié)束的位置七兜,數(shù)據(jù)塊所在的host。
除了getSplits()方法另一比較重要的算法是computeSplitSize()方法福扬,它負(fù)責(zé)確定數(shù)據(jù)分片的大小腕铸,數(shù)據(jù)分片的大小對程序的性能會有一定的影響,最好將數(shù)據(jù)分片的大小設(shè)置的和HDFS中數(shù)據(jù)分片的大小一致铛碑。確定分片大小的算法是:
Math.max(minSize, Math.min(maxSize, blockSize))
set mapred.max.split.size=256000000;2.x版本默認(rèn)約是128M狠裹,我們集群配置的是256M
set mapred.min.split.size=10000000;2.x版本默認(rèn)是約10M,我們集群配置的是1
blockSize 在hdfs-site.xml參數(shù)dfs.block.size中配置汽烦,我們集群設(shè)置的是默認(rèn)的是134217728=128M
set mapred.map.tasks 對map task數(shù)量僅僅是參考的作用涛菠,我們集群默認(rèn)的是2
對應(yīng)的是set mapred.reduce.tasks,我們集群默認(rèn)的是-1
reducer數(shù)量可能起作用的
hive.exec.reducers.bytes.per.reducer=256000000
hive.exec.reducers.max=1009
min( hive.exec.reducers.max 撇吞,總輸入數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer)
其中俗冻,minSize是配置文件中設(shè)置的分片最小值,minSize則為最大值牍颈,blockSize為HDFS中數(shù)據(jù)塊的大小迄薄。
完成邏輯分片后,F(xiàn)ileInputFormat的各個子類向MapTask映射k-v鍵值對(如TextInputFormat)煮岁。FileInputFormat的子類是對數(shù)據(jù)分片中的數(shù)據(jù)進(jìn)行處理讥蔽。
TextInputFormat中createRecorderReader()將InputSplit解析為k-v傳給mapTask涣易,該方法中用到了LineRecordReader它繼承自RecordReader。
MapTask最終是通過調(diào)用nextKeyValue()方法來遍歷分片中的數(shù)據(jù)并且將行數(shù)以及每一行的的數(shù)據(jù)分別作為key和value傳遞給map()方法冶伞。map()方法按照開發(fā)工程師編寫的邏輯對輸入的key和value進(jìn)行處理后會組成新的k-v對然后寫出到一個內(nèi)存緩沖區(qū)中新症。
每個MapTask都有一個內(nèi)存緩沖區(qū),對緩沖區(qū)讀寫是典型的生產(chǎn)者消費者模式碰缔。這里內(nèi)存緩沖區(qū)的結(jié)構(gòu)設(shè)計對MapTask的IO效率有著直接的影響账劲。Hadoop采用了環(huán)形內(nèi)存緩沖區(qū),當(dāng)緩沖區(qū)數(shù)據(jù)量達(dá)到閾值消費者線程SpillThread開始將數(shù)據(jù)寫出金抡,于此同時充當(dāng)生產(chǎn)者的writer()函數(shù)依然可以將處理完的數(shù)據(jù)寫入到緩沖區(qū)中瀑焦。生產(chǎn)者和消費者之間的同步是通過可重入互斥鎖spillLock來完成的。
在寫磁盤之前梗肝,線程會對緩沖區(qū)內(nèi)的數(shù)據(jù)進(jìn)行分區(qū)榛瓮,以決定各個數(shù)據(jù)會傳輸?shù)侥膫€Reduce中。而在每個分區(qū)中會按key進(jìn)行排序(如果此時有個Combiner則它會在排序后的輸出上運行一次巫击,以壓縮傳輸?shù)臄?shù)據(jù))
mapred-site.xml 文件中
mapreduce.task.io .sort.mb=300M
mapreduce.map.sort.spill.percent 配置的默認(rèn)只0.8
用戶可以通過繼承Partitiner類并且實現(xiàn)getPartitioner()方法禀晓,從而定制自己的分區(qū)規(guī)則。默認(rèn)的分區(qū)規(guī)則是通過key的hashCode來完成分區(qū)的坝锰。
環(huán)形緩沖區(qū)在達(dá)到溢寫的閾值后粹懒,溢寫到磁盤(每次溢寫都會新建一個溢寫文件)最后合并溢寫文件,形成一個分區(qū)有序的中間結(jié)果顷级。另外可以對中間結(jié)果進(jìn)行壓縮凫乖,以減少傳輸?shù)臄?shù)據(jù)量。
Reduce階段
Reduce階段更為詳細(xì)的流程如下圖所示:
ReduceTask對數(shù)據(jù)進(jìn)行規(guī)約的第一步就是從MapTask的輸出磁盤上將數(shù)據(jù)拉取過來弓颈。這個過程重點分析shuffle類和Fetcher類帽芽。Shuffle類如下圖所示:
Shuffle類中的init()方法負(fù)責(zé)初始化Shuffle階段需要的上下文,并且在Shuffle的最后階段調(diào)用歸并排序方法翔冀。Shuffle類的核心方法為run()方法导街。
public RawKeyValueIterator run() throws IOException, InterruptedException {
。纤子。搬瑰。。控硼。跌捆。
// Start the map-output fetcher threads
Boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getint(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
。象颖。。姆钉。说订。抄瓦。
eventFetcher.shutDown();
for (Fetcher<K,V> fetcher : fetchers) {
fetcher.shutDown();
}
scheduler.close();
copyPhase.complete();
// copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
RawKeyValueIterator kvIter = null;
。陶冷。钙姊。。埂伦。煞额。
return kvIter;
}
在run()方法中它是通過啟動fetcher線程來拉取數(shù)據(jù)的。首先需要判斷將要拉取的數(shù)據(jù)是否具有本地性沾谜,如果數(shù)據(jù)在本地則直接傳入文件的地址否則創(chuàng)建fetcher線程來從其他節(jié)點遠(yuǎn)程拉取數(shù)據(jù)膊毁。Fetcher類類圖如下:
Fetcher繼承自Thread類因此它重寫了run()方法并且調(diào)用了copyFromHost()方法。copyFromHost()方法首先獲取指定host上運行完成的MapTaskID然后循環(huán)的從Map段讀取數(shù)據(jù)直到所有的數(shù)據(jù)都讀取完成基跑。
protected void copyFromHost(MapHost host) throws IOException {
婚温。。媳否。栅螟。。篱竭。
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
力图。。掺逼。吃媒。。坪圾。
while (!remaining.isEmpty() && failedTasks == null) {
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
}
catch (IOException e) {
晓折。。兽泄。漓概。。
}
}
}
讀取數(shù)據(jù)是在copyMapOutput()方法中完成的病梢,方法中用到了ShufferHeader類它實現(xiàn)了Writable接口從而可以完成序列化與反序列化的工作胃珍,它調(diào)用readFields()方法從數(shù)據(jù)流中讀取數(shù)據(jù)。
mapreduce.task.io .sort.factor =25
讀取數(shù)據(jù)過程中需要注意的是蜓陌,如果中間結(jié)果小則復(fù)制到內(nèi)存緩沖區(qū)中否則復(fù)制到本地磁盤中觅彰。當(dāng)內(nèi)存緩沖區(qū)達(dá)到大小閾值或者文件數(shù)閾值則溢寫到本地磁盤,與此同時后臺線程會不停的合并溢寫文件形成大的有序的文件钮热。
在Shuffle-copy階段進(jìn)行的同時Shuffle-Sort也在處理數(shù)據(jù)填抬,這個階段就是針對內(nèi)存中的數(shù)據(jù)和磁盤上的數(shù)據(jù)進(jìn)行歸并排序。
復(fù)制完所有的map輸出做循環(huán)歸并排序合并數(shù)據(jù)隧期。舉個例子更加好理解飒责,若合并因子為10赘娄,50個輸出文件,則合并5次宏蛉,最后剩下5個文件不符合合并條件遣臼,則將這5個文件交給Reduce處理。
Reduce階段會接收到已經(jīng)排完序的k-v對拾并,然后對k-v對進(jìn)行邏輯處理最后輸出結(jié)果k-v對到HDFS中.