——————————————————————————————
Spark Streaming Overview
Spark Streaming Initialization Process
Spark Streaming Receiver Startup Analysis
Spark Streaming Data Preparation Phase Analysis (Receiver Mode)
Spark Streaming Data Computation Phase Analysis
Spark Streaming Backpressure Analysis
Spark Streaming Executor DynamicAllocation Analysis
——————————————————————————————
Spark Streaming is a real-time computation framework built on top of Spark. A Spark Streaming application is submitted to the Spark platform like any other Spark application, and its components run on the platform as long-lived tasks. These tasks receive the live data stream and periodically generate batch jobs that are submitted to the Spark cluster. This article covers the preparation work performed before the following functional modules start running:
- Data reception
- Job generation
- Rate control
- Dynamic resource scaling
Below, we use a WordCount program to walk through the initialization of the Spark Streaming runtime environment.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))   // 10-second batch interval
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(_.split(" ")).map(w => (w, 1))
val wordCount = words.reduceByKey(_ + _)
wordCount.print()                                 // output operator
ssc.start()
ssc.awaitTermination()
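To try the example locally, you can feed the socket from a separate terminal with a tool such as netcat (nc -lk 8585) and type some words; each 10-second batch then prints its word counts.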
The walkthrough below uses this WordCount source as its running example.
1. StreamingContext Initialization
StreamingContext is the execution environment of a Spark Streaming application and the entry point to most streaming functionality; for example, it provides the methods for creating DStreams from various data sources.
A streaming application therefore begins by creating a StreamingContext (as the WordCount example shows). Creating the StreamingContext also creates the following major components:
1.1 DStreamGraph
The main job of DStreamGraph is to record the application's InputDStreams and OutputStreams, and to extract the ReceiverInputStreams from the InputDStreams. Dependencies between DStreams are analogous to dependencies between RDDs and are translated into RDDs at execution time, so the DStream graph has a direct counterpart in an RDD graph: once per batch interval, the DStreamGraph is materialized into an RDD graph (see the sketch after this list).
- ReceiverInputStreams: hold the information about the Receivers used for data ingestion and supply it when the Receivers are started.
- OutputStreams: each OutputStream produces one Job whenever a batch's jobs are generated.
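This per-batch translation is driven by DStreamGraph.generateJobs, which asks every registered output stream for a job. Abridged from the Spark source (logging elided; details vary slightly across versions):
def generateJobs(time: Time): Seq[Job] = {
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // Each output stream contributes at most one job per batch.
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  jobs
}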
1.2 JobScheduler
JobScheduler is the most central component of Spark Streaming. It is responsible for starting the components behind each of the functions below:
- Data reception
- Job generation
- Rate control
- Dynamic resource scaling
and for scheduling the generated batch jobs and tracking their state.
2. DStream Creation and Transformation
Once the StreamingContext is initialized, calling one of its InputDStream factory methods creates a SocketInputDStream.
The inheritance chain of SocketInputDStream is:
SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.
The InputDStream constructor contains the following call:
ssc.graph.addInputStream(this)
On the JVM, constructing a subclass first runs its parent's constructor. So when a SocketInputDStream is created, InputDStream is initialized first, and it is there that the stream adds itself to the DStreamGraph, marking itself as an input data source.
Operator transformations on a DStream, like those on an RDD, are lazily evaluated and merely build up a pipeline. When the application above reaches print (an output operator), the DStream is wrapped in a ForEachDStream, whose register method adds it to the DStreamGraph's outputStreams list as an OutputStream, ready for job generation.
The print operator is implemented as follows:
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
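The register call at the end simply records the new ForEachDStream in the graph's output list. In the Spark source it is, roughly:
private[streaming] def register(): DStream[T] = {
  // Mark this DStream as an output stream so the graph will generate a job for it.
  ssc.graph.addOutputStream(this)
  this
}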
What sets ForEachDStream apart from other DStreams is that it overrides the generateJob method; this override is where DStream-graph operations are turned into RDD-graph operations and packaged into a Job.
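For reference, the override looks roughly like this (abridged from the Spark source): it materializes the parent DStream into an RDD for the batch time and wraps the output function in a Job.
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      // Defer the actual foreachFunc invocation until the Job runs.
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}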
3炒考、SparkContext啟動(dòng)
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      ......
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
The core of this method is starting the JobScheduler on a new thread, which in turn brings up the individual streaming components.
3.1 Starting the JobScheduler
JobScheduler takes care of the following tasks:
- Initializing and starting the data-reception components: it creates and starts the ReceiverTracker, which manages the Receivers, including starting and stopping them and maintaining their state.
- Starting the job-generation components: it starts the JobGenerator, which generates jobs once per batch interval.
- Registering and starting the streaming listeners:
  - job listeners;
  - the backpressure mechanism, which throttles the data ingestion rate through RateControllers.
- Starting Executor DynamicAllocation, which dynamically adds or removes Executors to keep the system running stably or to reduce resource cost.
- Scheduling the generated jobs and maintaining their state.
The JobScheduler.start method is shown below:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
    case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
    case _ => null
  }

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    executorAllocClient,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
The eventLoop: EventLoop[JobSchedulerEvent] object created here receives and processes scheduler events; callers enqueue events by calling its post method. When an EventLoop starts, it spawns a daemon thread that drains the event queue. EventLoop itself is abstract; JobScheduler implements its onReceive method at construction time, delegating every received event to processEvent(event).
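The mechanics are easy to see in a minimal, hypothetical sketch of the pattern (the real org.apache.spark.util.EventLoop additionally handles clean shutdown and errors thrown by the event thread):
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Minimal sketch of the EventLoop pattern: a daemon thread drains a
// blocking queue and hands each event to onReceive.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true) // daemon thread: it will not keep the JVM alive
    override def run(): Unit = {
      while (!stopped.get) {
        val event = queue.take() // block until an event is posted
        try onReceive(event) catch { case t: Throwable => onError(t) }
      }
    }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event) // callers enqueue events here
  def stop(): Unit = stopped.set(true)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}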
Summary
JobScheduler is the core component of Spark Streaming. When it starts, it brings up the data-reception and job-generation components, setting both the data-preparation and data-computation pipelines in motion.
It is also responsible for starting the optimization mechanisms BackPressure and Executor DynamicAllocation.
The following articles analyze the data-preparation and data-computation phases, as well as the BackPressure and Executor DynamicAllocation mechanisms.