rdd的依賴

例子一:

一個簡單的例子,看一下mappartitonrdd是怎么形成依賴的

scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[88] at makeRDD at <console>:24

scala> val rdd2 = rdd1.map(x=>(x._1,x._2+1))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[90] at map at <console>:26
通過makeRDD望拖,構造了ParallelCollectionRDD哼蛆,ParallelCollectionRDD通過map算子形成了MapPartitionsRDD
makeRDD ----  返回一個ParallelCollectionRDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil)
ParallelCollectionRDD本身沒有構造函數(shù)梯皿,它的初始化動作是在RDD這個抽象類中實現(xiàn)的,請注意傳入的構造函數(shù)sc雀久,Nil署穗,接下來看一下RDD構造函數(shù):
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging 

例子二:

接下來我們看一下join算子寥裂,join一定會產(chǎn)生shuffle嗎?案疲?接下來我們看一下:

rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[72] at makeRDD at <console>:24

scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24
scala> rdd1.partitions.length
res8: Int = 2

scala> rdd2.partitions.length
res9: Int = 5
已經(jīng)按照參數(shù)進行分區(qū)成功封恰,
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[10] at join at <console>:28
rdd3怎么會是一個MapPartitionsRDD,join不是一本shuffle操作嗎褐啡,稍后我們會分析一下源碼接著下面操作
scala> rdd3.dependencies
res13: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69718b8c)
rdd3的依賴也是一個窄依賴
scala> rdd3.dependencies(0).rdd
res14: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[9] at join at <console>:28
rdd3的依賴rdd也是一個MapPartitionsRDD,那我們順著依賴倒退回去看看
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res17: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[8] at join at <console>:28
終于出現(xiàn)CoGroupedRDD了
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies
res18: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@7d1a9695, org.apache.spark.ShuffleDependency@7acc1c2b)

scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ParallelCollectionRDD[0] at makeRDD at <console>:24
ParallelCollectionRDD[1] at makeRDD at <console>:24

ParallelCollectionRDD[0]  ----- CoGroupedRDD -- MapPartitionsRDD
ParallelCollectionRDD[1] 

到這里你應該已經(jīng)明白了诺舔,rdd1.join(rdd2)的過程中rdd是怎么樣做的算子轉(zhuǎn)化的,其中CoGroupedRDD的依賴有兩個都是ShuffleDependency依賴的類型
但是CoGroupedRDD之后的MapPartitionsRDD是怎么產(chǎn)生的那备畦?低飒?看下CoGroupedRDD源代碼:

  def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
      other2: RDD[(K, W2)],
      other3: RDD[(K, W3)],
      partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
       (vs.asInstanceOf[Iterable[V]],
         w1s.asInstanceOf[Iterable[W1]],
         w2s.asInstanceOf[Iterable[W2]],
         w3s.asInstanceOf[Iterable[W3]])
    }
  }
  可以看到CoGroupedRDD之后又做了mapValues的操作,
    def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)//閉包清理
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
      preservesPartitioning = true)
  }
  返回一個MapPartitionsRDD

例子三:

現(xiàn)在我們對rdd1做一點小變化

  scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at makeRDD at <console>:24
再rdd1對它進行hash分區(qū)懂盐,rdd2保持不變
scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[50] at partitionBy at <console>:24
再將rdd1和rdd2做join操作
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = MapPartitionsRDD[53] at join at <console>:28
我們看一下rdd3的依賴關系有什么變化:
scala> rdd3.dependencies
res45: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@381c4f8)
scala> rdd3.dependencies(0)
res46: org.apache.spark.Dependency[_] = org.apache.spark.OneToOneDependency@381c4f8
scala> rdd3.dependencies(0).rdd
res47: org.apache.spark.rdd.RDD[_] = MapPartitionsRDD[52] at join at <console>:28
scala> rdd3.dependencies(0).rdd.dependencies
res48: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@3672c520)
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res49: org.apache.spark.rdd.RDD[_] = CoGroupedRDD[51] at join at <console>:28
到這里你會發(fā)現(xiàn)和之前的依賴關系都沒有變化褥赊,繼續(xù)看下去
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x.rdd))
ShuffledRDD[50] at partitionBy at <console>:24
ParallelCollectionRDD[37] at makeRDD at <console>:24
由于我們對rdd1做了一次partitionBy,那么它將變成一個ShuffledRDD莉恼,兩一個依賴的rdd還是ParallelCollectionRDD拌喉,這沒有什么問題,看一下依賴的類型
scala> rdd3.dependencies(0).rdd.dependencies(0).rdd.dependencies.foreach(x=>println(x))
org.apache.spark.OneToOneDependency@62fe218f
org.apache.spark.ShuffleDependency@2a914623

