Spark RDD 算子總結(jié)

Spark算子總結(jié)

算子分類

  • Transformation(轉(zhuǎn)換)

    轉(zhuǎn)換算子 含義
    map(func) 返回一個(gè)新的RDD媒熊,該RDD由每一個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成
    filter(func) 過濾, 返回一個(gè)新的RDD, 該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成
    flatMap(func) 類似于map氧卧,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列窝撵,而不是單一元素)
    mapPartitions(func) 類似于map豌汇,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行廊宪,因此在類型為T的RDD上運(yùn)行時(shí)爪喘,func的函數(shù)類型必須是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 類似于mapPartitions蹄咖,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值根盒,因此在類型為T的RDD上運(yùn)行時(shí)钳幅,func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U]
    union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD
    Intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD
    groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用炎滞,返回一個(gè)(K,V)的RDD敢艰,使用指定的reduce函數(shù),將相同key的值聚合到一起册赛,與groupByKey類似钠导,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置
    sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口森瘪,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD
    sortBy(func,[ascending],[numTasks]) 與sortByKey類似牡属,但是更靈活
    join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD
    cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用扼睬,返回一個(gè)(K,(Iterable,Iterable))類型的RDD
    coalesce(numPartitions) 減少 RDD 的分區(qū)數(shù)到指定值
    repartition 對(duì) RDD 重新分區(qū)
    repartitionAndSortWithinPartitions(partitioner) 重新給 RDD 分區(qū)逮栅,并且每個(gè)分區(qū)內(nèi)以記錄的 key 排序
  • Action(行為)

    行為 含義
    reduce(func) reduce將RDD中元素前兩個(gè)傳給輸入函數(shù),產(chǎn)生一個(gè)新的return值窗宇,新產(chǎn)生的return值與RDD中下一個(gè)元素(第三個(gè)元素)組成兩個(gè)元素措伐,再被傳給輸入函數(shù),直到最后只有一個(gè)值為止蝇完。
    collect() 在驅(qū)動(dòng)程序中卿操,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素
    count 返回RDD的元素個(gè)數(shù)
    first 返回RDD的第一個(gè)元素(類似于take(1))
    take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組
    takeOrdered(n,[ordering]) 返回自然順序或者自定義順序的前 n 個(gè)元素
    saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng)镶奉,對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法边锁,將它裝換為文件中的文本
    saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)仅父。
    saveAsObjectFile(path) 將數(shù)據(jù)集的元素,以 Java 序列化的方式保存到指定的目錄下
    countByKey() 針對(duì)(K,V)類型的RDD洒放,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)散罕。
    foreach 在數(shù)據(jù)集的每一個(gè)元素上谱净,運(yùn)行函數(shù)func
    foreachPartition(func) 在數(shù)據(jù)集的每一個(gè)分區(qū)上导盅,運(yùn)行函數(shù)func
  • Manipulate(控制)

    控制 含義
    cache 對(duì)于重復(fù)使用的算子,進(jìn)行cache做緩存使用,數(shù)據(jù)只保存在內(nèi)存中,性能提升(等價(jià)于Memory_Only)
    persist 根據(jù)Storagelevel 將數(shù)據(jù) 持久化, 從而提升性能
    checkPoint 數(shù)據(jù)容錯(cuò),當(dāng)數(shù)據(jù)計(jì)算的時(shí)候,機(jī)器掛了,重新追溯到checkPoint的目錄下衷恭。checkPoint是將RDD持久化到磁盤中,還可以切斷RDD之間的依賴關(guān)系

