spark源碼分析之Checkpoint的過程

概述

checkpoint 的機(jī)制保證了需要訪問重復(fù)數(shù)據(jù)的應(yīng)用 Spark 的DAG執(zhí)行行圖可能很龐大触趴,task 中計(jì)算鏈可能會(huì)很長驼修,這時(shí)如果 task 中途運(yùn)行出錯(cuò)宏所,那么 task 的整個(gè)需要重算非常耗時(shí),因此越驻,有必要將計(jì)算代價(jià)較大的 RDD checkpoint 一下汁政,當(dāng)下游 RDD 計(jì)算出錯(cuò)時(shí),可以直接從 checkpoint 過的 RDD 那里讀取數(shù)據(jù)繼續(xù)算缀旁。

<b>我們先來看一個(gè)例子记劈,checkpoint的使用</b>

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object CheckPointTest {

   def main(args: Array[String]) {
      
    val sc: SparkContext = SparkContext.getOrCreate(new   SparkConf().setAppName("ck").setMaster("local[2]"))
    sc.setCheckpointDir("/Users/kinge/ck")

    val rdd: RDD[(String, Int)] = sc.textFile("").map{x=>(x,1) }.reduceByKey(_+_)
    rdd.checkpoint()

    rdd.count()
    rdd.groupBy(x=>x._2).collect().foreach(println)
   }
}

checkpoint流程分析

checkpoint初始化

我們可以看到最先調(diào)用了SparkContextsetCheckpointDir 設(shè)置了一個(gè)checkpoint 目錄
我們跟進(jìn)這個(gè)方法看一下

  /**
   * Set the directory under which RDDs are going to be checkpointed. The directory must
   * be a HDFS path if running on a cluster.
   */
  def setCheckpointDir(directory: String) {

    // If we are running on a cluster, log a warning if the directory is local.
    // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
    // its own local file system, which is incorrect because the checkpoint files
    // are actually on the executor machines.
    if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
      logWarning("Checkpoint directory must be non-local " +
        "if Spark is running on a cluster: " + directory)
    }

   //利用hadoop的api創(chuàng)建了一個(gè)hdfs目錄
    checkpointDir = Option(directory).map { dir =>
      val path = new Path(dir, UUID.randomUUID().toString)
      val fs = path.getFileSystem(hadoopConfiguration)
      fs.mkdirs(path)
      fs.getFileStatus(path).getPath.toString
    }
  }

這個(gè)方法挺簡單的,就創(chuàng)建了一個(gè)目錄并巍,接下來我們看RDD核心的checkpoint 方法目木,跟進(jìn)去

  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

這個(gè)方法沒有返回值,邏輯只有一個(gè)判斷懊渡,checkpointDir剛才設(shè)置過了刽射,不為空,然后創(chuàng)建了一個(gè)ReliableRDDCheckpointData,我們來看ReliableRDDCheckpointData

/**
 * An implementation of checkpointing that writes the RDD data to reliable storage.
 * This allows drivers to be restarted on failure with previously computed state.
 */
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
   剃执。誓禁。。肾档。摹恰。
}

這個(gè)ReliableRDDCheckpointData的父類RDDCheckpointData我們?cè)倮^續(xù)看它的父類

/**
*   RDD 需要經(jīng)過
*    [ Initialized  --> CheckpointingInProgress--> Checkpointed ] 
*    這幾個(gè)階段才能被 checkpoint。
*/

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Serializable {

  import CheckpointState._

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized
  
  怒见。俗慈。。遣耍。闺阱。。
}

RDD 需要經(jīng)過
[ Initialized --> CheckpointingInProgress--> Checkpointed ]
這幾個(gè)階段才能被 checkpoint舵变。
這類里面有一個(gè)枚舉來標(biāo)識(shí)CheckPoint的狀態(tài)酣溃,第一次初始化時(shí)是Initialized。
checkpoint這個(gè)一步已經(jīng)完成了纪隙,回到我們的RDD成員變量里checkpointData這個(gè)變量指向的RDDCheckpointData的實(shí)例救拉。

