Hadoop5-Mapreduce shuffle及優(yōu)化

Hadoop-Mapreduce shuffle及優(yōu)化

轉載

MapReduce簡介

在Hadoop MapReduce中煤伟,框架會確保reduce收到的輸入數(shù)據是根據key排序過的。數(shù)據從Mapper輸出到Reducer接收,是一個很復雜的過程转唉,框架處理了所有問題网严,并提供了很多配置項及擴展點。一個MapReduce的大致數(shù)據流如下圖:

這里寫圖片描述

更詳細的MapReduce介紹參考Hadoop MapReduce原理與實例艳汽。

Mapper的輸出排序猴贰、然后傳送到Reducer的過程,稱為shuffle河狐。本文詳細地解析shuffle過程米绕,深入理解這個過程對于MapReduce調優(yōu)至關重要,某種程度上說馋艺,shuffle過程是MapReduce的核心內容栅干。

Mapper端

當map函數(shù)通過context.write()開始輸出數(shù)據時,不是單純地將數(shù)據寫入到磁盤丈钙。為了性能非驮,map輸出的數(shù)據會寫入到緩沖區(qū),并進行預排序的一些工作雏赦,整個過程如下圖:

這里寫圖片描述

環(huán)形Buffer數(shù)據結構

每一個map任務有一個環(huán)形Buffer劫笙,map將輸出寫入到這個Buffer。環(huán)形Buffer是內存中的一種首尾相連的數(shù)據結構星岗,專門用來存儲Key-Value格式的數(shù)據:

這里寫圖片描述

Hadoop中填大,環(huán)形緩沖其實就是一個字節(jié)數(shù)組:

// MapTask.java
private byte[] kvbuffer;  // main output buffer

kvbuffer = new byte[maxMemUsage - recordCapacity]; 1234

kvbuffer包含數(shù)據區(qū)和索引區(qū),這兩個區(qū)是相鄰不重疊的區(qū)域俏橘,用一個分界點來標識允华。分界點不是永恒不變的,每次Spill之后都會更新一次寥掐。初始分界點為0靴寂,數(shù)據存儲方向為向上增長,索引存儲方向向下:

這里寫圖片描述

bufferindex一直往上增長召耘,例如最初為0百炬,寫入一個int類型的key之后變?yōu)?,寫入一個int類型的value之后變成8污它。

索引是對key-value在kvbuffer中的索引剖踊,是個四元組庶弃,占用四個Int長度,包括:

  • value的起始位置
  • key的起始位置
  • partition值
  • value的長度
private static final int VALSTART = 0;    // val offset in acct
private static final int KEYSTART = 1;    // key offset in acct
private static final int PARTITION = 2;   // partition offset in acct
private static final int VALLEN = 3;      // length of value
private static final int NMETA = 4;       // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
 // write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));1234567891011

kvmeta的存放指針kvindex每次都是向下跳四個“格子”德澈,然后再向上一個格子一個格子地填充四元組的數(shù)據歇攻。比如kvindex初始位置是-4,當?shù)谝粋€key-value寫完之后梆造,(kvindex+0)的位置存放value的起始位置缴守、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值澳窑、(kvindex+3)的位置存放value的長度斧散,然后kvindex跳到-8位置。

緩沖區(qū)的大小默認為100M摊聋,但是可以通過mapreduce.task.io.sort.mb這個屬性來配置鸡捐。

Spill

map將輸出不斷寫入到這個緩沖區(qū)中,當緩沖區(qū)使用量達到一定比例之后麻裁,一個后臺線程開始把緩沖區(qū)的數(shù)據寫入磁盤箍镜,這個寫入的過程叫spill。開始spill的Buffer比例默認為0.80煎源,可以通過mapreduce.map.sort.spill.percent配置色迂。在后臺線程寫入的同時,map繼續(xù)將輸出寫入這個環(huán)形緩沖手销,如果緩沖池寫滿了歇僧,map會阻塞直到spill過程完成,而不會覆蓋緩沖池中的已有的數(shù)據锋拖。

