spark之transform和action

1. tranformation

  1. map
    map實(shí)現(xiàn)如下:
 def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
 }

map接收一個(gè)函數(shù)f為參數(shù),該函數(shù)接收參數(shù)類型T,然后返回類型U。當(dāng)前RDD數(shù)據(jù)類型T,map使用函數(shù)f將RDD中的每一條記錄轉(zhuǎn)換為類型為U的數(shù)據(jù)余蟹。 比如:

// 創(chuàng)建一個(gè)新的RDD oddNums,包含兩個(gè)partition子刮,只有奇數(shù)組成威酒。
val oddNums = sc.parallelize(List(1,3,5,7,9),2)
// 使用函數(shù) x => x + 1將 oddNums中的奇數(shù)轉(zhuǎn)換成偶數(shù)。
val evenNums = oddNums.map(x => x + 1)

從map的實(shí)現(xiàn)的可以看出话告,函數(shù)cleanF是通過(guò)iter.map(cleanF)發(fā)揮作用的,這就意味著iter中有多少個(gè)值兼搏,cleanF就會(huì)調(diào)用多少次,后面還會(huì)介紹mapPartitions沙郭,作用和map一樣佛呻,但是實(shí)現(xiàn)有所區(qū)別,將會(huì)在mapPartitions中提到病线。

  1. flatMap
    flatMap的原型如下:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

flatMap將每一個(gè)元素轉(zhuǎn)換成一個(gè)集合類型吓著,然后又將這些集合的元素拿出來(lái)展開(kāi)拼在一起作為下一個(gè)RDD的數(shù)據(jù)。

flatMap接收的參數(shù)f同樣也是一個(gè)函數(shù)送挑,這個(gè)函數(shù)接收T類型(當(dāng)前RDD的數(shù)據(jù)類型)绑莺,然后返回一個(gè)集合,集合的元素類型為U(TraversableOnce一般都是集合實(shí)現(xiàn)的特質(zhì))惕耕。

flatMap調(diào)用new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)), iter迭代源RDD纺裁,迭代的元素類型T,it er.flatMap使用函數(shù)f將T轉(zhuǎn)換成U的集合司澎,然后返回U的集合上的迭代器欺缘。比如T1被轉(zhuǎn)換成集合[U1,U2,U3], U上的迭代器迭代返回U1,U2,U3三個(gè)元素,而不是[U1,U2,U3]這個(gè)集合挤安,也就是說(shuō)集合被展開(kāi)了谚殊。

看一個(gè)例子:

//RDD someNums包含數(shù)據(jù)1,2,3,4,5。要把它轉(zhuǎn)換成1,1,2,2,3,3,4,4,5,5
val someNums = sc.parallelize(List(1,2,3,4,5))
val doubleSomeNums = someNums.flatMap(x => List(x,x))
doubleSomeNums.collect
// Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4, 5, 5)蛤铜,上面的1,2,3,4,5首先被轉(zhuǎn)換成[1,1],[2,2],[3,3],[4,4],[5,5],然后在被連接成1,1,2,2,3,3,4,4
  1. filter
    原型如下:
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

filter接收斷言f嫩絮,對(duì)RDD中的數(shù)據(jù)丛肢,滿足f的返回,不滿足的丟棄剿干。

  1. distinct
    原型如下:
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
   map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 }

distinct會(huì)將RDD中重復(fù)的數(shù)據(jù)只保留一份蜂怎,這是一個(gè)全局去重操作,而不是僅僅對(duì)每個(gè)分區(qū)操作去重置尔,全局去重就意味著需要將散落在各個(gè)分區(qū)里的元素聚合到一起派敷。

上面代碼表明的它的實(shí)現(xiàn)原理:

  • 使用map將單個(gè)value映射成(value,null)這樣的鍵值對(duì);
  • reduceByKey將相同的聚集在一起,(x,y) => x是其聚集使用的函數(shù) 撰洗,聚合函數(shù)是作用在key相同的value上的,由于所有value都是null腐芍,所以這其實(shí)是(null,null) => null的函數(shù)差导。
  • map(_._1)返回key,以數(shù)據(jù)[2,2]為例:
        map(x => (x, null))   reduceByKey   map(_._1)
                 |                 |             |
輸入2,2 -> (2,null),(2,null) ->  (2, null)   ->      2
  1. coalesce
    coalesce用來(lái)改變RDD的分區(qū)個(gè)數(shù)猪勇,重新分區(qū)设褐。方法原型如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
            partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
           (implicit ord: Ordering[T] = null)
   : RDD[T] 

參數(shù)shuffle=true且是在擴(kuò)大分區(qū)數(shù)(即目標(biāo)rdd分區(qū)數(shù)numPartitions大于當(dāng)前分區(qū))則會(huì)導(dǎo)致shuffle過(guò)程。

  1. union
    union用來(lái)將多個(gè)RDD做并集泣刹,合并后的數(shù)據(jù)不會(huì)進(jìn)行去重助析。
    其方法原型:
  def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
  }
  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
  /*獲得所有參與union的rdd的分區(qū)方法partitioner,轉(zhuǎn)換成set
   這就意味著如果所有的rdd使用相同的分區(qū)方法椅您,比如都是HashPartitioner,
  而且并且各自的partitioner相等(即equals返回true外冀,對(duì)于HashPartitioner來(lái)說(shuō),
  equals為 true的條件是分區(qū)的個(gè)數(shù)一樣掀泳,RangePartitioner要復(fù)雜一點(diǎn))雪隧,那么返回的set即partitioners的size為1.
   */
   val partitioners = rdds.flatMap(_.partitioner).toSet
  if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
    new PartitionerAwareUnionRDD(this, rdds)
  } else {
    new UnionRDD(this, rdds)
  }
  }