<b>Checkpoint初始化時(shí)序圖</b>

myuml__Collaboration1__Interaction1___0.jpg

checkpoint什么時(shí)候?qū)懭霐?shù)據(jù)

我們知道一個(gè)spark job運(yùn)行最終會(huì)調(diào)用SparkContextrunJob方法將任務(wù)提交給Executor去執(zhí)行,我們來看runJob

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

最后一行代碼調(diào)用了doCheckpoint瘫拣,在dagScheduler將任務(wù)提交給集群運(yùn)行之后亿絮,我來看這個(gè)doCheckpoint方法

  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          checkpointData.get.checkpoint()
        } else {
          //遍歷依賴的rdd,調(diào)用每個(gè)rdd的doCheckpoint方法
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }

這個(gè)是一個(gè)遞歸,遍歷RDD依賴鏈條派昧,當(dāng)rdd是checkpointData不為空時(shí)黔姜,調(diào)用checkpointDatacheckpoint()方法。還記得checkpointData類型是什么嗎蒂萎?就是RDDCheckpointData 秆吵,我們來看它的checkpoint方法,以下

  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {

       //1五慈、標(biāo)記當(dāng)前狀態(tài)為正在checkpoint中
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

  //2 這里調(diào)用的是子類的doCheckpoint()
    val newRDD = doCheckpoint()

    // 3 標(biāo)記checkpoint已完成纳寂,清空RDD依賴
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

這個(gè)方法開始做checkpoint操作了,將doCheckpoint交給子類去實(shí)現(xiàn)checkponit的邏輯泻拦,我們?nèi)タ醋宇愒趺磳?shí)現(xiàn)doCheckpoint

  protected override def doCheckpoint(): CheckpointRDD[T] = {

    // Create the output path for the checkpoint
    val path = new Path(cpDir)
    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    if (!fs.mkdirs(path)) {
      throw new SparkException(s"Failed to create checkpoint path $cpDir")
    }

    //需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 節(jié)點(diǎn)的 blockManager毙芜。

    val broadcastedConf = rdd.context.broadcast(
      new SerializableConfiguration(rdd.context.hadoopConfiguration))


   //向集群提交一個(gè)Job去執(zhí)行checkpoint操作,將RDD序列化到HDFS目錄上
    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)

    // 為該 rdd 生成一個(gè)新的依賴争拐,設(shè)置該 rdd 的 parent rdd 為  
    //CheckpointRDD腋粥,該 CheckpointRDD 負(fù)責(zé)以后讀取在文件系統(tǒng)上的   
   //checkpoint 文件,生成該 rdd 的 partition架曹。
    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
    if (newRDD.partitions.length != rdd.partitions.length) {
      throw new SparkException(
        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
          s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
    }

    // 是否清除checkpoint文件如果超出引用的資源范圍
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")

//  將新產(chǎn)生的RDD返回給父類
    newRDD
  }

上面的代碼最終會(huì)返回新的CheckpointRDD 隘冲,父類將它復(fù)值給成員變量cpRDD,最終標(biāo)記當(dāng)前狀態(tài)為Checkpointed并清空當(dāng)RDD的依賴鏈绑雄。到此Checkpoint的數(shù)據(jù)就被序列化到HDFS上了展辞。

<b> Checkpoint 寫數(shù)據(jù)時(shí)序圖</b>

checkpoint.jpg

checkpoint什么時(shí)候讀取數(shù)據(jù)

