目錄
流量控制簡介
在流式處理系統(tǒng)中短荐,流量控制(rate control/rate limit)是一個非常重要的話題斗塘。對系統(tǒng)進行流控,主要目的是為了保證運行的穩(wěn)定性,防止突發(fā)大流量造成整個系統(tǒng)的擾動(throttle),長時間或劇烈的擾動甚至會使系統(tǒng)宕機。另外,為了保證系統(tǒng)的吞吐量最大化,也需要設計合理的流控門檻落追,避免系統(tǒng)空轉使資源利用率降低。
Spark Streaming作為基于微批次(micro-batch)的流處理框架涯肩,其流量的理想狀態(tài)就是官方文檔中所說的“batches of data should be processed as fast as they are being generated”轿钠,即每一批次的處理時長batch_process_time需要小于(但是又比較接近)我們設定的批次間隔batch_interval巢钓。如果batch_process_time > batch_interval,說明程序的處理能力不足疗垛,積累的數據越來越多症汹,最終會造成Executor內存溢出。如果batch_process_time << batch_interval贷腕,說明系統(tǒng)有很長時間是空閑的背镇,應該適當提升流量。
Spark Streaming流控基本設置
Spark Streaming通過Executor里的Receiver組件源源不斷地接收外部數據泽裳,并通過BlockManager將外部數據轉化為Spark中的塊進行存儲瞒斩。Spark Streaming機制的簡單框圖如下所示。
要限制Receiver接收數據的速率涮总,可以在SparkConf中設置配置項spark.streaming.receiver.maxRate
胸囱,單位為數據條數/秒。如果采用的是基于Direct Stream方式的Kafka連接瀑梗,不經過Receiver烹笔,就得設置配置項spark.streaming.kafka.maxRatePerPartition
來限流,單位是每分區(qū)的數據條數/秒抛丽。
這兩種方式的優(yōu)點是設置非常簡單谤职,只需要通過實際業(yè)務的吞吐量估算一下使批次間隔和處理耗時基本達到平衡的速率就可以了。缺點是一旦業(yè)務量發(fā)生變化亿鲜,就只能手動修改參數并重啟Streaming程序允蜈。另外,人為估計的參數畢竟有可能不準蒿柳,設置得太激進或太保守都不好饶套。
所以,Spark后來提出了動態(tài)流量控制的方案其馏,能夠根據當前系統(tǒng)的處理速度智能地調節(jié)流量閾值,名為反壓(back pressure)機制爆安。其在1.5版本開始加入叛复,ASF JIRA中對應的issue是SPARK-7398。要啟用它扔仓,只需要將配置項spark.streaming.backpressure.enabled
設為true就可以(默認值為false)褐奥。
反壓機制看似簡單,但它背后有一套非常精巧的控制邏輯翘簇,下面就來深入看一看撬码。
Spark Streaming反壓機制的具體實現(xiàn)
動態(tài)流量控制器
o.a.s.streaming.scheduler.RateController抽象類是動態(tài)流量控制的核心。其源碼不甚長版保,抄錄如下呜笑。
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
extends StreamingListener with Serializable {
init()
protected def publish(rate: Long): Unit
@transient
implicit private var executionContext: ExecutionContext = _
@transient
private var rateLimit: AtomicLong = _
private def init() {
executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
rateLimit = new AtomicLong(-1L)
}
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
init()
}
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
publish(getLatestRate())
}
}
def getLatestRate(): Long = rateLimit.get()
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}
可見夫否,RateController抽象類繼承自StreamingListener特征,表示它是一個Streaming監(jiān)聽器叫胁。在之前的Spark Core源碼精讀系列文章中已經講過了監(jiān)聽器和事件總線機制凰慈,因此不再多說了。
RateController的主要工作如下:
- 監(jiān)聽StreamingListenerBatchCompleted事件驼鹅,該事件表示一個批次已經處理完成微谓。
- 從該事件的BatchInfo實例中取得:處理完成的時間戳processingEndTime、實際處理時長processingDelay(從批次的第一個job開始處理到最后一個job處理完成經過的時間)输钩、調度時延schedulingDelay(從批次被提交給Streaming JobScheduler到第一個job開始處理經過的時間)豺型。
- 另外從事件的StreamInputInfo實例中取得批次輸入數據的條數numRecords。
- 將取得的以上4個參數傳遞給速率估算器RateEstimator买乃,計算出新的流量閾值姻氨,并將其發(fā)布出去。
通過RateController的子類ReceiverRateController實現(xiàn)的publish()抽象方法可知为牍,新的流量閾值是發(fā)布給了ReceiverTracker哼绑。
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
extends RateController(id, estimator) {
override def publish(rate: Long): Unit =
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
不過下面先看速率估算器RateEstimator的實現(xiàn),稍后再回來看ReceiverTracker之后的事情碉咆。
基于PID機制的速率估算器
o.a.s.streaming.scheduler.rate.RateEstimator是一個很短的特征抖韩,其中只給出了計算流量閾值的方法compute()的定義。它還有一個伴生對象用于創(chuàng)建速率估算器的實例疫铜,其中寫出了更多關于反壓機制的配置參數茂浮。
object RateEstimator {
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
case "pid" =>
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}
目前RateEstimator的唯一實現(xiàn)類是PIDRateEstimator,亦即spark.streaming.backpressure.rateEstimator
配置項的值只能為pid壳咕。其具體代碼如下席揽。
private[streaming] class PIDRateEstimator(
batchIntervalMillis: Long,
proportional: Double,
integral: Double,
derivative: Double,
minRate: Double
) extends RateEstimator with Logging {
private var firstRun: Boolean = true
private var latestTime: Long = -1L
private var latestRate: Double = -1D
private var latestError: Double = -1L
def compute(
time: Long,
numElements: Long,
processingDelay: Long,
schedulingDelay: Long
): Option[Double] = {
this.synchronized {
if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
latestTime = time
if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
None
} else {
latestRate = newRate
latestError = error
Some(newRate)
}
} else {
None
}
}
}
}
PIDRateEstimator充分運用了工控領域中常見的PID控制器的思想。所謂PID控制器谓厘,即比例(Proportional)-積分(Integral)-微分(Derivative)控制器幌羞,本質上是一種反饋回路(loop feedback)。它把收集到的數據和一個設定值(setpoint)進行比較竟稳,然后用它們之間的差計算新的輸入值属桦,該輸入值可以讓系統(tǒng)數據盡量接近或者達到設定值。
下圖示出PID控制器的基本原理他爸。
亦即:
其中e(t)代表誤差聂宾,即設定值與回授值之間的差。也就是說诊笤,比例單元對應當前誤差系谐,積分單元對應過去累積誤差,而微分單元對應將來誤差讨跟〖退控制三個單元的增益因子分別為Kp鄙煤、Ki、Kd止喷。
回到PIDRateEstimator的源碼來馆类,對應以上的式子,我們可以得知:
- 處理速率的設定值其實就是上一批次的處理速率latestRate弹谁,回授值就是這一批次的速率processingRate乾巧,誤差error自然就是兩者之差。
- 過去累積誤差在這里體現(xiàn)為調度時延的過程中數據積壓的速度预愤,也就是schedulingDelay * processingRate / batchInterval沟于。
- 將來誤差就是上面算出的error對時間微分的結果。
將上面三者綜合起來植康,就可以根據Spark Streaming在上一批次以及這一批次的處理速率旷太,估算出一個合適的用于下一批次的流量閾值。比例增益Kp由spark.streaming.backpressure.pid.proportional
控制销睁,默認值1.0供璧;積分增益Ki由spark.streaming.backpressure.pid.integral
控制,默認值0.2冻记;微分增益Kd由spark.streaming.backpressure.pid.derived
控制睡毒,默認值0.0。
除了上述參數之外冗栗,還有兩個參數與反壓機制相關演顾。一是spark.streaming.backpressure.initialRate
,用于控制初始化時的處理速率隅居。二是spark.streaming.backpressure.pid.minRate
钠至,用于控制最小處理速率,默認值100條/秒胎源。
通過RPC發(fā)布流量閾值
回來看ReceiverTracker棉钧,顧名思義,它負責追蹤Receiver的狀態(tài)涕蚤。其sendRateUpdate()方法如下宪卿。
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
}
}
其中endpoint是RPC端點的引用,具體來說赞季,是ReceiverTrackerEndpoint的引用愧捕。這個方法會將流ID與新的流量閾值包裝在UpdateReceiverRateLimit消息中發(fā)送過去奢驯。
ReceiverTrackerEndpoint收到這條消息后申钩,會再將其包裝為UpdateRateLimit消息并發(fā)送給Receiver注冊時的RPC端點(位于ReceiverSupervisorImpl類中)。
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
})
可見瘪阁,收到該消息之后調用了BlockGenerator.updateRate()方法撒遣。BlockGenerator是RateLimiter的子類邮偎,它負責將收到的流數據轉化成塊存儲。updateRate()方法是在RateLimiter抽象類中實現(xiàn)的义黎。
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
這里最終借助了Guava中的限流器RateLimiter實現(xiàn)限流(Spark是不會重復造輪子的)禾进,其中maxRateLimit就是前面提到過的spark.streaming.receiver.maxRate
參數。至此廉涕,新的流量閾值就設置好了泻云。
以上就是與反壓機制有關的全部細節(jié),整個流程可以用下面的框圖表示狐蜕。
還有最后一個小問題宠纯,流量閾值設定好之后是如何生效的?這其實已經超出了本文的范疇层释,簡單看一下婆瓜。
借助Guava令牌桶完成流量控制
Receiver在收到一條數據之后,會調用BlockGenerator.addData()方法贡羔,將數據存入緩存廉白。然后再從緩存取數據,并包裝成一個個block乖寒。
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
注意到在真正存入緩存之前猴蹂,先調用了waitToPush()方法,它本質上就是Guava的RateLimiter.acquire()方法宵统。
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
Guava的限流器是計算機網絡中經典限流方法——令牌桶(token bucket)算法的典型實現(xiàn)晕讲。acquire()方法的作用是從RateLimiter獲取一個令牌(這里叫permit),如果能夠取到令牌才將數據緩存马澈,如果不能取到令牌就會被阻塞瓢省。RateLimiter.setRate()方法就是通過改變向令牌桶中放入令牌的速率(參數名稱permitsPerSecond)來實現(xiàn)流量控制的。
關于令牌桶算法的細節(jié)痊班,可以參見英文維基勤婚,也可以參考Guava源碼,內容十分豐富涤伐。下圖只是一個簡單的示意馒胆。