上面if分支中,如果參與union的rdd都定義了partitioner(rdds.forall(_.partitioner.isDefined)返回true,一般只有ShuffledRDD有partitioner)且它們的partitioner一樣,這就表示參與union的rdd都產(chǎn)生相同個(gè)數(shù)的分區(qū)(假設(shè)個(gè)數(shù)為p)员舵,這就好辦了脑沿,union生成新的RDD:PartitionerAwareUnionRDD,新的RDD的擁有p個(gè)分區(qū)马僻,第i個(gè)分區(qū)就有上游參與union的rdd里的第i個(gè)分區(qū)組成庄拇。所以總結(jié)一下,這種情況所有父rdd都有p個(gè)分區(qū)韭邓,那生成的新的rdd也有p個(gè)分區(qū)措近。

else分支中,創(chuàng)建UnionRDD仍秤。假設(shè)參與合并的rdd1熄诡,rdd2的分區(qū)分別是(R1P1,R1P2)和(R2P1,R2P2),一共4個(gè)分區(qū)诗力,新的UnionRDD也將有四個(gè)分區(qū)凰浮,也就是(R1P1,R1P2,R2P1,R2P2)我抠。

  1. sortBy、sortByKey
    對(duì)RDD中的數(shù)據(jù)進(jìn)行全局排序袜茧,下面是sortBy的原型:
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}

上面sortBy實(shí)際上上調(diào)用了OrderedRDDFunctions#sortByKey(注: OrderedRDDFunctions經(jīng)常出現(xiàn)菜拓,這里使用到了scala的隱式轉(zhuǎn)換RDD隱式轉(zhuǎn)換成OrderedRDDFunctions)方法。

sortByKey的是機(jī)遇reduce會(huì)對(duì)key進(jìn)行排序這一原理實(shí)現(xiàn)的笛厦,利用每一個(gè)reducer會(huì)對(duì)自己分區(qū)內(nèi)的key進(jìn)行排序的原理纳鼎,但是由于reducer只會(huì)保證自己分區(qū)內(nèi)的數(shù)據(jù)按key排序,分區(qū)之間的有序則需要另外的機(jī)制來(lái)保證(參考hadoop terasort的排序原理)裳凸。

這里簡(jiǎn)單說(shuō)一下原理:假設(shè)有10個(gè)分區(qū)贱鄙,那我門就從數(shù)據(jù)中采樣9個(gè)數(shù),這9個(gè)數(shù)就決定了10個(gè)區(qū)間姨谷,然后shuffle時(shí)逗宁,就將每一個(gè)上游rdd中的數(shù)據(jù)都落到10個(gè)里的其中一個(gè),這樣partition之間也就有序了梦湘。

即然有shuffle這個(gè)過(guò)程瞎颗,也就需要一個(gè)paritioner來(lái)決定數(shù)據(jù)流向下游那一個(gè)reducer,這里使用到的partitioner是RangePartitioner捌议,而這里RangePartitoner的range的劃分也就是上一段里那個(gè)簡(jiǎn)單原理介紹中所說(shuō)哼拔。

注:關(guān)于shuffle的過(guò)程有興趣的話可以參考Spark shuffle 原理
注:關(guān)于隱式轉(zhuǎn)換可以參考scala 隱式轉(zhuǎn)換

  1. intersection
    求兩個(gè)rdd的交集,交集的結(jié)果會(huì)去重瓣颅,方法原型如下:
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}

使用到了cogroup(PairRDDFunction提供的方法倦逐,PairRDDFunction中的方法只能作用于數(shù)據(jù)(key,value)形式的RDD,這里同樣使用了RDD到PairRDDFunction的隱式轉(zhuǎn)換)弄捕。兩個(gè)rdd僻孝,分別是r1、r2守谓,做cogroup操作穿铆,依然是按照兩個(gè)rdd中相同的key做group,cogroup生成一個(gè)CoGroupedRDD類型的RDD斋荞,生成新的RDD的數(shù)據(jù)中key即源r1荞雏,r2相同的key,value是一個(gè)tuple平酿,tuple的第一個(gè)元素是r1中key對(duì)應(yīng)所有value上的iterator凤优,第二個(gè)元素是r2中該key的所有value的iterator。

回到intersection方法蜈彼,由于cogroup只能作用于數(shù)據(jù)(key,value)這種二元組形式的RDD筑辨,所以先將RDD的value map成(value, null); 接著做cogroup,做完cogroup之后幸逆,對(duì)于相交的數(shù)據(jù)棍辕,必然二元組中兩個(gè)部分都不空(也就滿足filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty條件)暮现。

下面是一個(gè)例子展示數(shù)據(jù)變換過(guò)程

//()表示tuple,[]表示列表, E表示空
輸入: rdd1: 1, 2 ,3;rdd2: 2,3,4
map: rdd1 -> rdd3: (1,null),(2,null),(3,null)
map: rdd2 -> rdd4: (2,null),(3,null),(4,null)
cogroup:rdd3,rdd4 ->  rdd5: (1, ([null], E)), (2, ([null], [null])), (3, ([null], [null])), (4,(E, [null]))
//value中有E表示這個(gè)key只存在于一個(gè)rdd中楚昭,去掉
filter: (2, ([null], [null])), (3, ([null], [null]))
keys: 2,3

其他的3個(gè)或者更多個(gè)rdd參與cogroup原理是一樣的栖袋。
由于cogroup是一個(gè)比較復(fù)雜的過(guò)程,可以參考附錄cogroup抚太。

  1. glom
    方法如下:
def glom(): RDD[Array[T]] = withScope {
    new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
  }

glom將分區(qū)里的所有數(shù)據(jù)合成到一個(gè)數(shù)組塘幅。
比如:

// rdd r1 包含1 to 5, 分成兩個(gè)分區(qū).分區(qū)1包含1,2尿贫;分區(qū)2包含3,4,5
scala> val r1 = sc.parallelize(1 to 5,2)
scala> r1.collect
res20: Array[Int] = Array(1, 2, 3, 4, 5)

// glom并調(diào)用collect查看結(jié)果. 依然包含兩個(gè)分區(qū)电媳,但是分區(qū)的元素被合成數(shù)組,也就是說(shuō)原來(lái)分區(qū)1包含兩個(gè)數(shù)據(jù)記錄庆亡,現(xiàn)在只有一個(gè)類型為Array的數(shù)據(jù)記錄了匆背。
scala> r1.glom.collect
res21: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))

