一稼虎、MapReduce完整流程
MapTask工作機(jī)制
ReduceTask工作機(jī)制
MapTask工作機(jī)制:
(1)Read階段:MapTask通過用戶編寫的RecordReader济欢,從輸入InputSplit中解析出一個(gè)個(gè)key/value垮庐。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫map()函數(shù)處理岂傲,并產(chǎn)生一系列新的key/value腔呜。
(3)Collect收集階段:在用戶編寫map()函數(shù)中出爹,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果馅袁。在該函數(shù)內(nèi)部域慷,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner),并寫入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中汗销。
(4)Spill階段:即“溢寫”犹褒,當(dāng)環(huán)形緩沖區(qū)滿80%后,MapReduce會(huì)將數(shù)據(jù)寫到本地磁盤上弛针,生成一個(gè)臨時(shí)文件叠骑。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前削茁,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序座云,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作付材。
溢寫階段詳情:
步驟1:利用快速排序算法對(duì)緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序朦拖,排序方式是,先按照分區(qū)編號(hào)Partition進(jìn)行排序厌衔,然后按照key進(jìn)行排序璧帝。這樣,經(jīng)過排序后富寿,數(shù)據(jù)以分區(qū)為單位聚集在一起睬隶,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當(dāng)前溢寫次數(shù))中页徐。如果用戶設(shè)置了Combiner苏潜,則寫入文件之前,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作变勇。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中恤左,其中每個(gè)分區(qū)的元信息包括,在臨時(shí)文件中的偏移量搀绣、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小飞袋。如果當(dāng)前內(nèi)存索引大小超過1MB,則將內(nèi)存索引寫到文件output/spillN.out.index中链患。
(5)合并階段:當(dāng)所有數(shù)據(jù)處理完成后巧鸭,MapTask對(duì)所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件麻捻。
當(dāng)所有數(shù)據(jù)處理完后纲仍,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件呀袱,并保存到文件output/file.out中,同時(shí)生成相應(yīng)的索引文件output/file.out.index郑叠。
在進(jìn)行文件合并過程中夜赵,MapTask以分區(qū)為單位進(jìn)行合并。對(duì)于某個(gè)分區(qū)锻拘,它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認(rèn)10)個(gè)文件击蹲,并將產(chǎn)生的文件重新加入待合并列表中署拟,對(duì)文件排序后,重復(fù)以上過程歌豺,直到最終得到一個(gè)大文件推穷。
ReduceTask工作機(jī)制:
(1)Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù)类咧,如果其大小超過一定閾值馒铃,則寫到磁盤上,否則直接放到內(nèi)存中痕惋。
(2)Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí)区宇,ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過多或磁盤上文件過多值戳。
(3)Sort階段:當(dāng)所有map task的分區(qū)數(shù)據(jù)全部拷貝完议谷,按照MapReduce語義,用戶編寫reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)堕虹。為了將key相同的數(shù)據(jù)聚在一起卧晓,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序赴捞,因此逼裆,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
(4)Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫到HDFS上赦政。
讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件胜宇,可避免同時(shí)打開大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來的開銷。
二恢着、性能優(yōu)化
思維導(dǎo)圖:
1掸屡、參數(shù)優(yōu)化
Map階段
從HDFS讀取數(shù)據(jù)
map數(shù)量問題
map數(shù)量多:存在大量小文件,每個(gè)小文件對(duì)應(yīng)一個(gè)map任務(wù)然评,map任務(wù)啟動(dòng)和
初始化時(shí)間大于邏輯處理時(shí)間仅财,造成資源浪費(fèi)。
map數(shù)量少:存在大文件碗淌,map執(zhí)行時(shí)間長(zhǎng)盏求,影響整個(gè)任務(wù)執(zhí)行抖锥。
原則
使大數(shù)據(jù)量利用合適的map數(shù)
使單個(gè)map任務(wù)處理合適的數(shù)據(jù)量(存在文件不大,但是字段少記錄多的情況)
切片公式:splitSize=Math.max(minSize, Math.min(maxSize, blockSize));
方案
合并小文件碎罚,減少map數(shù)磅废,修改相關(guān)參數(shù)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set mapred.max.split.size; --切片最大值,大于這個(gè)值要切分
set mapred.min.split.size; --切片最小值荆烈,小于這個(gè)值要合并
--每個(gè)節(jié)點(diǎn)處理的最小split拯勉,節(jié)點(diǎn)的文件大小小于這個(gè)值,同一個(gè)節(jié)點(diǎn)下合并憔购;在多個(gè)節(jié)點(diǎn)下跨節(jié)點(diǎn)合并
set mapred.min.split.size.per.node;
--每個(gè)機(jī)架處理的最小split宫峦,機(jī)架的文件大小小于這個(gè)值,同一個(gè)機(jī)架下合并玫鸟;在多個(gè)節(jié)點(diǎn)下跨機(jī)架合并
set mapred.min.split.size.per.rack;
一般參數(shù)值大小關(guān)系:max.split.size >= min.split.size >= min.size.per.node >= min.size.per.rack
參數(shù)作用優(yōu)先升級(jí)關(guān)系:max.split.size <= min.split.size <= min.size.per.node <= min.size.per.rack
增加map數(shù)(存在文件不大导绷,但是字段少記錄多)
設(shè)置reduce個(gè)數(shù):set mapred.reduce.tasks=n;
重新寫入新表: create table target_table as select * from source_table distribute by rand();
對(duì)新表進(jìn)行邏輯處理,增大map數(shù)量屎飘,提高執(zhí)行效率
設(shè)置切片最大值妥曲、最小值
set mapred.max.split.size; --切片最大值,大于這個(gè)值要切分
set mapred.min.split.size; --切片最小值钦购,小于這個(gè)值要合并
相關(guān)影響
是否壓縮&是否可切分
set hive.exec.compress.output; --查看是否開啟壓縮
set io.compression.codecs; --查看配置了哪些壓縮算法
set mapreduce.output.fileoutputformat.compress.codec; --查看使用的壓縮算法
參考資料
《MapReduce過程詳解及其性能優(yōu)化》
《真正讓你明白Hive參數(shù)調(diào)優(yōu)系列1:控制map個(gè)數(shù)與性能調(diào)優(yōu)參數(shù)》
《hive優(yōu)化之------控制hive任務(wù)中的map數(shù)和reduce數(shù)》
《hive的性能優(yōu)化之參數(shù)調(diào)優(yōu)》
《大多數(shù)開發(fā)人員都弄錯(cuò)的Hive與MapReduce小文件合并問題》
Partition
partition值默認(rèn)是通過計(jì)算key的hash值后對(duì)Reduce task的數(shù)量取模獲得
寫數(shù)據(jù)到磁盤
環(huán)形緩沖區(qū)
set mapreduce.task.io.sort.mb檐盟;--環(huán)形緩沖區(qū)字節(jié)數(shù)組大小,默認(rèn)100M
set mapreduce.map.sort.spill.percent押桃; --環(huán)形緩沖區(qū)空間達(dá)到該占比后溢出遵堵,默認(rèn)0.80
溢寫
Sort
快速排序?qū)彌_區(qū)數(shù)據(jù)進(jìn)行排序,先按照分區(qū)編號(hào)進(jìn)行排序怨规,再按照key進(jìn)行排序陌宿。
Combiner
set min.num.spill.for.combine; --默認(rèn)3,即溢寫文件最少3個(gè)的時(shí)候波丰,combiner函數(shù)在merge產(chǎn)生結(jié)果文件前運(yùn)行
merge
set mapreduce.task.io.sort.factor壳坪;--采用多輪遞歸合并的方式,合并時(shí)最多同時(shí)合并的溢寫文件數(shù)掰烟,默認(rèn)10
對(duì)溢寫文件爽蝴,合并文件進(jìn)行壓縮
set mapreduce.map.output.compress; --開啟壓縮,默認(rèn)false
set mapreduce.map.output.compress.codec; --設(shè)置壓縮方式纫骑,默認(rèn)org.apache.hadoop.io.compress.DefaultCodec
Shuffle階段
Map端Shuffle
排序
規(guī)約
合并
Reduce端Shuffle
Copy
set mapreduce.reduce.shuffle.parallelcopies; --Reduce同時(shí)到多少個(gè)map端fetcher數(shù)據(jù)的并行度蝎亚,默認(rèn)5
set mapreduce.reduce.shuffle.read.timeout; --Reduce下載線程最大的下載時(shí)間段,避免網(wǎng)絡(luò)原因?qū)е抡`判先馆,默認(rèn)180000ms
SortMerge
Reduce
set mapreduce.reduce.input.buffer.percent; --reduce處理數(shù)據(jù)的內(nèi)存占比发框,默認(rèn)為0.0,即全部從磁盤讀處理數(shù)據(jù)煤墙;如果內(nèi)存足夠大梅惯,可適當(dāng)分配宪拥,提高計(jì)算速度
參考資料
《MapReduce執(zhí)行流程和Shuffle過程》
Reduce階段
分組
Reduce
Reduce數(shù)量問題
Reduce數(shù)量多:ReduceTask啟動(dòng)和初始化消耗時(shí)間和資源
Reduce數(shù)量少:同時(shí)過多的Reduce會(huì)生成很多個(gè)文件,也可能導(dǎo)致小文件問題
原則
使大數(shù)據(jù)量利用合適的reduce數(shù)
使單個(gè)reduce任務(wù)處理合適的數(shù)據(jù)量
規(guī)則
默認(rèn)規(guī)則
參數(shù)1:hive.exec.reducers.bytes.per.reducer --每個(gè)reduce任務(wù)處理的數(shù)據(jù)量
參數(shù)2:hive.exec.reducers.max --每個(gè)任務(wù)最大的reduce數(shù)量
計(jì)算規(guī)則:reduceNum = min(參數(shù)2, 總數(shù)據(jù)量/參數(shù)1)
手動(dòng)設(shè)置
set mapred.reduce.tasks = n;
只有一個(gè)reduce的i情況
情況1:select UDTF聚合函數(shù)卻沒帶group by
方案:使用group by
情況2:select count(distinct)
方案:select count(1) from (select column from table group by colum)
情況3:order by全局排序
方案:使用limit
情況4:笛卡爾積铣减,即join時(shí)沒有關(guān)聯(lián)條件
方案:使用on條件她君;注意:on條件之間只能用and不能用or
通用參數(shù)
set mapreduce.map.memory.mb; --mapTask容器內(nèi)存
set mapreduce.reduce.memory.mb; --reduceTask容器內(nèi)存
set io.file.buffer.size; --緩沖池大小,默認(rèn)4k葫哗,可適當(dāng)調(diào)高
2缔刹、HQL優(yōu)化
數(shù)據(jù)傾斜
原因:MapReduce計(jì)算中,大量的相同的key被分配到同一個(gè)ReduceTask中劣针,造成數(shù)據(jù)熱點(diǎn)校镐,負(fù)載不均衡。
原因
場(chǎng)景1:Join
大表join小表 or 小表join大表
方案1:mapjoin酿秸,將小表全部加載到內(nèi)存中灭翔,在map端進(jìn)行join后輸出魏烫,避免reducer處理
set hive.auto.covert.join = true; --開啟mapjoin功能辣苏,如果join中小表小于閾值則轉(zhuǎn)為mapjoin
set hive.mapjoin.smalltable.filesize=26214400; --大表小表的閾值設(shè)置(默認(rèn)25M以下為小表)
方案2:小表放到子查詢,小表的查詢結(jié)果相當(dāng)于大表的where條件
select a.* from bigtable a where id in (select id from smalltable group by id);
大表join大表
方案1:替換關(guān)聯(lián)字段中的大量空值/重復(fù)值
方法1:空key過濾/去重
select t1.,t2.name from
(select * from table1 where length(id)>=1 or id is not null) t1
join
(select * from table2 where length(id)>=1 or id is not null) t2
on t1.id = t2.id ;
OR
select t1. from
(select * from table1 where length(id)>=1 or id is not null) t1
join
(select id from table2 group by id) t2
on t1.id = t2.id ;
OR
select * from log a join user b on a.user_id is not null and a.user_id = b.user_id
union all
select * from log c where c.user_id is null;
方法2:空key轉(zhuǎn)換
select t1.,t2.name
from table1 t1
join table2 t2
on case when t1.id is null or length(id) = 0 then concat(hive,rand()) else t1.id end = t2.id;
方案2:大表切分成小表后mapjoin
例如:日志表關(guān)聯(lián)用戶表哄褒,對(duì)日志表進(jìn)行小表處理后關(guān)聯(lián)用戶表mapjoin
原邏輯:
select * from log a left join users b on a.user_id = b.user_id;
改進(jìn)后:
select /+mapjoin(x)/ *
from log a
left join
(select /+mapjoin(c)/ d.
from (select distinct user_id from log) c join users d on c.user_id = d.user_id)x
on a.user_id = x.user_id;
場(chǎng)景2:不同數(shù)據(jù)類型關(guān)聯(lián)
例如:user表中user_id為int稀蟋,log表中user_id既有string也有int,關(guān)聯(lián)時(shí)默認(rèn)hash按照int類型的user_id分配呐赡,導(dǎo)致所有string類型的user_id分配到同一個(gè)reducer中
方案:把int類型轉(zhuǎn)為string的user_id
select * from user a left join log b on b.user_id = cast(a.user_id as string);
場(chǎng)景3:group by
問題:map階段同一個(gè)key數(shù)據(jù)分發(fā)給一個(gè)reduce退客,key數(shù)據(jù)過大導(dǎo)致數(shù)據(jù)傾斜
方案:數(shù)據(jù)負(fù)載均衡時(shí),查詢計(jì)劃生成兩個(gè)MRJob链嘀。
set hive.map.aggr = true; --是否在Map端進(jìn)行聚合萌狂,默認(rèn)為true
set hive.groupby.mapaggr.checkinterval = 100000; --在Map端進(jìn)行聚合操作的條目數(shù)目;默認(rèn)1000000
set hive.groupby.skewindata = true; --有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡怀泊,默認(rèn)False
參考資料
《hive.map.aggr茫藏、hive.groupby.skewindata執(zhí)行過程》
場(chǎng)景4:count(distinct)
子主題 1
參考資料
《深入淺出Hive數(shù)據(jù)傾斜》
《Hive學(xué)習(xí)之路 (十九)Hive的數(shù)據(jù)傾斜》
分區(qū)裁剪&列裁剪
分區(qū)裁剪:讀取數(shù)據(jù)時(shí),對(duì)數(shù)據(jù)分區(qū)過濾
列裁剪:讀取數(shù)據(jù)時(shí)霹琼,只獲取需要的列务傲,減少數(shù)據(jù)輸入
謂詞下推PPD(Predicate Pushdown)
set hive.optimize.ppd = true; --設(shè)置謂詞下推
所謂的謂詞下推就是在join之前的mr(spark)任務(wù)的map階段提前對(duì)表進(jìn)行過濾優(yōu)化,使得最后參與join的表的數(shù)據(jù)量更小
1枣申、對(duì)于Join(Inner Join)售葡、Full outer Join,條件寫在on后面忠藤,還是where后面挟伙,性能上面沒有區(qū)別;
2模孩、對(duì)于Left outer Join 像寒,右側(cè)的表寫在on后面烘豹、左側(cè)的表寫在where后面,性能上有提高诺祸;
3携悯、對(duì)于Right outer Join,左側(cè)的表寫在on后面筷笨、右側(cè)的表寫在where后面憔鬼,性能上有提高;
select ename,dept_name from E left outer join D on ( E.dept_id = D.dept_id and E.eid='HZ001' and D.dept_id = 'D001'); --dept_id在map端過濾胃夏,eid在reduce端過濾
select ename,dept_name from E left outer join D on ( E.dept_id = D.dept_id and D.dept_id = 'D001') where E.eid='HZ001'; --dept_id轴或,eid都在map端過濾
select ename,dept_name from E left outer join D on ( E.dept_id = D.dept_id and E.eid='HZ001') where D.dept_id = 'D001'; --dept_id,eid都在reduce端過濾
elect ename,dept_name from E left outer join D on ( E.dept_id = D.dept_id ) where E.eid='HZ001' and D.dept_id = 'D001'; --dept_id在reduce端過濾仰禀,eid在map端過濾
注意:如果在表達(dá)式中含有不確定函數(shù)照雁,整個(gè)表達(dá)式的謂詞將不會(huì)被pushed,例如:
select a.*
from a join b on a.id = b.id
where a.ds = '2019-10-09' and a.create_time = unix_timestamp();
因?yàn)閡nix_timestamp是不確定函數(shù)答恶,在編譯的時(shí)候無法得知饺蚊,所以,整個(gè)表達(dá)式不會(huì)被pushed悬嗓,即ds='2019-10-09'也不會(huì)被提前過濾污呼。類似的不確定函數(shù)還有rand()等。
參考資料
《Hive中的Predicate Pushdown Rules(謂詞下推規(guī)則)》
3包竹、架構(gòu)優(yōu)化
fetch抓取
不通過MapReduce燕酷,直接讀取存儲(chǔ)文件,輸出結(jié)果周瞎;適用于全局查找苗缩、字段查找、limit查找
set hive.fetch.task.conversion=more;
本地模式
set hive.exec.mode.local.auto=true; --開啟本地mapreduce模式
set hive.exec.mode.local.auto.inputbytes.max=50000000; --當(dāng)輸入數(shù)據(jù)量小于這個(gè)值声诸,啟動(dòng)本地mr
set hive.exec.mode.local.auto.input.files.max=5; --當(dāng)輸入文件個(gè)數(shù)小于這個(gè)值酱讶,啟動(dòng)本地mr
并行執(zhí)行
把一個(gè)sql語句中沒有相互依賴的階段并行去運(yùn)行。提高集群資源利用率
set hive.exec.parallel=true; --開啟并行執(zhí)行双絮,默認(rèn)false
set hive.exec.parallel.thread.number=16; --同一個(gè)sql允許最大并行度浴麻,默認(rèn)8
嚴(yán)格模式
set hive.mapred.mode=strict; --設(shè)置嚴(yán)格模式
情況1:分區(qū)表,必須含有對(duì)分區(qū)字段的where過濾條件
情況2:order by囤攀,對(duì)于使用order by的查詢软免,必須使用limit
情況3:笛卡爾,避免使用笛卡爾查詢
JVM重用(慎用)
set mapred.job.reuse.jvm.num.tasks=10; --設(shè)置JVM實(shí)例在同一個(gè)job中重復(fù)使用10次焚挠,通常在10-20之間
情況1:很難避免小文件
情況2:task特別多
推測(cè)執(zhí)行(慎用)
如果輸入數(shù)據(jù)量很大需要執(zhí)行長(zhǎng)時(shí)間的map或者redue膏萧,推測(cè)執(zhí)行會(huì)造成很大資源浪費(fèi)。
壓縮
Hive表中間數(shù)據(jù)壓縮
set hive.exec.compress.intermediate=true; --開啟中間數(shù)據(jù)壓縮功能,默認(rèn)false
set mapreduce.map.output.compress=true; --開啟mapreduce中map輸出壓縮功能榛泛,默認(rèn)false
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec; --設(shè)置中間數(shù)據(jù)的壓縮算法
Hive表最終輸出壓縮
set hive.exec.compress.output=true; --開啟輸出結(jié)果壓縮功能
set mapred.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec; --設(shè)置輸出結(jié)果壓縮算法