Spark Streaming(4) - 反壓

1. 前言

Spark Streaming在處理不斷流入的數(shù)據(jù)時通過每間隔一段時間(batch interval)將這段時間內(nèi)的流入的數(shù)據(jù)積累為一個batch仿耽,然后以這個batch內(nèi)的數(shù)據(jù)作為job DAG的輸入rdd提交新的job運行。當(dāng)一個batch的的處理時間大于batch interval時开缎,意味著數(shù)據(jù)處理速度跟不上數(shù)據(jù)接收速度奕删,此時在數(shù)據(jù)接收端(即Receiver一般數(shù)據(jù)接收端都運行在executor上)就會積累數(shù)據(jù),而數(shù)據(jù)是通過BlockManager管理的侮邀,如果數(shù)據(jù)存儲采取MEMORY_ONLY模式就會導(dǎo)致OOM铝宵,采取MEMORY_AND_DISK多余的數(shù)據(jù)保存到磁盤上反而會增加數(shù)據(jù)讀取時間华畏。

說到這里鹏秋,反壓實際就是一種根據(jù)當(dāng)前系統(tǒng)的處理能力來動態(tài)調(diào)節(jié)接收數(shù)據(jù)速率的功能。

2. 反壓

前言中提到數(shù)據(jù)接收端Receiver亡笑,可以參考文章Spark stream receiver,簡單說就是stream job運行期間會有一個或者多個Receiver運行在Executor上專門接收數(shù)據(jù)侣夷,并以batch interval為時間間隔將流式數(shù)據(jù)分割為一個個batch,然后以一個batch的數(shù)據(jù)啟動job。但是stream job中Receiver并不是必然存在的仑乌,例外的情況是當(dāng)數(shù)據(jù)源是kafka時百拓,spark內(nèi)置了一種叫DirectKafkaInputDStream的輸入源(可以通過KafkaUtils.createDirectStream(...)創(chuàng)建),這種類型的InputDStream(輸入源會實現(xiàn)這個類)沒有Receiver晰甚。對于那些帶Receiver的InputDStream實現(xiàn)類衙传,當(dāng)從InputDStream創(chuàng)建RDD時,源頭RDD中的數(shù)據(jù)就是Receiver接收的數(shù)據(jù),而從DirectKafkaInputDStream創(chuàng)建RDD時,數(shù)據(jù)實際上還沒有從kafka讀取過來,這個時候的RDD只包含了kafka的topic以及offset信息,等到rdd對應(yīng)的task運行時才從kafka中獲取數(shù)據(jù)。

由于存在有Receiver和沒有兩種情況比庄,實際上反壓的實現(xiàn)也不一樣净神。有Receiver時控制Receiver接收數(shù)據(jù)的速率就可以了,沒有Receiver的DirectKafkaInputDStream時的實現(xiàn)會在后文單獨提一下。

關(guān)于spark stream的原理可以參考:

  1. Spark Streaming(1) - 基本原理
  2. Spark Streaming(2) - JobScheduler岖寞、JobGenerator
  3. Spark Streaming(3) - Receiver和ReceiverTacker

2.1 開啟反壓

指定配置spark.streaming.backpressure.enabled為true即可開啟反壓。

2.2 有Receiver時反壓原理

反壓的原理是根據(jù)之前系統(tǒng)的處理能力來調(diào)節(jié)未來系統(tǒng)接收數(shù)據(jù)速率,它的過程是下面這樣的:

stream (stream id標(biāo)志)里所有job完成后
-> 反饋運行信息(包括威始,開始結(jié)束時間脓斩、處理本次處理記錄數(shù)等信息) 給JobScheduler
   -> JobScheduler將信息交給RateController(通過一個job成功事件,下文會說到) ,RateController根據(jù)反饋信息計算接下來應(yīng)該控制住Receiver接收多少條數(shù)據(jù) 
      -> RateController委托JobScheduler的receiverTracker將的計算結(jié)果通知給所有在Executor上運行的Receiver 
          -> Receiever控制接收數(shù)據(jù)速率

上面過程中JobScheduler處在核心的位置绰垂,由它來負(fù)責(zé)協(xié)調(diào),接下來分別講述RateController念恍,以及Recceiver是如何控制速率的。

2.2.1 RateController

RateController實現(xiàn)了StreamingListener, 它作為JobScheduler的lister恋沃,監(jiān)聽這個stream job的提交恕洲,開始運行户辞,以及完成双仍。其實它只關(guān)心StreamingListenerBatchCompleted事件的發(fā)生(該事件表示任務(wù)成功執(zhí)行),這個事件包含了如下信息:

case class BatchInfo(
    batchTime: Time,   
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
  ) 
  1. RateController從何而來锐帜?
    RateContoller 是InputDStream的成員,并在子類ReceiverInputDStream中初始化了具體的實例肛跌,如下:
abstract class ReceiverInputDStream {
override protected[streaming] val rateController: Option[RateController] = {
    // 開啟反壓的情況下創(chuàng)建了了ReceiverRateController
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
    } else {
      None
    }
  }
