【Hive任務(wù)優(yōu)化】—— Map、Reduce數(shù)量調(diào)整

https://blog.csdn.net/u013332124/article/details/97373278
以下的內(nèi)容都是基于hive on spark來講解的类溢,不過大部分概念都是互通的凌蔬,也可以應(yīng)用到hive on mr上。

一闯冷、如何調(diào)整任務(wù)map數(shù)量

在hadoop體系中砂心,有一個(gè)類叫InputFormat。在hadoop1.x時(shí)期蛇耀,這個(gè)類在org.apache.hadoop.mapred包底下辩诞,是一個(gè)接口。而到了hadoop2.x時(shí)期纺涤,這個(gè)類就到了org.apache.hadoop.mapreduce包底下译暂,變成了一個(gè)抽象類(1.x的那個(gè)InputFormat接口也還保留著)。因此撩炊,我們?nèi)タ磆adoop源碼時(shí)外永,搜InputFormat會搜到兩個(gè)類。這是因?yàn)閔adoop2.x做了很大的架構(gòu)改進(jìn)拧咳,同時(shí)為了兼容1.x的一些架構(gòu)伯顶,所以同時(shí)保留了這兩套代碼。

InputFormat主要有兩個(gè)方法(下面的代碼是hadoop1.x版本的):

//1.x版本的getSplits方法
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
//2.x版本的getSplits方法
public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;
RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
  • getSplits:獲取數(shù)據(jù)分片信息骆膝,這個(gè)分片數(shù)組的大小就決定了map數(shù)量祭衩。一般一個(gè)分片由一個(gè)map task來處理
  • getRecordReader:獲取reader,用來實(shí)際讀取分片數(shù)據(jù)

無論是mapreduce job還是spark job都是使用這個(gè)InputFormat的體系來計(jì)算map數(shù)量的阅签。針對不同類型的輸入數(shù)據(jù)掐暮,我們可以實(shí)現(xiàn)各種不同的InputFormat。比如我們常用的TextInputFormat就是實(shí)現(xiàn)了InputFormat類

1政钟、FileInputFormat的實(shí)現(xiàn)邏輯介紹

hadoop實(shí)現(xiàn)了一個(gè)抽象類FileInputFormat來作為InputFormat的一個(gè)通用實(shí)現(xiàn)類劫乱,我們熟悉的TextInputFormat就繼承了FileInputFormat织中。TextInputFormat并沒有實(shí)現(xiàn)自己的getSplits方法,而是直接使用FileInputFormat的getSplits方法衷戈。

1.1 getSplits方法實(shí)現(xiàn)

先通過公式Math.max(minSize, Math.min(goalSize, blockSize))計(jì)算出目標(biāo)分片大小splitSize狭吼。

  • minSize: 根據(jù)配置mapred.min.split.size獲取的,2.x版本中這個(gè)配置改成mapreduce.input.fileinputformat.split.minsize殖妇。還hive中兩個(gè)配置都可以用
  • blockSize: 輸入hdfs文件的單個(gè)block大小
  • goalSize: 這個(gè)是1.x和2.x不同的地方刁笙。2.x中直接通過mapred.max.split.size或mapreduce.input.fileinputformat.split.maxsize獲取。而1.x則是通過getSplits方法入?yún)⒗锏膎umSplits參數(shù)來計(jì)算的谦趣。計(jì)算規(guī)則:goalSize=totalSize / (numSplits == 0 ? 1 : numSplits)疲吸,其中totalSize是輸入文件的總大小(如果輸入是一個(gè)目錄,就遍歷底下的文件累加)前鹅。

確定了splitSize后摘悴,就通過以下流程進(jìn)行分片:

在這里插入圖片描述
  1. 遍歷所有輸入文件
  2. 設(shè)置remainSize=目標(biāo)文件大小
  3. 判斷 remainSize 是否大于 splitSize * 1.1,是的話切割出一個(gè)分片舰绘,然后remainSize = remainSize - splitSize蹂喻,接著繼續(xù)執(zhí)行以上邏輯,直到remainSize小于 splitSize * 1.1捂寿。之后將文件剩下的bytes切割出一個(gè)分片來