從glom的實(shí)現(xiàn)來(lái)看,使用了iter.toArray將源rdd的一個(gè)分區(qū)里的數(shù)據(jù)放到一個(gè)數(shù)據(jù)里身冀,是一個(gè)很消耗內(nèi)存的方法,分區(qū)數(shù)據(jù)很多時(shí)還是要注意使用括享。

  1. cartesian
    對(duì)兩個(gè)rdd做笛卡爾積搂根,方法原型如下:
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}

假設(shè)參與笛卡爾積的兩個(gè)rdd分別是r1,r2擁有分區(qū)[r1p1,r1p2]和[r2p1,r2p2]铃辖,r1.cartesian(r2)生成類型為CartesianRDD的新rdd剩愧,假設(shè)是r3,r3擁有分區(qū)就是r1和r2分區(qū)的笛卡爾積, 即:[(r1p1, r2p1), (r1p1, r2p2), (r1p2, r2p1), (r1p2, r2p2)], 那么在r3上任意一個(gè)分區(qū)上計(jì)算時(shí)娇斩,假設(shè)是(r1p1, r2p1)上,只需要迭代r1p1, r2p1里的數(shù)據(jù)然后做笛卡爾積就行了仁卷。

下面是CartesianRDD的getPartitions方法:

override def getPartitions: Array[Partition] = {
    // array保存分區(qū),個(gè)數(shù)就是rdd1和rdd2分區(qū)個(gè)數(shù)相乘
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    //CartesianRDD擁有的分區(qū)也是rdd1和rdd2分區(qū)的笛卡爾積
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

下面是CartesianRDD的compute方法:

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
   // CartesianRDD每一個(gè)分區(qū)都是上游rdd1和rdd2各一個(gè)分區(qū)組成犬第,也就是下面的s1锦积,s2. 此處兩重循環(huán)的形式完成元素的笛卡爾積計(jì)算
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }
  1. groupBy、 PairRDDFunction#groupByKey
    方法如下:
//由于源rdd可以的數(shù)據(jù)t不是(key,value)這種二元組歉嗓,因此
它需要一個(gè)f能夠把源rdd里的數(shù)據(jù)類型T轉(zhuǎn)換成key的類型K丰介。最終生成的目標(biāo)rdd的數(shù)據(jù)形式是(f(t), t)這種。
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    //group按key聚合涉及到shuffle鉴分,使用defaultPartitioner獲得默認(rèn)的partitioner是HashPartitoner
    groupBy[K](f, defaultPartitioner(this))
  }

groupBy把相同的key對(duì)應(yīng)的value組合在一起哮幢,可以放到一個(gè)列表中,此外它不保證value的順序志珍,也不保證每次調(diào)用value都按相同方式排列橙垢。 下面是一個(gè)groupBy的例子:

val r1 = sc.parallelize(List(1,2,3,4,3,2),2)
r1.groupBy(x=>x).collect
//groupBy的結(jié)果,key相同的value都被放到CompactBuffer里伦糯,value僅僅是被簡(jiǎn)單的拼接柜某。因此這是一種十分耗時(shí)且消耗存儲(chǔ)的操作嗽元。
// grouyBy和reduceBy底層都使用PairRDDFunctions#combineByKeyWithClassTag,只不過(guò)使用的用來(lái)聚合value的aggregator不同莺琳,groupBy的aggregator就是將value加到CompactBuffer里还棱。
res39: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1)), (3,CompactBuffer(3, 3)))

這里一路跟到PairRDDFunctions#groupByKey的實(shí)現(xiàn)看看:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
   //CompactBuffer可以暫時(shí)理解做高效的ArrayBuffer
    val createCombiner = (v: V) => CompactBuffer(v)
  // mergeValue函數(shù)把key相同的value聚合到一起,這里的實(shí)現(xiàn)是直接到v添加到數(shù)組末尾
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

從上面聚合的方式來(lái)看惭等,就是把value都放到數(shù)組里珍手,這在數(shù)據(jù)很多時(shí),是一種很好內(nèi)存的操作辞做,有可能會(huì)OOM琳要,所以要注意,能用reduceBy的就不要用groupBy秤茅。

  1. mapPartitions
    mapPartitions功能和map類似稚补,但還是實(shí)現(xiàn)上還是有區(qū)別的,下面是mapPartitions的原型:
  def mapPartitions[U: ClassTag](
     f: Iterator[T] => Iterator[U],
     preservesPartitioning: Boolean = false): RDD[U] = withScope {
   val cleanedF = sc.clean(f)
   new MapPartitionsRDD(
     this,
     (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
     preservesPartitioning)
 }

 // 作為比較還有map的原型:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
 }

比較一下map和mapPartitions的不同框喳,用戶自定的函數(shù)f课幕,在map中是通過(guò)iter.map(cleanF)調(diào)用的,這意味著每一次iter上的value迭代都會(huì)調(diào)用一次f五垮; 而 mapPartitions中f是通過(guò)cleanedF(iter)調(diào)用的乍惊,直接作用在iter上,然后返回一個(gè)新的iter放仗,f實(shí)際上只被調(diào)用了一次润绎。當(dāng)有些資源需要在f中創(chuàng)建時(shí)(比如jdbc連接),使用map會(huì)導(dǎo)致頻繁創(chuàng)建诞挨,可以考慮使用mapPartitions.

  1. zip
    作用和集合上的zip一樣莉撇,集合上zip會(huì)將兩個(gè)集合相同index上的value合成tuple,這就要求兩個(gè)集合大小一樣惶傻。rdd上的zip要求兩個(gè)rdd擁有相同個(gè)數(shù)的partition棍郎,每個(gè)partition又擁有相同個(gè)數(shù)的數(shù)據(jù)。

如下例子:

