數(shù)據(jù)傾斜調(diào)優(yōu)
- 絕大多數(shù)task執(zhí)行得都非嘲舴鳎快伞梯,但個(gè)別task執(zhí)行極慢。比如帚屉,總共有1000個(gè)task谜诫,997個(gè)task都在1分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個(gè)task卻要一兩個(gè)小時(shí),這種情況很常見
- 原本能夠正常執(zhí)行的Spark作業(yè)攻旦,某天突然報(bào)出OOM(內(nèi)存溢出)異常喻旷,觀察異常棧,是我們寫的業(yè)務(wù)代碼造成的牢屋。這種情況比較少見
數(shù)據(jù)傾斜發(fā)生的原因:
在進(jìn)行shuffle的時(shí)候且预,必須將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來進(jìn)行處理,比如按照key進(jìn)行聚合或join等操作烙无。此時(shí)如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大的話锋谐,就會(huì)發(fā)生數(shù)據(jù)傾斜,因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候截酷,Spark作業(yè)看起來會(huì)運(yùn)行得非常緩慢涮拗,甚至可能因?yàn)槟硞€(gè)task處理的數(shù)據(jù)量過大導(dǎo)致內(nèi)存溢出。
數(shù)據(jù)傾斜只會(huì)發(fā)生在shuffle過程中 distinct、groupByKey三热、reduceByKey鼓择、aggregateByKey、join就漾、cogroup呐能、repartition等
問題分析
某個(gè)task執(zhí)行特別慢的情況
首先要看的,就是數(shù)據(jù)傾斜發(fā)生在第幾個(gè)stage中
如果是用yarn-client模式提交抑堡,那么本地是直接可以看到log的摆出,可以在log中找到當(dāng)前運(yùn)行到了第幾個(gè)stage
如果是用yarn-cluster模式提交,則可以通過Spark Web UI來查看當(dāng)前運(yùn)行到了第幾個(gè)stage首妖。
此外懊蒸,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下當(dāng)前這個(gè)stage各個(gè)task分配的數(shù)據(jù)量悯搔,從而進(jìn)一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜
知道數(shù)據(jù)傾斜發(fā)生在哪一個(gè)stage之后,接著我們就需要根據(jù)stage劃分原理舌仍,推算出來發(fā)生傾斜的那個(gè)stage對(duì)應(yīng)代碼中的哪一部分(Spark是根據(jù)shuffle類算子來進(jìn)行stage的劃分)
某個(gè)task莫名其妙內(nèi)存溢出的情況
看log的異常棧妒貌,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出。然后在那行代碼附近找找铸豁,一般也會(huì)有shuffle類算子灌曙,此時(shí)很可能就是這個(gè)算子導(dǎo)致了數(shù)據(jù)傾斜。不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜节芥。因?yàn)樽约壕帉懙拇a的bug在刺,以及偶然出現(xiàn)的數(shù)據(jù)異常,也可能會(huì)導(dǎo)致內(nèi)存溢出
查看導(dǎo)致數(shù)據(jù)傾斜的key的數(shù)據(jù)分布情況
知道了數(shù)據(jù)傾斜發(fā)生在哪里之后头镊,通常需要分析一下那個(gè)執(zhí)行了shuffle操作并且導(dǎo)致了數(shù)據(jù)傾斜的RDD表蚣驼。查看一下其中key的分布情況,這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)相艇。針對(duì)不同的key分布與不同的shuffle算子組合起來的各種情況颖杏,可能需要選擇不同的技術(shù)方案來解決
有很多種查看key分布的方式
如果是Spark SQL中的group by、join語(yǔ)句導(dǎo)致的數(shù)據(jù)傾斜坛芽,那么就查詢一下SQL中使用的表的key分布情況
如果是對(duì)Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜留储,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()咙轩。然后對(duì)統(tǒng)計(jì)出來的各個(gè)key出現(xiàn)的次數(shù)获讳,collect/take到客戶端打印一下,就可以看到key的分布情況
解決方案
方案一:使用Hive ETL預(yù)處理數(shù)據(jù)
適用場(chǎng)景:導(dǎo)致數(shù)據(jù)傾斜的是hive表活喊。如果該Hive表中的數(shù)據(jù)本身很不均勻丐膝,而且業(yè)務(wù)場(chǎng)景需要頻繁使用Spark對(duì)Hive表執(zhí)行某個(gè)分析操作,那么比較適合使用這種技術(shù)方案
實(shí)現(xiàn)思路:可以評(píng)估一下,是否可以通過Hive來進(jìn)行數(shù)據(jù)預(yù)處理(即通過Hive ETL預(yù)先對(duì)數(shù)據(jù)按照key進(jìn)行聚合尤误,或者是預(yù)先和其他表進(jìn)行join)侠畔,然后在Spark作業(yè)中針對(duì)的數(shù)據(jù)源就不是原來的Hive表了,而是預(yù)處理后的Hive表损晤。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過聚合或join操作了软棺,那么在Spark作業(yè)中也就不需要使用原先的shuffle類算子執(zhí)行這類操作了。
方案優(yōu)點(diǎn):實(shí)現(xiàn)起來簡(jiǎn)單便捷尤勋,效果還非常好喘落,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會(huì)大幅度提升最冰。
方案缺點(diǎn):Hive ETL中還是會(huì)發(fā)生數(shù)據(jù)傾斜瘦棋。
在一些Java系統(tǒng)與Spark結(jié)合使用的項(xiàng)目中,會(huì)出現(xiàn)Java代碼頻繁調(diào)用Spark作業(yè)的場(chǎng)景暖哨,而且對(duì)Spark作業(yè)的執(zhí)行性能要求很高赌朋,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的Hive ETL篇裁,每天僅執(zhí)行一次沛慢,只有那一次是比較慢的,而之后每次Java調(diào)用Spark作業(yè)時(shí)达布,執(zhí)行速度都會(huì)很快团甲,能夠提供更好的用戶體驗(yàn)
用戶通過Java Web系統(tǒng)提交數(shù)據(jù)分析統(tǒng)計(jì)任務(wù),后端通過Java提交Spark作業(yè)進(jìn)行數(shù)據(jù)分析統(tǒng)計(jì)黍聂。要求Spark作業(yè)速度必須要快
方案二:過濾少數(shù)導(dǎo)致傾斜的key
適用場(chǎng)景:如果發(fā)現(xiàn)導(dǎo)致傾斜的key就少數(shù)幾個(gè)躺苦,而且對(duì)計(jì)算本身的影響并不大的話
實(shí)現(xiàn)思路:將導(dǎo)致數(shù)據(jù)傾斜的key給過濾掉之后辱姨,這些key就不會(huì)參與計(jì)算了
方案優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單仓技,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜维哈。
方案缺點(diǎn):適用場(chǎng)景不多脐区,大多數(shù)情況下集乔,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個(gè)坡椒。
方案三:提高shuffle操作的并行度
處理數(shù)據(jù)傾斜最簡(jiǎn)單的一種方案
實(shí)現(xiàn)思路:在對(duì)RDD執(zhí)行shuffle算子時(shí)扰路,給shuffle算子傳入一個(gè)參數(shù),比如reduceByKey(1000)倔叼,該參數(shù)就設(shè)置了這個(gè)shuffle算子執(zhí)行時(shí)shuffle read task的數(shù)量汗唱。對(duì)于Spark SQL中的shuffle類語(yǔ)句,比如group by丈攒、join等哩罪,需要設(shè)置一個(gè)參數(shù)授霸,即spark.sql.shuffle.partitions,該參數(shù)代表了shuffle read task的并行度际插,該值默認(rèn)是200碘耳,對(duì)于很多場(chǎng)景來說都有點(diǎn)過小。
實(shí)現(xiàn)原理:增加shuffle read task的數(shù)量框弛,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task辛辨,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)。
方案優(yōu)點(diǎn):實(shí)現(xiàn)起來比較簡(jiǎn)單瑟枫,可以有效緩解和減輕數(shù)據(jù)傾斜的影響斗搞。
方案缺點(diǎn):只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題慷妙,根據(jù)實(shí)踐經(jīng)驗(yàn)來看僻焚,其效果有限。
該方案通常無法徹底解決數(shù)據(jù)傾斜膝擂,因?yàn)槿绻霈F(xiàn)一些極端情況虑啤,比如某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量有100萬,那么無論你的task數(shù)量增加到多少架馋,這個(gè)對(duì)應(yīng)著100萬數(shù)據(jù)的key肯定還是會(huì)分配到一個(gè)task中去處理咐旧,因此注定還是會(huì)發(fā)生數(shù)據(jù)傾斜的
方案四:兩階段聚合(局部聚合+全局聚合)
適用場(chǎng)景:對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語(yǔ)句進(jìn)行分組聚合時(shí),比較適用這種方案绩蜻。
實(shí)現(xiàn)思路:這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合室埋,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù)办绝,比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了姚淆,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1)孕蝉,就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù)腌逢,執(zhí)行reduceByKey等聚合操作降淮,進(jìn)行局部聚合,那么局部聚合結(jié)果搏讶,就會(huì)變成了(1_hello, 2) (2_hello, 2)佳鳖。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2)媒惕,再次進(jìn)行全局聚合操作系吩,就可以得到最終結(jié)果了,比如(hello, 4)妒蔚。
方案優(yōu)點(diǎn):對(duì)于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜穿挨,效果是非常不錯(cuò)的月弛。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大幅度緩解數(shù)據(jù)傾斜科盛,將Spark作業(yè)的性能提升數(shù)倍以上帽衙。
方案缺點(diǎn):僅僅適用于聚合類的shuffle操作,適用范圍相對(duì)較窄贞绵。如果是join類的shuffle操作厉萝,還得用其他的解決方案。
// 第一步但壮,給RDD中的每個(gè)key都打上一個(gè)隨機(jī)前綴冀泻。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});
// 第二步,對(duì)打上隨機(jī)前綴的key進(jìn)行局部聚合蜡饵。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// 第三步弹渔,去除RDD中每個(gè)key的隨機(jī)前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// 第四步溯祸,對(duì)去除了隨機(jī)前綴的RDD進(jìn)行全局聚合肢专。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
方案五:將reduce join轉(zhuǎn)為map join
適用場(chǎng)景:在對(duì)RDD使用join類操作,或者是在Spark SQL中使用join語(yǔ)句時(shí)焦辅,而且join操作中的一個(gè)RDD或表的數(shù)據(jù)量比較胁┱取(比如幾百M(fèi)或者一兩G),比較適用此方案
實(shí)現(xiàn)思路:不使用join算子進(jìn)行連接操作筷登,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作剃根,進(jìn)而完全規(guī)避掉shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)
實(shí)現(xiàn)原理:普通的join是會(huì)走shuffle過程的前方,而一旦shuffle狈醉,就相當(dāng)于會(huì)將相同key的數(shù)據(jù)拉取到一個(gè)shuffle read task中再進(jìn)行join,此時(shí)就是reduce join惠险。但是如果一個(gè)RDD是比較小的苗傅,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實(shí)現(xiàn)與join同樣的效果,也就是map join班巩,此時(shí)就不會(huì)發(fā)生shuffle操作渣慕,也就不會(huì)發(fā)生數(shù)據(jù)傾斜。
方案優(yōu)點(diǎn):對(duì)join操作導(dǎo)致的數(shù)據(jù)傾斜抱慌,效果非常好逊桦,因?yàn)楦揪筒粫?huì)發(fā)生shuffle,也就根本不會(huì)發(fā)生數(shù)據(jù)傾斜抑进。
方案缺點(diǎn):適用場(chǎng)景較少卫袒,因?yàn)檫@個(gè)方案只適用于一個(gè)大表和一個(gè)小表的情況
方案六:采樣傾斜key并分拆join操作
適用場(chǎng)景:兩個(gè)RDD/Hive表進(jìn)行join的時(shí)候,如果數(shù)據(jù)量都比較大单匣,那么此時(shí)可以看一下兩個(gè)RDD/Hive表中的key分布情況夕凝。如果出現(xiàn)數(shù)據(jù)傾斜宝穗,是因?yàn)槠渲心骋粋€(gè)RDD/Hive表中的少數(shù)幾個(gè)key的數(shù)據(jù)量過大,而另一個(gè)RDD/Hive表中的所有key都分布比較均勻码秉,那么采用這個(gè)解決方案是比較合適的
實(shí)現(xiàn)原理:對(duì)于join導(dǎo)致的數(shù)據(jù)傾斜逮矛,如果只是某幾個(gè)key導(dǎo)致了傾斜,可以將少數(shù)幾個(gè)key分拆成獨(dú)立RDD转砖,并附加隨機(jī)前綴打散成n份去進(jìn)行join须鼎,此時(shí)這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)就不會(huì)集中在少數(shù)幾個(gè)task上,而是分散到多個(gè)task進(jìn)行join了府蔗。
方案優(yōu)點(diǎn):對(duì)于join導(dǎo)致的數(shù)據(jù)傾斜晋控,如果只是某幾個(gè)key導(dǎo)致了傾斜,采用該方式可以用最有效的方式打散key進(jìn)行join姓赤。而且只需要針對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行擴(kuò)容n倍赡译,不需要對(duì)全量數(shù)據(jù)進(jìn)行擴(kuò)容。避免了占用過多內(nèi)存不铆。
方案缺點(diǎn):如果導(dǎo)致傾斜的key特別多的話蝌焚,比如成千上萬個(gè)key都導(dǎo)致數(shù)據(jù)傾斜,那么這種方式也不適合誓斥。
方案七:使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join
適用場(chǎng)景:如果在進(jìn)行join操作時(shí)只洒,RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜,那么進(jìn)行分拆key也沒什么意義劳坑,此時(shí)就只能使用最后一種方案來解決問題了毕谴。
實(shí)現(xiàn)原理:將原先一樣的key通過附加隨機(jī)前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多個(gè)task中去處理距芬,而不是讓一個(gè)task處理大量的相同key涝开。該方案與“解決方案六”的不同之處就在于,上一種方案是盡量只對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行特殊處理蔑穴,由于處理過程需要擴(kuò)容RDD,因此上一種方案擴(kuò)容RDD后對(duì)內(nèi)存的占用并不大惧浴;而這一種方案是針對(duì)有大量?jī)A斜key的情況存和,沒法將部分key拆分出來進(jìn)行單獨(dú)處理,因此只能對(duì)整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容衷旅,對(duì)內(nèi)存資源要求很高
方案優(yōu)點(diǎn):對(duì)join類型的數(shù)據(jù)傾斜基本都可以處理捐腿,而且效果也相對(duì)比較顯著,性能提升效果非常不錯(cuò)柿顶。
方案缺點(diǎn):該方案更多的是緩解數(shù)據(jù)傾斜茄袖,而不是徹底避免數(shù)據(jù)傾斜。而且需要對(duì)整個(gè)RDD進(jìn)行擴(kuò)容嘁锯,對(duì)內(nèi)存資源要求很高宪祥。