Spark Streaming中Driver的容錯(cuò)主要是ReceiverTracker浊仆、Dstream.graph衷笋、JobGenerator的容錯(cuò)
- 第一芳杏、看ReceiverTracker的容錯(cuò),主要是ReceiverTracker接收元數(shù)據(jù)的存入WAL,看ReceiverTracker的addBlock方法,代碼如下
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
try {
val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
if (writeResult) {
synchronized {
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
s"block ${receivedBlockInfo.blockStoreResult.blockId}")
} else {
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
}
writeResult
} catch {
case NonFatal(e) =>
logError(s"Error adding block $receivedBlockInfo", e)
false
}
}
writeToLog方法就是進(jìn)行WAL的操作爵赵,看writeToLog的代碼
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
if (isWriteAheadLogEnabled) {
logTrace(s"Writing record: $record")
try {
writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
clock.getTimeMillis())
true
} catch {
case NonFatal(e) =>
logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
false
}
} else {
true
}
}
首先判斷是否開(kāi)啟了WAL吝秕,根據(jù)一下isWriteAheadLogEnabled值
private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty
接著看writeAheadLogOption
private val writeAheadLogOption = createWriteAheadLog()
再看createWriteAheadLog()方法
private def createWriteAheadLog(): Option[WriteAheadLog] = {
checkpointDirOption.map { checkpointDir =>
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
}
}
根據(jù)checkpoint的配置,獲取checkpoint的目錄亚再,這里可以看出郭膛,checkpoint可以有多個(gè)目錄。
寫完WAL才將receivedBlockInfo放到內(nèi)存隊(duì)列g(shù)etReceivedBlockQueue中
- 第二氛悬、看ReceivedBlockTracker的allocateBlocksToBatch方法,代碼如下
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
val streamIdToBlocks = streamIds.map { streamId =>
(streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
// 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
// possibly processed batch job or half-processed batch job need to be processed again,
// so the batchTime will be equal to lastAllocatedBatchTime.
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
}
從getReceivedBlockQueue中獲取每一個(gè)receiver的ReceivedBlockQueue隊(duì)列賦值給streamIdToBlocks耘柱,然后包裝一下
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
allocatedBlocks就是根據(jù)時(shí)間獲取的一批元數(shù)據(jù)如捅,交給對(duì)應(yīng)batchDuration的job,job在執(zhí)行的時(shí)候就可以使用调煎,在使用前先進(jìn)行WAL镜遣,如果job出錯(cuò)恢復(fù)后,可以知道數(shù)據(jù)計(jì)算到什么位置
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
- 第三士袄、看cleanupOldBatches方法悲关,cleanupOldBatches的功能是從內(nèi)存中清楚不用的batches元數(shù)據(jù),再刪除WAL的數(shù)據(jù)娄柳,再刪除之前把要?jiǎng)h除的batches信息也進(jìn)行WAL
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo("Deleting batches " + timesToCleanup)
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
} else {
logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
}
}
- 總結(jié)一下上面的三種WAL,對(duì)應(yīng)下面的三種事件寓辱,這就是ReceiverTracker的容錯(cuò)
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent
- 看一下Dstream.graph和JobGenerator的容錯(cuò),從開(kāi)始
private def generateJobs(time: Time) {
SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// allocate received blocks to batch
// 分配接收到的數(shù)據(jù)給batch
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
// 使用分配的塊生成jobs
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
// 獲取元數(shù)據(jù)信息
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
// 提交jobSet
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
jobs生成完成后發(fā)送DoCheckpoint消息赤拒,最終調(diào)用doCheckpoint方法,代碼如下
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
}
}
updateCheckpointData和checkpointWriter.write做了什么秫筏,后續(xù)