從上面的流程可以看出口四,在FileInputFormat的getSplits邏輯下,一個(gè)map task最多只能處理一個(gè)文件秦陋。也就是說蔓彩,如果輸入文件有n個(gè),無論怎么調(diào)整參數(shù)驳概,分配的map數(shù)量都不會少于n赤嚼。

2、CombineFileInputFormat的實(shí)現(xiàn)邏輯介紹

上面介紹的FileInputFormat分片規(guī)則比較簡單顺又,有時(shí)候可能會有某一些分片都集中在幾臺節(jié)點(diǎn)的問題探膊。而在CombineFileInputFormat的實(shí)現(xiàn)中,會盡量保證各個(gè)節(jié)點(diǎn)待榔、機(jī)架都能分配到一定的數(shù)據(jù)。

2.1 getSplits方法實(shí)現(xiàn)

首先遍歷所有的輸入文件流济,在遍歷過程中構(gòu)建以下數(shù)據(jù)結(jié)構(gòu):

//具體節(jié)點(diǎn)上的所有block信息
Map<String, Set<OneBlockInfo>> nodeToBlocks    
//未分配出去的block信息
Map<OneBlockInfo, String[]> blockToNodes   
//具體機(jī)架上的所有block信息
Map<String, List<OneBlockInfo>> rackToBlocks   

還有幾個(gè)和配置相關(guān)的變量:

  • maxSize: 通過mapreduce.input.fileinputformat.split.maxsize參數(shù)設(shè)置
  • minSizePerNode: 通過mapreduce.input.fileinputformat.split.minsize.per.node參數(shù)設(shè)置
  • minSizePerRack:通過mapreduce.input.fileinputformat.split.minsize.per.rack參數(shù)設(shè)置

接下來開始計(jì)算分片(部分流程圖):

在這里插入圖片描述

上面的流程圖主要是遍歷nodeToBlocks下的所有node锐锣,然后繼續(xù)遍歷node下的所有block,將block的大小相加(這個(gè)block的大小計(jì)算也和maxSize有關(guān))绳瘟,一直疊加到maxSize后生成一個(gè)分片雕憔,然后退出該node的block遍歷。如果遍歷完block都沒達(dá)到maxSize糖声,但是累加的數(shù)據(jù)量達(dá)到了minSizePerNode的值斤彼,也可以切割出一個(gè)分片來分瘦,否則就什么都不做。之后繼續(xù)下一個(gè)node遍歷琉苇。(在這個(gè)過程中嘲玫,一個(gè)node最多只會生成一個(gè)分片)

之后按照基本一樣的流程遍歷rackToBlocks,處理完rack的分片后將所有剩下的block會放到一個(gè)overflowBlocks(ArrayList\<OneBlockInfo\>)的數(shù)據(jù)結(jié)構(gòu)中并扇。

最后一步是遍歷overflowBlocks去团,不斷累加block,遍歷過程中達(dá)到maxSize就切割一個(gè)分片出來穷蛹,直到全部遍歷完土陪。

3、HiveInputFormat 的實(shí)現(xiàn)邏輯介紹

因?yàn)閔ive的任務(wù)的數(shù)據(jù)類型有各種各樣肴熏,用戶在提交sql時(shí)并不需要關(guān)心要用哪種InputFormat來讀取數(shù)據(jù)鬼雀。因此,Hive內(nèi)部實(shí)現(xiàn)了自己的InputFormat蛙吏。我們可以通過hive的參數(shù)hive.input.format來設(shè)置具體要用哪種InputFormat實(shí)現(xiàn)源哩。hive默認(rèn)是使用org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。目前也僅支持HiveInputFormat和CombineHiveInputFormat兩種出刷。

HiveInputFormat做的事主要就是解析輸入目錄對應(yīng)的表璧疗,然后獲取表設(shè)置的InputFormat是哪個(gè)。之后就把getSplits邏輯委托給這個(gè)類了馁龟。