你會發(fā)現(xiàn)由于rdd1我們采用了hash重新分區(qū)俐银,和之前的對比你會發(fā)現(xiàn)依賴關系由之前的ShuffleDependency變成了OneToOneDependency尿背。
發(fā)生這個變換是發(fā)生在CoGroupedRDD的依賴,我們看一下CoGroupedRDD的依賴是怎么得到的:

  override def getDependencies: Seq[Dependency[_]] = {
    rdds.map { rdd: RDD[_] =>//對于參與cogroup操作的都要進行遍歷和判斷
      if (rdd.partitioner == Some(part)) {//如果參與cogroup的rdd的分區(qū)方法等于part分區(qū)方法那么產(chǎn)生窄依賴
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

很明顯了悉患,其實在做CoGroupedRDD的時候并不是每個一定會產(chǎn)生ShuffleDependency残家,可以參考注釋但是需要解釋一下的就是那么part是什么榆俺,我們需要看一下join
的代碼是怎么實現(xiàn)的:
join實現(xiàn)幾個api售躁,我們這里是僅僅傳入一個參數(shù)

  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
    join(other, defaultPartitioner(self, other))
  }

可以看到join底層實現(xiàn)是必須要實現(xiàn)分區(qū)器的坞淮,如果不傳入的情況下,使用默認的defaultPartitioner陪捷,一般情況下defaultPartitioner就是HashPartitioner
(defaultPartitioner可以去看一下源碼這里不做分析)

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }
 重點看下cogroup傳入的兩個參數(shù):other--參與join的另一個rdd回窘,partitioner分區(qū)器
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

CoGroupedRDD:傳入第一個參數(shù)就是一個序列參與join兩個rdd,請注意他們的順序市袖,第二個參數(shù)就是分區(qū)器啡直,好了到這里我們知道了part其實就是你調(diào)用join的時候指定的分區(qū)方法,如果你沒有傳入那么會給你一個defaultPartitioner一般情況下就是HashPartitioner苍碟,他也就是上面說的到part, if (rdd.partitioner == Some(part)) 就是為了判斷參與join的分區(qū)方法是否與join的時候的分區(qū)方法是否相等酒觅,如果他們使用同一種分區(qū)方法那就會形成窄依賴
如果采用不同的方法那么就會產(chǎn)生一個寬依賴,其實道理也很好懂微峰,采用相同的分區(qū)方法那么在join的時候其實也就知道了分區(qū)的對應關系舷丹。

那么我們?yōu)槭裁丛诶佣校瑸槭裁串a(chǎn)生了不同的結果那蜓肆,看一下例子二中rdd1和rdd2的分區(qū)方法是什么

scala>  var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5)
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1)),10)
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:24

scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None
scala> rdd2.partitioner
res1: Option[org.apache.spark.Partitioner] = None
partitioner為None颜凯,沒有分區(qū)方法,可以去看看makeRDD是怎么樣對數(shù)據(jù)分區(qū)做劃分的

例子四:

最后我們看一下reduceByKey的情況

scala> var rdd1 = sc.makeRDD(Array((1,1),(2,1),(3,1),(4,1),(5,1)),5).partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[6] at partitionBy at <console>:24

scala> var rdd3 = rdd1.reduceByKey(_ + _)
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:26

scala> rdd3.dependencies
res4: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@43d35c2a)

scala> rdd3.dependencies(0).rdd
res5: org.apache.spark.rdd.RDD[_] = ShuffledRDD[6] at partitionBy at <console>:24

