[spark streaming] 動(dòng)態(tài)生成 Job 并提交執(zhí)行

前言

Spark Streaming Job的生成是通過JobGenerator每隔 batchDuration 長時(shí)間動(dòng)態(tài)生成的泳桦,每個(gè)batch 對(duì)應(yīng)提交一個(gè)JobSet琐馆,因?yàn)獒槍?duì)一個(gè)batch可能有多個(gè)輸出操作丹禀。

概述流程:

  • 定時(shí)器定時(shí)向 eventLoop 發(fā)送生成job的請(qǐng)求
  • 通過receiverTracker 為當(dāng)前batch分配block
  • 為當(dāng)前batch生成對(duì)應(yīng)的 Jobs
  • 將Jobs封裝成JobSet 提交執(zhí)行

入口

在 JobGenerator 初始化的時(shí)候就創(chuàng)建了一個(gè)定時(shí)器:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

每隔 batchDuration 就會(huì)向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))消息棵红,eventLoop的事件處理方法中會(huì)調(diào)用generateJobs(time)方法:

      case GenerateJobs(time) => generateJobs(time)
private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

為當(dāng)前batchTime分配Block

首先調(diào)用receiverTracker.allocateBlocksToBatch(time)方法為當(dāng)前batchTime分配對(duì)應(yīng)的Block擅威,最終會(huì)調(diào)用receiverTracker的Block管理者receivedBlockTrackerallocateBlocksToBatch方法:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

可以看到是從streamIdToUnallocatedBlockQueues中獲取到所有streamId對(duì)應(yīng)的未分配的blocks静汤,該隊(duì)列的信息是supervisor 存儲(chǔ)好Block后向receiverTracker上報(bào)的Block信息盆驹,詳情可見 ReceiverTracker 數(shù)據(jù)產(chǎn)生與存儲(chǔ)圆丹。

獲取到所有streamId對(duì)應(yīng)的未分配的blockInfos后,將其放入了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中躯喇,后面生成RDD的時(shí)候會(huì)用到辫封。

為當(dāng)前batchTime生成Jobs

調(diào)用DStreamGraphgenerateJobs方法為當(dāng)前batchTime生成job:

 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

一個(gè)outputStream就對(duì)應(yīng)一個(gè)job,遍歷所有的outputStreams廉丽,為其生成job:

# ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

先獲取到time對(duì)應(yīng)的RDD倦微,然后將其作為參數(shù)再調(diào)用foreachFunc方法,foreachFunc方法是通過構(gòu)造器傳過來的正压,我們來看看print()輸出的情況:

def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

這里的構(gòu)造的foreachFunc方法就是最終和rdd一起提交job的執(zhí)行方法欣福,也即對(duì)rdd調(diào)用take()后并打印,真正觸發(fā)action操作的是在這個(gè)func函數(shù)里蔑匣,現(xiàn)在再來看看是怎么拿到rdd的劣欢,每個(gè)DStream都有一個(gè)generatedRDDs:Map[Time, RDD[T]]變量,來保存time對(duì)應(yīng)的RDD裁良,若獲取不到則會(huì)通過compute()方法來計(jì)算凿将,對(duì)于需要在executor上啟動(dòng)Receiver來接收數(shù)據(jù)的ReceiverInputDStream來說:

 override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

會(huì)通過receiverTracker來獲取該batch對(duì)應(yīng)的blocks,前面已經(jīng)分析過為所有streamId分配了對(duì)應(yīng)的未分配的block价脾,并且放在了timeToAllocatedBlocks:Map[Time, AllocatedBlocks]中牧抵,這里底層就是從這個(gè)timeToAllocatedBlocks獲取到的blocksInfo,然后調(diào)用了createBlockRDD(validTime, blockInfos)通過blockId創(chuàng)建了RDD。

最后犀变,將通過此RDD和foreachFun構(gòu)建jobFunc妹孙,并創(chuàng)建Job返回。

封裝jobs成JobSet并提交執(zhí)行

每個(gè)outputStream對(duì)應(yīng)一個(gè)Job获枝,最終就會(huì)生成一個(gè)jobs蠢正,為這個(gè)jobs創(chuàng)建JobSet,并通過jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))來提交這個(gè)JobSet:

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

然后通過jobExecutor來執(zhí)行省店,jobExecutor是一個(gè)線程池嚣崭,并行度默認(rèn)為1,可通過spark.streaming.concurrentJobs配置懦傍,即同時(shí)可執(zhí)行幾個(gè)批次的數(shù)據(jù)雹舀。

處理類JobHandler中調(diào)用的是Job.run(),執(zhí)行的是前面構(gòu)建的 jobFunc 方法粗俱。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末说榆,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子寸认,更是在濱河造成了極大的恐慌签财,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件废麻,死亡現(xiàn)場離奇詭異荠卷,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)烛愧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門油宜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人怜姿,你說我怎么就攤上這事慎冤。” “怎么了沧卢?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵蚁堤,是天一觀的道長。 經(jīng)常有香客問我但狭,道長披诗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任立磁,我火速辦了婚禮呈队,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘唱歧。我一直安慰自己宪摧,他們只是感情好粒竖,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著几于,像睡著了一般蕊苗。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沿彭,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天朽砰,我揣著相機(jī)與錄音,去河邊找鬼喉刘。 笑死锅移,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的饱搏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼置逻,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼推沸!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起券坞,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤鬓催,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后恨锚,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宇驾,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年猴伶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了课舍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡他挎,死狀恐怖筝尾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情办桨,我是刑警寧澤筹淫,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站呢撞,受9級(jí)特大地震影響损姜,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜殊霞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一摧阅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脓鹃,春花似錦逸尖、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽岩齿。三九已至,卻和暖如春苞俘,著一層夾襖步出監(jiān)牢的瞬間盹沈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國打工吃谣, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乞封,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓岗憋,卻偏偏與公主長得像肃晚,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子仔戈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容