單個(gè)表每天數(shù)據(jù)有50億左右。需用二級(jí)分區(qū)優(yōu)化該表利耍。
1、最初查詢
insert into table xx_parquet_v2 PARTITION(dt, uiappid) select %s from xxx where dt= %s;
錯(cuò)誤:
Java Heap Space盔粹“妫或者GC overhead limit exceeded。
原因:
Parquet和ORC是列式批處理文件格式舷嗡。這些格式要求在寫入文件之前將批次的行(batches of rows)緩存在內(nèi)存中轴猎。在執(zhí)行INSERT語(yǔ)句時(shí),動(dòng)態(tài)分區(qū)目前的實(shí)現(xiàn)是:至少為每個(gè)動(dòng)態(tài)分區(qū)目錄打開一個(gè)文件寫入器(file writer)进萄。由于這些緩沖區(qū)是按分區(qū)維護(hù)的捻脖,因此在運(yùn)行時(shí)所需的內(nèi)存量隨著分區(qū)數(shù)量的增加而增加。所以經(jīng)常會(huì)導(dǎo)致mappers或reducers的OOM中鼠,具體取決于打開的文件寫入器(file writer)的數(shù)量可婶。
通過(guò)INSERT語(yǔ)句插入數(shù)據(jù)到動(dòng)態(tài)分區(qū)表中,也可能會(huì)超過(guò)HDFS同時(shí)打開文件數(shù)的限制援雇。
如果沒(méi)有join或聚合扰肌,INSERT ... SELECT語(yǔ)句會(huì)被轉(zhuǎn)換為只有map任務(wù)的作業(yè)。mapper任務(wù)會(huì)讀取輸入記錄然后將它們發(fā)送到目標(biāo)分區(qū)目錄熊杨。在這種情況下曙旭,每個(gè)mapper必須為遇到的每個(gè)動(dòng)態(tài)分區(qū)創(chuàng)建一個(gè)新的文件寫入器(file writer)盗舰。mapper在運(yùn)行時(shí)所需的內(nèi)存量隨著它遇到的分區(qū)數(shù)量的增加而增加。
詳細(xì)原因:https://blog.csdn.net/frank_jyp/article/details/81780821
2桂躏、第一次修改
set hive.optimize.sort.dynamic.partition = true钻趋,從新跑上述語(yǔ)句。
通過(guò)這個(gè)優(yōu)化剂习,這個(gè)只有map任務(wù)的mapreduce會(huì)引入reduce過(guò)程蛮位,這樣動(dòng)態(tài)分區(qū)的那個(gè)字段比如日期在傳到reducer時(shí)會(huì)被排序。由于分區(qū)字段是排序的鳞绕,因此每個(gè)reducer只需要保持一個(gè)文件寫入器(file writer)隨時(shí)處于打開狀態(tài)失仁,在收到來(lái)自特定分區(qū)的所有行后,關(guān)閉記錄寫入器(record writer)们何,從而減小內(nèi)存壓力萄焦。這種優(yōu)化方式在寫parquet文件時(shí)使用的內(nèi)存要相對(duì)少一些,但代價(jià)是要對(duì)分區(qū)字段進(jìn)行排序冤竹。
但reduce階段一直卡在99%拂封,判斷是uiappid數(shù)據(jù)傾斜導(dǎo)致。驗(yàn)證數(shù)據(jù)傾斜:
# 找出uiappid條數(shù)大于1億條的uiappid
select uiappid, count(*) as t from xxx where dt=%s group by uiappid having t>100000000;
然后你會(huì)發(fā)現(xiàn)跑得特別慢鹦蠕。開啟map group優(yōu)化(Map端部分聚合冒签,相當(dāng)于Combiner):
hive.map.aggr=true
設(shè)置上述參數(shù)即可。若是其他情況的group優(yōu)化钟病,可參考hive.groupby.skewindata參數(shù)萧恕。
hive.groupby.skewindata=true
有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡,當(dāng)hive.groupby.skewindata設(shè)定為 true肠阱,生成的查詢計(jì)劃會(huì)有兩個(gè) MR Job廊鸥。第一個(gè) MR Job 中,Map 的輸出結(jié)果集合會(huì)隨機(jī)分布到 Reduce 中辖所,每個(gè) Reduce 做部分聚合操作惰说,并輸出結(jié)果,這樣處理的結(jié)果是相同的 Group By Key 有可能被分發(fā)到不同的 Reduce 中缘回,從而達(dá)到負(fù)載均衡的目的吆视;第二個(gè) MR Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照 Group By Key 分布到 Reduce 中(這個(gè)過(guò)程可以保證相同的 Group By Key 被分布到同一個(gè) Reduce 中),最后完成最終的聚合操作酥宴。
3啦吧、第二次修改
分兩步:
1、第一步:找出條數(shù)大于1億的uiappid后拙寡,select時(shí)過(guò)濾調(diào)這些大的uiappid授滓。通過(guò)這個(gè)優(yōu)化過(guò),reduce階段單個(gè)key的數(shù)據(jù)都不超過(guò)1億條,可以快速得到結(jié)果般堆。
set hive.optimize.sort.dynamic.partition = true;
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid not in ('a','b');
2在孝、第二步:再次將uiappid條數(shù)大于1億的數(shù)據(jù)插入表中。因?yàn)榇笥?億條的uiappid比較少淮摔,可以為每個(gè)mapper遇到的分區(qū)創(chuàng)建一個(gè)文件寫入器(file writer)私沮。
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid in ('a','b');
4、其他的配置
set mapreduce.map.memory.mb=6144;
set mapreduce.map.java.opts=-Xmx4096m; # map 內(nèi)存配置
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
set mapreduce.input.fileinputformat.split.minsize=1024000000;
set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000; # map文件大小配置
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set parquet.memory.min.chunk.size=100000; # parquet文件格式配置
set hive.exec.dynamic.partition.mode=nonstrict; #配置動(dòng)態(tài)分區(qū)
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6144m; # 配置reduce內(nèi)存限制