Transformation

  • map(func)

    • 源碼

      /**
         * Return a new RDD by applying a function to all elements of this RDD.
         * 通過對(duì)一個(gè)RDD的所有元素應(yīng)用一個(gè)函數(shù)來返回一個(gè)新的RDD
         */
        def map[U: ClassTag](f: T => U): RDD[U] = withScope {
            // 清理函數(shù)閉包以確保其是可序列化的, 并發(fā)送至tasks
          val cleanF = sc.clean(f)
          new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
        }
      
  • 案例

    package com.ronnie.scala.WordCount
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object WordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("WC")
        val sc = new SparkContext(conf)
        val lines : RDD[String] = sc.textFile("./resources/words.txt")
        val word : RDD[String] = lines.flatMap(lines => {
          lines.split(" ")
        })
        val pairs : RDD[(String, Int)] = word.map(x => (x, 1))
        val result = pairs.reduceByKey((a,b) => {a + b})
        result.sortBy(_._2, false).foreach(println)
    
        // 簡(jiǎn)化寫法
        lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
      }
    }
    
  • filter(func)

    • 源碼

        /**
         * Return a new RDD containing only the elements that satisfy a predicate.
         * 返回一個(gè)僅含有滿足斷言條件的元素的新RDD
         */
        def filter(f: T => Boolean): RDD[T] = withScope {
            // 清理函數(shù)閉包以確保其是可序列化的, 并發(fā)送至tasks
          val cleanF = sc.clean(f)
          new MapPartitionsRDD[T, T](
            this,
              // => Iterator.filter
            (context, pid, iter) => iter.filter(cleanF),
            preservesPartitioning = true)
        }
      
      /** Returns an iterator over all the elements of this iterator that satisfy the predicate `p`.
      返回當(dāng)前迭代器中所有滿足斷言 `p` 的元素的一個(gè)迭代器
         *  The order of the elements is preserved.
         *
         *  @param p the predicate used to test values. p: 測(cè)試值的斷言條件
         *  @return  an iterator which produces those values of this iterator which satisfy the predicate `p`.
         返回 滿足斷言 `p` 的元素的一個(gè)迭代器
         *  @note    Reuse: $consumesAndProducesIterator 重用消費(fèi)生產(chǎn)迭代器
         */
        def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
          // TODO 2.12 - Make a full-fledged FilterImpl that will reverse sense of p
          // 創(chuàng)建一個(gè) 完全有效的實(shí)現(xiàn)方式來逆轉(zhuǎn) p 的感知  
          private var hd: A = _
          private var hdDefined: Boolean = false // 反正是個(gè)標(biāo)簽, 為啥叫hdDefined我是沒轍
      
          def hasNext: Boolean = hdDefined || {
            do {
              // 沒有下一個(gè)元素就返回 false  
              if (!self.hasNext) return false
              // 迭代下一個(gè)元素  
              hd = self.next()
                // 對(duì)該元素進(jìn)行斷言, 只要p(hd) 不為 false, 就將 hdDefined 標(biāo)簽設(shè)置為 true, 并 返回true
            } while (!p(hd))
            hdDefined = true
            true
          }
          // 每次迭代下一個(gè)元素前判斷一下是否有下一個(gè)元素, 有就將hdDefined 標(biāo)簽重置為 false, 返回下一個(gè)元素, 沒有就拋出 NoSuchElementException 異常
            // (empty.next() => Iterator.empty )
          def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
        }
      
      /** The iterator which produces no values. 沒有值的迭代器*/
        val empty: Iterator[Nothing] = new AbstractIterator[Nothing] {
          def hasNext: Boolean = false
          def next(): Nothing = throw new NoSuchElementException("next on empty iterator")
        }
      
  • 案例

    package com.ronnie.scala
    
    import java.io.StringReader
    
    import au.com.bytecode.opencsv.CSVReader
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object DataExtraction {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("WC")
        val sc = new SparkContext(conf)
    
        val input: RDD[String] = sc.textFile("./resources/gpu.csv")
        val result: RDD[Array[String]] = input.map { line =>
          val reader = new CSVReader(new StringReader(line))
          reader.readNext()
        }
        result.foreach(x => {
          x.flatMap(_.split(" ")).foreach(_.split(" ").filter(y =>y.contains("580") ).foreach(_.split(" ").filter(x => x.equals("RX580")).foreach(println)))
        })
      }
    
  • flatMap(func)

    • 源碼

        /**
         *  Return a new RDD by first applying a function to all elements of this
         *  RDD, and then flattening the results.
         * 先對(duì)該 RDD 中的每一個(gè)元素應(yīng)用一個(gè)函數(shù), 并對(duì)結(jié)果進(jìn)行扁平化處理, 再返回一個(gè)新的 RDD
         */
        def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
          val cleanF = sc.clean(f)
            // => Iterator.flatMap
          new MapPartitionsRDD[U, T](this, (context, pid, iter) => 
                                     iter.flatMap(cleanF))
        }
      
      /** Creates a new iterator by applying a function to all values produced by this iterator and concatenating the results.
      對(duì)該迭代器所提供的值 應(yīng)用一個(gè)函數(shù) 并合并這些結(jié)果 來創(chuàng)建一個(gè)迭代器, 
         *
         *  @param f the function to apply on each element. f: 應(yīng)用函數(shù)
         *  @return  the iterator resulting from applying the given iterator-valued function `f` to each value produced by this iterator and concatenating the results.
         * 返回 對(duì)該迭代器所提供的值 應(yīng)用一個(gè)函數(shù) 并合并這些結(jié)果 所產(chǎn)生的迭代器
         *  @note    Reuse: $consumesAndProducesIterator 重用消費(fèi)生產(chǎn)迭代器
         */
        def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
            // 當(dāng)前迭代器
          private var cur: Iterator[B] = empty
            // 將當(dāng)前迭代器 替換為下一個(gè)迭代器
          private def nextCur() { cur = f(self.next()).toIterator }
          def hasNext: Boolean = {
            // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
            // 等價(jià)于 (當(dāng)前迭代器有下一個(gè)值) || (迭代器本身有下一個(gè)值 && 下一個(gè)迭代器有下一個(gè)值)  
            // but slightly shorter bytecode (better JVM inlining!)
            // 但是下面這種寫法有 編譯成的字節(jié)碼會(huì)更短, 利于JVM內(nèi)聯(lián)  
            while (!cur.hasNext) {
              if (!self.hasNext) return false
              nextCur()
            }
            true
          }
            // 如果有下一個(gè)就返回當(dāng)前迭代器, 否則就返回空迭代器, 并繼續(xù)迭代
          def next(): B = (if (hasNext) cur else empty).next()
        }
      
  • 案例

    • 見 map 案例
  • mapPartitions(func)

    • 源碼

      /**
         * Return a new RDD by applying a function to each partition of this RDD.
         * 通過對(duì)當(dāng)前 RDD 的每個(gè)分區(qū)應(yīng)用一個(gè)函數(shù)來創(chuàng)建一個(gè)新的 RDD
         * `preservesPartitioning` indicates whether the input function preserves the partitioner, which should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
         * `preservesPartitioning(保存分區(qū))` 標(biāo)簽 表明了 輸入函數(shù)是否保存分區(qū)器, 該標(biāo)簽?zāi)J(rèn)為 false, 除非這是個(gè) 成對(duì)的RDD 并且 傳入函數(shù)并沒有修改 key
         */
        def mapPartitions[U: ClassTag](
            f: Iterator[T] => Iterator[U],
            preservesPartitioning: Boolean = false): RDD[U] = withScope {
          val cleanedF = sc.clean(f)
          new MapPartitionsRDD(
            this,
            (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
            preservesPartitioning)
        }
      
       /**
         * [performance] Spark's internal mapPartitions method that skips closure cleaning.
         * Spark 內(nèi)部的 mapPartitions 方法, 跳過了 清理閉包的步驟
         */
        private[spark] def mapPartitionsInternal[U: ClassTag](
            f: Iterator[T] => Iterator[U],
            preservesPartitioning: Boolean = false): RDD[U] = withScope {
          new MapPartitionsRDD(
            this,
            (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
            preservesPartitioning)
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable.ListBuffer
    
    object Operator_mapPartitions {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("mapPartitions")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    
        val rdd01 = rdd.mapPartitions(iter =>{
          val list = new ListBuffer[(Int, Int)]()
          while (iter.hasNext){
            val next = iter.next()
            list += Tuple2(next, next * 2)
          }
          list.iterator
        }, false)
        rdd01.foreach(x => print(x + " "))
      }
    }
    
  • mapPartitionsWithIndex(func)

    • 源碼

      /**
         * Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
         * 通過對(duì)每個(gè) RDD 的分區(qū)應(yīng)用一個(gè)函數(shù)來創(chuàng)建一個(gè)新的 RDD, 在此過程中追蹤 初始 分區(qū)的 參數(shù)(下標(biāo)) 
         * `preservesPartitioning` indicates whether the input function preserves the partitioner, which should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
         * `preservesPartitioning(保存分區(qū))` 標(biāo)簽 表明了 輸入函數(shù)是否保存分區(qū)器, 該標(biāo)簽?zāi)J(rèn)為 false, 除非這是個(gè) 成對(duì)的RDD 并且 傳入函數(shù)并沒有修改 key
         */
        def mapPartitionsWithIndex[U: ClassTag](
            f: (Int, Iterator[T]) => Iterator[U],
            preservesPartitioning: Boolean = false): RDD[U] = withScope {
          val cleanedF = sc.clean(f)
          new MapPartitionsRDD(
            this,
            (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
            preservesPartitioning)
        }
      
```scala
/**
   * [performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.
   跳過了閉包清理的 Spark 的內(nèi)部 mapPartitionsWithIndex 方法
   * It is a performance API to be used carefully only if we are sure that the RDD elements are serializable and don't require closure cleaning.
    這是一個(gè)性能 應(yīng)當(dāng)被小心使用的 API, 只有我們確認(rèn) 該 RDD 中的元素是可序列化的 并且不需要 閉包清理 時(shí)才使用邀摆。
   *
   * @param preservesPartitioning indicates whether the input function preserves the partitioner, which should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   *`preservesPartitioning(保存分區(qū))` 標(biāo)簽 表明了 輸入函數(shù)是否保存分區(qū)器, 該標(biāo)簽?zāi)J(rèn)為 false, 除非這是個(gè) 成對(duì)的RDD 并且 傳入函數(shù)并沒有修改 key
   */
  private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
      preservesPartitioning)
  }
```
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable.ListBuffer
    
    object Operator_mapPartitionsWithIndex {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("mapPartitionsWithIndex")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(List("a", "b", "c"), 2)
    
    
        rdd.mapPartitionsWithIndex((index, iter) => {
          val list = new ListBuffer[String]()
          while (iter.hasNext){
            val v = iter.next()
            list.+(v)
            println("index = " + index + ", value = " + v)
          }
          list.iterator
        },false).foreach(println)
    
        sc.stop()
      }
    }
    
  • union(otherDataset)

    • 源碼

      /**
         * Return the union of this RDD and another one. Any identical elements will appear multiple times (use `.distinct()` to eliminate them).
         * 返回當(dāng)前 RDD 和 另一個(gè) RDD 的并集, 所有標(biāo)識(shí)的元素都會(huì)出現(xiàn)多次(可以使用 distinct() 算子去重)
         */
        def union(other: RDD[T]): RDD[T] = withScope {
          sc.union(this, other) // => SparkContext.union
        }
      
      /** 
         * union 的簡(jiǎn)略形式: ++
         * Return the union of this RDD and another one. Any identical elements will appear multiple times (use `.distinct()` to eliminate them).
         */
        def ++(other: RDD[T]): RDD[T] = withScope {
          this.union(other)
        }
      
       /** Build the union of a list of RDDs passed as variable-length arguments. */
        def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
          union(Seq(first) ++ rest)
            // ++ => TraversableLike(可遍歷的Trait).++
        }
      
      def ++[B >: A, That](that: GenTraversableOnce[B])(implicit bf: CanBuildFrom[Repr, B, That]): That = {
          val b = bf(repr)
          // 判斷類型是否一致
          if (that.isInstanceOf[IndexedSeqLike[_, _]]) b.sizeHint(this, that.seq.size)
          b ++= thisCollection
          b ++= that.seq
          b.result
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_union {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("union")
        val sc = new SparkContext(conf)
    
        val nvidiaGpu: RDD[(String, Int)] = sc.parallelize(List(("rtx2080Ti", 1), ("rtx2080S", 2), ("rtx2070S", 3), ("rtx2060S", 4)))
        val amdGpu: RDD[(String, Int)] = sc.parallelize(List(("radeon7", 1),("5800XT", 2),("5700XT", 3),("RX590",4)))
    
        nvidiaGpu.union(amdGpu).foreach(println)
      }
    }
    
  • intersection(otherDataset)

    • 源碼

      /**
         * Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
         * 返回 該 RDD 和 另一個(gè) RDD的 交集。即便輸入的 RDD 有重復(fù)的元素, 輸出的RDD不會(huì)包含任何重復(fù)的元素
         *
         * @note This method performs a shuffle internally.該方法內(nèi)部執(zhí)行了shuffle
         *
         * @param partitioner Partitioner to use for the resulting RDD 應(yīng)用于結(jié)果 RDD 的 分區(qū)器
         */
        def intersection(
            other: RDD[T],
            partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
            // cogroup 根據(jù)相同key分組
          this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
               // 過濾, 只留下key 和 value 都不為空的 
              .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
              .keys
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable.ListBuffer
    
    object Operator_mapPartitionsWithIndex {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("mapPartitionsWithIndex")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(Array("a", "b", "c"), 2)
    
    
        rdd.mapPartitionsWithIndex((index, iter) => {
          val list = new ListBuffer[String]()
          while (iter.hasNext){
            val v = iter.next()
            list.+(v)
            println("index = " + index + ", value = " + v)
          }
          list.iterator
        },false).foreach(println)
    
        sc.stop()
      }
    }
    
  • groupByKey([numTasks]) 【能少用就少用, 性能不好】

    • 源碼(位于 PairRDDFunctions 中)

       /**
         * Group the values for each key in the RDD into a single sequence. Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.
         * 在 RDD 中對(duì)每個(gè) key 的 values 進(jìn)行分組 并放入一個(gè)簡(jiǎn)單的序列端壳。
         * 通過 傳遞一個(gè) 分區(qū)器 來 控制 結(jié)果的鍵值對(duì) RDD 的分區(qū)
         * The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
         * 每個(gè)組中的 元素順序是無(wú)法保證的,甚至 每次 結(jié)果的RDD 被評(píng)估是 都不一定相同
         *
         * @note This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`or `PairRDDFunctions.reduceByKey` will provide much better performance.
         *該操作可能會(huì)非常昂貴。如果你是為了對(duì)每個(gè)key進(jìn)行聚合操作而使用grouping, 使用 PairRDDFunctions 類中的 aggregateByKey(根據(jù)key聚合) 或者 reduceByKey(根據(jù)key規(guī)約) 會(huì)提供更好的性能。
         *
         * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory.
         * 就目前的應(yīng)用而言, groupByKey 必須能容納 所有在內(nèi)存中的 key 的 鍵值對(duì)
         * If a key has too many values, it can result in an `OutOfMemoryError`.
         * 如果一個(gè) key 有太多value, 它可能會(huì)導(dǎo)致 OOM(內(nèi)存用盡)
         */
        def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
          // groupByKey shouldn't use map side combine because map side combine does not reduce the amount of data shuffled and requires all map side data be inserted into a hash table, leading to more objects in the old gen.
          // groupByKey 不應(yīng)該使用 map式 的合并, 因?yàn)?map式的合并 不能降低 數(shù)據(jù) shuffle 的數(shù)量 且 要求所有map式的數(shù)據(jù)都被插入到一個(gè) 哈希表中, 會(huì)導(dǎo)致 jvm 內(nèi)存中 的老年區(qū) 會(huì)有更多的對(duì)象(可能會(huì)觸發(fā) full gc)粤策。  
          // 創(chuàng)建合并器  
          val createCombiner = (v: V) => CompactBuffer(v)
          // 合并值  
          val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
          // 合并合并器  
          val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
          // 緩存  => combineByKeyWithClassTag
          val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
            createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
          // 緩存必須是 k-v形式的 RDD, 且 v 需為可迭代的集合  
          bufs.asInstanceOf[RDD[(K, Iterable[V])]]
        }
      
      /**
         * :: Experimental :: 實(shí)驗(yàn)性的
         * Generic function to combine the elements for each key using a custom set of aggregation functions. 
         * 對(duì)每個(gè) key 應(yīng)用 一個(gè)定制的 聚合函數(shù) 集合 來合并元素的 通用函數(shù) 
         * Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
         * RDD[(K, V)] => RDD[(K, C)], C為合并后的類型
         *
         * Users provide three functions: 使用者提供 以下三種函數(shù)
         *
         *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
         * createCombiner(創(chuàng)建合并器), 將V類轉(zhuǎn)化為C類(比如創(chuàng)建一個(gè) 單元素的列表) 
         *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
         * mergeValue(合并值), 將 V 類合并入C(比如 將 元素添加到 列表尾)
         *  - `mergeCombiners`, to combine two C's into a single one.
         * mergeCombiners(合并合并器), 將兩個(gè) C類 合并為一個(gè)
         *
         * In addition, users can control the partitioning of the output RDD, and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
         * 此外, 用戶可以控制輸出 RDD 的分區(qū) 以及 是否 執(zhí)行 map式的聚合 (對(duì)于同一個(gè) key )
         *
         * @note V and C can be different -- for example, one might group an RDD of type
         * (Int, Int) into an RDD of type (Int, Seq[Int]).
         */
        @Experimental
        def combineByKeyWithClassTag[C](
            createCombiner: V => C,//創(chuàng)建combiner
            mergeValue: (C, V) => C,//map端聚合
            mergeCombiners: (C, C) => C,//reduce端聚合
            partitioner: Partitioner, // 分區(qū)器
            mapSideCombine: Boolean = true, // 是否使用map式的合并
            serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
          require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 必須是 spark 0.9.0 及其以上的版本
          if (keyClass.isArray) {
            if (mapSideCombine) {
              throw new SparkException("Cannot use map-side combining with array keys.")
            }
             // 如果分區(qū)器是哈希分區(qū)器 
            if (partitioner.isInstanceOf[HashPartitioner]) {
              throw new SparkException("HashPartitioner cannot partition array keys.")
            }
          }
          val aggregator = new Aggregator[K, V, C](
            self.context.clean(createCombiner),
            self.context.clean(mergeValue),
            self.context.clean(mergeCombiners))
      
          /**
            * 判斷分區(qū)器是否相同,如果兩個(gè)連著的shuffle類算子分區(qū)器都是相同的贾惦,那么不會(huì)產(chǎn)生shuffle
            *不相同會(huì)產(chǎn)生shuffleRDD
            */
          if (self.partitioner == Some(partitioner)) {
            self.mapPartitions(iter => {
              val context = TaskContext.get()
              new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
            }, preservesPartitioning = true)
          } else {
            /**
              * 下面設(shè)置的aggregator 就是以上三個(gè)combiner的函數(shù)
              *  在ShuffledRDD中有個(gè)方法是 getDependencies 獲取寬窄依賴的關(guān)系
              */
            new ShuffledRDD[K, V, C](self, partitioner)
              .setSerializer(serializer)
              .setAggregator(aggregator)
              .setMapSideCombine(mapSideCombine)
          }
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_groupByKey {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("groupByKey")
        val sc= new SparkContext(conf)
    
        val rdd: RDD[(String, Int, Int, Int)] = sc.parallelize(Array(("amd", 12, 21, 37),("nvidia", 33, 22, 31),("intel", 27, 32, 42),("amd", 29, 34, 25),("nvidia", 47, 37, 36), ("intel", 23, 43, 38)))
        val result: RDD[(Int, Int, Int)] = rdd.map(e => (e._1, (e._2, e._3, e._4))).groupByKey().flatMap(line =>{
          line._2.toArray.sortBy(_._3)(Ordering[Int].reverse).take(2)
    
        })
        result.foreach(println)
      }
    }
    
  • reduceByKey(func, [numTasks])

    • 源碼

      /**
         * Merge the values for each key using an associative and commutative reduce function.
         * 通過 關(guān)聯(lián) 和 交換的 規(guī)約 函數(shù) 來合并每個(gè) key的值
         * 
         * This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
         * 這同樣會(huì) 在發(fā)送結(jié)果到 reducer 之前 向 在每個(gè) mapper 上 執(zhí)行本地合并, 就像 MapReduce 中的 combiner
         */
        def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
          combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
        }
      
        /**
         * Merge the values for each key using an associative and commutative reduce function. This will
         * also perform the merging locally on each mapper before sending results to a reducer, similarly
         * to a "combiner" in MapReduce.
         *
         * Output will be hash-partitioned with numPartitions partitions.
         * 輸出會(huì)根據(jù)分區(qū)數(shù)量 進(jìn)行哈希分區(qū)
         */
        def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
          reduceByKey(new HashPartitioner(numPartitions), func)
        }
      
      /**
         * Merge the values for each key using an associative and commutative reduce function. This will
         * also perform the merging locally on each mapper before sending results to a reducer, similarly
         * to a "combiner" in MapReduce.
         * Output will be hash-partitioned with the existing partitioner/parallelism level.
         * 輸出會(huì)根據(jù)存在的分區(qū)數(shù)量/并行級(jí)別  進(jìn)行哈希分區(qū)
         */
        def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
          reduceByKey(defaultPartitioner(self), func)
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_reduceByKey {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("reduceByKey")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("data/word.txt")
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).foreach(println)
        sc.stop()
      }
    }
    
  • sortByKey([ascending], [numTasks]) 【位于 OrderedRDDFunctions】

    • 源碼

      /**
         * Sort the RDD by key, so that each partition contains a sorted range of the elements. 
         * 根據(jù) key 對(duì) RDD進(jìn)行排序, 從而使每個(gè)分區(qū)都包含一個(gè)排序過的元素序列
         * Calling `collect` or `save` on the resulting RDD will return or output an ordered list of records (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in order of the keys).
         * 對(duì)結(jié)果 RDD 請(qǐng)求 收集 或 保存啤挎,會(huì)返回 或者 輸出 一個(gè)有序的記錄 列表 (如果 是 save 請(qǐng)求, 它們會(huì)以多個(gè) 分區(qū)文件的形式被寫入到文件系統(tǒng), 并且按key排序)
         */
        // TODO: this currently doesn't work on P other than Tuple2!
        // 目前只對(duì) Tuple2起作用
        def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
            : RDD[(K, V)] = self.withScope
        {
          val part = new RangePartitioner(numPartitions, self, ascending)
          new ShuffledRDD[K, V, V](self, part)
            .setKeyOrdering(if (ascending) ordering else ordering.reverse)
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_sortByKey {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("sortByKey")
        val sc = new SparkContext(conf)
        val lines = sc.textFile("./resource/word.txt")
        lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(f => {(f._1, f._2)}).sortByKey(false).map(f =>((f._2, f._1))).foreach(println)
        
        sc.stop()
      }
    }
    
  • sortBy(func,[ascending], [numTasks])

    • 源碼

      /**
         * Return this RDD sorted by the given key function.
         * 根據(jù) key 函數(shù) 對(duì) RDD進(jìn)行排序, 排序其實(shí)是隱式調(diào)用了 keyBy[K](f).sortByKey(true(升序, 分區(qū)數(shù)量)).values
         */
        def sortBy[K](
            f: (T) => K,
            ascending: Boolean = true,
            numPartitions: Int = this.partitions.length)
            (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
          this.keyBy[K](f)
              .sortByKey(ascending, numPartitions)
              .values
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_sortBy {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("sortBy")
        val sc = new SparkContext(conf)
    
        val lines = sc.textFile("./resources/word.txt")
        lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, false).foreach(println)
        sc.stop()
      }
    }
    
  • join(otherDataset, [numTasks]) 【位于PairRDDFunction中】

    • 源碼

      /**
         * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. 
         * 返回一個(gè)含有 所有 'this' RDD 和'other' RDD 中 key匹配的 元素對(duì)的 RDD 
         * Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. 
         * 每一對(duì)元素都會(huì)以 (k, (v1, v2)) 的元組形式被返回, (k,v1) 來自 this RDD, (k,v2) 來自 other RDD
         * Uses the given Partitioner to partition the output RDD.
         * 使用 已得 的分區(qū)器 去 對(duì)輸出的RDD進(jìn)行分區(qū)
         */
        def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
          this.cogroup(other, partitioner).flatMapValues( pair =>
            for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
          )
        }
      
      /**
         * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to partition the output RDD.
         * 對(duì) this 和 other 進(jìn)行 左外聯(lián)操作, 基于 this 中的所有k-v(w)元素, 如果元素w存在于other, 結(jié)果 RDD 為包含 k-v形式, v為(原來的 v, Option.Some(w)); 如不存在v就為(原來的 v, Option.None)形式(option 可以預(yù)防空指針)
         * 使用已得的分區(qū)器對(duì) RDD進(jìn)行分區(qū)
         */
        def leftOuterJoin[W](
            other: RDD[(K, W)],
            partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
          this.cogroup(other, partitioner).flatMapValues { pair =>
            if (pair._2.isEmpty) {
              pair._1.iterator.map(v => (v, None))
            } else {
                // yield函數(shù)作用是: 記住每次迭代中的有關(guān)值,并逐一存入到一個(gè)數(shù)組中项钮。
              for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
            }
          }
        }
      
      /**
         * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
         * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
         * pair (k, (None, w)) if no elements in `this` have key k.
         * 對(duì) this 和 other 進(jìn)行 右外聯(lián)操作, 基于 other 中的所有k-v(w)元素, 如果元素w存在于this, 結(jié)果 RDD 為包含 k-v形式, v為(原來的 v, Option.Some(w)); 如不存在v就為(原來的 v, Option.None)形式
         * Uses the given Partitioner to partition the output RDD.
         */
        def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
            : RDD[(K, (Option[V], W))] = self.withScope {
          this.cogroup(other, partitioner).flatMapValues { pair =>
            if (pair._1.isEmpty) {
              pair._2.iterator.map(w => (None, w))
            } else {
                 // yield函數(shù)作用是: 記住每次迭代中的有關(guān)值棺蛛,并逐一存入到一個(gè)數(shù)組中。
              for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
            }
          }
        }
      
      /**
         * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
         * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
         * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
         * element (k, w) in `other`, the resulting RDD will either contain all pairs
         * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
         * in `this` have key k. 
         * 對(duì) this 和 other 進(jìn)行 完全外聯(lián)操作, 結(jié)果RDD為key-value形式, 其中value為(Option.x, Option.y), 若 this 中有該值, 則x為Some(該值), 否則為None, 同理若 other中有該值, 則y為Some(該值), 否則為None
         * Uses the given Partitioner to partition the output RDD.
         * 使用已得的分區(qū)器 對(duì) 輸出的RDD進(jìn)行分區(qū)
         */
        def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
            : RDD[(K, (Option[V], Option[W]))] = self.withScope {
          this.cogroup(other, partitioner).flatMapValues {
            case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
            case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
            case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
          }
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_join {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("join")
        val sc = new SparkContext(conf)
    
        val rdd01: RDD[(String, Int)] = sc.parallelize(Array(("a", 1), ("b", 2), ("c", 3)))
    
        val rdd02: RDD[(String, Int)] = sc.parallelize(Array(("a", 1), ("d", 2), ("e", 3)))
    
        val result: RDD[(String, (Int, Int))] = rdd01.join(rdd02)
    
        result.foreach(println)
    
        rdd01.rightOuterJoin(rdd02).foreach(println)
    
        rdd01.leftOuterJoin(rdd02).foreach(println)
    
        rdd01.fullOuterJoin(rdd02).foreach(println)
    
      }
    }
    
  • cogroup(otherDataset, [numTasks]) 【位于pairRDDFunction中】

    • 源碼

        /**
         * For each key k in `this` or `other1` or `other2` or `other3`,
         * return a resulting RDD that contains a tuple with the list of values
         * for that key in `this`, `other1`, `other2` and `other3`(這四個(gè)都是迭代器).
         * 對(duì)每個(gè)在 'this' 或 'other1' 或 'other2' 或 'other3' 中的 key, 返回一個(gè)含有它們對(duì)應(yīng)的 value 的值 的元組
         */
        def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
            other2: RDD[(K, W2)],
            other3: RDD[(K, W3)],
            partitioner: Partitioner)
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
             // 如果分區(qū)器是哈希分區(qū)器 且 key的類是數(shù)組, 就拋出以下異常   
          if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
          val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
          cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
              // 如果是 數(shù)組就判斷 迭代器對(duì)應(yīng)類型是否一致
             (vs.asInstanceOf[Iterable[V]],
               w1s.asInstanceOf[Iterable[W1]],
               w2s.asInstanceOf[Iterable[W2]],
               w3s.asInstanceOf[Iterable[W3]])
          }
        }
      
        /**
         * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the list of values for that key in `this` as well as `other`.
         * 對(duì)于 this 或 other 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
            : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
          if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
          val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
          cg.mapValues { case Array(vs, w1s) =>
            (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
          }
        }
      
        /**
         * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
         * tuple with the list of values for that key in `this`, `other1` and `other2`.
         * 對(duì)于 this 或 other 或 other2 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
          if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
            throw new SparkException("HashPartitioner cannot partition array keys.")
          }
          val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
          cg.mapValues { case Array(vs, w1s, w2s) =>
            (vs.asInstanceOf[Iterable[V]],
              w1s.asInstanceOf[Iterable[W1]],
              w2s.asInstanceOf[Iterable[W2]])
          }
        }
      
        /**
         * For each key k in `this` or `other1` or `other2` or `other3`,
         * return a resulting RDD that contains a tuple with the list of values
         * for that key in `this`, `other1`, `other2` and `other3`.
         * 對(duì)于 this 或 other 或 other2 或 other3 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
          cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
        }
      
        /**
         * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
         * list of values for that key in `this` as well as `other`.
         * 對(duì)于 this 或 other 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
          cogroup(other, defaultPartitioner(self, other))
        }
      
        /**
         * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
         * tuple with the list of values for that key in `this`, `other1` and `other2`.
         * 對(duì)于 this 或 other 或 other2 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
          cogroup(other1, other2, defaultPartitioner(self, other1, other2))
        }
      
        /**
         * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
         * list of values for that key in `this` as well as `other`.
         * 對(duì)于 this 或 other 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W](
            other: RDD[(K, W)],
            numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
          cogroup(other, new HashPartitioner(numPartitions))
        }
      
        /**
         * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
         * tuple with the list of values for that key in `this`, `other1` and `other2`.
         *對(duì)于 this 或 other 或 other2 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
          cogroup(other1, other2, new HashPartitioner(numPartitions))
        }
      
        /**
         * For each key k in `this` or `other1` or `other2` or `other3`,
         * return a resulting RDD that contains a tuple with the list of values
         * for that key in `this`, `other1`, `other2` and `other3`.
         *對(duì)于 this 或 other 或 other2 或 other3 迭代器中的每一個(gè) key, 返回一個(gè) 包含了 key 所對(duì)應(yīng)的的 value 的元組 的 結(jié)果RDD 
         */
        def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
            other2: RDD[(K, W2)],
            other3: RDD[(K, W3)],
            numPartitions: Int)
            : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
          cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_cogroup {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("cogroup")
        
        val sc = new SparkContext(conf)
        
        val a: RDD[Int] = sc.parallelize(List(1,2,3,4))
        
        val b: RDD[String] = sc.parallelize(List("a", "b", "c","d"))
        
        val c: RDD[(Int, String)] = a.zip(b)
        
        c.cogroup(c).collect()
        
      }
    }
    
  • coalesce(numPartitions)

    • 源碼

      /**
         * Return a new RDD that is reduced into `numPartitions` partitions.
         * 返回一個(gè)已經(jīng)規(guī)約入如分區(qū)數(shù)量的分區(qū)的RDD
         *
         * This results in a narrow dependency, e.g. if you go from 1000 partitions
         * to 100 partitions, there will not be a shuffle, instead each of the 100
         * new partitions will claim 10 of the current partitions. 
         * 這些結(jié)果的父分區(qū)和子分區(qū)都是窄依賴的, 如果你把1000個(gè)分區(qū)規(guī)約成100個(gè)分區(qū), 那就不會(huì)有shuffle產(chǎn)生, 反之, 100 個(gè)分區(qū)的每一個(gè)分區(qū)都會(huì)聲明10個(gè)當(dāng)前分區(qū)
         * If a larger number of partitions is requested, it will stay at the current number of partitions.
         * 如果大量分區(qū)被請(qǐng)求, 它會(huì)維持當(dāng)前的分區(qū)數(shù)
         *
         * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
         * this may result in your computation taking place on fewer nodes than
         * you like (e.g. one node in the case of numPartitions = 1). 
         * 如果你要執(zhí)行一個(gè)急劇的 合并, 比如分區(qū)數(shù)合并為1, 這可能會(huì)導(dǎo)致你的計(jì)算 發(fā)生在比你預(yù)想的更少的節(jié)點(diǎn)上扣草。
         * To avoid this, you can pass shuffle = true. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
         * 你可以通過傳遞 shuffle = true 參數(shù)來避免以上情況的發(fā)生。這樣會(huì)加入一個(gè)shuffle的過程, 但是當(dāng)前的上游分區(qū)會(huì)被并行執(zhí)行(無(wú)論當(dāng)前分區(qū)時(shí)哪個(gè))
         * @note With shuffle = true, you can actually coalesce to a larger number
         * of partitions. 
         * 使用 shuffle 時(shí), 你可以將 將 RDD 合并后的分區(qū)數(shù)調(diào)大
         * This is useful if you have a small number of partitions,say 100, potentially with a few partitions being abnormally large. 
         * 當(dāng)你只有一小部分分區(qū)的時(shí)候, 若有一部分分區(qū)異常的大(數(shù)據(jù)傾斜), 以上操作可能會(huì)有用颜屠。
         * Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner. 
         * 調(diào)用 合并時(shí)傳入1000(分區(qū)數(shù)) 和 使用shuffle, 會(huì)產(chǎn)生 1000 個(gè)分區(qū) 以及 由 哈希分區(qū)器 分區(qū)的 數(shù)據(jù)
         * The optional partition coalescer passed in must be serializable.
         * 傳入的可選的 分區(qū)合并器 必須是可序列化的
         */
        def coalesce(numPartitions: Int, shuffle: Boolean = false,
                     partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                    (implicit ord: Ordering[T] = null)
            : RDD[T] = withScope {
             // 分區(qū)數(shù)需要大于0   
          require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
             // 如果使用shuffle   
          if (shuffle) {
            /** Distributes elements evenly across output partitions, starting from a random partition. 
             * 從一個(gè)隨機(jī)分區(qū)開始, 元素均勻地分布在輸出分區(qū)上
             */
            val distributePartition = (index: Int, items: Iterator[T]) => {
              var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
              items.map { t =>
                // Note that the hash code of the key will just be the key itself.
                // 該 key 的哈希碼 只能是它自己  
                //The HashPartitioner will mod it with the number of total partitions.
                // 該哈希分區(qū)器會(huì)隨著總分區(qū)的數(shù)改變而修改
                position = position + 1
                (position, t)
              }
            } : Iterator[(Int, T)]
      
            // include a shuffle step so that our upstream tasks are still distributed
            // 包含一個(gè)shuffle階段從而使得我們的上游任務(wù)仍舊是分布式的  
            new CoalescedRDD(
              new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
              new HashPartitioner(numPartitions)),
              numPartitions,
              partitionCoalescer).values
          } else {
            new CoalescedRDD(this, numPartitions, partitionCoalescer)
          }
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable.ListBuffer
    
    object Operator_coalesce {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("coalesce")
        val sc: SparkContext = new SparkContext(conf)
    
        val rdd01: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6), 4)
    
    
        val rdd02: RDD[String] = rdd01.mapPartitionsWithIndex((partitionIndex, iter) =>{
          val list = new ListBuffer[String]()
          while (iter.hasNext){
            list += "rdd01 partitionIndex: " + partitionIndex + ", value: " + iter.next()
          }
          list.iterator
        })
        rdd02.foreach(println)
    
        val rdd03: RDD[String] = rdd02.coalesce(5, false)
        println("rdd03 number: " + rdd03.getNumPartitions)
    
        val rdd04: RDD[String] = rdd03.mapPartitionsWithIndex((partitionIndex, iter) =>{
          val list = new ListBuffer[String]()
          while (iter.hasNext){
            list += "coalesce partitionIndex: " + partitionIndex + ", value: " + iter.next()
          }
          list.iterator
        })
        rdd04.foreach(println)
        sc.stop()
      }
    }
    
  • repartition(numPartitions)

    • 源碼

      /**
         * Return a new RDD that has exactly numPartitions partitions.
         * 返回由相同分區(qū)數(shù)分區(qū)的新的RDD
         * Can increase or decrease the level of parallelism in this RDD. 
         * 可以升高或者降低該RDD的并行級(jí)別
         * Internally, this uses a shuffle to redistribute data.
         * 內(nèi)部使用了shuffle 來將數(shù)據(jù)重新分布
         * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, which can avoid performing a shuffle.
         * 如果你要降低該RDD的分區(qū)數(shù), 考慮使用 coalesce, 可以避免使用 shuffle
         * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
         * 要修正 shuffle +重分區(qū) 可能導(dǎo)致的數(shù)據(jù)丟失, 請(qǐng)參考 SPARK-23207
         */
        def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
          coalesce(numPartitions, shuffle = true)
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable.ListBuffer
    
    object Operator_repartition {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("repartition")
        val sc = new SparkContext(conf)
    
        val rdd01: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7), 3)
    
        val rdd02: RDD[String] = rdd01.mapPartitionsWithIndex((partitionIndex, iter) =>{
          val list = new ListBuffer[String]()
          while (iter.hasNext){
            list += "rdd01partitionIndex : "+partitionIndex+",value :"+iter.next()
          }
          list.iterator
        })
        rdd02.foreach(println)
    
        val rdd03 = rdd02.repartition(4)
        val result = rdd03.mapPartitionsWithIndex((partitionIndex, iter) =>{
          val list = ListBuffer[String]()
          while(iter.hasNext){
            list +=("repartitionIndex : "+partitionIndex+",value :"+iter.next())
          }
          list.iterator
        })
        
        result.foreach(println)
        
        sc.stop()
      }
    }
    
  • repartitionAndSortWithinPartitions(partitioner) 【位于 OrderedRDDFunctions中】

    • 源碼

      /**
         * Repartition the RDD according to the given partitioner and, within each resulting partition,sort records by their keys.
         *  根據(jù)已得的 RDD分區(qū)器 對(duì) RDD進(jìn)行重新分區(qū), 并且在每個(gè)結(jié)果分區(qū)中, 對(duì)每條記錄根據(jù)他們的 key進(jìn)行排序
         * This is more efficient than calling `repartition` and then sorting within each partition because it can push the sorting down into the shuffle machinery.
         * 這是一個(gè)比調(diào)用 repartition 再 對(duì)每個(gè)分區(qū)進(jìn)行排序更高效的 算子, 因?yàn)樗梢詫⑴判蚝蟮慕Y(jié)果一次推入 shuffle 體系 
         */
        def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
          new ShuffledRDD[K, V, V](self, partitioner).setKeyOrdering(ordering)
        }
      
  • 案例

    package com.ronnie.scala.core.transform_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    object Operator_repartitionAndSortWithinPartitions {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("repartitionAndSortWithinPartitions")
        val sc = new SparkContext(conf)
    
        val rdd01: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))
        rdd01.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)
      }
    }
    

