spark之?dāng)?shù)據(jù)傾斜

## spark之?dāng)?shù)據(jù)傾斜 什么是數(shù)據(jù)傾斜

由于數(shù)據(jù)分布不均勻造成時(shí)間差異很大產(chǎn)生的一些列異逞魍玻現(xiàn)象

常見現(xiàn)象 1、個(gè)別task作業(yè)運(yùn)行緩慢 2栓拜、莫名其妙的OOM(內(nèi)存溢出)異常

## 一挥萌、數(shù)據(jù)傾斜原因

1、針對(duì)于個(gè)別task作業(yè)運(yùn)行緩慢情況亡问,主要是由于spark作業(yè)運(yùn)行時(shí)有兩種類型官紫,窄依賴和寬依賴,在進(jìn)行寬依賴時(shí)由于是進(jìn)行了夸網(wǎng)絡(luò)傳輸進(jìn)行shuffle操作州藕,這是若是某個(gè)key值對(duì)應(yīng)數(shù)據(jù)量過(guò)大就會(huì)造成這種情況

## 二束世、解決數(shù)據(jù)傾斜

思路,找到這個(gè)數(shù)據(jù)量比較大的可以進(jìn)行分拆

**(1)床玻、在hive ETL中做預(yù)處理**

Hive ETL預(yù)處理毁涉,數(shù)據(jù)傾斜的現(xiàn)象在hive中提前被處理,這樣加載到spark中的數(shù)據(jù)有傾斜嗎锈死?沒(méi)有贫堰!此時(shí)spark給web服務(wù)端只提供一個(gè)查詢服務(wù),所以沒(méi)有的數(shù)據(jù)傾斜待牵,效率非常高其屏!只不過(guò)此時(shí)將數(shù)據(jù)傾斜解決掉了嗎?是把spark端的dataskew轉(zhuǎn)移到hive中缨该。

**(2)偎行、過(guò)濾掉發(fā)生數(shù)據(jù)傾斜的key**

找到哪些發(fā)生數(shù)據(jù)傾斜的key,同時(shí)**必須要想業(yè)務(wù)人員確認(rèn)這些key是否有用**,如果沒(méi)用直接使用filter算子過(guò)濾掉就行蛤袒。

? 但在工作中熄云,切忌,但凡是刪除妙真、過(guò)濾缴允、更新等待操作,一定慎重珍德。

**(3)练般、提高程序的并行度**

通過(guò)以下兩種方法進(jìn)行設(shè)置

1. ? spark.default.parallelism 設(shè)置spark程序全局并行度

2. shuffle操作的第二個(gè)參數(shù)進(jìn)行設(shè)置(局部)并行度

提高并行度會(huì)在一定程度上減輕數(shù)據(jù)傾斜的壓力,但是并不能從徹底上根除數(shù)據(jù)傾斜**菱阵。因?yàn)橐坏┌l(fā)生數(shù)據(jù)傾斜踢俄,傾斜的key無(wú)論如何提高并行度,經(jīng)過(guò)shuffle操作都會(huì)直到一個(gè)分區(qū)中去晴及。

**(4)都办、進(jìn)行兩階段聚合**

兩階段聚合 局部聚合+全局聚合適用于xxxxByKey操作

```

package Day24.com.aura.test

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object TwoStageDataskewOps {

? def main(args: Array[String]): Unit = {

val conf=new SparkConf()

? ? ? .setAppName("TwoStageDataskewOps")

? ? ? .setMaster("local[2]")

? ? val sc=new SparkContext(conf)

? ? val list=List(

? ? ? "hello hello hello hello you you hello",

? ? ? "hello hello hello you you hei hei hello hello hello",

? ? "hello hello hello hello you you hello",

? ? "hello hello hello you you hei hei hello hello hello",

? ? "hello hello hello hello you you hello",

? ? "hello hello hello you you hei hei hello hello hello"

? ? )

? ? val listRDD=sc.parallelize(list)

? ? val pairRDD:RDD[(String,Int)]=listRDD.flatMap(line=>{

? ? ? line.split("\\s+").map((_,1))

? ? })

? ? val sorted=pairRDD.sample(true,0.6).countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)

? ? println("抽樣排序")

? ? //獲取抽樣排序后的結(jié)果將集合轉(zhuǎn)為字符串用換行符分割

? ? println(sorted.mkString("\n"))

//獲取發(fā)生數(shù)據(jù)傾斜的key

? val daraskewKey=sorted.head._1

? //添加隨機(jī)數(shù)

? ? val perfixpairsRDD=pairRDD.map{case(word,count)=>{

? ? ? if(word==daraskewKey){

? ? ? //取隨機(jī)數(shù)

? ? ? ? val random=new Random()

? ? ? ? val prefix=random.nextInt(1)

? ? ? ? (s"${prefix}_${word}",count)

? ? ? }else{

? ? ? ? (word,count)

? ? ? }

? ? }}

? ? //進(jìn)行局部聚合

? ? val partAggrRDD:RDD[(String,Int)]=perfixpairsRDD.reduceByKey(_+_)

? ? //去掉隨機(jī)數(shù)

? ? val? unPrefixpairsRDD=partAggrRDD.map{case (word,count)=>{

? ? //去掉隨機(jī)前綴

? ? ? if (word.contains("_")){

? ? ? ? (word.substring(2),count)

? ? ? }else{

? ? ? ? (word,count)

? ? ? }

? ? }}

? ? //進(jìn)行全局聚合

? ? val fullArreRDd=unPrefixpairsRDD.reduceByKey(_+_)

? ? fullArreRDd.foreach(println)

? ? sc.stop()

? }

}

```

