Example 1:
A simple example of how a MapPartitionsRDD forms its dependency.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[88] at makeRDD at <console>:24
scala> val rdd2 = rdd1.map(x=>(x._1,x._2+1))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[90] at map at <console>:26
makeRDD constructed a ParallelCollectionRDD, and the map operator then turned that ParallelCollectionRDD into a MapPartitionsRDD.
makeRDD ---- returns a ParallelCollectionRDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
  extends RDD[T](sc, Nil)
ParallelCollectionRDD does no dependency wiring of its own; that happens in the abstract RDD class it extends. Note the arguments it passes up to the superclass constructor: sc and Nil (i.e. no parent dependencies). Now look at the RDD constructor:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
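Passing Nil means the ParallelCollectionRDD has no parents, while the MapPartitionsRDD created by map hands its parent up as a OneToOneDependency. A minimal check in the same shell session (the expected values in the comments are assumptions, not captured output; the hash suffixes will differ):
println(rdd1.dependencies)                   // expected: List() -- built with deps = Nil
println(rdd2.dependencies)                   // expected: List(org.apache.spark.OneToOneDependency@...)
println(rdd2.dependencies.head.rdd == rdd1)  // expected: true -- map keeps a one-to-one link to its parent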
Example 2:
Next let's look at the join operator. Does join always produce a shuffle? Let's see:
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[72] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24
scala> rdd1.partitions.length
res8: Int = 2
scala> rdd2.partitions.length
res9: Int = 5
Both RDDs have been split into partitions. Now join them:
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
How can rdd3 be a MapPartitionsRDD? Isn't join a shuffle operation? We will look at the source shortly; first keep probing:
scala> rdd3.dependencies
res13: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69718b8c)
rdd3's own dependency is also a narrow one.
scala> rdd3.dependencies(0).rdd
res14: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[9] at join at <console>:28
The RDD that rdd3 depends on is also a MapPartitionsRDD, so let's keep walking backwards along the dependency chain.
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res17: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[8] at join at <console>:28
Finally a CoGroupedRDD appears.
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies
res18: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7d1a9695, org.apache.spark.ShuffleDependency@7acc1c2b)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ParallelCollectionRDD[0] at makeRDD at <console>:24
ParallelCollectionRDD[1] at makeRDD at <console>:24
ParallelCollectionRDD[0] --+
                           +--> CoGroupedRDD --> MapPartitionsRDD
ParallelCollectionRDD[1] --+
By now you should see how rdd1.join(rdd2) chains the RDDs together, and that both dependencies of the CoGroupedRDD are of type ShuffleDependency.
But where does the MapPartitionsRDD that sits after the CoGroupedRDD come from? Look at the cogroup source:
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]],
      w3s.asInstanceOf[Iterable[W3]])
  }
}
As you can see, a mapValues is applied right after the CoGroupedRDD is built:
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f) // closure cleaning
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
which returns a MapPartitionsRDD.
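Because these MapPartitionsRDDs are created with preservesPartitioning = true, rdd3 in Example 2 keeps the partitioner that the underlying cogroup used. A quick check you could run in the same session (the expected value in the comment is an assumption, not captured output):
println(rdd3.partitioner)  // expected: Some(org.apache.spark.HashPartitioner@...)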
Example 3:
Now let's make a small change to rdd1.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
Now apply hash partitioning to rdd1; rdd2 stays unchanged:
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[50] at partitionBy at <console>:24
Then join rdd1 and rdd2 again:
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[53] at join at <console>:28
Let's see how rdd3's dependencies have changed:
scala> rdd3.dependencies
res45: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@381c4f8)
scala> rdd3.dependencies(0)
res46: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@381c4f8
scala> rdd3.dependencies(0).rdd
res47: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[52] at join at <console>:28
scala> rdd3.dependencies(0).rdd.dependencies
res48: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@3672c520)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res49: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[51] at join at <console>:28
So far nothing has changed compared with before. Keep going:
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ShuffledRDD[50] at partitionBy at <console>:24
ParallelCollectionRDD[37] at makeRDD at <console>:24
Because we ran partitionBy on rdd1, it has become a ShuffledRDD, while the other parent is still a ParallelCollectionRDD. No surprise there. Now look at the dependency types:
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x))
org.apache.spark.OneToOneDependency@62fe218f
org.apache.spark.ShuffleDependency@2a914623
Because rdd1 was re-partitioned with a HashPartitioner, its dependency has changed from the earlier ShuffleDependency to a OneToOneDependency.
This change happens in the dependencies of the CoGroupedRDD, so let's see how CoGroupedRDD computes them:
override def getDependencies: Seq[Dependency[_]] = {
  rdds.map { rdd: RDD[_] => // every RDD taking part in the cogroup is examined
    if (rdd.partitioner == Some(part)) { // same partitioner as part => narrow dependency
      logDebug("Adding one-to-one dependency with " + rdd)
      new OneToOneDependency(rdd)
    } else {
      logDebug("Adding shuffle dependency with " + rdd)
      new ShuffleDependency[K, Any, CoGroupCombiner](
        rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
    }
  }
}
So clearly, building a CoGroupedRDD does not necessarily produce a ShuffleDependency for every parent (see the comments above). What still needs explaining is what part is, and for that we need to look at how join is implemented.
join has several overloads; here we used the one that only takes the other RDD:
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}
So the underlying join always needs a partitioner: if you do not pass one in, defaultPartitioner is used, which in the common case is a HashPartitioner (have a look at the defaultPartitioner source if you are curious; it is not analyzed in detail here).
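As a rough illustration only (this is not the Spark source; the real logic lives in org.apache.spark.Partitioner.defaultPartitioner and varies between versions), its behavior is approximately:
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Illustrative sketch, not the actual Spark code: reuse an existing partitioner from
// the inputs if one is set, otherwise build a HashPartitioner sized either by
// spark.default.parallelism or by the largest input RDD.
def defaultPartitionerSketch(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val all = Seq(rdd) ++ others
  val withPartitioner = all.filter(_.partitioner.exists(_.numPartitions > 0))
  if (withPartitioner.nonEmpty) {
    withPartitioner.maxBy(_.partitions.length).partitioner.get
  } else if (rdd.context.getConf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(all.map(_.partitions.length).max)
  }
}
Back to the two-argument join that the overload above delegates to: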
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
Focus on the two arguments passed to cogroup: other, the other RDD taking part in the join, and partitioner, the partitioner:
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
CoGroupedRDD takes two arguments: the first is a Seq containing the two RDDs taking part in the join (note their order), and the second is the partitioner. So part is simply the partitioner you specify when you call join; if you do not pass one, defaultPartitioner supplies one, usually a HashPartitioner. That is the part referred to above: if (rdd.partitioner == Some(part)) checks whether each RDD taking part in the join is already partitioned with the same partitioner that the join itself uses.
If they use the same partitioner, a narrow dependency is formed; if not, a wide dependency is produced. The intuition is simple: with the same partitioner, the mapping between parent and child partitions is already known at join time, so no re-shuffling of the data is needed. A sketch of this follows.
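For instance (a sketch in the spirit of the examples above; the names p, left, right, joined are just for illustration and the output is not captured here), pre-partitioning both sides with the partitioner the join will use should make both parent dependencies of the CoGroupedRDD narrow:
val p = new org.apache.spark.HashPartitioner(2)
val left  = sc.makeRDD(Array((1, 1), (2, 1), (3, 1))).partitionBy(p)
val right = sc.makeRDD(Array((1, 2), (2, 2), (4, 2))).partitionBy(p)
val joined = left.join(right, p)  // both parents are already partitioned by p
// expected: two OneToOneDependency entries, no ShuffleDependency
joined.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(println)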
那么我們?yōu)槭裁丛诶佣校瑸槭裁串a(chǎn)生了不同的結果那蜓肆,看一下例子二中rdd1和rdd2的分區(qū)方法是什么
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None
scala> rdd2.partitioner
res1: Option[org.apache.spark.Partitioner] = None
Both partitioners are None, i.e. no partitioner at all; if you are curious, go and look at how makeRDD splits the data into partitions.
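One quick way to see how makeRDD laid the data out (a sketch, output not captured here): glom() gathers each partition into an array, which shows that the elements are simply sliced by index range, with no partitioner involved.
rdd1.glom().collect().foreach(part => println(part.mkString("[", ",", "]")))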
Example 4:
Finally, let's look at reduceByKey.
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at partitionBy at <console>:24
scala> var rdd3 = rdd1.reduceByKey(_ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:26
scala> rdd3.dependencies
res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@43d35c2a)
scala> rdd3.dependencies(0).rdd
res5: org.apache.spark.rdd.RDD[_] = ShuffledRDD[6] at partitionBy at <console>:24
scala> rdd3.dependencies(0).rdd.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@480217c6)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[5] at makeRDD at <console>:24
Even though reduceByKey was applied to rdd1, no wide dependency was produced. Reading the reduceByKey implementation shows the reason is the same as with join: the partitioner is compared to decide whether a narrow or a wide dependency is created.
If rdd1 had not gone through partitionBy, a wide dependency would appear, because an RDD produced by makeRDD alone has a partitioner of None. You can verify this yourself; a sketch follows below.
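A sketch of that verification (the names are illustrative and the expected results in the comments are assumptions, not captured output):
// Without the partitionBy, reduceByKey has to shuffle:
val noPrePartition = sc.makeRDD(Array((1, 1), (2, 1), (3, 1), (4, 1), (5, 1)), 5)
val reduced = noPrePartition.reduceByKey(_ + _)
println(reduced)                       // expected: ShuffledRDD[...] at reduceByKey ...
reduced.dependencies.foreach(println)  // expected: a single ShuffleDependency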
Summary: an RDD, as a resilient distributed dataset, is lazily evaluated. Before an action operator is encountered, no real computation is performed on the data; Spark only records the dependencies between the RDDs. When the computation actually has to run, each RDD's dependencies lead back to its parent dependencies and parent RDDs, and this backtracking is also how stages are divided. From the examples above it is easy to see that the so-called shuffle operators such as join and reduceByKey do not necessarily produce wide dependencies; it depends on the partitioner passed into the operator and on the partitioner of the RDD the operator is called on.
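Finally, a convenient way to eyeball all of this at once (a sketch; the exact indentation and RDD ids depend on your session): toDebugString prints an RDD's lineage, and each extra indentation level marks a shuffle boundary, i.e. a new stage.
println(rdd3.toDebugString)  // works for any of the rdd3 values built above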