Spark/Spark Streaming transform 是一個(gè)很強(qiáng)的方法,不過使用過程中可能也有一些值得注意的問題。在分析的問題,我們還會(huì)順帶討論下Spark Streaming 生成job的邏輯,從而讓大家知道問題的根源。
問題描述
今天有朋友貼了一段 gist,大家可以先看看這段代碼有什么問題春弥。
特定情況你會(huì)發(fā)現(xiàn)UI 的Storage標(biāo)簽上有很多新的Cache RDD,然后你以為是Cache RDD 不被釋放电抚,但是通過Spark Streaming 數(shù)據(jù)清理機(jī)制分析我們可以排除這個(gè)問題惕稻。
接著通過給RDD的設(shè)置名字,名字帶上時(shí)間,發(fā)現(xiàn)是延時(shí)的Batch 也會(huì)產(chǎn)生cache RDD蝙叛。那這是怎么回事呢俺祠?
另外還有一個(gè)問題,也是相同的原因造成的:我通過KafkaInputStream.transform 方法獲取Kafka偏移量借帘,并且保存到HDFS上蜘渣。然后發(fā)現(xiàn)一旦產(chǎn)生job(包括并沒有執(zhí)行的Job),都會(huì)生成了Offset,這樣如果出現(xiàn)宕機(jī),你看到的最新Offset 其實(shí)就是延時(shí)的肺然,而不是出現(xiàn)故障時(shí)的Offset了蔫缸。這樣做恢復(fù)就變得困難了。
問題分析
其實(shí)是這樣际起,在transform里你可以做很多復(fù)雜的工作拾碌,但是transform接受到的函數(shù)比較特殊,是會(huì)在TransformedDStream.compute方法中執(zhí)行的街望,你需要確保里面的動(dòng)作都是transformation(延時(shí)的)校翔,而不能是Action(譬如第一個(gè)例子里的count動(dòng)作),或者不能有立即執(zhí)行的(比如我提到的例子里的自己通過HDFS API 將Kafka偏移量保存到HDFS)灾前。
override def compute(validTime: Time): Option[RDD[U]] = {
val parentRDDs = parents.map { parent =>
....
//看這一句防症,你的函數(shù)在調(diào)用compute方法時(shí),就會(huì)被調(diào)用
val transformedRDD = transformFunc(parentRDDs, validTime)
if (transformedRDD == null) {
throw new SparkException.....
}
Some(transformedRDD)
}
這里有兩個(gè)疑問:
- 那些.map .transform 都是transformation,不是只有真實(shí)被提交后才會(huì)被執(zhí)行么哎甲?
- DStream.compute 方法為什么會(huì)在generateJob的時(shí)候就被調(diào)用呢蔫敲?
Spark Streaming generateJob 邏輯解析
在JobGenerator中,會(huì)定時(shí)產(chǎn)生一個(gè)GenerateJobs的事件:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
該事件會(huì)被DStreamGraph.generateJobs 處理炭玫,產(chǎn)生Job的邏輯 也很簡單奈嘿,
def generateJobs(time: Time): Seq[Job] = {
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
........
}
就是調(diào)用各個(gè)outputStream 的generateJob方法,典型的outputStream如ForEachDStream吞加。 以ForEachDStream為例裙犹,產(chǎn)生job的方式如下:
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
我們看到酝惧,在這里會(huì)觸發(fā)所有的DStream鏈進(jìn)行compute動(dòng)作。也就意味著所有transformation產(chǎn)生的DStream的compute方法都會(huì)被調(diào)用伯诬。
正常情況下不會(huì)有什么問題,比如.map(func) 產(chǎn)生的MappedDStream里面在compute執(zhí)行時(shí)巫财,func 都是被記住
而不是被執(zhí)行盗似。但是TransformedDStream 是比較特殊的,對(duì)應(yīng)的func是會(huì)被執(zhí)行的平项,在對(duì)應(yīng)的compute方法里赫舒,你會(huì)看到這行代碼:
val transformedRDD = transformFunc(parentRDDs, validTime)
這里的transformFunc 就是transform(func)里的func了。然而transform 又特別靈活闽瓢,可以執(zhí)行各種RDD操作接癌,這個(gè)時(shí)候Spark Streaming 是攔不住你的,一旦你使用了count之類的Action,產(chǎn)生Job的時(shí)候就會(huì)被立刻執(zhí)行扣讼,而不是等到Job被提交才執(zhí)行缺猛。