RDD持久化

為什么需要持久化

所謂的持久化令杈,就是將數(shù)據(jù)進(jìn)行保存掌敬,避免數(shù)據(jù)丟失。RDD持久化并非將數(shù)據(jù)落盤保存蠢熄,而是用作緩存跪解。
了解RDD持久化前需要先了解什么是RDD?

  1. RDD就像是一個(gè)水管签孔,數(shù)據(jù)就像是水叉讥,水只會經(jīng)過水管,并不是存儲水饥追。所以RDD是不會存儲數(shù)據(jù)的图仓。
  2. RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,的其中一個(gè)特性就是彈性
    • 存儲的彈性: spark計(jì)算過程中中間結(jié)果會保存在內(nèi)存中如果內(nèi)存不足會自動(dòng)存儲在磁盤
    • 容錯(cuò)的彈性: spark中計(jì)算過程中如果出錯(cuò)會自動(dòng)重試
    • 計(jì)算的彈性:如果計(jì)算過程中數(shù)據(jù)丟失,會根據(jù)RDD的依賴關(guān)系重新計(jì)算得到數(shù)據(jù)\color{red}{(主要說這個(gè))}
    • 分區(qū)的彈性:spark RDD會根據(jù)文件大小動(dòng)態(tài)分區(qū)

扯到彈性但绕,一是該篇文章會講到救崔,最主要的還是我需要復(fù)習(xí)一下
這里主要說到計(jì)算的彈性,如果其中一個(gè)RDD出現(xiàn)問題壁熄,可以根據(jù)依賴關(guān)系重寫計(jì)算帚豪,獲得結(jié)果。這樣的好處就是保證了數(shù)據(jù)的完整性草丧。那么有沒有缺點(diǎn)呢狸臣?答案是有的,就是需要重復(fù)計(jì)算昌执,得從頭開始烛亦。

1001

如圖(1001):若rdd5出現(xiàn)問題了诈泼,若要重新獲得數(shù)據(jù)需要從rdd1開始運(yùn)行,但是rdd5實(shí)際上只依賴于rdd4產(chǎn)生的數(shù)據(jù)結(jié)果煤禽。從頭開始效率大大降低呀铐达,我們是否可以把rdd4的解決結(jié)果緩存起來,rdd5直接從緩存中獲让使瓮孙?若緩存中沒有在從頭開始呢?這樣可以大大減少運(yùn)行時(shí)間选脊。


我們再通過一個(gè)案例深入理解緩存的作用杭抠。

  @Test
  def check2Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rdd2.map(x=>x+10)
    val rdd4=rdd2.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 sum值
    println(rdd4.collect.toList)
  }

有這么一個(gè)程序(如上),rdd2的結(jié)果會作為rdd3和rdd4的數(shù)據(jù)來源恳啥,也就是說rdd3和rdd4會對rdd2的結(jié)果做運(yùn)算偏灿。
若此時(shí)運(yùn)行程序;rdd2中的println("*"*10)會被運(yùn)行多少次?

    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })

答案會被運(yùn)行6次钝的,因?yàn)闀?jīng)過兩次collect翁垂,
根據(jù)依賴關(guān)系,調(diào)用類似collect 的Action運(yùn)行算子硝桩,程序會從上往下運(yùn)行(三條數(shù)據(jù)*兩次collect=6次)沿猜。

**********
**********
**********
**********
**********
**********

你可以發(fā)現(xiàn)到了問題所在,這樣每次都要從頭開始碗脊,這種依賴關(guān)系比較少只有三級邢疙,若在實(shí)際開發(fā)中,十級以上都是屬于正常的依賴關(guān)系望薄,若在rdd2中存有數(shù)據(jù),就無需從頭開始了呼畸,直接取來用即可痕支。

RDD緩存

RDD不存儲數(shù)據(jù),所以默認(rèn)情況下每次執(zhí)行的時(shí)候都會stage開頭執(zhí)行
緩存:

  • 數(shù)據(jù)保存位置: 保存在task所在主機(jī)的內(nèi)存/本地磁盤上
  • 應(yīng)用場景: 某個(gè)RDD在多個(gè)job中重復(fù)使用的時(shí)候

如何緩存:

  • cache
  • persist