**(5)、使用map-join代替reducerjoin**

此操作主要針對(duì)join類的聚合操作虑稼,多表關(guān)聯(lián)琳钉,前提是進(jìn)行大小表關(guān)聯(lián)

```

package Day24.com.aura.test

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object TwoStageDataskewOps {

? def main(args: Array[String]): Unit = {

val conf=new SparkConf()

? ? ? .setAppName("TwoStageDataskewOps")

? ? ? .setMaster("local[2]")

? ? val sc=new SparkContext(conf)

? ? val list=List(

? ? ? "hello hello hello hello you you hello",

? ? ? "hello hello hello you you hei hei hello hello hello",

? ? "hello hello hello hello you you hello",

? ? "hello hello hello you you hei hei hello hello hello",

? ? "hello hello hello hello you you hello",

? ? "hello hello hello you you hei hei hello hello hello"

? ? )

? ? val listRDD=sc.parallelize(list)

? ? val pairRDD:RDD[(String,Int)]=listRDD.flatMap(line=>{

? ? ? line.split("\\s+").map((_,1))

? ? })

? ? val sorted=pairRDD.sample(true,0.6).countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)

? ? println("抽樣排序")

? ? //獲取抽樣排序后的結(jié)果將集合轉(zhuǎn)為字符串用換行符分割

? ? println(sorted.mkString("\n"))

//獲取發(fā)生數(shù)據(jù)傾斜的key

? val daraskewKey=sorted.head._1

? //添加隨機(jī)數(shù)

? ? val perfixpairsRDD=pairRDD.map{case(word,count)=>{

? ? ? if(word==daraskewKey){

? ? ? //取隨機(jī)數(shù)

? ? ? ? val random=new Random()

? ? ? ? val prefix=random.nextInt(1)

? ? ? ? (s"${prefix}_${word}",count)

? ? ? }else{

? ? ? ? (word,count)

? ? ? }

? ? }}

? ? //進(jìn)行局部聚合

? ? val partAggrRDD:RDD[(String,Int)]=perfixpairsRDD.reduceByKey(_+_)

? ? //去掉隨機(jī)數(shù)

? ? val? unPrefixpairsRDD=partAggrRDD.map{case (word,count)=>{

? ? //去掉隨機(jī)前綴

? ? ? if (word.contains("_")){

? ? ? ? (word.substring(2),count)

? ? ? }else{

? ? ? ? (word,count)

? ? ? }

? ? }}

? ? //進(jìn)行全局聚合

? ? val fullArreRDd=unPrefixpairsRDD.reduceByKey(_+_)

? ? fullArreRDd.foreach(println)

? ? sc.stop()

? }

}

```

**(6)、使用采樣key進(jìn)行分拆和聚合**

當(dāng)join操作的是兩張大表蛛倦,一張表正常歌懒,一張表中有個(gè)別key異常,其余正常溯壶。怎么辦及皂?

**使用采樣key進(jìn)行分拆和聚合**