// RDD r1包含兩個(gè)分區(qū)
scala> val r1 = sc.parallelize(1 to 10,2)
scala> r1.collect
res53: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
//RDD r2同樣兩個(gè)分區(qū)银室,且分區(qū)里數(shù)據(jù)個(gè)數(shù)和r1一樣坝撑。
scala> val r2 = sc.parallelize(11 to 20,2)
scala> r2.collect
res54: Array[Int] = Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> r1.zip(r2).collect
// r1和r2相同下標(biāo)的數(shù)據(jù)組合成一個(gè)元組(tuple)
res56: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
  1. subtract
    求兩個(gè)rdd的差,調(diào)用rdd1.substract(rdd2)會(huì)返回rdd1中去掉和rdd2相同數(shù)據(jù)的剩下部分, 但是不會(huì)對(duì)剩下的部分的數(shù)據(jù)去重粮揉。subtract都會(huì)最終調(diào)用下面的subtract方法:
def subtract(
   other: RDD[T],
   p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
 //判斷源rdd(即rdd1)的partitioner是否為空巡李,不空的話往往意味著源rdd到目地RDD會(huì)產(chǎn)生shuffle操作生成的。
 if (partitioner == Some(p)) {

  //源rdd的partitoner不空扶认,那源RDD的數(shù)據(jù)類型T一定是(key,value)形式的侨拦,這里之所以包裝成新的partitioner,跟下面的map調(diào)用有關(guān)辐宾。下面的map會(huì)把源rdd中(key,value)數(shù)據(jù)作為新生成的rdd中的key狱从,這里新的p2需要從新生成的rdd的key中(此時(shí)key類型(key膨蛮,value))提取出源rdd的key。
   val p2 = new Partitioner() {
     override def numPartitions: Int = p.numPartitions
     override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
     }
    // 再回顧上面的p2對(duì)partitioner的包裝季研,源rdd有partitioner敞葛,則源rdd的類型是范型T實(shí)際是(key,value),此處map又把它轉(zhuǎn)換成((key,value), null)与涡,所以需要包裝成p2去key從(key,value)里取出來(lái)
     this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
     } else {
     this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
     }
   }

下圖是subtract產(chǎn)生的rdd依賴:

subtract產(chǎn)生的rdd依賴

subtractByKey生成新的rdd為SubtractedRDD惹谐,下面是它的getDependencies方法:

 override def getDependencies: Seq[Dependency[_]] = {
    def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
      : Dependency[_] = {
      /*由于這個(gè)方法傳入的參數(shù)是上圖的rdd3和rdd4都是map產(chǎn)生的,因此rdd.partitioner是空的,所以會(huì)走向else分支驼卖,
       else分支產(chǎn)生了ShuffleDependency氨肌,所以無(wú)論如何都會(huì)產(chǎn)生shuffle。 強(qiáng)迫他產(chǎn)生一次上圖中的shuffle也是可以理解的酌畜,
       因?yàn)閟huffle會(huì)使得上游rdd3怎囚,rdd4中key相同的進(jìn)入到下游SubtractedRDD的同一分區(qū)上,那樣做subtract就簡(jiǎn)單多了桥胞。
       */
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[T1, T2, Any](rdd, part)
      }
    }
   // 這里的rdd1恳守, rdd2對(duì)應(yīng)SubtractedRDD上游依賴也就是上圖的rdd3和rdd4
    Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
  }

下面是完成subtract的計(jì)算在SubtractedRDD#compute方法:

override def compute(p: Partition, context: TaskContext): Iterator[(K, V)] = {
    val partition = p.asInstanceOf[CoGroupPartition]
   // map是key到由key相同的value組成的Array的映射,這里所有的value都是null贩虾。
    val map = new JHashMap[K, ArrayBuffer[V]]
    // 對(duì)于key井誉,map中有就返回對(duì)應(yīng)的ArrayBuffer,沒(méi)有就新建立一個(gè)
    def getSeq(k: K): ArrayBuffer[V] = {
      val seq = map.get(k)
      if (seq != null) {
        seq
      } else {
        val seq = new ArrayBuffer[V]()
        map.put(k, seq)
        seq
      }
    } 

   // 由于只有ShuffleDependency整胃,所以只會(huì)走到shuffleDepency的case上。
   // 這個(gè)函數(shù)根據(jù)depNum取到上游依賴的rdd(rdd3或則rdd4喳钟,然后對(duì)每一個(gè)值作為op的參數(shù)調(diào)用)
    def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
      dependencies(depNum) match {
        case oneToOneDependency: OneToOneDependency[_] =>
          val dependencyPartition = partition.narrowDeps(depNum).get.split
          oneToOneDependency.rdd.iterator(dependencyPartition, context)
            .asInstanceOf[Iterator[Product2[K, V]]].foreach(op)

        case shuffleDependency: ShuffleDependency[_, _, _] =>
          //shuffleManager.getReader返回的迭代器迭代的一定是按key排好序的
          val iter = SparkEnv.get.shuffleManager
            .getReader(
              shuffleDependency.shuffleHandle, partition.index, partition.index + 1, context)
            .read()
          iter.foreach(op)
      }
    }

    // depNum = 0屁使,先跌打rdd3 shuffle之后的數(shù)據(jù),按照key在map中拿到ArrayBuffer奔则,再把value都放到ArrayBuffer中蛮寂。
    integrate(0, t => getSeq(t._1) += t._2)
    // 即然是做subtract,在迭代rdd4中的數(shù)據(jù)易茬,對(duì)于每一個(gè)key酬蹋,從map中去掉就行了。
    integrate(1, t => map.remove(t._1))
    map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
  }

其原理是迭代rdd1中的key抽莱,放到map中范抓,然后迭代rdd2中的key,在將key從之前的map中刪除食铐,得到的就是求差的結(jié)果匕垫。

  1. zipWithIndex
    對(duì)于rdd中的每一個(gè)數(shù)據(jù),返回的數(shù)據(jù)以及數(shù)據(jù)在rdd中的索引組成的tuple, 如下例:
 val r1 = sc.parallelize(List('a','b','c','d'),2)
 // 返回tuple包括a,b,c,d在rdd中索引虐呻,而且是全局索引象泵。
 scala> r1.zipWithIndex.collect
     res70: Array[(Char, Long)] = Array((a,0), (b,1), (c,2), (d,3))

