https://blog.csdn.net/u013332124/article/details/97373278
以下的內(nèi)容都是基于hive on spark來講解的类溢,不過大部分概念都是互通的凌蔬,也可以應(yīng)用到hive on mr上。
一闯冷、如何調(diào)整任務(wù)map數(shù)量
在hadoop體系中砂心,有一個(gè)類叫InputFormat
。在hadoop1.x時(shí)期蛇耀,這個(gè)類在org.apache.hadoop.mapred
包底下辩诞,是一個(gè)接口。而到了hadoop2.x時(shí)期纺涤,這個(gè)類就到了org.apache.hadoop.mapreduce
包底下译暂,變成了一個(gè)抽象類(1.x的那個(gè)InputFormat接口也還保留著)。因此撩炊,我們?nèi)タ磆adoop源碼時(shí)外永,搜InputFormat會搜到兩個(gè)類。這是因?yàn)閔adoop2.x做了很大的架構(gòu)改進(jìn)拧咳,同時(shí)為了兼容1.x的一些架構(gòu)伯顶,所以同時(shí)保留了這兩套代碼。
InputFormat主要有兩個(gè)方法(下面的代碼是hadoop1.x版本的):
//1.x版本的getSplits方法
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
//2.x版本的getSplits方法
public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
- getSplits:獲取數(shù)據(jù)分片信息骆膝,這個(gè)分片數(shù)組的大小就決定了map數(shù)量祭衩。一般一個(gè)分片由一個(gè)map task來處理
- getRecordReader:獲取reader,用來實(shí)際讀取分片數(shù)據(jù)
無論是mapreduce job還是spark job都是使用這個(gè)InputFormat的體系來計(jì)算map數(shù)量的阅签。針對不同類型的輸入數(shù)據(jù)掐暮,我們可以實(shí)現(xiàn)各種不同的InputFormat。比如我們常用的TextInputFormat就是實(shí)現(xiàn)了InputFormat類
1政钟、FileInputFormat的實(shí)現(xiàn)邏輯介紹
hadoop實(shí)現(xiàn)了一個(gè)抽象類FileInputFormat來作為InputFormat的一個(gè)通用實(shí)現(xiàn)類劫乱,我們熟悉的TextInputFormat就繼承了FileInputFormat织中。TextInputFormat并沒有實(shí)現(xiàn)自己的getSplits方法,而是直接使用FileInputFormat的getSplits方法衷戈。
1.1 getSplits方法實(shí)現(xiàn)
先通過公式Math.max(minSize, Math.min(goalSize, blockSize))
計(jì)算出目標(biāo)分片大小splitSize狭吼。
- minSize: 根據(jù)配置mapred.min.split.size獲取的,2.x版本中這個(gè)配置改成mapreduce.input.fileinputformat.split.minsize殖妇。還hive中兩個(gè)配置都可以用
- blockSize: 輸入hdfs文件的單個(gè)block大小
- goalSize: 這個(gè)是1.x和2.x不同的地方刁笙。2.x中直接通過mapred.max.split.size或mapreduce.input.fileinputformat.split.maxsize獲取。而1.x則是通過getSplits方法入?yún)⒗锏膎umSplits參數(shù)來計(jì)算的谦趣。計(jì)算規(guī)則:
goalSize=totalSize / (numSplits == 0 ? 1 : numSplits)
疲吸,其中totalSize是輸入文件的總大小(如果輸入是一個(gè)目錄,就遍歷底下的文件累加)前鹅。
確定了splitSize后摘悴,就通過以下流程進(jìn)行分片:
- 遍歷所有輸入文件
- 設(shè)置
remainSize=目標(biāo)文件大小
- 判斷 remainSize 是否大于 splitSize * 1.1,是的話切割出一個(gè)分片舰绘,然后remainSize = remainSize - splitSize蹂喻,接著繼續(xù)執(zhí)行以上邏輯,直到remainSize小于 splitSize * 1.1捂寿。之后將文件剩下的bytes切割出一個(gè)分片來
從上面的流程可以看出口四,在FileInputFormat的getSplits邏輯下,一個(gè)map task最多只能處理一個(gè)文件秦陋。也就是說蔓彩,如果輸入文件有n個(gè),無論怎么調(diào)整參數(shù)驳概,分配的map數(shù)量都不會少于n赤嚼。
2、CombineFileInputFormat的實(shí)現(xiàn)邏輯介紹
上面介紹的FileInputFormat分片規(guī)則比較簡單顺又,有時(shí)候可能會有某一些分片都集中在幾臺節(jié)點(diǎn)的問題探膊。而在CombineFileInputFormat的實(shí)現(xiàn)中,會盡量保證各個(gè)節(jié)點(diǎn)待榔、機(jī)架都能分配到一定的數(shù)據(jù)。
2.1 getSplits方法實(shí)現(xiàn)
首先遍歷所有的輸入文件流济,在遍歷過程中構(gòu)建以下數(shù)據(jù)結(jié)構(gòu):
//具體節(jié)點(diǎn)上的所有block信息
Map<String, Set<OneBlockInfo>> nodeToBlocks
//未分配出去的block信息
Map<OneBlockInfo, String[]> blockToNodes
//具體機(jī)架上的所有block信息
Map<String, List<OneBlockInfo>> rackToBlocks
還有幾個(gè)和配置相關(guān)的變量:
- maxSize: 通過mapreduce.input.fileinputformat.split.maxsize參數(shù)設(shè)置
- minSizePerNode: 通過mapreduce.input.fileinputformat.split.minsize.per.node參數(shù)設(shè)置
- minSizePerRack:通過mapreduce.input.fileinputformat.split.minsize.per.rack參數(shù)設(shè)置
接下來開始計(jì)算分片(部分流程圖):
上面的流程圖主要是遍歷nodeToBlocks下的所有node锐锣,然后繼續(xù)遍歷node下的所有block,將block的大小相加(這個(gè)block的大小計(jì)算也和maxSize有關(guān))绳瘟,一直疊加到maxSize后生成一個(gè)分片雕憔,然后退出該node的block遍歷。如果遍歷完block都沒達(dá)到maxSize糖声,但是累加的數(shù)據(jù)量達(dá)到了minSizePerNode的值斤彼,也可以切割出一個(gè)分片來分瘦,否則就什么都不做。之后繼續(xù)下一個(gè)node遍歷琉苇。(在這個(gè)過程中嘲玫,一個(gè)node最多只會生成一個(gè)分片)
之后按照基本一樣的流程遍歷rackToBlocks,處理完rack的分片后將所有剩下的block會放到一個(gè)overflowBlocks(ArrayList\<OneBlockInfo\>)
的數(shù)據(jù)結(jié)構(gòu)中并扇。
最后一步是遍歷overflowBlocks去团,不斷累加block,遍歷過程中達(dá)到maxSize就切割一個(gè)分片出來穷蛹,直到全部遍歷完土陪。
3、HiveInputFormat 的實(shí)現(xiàn)邏輯介紹
因?yàn)閔ive的任務(wù)的數(shù)據(jù)類型有各種各樣肴熏,用戶在提交sql時(shí)并不需要關(guān)心要用哪種InputFormat來讀取數(shù)據(jù)鬼雀。因此,Hive內(nèi)部實(shí)現(xiàn)了自己的InputFormat蛙吏。我們可以通過hive的參數(shù)hive.input.format
來設(shè)置具體要用哪種InputFormat實(shí)現(xiàn)源哩。hive默認(rèn)是使用org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
。目前也僅支持HiveInputFormat和CombineHiveInputFormat兩種出刷。
HiveInputFormat做的事主要就是解析輸入目錄對應(yīng)的表璧疗,然后獲取表設(shè)置的InputFormat是哪個(gè)。之后就把getSplits邏輯委托給這個(gè)類了馁龟。
如果我們要讀取的表是textfile格式的崩侠,最后就也是走到TextInputFormat類進(jìn)行數(shù)據(jù)分片。所以坷檩,大多數(shù)情況下却音,HiveInputFormat的行為和FileInputFormat差不多。
當(dāng)我們在Hive創(chuàng)建一張表時(shí)矢炼,hive會根據(jù)用戶的語句去給表的數(shù)據(jù)設(shè)置對應(yīng)的InputFormat系瓢。默認(rèn)是textfile的格式,使用的是
org.apache.hadoop.mapred.TextInputFormat
句灌。如果我們建表的時(shí)候加上stored as orc夷陋,對應(yīng)的InputFormat就變成了org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
。
另外胰锌,因?yàn)镠ive的HiveInputFormat是基于hadoop1.x版本的骗绕。因此分片大小的計(jì)算公式中g(shù)oalSize由getSplits的第二個(gè)參數(shù)numSplits來決定,這個(gè)參數(shù)是外界傳進(jìn)來的资昧。在mapreduce job中酬土,可以通過mapred.map.tasks參數(shù)來設(shè)置,默認(rèn)是2格带。如果我們想調(diào)大并發(fā)度撤缴,可以把這個(gè)值設(shè)置的大一點(diǎn)刹枉。但是在hive on spark中卻沒有地方可以設(shè)置這個(gè)參數(shù)(設(shè)置spark.defalut.parallelism也沒用),在hive on spark中屈呕,numSplits的值都默認(rèn)為2微宝。這里不具體展開,感興趣的可以自行查看相關(guān)源碼凉袱。
4芥吟、CombineHiveInputFormat的實(shí)現(xiàn)邏輯介紹
和HiveInputFormat相對應(yīng)的是CombineHiveInputFormat。這個(gè)也是hive的默認(rèn)配置专甩。
CombineHiveInputFormat的底層實(shí)現(xiàn)也是使用了CombineFileInputFormat的getSplits方法钟鸵。但是CombineHiveInputFormat在調(diào)用CombineFileInputFormat#getSplits()前還會進(jìn)行一些額外的處理。比如過濾出一些不需要合并的目錄(例如ACID的目錄就不可以去合并)涤躲。
5棺耍、map數(shù)量調(diào)整總結(jié)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 時(shí):
可影響map數(shù)量的參數(shù) |
---|
mapreduce.input.fileinputformat.split.maxsize ( mapred.max.split.size ) |
mapreduce.input.fileinputformat.split.minsize.per.node |
mapreduce.input.fileinputformat.split.minsize.per.rack |
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat 時(shí):
可影響map數(shù)量的參數(shù) |
---|
mapreduce.input.fileinputformat.split.minsize ( mapred.min.split.size ) |
mapred.map.tasks(僅hive on mr有作用) |
也可以通過改變文件hdfs block大小來影響(不推薦) |
二、如何調(diào)整任務(wù)reduce數(shù)量
hive設(shè)置reduce數(shù)量的邏輯就比較簡單种樱,相關(guān)代碼在SetSparkReducerParallelism類中蒙袍,感興趣的可以自己去細(xì)看代碼。
reduce的數(shù)量主要和以下三個(gè)參數(shù)有關(guān):
參數(shù)名 | 默認(rèn)值 |
---|---|
hive.exec.reducers.bytes.per.reducer | 1000000000 |
hive.exec.reducers.max | 999 |
mapred.reduce.tasks | -1 |
mapred.reduce.tasks參數(shù)很簡單嫩挤,就是簡單粗暴的直接指定reduce數(shù)量應(yīng)該是多少害幅。這個(gè)值是多少reduce task數(shù)量就是多少。
hive.exec.reducers.bytes.per.reducer和hive.exec.reducers.max是配合使用的岂昭。
具體計(jì)算公式如下:
reduceNumber = min(maxReduceNumber,mapInputSize/(bytesPerReduce/2))
其中mapInputSize表示的是預(yù)期輸入的數(shù)據(jù)總大小以现。這里注意要把預(yù)期輸入的數(shù)據(jù)量和實(shí)際輸入的數(shù)據(jù)量區(qū)別對待。通常我們在spark historyServer看到的統(tǒng)計(jì)數(shù)據(jù)都是實(shí)際輸入的數(shù)據(jù)量:
上圖中實(shí)際輸入的數(shù)據(jù)量是5.5GB约啊,我們設(shè)置的bytesPerReduce是30m邑遏。如果使用公式去算,會發(fā)現(xiàn)的出來的值不是492恰矩。但其實(shí)預(yù)期輸入的數(shù)據(jù)量是7.2GB(也就是目標(biāo)目錄的總大屑呛小),只是map task在讀取數(shù)據(jù)時(shí)會過濾掉一些沒用的數(shù)據(jù)不讀取外傅。
在spark job中纪吮,可能存在多個(gè)stage,有的reduce task數(shù)量的規(guī)則的mapInputSize是取自父stage的shuffle Read的值
三萎胰、關(guān)于map task是如何讀取數(shù)據(jù)的
上面介紹了任務(wù)是如何切割分片碾盟,然后確定map task數(shù)量的,之后一個(gè)map task負(fù)責(zé)一個(gè)分片奥洼。但是如果仔細(xì)思考,就會發(fā)現(xiàn)一個(gè)問題:map task所分配的到的分片不一定是完整的晚胡。
舉個(gè)例子灵奖,一個(gè)文件有200M嚼沿,一行大概100M,也就是這個(gè)文件只有2條記錄瓷患。
這時(shí)如果我們讀取這個(gè)文件時(shí)splitSize是60M骡尽,也就是最終會切割成4個(gè)分片:059MB,60119MB擅编,120179MB攀细,180200B。也就是會啟動4個(gè)map task來處理數(shù)據(jù)爱态。那這4個(gè)task是怎么處理兩條數(shù)據(jù)的呢谭贪?
這個(gè)就和InputFormat的getRecordReader方法有關(guān)系了。map task拿到RecordReader后锦担,就通過RecordReader來讀取數(shù)據(jù)了俭识。一般的RecordReader實(shí)現(xiàn)都會特別處理這種情況。如果LineRecordReader的讀取邏輯中洞渔,是這樣處理的:
- 第一個(gè)分片:發(fā)現(xiàn)起始是0套媚,就直接往后讀取一整行的數(shù)據(jù)。也就是這個(gè)map task實(shí)際處理的數(shù)據(jù)量是100M磁椒,即一條記錄
- 第二個(gè)分片:起始不是0堤瘤,它就會在通過指定的行分割符去找下一行的起始位置開始讀,也就是定位100M的位置浆熔,然后開始讀取一行的數(shù)據(jù)本辐。這個(gè)map task實(shí)際處理的數(shù)據(jù)量也是100M,即一條記錄
- 第三個(gè)分片:起始不是0蘸拔,往后掃描也找不到下一個(gè)行分割符师郑,所以這個(gè)map task處理的數(shù)據(jù)量是0M。
- 第四個(gè)分片:和分片三一樣调窍,處理的數(shù)據(jù)量是0M宝冕。
因此我們發(fā)現(xiàn),雖然上面分出了4個(gè)分片邓萨,但實(shí)際有處理數(shù)據(jù)量的task其實(shí)就2個(gè)地梨,還有兩個(gè)task實(shí)際基本沒做什么事,等于浪費(fèi)了一定的資源缔恳。所以宝剖,我們?nèi)绻胝{(diào)大map階段的并行度,并不是只要將splitSize調(diào)小就可以的歉甚,還要關(guān)注map task能處理的粒度是多大万细。比如輸入文件是orc時(shí),如果orc文件的stripe大小平均是32m纸泄,那么splitSize=16m的實(shí)際并行度并不會比splitSize=32m來的好赖钞,反而會浪費(fèi)一些資源腰素。
四、一些優(yōu)化方向
1雪营、map數(shù)量優(yōu)化原則
關(guān)于map數(shù)量弓千,一般來說,肯定是數(shù)量越多性能越高献起,但是同樣的洋访,數(shù)量越多意味著所消耗的資源也越多。因此谴餐,選擇一個(gè)合理的map數(shù)量對資源和性能的影響還是挺大的姻政。一般優(yōu)化原則如下:
- 使map數(shù)量的并行度盡量逼近集群單個(gè)任務(wù)允許的最高并行度。比如hive on spark中总寒,最多只能運(yùn)行1000個(gè)executor扶歪,那么可以讓map數(shù)量盡量逼近這個(gè)值。如果有多個(gè)stage同時(shí)都是map stage摄闸,就盡量保證這幾個(gè)stage的map task總和不超過1000
- 雖然說盡量使用更多的map數(shù)量善镰,但是要避免太多map task不工作的情況(處理的數(shù)據(jù)量是0)。比如Orc文件的一個(gè)stripe大小是30m年枕,但是以2m一個(gè)map task去計(jì)算map task數(shù)量炫欺,就會導(dǎo)致大多數(shù)的map task不會真正的處理數(shù)據(jù)。具體可以看第三章【關(guān)于map task是如何讀取數(shù)據(jù)的】
2熏兄、reduce數(shù)量優(yōu)化原則
關(guān)于reduce數(shù)量品洛,和map一樣,也要選擇了一個(gè)比較合理的值摩桶。一般優(yōu)化原則如下:
- 使reduce數(shù)量的并行度盡量逼近集群單個(gè)任務(wù)允許的最高并行度(這個(gè)理由和map一樣)桥状,這個(gè)其實(shí)可以通過hive.exec.reducers.max參數(shù)來限制
- 某些任務(wù)在reduce階段可能會導(dǎo)致數(shù)據(jù)傾斜,所以如果可以獲取到任務(wù)的歷史運(yùn)行情況硝清,還可以適當(dāng)?shù)母鶕?jù)任務(wù)reduce階段的傾斜程度來動態(tài)的調(diào)整reduce數(shù)量(具體多少和任務(wù)實(shí)際的運(yùn)行情況有關(guān))