背景
Hivequery將運(yùn)算好的數(shù)據(jù)寫回hdfs(比如insert into語句)踩蔚,有時(shí)候會(huì)產(chǎn)生大量的小文件檬输,如果不采用CombineHiveInputFormat就對這些小文件進(jìn)行操作的話會(huì)產(chǎn)生大量的map task,耗費(fèi)大量集群資源涩拙,而且小文件過多會(huì)對namenode造成很大壓力际长。所以Hive在正常job執(zhí)行完之后,會(huì)起一個(gè)conditional task兴泥,來判斷是否需要合并小文件工育,如果滿足要求就會(huì)另外啟動(dòng)一個(gè)map-only job 或者mapred job來完成合并
參數(shù)解釋
hive.mergejob.maponly (默認(rèn)為true)
如果Hadoop版本支持CombineFileInputFormat,則啟動(dòng)Map-only job for merge搓彻,否則啟動(dòng) ?MapReduce merge job如绸,map端combine file是比較高效的做法
hive.merge.mapfiles(默認(rèn)為true)
正常的map-only job后,是否啟動(dòng)merge job來合并map端輸出的結(jié)果
hive.merge.mapredfiles(默認(rèn)為false)
正常的map-reduce job后旭贬,是否啟動(dòng)merge job來合并reduce端輸出的結(jié)果怔接,建議開啟
hive.merge.smallfiles.avgsize(默認(rèn)為16MB)
如果不是partitioned table的話,輸出table文件的平均大小小于這個(gè)值稀轨,啟動(dòng)merge job扼脐,如果是partitioned table,則分別計(jì)算每個(gè)partition下文件平均大小奋刽,只merge平均大小小于這個(gè)值的partition瓦侮。這個(gè)值只有當(dāng)hive.merge.mapfiles或hive.merge.mapredfiles設(shè)定為true時(shí)艰赞,才有效
hive.exec.reducers.bytes.per.reducer(默認(rèn)為1G)
如果用戶不主動(dòng)設(shè)置mapred.reduce.tasks數(shù),則會(huì)根據(jù)input directory計(jì)算出所有讀入文件的input summary size肚吏,然后除以這個(gè)值算出reduce number
reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
hive.merge.size.per.task(默認(rèn)是256MB)
merge job后每個(gè)文件的目標(biāo)大蟹窖(targetSize),用之前job輸出文件的total size除以這個(gè)值罚攀,就可以決定merge job的reduce數(shù)目党觅。merge job的map端相當(dāng)于identity map,然后shuffle到reduce坞生,每個(gè)reduce dump一個(gè)文件仔役,通過這種方式控制文件的數(shù)量和大小
MapredWork work = (MapredWork) mrTask.getWork();
if (work.getNumReduceTasks() > 0) {
int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
int reducers = (int) ((totalSize +targetSize - 1) / targetSize);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
work.setNumReduceTasks(reducers);
}
mapred.max.split.size(默認(rèn)256MB)
mapred.min.split.size.per.node(默認(rèn)1 byte)
mapred.min.split.size.per.rack(默認(rèn)1 byte)
這三個(gè)參數(shù)CombineFileInputFormat中會(huì)使用,Hive默認(rèn)的InputFormat是CombineHiveInputFormat是己,里面所有的調(diào)用(包括最重要的getSplits和getRecordReader)都會(huì)轉(zhuǎn)換成CombineFileInputFormat的調(diào)用又兵,所以可以看成是它的一個(gè)包裝。CombineFileInputFormat 可以將許多小文件合并成一個(gè)map的輸入卒废,如果文件很大沛厨,也可以對大文件進(jìn)行切分,分成多個(gè)map的輸入摔认。一個(gè)CombineFileSplit對應(yīng)一個(gè)map的輸入逆皮,包含一組path(hdfs路徑list),startoffset, lengths, locations(文件所在hostname list)mapred.max.split.size是一個(gè)split 最大的大小参袱,mapred.min.split.size.per.node是一個(gè)節(jié)點(diǎn)上(datanode)split至少的大小电谣,mapred.min.split.size.per.rack是同一個(gè)交換機(jī)(rack locality)下split至少的大小通過這三個(gè)數(shù)的調(diào)節(jié),組成了一串CombineFileSplit用戶可以通過增大mapred.max.split.size的值來減少M(fèi)ap Task數(shù)量
結(jié)論
hive 通過上述幾個(gè)值來控制是否啟動(dòng)merge file job抹蚀,通常是建議大家都開啟剿牺,如果是一堆順序執(zhí)行的作業(yè)鏈,只有最后一張表需要固化落地环壤,中間表用好就刪除的話晒来,可以在最后一個(gè)insert into table之前再開啟,防止之前的作業(yè)也會(huì)launch merge job使得作業(yè)變慢郑现。
上周還發(fā)現(xiàn)目前啟動(dòng)的針對RCFile的Block Merger在某種少見情況下湃崩,會(huì)生成duplicated files,Hive代碼中本身已經(jīng)考慮到這點(diǎn)接箫,所以會(huì)在Merger Task RCFileMergeMapper的JobClose函數(shù)中調(diào)用Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx), ?不過不知道為什么沒有生效攒读,還會(huì)存在重復(fù)文件,需要再研究下
Hive是否起merge job是由conditional task在運(yùn)行時(shí)決定的辛友,如果hadoop job或者h(yuǎn)ive未如預(yù)期般執(zhí)行合并作業(yè)整陌,則可以利用github上的file crush工具完成合并,它的原理也是啟動(dòng)一個(gè)mapreduce job完成合并,不過目前只支持textfile 和 sequencefile
鏈接地址:https://github.com/edwardcapriolo/filecrush
轉(zhuǎn)自:https://blog.csdn.net/apple001100/article/details/62420724