在 Spark Streaming 中唉擂,DStreamGraph 是一個非常重要的組件杈湾,主要用來:
- 通過成員 inputStreams 持有 Spark Streaming 輸入源及接收數(shù)據(jù)的方式
- 通過成員 outputStreams 持有 Streaming app 的 output 操作唧垦,并記錄 DStream 依賴關(guān)系
- 生成每個 batch 對應(yīng)的 jobs
下面撒桨,我將通過分析一個簡單的例子析恢,結(jié)合源碼分析來說明 DStreamGraph 是如何發(fā)揮作用的墨坚。例子如下:
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
創(chuàng)建 DStreamGraph 實例
代碼val ssc = new StreamingContext(sparkConf, Seconds(2))
創(chuàng)建了 StreamingContext 實例,StreamingContext 包含了 DStreamGraph 類型的成員graph映挂,graph 在 StreamingContext主構(gòu)造函數(shù)中被創(chuàng)建泽篮,如下
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
可以看到,若當(dāng)前 checkpoint 可用柑船,會優(yōu)先從 checkpoint 恢復(fù) graph帽撑,否則新建一個。還可以從這里知道的一點是:graph 是運行在 driver 上的
DStreamGraph記錄輸入源及如何接收數(shù)據(jù)
DStreamGraph有和application 輸入數(shù)據(jù)相關(guān)的成員和方法鞍时,如下:
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
def addInputStream(inputStream: InputDStream[_]) {
this.synchronized {
inputStream.setGraph(this)
inputStreams += inputStream
}
}
成員inputStreams為 InputDStream 類型的數(shù)組亏拉,InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream专筷,例如 FileInputDStream是 InputDStream 的子類弱贼,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream磷蛹,需要繼承 ReceiverInputDStream吮旅,比如 KafkaReceiver。
我們來看看val lines = ssc.textFileStream(args(0))
調(diào)用味咳。
為了更容易理解庇勃,我畫出了val lines = ssc.textFileStream(args(0))
的調(diào)用流程
從上面的調(diào)用流程圖我們可以知道:
- ssc.textFileStream會觸發(fā)新建一個FileInputDStream。FileInputDStream繼承于InputDStream槽驶,其start()方法定義了數(shù)據(jù)源及如何接收數(shù)據(jù)
- 在FileInputDStream構(gòu)造函數(shù)中责嚷,會調(diào)用
ssc.graph.addInputStream(this)
,將自身添加到 DStreamGraph 的inputStreams: ArrayBuffer[InputDStream[_]]
中掂铐,這樣 DStreamGraph 就知道了這個 Streaming App 的輸入源及如何接收數(shù)據(jù)罕拂。可能你會奇怪為什么inputStreams 是數(shù)組類型全陨,舉個例子爆班,這里再來一個val lines1 = ssc.textFileStream(args(0))
,那么又將生成一個 FileInputStream 實例添加到inputStreams辱姨,所以這里需要集合類型 - 生成FileInputDStream調(diào)用其 map 方法柿菩,將以 FileInputDStream 本身作為 partent 來構(gòu)造新的 MappedDStream。對于 DStream 的 transform 操作雨涛,都將生成一個新的 DStream枢舶,和 RDD transform 生成新的 RDD 類似
與MappedDStream 不同,所有繼承了 InputDStream 的定義了輸入源及接收數(shù)據(jù)方式的 sreams 都沒有 parent替久,因為它們就是最初的 streams凉泄。
DStream 的依賴鏈
每個 DStream 的子類都會繼承 def dependencies: List[DStream[_]] = List()
方法,該方法用來返回自己的依賴的父 DStream 列表侣肄。比如旧困,沒有父DStream 的 InputDStream 的 dependencies方法返回List()醇份。
MappedDStream 的實現(xiàn)如下:
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
...
}
在上例中稼锅,構(gòu)造函數(shù)參數(shù)列表中的 parent 即在 ssc.textFileStream 中new 的定義了輸入源及數(shù)據(jù)接收方式的最初的 FileInputDStream實例,這里的 dependencies方法將返回該FileInputDStream實例僚纷,這就構(gòu)成了第一條依賴矩距。可用如下圖表示怖竭,這里特地將 input streams 用藍色表示锥债,以強調(diào)其與普通由 transform 產(chǎn)生的 DStream 的不同:
繼續(xù)來看val words = lines.flatMap(_.split(" "))
,flatMap如下:
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
每一個 transform 操作都將創(chuàng)建一個新的 DStream,flatMap 操作也不例外哮肚,它會創(chuàng)建一個FlatMappedDStream登夫,F(xiàn)latMappedDStream的實現(xiàn)如下:
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
...
}
與 MappedDStream 相同,F(xiàn)latMappedDStream#dependencies也返回其依賴的父 DStream允趟,及 lines恼策,到這里,依賴鏈就變成了下圖:
之后的幾步操作不再這樣具體分析潮剪,到生成wordCounts時涣楷,依賴圖將變成下面這樣:
在 DStream 中,與 transofrm 相對應(yīng)的是 output 操作抗碰,包括 print
, saveAsTextFiles
, saveAsObjectFiles
, saveAsHadoopFiles
, foreachRDD
狮斗。output 操作中,會創(chuàng)建ForEachDStream實例并調(diào)用register方法將自身添加到DStreamGraph.outputStreams成員中弧蝇,該ForEachDStream實例也會持有是調(diào)用的哪個 output 操作碳褒。本例的代碼調(diào)用如下,只需看箭頭所指幾行代碼
與 DStream transform 操作返回一個新的 DStream 不同看疗,output 操作不會返回任何東西骤视,只會創(chuàng)建一個ForEachDStream作為依賴鏈的終結(jié)。
至此鹃觉, 生成了完成的依賴鏈专酗,也就是 DAG,如下圖(這里將 ForEachDStream 標為黃色以顯示其與眾不同):
這里的依賴鏈又叫 DAG盗扇。本文以一個簡單的例子說明 DStream DAG 的生成過程祷肯,之后將再寫兩篇文章說明如何根據(jù)這個 DStream DAG 得到 RDD DAG 及如何定時生成 job。