...
}

本節(jié)中接下來關(guān)注ReceiverRateController的實現(xiàn)艺配。

  1. RateController工作原理
    上面說到RateController實現(xiàn)了StreamingListener,并且只關(guān)注StreamingListenerBatchCompleted衍慎,該事件發(fā)生時转唉,會調(diào)用RateController # onBatchCompleted方法,方法體如下:
 override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

上面方法拿到stream job結(jié)束時間processingEnd西饵, job真實運行時間workDelay酝掩, job從提交到結(jié)束時間waitDelay, 本次job處理記錄數(shù)elems眷柔,
然后調(diào)用computeAndPublish計算期虾,computeAndPublish方法如下:

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
     // 使用rateEstimator來計算接下來的接收速率,
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        // publish是一個抽象方法驯嘱,也就是將計算出來的速率通知出處
        publish(getLatestRate())
      }
}

下面是ReceiverRateController的publish的實現(xiàn):

override def publish(rate: Long): Unit =
     // 通過JobScheduler的receiverTracker將計算出來的速率通知給所有的Receiver
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)

附:ReceiverTracker
這里順帶提一下ReceiverTracker中涉及的反壓過程的成員或方法吧:
(這里涉及到spark rpc镶苞,可以參考spark rpc)

class ReceiverTracker{
   // 這個rpc一端是運行在JobScheduler上的,負(fù)責(zé)接收Receiver傳過來的消息鞠评,
  // 它是類ReceiverTrackerEndpoint(ReceiverTracker的內(nèi)部類)的實例
   private var endpoint: RpcEndpointRef = null
   // stream id到Receiver的信息茂蚓,其中就包含了receiver的rpc通信信息
   private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

   // 上面ReceiverRateController # push就是調(diào)用這個方法去通知receiver新的速率的
   def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      // 這個enpoint是運行在JobScheduler上的,也就是給自己發(fā)了一個UpdateReceiverRateLimit更新速率的事件
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }
}

下面是ReceiverTrackerEndpoint接收到UpdateReceiverRateLimit消息時的處理:
def receive{
...
case UpdateReceiverRateLimit(streamUID, newRate) =>
        //  拿到receiver的rpc信息eP, 然后發(fā)送UpdateRateLimit(newRate)更新速率
        for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
          eP.send(UpdateRateLimit(newRate))
        }
...
}

RateController更新速率以及通知新接收速率過程就是這樣聋涨,接下來是Receiver如何去根據(jù)新的速率newRate控制接收速率晾浴。

2.2.2 Receiver控制速率

Receiver上運行了很多組件:

  • Receiver負(fù)責(zé)接收數(shù)據(jù)
  • 接收的數(shù)據(jù)上報給ReceiverSupervisorImpl
  • 如果接受的數(shù)據(jù)是一條條上報(調(diào)用方法ReceiverSupervisorImpl # putSingle), 則ReceiverSupervisorImpl 使用BlockGenerator用來將一條條的記錄匯聚成block(如果Receiver一次接收并上報一批數(shù)據(jù)就不會使用BlockGenerator)

上面BlockGenerator將一條條數(shù)據(jù)匯聚成block牍白, Receiver上控制接受速率就是通過BlockGenerator處理速度來實現(xiàn)的脊凰,BlockGenerator阻塞了也就相當(dāng)于間接阻塞了Receiver接受速率。

但是上面說只有一條條接受的數(shù)據(jù)會走BlockGenerator茂腥,如果Receiver不使用
ReceiverSupervisorImpl # putSingle而是使用其他方法一次上報一批數(shù)據(jù)狸涌,其實反壓是不起作用的。

  1. ReceiverSupervisorImpl接收UpdateRateLimit消息
    下面是ReceiverSupervisorImpl接受消息的rpc端處理代碼:
 private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
         // 通知所有的BlockGenerator去更新速率
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })
  1. BlockGenerator控制速率
    BlockGenerator采用令牌桶算法實現(xiàn)速率控制,原理簡介如下:

假設(shè)想把接收速率控制在m條記錄每秒最岗, 那么生產(chǎn)者只需要以恒定的速度每秒向桶中放m個令牌帕胆,數(shù)據(jù)接收者接收數(shù)據(jù)之前需要從桶中拿走一個令牌才能接收一條數(shù)據(jù),顯然數(shù)據(jù)接收速率不會超過m

來看看BlockGenerator是怎么實現(xiàn)的般渡,BlockGenerator實現(xiàn)了RateLimiter抽象類懒豹,下面是RateLimiter的部分實現(xiàn):

// 最大接收速率,接收數(shù)據(jù)速率由maxRateLimit和動態(tài)更新的反壓速率共同控制
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  // rateLimiter負(fù)責(zé)產(chǎn)生令牌
  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
 
 // BlockGenerator在存儲數(shù)據(jù)之前會調(diào)用這個方法,相當(dāng)于取令牌诊杆。
  def waitToPush() {
    rateLimiter.acquire()
  }

 // 更新速率歼捐,也就是令牌產(chǎn)生的速率
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
 }

