16. Spark Streaming Source Code Walkthrough: The Data Cleanup Mechanism

In this installment:
1. An overview of data cleanup in Spark Streaming
2. The data cleanup process in detail
3. What triggers data cleanup


Spark Streaming differs from an ordinary Spark application. In an ordinary Spark program, intermediate data is destroyed when the SparkContext is closed. Spark Streaming, however, runs continuously, computing batch after batch and producing a large amount of intermediate data every batch interval, so objects and metadata must be cleaned up periodically: each batch duration keeps triggering jobs, and after a job finishes its RDDs and metadata need to be cleaned up. Below we walk through the source code to analyze Spark Streaming's data cleanup mechanism in detail.

1. Data Cleanup Overview
As Spark Streaming runs, jobs are generated continuously over time. When a job finishes, the data associated with it (RDDs, metadata, checkpoint data) must be cleaned up. Jobs are generated periodically by JobGenerator, and JobGenerator is also responsible for data cleanup.
The code with which JobGenerator controls data cleanup lives in a message loop, eventLoop:

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

The core logic lives in the processEvent(event) function:

    /** Processes all events */
    private def processEvent(event: JobGeneratorEvent) {
      logDebug("Got event " + event)
      event match {
        case GenerateJobs(time) => generateJobs(time)
        case ClearMetadata(time) => clearMetadata(time)
        case DoCheckpoint(time, clearCheckpointDataLater) =>
          doCheckpoint(time, clearCheckpointDataLater)
        case ClearCheckpointData(time) => clearCheckpointData(time)
      }
    }

As the code shows, when JobGenerator receives a ClearMetadata(time) or ClearCheckpointData(time) event it performs the corresponding cleanup: clearMetadata(time) cleans up RDDs and related metadata, while clearCheckpointData(time) cleans up checkpoint data.
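The dispatch above can be sketched outside Spark as a plain pattern match over the event types (a simplified stand-in: the case-class fields are reduced to a Long timestamp, the real EventLoop's daemon thread and queue are omitted, and the handlers just return strings so the routing is visible):

```scala
// Minimal sketch of JobGenerator's event dispatch; names mirror the source,
// but this is not the actual Spark implementation.
sealed trait JobGeneratorEvent
case class GenerateJobs(time: Long) extends JobGeneratorEvent
case class ClearMetadata(time: Long) extends JobGeneratorEvent
case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
case class ClearCheckpointData(time: Long) extends JobGeneratorEvent

object EventDispatchSketch {
  // Route each event to the handler it would invoke in the real processEvent.
  def processEvent(event: JobGeneratorEvent): String = event match {
    case GenerateJobs(t)        => s"generateJobs($t)"
    case ClearMetadata(t)       => s"clearMetadata($t)"
    case DoCheckpoint(t, later) => s"doCheckpoint($t, $later)"
    case ClearCheckpointData(t) => s"clearCheckpointData($t)"
  }

  def main(args: Array[String]): Unit = {
    Seq(GenerateJobs(1000L), ClearMetadata(1000L), ClearCheckpointData(1000L))
      .map(processEvent)
      .foreach(println)
  }
}
```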

2. The Data Cleanup Process in Detail
2.1 ClearMetadata in Detail

First, look at the processing logic of the clearMetadata function:

    /** Clear DStream metadata for the given `time`. */
    private def clearMetadata(time: Time) {
      ssc.graph.clearMetadata(time)

      // If checkpointing is enabled, then checkpoint,
      // else mark batch to be fully processed
      if (shouldCheckpoint) {
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
      } else {
        // If checkpointing is not enabled, then delete metadata information about
        // received blocks (block data not saved in any case). Otherwise, wait for
        // checkpointing of this batch to complete.
        val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
        jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
        jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
        markBatchFullyProcessed(time)
      }
    }

It first calls DStreamGraph's clearMetadata method:

    def clearMetadata(time: Time) {
      logDebug("Clearing metadata for time " + time)
      this.synchronized {
        outputStreams.foreach(_.clearMetadata(time))
      }
      logDebug("Cleared old metadata for time " + time)
    }

This calls the clearMetadata method of every output DStream (for the classification of DStreams, see http://blog.csdn.net/zhouzx2010/article/details/51460790):

    private[streaming] def clearMetadata(time: Time) {
      val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
      // Collect the RDDs that need to be cleaned up
      val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
      logDebug("Clearing references to old RDDs: [" +
        oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
      // Remove the old RDDs from generatedRDDs
      generatedRDDs --= oldRDDs.keys
      if (unpersistData) {
        logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
        oldRDDs.values.foreach { rdd =>
          // Remove the RDD from the persistence list
          rdd.unpersist(false)
          // Explicitly remove blocks of BlockRDD
          rdd match {
            case b: BlockRDD[_] =>
              logInfo(s"Removing blocks of RDD $b of time $time")
              // Remove the RDD's block data
              b.removeBlocks()
            case _ =>
          }
        }
      }
      logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
        s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
      // Clean up the DStreams this DStream depends on
      dependencies.foreach(_.clearMetadata(time))
    }

The key cleanup logic is annotated in the code above: the DStream's references to old RDDs are removed first, then the RDD data itself is unpersisted (and BlockRDD blocks explicitly removed), and finally cleanup recurses into the DStreams this DStream depends on.
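The essential mechanism is a time-keyed map pruned by a retention window. The following sketch reproduces that filter-and-drop step with plain strings standing in for RDDs (a simplified stand-in: `rememberDuration` is a Long of milliseconds and `unpersist` is represented only by returning the dropped values):

```scala
import scala.collection.mutable

// Minimal sketch of the generatedRDDs cleanup: RDDs are keyed by batch time,
// and everything at or before (time - rememberDuration) is dropped.
object ClearMetadataSketch {
  val rememberDuration = 2000L  // stand-in for the DStream's remember duration
  val generatedRDDs = mutable.HashMap(1000L -> "rdd-1", 2000L -> "rdd-2", 3000L -> "rdd-3")

  def clearMetadata(time: Long): Seq[String] = {
    // Same predicate as the source: _._1 <= (time - rememberDuration)
    val oldRDDs = generatedRDDs.filter { case (t, _) => t <= time - rememberDuration }
    generatedRDDs --= oldRDDs.keys
    // In the source each of these would get rdd.unpersist(false)
    oldRDDs.values.toSeq.sorted
  }

  def main(args: Array[String]): Unit = {
    println(clearMetadata(4000L).mkString(","))
    println(generatedRDDs.keys.toSeq.sorted.mkString(","))
  }
}
```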

Back in JobGenerator's clearMetadata function:

    if (shouldCheckpoint) {
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // If checkpointing is not enabled, then delete metadata information about
      // received blocks (block data not saved in any case). Otherwise, wait for
      // checkpointing of this batch to complete.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }

This calls ReceiverTracker's cleanupOldBlocksAndBatches method, which in turn calls cleanupOldBatches:

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
      require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
      val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
      logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
      if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
        // Drop the batches to be deleted
        timeToAllocatedBlocks --= timesToCleanup
        // Clean up the WAL
        writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
      } else {
        logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
      }
    }

As the code shows, ReceiverTracker's cleanupOldBatches method cleans up the receiver data, that is, the per-batch block allocations and the WAL data.
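The batch-side cleanup follows the same retention-window pattern. A sketch, with block IDs standing in for allocated blocks and the WAL step reduced to a comment (the real method also writes a BatchCleanupEvent to the log before mutating the map):

```scala
import scala.collection.mutable

// Minimal sketch of ReceiverTracker.cleanupOldBatches: block allocations are
// keyed by batch time; everything strictly before the threshold is dropped.
object BatchCleanupSketch {
  val timeToAllocatedBlocks =
    mutable.HashMap(1000L -> Seq("b1"), 2000L -> Seq("b2"), 3000L -> Seq("b3"))

  def cleanupOldBatches(cleanupThreshTime: Long): Seq[Long] = {
    // Same predicate as the source: _ < cleanupThreshTime
    val timesToCleanup = timeToAllocatedBlocks.keys.filter(_ < cleanupThreshTime).toSeq.sorted
    timeToAllocatedBlocks --= timesToCleanup
    // The source additionally cleans the matching WAL segments here.
    timesToCleanup
  }

  def main(args: Array[String]): Unit = {
    println(cleanupOldBatches(3000L).mkString(","))
  }
}
```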
Finally, the InputInfoTracker information is cleaned up:

    def cleanup(batchThreshTime: Time): Unit = synchronized {
      val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
      logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
      batchTimeToInputInfos --= timesToCleanup
    }

This simply removes the old input information from batchTimeToInputInfos.

2.2 ClearCheckpointData in Detail

Look at the processing logic of clearCheckpointData:

    /** Clear DStream checkpoint data for the given `time`. */
    private def clearCheckpointData(time: Time) {
      ssc.graph.clearCheckpointData(time)

      // All the checkpoint information about which batches have been processed, etc have
      // been saved to checkpoints, so its safe to delete block metadata and data WAL files
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }

The subsequent ReceiverTracker and InputInfoTracker cleanup is the same as in ClearMetadata, so here we analyze DStreamGraph's clearCheckpointData method:

    def clearCheckpointData(time: Time) {
      logInfo("Clearing checkpoint data for time " + time)
      this.synchronized {
        outputStreams.foreach(_.clearCheckpointData(time))
      }
      logInfo("Cleared checkpoint data for time " + time)
    }

This likewise calls the clearCheckpointData method of every output DStream in the DStreamGraph:

    private[streaming] def clearCheckpointData(time: Time) {
      logDebug("Clearing checkpoint data")
      checkpointData.cleanup(time)
      dependencies.foreach(_.clearCheckpointData(time))
      logDebug("Cleared checkpoint data")
    }

The core logic is in checkpointData.cleanup(time); checkpointData here is a DStreamCheckpointData object, whose cleanup method is as follows:

    def cleanup(time: Time) {
      // Look up the checkpoint-file time threshold recorded for this batch time
      timeToOldestCheckpointFileTime.remove(time) match {
        case Some(lastCheckpointFileTime) =>
          // Collect the files to delete
          val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
          logDebug("Files to delete:\n" + filesToDelete.mkString(","))
          filesToDelete.foreach {
            case (time, file) =>
              try {
                val path = new Path(file)
                if (fileSystem == null) {
                  fileSystem = path.getFileSystem(dstream.ssc.sparkContext.hadoopConfiguration)
                }
                // Delete the file
                fileSystem.delete(path, true)
                timeToCheckpointFile -= time
                logInfo("Deleted checkpoint file '" + file + "' for time " + time)
              } catch {
                case e: Exception =>
                  logWarning("Error deleting old checkpoint file '" + file + "' for time " + time, e)
                  fileSystem = null
              }
          }
        case None =>
          logDebug("Nothing to delete")
      }
    }

So checkpoint cleanup simply deletes the checkpoint files that are older than the recorded threshold time.
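The bookkeeping can be sketched with two maps, mirroring the source fields but with Longs for times, strings for file paths, and the Hadoop FileSystem delete reduced to removing a map entry:

```scala
import scala.collection.mutable

// Minimal sketch of DStreamCheckpointData.cleanup: each batch time remembers the
// oldest checkpoint-file time it still needs; on cleanup(time), files strictly
// older than that threshold are deleted.
object CheckpointCleanupSketch {
  val timeToOldestCheckpointFileTime = mutable.HashMap(4000L -> 2000L)
  val timeToCheckpointFile =
    mutable.HashMap(1000L -> "ck-1000", 2000L -> "ck-2000", 3000L -> "ck-3000")

  def cleanup(time: Long): Seq[String] =
    timeToOldestCheckpointFileTime.remove(time) match {
      case Some(lastCheckpointFileTime) =>
        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
        // In the source this is fileSystem.delete(path, true) per file
        timeToCheckpointFile --= filesToDelete.keys
        filesToDelete.values.toSeq.sorted
      case None => Seq.empty  // nothing recorded for this batch time
    }

  def main(args: Array[String]): Unit = {
    println(cleanup(4000L).mkString(","))
  }
}
```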

3. What Triggers Data Cleanup
3.1 Triggering ClearMetadata
After JobGenerator generates a job, it hands it to JobHandler for execution. In JobHandler's run method, once the job has finished, a JobCompleted message is sent to JobScheduler:

    _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
    }

When JobScheduler receives the JobCompleted message it calls the handleJobCompletion method; the dispatch code is:

    private def processEvent(event: JobSchedulerEvent) {
      try {
        event match {
          case JobStarted(job, startTime) => handleJobStart(job, startTime)
          case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
          case ErrorReported(m, e) => handleError(m, e)
        }
      } catch {
        case e: Throwable =>
          reportError("Error in job scheduler", e)
      }
    }

JobScheduler's handleJobCompletion method calls JobGenerator's onBatchCompletion method, whose source is:

    def onBatchCompletion(time: Time) {
      eventLoop.post(ClearMetadata(time))
    }

So JobGenerator's onBatchCompletion method posts a ClearMetadata message to its own event loop, which triggers the ClearMetadata operation.

3.2 Triggering ClearCheckpointData
Checkpoint data is cleaned up after a checkpoint completes. First look at the end of CheckpointWriteHandler's run method:


    // All done, print success
    val finishTime = System.currentTimeMillis()
    logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
      "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
    // Call back into JobGenerator to clean up checkpoint data
    jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
So after the checkpoint completes, JobGenerator's onCheckpointCompletion method is called to clean up checkpoint data. Let's look at its source:

    def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
      if (clearCheckpointDataLater) {
        eventLoop.post(ClearCheckpointData(time))
      }
    }

JobGenerator's onCheckpointCompletion method first checks the clearCheckpointDataLater parameter it was passed; if it is true, a ClearCheckpointData message is posted to JobGenerator's eventLoop, which triggers the clearCheckpointData method and the cleanup of checkpoint data.
So when is this parameter true?
Back in JobGenerator's clearMetadata method:

    private def clearMetadata(time: Time) {
      ssc.graph.clearMetadata(time)

      if (shouldCheckpoint) {
        // Post DoCheckpoint, asking for checkpoint data to be cleaned afterwards
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
      } else {
        val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
        jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
        jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
        markBatchFullyProcessed(time)
      }
    }

As shown, clearMetadata posts a DoCheckpoint message with the clearCheckpointDataLater parameter set to true. When JobGenerator's eventLoop receives it, it calls the doCheckpoint method:

    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
      if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
        logInfo("Checkpointing graph for time " + time)
        ssc.graph.updateCheckpointData(time)
        checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
      }
    }

The key step here is the call to CheckpointWriter's write method, with clearCheckpointDataLater still true. Inside that method:

    def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
      try {
        val bytes = Checkpoint.serialize(checkpoint, conf)
        // Pass clearCheckpointDataLater on to CheckpointWriteHandler
        executor.execute(new CheckpointWriteHandler(
          checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
        logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
      } catch {
        case rej: RejectedExecutionException =>
          logError("Could not submit checkpoint task to the thread pool executor", rej)
      }
    }

Thus the clearCheckpointDataLater parameter is passed into CheckpointWriteHandler, and once the checkpoint completes, a ClearCheckpointData message is sent back to JobGenerator to clean up the checkpoint data.
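The whole round trip of the flag can be sketched as a chain of direct calls (a simplified stand-in: the real flow crosses the JobGenerator eventLoop and the checkpoint writer's thread pool, which are collapsed here into synchronous calls, with a buffer standing in for posted messages):

```scala
import scala.collection.mutable

// Minimal sketch of how clearCheckpointDataLater travels from clearMetadata
// through the checkpoint writer and back as a ClearCheckpointData event.
object CheckpointFlowSketch {
  val posted = mutable.Buffer.empty[String]  // stand-in for eventLoop.post(...)

  def clearMetadata(time: Long): Unit =
    // In the source: eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    doCheckpoint(time, clearCheckpointDataLater = true)

  def doCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit =
    // In the source, CheckpointWriter.write hands the flag to CheckpointWriteHandler,
    // whose run() invokes onCheckpointCompletion after the file is written.
    onCheckpointCompletion(time, clearCheckpointDataLater)

  def onCheckpointCompletion(time: Long, clearCheckpointDataLater: Boolean): Unit =
    if (clearCheckpointDataLater) posted += s"ClearCheckpointData($time)"

  def main(args: Array[String]): Unit = {
    clearMetadata(4000L)
    println(posted.mkString(","))
  }
}
```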
