What does JobGenerator do?
First, it holds a timer instance:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
This timer posts a GenerateJobs message every batchDuration; in other words, one set of jobs is generated per batch interval.
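As a mental model, a recurring timer like this boils down to a loop that fires a callback once per period. The sketch below is my own simplification, not Spark's actual RecurringTimer (which also supports restarting from a given time and graceful stop):

// Simplified sketch of a recurring timer; NOT Spark's RecurringTimer.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private val thread = new Thread(() => {
    var nextTime = System.currentTimeMillis()
    while (!stopped) {
      callback(nextTime)  // in JobGenerator this posts GenerateJobs(new Time(nextTime))
      nextTime += periodMs
      val sleepMs = nextTime - System.currentTimeMillis()
      if (sleepMs > 0) Thread.sleep(sleepMs)  // wait out the rest of the period
    }
  })
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true }
}

With batchDuration = 5 seconds, the callback fires every 5000 ms, so each batch interval produces exactly one GenerateJobs event.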
Second, it has a method that receives the GenerateJobs message and acts on it:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)  // our focus right now!
    // The cases below are the exam questions; they are important and covered later!
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
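Where does processEvent get its events? In JobGenerator.start(), an EventLoop (org.apache.spark.util.EventLoop: a queue plus a daemon dispatch thread) is created whose onReceive delegates to processEvent. Lightly trimmed from Spark's source, the wiring looks like this:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
  override protected def onError(e: Throwable): Unit =
    jobScheduler.reportError("Error in job generator", e)
}
eventLoop.start()

So the timer's eventLoop.post(GenerateJobs(...)) ends up, on the event-loop thread, as a call to processEvent.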
/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  // ....
  Try {
    // receiverTracker shows up! Here it comes! It allocates the data blocks
    // received by the receivers to this specific batch, as discussed above.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // DStreamGraph generates the jobs through the output streams it holds
    // (see the sketch after this block).
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      // report the failure to the scheduler (as Spark's source does)
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
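To fill in the comment on graph.generateJobs(time): DStreamGraph asks each registered output stream (the DStreams created by output operations such as print or foreachRDD) to produce a Job for this batch time. Simplified from Spark's DStreamGraph, with logging and call-site bookkeeping dropped, the core is:

def generateJobs(time: Time): Seq[Job] = {
  outputStreams.flatMap { outputStream =>
    outputStream.generateJob(time)  // Some(job) if an RDD was generated for `time`, else None
  }
}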
These two calls,

jobScheduler.receiverTracker.allocateBlocksToBatch(time)
graph.generateJobs(time)

are analyzed in detail in the follow-up article "Spark Streaming source analysis: how jobs, RDDs, and blocks correspond to each other".
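Until then, here is a conceptual sketch of the allocation step. This is my own simplification, not Spark's code (the real logic lives in ReceivedBlockTracker behind receiverTracker): receivers enqueue block metadata as data arrives, and allocateBlocksToBatch drains the queue and pins the drained blocks to the batch time, so the jobs generated for that batch know exactly which blocks to read.

// Conceptual sketch only; the real implementation is ReceivedBlockTracker.
import scala.collection.mutable

class BlockAllocator[Block] {
  private val unallocated = mutable.Queue.empty[Block]
  private val byBatch = mutable.Map.empty[Long, List[Block]]

  def addBlock(block: Block): Unit = synchronized {
    unallocated.enqueue(block)  // called as receivers report new blocks
  }

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    byBatch(batchTime) = unallocated.toList  // pin the pending blocks to this batch
    unallocated.clear()
  }

  def blocksOf(batchTime: Long): List[Block] = synchronized {
    byBatch.getOrElse(batchTime, Nil)  // what job generation would consult
  }
}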