我們知道Task是saprk運(yùn)行任務(wù)的最小單元,當(dāng)Task執(zhí)行失敗的時(shí)候spark會(huì)重新計(jì)算万牺,這里Task進(jìn)行計(jì)算的地方就是讀取checkpoint的入口纵竖。我們可以看一下ShuffleMapTask 里的計(jì)算方法runTask,如下

  override def runTask(context: TaskContext): MapStatus = {
   
     。杏愤。。已脓。珊楼。。度液。

    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

    //調(diào)用rdd.iterator厕宗,迭代每個(gè)partition里的數(shù)據(jù),計(jì)算并寫入磁盤
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

這是spark真正調(diào)用計(jì)算方法的邏輯runTask調(diào)用 rdd.iterator() 去計(jì)算該 rdd 的 partition 的堕担,我們來看RDD的iterator()

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

這里會(huì)繼續(xù)調(diào)用computeOrReadCheckpoint,我們看該方法

**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

當(dāng)調(diào)用rdd.iterator()去計(jì)算該 rdd 的 partition 的時(shí)候已慢,會(huì)調(diào)用 computeOrReadCheckpoint(split: Partition)去查看該 rdd 是否被 checkpoint 過了,如果是霹购,就調(diào)用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator()佑惠,否則直接調(diào)用該RDD的compute, 那么我們就跟進(jìn)CheckpointRDDcompute

  /**
   * Read the content of the checkpoint file associated with the given partition.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }

這里就兩行代碼,意思是從Path上讀取我們的CheckPoint數(shù)據(jù),看一下readCheckpointFile

  /**
   * Read the content of the specified checkpoint file.
   */
  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get

  // 用hadoop API 讀取HDFS上的數(shù)據(jù)
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = fs.open(path, bufferSize)
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(context => deserializeStream.close())

    //反序列化數(shù)據(jù)后轉(zhuǎn)換為一個(gè)Iterator
    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

CheckpointRDD 負(fù)責(zé)讀取文件系統(tǒng)上的文件膜楷,生成該 rdd 的 partition旭咽。這就解釋了為什么要為調(diào)用了checkpoint的RDD 添加一個(gè) parent CheckpointRDD的原因。
到此赌厅,整個(gè)checkpoint的流程就結(jié)束了穷绵。

<b>Checkpoint 讀取數(shù)據(jù)時(shí)序圖</b>

checkpoint.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市特愿,隨后出現(xiàn)的幾起案子仲墨,更是在濱河造成了極大的恐慌,老刑警劉巖揍障,帶你破解...
    沈念sama閱讀 211,348評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件目养,死亡現(xiàn)場離奇詭異,居然都是意外死亡亚兄,警方通過查閱死者的電腦和手機(jī)混稽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,122評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來审胚,“玉大人匈勋,你說我怎么就攤上這事∩胚叮” “怎么了洽洁?”我有些...
    開封第一講書人閱讀 156,936評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長菲嘴。 經(jīng)常有香客問我饿自,道長,這世上最難降的妖魔是什么龄坪? 我笑而不...
    開封第一講書人閱讀 56,427評(píng)論 1 283
  • 正文 為了忘掉前任昭雌,我火速辦了婚禮,結(jié)果婚禮上健田,老公的妹妹穿的比我還像新娘烛卧。我一直安慰自己,他們只是感情好妓局,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,467評(píng)論 6 385
  • 文/花漫 我一把揭開白布总放。 她就那樣靜靜地躺著,像睡著了一般好爬。 火紅的嫁衣襯著肌膚如雪局雄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,785評(píng)論 1 290
  • 那天存炮,我揣著相機(jī)與錄音炬搭,去河邊找鬼蜈漓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛尚蝌,可吹牛的內(nèi)容都是我干的迎变。 我是一名探鬼主播,決...
    沈念sama閱讀 38,931評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼飘言,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼衣形!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起姿鸿,我...
    開封第一講書人閱讀 37,696評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤谆吴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后苛预,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體句狼,經(jīng)...
    沈念sama閱讀 44,141評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,483評(píng)論 2 327
  • 正文 我和宋清朗相戀三年热某,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了腻菇。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,625評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡昔馋,死狀恐怖筹吐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情秘遏,我是刑警寧澤丘薛,帶...
    沈念sama閱讀 34,291評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站邦危,受9級(jí)特大地震影響洋侨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜倦蚪,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,892評(píng)論 3 312
  • 文/蒙蒙 一希坚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧陵且,春花似錦裁僧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽播急。三九已至脓钾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間桩警,已是汗流浹背可训。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人握截。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓飞崖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親谨胞。 傳聞我的和親對(duì)象是個(gè)殘疾皇子固歪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,492評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容