Preface
Spark Streaming jobs are generated dynamically by the JobGenerator once every batchDuration. Each batch submits one JobSet, because a single batch may have multiple output operations and therefore multiple jobs.
Overview of the flow:
- The timer periodically posts a generate-jobs request to the eventLoop
- The receiverTracker allocates received blocks to the current batch
- Jobs are generated for the current batch
- The jobs are wrapped into a JobSet and submitted for execution
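To make this concrete, below is a minimal driver sketch (the host, port, app name, and batch duration are illustrative). Because it registers two output operations, every batch's JobSet will contain two jobs:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("job-generation-demo")
    // batchDuration = 5s: the JobGenerator fires every 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)

    // Two output operations => two jobs in each batch's JobSet
    lines.print()
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}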
Entry point
A timer is created as soon as the JobGenerator is initialized:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
Every batchDuration it posts a GenerateJobs(new Time(longTime)) message to the eventLoop, whose event handler then calls the generateJobs(time) method:
case GenerateJobs(time) => generateJobs(time)
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
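The timer-plus-eventLoop pattern above can be reproduced in miniature. The sketch below is self-contained and uses illustrative stand-ins (SimpleEventLoop, TimerDemo) rather than Spark's actual RecurringTimer and EventLoop classes:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

sealed trait Event
case class GenerateJobs(time: Long) extends Event

// A minimal event loop: posted events are queued and handled on one thread
class SimpleEventLoop(handler: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()
  private val thread = new Thread(() => while (true) handler(queue.take()))
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def post(event: Event): Unit = queue.put(event)
}

object TimerDemo extends App {
  val batchDurationMs = 1000L
  val eventLoop = new SimpleEventLoop({
    case GenerateJobs(t) => println(s"generateJobs($t)") // stand-in for job generation
  })
  eventLoop.start()

  // The "RecurringTimer": posts GenerateJobs once per batchDuration
  val timer = Executors.newSingleThreadScheduledExecutor()
  timer.scheduleAtFixedRate(
    () => eventLoop.post(GenerateJobs(System.currentTimeMillis())),
    0L, batchDurationMs, TimeUnit.MILLISECONDS)

  Thread.sleep(3500) // observe a few ticks, then stop
  timer.shutdown()
}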
Allocating blocks for the current batchTime
First, receiverTracker.allocateBlocksToBatch(time) is called to allocate blocks to the current batchTime. This ultimately calls the allocateBlocksToBatch method of receivedBlockTracker, the block manager inside receiverTracker:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}

private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
As you can see, each stream's unallocated blocks are taken from streamIdToUnallocatedBlockQueues. This queue is filled when a supervisor, after storing a block, reports the block's metadata to the receiverTracker (see the earlier write-up on ReceiverTracker data generation and storage for details). Once the unallocated blockInfos of every streamId have been fetched, they are placed into timeToAllocatedBlocks: Map[Time, AllocatedBlocks], which is used later when the RDD is generated.
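A toy model of these two structures may help. In the sketch below a "block" is just a String id, and the real WAL write and AllocatedBlocks wrapper are omitted:

import scala.collection.mutable

object BlockAllocationSketch extends App {
  // Unallocated blocks reported by receivers, queued per streamId
  val streamIdToUnallocatedBlockQueues = mutable.HashMap(
    0 -> mutable.Queue("block-0-1", "block-0-2"),
    1 -> mutable.Queue("block-1-1"))
  // Blocks allocated to each batch time
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    // dequeueAll(_ => true) drains everything currently queued for a stream
    val streamIdToBlocks = streamIdToUnallocatedBlockQueues.map {
      case (streamId, queue) => (streamId, queue.dequeueAll(_ => true).toSeq)
    }.toMap
    timeToAllocatedBlocks.put(batchTime, streamIdToBlocks)
  }

  allocateBlocksToBatch(1000L)
  println(timeToAllocatedBlocks(1000L)) // each stream's blocks, keyed by streamId
  println(streamIdToUnallocatedBlockQueues(0).isEmpty) // true: queue was drained
}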
Generating Jobs for the current batchTime
The generateJobs method of DStreamGraph is called to generate the jobs for the current batchTime:
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
One outputStream corresponds to one job; all outputStreams are traversed and a job is generated for each:
// ForEachDStream
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
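A Job at this point is nothing more than a batch time paired with a deferred closure; nothing runs until run() is invoked later by the scheduler. A tiny illustrative stand-in (SimpleJob is not Spark's actual Job class):

// A batch time plus a deferred function: nothing executes until run()
class SimpleJob(val time: Long, jobFunc: () => Unit) {
  def run(): Unit = jobFunc()
}

object JobDemo extends App {
  val job = new SimpleJob(1000L, () => println("take() and print the RDD here"))
  // Later, on the jobExecutor thread pool, JobHandler effectively does:
  job.run()
}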
The RDD for this time is obtained first and then passed as an argument to foreachFunc, which was handed in through the constructor. Let's look at what that function is in the case of print():
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
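For instance, assuming print(3) is called on a batch whose RDD holds the records 1, 2, 3, 4, the println calls above would produce output of this shape (the timestamp is illustrative):

-------------------------------------------
Time: 1469999999000 ms
-------------------------------------------
1
2
3
...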
The foreachFunc built here is the function that is ultimately submitted as the job together with the RDD: it calls take() on the RDD and prints the result, and it is inside this function that the action is actually triggered. Now let's look at how the RDD itself is obtained. Every DStream holds a generatedRDDs: Map[Time, RDD[T]] that caches the RDD for each batch time; when there is no cached entry for a time, the RDD is produced by the compute() method (a simplified sketch of this getOrCompute caching logic follows the compute code below). For a ReceiverInputDStream, which starts Receivers on executors to receive data, compute() looks like this:
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
The blocks for this batch are obtained through the receiverTracker. As analyzed earlier, every streamId's unallocated blocks were assigned to the batch and stored in timeToAllocatedBlocks: Map[Time, AllocatedBlocks]; under the hood, the blockInfos are read from that very map, and createBlockRDD(validTime, blockInfos) then builds an RDD from the block ids.
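As promised above, here is a simplified, self-contained sketch of the getOrCompute caching logic (the real DStream.getOrCompute also handles time validation, persistence, and checkpointing; a fake Vector-based "RDD" keeps the sketch runnable):

import scala.collection.mutable

object GetOrComputeSketch extends App {
  type FakeRDD = Vector[Int] // stand-in for RDD[T]
  val generatedRDDs = mutable.HashMap.empty[Long, FakeRDD]

  def compute(time: Long): Option[FakeRDD] = Some(Vector(1, 2, 3))

  // Consult the per-time cache; otherwise compute and remember the result
  def getOrCompute(time: Long): Option[FakeRDD] =
    generatedRDDs.get(time).orElse {
      val rddOption = compute(time)
      rddOption.foreach(rdd => generatedRDDs.put(time, rdd))
      rddOption
    }

  getOrCompute(1000L) // computed and cached
  getOrCompute(1000L) // served from generatedRDDs
  println(generatedRDDs.keySet) // Set(1000)
}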
Finally, a jobFunc is built from this RDD and foreachFunc, and a Job is created and returned.
Wrapping the jobs into a JobSet and submitting it
Each outputStream corresponds to one Job, so what is generated in the end is a sequence of jobs. A JobSet is created for them and submitted via jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)):
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
The jobs are then executed by the jobExecutor, a thread pool whose parallelism defaults to 1 and can be configured via spark.streaming.concurrentJobs, i.e. how many batches may be processed concurrently (a minimal configuration sketch follows).
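spark.streaming.concurrentJobs is set like any other Spark property; a minimal sketch (master, app name, and batch duration are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsDemo extends App {
  // Let the jobs of two different batches run concurrently.
  // Note: batches may then complete out of order, so use with care.
  val conf = new SparkConf()
    .setMaster("local[4]")
    .setAppName("concurrent-jobs-demo")
    .set("spark.streaming.concurrentJobs", "2")
  val ssc = new StreamingContext(conf, Seconds(5))
}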
The handler class JobHandler calls Job.run(), which executes the jobFunc built earlier.