在寫入之前诈悍,后臺線程把數(shù)據按照他們將送往的reducer進行劃分,通過調用PartitionergetPartition()方法就能知道該輸出要送往哪個Reducer兽埃。默認的Partitioner使用Hash算法來分區(qū)侥钳,即通過key.hashCode() mode R來計算,R為Reducer的個數(shù)柄错。getPartition返回Partition事實上是個整數(shù)舷夺,例如有10個Reducer,則返回0-9的整數(shù)售貌,每個Reducer會對應到一個Partition给猾。map輸出的鍵值對,與partition一起存在緩沖中(即前面提到的kvmeta中)颂跨。假設作業(yè)有2個reduce任務敢伸,則數(shù)據在內存中被劃分為reduce1和reduce2:

這里寫圖片描述

并且針對每部分數(shù)據,使用快速排序算法(QuickSort)對key排序毫捣。

如果設置了Combiner详拙,則在排序的結果上運行combine。

排序后的數(shù)據被寫入到mapreduce.cluster.local.dir配置的目錄中的其中一個蔓同,使用round robin fashion的方式輪流饶辙。注意寫入的是本地文件目錄,而不是HDFS斑粱。Spill文件名像sipll0.out弃揽,spill1.out等。

不同Partition的數(shù)據都放在同一個文件则北,通過索引來區(qū)分partition的邊界和起始位置矿微。索引是一個三元組結構,包括起始位置尚揣、數(shù)據長度涌矢、壓縮后的數(shù)據長度,對應IndexRecord類:

public class IndexRecord {
  public long startOffset;
  public long rawLength;
  public long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}12345678910111213

每個mapper也有對應的一個索引環(huán)形Buffer快骗,默認為1KB娜庇,可以通過mapreduce.task.index.cache.limit.bytes來配置,索引如果足夠小則存在內存中方篮,如果內存放不下名秀,需要寫入磁盤。
Spill文件索引名稱類似這樣 spill110.out.index, spill111.out.index藕溅。

Spill文件的索引事實上是 org.apache.hadoop.mapred.SpillRecord的一個數(shù)組匕得,每個Map任務(源碼中的MapTask.java類)維護一個這樣的列表:

final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();1

創(chuàng)建一個SpillRecord時,會分配(Number_Of_Reducers * 24)Bytes緩沖:

public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}12345

numPartitions是Partition的個數(shù)巾表,其實也就是Reducer的個數(shù):

public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

// ---

partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);123456

默認的索引緩沖為1KB汁掠,即10241024 Bytes,假設有2個Reducer攒发,則每個Spill文件的索引大小為224=48 Bytes调塌,當Spill文件超過21845.3時,索引文件就需要寫入磁盤惠猿。

索引及spill文件如下圖示意:

這里寫圖片描述

Spill的過程至少需要運行一次羔砾,因為Mapper的輸出結果必須要寫入磁盤,供Reducer進一步處理偶妖。

合并Spill文件

在整個map任務中姜凄,一旦緩沖達到設定的閾值,就會觸發(fā)spill操作趾访,寫入spill文件到磁盤态秧,因此最后可能有多個spill文件。在map任務結束之前扼鞋,這些文件會根據情況合并到一個大的分區(qū)的申鱼、排序的文件中愤诱,排序是在內存排序的基礎上進行全局排序。下圖是合并過程的簡單示意:

這里寫圖片描述

相對應的索引文件也會被合并捐友,以便在Reducer請求對應Partition的數(shù)據的時候能夠快速讀取淫半。

另外,如果spill文件數(shù)量大于mapreduce.map.combiner.minspills配置的數(shù)匣砖,則在合并文件寫入之前科吭,會再次運行combiner。如果spill文件數(shù)量太少猴鲫,運行combiner的收益可能小于調用的代價对人。

mapreduce.task.io.sort.factor屬性配置每次最多合并多少個文件,默認為10拂共,即一次最多合并10個spill文件牺弄。最后,多輪合并之后宜狐,所有的輸出文件被合并為唯一一個大文件猖闪,以及相應的索引文件(可能只在內存中存在)。