緩存的好處:

如果一個(gè)RDD有設(shè)置cache\persist,此時(shí)rdd所屬第一個(gè)Job執(zhí)行完成之后,數(shù)據(jù)會持久化到本地的磁盤/內(nèi)存中蛮原。后續(xù)RDD所屬的其他job在執(zhí)行的時(shí)候會直接將緩存數(shù)據(jù)拿過來使用而不用重新計(jì)算

RDD Cache緩存

RDD通過Cache或者Persist方法將前面的計(jì)算結(jié)果緩存卧须,默認(rèn)情況下會把數(shù)據(jù)以序列化的形式緩存在JVM的堆內(nèi)存中。但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存儒陨,而是觸發(fā)后面的action時(shí)花嘶,該RDD將會被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用蹦漠。

如何使用椭员?
語法: RDD.cache()

@Test
  def check2Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    val rddx=rdd2.cache()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rddx.map(x=>x+10)
    val rdd4=rddx.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 sum值
    println(rdd4.collect.toList)

  }

運(yùn)行結(jié)果,是否還會打印6次println("*"*10)

**********
**********
**********

數(shù)據(jù)直接從緩存中取笛园,所以只會打印三次隘击。


cache流程圖

RDD Persist緩存

如何使用侍芝?
語法: RDD.persist()

@Test
  def check2Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    val rddx=rdd2.persist()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rddx.map(x=>x+10)
    val rdd4=rddx.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 sum值
    println(rdd4.collect.toList)

  }

結(jié)果還是3次

**********
**********
**********

Cache和Persist有什么區(qū)別?

cache底層就是persist埋同,調(diào)用是一個(gè)無參函數(shù)州叠。

 def cache(): this.type = persist()

無參的persist()其實(shí)調(diào)用的就是有參的,指定一個(gè)緩存級別凶赁。

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
  • cache是將數(shù)據(jù)保存在本地內(nèi)存中
  • persist可以指定將數(shù)據(jù)保存在內(nèi)存/磁盤中

Persist的緩存級別

StorageLevel 中就定義persist的用到緩存級別咧栗。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

存儲級別:

  • NONE: 不存儲
  • DISK_ONLY : 只保存在磁盤中
  • DISK_ONLY_2 : 只保存在磁盤中,數(shù)據(jù)保存兩份
  • MEMORY_ONLY : 只保存在內(nèi)存中
  • MEMORY_ONLY_2 : 只保存在內(nèi)存中,數(shù)據(jù)保存兩份
  • MEMORY_ONLY_SER :只保存在內(nèi)存中,以序列化形式存儲
  • MEMORY_ONLY_SER_2 : 只保存在內(nèi)存中,以序列化形式存儲,數(shù)據(jù)保存兩份
  • MEMORY_AND_DISK : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整
  • MEMORY_AND_DISK_2 : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整虱肄,數(shù)據(jù)保存兩份
  • MEMORY_AND_DISK_SER :數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整,以序列化形式存儲
  • MEMORY_AND_DISK_SER_2 : 數(shù)據(jù)保存在內(nèi)存/磁盤中,可以動(dòng)態(tài)調(diào)整,以序列化形式存儲致板,數(shù)據(jù)保存兩份
  • OFF_HEAP :數(shù)據(jù)保存在堆外內(nèi)存中

太多了對不對?其中工作中常用的存儲級別:

  • MEMORY_ONLY<只適用于小數(shù)據(jù)量場景>
  • MEMORY_AND_DISK<適用于大數(shù)據(jù)量場景>

RDD CheckPoint檢查點(diǎn)

雖然我們配置了緩存但是只能保存在task所在主機(jī)的內(nèi)存/本地磁盤上浩峡,若該服務(wù)器出現(xiàn)問題可岂,依然會造成數(shù)據(jù)丟失,從頭開始計(jì)算翰灾,效率也有所降低缕粹。為了安全起見,我們可以設(shè)置CheckPoint纸淮,將數(shù)據(jù)保存到比較可靠的地方平斩,如:將數(shù)據(jù)保存到HDFS中。

