Spark Streaming 誤用.transform(func)函數(shù)導(dǎo)致的問題解析

Spark/Spark Streaming transform 是一個(gè)很強(qiáng)的方法,不過使用過程中可能也有一些值得注意的問題。在分析的問題,我們還會(huì)順帶討論下Spark Streaming 生成job的邏輯,從而讓大家知道問題的根源。

問題描述

今天有朋友貼了一段 gist,大家可以先看看這段代碼有什么問題春弥。

特定情況你會(huì)發(fā)現(xiàn)UI 的Storage標(biāo)簽上有很多新的Cache RDD,然后你以為是Cache RDD 不被釋放电抚,但是通過Spark Streaming 數(shù)據(jù)清理機(jī)制分析我們可以排除這個(gè)問題惕稻。

接著通過給RDD的設(shè)置名字,名字帶上時(shí)間,發(fā)現(xiàn)是延時(shí)的Batch 也會(huì)產(chǎn)生cache RDD蝙叛。那這是怎么回事呢俺祠?

另外還有一個(gè)問題,也是相同的原因造成的:我通過KafkaInputStream.transform 方法獲取Kafka偏移量借帘,并且保存到HDFS上蜘渣。然后發(fā)現(xiàn)一旦產(chǎn)生job(包括并沒有執(zhí)行的Job),都會(huì)生成了Offset,這樣如果出現(xiàn)宕機(jī),你看到的最新Offset 其實(shí)就是延時(shí)的肺然,而不是出現(xiàn)故障時(shí)的Offset了蔫缸。這樣做恢復(fù)就變得困難了。

問題分析

其實(shí)是這樣际起,在transform里你可以做很多復(fù)雜的工作拾碌,但是transform接受到的函數(shù)比較特殊,是會(huì)在TransformedDStream.compute方法中執(zhí)行的街望,你需要確保里面的動(dòng)作都是transformation(延時(shí)的)校翔,而不能是Action(譬如第一個(gè)例子里的count動(dòng)作),或者不能有立即執(zhí)行的(比如我提到的例子里的自己通過HDFS API 將Kafka偏移量保存到HDFS)灾前。

override def compute(validTime: Time): Option[RDD[U]] = {
    val parentRDDs = parents.map { parent => 
    ....
  //看這一句防症,你的函數(shù)在調(diào)用compute方法時(shí),就會(huì)被調(diào)用
    val transformedRDD = transformFunc(parentRDDs, validTime)
    if (transformedRDD == null) {
      throw new SparkException.....
    }
    Some(transformedRDD)
  }

這里有兩個(gè)疑問:

  • 那些.map .transform 都是transformation,不是只有真實(shí)被提交后才會(huì)被執(zhí)行么哎甲?
  • DStream.compute 方法為什么會(huì)在generateJob的時(shí)候就被調(diào)用呢蔫敲?

Spark Streaming generateJob 邏輯解析

在JobGenerator中,會(huì)定時(shí)產(chǎn)生一個(gè)GenerateJobs的事件:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

該事件會(huì)被DStreamGraph.generateJobs 處理炭玫,產(chǎn)生Job的邏輯 也很簡單奈嘿,

def generateJobs(time: Time): Seq[Job] = {   
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        ........    
  }

就是調(diào)用各個(gè)outputStream 的generateJob方法,典型的outputStream如ForEachDStream吞加。 以ForEachDStream為例裙犹,產(chǎn)生job的方式如下:

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

我們看到酝惧,在這里會(huì)觸發(fā)所有的DStream鏈進(jìn)行compute動(dòng)作。也就意味著所有transformation產(chǎn)生的DStream的compute方法都會(huì)被調(diào)用伯诬。

正常情況下不會(huì)有什么問題,比如.map(func) 產(chǎn)生的MappedDStream里面在compute執(zhí)行時(shí)巫财,func 都是被記住而不是被執(zhí)行盗似。但是TransformedDStream 是比較特殊的,對(duì)應(yīng)的func是會(huì)被執(zhí)行的平项,在對(duì)應(yīng)的compute方法里赫舒,你會(huì)看到這行代碼:

val transformedRDD = transformFunc(parentRDDs, validTime)

這里的transformFunc 就是transform(func)里的func了。然而transform 又特別靈活闽瓢,可以執(zhí)行各種RDD操作接癌,這個(gè)時(shí)候Spark Streaming 是攔不住你的,一旦你使用了count之類的Action,產(chǎn)生Job的時(shí)候就會(huì)被立刻執(zhí)行扣讼,而不是等到Job被提交才執(zhí)行缺猛。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市椭符,隨后出現(xiàn)的幾起案子荔燎,更是在濱河造成了極大的恐慌,老刑警劉巖销钝,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件有咨,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蒸健,警方通過查閱死者的電腦和手機(jī)座享,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來似忧,“玉大人渣叛,你說我怎么就攤上這事∠鹇Γ” “怎么了诗箍?”我有些...
    開封第一講書人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長挽唉。 經(jīng)常有香客問我滤祖,道長,這世上最難降的妖魔是什么瓶籽? 我笑而不...
    開封第一講書人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任匠童,我火速辦了婚禮,結(jié)果婚禮上塑顺,老公的妹妹穿的比我還像新娘汤求。我一直安慰自己俏险,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開白布扬绪。 她就那樣靜靜地躺著竖独,像睡著了一般。 火紅的嫁衣襯著肌膚如雪挤牛。 梳的紋絲不亂的頭發(fā)上莹痢,一...
    開封第一講書人閱讀 51,573評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音墓赴,去河邊找鬼竞膳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛诫硕,可吹牛的內(nèi)容都是我干的坦辟。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼章办,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼锉走!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起纲菌,我...
    開封第一講書人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤挠日,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后翰舌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嚣潜,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年椅贱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了懂算。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡庇麦,死狀恐怖计技,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情山橄,我是刑警寧澤垮媒,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站航棱,受9級(jí)特大地震影響睡雇,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜饮醇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一它抱、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧朴艰,春花似錦观蓄、人聲如沸混移。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽歌径。三九已至,卻和暖如春亲茅,著一層夾襖步出監(jiān)牢的瞬間沮脖,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來泰國打工芯急, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人驶俊。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓娶耍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親饼酿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子榕酒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容