壓縮

在數(shù)據量大的時候肌厨,對map輸出進行壓縮通常是個好主意培慌。要啟用壓縮,將mapreduce.map.output.compress設為true柑爸,并使用mapreduce.map.output.compress.codec設置使用的壓縮算法吵护。

通過HTTP暴露輸出結果

map輸出數(shù)據完成之后,通過運行一個HTTP Server暴露出來表鳍,供reduce端獲取馅而。用來相應reduce數(shù)據請求的線程數(shù)量可以配置,默認情況下為機器內核數(shù)量的兩倍譬圣,如需自己配置瓮恭,通過mapreduce.shuffle.max.threads屬性來配置,注意該配置是針對NodeManager配置的厘熟,而不是每個作業(yè)配置屯蹦。

同時,Map任務完成后绳姨,也會通知Application Master登澜,以便Reducer能夠及時來拉取數(shù)據。

通過緩沖飘庄、劃分(partition)脑蠕、排序、combiner跪削、合并谴仙、壓縮等過程之后迂求,map端的工作就算完畢:

這里寫圖片描述

Reducer端

各個map任務運行完之后,輸出寫入運行任務的機器磁盤中晃跺。Reducer需要從各map任務中提取自己的那一部分數(shù)據(對應的partition)锁摔。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之后會盡快取走輸出結果哼审,這個階段叫copy。
Reducer是如何知道要去哪些機器去數(shù)據呢孕豹?一旦map任務完成之后涩盾,就會通過常規(guī)心跳通知應用程序的Application Master。reduce的一個線程會周期性地向master詢問励背,直到提取完所有數(shù)據(如何知道提取完春霍?)。

數(shù)據被reduce提走之后叶眉,map機器不會立刻刪除數(shù)據址儒,這是為了預防reduce任務失敗需要重做。因此map輸出數(shù)據是在整個作業(yè)完成之后才被刪除掉的衅疙。

reduce維護幾個copier線程莲趣,并行地從map任務機器提取數(shù)據。默認情況下有5個copy線程饱溢,可以通過mapreduce.reduce.shuffle.parallelcopies配置喧伞。

如果map輸出的數(shù)據足夠小,則會被拷貝到reduce任務的JVM內存中绩郎。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆內存的多少比例可以用于存放map任務的輸出結果潘鲫。如果數(shù)據太大容不下,則被拷貝到reduce的機器磁盤上肋杖。

內存中合并

當緩沖中數(shù)據達到配置的閾值時溉仑,這些數(shù)據在內存中被合并、寫入機器磁盤状植。閾值有2種配置方式:

  • 配置內存比例: 前面提到reduce JVM堆內存的一部分用于存放來自map任務的輸入浊竟,在這基礎之上配置一個開始合并數(shù)據的比例。假設用于存放map輸出的內存為500M津畸,mapreduce.reduce.shuffle.merger.percent配置為0.80逐沙,則當內存中的數(shù)據達到400M的時候,會觸發(fā)合并寫入洼畅。
  • 配置map輸出數(shù)量: 通過mapreduce.reduce.merge.inmem.threshold配置吩案。

在合并的過程中,會對被合并的文件做全局的排序帝簇。如果作業(yè)配置了Combiner徘郭,則會運行combine函數(shù)靠益,減少寫入磁盤的數(shù)據量。

Copy過程中磁盤合并

在copy過來的數(shù)據不斷寫入磁盤的過程中残揉,一個后臺線程會把這些文件合并為更大的胧后、有序的文件。如果map的輸出結果進行了壓縮抱环,則在合并過程中壳快,需要在內存中解壓后才能給進行合并。這里的合并只是為了減少最終合并的工作量镇草,也就是在map輸出還在拷貝時眶痰,就開始進行一部分合并工作。合并的過程一樣會進行全局排序梯啤。

最終磁盤中合并

