一梗摇、問題
兩個RDD進行join操作(即 rdd1.join(rdd2)) 會導(dǎo)致shuffle糯崎,這是因為join操作會對key一致的key-vlaue對進行合并几缭,而** key相同的key-value對不太可能會在同一個partition, 因此很有可能是需要進行經(jīng)過網(wǎng)絡(luò)進行shuffle的沃呢,而shuffle會產(chǎn)生許多中間數(shù)據(jù)(小文件)并涉及到網(wǎng)絡(luò)傳輸奏司,這些通常比較耗時,Spark中要盡量避免shuffle樟插。
二、解決方案
優(yōu)化方法:將小RDD的數(shù)據(jù)通過broadcast到每個executor中竿刁,各大RDD partition分別和小RDD做join操作黄锤。
具體是:在driver端將小RDD轉(zhuǎn)換成數(shù)組array并broadcast到各executor端,然后再各executor task中對各partion的大RDD的key-value對和小rdd的key-value對進行join食拜;由于每個executor端都有完整的小RDD鸵熟,因此小RDD的各partition不需要shuffle到RDD的各partition,小RDD廣播到大RDD的各partition后负甸,各partition分別進行join流强,最后再執(zhí)行reduce,所有分區(qū)的join結(jié)果匯總到driver端呻待。
三打月、業(yè)務(wù)代碼
import org.apache.spark.sql.SparkSession
object BigRDDJoinSmallRDD {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder().master("local[3]").appName("BigRDD Join SmallRDD").getOrCreate()
val sc = sparkSession.sparkContext
val list1 = List(("jame",23), ("wade",3), ("kobe",24))
val list2 = List(("jame", 13), ("wade",6), ("kobe",16))
val bigRDD = sc.makeRDD(list1)
val smallRDD = sc.makeRDD(list2)
println(bigRDD.getNumPartitions)
println(smallRDD.getNumPartitions)
// driver端rdd不broadcast廣播smallRDD到各executor,RDD不能被broadcast蚕捉,需要轉(zhuǎn)換成數(shù)組array
val smallRDDB= sc.broadcast(smallRDD.collect())
val joinedRDD = bigRDD.mapPartitions(partition => {
val smallRDDBV = smallRDDB.value // 各個executor端的task讀取廣播value
partition.map(element => {
//println(joinUtil(element, smallRDDBV))
joinUtil(element, smallRDDBV)
})
})
joinedRDD.foreach(x => println(x))
}
/**
* join操作:對兩個rdd中的相同key的value1和value2進行聚合奏篙,即(key,value1).join(key,value2)得到(key,(value1, vlaue2))
* 如果bigRDDEle的key和smallRDD的某個key一致萝勤,那么返回(key,(value1, vlaue2))
* 該方法會在各executor的task上執(zhí)行
* */
def joinUtil(bigRDDEle:(String,Int), smallRDD: Array[(String, Int)]): (String, (Int,Int)) = {
var joinEle:(String, (Int, Int)) = null
// 遍歷數(shù)組smallRDD
smallRDD.foreach(smallRDDEle => {
if(smallRDDEle._1.equals(bigRDDEle._1)){
// 如果bigRDD中某個元素的key和數(shù)組smallRDD的key一致蛔外,返回join結(jié)果
joinEle = (bigRDDEle._1, (bigRDDEle._2, smallRDDEle._2))
}
})
joinEle
}
}
如有錯誤,敬請指正痢畜!