Spark Streaming Backpressure Analysis

----------
Spark Streaming Overview
Spark Streaming Initialization Process
Spark Streaming Receiver Startup Process Analysis
Spark Streaming Data Preparation Phase Analysis (Receiver Mode)
Spark Streaming Data Computation Phase Analysis
Spark Streaming Backpressure Analysis
Spark Streaming Executor DynamicAllocation Mechanism Analysis

----------

1. Why Backpressure Was Introduced

By default, Spark Streaming receives data through Receivers at whatever rate the producers emit it. During computation, batch processing time can exceed batch interval, where batch processing time is the time actually spent computing one batch and batch interval is the batch interval configured for the Streaming application. This means Spark Streaming is receiving data faster than Spark can remove it from the queue; in other words, processing capacity is too low to fully process, within the configured interval, the data arriving at the current rate. If this persists for too long, data accumulates in memory and can cause problems such as out-of-memory errors on the Executor hosting the Receiver (if the configured StorageLevel includes disk, data that does not fit in memory spills to disk, which increases latency). Before Spark 1.5, a user who wanted to limit a Receiver's ingestion rate could set the static configuration parameter "spark.streaming.receiver.maxRate". Limiting the ingestion rate this way matches it to the current processing capacity and prevents memory overflow, but it introduces other problems. For example, if the producer emits data faster than maxRate and the cluster could also process more than maxRate, resources are underutilized. To better coordinate the ingestion rate with processing capacity, Spark Streaming introduced a back-pressure mechanism in v1.5, which dynamically controls the ingestion rate to match the cluster's processing capacity.
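The effect of batch processing time exceeding batch interval can be illustrated with a small simulation. This is only a sketch under simplifying assumptions (every batch takes the same time and batches run strictly one after another); `BacklogSketch` and `schedulingDelays` are hypothetical names, not part of Spark.

```scala
object BacklogSketch {
  /** Scheduling delay (ms) of each batch when batches run one at a time. */
  def schedulingDelays(batchIntervalMs: Long, processingTimeMs: Long, batches: Int): Seq[Long] = {
    var previousFinish = 0L
    (0 until batches).map { k =>
      val generatedAt = k * batchIntervalMs               // when batch k becomes available
      val start = math.max(generatedAt, previousFinish)   // it must wait for the previous batch
      previousFinish = start + processingTimeMs
      start - generatedAt                                 // time spent waiting in the queue
    }
  }
}
```

With a 1000 ms interval and 1500 ms of processing per batch, each batch waits 500 ms longer than the one before it, so the backlog grows without bound; with 800 ms of processing the delay stays at zero.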

2. BackPressure Architecture

Spark Streaming Backpressure dynamically adjusts the Receiver ingestion rate based on the job execution information fed back by JobScheduler. The property "spark.streaming.backpressure.enabled" controls whether the backpressure mechanism is enabled; its default value is false, meaning it is disabled.

2.1 Spark Streaming Architecture

The Spark Streaming architecture is shown in the figure below (for a detailed analysis, see "Spark Streaming Data Preparation Phase Analysis" and "Spark Streaming Data Computation Phase Analysis").

2.2 BackPressure Execution Process

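A new component, RateController, is added on top of the original architecture. It listens for batch completion events (through onBatchCompleted) and extracts the processingDelay and schedulingDelay information from them. The Estimator uses this information to estimate the maximum processing rate, and the Receiver-based Input Stream then forwards the rate through ReceiverTracker and ReceiverSupervisorImpl to the BlockGenerator (which extends RateLimiter).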

Figure: BackPressure execution flow

3. BackPressure Source Code Analysis

3.1 RateController Class Hierarchy

RateController extends StreamingListener and handles BatchCompleted events.
Its class declaration is shown in the code below:

/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
}

3.2 Registering the RateController

When JobScheduler starts, it extracts the rateController of every InputDStream registered in the DStreamGraph, registers each one with the ListenerBus, and starts listening.

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)
    listenerBus.start()

3.3 BackPressure Execution Process Analysis

BackPressure execution consists of two parts, the point at which the BatchCompleted event is triggered and the handling of that event:

  • How BatchCompleted is triggered
  • How the BatchCompleted event is handled

3.3.1 How BatchCompleted Is Triggered

The analysis of BatchCompleted should start from JobScheduler, because BatchCompleted marks the end of a batch; that is, it is triggered when the jobs scheduled by JobScheduler finish executing. We therefore look at job scheduling and execution first.
After JobGenerator calls generateJobs() to generate the Jobs, it submits them through JobScheduler's submitJobSet method. submitJobSet is implemented as follows:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

Here, the Jobs in jobSet are processed by jobExecutor, and the handler for each Job is JobHandler. JobHandler runs the Job and handles its execution result. When a Job finishes, a JobCompleted event is produced. The logic of JobHandler is shown in the code below:

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        logInfo("Handler job at " + job.time)
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }

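When a Job finishes, a JobCompleted event is sent to eventLoop. On receiving it, the EventLoop event handler calls handleJobCompletion to handle the completed Job. Once every Job in the JobSet has completed, handleJobCompletion uses the execution information to create a StreamingListenerBatchCompleted event and sends it to the listeners through StreamingListenerBus. The implementation is as follows: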

private def handleJobCompletion(job: Job, completedTime: Long) {
    val jobSet = jobSets.get(job.time)
    jobSet.handleJobCompletion(job)
    job.setEndTime(completedTime)
    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
    if (jobSet.hasCompleted) {
      jobSets.remove(jobSet.time)
      jobGenerator.onBatchCompletion(jobSet.time)
      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
        jobSet.totalDelay / 1000.0, jobSet.time.toString,
        jobSet.processingDelay / 1000.0
      ))
      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
    }
    job.result match {
      case Failure(e) =>
        reportError("Error running job " + job, e)
      case _ =>
    }
  }

3.3.2 How the BatchCompleted Event Is Handled

StreamingListenerBus dispatches events to the concrete StreamingListeners, so BatchCompleted is handled by RateController. On receiving a BatchCompleted event, RateController calls onBatchCompleted to handle it.

 override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
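The for-comprehension above runs over Option values, so computeAndPublish is only reached when the processing end time, both delays, and the record count for this stream are all present. A minimal sketch of that behavior, where `BatchSummary` and `estimatorInputs` are hypothetical stand-ins rather than Spark classes:

```scala
// Hypothetical stand-in for the BatchInfo fields that onBatchCompleted reads.
final case class BatchSummary(
    processingEndTime: Option[Long],
    processingDelay: Option[Long],
    schedulingDelay: Option[Long],
    recordsByStream: Map[Int, Long])

object OnBatchCompletedSketch {
  /** Returns the values that would be passed on, or None if any of them is missing. */
  def estimatorInputs(streamUID: Int, b: BatchSummary): Option[(Long, Long, Long, Long)] =
    for {
      processingEnd <- b.processingEndTime
      workDelay <- b.processingDelay
      waitDelay <- b.schedulingDelay
      elems <- b.recordsByStream.get(streamUID)
    } yield (processingEnd, elems, workDelay, waitDelay)
}
```

If the batch carries no input information for the given stream, or a delay is undefined, the result is None and no new rate is computed for that batch.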

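onBatchCompleted extracts the processing delay and scheduling delay from the completed batch, then passes them, together with the processing end time and the number of records, to the RateEstimator to estimate a new rate and publish it. The only implementation at present is PIDRateEstimator, a proportional-integral-derivative (PID) controller. The code is as follows: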

/**
   * Compute the new rate limit and publish it asynchronously.
   */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }
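The estimation step that computeAndPublish delegates to can be sketched as a simplified PID-style estimator. This is not the PIDRateEstimator source: the class name `SimplePidEstimator`, the gain values, and the minimum rate are assumptions chosen for illustration. It only shows how the processing delay (current error) and the scheduling delay (accumulated error) might pull the rate down.

```scala
// A simplified PID-style estimator; gains and minRate are illustrative assumptions.
final class SimplePidEstimator(
    batchIntervalMillis: Long,
    proportional: Double = 1.0,
    integral: Double = 0.2,
    derivative: Double = 0.0,
    minRate: Double = 100.0) {

  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  /** Returns a new rate in records per second, or None if no estimate can be made. */
  def compute(time: Long, numElements: Long, processingDelay: Long,
              schedulingDelay: Long): Option[Double] = synchronized {
    if (time > latestTime && numElements > 0 && processingDelay > 0) {
      val delaySinceUpdate = (time - latestTime).toDouble / 1000
      // Records per second actually processed in the last batch.
      val processingRate = numElements.toDouble / processingDelay * 1000
      // Proportional term: how far the allowed rate is above what was processed.
      val error = latestRate - processingRate
      // Integral term: records still queued, expressed as a rate over one batch interval.
      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
      // Derivative term: how quickly the error is changing.
      val dError = (error - latestError) / delaySinceUpdate
      val newRate = math.max(
        latestRate - proportional * error - integral * historicalError - derivative * dError,
        minRate)
      latestTime = time
      if (firstRun) {
        // The first batch only seeds the state; there is no previous rate to correct.
        latestRate = processingRate
        latestError = 0.0
        firstRun = false
        None
      } else {
        latestRate = newRate
        latestError = error
        Some(newRate)
      }
    } else None
  }
}
```

For example, after a first batch that seeds the rate at 2000 records per second, a batch of 1000 records that takes 1000 ms to process moves the estimate to 1000 records per second, and a further 500 ms of scheduling delay lowers it again to 900.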

publish() is defined by ReceiverRateController, a subclass of RateController. Its logic is as follows (defined in ReceiverInputDStream):

  /**
   * A RateController that sends the new rate to receivers, via the receiver tracker.
   */
  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
      extends RateController(id, estimator) {
    override def publish(rate: Long): Unit =
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
  }

publish forwards the newly computed rate through ReceiverTracker. ReceiverTracker wraps the rate in an UpdateReceiverRateLimit event and sends it to ReceiverTrackerEndpoint.


  /** Update a receiver's maximum ingestion rate */
  def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }

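On receiving the message, ReceiverTrackerEndpoint looks up, in the receiverTrackingInfos list, the endpoint that the Receiver registered with (in practice, the one created by ReceiverSupervisorImpl), wraps the rate in an UpdateRateLimit message, and sends it to that endpoint. When the endpoint receives the message, it calls updateRate on the registered BlockGenerators (subclasses of RateLimiter), from which a fixed token interval is computed.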

/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

RateLimiter's updateRate is implemented as follows:

 /**
   * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
   * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
   *
   * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
   */
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }
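The branching in updateRate can be restated as a pure function, which makes the three cases easy to check. `RateCapSketch` and `effectiveRate` are hypothetical names used only for this sketch; None stands for "the limiter is left unchanged".

```scala
object RateCapSketch {
  /** The rate that would be applied, or None when the update has no effect. */
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None                                            // ignored
    else if (maxRateLimit > 0) Some(math.min(newRate, maxRateLimit))  // capped by maxRate
    else Some(newRate)                                                // no static cap configured
}
```

So an estimated rate of 5000 with maxRate set to 2000 is applied as 2000, while the same estimate with no maxRate configured is applied unchanged.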

setRate on the underlying rateLimiter (a Guava RateLimiter) is implemented as follows:

  public final void setRate(double permitsPerSecond) {
        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
        Object var3 = this.mutex;
        synchronized(this.mutex) {
            this.resync(this.readSafeMicros());
            double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
            this.stableIntervalMicros = stableIntervalMicros;
            this.doSetRate(permitsPerSecond, stableIntervalMicros);
        }
    }
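The key line in setRate is the conversion from a rate to a fixed interval between permits. The arithmetic can be reproduced on its own; `StableIntervalSketch` is a hypothetical helper and is not part of Guava or Spark.

```scala
import java.util.concurrent.TimeUnit

object StableIntervalSketch {
  /** Microseconds between two consecutive permits at the given rate. */
  def stableIntervalMicros(permitsPerSecond: Double): Double = {
    require(permitsPerSecond > 0.0 && !permitsPerSecond.isNaN, "rate must be positive")
    TimeUnit.SECONDS.toMicros(1L).toDouble / permitsPerSecond
  }
}
```

At 2000 records per second, one permit becomes available every 500 microseconds; at 1 record per second, every 1,000,000 microseconds.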

At this point, the backpressure mechanism has finished adjusting the rate.

4. Flow Control Point (Where the Limit Takes Effect)

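When a Receiver starts receiving data, it stores each received item in currentBuffer through supervisor.pushSingle(), where the data waits for BlockGenerator to collect it periodically and package it into a block. Before an item is placed in currentBuffer, a permit (token) must be acquired. If a permit is obtained, the item is stored in the buffer; otherwise the call blocks, which in turn blocks the Receiver from pulling data from the source.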

 /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

Tokens are issued using a token bucket mechanism, whose principle is illustrated in the figure below:



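  Token bucket mechanism: a token bucket of fixed size produces tokens continuously at a constant rate. If tokens are not consumed, or are consumed more slowly than they are produced, they accumulate until the bucket is full. Tokens produced after that overflow from the bucket, so the number of tokens held never exceeds the bucket size. An operation that needs tokens takes the required number from the bucket: if it gets them it proceeds, otherwise it blocks. Tokens are not returned after use.
  After the Receiver receives the Streaming data stream, it parses the data line by line into an iterator and then stores the records in the buffer one at a time. Before each record is stored, a token is acquired: if no token is available, the call blocks; if one is obtained, the record is stored in the buffer, where it waits for the subsequent block generation.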
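The token bucket idea described above can be sketched in a few lines. This is a generic illustration, not the Guava RateLimiter that Spark uses: the class name `TokenBucket` and the injectable `nowNanos` clock are assumptions made for the example, and tryAcquire returns false where a real receiver path would block and retry.

```scala
// A minimal token bucket; refills lazily based on elapsed time.
final class TokenBucket(
    capacity: Long,
    tokensPerSecond: Double,
    nowNanos: () => Long = () => System.nanoTime()) {
  require(capacity > 0 && tokensPerSecond > 0, "capacity and rate must be positive")

  private var tokens: Double = capacity.toDouble   // the bucket starts full
  private var lastRefillNanos: Long = nowNanos()

  private def refill(): Unit = {
    val now = nowNanos()
    val elapsedSeconds = (now - lastRefillNanos) / 1e9
    // Tokens beyond the bucket size overflow and are lost.
    tokens = math.min(capacity.toDouble, tokens + elapsedSeconds * tokensPerSecond)
    lastRefillNanos = now
  }

  /** Takes n tokens if available; consumed tokens are never returned. */
  def tryAcquire(n: Int = 1): Boolean = synchronized {
    refill()
    if (tokens >= n) { tokens -= n; true } else false
  }
}
```

A caller that wants the blocking behavior described above could loop on tryAcquire with a short sleep between attempts; passing a fake clock makes the refill logic easy to test without waiting.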

This article was originally published by the author on cnblogs (http://www.cnblogs.com/barrenlake/p/5349949.html)