當所有map輸出都拷貝完畢之后竖伯,所有數(shù)據被最后合并成一個排序的文件,作為reduce任務的輸入因宇。這個合并過程是一輪一輪進行的七婴,最后一輪的合并結果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回察滑。最后(所以map輸出都拷貝到reduce之后)進行合并的map輸出可能來自合并后寫入磁盤的文件打厘,也可能來及內存緩沖,在最后寫入內存的map輸出可能沒有達到閾值觸發(fā)合并贺辰,所以還留在內存中婚惫。

每一輪合并并不一定合并平均數(shù)量的文件數(shù),指導原則是使用整個合并過程中寫入磁盤的數(shù)據量最小魂爪,為了達到這個目的先舷,則需要最終的一輪合并中合并盡可能多的數(shù)據,因為最后一輪的數(shù)據直接作為reduce的輸入滓侍,無需寫入磁盤再讀出蒋川。因此我們讓最終的一輪合并的文件數(shù)達到最大,即合并因子的值撩笆,通過mapreduce.task.io.sort.factor來配置捺球。

假設現(xiàn)在有50個map輸出文件,合并因子配置為10夕冲,則需要5輪的合并氮兵。最終的一輪確保合并10個文件,其中包括4個來自前4輪的合并結果歹鱼,因此原始的50個中泣栈,再留出6個給最終一輪。所以最后的5輪合并可能情況如下:

這里寫圖片描述

前4輪合并后的數(shù)據都是寫入到磁盤中的,注意到最后的2格顏色不一樣南片,是為了標明這些數(shù)據可能直接來自于內存掺涛。

MemToMem合并

除了內存中合并和磁盤中合并外,Hadoop還定義了一種MemToMem合并疼进,這種合并將內存中的map輸出合并薪缆,然后再寫入內存。這種合并默認關閉伞广,可以通過reduce.merge.memtomem.enabled打開拣帽,當map輸出文件達到reduce.merge.memtomem.threshold時,觸發(fā)這種合并嚼锄。

最后一次合并后傳遞給reduce方法

合并后的文件作為輸入傳遞給Reducer减拭,Reducer針對每個key及其排序的數(shù)據調用reduce函數(shù)。產生的reduce輸出一般寫入到HDFS灾票,reduce輸出的文件第一個副本寫入到當前運行reduce的機器,其他副本選址原則按照常規(guī)的HDFS數(shù)據寫入原則來進行茫虽,詳細信息請參考這里刊苍。

通過從map機器提取結果,合并濒析,combine之后正什,傳遞給reduce完成最后工作,整個過程也就差不多完成号杏。最后再感受一下下面這張圖:

這里寫圖片描述

性能調優(yōu)

如果能夠根據情況對shuffle過程進行調優(yōu)婴氮,對于提供MapReduce性能很有幫助。相關的參數(shù)配置列在后面的表格中盾致。

一個通用的原則是給shuffle過程分配盡可能大的內存主经,當然你需要確保map和reduce有足夠的內存來運行業(yè)務邏輯。因此在實現(xiàn)Mapper和Reducer時庭惜,應該盡量減少內存的使用罩驻,例如避免在Map中不斷地疊加。

運行map和reduce任務的JVM护赊,內存通過mapred.child.java.opts屬性來設置惠遏,盡可能設大內存。容器的內存大小通過mapreduce.map.memory.mbmapreduce.reduce.memory.mb來設置骏啰,默認都是1024M节吮。

map優(yōu)化

在map端,避免寫入多個spill文件可能達到最好的性能判耕,一個spill文件是最好的透绩。通過估計map的輸出大小,設置合理的mapreduce.task.io.sort.*屬性,使得spill文件數(shù)量最小渺贤。例如盡可能調大mapreduce.task.io.sort.mb雏胃。

map端相關的屬性如下表:

