Transformation轉(zhuǎn)換算子之Value類型

定義SparkContext

  val conf=new SparkConf().setMaster("local[4]").setAppName("test")
  val sc=new SparkContext(conf)

Map算子

  @Test
  def mapTest(): Unit ={
    // 創(chuàng)建本地集合 RDD ,最小分區(qū)數(shù)為4
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)

    // 使用 map 算子
    val mapRdd: RDD[Unit] = rdd1.map(i => {
      println(i)
    })

    // 調(diào)用
    mapRdd.collect()
  }

運(yùn)行結(jié)果疲眷;以為是分區(qū)計(jì)算,所以結(jié)果是無序的柔滔。

5
3
7
1
8
4
6
2

查看每個(gè)分區(qū)處理哪些數(shù)據(jù)

rdd1.mapPartitionsWithIndex((index,it)=>{
    println(s"index=$index data=${it.toList}")
    it
}).collect

運(yùn)行結(jié)果

index=3 data=List(7, 8)
index=0 data=List(1, 2)
index=2 data=List(5, 6)
index=1 data=List(3, 4)

獲取map算子返回

  @Test
  def mapTest(): Unit ={
    // 創(chuàng)建本地集合 RDD ,最小分區(qū)數(shù)為4
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)

    // 使用 map 算子
    val mapRdd: RDD[Int] = rdd1.map(i =>i*i)

    // 調(diào)用
    println(mapRdd.collect().toList)
  }
List(1, 4, 9, 16, 25, 36, 49, 64)

其基本操作和scala的map一致泡徙。
map 算子 源碼

 def map[U: ClassTag](f: T => U): RDD[U] = withScope {
   // clean 用于清理函數(shù)閉包,以為閉包無法進(jìn)行序列化葛假。
    val cleanF = sc.clean(f)
   // 創(chuàng)建 MapPartitionsRDD 對象;
   // this:創(chuàng)建新的RDD時(shí)綁定當(dāng)前RDD的依賴關(guān)系
   // 需要傳入一個(gè)函數(shù)滋恬,有三個(gè)參數(shù)
   // 第一個(gè)_:指定TaskContext 
   // 第二個(gè)_:指定TaskContext 
   // iter:當(dāng)前分區(qū)的迭代器(內(nèi)容如下)聊训;然后調(diào)用scala中的map而不是spark的map算子。
   //index=3 data=List(7, 8)
   //index=0 data=List(1, 2)
   //index=2 data=List(5, 6)
   //index=1 data=List(3, 4)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

思考一個(gè)問題恢氯?map算子并沒有指定分區(qū)带斑,但是卻是還是4個(gè)分區(qū)?
首先 map的數(shù)據(jù)來源于rdd1;rdd1指定了分區(qū)勋拟。

val rdd1=sc.makeRDD(list,4)

然后map綁定當(dāng)前rdd的關(guān)聯(lián)關(guān)系

// 由rdd1 調(diào)用
val mapRdd: RDD[Int] = rdd1.map(i =>i*i)
// 返回一個(gè)新的 rdd時(shí)綁定了當(dāng)前的rdd 也就是this
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))

最重要的還是 MapPartitionsRDD 繼承的RDD就是rdd1 ,可以通過 prev看到勋磕。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( 
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], 
    其他參數(shù)
) extends RDD[U](prev) {

所以map算子的分區(qū)大小是其父類指定的分區(qū)大小。


mapPartitions 算子

案例:使用mapPartitions指黎,通過id查詢用戶信息

  @Test
  def mysqlQueryByMapPartitions(): Unit ={
    // 讀取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    val arr=lines.mapPartitions(it => {
      // 加載驅(qū)動(dòng)類
      Class.forName("com.mysql.jdbc.Driver")

      //連接數(shù)據(jù)庫
      val connection= DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

      val statement = connection.prepareStatement("select * from user_info where id=?")

      try{
        while (it.hasNext) {
          //主鍵id
          val id: String = it.next()
          // 設(shè)置參數(shù)
          statement.setInt(1,id.toInt)
          // 執(zhí)行查詢
          val result: ResultSet = statement.executeQuery()

          while (result.next()) {
            val id = result.getInt("id")
            val nickName = result.getString("nick_name")
            val name = result.getString("name")
            println(s"id=$id,nickName=$nickName,name=$name")
          }
        }

      }catch {
        case e:Exception => {
          println("數(shù)據(jù)庫查詢失敗")
          println(e.printStackTrace())
        }
      }finally {
        //關(guān)閉資源
        if(statement!=null) statement.close()
        if(connection!=null) connection.close()
      }
      it
    }).collect()

    println(arr.toList)

  }

