1.使用Hive ETL預(yù)處理數(shù)據(jù)
方案適用場(chǎng)景:如果導(dǎo)致數(shù)據(jù)傾斜的是Hive表岔霸。如果該Hive表中的數(shù)據(jù)本身很不均勻(比如某個(gè)
key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),其他key才對(duì)應(yīng)了10條數(shù)據(jù))绞绒,而且業(yè)務(wù)場(chǎng)景需要頻繁使用Spark對(duì)Hive表
執(zhí)行某個(gè)分析操作度硝,那么比較適合使用這種技術(shù)方案依沮。方案實(shí)現(xiàn)思路:此時(shí)可以評(píng)估一下盗舰,是否可以通過(guò)Hive來(lái)進(jìn)行數(shù)據(jù)預(yù)處理(即通過(guò)Hive ETL預(yù)先對(duì)
數(shù)據(jù)按照key進(jìn)行聚合,或者是預(yù)先和其他表進(jìn)行join)扎谎,然后在Spark作業(yè)中針對(duì)的數(shù)據(jù)源就不是
原來(lái)的Hive表了碳想,而是預(yù)處理后的Hive表。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過(guò)聚合或join操作了毁靶,那么
在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了胧奔。方案實(shí)現(xiàn)原理:這種方案從根源上解決了數(shù)據(jù)傾斜,因?yàn)閺氐妆苊饬嗽赟park中執(zhí)行shuffle類算子
老充,那么肯定就不會(huì)有數(shù)據(jù)傾斜的問(wèn)題了。但是這里也要提醒一下大家螟左,這種方式屬于治標(biāo)不治本啡浊。
因?yàn)楫吘箶?shù)據(jù)本身就存在分布不均勻的問(wèn)題,所以Hive ETL中進(jìn)行g(shù)roup by或者join等shuffle操作
時(shí)胶背,還是會(huì)出現(xiàn)數(shù)據(jù)傾斜巷嚣,導(dǎo)致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive
ETL中钳吟,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已廷粒。
2.過(guò)濾少數(shù)導(dǎo)致傾斜的key
SampleOperator
方案適用場(chǎng)景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個(gè),而且對(duì)計(jì)算本身的影響并不大的話,那么很
適合使用這種方案坝茎。比如99%的key就對(duì)應(yīng)10條數(shù)據(jù)涤姊,但是只有一個(gè)key對(duì)應(yīng)了100萬(wàn)數(shù)據(jù),從而導(dǎo)
致了數(shù)據(jù)傾斜嗤放。方案實(shí)現(xiàn)思路:如果我們判斷那少數(shù)幾個(gè)數(shù)據(jù)量特別多的key思喊,對(duì)作業(yè)的執(zhí)行和計(jì)算結(jié)果不是特別
重要的話,那么干脆就直接過(guò)濾掉那少數(shù)幾個(gè)key次酌。比如恨课,在Spark SQL中可以使用where子句過(guò)濾
掉這些key或者在Spark Core中對(duì)RDD執(zhí)行filter算子過(guò)濾掉這些key。如果需要每次作業(yè)執(zhí)行時(shí)岳服,
動(dòng)態(tài)判定哪些key的數(shù)據(jù)量最多然后再進(jìn)行過(guò)濾剂公,那么可以使用sample算子對(duì)RDD進(jìn)行采樣,然后
計(jì)算出每個(gè)key的數(shù)量吊宋,取數(shù)據(jù)量最多的key過(guò)濾掉即可纲辽。方案實(shí)現(xiàn)原理:將導(dǎo)致數(shù)據(jù)傾斜的key給過(guò)濾掉之后,這些key就不會(huì)參與計(jì)算了贫母,自然不可能產(chǎn)生
數(shù)據(jù)傾斜文兑。
sample算子的使用:nameRDD.sample(false,0.75),false代表不放回的抽樣腺劣,0.75代表從整體中抽多少數(shù)據(jù)绿贞。
3.提高shuffle操作的并行度
方案實(shí)現(xiàn)思路:在對(duì)RDD執(zhí)行shuffle算子時(shí),給shuffle算子傳入一個(gè)參數(shù)橘原,比如
reduceByKey(1000)籍铁,該參數(shù)就設(shè)置了這個(gè)shuffle算子執(zhí)行時(shí)shuffle read task的數(shù)量。對(duì)于
Spark SQL中的shuffle類語(yǔ)句趾断,比如group by拒名、join等,需要設(shè)置一個(gè)參數(shù)芋酌,即
spark.sql.shuffle.partitions增显,該參數(shù)代表了shuffle read task的并行度,該值默認(rèn)是200脐帝,對(duì)于很
多場(chǎng)景來(lái)說(shuō)都有點(diǎn)過(guò)小同云。方案實(shí)現(xiàn)原理:增加shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)
task堵腹,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù)炸站。舉例來(lái)說(shuō),如果原本有5個(gè)key疚顷,每個(gè)key對(duì)應(yīng)10條
數(shù)據(jù)旱易,這5個(gè)key都是分配給一個(gè)task的禁偎,那么這個(gè)task就要處理50條數(shù)據(jù)。而增加了shuffle read
task以后阀坏,每個(gè)task就分配到一個(gè)key如暖,即每個(gè)task就處理10條數(shù)據(jù),那么自然每個(gè)task的執(zhí)行時(shí)
間都會(huì)變短了全释。
4.雙重聚合
DoubelReduceByKey
方案適用場(chǎng)景:對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by
語(yǔ)句進(jìn)行分組聚合時(shí)装处,比較適用這種方案。方案實(shí)現(xiàn)思路:這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合浸船。第一次是局部聚合妄迁,先給每個(gè)key
都打上一個(gè)隨機(jī)數(shù),比如10以內(nèi)的隨機(jī)數(shù)李命,此時(shí)原先一樣的key就變成不一樣的了登淘,比如(hello, 1)
(hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)封字。接著
對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù)黔州,執(zhí)行reduceByKey等聚合操作,進(jìn)行局部聚合阔籽,那么局部聚合結(jié)果流妻,就會(huì)
變成了(1_hello, 2) (2_hello, 2)。然后將各個(gè)key的前綴給去掉笆制,就會(huì)變成(hello,2)(hello,2)绅这,再次
進(jìn)行全局聚合操作,就可以得到最終結(jié)果了在辆,比如(hello, 4)证薇。方案實(shí)現(xiàn)原理:將原本相同的key通過(guò)附加隨機(jī)前綴的方式,變成多個(gè)不同的key匆篓,就可以讓原本被
一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合浑度,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過(guò)多的問(wèn)題。
接著去除掉隨機(jī)前綴鸦概,再次進(jìn)行全局聚合箩张,就可以得到最終的結(jié)果
5.將reduce join轉(zhuǎn)為map join
方案適用場(chǎng)景:在對(duì)RDD使用join類操作,或者是在Spark SQL中使用join語(yǔ)句時(shí)窗市,而且join操作中
的一個(gè)RDD或表的數(shù)據(jù)量比較邢瓤丁(比如幾百M(fèi)或者一兩G),具體看Executor的內(nèi)存來(lái)廣播
(executor-memory * 0.48 0.6 * 0.8)谨设,比較適用此方案熟掂。方案實(shí)現(xiàn)思路:不使用join算子進(jìn)行連接操作缎浇,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作扎拣,
進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過(guò)
collect算子拉取到Driver端的內(nèi)存中來(lái)二蓝,然后對(duì)其創(chuàng)建一個(gè)Broadcast變量誉券;接著對(duì)另外一個(gè)RDD
執(zhí)行map類算子,在算子函數(shù)內(nèi)刊愚,從Broadcast變量中獲取較小RDD的全量數(shù)據(jù)踊跟,與當(dāng)前RDD的每
一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話鸥诽,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式
連接起來(lái)商玫。方案實(shí)現(xiàn)原理:普通的join是會(huì)走shuffle過(guò)程的,而一旦shuffle牡借,就相當(dāng)于會(huì)將相同key的數(shù)據(jù)拉
取到一個(gè)shuffle read task中再進(jìn)行join拳昌,此時(shí)就是reduce join。但是如果一個(gè)RDD是比較小的钠龙,
則可以采用廣播小RDD全量數(shù)據(jù)+map算子來(lái)實(shí)現(xiàn)與join同樣的效果炬藤,也就是map join,此時(shí)就不
會(huì)發(fā)生shuffle操作碴里,也就不會(huì)發(fā)生數(shù)據(jù)傾斜
6.采樣傾斜key并分拆join操作
方案適用場(chǎng)景:兩個(gè)RDD/Hive表進(jìn)行join的時(shí)候沈矿,如果數(shù)據(jù)量都比較大,無(wú)法采用“解決方案五
”咬腋,那么此時(shí)可以看一下兩個(gè)RDD/Hive表中的key分布情況羹膳。如果出現(xiàn)數(shù)據(jù)傾斜,是因?yàn)槠渲心骋?br> 個(gè)RDD/Hive表中的少數(shù)幾個(gè)key的數(shù)據(jù)量過(guò)大帝火,而另一個(gè)RDD/Hive表中的所有key都分布比較均
勻溜徙,那么采用這個(gè)解決方案是比較合適的。-
方案實(shí)現(xiàn)思路:
對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過(guò)大的key的那個(gè)RDD犀填,通過(guò)sample算子采樣出一份樣本來(lái)蠢壹,然后統(tǒng)計(jì)一下每個(gè)
key的數(shù)量,計(jì)算出來(lái)數(shù)據(jù)量最大的是哪幾個(gè)key九巡。然后將這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)從原來(lái)的RDD中拆分出來(lái)图贸,形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以
內(nèi)的隨機(jī)數(shù)作為前綴冕广,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD疏日。接著將需要join的另一個(gè)RDD,也過(guò)濾出來(lái)那幾個(gè)傾斜key對(duì)應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD撒汉,將每條數(shù)
據(jù)膨脹成n條數(shù)據(jù)沟优,這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè)
RDD睬辐。再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join挠阁,此時(shí)就可以將原先相同的key打
散成n份宾肺,分散到多個(gè)task中去進(jìn)行join了。而另外兩個(gè)普通的RDD就照常join即可侵俗。
最后將兩次join的結(jié)果使用union算子合并起來(lái)即可锨用,就是最終的join結(jié)果。
7.使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join
方案適用場(chǎng)景:如果在進(jìn)行join操作時(shí)隘谣,RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜增拥,那么進(jìn)行分拆key也沒(méi)
什么意義,此時(shí)就只能使用最后一種方案來(lái)解決問(wèn)題了寻歧。-
方案實(shí)現(xiàn)思路:
該方案的實(shí)現(xiàn)思路基本和“解決方案六”類似掌栅,首先查看RDD/Hive表中的數(shù)據(jù)分布情況,找到那個(gè)造成
數(shù)據(jù)傾斜的RDD/Hive表码泛,比如有多個(gè)key都對(duì)應(yīng)了超過(guò)1萬(wàn)條數(shù)據(jù)渣玲。然后將該RDD的每條數(shù)據(jù)都打上一個(gè)n以內(nèi)的隨機(jī)前綴。
同時(shí)對(duì)另外一個(gè)正常的RDD進(jìn)行擴(kuò)容弟晚,將每條數(shù)據(jù)都擴(kuò)容成n條數(shù)據(jù)忘衍,擴(kuò)容出來(lái)的每條數(shù)據(jù)都依次打上一
個(gè)0~n的前綴。最后將兩個(gè)處理后的RDD進(jìn)行join即可卿城。
方案實(shí)現(xiàn)原理:將原先一樣的key通過(guò)附加隨機(jī)前綴變成不一樣的key枚钓,然后就可以將這些處理后的
“不同key”分散到多個(gè)task中去處理,而不是讓一個(gè)task處理大量的相同key瑟押。該方案與“解決方
案六”的不同之處就在于搀捷,上一種方案是盡量只對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行特殊處理,由于處
理過(guò)程需要擴(kuò)容RDD多望,因此上一種方案擴(kuò)容RDD后對(duì)內(nèi)存的占用并不大嫩舟;而這一種方案是針對(duì)有大
量?jī)A斜key的情況,沒(méi)法將部分key拆分出來(lái)進(jìn)行單獨(dú)處理怀偷,因此只能對(duì)整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容家厌,對(duì)
內(nèi)存資源要求很高。