Spark Streaming 數(shù)據(jù)清理機制

大家剛開始用Spark Streaming時,心里肯定嘀咕,對于一個7*24小時運行的數(shù)據(jù),cache住的RDD,broadcast 系統(tǒng)會幫忙自己清理掉么?還是說必須自己做清理迅矛?如果系統(tǒng)幫忙清理的話,機制是啥潜叛?

前言

為啥要了解機制呢秽褒?這就好比JVM的垃圾回收壶硅,雖然JVM的垃圾回收已經巨牛了,但是依然會遇到很多和它相關的case導致系統(tǒng)運行不正常震嫉。

這個內容我記得自己剛接觸Spark Streaming的時候森瘪,老板也問過我,運行期間會保留多少個RDD? 當時沒回答出來票堵。后面在群里也有人問到了扼睬,所以就整理了下。文中如有謬誤之處悴势,還望指出窗宇。

DStream 和 RDD

我們知道Spark Streaming 計算還是基于Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關系特纤。然而Spark Streaming 并沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念军俊。 DStream 和 RDD 是包含的關系,你可以理解為Java里的裝飾模式捧存,也就是DStream 是對RDD的增強粪躬,但是行為表現(xiàn)和RDD是基本上差不多的。都具備幾個條件:

  1. 具有類似的tranformation動作昔穴,比如map,reduceByKey等镰官,也有一些自己獨有的,比如Window吗货,mapWithStated等
  2. 都具有Action動作泳唠,比如foreachRDD,count等

從編程模型上看是一致的宙搬。

所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而并不能直接復用笨腥,因為一個是DStream的變換,一個是RDD的變化勇垛。

Spark Streaming中 DStream 介紹

DStream 下面包含幾個類:

  1. 數(shù)據(jù)源類脖母,比如InputDStream,具體如DirectKafkaInputStream等
  2. 轉換類,典型比如MappedDStream,ShuffledDStream
  3. 輸出類闲孤,典型比如ForEachDStream

從上面來看谆级,數(shù)據(jù)從開始(輸入)到結束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產生和操作RDD的崭放,這也就是說哨苛,DStream有機會和義務去負責RDD的生命周期鸽凶。

這就回答了前言中的問題了币砂。Spark Streaming具備自動清理功能。

RDD 在Spark Stream中產生的流程

在Spark Streaming中RDD的生命流程大體如下:

  1. 在InputDStream會將接受到的數(shù)據(jù)轉化成RDD,比如DirectKafkaInputStream 產生的就是 KafkaRDD
  2. 接著通過MappedDStream等進行數(shù)據(jù)轉換玻侥,這個時候是直接調用RDD對應的map方法進行轉換的
  3. 在進行輸出類操作時决摧,才暴露出RDD,可以讓用戶執(zhí)行相應的存儲,其他計算等操作。

我們這里就以下面的代碼來進行更詳細的解釋:

val source  =   KafkaUtils.createDirectInputStream(....)
source.map(....).foreachRDD{rdd=>
    rdd.saveTextFile(....)
}

foreachRDD 產生ForEachDStream掌桩,因為foreachRDD是個Action,所以會觸發(fā)任務的執(zhí)行边锁,會被調用generateJob方法。

 override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

對應的parent是MappedDStream,也就是說調用MappedDStream.getOrCompute.該方法在DStream中波岛,首先會在MappedDStream對象中的generatedRDDs 變量中查找是否已經有RDD,如果沒有則觸發(fā)計算茅坛,并且將產生的RDD放到generatedRDDs

@transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
....
generatedRDDs.put(time, newRDD)
....

計算RDD是調用的compute方法,MappedDStream 的compute方法很簡單则拷,直接調用的父類也就是DirectKafkaInputStream的getOrCompute方法:

override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }

在上面的例子中贡蓖,MappedDStream 的parent是DirectKafkaInputStream中,這是個數(shù)據(jù)源,所以他的compute方法會直接new出一個RDD.

從上面可以得出幾個結論:

  1. 數(shù)據(jù)源以及轉換類DStream都會維護一個generatedRDDs煌茬,可以按batchTime 進行獲取
  2. 內部本質還是進行的RDD的轉換

如果我們調用了cache會發(fā)生什么

這里又會有兩種情況斥铺,一種是調用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的坛善。

DStream的cache 動作只是將DStream的變量storageLevel 設置為MEMORY_ONLY_SER晾蜘,然后在產生(或者獲取)RDD的時候,調用RDD的persit方法進行設置眠屎。所以DStream.cache 產生的效果等價于RDD.cache(也就是你自己調用foreachRDD 將RDD 都設置一遍)