如果我們要讀取的表是textfile格式的崩侠,最后就也是走到TextInputFormat類進(jìn)行數(shù)據(jù)分片。所以坷檩,大多數(shù)情況下却音,HiveInputFormat的行為和FileInputFormat差不多

當(dāng)我們在Hive創(chuàng)建一張表時(shí)矢炼,hive會根據(jù)用戶的語句去給表的數(shù)據(jù)設(shè)置對應(yīng)的InputFormat系瓢。默認(rèn)是textfile的格式,使用的是org.apache.hadoop.mapred.TextInputFormat句灌。如果我們建表的時(shí)候加上stored as orc夷陋,對應(yīng)的InputFormat就變成了org.apache.hadoop.hive.ql.io.orc.OrcInputFormat

另外胰锌,因?yàn)镠ive的HiveInputFormat是基于hadoop1.x版本的骗绕。因此分片大小的計(jì)算公式中g(shù)oalSize由getSplits的第二個(gè)參數(shù)numSplits來決定,這個(gè)參數(shù)是外界傳進(jìn)來的资昧。在mapreduce job中酬土,可以通過mapred.map.tasks參數(shù)來設(shè)置,默認(rèn)是2格带。如果我們想調(diào)大并發(fā)度撤缴,可以把這個(gè)值設(shè)置的大一點(diǎn)刹枉。但是在hive on spark中卻沒有地方可以設(shè)置這個(gè)參數(shù)(設(shè)置spark.defalut.parallelism也沒用),在hive on spark中屈呕,numSplits的值都默認(rèn)為2微宝。這里不具體展開,感興趣的可以自行查看相關(guān)源碼凉袱。

4芥吟、CombineHiveInputFormat的實(shí)現(xiàn)邏輯介紹

和HiveInputFormat相對應(yīng)的是CombineHiveInputFormat。這個(gè)也是hive的默認(rèn)配置专甩。

CombineHiveInputFormat的底層實(shí)現(xiàn)也是使用了CombineFileInputFormat的getSplits方法钟鸵。但是CombineHiveInputFormat在調(diào)用CombineFileInputFormat#getSplits()前還會進(jìn)行一些額外的處理。比如過濾出一些不需要合并的目錄(例如ACID的目錄就不可以去合并)涤躲。

5棺耍、map數(shù)量調(diào)整總結(jié)

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 時(shí)

可影響map數(shù)量的參數(shù)
mapreduce.input.fileinputformat.split.maxsize ( mapred.max.split.size )
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat 時(shí)

可影響map數(shù)量的參數(shù)
mapreduce.input.fileinputformat.split.minsize ( mapred.min.split.size )
mapred.map.tasks(僅hive on mr有作用)
也可以通過改變文件hdfs block大小來影響(不推薦)

二、如何調(diào)整任務(wù)reduce數(shù)量

hive設(shè)置reduce數(shù)量的邏輯就比較簡單种樱,相關(guān)代碼在SetSparkReducerParallelism類中蒙袍,感興趣的可以自己去細(xì)看代碼。

reduce的數(shù)量主要和以下三個(gè)參數(shù)有關(guān):

參數(shù)名 默認(rèn)值
hive.exec.reducers.bytes.per.reducer 1000000000
hive.exec.reducers.max 999
mapred.reduce.tasks -1

mapred.reduce.tasks參數(shù)很簡單嫩挤,就是簡單粗暴的直接指定reduce數(shù)量應(yīng)該是多少害幅。這個(gè)值是多少reduce task數(shù)量就是多少。

hive.exec.reducers.bytes.per.reducer和hive.exec.reducers.max是配合使用的岂昭。

具體計(jì)算公式如下:

reduceNumber = min(maxReduceNumber,mapInputSize/(bytesPerReduce/2))

