揭開Spark Streaming神秘面紗① - DStreamGraph 與 DStream DAG

在 Spark Streaming 中唉擂,DStreamGraph 是一個非常重要的組件杈湾,主要用來:

  1. 通過成員 inputStreams 持有 Spark Streaming 輸入源及接收數(shù)據(jù)的方式
  2. 通過成員 outputStreams 持有 Streaming app 的 output 操作唧垦,并記錄 DStream 依賴關(guān)系
  3. 生成每個 batch 對應(yīng)的 jobs

下面撒桨,我將通過分析一個簡單的例子析恢,結(jié)合源碼分析來說明 DStreamGraph 是如何發(fā)揮作用的墨坚。例子如下:

val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

創(chuàng)建 DStreamGraph 實例

代碼val ssc = new StreamingContext(sparkConf, Seconds(2))創(chuàng)建了 StreamingContext 實例,StreamingContext 包含了 DStreamGraph 類型的成員graph映挂,graph 在 StreamingContext主構(gòu)造函數(shù)中被創(chuàng)建泽篮,如下

  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

可以看到,若當(dāng)前 checkpoint 可用柑船,會優(yōu)先從 checkpoint 恢復(fù) graph帽撑,否則新建一個。還可以從這里知道的一點是:graph 是運行在 driver 上的

DStreamGraph記錄輸入源及如何接收數(shù)據(jù)

DStreamGraph有和application 輸入數(shù)據(jù)相關(guān)的成員和方法鞍时,如下:

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()

  def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

成員inputStreams為 InputDStream 類型的數(shù)組亏拉,InputDStream是所有 input streams(數(shù)據(jù)輸入流) 的虛基類。該類提供了 start() 和 stop()方法供 streaming 系統(tǒng)來開始和停止接收數(shù)據(jù)。那些只需要在 driver 端接收數(shù)據(jù)并轉(zhuǎn)成 RDD 的 input streams 可以直接繼承 InputDStream专筷,例如 FileInputDStream是 InputDStream 的子類弱贼,它監(jiān)控一個 HDFS 目錄并將新文件轉(zhuǎn)成RDDs。而那些需要在 workers 上運行receiver 來接收數(shù)據(jù)的 Input DStream磷蛹,需要繼承 ReceiverInputDStream吮旅,比如 KafkaReceiver。

我們來看看val lines = ssc.textFileStream(args(0))調(diào)用味咳。
為了更容易理解庇勃,我畫出了val lines = ssc.textFileStream(args(0))的調(diào)用流程

從上面的調(diào)用流程圖我們可以知道:

  1. ssc.textFileStream會觸發(fā)新建一個FileInputDStream。FileInputDStream繼承于InputDStream槽驶,其start()方法定義了數(shù)據(jù)源及如何接收數(shù)據(jù)
  2. 在FileInputDStream構(gòu)造函數(shù)中责嚷,會調(diào)用ssc.graph.addInputStream(this),將自身添加到 DStreamGraph 的 inputStreams: ArrayBuffer[InputDStream[_]] 中掂铐,這樣 DStreamGraph 就知道了這個 Streaming App 的輸入源及如何接收數(shù)據(jù)罕拂。可能你會奇怪為什么inputStreams 是數(shù)組類型全陨,舉個例子爆班,這里再來一個 val lines1 = ssc.textFileStream(args(0)),那么又將生成一個 FileInputStream 實例添加到inputStreams辱姨,所以這里需要集合類型
  3. 生成FileInputDStream調(diào)用其 map 方法柿菩,將以 FileInputDStream 本身作為 partent 來構(gòu)造新的 MappedDStream。對于 DStream 的 transform 操作雨涛,都將生成一個新的 DStream枢舶,和 RDD transform 生成新的 RDD 類似

與MappedDStream 不同,所有繼承了 InputDStream 的定義了輸入源及接收數(shù)據(jù)方式的 sreams 都沒有 parent替久,因為它們就是最初的 streams凉泄。

DStream 的依賴鏈

每個 DStream 的子類都會繼承 def dependencies: List[DStream[_]] = List()方法,該方法用來返回自己的依賴的父 DStream 列表侣肄。比如旧困,沒有父DStream 的 InputDStream 的 dependencies方法返回List()醇份。

