深入理解Spark Streaming流量控制及反壓機制

目錄

流量控制簡介

在流式處理系統(tǒng)中短荐,流量控制(rate control/rate limit)是一個非常重要的話題斗塘。對系統(tǒng)進行流控,主要目的是為了保證運行的穩(wěn)定性,防止突發(fā)大流量造成整個系統(tǒng)的擾動(throttle),長時間或劇烈的擾動甚至會使系統(tǒng)宕機。另外,為了保證系統(tǒng)的吞吐量最大化,也需要設計合理的流控門檻落追,避免系統(tǒng)空轉使資源利用率降低。

Spark Streaming作為基于微批次(micro-batch)的流處理框架涯肩,其流量的理想狀態(tài)就是官方文檔中所說的“batches of data should be processed as fast as they are being generated”轿钠,即每一批次的處理時長batch_process_time需要小于(但是又比較接近)我們設定的批次間隔batch_interval巢钓。如果batch_process_time > batch_interval,說明程序的處理能力不足疗垛,積累的數據越來越多症汹,最終會造成Executor內存溢出。如果batch_process_time << batch_interval贷腕,說明系統(tǒng)有很長時間是空閑的背镇,應該適當提升流量。

Spark Streaming流控基本設置

Spark Streaming通過Executor里的Receiver組件源源不斷地接收外部數據泽裳,并通過BlockManager將外部數據轉化為Spark中的塊進行存儲瞒斩。Spark Streaming機制的簡單框圖如下所示。


要限制Receiver接收數據的速率涮总,可以在SparkConf中設置配置項spark.streaming.receiver.maxRate胸囱,單位為數據條數/秒。如果采用的是基于Direct Stream方式的Kafka連接瀑梗,不經過Receiver烹笔,就得設置配置項spark.streaming.kafka.maxRatePerPartition來限流,單位是每分區(qū)的數據條數/秒抛丽。

這兩種方式的優(yōu)點是設置非常簡單谤职,只需要通過實際業(yè)務的吞吐量估算一下使批次間隔和處理耗時基本達到平衡的速率就可以了。缺點是一旦業(yè)務量發(fā)生變化亿鲜,就只能手動修改參數并重啟Streaming程序允蜈。另外,人為估計的參數畢竟有可能不準蒿柳,設置得太激進或太保守都不好饶套。

所以,Spark后來提出了動態(tài)流量控制的方案其馏,能夠根據當前系統(tǒng)的處理速度智能地調節(jié)流量閾值,名為反壓(back pressure)機制爆安。其在1.5版本開始加入叛复,ASF JIRA中對應的issue是SPARK-7398。要啟用它扔仓,只需要將配置項spark.streaming.backpressure.enabled設為true就可以(默認值為false)褐奥。

反壓機制看似簡單,但它背后有一套非常精巧的控制邏輯翘簇,下面就來深入看一看撬码。

Spark Streaming反壓機制的具體實現(xiàn)

動態(tài)流量控制器

o.a.s.streaming.scheduler.RateController抽象類是動態(tài)流量控制的核心。其源碼不甚長版保,抄錄如下呜笑。

private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  init()

  protected def publish(rate: Long): Unit

  @transient
  implicit private var executionContext: ExecutionContext = _

  @transient
  private var rateLimit: AtomicLong = _

  private def init() {
    executionContext = ExecutionContext.fromExecutorService(
      ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
    rateLimit = new AtomicLong(-1L)
  }

  private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
    ois.defaultReadObject()
    init()
  }

  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        publish(getLatestRate())
      }
    }

  def getLatestRate(): Long = rateLimit.get()

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

可見夫否,RateController抽象類繼承自StreamingListener特征,表示它是一個Streaming監(jiān)聽器叫胁。在之前的Spark Core源碼精讀系列文章中已經講過了監(jiān)聽器和事件總線機制凰慈,因此不再多說了。

RateController的主要工作如下:

  • 監(jiān)聽StreamingListenerBatchCompleted事件驼鹅,該事件表示一個批次已經處理完成微谓。
  • 從該事件的BatchInfo實例中取得:處理完成的時間戳processingEndTime、實際處理時長processingDelay(從批次的第一個job開始處理到最后一個job處理完成經過的時間)输钩、調度時延schedulingDelay(從批次被提交給Streaming JobScheduler到第一個job開始處理經過的時間)豺型。
  • 另外從事件的StreamInputInfo實例中取得批次輸入數據的條數numRecords。
  • 將取得的以上4個參數傳遞給速率估算器RateEstimator买乃,計算出新的流量閾值姻氨,并將其發(fā)布出去。