方法實(shí)現(xiàn)如下:

 def zipWithIndex(): RDD[(T, Long)] = withScope {
   new ZippedWithIndexRDD(this)
 }

zipWithIndex的基本原理:由于需要知道每一個(gè)partition里面的每一個(gè)元素的全局索引寞秃,首先需要計(jì)算出每一個(gè)partition的元素的個(gè)數(shù),這樣就能計(jì)算出第i個(gè)partition的第一個(gè)元素在所有全部數(shù)據(jù)里面的偏移值偶惠,接下來(lái)就簡(jiǎn)單了春寿,由于任務(wù)是基于parition上的數(shù)據(jù)迭代的,那么parition里的數(shù)據(jù)的全局偏移就是該partition的第一個(gè)元素的偏移加上當(dāng)前迭代到的元素在parition里的偏移值忽孽。

下面是ZippedWithIndexRDD中定義的一些方法:

class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  /** The start index of each partition. */
  @transient private val startIndices: Array[Long] = {
   //獲得依賴的父rdd的partition的個(gè)數(shù)
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L)
    } else {
     /*這里提交了一個(gè)spark job運(yùn)行來(lái)統(tǒng)計(jì)每一個(gè)partition的元素個(gè)數(shù)绑改。
        1. 參數(shù)Utils.getIteratorSize是一個(gè)函數(shù),task運(yùn)行在分區(qū)上時(shí)調(diào)用扒腕,它返回分區(qū)大元素個(gè)數(shù).
        2. 參數(shù)0 until n-1指定了運(yùn)行task的分區(qū)是[0, n-1)绢淀,不需要計(jì)算最后一個(gè)分區(qū)大小,
           因?yàn)樽詈笠粋€(gè)分區(qū)的偏移是前面所有分區(qū)的元素個(gè)數(shù)之和瘾腰。
        3. scanLeft(0L)(_ + _)皆的,runJob返回[0,n-1)的partition大小的列表,scanLeft計(jì)算出偏移蹋盆。
     */
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1 
      ).scanLeft(0L)(_ + _)
    }
  }

  override def getPartitions: Array[Partition] = {
    //根據(jù)上有partition包裝新的分區(qū)ZippedWithIndexRDDPartition费薄,新的分區(qū)攜帶了自己的偏移。這是一個(gè)窄依賴
    firstParent[T].partitions.map(x => new ZippedWithIndexRDDPartition(x, startIndices(x.index)))
  }

  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[ZippedWithIndexRDDPartition].prev)

  override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
    val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
    val parentIter = firstParent[T].iterator(split.prev, context)
    // 重要的是這個(gè)方法栖雾,迭代上游分區(qū)的數(shù)據(jù)楞抡,返回(data, data_index)
    Utils.getIteratorZipWithIndex(parentIter, split.startIndex)
  }
}
 ---------------------------
貼一下Utils.getIteratorZipWithIndex的實(shí)現(xiàn):
1. 參數(shù)iterator是上游partition的迭代器
2. startIndex是上游partition的第一個(gè)元素的全局偏移
 def getIteratorZipWithIndex[T](iterator: Iterator[T], startIndex: Long): Iterator[(T, Long)] = {
    new Iterator[(T, Long)] {
      require(startIndex >= 0, "startIndex should be >= 0.")
 
      var index: Long = startIndex - 1L
      def hasNext: Boolean = iterator.hasNext
      // next返回?cái)?shù)據(jù)和其index的元組。
      def next(): (T, Long) = {
        index += 1L
        (iterator.next(), index)
      }
    }
  }

2. action

這小節(jié)里列出action析藕,和transform不同召廷,action會(huì)觸發(fā)job的提交運(yùn)行。

  1. reduce
    原型如下:
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  /* 這個(gè)函數(shù)作為runJob的第二個(gè)參數(shù)账胧,作用于一個(gè)job里的最后一個(gè)階段(ResultStage)每一個(gè)分區(qū)竞慢。
      這個(gè)函數(shù)干了什么: 接受一個(gè)上游parition上的迭代器,然后調(diào)用迭代器的reduceLeft, reduceLeft使用函數(shù)f來(lái)對(duì)數(shù)據(jù)做reduce治泥。
      所以這個(gè)函數(shù)完成了ResultStage的每一個(gè)分區(qū)的reduce筹煮,不是全局的reduce
*/
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
   /* 這個(gè)函數(shù)作為sc.runJob的第三個(gè)參數(shù),當(dāng)reducePartition完成每一個(gè)分區(qū)的reduce之后居夹,
      用來(lái)對(duì)每一個(gè)分區(qū)的reduce結(jié)果合并败潦,index是分區(qū)索引,taskResult即分區(qū)計(jì)算結(jié)果准脂。
      它干了什么:同樣適用函數(shù)f來(lái)對(duì)結(jié)果做規(guī)約劫扒,完成全局的reduce。
*/
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  /* 提交job狸膏,runJob需要兩個(gè)參數(shù)粟关,reducePartition作用于每個(gè)分區(qū)之上,也就是在executor上運(yùn)行;
     mergeResult運(yùn)行于driver端闷板,收集每一個(gè)分區(qū)的結(jié)果到driver端澎灸,然后對(duì)這些結(jié)果運(yùn)行mergeResult,如果每一個(gè)分區(qū)產(chǎn)生的結(jié)果很大的話遮晚,顯然reduce可能會(huì)在driver端出現(xiàn)OOM
  */
  sc.runJob(this, reducePartition, mergeResult)
 
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