BlockGenerator的父類RateLimiter實現(xiàn)了令牌桶速率控制算法,接下來就是BlockGenerator在接收Receiver傳遞過來的數(shù)據(jù)時調(diào)用waitToPush去獲取令牌了晨汹,沒有令牌是豹储,BlockGenerator阻塞,那么Receiver也會阻塞下去淘这。

2.3 DirectKafkaInputDStream時的反壓實現(xiàn)

DirectKafkaInputDStream是沒有Receiver的剥扣。

  • 有Receiver的stream job, 數(shù)據(jù)從由Receiver接收铝穷,然后組裝成block然后匯報到JobScheduler钠怯,stream job提交運行前從JobScheduler拿一個block運行。

  • 但是對于DirectKafkaInputDStream而言曙聂,不存在獨立運行的Receiver接收數(shù)據(jù)晦炊,而是在從DirectKafkaInputDStream創(chuàng)建出KafkaRDD然后提交stream job運行時,在KafkaRDD的compute方法中才開始從kafka讀取數(shù)據(jù)宁脊。此時由于接收數(shù)據(jù)是在job開始運行后在task中進(jìn)行的断国,因此反壓實現(xiàn)也是通過控制本次task從kafka中讀取多少數(shù)據(jù)來實現(xiàn)的。

1. 計算接收速率
不貼代碼了榆苞,DirectKafkaInputDStream使用IDRateEstimator去評估每秒接收的數(shù)據(jù)量R稳衬,同時由于DirectKafkaInputDStream可以同時從一個topic的n個分區(qū)接收數(shù)據(jù),這個R是整個全部分區(qū)的數(shù)據(jù)接收速度坐漏,在DirectKafkaInputDStream中還有一個重要的參數(shù)spark.streaming.kafka.maxRatePerPartition(每秒)控制每個分區(qū)的最大接收速度(假設(shè)是maxR)薄疚。

評估出R之后碧信,就是計算n個分區(qū)每個分區(qū)的速度了,這個不是簡單的R/n分的街夭,
而是根據(jù)之前每一個分區(qū)消費到的offset(假設(shè)是prefOffset)和現(xiàn)在每個分區(qū)的最新offset(假設(shè)是curOffset)的差值/ 總差值的比例計算出來的砰碴,

假設(shè)全部n個分區(qū)從上一batch消費之后到現(xiàn)在的整個未消費記錄數(shù)是totalLag,
分區(qū)n1未消費數(shù)是lagN1莱坎, 那么本次batch從分區(qū)n1應(yīng)該消費的記錄數(shù)就是:

(min( (lagN1 / totalLag) * R, maxR)) * batchDuration(換算成秒)

2. 控制速率
「1」中計算出每個分區(qū)最大消費記錄數(shù)m衣式,接下來從DirectKafkaInputDStream生成KafkaRDD(KafkaRDD的分區(qū)個數(shù)也就是topic的分區(qū)個數(shù)), 然后提交job運行后KafkaRDD的compute方法開始從kafka消費m條記錄檐什。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市弱卡,隨后出現(xiàn)的幾起案子乃正,更是在濱河造成了極大的恐慌,老刑警劉巖婶博,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓮具,死亡現(xiàn)場離奇詭異,居然都是意外死亡凡人,警方通過查閱死者的電腦和手機名党,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來挠轴,“玉大人传睹,你說我怎么就攤上這事“痘蓿” “怎么了欧啤?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長启上。 經(jīng)常有香客問我邢隧,道長,這世上最難降的妖魔是什么冈在? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任倒慧,我火速辦了婚禮,結(jié)果婚禮上包券,老公的妹妹穿的比我還像新娘纫谅。我一直安慰自己,他們只是感情好兴使,可當(dāng)我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布系宜。 她就那樣靜靜地躺著,像睡著了一般发魄。 火紅的嫁衣襯著肌膚如雪盹牧。 梳的紋絲不亂的頭發(fā)上俩垃,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機與錄音汰寓,去河邊找鬼口柳。 笑死,一個胖子當(dāng)著我的面吹牛有滑,可吹牛的內(nèi)容都是我干的跃闹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼毛好,長吁一口氣:“原來是場噩夢啊……” “哼望艺!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起肌访,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤找默,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吼驶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體惩激,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年蟹演,在試婚紗的時候發(fā)現(xiàn)自己被綠了风钻。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡酒请,死狀恐怖骡技,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蚌父,我是刑警寧澤哮兰,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站苟弛,受9級特大地震影響喝滞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜膏秫,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一右遭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧缤削,春花似錦窘哈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至帅刀,卻和暖如春让腹,著一層夾襖步出監(jiān)牢的瞬間远剩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工骇窍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留瓜晤,地道東北人。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓腹纳,卻偏偏與公主長得像痢掠,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子嘲恍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內(nèi)容