This article walks through the source code to understand the full execution flow of a Spark Streaming application, from job generation to job completion, along with the checkpoint operations that accompany it.
Note: the source excerpts below include only the code relevant to this analysis; everything else is omitted.
1 Conclusions first
The startup, job generation, job completion, and checkpoint flow of Spark Streaming is as follows:
- StreamingContext.start() starts the JobScheduler
- JobScheduler startup:
- JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events
- starts the JobGenerator
- JobGenerator startup:
- JobGenerator creates an EventLoop[JobGeneratorEvent] to handle GenerateJobs, ClearMetadata, DoCheckpoint, and ClearCheckpointData events
- creates a RecurringTimer that periodically posts GenerateJobs events, driving periodic job creation and execution
- Job generation in JobGenerator:
- calls generateJobs() to build a JobSet and submits it via JobScheduler.submitJobSet()
- submitJobSet() passes each Job into a JobHandler and hands the JobHandler to the jobExecutor thread pool for execution
- posts a DoCheckpoint event to checkpoint the metadata
- Job completion:
- when a job finishes, its JobHandler posts a JobCompleted event; JobScheduler's EventLoop handles it, and once it determines the whole JobSet has completed, it posts a ClearMetadata event, which checkpoints or deletes the data (a condensed sketch of this whole cycle follows this list)
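Before diving in, here is a minimal, self-contained sketch of the pattern the list above describes; all names and the single-threaded structure are hypothetical simplifications, not Spark's actual classes. A timer posts GenerateJobs to an event loop, jobs run on a thread pool, and completion events feed back to trigger metadata cleanup:
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

sealed trait Event
case class GenerateJobs(time: Long) extends Event
case class JobCompleted(time: Long) extends Event
case class ClearMetadata(time: Long) extends Event

object MiniStreamingFlow extends App {
  val eventQueue = new LinkedBlockingQueue[Event]()
  val jobExecutor = Executors.newFixedThreadPool(1)

  // one daemon thread playing both the JobGenerator and JobScheduler event-loop roles
  val eventThread = new Thread(new Runnable {
    def run(): Unit = while (true) eventQueue.take() match {
      case GenerateJobs(t) => // "submitJobSet": run the batch's job on the pool
        jobExecutor.execute(new Runnable {
          def run(): Unit = { println(s"running jobs for batch $t"); eventQueue.put(JobCompleted(t)) }
        })
      case JobCompleted(t) => // batch done: trigger metadata cleanup
        eventQueue.put(ClearMetadata(t))
      case ClearMetadata(t) =>
        println(s"checkpoint / clear metadata for batch $t")
    }
  })
  eventThread.setDaemon(true)
  eventThread.start()

  // the "RecurringTimer": post GenerateJobs once per (1-second) batch interval
  val timer = Executors.newSingleThreadScheduledExecutor()
  timer.scheduleAtFixedRate(new Runnable {
    def run(): Unit = eventQueue.put(GenerateJobs(System.currentTimeMillis()))
  }, 0, 1, TimeUnit.SECONDS)

  Thread.sleep(3000) // let a few batches flow through
  timer.shutdownNow(); jobExecutor.shutdown()
}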
2 Detailed analysis
Application entry point:
val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val batchDuration = args(0) // batch interval in seconds, e.g. "5"
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
// the DStream pipeline (input sources and output operations) must be defined before start()
ssc.start()
ssc.awaitTermination()
Once ssc.start() is called, the program truly begins to run.
Step 1:
StreamingContext.start() performs the following main work:
- calls JobScheduler.start()
- posts a StreamingListenerStreamingStarted message
StreamingContext.start():
def start(): Unit = synchronized {
state match {
case INITIALIZED =>
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
...
scheduler.start()
state = StreamingContextState.ACTIVE
scheduler.listenerBus.post(
StreamingListenerStreamingStarted(System.currentTimeMillis()))
} catch {
...
}
StreamingContext.setActiveContext(this)
}
...
case ACTIVE =>
logWarning("StreamingContext has already been started")
case STOPPED =>
throw new IllegalStateException("StreamingContext has already been stopped")
}
}
Step 2:
Calling JobScheduler.start() performs these main operations:
- creates an EventLoop to handle incoming JobSchedulerEvents; processEvent() contains the actual handling logic
- calls jobGenerator.start()
JobScheduler.start():
def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
// create an event loop and start it
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()
...
// start the JobGenerator
jobGenerator.start()
...
}
Step 3:
JobGenerator.start() performs these main operations:
- creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvents
- kicks off job generation:
- a RecurringTimer periodically runs eventLoop.post(GenerateJobs(new Time(longTime)))
- when the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs generateJobs(time)
- generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
JobGenerator.start()
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// force initialization of the lazy checkpointWriter, which persists checkpoint data
checkpointWriter
// create the event loop that receives and handles JobGeneratorEvents
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
eventLoop.start()
if (ssc.isCheckpointPresent) {
// recover from an existing checkpoint
restart()
} else {
// first start
startFirstTime()
}
}
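Whether restart() (checkpoint recovery) or startFirstTime() runs above depends on how the context was constructed. A checkpoint-enabled application typically builds its context through StreamingContext.getOrCreate; a rough sketch, with a hypothetical checkpoint path:
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SparkStreaming")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // hypothetical checkpoint directory
  // define the DStream pipeline here
  ssc
}

// on a clean start this calls createContext(); on a restart the context (and its
// DStream graph) is rebuilt from the checkpoint, so JobGenerator takes the restart() branch
val ssc = StreamingContext.getOrCreate("hdfs:///tmp/streaming-checkpoint", createContext _)
ssc.start()
ssc.awaitTermination()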
On a first start, startFirstTime() is called. It mainly starts the already initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. Whenever the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which dispatches to generateJobs(time) to perform the actual job generation.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
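timer.getStartTime() aligns the first tick to the next batch boundary. A quick worked example of the rounding, with assumed values:
val period = 5000L // a 5-second batch interval
val now = 12340L   // pretend clock.getTimeMillis() returned this
val startTime = (math.floor(now.toDouble / period) + 1).toLong * period
assert(startTime == 15000L) // the first GenerateJobs event fires at t = 15s, the next batch boundary
processEvent() then dispatches each event type: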
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
generateJobs() does two main things:
- builds a JobSet and submits it via jobScheduler.submitJobSet()
- posts a DoCheckpoint event to trigger a checkpoint
private def generateJobs(time: Time) {
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
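graph.generateJobs(time) (not shown above) essentially asks every registered output stream for a job at that batch time. A sketch of that logic, abridged from and close to the Spark 2.x source, though minor details vary across versions:
// DStreamGraph.generateJobs, abridged: ask each registered output stream
// for a job at this batch time; streams with nothing to do return None
def generateJobs(time: Time): Seq[Job] = this.synchronized {
  outputStreams.flatMap { outputStream =>
    val jobOption = outputStream.generateJob(time)
    jobOption.foreach(_.setCallSite(outputStream.creationSite))
    jobOption
  }
}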
The first operation: jobScheduler.submitJobSet() mainly iterates over the jobs in the JobSet, passes each one into a JobHandler, and hands the JobHandler to jobExecutor for execution (jobExecutor's construction is sketched after the code below). JobHandler implements Runnable, and its run() does three things:
- posts a JobStarted event
- runs Job.run()
- posts a JobCompleted event
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
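For reference, jobExecutor is just a fixed-size daemon thread pool whose size comes from spark.streaming.concurrentJobs (default 1), which is why batches normally execute one at a time. A rough equivalent of its construction; the real code uses Spark's ThreadUtils.newDaemonFixedThreadPool:
import java.util.concurrent.{Executors, ThreadFactory}

val numConcurrentJobs = 1 // spark.streaming.concurrentJobs, default 1
val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs, new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "streaming-job-executor")
    t.setDaemon(true)
    t
  }
})
And JobHandler itself: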
private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._
def run() {
val oldProps = ssc.sparkContext.getLocalProperties // saved so the finally block can restore them
try {
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
}
} else {
// JobScheduler has been stopped
}
} finally {
ssc.sparkContext.setLocalProperties(oldProps)
}
}
}
The second operation: posting the DoCheckpoint event.
Note that this event is not handled by the EventLoop created in JobScheduler.start() (its processEvent() only knows the three JobSchedulerEvent types: JobStarted, JobCompleted, and ErrorReported) but by the JobGenerator's EventLoop: when JobGenerator.processEvent() receives a DoCheckpoint event, it runs doCheckpoint():
//JobGenerator.processEvent()
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
...
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
...
}
}
doCheckpoint() calls graph.updateCheckpointData() to update the checkpoint data, then calls checkpointWriter.write() to persist it:
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
// update the DStream graph's checkpoint data for this batch time
ssc.graph.updateCheckpointData(time)
// persist the checkpoint to the file system
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
} else if (clearCheckpointDataLater) {
markBatchFullyProcessed(time)
}
}
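The isMultipleOf guard above means a checkpoint is written only on batch times that land on the checkpoint interval. A toy illustration using a simplified stand-in for Spark's Time, assuming a 5s batch interval and a 10s checkpoint interval:
// a simplified stand-in for Spark's Time, just for this illustration
case class Time(millis: Long) {
  def -(that: Time): Time = Time(millis - that.millis)
  def isMultipleOf(that: Time): Boolean = millis % that.millis == 0
}

val zeroTime = Time(0)               // graph.zeroTime
val checkpointDuration = Time(10000) // ssc.checkpointDuration: 10s
Seq(5000L, 10000L, 15000L, 20000L).map(Time.apply).foreach { t =>
  // prints: 5000 -> false, 10000 -> true, 15000 -> false, 20000 -> true
  println(s"${t.millis} -> ${(t - zeroTime).isMultipleOf(checkpointDuration)}")
}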
The update side mainly calls DStreamGraph.updateCheckpointData:
def updateCheckpointData(time: Time) {
logInfo("Updating checkpoint data for time " + time)
this.synchronized {
outputStreams.foreach(_.updateCheckpointData(time))
}
logInfo("Updated checkpoint data for time " + time)
}
outputStreams.foreach(_.updateCheckpointData(time)) invokes updateCheckpointData on each output DStream; that method mainly calls checkpointData.update(currentTime) and then recurses into the DStreams it depends on.
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
private[streaming] def updateCheckpointData(currentTime: Time) {
logDebug(s"Updating checkpoint data for time $currentTime")
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
Next, let's look at checkpointData.update(currentTime). In DStream we find the following definition:
private[streaming] val checkpointData = new DStreamCheckpointData(this)
which leads us to DStreamCheckpointData.update (DStreamCheckpointData has subclasses that customize what content is saved and how):
// key: batch time, value: checkpoint file path
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// key: batch time, value: time of the earliest checkpointed RDD in that batch
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
def update(time: Time) {
// collect the RDDs to checkpoint from the DStream; generatedRDDs is a HashMap[Time, RDD[T]]
val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
.map(x => (x._1, x._2.getCheckpointFile.get))
logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))
// add the checkpoint files to the HashMap that will eventually be serialized
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
// update the time-to-checkpoint-file map
timeToCheckpointFile ++= currentCheckpointFiles
// key: the given batch time, value: time of the earliest checkpointed RDD
timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
}
}
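The filter/map at the top of update() keeps only RDDs that were actually checkpointed. A toy illustration of that transformation, with hypothetical values (getCheckpointFile returns an Option[String]):
import scala.collection.mutable.HashMap

// generatedRDDs reduced to (batch time -> optional checkpoint file)
val generated = HashMap[Long, Option[String]](
  10000L -> Some("hdfs:///ckpt/rdd-1"), 15000L -> None)
val checkpointFiles = generated.filter(_._2.isDefined).map { case (t, f) => (t, f.get) }
// checkpointFiles contains only 10000L -> "hdfs:///ckpt/rdd-1": un-checkpointed RDDs are dropped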
Step 4: job completion
As noted for jobScheduler.submitJobSet() (called from generateJobs above), each Job is passed into a JobHandler, and the JobHandler is handed to the jobExecutor pool. The JobHandler posts a JobStarted event, and once the job has run it posts a JobCompleted event; when JobScheduler's EventLoop receives that event, it runs handleJobCompletion():
//JobScheduler.processEvent()
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
case JobStarted(job, startTime) => handleJobStart(job, startTime)
case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
case e: Throwable =>
reportError("Error in job scheduler", e)
}
}
handleJobCompletion() calls jobSet.handleJobCompletion(job) and, at the end, checks whether the whole JobSet has completed; if so, it runs jobGenerator.onBatchCompletion(jobSet.time):
private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
job.setEndTime(completedTime)
listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
job.result match {
case Failure(e) =>
reportError("Error running job " + job, e)
case _ => // on success: if the whole JobSet has completed, do the following
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
}
}
}
Now, over in JobGenerator, onBatchCompletion():
def onBatchCompletion(time: Time) {
eventLoop.post(ClearMetadata(time))
}
JobGenerator.processEvent() dispatches this to clearMetadata(time):
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
...
case ClearMetadata(time) => clearMetadata(time)
...
}
}
clearMetadata() checkpoints or deletes the metadata:
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
// checkpointing is enabled: post a DoCheckpoint event, asking for old checkpoint data to be cleared afterwards
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
} else {
// checkpointing is disabled: delete metadata about old received blocks
// and mark the batch fully processed (body as in the Spark 2.x source)
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}
Appendix: the source of RecurringTimer and EventLoop, with brief annotations.
RecurringTimer:
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
extends Logging {
// create a thread that runs loop()
private val thread = new Thread("RecurringTimer - " + name) {
setDaemon(true)
override def run() { loop }
}
@volatile private var prevTime = -1L
@volatile private var nextTime = -1L
@volatile private var stopped = false
def getStartTime(): Long = {
(math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
}
def getRestartTime(originalStartTime: Long): Long = {
val gap = clock.getTimeMillis() - originalStartTime
(math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
// start() mainly starts the thread, which executes loop()
def start(startTime: Long): Long = synchronized {
nextTime = startTime
thread.start()
logInfo("Started timer for " + name + " at time " + nextTime)
nextTime
}
def start(): Long = {
start(getStartTime())
}
def stop(interruptTimer: Boolean): Long = synchronized {
if (!stopped) {
stopped = true
if (interruptTimer) {
thread.interrupt()
}
thread.join()
logInfo("Stopped timer for " + name + " after time " + prevTime)
}
prevTime
}
private def triggerActionForNextInterval(): Unit = {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
// run the callback once per period until stopped (plus one final call after stop)
private def loop() {
try {
while (!stopped) {
triggerActionForNextInterval()
}
triggerActionForNextInterval()
} catch {
case e: InterruptedException =>
}
}
}
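A minimal usage sketch of the timer, assuming Spark's SystemClock from org.apache.spark.util (note that SystemClock, like RecurringTimer itself, is private[spark], so this only compiles inside Spark's own packages):
import org.apache.spark.util.SystemClock

val demoTimer = new RecurringTimer(new SystemClock(), 1000,
  t => println(s"tick at $t"), "demo")
demoTimer.start()  // first tick is aligned to the next 1000 ms boundary
Thread.sleep(3500) // observe roughly three ticks
demoTimer.stop(interruptTimer = true)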
EventLoop source: its main job is to run a background thread that watches for incoming events and handles them accordingly:
private[spark] abstract class EventLoop[E](name: String) extends Logging {
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
private val stopped = new AtomicBoolean(false)
private val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event)
} catch {
case NonFatal(e) =>
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
def start(): Unit = {
if (stopped.get) {
throw new IllegalStateException(name + " has already been stopped")
}
// Call onStart before starting the event thread to make sure it happens before onReceive
onStart()
eventThread.start()
}
def stop(): Unit = {
if (stopped.compareAndSet(false, true)) {
eventThread.interrupt()
var onStopCalled = false
try {
eventThread.join()
// Call onStop after the event thread exits to make sure onReceive happens before onStop
onStopCalled = true
onStop()
} catch {
case ie: InterruptedException =>
Thread.currentThread().interrupt()
if (!onStopCalled) {
// ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
// it's already called.
onStop()
}
}
} else {
// Keep quiet to allow calling `stop` multiple times.
}
}
// put the event on eventQueue; eventThread will process it
def post(event: E): Unit = {
eventQueue.put(event)
}
// whether eventThread is alive
def isActive: Boolean = eventThread.isAlive
// called before eventThread starts
protected def onStart(): Unit = {}
// called after eventThread exits
protected def onStop(): Unit = {}
// subclasses implement the event-handling logic
protected def onReceive(event: E): Unit
// error-handling logic
protected def onError(e: Throwable): Unit
}
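To close, a minimal concrete EventLoop mirroring how JobScheduler uses it; the event type and names here are hypothetical:
sealed trait DemoEvent
case class Tick(n: Int) extends DemoEvent

val loop = new EventLoop[DemoEvent]("demo-loop") {
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case Tick(n) => println(s"got tick $n") // runs on the single daemon event thread
  }
  override protected def onError(e: Throwable): Unit = e.printStackTrace()
}
loop.start()
(1 to 3).foreach(n => loop.post(Tick(n)))
Thread.sleep(100) // give the daemon event thread time to drain the queue
loop.stop()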