## spark之?dāng)?shù)據(jù)傾斜 什么是數(shù)據(jù)傾斜
由于數(shù)據(jù)分布不均勻造成時(shí)間差異很大產(chǎn)生的一些列異逞魍玻現(xiàn)象
常見現(xiàn)象 1、個(gè)別task作業(yè)運(yùn)行緩慢 2栓拜、莫名其妙的OOM(內(nèi)存溢出)異常
## 一挥萌、數(shù)據(jù)傾斜原因
1、針對(duì)于個(gè)別task作業(yè)運(yùn)行緩慢情況亡问,主要是由于spark作業(yè)運(yùn)行時(shí)有兩種類型官紫,窄依賴和寬依賴,在進(jìn)行寬依賴時(shí)由于是進(jìn)行了夸網(wǎng)絡(luò)傳輸進(jìn)行shuffle操作州藕,這是若是某個(gè)key值對(duì)應(yīng)數(shù)據(jù)量過(guò)大就會(huì)造成這種情況
## 二束世、解決數(shù)據(jù)傾斜
思路,找到這個(gè)數(shù)據(jù)量比較大的可以進(jìn)行分拆
**(1)床玻、在hive ETL中做預(yù)處理**
Hive ETL預(yù)處理毁涉,數(shù)據(jù)傾斜的現(xiàn)象在hive中提前被處理,這樣加載到spark中的數(shù)據(jù)有傾斜嗎锈死?沒(méi)有贫堰!此時(shí)spark給web服務(wù)端只提供一個(gè)查詢服務(wù),所以沒(méi)有的數(shù)據(jù)傾斜待牵,效率非常高其屏!只不過(guò)此時(shí)將數(shù)據(jù)傾斜解決掉了嗎?是把spark端的dataskew轉(zhuǎn)移到hive中缨该。
**(2)偎行、過(guò)濾掉發(fā)生數(shù)據(jù)傾斜的key**
找到哪些發(fā)生數(shù)據(jù)傾斜的key,同時(shí)**必須要想業(yè)務(wù)人員確認(rèn)這些key是否有用**,如果沒(méi)用直接使用filter算子過(guò)濾掉就行蛤袒。
? 但在工作中熄云,切忌,但凡是刪除妙真、過(guò)濾缴允、更新等待操作,一定慎重珍德。
**(3)练般、提高程序的并行度**
通過(guò)以下兩種方法進(jìn)行設(shè)置
1. ? spark.default.parallelism 設(shè)置spark程序全局并行度
2. shuffle操作的第二個(gè)參數(shù)進(jìn)行設(shè)置(局部)并行度
提高并行度會(huì)在一定程度上減輕數(shù)據(jù)傾斜的壓力,但是并不能從徹底上根除數(shù)據(jù)傾斜**菱阵。因?yàn)橐坏┌l(fā)生數(shù)據(jù)傾斜踢俄,傾斜的key無(wú)論如何提高并行度,經(jīng)過(guò)shuffle操作都會(huì)直到一個(gè)分區(qū)中去晴及。
**(4)都办、進(jìn)行兩階段聚合**
兩階段聚合 局部聚合+全局聚合適用于xxxxByKey操作
```
package Day24.com.aura.test
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object TwoStageDataskewOps {
? def main(args: Array[String]): Unit = {
val conf=new SparkConf()
? ? ? .setAppName("TwoStageDataskewOps")
? ? ? .setMaster("local[2]")
? ? val sc=new SparkContext(conf)
? ? val list=List(
? ? ? "hello hello hello hello you you hello",
? ? ? "hello hello hello you you hei hei hello hello hello",
? ? "hello hello hello hello you you hello",
? ? "hello hello hello you you hei hei hello hello hello",
? ? "hello hello hello hello you you hello",
? ? "hello hello hello you you hei hei hello hello hello"
? ? )
? ? val listRDD=sc.parallelize(list)
? ? val pairRDD:RDD[(String,Int)]=listRDD.flatMap(line=>{
? ? ? line.split("\\s+").map((_,1))
? ? })
? ? val sorted=pairRDD.sample(true,0.6).countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)
? ? println("抽樣排序")
? ? //獲取抽樣排序后的結(jié)果將集合轉(zhuǎn)為字符串用換行符分割
? ? println(sorted.mkString("\n"))
//獲取發(fā)生數(shù)據(jù)傾斜的key
? val daraskewKey=sorted.head._1
? //添加隨機(jī)數(shù)
? ? val perfixpairsRDD=pairRDD.map{case(word,count)=>{
? ? ? if(word==daraskewKey){
? ? ? //取隨機(jī)數(shù)
? ? ? ? val random=new Random()
? ? ? ? val prefix=random.nextInt(1)
? ? ? ? (s"${prefix}_${word}",count)
? ? ? }else{
? ? ? ? (word,count)
? ? ? }
? ? }}
? ? //進(jìn)行局部聚合
? ? val partAggrRDD:RDD[(String,Int)]=perfixpairsRDD.reduceByKey(_+_)
? ? //去掉隨機(jī)數(shù)
? ? val? unPrefixpairsRDD=partAggrRDD.map{case (word,count)=>{
? ? //去掉隨機(jī)前綴
? ? ? if (word.contains("_")){
? ? ? ? (word.substring(2),count)
? ? ? }else{
? ? ? ? (word,count)
? ? ? }
? ? }}
? ? //進(jìn)行全局聚合
? ? val fullArreRDd=unPrefixpairsRDD.reduceByKey(_+_)
? ? fullArreRDd.foreach(println)
? ? sc.stop()
? }
}
```
**(5)、使用map-join代替reducerjoin**
此操作主要針對(duì)join類的聚合操作虑稼,多表關(guān)聯(lián)琳钉,前提是進(jìn)行大小表關(guān)聯(lián)
例
```
package Day24.com.aura.test
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random
object TwoStageDataskewOps {
? def main(args: Array[String]): Unit = {
val conf=new SparkConf()
? ? ? .setAppName("TwoStageDataskewOps")
? ? ? .setMaster("local[2]")
? ? val sc=new SparkContext(conf)
? ? val list=List(
? ? ? "hello hello hello hello you you hello",
? ? ? "hello hello hello you you hei hei hello hello hello",
? ? "hello hello hello hello you you hello",
? ? "hello hello hello you you hei hei hello hello hello",
? ? "hello hello hello hello you you hello",
? ? "hello hello hello you you hei hei hello hello hello"
? ? )
? ? val listRDD=sc.parallelize(list)
? ? val pairRDD:RDD[(String,Int)]=listRDD.flatMap(line=>{
? ? ? line.split("\\s+").map((_,1))
? ? })
? ? val sorted=pairRDD.sample(true,0.6).countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)
? ? println("抽樣排序")
? ? //獲取抽樣排序后的結(jié)果將集合轉(zhuǎn)為字符串用換行符分割
? ? println(sorted.mkString("\n"))
//獲取發(fā)生數(shù)據(jù)傾斜的key
? val daraskewKey=sorted.head._1
? //添加隨機(jī)數(shù)
? ? val perfixpairsRDD=pairRDD.map{case(word,count)=>{
? ? ? if(word==daraskewKey){
? ? ? //取隨機(jī)數(shù)
? ? ? ? val random=new Random()
? ? ? ? val prefix=random.nextInt(1)
? ? ? ? (s"${prefix}_${word}",count)
? ? ? }else{
? ? ? ? (word,count)
? ? ? }
? ? }}
? ? //進(jìn)行局部聚合
? ? val partAggrRDD:RDD[(String,Int)]=perfixpairsRDD.reduceByKey(_+_)
? ? //去掉隨機(jī)數(shù)
? ? val? unPrefixpairsRDD=partAggrRDD.map{case (word,count)=>{
? ? //去掉隨機(jī)前綴
? ? ? if (word.contains("_")){
? ? ? ? (word.substring(2),count)
? ? ? }else{
? ? ? ? (word,count)
? ? ? }
? ? }}
? ? //進(jìn)行全局聚合
? ? val fullArreRDd=unPrefixpairsRDD.reduceByKey(_+_)
? ? fullArreRDd.foreach(println)
? ? sc.stop()
? }
}
```
**(6)、使用采樣key進(jìn)行分拆和聚合**
當(dāng)join操作的是兩張大表蛛倦,一張表正常歌懒,一張表中有個(gè)別key異常,其余正常溯壶。怎么辦及皂?
**使用采樣key進(jìn)行分拆和聚合**
```
package Day24.com.aura.test
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
object TwoStageDataskewOps02 {
? def main(args: Array[String]): Unit = {
val conf=new SparkConf()
? ? ? .setAppName("TwoStageDataskewOps02")
? ? ? .setMaster("local[2]")
? ? val sc = new SparkContext(conf)
? ? val left = List(
? ? ? ("hello", 1),
? ? ? ("hello", 2),
? ? ? ("hello", 3),
? ? ? ("you", 1),
? ? ? ("me", 1),
? ? ? ("you", 2),
? ? ? ("hello", 4),
? ? ? ("hello", 5)
? ? )
? ? val right = List(
? ? ? ("hello", 11),
? ? ? ("hello", 12),
? ? ? ("you", 11),
? ? ? ("me", 12)
? ? )
? val leftListRDD:RDD[(String,Int)]=sc.parallelize(left)
? ? val rightListRDD:RDD[(String,Int)]=sc.parallelize(right)
? //獲取樣本數(shù)據(jù)
? ? val sampleRDD=leftListRDD.sample(true,0.8)
? //對(duì)樣本數(shù)據(jù)進(jìn)行排序
? ? val sorted=sampleRDD.countByValue().toList.sortWith((m1,m2)=>m1._2>m2._2)
? ? //找到異常數(shù)據(jù)key值
? ? val dataskewKey=sorted.head._1
//將表拆分成正常數(shù)據(jù)和異常數(shù)據(jù)
? ? val dsLeftRDD:RDD[(String,Int)]=leftListRDD.filter{case (word,count)=>word==dataskewKey}
? ? val commonleftRDD:RDD[(String,Int)]=leftListRDD.filter{case (word,count)=>word!=dataskewKey}
? ? val dsRightRDD:RDD[(String, Int)] = rightListRDD.filter{case (word, count) => word == dataskewKey}
? ? val commonRightRDD:RDD[(String, Int)] = rightListRDD.filter{case (word, count) => word != dataskewKey}
//為異常表的異常數(shù)據(jù)的key添加隨機(jī)數(shù)
? ? val prefixLeftRDD=dsLeftRDD.map{case? (word,count)=>{
? ? ? val random=new Random()
? ? ? val prefix=random.nextInt(2)
? ? ? (s"${prefix}_${word}",count)
? ? }}
? ? //將正常表的與異常表的數(shù)據(jù)進(jìn)行擴(kuò)容
? ? val prefixRightRDD=dsRightRDD.flatMap{case (word,count)=>{
? ? ? val ab=ArrayBuffer[(String,Int)]()
? ? ? for(i<- 0 until 2){
? ? ? ? ab.append((s"${i}_${word}",count))
? ? ? }
? ? ? ab
? ? }}
? ? //進(jìn)行表的正常部分join連接
? ? val commonJoinedRDd=commonleftRDD.join(commonRightRDD)
? //進(jìn)行表的異常部分進(jìn)行連接
? ? val dsJoinedRDD=prefixLeftRDD.join(prefixRightRDD)
? ? //除去異常表添加的隨機(jī)數(shù)
? ? val dsFinalJoinedRDD=dsJoinedRDD.map{case (word,count)=>{
? ? ? (word.substring(word.indexOf("_")+1),count)
? ? }}
? ? //合并異常表數(shù)據(jù)和正常表數(shù)據(jù)
? ? val finalJoinedRDD=dsFinalJoinedRDD.union(commonJoinedRDd)
? ? finalJoinedRDD.foreach(println)
? ? sc.stop()
? }
}
```
擴(kuò)展,兩張大表且改,左表全量異常验烧,右表正常。
? 沒(méi)有好的解決方案又跛,左表全量加N以內(nèi)的隨機(jī)前綴碍拆,右表全量進(jìn)行N倍的擴(kuò)容】叮可能會(huì)有的問(wèn)題感混,擴(kuò)容之后的存儲(chǔ)壓力非常大,可能發(fā)生OOM異常礼烈。
實(shí)際上是以空間換時(shí)間
## 如果說(shuō)上述的單一操作解決不了問(wèn)題怎么辦弧满?那就一起上!