進入正題剔交,我們是怎么釋放Cache住的RDD的

其實無所謂Cache不Cache住,RDD最終都是要釋放的组力,否則運行久了省容,光RDD對象也能承包了你的內存。我們知道燎字,在Spark Streaming中腥椒,周期性產生事件驅動Spark Streaming 的類其實是:

org.apache.spark.streaming.scheduler.JobGenerator

他內部有個永動機(定時器),定時發(fā)布一個產生任務的事件:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

然后通過processEvent進行事件處理:

  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

目前我們只關注ClearMetadata 事件。對應的方法為:

private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // If checkpointing is not enabled, then delete metadata information about
      // received blocks (block data not saved in any case). Otherwise, wait for
      // checkpointing of this batch to complete.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }
  }

首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基于Receiver模式)的數(shù)據(jù)候衍。

ForeachDStream 其實調用的也是DStream的方法笼蛛。該方法大體如下:

private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist(false)
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo("Removing blocks of RDD " + b + " of time " + time)
            b.removeBlocks()
          case _ =>
        }
      }
    }
    logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
      (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
    dependencies.foreach(_.clearMetadata(time))
  }

大體執(zhí)行動作如下描述:

  1. 根據(jù)記憶周期得到應該剔除的RDD
  2. 根據(jù)是否要清理cache數(shù)據(jù),進行unpersit 操作蛉鹿,并且顯示的移除block
  3. 根據(jù)依賴調用其他的DStream進行動作清理

這里我們還可以看到滨砍,通過參數(shù)spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數(shù)據(jù)進行清理。

這里你會有兩個疑問:

  1. dependencies 是什么妖异?
  2. rememberDuration 是怎么來的惋戏?

dependencies 你可以簡單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。

rememberDuration 的設置略微復雜些,大體是 slideDuration,如果設置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設置了的話他膳,譬如通過StreamingContext.remember方法,不過通過該方法設置的值要大于計算得到的值會生效)

另外值得一提的就是后面的DStream 會調整前面的DStream的rememberDuration响逢,譬如如果你用了window* 相關的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration棕孙。

然后根據(jù)Spark Streaming的定時性舔亭,每個周期只要完成了些膨,都會觸發(fā)清理動作,這個就是清理動作發(fā)生的時機。代碼如下:

def onBatchCompletion(time: Time) {     
    eventLoop.post(ClearMetadata(time))
}

總結下

Spark Streaming 會在每個Batch任務結束時進行一次清理動作钦铺。每個DStream 都會被掃描订雾,不同的DStream根據(jù)情況不同,保留的RDD數(shù)量也是不一致的矛洞,但都是根據(jù)rememberDuration變量決定,而該變量會被下游的DStream所影響洼哎,所以不同的DStream的rememberDuration取值是不一樣的。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末沼本,一起剝皮案震驚了整個濱河市谱净,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌擅威,老刑警劉巖壕探,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異郊丛,居然都是意外死亡李请,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門厉熟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來导盅,“玉大人,你說我怎么就攤上這事揍瑟“追” “怎么了?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵绢片,是天一觀的道長滤馍。 經常有香客問我,道長底循,這世上最難降的妖魔是什么巢株? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮熙涤,結果婚禮上阁苞,老公的妹妹穿的比我還像新娘。我一直安慰自己祠挫,他們只是感情好那槽,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著等舔,像睡著了一般骚灸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上软瞎,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天逢唤,我揣著相機與錄音,去河邊找鬼涤浇。 笑死鳖藕,一個胖子當著我的面吹牛,可吹牛的內容都是我干的只锭。 我是一名探鬼主播著恩,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蜻展!你這毒婦竟也來了喉誊?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤纵顾,失蹤者是張志新(化名)和其女友劉穎伍茄,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體施逾,經...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡敷矫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了汉额。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片曹仗。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蠕搜,靈堂內的尸體忽然破棺而出怎茫,到底是詐尸還是另有隱情,我是刑警寧澤妓灌,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布轨蛤,位于F島的核電站,受9級特大地震影響虫埂,放射性物質發(fā)生泄漏俱萍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一告丢、第九天 我趴在偏房一處隱蔽的房頂上張望枪蘑。 院中可真熱鬧,春花似錦岖免、人聲如沸岳颇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽话侧。三九已至,卻和暖如春闯参,著一層夾襖步出監(jiān)牢的瞬間瞻鹏,已是汗流浹背悲立。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留新博,地道東北人薪夕。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像赫悄,于是被迫代替她去往敵國和親原献。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359

推薦閱讀更多精彩內容