我是將id 一行行存儲(chǔ)到該文件中的朋凉。數(shù)據(jù)量不大,一百個(gè)id醋安。

val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

數(shù)據(jù)就不復(fù)制出來了杂彭,就看看數(shù)據(jù)庫資源關(guān)閉了幾次∠啪荆總共有四個(gè)分區(qū)亲怠,所以最終數(shù)據(jù)庫關(guān)閉了4次。

關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....

mapPartitions換成 map

@Test
  def mysqlQueryByMap(): Unit ={
    // 讀取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    val arr=lines.map(id => {
      // 加載驅(qū)動(dòng)類
      Class.forName("com.mysql.jdbc.Driver")

      //連接數(shù)據(jù)庫
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

      val statement = connection.prepareStatement("select * from user_info where id=?")

      try {
        // 設(shè)置參數(shù)
        statement.setInt(1, id.toInt)
        // 執(zhí)行查詢
        val result: ResultSet = statement.executeQuery()

        while (result.next()) {
          val id = result.getInt("id")
          val nickName = result.getString("nick_name")
          val name = result.getString("name")
          println(s"id=$id,nickName=$nickName,name=$name")
        }
      }catch {
        case e:Exception => {
          println("數(shù)據(jù)庫查詢失敗")
          println(e.printStackTrace())
        }
      }finally {
        println("關(guān)閉數(shù)據(jù)庫資源....")
        //關(guān)閉資源
        if(statement!=null) statement.close()
        if(connection!=null) connection.close()
      }
    }).collect()
  }

數(shù)據(jù)庫的建立與銷毀來來回回一百次(可以自己試試柠辞。

關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
關(guān)閉數(shù)據(jù)庫資源....
...

數(shù)據(jù)量少所以团秽,沒有問題,但是若數(shù)據(jù)不是一百而是上百萬叭首,千萬呢习勤?肯定是不行的。
可能將資源鏈接丟入map中才會(huì)造成這樣的原因焙格。
如果把connection 提出去會(huì)怎么樣图毕?

  @Test
  def mysqlQueryByMap(): Unit ={
    // 讀取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/userId.txt",4)

    //連接數(shù)據(jù)庫
    val connection = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall?characterEncoding=UTF-8", "root", "123321")

    val statement = connection.prepareStatement("select * from user_info where id=?")


    try {

      val arr=lines.map(id => {
        // 加載驅(qū)動(dòng)類
        Class.forName("com.mysql.jdbc.Driver")
        // 設(shè)置參數(shù)
        statement.setInt(1, id.toInt)
        // 執(zhí)行查詢
        val result: ResultSet = statement.executeQuery()

        while (result.next()) {
          val id = result.getInt("id")
          val nickName = result.getString("nick_name")
          val name = result.getString("name")
          println(s"id=$id,nickName=$nickName,name=$name")
        }

      }).collect()

    }catch {
      case e:Exception => {
        println("數(shù)據(jù)庫查詢失敗")
        println(e.printStackTrace())
      }
    }finally {
      println("關(guān)閉數(shù)據(jù)庫資源....")
      //關(guān)閉資源
      if(statement!=null) statement.close()
      if(connection!=null) connection.close()
    }
  }

報(bào)了一個(gè)錯(cuò),該錯(cuò)誤的原因是jdbc眷唉,并沒有實(shí)現(xiàn)序列化予颤,無法進(jìn)行傳輸。

org.apache.spark.SparkException: Task not serializable

該案例除了說明 mapmapPartitions 的區(qū)別外冬阳,更想表達(dá)的意思是蛤虐。
rdd 會(huì)將數(shù)據(jù)進(jìn)行分區(qū),每個(gè)分區(qū)的計(jì)算邏輯或數(shù)據(jù)可能不在同一個(gè)節(jié)點(diǎn)上肝陪。即使是local模式驳庭,分區(qū)之間也是并行處理。


mapPartitions 與 map 的區(qū)別:

  1. map里面的函數(shù)是針對分區(qū)里面的每個(gè)元素進(jìn)行計(jì)算氯窍,mapPartitions里面的函數(shù)是針對每個(gè)分區(qū)的所有數(shù)據(jù)的迭代器進(jìn)行計(jì)算
  2. map里面的函數(shù)是計(jì)算一個(gè)元素返回一個(gè)結(jié)果,所以map生成的新的RDD里面的元素個(gè)數(shù) = 原來RDD元素個(gè)數(shù)
    mapPartitions里面的函數(shù)是計(jì)算一個(gè)分區(qū)的所有數(shù)據(jù)的迭代器然后返回一個(gè)新的迭代器,所以mapPartitions生成的新的RDD里面的元素的個(gè)數(shù)與原來RDD元素個(gè)數(shù)可能不相同
  3. map是針對單個(gè)元素操作,元素操作完成之后可以進(jìn)行回收內(nèi)存
    mapPartitions是針對一個(gè)迭代器操作嚷掠,操作完成迭代器一個(gè)元素之后,該元素不能回收必須等到整個(gè)迭代器都處理完成之后才能回收捏检。如果一個(gè)分區(qū)中數(shù)據(jù)量很大,可能導(dǎo)致內(nèi)存溢出。如果出現(xiàn)內(nèi)存溢出可以用map代替不皆」岢牵【完成比完美更重要】

mapPartitions源碼賞析

  // f:傳入一個(gè)函數(shù),參為迭代器
  // preservesPartitioning:是否保留分區(qū)霹娄,默認(rèn)為false
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
    // 清除閉包能犯,保證數(shù)據(jù)可以進(jìn)行序列化傳輸
    val cleanedF = sc.clean(f)
    // 創(chuàng)建 MapPartitionsRDD
    // this 綁定當(dāng)前RDD,
    // iter 迭代器
    new MapPartitionsRDD(this,(_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),preservesPartitioning)
  }

