1. 前言
Spark Streaming在處理不斷流入的數(shù)據(jù)時通過每間隔一段時間(batch interval)將這段時間內(nèi)的流入的數(shù)據(jù)積累為一個batch仿耽,然后以這個batch內(nèi)的數(shù)據(jù)作為job DAG的輸入rdd提交新的job運行。當(dāng)一個batch的的處理時間大于batch interval時开缎,意味著數(shù)據(jù)處理速度跟不上數(shù)據(jù)接收速度奕删,此時在數(shù)據(jù)接收端(即Receiver一般數(shù)據(jù)接收端都運行在executor上)就會積累數(shù)據(jù),而數(shù)據(jù)是通過BlockManager管理的侮邀,如果數(shù)據(jù)存儲采取MEMORY_ONLY模式就會導(dǎo)致OOM铝宵,采取MEMORY_AND_DISK多余的數(shù)據(jù)保存到磁盤上反而會增加數(shù)據(jù)讀取時間华畏。
說到這里鹏秋,反壓實際就是一種根據(jù)當(dāng)前系統(tǒng)的處理能力來動態(tài)調(diào)節(jié)接收數(shù)據(jù)速率的功能。
2. 反壓
前言中提到數(shù)據(jù)接收端Receiver亡笑,可以參考文章Spark stream receiver,簡單說就是stream job運行期間會有一個或者多個Receiver運行在Executor上專門接收數(shù)據(jù)侣夷,并以batch interval為時間間隔將流式數(shù)據(jù)分割為一個個batch,然后以一個batch的數(shù)據(jù)啟動job。但是stream job中Receiver并不是必然存在的仑乌,例外的情況是當(dāng)數(shù)據(jù)源是kafka時百拓,spark內(nèi)置了一種叫DirectKafkaInputDStream
的輸入源(可以通過KafkaUtils.createDirectStream(...)
創(chuàng)建),這種類型的InputDStream(輸入源會實現(xiàn)這個類)沒有Receiver晰甚。對于那些帶Receiver的InputDStream實現(xiàn)類衙传,當(dāng)從InputDStream創(chuàng)建RDD時,源頭RDD中的數(shù)據(jù)就是Receiver接收的數(shù)據(jù),而從DirectKafkaInputDStream
創(chuàng)建RDD時,數(shù)據(jù)實際上還沒有從kafka讀取過來,這個時候的RDD只包含了kafka的topic以及offset信息,等到rdd對應(yīng)的task運行時才從kafka中獲取數(shù)據(jù)。
由于存在有Receiver和沒有兩種情況比庄,實際上反壓的實現(xiàn)也不一樣净神。有Receiver時控制Receiver接收數(shù)據(jù)的速率就可以了,沒有Receiver的DirectKafkaInputDStream
時的實現(xiàn)會在后文單獨提一下。
附
關(guān)于spark stream的原理可以參考:
- Spark Streaming(1) - 基本原理
- Spark Streaming(2) - JobScheduler岖寞、JobGenerator
- Spark Streaming(3) - Receiver和ReceiverTacker
2.1 開啟反壓
指定配置spark.streaming.backpressure.enabled
為true即可開啟反壓。
2.2 有Receiver時反壓原理
反壓的原理是根據(jù)之前系統(tǒng)的處理能力來調(diào)節(jié)未來系統(tǒng)接收數(shù)據(jù)速率,它的過程是下面這樣的:
stream (stream id標(biāo)志)里所有job完成后
-> 反饋運行信息(包括威始,開始結(jié)束時間脓斩、處理本次處理記錄數(shù)等信息) 給JobScheduler
-> JobScheduler將信息交給RateController(通過一個job成功事件,下文會說到) ,RateController根據(jù)反饋信息計算接下來應(yīng)該控制住Receiver接收多少條數(shù)據(jù)
-> RateController委托JobScheduler的receiverTracker將的計算結(jié)果通知給所有在Executor上運行的Receiver
-> Receiever控制接收數(shù)據(jù)速率
上面過程中JobScheduler處在核心的位置绰垂,由它來負(fù)責(zé)協(xié)調(diào),接下來分別講述RateController念恍,以及Recceiver是如何控制速率的。
2.2.1 RateController
RateController實現(xiàn)了StreamingListener
, 它作為JobScheduler的lister恋沃,監(jiān)聽這個stream job的提交恕洲,開始運行户辞,以及完成双仍。其實它只關(guān)心StreamingListenerBatchCompleted
事件的發(fā)生(該事件表示任務(wù)成功執(zhí)行),這個事件包含了如下信息:
case class BatchInfo(
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long],
outputOperationInfos: Map[Int, OutputOperationInfo]
)
- RateController從何而來锐帜?
RateContoller 是InputDStream的成員,并在子類ReceiverInputDStream
中初始化了具體的實例肛跌,如下:
abstract class ReceiverInputDStream {
override protected[streaming] val rateController: Option[RateController] = {
// 開啟反壓的情況下創(chuàng)建了了ReceiverRateController
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
} else {
None
}
}
...
}
本節(jié)中接下來關(guān)注ReceiverRateController
的實現(xiàn)艺配。
- RateController工作原理
上面說到RateController實現(xiàn)了StreamingListener
,并且只關(guān)注StreamingListenerBatchCompleted
衍慎,該事件發(fā)生時转唉,會調(diào)用RateController # onBatchCompleted方法,方法體如下:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}
上面方法拿到stream job結(jié)束時間processingEnd西饵, job真實運行時間workDelay酝掩, job從提交到結(jié)束時間waitDelay, 本次job處理記錄數(shù)elems眷柔,
然后調(diào)用computeAndPublish計算期虾,computeAndPublish方法如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
// 使用rateEstimator來計算接下來的接收速率,
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
// publish是一個抽象方法驯嘱,也就是將計算出來的速率通知出處
publish(getLatestRate())
}
}
下面是ReceiverRateController
的publish的實現(xiàn):
override def publish(rate: Long): Unit =
// 通過JobScheduler的receiverTracker將計算出來的速率通知給所有的Receiver
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
附:ReceiverTracker
這里順帶提一下ReceiverTracker中涉及的反壓過程的成員或方法吧:
(這里涉及到spark rpc镶苞,可以參考spark rpc)
class ReceiverTracker{
// 這個rpc一端是運行在JobScheduler上的,負(fù)責(zé)接收Receiver傳過來的消息鞠评,
// 它是類ReceiverTrackerEndpoint(ReceiverTracker的內(nèi)部類)的實例
private var endpoint: RpcEndpointRef = null
// stream id到Receiver的信息茂蚓,其中就包含了receiver的rpc通信信息
private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
// 上面ReceiverRateController # push就是調(diào)用這個方法去通知receiver新的速率的
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
// 這個enpoint是運行在JobScheduler上的,也就是給自己發(fā)了一個UpdateReceiverRateLimit更新速率的事件
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
}
}
}
下面是ReceiverTrackerEndpoint接收到UpdateReceiverRateLimit消息時的處理:
def receive{
...
case UpdateReceiverRateLimit(streamUID, newRate) =>
// 拿到receiver的rpc信息eP, 然后發(fā)送UpdateRateLimit(newRate)更新速率
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
...
}
RateController更新速率以及通知新接收速率過程就是這樣聋涨,接下來是Receiver如何去根據(jù)新的速率newRate控制接收速率晾浴。
2.2.2 Receiver控制速率
Receiver上運行了很多組件:
- Receiver負(fù)責(zé)接收數(shù)據(jù)
- 接收的數(shù)據(jù)上報給ReceiverSupervisorImpl
- 如果接受的數(shù)據(jù)是一條條上報(調(diào)用方法ReceiverSupervisorImpl # putSingle), 則ReceiverSupervisorImpl 使用BlockGenerator用來將一條條的記錄匯聚成block(如果Receiver一次接收并上報一批數(shù)據(jù)就不會使用BlockGenerator)
上面BlockGenerator將一條條數(shù)據(jù)匯聚成block牍白, Receiver上控制接受速率就是通過BlockGenerator處理速度來實現(xiàn)的脊凰,BlockGenerator阻塞了也就相當(dāng)于間接阻塞了Receiver接受速率。
但是上面說只有一條條接受的數(shù)據(jù)會走BlockGenerator茂腥,如果Receiver不使用
ReceiverSupervisorImpl # putSingle
而是使用其他方法一次上報一批數(shù)據(jù)狸涌,其實反壓是不起作用的。
- ReceiverSupervisorImpl接收UpdateRateLimit消息
下面是ReceiverSupervisorImpl接受消息的rpc端處理代碼:
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
// 通知所有的BlockGenerator去更新速率
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
})
- BlockGenerator控制速率
BlockGenerator采用令牌桶算法實現(xiàn)速率控制,原理簡介如下:
假設(shè)想把接收速率控制在m條記錄每秒最岗, 那么生產(chǎn)者只需要以恒定的速度每秒向桶中放m個令牌帕胆,數(shù)據(jù)接收者接收數(shù)據(jù)之前需要從桶中拿走一個令牌才能接收一條數(shù)據(jù),顯然數(shù)據(jù)接收速率不會超過m
來看看BlockGenerator是怎么實現(xiàn)的般渡,BlockGenerator實現(xiàn)了RateLimiter抽象類懒豹,下面是RateLimiter的部分實現(xiàn):
// 最大接收速率,接收數(shù)據(jù)速率由maxRateLimit和動態(tài)更新的反壓速率共同控制
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
// rateLimiter負(fù)責(zé)產(chǎn)生令牌
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
// BlockGenerator在存儲數(shù)據(jù)之前會調(diào)用這個方法,相當(dāng)于取令牌诊杆。
def waitToPush() {
rateLimiter.acquire()
}
// 更新速率歼捐,也就是令牌產(chǎn)生的速率
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
BlockGenerator的父類RateLimiter實現(xiàn)了令牌桶速率控制算法,接下來就是BlockGenerator在接收Receiver傳遞過來的數(shù)據(jù)時調(diào)用waitToPush
去獲取令牌了晨汹,沒有令牌是豹储,BlockGenerator阻塞,那么Receiver也會阻塞下去淘这。
2.3 DirectKafkaInputDStream時的反壓實現(xiàn)
DirectKafkaInputDStream是沒有Receiver的剥扣。
有Receiver的stream job, 數(shù)據(jù)從由Receiver接收铝穷,然后組裝成block然后匯報到JobScheduler钠怯,stream job提交運行前從JobScheduler拿一個block運行。
但是對于DirectKafkaInputDStream而言曙聂,不存在獨立運行的Receiver接收數(shù)據(jù)晦炊,而是在從DirectKafkaInputDStream創(chuàng)建出
KafkaRDD
然后提交stream job運行時,在KafkaRDD
的compute方法中才開始從kafka讀取數(shù)據(jù)宁脊。此時由于接收數(shù)據(jù)是在job開始運行后在task中進(jìn)行的断国,因此反壓實現(xiàn)也是通過控制本次task從kafka中讀取多少數(shù)據(jù)來實現(xiàn)的。
1. 計算接收速率
不貼代碼了榆苞,DirectKafkaInputDStream
使用IDRateEstimator去評估每秒接收的數(shù)據(jù)量R稳衬,同時由于DirectKafkaInputDStream
可以同時從一個topic的n個分區(qū)接收數(shù)據(jù),這個R是整個全部分區(qū)的數(shù)據(jù)接收速度坐漏,在DirectKafkaInputDStream
中還有一個重要的參數(shù)spark.streaming.kafka.maxRatePerPartition
(每秒)控制每個分區(qū)的最大接收速度(假設(shè)是maxR)薄疚。
評估出R之后碧信,就是計算n個分區(qū)每個分區(qū)的速度了,這個不是簡單的R/n分的街夭,
而是根據(jù)之前每一個分區(qū)消費到的offset(假設(shè)是prefOffset)和現(xiàn)在每個分區(qū)的最新offset(假設(shè)是curOffset)的差值/ 總差值的比例計算出來的砰碴,
假設(shè)全部n個分區(qū)從上一batch消費之后到現(xiàn)在的整個未消費記錄數(shù)是totalLag,
分區(qū)n1未消費數(shù)是lagN1莱坎, 那么本次batch從分區(qū)n1應(yīng)該消費的記錄數(shù)就是:
(min( (lagN1 / totalLag) * R, maxR)) * batchDuration(換算成秒)
2. 控制速率
「1」中計算出每個分區(qū)最大消費記錄數(shù)m衣式,接下來從DirectKafkaInputDStream
生成KafkaRDD
(KafkaRDD的分區(qū)個數(shù)也就是topic的分區(qū)個數(shù)), 然后提交job運行后KafkaRDD
的compute方法開始從kafka消費m條記錄檐什。