Hive是將符合SQL語法的字符串解析生成可以在Hadoop上執(zhí)行的MapReduce的工具埃仪。使用Hive盡量按照分布式計算的一些特點來設(shè)計sql,和傳統(tǒng)關(guān)系型數(shù)據(jù)庫有區(qū)別,
所以需要去掉原有關(guān)系型數(shù)據(jù)庫下開發(fā)的一些固有思維肛响。
基本原則:
1:盡量盡早地過濾數(shù)據(jù),減少每個階段的數(shù)據(jù)量,對于分區(qū)表要加分區(qū)惜索,同時只選擇需要使用到的字段
select ... from A
join B
on A.key = B.key
where A.userid>10
and B.userid<10
and A.dt='20120417'
and B.dt='20120417';
應(yīng)該改寫為:
select .... from (select .... from A
where dt='201200417'
and userid>10
) a
join ( select .... from B
where dt='201200417'
and userid < 10
) b
on a.key = b.key;
2特笋、對歷史庫的計算經(jīng)驗 (這項是說根據(jù)不同的使用目的優(yōu)化使用方法)
歷史庫計算和使用,分區(qū)
3:盡量原子化操作巾兆,盡量避免一個SQL包含復(fù)雜邏輯
可以使用中間表來完成復(fù)雜的邏輯
4 jion操作 小表要注意放在join的左邊(目前TCL里面很多都小表放在join的右邊)猎物。
否則會引起磁盤和內(nèi)存的大量消耗
5:如果union all的部分個數(shù)大于2,或者每個union部分數(shù)據(jù)量大角塑,應(yīng)該拆成多個insert into 語句蔫磨,實際測試過程中,執(zhí)行時間能提升50%
insert overwite table tablename partition (dt= ....)
select ..... from (
select ... from A
union all
select ... from B
union all
select ... from C
) R
where ...;
可以改寫為:
insert into table tablename partition (dt= ....)
select .... from A
WHERE ...;
insert into table tablename partition (dt= ....)
select .... from B
WHERE ...;
insert into table tablename partition (dt= ....)
select .... from C
WHERE ...;
5:寫SQL要先了解數(shù)據(jù)本身的特點吉拳,如果有join ,group操作的話质帅,要注意是否會有數(shù)據(jù)傾斜
如果出現(xiàn)數(shù)據(jù)傾斜,應(yīng)當做如下處理:
set hive.exec.reducers.max=200;
set mapred.reduce.tasks= 200;---增大Reduce個數(shù)
set hive.groupby.mapaggr.checkinterval=100000 ;--這個是group的鍵對應(yīng)的記錄條數(shù)超過這個值則會進行分拆,值根據(jù)具體數(shù)據(jù)量設(shè)置
set hive.groupby.skewindata=true; --如果是group by過程出現(xiàn)傾斜 應(yīng)該設(shè)置為true
set hive.skewjoin.key=100000; --這個是join的鍵對應(yīng)的記錄條數(shù)超過這個值則會進行分拆,值根據(jù)具體數(shù)據(jù)量設(shè)置
set hive.optimize.skewjoin=true;--如果是join 過程出現(xiàn)傾斜 應(yīng)該設(shè)置為true
(1) 啟動一次job盡可能的多做事情留攒,一個job能完成的事情,不要兩個job來做
通常來說前面的任務(wù)啟動可以稍帶一起做的事情就一起做了,以便后續(xù)的多個任務(wù)重用,與此緊密相連的是模型設(shè)計,好的模型特別重要.
(2) 合理設(shè)置reduce個數(shù)
reduce個數(shù)過少沒有真正發(fā)揮hadoop并行計算的威力,但reduce個數(shù)過多嫉嘀,會造成大量小文件問題炼邀,數(shù)據(jù)量、資源情況只有自己最清楚剪侮,找到個折衷點,
(3) 使用hive.exec.parallel參數(shù)控制在同一個sql中的不同的job是否可以同時運行拭宁,提高作業(yè)的并發(fā)
2、讓服務(wù)器盡量少做事情瓣俯,走最優(yōu)的路徑杰标,以資源消耗最少為目標
比如:
(1) 注意join的使用
若其中有一個表很小使用map join,否則使用普通的reduce join彩匕,注意hive會將join前面的表數(shù)據(jù)裝載內(nèi)存,所以較小的一個表在較大的表之前,減少內(nèi)存資源的消耗
(2)注意小文件的問題
在hive里有兩種比較常見的處理辦法
第一是使用Combinefileinputformat腔剂,將多個小文件打包作為一個整體的inputsplit,減少map任務(wù)數(shù)
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=256000000
set Mapred.min.split.size.per.rack=256000000
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
第二是設(shè)置hive參數(shù)驼仪,將額外啟動一個MR Job打包小文件
hive.merge.mapredfiles = false 是否合并 Reduce 輸出文件掸犬,默認為 False
hive.merge.size.per.task = 25610001000 合并文件的大小
(3)注意數(shù)據(jù)傾斜
在hive里比較常用的處理辦法
第一通過hive.groupby.skewindata=true控制生成兩個MR Job,第一個MR Job Map的輸出結(jié)果隨機分配到reduce做次預(yù)匯總,減少某些key值條數(shù)過多某些key條數(shù)過小造成的數(shù)據(jù)傾斜問題
第二通過hive.map.aggr = true(默認為true)在Map端做combiner,假如map各條數(shù)據(jù)基本上不一樣, 聚合沒什么意義袜漩,做combiner反而畫蛇添足,hive里也考慮的比較周到通過參數(shù)hive.groupby.mapaggr.checkinterval = 100000 (默認)hive.map.aggr.hash.min.reduction=0.5(默認),預(yù)先取100000條數(shù)據(jù)聚合,如果聚合后的條數(shù)/100000>0.5,則不再聚合
(4)善用multi insert,union all
multi insert適合基于同一個源表按照不同邏輯不同粒度處理插入不同表的場景湾碎,做到只需要掃描源表一次宙攻,job個數(shù)不變,減少源表掃描次數(shù)
union all用好介褥,可減少表的掃描次數(shù)座掘,減少job的個數(shù),通常預(yù)先按不同邏輯不同條件生成的查詢union all后,再統(tǒng)一group by計算,不同表的union all相當于multiple inputs,同一個表的union all,相當map一次輸出多條
(5) 參數(shù)設(shè)置的調(diào)優(yōu)
集群參數(shù)種類繁多,舉個例子比如
可針對特定job設(shè)置特定參數(shù),比如jvm重用,reduce copy線程數(shù)量設(shè)置(適合map較快柔滔,輸出量較大)
如果任務(wù)數(shù)多且小雹顺,比如在一分鐘之內(nèi)完成,減少task數(shù)量以減少任務(wù)初始化的消耗廊遍℃依ⅲ可以通過配置JVM重用選項減少task的消耗
一、控制Hive中Map和reduce的數(shù)量
Hive中的sql查詢會生成執(zhí)行計劃喉前,執(zhí)行計劃以MapReduce的方式執(zhí)行没酣,那么結(jié)合數(shù)據(jù)和集群的大小,map和reduce的數(shù)量就會影響到sql執(zhí)行的效率卵迂。
除了要控制Hive生成的Job的數(shù)量裕便,也要控制map和reduce的數(shù)量。
1见咒、 map的數(shù)量偿衰,通常情況下和split的大小有關(guān)系,之前寫的一篇blog“map和reduce的數(shù)量是如何定義的”有描述改览。
hive中默認的hive.input.format是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat下翎,對于combineHiveInputFormat,它的輸入的map數(shù)量
由三個配置決定,
mapred.min.split.size.per.node宝当, 一個節(jié)點上split的至少的大小
mapred.min.split.size.per.rack 一個交換機下split至少的大小
mapred.max.split.size 一個split最大的大小
它的主要思路是把輸入目錄下的大文件分成多個map的輸入, 并合并小文件, 做為一個map的輸入. 具體的原理是下述三步:
a视事、根據(jù)輸入目錄下的每個文件,如果其長度超過mapred.max.split.size,以block為單位分成多個split(一個split是一個map的輸入),每個split的長度都大于mapred.max.split.size, 因為以block為單位, 因此也會大于blockSize, 此文件剩下的長度如果大于mapred.min.split.size.per.node, 則生成一個split, 否則先暫時保留.
b、現(xiàn)在剩下的都是一些長度效短的碎片,把每個rack下碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一個split, 否則暫時保留.
c庆揩、把不同rack下的碎片合并, 只要長度超過mapred.max.split.size就合并成一個split, 剩下的碎片無論長度, 合并成一個split.
舉例: mapred.max.split.size=1000
mapred.min.split.size.per.node=300
mapred.min.split.size.per.rack=100
輸入目錄下五個文件,rack1下三個文件,長度為2050,1499,10, rack2下兩個文件,長度為1010,80. 另外blockSize為500.
經(jīng)過第一步, 生成五個split: 1000,1000,1000,499,1000. 剩下的碎片為rack1下:50,10; rack2下10:80
由于兩個rack下的碎片和都不超過100, 所以經(jīng)過第二步, split和碎片都沒有變化.
第三步,合并四個碎片成一個split, 長度為150.
如果要減少map數(shù)量, 可以調(diào)大mapred.max.split.size, 否則調(diào)小即可.
其特點是: 一個塊至多作為一個map的輸入俐东,一個文件可能有多個塊,一個文件可能因為塊多分給做為不同map的輸入订晌, 一個map可能處理多個塊虏辫,可能處理多個文件。
2锈拨、 reduce數(shù)量
可以在hive運行sql的時砌庄,打印出來,如下:
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
reduce數(shù)量由以下三個參數(shù)決定,
mapred.reduce.tasks(強制指定reduce的任務(wù)數(shù)量)
hive.exec.reducers.bytes.per.reducer(每個reduce任務(wù)處理的數(shù)據(jù)量鹤耍,默認為1000^3=1G)
hive.exec.reducers.max(每個任務(wù)最大的reduce數(shù)肉迫,默認為999)
計算reducer數(shù)的公式很簡單N=min( hive.exec.reducers.max ,總輸入數(shù)據(jù)量/ hive.exec.reducers.bytes.per.reducer )
只有一個reduce的場景:
a稿黄、沒有g(shù)roup by 的匯總
b喊衫、order by
c、笛卡爾積
二杆怕、join和Group的優(yōu)化
對于普通的join操作族购,會在map端根據(jù)key的hash值,shuffle到某一個reduce上去陵珍,在reduce端做join連接操作寝杖,內(nèi)存中緩存join左邊的表,遍歷右邊的表互纯,一次做join操作瑟幕。所以在做join操作時候,將數(shù)據(jù)量多的表放在join的右邊留潦。
當數(shù)據(jù)量比較大只盹,并且key分布不均勻,大量的key都shuffle到一個reduce上了兔院,就出現(xiàn)了數(shù)據(jù)的傾斜殖卑。
對于Group操作,首先在map端聚合坊萝,最后在reduce端坐聚合孵稽,hive默認是這樣的,以下是相關(guān)的參數(shù)
· hive.[map](http://www.verydemo.com/demo_c152_i9269.html).aggr = true是否在 Map 端進行聚合十偶,默認為 True
· hive.groupby.mapaggr.checkinterval = 100000在 Map 端進行聚合操作的條目數(shù)目
對于join和Group操作都可能會出現(xiàn)數(shù)據(jù)傾斜菩鲜。
以下有幾種解決這個問題的常見思路
1、參數(shù)hive.groupby.skewindata = true,解決數(shù)據(jù)傾斜的萬能鑰匙扯键,查詢計劃會有兩個 MR [Job](http://www.verydemo.com/demo_c152_i9269.html)睦袖。第一個 MR Job 中,Map 的輸出結(jié)果集合會隨機分布到 Reduce 中荣刑,每個 Reduce 做部分聚合操作,并輸出結(jié)果伦乔,這樣處理的結(jié)果是相同的 Group By Key 有可能被分發(fā)到不同的 Reduce 中厉亏,從而達到負載均衡的目的;第二個 MR Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中)烈和,最后完成最終的聚合操作爱只。
2、where的條件寫在join里面招刹,使得減少join的數(shù)量(經(jīng)過map端過濾恬试,只輸出復(fù)合條件的)
3窝趣、mapjoin方式,無reduce操作训柴,在map端做join操作(map端cache小表的全部數(shù)據(jù))哑舒,這種方式下無法執(zhí)行Full/RIGHT OUTER join操作
4、對于count(distinct)操作幻馁,在map端以group by的字段和count的字段聯(lián)合作為key洗鸵,如果有大量相同的key,那么會存在數(shù)據(jù)傾斜的問題
5仗嗦、數(shù)據(jù)的傾斜還包括膘滨,大量的join連接key為空的情況,空的key都hash到一個reduce上去了稀拐,解決這個問題火邓,最好把空的key和非空的key做區(qū)分
空的key不做join操作。
當然有的hive操作德撬,不存在數(shù)據(jù)傾斜的問題铲咨,比如數(shù)據(jù)聚合類的操作,像sum砰逻、count鸣驱,因為已經(jīng)在map端做了聚合操作了,到reduce端的數(shù)據(jù)相對少一些蝠咆,所以不存在這個問題踊东。
四、小文件的合并
大量的小文件導(dǎo)致文件數(shù)目過多刚操,給HDFS帶來壓力闸翅,對hive處理的效率影響比較大,可以合并map和reduce產(chǎn)生的文件
· hive.merge.mapfiles = true是否和并 Map 輸出文件菊霜,默認為 True
· hive.merge.mapredfiles = [false](http://www.verydemo.com/demo_c152_i9269.html)是否合并 Reduce 輸出文件坚冀,默認為 False
· hive.merge.size.per.task = 256*1000*1000合并文件的大小
五、in/exists(not)
通過left semi join 實現(xiàn) in操作鉴逞,一個限制就是join右邊的表只能出現(xiàn)在join條件中
六记某、分區(qū)裁剪
通過在條件中指定分區(qū),來限制數(shù)據(jù)掃描的范圍构捡,可以極大提高查詢的效率
七液南、排序
order by 排序,只存在一個reduce勾徽,這樣效率比較低滑凉。
可以用sort by操作,通常結(jié)合distribute by使用做reduce分區(qū)鍵