點(diǎn)擊進(jìn)入MapPartitionsRDD 它會(huì)執(zhí)行一個(gè)compute函數(shù)

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

mapPartitionsWithIndex

和 mapPartitions類似,但是它可以指定分區(qū)號

  @Test
  def mapPartitionsWithIndexTest(): Unit ={
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1=sc.makeRDD(list,4)
    rdd1.mapPartitionsWithIndex((index,it)=>{
      while (it.hasNext) {
        println(s"index=$index, ${it.next()}")
      }
      it
    }).collect()
  }

結(jié)果

index=2, 5
index=1, 3
index=0, 1
index=1, 4
index=2, 6
index=0, 2
index=3, 7
index=3, 8

flatMap()

與map操作類似犬耻,將RDD中的每一個(gè)元素通過應(yīng)用f函數(shù)依次轉(zhuǎn)換為新的元素踩晶,并封裝到RDD中。
區(qū)別:在flatMap操作中枕磁,f函數(shù)的返回值是一個(gè)集合渡蜻,并且會(huì)將每一個(gè)該集合中的元素拆分出來放到新的RDD中。

  @Test
  def flatMapTest(): Unit ={
    val list=List(
      "hello,java,scala",
      "python,html,xml",
      "xpath,js,vue",
      "linux,windows"
    )
    // 創(chuàng)建本地集合RDD
    val rdd1= sc.parallelize(list, 4)
    // flatMap
    val rdd2= rdd1.flatMap(_.split(","))
    // 計(jì)算计济,匯總
    println(rdd2.collect.toList)
  }

結(jié)果

List(hello, java, scala, python, html, xml, xpath, js, vue, linux, windows)

glom()

該操作將RDD中每一個(gè)分區(qū)變成一個(gè)數(shù)組茸苇,并放置在新的RDD中,數(shù)組中元素的類型與原分區(qū)中元素類型一致

  @Test
  def glomTest(): Unit ={

    val list: Seq[Int] =List(1,2,3,4,5,6,7,8)
    val rdd1: RDD[Int] =sc.parallelize(list,4)
    val rdd2: RDD[Array[Int]] =rdd1.glom()
    // 獲取
    val arrList: List[Array[Int]] = rdd2.collect.toList
    for (arr<- arrList){
      println(arr.toList)
    }

  }
List(1, 2)
List(3, 4)
List(5, 6)
List(7, 8)

groupBy