scala> rdd3.dependencies(0).rdd.dependencies
res6: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@480217c6)

scala> rdd3.dependencies(0).rdd.dependencies(0).rdd
res7: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[5] at makeRDD at <console>:24

可以看到rdd1盡管執(zhí)行了reduceByKey仗扬,但是沒有產(chǎn)生寬依賴症概,看reduceByKey的源碼實現(xiàn)發(fā)現(xiàn)原因和之前join原理是一樣的,會對分區(qū)方法進行判斷時候產(chǎn)生寬窄依賴早芭,
如果rdd1不進行partitionBy操作彼城,就會產(chǎn)生寬依賴,因為僅僅執(zhí)行makeRDD產(chǎn)生的rdd的partitioner方法為None退个,有興趣的可以下自行驗證精肃。

總結: rdd作為彈性分布式數(shù)據(jù)集,我們都知道他是惰性計算,再遇到action算子之前不會對數(shù)據(jù)進行真正的計算帜乞,僅僅會保存這些rdd之間的依賴關系司抱,在真正要計算的時候通過每個rdd的dependences去找到它的父親依賴關系和父親rdd,從而進行回溯黎烈,stage的劃分也就是通過計算依賴完成的习柠。通過上面的例子分析,不難發(fā)現(xiàn)其實rdd的一些shuffle算子join,reduceByKey不一定會產(chǎn)生寬依賴照棋,取決于傳入算子中的分區(qū)計算和調(diào)用這個算子的rdd的分區(qū)方法

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末资溃,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子烈炭,更是在濱河造成了極大的恐慌溶锭,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,331評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件符隙,死亡現(xiàn)場離奇詭異趴捅,居然都是意外死亡垫毙,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,372評論 3 398
  • 文/潘曉璐 我一進店門拱绑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來综芥,“玉大人,你說我怎么就攤上這事猎拨“蛎辏” “怎么了?”我有些...
    開封第一講書人閱讀 167,755評論 0 360
  • 文/不壞的土叔 我叫張陵红省,是天一觀的道長额各。 經(jīng)常有香客問我,道長吧恃,這世上最難降的妖魔是什么臊泰? 我笑而不...
    開封第一講書人閱讀 59,528評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮蚜枢,結果婚禮上缸逃,老公的妹妹穿的比我還像新娘。我一直安慰自己厂抽,他們只是感情好需频,可當我...
    茶點故事閱讀 68,526評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著筷凤,像睡著了一般昭殉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上藐守,一...
    開封第一講書人閱讀 52,166評論 1 308
  • 那天挪丢,我揣著相機與錄音,去河邊找鬼卢厂。 笑死乾蓬,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的慎恒。 我是一名探鬼主播任内,決...
    沈念sama閱讀 40,768評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼融柬!你這毒婦竟也來了死嗦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,664評論 0 276
  • 序言:老撾萬榮一對情侶失蹤粒氧,失蹤者是張志新(化名)和其女友劉穎越除,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,205評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡摘盆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,290評論 3 340
  • 正文 我和宋清朗相戀三年翼雀,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骡澈。...
    茶點故事閱讀 40,435評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡锅纺,死狀恐怖掷空,靈堂內(nèi)的尸體忽然破棺而出肋殴,到底是詐尸還是另有隱情,我是刑警寧澤坦弟,帶...
    沈念sama閱讀 36,126評論 5 349
  • 正文 年R本政府宣布护锤,位于F島的核電站,受9級特大地震影響酿傍,放射性物質(zhì)發(fā)生泄漏烙懦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,804評論 3 333
  • 文/蒙蒙 一赤炒、第九天 我趴在偏房一處隱蔽的房頂上張望氯析。 院中可真熱鬧,春花似錦莺褒、人聲如沸掩缓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,276評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽你辣。三九已至,卻和暖如春尘执,著一層夾襖步出監(jiān)牢的瞬間舍哄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工誊锭, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留表悬,地道東北人。 一個月前我還...
    沈念sama閱讀 48,818評論 3 376
  • 正文 我出身青樓丧靡,卻偏偏與公主長得像签孔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子窘行,可洞房花燭夜當晚...
    茶點故事閱讀 45,442評論 2 359