1. 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
2. 修改并行度
1. 改變并行度可以改善數(shù)據(jù)傾斜的原因是因?yàn)槿绻硞€(gè)task有100個(gè)key并且數(shù)據(jù)巨大谎痢,那么有可能導(dǎo)致OOM或者任務(wù)運(yùn)行緩慢;
2. 此時(shí)如果把并行度變大,那么可以分解每個(gè)task的數(shù)據(jù)量,比如把該task分解給10個(gè)task, 那么每個(gè)task的數(shù)據(jù)量將變小捡需,從而可以解決OOM或者任務(wù)執(zhí)行慢.
對應(yīng)reduceByKey而言可以傳入并行度參數(shù)也可以自定義partition.
3. 增加并行度:改變計(jì)算資源并沒有從根本上解決數(shù)據(jù)傾斜的問題哀卫,但是加快了任務(wù)運(yùn)行的速度.
4. 這是加入有傾斜的key, 加隨機(jī)數(shù)前綴当窗,reduceByKey聚合操作可以分而治之,產(chǎn)生的結(jié)果是代前綴的托嚣,因此需要map操作去掉前綴,然后在進(jìn)行reduceByKey操作.
3. 對數(shù)據(jù)做采樣厚骗, 對數(shù)據(jù)傾斜的key增加隨機(jī)的前綴.
(1) 針對如果傾斜的key比較少:
對與兩個(gè)RDD1和RDD2 的join操作示启, 其中一個(gè)RDD, 比如RDD1的數(shù)據(jù)傾斜的key比較少(比如可以通過sample取樣)在三個(gè)左右,那么這時(shí)候可以把RDD1轉(zhuǎn)換為RDD11(傾斜的key),RDD12(不包含傾斜的key),然后分別和RDD2進(jìn)行join操作得到的兩個(gè)結(jié)果result1,result2再次join產(chǎn)生最終的result.
(2) 針對如果傾斜的key特別多.
如果特別多的key傾斜那么就不需要考慮某一個(gè)key了领舰,把所有的key整體考慮即可夫嗓,需要把整體的數(shù)據(jù)量變大;
比如10億的數(shù)據(jù)變成500億冲秽,這時(shí)候可以使用flatmap進(jìn)行擴(kuò)容舍咖,比如
scala> List(1,2,3,4,5)
res0: List[Int] = List(1, 2, 3, 4, 5)
scala> res0.flatMap(x => 1 to x )
res1: List[Int] = List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
但是具體擴(kuò)容的數(shù)量要依據(jù)機(jī)器的各方面的配置.
第一我們要解決的是數(shù)據(jù)能夠均勻的分布到各個(gè)節(jié)點(diǎn),讓集群能夠正常運(yùn)行起來.
偽代碼:
對一個(gè)rdd使用flatmap,另外一個(gè)使用random
val rdd1 = RDD1.flatmap{
for(i <- 1 to 10) {
i+"_"+item;
}
}
val rdd2 = RDD2.map{
val random = Random(10)
random+"_"+item;
}
val result = rdd1.join(rdd2);
result.map{
item.split //去掉前綴.
}
4. 局部聚合+全局聚合
5. ETL
6. 盡量不要產(chǎn)生shuffle
(1) 對小批量的數(shù)據(jù)進(jìn)行廣播.
針對兩個(gè)或者多個(gè)RDD進(jìn)行join操作锉桑, 如果其中一個(gè)RDDD數(shù)據(jù)比較小可以采用broadcast的方式(然后進(jìn)行map操作排霉,mappartition 批量加載數(shù)據(jù)進(jìn)行優(yōu)化) 如果數(shù)據(jù)都比較大的話會給GC帶來負(fù)擔(dān).不建議使用.
(2) 大表適合使用廣播
7. tacheyon
8. 復(fù)用RDD.
9. 從數(shù)據(jù)源頭開始考慮.
(1) 可以把key-values 變?yōu)閗ey-subkey-values
(2) 提取聚集,預(yù)操作join, 把傾斜數(shù)據(jù)在上游進(jìn)行操作.
(3) 把所有values的值進(jìn)行組拼然后就可以形成一個(gè)單一的key-values.
(4) 針對比如大量的key傾斜刨仑,比如數(shù)十萬的key傾斜郑诺,最簡單的辦法就是從硬件上去調(diào)整夹姥,增加cpu, 內(nèi)存.