Overview
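Spark's checkpoint mechanism is meant for applications that access the same data repeatedly. A Spark DAG can be very large, and the computation chain inside a task can be very long. If a task fails midway, recomputing that whole chain is expensive. It is therefore worth checkpointing RDDs that are costly to compute: when a downstream RDD fails, Spark can read the data directly from the checkpointed RDD and continue from there instead of recomputing it from scratch.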
<b>Let's start with an example of using checkpoint</b>
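import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
object CheckPointTest {
  def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck").setMaster("local[2]"))
    // Root directory for checkpoint files; on a cluster this must be an HDFS path
    sc.setCheckpointDir("/Users/kinge/ck")
    // The input path is left empty in the original; point it at a real text file
    val rdd: RDD[(String, Int)] = sc.textFile("").map { x => (x, 1) }.reduceByKey(_ + _)
    // Only marks the RDD; must be called before any job has run on it
    rdd.checkpoint()
    // First action: runs the job, then writes the checkpoint
    rdd.count()
    // Later jobs read rdd's partitions back from the checkpoint files
    rdd.groupBy(x => x._2).collect().foreach(println)
  }
}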
Checkpoint flow analysis
Checkpoint initialization
The example first calls SparkContext's setCheckpointDir to set a checkpoint directory. Let's step into that method:
/**
 * Set the directory under which RDDs are going to be checkpointed. The directory must
 * be a HDFS path if running on a cluster.
 */
def setCheckpointDir(directory: String) {
  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Checkpoint directory must be non-local " +
      "if Spark is running on a cluster: " + directory)
  }
  // Create a UUID-named subdirectory under `directory` through the Hadoop FileSystem API
  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
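To see what this method does without a Hadoop cluster, here is a minimal local-filesystem sketch. It is a stand-in under stated assumptions: `java.nio.file` replaces Hadoop's `FileSystem`, and the object name `LocalCheckpointDir` is hypothetical. Like the original, it returns `None` for a `null` directory and otherwise creates a fresh UUID-named subdirectory, which keeps separate runs from writing into the same folder.

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.UUID

// Hypothetical local analogue of SparkContext.setCheckpointDir.
// java.nio stands in for Hadoop's FileSystem API (assumption, not Spark code).
object LocalCheckpointDir {
  def create(directory: String): Option[String] =
    Option(directory).map { dir =>
      // One fresh subdirectory per call, like new Path(dir, UUID.randomUUID().toString)
      val path: Path = Paths.get(dir, UUID.randomUUID().toString)
      Files.createDirectories(path) // like fs.mkdirs(path)
      path.toAbsolutePath.toString  // like fs.getFileStatus(path).getPath.toString
    }
}
```

Calling `LocalCheckpointDir.create("/tmp/ck")` would create and return a path of the form `/tmp/ck/<uuid>`.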
This method is simple: it just creates the directory. Next, let's step into RDD's core checkpoint method:
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
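The point worth noticing is that `checkpoint()` writes no data; it only validates the directory and attaches a marker object, at most once. A simplified model with hypothetical names (`MarkOnlyRdd`, `MarkerData`, `MarkLock`), where `IllegalStateException` stands in for `SparkException`:

```scala
// Hypothetical stand-ins: MarkerData ~ ReliableRDDCheckpointData,
// MarkLock ~ the RDDCheckpointData lock, IllegalStateException ~ SparkException.
final class MarkerData(val rddId: Int)
object MarkLock

final class MarkOnlyRdd(val id: Int, checkpointDir: Option[String]) {
  private var checkpointData: Option[MarkerData] = None

  def data: Option[MarkerData] = checkpointData

  // Same shape as RDD.checkpoint(): validate, then attach the marker at most once.
  // Nothing is written to storage here.
  def checkpoint(): Unit = MarkLock.synchronized {
    if (checkpointDir.isEmpty) {
      throw new IllegalStateException("Checkpoint directory has not been set")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new MarkerData(id))
    }
  }
}
```

Calling `checkpoint()` twice leaves the first marker in place, which is why marking an RDD repeatedly is harmless.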
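This method returns nothing and contains a single check: checkpointDir was set a moment ago, so it is not empty, and the method then creates a ReliableRDDCheckpointData (only if none exists yet). Let's look at ReliableRDDCheckpointData: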
/**
 * An implementation of checkpointing that writes the RDD data to reliable storage.
 * This allows drivers to be restarted on failure with previously computed state.
 */
private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
  ...
}
The parent class of ReliableRDDCheckpointData is RDDCheckpointData. Let's continue with that parent class:
/**
 * An RDD goes through the stages
 * [ Initialized --> CheckpointingInProgress --> Checkpointed ]
 * before it is checkpointed.
 */
private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
  extends Serializable {
  import CheckpointState._
  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized
  ...
}
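In other words, an RDD has to pass through the stages [ Initialized --> CheckpointingInProgress --> Checkpointed ] before it is checkpointed. The class tracks this state with an enumeration, and the state is Initialized when the object is first created.
That completes the checkpoint() step: back in the RDD, the member variable checkpointData now points to this RDDCheckpointData instance (concretely, a ReliableRDDCheckpointData).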
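The lifecycle above is a strictly forward, three-state machine. A small sketch of it, using hypothetical names (`MiniCheckpointState`, `CheckpointLifecycle`) rather than Spark's private `CheckpointState`:

```scala
// Hypothetical copy of the three checkpoint states.
object MiniCheckpointState extends Enumeration {
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

object CheckpointLifecycle {
  import MiniCheckpointState.{Checkpointed, CheckpointingInProgress, Initialized}
  type State = MiniCheckpointState.Value

  // The only legal moves: one step forward, never backward.
  private val next: Map[State, State] = Map(
    Initialized -> CheckpointingInProgress,
    CheckpointingInProgress -> Checkpointed
  )

  def advance(s: State): Option[State] = next.get(s)

  // Every state reachable from `from`, in order.
  def path(from: State): List[State] =
    from :: advance(from).map(path).getOrElse(Nil)
}
```

`Checkpointed` has no successor, so a finished checkpoint is never redone by this state machine.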
<b>Checkpoint initialization sequence diagram</b>
When does checkpoint write data?
Every Spark job ultimately goes through SparkContext's runJob method, which submits the tasks to the executors. Let's look at runJob:
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
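Because `rdd.doCheckpoint()` only runs after `dagScheduler.runJob` has returned, writing the checkpoint is a second job over the same RDD. Unless the RDD is persisted, its partitions are computed once for the action and once more for the checkpoint, which is why the scaladoc of `RDD.checkpoint()` recommends persisting the RDD first. The toy model below (all names hypothetical, no Spark involved) counts those computations:

```scala
// Toy model, no Spark involved: an action runs the user's job first and only
// then runs the checkpoint write, mirroring the order in SparkContext.runJob.
final class CountingRdd(data: Seq[Int], persisted: Boolean) {
  var computeCount = 0                        // how many times compute() ran
  var markedForCheckpoint = false             // set by checkpoint()
  var checkpointFile: Option[Seq[Int]] = None // stands in for files on HDFS
  private var cache: Option[Seq[Int]] = None  // stands in for persist()

  private def compute(): Seq[Int] = { computeCount += 1; data.map(_ * 2) }

  def iterator(): Seq[Int] =
    if (!persisted) compute()
    else cache.getOrElse { val v = compute(); cache = Some(v); v }

  def checkpoint(): Unit = { markedForCheckpoint = true }

  def runJob[U](func: Seq[Int] => U): U = {
    val result = func(iterator())        // 1. the user's job
    if (markedForCheckpoint && checkpointFile.isEmpty)
      checkpointFile = Some(iterator())  // 2. the checkpoint job: another pass
    result
  }
}
```

With `persisted = false` one action costs two computations; with `persisted = true` the checkpoint job is served from the cache.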
The last line calls doCheckpoint, after dagScheduler has run the job on the cluster. Let's look at doCheckpoint:
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        checkpointData.get.checkpoint()
      } else {
        // No checkpoint data on this RDD: call doCheckpoint on each parent RDD
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
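The recursion can be reproduced on a toy lineage graph. In this sketch (hypothetical `LineageVertex` class, node names borrowed from the example program), the walk stops at the first marked RDD on each path, so a marked ancestor further up is never reached, matching the `if/else` in the code above:

```scala
import scala.collection.mutable.ArrayBuffer

// Toy lineage vertex (hypothetical): parents plus a "checkpoint() was called" flag.
final class LineageVertex(val name: String, val deps: List[LineageVertex], val marked: Boolean) {
  private var doCheckpointCalled = false

  // Same control flow as RDD.doCheckpoint(): visit each vertex at most once;
  // a marked vertex is checkpointed and its parents are not visited.
  def doCheckpoint(written: ArrayBuffer[String]): Unit =
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (marked) written += name
      else deps.foreach(_.doCheckpoint(written))
    }
}
```

A second call on the same vertex does nothing, thanks to the `doCheckpointCalled` flag.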
This is a recursion over the RDD dependency chain: when an RDD's checkpointData is non-empty, it calls checkpointData.checkpoint(); otherwise it recurses into the RDD's parents. Remember what type checkpointData has? It is RDDCheckpointData. Here is its checkpoint method:
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      // 1. Mark the state as "checkpoint in progress"
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }
  // 2. Delegate to the subclass's doCheckpoint()
  val newRDD = doCheckpoint()
  // 3. Mark the checkpoint as done and clear the RDD's dependencies
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}
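The method above is a template method guarded by a lock: the state flip happens under the lock, the slow write happens outside it, and any thread that loses the flip simply returns. A sketch of that pattern with hypothetical names (`FlipLock`, `GuardedCheckpointData`) and plain strings standing in for the `CheckpointState` values; it leaves out `rdd.markCheckpointed()`:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FlipLock // stands in for the global RDDCheckpointData lock

// Hypothetical mirror of RDDCheckpointData.checkpoint(); strings replace CheckpointState.
abstract class GuardedCheckpointData[R] {
  @volatile var state: String = "Initialized"
  @volatile var cpRDD: Option[R] = None

  // Hook implemented by the "subclass", like ReliableRDDCheckpointData.doCheckpoint()
  protected def doCheckpoint(): R

  final def checkpoint(): Unit = {
    // Phase 1: flip Initialized -> CheckpointingInProgress under the lock.
    val won = FlipLock.synchronized {
      if (state == "Initialized") { state = "CheckpointingInProgress"; true }
      else false
    }
    if (won) {
      val newRdd = doCheckpoint() // Phase 2: the slow write runs outside the lock
      FlipLock.synchronized {     // Phase 3: publish the result
        cpRDD = Some(newRdd)
        state = "Checkpointed"
      }
    }
  }
}
```

Even with many threads calling `checkpoint()` at once, only the thread that wins the flip runs `doCheckpoint()`.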
This is where the actual checkpointing starts. The checkpoint logic itself is delegated to the subclass through doCheckpoint, so let's see how the subclass implements it:
protected override def doCheckpoint(): CheckpointRDD[T] = {
  // Create the output path for the checkpoint
  val path = new Path(cpDir)
  val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
  if (!fs.mkdirs(path)) {
    throw new SparkException(s"Failed to create checkpoint path $cpDir")
  }
  // Broadcast the Hadoop configuration (core-site.xml etc.) to the BlockManagers on the worker nodes
  val broadcastedConf = rdd.context.broadcast(
    new SerializableConfiguration(rdd.context.hadoopConfiguration))
  // Submit a separate job to the cluster that serializes the RDD into the HDFS directory
  rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)
  // Build the CheckpointRDD that becomes this RDD's new parent; from now on it reads
  // the checkpoint files from the file system to produce this RDD's partitions
  val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
  if (newRDD.partitions.length != rdd.partitions.length) {
    throw new SparkException(
      s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
        s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
  }
  // Optionally clean up the checkpoint files once the RDD is no longer referenced
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }
  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  // Return the new RDD to the parent class
  newRDD
}
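Once the write job has finished, the parent class swaps the RDD's parents for the new CheckpointRDD (through `rdd.markCheckpointed()`), and that swap is what truncates the lineage. A toy picture of the effect, with a hypothetical `LineageNode` class whose `markCheckpointed` only mimics it:

```scala
// Hypothetical toy node; markCheckpointed only mimics the effect of
// RDD.markCheckpointed(): old parents dropped, one checkpoint-backed parent added.
final class LineageNode(val name: String, private var parents: List[LineageNode]) {
  def dependencies: List[LineageNode] = parents

  def markCheckpointed(checkpointParent: LineageNode): Unit = {
    parents = List(checkpointParent)
  }

  // Names along the lineage, depth first, starting with this node.
  def lineage: List[String] = name :: parents.flatMap(_.lineage)
}
```

After the swap, recovering `reduceByKey` no longer walks back through `map` and `textFile`.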
The code above returns the new CheckpointRDD. The parent class assigns it to the member variable cpRDD, then sets the state to Checkpointed and clears the current RDD's dependency chain. At this point the checkpoint data has been serialized to HDFS.
<b>Checkpoint write sequence diagram</b>
When does checkpoint read data?
A Task is the smallest unit of work that Spark runs, and when a Task fails, Spark recomputes it. The place where a Task performs its computation is also the entry point for reading checkpoint data. Here is the runTask method of ShuffleMapTask:
override def runTask(context: TaskContext): MapStatus = {
  ...
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // rdd.iterator computes the records of this partition; the writer writes them to disk
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
This is where Spark actually invokes the computation: runTask calls rdd.iterator() to compute the RDD's partition. Let's look at RDD's iterator():
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
For an RDD that is not persisted, this goes on to call computeOrReadCheckpoint. Let's look at that method:
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
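The dispatch in `iterator()` and `computeOrReadCheckpoint()` reduces to a few branches. This is a simplified model with hypothetical names, where plain functions stand in for the cache manager, the `CheckpointRDD` parent, and `compute`. It also assumes that a cache miss falls back to `computeOrReadCheckpoint`, which is how I understand Spark's cache path; filling the cache after a miss is left out.

```scala
// Hypothetical model of RDD.iterator + computeOrReadCheckpoint. Partitions are
// Ints and records are Seq[Int]; the three functions stand in for the cache,
// the CheckpointRDD parent and the RDD's own compute().
final class PartitionSource(
    persisted: Boolean,
    checkpointedAndMaterialized: Boolean,
    cacheLookup: Int => Option[Seq[Int]],
    readCheckpoint: Int => Seq[Int],
    compute: Int => Seq[Int]) {

  def iterator(split: Int): Seq[Int] =
    if (persisted) cacheLookup(split).getOrElse(computeOrReadCheckpoint(split)) // miss falls back
    else computeOrReadCheckpoint(split)

  private def computeOrReadCheckpoint(split: Int): Seq[Int] =
    if (checkpointedAndMaterialized) readCheckpoint(split) // like firstParent[T].iterator(...)
    else compute(split)
}
```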
When rdd.iterator() computes a partition of the RDD, computeOrReadCheckpoint(split: Partition) checks whether the RDD has been checkpointed. If it has, it calls iterator() on the RDD's parent, which is now the CheckpointRDD, i.e. CheckpointRDD.iterator(); otherwise it calls the RDD's own compute directly. So let's follow CheckpointRDD's compute:
/**
 * Read the content of the checkpoint file associated with the given partition.
 */
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
There are only two lines here: they build the path of this partition's checkpoint file and read the checkpoint data from it. Let's look at readCheckpointFile:
/**
 * Read the content of the specified checkpoint file.
 */
def readCheckpointFile[T](
    path: Path,
    broadcastedConf: Broadcast[SerializableConfiguration],
    context: TaskContext): Iterator[T] = {
  val env = SparkEnv.get
  // Open the file on HDFS through the Hadoop FileSystem API
  val fs = path.getFileSystem(broadcastedConf.value.value)
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)
  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener(context => deserializeStream.close())
  // Deserialize the records and return them as an Iterator
  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
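A local round trip makes the write/read pairing concrete. This sketch assumes plain Java serialization instead of Spark's configured serializer and a local directory instead of HDFS. The `part-%05d` file naming is meant to mirror what `ReliableCheckpointRDD.checkpointFileName` produces, but treat it as an assumption to check against your Spark version. `LocalCheckpointFiles` is a hypothetical name.

```scala
import java.io.{EOFException, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.file.{Files, Path}
import scala.collection.mutable.ListBuffer

// Hypothetical local stand-in for ReliableCheckpointRDD's write/read pair:
// Java serialization replaces Spark's serializer, a local Path replaces HDFS.
object LocalCheckpointFiles {
  // Assumed naming scheme, one file per partition: part-00000, part-00001, ...
  def fileName(partitionIndex: Int): String = "part-%05d".format(partitionIndex)

  def write[T](dir: Path, partitionIndex: Int, rows: Iterator[T]): Path = {
    val file = dir.resolve(fileName(partitionIndex))
    val out = new ObjectOutputStream(new FileOutputStream(file.toFile))
    try rows.foreach(r => out.writeObject(r.asInstanceOf[AnyRef]))
    finally out.close()
    file
  }

  // Reads records until end of file. Spark instead returns a lazy iterator and
  // closes the stream from a task-completion listener.
  def read[T](dir: Path, partitionIndex: Int): List[T] = {
    val in = new ObjectInputStream(new FileInputStream(dir.resolve(fileName(partitionIndex)).toFile))
    val rows = ListBuffer.empty[T]
    try {
      var done = false
      while (!done) {
        try rows += in.readObject().asInstanceOf[T]
        catch { case _: EOFException => done = true }
      }
    } finally in.close()
    rows.toList
  }
}
```

Reading eagerly into a list keeps the sketch short; the lazy iterator in the Spark code avoids holding a whole partition in memory.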
CheckpointRDD is responsible for reading the files from the file system and producing the RDD's partitions. This is why an RDD on which checkpoint was called gets a CheckpointRDD added as its parent.
That completes the whole checkpoint flow.
<b>Checkpoint read sequence diagram</b>