一陪每、數(shù)據(jù)傾斜概念
? ? ? 1.1影晓、是指shuffle過程中,必須將各個(gè)節(jié)點(diǎn)上相同key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來進(jìn)行處理檩禾,此時(shí)如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)特別大的話挂签,就會(huì)發(fā)生數(shù)據(jù)傾斜。
? ? ? 1.2盼产、數(shù)據(jù)傾斜舉例:
二饵婆、數(shù)據(jù)傾斜現(xiàn)象
? ? ? 2.1、同一個(gè)stage中相同task絕大部分task執(zhí)行時(shí)間快戏售,少數(shù)幾個(gè)執(zhí)行時(shí)間慢侨核。往往慢task耗時(shí)是快task耗時(shí)的2-3倍以上草穆。
? ? ? 2.2、原本正常運(yùn)行的任務(wù)某天突然oom,也可能是發(fā)生了數(shù)據(jù)傾斜搓译。
三悲柱、定位數(shù)據(jù)傾斜的代碼
? ? ? 3.1、數(shù)據(jù)傾斜是發(fā)生在shuffle過程中些己,定位代碼中具有 shuffle的算子豌鸡。例如:reduceBykey,groupByKey,aggregateByKey,distinct,cogroup,join,partionBy,repartion.
? ? 3.2、結(jié)合任務(wù)管理界面轴总,比對(duì)同一個(gè)task的執(zhí)行時(shí)間直颅,與數(shù)據(jù)量。如果有個(gè)別相差較大就說有數(shù)據(jù)傾斜怀樟,從而根據(jù)task定位到對(duì)應(yīng)stage的shuffle算子功偿。
? ? 3.3、對(duì)數(shù)據(jù)進(jìn)行無放回采樣往堡,查看key的分布械荷,得到傾斜的key值,再從代碼中定位哪些地方會(huì)有這些key參與從而定位到對(duì)應(yīng)的代碼塊虑灰。
四吨瞎、數(shù)據(jù)傾斜解決方案
4.1、對(duì)數(shù)據(jù)進(jìn)行etl 預(yù)處理數(shù)據(jù)
? ? ? ? ? ? ? 使用場景:
? ? ? ? ? ? ? ? ? ? a穆咐、hive中文件大小不均勻颤诀,有的大有的小。spark在讀取大文件時(shí)會(huì)對(duì)大文件按对湃。照block快進(jìn)行切分崖叫,小文件不會(huì)切分。如果不進(jìn)行預(yù)處理拍柒,那么小文件處理速度快心傀,大文件處理慢、資源沒有得到充分利用拆讯,可以先對(duì)hive數(shù)據(jù)進(jìn)行清洗脂男、去重、重新分區(qū)等操作來將原本不均勻的數(shù)據(jù)重新均勻的存放在多個(gè)文件中种呐。以簡化后面依賴此數(shù)據(jù)源的任務(wù)宰翅。
? ? ? ? ? ? ? ? ? ? b、hive中key分布不均勻陕贮,可以將shuffle類操作在先進(jìn)行處理堕油。處理完畢之后,spark應(yīng)用不必進(jìn)行重復(fù)的shuffle,直接用處理后的結(jié)果就可以掉缺。在頻繁調(diào)用spark作業(yè)并且有實(shí)效要求的場景中卜录,如果今天作業(yè)要用到昨天數(shù)據(jù)的聚合數(shù)據(jù),可以每天進(jìn)行一次預(yù)處理眶明,將數(shù)據(jù)聚合好艰毒,從而保證今天作業(yè)的實(shí)效要求。
4.2搜囱、過濾少數(shù)導(dǎo)致傾斜的key
? ? ? ? ? ? ? 使用場景:
? ? ? ? ? ? ? ? ? ? ? a丑瞧、傾斜key沒有業(yè)務(wù)有意義,比如存在很多key是‘-’(‘-’在我們系統(tǒng)代表空)的記錄蜀肘,那么久可以直接filter掉來解決’-‘帶來的數(shù)據(jù)傾斜绊汹。
? ? ? ? ? ? ? ? ? ? ? b、傾斜key是有意義的扮宠,那么就需要單獨(dú)拎出來進(jìn)行單獨(dú)處理西乖。
4.3、提高shuffle操作的并行度
使用場景:同一個(gè)task被分配了多個(gè)傾斜的key.試圖增加shuffle read task的數(shù)量坛增,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task获雕,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)。實(shí)現(xiàn)起來比較簡單收捣,可以有效緩解和減輕數(shù)據(jù)傾斜的影響届案,原理如下圖
4.4、兩階段聚合(局部聚合+全局聚合)
? ? ? ? ? ? ? 使用場景:對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進(jìn)行分組聚合時(shí)罢艾,比較適用這種方案楣颠。如果是join類的shuffle操作,還得用其他的解決方案咐蚯。
? ? ? ? ? ? ? 實(shí)現(xiàn)方式:將原本相同的key通過附加隨機(jī)前綴的方式球碉,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合仓蛆,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過多的問題。接著去除掉隨機(jī)前綴挎春,再次進(jìn)行全局聚合看疙,就可以得到最終的結(jié)果。具體原理見下圖直奋。
4.5能庆、將reduce join轉(zhuǎn)為map join
? ? ? ? ? ? ? ?使用場景:join類操作,存在小表join大表的場景脚线「榈ǎ可以將小表進(jìn)行廣播從而避免shuffle
4.6、采樣傾斜key并分拆join操作
? ? ? ? ? ? ? ?使用場景:適用于join類操作中,由于相同key過大占內(nèi)存渠旁,不能使用4.5方案攀例,但傾斜key的種數(shù)不是很多的場景。
? ? ? ? ? ? ? ? 實(shí)現(xiàn)方式:
? ? ? ? ? ? ? ? ? ? ? ? 第一步顾腊、 對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過大的key的那個(gè)RDD粤铭,通過sample算子采樣出一份樣本來,然后統(tǒng)計(jì)一下每個(gè)key的數(shù)量杂靶,計(jì)算出來數(shù)據(jù)量最大的是哪幾個(gè)key.
? ? ? ? ? ? ? ? ? ? ? ? 第二步梆惯、將這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個(gè)單獨(dú)的RDD吗垮,并給每個(gè)key都打上n以內(nèi)的隨機(jī)數(shù)作為前綴垛吗,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD。
? ? ? ? ? ? ? ? ? ? ? 第三步烁登、 接著將需要join的另一個(gè)RDD怯屉,也過濾出來那幾個(gè)傾斜key對(duì)應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù)防泵,這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴蚀之,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè)RDD。
? ? ? ? ? ? ? ? ? ? ? 第四步捷泞、再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join足删,此時(shí)就可以將原先相同的key打散成n份,分散到多個(gè)task中去進(jìn)行join了锁右。
? ? ? ? ? ? ? ? ? ? ? 第五步失受、 而另外兩個(gè)普通的RDD就照常join即可。最后將兩次join的結(jié)果使用union算子合并起來即可咏瑟,就是最終的join結(jié)果拂到。
具體原理見下圖:
4.7、使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join
? ? ? ? ? ? ? 使用場景:如果在進(jìn)行join操作時(shí)码泞,RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜兄旬。
? ? ? ? ? ? ? 實(shí)現(xiàn)方式:同4.6,不同的是它不需要對(duì)原先rdd進(jìn)行傾斜key過濾將原來rdd形成含傾斜key的rdd余寥,與不含傾斜key的rdd领铐。直接對(duì)整個(gè)原本的rdd的key一邊進(jìn)行加隨機(jī)數(shù),另一邊進(jìn)行相應(yīng)倍數(shù)的擴(kuò)容宋舷。而這一種方案是針對(duì)有大量傾斜key的情況绪撵,沒法將部分key拆分出來進(jìn)行單獨(dú)處理,因此只能對(duì)整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容祝蝠,對(duì)內(nèi)存資源要求很高音诈。