其中mapInputSize表示的是預(yù)期輸入的數(shù)據(jù)總大小以现。這里注意要把預(yù)期輸入的數(shù)據(jù)量和實(shí)際輸入的數(shù)據(jù)量區(qū)別對待。通常我們在spark historyServer看到的統(tǒng)計(jì)數(shù)據(jù)都是實(shí)際輸入的數(shù)據(jù)量

在這里插入圖片描述

上圖中實(shí)際輸入的數(shù)據(jù)量是5.5GB约啊,我們設(shè)置的bytesPerReduce是30m邑遏。如果使用公式去算,會發(fā)現(xiàn)的出來的值不是492恰矩。但其實(shí)預(yù)期輸入的數(shù)據(jù)量是7.2GB(也就是目標(biāo)目錄的總大屑呛小),只是map task在讀取數(shù)據(jù)時(shí)會過濾掉一些沒用的數(shù)據(jù)不讀取外傅。

在spark job中纪吮,可能存在多個(gè)stage,有的reduce task數(shù)量的規(guī)則的mapInputSize是取自父stage的shuffle Read的值

三萎胰、關(guān)于map task是如何讀取數(shù)據(jù)的

上面介紹了任務(wù)是如何切割分片碾盟,然后確定map task數(shù)量的,之后一個(gè)map task負(fù)責(zé)一個(gè)分片奥洼。但是如果仔細(xì)思考,就會發(fā)現(xiàn)一個(gè)問題:map task所分配的到的分片不一定是完整的晚胡。

舉個(gè)例子灵奖,一個(gè)文件有200M嚼沿,一行大概100M,也就是這個(gè)文件只有2條記錄瓷患。

這時(shí)如果我們讀取這個(gè)文件時(shí)splitSize是60M骡尽,也就是最終會切割成4個(gè)分片:059MB,60119MB擅编,120179MB攀细,180200B。也就是會啟動4個(gè)map task來處理數(shù)據(jù)爱态。那這4個(gè)task是怎么處理兩條數(shù)據(jù)的呢谭贪?

這個(gè)就和InputFormat的getRecordReader方法有關(guān)系了。map task拿到RecordReader后锦担,就通過RecordReader來讀取數(shù)據(jù)了俭识。一般的RecordReader實(shí)現(xiàn)都會特別處理這種情況。如果LineRecordReader的讀取邏輯中洞渔,是這樣處理的:

  • 第一個(gè)分片:發(fā)現(xiàn)起始是0套媚,就直接往后讀取一整行的數(shù)據(jù)。也就是這個(gè)map task實(shí)際處理的數(shù)據(jù)量是100M磁椒,即一條記錄
  • 第二個(gè)分片:起始不是0堤瘤,它就會在通過指定的行分割符去找下一行的起始位置開始讀,也就是定位100M的位置浆熔,然后開始讀取一行的數(shù)據(jù)本辐。這個(gè)map task實(shí)際處理的數(shù)據(jù)量也是100M,即一條記錄
  • 第三個(gè)分片:起始不是0蘸拔,往后掃描也找不到下一個(gè)行分割符师郑,所以這個(gè)map task處理的數(shù)據(jù)量是0M。
  • 第四個(gè)分片:和分片三一樣调窍,處理的數(shù)據(jù)量是0M宝冕。