```

package Day24.com.aura.test

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

import scala.util.Random

object TwoStageDataskewOps02 {

? def main(args: Array[String]): Unit = {

val conf=new SparkConf()

? ? ? .setAppName("TwoStageDataskewOps02")

? ? ? .setMaster("local[2]")

? ? val sc = new SparkContext(conf)

? ? val left = List(

? ? ? ("hello", 1),

? ? ? ("hello", 2),

? ? ? ("hello", 3),

? ? ? ("you", 1),

? ? ? ("me", 1),

? ? ? ("you", 2),

? ? ? ("hello", 4),

? ? ? ("hello", 5)

? ? )

? ? val right = List(

? ? ? ("hello", 11),

? ? ? ("hello", 12),

? ? ? ("you", 11),

? ? ? ("me", 12)

? ? )


? val leftListRDD:RDD[(String,Int)]=sc.parallelize(left)

? ? val rightListRDD:RDD[(String,Int)]=sc.parallelize(right)

? //獲取樣本數(shù)據(jù)

? ? val sampleRDD=leftListRDD.sample(true,0.8)

? //對(duì)樣本數(shù)據(jù)進(jìn)行排序

? ? val sorted=sampleRDD.countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)

? ? //找到異常數(shù)據(jù)key值

? ? val dataskewKey=sorted.head._1

//將表拆分成正常數(shù)據(jù)和異常數(shù)據(jù)

? ? val dsLeftRDD:RDD[(String,Int)]=leftListRDD.filter{case (word,count)=>word==dataskewKey}

? ? val commonleftRDD:RDD[(String,Int)]=leftListRDD.filter{case (word,count)=>word!=dataskewKey}

? ? val dsRightRDD:RDD[(String, Int)] = rightListRDD.filter{case (word, count) => word == dataskewKey}

? ? val commonRightRDD:RDD[(String, Int)] = rightListRDD.filter{case (word, count) => word != dataskewKey}

//為異常表的異常數(shù)據(jù)的key添加隨機(jī)數(shù)

? ? val prefixLeftRDD=dsLeftRDD.map{case? (word,count)=>{

? ? ? val random=new Random()

? ? ? val prefix=random.nextInt(2)

? ? ? (s"${prefix}_${word}",count)

? ? }}

? ? //將正常表的與異常表的數(shù)據(jù)進(jìn)行擴(kuò)容

? ? val prefixRightRDD=dsRightRDD.flatMap{case (word,count)=>{

? ? ? val ab=ArrayBuffer[(String,Int)]()

? ? ? for(i<- 0 until 2){

? ? ? ? ab.append((s"${i}_${word}",count))

? ? ? }

? ? ? ab

? ? }}

? ? //進(jìn)行表的正常部分join連接

? ? val commonJoinedRDd=commonleftRDD.join(commonRightRDD)

? //進(jìn)行表的異常部分進(jìn)行連接

? ? val dsJoinedRDD=prefixLeftRDD.join(prefixRightRDD)

? ? //除去異常表添加的隨機(jī)數(shù)

? ? val dsFinalJoinedRDD=dsJoinedRDD.map{case (word,count)=>{

? ? ? (word.substring(word.indexOf("_")+1),count)

? ? }}

? ? //合并異常表數(shù)據(jù)和正常表數(shù)據(jù)

? ? val finalJoinedRDD=dsFinalJoinedRDD.union(commonJoinedRDd)

? ? finalJoinedRDD.foreach(println)

? ? sc.stop()

? }

}

```

擴(kuò)展,兩張大表且改,左表全量異常验烧,右表正常。

? 沒(méi)有好的解決方案又跛,左表全量加N以內(nèi)的隨機(jī)前綴碍拆,右表全量進(jìn)行N倍的擴(kuò)容】叮可能會(huì)有的問(wèn)題感混,擴(kuò)容之后的存儲(chǔ)壓力非常大,可能發(fā)生OOM異常礼烈。

實(shí)際上是以空間換時(shí)間

## 如果說(shuō)上述的單一操作解決不了問(wèn)題怎么辦弧满?那就一起上!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末此熬,一起剝皮案震驚了整個(gè)濱河市谱秽,隨后出現(xiàn)的幾起案子洽蛀,更是在濱河造成了極大的恐慌,老刑警劉巖疟赊,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異峡碉,居然都是意外死亡近哟,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門鲫寄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)吉执,“玉大人,你說(shuō)我怎么就攤上這事地来〈撩担” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵未斑,是天一觀的道長(zhǎng)咕宿。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蜡秽,這世上最難降的妖魔是什么府阀? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮芽突,結(jié)果婚禮上试浙,老公的妹妹穿的比我還像新娘。我一直安慰自己寞蚌,他們只是感情好田巴,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挟秤,像睡著了一般壹哺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上煞聪,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天斗躏,我揣著相機(jī)與錄音,去河邊找鬼昔脯。 笑死啄糙,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的云稚。 我是一名探鬼主播隧饼,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼静陈!你這毒婦竟也來(lái)了燕雁?” 一聲冷哼從身側(cè)響起诞丽,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拐格,沒(méi)想到半個(gè)月后僧免,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捏浊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年责静,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了昆咽。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片漱病。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惜姐,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出胡岔,到底是詐尸還是另有隱情法希,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布靶瘸,位于F島的核電站苫亦,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏奕锌。R本人自食惡果不足惜著觉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惊暴。 院中可真熱鬧饼丘,春花似錦、人聲如沸辽话。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)油啤。三九已至典徘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間益咬,已是汗流浹背逮诲。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留幽告,地道東北人梅鹦。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像冗锁,于是被迫代替她去往敵國(guó)和親齐唆。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容