對數(shù)據(jù)進(jìn)行分組

  @Test
  def groupBy(): Unit ={
    // 生成一百個(gè)數(shù)
    val range = Range(0, 100)

    val rdd1: RDD[Int] = sc.parallelize(range, 4)


    // 將一百以內(nèi)的數(shù)據(jù)按照 2的倍數(shù)和3的倍數(shù) 進(jìn)行分類沦寂。
    val f=(i:Int)=>{
      if(i%2==0 && i%3==0){0}
      else if(i%2==0){1}
      else if(i%3==0){2}
      else -1
    }


    val rdd2: RDD[(Int, Iterable[Int])] = rdd1.groupBy(f)

    val result: List[(Int, Iterable[Int])] = rdd2.collect.toList
    for(r <- result){
        r match {
          case (k,v) => println(k,v)
        }
    }

  }

結(jié)果

(0,CompactBuffer(0, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96))
(1,CompactBuffer(2, 4, 8, 10, 14, 16, 20, 22, 26, 28, 32, 34, 38, 40, 44, 46, 50, 52, 56, 58, 62, 64, 68, 70, 74, 76, 80, 82, 86, 88, 92, 94, 98))
(2,CompactBuffer(3, 9, 15, 21, 27, 33, 39, 45, 51, 57, 63, 69, 75, 81, 87, 93, 99))
(-1,CompactBuffer(1, 5, 7, 11, 13, 17, 19, 23, 25, 29, 31, 35, 37, 41, 43, 47, 49, 53, 55, 59, 61, 65, 67, 71, 73, 77, 79, 83, 85, 89, 91, 95, 97))

使用groupby 完成worldCount作業(yè)

@Test
  def worldCount():Unit={
    //讀取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",4)

    // 內(nèi)容扁平化
    val worldList: RDD[String] = lines.flatMap(_.split(" "))

    // 內(nèi)容分組
    val groupList: RDD[(String, Iterable[String])] = worldList.groupBy(s => s)

    // 統(tǒng)計(jì)單詞數(shù)量
    val result=groupList.map(x=>(x._1,x._2.size))

    println(result.collect().toList)

  }

數(shù)據(jù)結(jié)果

List((shell,4), (wahaha,1), (hello,2), (python,1), (java,5))

filter 過濾

接收一個(gè)返回值為布爾類型的函數(shù)作為參數(shù)学密。當(dāng)某個(gè)RDD調(diào)用filter方法時(shí),會(huì)對該RDD中每一個(gè)元素應(yīng)用f函數(shù)传藏,如果返回值類型為true腻暮,則該元素會(huì)被添加到新的RDD中。

案例:找到一百中所有的偶數(shù)

  @Test
  def filterTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)

    //過濾奇數(shù)
    val value: RDD[Int] = rdd1.filter(_%2==0)
    println(value.collect.toList)
  }
List(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98)

sample 采樣

  def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {...}
  • withReplacement:
    是否放回[true-代表放回,意味著同一個(gè)數(shù)據(jù)可能被多次采樣 false-不返回,意味著同一條數(shù)據(jù)最多被采樣一次] 【工作中設(shè)置為false】
  • fraction: 【工作中一般設(shè)置為 0.1-0.2 】:
    如果withReplacement=false, fraction代表每個(gè)元素被采樣的概率[0,1]
    如果withReplacement=true, fraction代表每個(gè)元素期望被采樣的次數(shù)
  • seed: 隨機(jī)數(shù)種子

對應(yīng) 0-100的數(shù)字進(jìn)行采樣毯侦;100%

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=1
    val value: RDD[Int] = rdd1.sample(false, 1)

    val list=value.collect.toList
    println(list)
  }
List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

對應(yīng) 0-100的數(shù)字進(jìn)行采樣哭靖;50%

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=0.5
    val value: RDD[Int] = rdd1.sample(false, 0.5)

    val list=value.collect.toList
    println(list)
  }

隨機(jī)抽查50個(gè)不重樣數(shù)據(jù)

List(0, 1, 3, 4, 6, 7, 9, 11, 15, 16, 17, 20, 22, 23, 25, 27, 30, 32, 33, 34, 35, 36, 37, 38, 40, 42, 44, 45, 46, 48, 50, 54, 56, 60, 63, 65, 66, 67, 70, 71, 73, 75, 77, 79, 80, 81, 82, 83, 85, 89, 91, 92, 97)

對應(yīng) 0-100的數(shù)字進(jìn)行采樣;10%

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=false ;fraction=0.1
    val value: RDD[Int] = rdd1.sample(false, 0.1)

    val list=value.collect.toList
    println(list)
  }

