圖片摘自于Alexey Grishchenko博文
Map-side
InputFormat Class :
getSplits: the set of input data splits 返回一組輸入數(shù)據(jù)的拆分文件
-
getRecordReader:
iterable
interface for reading all the records from a single input? 從單個輸入文件提供可迭代的接口用于讀取輸入數(shù)據(jù)
文件塊大小取決于InputFormat(自定義輸入文件拆分需要繼承此FileInputFormat)
輸入文件是文本類型需要配置dfs.blocksize的大小(hdfs-site.xml)
gzip類型的壓縮文件挺物,不可拆分需要用輸入整個文件等勒极。
每個mapper處理一個輸入拆分塊文件晃择,大多數(shù)時處理128MB大小文件若輸入文件是以GB或PB或更大磨确。
map
函數(shù)運用于輸入拆分文件的每對鍵值對(k酿愧,v)挺狰,它們的每對鍵值對都有RecordReader
返回身隐。
根據(jù)業(yè)務(wù)邏輯需求廷区,在mapper對每對鍵值做處理輸出結(jié)果,將結(jié)果由Context文本類傳給reducer端贾铝。
負責收集map的輸出數(shù)據(jù)(如文件)是mapreduce.job.map.output.collector
屬性 (mapreduce-default.xml)隙轻,默認是由org.apache.hadoop.mapred.MapTask$MapOutputBuffer
實現(xiàn)。
Map 函數(shù)的輸出結(jié)果首先調(diào)用從Partitioner類getPartition方法垢揩。 getPartition
需要傳入鍵值對(k玖绿,v) 還有reduce的任務(wù)數(shù)量(numReduceTasks),然后返回這些鍵值對匹配的分區(qū)水孩。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
接著把輸出數(shù)據(jù)鍵值對和分區(qū)號一同寫入環(huán)形緩沖區(qū) (ring buffer), 此緩沖區(qū)大小由mapreduce.task.io.sort.mb
(mapreduce-default.xml) 定義镰矿, 默認為100MB, 最大map輸出數(shù)據(jù)信息允許在ring buffer占用的大小. 若輸出數(shù)據(jù)大小大于此值俘种,數(shù)據(jù)會被溢出到(寫入)硬盤秤标。
注意绝淡,map 輸出的環(huán)形緩沖區(qū)默認是比輸入文件的拆分塊(默認是128MB) 要小,所以多余的會被寫到硬盤中苍姜。
溢出的操作是由新的線程執(zhí)行牢酵,起初環(huán)形緩沖區(qū)的大小為0.8mapreduce.map.sort.spill.percent
(mapreduce-default.xml), 所以最初的緩沖區(qū)大小為80MB衙猪。 map task的輸出溢出文件大小默認是大于80MB的馍乙,多余的是會被寫到硬盤里。
溢出用開啟新線程來處理輸出文件數(shù)據(jù)是為了讓mapper在處理溢出同時也能繼續(xù)執(zhí)行處理輸入文件數(shù)據(jù)垫释。
注意: 當處理輸入數(shù)據(jù)(FileInputFormat)的數(shù)率比溢出的數(shù)率快時丝格,Mapper函數(shù)會停止工作 因為 內(nèi)存的環(huán)形緩沖區(qū)可能會達到100%。在這種情況棵譬,mapper函數(shù)會阻塞并等待溢出線程為下一批輸出數(shù)據(jù)處理清空一些內(nèi)存空間显蝌。
溢出線程會把環(huán)形緩沖區(qū)數(shù)據(jù)寫到mapper函數(shù)調(diào)用的服務(wù)器的本地的文件里。溢出線程寫出本地的路勁是由mapreduce.job.local.dir
(mapred-default.xml)定義订咸,此配置屬性包含了一組由集群上MapReduce任務(wù)用到的路徑來存儲零時數(shù)據(jù)曼尊。 文件夾被一個接一個使用。 寫入前脏嚷,數(shù)據(jù)會以快速排序 進行排序:comparator函數(shù)先對比partition分區(qū)號然后再對比key值骆撇,以至于先排分區(qū),再每個分區(qū)排序key值父叙。
排序完成后神郊,Combiner被調(diào)用用于減少輸入硬盤的數(shù)據(jù)量。Combiner的輸出會被寫入硬盤高每。 有個邊際情況是當mapper產(chǎn)生的輸出數(shù)據(jù)過于大不能容入內(nèi)存時(大于輸出緩沖區(qū)大杏炱瘛),sorter和combiner時不會被調(diào)用鲸匿; 這時爷怀,mapper的輸出數(shù)據(jù)就會直接寫入硬盤。
無論mapper輸出數(shù)據(jù)是多大带欢,輸出完成時“Spill”是肯定會至少調(diào)用一次运授。
(詳細信息查看:ShuffleHandler.class#sendMapOutput
)
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, ? String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
同樣的map任務(wù)會執(zhí)行,如sort和combine乔煞,還有寫入本地磁盤文件吁朦。
每個由溢出線程溢出文件有個索引包含了每個溢出文件的分區(qū)信息:分區(qū)從哪開始于哪結(jié)束。 這些索引被存在內(nèi)存中渡贾,此內(nèi)存塊大小由mapreduce.task.index.cache.limit.bytes
決定逗宜,默認為1MB。 內(nèi)存不足時,所有的下一批生成的溢出文件的索引會與溢出文件一起被寫入到磁盤纺讲。
當mapper處理輸出文件與最后溢出結(jié)束時擂仍,溢出線程完成結(jié)束而合并階段開始。 在合并時熬甚,所有的溢出文件應(yīng)該合并一塊為單個map的輸出文件逢渔。一個合并過程默認可以處理10個溢出文件(由mapreduce.task.io.sort.factor
決定)。
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
<description>The number of streams to merge at once while sorting
files. This determines the number of open file handles.</description>
</property>
若溢出文件大于此屬性值乡括, 剩余的文件會被合并為一個大文件肃廓。
合并期間,若被合并的文件大于(min.num.spills.for.combine
, 默認為3), 寫入磁盤前诲泌,combiner會在merge結(jié)果上執(zhí)行盲赊。
MapTask的結(jié)果是一個包含了所有的Mapper輸出結(jié)果與描述分區(qū)開始-結(jié)束的索引信息的輸出文件, 這些分區(qū)開始-結(jié)束索引信息便于ReduceTask能夠從磁盤中索取每個reducer任務(wù)的運行相關(guān)數(shù)據(jù)档礁。
Reduce-Side
Map的任務(wù)數(shù)量由拆分塊決定的角钩,然而reduce的任務(wù)用戶自己設(shè)置的(mapreduce.job.reduces
, 默認為1)呻澜。 Reduce端的shuffle的實現(xiàn)由mapreduce.job.reduce.shuffle.consumer.plugin.class
屬性決定,默認為org.apache.hadoop.mapreduce.task.reduce.Shuffle
惨险。
Reduce端做的第一件事就開啟”Event Fetcher" 線程羹幸,從Application Master得到Mapper的狀態(tài)并監(jiān)聽mapper的事件是否執(zhí)行結(jié)束。 當mapper結(jié)束自己的shuffle過程辫愉,mapper的輸出文件數(shù)據(jù)傳送到多個“Fetcher”線程的其中一個栅受。 “Fetcher”線程是由mapreduce.reduce.shuffle.parallelcopies
決定的,默認為5個恭朗,這意味著單個reduce任務(wù), 有五個線程來從mapper端并行拷貝數(shù)據(jù)屏镊。Fetch的節(jié)點間的傳輸是通過HTTP或HTTPS的協(xié)議連接fetcher到相應(yīng)的DataNode URL。
<property>
<name>mapreduce.reduce.shuffle.parallelcopies</name>
<value>5</value>
<description>The default number of parallel transfers run by reduce
during the copy(shuffle) phase.
</description>
</property>
“Fetcher”從Mapper端下載的數(shù)據(jù)都被存在內(nèi)存中痰腮,此內(nèi)存大小占用reducer內(nèi)存比例由mapreduce.reduce.shuffle.input.buffer.percent
決定而芥,總的reducer內(nèi)存是mapreduce.reduce.memory.totalbytes
。
<property>
<name>mapreduce.reduce.shuffle.merge.percent</name>
<value>0.66</value>
<description>The usage threshold at which an in-memory merge will be
initiated, expressed as a percentage of the total memory allocated to
storing in-memory map outputs, as defined by
mapreduce.reduce.shuffle.input.buffer.percent.
</description>
</property>
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>1024</value>
<description>The amount of memory to request from the scheduler for each
reduce task.
</description>
</property>
若這些內(nèi)存不夠容納膀值,reducer會把數(shù)據(jù)存入reducer端的本地磁盤中的mapreduce.job.local.dir
文件夾棍丐。
<property>
<name>mapreduce.cluster.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/local</value>
<description>The local directory where MapReduce stores intermediate
data files. May be a comma-separated list of
directories on different devices in order to spread disk i/o.
Directories that do not exist are ignored.
</description>
</property>
fetcher索取相應(yīng)文件數(shù)據(jù)后,merger線程開啟工作沧踏。它們不會等待整個fetching過程完成而是開新線程與其并行執(zhí)行歌逢。 Hadoop有三種merger線程。
- InMemory merger (內(nèi)存線程)
- 不能關(guān)閉翘狱。 由Reduce任務(wù)索取MapTask的輸出數(shù)據(jù)占用的Reducer內(nèi)存緩存超出了總內(nèi)存允許的占用百分比
reduce.shuffle.merge.percent
而啟動秘案。 合并后執(zhí)行combiner。輸出寫入硬盤,總會被調(diào)用至少一次阱高。
- 不能關(guān)閉翘狱。 由Reduce任務(wù)索取MapTask的輸出數(shù)據(jù)占用的Reducer內(nèi)存緩存超出了總內(nèi)存允許的占用百分比
- MemToMem merger (內(nèi)存到內(nèi)存)
- 默認為關(guān)閉师骗。 可由
reduce.merge.memtomem.enabled
開啟。 此線程合并內(nèi)存中mapper的輸出文件數(shù)據(jù)并寫reduce輸出到內(nèi)存讨惩。當不同的MapTask輸出文件大小達到mapreduce.reduce.merge.memtomem.threshold
(默認為1000)辟癌, 線程會被啟動。
- 默認為關(guān)閉师骗。 可由
- OnDisk (硬盤)
- 在一次線程執(zhí)行中荐捻, 當文件數(shù)量以(
2 * task.io.sort.factor - 1
) 上升 , 但是合并不超過mapreduce.task.io.sort.factor
的文件數(shù)量而啟動黍少。 OnDisk Merger線程合并本地磁盤的文件。
- 在一次線程執(zhí)行中荐捻, 當文件數(shù)量以(
最后個線程处面, finalMerge 在reducer的主線程中運行厂置,合并所有由InMemory和OnDisk在本地磁盤產(chǎn)生剩余的文件。 最終合并輸出結(jié)果分布在RAM和硬盤之間魂角。RAM最大允許使用為reduce 輸入大小是由總的棧大小mapred.job.reduce.markreset.buffer.percent
百分比來決定的昵济,默認為0.
在這些所有的線程啟動完成后,reducer會把輸出寫入HDFS文件系統(tǒng)中野揪。
原文出自于访忿,Alexey Grishchenko的Hadoop MapReduce Comprehensive Description
譯者: 邁大_阿李同學