這個(gè)和reduceByKey不一樣性昭,reduceByKey是一個(gè)transform操作,會(huì)產(chǎn)生一個(gè)新的RDD(與上游RDD形成ShuffleDependency)县遣,這里的reduce是一個(gè)action糜颠,會(huì)觸發(fā)job的提交(上面代碼中sc.runJob);此外reduceByKey要求輸入數(shù)據(jù)必須是(key,value)的二元tuple萧求,而此處的reduce則不需要其兴。

  1. aggregate
    aggregate也是對(duì)值做聚合操作的,但是和reduce還是不同的夸政,下面是其方法原型:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    //作用于每個(gè)分區(qū)中的數(shù)據(jù)元旬,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)聚合。運(yùn)行于executor上
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    //運(yùn)行在driver端守问,對(duì)收集回來(lái)的每個(gè)分區(qū)的聚合結(jié)果再一次聚合匀归。
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    // 和reduce一樣,先用aggregatePartition在每一個(gè)分區(qū)上運(yùn)行聚合分區(qū)的數(shù)據(jù)耗帕,然后獲取所有分區(qū)的數(shù)據(jù)穆端,使用mergeResult在Driver端聚合,同樣從在Driver端OOM的可能仿便。
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

reduce和aggregate聚合值的區(qū)別從方法簽名就可以看出体啰,reduce聚合前后的值的類型是一樣的,比如說(shuō)你不能用reduce把一個(gè)int值拼成string返回嗽仪。aggregate則可以把一種類型(T)的值聚合成另一種類型(U)返回荒勇。

上面aggregate方法,U是聚合后類型钦幔,T是聚合前類型; 參數(shù)zeroValue提供一個(gè)初始值,seqOp定義怎么把T聚合到U上常柄,combOp定義怎么把多個(gè)分區(qū)聚合后的值拼起來(lái)鲤氢。

下面是一個(gè)例子,把Int拼接成字符串

// RDD r1 包含1 to 10的整型
scala> r1.collect
res63: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// seq, 拼接字符串和整型
scala> def seq(s:String, i:Int):String = { s + i}
// comb,拼接分區(qū)聚合后的字符串
scala> def comb(s1:String, s2:String):String = { s1 + s2}
// 初始值z(mì)eroValue為空字符串
scala> r1.aggregate("")(seq,comb)
res64: String = 12345678910
  1. treeAggregate
    treeAggregate和aggregate功能上是一樣的,但是實(shí)現(xiàn)細(xì)節(jié)不一樣西潘,下面treeAggregate的實(shí)現(xiàn):
  def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U = withScope {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  if (partitions.length == 0) {
    Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
  } else {
    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.

    // Don't trigger TreeAggregation when it doesn't save wall-clock time
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
  }
}

前文說(shuō)到aggregate的過(guò)程是:先使用seqOp在各個(gè)分區(qū)上聚合卷玉,然后將分區(qū)結(jié)果全部拿到Driver端,然后使用combOp聚合喷市,過(guò)多的分區(qū)的數(shù)據(jù)被移到driver端可能會(huì)導(dǎo)致driver上OOM相种;treeAggregate不同之處在于,分區(qū)聚合之后不馬上把結(jié)果傳回driver端聚合品姓,而是調(diào)用reduceByKey再在遠(yuǎn)端按key聚合到更小的分區(qū)寝并,如有必要還會(huì)經(jīng)過(guò)多輪的reduceByKey箫措,不停的把值聚合到更小的分區(qū)上,最終傳回driver端做最終聚合衬潦。下圖可以反應(yīng)出aggregate和treeAggregate的過(guò)程上的區(qū)別:

aggregate和treeAggregate
  1. fold
    和scala集合上的fold功能一樣斤蔓,實(shí)現(xiàn)原理和reduce一樣,現(xiàn)在每一個(gè)分區(qū)上fold镀岛,然后結(jié)果傳回driver在merge.

  2. take
    方法原型如下:

     def take(num: Int): Array[T]
    

    take接收一個(gè)整型參數(shù)num弦牡,返回rdd中前num個(gè)數(shù)(從第1個(gè)partition的第1個(gè)數(shù)開(kāi)始的num個(gè)數(shù)).
    take的思路大概是這樣的:

    1. 使用一個(gè)ArrayBuffer buf保存返回結(jié)果,buf.size就表示已經(jīng)取到的結(jié)果漂羊,一開(kāi)始時(shí)顯然為0.
    2. 開(kāi)始時(shí)將運(yùn)行task的分區(qū)設(shè)成一個(gè)(也就是第一個(gè)partition0),因?yàn)椴恢狼皀um個(gè)元素會(huì)橫跨多少parition驾锰,先嘗試1個(gè)
    3. 運(yùn)行job在分區(qū)上取前num - buf.size(也就是還需要取的個(gè)數(shù)),放到buf中走越。
    4. 判斷buf.size有沒(méi)有達(dá)到num椭豫,沒(méi)有進(jìn)入4. 達(dá)到就可以返回了。
    5. 按照某種比例擴(kuò)大下一輪運(yùn)行任務(wù)的分區(qū)個(gè)數(shù)买喧,下一次job運(yùn)行的的分區(qū)的索引為成區(qū)間[上一次任務(wù)運(yùn)行最大分區(qū)索引 +1 , 上一次任務(wù)運(yùn)行最大分區(qū)索引 +下一輪分區(qū)個(gè)數(shù)]捻悯, 回到2繼續(xù)運(yùn)行。 (比如在partition0上數(shù)據(jù)不夠num個(gè)淤毛,只有num1個(gè)今缚,那么假設(shè)下一次擴(kuò)大到在兩個(gè)分區(qū)上運(yùn)行,那么下一輪就在[partition-1,partition-2] 上取num - num1個(gè)數(shù)據(jù))低淡。
  3. top, takeOrdered
    這是兩個(gè)方法姓言,放在一起是因?yàn)閠op是基于調(diào)用takeOrdered實(shí)現(xiàn)的,它們的方法原型如下:

  // top返回最大的前num個(gè)數(shù)蔗蹋,元素排序由ord定義何荚,ord比較x,y, 返回負(fù)數(shù)表示x<y, 0表示x==y猪杭。
   def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    //takeOrdered默認(rèn)是從小到大返回的餐塘,所以此處使用ord.reverse顛倒排序
     takeOrdered(num)(ord.reverse)
   }
   //takeOrdered返回最小的的num個(gè)數(shù),排序由ord定義
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
   if (num == 0) {
     Array.empty
   } else {
    /* mapPartitions表示在每一個(gè)分區(qū)上運(yùn)行皂吮,queue相當(dāng)于一個(gè)大小為num的大根堆戒傻,維持當(dāng)前已經(jīng)迭代(items迭代器)的最小的num個(gè)值.
       mapParititons生成的新rdd mapRDDs的每一個(gè)parititon擁有之前上游rdd
       每個(gè)parititon的最小的num個(gè)元素
   */
     val mapRDDs = mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
       queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
       Iterator.single(queue)
     }
     if (mapRDDs.partitions.length == 0) {
       Array.empty
     } else {
      // 可以回顧下reduce action,獲取到每一個(gè)分區(qū)返回的最小的num個(gè)元素值蜂筹。然后合并這些值就能得到rdd中最小的num個(gè)元素需纳。
       mapRDDs.reduce { (queue1, queue2) =>
         queue1 ++= queue2
         queue1
       }.toArray.sorted(ord)
     }
   }
 }

