Hadoop-Mapreduce shuffle及優(yōu)化
MapReduce簡介
在Hadoop MapReduce中煤伟,框架會確保reduce收到的輸入數(shù)據是根據key排序過的。數(shù)據從Mapper輸出到Reducer接收,是一個很復雜的過程转唉,框架處理了所有問題网严,并提供了很多配置項及擴展點。一個MapReduce的大致數(shù)據流如下圖:
更詳細的MapReduce介紹參考Hadoop MapReduce原理與實例艳汽。
Mapper的輸出排序猴贰、然后傳送到Reducer的過程,稱為shuffle河狐。本文詳細地解析shuffle過程米绕,深入理解這個過程對于MapReduce調優(yōu)至關重要,某種程度上說馋艺,shuffle過程是MapReduce的核心內容栅干。
Mapper端
當map函數(shù)通過context.write()
開始輸出數(shù)據時,不是單純地將數(shù)據寫入到磁盤丈钙。為了性能非驮,map輸出的數(shù)據會寫入到緩沖區(qū),并進行預排序的一些工作雏赦,整個過程如下圖:
環(huán)形Buffer數(shù)據結構
每一個map任務有一個環(huán)形Buffer劫笙,map將輸出寫入到這個Buffer。環(huán)形Buffer是內存中的一種首尾相連的數(shù)據結構星岗,專門用來存儲Key-Value格式的數(shù)據:
Hadoop中填大,環(huán)形緩沖其實就是一個字節(jié)數(shù)組:
// MapTask.java
private byte[] kvbuffer; // main output buffer
kvbuffer = new byte[maxMemUsage - recordCapacity]; 1234
kvbuffer包含數(shù)據區(qū)和索引區(qū),這兩個區(qū)是相鄰不重疊的區(qū)域俏橘,用一個分界點來標識允华。分界點不是永恒不變的,每次Spill之后都會更新一次寥掐。初始分界點為0靴寂,數(shù)據存儲方向為向上增長,索引存儲方向向下:
bufferindex一直往上增長召耘,例如最初為0百炬,寫入一個int類型的key之后變?yōu)?,寫入一個int類型的value之后變成8污它。
索引是對key-value在kvbuffer中的索引剖踊,是個四元組庶弃,占用四個Int長度,包括:
- value的起始位置
- key的起始位置
- partition值
- value的長度
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3; // length of value
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));1234567891011
kvmeta的存放指針kvindex每次都是向下跳四個“格子”德澈,然后再向上一個格子一個格子地填充四元組的數(shù)據歇攻。比如kvindex初始位置是-4,當?shù)谝粋€key-value寫完之后梆造,(kvindex+0)的位置存放value的起始位置缴守、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值澳窑、(kvindex+3)的位置存放value的長度斧散,然后kvindex跳到-8位置。
緩沖區(qū)的大小默認為100M摊聋,但是可以通過mapreduce.task.io.sort.mb
這個屬性來配置鸡捐。
Spill
map將輸出不斷寫入到這個緩沖區(qū)中,當緩沖區(qū)使用量達到一定比例之后麻裁,一個后臺線程開始把緩沖區(qū)的數(shù)據寫入磁盤箍镜,這個寫入的過程叫spill。開始spill的Buffer比例默認為0.80煎源,可以通過mapreduce.map.sort.spill.percent
配置色迂。在后臺線程寫入的同時,map繼續(xù)將輸出寫入這個環(huán)形緩沖手销,如果緩沖池寫滿了歇僧,map會阻塞直到spill過程完成,而不會覆蓋緩沖池中的已有的數(shù)據锋拖。
在寫入之前诈悍,后臺線程把數(shù)據按照他們將送往的reducer進行劃分,通過調用Partitioner
的getPartition()
方法就能知道該輸出要送往哪個Reducer兽埃。默認的Partitioner使用Hash算法來分區(qū)侥钳,即通過key.hashCode() mode R
來計算,R為Reducer的個數(shù)柄错。getPartition
返回Partition事實上是個整數(shù)舷夺,例如有10個Reducer,則返回0-9的整數(shù)售貌,每個Reducer會對應到一個Partition给猾。map輸出的鍵值對,與partition一起存在緩沖中(即前面提到的kvmeta中)颂跨。假設作業(yè)有2個reduce任務敢伸,則數(shù)據在內存中被劃分為reduce1和reduce2:
并且針對每部分數(shù)據,使用快速排序算法(QuickSort)對key排序毫捣。
如果設置了Combiner详拙,則在排序的結果上運行combine。
排序后的數(shù)據被寫入到mapreduce.cluster.local.dir
配置的目錄中的其中一個蔓同,使用round robin fashion的方式輪流饶辙。注意寫入的是本地文件目錄,而不是HDFS斑粱。Spill文件名像sipll0.out弃揽,spill1.out等。
不同Partition的數(shù)據都放在同一個文件则北,通過索引來區(qū)分partition的邊界和起始位置矿微。索引是一個三元組結構,包括起始位置尚揣、數(shù)據長度涌矢、壓縮后的數(shù)據長度,對應IndexRecord類:
public class IndexRecord {
public long startOffset;
public long rawLength;
public long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}12345678910111213
每個mapper也有對應的一個索引環(huán)形Buffer快骗,默認為1KB娜庇,可以通過mapreduce.task.index.cache.limit.bytes
來配置,索引如果足夠小則存在內存中方篮,如果內存放不下名秀,需要寫入磁盤。
Spill文件索引名稱類似這樣 spill110.out.index, spill111.out.index藕溅。
Spill文件的索引事實上是 org.apache.hadoop.mapred.SpillRecord的一個數(shù)組匕得,每個Map任務(源碼中的MapTask.java類)維護一個這樣的列表:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();1
創(chuàng)建一個SpillRecord時,會分配(Number_Of_Reducers * 24)Bytes緩沖:
public SpillRecord(int numPartitions) {
buf = ByteBuffer.allocate(
numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
entries = buf.asLongBuffer();
}12345
numPartitions是Partition的個數(shù)巾表,其實也就是Reducer的個數(shù):
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// ---
partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);123456
默認的索引緩沖為1KB汁掠,即10241024 Bytes,假設有2個Reducer攒发,則每個Spill文件的索引大小為224=48 Bytes调塌,當Spill文件超過21845.3時,索引文件就需要寫入磁盤惠猿。
索引及spill文件如下圖示意:
Spill的過程至少需要運行一次羔砾,因為Mapper的輸出結果必須要寫入磁盤,供Reducer進一步處理偶妖。
合并Spill文件
在整個map任務中姜凄,一旦緩沖達到設定的閾值,就會觸發(fā)spill操作趾访,寫入spill文件到磁盤态秧,因此最后可能有多個spill文件。在map任務結束之前扼鞋,這些文件會根據情況合并到一個大的分區(qū)的申鱼、排序的文件中愤诱,排序是在內存排序的基礎上進行全局排序。下圖是合并過程的簡單示意:
相對應的索引文件也會被合并捐友,以便在Reducer請求對應Partition的數(shù)據的時候能夠快速讀取淫半。
另外,如果spill文件數(shù)量大于mapreduce.map.combiner.minspills配置的數(shù)匣砖,則在合并文件寫入之前科吭,會再次運行combiner。如果spill文件數(shù)量太少猴鲫,運行combiner的收益可能小于調用的代價对人。
mapreduce.task.io.sort.factor屬性配置每次最多合并多少個文件,默認為10拂共,即一次最多合并10個spill文件牺弄。最后,多輪合并之后宜狐,所有的輸出文件被合并為唯一一個大文件猖闪,以及相應的索引文件(可能只在內存中存在)。
壓縮
在數(shù)據量大的時候肌厨,對map輸出進行壓縮通常是個好主意培慌。要啟用壓縮,將mapreduce.map.output.compress
設為true柑爸,并使用mapreduce.map.output.compress.codec
設置使用的壓縮算法吵护。
通過HTTP暴露輸出結果
map輸出數(shù)據完成之后,通過運行一個HTTP Server暴露出來表鳍,供reduce端獲取馅而。用來相應reduce數(shù)據請求的線程數(shù)量可以配置,默認情況下為機器內核數(shù)量的兩倍譬圣,如需自己配置瓮恭,通過mapreduce.shuffle.max.threads
屬性來配置,注意該配置是針對NodeManager配置的厘熟,而不是每個作業(yè)配置屯蹦。
同時,Map任務完成后绳姨,也會通知Application Master登澜,以便Reducer能夠及時來拉取數(shù)據。
通過緩沖飘庄、劃分(partition)脑蠕、排序、combiner跪削、合并谴仙、壓縮等過程之后迂求,map端的工作就算完畢:
Reducer端
各個map任務運行完之后,輸出寫入運行任務的機器磁盤中晃跺。Reducer需要從各map任務中提取自己的那一部分數(shù)據(對應的partition)锁摔。每個map任務的完成時間可能是不一樣的,reduce任務在map任務結束之后會盡快取走輸出結果哼审,這個階段叫copy。
Reducer是如何知道要去哪些機器去數(shù)據呢孕豹?一旦map任務完成之后涩盾,就會通過常規(guī)心跳通知應用程序的Application Master。reduce的一個線程會周期性地向master詢問励背,直到提取完所有數(shù)據(如何知道提取完春霍?)。
數(shù)據被reduce提走之后叶眉,map機器不會立刻刪除數(shù)據址儒,這是為了預防reduce任務失敗需要重做。因此map輸出數(shù)據是在整個作業(yè)完成之后才被刪除掉的衅疙。
reduce維護幾個copier線程莲趣,并行地從map任務機器提取數(shù)據。默認情況下有5個copy線程饱溢,可以通過mapreduce.reduce.shuffle.parallelcopies
配置喧伞。
如果map輸出的數(shù)據足夠小,則會被拷貝到reduce任務的JVM內存中绩郎。mapreduce.reduce.shuffle.input.buffer.percent
配置JVM堆內存的多少比例可以用于存放map任務的輸出結果潘鲫。如果數(shù)據太大容不下,則被拷貝到reduce的機器磁盤上肋杖。
內存中合并
當緩沖中數(shù)據達到配置的閾值時溉仑,這些數(shù)據在內存中被合并、寫入機器磁盤状植。閾值有2種配置方式:
- 配置內存比例: 前面提到reduce JVM堆內存的一部分用于存放來自map任務的輸入浊竟,在這基礎之上配置一個開始合并數(shù)據的比例。假設用于存放map輸出的內存為500M津畸,
mapreduce.reduce.shuffle.merger.percent
配置為0.80逐沙,則當內存中的數(shù)據達到400M的時候,會觸發(fā)合并寫入洼畅。 - 配置map輸出數(shù)量: 通過
mapreduce.reduce.merge.inmem.threshold
配置吩案。
在合并的過程中,會對被合并的文件做全局的排序帝簇。如果作業(yè)配置了Combiner徘郭,則會運行combine函數(shù)靠益,減少寫入磁盤的數(shù)據量。
Copy過程中磁盤合并
在copy過來的數(shù)據不斷寫入磁盤的過程中残揉,一個后臺線程會把這些文件合并為更大的胧后、有序的文件。如果map的輸出結果進行了壓縮抱环,則在合并過程中壳快,需要在內存中解壓后才能給進行合并。這里的合并只是為了減少最終合并的工作量镇草,也就是在map輸出還在拷貝時眶痰,就開始進行一部分合并工作。合并的過程一樣會進行全局排序梯啤。
最終磁盤中合并
當所有map輸出都拷貝完畢之后竖伯,所有數(shù)據被最后合并成一個排序的文件,作為reduce任務的輸入因宇。這個合并過程是一輪一輪進行的七婴,最后一輪的合并結果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回察滑。最后(所以map輸出都拷貝到reduce之后)進行合并的map輸出可能來自合并后寫入磁盤的文件打厘,也可能來及內存緩沖,在最后寫入內存的map輸出可能沒有達到閾值觸發(fā)合并贺辰,所以還留在內存中婚惫。
每一輪合并并不一定合并平均數(shù)量的文件數(shù),指導原則是使用整個合并過程中寫入磁盤的數(shù)據量最小魂爪,為了達到這個目的先舷,則需要最終的一輪合并中合并盡可能多的數(shù)據,因為最后一輪的數(shù)據直接作為reduce的輸入滓侍,無需寫入磁盤再讀出蒋川。因此我們讓最終的一輪合并的文件數(shù)達到最大,即合并因子的值撩笆,通過mapreduce.task.io.sort.factor
來配置捺球。
假設現(xiàn)在有50個map輸出文件,合并因子配置為10夕冲,則需要5輪的合并氮兵。最終的一輪確保合并10個文件,其中包括4個來自前4輪的合并結果歹鱼,因此原始的50個中泣栈,再留出6個給最終一輪。所以最后的5輪合并可能情況如下:
前4輪合并后的數(shù)據都是寫入到磁盤中的,注意到最后的2格顏色不一樣南片,是為了標明這些數(shù)據可能直接來自于內存掺涛。
MemToMem合并
除了內存中合并和磁盤中合并外,Hadoop還定義了一種MemToMem合并疼进,這種合并將內存中的map輸出合并薪缆,然后再寫入內存。這種合并默認關閉伞广,可以通過reduce.merge.memtomem.enabled
打開拣帽,當map輸出文件達到reduce.merge.memtomem.threshold
時,觸發(fā)這種合并嚼锄。
最后一次合并后傳遞給reduce方法
合并后的文件作為輸入傳遞給Reducer减拭,Reducer針對每個key及其排序的數(shù)據調用reduce函數(shù)。產生的reduce輸出一般寫入到HDFS灾票,reduce輸出的文件第一個副本寫入到當前運行reduce的機器,其他副本選址原則按照常規(guī)的HDFS數(shù)據寫入原則來進行茫虽,詳細信息請參考這里刊苍。
通過從map機器提取結果,合并濒析,combine之后正什,傳遞給reduce完成最后工作,整個過程也就差不多完成号杏。最后再感受一下下面這張圖:
性能調優(yōu)
如果能夠根據情況對shuffle過程進行調優(yōu)婴氮,對于提供MapReduce性能很有幫助。相關的參數(shù)配置列在后面的表格中盾致。
一個通用的原則是給shuffle過程分配盡可能大的內存主经,當然你需要確保map和reduce有足夠的內存來運行業(yè)務邏輯。因此在實現(xiàn)Mapper和Reducer時庭惜,應該盡量減少內存的使用罩驻,例如避免在Map中不斷地疊加。
運行map和reduce任務的JVM护赊,內存通過mapred.child.java.opts
屬性來設置惠遏,盡可能設大內存。容器的內存大小通過mapreduce.map.memory.mb
和mapreduce.reduce.memory.mb
來設置骏啰,默認都是1024M节吮。
map優(yōu)化
在map端,避免寫入多個spill文件可能達到最好的性能判耕,一個spill文件是最好的透绩。通過估計map的輸出大小,設置合理的mapreduce.task.io.sort.*
屬性,使得spill文件數(shù)量最小渺贤。例如盡可能調大mapreduce.task.io.sort.mb
雏胃。
map端相關的屬性如下表:
屬性名 | 值類型 | 默認值 | 說明 |
---|---|---|---|
mapreduce.task.io.sort.mb | int | 100 | 用于map輸出排序的內存大小 |
mapreduce.map.sort.spill.percent | float | 0.80 | 開始spill的緩沖池閾值 |
mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值,與reduce共用 |
mapreduce.map.combine.minspills | int | 3 | 運行combiner的最低spill文件數(shù) |
mapreduce.map.out.compress | boolean | false | 輸出是否壓縮 |
mapreduce.map.out.compress | 類名 | DefaultCodec | 壓縮算法 |
mapreduce.shuffle.max.threads | int | 0 | 服務于reduce提取結果的線程數(shù)量 |
reduce優(yōu)化
在reduce端志鞍,如果能夠讓所有數(shù)據都保存在內存中瞭亮,可以達到最佳的性能。通常情況下固棚,內存都保留給reduce函數(shù)统翩,但是如果reduce函數(shù)對內存需求不是很高,將mapreduce.reduce.merge.inmem.threshold
(觸發(fā)合并的map輸出文件數(shù))設為0此洲,mapreduce.reduce.input.buffer.percent
(用于保存map輸出文件的堆內存比例)設為1.0厂汗,可以達到很好的性能提升。在2008年的TB級別數(shù)據排序性能測試中呜师,Hadoop就是通過將reduce的中間數(shù)據都保存在內存中勝利的娶桦。
reduce端相關屬性:
屬性名 | 值類型 | 默認值 | 說明 |
---|---|---|---|
mapreduce.reduce.shuffle.parallelcopies | int | 5 | 提取map輸出的copier線程數(shù) |
mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | 提取map輸出最大嘗試次數(shù),超出后報錯 |
mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值汁汗,與map共用 |
mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | copy階段用于保存map輸出的堆內存比例 |
mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 開始spill的緩沖池比例閾值 |
mapreduce.reduce.shuffle.inmem.threshold | int | 1000 | 開始spill的map輸出文件數(shù)閾值衷畦,小于等于0表示沒有閾值,此時只由緩沖池比例來控制 |
mapreduce.reduce.input.buffer.percent | float | 0.0 | reduce函數(shù)開始運行時知牌,內存中的map輸出所占的堆內存比例不得高于這個值祈争,默認情況內存都用于reduce函數(shù),也就是map輸出都寫入到磁盤 |
通用優(yōu)化
Hadoop默認使用4KB作為緩沖角寸,這個算是很小的菩混,可以通過io.file.buffer.size
來調高緩沖池大小。