This section covers fault tolerance on the Executor side. The fault-tolerance approaches are the write-ahead log (WAL), message replay, and others.
- First, the WAL approach: before storing the data, write it to a log. Start reading from the pushAndReportBlock method of ReceiverSupervisorImpl:
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
It calls the storeBlock method of receivedBlockHandler; receivedBlockHandler decides which mechanism is used to store the data:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
If WAL is enabled, the data is saved under the checkpoint directory; if no checkpoint directory has been configured, an exception is thrown.
Look first at WriteAheadLogBasedBlockHandler: once WAL is enabled, the BlockManager no longer needs to replicate the data when storing it, because doing replication and WAL at the same time duplicates the fault-tolerance work and degrades system performance.
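The write-ahead idea can be illustrated with a toy sketch in plain Scala (this is not Spark's WriteAheadLog; TinyWal and all its members are invented names): every record is appended to the durable log before it touches the volatile store, so after a crash the store can be rebuilt by replaying the log.

```scala
import scala.collection.mutable

// Toy write-ahead log: the log stands in for a file on a reliable
// filesystem, the store stands in for BlockManager memory.
class TinyWal {
  private val log = mutable.ArrayBuffer.empty[String]
  private val store = mutable.ArrayBuffer.empty[String]

  def write(record: String): Unit = {
    log += record    // 1. persist to the log first
    store += record  // 2. only then apply to the volatile store
  }

  // Simulate losing all in-memory data.
  def crash(): Unit = store.clear()

  // Rebuild the store by replaying the log from the beginning.
  def recover(): Unit = { store.clear(); store ++= log }

  def contents: Seq[String] = store.toSeq
}

val wal = new TinyWal
wal.write("block-1")
wal.write("block-2")
wal.crash()
wal.recover()
// After recovery, both blocks are back in the store.
```

Because the log write happens first, a crash between steps 1 and 2 loses nothing: the record is already durable and will reappear on replay.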
Now look at BlockManagerBasedBlockHandler: it simply hands the data to the BlockManager, which stores it at the user-defined storage level; receivers typically default to MEMORY_AND_DISK_SER_2. If the data's safety requirements are low, the replica can be dropped.
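The interplay between WAL and replication described above can be sketched as follows (the StorageLevel case class and effectiveLevel function here are simplified stand-ins, not Spark's real classes): when the WAL is on, a replicated storage level is demoted to a single replica, since the log already provides durability.

```scala
// Simplified stand-in for a storage level: where to keep the data
// and how many copies to hold across the cluster.
case class StorageLevel(useMemory: Boolean, useDisk: Boolean, replication: Int)

// The usual receiver default: memory + disk, serialized, 2 replicas.
val MEMORY_AND_DISK_SER_2 = StorageLevel(useMemory = true, useDisk = true, replication = 2)

// With a WAL enabled, replication would duplicate the fault-tolerance
// work, so drop the level down to a single replica.
def effectiveLevel(userLevel: StorageLevel, walEnabled: Boolean): StorageLevel =
  if (walEnabled && userLevel.replication > 1) userLevel.copy(replication = 1)
  else userLevel

val withWal = effectiveLevel(MEMORY_AND_DISK_SER_2, walEnabled = true)   // 1 replica
val withoutWal = effectiveLevel(MEMORY_AND_DISK_SER_2, walEnabled = false) // 2 replicas
```

The design choice is simple economics: paying for both a durable log and an in-memory replica buys no extra safety, only extra latency and storage.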
- Message replay is a very efficient approach. When reading data through Kafka's Direct API, the offset positions are computed first; if a job fails, the Kafka offsets are reset to the last consumed position and reading resumes from the point of failure. Kafka itself serves directly as the file storage system, much like HDFS. Later chapters cover its usage in detail.
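The replay idea can be sketched without Kafka at all (all names here are hypothetical, not the Kafka or Spark API): the consumer only commits an offset after a whole batch succeeds, so a failed batch leaves the committed offset untouched and is simply re-read from there.

```scala
import scala.collection.mutable

// A Vector stands in for one Kafka partition: an immutable, replayable log.
val partitionLog = Vector("a", "b", "c", "d", "e")

var committedOffset = 0                                  // next offset to process
val processed = mutable.ArrayBuffer.empty[String]        // downstream output

// Process one batch; failAt optionally injects a failure at a given offset.
def runBatch(size: Int, failAt: Option[Int]): Unit = {
  val batch = partitionLog.slice(committedOffset, committedOffset + size)
  try {
    batch.zipWithIndex.foreach { case (record, i) =>
      if (failAt.contains(committedOffset + i))
        throw new RuntimeException("simulated job failure")
      processed += record
    }
    committedOffset += batch.size   // commit only after the whole batch succeeds
  } catch {
    case _: RuntimeException =>
      // Discard partial output; the committed offset never advanced,
      // so the next run replays the same records from the log.
      processed.remove(committedOffset, processed.size - committedOffset)
  }
}

runBatch(3, failAt = Some(2))  // fails on "c"; nothing is committed
runBatch(3, failAt = None)     // replays "a", "b", "c" from offset 0
runBatch(2, failAt = None)     // continues with "d", "e"
```

This is why the source being a replayable log matters: recovery needs no second copy of the data, only the offset of the last successful batch.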