在工作中使用hive比較多,也寫了很多HiveQL惜犀。這里從三個方面對 Hive 常用的一些性能優(yōu)化進行了總結。
表設計層面優(yōu)化
利用分區(qū)表優(yōu)化
分區(qū)表 是在某一個或者幾個維度上對數(shù)據進行分類存儲剑逃,一個分區(qū)對應一個目錄沛膳。如果篩選條件里有分區(qū)字段,那么 Hive 只需要遍歷對應分區(qū)目錄下的文件即可俊鱼,不需要遍歷全局數(shù)據刻像,使得處理的數(shù)據量大大減少,從而提高查詢效率并闲。
當一個 Hive 表的查詢大多數(shù)情況下细睡,會根據某一個字段進行篩選時,那么非常適合創(chuàng)建為分區(qū)表帝火。
利用桶表優(yōu)化
指定桶的個數(shù)后溜徙,存儲數(shù)據時,根據某一個字段進行哈希后犀填,確定存儲在哪個桶里蠢壹,這樣做的目的和分區(qū)表類似,也是使得篩選時不用全局遍歷所有的數(shù)據九巡,只需要遍歷所在桶就可以了图贸。
選擇合適的文件存儲格式
Apache Hive 支持 Apache Hadoop 中使用的幾種熟悉的文件格式。
TextFile
默認格式,如果建表時不指定默認為此格式疏日。
存儲方式:行存儲乏盐。
每一行都是一條記錄,每行都以換行符\n
結尾制恍。數(shù)據不做壓縮時父能,磁盤會開銷比較大,數(shù)據解析開銷也比較大净神。
可結合 Gzip何吝、Bzip2 等壓縮方式一起使用(系統(tǒng)會自動檢查,查詢時會自動解壓)鹃唯,但對于某些壓縮算法 hive 不會對數(shù)據進行切分爱榕,從而無法對數(shù)據進行并行操作。
SequenceFile
一種Hadoop API 提供的二進制文件坡慌,使用方便黔酥、可分割、個壓縮的特點洪橘。
支持三種壓縮選擇:NONE跪者、RECORD、BLOCK熄求。RECORD壓縮率低渣玲,一般建議使用BLOCK壓縮。
RCFile
存儲方式:數(shù)據按行分塊弟晚,每塊按照列存儲 忘衍。
- 首先,將數(shù)據按行分塊卿城,保證同一個record在一個塊上枚钓,避免讀一個記錄需要讀取多個block。
- 其次瑟押,塊數(shù)據列式存儲搀捷,有利于數(shù)據壓縮和快速的列存取。
ORC
存儲方式:數(shù)據按行分塊勉耀,每塊按照列存儲
Hive 提供的新格式指煎,屬于 RCFile 的升級版,性能有大幅度提升便斥,而且數(shù)據可以壓縮存儲至壤,壓縮快,快速列存取枢纠。
Parquet
存儲方式:列式存儲
Parquet 對于大型查詢的類型是高效的像街。對于掃描特定表格中的特定列查詢,Parquet特別有用。Parquet一般使用 Snappy镰绎、Gzip 壓縮脓斩。默認 Snappy。
Parquet 支持 Impala 查詢引擎畴栖。
表的文件存儲格式盡量采用 Parquet 或 ORC随静,不僅降低存儲量,還優(yōu)化了查詢吗讶,壓縮燎猛,表關聯(lián)等性能;
選擇合適的壓縮方式
Hive 語句最終是轉化為 MapReduce 程序來執(zhí)行的照皆,而 MapReduce 的性能瓶頸在與 網絡IO 和 磁盤IO重绷,要解決性能瓶頸,最主要的是 減少數(shù)據量膜毁,對數(shù)據進行壓縮是個好方式昭卓。壓縮雖然是減少了數(shù)據量,但是壓縮過程要消耗CPU瘟滨,但是在Hadoop中候醒,往往性能瓶頸不在于CPU,CPU壓力并不大室奏,所以壓縮充分利用了比較空閑的CPU火焰。
常用壓縮算法對比
如何選擇壓縮方式
- 壓縮比率
- 壓縮解壓速度
- 是否支持split
支持分割的文件可以并行的有多個 mapper 程序處理大數(shù)據文件劲装,大多數(shù)文件不支持可分割是因為這些文件只能從頭開始讀胧沫。
語法和參數(shù)層面優(yōu)化
列裁剪
Hive 在讀數(shù)據的時候,可以只讀取查詢中所需要用到的列占业,而忽略其他的列绒怨。這樣做可以節(jié)省讀取開銷,中間表存儲開銷和數(shù)據整合開銷谦疾。
set hive.optimize.cp = true; -- 列裁剪南蹂,取數(shù)只取查詢中需要用到的列,默認為真
分區(qū)裁剪
在查詢的過程中只選擇需要的分區(qū)念恍,可以減少讀入的分區(qū)數(shù)目六剥,減少讀入的數(shù)據量。
set hive.optimize.pruner=true; // 默認為true
合并小文件
Map 輸入合并
在執(zhí)行 MapReduce 程序的時候峰伙,一般情況是一個文件需要一個 mapper 來處理疗疟。但是如果數(shù)據源是大量的小文件,這樣豈不是會啟動大量的 mapper 任務瞳氓,這樣會浪費大量資源策彤。可以將輸入的小文件進行合并,從而減少mapper任務數(shù)量店诗。詳細分析
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- Map端輸入裹刮、合并文件之后按照block的大小分割(默認)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; -- Map端輸入,不合并
Map/Reduce輸出合并
大量的小文件會給 HDFS 帶來壓力庞瘸,影響處理效率捧弃。可以通過合并 Map 和 Reduce 的結果文件來消除影響擦囊。
set hive.merge.mapfiles=true; -- 是否合并Map輸出文件, 默認值為真
set hive.merge.mapredfiles=true; -- 是否合并Reduce 端輸出文件,默認值為假
set hive.merge.size.per.task=25610001000; -- 合并文件的大小,默認值為 256000000
合理控制 map/reduce 任務數(shù)量
合理控制 mapper 數(shù)量
減少 mapper 數(shù)可以通過合并小文件來實現(xiàn)
增加 mapper 數(shù)可以通過控制上一個 reduce
默認的 mapper 個數(shù)計算方式
輸入文件總大兴稹:total_size
hdfs 設置的數(shù)據塊大小:dfs_block_size
default_mapper_num = total_size/dfs_block_size
MapReduce 中提供了如下參數(shù)來控制 map 任務個數(shù):
set mapred.map.tasks=10;
從字面上看霜第,貌似是可以直接設置 mapper 個數(shù)的樣子葛家,但是很遺憾不行,這個參數(shù)設置只有在大于default_mapper_num
的時候泌类,才會生效癞谒。
那如果我們需要減少 mapper 數(shù)量,但是文件大小是固定的刃榨,那該怎么辦呢?
可以通過mapred.min.split.size
設置每個任務處理的文件的大小弹砚,這個大小只有在大于dfs_block_size
的時候才會生效
split_size=max(mapred.min.split.size, dfs_block_size)
split_num=total_size/split_size
compute_map_num = min(split_num, max(default_mapper_num, mapred.map.tasks))
這樣就可以減少mapper數(shù)量了。
總結一下控制 mapper 個數(shù)的方法:
- 如果想增加 mapper 個數(shù)枢希,可以設置
mapred.map.tasks
為一個較大的值 - 如果想減少 mapper 個數(shù)桌吃,可以設置
maperd.min.split.size
為一個較大的值 - 如果輸入是大量小文件,想減少 mapper 個數(shù)苞轿,可以通過設置
hive.input.format
合并小文件
如果想要調整 mapper 個數(shù)茅诱,在調整之前,需要確定處理的文件大概大小以及文件的存在形式(是大量小文件搬卒,還是單個大文件)瑟俭,然后再設置合適的參數(shù)。
合理控制reducer數(shù)量
如果 reducer 數(shù)量過多契邀,一個 reducer 會產生一個結數(shù)量果文件摆寄,這樣就會生成很多小文件,那么如果這些結果文件會作為下一個 job 的輸入坯门,則會出現(xiàn)小文件需要進行合并的問題微饥,而且啟動和初始化 reducer 需要耗費和資源。
如果 reducer 數(shù)量過少古戴,這樣一個 reducer 就需要處理大量的數(shù)據欠橘,并且還有可能會出現(xiàn)數(shù)據傾斜的問題,使得整個查詢耗時長允瞧。
默認情況下简软,hive 分配的 reducer 個數(shù)由下列參數(shù)決定:
- 參數(shù)1:
hive.exec.reducers.bytes.per.reducer
(默認1G) - 參數(shù)2:
hive.exec.reducers.max
(默認為999)
reducer的計算公式為:
N = min(參數(shù)2蛮拔, 總輸入數(shù)據量/參數(shù)1)
可以通過改變上述兩個參數(shù)的值來控制reducer的數(shù)量。
也可以通過
set mapred.map.tasks=10;
直接控制reducer個數(shù)痹升,如果設置了該參數(shù)建炫,上面兩個參數(shù)就會忽略。
Join優(yōu)化
優(yōu)先過濾數(shù)據
盡量減少每個階段的數(shù)據量疼蛾,對于分區(qū)表能用上分區(qū)字段的盡量使用肛跌,同時只選擇后面需要使用到的列,最大限度的減少參與 join 的數(shù)據量察郁。
小表 join 大表原則
小表 join 大表的時應遵守小表 join 大表原則衍慎,原因是 join 操作的 reduce 階段,位于 join 左邊的表內容會被加載進內存皮钠,將條目少的表放在左邊稳捆,可以有效減少發(fā)生內存溢出的幾率。join 中執(zhí)行順序是從左到右生成 Job麦轰,應該保證連續(xù)查詢中的表的大小從左到右是依次增加的乔夯。
使用相同的連接鍵
在 hive 中,當對 3 個或更多張表進行 join 時款侵,如果 on 條件使用相同字段末荐,那么它們會合并為一個 MapReduce Job,利用這種特性新锈,可以將相同的 join on 的放入一個 job 來節(jié)省執(zhí)行時間甲脏。
啟用 mapjoin
mapjoin 是將 join 雙方比較小的表直接分發(fā)到各個 map 進程的內存中,在 map 進程中進行 join 操作妹笆,這樣就不用進行 reduce 步驟块请,從而提高了速度。只有 join 操作才能啟用 mapjoin晾浴。
set hive.auto.convert.join = true; -- 是否根據輸入小表的大小负乡,自動將reduce端的common join 轉化為map join,將小表刷入內存中脊凰。
set hive.mapjoin.smalltable.filesize = 2500000; -- 刷入內存表的大小(字節(jié))
set hive.mapjoin.maxsize=1000000; -- Map Join所處理的最大的行數(shù)。超過此行數(shù)茂腥,Map Join進程會異常退出
盡量原子操作
盡量避免一個SQL包含復雜的邏輯狸涌,可以使用中間表來完成復雜的邏輯。
桶表 mapjoin
當兩個分桶表 join 時最岗,如果 join on的是分桶字段帕胆,小表的分桶數(shù)是大表的倍數(shù)時,可以啟用 mapjoin 來提高效率般渡。
set hive.optimize.bucketmapjoin = true; -- 啟用桶表 map join
Group By 優(yōu)化
默認情況下懒豹,Map階段同一個Key的數(shù)據會分發(fā)到一個Reduce上芙盘,當一個Key的數(shù)據過大時會產生 數(shù)據傾斜。進行group by
操作時可以從以下兩個方面進行優(yōu)化:
1. Map端部分聚合
事實上并不是所有的聚合操作都需要在 Reduce 部分進行脸秽,很多聚合操作都可以先在 Map 端進行部分聚合儒老,然后在 Reduce 端的得出最終結果。
set hive.map.aggr=true; -- 開啟Map端聚合參數(shù)設置
set hive.grouby.mapaggr.checkinterval=100000; -- 在Map端進行聚合操作的條目數(shù)目
2. 有數(shù)據傾斜時進行負載均衡
set hive.groupby.skewindata = true; -- 有數(shù)據傾斜的時候進行負載均衡(默認是false)
當選項設定為 true 時记餐,生成的查詢計劃有兩個 MapReduce 任務驮樊。在第一個 MapReduce 任務中,map 的輸出結果會隨機分布到 reduce 中片酝,每個 reduce 做部分聚合操作囚衔,并輸出結果,這樣處理的結果是相同的group by key
有可能分發(fā)到不同的 reduce 中雕沿,從而達到負載均衡的目的练湿;第二個 MapReduce 任務再根據預處理的數(shù)據結果按照group by key
分布到各個 reduce 中,最后完成最終的聚合操作审轮。
Order By 優(yōu)化
order by
只能是在一個reduce進程中進行鞠鲜,所以如果對一個大數(shù)據集進行order by
,會導致一個reduce進程中處理的數(shù)據相當大断国,造成查詢執(zhí)行緩慢贤姆。
- 在最終結果上進行
order by
,不要在中間的大數(shù)據集上進行排序稳衬。如果最終結果較少霞捡,可以在一個reduce上進行排序時,那么就在最后的結果集上進行order by
薄疚。 - 如果是去排序后的前N條數(shù)據碧信,可以使用
distribute by
和sort by
在各個reduce上進行排序后前N條,然后再對各個reduce的結果集合合并后在一個reduce中全局排序街夭,再取前N條砰碴,因為參與全局排序的order by
的數(shù)據量最多是reduce個數(shù) * N
,所以執(zhí)行效率很高板丽。
COUNT DISTINCT優(yōu)化
-- 優(yōu)化前(只有一個reduce呈枉,先去重再count負擔比較大):
select count(distinct id) from tablename;
-- 優(yōu)化后(啟動兩個job,一個job負責子查詢(可以有多個reduce)埃碱,另一個job負責count(1)):
select count(1) from (select distinct id from tablename) tmp;
一次讀取多次插入
有些場景是從一張表讀取數(shù)據后猖辫,要多次利用,這時可以使用multi insert
語法:
from sale_detail
insert overwrite table sale_detail_multi partition (sale_date='2010', region='china' )
select shop_name, customer_id, total_price where .....
insert overwrite table sale_detail_multi partition (sale_date='2011', region='china' )
select shop_name, customer_id, total_price where .....;
說明:
- 一般情況下砚殿,單個SQL中最多可以寫128路輸出啃憎,超過128路,則報語法錯誤似炎。
- 在一個multi insert中:
- 對于分區(qū)表辛萍,同一個目標分區(qū)不允許出現(xiàn)多次悯姊。
- 對于未分區(qū)表,該表不能出現(xiàn)多次贩毕。
- 對于同一張分區(qū)表的不同分區(qū)悯许,不能同時有
insert overwrite
和insert into
操作,否則報錯返回耳幢。
啟用壓縮
map 輸出壓縮
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
中間數(shù)據壓縮
中間數(shù)據壓縮就是對 hive 查詢的多個 job 之間的數(shù)據進行壓縮岸晦。最好是選擇一個節(jié)省CPU耗時的壓縮方式【υ澹可以采用snappy
壓縮算法启上,該算法的壓縮和解壓效率都非常高。
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
結果數(shù)據壓縮
最終的結果數(shù)據(Reducer輸出數(shù)據)也是可以進行壓縮的店印,可以選擇一個壓縮效果比較好的冈在,可以減少數(shù)據的大小和數(shù)據的磁盤讀寫時間;
注:常用的gzip按摘,snappy壓縮算法是不支持并行處理的包券,如果數(shù)據源是gzip/snappy壓縮文件大文件,這樣只會有有個mapper來處理這個文件炫贤,會嚴重影響查詢效率溅固。
所以如果結果數(shù)據需要作為其他查詢任務的數(shù)據源,可以選擇支持splitable的LZO
算法兰珍,這樣既能對結果文件進行壓縮侍郭,還可以并行的處理,這樣就可以大大的提高job執(zhí)行的速度了掠河。關于如何給Hadoop集群安裝LZO壓縮庫可以查看這篇文章亮元。
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
Hadoop集群支持一下算法:
- org.apache.hadoop.io.compress.DefaultCodec
- org.apache.hadoop.io.compress.GzipCodec
- org.apache.hadoop.io.compress.BZip2Codec
- org.apache.hadoop.io.compress.DeflateCodec
- org.apache.hadoop.io.compress.SnappyCodec
- org.apache.hadoop.io.compress.Lz4Codec
- com.hadoop.compression.lzo.LzoCodec
- com.hadoop.compression.lzo.LzopCodec
Hive架構層面優(yōu)化
啟用直接抓取
Hive 從 HDFS 中讀取數(shù)據,有兩種方式:啟用 MapReduce 讀取唠摹、直接抓取爆捞。
直接抓取數(shù)據比 MapReduce 方式讀取數(shù)據要快的多,但是只有少數(shù)操作可以使用直接抓取方式勾拉。
可以通過hive.fetch.task.conversion
參數(shù)來配置在什么情況下采用直接抓取方式:
-
minimal:只有
select *
煮甥、在分區(qū)字段上where
過濾、有limit
這三種場景下才啟用直接抓取方式望艺。 -
more:在
select
苛秕、where
篩選、limit
時找默,都啟用直接抓取方式。
set hive.fetch.task.conversion=more; -- 啟用fetch more模式
本地化執(zhí)行
Hive 在集群上查詢時吼驶,默認是在集群上多臺機器上運行惩激,需要多個機器進行協(xié)調運行店煞,這種方式很好的解決了大數(shù)據量的查詢問題。但是在Hive查詢處理的數(shù)據量比較小的時候风钻,其實沒有必要啟動分布式模式去執(zhí)行顷蟀,因為以分布式方式執(zhí)行設計到跨網絡傳輸、多節(jié)點協(xié)調等骡技,并且消耗資源鸣个。對于小數(shù)據集,可以通過本地模式布朦,在單臺機器上處理所有任務囤萤,執(zhí)行時間明顯被縮短。
set hive.exec.mode.local.auto=true; -- 打開hive自動判斷是否啟動本地模式的開關
set hive.exec.mode.local.auto.input.files.max=4; -- map任務數(shù)最大值
set hive.exec.mode.local.auto.inputbytes.max=134217728; -- map輸入文件最大大小
JVM重用
Hive 語句最終會轉換為一系列的 MapReduce 任務是趴,每一個MapReduce 任務是由一系列的Map Task 和 Reduce Task 組成的涛舍,默認情況下,MapReduce 中一個 Map Task 或者 Reduce Task 就會啟動一個 JVM 進程唆途,一個 Task 執(zhí)行完畢后富雅,JVM進程就會退出。這樣如果任務花費時間很短肛搬,又要多次啟動 JVM 的情況下没佑,JVM的啟動時間會變成一個比較大的消耗,這時温赔,可以通過重用 JVM 來解決蛤奢。
set mapred.job.reuse.jvm.num.tasks=5;
JVM也是有缺點的,開啟JVM重用會一直占用使用到的 task 的插槽让腹,以便進行重用远剩,直到任務完成后才會釋放。如果某個
不平衡的job
中有幾個 reduce task 執(zhí)行的時間要比其他的 reduce task 消耗的時間要多得多的話骇窍,那么保留的插槽就會一直空閑卻無法被其他的 job 使用瓜晤,直到所有的 task 都結束了才會釋放。
并行執(zhí)行
有的查詢語句腹纳,hive會將其轉化為一個或多個階段痢掠,包括:MapReduce 階段、抽樣階段嘲恍、合并階段足画、limit 階段等。默認情況下佃牛,一次只執(zhí)行一個階段淹辞。但是,如果某些階段不是互相依賴俘侠,是可以并行執(zhí)行的象缀。多階段并行是比較耗系統(tǒng)資源的蔬将。
set hive.exec.parallel=true; -- 可以開啟并發(fā)執(zhí)行。
set hive.exec.parallel.thread.number=16; -- 同一個sql允許最大并行度央星,默認為8霞怀。
推測執(zhí)行
在分布式集群環(huán)境下,因為程序Bug(包括Hadoop本身的bug)莉给,負載不均衡或者資源分布不均等原因毙石,會造成同一個作業(yè)的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢于其他任務(比如一個作業(yè)的某個任務進度只有50%颓遏,而其他所有任務已經運行完畢)徐矩,則這些任務會拖慢作業(yè)的整體執(zhí)行進度。為了避免這種情況發(fā)生州泊,Hadoop采用了推測執(zhí)行(Speculative Execution)機制丧蘸,它根據一定的法則推測出“拖后腿”的任務,并為這樣的任務啟動一個備份任務遥皂,讓該任務與原始任務同時處理同一份數(shù)據力喷,并最終選用最先成功運行完成任務的計算結果作為最終結果。
set mapreduce.map.speculative=true;
set mapreduce.reduce.speculative=true;
建議:
如果用戶對于運行時的偏差非常敏感的話演训,那么可以將這些功能關閉掉弟孟。如果用戶因為輸入數(shù)據量很大而需要執(zhí)行長時間的map或者Reduce task的話,那么啟動推測執(zhí)行造成的浪費是非常巨大大样悟。
擴展閱讀