要知道怎么對MapReduce作業(yè)進(jìn)行調(diào)優(yōu)前提條件是需要對Map-Reduce的過程了然于胸堕伪。
Map Side
1.從磁盤讀取數(shù)據(jù)并分片
默認(rèn)每個(gè)block對應(yīng)一個(gè)分片,一個(gè)map task
2.進(jìn)行map處理
運(yùn)行自定義的map業(yè)務(wù)過程
3.輸出數(shù)據(jù)到緩沖區(qū)中
map輸出的數(shù)據(jù)并不是直接寫入磁盤的薇搁,而是會(huì)先存儲(chǔ)在一個(gè)預(yù)定義的buffer中
4笑窜、分區(qū)、排序分組的過程
對map輸出的數(shù)據(jù)進(jìn)行分區(qū)莲祸,按照key進(jìn)行排序和分組
5憔狞、歸約(可選)
相當(dāng)于本地端的reduce過程
6蝴悉、合并寫入磁盤
對map的最終數(shù)據(jù)進(jìn)行merge之后輸出到磁盤中等待shuffle過程
Reduce side
1.從map端復(fù)制數(shù)據(jù)
2.對數(shù)據(jù)進(jìn)行合并
以上兩個(gè)步驟即為shuffle過程
3.對數(shù)據(jù)進(jìn)行排序
4.進(jìn)行reduce操作
5.輸出到磁盤
詳細(xì)的過程將會(huì)在調(diào)優(yōu)技巧中體現(xiàn)出來
最簡單的調(diào)優(yōu)方式
設(shè)置Combiner
Combiner在Map端提前進(jìn)行了一次Reduce處理。
可減少M(fèi)ap Task中間輸出的結(jié)果瘾敢,從而減少各個(gè)Reduce Task的遠(yuǎn)程拷貝數(shù)據(jù)量拍冠,最終表現(xiàn)為Map Task和Reduce Task執(zhí)行時(shí)間縮短。
選擇合理的Writable類型
為應(yīng)用程序處理的數(shù)據(jù)選擇合適的Writable類型可大大提升性能簇抵。
比如處理整數(shù)類型數(shù)據(jù)時(shí)倦微,直接采用IntWritable比先以Text類型讀入在轉(zhuǎn)換為整數(shù)類型要高效。
如果輸出整數(shù)的大部分可用一個(gè)或兩個(gè)字節(jié)保存正压,那么直接采用VIntWritable或者VLongWritable欣福,它們采用了變長整型的編碼方式,可以大大減少輸出數(shù)據(jù)量焦履。
作業(yè)級別調(diào)優(yōu)
增加輸入文件的副本數(shù)
假設(shè)集群有1個(gè)Namenode+8個(gè)Datanode節(jié)點(diǎn)拓劝,HDFS默認(rèn)的副本數(shù)為3
那么map端讀取數(shù)據(jù)的時(shí)候,在啟動(dòng)map task的機(jī)器上讀取本地的數(shù)據(jù)為3/8嘉裤,一部分?jǐn)?shù)據(jù)是通過網(wǎng)絡(luò)從其他節(jié)點(diǎn)拿到的
那么如果副本數(shù)設(shè)置為8會(huì)是什么情況郑临?
相當(dāng)于每個(gè)子節(jié)點(diǎn)上都會(huì)有一份完整的數(shù)據(jù),map讀取的時(shí)候直接從本地拿屑宠,不需要通過網(wǎng)絡(luò)這一層了
但是在實(shí)際情況中設(shè)置副本數(shù)為8是不可行的厢洞,因?yàn)閿?shù)據(jù)本身非常龐大,副本數(shù)超過5對集群的磁盤就非常有壓力了,所以這項(xiàng)設(shè)置需要酌情處理
該配置在hdfs-side.xml的dfs.replication項(xiàng)中設(shè)置
Map side tuning
InputFormat
這是map階段的第一步躺翻,從磁盤讀取數(shù)據(jù)并切片丧叽,每個(gè)分片由一個(gè)map task處理
當(dāng)輸入的是海量的小文件的時(shí)候,會(huì)啟動(dòng)大量的map task公你,效率及其之慢踊淳,有效的解決方式是使用CombineInputFormat自定義分片策略對小文件進(jìn)行合并處理
從而減少map task的數(shù)量,減少map過程使用的時(shí)間
詳情請看:自定義分片策略解決大量小文件問題
另外陕靠,map task的啟動(dòng)數(shù)量也和下面這幾個(gè)參數(shù)有關(guān)系:
?mapred.min.split.size:Input Split的最小值 默認(rèn)值1
?mapred.max.split.size:Input Split的最大值
?dfs.block.size:HDFS 中一個(gè)block大小迂尝,默認(rèn)值128MB
當(dāng)mapred.min.split.size小于dfs.block.size的時(shí)候,一個(gè)block會(huì)被分為多個(gè)分片剪芥,也就是對應(yīng)多個(gè)map task
當(dāng)mapred.min.split.size大于dfs.block.size的時(shí)候垄开,一個(gè)分片可能對應(yīng)多個(gè)block,也就是一個(gè)map task讀取多個(gè)block數(shù)據(jù)
集群的網(wǎng)絡(luò)税肪、IO等性能很好的時(shí)候溉躲,建議調(diào)高dfs.block.size
根據(jù)數(shù)據(jù)源的特性,主要調(diào)整mapred.min.split.size來控制map task的數(shù)量
Buffer
該階段是map side中將結(jié)果輸出到磁盤之前的一個(gè)處理方式寸认,通過對其進(jìn)行設(shè)置的話可以減少map任務(wù)的IO開銷,從而提高性能
由于map任務(wù)運(yùn)行時(shí)中間結(jié)果首先存儲(chǔ)在buffer中,默認(rèn)當(dāng)緩存的使用量達(dá)到80%的時(shí)候就開始寫入磁盤,這個(gè)過程叫做spill(溢出)
這個(gè)buffer默認(rèn)的大小是100M可以通過設(shè)定io.sort.mb的值來進(jìn)行調(diào)整
當(dāng)map產(chǎn)生的數(shù)據(jù)非常大時(shí)串慰,如果默認(rèn)的buffer大小不夠看偏塞,那么勢必會(huì)進(jìn)行非常多次的spill,進(jìn)行spill就意味著要寫磁盤邦鲫,產(chǎn)生IO開銷
這時(shí)候就可以把io.sort.mb調(diào)大灸叼,那么map在整個(gè)計(jì)算過程中spill的次數(shù)就勢必會(huì)降低,map task對磁盤的操作就會(huì)變少
如果map tasks的瓶頸在磁盤上庆捺,這樣調(diào)整就會(huì)大大提高map的計(jì)算性能
但是如果將io.sort.mb調(diào)的非常大的時(shí)候古今,對機(jī)器的配置要求就非常高,因?yàn)檎加脙?nèi)存過大滔以,所以需要根據(jù)情況進(jìn)行配置
map并不是要等到buffer全部寫滿時(shí)才進(jìn)行spill捉腥,因?yàn)槿绻繉憹M了再去寫spill,勢必會(huì)造成map的計(jì)算部分等待buffer釋放空間的情況你画。
所以抵碟,map其實(shí)是當(dāng)buffer被寫滿到一定程度(比如80%)時(shí),才開始進(jìn)行spill
可以通過設(shè)置io.sort.spill.percent的值來調(diào)整這個(gè)閾值
這個(gè)參數(shù)同樣也是影響spill頻繁程度坏匪,進(jìn)而影響map task運(yùn)行周期對磁盤的讀寫頻率
但是通常情況下只需要對io.sort.mb進(jìn)行調(diào)整即可
Merge
該階段是map產(chǎn)生spill之后拟逮,對spill進(jìn)行處理的過程,通過對其進(jìn)行配置也可以達(dá)到優(yōu)化IO開銷的目的
map產(chǎn)生spill之后必須將些spill進(jìn)行合并,這個(gè)過程叫做merge
merge過程是并行處理spill的,每次并行多少個(gè)spill是由參數(shù)io.sort.factor指定的,默認(rèn)為10個(gè)
如果產(chǎn)生的spill非常多适滓,merge的時(shí)候每次只能處理10個(gè)spill敦迄,那么還是會(huì)造成頻繁的IO處理
適當(dāng)?shù)恼{(diào)大每次并行處理的spill數(shù)有利于減少merge數(shù)因此可以影響map的性能
但是如果調(diào)整的數(shù)值過大,并行處理spill的進(jìn)程過多會(huì)對機(jī)器造成很大壓力
Combine
我們知道如果map side設(shè)置了Combiner,那么會(huì)根據(jù)設(shè)定的函數(shù)對map輸出的數(shù)據(jù)進(jìn)行一次類reduce的預(yù)處理
但是和分組罚屋、排序分組不一樣的是苦囱,combine發(fā)生的階段可能是在merge之前,也可能是在merge之后
這個(gè)時(shí)機(jī)可以由一個(gè)參數(shù)控制:min.num.spill.for.combine沿后,默認(rèn)值為3
當(dāng)job中設(shè)定了combiner沿彭,并且spill數(shù)最少有3個(gè)的時(shí)候,那么combiner函數(shù)就會(huì)在merge產(chǎn)生結(jié)果文件之前運(yùn)行
例如尖滚,產(chǎn)生的spill非常多喉刘,雖然我們可以通過merge階段的io.sort.factor進(jìn)行優(yōu)化配置,但是在此之前我們還可以通過先執(zhí)行combine對結(jié)果進(jìn)行處理之后再對數(shù)據(jù)進(jìn)行merge
這樣一來漆弄,到merge階段的數(shù)據(jù)量將會(huì)進(jìn)一步減少睦裳,IO開銷也會(huì)被降到最低
輸出中間數(shù)據(jù)到磁盤
這個(gè)階段是map side的最后一個(gè)步驟,在這個(gè)步驟中也可以通過壓縮選項(xiàng)的配置來得到任務(wù)的優(yōu)化
其實(shí)無論是spill的時(shí)候撼唾,還是最后merge產(chǎn)生的結(jié)果文件廉邑,都是可以壓縮的
壓縮的好處在于,通過壓縮減少寫入讀出磁盤的數(shù)據(jù)量倒谷。對中間結(jié)果非常大蛛蒙,磁盤速度成為map執(zhí)行瓶頸的job,尤其有用
控制輸出是否使用壓縮的參數(shù)是mapred.compress.map.output渤愁,值為true或者false
啟用壓縮之后牵祟,會(huì)犧牲CPU的一些計(jì)算資源,但是可以節(jié)省IO開銷抖格,非常適合IO密集型的作業(yè)(如果是CPU密集型的作業(yè)不建議設(shè)置)
設(shè)置壓縮的時(shí)候诺苹,我們可以選擇不同的壓縮算法
Hadoop默認(rèn)提供了GzipCodec,LzoCodec雹拄,BZip2Codec收奔,LzmaCodec等壓縮格式
通常來說,想要達(dá)到比較平衡的cpu和磁盤壓縮比滓玖,LzoCodec比較合適坪哄,但也要取決于job的具體情況
如果想要自行選擇中間結(jié)果的壓縮算法,可以設(shè)置配置參數(shù):
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec
//或者其他用戶自行選擇的壓縮方式1
2
Map side tuning總結(jié)
從上面提到的幾點(diǎn)可以看到势篡,map端的性能瓶頸都是頻繁的IO操作造成的损姜,所有的優(yōu)化也都是針對IO進(jìn)行的,而優(yōu)化的瓶頸又很大程度上被機(jī)器的配置等外部因素所限制
map端調(diào)優(yōu)的相關(guān)參數(shù):
選項(xiàng)
類型
默認(rèn)值
描述
mapred.min.split.size int 1 Input Split的最小值
mapred.max.split.size int . Input Split的最大值
io.sort.mb int 100 map緩沖區(qū)大小
io.sort.spill.percent float 0.8 緩沖區(qū)閾值
io.sort.factor int 10 并行處理spill的個(gè)數(shù)
min.num.spill.for.combine int 3 最少有多少個(gè)spill的時(shí)候combine在merge之前進(jìn)行
mapred.compress.map.output boolean false map中間數(shù)據(jù)是否采用壓縮
mapred.map.output.compression.codec String . 壓縮算法
Reduce side tuning
Shuffle
1.Copy
由于job的每一個(gè)map都會(huì)根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個(gè)partition殊霞,所以map的中間結(jié)果中是有可能包含每一個(gè)reduce需要處理的部分?jǐn)?shù)據(jù)的
為了優(yōu)化reduce的執(zhí)行時(shí)間摧阅,hadoop中等第一個(gè)map結(jié)束后,所有的reduce就開始嘗試從完成的map中下載該reduce對應(yīng)的partition部分?jǐn)?shù)據(jù)
在這個(gè)shuffle過程中绷蹲,由于map的數(shù)量通常是很多個(gè)的棒卷,而每個(gè)map中又都有可能包含每個(gè)reduce所需要的數(shù)據(jù)
所以對于每個(gè)reduce來說顾孽,去各個(gè)map中拿數(shù)據(jù)也是并行的,可以通過mapred.reduce.parallel.copies這個(gè)參數(shù)來調(diào)整比规,默認(rèn)為5
當(dāng)map數(shù)量很多的時(shí)候若厚,就可以適當(dāng)調(diào)大這個(gè)值,減少shuffle過程使用的時(shí)間
還有一種情況是:reduce從map中拿數(shù)據(jù)的時(shí)候蜒什,有可能因?yàn)橹虚g結(jié)果丟失测秸、網(wǎng)絡(luò)等其他原因?qū)е耺ap任務(wù)失敗
而reduce不會(huì)因?yàn)閙ap失敗就永無止境的等待下去,它會(huì)嘗試去別的地方獲得自己的數(shù)據(jù)(這段時(shí)間失敗的map可能會(huì)被重跑)
所以設(shè)置reduce獲取數(shù)據(jù)的超時(shí)時(shí)間可以避免一些因?yàn)榫W(wǎng)絡(luò)不好導(dǎo)致無法獲得數(shù)據(jù)的情況
mapred.reduce.copy.backoff灾常,默認(rèn)300s
一般情況下不用調(diào)整這個(gè)值霎冯,因?yàn)樯a(chǎn)環(huán)境的網(wǎng)絡(luò)都是很流暢的
2.Merge
由于reduce是并行將map結(jié)果下載到本地,所以也是需要進(jìn)行merge的钞瀑,所以io.sort.factor的配置選項(xiàng)同樣會(huì)影響reduce進(jìn)行merge時(shí)的行為
和map一樣沈撞,reduce下載過來的數(shù)據(jù)也是存入一個(gè)buffer中而不是馬上寫入磁盤的,所以我們同樣可以控制這個(gè)值來減少IO開銷
控制該值的參數(shù)為:
mapred.job.shuffle.input.buffer.percent雕什,默認(rèn)0.7缠俺,這是一個(gè)百分比,意思是reduce的可用內(nèi)存中拿出70%作為buffer存放數(shù)據(jù)
reduce的可用內(nèi)存通過mapred.child.java.opts來設(shè)置贷岸,比如置為-Xmx1024m壹士,該參數(shù)是同時(shí)設(shè)定map和reduce task的可用內(nèi)存,一般為map buffer大小的兩倍左右
設(shè)置了reduce端的buffer大小偿警,我們同樣可以通過一個(gè)參數(shù)來控制buffer中的數(shù)據(jù)達(dá)到一個(gè)閾值的時(shí)候開始往磁盤寫數(shù)據(jù):mapred.job.shuffle.merge.percent躏救,默認(rèn)為0.66
Sort
sort的過程一般非常短,因?yàn)槭沁卌opy邊merge邊sort的户敬,后面就直接進(jìn)入真正的reduce計(jì)算階段了
Reduce
之前我們說過reduc端的buffer落剪,默認(rèn)情況下睁本,數(shù)據(jù)達(dá)到一個(gè)閾值的時(shí)候尿庐,buffer中的數(shù)據(jù)就會(huì)寫入磁盤,然后reduce會(huì)從磁盤中獲得所有的數(shù)據(jù)
也就是說呢堰,buffer和reduce是沒有直接關(guān)聯(lián)的抄瑟,中間多個(gè)一個(gè)寫磁盤->讀磁盤的過程,既然有這個(gè)弊端枉疼,那么就可以通過參數(shù)來配置
使得buffer中的一部分?jǐn)?shù)據(jù)可以直接輸送到reduce皮假,從而減少IO開銷:mapred.job.reduce.input.buffer.percent,默認(rèn)為0.0
當(dāng)值大于0的時(shí)候骂维,會(huì)保留指定比例的內(nèi)存讀buffer中的數(shù)據(jù)直接拿給reduce使用
這樣一來惹资,設(shè)置buffer需要內(nèi)存,讀取數(shù)據(jù)需要內(nèi)存航闺,reduce計(jì)算也要內(nèi)存褪测,所以要根據(jù)作業(yè)的運(yùn)行情況進(jìn)行調(diào)整
Reduce side tuning總結(jié)
和map階段差不多猴誊,reduce節(jié)點(diǎn)的調(diào)優(yōu)也是主要集中在加大內(nèi)存使用量,減少IO侮措,增大并行數(shù)
reduce調(diào)優(yōu)主要參數(shù):
選項(xiàng)
類型
默認(rèn)值
描述
mapred.reduce.parallel.copies int 5 每個(gè)reduce去map中拿數(shù)據(jù)的并行數(shù)
mapred.reduce.copy.backoff int 300 獲取map數(shù)據(jù)最大超時(shí)時(shí)間
mapred.job.shuffle.input.buffer.percent float 0.7 buffer大小占reduce可用內(nèi)存的比例
mapred.child.java.opts String . -Xmx1024m設(shè)置reduce可用內(nèi)存為1g
mapred.job.shuffle.merge.percent float 0.66 buffer中的數(shù)據(jù)達(dá)到多少比例開始寫入磁盤
mapred.job.reduce.input.buffer.percent float 0.0 指定多少比例的內(nèi)存用來存放buffer中的數(shù)據(jù)
MapReduce tuning總結(jié)
Map Task和Reduce Task調(diào)優(yōu)的一個(gè)原則就是
減少數(shù)據(jù)的傳輸量
盡量使用內(nèi)存
減少磁盤IO的次數(shù)
增大任務(wù)并行數(shù)
除此之外還有根據(jù)自己集群及網(wǎng)絡(luò)的實(shí)際情況來調(diào)優(yōu)
Map task和Reduce task的啟動(dòng)數(shù)
在集群部署完畢之后懈叹,根據(jù)機(jī)器的配置情況,我們就可以通過一定的公式知道每個(gè)節(jié)點(diǎn)上container的大小和數(shù)量
1.mapper數(shù)量
每個(gè)作業(yè)啟動(dòng)的mapper由輸入的分片數(shù)決定分扎,每個(gè)節(jié)點(diǎn)啟動(dòng)的mapper數(shù)應(yīng)該是在10-100之間澄成,且最好每個(gè)map的執(zhí)行時(shí)間至少一分鐘
如果輸入的文件巨大,會(huì)產(chǎn)生無數(shù)個(gè)mapper的情況畏吓,應(yīng)該使用mapred.tasktracker.map.tasks.maximum參數(shù)確定每個(gè)tasktracker能夠啟動(dòng)的最大mapper數(shù)墨状,默認(rèn)只有2
以免同時(shí)啟動(dòng)過多的mapper
2.reducer數(shù)量
reducer的啟動(dòng)數(shù)量官方建議是0.95或者1.75*節(jié)點(diǎn)數(shù)*每個(gè)節(jié)點(diǎn)的container數(shù)
使用0.95的時(shí)候reduce只需要一輪就可以完成
使用1.75的時(shí)候完成較快的reducer會(huì)進(jìn)行第二輪計(jì)算,并進(jìn)行負(fù)載均衡
增加reducer的數(shù)量會(huì)增加集群的負(fù)擔(dān)庵佣,但是會(huì)得到較好的負(fù)載均衡結(jié)果和減低失敗成本
一些詳細(xì)的參數(shù):
選項(xiàng)
類型
默認(rèn)值
描述
mapred.reduce.tasks int 1 reduce task數(shù)量
mapred.tasktracker.map.tasks.maximum int 2 每個(gè)節(jié)點(diǎn)上能夠啟動(dòng)map task的最大數(shù)量
mapred.tasktracker.reduce.tasks.maximum int 2 每個(gè)節(jié)點(diǎn)上能夠啟動(dòng)reduce task的最大數(shù)量
mapred.reduce.slowstart.completed.maps float 0.05 map階段完成5%的時(shí)候開始進(jìn)行reduce計(jì)算
map和reduce task是同時(shí)啟動(dòng)的歉胶,很長一段時(shí)間是并存的
共存的時(shí)間取決于mapred.reduce.slowstart.completed.maps的設(shè)置
如果設(shè)置為0.6.那么reduce將在map完成60%后進(jìn)入運(yùn)行態(tài)
如果設(shè)置的map和reduce參數(shù)都很大,勢必造成map和reduce爭搶資源巴粪,造成有些進(jìn)程饑餓通今,超時(shí)出錯(cuò),最大的可能就是socket.timeout的出錯(cuò)
reduce是在33%的時(shí)候完成shuffle過程肛根,所以確保reduce進(jìn)行到33%的時(shí)候map任務(wù)全部完成辫塌,可以通過觀察任務(wù)界面的完成度進(jìn)行調(diào)整
當(dāng)reduce到達(dá)33%的時(shí)候,map恰好達(dá)到100%設(shè)置最佳的比例派哲,可以讓map先完成臼氨,但是不要讓reduce等待計(jì)算資源