[Spark] Spark RDD & Operator Summary

I. RDD Overview

  1. What is an RDD
    • An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection whose elements can be computed in parallel. RDDs have the characteristics of a dataflow model: automatic fault tolerance, locality-aware scheduling, and scalability. RDDs allow users to explicitly cache a working set in memory across multiple queries so that later queries can reuse it, which greatly speeds them up (a small caching sketch follows).
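      A minimal caching sketch (not from the original article; it assumes the standard spark-shell SparkContext `sc` and reuses the HDFS path from the example further below):
      val lines = sc.textFile("hdfs://hadoop141:8020/words.txt")
      val words = lines.flatMap(_.split(" ")).cache()   // mark the working set to be kept in memory
      words.count()                                      // first action: computes the RDD and caches it
      words.first()                                      // second action: reuses the cached data instead of re-reading the file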
  2. RDD Properties


    1. A set of partitions, the basic building blocks of the dataset. Each partition is processed by one task, which determines the granularity of parallel computation. The number of partitions can be specified when the RDD is created; if it is not, the default is the number of CPU cores allocated to the program.
    2. A function for computing each partition. Spark computes RDDs partition by partition, and every RDD implements a compute function for this purpose. compute composes iterators, so the result of each intermediate step does not have to be materialized.
    3. Dependencies on other RDDs. Every transformation produces a new RDD, so RDDs form pipeline-like dependencies. When some partitions are lost, Spark recomputes only those partitions through this dependency chain instead of recomputing every partition of the RDD.
    4. A Partitioner, the RDD's partitioning function. Spark currently ships two kinds: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non key-value RDDs it is None. The Partitioner determines both the number of partitions of this RDD and the number of partitions of the parent RDD's shuffle output.
    5. A list storing the preferred location (preferred location) of each Partition. For an HDFS file this is the location of the block backing each partition. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule each task onto the node that stores the data block it processes (the sketch after this list shows how to inspect these properties from the shell).
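    A short illustrative sketch (not from the original article) of inspecting some of these properties from the spark-shell; partitions, partitioner, dependencies, and preferredLocations are standard RDD members:
      val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)), 4).reduceByKey(_ + _)
      pairs.partitions.length                        // property 1: number of partitions (4)
      pairs.partitioner                              // property 4: Some(HashPartitioner) for this key-value RDD
      pairs.dependencies                             // property 3: the lineage back to the parent RDD
      pairs.preferredLocations(pairs.partitions(0))  // property 5: empty for an in-memory collection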
  3. Creating an RDD
    1. From an existing Scala collection.
      val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    2. From a dataset in an external storage system: the local file system or any Hadoop-supported store, such as HDFS, Cassandra, or HBase
      val rdd2 = sc.textFile("hdfs://hadoop141:8020/words.txt")
      
    3. Check the RDD's partition count with rdd1.partitions.length; it defaults to the number of CPU cores allocated to the program and can also be specified when the RDD is created: val rdd1 = sc.parallelize(Array(1,2,3,4),3)

II. RDD Programming API --- Two Kinds of Operators

  1. Transformation
    All transformations on an RDD are lazily evaluated: they do not compute their results right away. Instead, they only record the transformations applied to the base dataset (for example, a file). The transformations actually run only when an action requires a result to be returned to the Driver. This design lets Spark run more efficiently; the sketch below illustrates it.
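    A minimal illustration of this laziness (assuming the standard spark-shell `sc`): the map below only records the transformation, and nothing is computed until the action runs.
      val nums = sc.parallelize(1 to 4)
      val doubled = nums.map { x => println(s"computing $x"); x * 2 }  // nothing is printed here: map is lazy
      doubled.collect()                                                 // the action triggers the computation (and the printlns)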
  2. Common Transformation operations:
    • map(func): returns a new RDD formed by passing each input element through the function func.
    • filter(func): returns a new RDD consisting of the input elements for which func returns true.
    • sortBy(func, [ascending], [numTasks]): returns a new RDD sorted by the key produced by applying func to each element. ascending defaults to true (ascending order); pass false to sort in descending order.
      val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
      val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true)
      val rdd3 = rdd2.filter(_>10)
      val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true)
      val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)
      
    • flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element). Equivalent to a map followed by a flatten.
      val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j"))
      rdd4.flatMap(_.split(' ')).collect
      ------------------------------------------------------------------
      val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b")))
      rdd5.flatMap(_.flatMap(_.split(" "))).collect
      
    • union: union of two RDDs; note that the element types must match
    • intersection: intersection of two RDDs
    • distinct: remove duplicate elements
      val rdd6 = sc.parallelize(List(5,6,4,7))
      val rdd7 = sc.parallelize(List(1,2,3,4))
      val rdd8 = rdd6.union(rdd7)
      rdd8.distinct.sortBy(x=>x).collect
      --------------------------------------------
      val rdd9 = rdd6.intersection(rdd7)
      
    • join, leftOuterJoin, rightOuterJoin
      val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
      val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
      --------------------------------------------------------------------------
      val rdd3 = rdd1.join(rdd2).collect
      rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
      ---------------------------------------------------------------------------
      val rdd3 = rdd1.leftOuterJoin(rdd2).collect
      rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (jerry,(2,Some(9))), (kitty,(3,None)))
      ---------------------------------------------------------------------------
      val rdd3 = rdd1.rightOuterJoin(rdd2).collect
      rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (jerry,(Some(2),9)), (shuke,(None,7)))
      
    • groupByKey([numTasks]): called on an RDD of (K, V) pairs and returns an RDD of (K, Iterable[V]) --- only applicable when the elements are key-value tuples
      val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
      val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
      val rdd3 = rdd1 union rdd2
      val rdd4 = rdd3.groupByKey.collect
      rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))
      -----------------------------------------------------------------------------------
      val rdd5 = rdd4.map(x=>(x._1,x._2.sum))
      rdd5: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
      
    • groupBy: takes a one-argument function; the value that function returns becomes the key, and the result is a new RDD[(K, Iterable[T])] whose value is an iterable of all the input elements mapping to that key. Source:
      /**
        * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
        * mapping to that key. The ordering of elements within each group is not guaranteed, and
        * may even differ each time the resulting RDD is evaluated.
        *
        * @note This operation may be very expensive. If you are grouping in order to perform an
        * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
        * or `PairRDDFunctions.reduceByKey` will provide much better performance.
        */
      def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
        groupBy[K](f, defaultPartitioner(this))
      }
      
      Example:
      scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5)))
      rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24
      scala> rdd1.groupBy(_._1).collect
      res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))
      
    • reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs and returns an RDD of (K, V) pairs in which the values for each key are aggregated with the given reduce function. As with groupByKey, the number of reduce tasks can be set with the optional second parameter.
      val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
      val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7)))
      val rdd3 = rdd1 union rdd2
      val rdd6 = rdd3.reduceByKey(_+_).collect
      rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))
      
    • cogroup(otherDataset, [numTasks]): called on RDDs of type (K, V) and (K, W) and returns an RDD of type (K, (Iterable<V>, Iterable<W>))
      val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
      val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
      val rdd3 = rdd1.cogroup(rdd2).collect
      rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((tom,(CompactBuffer(2, 1),CompactBuffer(1))), (jerry,(CompactBuffer(3),CompactBuffer(2))), (shuke,(CompactBuffer(),CompactBuffer(2))), (kitty,(CompactBuffer(2),CompactBuffer())))
      ----------------------------------------------------------------------------------------
      val rdd4 = rdd3.map(x=>(x._1,x._2._1.sum+x._2._2.sum))
      rdd4: Array[(String, Int)] = Array((tom,4), (jerry,5), (shuke,2), (kitty,2))
      
    • cartesian(otherDataset): Cartesian product
      val rdd1 = sc.parallelize(List("tom", "jerry"))
      val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
      val rdd3 = rdd1.cartesian(rdd2).collect
      rdd3: Array[(String, String)] = Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))
      
  3. Action


    (The original article listed the common Action operators in a screenshot.)
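    A few representative actions, sketched from memory rather than taken from that screenshot (all are standard RDD methods; the output path is illustrative):
      val rdd = sc.parallelize(List(1, 2, 3, 4, 5))
      rdd.collect()                                     // return all elements to the Driver as an Array
      rdd.count()                                       // number of elements
      rdd.reduce(_ + _)                                 // aggregate the elements with the given function
      rdd.first()                                       // first element
      rdd.take(3)                                       // first n elements
      rdd.saveAsTextFile("hdfs://hadoop141:8020/out")   // write the RDD to external storage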

III. RDD Programming --- Advanced API

  1. mapPartitions: operates on each partition as a whole; the function passed in receives an Iterator and must return an Iterator. Source:
    /**
      * Return a new RDD by applying a function to each partition of this RDD.
      *
      * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
      * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
      */
    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        preservesPartitioning)
    }
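    
    A small usage sketch (not from the original article): summing each partition in a single pass, so the function runs once per partition instead of once per element. With the same 2-partition split used later in this section (1-4 and 5-9), the expected result would be:
    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    scala> rdd1.mapPartitions(iter => Iterator(iter.sum)).collect
    res0: Array[Int] = Array(10, 35)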
    
    mapPartitionsWithIndex: operates on each partition and also exposes the partition index alongside the values in that partition. It is a Transformation.
    Source:
    /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    *
    * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    preservesPartitioning indicates whether the returned RDD keeps the partitioner. It may be set to true only when the RDD is a key-value RDD and the function does not modify the keys. Non key-value RDDs generally have no partitioner, and once keys are modified the elements no longer satisfy the partitioner's layout; in those cases it must be false, meaning the returned RDD is not considered partitioned.
    */
    def mapPartitionsWithIndex[U: ClassTag](-------expects a function
        f: (Int, Iterator[T]) => Iterator[U],------the function takes two parameters: the partition index and an iterator
        preservesPartitioning: Boolean = false): RDD[U] = withScope {
      val cleanedF = sc.clean(f)
      new MapPartitionsRDD(
        this,
        (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
        preservesPartitioning)
    }
    
    Example:
    1. First define a function matching the signature mapPartitionsWithIndex expects
    scala> val func = (index : Int,iter : Iterator[Int]) => {
        | iter.toList.map(x=>"[PartID:" + index + ",val:" + x + "]").iterator
        | }
    func: (Int, Iterator[Int]) => Iterator[String] = <function2>
    2. Create an RDD with 2 partitions
    scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    3. Call the method, passing in the custom function
    scala> rdd1.mapPartitionsWithIndex(func).collect
    res0: Array[String] = Array([PartID:0,val:1], [PartID:0,val:2], [PartID:0,val:3], [PartID:0,val:4], [PartID:1,val:5], [PartID:1,val:6], [PartID:1,val:7], [PartID:1,val:8], [PartID:1,val:9])
    
  2. aggregate: an aggregation operation; it is an Action
    • Source
      /**
      * Aggregate the elements of each partition, and then the results for all the partitions, using
      * given combine functions and a neutral "zero value". This function can return a different result
      * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
      * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
      * allowed to modify and return their first argument instead of creating a new U to avoid memory
      * allocation.
      * Aggregates the elements of the RDD. A zero value must be supplied (since elements are accumulated, the accumulation needs an initial value). Elements are first aggregated within each partition with seqOp
      * (folding the T-typed elements into a U-typed per-partition result); the per-partition results are then combined with combOp, and that result is returned
      *
      * @param zeroValue the initial value for the accumulated result of each partition for the
      *                  `seqOp` operator, and also the initial value for the combine results from
      *                  different partitions for the `combOp` operator - this will typically be the
      *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
      * @param seqOp an operator used to accumulate results within a partition
      * @param combOp an associative operator used to combine results from different partitions
      * The first parameter list takes the zero value; the second takes two functions, each with two arguments and one result:
      * the first merges elements within a partition, the second merges the per-partition results
      */
      def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
        // Clone the zero value since we will also be serializing it as part of tasks
        var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
        val cleanSeqOp = sc.clean(seqOp)
        val cleanCombOp = sc.clean(combOp)
        val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
        val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
        sc.runJob(this, aggregatePartition, mergeResult)
        jobResult
      }
      
    • Example:
      scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
      rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
      //Each of the two partitions is summed first; the two partition sums are then added to give the final result
      scala> rdd1.aggregate(0)(_+_,_+_)
      res0: Int = 45                                                                 
      //First take the maximum within each partition, then add the per-partition maxima to get the final result
      scala> rdd1.aggregate(0)(math.max(_,_),_+_)
      res1: Int = 13
      //Note that the initial value takes part in every step. In the call below, partition 0 holds 1,2,3,4, so its maximum compared against the initial value 5 is 5; partition 1 holds 5,6,7,8,9, so its maximum is 9. The initial value also joins the combine step: 5+(5+9)=19
      scala> rdd1.aggregate(5)(math.max(_,_),_+_)
      res0: Int = 19
      -----------------------------------------------------------------------------------------------
      scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
      rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
      //Note: the partitions are computed in parallel, so the order in which their results come back is not fixed, and two different outputs are possible, as shown below
      scala> rdd2.aggregate("")(_+_,_+_)
      res0: String = defabc                                                                                                                    
      scala> rdd2.aggregate("")(_+_,_+_)
      res2: String = abcdef
      //This example shows the initial value participating even more clearly: the "=" initial value appears three times, once per partition and once in the combine step
      scala> rdd2.aggregate("=")(_+_,_+_)
      res0: String = ==def=abc
      --------------------------------------------------------------------------------------
      scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
      rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
      scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
      res1: String = 42                                                               
      scala> rdd3.aggregate("")((x,y)=>math.max(x.length,y.length).toString,_+_)
      res3: String = 24
      -------------------------------------------------------------------------------------------
      scala> val rdd4 = sc.parallelize(List("12","23","345",""),2)
      rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
      //Note: partition 0 plus the initial value holds "", "12", "23"; each min is converted back to a string, so its result is "1". Partition 1 holds "", "345", "" and ends up with "0". The combine step concatenates the two in whichever order the partitions finish
      scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
      res4: String = 10                                                               
      scala> rdd4.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)
      res9: String = 01                                                               
      ------------------------------------------------------------------------------------
      //Note the difference from the example above: the elements of this RDD are in a different order, so the result differs
      scala> val rdd5 = sc.parallelize(List("12","23","","345"),2)
      rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
      scala> rdd5.aggregate("")((x,y)=>math.min(x.length,y.length).toString,(x,y)=>x+y)
      res1: String = 11 
      
  3. aggregateByKey: aggregates values by key
    //Define the RDD
    scala> val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
    pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
    //A custom function to pass to mapPartitionsWithIndex
    scala> val func=(index:Int,iter:Iterator[(String, Int)])=>{
        | iter.toList.map(x => "[partID:" +  index + ", val: " + x + "]").iterator
        | }
    func: (Int, Iterator[(String, Int)]) => Iterator[String] = <function2>
    //Inspect how the elements are laid out across partitions
    scala> pairRDD.mapPartitionsWithIndex(func).collect
    res2: Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)], [partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)])
    //Note the difference between a zero value of 0 and other zero values
    scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
    res4: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))               
    
    scala> pairRDD.aggregateByKey(10)(_+_,_+_).collect
    res5: Array[(String, Int)] = Array((dog,22), (cat,39), (mouse,26))
    //The difference between the next three calls: the first is straightforward --- with a zero value of 0, each partition outputs the largest value per animal, and those maxima are then summed
    scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
    res6: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
    
    //The next two: the zero value takes part in the per-partition comparison. Partition 0 holds ("cat",2), ("cat",5), ("mouse",4); comparing each animal's values against the zero value 10 gives (cat,10), (mouse,10). Partition 1 likewise gives (dog,12), (cat,12), (mouse,10). Summing across partitions yields (dog,12), (cat,22), (mouse,20); note that the zero value does not take part again when the per-partition results are combined
    scala> pairRDD.aggregateByKey(10)(math.max(_,_),_+_).collect
    res7: Array[(String, Int)] = Array((dog,12), (cat,22), (mouse,20))
    //Similar to the previous call
    scala> pairRDD.aggregateByKey(100)(math.max(_,_),_+_).collect
    res8: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
    
  4. coalesce: returns a new RDD
    Repartitions the elements of the RDD.
    When the partition count is reduced moderately, e.g. 1000 -> 100, Spark merges every 10 old partitions into one new partition; the parallelism becomes 100 and no shuffle is triggered.
    When the partition count is reduced drastically, e.g. 1000 -> 1, the parallelism of the computation drops to 1. To avoid this loss of parallelism, set shuffle to true: the work before and after the shuffle then runs in separate stages with parallelism 1000 and 1 respectively.
    When the partition count is increased, a shuffle is unavoidable, so shuffle must be set to true. A short sketch follows.
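    A short sketch (not from the original article) of shrinking and growing the partition count; coalesce and partitions.length are standard RDD methods:
    scala> val rdd1 = sc.parallelize(1 to 100, 6)
    scala> rdd1.coalesce(2).partitions.length
    res0: Int = 2                              // fewer partitions: no shuffle needed
    scala> rdd1.coalesce(12).partitions.length
    res1: Int = 6                              // without shuffle, coalesce cannot increase the partition count
    scala> rdd1.coalesce(12, true).partitions.length
    res2: Int = 12                             // shuffle = true allows the increase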
    partitionBy: partitions the RDD according to the partitioner instance passed in, which can be a custom partitioner or the built-in HashPartitioner. Source (a usage sketch follows the source):
    /**
    * Return a copy of the RDD partitioned using the specified partitioner.
    */
    def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
        self
    } else {
        new ShuffledRDD[K, V, V](self, partitioner)
    }
    }
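    
    A brief usage sketch (not from the original article); HashPartitioner is the built-in partitioner mentioned above:
    scala> import org.apache.spark.HashPartitioner
    scala> val pairRDD = sc.parallelize(List(("cat", 2), ("dog", 1), ("mouse", 4), ("cat", 5)), 3)
    scala> val repartitioned = pairRDD.partitionBy(new HashPartitioner(2))
    scala> repartitioned.partitions.length
    res0: Int = 2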
    
    repartition: returns a new RDD
    Repartitions the RDD into the specified number of partitions; it always shuffles.
    When the target partition count is smaller than the current one, consider coalesce instead, which avoids the shuffle.
    scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
    
    scala> val rdd2 = rdd1.repartition(6)
    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at <console>:26
    
    scala> rdd2.partitions.length
    res0: Int = 6
    
    scala> val rdd3 = rdd2.coalesce(2,true)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at coalesce at <console>:28
    
    scala> rdd3.partitions.length
    res1: Int = 2
    
  5. collectAsMap: collects the RDD into a Map (the RDD's elements must be key-value tuples)
    scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2),("c", 2),("d", 4),("e", 1)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:24
    scala> rdd1.collectAsMap
    res3: scala.collection.Map[String,Int] = Map(e -> 1, b -> 2, d -> 4, a -> 1, c -> 2)
    
  6. combineByKey: achieves the same effect as reduceByKey; in fact reduceByKey is implemented on top of combineByKey
    • Source
      /**
      * Generic function to combine the elements for each key using a custom set of aggregation
      * functions. This method is here for backward compatibility. It does not provide combiner
      * classtag information to the shuffle.
      *
      * @see [[combineByKeyWithClassTag]]
      */
      def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          partitioner: Partitioner,
          mapSideCombine: Boolean = true,
          serializer: Serializer = null): RDD[(K, C)] = self.withScope {
      combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
          partitioner, mapSideCombine, serializer)(null)
      }
      
      /**
      * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
      * This method is here for backward compatibility. It does not provide combiner
      * classtag information to the shuffle.
      *
      * @see [[combineByKeyWithClassTag]]
      */
      def combineByKey[C](
          createCombiner: V => C,
          mergeValue: (C, V) => C,
          mergeCombiners: (C, C) => C,
          numPartitions: Int): RDD[(K, C)] = self.withScope {
            combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
      }
      
    • Parameters:
      1. createCombiner: V => C: builds the combiner; for each key, the first value seen in a partition is converted into the combiner type C.
      2. mergeValue: (C, V) => C: merges each subsequent value into the combiner within a partition (the local computation).
      3. mergeCombiners: (C, C) => C: merges the per-partition combiners into the final result.
    • Example
      //First create two RDDs, then zip them together into rdd6
      scala> val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
      rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[21] at parallelize at <console>:24
      
      scala> val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
      rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
      
      scala> val rdd6 = rdd5.zip(rdd4)
      rdd6: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[23] at zip at <console>:28
      
      scala> rdd6.collect
      res6: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
      
      //We want to group by key, collecting all values with the same key into a List
      //First argument List(_): put the first value for a key into a List
      //Second argument (x:List[String],y:String)=>x :+ y: the local step, appending each value to the List
      //Third argument (m:List[String],n:List[String])=>m++n: merge the per-partition Lists
      
      scala> val rdd7 = rdd6.combineByKey(List(_),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n)
      rdd7: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[24] at combineByKey at <console>:30
      
      scala> rdd7.collect
      res7: Array[(Int, List[String])] = Array((1,List(dog, cat, turkey)), (2,List(wolf, bear, bee, salmon, rabbit, gnu)))
      
      //The first argument can also be written in other ways, as the next two calls show
      scala> val rdd7 = rdd6.combineByKey(_::List(),(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
      rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
      
      scala> val rdd7 = rdd6.combineByKey(_::Nil,(x:List[String],y:String)=>x :+ y,(m:List[String],n:List[String])=>m++n).collect
      rdd7: Array[(Int, List[String])] = Array((1,List(turkey, dog, cat)), (2,List(wolf, bear, bee, gnu, salmon, rabbit)))
      
  7. countByKey, countByValue: countByKey counts occurrences per key; countByValue counts occurrences of each element (here, each (key, value) pair)
scala> val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> rdd1.countByKey
res8: scala.collection.Map[String,Long] = Map(a -> 1, b -> 2, c -> 2)          
scala> rdd1.countByValue
res9: scala.collection.Map[(String, Int),Long] = Map((c,2) -> 1, (a,1) -> 1, (b,2) -> 2, (c,1) -> 1)
  8. filterByRange: keeps the elements whose key falls within the given range
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1),("b",6)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
//Note: the range passed in is inclusive on both ends
scala> val rdd2 = rdd1.filterByRange("b","d")
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[34] at filterByRange at <console>:26
scala> rdd2.collect
res10: Array[(String, Int)] = Array((c,3), (d,4), (c,2), (b,6))
  9. flatMapValues: processes the values like flatMap, pairing the key with each of the values produced
scala> val rdd3 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
rdd3: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[35] at parallelize at <console>:24
scala> val rdd4 = rdd3.flatMapValues(_.split(" "))
rdd4: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[36] at flatMapValues at <console>:26
scala> rdd4.collect
res11: Array[(String, String)] = Array((a,1), (a,2), (b,3), (b,4))

mapValues: leaves the key unchanged and applies the function only to the value of each key-value pair, similar to map. Note the difference from flatMapValues above: mapValues keeps exactly one output pair per input pair and just transforms its value.

scala> val rdd3 = sc.parallelize(List(("a",(1,2)),("b",(2,4))))
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ParallelCollectionRDD[57] at parallelize at <console>:24
scala> rdd3.mapValues(x=>x._1 + x._2).collect
res34: Array[(String, Int)] = Array((a,3), (b,6))
------------------------------------------------------------------------
With flatMapValues the result is as follows: the value is split apart entirely and each piece is paired with the key
scala> rdd3.flatMapValues(x=>x + "").collect
res36: Array[(String, Char)] = Array((a,(), (a,1), (a,,), (a,2), (a,)), (b,(), (b,2), (b,,), (b,4), (b,)))
  10. foldByKey: groups by key and folds the values of each group
scala> val rdd1 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> val rdd2 = rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[38] at map at <console>:26
scala> rdd2.collect
res12: Array[(Int, String)] = Array((3,dog), (4,wolf), (3,cat), (4,bear))
-----------------------------------------------------------------------------
scala> val rdd3 = rdd2.foldByKey("")(_+_)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[39] at foldByKey at <console>:28
scala> rdd3.collect
res13: Array[(Int, String)] = Array((4,bearwolf), (3,dogcat))
scala> val rdd3 = rdd2.foldByKey(" ")(_+_).collect
rdd3: Array[(Int, String)] = Array((4," bear wolf"), (3," dog cat"))
-----------------------------------------------------------------------------
//Word count
val rdd = sc.textFile("hdfs://node-1.itcast.cn:9000/wc").flatMap(_.split(" ")).map((_, 1))
rdd.foldByKey(0)(_+_)
  11. keyBy: uses the result of the given function as the key, producing a new RDD of pairs
scala> val rdd1 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[41] at parallelize at <console>:24
scala> val rdd2 = rdd1.keyBy(_.length)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at keyBy at <console>:26
scala> rdd2.collect
res14: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
  12. keys, values: extract the keys or the values of the RDD into a new RDD
scala> val rdd1 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:24
scala> rdd1.keys.collect
res16: Array[String] = Array(e, c, d, c, a)
scala> rdd1.values.collect
res17: Array[Int] = Array(5, 3, 4, 2, 1)