Action

  • reduce(func)

    • 源碼

      /**
         * Reduces the elements of this RDD using the specified commutative and
         * associative binary operator.
         * 根據(jù)特定的交換和相關(guān)的二進(jìn)制操作器 來規(guī)約 RDD 的元素
         */
        def reduce(f: (T, T) => T): T = withScope {
          val cleanF = sc.clean(f)
          val reducePartition: Iterator[T] => Option[T] = iter => {
            if (iter.hasNext) {
              Some(iter.reduceLeft(cleanF))
            } else {
              None
            }
          }
          var jobResult: Option[T] = None
          val mergeResult = (index: Int, taskResult: Option[T]) => {
              // isDefined <=> !isEmpty
            if (taskResult.isDefined) {
              jobResult = jobResult match {
                case Some(value) => Some(f(value, taskResult.get))
                case None => taskResult
              }
            }
          }
          sc.runJob(this, reducePartition, mergeResult)
          // Get the final result out of our Option, or throw an exception if the RDD was empty
          // 獲取最終的Option結(jié)果, 如果 RDD為空則拋出異常  
          jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
        }
      
      /**
         * Reduces the elements of this RDD in a multi-level tree pattern.
         * 按照多層級(jí)的樹狀模型來規(guī)約RDD中的元素
         * treeReduce可以對(duì)任何RDD使用辰妙,相當(dāng)于是reduce操作的泛化
         *
         * @param depth suggested depth of the tree (default: 2)
         * param: 建議的樹的深度(默認(rèn)為2)
         * @see [[org.apache.spark.rdd.RDD#reduce]]
         */
        def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope {
            // 樹的深度必須要 >= 1
          require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
          val cleanF = context.clean(f)
          val reducePartition: Iterator[T] => Option[T] = iter => {
            if (iter.hasNext) {
              Some(iter.reduceLeft(cleanF))
            } else {
              None
            }
          }
          val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
          val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
              // isDefined <=> !isEmpty
            if (c.isDefined && x.isDefined) {
              Some(cleanF(c.get, x.get))
            } else if (c.isDefined) {
              c
            } else if (x.isDefined) {
              x
            } else {
              None
            }
          }
          partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
            .getOrElse(throw new UnsupportedOperationException("empty collection"))
        }
      
  • 案例

    • 見wordCount
  • collect()

    • 源碼

      /**
         * Return an array that contains all of the elements in this RDD.
         * 返回包含所有RDD中元素的一個(gè)數(shù)組
         * @note This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
         * 該方法僅在期望的結(jié)構(gòu)數(shù)組比較小時(shí)才適用, 因?yàn)樗械臄?shù)據(jù)都會(huì)加載到 driver 的內(nèi)存中
         */
        def collect(): Array[T] = withScope {
          val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
          Array.concat(results: _*)
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_count {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("count")
        val sc = new SparkContext(conf)
    
        val lines: RDD[String] = sc.textFile("./resources/word.txt")
    
        val result: Long = lines.count()
    
        println(result)
        sc.stop()
      }
    }
    
  • count()

    • 源碼

       /**
         * Return the number of elements in the RDD.
         * 返回 RDD中的元素
         */
        def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_count {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("count")
        val sc = new SparkContext(conf)
    
        val lines: RDD[String] = sc.textFile("./resources/word.txt")
    
        val result: Long = lines.count()
    
        println(result)
        sc.stop()
      }
    }
    
  • first()

    • 源碼

      /**
         * Return the first element in this RDD.
         * 返回該RDD中的第一個(gè)元素 <=> take(1)
         */
        def first(): T = withScope {
          take(1) match {
            case Array(t) => t
            case _ => throw new UnsupportedOperationException("empty collection")
          }
        }
      
  • 案例

  • take(n)

    • 源碼

      /**
         * Take the first num elements of the RDD. 
         * 獲取第一個(gè)RDD中對(duì)應(yīng)傳入數(shù)字的第一個(gè)元素
         * It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.
         * 它先通過掃描一個(gè)分區(qū), 并使用分區(qū)的結(jié)果來預(yù)估 所需的額外的分區(qū)來滿足限制
         * 
         * @note This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
         * 該方法僅在期望的結(jié)構(gòu)數(shù)組比較小時(shí)才適用, 因?yàn)樗械臄?shù)據(jù)都會(huì)加載到 driver 的內(nèi)存中
         * @note Due to complications in the internal implementation, this method will raise an exception if called on an RDD of `Nothing` or `Null`.
         * 由于內(nèi)部實(shí)現(xiàn)的復(fù)雜, 該方法在調(diào)用的RDD為 Nothing或空時(shí)會(huì)拋出異常
         */
        def take(num: Int): Array[T] = withScope {
          val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
          if (num == 0) {
            new Array[T](0)
          } else {
            val buf = new ArrayBuffer[T]
            val totalParts = this.partitions.length
            var partsScanned = 0
            while (buf.size < num && partsScanned < totalParts) {
              // The number of partitions to try in this iteration. It is ok for this number to be greater than totalParts because we actually cap it at totalParts in runJob.
              // 進(jìn)行此次迭代的分區(qū)數(shù), 該參數(shù)可以比總的分區(qū)數(shù)大, 我們可以在runJob的總分區(qū)數(shù)中以cap(Consistency, Avaliability, Partition Tolerance)  
              var numPartsToTry = 1L
              val left = num - buf.size
              if (partsScanned > 0) {
                // If we didn't find any rows after the previous iteration, quadruple and retry.
                // 如果在上次迭代之后 沒有找到任何行, 就將迭代分區(qū)數(shù)變?yōu)樗瓉淼乃谋?并重試 
                // Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%. We also cap the estimation in the end.
                // 如果找到了任何行, J就在它內(nèi)部插入我們想測(cè)試的分區(qū)數(shù), 并且提升百分之50的預(yù)估值, 最終我們也以cap來評(píng)估它
                  
                // 如果緩存為空
                if (buf.isEmpty) {
                    // 就將嘗試的分區(qū)數(shù)修改為 掃描的分區(qū)數(shù) * 上升的因子
                  numPartsToTry = partsScanned * scaleUpFactor
                } else {
                  // As left > 0, numPartsToTry is always >= 1
                  // 如果 num左側(cè)的數(shù)量大于 0, 要嘗試的分區(qū)數(shù)也會(huì) >= 1
                    // ceil 向上取整
                  numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
                  numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
                }
              }
      
              val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
              val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
              // ++= 將一批元素添加至末尾
              res.foreach(buf ++= _.take(num - buf.size))
              partsScanned += p.size
            }
      
            buf.toArray
          }
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_take {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("take")
        val sc = new SparkContext(conf)
        
        val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))
        
        val r: Array[Int] = rdd.take(3)
        r.foreach(println)
      }
    }
    
  • takeOrdered(n, [ordering])

    • 源碼

      /**
         * Returns the first k (smallest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
         * 說白了就是獲取有序的前幾個(gè)最小的元素, 排序有隱式函數(shù) Ordering[T] 維護(hù), 效果與 top算子相反
         * For example:
         * {{{
         *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
         *   // returns Array(2)
         *
         *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
         *   // returns Array(2, 3)
         * }}}
         *
         * @note This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
         * 該方法僅在期望的結(jié)構(gòu)數(shù)組比較小時(shí)才適用, 因?yàn)樗械臄?shù)據(jù)都會(huì)加載到 driver 的內(nèi)存中
         *
         * @param num k, the number of elements to return
         * num k, 返回的元素?cái)?shù)量
         * @param ord the implicit ordering for T
         * ord, 隱式地對(duì)數(shù)組T進(jìn)行排序
         * @return an array of top elements
         */
        def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
          if (num == 0) {
            Array.empty
          } else {
            val mapRDDs = mapPartitions { items =>
              // Priority keeps the largest elements, so let's reverse the ordering.
              // 默認(rèn)優(yōu)先保留最大的元素, 所以我們需要逆轉(zhuǎn)排序  
              val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
              queue ++= collectionUtils.takeOrdered(items, num)(ord)
              Iterator.single(queue)
            }
             // 如果mapRDD的分區(qū)長(zhǎng)度為0 
            if (mapRDDs.partitions.length == 0) {
                // 就清空數(shù)組
              Array.empty
            } else {
                // 否則就把第二個(gè)隊(duì)列按順序合并到第一個(gè)對(duì)列末尾, 然后轉(zhuǎn)成數(shù)組再排序
              mapRDDs.reduce { (queue1, queue2) =>
                queue1 ++= queue2
                queue1
              }.toArray.sorted(ord)
            }
          }
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_takeOrdered {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("take")
        val sc = new SparkContext(conf)
        
        val rdd = sc.makeRDD(List(1, -21, 30, 45, 6, 98.1, 0.97))
        rdd.takeOrdered(9).foreach(println)
    
      }
    }
    
  • saveAsTextFile(path)

    • 源碼

      /**
         * Save this RDD as a text file, using string representations of elements.
         * 將該RDD中儲(chǔ)存的元素保存為text文件, 使用字符串來代替元素
         */
        def saveAsTextFile(path: String): Unit = withScope {
          // https://issues.apache.org/jira/browse/SPARK-2075
          
          // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit Ordering for it and will use the default `null`.
          // 空可寫入(如key, null) 在hadoop 1.x 版本中是可比較的, 所以編譯器并不能找到隱式的排序函數(shù), 并且使用默認(rèn)的 null 
          //  However, it's a `Comparable[NullWritable]`in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an Ordering for `NullWritable`. 
          // 然而在hadoop2.x版本中, 它變成了Comparable[NullWritable], 所以編譯器能隱式調(diào)用排序方法來對(duì)空可寫入進(jìn)行排序
          // That's why the compiler will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
          // 這也是為什么編譯器會(huì)對(duì) Hadoop1.x 和 Hadoop2.x 版本的 saveAsTextFile 生成不同的匿名類
          // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate same bytecodes for `saveAsTextFile`.
         // 從而, 我們就在此提供一個(gè)顯示的空排序來確保 編譯器能夠?qū)aveAsTextFile生成相同的字節(jié)碼文件   
          val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
          val textClassTag = implicitly[ClassTag[Text]]
          val r = this.mapPartitions { iter =>
            val text = new Text()
            iter.map { x =>
              text.set(x.toString)
              (NullWritable.get(), text)
            }
          }
          RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
            .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
        }
      
      /**
         * Save this RDD as a compressed text file, using string representations of elements.
         * 將此 RDD 保存為一個(gè)壓縮的 text 文件, 用 字符串來代替元素甫窟。(多個(gè)壓縮方式參數(shù)avro, parquet ...)
         */
        def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
          // https://issues.apache.org/jira/browse/SPARK-2075
          val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
          val textClassTag = implicitly[ClassTag[Text]]
          val r = this.mapPartitions { iter =>
            val text = new Text()
            iter.map { x =>
              text.set(x.toString)
              (NullWritable.get(), text)
            }
          }
          RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
            .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_saveAsTextFile {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("saveAsTextFile")
        val sc = new SparkContext(conf)
    
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Life is just a journey", 1), ("Whatever make you confused, please follow your mind", 2), ("A life so changed", 3)))
    
        rdd.saveAsTextFile("./resource/testTextFile")
      }
    }
    
  • saveAsSequenceFile(path) 【位于SequenceFileRDDFunctions 中】

    • 源碼

      /**
         * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key and value types. 
         * 通過使用寫入類型來將RDD輸出為一個(gè)hadoop序列文件, 我們通過 RDD 的 key 和 value類型來推斷 可寫入的類型
         * If the key or value are Writable, then we use their classes directly;
         * 如果key或者value 是可寫入的, 我們則可以直接使用他們的類
         * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc, byte arrays to BytesWritable, and Strings to Text. 
         * 否則, 我們就會(huì)將 key 和 value 去和原始類型進(jìn)行匹配, 比如IntWritable, DoubleWritable (byte 數(shù)組 => ByteWritbale密浑, Strings => IntWritable)
         * The `path` can be on any Hadoop-supported file system.
         * 路徑可以在任意hadoop支持的文件系統(tǒng)上
         */
        def saveAsSequenceFile(
            path: String,
            codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
          def anyToWritable[U <% Writable](u: U): Writable = u
      
          // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and valueWritableClass at the compile time.
           // 我們不能在編譯時(shí)強(qiáng)迫  anyToWritable 的返回類型 和 keyWritableClass 
          //  To implement that, we need to add type parameters to SequenceFileRDDFunctions. 
          // 為了實(shí)現(xiàn)以上, 我們需要將參數(shù)類型添加到 SequenceFileRDDFunctions 類中 
          //  however, SequenceFileRDDFunctions is a public class so it will be a breaking change.
          // 然而,  SequenceFileRDDFunctions 是一個(gè)共有類, 所以以上操作會(huì)導(dǎo)致爆炸性的改變 
          val convertKey = self.keyClass != _keyWritableClass
          val convertValue = self.valueClass != _valueWritableClass
      
          logInfo("Saving as sequence file of type " +
            s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )
          val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
          val jobConf = new JobConf(self.context.hadoopConfiguration)
            // 如果不能轉(zhuǎn)化key, 也不能轉(zhuǎn)化value(類型不匹配導(dǎo)致)
          if (!convertKey && !convertValue) {
            self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
            // 如果不能轉(zhuǎn)化 key, 能轉(zhuǎn)化 value  
          } else if (!convertKey && convertValue) {
            self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
              path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
              // 如果能轉(zhuǎn)化key, 不能轉(zhuǎn)化value
          } else if (convertKey && !convertValue) {
            self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
              path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
              // 如果能轉(zhuǎn)化key 且 能轉(zhuǎn)化 value
          } else if (convertKey && convertValue) {
            self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
              path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
          }
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_saveAsSequenceFile {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("saveAsSequenceFile")
        val sc = new SparkContext(conf)
    
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Life is just a journey", 1), ("Whatever make you confused, please follow your mind", 2), ("A life so changed", 3)))
    
        rdd.saveAsTextFile("./resource/testSequenceFile")
      }
    }
    
  • saveAsObjectFile(path)

    • 源碼

      /**
         * Save this RDD as a SequenceFile of serialized objects.
         * 保存RDD問一個(gè)序列化過的序列文件
         */
        def saveAsObjectFile(path: String): Unit = withScope {
          this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
            .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
            .saveAsSequenceFile(path)
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object Operation_saveAsObjectFile {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("saveAsObjectFile")
        val sc = new SparkContext(conf)
    
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Life is just a journey", 1), ("Whatever make you confused, please follow your mind", 2), ("A life so changed", 3)))
    
        rdd.saveAsTextFile("./resource/testObjectFile")
      }
    }
    
  • countByKey()

    • 源碼

      /**
         * Return the count of each unique value in this RDD as a local map of (value, count) pairs.
         * 返回RDD中每一個(gè)唯一的value和它對(duì)應(yīng)的計(jì)數(shù) 的本地視圖
         *
         * @note This method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory.
         *  該方法僅在期望的結(jié)果map比較小時(shí)才適用, 因?yàn)樗械臄?shù)據(jù)都會(huì)加載到 driver 的內(nèi)存中
         * To handle very large results, consider using
         * 為了控制這些變化, 考慮使用以下方式來替代
         * {{{
         * rdd.map(x => (x, 1L)).reduceByKey(_ + _)
         * }}}
         *
         * , which returns an RDD[T, Long] instead of a map.
         * 返回一個(gè) k, v形式的RDD(v 為 Long) 而不是map
         */
        def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
          map(value => (value, null)).countByKey()
        }
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operator_countByKey {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("countByKey")
        val sc = new SparkContext(conf)
    
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(
               ("a", 100),
               ("b", 200),
               ("c", 300),
               ("d", 400)))
    
        val rdd01 = sc.parallelize(List(
          ("a", 100),
          ("b", 200),
          ("a", 300),
          ("c", 400)
        ))
    
        val result: collection.Map[String, Long] = rdd01.countByKey()
    
        result.foreach(println)
        sc.stop()
      }
    }
    
  • foreach(func)

    • 源碼

      /**
         * Applies a function f to all elements of this RDD.
         * 對(duì)RDD中的所有元素應(yīng)用一個(gè)函數(shù)
         */
        def foreach(f: T => Unit): Unit = withScope {
          val cleanF = sc.clean(f)
          sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
        }
      
  • 案例

    • 見wordCount
  • foreachPartition

    • 源碼

      /**
         * Applies a function f to each partition of this RDD.
         * 對(duì)RDD中的每一個(gè)分區(qū)應(yīng)用一個(gè)函數(shù)
         */
        def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
          val cleanF = sc.clean(f)
          sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
        }
      
      
  • 案例

    package com.ronnie.scala.core.action_operator
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Operation_foreachPartition {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("foreachPartition")
        val sc = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("./resources/word.txt")
    
        lines.foreachPartition(x =>{
          println("連接數(shù)據(jù)庫(kù)......")
          while (x.hasNext){
            println(x.next())
          }
          println("關(guān)閉數(shù)據(jù)庫(kù)......")
    
        })
      }
    }
    