這樣看起方便點(diǎn)

List(15, 23, 44, 49, 50, 83, 85, 86, 96)

重復(fù)采樣侈离,單個(gè)元素可能被采樣多次

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=0.2 
    val value: RDD[Int] = rdd1.sample(true, 0.2)

    val list=value.collect.toList
    println(list)
  }

withReplacement 設(shè)置為true试幽,有些數(shù)據(jù)可能會(huì)被多次采樣(如88,95)霍狰。

List(3, 19, 25, 26, 27, 30, 30, 33, 38, 55, 60, 62, 63, 66, 72, 77, 88, 88, 93, 94, 95, 95)

將withReplacement 依舊為true,fraction改為整數(shù)
這次采用0-10的數(shù)據(jù)

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,10)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=2
    val value: RDD[Int] = rdd1.sample(true,2)

    val list=value.collect.toList
    println(list)
  }

如果withReplacement=true, fraction代表每個(gè)元素期望被采樣的次數(shù);
這個(gè)期望值是不確定的饰及,如上我期望值是2但是有的卻只有1個(gè)如(7)蔗坯;但也比2高的。

List(0, 0, 0, 0, 0, 1, 1, 1, 3, 3, 4, 4, 5, 5, 5, 5, 5, 5, 7, 8, 8, 8, 9, 9, 9)

最后再說說 seed燎含;默認(rèn)是一個(gè)偽隨機(jī)數(shù)宾濒,用來決定采樣的隨機(jī)率。
一般不會(huì)更改

Long = Utils.random.nextLong

seed 固定多次采用的結(jié)果也是一樣

  @Test
  def sampleTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,10)
    val rdd1: RDD[Int] = sc.parallelize(range, 4)
    // withReplacement=true ;fraction=2;seed=10
    val value: RDD[Int] = rdd1.sample(true,2,10)

    val list=value.collect.toList
    println(list)
  }

第一次運(yùn)行

List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)

第二次運(yùn)行

List(1, 2, 2, 2, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 6, 7, 7, 7, 8, 9, 9, 9, 9)

distinct 去重

  @Test
  def distinctTest(): Unit ={
    // 設(shè)置一些重復(fù)的元素
    val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)

    // 創(chuàng)建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(list, 4)

    // 去重
    val value: RDD[Int] = rdd1.distinct()

    // 調(diào)用屏箍,打印
    println(value.collect.toList)
  }

結(jié)果

List(4, 8, 1, 9, 5, 6, 2, 3, 7)

除了使用distinct也可使用groupBy 實(shí)現(xiàn)去重功能

 @Test
  def distinctTest(): Unit ={
    // 設(shè)置一些重復(fù)的元素
    val list=List(1,2,2,3,3,4,3,5,6,7,9,8,9,4,7)

    // 創(chuàng)建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(list, 4)

    // 去重
    val value: RDD[Int] = rdd1.groupBy(x=>x).map(_._1)

    // 調(diào)用绘梦,打印
    println(value.collect.toList)
  }

結(jié)果

List(4, 8, 1, 9, 5, 6, 2, 3, 7)

coalesce 合并分區(qū)

  @Test
  def coalesceTest(): Unit ={
    // 生成0-100的數(shù)
    val range=Range(0,100)

    // 創(chuàng)建本地集合RDD
    val rdd1: RDD[Int] = sc.parallelize(range, 4)    
  }

查看分區(qū)情況

rdd1.mapPartitionsWithIndex((index,it)=>{
  println(s"index=$index;it=${it.toList}")
  it
}).collect

各個(gè)分區(qū)情況

index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24)
index=1;it=List(25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=3;it=List(75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)
index=2;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74)

合并分區(qū)
numPartitions:合并分區(qū)的個(gè)數(shù)
shuffle:默認(rèn)為false橘忱,并不會(huì)觸發(fā)shuffle,設(shè)置為true卸奉,可以重新擴(kuò)大分區(qū)钝诚,但是會(huì)進(jìn)行shuffle操作。

 def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null): RDD[T] = withScope {...}

合并成兩個(gè)分區(qū)榄棵,并查看合并分區(qū)后的數(shù)據(jù)情況

    //合并成兩個(gè)分區(qū)
    val value: RDD[Int] = rdd1.coalesce(2)
    println(s"分區(qū)數(shù)${value.getNumPartitions}")
    //查看合并之后的分區(qū)情況
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