1)檢查點(diǎn):是通過將RDD中間結(jié)果寫入磁盤咽块。
2)為什么要做檢查點(diǎn)绘面?
由于血緣依賴過長會造成容錯(cuò)成本過高,這樣就不如在中間階段做檢查點(diǎn)容錯(cuò)侈沪,如果檢查點(diǎn)之后有節(jié)點(diǎn)出現(xiàn)問題揭璃,可以從檢查點(diǎn)開始重做血緣,減少了開銷亭罪。
3)檢查點(diǎn)存儲路徑:Checkpoint的數(shù)據(jù)通常是存儲在HDFS等容錯(cuò)瘦馍、高可用的文件系統(tǒng)
4)檢查點(diǎn)數(shù)據(jù)存儲格式為:二進(jìn)制的文件
5)檢查點(diǎn)切斷血緣:在Checkpoint的過程中,該RDD的所有依賴于父RDD中的信息將全部被移除应役。
6)檢查點(diǎn)觸發(fā)時(shí)間:對RDD進(jìn)行Checkpoint操作并不會馬上被執(zhí)行情组,必須執(zhí)行Action操作才能觸發(fā)。但是檢查點(diǎn)為了數(shù)據(jù)安全箩祥,會從血緣關(guān)系的最開始執(zhí)行一遍院崇。

案例演示:

  @Test
  def check4Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    //設(shè)置存儲路徑
    sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")


    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    rdd2.checkpoint()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rdd2.map(x=>x+10)
    val rdd4=rdd2.map(x=>x+10)

    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 sum值
    println(rdd4.collect.toList)

  }

/output/a 輸出文件

CheckPoint輸出文件

若需要這種錯(cuò)誤

Permission denied: user=123456, access=WRITE, inode="/output/a":hadoop:supergroup:drwxr-xr-x

這種屬于權(quán)限問題,解決方式設(shè)置-DHADOOP_USER_NAME=hadoop
hadoop=你鏈接服務(wù)器的賬號

配置-DHADOOP_USER_NAME

奇怪的是控制臺卻打印了6次println("*"*10)

**********
**********
**********
**********
**********
**********

這是怎么回事了袍祖?
是我執(zhí)行了 rdd3和rdd4的原因底瓣?

  @Test
  def check4Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    //設(shè)置存儲路徑
    sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")


    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    rdd2.checkpoint()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rdd2.map(x=>x+10)
    val rdd4=rdd2.map(x=>x+10)
    val rdd5=rdd2.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 rdd4 的結(jié)果
    println(rdd4.collect.toList)
    // 輸出 rdd5 的結(jié)果
    println(rdd5.collect.toList)

  }

在加一個(gè) rdd5 是否會執(zhí)行9次?
啟動(dòng)運(yùn)行依舊是6次

**********
**********
**********
**********
**********
**********

懊し骸濒持?這么奇怪键耕?那么把rdd4rdd5刪了呢?

  @Test
  def check4Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    //設(shè)置存儲路徑
    sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")


    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    rdd2.checkpoint()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rdd2.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)

  }

啟動(dòng)運(yùn)行依舊是6次

**********
**********
**********
**********
**********
**********

這是什么原因呢柑营?這就得從checkpoint() 源碼中尋址答案了屈雄。

def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // 判斷路徑是否為空
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      // 看看 ReliableRDDCheckpointData
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

ReliableRDDCheckpointData 是一個(gè)伴生對象,我們看看doCheckpoint方法

  protected override def doCheckpoint(): CheckpointRDD[T] = {
    // 將RDD寫入檢查點(diǎn)目錄
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    // Optionally clean our checkpoint files if the reference is out of scope
    if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

進(jìn)入到 writeRDDToCheckpointDirectory

def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save to file, and reload it as an RDD
    //保存到文件官套,并將其作為RDD重新加載
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // 上面都是一些初始化工作酒奶,判斷,創(chuàng)建目錄啥奶赔,主要是這一句惋嚎,會在運(yùn)行job
    sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    // 下面就不是很重要了
    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }

程序運(yùn)行到這里,他會再執(zhí)行一個(gè)job任務(wù)站刑,并將數(shù)據(jù)保存到緩存中另伍。

sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

checkpoint一開始就執(zhí)行(如下),雖然checkpointrdd3前面被調(diào)用绞旅,但是它需要等到rdd3執(zhí)行完collect之后再運(yùn)行一個(gè)job摆尝。

   //添加 rdd2的緩存
    rdd2.checkpoint()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rdd2.map(x=>x+10)

