Preface
In a Spark application we often run into an RDD that is expensive to compute and is only obtained through a long chain of complex transformations, i.e. an RDD with a long lineage or wide dependencies. In such cases it is worth persisting the RDD.
cache can also persist data to disk, but it simply writes each partition's output straight to disk. checkpoint, by contrast, runs after the job has logically finished: if any RDDs are marked for checkpointing, a separate job is launched to write them out, so such an RDD gets computed twice. It is therefore recommended to cache the RDD in memory before checkpointing it; the checkpoint job then only has to write the cached data to disk.
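As a concrete illustration, here is a minimal usage sketch (the paths, app name, and input data below are invented for the example); persist is applied before checkpoint so that the separate checkpoint job reads the cached partitions instead of recomputing the whole lineage:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-demo"))
    // Must be a non-local (e.g. HDFS) directory when not running in local mode.
    sc.setCheckpointDir("hdfs:///tmp/checkpoints")

    val counts = sc.textFile("hdfs:///data/input")   // hypothetical input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)                            // wide dependency: a good checkpoint candidate

    counts.persist(StorageLevel.MEMORY_AND_DISK)     // cache first to avoid double computation
    counts.checkpoint()                              // only marks the RDD; nothing is written yet
    counts.count()                                   // the action runs the job, then the checkpoint job

    sc.stop()
  }
}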
The implementation of checkpoint
To use checkpointing you must first call SparkContext's setCheckpointDir method to set a directory in which all the checkpoint data will be stored. Let's look at that method:
def setCheckpointDir(directory: String) {
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }
  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
In non-local mode, directory must not be on the local filesystem (it is typically an HDFS path); inside it, a subdirectory with a unique, UUID-generated name is created.
Calling rdd.checkpoint() marks this RDD for checkpointing:
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
It first checks that checkpointDir has been set, then checks whether checkpointData.isEmpty holds. checkpointData is defined as follows:
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
RDDCheckpointData is in one-to-one correspondence with its RDD and holds all the checkpoint-related state. Here checkpointData is instantiated via new ReliableRDDCheckpointData(this), ReliableRDDCheckpointData being a subclass of RDDCheckpointData. At this point the RDD is merely marked for checkpointing; no checkpoint has actually been performed yet.
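Besides the reference to its RDD, RDDCheckpointData tracks checkpoint progress through the CheckpointState enumeration, reproduced here from the Spark source (version-dependent); the methods we examine below read and update this state:

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}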
When the checkpoint happens
When an action is invoked, it triggers SparkContext's runJob:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
We can see that after the job has executed, rdd.doCheckpoint() is run; this is where the RDDs marked earlier actually get checkpointed. Let's look at this method:
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
It first checks whether this RDD's checkpoint has already been handled, proceeds only if it has not, and sets doCheckpointCalled to true. Since checkpointData was initialized earlier, checkpointData.isDefined is satisfied. If we want the parents of a marked RDD to be checkpointed as well (checkpointAllMarkedAncestors), the parents must be checkpointed first: once an RDD has checkpointed itself, its parents are cut out of the lineage and can no longer be reached.
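Whether marked ancestors are also checkpointed is driven by a job-local property. The key below is the one I believe the Spark source reads for checkpointAllMarkedAncestors (an internal, version-dependent setting), shown purely as a sketch:

// Assumed internal property key; asks Spark to checkpoint marked parents first.
sc.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")

With that noted, let's continue into checkpointData.get.checkpoint():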
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  val newRDD = doCheckpoint()

  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}
It first flips the checkpoint state to CheckpointingInProgress, then runs doCheckpoint, which returns a new RDD. Let's see what doCheckpoint does:
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }
  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
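Note the spark.cleaner.referenceTracking.cleanCheckpoints flag checked above: when it is enabled, the new checkpoint RDD's files are registered with the ContextCleaner so they are deleted once the checkpointed RDD goes out of scope. A minimal configuration sketch (the app name is illustrative):

val conf = new SparkConf()
  .setAppName("checkpoint-cleanup-demo")
  .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")  // defaults to false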
ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) writes the RDD out as multiple checkpoint files and returns a ReliableCheckpointRDD that stands in for the original RDD:
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
        s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
  }
  newRDD
}
After fetching configuration and broadcasting it, a job is launched to write the checkpoint files; the actual writing is done by ReliableCheckpointRDD.writePartitionToCheckpointFile. Once the write completes, a new ReliableCheckpointRDD instance is created and returned. Let's look at the writePartitionToCheckpointFile implementation:
def writePartitionToCheckpointFile[T: ClassTag](
    path: String,
    broadcastedConf: Broadcast[SerializableConfiguration],
    blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
  val env = SparkEnv.get
  val outputDir = new Path(path)
  val fs = outputDir.getFileSystem(broadcastedConf.value.value)

  val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
  val finalOutputPath = new Path(outputDir, finalOutputName)
  val tempOutputPath =
    new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")

  if (fs.exists(tempOutputPath)) {
    throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
  }
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

  val fileOutputStream = if (blockSize < 0) {
    fs.create(tempOutputPath, false, bufferSize)
  } else {
    // This is mainly for testing purpose
    fs.create(tempOutputPath, false, bufferSize,
      fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
  }
  val serializer = env.serializer.newInstance()
  val serializeStream = serializer.serializeStream(fileOutputStream)
  Utils.tryWithSafeFinally {
    serializeStream.writeAll(iterator)
  } {
    serializeStream.close()
  }

  if (!fs.rename(tempOutputPath, finalOutputPath)) {
    if (!fs.exists(finalOutputPath)) {
      logInfo(s"Deleting tempOutputPath $tempOutputPath")
      fs.delete(tempOutputPath, false)
      throw new IOException("Checkpoint failed: failed to save output of task: " +
        s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")
    } else {
      // Some other copy of this task must've finished before us and renamed it
      logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
      if (!fs.delete(tempOutputPath, false)) {
        logWarning(s"Error deleting ${tempOutputPath}")
      }
    }
  }
}
This is ordinary HDFS file-writing code: each task serializes the data of one RDD partition to a temporary file under the checkpoint directory and then renames it to its final name.
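Putting the pieces together, the on-disk layout ends up roughly like this (the rdd-<id> subdirectory, part-file naming, and _partitioner file reflect my reading of ReliableRDDCheckpointData and ReliableCheckpointRDD; exact names may differ across versions):

<checkpointDir>/<UUID>/rdd-<rddId>/part-00000                 // one file per partition
<checkpointDir>/<UUID>/rdd-<rddId>/part-00001
<checkpointDir>/<UUID>/rdd-<rddId>/.part-00002-attempt-0      // temp file, renamed on success
<checkpointDir>/<UUID>/rdd-<rddId>/_partitioner               // only when a partitioner exists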
The doCheckpoint() operation is now complete: the new ReliableCheckpointRDD reference has been assigned to cpRDD and the checkpoint state is marked Checkpointed. So what does rdd.markCheckpointed() do?
private[spark] def markCheckpointed(): Unit = {
  clearDependencies()
  partitions_ = null
  deps = null    // Forget the constructor argument for dependencies too
}
Finally, all of the RDD's dependencies are cleared (along with its cached partitions array), truncating the lineage.
Summary of writing a checkpoint
An RDD being checkpointed moves through the following states:
- Initialized
- marked for checkpointing
- checkpointing in progress
- checkpointed
When checkpoint data is read
When a partition's data is needed, rdd.iterator() is called to compute that partition of the RDD. Let's look at RDD's iterator() implementation:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
When the data cannot be obtained from the cache, we then check whether the RDD has been checkpointed; isCheckpointedAndMaterialized is the status flag set when a checkpoint succeeds (cpState = Checkpointed):
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
If the RDD has been successfully checkpointed, the parent RDD's iterator(), i.e. CheckpointRDD.iterator(), is used directly; otherwise the RDD's own compute method is invoked. Checkpointing also changes what dependencies returns:
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}
When fetching an RDD's dependencies, it first tries to obtain them from checkpointRDD; if that succeeds it returns the ReliableCheckpointRDD wrapped in a OneToOneDependency, otherwise it resolves the real dependencies.
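To wrap up, a small sketch that makes the lineage truncation visible; toDebugString and isCheckpointed are public RDD APIs, and the comments describe the expected effect rather than exact output:

val rdd = sc.parallelize(1 to 1000).map(_ * 2).groupBy(_ % 10)
rdd.checkpoint()
println(rdd.toDebugString)    // full lineage; the checkpoint is not yet materialized
rdd.count()                   // the action runs the job, then the checkpoint job
println(rdd.isCheckpointed)   // true
println(rdd.toDebugString)    // lineage now rooted at a ReliableCheckpointRDD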