上面takeOrdered使用了RDD#reduce這個(gè)方法將每個(gè)分區(qū)的最小的num個(gè)數(shù)傳會(huì)driver,在driver比較獲得全局最小的num個(gè)數(shù)艺挪,如果num值很大的話會(huì)造成driver OOM

  1. max不翩,min
    獲取rdd中最大和最小值
def max()(implicit ord: Ordering[T]): T = withScope {
  // 回顧reduce,接收(T,T) => T的函數(shù),ord.max方法比較兩個(gè)值口蝠,返回大的器钟。
 // reduce現(xiàn)在每個(gè)partition上運(yùn)行ord.max取得partition最大的值,然后將這些值返回給driver端亚皂,得到最大的值俱箱。
  this.reduce(ord.max)
}

def min()(implicit ord: Ordering[T]): T = withScope {
  this.reduce(ord.min)
}

3 附錄

3.1 cogroup

3.1.1 cogroup的作用

首先cogroup是PairRDDFunctions中定義的方法,它只能作用于元素類型是(key,value)二元組型這樣的rdd灭必, cogroup可以接收多個(gè)rdd作為參數(shù)進(jìn)行操作狞谱,但是為了方便,這里只假設(shè)有兩個(gè)rdd: r1禁漓, r2.
r1跟衅,r2 cogroup產(chǎn)生新的rdd r3: r3的key包含了r1,r2的所有的key播歼,對(duì)于key的value是一個(gè)數(shù)組伶跷,數(shù)組組的元素依次是key在r1和r2中所有的value的數(shù)組。
下面是一個(gè)例子:

//r1, r2是國(guó)家到城市的二元組
val r1 = sc.parallelize(List(("china","hefei"),("USA","chicago"),("japan","tokyo")))
val r2 = sc.parallelize(List(("china","beijing"),("USA","new york"),("china","shanghai")))
r1.cogroup(r2).collect
//輸出秘狞,CompactBuffer可以理解成數(shù)組叭莫,可以看到key包含了r1,r2的所有的key,
Array((japan,(CompactBuffer(tokyo),CompactBuffer())), (USA,(CompactBuffer(chicago),CompactBuffer(new york))), (china,(CompactBuffer(hefei),CompactBuffer(beijing, shanghai))))

3.1.2 cogroup原理

cogroup會(huì)產(chǎn)生CoGroupedRDD烁试,直接看他的實(shí)現(xiàn)吧:

