[Spark] Checkpoint Source Code Analysis

Preface

In a Spark application, we often encounter an RDD that takes heavy computation and a long chain of complex transformations to produce, i.e., an RDD with a long lineage or wide dependencies. In such cases, we should consider persisting that RDD.

cache can also persist data to disk, but it simply writes each partition's output directly to disk. checkpoint, by contrast, runs after the logical job has finished: if any RDDs were marked for checkpointing, a separate job is launched to checkpoint them, so the RDD ends up being computed twice. It is therefore recommended to cache an RDD in memory before checkpointing it, so that the checkpoint job only needs to write the cached data to disk.
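
A minimal usage sketch (the application name, path, and dataset below are illustrative, not from the Spark source):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo"))
sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // must be reliable storage in cluster mode

val rdd = sc.parallelize(1 to 1000000).map(_ * 2).filter(_ % 3 == 0)
rdd.cache()        // recommended: lets the checkpoint job reuse the cached partitions
rdd.checkpoint()   // only marks the RDD; nothing is written yet
rdd.count()        // the action runs the job, then a second job writes the checkpoint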

The implementation of checkpoint

To use checkpoint, you must first set a directory via SparkContext's setCheckpointDir method to store the checkpoint data. Let's look at that method:

def setCheckpointDir(directory: String) {
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
        s"must not be on the local filesystem. Directory '$directory' " +
        "appears to be on the local filesystem.")
    }
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

In non-local mode, directory must be a directory on HDFS; under it, a subdirectory is created whose unique name is generated from a UUID.
An RDD is then marked for checkpointing by calling rdd.checkpoint():

def checkpoint(): Unit = RDDCheckpointData.synchronized { 
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

It first checks that checkpointDir has been set, then whether checkpointData.isEmpty holds. checkpointData is defined as follows:

private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

RDDCheckpointData corresponds one-to-one with an RDD and holds the information related to its checkpoint. Here checkpointData is instantiated via new ReliableRDDCheckpointData(this), where ReliableRDDCheckpointData is a subclass of RDDCheckpointData. At this point the RDD is merely marked for checkpointing; no checkpoint has actually been performed yet.
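
Condensed, the RDDCheckpointData hierarchy looks roughly like this (a sketch based on the Spark 2.x source, with member details elided):

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // Checkpoint state of the associated RDD, starting at Initialized
  protected var cpState = Initialized

  // Subclasses implement the actual materialization of the checkpoint
  protected def doCheckpoint(): CheckpointRDD[T]
}

ReliableRDDCheckpointData, used here, writes to a reliable filesystem such as HDFS; its sibling LocalRDDCheckpointData (used by rdd.localCheckpoint()) stores the data in executor block storage instead.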

什么時(shí)候checkpoint

When an action is triggered, SparkContext's runJob is invoked:

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

我們可以看到在執(zhí)行完job后會(huì)執(zhí)行 rdd.doCheckpoint()寨躁,這里就是對(duì)前面標(biāo)記了的RDD的checkpoint,我們繼續(xù)看這個(gè)方法:

private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            // Checkpoint parents first because our lineage will be truncated after we
            // checkpoint ourselves
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

It first checks whether this RDD has already been handled; only if not does it proceed, setting doCheckpointCalled to true. Since checkpointData was initialized earlier, checkpointData.isDefined holds. If the parents of a checkpoint-marked RDD should also be checkpointed, they must be checkpointed first, because once an RDD checkpoints itself, it cuts its parents out of the lineage.
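
A side note on checkpointAllMarkedAncestors: in the Spark 2.x source this flag is read from a job-local property whose key is defined as RDD.CHECKPOINT_ALL_MARKED_ANCESTORS. Assuming that key, it can be enabled per job like this:

sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")

With that in mind, let's continue into checkpointData.get.checkpoint():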

final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    // Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

It first flips the checkpoint state to CheckpointingInProgress, then executes doCheckpoint, which returns a newRDD. Let's see what doCheckpoint does:

protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }
    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) writes the RDD to multiple checkpoint files and returns a ReliableCheckpointRDD that represents it:

def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val sc = originalRDD.sparkContext
    // Create the output path for the checkpoint
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }
    // Save to file, and reload it as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }
    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
    }
    newRDD
  }

After some preparation (creating the output path, broadcasting the Hadoop configuration, and so on), it launches a job to write the checkpoint files; the actual writing is done by ReliableCheckpointRDD.writePartitionToCheckpointFile. Once the write is finished, a new ReliableCheckpointRDD instance is created and returned. Let's look at the concrete writePartitionToCheckpointFile implementation:

def writePartitionToCheckpointFile[T: ClassTag](
      path: String,
      broadcastedConf: Broadcast[SerializableConfiguration],
      blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
    val env = SparkEnv.get
    val outputDir = new Path(path)
    val fs = outputDir.getFileSystem(broadcastedConf.value.value)

    val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
    val finalOutputPath = new Path(outputDir, finalOutputName)
    val tempOutputPath =
      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

    if (fs.exists(tempOutputPath)) {
      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
    }
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

    val fileOutputStream = if (blockSize < 0) {
      fs.create(tempOutputPath, false, bufferSize)
    } else {
      // This is mainly for testing purpose
      fs.create(tempOutputPath, false, bufferSize,
        fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
    }
    val serializer = env.serializer.newInstance()
    val serializeStream = serializer.serializeStream(fileOutputStream)
    Utils.tryWithSafeFinally {
      serializeStream.writeAll(iterator)
    } {
      serializeStream.close()
    }

    if (!fs.rename(tempOutputPath, finalOutputPath)) {
      if (!fs.exists(finalOutputPath)) {
        logInfo(s"Deleting tempOutputPath $tempOutputPath")
        fs.delete(tempOutputPath, false)
        throw new IOException("Checkpoint failed: failed to save output of task: " +
          s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
      } else {
        // Some other copy of this task must've finished before us and renamed it
        logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
        if (!fs.delete(tempOutputPath, false)) {
          logWarning(s"Error deleting ${tempOutputPath}")
        }
      }
    }
  }

這里的代碼就是普通的對(duì)HDFS寫(xiě)文件的操作潜索,將一個(gè)RDD partition的數(shù)據(jù)寫(xiě)到checkpoint目錄下梧奢。

The doCheckpoint() operation is now complete; it returned a new RDD, a ReliableCheckpointRDD, whose reference is assigned to cpRDD, and the checkpoint state is marked Checkpointed. What does rdd.markCheckpointed() do?

private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
  }

最后再清除RDD的所有依賴绊率。

Summary of writing a checkpoint

An RDD being checkpointed passes through the following stages (see the sketch after this list):

  • Initialized
  • marked for checkpointing
  • checkpointing in progress
  • checkpointed
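
Marking corresponds to setting the RDD's checkpointData field; the remaining transitions are tracked by the CheckpointState enumeration (a sketch from the Spark 2.x source):

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}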

什么時(shí)候讀checkpoint

When a partition's data is needed, rdd.iterator() is called to compute that partition of the RDD. Let's look at RDD's iterator() implementation:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

When the data cannot be served from the cache, it then checks whether this RDD has been checkpointed. isCheckpointedAndMaterialized is the flag set when a checkpoint completes successfully, i.e., when cpState has been set to Checkpointed:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

If the RDD was successfully checkpointed, the parent RDD's iterator() is used directly, which is CheckpointRDD.iterator(); otherwise the RDD's own compute method is called. Checkpointing also changes how an RDD's dependencies are resolved:

final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

When fetching an RDD's dependencies, it first tries to obtain them from checkpointRDD; if that succeeds it returns the ReliableCheckpointRDD wrapped in a OneToOneDependency, otherwise it resolves the real dependencies.
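
To make the effect concrete, here is a small hypothetical session (the dataset and transformations are illustrative) showing the truncated lineage and the rewritten dependency:

import org.apache.spark.OneToOneDependency

val rdd = sc.parallelize(1 to 100).map(_ + 1).filter(_ % 2 == 0)
rdd.checkpoint()
println(rdd.toDebugString)   // full lineage: MapPartitionsRDD <- ... <- ParallelCollectionRDD
rdd.count()                  // the action triggers the checkpoint job

// After checkpointing, the single dependency is a OneToOneDependency on the
// ReliableCheckpointRDD, and toDebugString shows the truncated lineage.
assert(rdd.dependencies.head.isInstanceOf[OneToOneDependency[_]])
println(rdd.toDebugString)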

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市骨望,隨后出現(xiàn)的幾起案子硬爆,更是在濱河造成了極大的恐慌,老刑警劉巖擎鸠,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件缀磕,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)袜蚕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)糟把,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人牲剃,你說(shuō)我怎么就攤上這事遣疯。” “怎么了凿傅?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵缠犀,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我聪舒,道長(zhǎng)辨液,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任箱残,我火速辦了婚禮滔迈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘疚宇。我一直安慰自己亡鼠,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布敷待。 她就那樣靜靜地躺著间涵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪榜揖。 梳的紋絲不亂的頭發(fā)上勾哩,一...
    開(kāi)封第一講書(shū)人閱讀 52,441評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音举哟,去河邊找鬼思劳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛妨猩,可吹牛的內(nèi)容都是我干的潜叛。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼壶硅,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼威兜!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起庐椒,我...
    開(kāi)封第一講書(shū)人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤椒舵,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后约谈,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體笔宿,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡犁钟,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了泼橘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涝动。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖炬灭,靈堂內(nèi)的尸體忽然破棺而出捧存,到底是詐尸還是另有隱情,我是刑警寧澤担败,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站镰官,受9級(jí)特大地震影響提前,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜泳唠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一狈网、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧笨腥,春花似錦拓哺、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至谆级,卻和暖如春烤礁,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背肥照。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工脚仔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人舆绎。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓鲤脏,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親吕朵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子猎醇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容