Spark Streaming 初始化過程分析

—————?—————?—————?—————?—————?—————
Spark Streaming概述
Spark Streaming 初始化過程
Spark Streaming Receiver啟動(dòng)過程分析
Spark Streaming 數(shù)據(jù)準(zhǔn)備階段分析(Receiver方式)
Spark Streaming 數(shù)據(jù)計(jì)算階段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 機(jī)制分析

—————?—————?—————?—————?—————?—————

Spark Streaming是一種構(gòu)建在Spark上的實(shí)時(shí)計(jì)算框架柜与。Spark Streaming應(yīng)用以Spark應(yīng)用的方式提交到Spark平臺(tái)场躯,其組件以長(zhǎng)期批處理任務(wù)的形式在Spark平臺(tái)運(yùn)行。這些任務(wù)主要負(fù)責(zé)接收實(shí)時(shí)數(shù)據(jù)流及定期產(chǎn)生批作業(yè)并提交至Spark集群旅挤,本文要說明的是以下幾個(gè)功能模塊運(yùn)行前的準(zhǔn)備工作踢关。

  • 數(shù)據(jù)接收
  • Job 生成
  • 流量控制
  • 動(dòng)態(tài)資源伸縮

下面我們以WordCount程序?yàn)槔治鯯park Streaming運(yùn)行環(huán)境的初始化過程。

val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]") 
val sc = new SparkContext(conf) 
val ssc = new StreamingContext(sc, Seconds(10)) 
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY) 
val words = lines.flatMap(_.split(" ")).map(w => (w,1)) 
val wordCount = words.reduceByKey(_+_) 
wordCount.print 
ssc.start()
ssc.awaitTermination()

以下流程粘茄,皆以上述WordCount源碼為例签舞。

1、StreamingContext的初始化過程

StreamingContext是Spark Streaming應(yīng)用的執(zhí)行環(huán)境柒瓣,其定義很多Streaming功能的入口儒搭,如:它提供從多種數(shù)據(jù)源創(chuàng)建DStream的方法等。
在創(chuàng)建Streaming應(yīng)用時(shí)芙贫,首先應(yīng)創(chuàng)建StreamingContext(WordCount應(yīng)用可知)搂鲫,伴隨StreamingContext的創(chuàng)建將會(huì)創(chuàng)建以下主要組件:

1.1 DStreamGraph

DStreamGraph的主要功能是記錄InputDStream及OutputStream及從InputDStream中抽取出ReceiverInputStreams。因?yàn)镈Stream之間的依賴關(guān)系類似于RDD磺平,并在任務(wù)執(zhí)行時(shí)轉(zhuǎn)換成RDD魂仍,因此,可以認(rèn)為DStream Graph與RDD Graph存在對(duì)應(yīng)關(guān)系. 即:DStreamGraph以批處理間隔為周期轉(zhuǎn)換成RDDGraph.

  • ReceiverInputStreams: 包含用于接收數(shù)據(jù)的Receiver信息拣挪,并在啟動(dòng)Receiver時(shí)提供相關(guān)信息
  • OutputStream:每個(gè)OutputStream會(huì)在批作業(yè)生成時(shí)擦酌,生成一個(gè)Job.

1.2 JobScheduler

JobScheduler是Spark Streaming中最核心的組件,其負(fù)載Streaming各功作組件的啟動(dòng)菠劝。

  • 數(shù)據(jù)接收
  • Job 生成
  • 流量控制
  • 動(dòng)態(tài)資源伸縮
    以及負(fù)責(zé)生成的批Job的調(diào)度及狀態(tài)管理工作赊舶。

2、 DStream的創(chuàng)建與轉(zhuǎn)換

StreamingContext初始化完畢后,通過調(diào)用其提供的創(chuàng)建InputDStream的方法創(chuàng)建SocketInputDStream.

SocketInputDStream的繼承關(guān)系為:
SocketInputDStream->ReceiverInputDStream->InputDStream->DStream.
在InputDStream中 提供如下功

 ssc.graph.addInputStream(this)

JAVA中初始化子類時(shí)笼平,會(huì)先初始化其父類园骆。所以在創(chuàng)建SocketInputDStream時(shí),會(huì)先初始化InputDStream寓调,在InputDStream中實(shí)現(xiàn)將自身加入DStreamGraph中遇伞,以標(biāo)識(shí)其為輸入數(shù)據(jù)源。
DStream中算子的轉(zhuǎn)換捶牢,類似于RDD中的轉(zhuǎn)換鸠珠,都是延遲計(jì)算,僅形成pipeline鏈秋麸。當(dāng)上述應(yīng)用遇到print(Output算子)時(shí)渐排,會(huì)將DStream轉(zhuǎn)換為ForEachDStream,并調(diào)register方法作為OutputStream注冊(cè)到DStreamGraph的outputStreams列表,以待生成Job灸蟆。
print算子實(shí)現(xiàn)方法如下:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
 def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

  /**
   * Apply a function to each RDD in this DStream. This is an output operator, so
   * 'this' DStream will be registered as an output stream and therefore materialized.
   * @param foreachFunc foreachRDD function
   * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
   *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
   *                           only the scopes and callsites of `foreachRDD` will override those
   *                           of the RDDs on the display.
   */
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