Manipulate

  • cache

    • 源碼

      /**
         * Persist this RDD with the default storage level (`MEMORY_ONLY`).
         * 等于只使用內(nèi)存的persist, 其實(shí)就是對(duì) 下面的persist方法包裝了一下
         */
        def cache(): this.type = persist()
      
      /**
       * Persist this RDD with the default storage level (`MEMORY_ONLY`).
       */
       def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
      
  • persist

    • 源碼
      /**- Mark this RDD for persisting using the specified level.
      - 使用特殊的存儲(chǔ)級(jí)別來標(biāo)記該RDD的持久化
      *
      - @param newLevel the target storage level 新的目標(biāo)存儲(chǔ)級(jí)別
      - @param allowOverride whether to override any existing level with the new one
      - 是否允許覆蓋已經(jīng)存在的存儲(chǔ)級(jí)別
      */
      private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
      // TODO: Handle changes of StorageLevel 處理存儲(chǔ)界別的變化
      // 如果 當(dāng)前存儲(chǔ)級(jí)別 不為NONE 且 新存儲(chǔ)級(jí)別 不為舊存儲(chǔ)級(jí)別 且 不允許覆蓋, 就拋出不支持的操作異常
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
      }
      // If this is the first time this RDD is marked for persisting, register it
      // with the SparkContext for cleanups and accounting. Do this only once.
      // 如果 這是這個(gè)RDD第一次被標(biāo)記要持久化, 那么只執(zhí)行這一次
      if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
      }
      
      
  /**
 * Set this RDD's storage level to persist its values across operations after the first time it is computed. 
 * 在該RDD第一次被計(jì)算后, 根據(jù)它的存儲(chǔ)級(jí)別在操作間隙持久化它
 * This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 * 除本地檢查點(diǎn)外這只能對(duì)新生的RDD進(jìn)行操作
 */