通過RateController的子類ReceiverRateController實現(xiàn)的publish()抽象方法可知为牍,新的流量閾值是發(fā)布給了ReceiverTracker哼绑。

  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
      extends RateController(id, estimator) {
    override def publish(rate: Long): Unit =
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
  }

不過下面先看速率估算器RateEstimator的實現(xiàn),稍后再回來看ReceiverTracker之后的事情碉咆。

基于PID機制的速率估算器

o.a.s.streaming.scheduler.rate.RateEstimator是一個很短的特征抖韩,其中只給出了計算流量閾值的方法compute()的定義。它還有一個伴生對象用于創(chuàng)建速率估算器的實例疫铜,其中寫出了更多關于反壓機制的配置參數茂浮。

object RateEstimator {
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" =>
        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)

      case estimator =>
        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}

目前RateEstimator的唯一實現(xiàn)類是PIDRateEstimator,亦即spark.streaming.backpressure.rateEstimator配置項的值只能為pid壳咕。其具體代碼如下席揽。

private[streaming] class PIDRateEstimator(
    batchIntervalMillis: Long,
    proportional: Double,
    integral: Double,
    derivative: Double,
    minRate: Double
  ) extends RateEstimator with Logging {
  private var firstRun: Boolean = true
  private var latestTime: Long = -1L
  private var latestRate: Double = -1D
  private var latestError: Double = -1L

  def compute(
      time: Long, 
      numElements: Long,
      processingDelay: Long,
      schedulingDelay: Long 
    ): Option[Double] = {
    this.synchronized {
      if (time > latestTime && numElements > 0 && processingDelay > 0) {
        val delaySinceUpdate = (time - latestTime).toDouble / 1000
        val processingRate = numElements.toDouble / processingDelay * 1000
        val error = latestRate - processingRate

        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
        val dError = (error - latestError) / delaySinceUpdate

        val newRate = (latestRate - proportional * error -
                                    integral * historicalError -
                                    derivative * dError).max(minRate) 

        latestTime = time
        if (firstRun) {
          latestRate = processingRate
          latestError = 0D
          firstRun = false
          None
        } else {
          latestRate = newRate
          latestError = error
          Some(newRate)
        }
      } else {
        None
      }
    }
  }
}

PIDRateEstimator充分運用了工控領域中常見的PID控制器的思想。所謂PID控制器谓厘,即比例(Proportional)-積分(Integral)-微分(Derivative)控制器幌羞,本質上是一種反饋回路(loop feedback)。它把收集到的數據和一個設定值(setpoint)進行比較竟稳,然后用它們之間的差計算新的輸入值属桦,該輸入值可以讓系統(tǒng)數據盡量接近或者達到設定值。

下圖示出PID控制器的基本原理他爸。


亦即:


其中e(t)代表誤差聂宾,即設定值與回授值之間的差。也就是說诊笤,比例單元對應當前誤差系谐,積分單元對應過去累積誤差,而微分單元對應將來誤差讨跟〖退控制三個單元的增益因子分別為Kp鄙煤、Ki、Kd止喷。

回到PIDRateEstimator的源碼來馆类,對應以上的式子,我們可以得知:

  • 處理速率的設定值其實就是上一批次的處理速率latestRate弹谁,回授值就是這一批次的速率processingRate乾巧,誤差error自然就是兩者之差。
  • 過去累積誤差在這里體現(xiàn)為調度時延的過程中數據積壓的速度预愤,也就是schedulingDelay * processingRate / batchInterval沟于。
  • 將來誤差就是上面算出的error對時間微分的結果。

將上面三者綜合起來植康,就可以根據Spark Streaming在上一批次以及這一批次的處理速率旷太,估算出一個合適的用于下一批次的流量閾值。比例增益Kpspark.streaming.backpressure.pid.proportional控制销睁,默認值1.0供璧;積分增益Kispark.streaming.backpressure.pid.integral控制,默認值0.2冻记;微分增益Kdspark.streaming.backpressure.pid.derived控制睡毒,默認值0.0。

除了上述參數之外冗栗,還有兩個參數與反壓機制相關演顾。一是spark.streaming.backpressure.initialRate,用于控制初始化時的處理速率隅居。二是spark.streaming.backpressure.pid.minRate钠至,用于控制最小處理速率,默認值100條/秒胎源。

通過RPC發(fā)布流量閾值