結(jié)果

分區(qū)數(shù)2
index=0;it=List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49)
index=1;it=List(50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99)

默認(rèn)情況下凝颇,無法增加分區(qū)

    //合并成兩個(gè)分區(qū)
    val value: RDD[Int] = rdd1.coalesce(5)
    println(s"分區(qū)數(shù)${value.getNumPartitions}")
    //查看合并之后的分區(qū)情況
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

就打印分區(qū)數(shù)就行了,無法擴(kuò)展分區(qū)疹鳄。

分區(qū)數(shù)2

coalesce默認(rèn)情況下只能合并分區(qū),如果想要增大分區(qū)數(shù),需要設(shè)置shuffle=true

    //合并成兩個(gè)分區(qū)
    val value: RDD[Int] = rdd1.coalesce(5,true)
    println(s"分區(qū)數(shù)${value.getNumPartitions}")

    //查看合并之后的分區(qū)情況
    value.mapPartitionsWithIndex((index,it)=>{
      println(s"index=$index;it=${it.toList}")
      it
    }).collect

數(shù)據(jù)結(jié)果

分區(qū)數(shù)5
index=0;it=List(4, 9, 14, 19, 24, 27, 32, 37, 42, 47, 53, 58, 63, 68, 73, 78, 83, 88, 93, 98)
index=1;it=List(0, 5, 10, 15, 20, 28, 33, 38, 43, 48, 54, 59, 64, 69, 74, 79, 84, 89, 94, 99)
index=3;it=List(2, 7, 12, 17, 22, 25, 30, 35, 40, 45, 51, 56, 61, 66, 71, 76, 81, 86, 91, 96)
index=2;it=List(1, 6, 11, 16, 21, 29, 34, 39, 44, 49, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95)
index=4;it=List(3, 8, 13, 18, 23, 26, 31, 36, 41, 46, 52, 57, 62, 67, 72, 77, 82, 87, 92, 97)

源碼解析

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null): RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    // 判斷 shuffle 是否為true
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](
          mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
          new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
}

先分析shuffle=false情況

// this 當(dāng)前調(diào)用coalesce的rdd
// numPartitions 分區(qū)數(shù) 拧略;上面設(shè)置的是5
new CoalescedRDD(this, numPartitions, partitionCoalescer)

partitionCoalescer 我們并沒有指定,所以使用的是默認(rèn)的DefaultPartitionCoalescer

  override def getPartitions: Array[Partition] = {
    val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())

    pc.coalesce(maxPartitions, prev).zipWithIndex.map {
      case (pg, i) =>
        val ids = pg.partitions.map(_.index).toArray
        CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
    }
  }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瘪弓,一起剝皮案震驚了整個(gè)濱河市垫蛆,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌腺怯,老刑警劉巖袱饭,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異瓢喉,居然都是意外死亡宁赤,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進(jìn)店門栓票,熙熙樓的掌柜王于貴愁眉苦臉地迎上來决左,“玉大人,你說我怎么就攤上這事走贪》鹈停” “怎么了?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵坠狡,是天一觀的道長继找。 經(jīng)常有香客問我,道長逃沿,這世上最難降的妖魔是什么婴渡? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮凯亮,結(jié)果婚禮上边臼,老公的妹妹穿的比我還像新娘。我一直安慰自己假消,他們只是感情好柠并,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般臼予。 火紅的嫁衣襯著肌膚如雪鸣戴。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天粘拾,我揣著相機(jī)與錄音窄锅,去河邊找鬼。 笑死半哟,一個(gè)胖子當(dāng)著我的面吹牛酬滤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播寓涨,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼盯串,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了戒良?” 一聲冷哼從身側(cè)響起体捏,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎糯崎,沒想到半個(gè)月后几缭,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡沃呢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年年栓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薄霜。...
    茶點(diǎn)故事閱讀 40,852評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡某抓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出惰瓜,到底是詐尸還是另有隱情否副,我是刑警寧澤,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布崎坊,位于F島的核電站备禀,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏奈揍。R本人自食惡果不足惜曲尸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望男翰。 院中可真熱鬧另患,春花似錦、人聲如沸奏篙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽秘通。三九已至为严,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肺稀,已是汗流浹背第股。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留话原,地道東北人夕吻。 一個(gè)月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像繁仁,于是被迫代替她去往敵國和親涉馅。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容