def persist(newLevel: StorageLevel): this.type = {
    // 如果是本地設(shè)置過檢查點(diǎn)
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already marked this RDD for persisting.
    // 這意味著用戶之前調(diào)用過 localCheckpoint方法, 所以已經(jīng)標(biāo)記改RDD為已持久化操作過
    // Here we should override the old storage level with one that is explicitly requested by the user (after adapting it to use disk).
    // 在這里, 我們應(yīng)該用用戶顯示請(qǐng)求的存儲(chǔ)級(jí)別 來 覆蓋舊的存儲(chǔ)級(jí)別
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}
  • checkPoint

    • 源碼

      /**
         * Performs the checkpointing of this RDD by saving this. 
         * 執(zhí)行 checkpointing 方法 來保存 RDD
         * It is called after a job using this RDD has completed (therefore the RDD has been materialized and potentially stored in memory).
         * 當(dāng)一個(gè)使用該RDD的任務(wù)完成后, 該方法會(huì)被調(diào)用(因?yàn)镽DD可能會(huì)被實(shí)例化 并 可能被存儲(chǔ)到內(nèi)存中) 
         * doCheckpoint() is called recursively on the parent RDDs.
         * doCheckpoint 方法會(huì)對(duì)父RDD遞歸執(zhí)行
         */
        private[spark] def doCheckpoint(): Unit = {
          RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
             // 如果 doCheckpoint沒有被調(diào)用過
            if (!doCheckpointCalled) {
              // 就需要調(diào)用
              doCheckpointCalled = true
              // 如果檢查點(diǎn)數(shù)據(jù)已經(jīng)被定義過了
              if (checkpointData.isDefined) {
                 // 如果 從此檢查點(diǎn)開始的所有祖先都被標(biāo)記為checkpointing(需要添加保存點(diǎn))
                if (checkpointAllMarkedAncestors) {
                  // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint them in parallel.
                  // 我們可以收集所有需要添加檢查點(diǎn)的RDD, 并且并行的添加檢查點(diǎn)
                  // Checkpoint parents first because our lineage will be truncated after we checkpoint ourselves
                  // 優(yōu)先對(duì)保存點(diǎn)的父保存點(diǎn)執(zhí)行操作, 因?yàn)?RDD 的lineage(血緣) 會(huì)在添加保存點(diǎn)后被截?cái)?            dependencies.foreach(_.rdd.doCheckpoint())
                }
                checkpointData.get.checkpoint()
              } else {
                dependencies.foreach(_.rdd.doCheckpoint())
              }
            }
          }
        }
      
      // Whether to checkpoint all ancestor RDDs that are marked for checkpointing.
      // 是否對(duì)所有添加了checkpointing標(biāo)簽的 RDD的祖先添加保存點(diǎn)
      // By default, we stop as soon as we find the first such RDD, an optimization that allows us to write less data but is not safe for all workloads.
      // 默認(rèn)情況下, 我們只要找到一個(gè)這樣的RDD就會(huì)停止查詢, 這是一個(gè)能使我們寫入更少的數(shù)據(jù)的優(yōu)化, 但是, 這可能對(duì)所有的工作量來說是不安全的。
      // E.g. in streaming we may checkpoint both an RDD and its parent in every batch, in which case the parent may never be checkpointed and its lineage never truncated, leading to OOMs in the long run (SPARK-6847).
      // 比如在流中, 我們可能會(huì)在每一批中對(duì)一個(gè)RDD和它的父RDD都添加保存點(diǎn), 這可能會(huì)導(dǎo)致 父 RDD 的 保存點(diǎn)操作 一直失敗 但 它的 lineage(血緣) 缺一直沒被切斷, 這可能會(huì)在一次長(zhǎng)運(yùn)行中導(dǎo)致內(nèi)存用盡
      
      private val checkpointAllMarkedAncestors =
      Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
      

