Spark Streaming Fault-Tolerance Guarantees
According to the Databricks article at https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html , Spark Streaming implements data fault tolerance in three places:
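- Receiver side:
  - WriteAheadLogBasedBlockHandler: its storeBlock() method saves each block to both the BlockManager and the WAL, and returns a WriteAheadLogBasedStoreResult;
- Driver side:
  - ReceivedBlockTracker: when it handles block reports from the receivers and scheduling information from the driver scheduler, it writes the resulting events to a WAL (similar to MySQL's redo log);
  - Checkpoint mechanism: after the driver scheduler runs generateJobs() for a batch time, it saves the checkpoint for that time, so that the job can be recovered after an unexpected failure;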
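Receiver side
The receiver-side fault-tolerance logic is in WriteAheadLogBasedBlockHandler, which saves each block to the BlockManager and to the WAL in parallel and returns a WriteAheadLogBasedStoreResult. The two steps are described in turn.
The receiver saves the block to the BlockManager
If the WAL is not enabled, the storage level used for the BlockManager is the one the user specifies; for Kafka the default level is StorageLevel.MEMORY_AND_DISK_SER_2;
If the WAL is enabled, the user's storage level is overridden by the effective storage level computed below: the memory, disk, and off-heap choices are kept, but the data is always stored serialized and with a single replica: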
private val blockStoreTimeout = conf.getInt(
  "spark.streaming.receiver.blockStoreTimeout", 30).seconds

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }
  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

if (storageLevel != effectiveStorageLevel) {
  logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
    s"$effectiveStorageLevel when write ahead log is enabled")
}
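The override above can be illustrated without a Spark dependency. The following is only a minimal sketch: `Level` is a hypothetical stand-in for Spark's StorageLevel, and `effective` is an illustrative name, not a Spark method. It assumes that MEMORY_AND_DISK_SER_2 means memory plus disk, serialized, with two replicas.

```scala
// Hypothetical stand-in for Spark's StorageLevel, so the sketch runs without Spark.
final case class Level(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int)

object EffectiveLevelSketch {
  // Models StorageLevel.MEMORY_AND_DISK_SER_2: memory + disk, serialized, 2 replicas.
  val memoryAndDiskSer2: Level = Level(
    useDisk = true, useMemory = true, useOffHeap = false, deserialized = false, replication = 2)

  // With the WAL enabled, keep the disk/memory/off-heap flags but force
  // serialized storage and a single replica, as the excerpt above does.
  def effective(user: Level): Level =
    user.copy(deserialized = false, replication = 1)

  def main(args: Array[String]): Unit = {
    val result = effective(memoryAndDiskSer2)
    println(s"user level:      $memoryAndDiskSer2")
    println(s"effective level: $result")
  }
}
```

The second replica is dropped because the WAL already provides durability for the block.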
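Writing to the WAL
This write calls flush() to force the data to disk, so once it returns, the data is guaranteed to have been written and replicated successfully.
Question 1: This WAL does not seem to be used for recovery on the receiver side, since I could not find a recover interface there. So what is it for?
It protects the data itself. The driver side saves the blockInfo, and the block that each blockInfo refers to must be guaranteed to exist; as the (path, offset, length) handle stored with the blockInfo suggests, the block can be read back from the WAL when it is no longer available in the BlockManager.
Question 2: Because this WAL stores the actual data, it takes up a fair amount of space. How is it cleaned up?
When a batch completes, a ClearMetadata event is posted; the handler checks whether the WAL is enabled and, if so, cleans up the WAL data belonging to that batch: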
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
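The cleanup step can be pictured as dropping every log file that ends before a threshold time. The sketch below is a simplified model, not Spark's code: `Segment`, `clean`, and the file names are hypothetical, and it assumes that a log file becomes removable once its end time is earlier than the threshold.

```scala
object WalCleanupSketch {
  // Hypothetical model of one WAL file covering the interval [startMs, endMs].
  final case class Segment(path: String, startMs: Long, endMs: Long)

  // Keep only the files that may still hold data at or after the threshold;
  // everything that ended strictly before it is considered safe to delete.
  def clean(segments: Seq[Segment], threshMs: Long): Seq[Segment] =
    segments.filter(_.endMs >= threshMs)

  val example: Seq[Segment] = Seq(
    Segment("log-0-1000", 0L, 1000L),
    Segment("log-1000-2000", 1000L, 2000L),
    Segment("log-2000-3000", 2000L, 3000L))

  def main(args: Array[String]): Unit = {
    // A batch that finished at t = 2000 ms makes the first file removable.
    clean(example, threshMs = 2000L).foreach(s => println(s.path))
  }
}
```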
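Driver side
ReceivedBlockTracker
The WAL on the ReceivedBlockTracker side does not depend on the receiver WAL configuration; see https://issues.apache.org/jira/browse/SPARK-7139 for details. Its job is to write every event it receives to a WAL. Each record is small, and although it is also called a WAL, it is conceptually different from the receiver-side WAL described above;
To see exactly what it stores, search for the writeToLog method in the ReceivedBlockTracker class. There are three call sites: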
writeToLog(BlockAdditionEvent(receivedBlockInfo))
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
writeToLog(BatchCleanupEvent(timesToCleanup))

// Corresponding event classes
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent

// Metadata describing one received block
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  )

private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}

// Store result used when the receiver-side WAL is enabled
private[streaming] case class WriteAheadLogBasedStoreResult(
    blockId: StreamBlockId,
    numRecords: Option[Long],
    walRecordHandle: WriteAheadLogRecordHandle
  ) extends ReceivedBlockStoreResult

// Location of one record inside a WAL file
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle

case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
  def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
    streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
  }
}
As this shows, the core information stored is ReceivedBlockInfo, which contains:
- streamId: the unique identifier of each stream;
- numRecords: the number of records in the block;
- metadataOption: optional metadata;
- blockStoreResult: ReceivedBlockStoreResult is a trait. The concrete type of this field tells whether the receiver side used the WAL, and in that case it also carries the mapping blockId -> (path, offset, length);
By default this implementation runs its recovery logic at initialization. The logic resembles the log replay found in many storage engines:
// Recover block information from write ahead logs
if (recoverFromWriteAheadLog) {
  recoverPastEvents()
}

/**
 * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
 * and allocated block info) prior to failure.
 */
private def recoverPastEvents(): Unit = synchronized {
  // Insert the recovered block information
  def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
    logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
    receivedBlockInfo.setBlockIdInvalid()
    getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
  }

  // Insert the recovered block-to-batch allocations and clear the queue of received blocks
  // (when the blocks were originally allocated to the batch, the queue must have been cleared).
  def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
    logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
      s"${allocatedBlocks.streamIdToAllocatedBlocks}")
    streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
    timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
    lastAllocatedBatchTime = batchTime
  }

  // Cleanup the batch allocations
  def cleanupBatches(batchTimes: Seq[Time]) {
    logTrace(s"Recovery: Cleaning up batches $batchTimes")
    timeToAllocatedBlocks --= batchTimes
  }

  writeAheadLogOption.foreach { writeAheadLog =>
    logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
    writeAheadLog.readAll().asScala.foreach { byteBuffer =>
      logInfo("Recovering record " + byteBuffer)
      Utils.deserialize[ReceivedBlockTrackerLogEvent](
        JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
        case BlockAdditionEvent(receivedBlockInfo) =>
          insertAddedBlock(receivedBlockInfo)
        case BatchAllocationEvent(time, allocatedBlocks) =>
          insertAllocatedBatch(time, allocatedBlocks)
        case BatchCleanupEvent(batchTimes) =>
          cleanupBatches(batchTimes)
      }
    }
  }
}
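To make the replay easier to follow, here is a self-contained sketch of the same idea with the Spark types removed. All names in it (`ReplaySketch`, `BlockAdded`, `BatchAllocated`, `BatchCleanup`, `TrackerState`) are hypothetical simplifications: block ids are plain strings and batch times are plain longs. It is meant to show how the three event kinds rebuild the tracker state, not how Spark implements it.

```scala
import scala.collection.mutable

object ReplaySketch {
  // Simplified counterparts of the three logged event kinds.
  sealed trait Event
  final case class BlockAdded(streamId: Int, blockId: String) extends Event
  final case class BatchAllocated(time: Long, blocks: Map[Int, Seq[String]]) extends Event
  final case class BatchCleanup(times: Seq[Long]) extends Event

  // State rebuilt by the replay: blocks not yet assigned to a batch,
  // and the block allocation of each batch time.
  final class TrackerState {
    val unallocated = mutable.Map.empty[Int, mutable.ArrayBuffer[String]]
    val allocated = mutable.Map.empty[Long, Map[Int, Seq[String]]]
  }

  // Apply the events in log order, like a storage engine replaying its log.
  def replay(log: Seq[Event]): TrackerState = {
    val state = new TrackerState
    log.foreach {
      case BlockAdded(streamId, blockId) =>
        state.unallocated.getOrElseUpdate(streamId, mutable.ArrayBuffer.empty[String]) += blockId
      case BatchAllocated(time, blocks) =>
        // Allocation originally emptied the queues, so the replay does the same.
        state.unallocated.values.foreach(_.clear())
        state.allocated.put(time, blocks)
      case BatchCleanup(times) =>
        state.allocated --= times
    }
    state
  }

  val exampleLog: Seq[Event] = Seq(
    BlockAdded(0, "b1"),
    BlockAdded(0, "b2"),
    BatchAllocated(1000L, Map(0 -> Seq("b1", "b2"))),
    BlockAdded(0, "b3"),
    BatchCleanup(Seq(1000L)),
    BatchAllocated(2000L, Map(0 -> Seq("b3"))),
    BlockAdded(0, "b4"))

  def main(args: Array[String]): Unit = {
    val state = replay(exampleLog)
    println(s"unallocated: ${state.unallocated}")
    println(s"allocated:   ${state.allocated}")
  }
}
```

After replaying `exampleLog`, only block b4 is still unallocated and only the batch at time 2000 remains, because the batch at time 1000 was cleaned up.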
Checkpoint
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll

  def createSparkConf(): SparkConf = {
    // ...
  }
}
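As the Checkpoint class shows, the information saved to HDFS is:
- master: the Spark master the application runs against;
- framework: the Spark application name;
- jars: the jars the Spark application depends on;
- graph: the graph the streaming job depends on (as I understand it, the information about the RDDs it depends on);
- checkpointDir: the checkpoint path;
- checkpointDuration: the checkpoint interval;
- pendingTimes: the batch times that are still pending in the scheduler;
- sparkConfPairs: the SparkConf key-value pairs;
Its save and recovery logic is fairly simple:
Save: the checkpoint is saved at every batch time (the checkpoint interval can of course also be configured);
Recovery: when the driver starts, it first tries to recover from the checkpoint;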
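From the application side, the "try the checkpoint first" behaviour is usually reached through StreamingContext.getOrCreate. The snippet below is a sketch under assumptions rather than a recommended setup: the checkpoint path, application name, batch interval, and socket source are all hypothetical placeholders, and it needs the Spark Streaming library on the classpath to run.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Hypothetical checkpoint location; any fault-tolerant file system path works.
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  // Called only when no usable checkpoint exists in checkpointDir.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("checkpoint-recovery-sketch") // hypothetical name
      // Turns on the receiver-side WAL discussed earlier.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10)) // hypothetical batch interval
    ssc.checkpoint(checkpointDir)
    // Placeholder pipeline: the host and port are assumptions for illustration.
    ssc.socketTextStream("localhost", 9999).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the context from the checkpoint if one exists, otherwise creates a new one.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The whole pipeline is defined inside `createContext` because a context restored from a checkpoint already carries its graph, so the function is not called on recovery.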
References:
- ReceivedBlockTracker WAL implementation: https://issues.apache.org/jira/browse/SPARK-7139
- Databricks article on zero data loss: https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
- Spark Streaming WAL implementation: https://issues.apache.org/jira/browse/SPARK-3129
- Spark Streaming HA design: https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit#
- https://github.com/apache/spark/pull/3721
- https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20源码解析系列/1.2%20DStream%20生成%20RDD%20实例详解.md