因此我們發(fā)現(xiàn),雖然上面分出了4個(gè)分片邓萨,但實(shí)際有處理數(shù)據(jù)量的task其實(shí)就2個(gè)地梨,還有兩個(gè)task實(shí)際基本沒做什么事,等于浪費(fèi)了一定的資源缔恳。所以宝剖,我們?nèi)绻胝{(diào)大map階段的并行度,并不是只要將splitSize調(diào)小就可以的歉甚,還要關(guān)注map task能處理的粒度是多大万细。比如輸入文件是orc時(shí),如果orc文件的stripe大小平均是32m纸泄,那么splitSize=16m的實(shí)際并行度并不會比splitSize=32m來的好赖钞,反而會浪費(fèi)一些資源腰素。

四、一些優(yōu)化方向

1雪营、map數(shù)量優(yōu)化原則

關(guān)于map數(shù)量弓千,一般來說,肯定是數(shù)量越多性能越高献起,但是同樣的洋访,數(shù)量越多意味著所消耗的資源也越多。因此谴餐,選擇一個(gè)合理的map數(shù)量對資源和性能的影響還是挺大的姻政。一般優(yōu)化原則如下:

  • 使map數(shù)量的并行度盡量逼近集群單個(gè)任務(wù)允許的最高并行度。比如hive on spark中总寒,最多只能運(yùn)行1000個(gè)executor扶歪,那么可以讓map數(shù)量盡量逼近這個(gè)值。如果有多個(gè)stage同時(shí)都是map stage摄闸,就盡量保證這幾個(gè)stage的map task總和不超過1000
  • 雖然說盡量使用更多的map數(shù)量善镰,但是要避免太多map task不工作的情況(處理的數(shù)據(jù)量是0)。比如Orc文件的一個(gè)stripe大小是30m年枕,但是以2m一個(gè)map task去計(jì)算map task數(shù)量炫欺,就會導(dǎo)致大多數(shù)的map task不會真正的處理數(shù)據(jù)。具體可以看第三章【關(guān)于map task是如何讀取數(shù)據(jù)的】

2熏兄、reduce數(shù)量優(yōu)化原則

關(guān)于reduce數(shù)量品洛,和map一樣,也要選擇了一個(gè)比較合理的值摩桶。一般優(yōu)化原則如下:

  • 使reduce數(shù)量的并行度盡量逼近集群單個(gè)任務(wù)允許的最高并行度(這個(gè)理由和map一樣)桥状,這個(gè)其實(shí)可以通過hive.exec.reducers.max參數(shù)來限制
  • 某些任務(wù)在reduce階段可能會導(dǎo)致數(shù)據(jù)傾斜,所以如果可以獲取到任務(wù)的歷史運(yùn)行情況硝清,還可以適當(dāng)?shù)母鶕?jù)任務(wù)reduce階段的傾斜程度來動態(tài)的調(diào)整reduce數(shù)量(具體多少和任務(wù)實(shí)際的運(yùn)行情況有關(guān))

參考資料

https://stackoverflow.com/questions/14291170/how-does-hadoop-process-records-split-across-block-boundaries

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辅斟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子芦拿,更是在濱河造成了極大的恐慌士飒,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蔗崎,死亡現(xiàn)場離奇詭異酵幕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)缓苛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進(jìn)店門芳撒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事笔刹÷辏” “怎么了?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵徘熔,是天一觀的道長。 經(jīng)常有香客問我淆党,道長酷师,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任染乌,我火速辦了婚禮山孔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘荷憋。我一直安慰自己台颠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布勒庄。 她就那樣靜靜地躺著串前,像睡著了一般。 火紅的嫁衣襯著肌膚如雪实蔽。 梳的紋絲不亂的頭發(fā)上荡碾,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天,我揣著相機(jī)與錄音局装,去河邊找鬼坛吁。 笑死,一個(gè)胖子當(dāng)著我的面吹牛铐尚,可吹牛的內(nèi)容都是我干的拨脉。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼宣增,長吁一口氣:“原來是場噩夢啊……” “哼玫膀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起统舀,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤匆骗,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后誉简,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體碉就,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年闷串,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了瓮钥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,912評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖碉熄,靈堂內(nèi)的尸體忽然破棺而出桨武,到底是詐尸還是另有隱情,我是刑警寧澤锈津,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布呀酸,位于F島的核電站,受9級特大地震影響琼梆,放射性物質(zhì)發(fā)生泄漏性誉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一茎杂、第九天 我趴在偏房一處隱蔽的房頂上張望错览。 院中可真熱鬧,春花似錦煌往、人聲如沸倾哺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽羞海。三九已至,卻和暖如春曲管,著一層夾襖步出監(jiān)牢的瞬間扣猫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工翘地, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留申尤,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓衙耕,卻偏偏與公主長得像昧穿,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子橙喘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內(nèi)容