總結(jié):checkpoint會觸發(fā)一次job操作,該job操作是在checkpoint所屬RDD第一個(gè)job執(zhí)行完成之后才會觸發(fā)

這樣就不爽了因悲,明明只需要跑一次的任務(wù)就可以緩存的堕汞,現(xiàn)在需要多跑一次,雖然對后面的RDD效率有所提高晃琳,但是就是不爽讯检。
其實(shí)check可以和checkpoint 配合使用

  @Test
  def check4Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    //設(shè)置存儲路徑
    sc.setCheckpointDir("hdfs://hadoop102:9820/output/a")


    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })
    //添加 rdd2的緩存
    val rddx=rdd2.cache()
    rddx.checkpoint()

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rddx.map(x=>x+10)
    val rdd4=rddx.map(x=>x+10)
    val rdd5=rddx.map(x=>x+10)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 rdd4 的結(jié)果
    println(rdd4.collect.toList)
    // 輸出 rdd5 的結(jié)果
    println(rdd5.collect.toList)

    //釋放
    rddx.unpersist(true)
    //關(guān)閉鏈接
    sc.stop()

  }

這樣配置配置之后,就只會運(yùn)行一次了卫旱。

//添加 rdd2的緩存
val rddx=rdd2.cache()
rddx.checkpoint()

程序結(jié)果

**********
**********
**********

注意:checkpoint還是會運(yùn)行一個(gè)job人灼,但是程序不用從頭開始了,而是直接從rddx中取顾翼。
總結(jié):為了避免checkpoint觸發(fā)的job重復(fù)執(zhí)行之前的數(shù)據(jù)處理邏輯,可以在checkpoint之間將rdd通過cache緩存數(shù)據(jù),后續(xù)checkpoint觸發(fā)的job就可以直接使用緩存的數(shù)據(jù)


最后還得注意的是挡毅,緩存是可以備份到內(nèi)存或磁盤中的。
使用cache時(shí)暴构,job結(jié)束之后,緩存會被自動(dòng)釋放段磨。
使用checkpoint時(shí)取逾,需要手動(dòng)進(jìn)行釋放,需要設(shè)置unpersist為true默認(rèn)為false苹支。

    //釋放
    rddx.unpersist(true)
    //關(guān)閉鏈接
    sc.stop()

cache與checkpoint的區(qū)別:

數(shù)據(jù)持久化的位置不一樣:

  1. cache是將數(shù)據(jù)保存在本地內(nèi)存/磁盤中
  2. checkpoint是將數(shù)據(jù)保存在HDFS中

依賴關(guān)系是否保留不一樣

  1. cache是將數(shù)據(jù)保存在本地內(nèi)存/磁盤中,如果服務(wù)器宕機(jī),此時(shí)數(shù)據(jù)丟失,丟失數(shù)據(jù)之后只能根據(jù)rdd的依賴關(guān)系重新計(jì)算得到數(shù)據(jù),所以rdd的依賴關(guān)系會保留
  2. checkpoint是將數(shù)據(jù)保存在HDFS,數(shù)據(jù)不會丟失,此時(shí)RDD的依賴關(guān)系會切除

工作常用還是cache


1)Cache緩存只是將數(shù)據(jù)保存起來砾隅,不切斷血緣依賴。Checkpoint檢查點(diǎn)切斷血緣依賴债蜜。
2)Cache緩存的數(shù)據(jù)通常存儲在磁盤晴埂、內(nèi)存等地方究反,可靠性低。Checkpoint的數(shù)據(jù)通常存儲在HDFS等容錯(cuò)儒洛、高可用的文件系統(tǒng)精耐,可靠性高。
3)建議對checkpoint()的RDD使用Cache緩存琅锻,這樣checkpoint的job只需從Cache緩存中讀取數(shù)據(jù)即可卦停,否則需要再從頭計(jì)算一次RDD。
4)如果使用完了緩存恼蓬,可以通過unpersist()方法釋放緩存


Shuffle會自動(dòng)進(jìn)行緩存