屬性名 值類型 默認值 說明
mapreduce.task.io.sort.mb int 100 用于map輸出排序的內存大小
mapreduce.map.sort.spill.percent float 0.80 開始spill的緩沖池閾值
mapreduce.task.io.sort.factor int 10 合并文件數(shù)最大值,與reduce共用
mapreduce.map.combine.minspills int 3 運行combiner的最低spill文件數(shù)
mapreduce.map.out.compress boolean false 輸出是否壓縮
mapreduce.map.out.compress 類名 DefaultCodec 壓縮算法
mapreduce.shuffle.max.threads int 0 服務于reduce提取結果的線程數(shù)量

reduce優(yōu)化

在reduce端志鞍,如果能夠讓所有數(shù)據都保存在內存中瞭亮,可以達到最佳的性能。通常情況下固棚,內存都保留給reduce函數(shù)统翩,但是如果reduce函數(shù)對內存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發(fā)合并的map輸出文件數(shù))設為0此洲,mapreduce.reduce.input.buffer.percent(用于保存map輸出文件的堆內存比例)設為1.0厂汗,可以達到很好的性能提升。在2008年的TB級別數(shù)據排序性能測試中呜师,Hadoop就是通過將reduce的中間數(shù)據都保存在內存中勝利的娶桦。

reduce端相關屬性:

屬性名 值類型 默認值 說明
mapreduce.reduce.shuffle.parallelcopies int 5 提取map輸出的copier線程數(shù)
mapreduce.reduce.shuffle.maxfetchfailures int 10 提取map輸出最大嘗試次數(shù),超出后報錯
mapreduce.task.io.sort.factor int 10 合并文件數(shù)最大值汁汗,與map共用
mapreduce.reduce.shuffle.input.buffer.percent float 0.70 copy階段用于保存map輸出的堆內存比例
mapreduce.reduce.shuffle.merge.percent float 0.66 開始spill的緩沖池比例閾值
mapreduce.reduce.shuffle.inmem.threshold int 1000 開始spill的map輸出文件數(shù)閾值衷畦,小于等于0表示沒有閾值,此時只由緩沖池比例來控制
mapreduce.reduce.input.buffer.percent float 0.0 reduce函數(shù)開始運行時知牌,內存中的map輸出所占的堆內存比例不得高于這個值祈争,默認情況內存都用于reduce函數(shù),也就是map輸出都寫入到磁盤

通用優(yōu)化

Hadoop默認使用4KB作為緩沖角寸,這個算是很小的菩混,可以通過io.file.buffer.size來調高緩沖池大小。

參考

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末扁藕,一起剝皮案震驚了整個濱河市沮峡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌亿柑,老刑警劉巖帖烘,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異橄杨,居然都是意外死亡秘症,警方通過查閱死者的電腦和手機胁赢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門偏螺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人缩赛,你說我怎么就攤上這事采转〈狭” “怎么了瞬痘?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長板熊。 經常有香客問我框全,道長,這世上最難降的妖魔是什么干签? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任津辩,我火速辦了婚禮,結果婚禮上容劳,老公的妹妹穿的比我還像新娘喘沿。我一直安慰自己,他們只是感情好竭贩,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布蚜印。 她就那樣靜靜地躺著,像睡著了一般留量。 火紅的嫁衣襯著肌膚如雪窄赋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天楼熄,我揣著相機與錄音忆绰,去河邊找鬼。 笑死孝赫,一個胖子當著我的面吹牛较木,可吹牛的內容都是我干的红符。 我是一名探鬼主播青柄,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼预侯!你這毒婦竟也來了致开?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤萎馅,失蹤者是張志新(化名)和其女友劉穎双戳,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體糜芳,經...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡飒货,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了峭竣。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片塘辅。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖皆撩,靈堂內的尸體忽然破棺而出扣墩,到底是詐尸還是另有隱情哲银,我是刑警寧澤,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布呻惕,位于F島的核電站荆责,受9級特大地震影響,放射性物質發(fā)生泄漏亚脆。R本人自食惡果不足惜做院,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望型酥。 院中可真熱鬧山憨,春花似錦、人聲如沸弥喉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽由境。三九已至棚亩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間虏杰,已是汗流浹背讥蟆。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纺阔,地道東北人瘸彤。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像笛钝,于是被迫代替她去往敵國和親质况。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內容