//rdd即參與cogroup的所有rdd雇初,是一個(gè)數(shù)組,所以可以有多個(gè)rdd减响。
//類型化參數(shù)'_ <: Product2[K, _]'表明rdd的元素必須是二元組靖诗,而且所有的rdd的key類型得是一樣的.
//part默認(rèn)是HashPartitioner
class CoGroupedRDD[K: ClassTag](
    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

  /**
   1. 這里定義了新類型,CoGroup即來(lái)源于某一個(gè)上游rdd的key的value組成的數(shù)組.
   2. CoGroupValue,二元組,第一個(gè)元素Any類型是上游rdd中value,注意上游rdd的類型是(key,value),這里是提取value出來(lái)的,
      第二個(gè)元素Int是上游rdd在在dependencies列表中的index儒老,也就是第一個(gè)元素來(lái)源于的那個(gè)rdd。
   3. CoGroupCombiner妥凳, 數(shù)組,每個(gè)元素是一個(gè)CoGroup,也就是說(shuō)第i元素
      就是key在第I個(gè)rdd中所有value組成的數(shù)組。
   */
  private type CoGroup = CompactBuffer[Any]
  private type CoGroupValue = (Any, Int)  // Int is dependency number
  private type CoGroupCombiner = Array[CoGroup]

  private var serializer: Serializer = SparkEnv.get.serializer

  /** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
  def setSerializer(serializer: Serializer): CoGroupedRDD[K] = {
    this.serializer = serializer
    this
  }

  override def getDependencies: Seq[Dependency[_]] = {
    //獲取依賴時(shí)败晴,遍歷上游所有rdd
    rdds.map { rdd: RDD[_] =>
      //上游用的partitioner和當(dāng)前CoGroupedRDD一樣,默認(rèn)的HashPartitioner相同的判斷標(biāo)準(zhǔn)時(shí)產(chǎn)生一樣的分區(qū)個(gè)數(shù)颖医,RangeParitioner復(fù)雜一點(diǎn)位衩。
      // 不管如果裆蒸,相同就意味著上游rdd是通過(guò)shuffle產(chǎn)生的熔萧,所有的元素已經(jīng)按照key聚合到對(duì)應(yīng)的partiton了,
      // 當(dāng)前RDD和上游rdd的分區(qū)直接可以一對(duì)一依賴,不同再shuffle一次聚合key了佛致。
      if (rdd.partitioner == Some(part)) {
        logDebug("Adding one-to-one dependency with " + rdd)
        new OneToOneDependency(rdd)
      } else {
       // 否則的話只好shuffle一次贮缕,按key聚合好
        logDebug("Adding shuffle dependency with " + rdd)
        new ShuffleDependency[K, Any, CoGroupCombiner](
          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
      }
    }
  }

  override def getPartitions: Array[Partition] = {
    val array = new Array[Partition](part.numPartitions)
    for (i <- 0 until array.length) {
      // Each CoGroupPartition will have a dependency per contributing RDD
      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
        // Assume each RDD contributed a single dependency, and get it
        dependencies(j) match {
          case s: ShuffleDependency[_, _, _] =>
            None
          case _ =>
            Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
        }
      }.toArray)
    }
    array
  }

  override val partitioner: Some[Partitioner] = Some(part)

  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
    val split = s.asInstanceOf[CoGroupPartition]
    val numRdds = dependencies.length

    /* 看看做了什么: 首先返回的是迭代器數(shù)組,包含對(duì)每一個(gè)上游rdd的迭代.
        其次迭代的元素類型是一個(gè)二元組俺榆,第一個(gè)元素類型‘Product2[K, Any]’表明它是上游rdd里的數(shù)據(jù)感昼, 第二個(gè)元素Int則表明第一個(gè)元素所屬的rdd
    */
    val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
    for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
     // 跟上有一對(duì)一依賴就簡(jiǎn)單很多了,直接取到依賴的上游parition罐脊,返回?cái)?shù)據(jù)和上有rdd的索引就行了定嗓。
      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
        val dependencyPartition = split.narrowDeps(depNum).get.split
        val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
        rddIterators += ((it, depNum))

      case shuffleDependency: ShuffleDependency[_, _, _] =>
       // 跟上游shuffle依賴,那么就需要有shuffle read的過(guò)程萍桌,不提細(xì)節(jié)宵溅,總之shuffle read完成之后,
       //會(huì)從上游所有rdd中收集了屬于當(dāng)前CoGroupedRDD的當(dāng)前分區(qū)的所有元素, 
        val it = SparkEnv.get.shuffleManager
          .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
          .read()
        rddIterators += ((it, depNum))
    }
    
   /* 返回了一個(gè)類型是‘ ExternalAppendOnlyMap’的東西上炎,它是干什么的呢恃逻,簡(jiǎn)單說(shuō), 這個(gè)map有按key聚合的作用,就像reduceBy一樣藕施。
      當(dāng)你往里面插一個(gè)元素時(shí)寇损,它會(huì)按照你定義的combine和merger函數(shù),把相同的key的元素聚合起來(lái)
   */
    val map = createExternalMap(numRdds)
    //迭代上游數(shù)據(jù)裳食,根據(jù)前面rddIterator的定義矛市,此處it是上游rdd中的數(shù)據(jù),類型應(yīng)該是(key,value)的,depNum是rdd索引 
    for ((it, depNum) <- rddIterators) {
    // map要求插入的元素必須是(K,V)型的胞谈,這里的pair._1就是rdd中的key尘盼,value是CoGroupValue介紹過(guò),所以map會(huì)按照key來(lái)聚合烦绳。
   //所以關(guān)鍵是map的combiner和merger的實(shí)現(xiàn)
      map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
    }
    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
    //map聚合好之后卿捎,返回新的迭代器,返回InterruptibleIterator表示它可以被中途取消
    new InterruptibleIterator(context,
      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
  }

  private def createExternalMap(numRdds: Int)
    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
    // createCombiner用來(lái)在出現(xiàn)第一個(gè)元素時(shí)径密,將該元素轉(zhuǎn)換成聚合后的元素午阵,可能是列表之類的,什么都可以
    val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
      
      val newCombiner = Array.fill(numRdds)(new CoGroup)
     //value._2是rdd索引享扔,value._1是rdd數(shù)據(jù)(key,value)中的value底桂,這句表示value加到數(shù)組中。
      newCombiner(value._2) += value._1
      newCombiner
    }
    //將元素合并到聚合后的新類型元素上惧眠,還是往數(shù)組里加
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
      (combiner, value) => {
      combiner(value._2) += value._1
      combiner
    }
   // 將兩個(gè)聚合后的新類型合并籽懦,合并兩個(gè)數(shù)組
    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
      (combiner1, combiner2) => {
        var depNum = 0
        while (depNum < numRdds) {
          combiner1(depNum) ++= combiner2(depNum)
          depNum += 1
        }
        combiner1
      }
    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
      createCombiner, mergeValue, mergeCombiners)
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}

  1. 關(guān)于shuffle read可以參考shuffle read第三節(jié)
  2. 關(guān)于ExternalAppendOnlyMap可以參考ExternalAppendOnlyMap4.3節(jié)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市氛魁,隨后出現(xiàn)的幾起案子暮顺,更是在濱河造成了極大的恐慌厅篓,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捶码,死亡現(xiàn)場(chǎng)離奇詭異羽氮,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)惫恼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門档押,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人祈纯,你說(shuō)我怎么就攤上這事令宿。” “怎么了腕窥?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵掀淘,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我油昂,道長(zhǎng)革娄,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任冕碟,我火速辦了婚禮拦惋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘安寺。我一直安慰自己厕妖,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布挑庶。 她就那樣靜靜地躺著言秸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪迎捺。 梳的紋絲不亂的頭發(fā)上举畸,一...
    開(kāi)封第一講書(shū)人閱讀 49,007評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音凳枝,去河邊找鬼抄沮。 笑死,一個(gè)胖子當(dāng)著我的面吹牛岖瑰,可吹牛的內(nèi)容都是我干的叛买。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蹋订,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼率挣!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起露戒,我...
    開(kāi)封第一講書(shū)人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤椒功,失蹤者是張志新(化名)和其女友劉穎娃圆,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蛾茉,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年撩鹿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了谦炬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡节沦,死狀恐怖键思,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情甫贯,我是刑警寧澤吼鳞,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布,位于F島的核電站叫搁,受9級(jí)特大地震影響赔桌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜渴逻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一疾党、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧惨奕,春花似錦雪位、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至卧波,卻和暖如春时肿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背港粱。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工嗜侮, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人啥容。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓锈颗,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親咪惠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子击吱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容