原文地址: https://itweknow.cn/detail?id=64 乖阵,歡迎大家訪問(wèn)宣赔。
簡(jiǎn)介
MapReduce程序會(huì)確保每個(gè)reduce函數(shù)的輸入都是按鍵排序的预麸。系統(tǒng)執(zhí)行排序以及將map函數(shù)的輸出傳給reduce函數(shù)的過(guò)程稱之為shuffle。整個(gè)Shuffle分為Map端和Reduce端儒将,下圖是MapReduce的Shuffle的一個(gè)整體概覽圖吏祸,大家先看一下整個(gè)圖,我們后面再做進(jìn)一步的解釋說(shuō)明。
Map端
其實(shí)Map函數(shù)產(chǎn)生的輸出會(huì)寫到磁盤上而不是HDFS贡翘。但是它也不是簡(jiǎn)簡(jiǎn)單單的直接寫到磁盤蹈矮,這中間有一個(gè)復(fù)雜的過(guò)程,下面我們就來(lái)拆解一下鸣驱。
????從上面的圖可以看到每個(gè)Map任務(wù)都會(huì)有一個(gè)緩沖區(qū)泛鸟,這個(gè)緩沖區(qū)會(huì)臨時(shí)
存儲(chǔ)map函數(shù)輸出的內(nèi)容,緩沖區(qū)的個(gè)大小默認(rèn)是100M踊东,我們可以通過(guò)mapreduce.task.io.sort.mb這個(gè)配置項(xiàng)配置北滥,
當(dāng)緩沖區(qū)中的內(nèi)容達(dá)到其設(shè)定的閾值(閾值的設(shè)置值是占整個(gè)緩沖區(qū)的大小,默認(rèn)為0.8闸翅,我們可以通過(guò)mapreduce.map.sort.spill.percent來(lái)配置)時(shí)就會(huì)產(chǎn)生溢出再芋,這個(gè)時(shí)候會(huì)有一個(gè)后臺(tái)線程將緩沖區(qū)中的內(nèi)容分區(qū)(根據(jù)最終要傳給的
Reduce任務(wù)分成不同的區(qū),分區(qū)的目的是將輸出劃分到不同的Reducer上去坚冀,后面的Reducer就會(huì)根據(jù)分區(qū)來(lái)讀取自己對(duì)應(yīng)的數(shù)據(jù))
然后區(qū)內(nèi)按照key排序济赎,如果我們?cè)O(shè)置了Combiner(Combiner的本質(zhì)也是一個(gè)Reducer,其目的是對(duì)將要寫入到磁盤上的文件先進(jìn)行一次處理记某,這樣司训,寫入到磁盤的數(shù)據(jù)量就會(huì)減少。)
的話液南,這個(gè)時(shí)候會(huì)運(yùn)行Combiner函數(shù)豁遭,最后再寫入磁盤。而在這個(gè)過(guò)程中Map任務(wù)還會(huì)繼續(xù)往緩沖區(qū)中輸出內(nèi)容贺拣,
如果出現(xiàn)緩沖區(qū)空間被占滿的情況蓖谢,Map任務(wù)就會(huì)阻塞直到緩沖區(qū)中的內(nèi)容被全部寫到磁盤中為止。
????每次緩沖區(qū)溢出時(shí)都會(huì)新建一個(gè)新的溢出文件譬涡,這樣最后其實(shí)是會(huì)出現(xiàn)多個(gè)溢出文件的闪幽,在Map任務(wù)結(jié)束前這些溢出文件會(huì)被合并到一個(gè)整的輸出文件。
Reduce端
Reduce端的Shuffle分為三個(gè)階段涡匀,復(fù)制階段盯腌、合并階段和Reduce。
????首先是復(fù)制階段陨瘩,Reduce任務(wù)需要集群上若干個(gè)map輸出作為其輸入內(nèi)容腕够,在每個(gè)Map任務(wù)完成完成的時(shí)候Reduce任務(wù)就開復(fù)制其輸出,
上面也提到過(guò)Map任務(wù)在寫入磁盤前會(huì)將輸出進(jìn)行根據(jù)Reduce任務(wù)進(jìn)行分區(qū)舌劳,所以這里Reduce任務(wù)在復(fù)制的時(shí)候只會(huì)復(fù)制自己的那個(gè)分區(qū)里的內(nèi)容帚湘。
如果Map的輸出非常小,那么Reduce會(huì)直接將其復(fù)制到內(nèi)存中甚淡,否則會(huì)被復(fù)制到磁盤大诸。
????合并階段,因?yàn)橛泻芏嗟腗ap任務(wù),所以Reduce復(fù)制過(guò)來(lái)的map輸出會(huì)有很多個(gè)资柔,在這個(gè)階段主要就是將這些Map輸出合并成為一個(gè)文件焙贷。
????Reduce階段,這個(gè)階段主要就是執(zhí)行我們的Reduce函數(shù)的代碼了贿堰,并產(chǎn)生最終的結(jié)果辙芍,然后寫入到HDFS中。
附
在整個(gè)Shuffle過(guò)程中涉及到很多的參數(shù)可以調(diào)整羹与,比如說(shuō)Map輸出的時(shí)候緩沖區(qū)的大小以及其閾值的大小沸手,還有Reduce執(zhí)行復(fù)制階段的線程數(shù)等等。我們?cè)谶@里羅列一下這些配置供大家參考注簿。
- Map端
屬性名稱 | 類型 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|---|
mapreduce.task.io.sort.mb | int | 100 | Map輸出時(shí)所使用的緩沖區(qū)的大小契吉,單位為MB |
mapreduce.map.sort.spill.percent | float | 0.80 | 緩沖區(qū)的閾值,當(dāng)緩沖區(qū)中內(nèi)容達(dá)到這個(gè)閾值時(shí)會(huì)開始寫入磁盤的操作 |
mapreduce.task.io.sort.factor | int | 10 | 排序文件時(shí)诡渴,一次最多合并的流數(shù)捐晶,一般我們會(huì)將這個(gè)值提高到100 |
mapreduce.map.combine.minspills | int | 3 | 運(yùn)行Combiner所需的最少溢出文件數(shù) |
mapreduce.map.output.compress | boolean | false???? | 是否壓縮map輸出 |
mapreduce.map.output.compress.codec | Class Name | org.apache.hadoop.io.compress.DefaultCodec | 用于Map輸出壓縮的編解碼器 |
mapreduce.shuffle.max.threads | int | 0 | 每個(gè)節(jié)點(diǎn)管理器的工作線程數(shù),用于將map輸出到reducer妄辩。這個(gè)是集群范圍的設(shè)置惑灵,不能由單個(gè)作業(yè)設(shè)置。0的話表示使用Netty的默認(rèn)值眼耀,即兩倍的cpu數(shù)英支。 |
- Reduce端
屬性名稱 | 類型 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|---|
mapreduce.reduce.shuffle.parallelcopies | int | 5 | 用于復(fù)制Map輸出到Reduce的線程數(shù) |
mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | Reducer獲取一個(gè)Map輸出所花的最大時(shí)間 |
mapreduce.task.io.sort.factor | int | 10 | 合并Map輸入的時(shí)候,一次最多合并的流的數(shù)量 |
mapreduce.reduce.shuffle.input.buffer.percent | float | 0.7 | 在復(fù)制階段哮伟,分配給Map輸出的緩沖區(qū)占堆空間的百分比 |
mapreduce.reduce.shuffle.merge.percent | float | 0.66??? | Map輸出緩沖區(qū)的閾值使用比例干花,用于啟動(dòng)合并輸出和磁盤溢出寫的過(guò)程 |
mapreduce.reduce.merge.in.mem.threshold | int | 1000 | 啟動(dòng)合并輸出和磁盤溢出寫過(guò)程的Map輸出的閾值數(shù)。0或更小的數(shù)意味著沒(méi)有閾值限制楞黄,溢出寫行為由mapreduce.reduce.shuffle.merge.percent控制 |
mapreduce.reduce.input.buffer.percent | float | 0 | 在reduce過(guò)程中池凄,在內(nèi)存中保存Map輸出空間占整個(gè)堆空間的比例。Reduce階段開始時(shí)鬼廓,內(nèi)存中的Map輸出大小不能大于整個(gè)值肿仑。默認(rèn)情況下,在Reduce任務(wù)開始之前碎税,所有Map輸出都合并到磁盤上尤慰,以便為Reducer提供盡可能多的內(nèi)存。然而雷蹂,如果Reducer需要的內(nèi)存較少伟端,可以增加增加此值來(lái)最小化訪問(wèn)磁盤的次數(shù) |
對(duì)與后面四個(gè)配置項(xiàng)可能有點(diǎn)難的理解,其實(shí)前面也提到過(guò)萎河,在Reduce端Shuffle的復(fù)制階段其實(shí)先是將map的輸出復(fù)制到一個(gè)緩沖區(qū)荔泳,然后當(dāng)達(dá)到緩沖區(qū)的閾值或者M(jìn)ap輸出閾值時(shí)會(huì)逐步溢出寫到磁盤。其中緩沖區(qū)的大小就是由mapreduce.reduce.shuffle.input.buffer.percent配置項(xiàng)決定的虐杯,而緩沖區(qū)的閾值則由mapreduce.reduce.shuffle.merge.percent控制玛歌,Map輸出閾值則由mapreduce.reduce.merge.in.mem.threshold指定,也就是說(shuō)會(huì)由mapreduce.reduce.shuffle.merge.percent和mapreduce.reduce.merge.in.mem.threshold兩個(gè)配置項(xiàng)同時(shí)決定何時(shí)溢出到磁盤擎椰。真正的Reduce任務(wù)執(zhí)行時(shí)也就是三個(gè)過(guò)程中的Reduce階段會(huì)逐步從磁盤中讀取Map的輸出到內(nèi)存中的一個(gè)緩沖區(qū)支子,而最后的一個(gè)配置項(xiàng)mapreduce.reduce.input.buffer.percent指定的是這個(gè)緩沖區(qū)的大小,當(dāng)我們Reduce程序需要的內(nèi)存比較少的時(shí)候达舒,可以適當(dāng)調(diào)大該值以減少訪問(wèn)磁盤的次數(shù)值朋。