關(guān)于存儲(chǔ)級(jí)別請(qǐng)查看: <a >StorageLevel</a>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蕴坪,一起剝皮案震驚了整個(gè)濱河市肴掷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌背传,老刑警劉巖呆瞻,帶你破解...
    沈念sama閱讀 216,843評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異径玖,居然都是意外死亡痴脾,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門梳星,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赞赖,“玉大人滚朵,你說我怎么就攤上這事∏坝颍” “怎么了辕近?”我有些...
    開封第一講書人閱讀 163,187評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)匿垄。 經(jīng)常有香客問我移宅,道長(zhǎng),這世上最難降的妖魔是什么椿疗? 我笑而不...
    開封第一講書人閱讀 58,264評(píng)論 1 292
  • 正文 為了忘掉前任漏峰,我火速辦了婚禮,結(jié)果婚禮上届榄,老公的妹妹穿的比我還像新娘浅乔。我一直安慰自己,他們只是感情好铝条,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評(píng)論 6 390
  • 文/花漫 我一把揭開白布靖苇。 她就那樣靜靜地躺著,像睡著了一般班缰。 火紅的嫁衣襯著肌膚如雪顾复。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,231評(píng)論 1 299
  • 那天鲁捏,我揣著相機(jī)與錄音,去河邊找鬼萧芙。 笑死给梅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的双揪。 我是一名探鬼主播动羽,決...
    沈念sama閱讀 40,116評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼渔期!你這毒婦竟也來了运吓?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,945評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤疯趟,失蹤者是張志新(化名)和其女友劉穎拘哨,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體信峻,經(jīng)...
    沈念sama閱讀 45,367評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡倦青,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了盹舞。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片产镐。...
    茶點(diǎn)故事閱讀 39,754評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡隘庄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出癣亚,到底是詐尸還是另有隱情丑掺,我是刑警寧澤,帶...
    沈念sama閱讀 35,458評(píng)論 5 344
  • 正文 年R本政府宣布述雾,位于F島的核電站街州,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏绰咽。R本人自食惡果不足惜菇肃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望取募。 院中可真熱鬧琐谤,春花似錦、人聲如沸玩敏。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)旺聚。三九已至织阳,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間砰粹,已是汗流浹背唧躲。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留碱璃,地道東北人弄痹。 一個(gè)月前我還...
    沈念sama閱讀 47,797評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嵌器,于是被迫代替她去往敵國(guó)和親肛真。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容