MappedDStream 的實現(xiàn)如下:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  ...
}

在上例中稼锅,構(gòu)造函數(shù)參數(shù)列表中的 parent 即在 ssc.textFileStream 中new 的定義了輸入源及數(shù)據(jù)接收方式的最初的 FileInputDStream實例,這里的 dependencies方法將返回該FileInputDStream實例僚纷,這就構(gòu)成了第一條依賴矩距。可用如下圖表示怖竭,這里特地將 input streams 用藍色表示锥债,以強調(diào)其與普通由 transform 產(chǎn)生的 DStream 的不同:

繼續(xù)來看val words = lines.flatMap(_.split(" ")),flatMap如下:

  def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

每一個 transform 操作都將創(chuàng)建一個新的 DStream,flatMap 操作也不例外哮肚,它會創(chuàng)建一個FlatMappedDStream登夫,F(xiàn)latMappedDStream的實現(xiàn)如下:

class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => Traversable[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  ...
}

與 MappedDStream 相同,F(xiàn)latMappedDStream#dependencies也返回其依賴的父 DStream允趟,及 lines恼策,到這里,依賴鏈就變成了下圖:

之后的幾步操作不再這樣具體分析潮剪,到生成wordCounts時涣楷,依賴圖將變成下面這樣:

在 DStream 中,與 transofrm 相對應(yīng)的是 output 操作抗碰,包括 print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, foreachRDD狮斗。output 操作中,會創(chuàng)建ForEachDStream實例并調(diào)用register方法將自身添加到DStreamGraph.outputStreams成員中弧蝇,該ForEachDStream實例也會持有是調(diào)用的哪個 output 操作碳褒。本例的代碼調(diào)用如下,只需看箭頭所指幾行代碼

與 DStream transform 操作返回一個新的 DStream 不同看疗,output 操作不會返回任何東西骤视,只會創(chuàng)建一個ForEachDStream作為依賴鏈的終結(jié)。

至此鹃觉, 生成了完成的依賴鏈专酗,也就是 DAG,如下圖(這里將 ForEachDStream 標為黃色以顯示其與眾不同):

這里的依賴鏈又叫 DAG盗扇。本文以一個簡單的例子說明 DStream DAG 的生成過程祷肯,之后將再寫兩篇文章說明如何根據(jù)這個 DStream DAG 得到 RDD DAG 及如何定時生成 job。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末疗隶,一起剝皮案震驚了整個濱河市佑笋,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌斑鼻,老刑警劉巖蒋纬,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異坚弱,居然都是意外死亡蜀备,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進店門荒叶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來碾阁,“玉大人,你說我怎么就攤上這事些楣≈祝” “怎么了宪睹?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蚕钦。 經(jīng)常有香客問我亭病,道長,這世上最難降的妖魔是什么嘶居? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任命贴,我火速辦了婚禮,結(jié)果婚禮上食听,老公的妹妹穿的比我還像新娘胸蛛。我一直安慰自己,他們只是感情好樱报,可當(dāng)我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布葬项。 她就那樣靜靜地躺著,像睡著了一般迹蛤。 火紅的嫁衣襯著肌膚如雪民珍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天盗飒,我揣著相機與錄音嚷量,去河邊找鬼。 笑死逆趣,一個胖子當(dāng)著我的面吹牛蝶溶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播宣渗,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼抖所,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了痕囱?” 一聲冷哼從身側(cè)響起田轧,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎鞍恢,沒想到半個月后傻粘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡帮掉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年弦悉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旭寿。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡警绩,死狀恐怖崇败,靈堂內(nèi)的尸體忽然破棺而出盅称,到底是詐尸還是另有隱情肩祥,我是刑警寧澤,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布缩膝,位于F島的核電站混狠,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏疾层。R本人自食惡果不足惜将饺,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望痛黎。 院中可真熱鬧予弧,春花似錦、人聲如沸湖饱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽井厌。三九已至蚓庭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間仅仆,已是汗流浹背器赞。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留墓拜,地道東北人港柜。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像咳榜,于是被迫代替她去往敵國和親潘懊。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容