Job程序運(yùn)行中惊完,若進(jìn)行了shuffle操作,會自動(dòng)完成一次緩存操作处硬。

案例演示:

  @Test
  def check4Test(): Unit ={
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)


    val rdd1=sc.parallelize(List(1,2,3),4)

    //進(jìn)行一個(gè)map算子操作
    val rdd2=rdd1.map(x=>{
      println("*"*10)
      x*10
    })

    // 進(jìn)行一次分組小槐,主要想讓程序完成一次shuffle操作。
    val rddx=rdd2.groupBy(x=>x)

    //進(jìn)行一個(gè)reduce算子操作
    val rdd3=rddx.map(x=>x)
    val rdd4=rddx.map(x=>x)
    val rdd5=rddx.map(x=>x)


    //打印 rdd2 的結(jié)果
    println(rdd3.collect.toList)
    // 輸出 rdd4 的結(jié)果
    println(rdd4.collect.toList)
    // 輸出 rdd5 的結(jié)果
    println(rdd5.collect.toList)

    //關(guān)閉鏈接
    sc.stop()

  }

將代碼做了一些變動(dòng)荷辕;

  1. 刪除所有的緩存
  2. 使用groupBy凿跳,讓程序完成一次shuffle操作
  3. 最終看程序,是否只打印 3次println("*"*10)?

運(yùn)行結(jié)果:只打印了3次桐腌。

**********
**********
**********

這個(gè)一定要記住拄显,面試可能會被問到。

最后

文章內(nèi)容案站,我一邊看一邊做案例躬审,可能文檔會比較亂,沒有其他博客那么清晰明了蟆盐。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末承边,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子石挂,更是在濱河造成了極大的恐慌博助,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痹愚,死亡現(xiàn)場離奇詭異富岳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拯腮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門窖式,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人动壤,你說我怎么就攤上這事萝喘。” “怎么了?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵阁簸,是天一觀的道長爬早。 經(jīng)常有香客問我,道長启妹,這世上最難降的妖魔是什么筛严? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮翅溺,結(jié)果婚禮上脑漫,老公的妹妹穿的比我還像新娘。我一直安慰自己咙崎,他們只是感情好优幸,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著褪猛,像睡著了一般网杆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伊滋,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天碳却,我揣著相機(jī)與錄音,去河邊找鬼笑旺。 笑死昼浦,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的筒主。 我是一名探鬼主播关噪,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼乌妙!你這毒婦竟也來了使兔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤藤韵,失蹤者是張志新(化名)和其女友劉穎虐沥,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泽艘,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡欲险,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了匹涮。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盯荤。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖焕盟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤脚翘,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布灼卢,位于F島的核電站,受9級特大地震影響来农,放射性物質(zhì)發(fā)生泄漏鞋真。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一沃于、第九天 我趴在偏房一處隱蔽的房頂上張望涩咖。 院中可真熱鬧,春花似錦繁莹、人聲如沸檩互。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽闸昨。三九已至,卻和暖如春薄风,著一層夾襖步出監(jiān)牢的瞬間饵较,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工遭赂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留循诉,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓撇他,卻偏偏與公主長得像茄猫,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子逆粹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 1. Background 當(dāng)我們需要多次使用同一個(gè) RDD 時(shí)募疮,如果簡單的調(diào)用 Action 操作,Spark ...
    xiaoc024閱讀 925評論 0 0
  • rdd的持久化算子有三種: 1僻弹、cache:將數(shù)據(jù)持久化到內(nèi)存 2阿浓、persist:可以將數(shù)據(jù)持久化到磁盤,也可以...
    林桉閱讀 454評論 0 0
  • 我們這節(jié)課講一下RDD的持久化 這段代碼我們上午已經(jīng)看過了蹋绽,有瑕疵大家看出來了嗎芭毙?有什么瑕疵啊?大家是否還記得我在...
    Albert陳凱閱讀 647評論 0 2
  • (1) persist算子 使用方法: StorageLevel說明: StorageLevel的構(gòu)造函數(shù): St...
    printf200閱讀 295評論 0 1
  • (1) persist算子 使用方法: StorageLevel說明: StorageLevel的構(gòu)造函數(shù): St...
    數(shù)據(jù)萌新閱讀 580評論 0 0