In this installment:
1. Overview of Spark Streaming data cleanup
2. The Spark Streaming data cleanup process in detail
3. How Spark Streaming data cleanup is triggered
Spark Streaming is unlike an ordinary Spark application. In an ordinary Spark program, the intermediate data is destroyed when the SparkContext is closed after the program finishes. A Spark Streaming application, however, keeps running and keeps computing, producing a large amount of intermediate data every second, so its objects and metadata must be cleaned up periodically. As jobs are triggered for every batch duration, the corresponding RDDs and metadata have to be cleared afterwards. Below we walk through Spark Streaming's data cleanup mechanism in detail with the help of the source code.
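To make this concrete, here is a minimal sketch of a Spark Streaming application (the socket source, host/port, and word-count logic are illustrative assumptions, not part of the source code discussed below). Every batch duration a new micro-batch job is generated for this graph, and each job leaves behind RDDs and metadata that the mechanisms described below must eventually clean up.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CleanupDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CleanupDemo").setMaster("local[2]")
    // A 1-second batch duration: a new micro-batch job (and new RDDs) is generated every second.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Illustrative input source; any InputDStream would do.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}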
1. Overview of Data Cleanup
As a Spark Streaming application runs, jobs are generated continuously over time. When a job finishes, the data associated with it (RDDs, metadata, checkpoint data) needs to be cleaned up. Jobs are generated periodically by JobGenerator, and JobGenerator is also responsible for cleaning up this data.
The code by which JobGenerator controls data cleanup lives in a message loop, eventLoop:
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
The core logic sits in the processEvent(event) function:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
As you can see, when JobGenerator receives ClearMetadata(time) or ClearCheckpointData(time) it performs the corresponding cleanup: clearMetadata(time) cleans up RDD data and metadata, while clearCheckpointData(time) cleans up checkpoint data.
2. Data Cleanup in Detail
2.1 ClearMetadata in Detail
Let's first look at the logic of the clearMetadata function:
/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
It first calls DStreamGraph's clearMetadata method:
def clearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}
This calls the clearMetadata method of every output DStream (for the classification of DStreams see http://blog.csdn.net/zhouzx2010/article/details/51460790):
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  // Collect the RDDs that need to be cleaned up
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
  // Remove the old RDDs from generatedRDDs
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
    oldRDDs.values.foreach { rdd =>
      // Remove the RDD from the persistence list
      rdd.unpersist(false)
      // Explicitly remove blocks of BlockRDD
      rdd match {
        case b: BlockRDD[_] =>
          logInfo(s"Removing blocks of RDD $b of time $time")
          // Remove the RDD's block data
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
    s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
  // Clean up the DStreams this DStream depends on
  dependencies.foreach(_.clearMetadata(time))
}
The key cleanup logic is annotated in the code above: first the metadata of this DStream's old RDDs is cleared (they are removed from generatedRDDs), then the RDD data itself is unpersisted and its blocks removed, and finally the DStreams this DStream depends on are cleaned up in the same way.
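How far back clearMetadata reaches is controlled by each DStream's rememberDuration, and whether old RDDs are actually unpersisted is controlled by spark.streaming.unpersist, as seen above. A small hedged sketch of the two knobs an application can adjust (the values are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RetentionDemo")
  // Default is true: old generated RDDs are unpersisted (and BlockRDD blocks removed).
  // Setting it to false would skip the unpersist/removeBlocks branch shown above.
  .set("spark.streaming.unpersist", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// Ask every DStream in the graph to remember its RDDs for at least one minute,
// which pushes back the (time - rememberDuration) threshold used by clearMetadata.
ssc.remember(Minutes(1))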
Back in JobGenerator's clearMetadata function:
if (shouldCheckpoint) {
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
} else {
  // If checkpointing is not enabled, then delete metadata information about
  // received blocks (block data not saved in any case). Otherwise, wait for
  // checkpointing of this batch to complete.
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}
This calls ReceiverTracker's cleanupOldBlocksAndBatches method, which ultimately calls cleanupOldBatches:
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
  logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
    // Remove the batches that are due for deletion
    timeToAllocatedBlocks --= timesToCleanup
    // Clean up the WAL files
    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
  } else {
    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
  }
}
As you can see, cleanupOldBatches cleans up the receiver-side data, i.e. the per-batch block allocations and the WAL (write-ahead log) files.
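The WAL files cleaned up here only exist if the receiver write-ahead log has been enabled, which also requires a checkpoint directory. A minimal hedged configuration sketch (the directory path is an illustrative assumption):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALDemo")
  // Write received blocks to a write-ahead log as well; these are the WAL files
  // that cleanupOldBatches later deletes via writeAheadLogOption.foreach(_.clean(...)).
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
// The WAL lives under the checkpoint directory, so one must be configured.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")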
Finally, the InputInfoTracker information is cleaned up:
def cleanup(batchThreshTime: Time): Unit = synchronized {
  val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
  logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
  batchTimeToInputInfos --= timesToCleanup
}
This simply removes the old per-batch input information from batchTimeToInputInfos.
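As an aside, the per-batch input information tracked here also ends up in the BatchInfo that Spark reports through the StreamingListener API, so an application can observe it before it is cleaned up. A small sketch (the listener class name is my own):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class InputInfoLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // streamIdToInputInfo carries the per-stream record counts recorded by InputInfoTracker.
    info.streamIdToInputInfo.foreach { case (streamId, inputInfo) =>
      println(s"batch ${info.batchTime}: stream $streamId received ${inputInfo.numRecords} records")
    }
  }
}

// Registered with the running context: ssc.addStreamingListener(new InputInfoLogger)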
2.2 ClearCheckpointData in Detail
Let's look at the logic of clearCheckpointData:
/** Clear DStream checkpoint data for the given `time`. */
private def clearCheckpointData(time: Time) {
  ssc.graph.clearCheckpointData(time)

  // All the checkpoint information about which batches have been processed, etc have
  // been saved to checkpoints, so its safe to delete block metadata and data WAL files
  val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
  jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
  jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
  markBatchFullyProcessed(time)
}
后面的ReceiverTraker和InputInforTracker的清理邏輯和ClearMetaData的相同,這分析DStreamGraph的clearCheckpointData方法:
def clearCheckpointData(time: Time) {
  logInfo("Clearing checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearCheckpointData(time))
  }
  logInfo("Cleared checkpoint data for time " + time)
}
It likewise calls the clearCheckpointData method of every output DStream in the DStreamGraph:
private[streaming] def clearCheckpointData(time: Time) {
  logDebug("Clearing checkpoint data")
  checkpointData.cleanup(time)
  dependencies.foreach(_.clearCheckpointData(time))
  logDebug("Cleared checkpoint data")
}
The core logic here is checkpointData.cleanup(time); checkpointData is a DStreamCheckpointData object, whose cleanup method looks like this:
def cleanup(time: Time) {
  // Get the time threshold of the checkpoint files that need to be cleaned up
  timeToOldestCheckpointFileTime.remove(time) match {
    case Some(lastCheckpointFileTime) =>
      // Collect the files that need to be deleted
      val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
      logDebug("Files to delete:\n" + filesToDelete.mkString(","))
      filesToDelete.foreach {
        case (time, file) =>
          try {
            val path = new Path(file)
            if (fileSystem == null) {
              fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
            }
            // Delete the checkpoint file
            fileSystem.delete(path, true)
            timeToCheckpointFile -= time
            logInfo("Deleted checkpoint file '" + file + "' for time " + time)
          } catch {
            case e: Exception =>
              logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
              fileSystem = null
          }
      }
    case None =>
      logDebug("Nothing to delete")
  }
}
As you can see, cleaning up checkpoints simply means deleting the checkpoint files older than the given time.
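Checkpoint files are only written, and therefore only cleaned up, when checkpointing is enabled. A hedged sketch of the usual setup (the directory path, interval, and windowed word count are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Setting a checkpoint directory makes shouldCheckpoint true in JobGenerator,
// so DoCheckpoint(time, clearCheckpointDataLater = true) events start being posted.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
// Checkpoint this DStream's RDDs every 10 seconds; these per-RDD checkpoint files
// are what DStreamCheckpointData.cleanup later deletes.
counts.checkpoint(Seconds(10))
counts.print()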
3. How Data Cleanup Is Triggered
3.1 Triggering ClearMetadata
After JobGenerator generates a job, it hands it to a JobHandler for execution. In JobHandler's run method, once the job has finished, a JobCompleted message is sent to JobScheduler:
_eventLoop = eventLoop
if (_eventLoop != null) {
  _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
When JobScheduler receives the JobCompleted message it calls handleJobCompletion; the dispatching code is:
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
JobScheduler's handleJobCompletion method then calls JobGenerator's onBatchCompletion method. Let's look at its source:
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
So onBatchCompletion posts a ClearMetadata message to JobGenerator's own event loop, which is what triggers the ClearMetadata processing.
3.2 Triggering ClearCheckpointData
Checkpoint data is cleaned up after a checkpoint has completed. Let's first look at the end of CheckpointWriteHandler's run method:
// All done, print success
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
  "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
// Call back into JobGenerator to clean up checkpoint data
jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
So once the checkpoint has been written, JobGenerator's onCheckpointCompletion method is called to clean up checkpoint data. Here is its source:
def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
  if (clearCheckpointDataLater) {
    eventLoop.post(ClearCheckpointData(time))
  }
}
onCheckpointCompletion first checks the clearCheckpointDataLater parameter that was passed in. If it is true, a ClearCheckpointData message is posted to JobGenerator's eventLoop, which triggers the clearCheckpointData method and cleans up the checkpoint data.
So when is this parameter true?
Let's go back to JobGenerator's clearMetadata method:
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  if (shouldCheckpoint) {
    // Post a DoCheckpoint message and request checkpoint-data cleanup afterwards
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
In clearMetadata, a DoCheckpoint message is posted with clearCheckpointDataLater set to true. When JobGenerator's eventLoop receives this message it calls doCheckpoint:
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
The key step here is the call to CheckpointWriter's write method, still with clearCheckpointDataLater set to true. Let's step into that method:
def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
  try {
    val bytes = Checkpoint.serialize(checkpoint, conf)
    // Pass clearCheckpointDataLater on to CheckpointWriteHandler
    executor.execute(new CheckpointWriteHandler(
      checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
    logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
  } catch {
    case rej: RejectedExecutionException =>
      logError("Could not submit checkpoint task to the thread pool executor", rej)
  }
}
As you can see, clearCheckpointDataLater is handed to CheckpointWriteHandler, so once the checkpoint has completed, a ClearCheckpointData message is sent back to JobGenerator and the checkpoint data is cleaned up.