--------------------------------
Spark Streaming overview
Spark Streaming initialization
Spark Streaming Receiver startup analysis
Spark Streaming data preparation phase analysis (Receiver-based)
Spark Streaming data computation phase analysis
Spark Streaming backpressure analysis
Spark Streaming executor DynamicAllocation mechanism analysis
--------------------------------
1. Why backpressure was introduced
By default, a Spark Streaming Receiver ingests data at whatever rate the producer generates it, so during computation the batch processing time can exceed the batch interval. Here, batch processing time is the time actually spent computing one batch, and batch interval is the batching interval configured for the streaming application. When this happens, Spark Streaming is receiving data faster than Spark can remove it from the queue: processing capacity is too low to finish, within the configured interval, all the data received at the current rate. If this persists for too long, data piles up in memory and can cause problems such as out-of-memory errors on the executor hosting the Receiver (if the StorageLevel includes disk, data that does not fit in memory spills to disk, which increases latency).
Before Spark 1.5, users who wanted to limit a Receiver's ingestion rate could set the static configuration parameter "spark.streaming.receiver.maxRate". Capping the ingestion rate this way can match the current processing capacity and prevent out-of-memory errors, but it introduces other problems. For example, if the producer generates data faster than maxRate and the cluster can also process data faster than maxRate, the cap leaves resources underutilized. To better balance the ingestion rate against processing capacity, Spark Streaming introduced a backpressure mechanism in v1.5, which dynamically controls the ingestion rate to match the cluster's processing capacity.
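A minimal numeric sketch of the backlog argument above. The `BacklogModel` object and all figures are hypothetical: each batch adds `ingestRate * intervalSec` records, the cluster drains at most `processRate * intervalSec`, and the optional `maxRate` stands in for a static cap like spark.streaming.receiver.maxRate.

```scala
// Hypothetical model of receiver backlog; not Spark code.
object BacklogModel {
  /** Backlog (records still unprocessed) after each of `batches` intervals. */
  def backlog(ingestRate: Long, processRate: Long, intervalSec: Long, batches: Int,
              maxRate: Option[Long] = None): Seq[Long] = {
    // A static cap limits what is ingested, regardless of spare capacity.
    val ingested = maxRate.fold(ingestRate)(cap => math.min(ingestRate, cap))
    (1 to batches).scanLeft(0L) { (pending, _) =>
      val arrived = pending + ingested * intervalSec         // old backlog + new records
      math.max(0L, arrived - processRate * intervalSec)      // what one batch cannot drain
    }.tail                                                   // drop the initial 0
  }
}
```

With 1000 records/s arriving, 800 records/s of capacity, and a 2 s interval, the backlog grows by 400 records per batch (400, 800, 1200). A cap of 800 removes the backlog; a cap of 500 also removes it but leaves 300 of the 800 records/s of capacity idle, which is the underutilization problem described above.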
2. Backpressure architecture model
Spark Streaming backpressure dynamically adjusts the Receiver's data ingestion rate based on job execution information fed back by the JobScheduler. Whether backpressure is enabled is controlled by the property "spark.streaming.backpressure.enabled", which defaults to false (disabled).
2.1 Spark Streaming architecture
The Spark Streaming architecture is shown in the figure below (for a detailed analysis, see "Spark Streaming data preparation phase analysis" and "Spark Streaming data computation phase analysis").
2.2 Backpressure execution flow
Backpressure adds a new component, RateController, to the original architecture. RateController listens for the "OnBatchCompleted" event and extracts processingDelay and schedulingDelay from it. An Estimator uses this information to estimate the maximum processing rate (rate). Finally, the Receiver-based input stream forwards the rate through ReceiverTracker and ReceiverSupervisorImpl to BlockGenerator (which extends RateLimiter).
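The flow above can be modeled as a small feedback loop. This is a simplified in-process sketch, not Spark code: `BatchStats`, `ThroughputEstimator`, `Controller`, and `Limiter` are hypothetical stand-ins for the batch info, the Estimator, RateController, and BlockGenerator, and the estimator is a naive throughput calculation rather than Spark's PID controller.

```scala
// Hypothetical model: batch stats -> controller -> estimator -> publish -> limiter.
object BackpressureLoop {
  final case class BatchStats(endTimeMs: Long, numRecords: Long,
                              processingDelayMs: Long, schedulingDelayMs: Long)

  trait Estimator {
    def compute(s: BatchStats): Option[Double]
  }

  // Naive estimator (NOT Spark's PIDRateEstimator): records processed per second.
  object ThroughputEstimator extends Estimator {
    def compute(s: BatchStats): Option[Double] =
      if (s.processingDelayMs > 0 && s.numRecords > 0)
        Some(s.numRecords * 1000.0 / s.processingDelayMs)
      else None
  }

  // Stand-in for the receiver-side limiter (BlockGenerator in Spark).
  final class Limiter(var ratePerSec: Long)

  // Stand-in for RateController: reacts to a completed batch and publishes a new rate.
  final class Controller(estimator: Estimator, publish: Long => Unit) {
    def onBatchCompleted(s: BatchStats): Unit =
      estimator.compute(s).foreach(r => publish(r.toLong))
  }
}
```

Passing `publish` as a function keeps the controller unaware of how the rate travels to the receiver, which mirrors how RateController leaves publish() to its subclass.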
3. Backpressure source code analysis
3.1 RateController class hierarchy
RateController extends StreamingListener and is responsible for handling BatchCompleted events. Its class hierarchy is shown in the following code:
/**
 * A StreamingListener that receives batch completion updates, and maintains
 * an estimate of the speed at which this stream should ingest messages,
 * given an estimate computation from a `RateEstimator`
 */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  // ... (members omitted in this excerpt)
}
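3.2 Registering the RateController
When JobScheduler starts, it collects the rateController of every InputDStream registered in the DStreamGraph, registers each one with the ListenerBus, and starts the bus so the controllers begin receiving events: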
// attach rate controllers of input streams to receive batch completion updates
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
listenerBus.start()
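The for-comprehension above iterates over all input streams and over each stream's `Option[RateController]`, so streams whose controller is `None` are simply skipped. A minimal sketch with hypothetical stand-in types (`FakeInputStream`, `FakeController`) shows that behavior:

```scala
// Hypothetical stand-ins for InputDStream and RateController.
object RegistrationDemo {
  final case class FakeController(streamId: Int)
  final case class FakeInputStream(id: Int, rateController: Option[FakeController])

  /** Controllers that the JobScheduler-style loop would register as listeners. */
  def controllersToRegister(streams: Seq[FakeInputStream]): Seq[FakeController] =
    for {
      inputStream <- streams
      controller  <- inputStream.rateController // a None contributes nothing
    } yield controller
}
```

In the Spark source, ReceiverInputDStream defines its rateController only when spark.streaming.backpressure.enabled is true, so with backpressure disabled this loop registers nothing.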
3.3 Backpressure execution analysis
Backpressure execution consists of two processes, triggering the BatchCompleted event and handling it:
- How BatchCompleted is triggered
- How the BatchCompleted event is handled
3.3.1 How BatchCompleted is triggered
The analysis of BatchCompleted should start from JobScheduler: BatchCompleted marks the end of a batch, i.e., it is triggered when the jobs scheduled by JobScheduler finish executing, so we follow how jobs are scheduled and executed.
After JobGenerator generates Jobs via generateJobs(), it submits them with JobScheduler's submitJobSet method, implemented as follows:
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
Each Job in the jobSet is handed to jobExecutor (a thread pool), wrapped in a JobHandler. JobHandler runs the Job and handles its result; when the Job finishes, a JobCompleted event is generated. JobHandler's logic is shown below:
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
    val oldProps = ssc.sparkContext.getLocalProperties
    try {
      logInfo("Handler job at " + job.time)
      ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
      // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
      ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          // Posted after job.run() returns; handled by handleJobCompletion below.
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
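When a Job finishes, JobHandler posts a JobCompleted event to eventLoop. On receiving JobCompleted, the EventLoop handler calls handleJobCompletion. Once every Job in the JobSet for that batch time has completed, handleJobCompletion builds a StreamingListenerBatchCompleted event from the JobSet's execution information and sends it to the listeners through the StreamingListenerBus. The implementation is: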
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}
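The key point of handleJobCompletion is that StreamingListenerBatchCompleted is posted only when the last Job of a batch finishes. The hypothetical `JobSetDemo` below, which uses plain strings instead of Spark's listener events, sketches that bookkeeping:

```scala
import scala.collection.mutable

// Hypothetical simplification of JobScheduler/JobSet bookkeeping; not Spark code.
object JobSetDemo {
  final class JobSet(val time: Long, jobIds: Set[Int]) {
    private val pending = mutable.Set.empty[Int] ++= jobIds
    def handleJobCompletion(jobId: Int): Unit = { pending -= jobId }
    def hasCompleted: Boolean = pending.isEmpty
  }

  final class Scheduler(post: String => Unit) {
    private val jobSets = mutable.Map.empty[Long, JobSet]

    def submit(js: JobSet): Unit = { jobSets(js.time) = js }

    def handleJobCompletion(time: Long, jobId: Int): Unit = {
      val js = jobSets(time)
      js.handleJobCompletion(jobId)
      post(s"OutputOperationCompleted($time,$jobId)") // posted for every job
      if (js.hasCompleted) {                          // only after the last job
        jobSets.remove(time)
        post(s"BatchCompleted($time)")
      }
    }
  }
}
```

Because RateController only reacts to the batch-level event, a batch with several output operations produces a single rate update.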
3.3.2 How the BatchCompleted event is handled
StreamingListenerBus dispatches events to the concrete StreamingListeners, so BatchCompleted is handled by RateController. On receiving the BatchCompleted event, RateController calls onBatchCompleted to process it:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
  val elements = batchCompleted.batchInfo.streamIdToInputInfo
  for {
    processingEnd <- batchCompleted.batchInfo.processingEndTime
    workDelay <- batchCompleted.batchInfo.processingDelay
    waitDelay <- batchCompleted.batchInfo.schedulingDelay
    elems <- elements.get(streamUID).map(_.numRecords)
  } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
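Because onBatchCompleted is written as a for-comprehension over `Option`s, computeAndPublish is skipped whenever any value is missing, for example when this stream had no input info for the batch. A sketch with a hypothetical, trimmed-down `BatchInfo` (Spark's real class has more fields and derives the delays from timestamps):

```scala
// Hypothetical, trimmed-down stand-in for Spark's BatchInfo.
object BatchInfoDemo {
  final case class BatchInfo(processingEndTime: Option[Long],
                             processingDelay: Option[Long],
                             schedulingDelay: Option[Long],
                             numRecordsByStream: Map[Int, Long])

  /** Arguments computeAndPublish would receive, or None if any piece is missing. */
  def extract(info: BatchInfo, streamUID: Int): Option[(Long, Long, Long, Long)] =
    for {
      processingEnd <- info.processingEndTime
      workDelay     <- info.processingDelay
      waitDelay     <- info.schedulingDelay
      elems         <- info.numRecordsByStream.get(streamUID)
    } yield (processingEnd, elems, workDelay, waitDelay)
}
```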
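onBatchCompleted extracts the batch's processing end time, processing delay (execution time), scheduling delay, and the number of records this stream received, then passes them to the RateEstimator, which estimates a new rate that is then published. RateEstimator currently has a single implementation, PIDRateEstimator, a proportional-integral-derivative (PID) controller. The code is: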
/**
 * Compute the new rate limit and publish it asynchronously.
 */
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
  Future[Unit] {
    val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
    newRate.foreach { s =>
      rateLimit.set(s.toLong)
      publish(getLatestRate())
    }
  }
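For intuition about what `rateEstimator.compute` does, here is a sketch modeled on the PID update in Spark's PIDRateEstimator. The class name `PidRateSketch` is hypothetical, and the default gains (proportional 1.0, integral 0.2, derivative 0.0) and minimum rate (100) are assumptions based on Spark's defaults; check the configuration docs for your Spark version before relying on them.

```scala
// Sketch of a PID-style rate estimator; gains and minRate defaults are assumptions.
final class PidRateSketch(batchIntervalMillis: Long,
                          proportional: Double = 1.0,
                          integral: Double = 0.2,
                          derivative: Double = 0.0,
                          minRate: Double = 100.0) {
  private var firstRun = true
  private var latestTime = -1L
  private var latestRate = -1.0
  private var latestError = -1.0

  def compute(time: Long, numElements: Long,
              processingDelay: Long, schedulingDelay: Long): Option[Double] = synchronized {
    if (time > latestTime && numElements > 0 && processingDelay > 0) {
      val delaySinceUpdate = (time - latestTime).toDouble / 1000         // seconds
      val processingRate = numElements.toDouble / processingDelay * 1000 // records/s
      val error = latestRate - processingRate                            // P input
      // I input: work queued while waiting to be scheduled, as a rate
      val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
      val dError = (error - latestError) / delaySinceUpdate              // D input
      val newRate = (latestRate - proportional * error
        - integral * historicalError - derivative * dError).max(minRate)
      latestTime = time
      if (firstRun) {
        // The first batch only seeds the state; nothing is published yet.
        latestRate = processingRate
        latestError = 0.0
        firstRun = false
        None
      } else {
        latestRate = newRate
        latestError = error
        Some(newRate)
      }
    } else None
  }
}
```

With a 1000 ms interval, a first batch of 1000 records processed in 500 ms only seeds the state. If the next batch processes 1000 records in 1000 ms (1000 records/s) after a 500 ms scheduling delay, the integral term subtracts 0.2 * 500 = 100, giving 900 records/s: slightly below the observed throughput, so queued work can drain.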
publish() is defined by RateController's subclass ReceiverRateController, which is declared in ReceiverInputDStream:
/**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
publish forwards the newly computed rate via ReceiverTracker, which wraps it in an UpdateReceiverRateLimit message and sends it to ReceiverTrackerEndpoint:
/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
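When ReceiverTrackerEndpoint receives the message, it looks up in receiverTrackingInfos the endpoint that the Receiver registered (it belongs to ReceiverSupervisorImpl), wraps the rate in an UpdateRateLimit message, and sends it to that endpoint. On receipt, the endpoint calls updateRate on every registered BlockGenerator (a RateLimiter subclass), which turns the rate into a fixed interval between tokens: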
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv
    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        registeredBlockGenerators.asScala.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })
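The two hops described above (driver-side ReceiverTrackerEndpoint, then the receiver's endpoint) can be sketched as plain message dispatch. Only the message names `UpdateReceiverRateLimit` and `UpdateRateLimit` come from the code above; `TrackerEndpoint`, `ReceiverEndpoint`, and `FakeBlockGenerator` are hypothetical in-process stand-ins with no RPC involved.

```scala
// Hypothetical in-process model of the rate-update routing; not Spark's RPC layer.
object RateMessaging {
  final case class UpdateReceiverRateLimit(streamUID: Int, newRate: Long) // to the tracker
  final case class UpdateRateLimit(eps: Long)                             // to the receiver

  final class FakeBlockGenerator {
    var rate: Long = 0L
    def updateRate(r: Long): Unit = { rate = r }
  }

  final class ReceiverEndpoint(generators: Seq[FakeBlockGenerator]) {
    def receive(msg: Any): Unit = msg match {
      case UpdateRateLimit(eps) => generators.foreach(_.updateRate(eps)) // every generator
      case _                    => ()                                    // other messages omitted
    }
  }

  final class TrackerEndpoint(receivers: Map[Int, ReceiverEndpoint]) {
    def receive(msg: Any): Unit = msg match {
      case UpdateReceiverRateLimit(id, rate) =>
        receivers.get(id).foreach(_.receive(UpdateRateLimit(rate))) // unknown id: dropped
      case _ => ()
    }
  }
}
```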
RateLimiter's updateRate is implemented as follows:
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
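updateRate's branching can be isolated into a pure function. `UpdateRateSketch.effectiveRate` is a hypothetical helper that returns the rate that would be passed to setRate, or None when the update is ignored. It shows that spark.streaming.receiver.maxRate still acts as a ceiling even when backpressure is enabled.

```scala
// Hypothetical helper mirroring the branches of updateRate above.
object UpdateRateSketch {
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None                                     // ignored: rate unchanged
    else if (maxRateLimit > 0) Some(newRate.min(maxRateLimit)) // capped by maxRate
    else Some(newRate)                                         // no positive cap
}
```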
Here rateLimiter is a Guava RateLimiter; its setRate is implemented as follows:
public final void setRate(double permitsPerSecond) {
    Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (this.mutex) {
        this.resync(this.readSafeMicros());
        // Spacing between two permits, in microseconds: 1,000,000 / permitsPerSecond
        double stableIntervalMicros = (double) TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        this.doSetRate(permitsPerSecond, stableIntervalMicros);
    }
}
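The core of setRate is converting permits per second into a fixed spacing between permits, stableIntervalMicros = 1,000,000 / permitsPerSecond. A minimal Scala rendering of just that arithmetic (the `StableInterval` object is hypothetical, not Guava code):

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper reproducing only the interval arithmetic of setRate.
object StableInterval {
  def stableIntervalMicros(permitsPerSecond: Double): Double = {
    require(permitsPerSecond > 0.0 && !java.lang.Double.isNaN(permitsPerSecond),
      "rate must be positive")
    TimeUnit.SECONDS.toMicros(1L).toDouble / permitsPerSecond // microseconds per permit
  }
}
```

At 1000 records/s one permit is issued every 1000 microseconds; at 4 records/s, every 250,000 microseconds (0.25 s).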
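This completes the rate adjustment performed by the backpressure mechanism.
4. Flow control point (where the limit takes effect)
When a Receiver starts receiving data, it calls supervisor.pushSingle() to put each record into currentBuffer, where it waits until BlockGenerator periodically takes the data and packs it into a block. Before a record is put into currentBuffer, a permit (token) must be acquired: if a permit is obtained, the record is stored in the buffer; otherwise the call blocks, which in turn stops the Receiver from pulling more data from the source.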
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush() // blocks until the rate limiter grants a permit
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
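The pattern described above (writers take a permit, append to currentBuffer, and a timer periodically swaps the buffer out as a block) can be sketched in a single class. `MiniBlockGenerator`, `waitToPush` as a constructor function, and `cutBlock` are hypothetical simplifications; Spark's BlockGenerator additionally tracks its state, drives the swap from a recurring timer, and queues finished blocks for pushing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical single-class sketch of the BlockGenerator buffering pattern.
final class MiniBlockGenerator(waitToPush: () => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]

  def addData(data: Any): Unit = {
    waitToPush()                       // may block while no permit is available
    synchronized { currentBuffer += data }
  }

  /** In the real design a timer calls this once per block interval. */
  def cutBlock(): List[Any] = synchronized {
    val block = currentBuffer.toList   // contents become one block
    currentBuffer = new ArrayBuffer[Any]
    block
  }
}
```

Taking the permit outside the lock means a blocked writer does not prevent the timer from cutting the next block.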
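Tokens are issued using a token bucket, as illustrated in the figure below:
Token bucket mechanism: a fixed-size bucket continuously generates tokens at a constant rate. If tokens are not consumed, or are consumed more slowly than they are generated, they accumulate until the bucket is full; tokens generated after that overflow and are discarded, so the number of tokens in the bucket never exceeds its size. An operation that needs tokens takes the corresponding number from the bucket: if it gets them it proceeds, otherwise it blocks. Tokens are not returned after use.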
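A classic token bucket as described above, with an explicit clock parameter so the behavior is deterministic. This is a hypothetical `TokenBucket` class illustrating the principle only, not Guava's RateLimiter implementation, and it offers a non-blocking `tryAcquire` instead of the blocking acquire behind waitToPush.

```scala
// Hypothetical token bucket: fixed capacity, constant refill rate, injectable clock.
final class TokenBucket(capacity: Long, tokensPerSecond: Double, startNanos: Long) {
  require(capacity > 0 && tokensPerSecond > 0)
  private var tokens: Double = capacity.toDouble // bucket starts full
  private var lastRefillNanos: Long = startNanos

  private def refill(nowNanos: Long): Unit = {
    val elapsedSec = (nowNanos - lastRefillNanos).toDouble / 1e9
    if (elapsedSec > 0) {
      // Tokens beyond `capacity` overflow and are discarded.
      tokens = math.min(capacity.toDouble, tokens + elapsedSec * tokensPerSecond)
      lastRefillNanos = nowNanos
    }
  }

  /** Takes one token if available; consumed tokens are never returned. */
  def tryAcquire(nowNanos: Long): Boolean = synchronized {
    refill(nowNanos)
    if (tokens >= 1.0) { tokens -= 1.0; true } else false
  }
}
```

A blocking acquire would loop: when tryAcquire fails, sleep for roughly 1 / tokensPerSecond seconds and retry.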
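After the Receiver receives the stream, it parses it line by line into an iterator and then stores the records into the buffer one at a time. Before each record is stored, a token is acquired: if none is available, the call blocks; otherwise the record is stored in the buffer and waits for the subsequent block-generation step.
This article was originally posted on my cnblogs blog (http://www.cnblogs.com/barrenlake/p/5349949.html).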