回來看ReceiverTracker棉钧,顧名思義,它負責追蹤Receiver的狀態(tài)涕蚤。其sendRateUpdate()方法如下宪卿。

  def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }

其中endpoint是RPC端點的引用,具體來說赞季,是ReceiverTrackerEndpoint的引用愧捕。這個方法會將流ID與新的流量閾值包裝在UpdateReceiverRateLimit消息中發(fā)送過去奢驯。

ReceiverTrackerEndpoint收到這條消息后申钩,會再將其包裝為UpdateRateLimit消息并發(fā)送給Receiver注冊時的RPC端點(位于ReceiverSupervisorImpl類中)。

  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

可見瘪阁,收到該消息之后調用了BlockGenerator.updateRate()方法撒遣。BlockGenerator是RateLimiter的子類邮偎,它負責將收到的流數據轉化成塊存儲。updateRate()方法是在RateLimiter抽象類中實現(xiàn)的义黎。

  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

這里最終借助了Guava中的限流器RateLimiter實現(xiàn)限流(Spark是不會重復造輪子的)禾进,其中maxRateLimit就是前面提到過的spark.streaming.receiver.maxRate參數。至此廉涕,新的流量閾值就設置好了泻云。

以上就是與反壓機制有關的全部細節(jié),整個流程可以用下面的框圖表示狐蜕。


還有最后一個小問題宠纯,流量閾值設定好之后是如何生效的?這其實已經超出了本文的范疇层释,簡單看一下婆瓜。

借助Guava令牌桶完成流量控制

Receiver在收到一條數據之后,會調用BlockGenerator.addData()方法贡羔,將數據存入緩存廉白。然后再從緩存取數據,并包裝成一個個block乖寒。

  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

注意到在真正存入緩存之前猴蹂,先調用了waitToPush()方法,它本質上就是Guava的RateLimiter.acquire()方法宵统。

  @CanIgnoreReturnValue
  public double acquire() {
    return acquire(1);
  }

  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

Guava的限流器是計算機網絡中經典限流方法——令牌桶(token bucket)算法的典型實現(xiàn)晕讲。acquire()方法的作用是從RateLimiter獲取一個令牌(這里叫permit),如果能夠取到令牌才將數據緩存马澈,如果不能取到令牌就會被阻塞瓢省。RateLimiter.setRate()方法就是通過改變向令牌桶中放入令牌的速率(參數名稱permitsPerSecond)來實現(xiàn)流量控制的。

關于令牌桶算法的細節(jié)痊班,可以參見英文維基勤婚,也可以參考Guava源碼,內容十分豐富涤伐。下圖只是一個簡單的示意馒胆。

The End

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市凝果,隨后出現(xiàn)的幾起案子祝迂,更是在濱河造成了極大的恐慌,老刑警劉巖器净,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件型雳,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機纠俭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進店門沿量,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人冤荆,你說我怎么就攤上這事朴则。” “怎么了钓简?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵乌妒,是天一觀的道長。 經常有香客問我外邓,道長芥被,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任坐榆,我火速辦了婚禮拴魄,結果婚禮上,老公的妹妹穿的比我還像新娘席镀。我一直安慰自己匹中,他們只是感情好,可當我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布豪诲。 她就那樣靜靜地躺著顶捷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪屎篱。 梳的紋絲不亂的頭發(fā)上服赎,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天,我揣著相機與錄音交播,去河邊找鬼重虑。 笑死,一個胖子當著我的面吹牛秦士,可吹牛的內容都是我干的缺厉。 我是一名探鬼主播,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼隧土,長吁一口氣:“原來是場噩夢啊……” “哼提针!你這毒婦竟也來了?” 一聲冷哼從身側響起曹傀,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤辐脖,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后皆愉,有當地人在樹林里發(fā)現(xiàn)了一具尸體嗜价,經...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡履植,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年豌鸡,在試婚紗的時候發(fā)現(xiàn)自己被綠了枝恋。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片先壕。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡翔脱,死狀恐怖奴拦,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情届吁,我是刑警寧澤错妖,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站疚沐,受9級特大地震影響暂氯,放射性物質發(fā)生泄漏。R本人自食惡果不足惜亮蛔,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一痴施、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧究流,春花似錦辣吃、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至偷仿,卻和暖如春哩簿,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背酝静。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工节榜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人别智。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓全跨,卻偏偏與公主長得像,于是被迫代替她去往敵國和親亿遂。 傳聞我的和親對象是個殘疾皇子浓若,可洞房花燭夜當晚...
    茶點故事閱讀 44,652評論 2 354