ForEachDStream 不同于其它DStream的地方為其重寫了generateJob方法驯耻,以使DStream Graph操作轉(zhuǎn)換成RDD Graph操作,并生成Job.

3炒考、SparkContext啟動(dòng)

/**
   * Start the execution of the streams.
   *
   * @throws IllegalStateException if the StreamingContext is already stopped.
   */
  def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        ......
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

在此方法中可缚,最核心的代碼是以線程的方式啟動(dòng)JobScheduler,從而開啟各功能組件斋枢。

3.1 JobScheduler的啟動(dòng)

JobScheduler主要負(fù)責(zé)以下幾種任務(wù):

  • 數(shù)據(jù)接收相關(guān)組件的初始化及啟動(dòng)
    ReceiverTracker的初始化及啟動(dòng)帘靡。ReceiverTracker負(fù)責(zé)管理Receiver,包括Receiver的啟停瓤帚,狀態(tài)維護(hù) 等描姚。
  • Job生成相關(guān)組件的啟動(dòng)
    JobGenerator的啟動(dòng)。JobGenerator負(fù)責(zé)以BatchInterval為周期生成Job.
  • Streaming監(jiān)聽的注冊(cè)與啟動(dòng)
  • 作業(yè)監(jiān)聽
  • 反壓機(jī)制
    BackPressure機(jī)制戈次,通過RateController控制數(shù)據(jù)攝取速率轩勘。
  • Executor DynamicAllocation 的啟動(dòng)
    Executor 動(dòng)態(tài)伸縮管理, 動(dòng)態(tài)增加或減少Executor,來達(dá)到使用系統(tǒng)穩(wěn)定運(yùn)行 或減少資源開銷的目的怯邪。
  • Job的調(diào)度及狀態(tài)維護(hù)绊寻。

JobScheduler的start方法的代碼如下所示:

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

代碼中存在的 eventLoop: EventLoop[JobSchedulerEvent]對(duì)象,用以接收和處理事件悬秉。調(diào)用者通過調(diào)用其post方法向事件隊(duì)列注冊(cè)事件澄步。EventLoop開始執(zhí)行時(shí),會(huì)開啟一deamon線程用于處理隊(duì)列中的事件搂捧。EventLoop是一個(gè)抽象類驮俗,JobScheduler中初始化EventLoop時(shí)實(shí)現(xiàn)了其OnReceive方法懂缕。該方法中指定接收的事件由processEvent(event)方法處理允跑。

小結(jié)

JobScheduler是Spark Streaming中核心的組件,在其開始執(zhí)行時(shí),會(huì)開啟數(shù)據(jù)接收相關(guān)組件及Job生成相關(guān)組件聋丝,從而使數(shù)據(jù)準(zhǔn)備和數(shù)據(jù)計(jì)算兩個(gè)流程開始工作索烹。
另外,其還負(fù)責(zé)BackPressure, Executor DynamicAllocation 等優(yōu)化機(jī)制的啟動(dòng)工作弱睦。
下面的章節(jié)百姓,將對(duì)數(shù)據(jù)準(zhǔn)備和數(shù)據(jù)計(jì)算階段的流程進(jìn)行分析,以及BackPressure, Executor DynamicAllocation 機(jī)制進(jìn)行分析况木。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末垒拢,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子火惊,更是在濱河造成了極大的恐慌求类,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件屹耐,死亡現(xiàn)場(chǎng)離奇詭異尸疆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)惶岭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門寿弱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人按灶,你說我怎么就攤上這事症革。” “怎么了鸯旁?”我有些...
    開封第一講書人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵地沮,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我羡亩,道長(zhǎng)摩疑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任畏铆,我火速辦了婚禮酪穿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘劫樟。我一直安慰自己葫盼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開白布瓦灶。 她就那樣靜靜地躺著鸠删,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贼陶。 梳的紋絲不亂的頭發(fā)上刃泡,一...
    開封第一講書人閱讀 49,036評(píng)論 1 285
  • 那天巧娱,我揣著相機(jī)與錄音,去河邊找鬼烘贴。 笑死禁添,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的桨踪。 我是一名探鬼主播老翘,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼锻离!你這毒婦竟也來了铺峭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤汽纠,失蹤者是張志新(化名)和其女友劉穎逛薇,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疏虫,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡永罚,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了卧秘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呢袱。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖翅敌,靈堂內(nèi)的尸體忽然破棺而出羞福,到底是詐尸還是另有隱情,我是刑警寧澤蚯涮,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布治专,位于F島的核電站,受9級(jí)特大地震影響遭顶,放射性物質(zhì)發(fā)生泄漏张峰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一棒旗、第九天 我趴在偏房一處隱蔽的房頂上張望喘批。 院中可真熱鬧,春花似錦铣揉、人聲如沸饶深。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽敌厘。三九已至,卻和暖如春朽合,著一層夾襖步出監(jiān)牢的瞬間俱两,已是汗流浹背饱狂。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锋华,地道東北人嗡官。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓箭窜,卻偏偏與公主長(zhǎng)得像毯焕,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子磺樱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容