Analysis of Spark Streaming Consistency and Fault-Tolerance Mechanisms

Spark Streaming Fault-Tolerance Guarantees

Following the article https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html, Spark Streaming implements data fault tolerance in three main places:

  • Receiver side:
    • WriteAheadLogBasedStoreResult saves each block to both the BlockManager and the WAL via its storeBlock() method;
  • Driver side:
    • ReceivedBlockTracker: when handling scheduling events from the receivers and the driver scheduler, it writes the triggering event information to a WAL (analogous to MySQL's redo log);
    • Checkpoint mechanism: after the driver scheduler triggers generateJob() for a batch time, a checkpoint for that time is saved, so the job can recover after a sudden failure;

Receiver Side

The fault-tolerance logic of WriteAheadLogBasedStoreResult saves each block to the BlockManager and to the WAL in parallel; the two steps are described separately below.

The receiver saves the block to the BlockManager

If the WAL is not enabled, the StorageLevel used for the BlockManager is whatever the user specified; for the Kafka receiver the default level is StorageLevel.MEMORY_AND_DISK_SER_2.

If the WAL is enabled, the user's StorageLevel is ignored and the effective level below is used instead: memory and disk may both be used, deserialized storage is disabled, and replication is forced to 1:

  private val blockStoreTimeout = conf.getInt(
    "spark.streaming.receiver.blockStoreTimeout", 30).seconds

  private val effectiveStorageLevel = {
    if (storageLevel.deserialized) {
      logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
        s" write ahead log is enabled, change to serialization false")
    }
    if (storageLevel.replication > 1) {
      logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
        s"write ahead log is enabled, change to replication 1")
    }

    StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
  }

  if (storageLevel != effectiveStorageLevel) {
    logWarning(s"User defined storage level $storageLevel is changed to effective storage level " +
      s"$effectiveStorageLevel when write ahead log is enabled")
  }
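For reference, here is a minimal sketch of how an application turns this path on. The configuration key spark.streaming.receiver.writeAheadLog.enable is the standard one; the WAL files are written under the checkpoint directory, so a checkpoint directory must be set (the paths and app name here are hypothetical):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // Enable the receiver-side WAL; the effectiveStorageLevel logic above then applies.
  val conf = new SparkConf()
    .setAppName("wal-demo")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val ssc = new StreamingContext(conf, Seconds(10))
  // The WAL lives under the checkpoint directory, so this is required.
  ssc.checkpoint("hdfs:///tmp/wal-demo-checkpoint")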

Writing to the WAL

This write calls flush() to force the data to disk, so once it returns, the write and the backup are guaranteed to have succeeded.
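To make that guarantee concrete, here is a simplified sketch (not the exact Spark code) of how WriteAheadLogBasedBlockHandler.storeBlock() runs the two writes in parallel and only returns once both have finished:

  import java.nio.ByteBuffer
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Simplified sketch of the two parallel writes in storeBlock().
  def storeBlockSketch(serialized: ByteBuffer): Unit = {
    // 1. Put the serialized bytes into the BlockManager (replication = 1).
    val storeInBlockManager = Future { /* blockManager.putBytes(...) */ }
    // 2. Append the same bytes to the WAL; write() flushes before returning.
    val storeInWal = Future { /* walRecordHandle = wal.write(serialized, time) */ }
    // storeBlock() returns only after BOTH futures complete, so a returned
    // WriteAheadLogBasedStoreResult implies the block is durably on disk.
    Await.result(storeInBlockManager.zip(storeInWal), 30.seconds)
  }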

Question 1: this WAL is not used for recovery (no recovery entry point exists on the receiver side), so what purpose does it serve?

It guarantees data safety: the driver side stores the blockInfo metadata, and the block that each blockInfo refers to must be guaranteed to exist;

Question 2: since this WAL stores the actual data, it occupies considerable space; what is its cleanup logic?

When the batch completes, a ClearMetadata event is posted; the code checks whether the WAL is enabled and, if so, cleans up the WAL data for that batch:

  def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
  }
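Further down that path, the old log data is dropped through the WAL's clean() method. clean(threshTime, waitForCompletion) is the public org.apache.spark.streaming.util.WriteAheadLog API; a hedged sketch of the final call:

  import org.apache.spark.streaming.util.WriteAheadLog

  // Sketch: once the batch at `threshTime` has completed, every WAL segment
  // older than that timestamp can be deleted. waitForCompletion = false
  // makes the deletion asynchronous.
  def cleanOldLogs(wal: WriteAheadLog, threshTime: Long): Unit =
    wal.clean(threshTime, false)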

Driver Side

ReceivedBlockTracker

The WAL in ReceivedBlockTracker is independent of the receiver WAL configuration (see https://issues.apache.org/jira/browse/SPARK-7139 for details). Its role is to write every event it receives to a WAL; each record carries very little data, and although it is also called a WAL, it is a different concept from the receiver-side WAL described above.

To see exactly what it stores, search the ReceivedBlockTracker class for the writeToLog method; there are three call sites:

writeToLog(BlockAdditionEvent(receivedBlockInfo))
writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))
writeToLog(BatchCleanupEvent(timesToCleanup))

// The corresponding event classes
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
  
  
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  )

private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}  

private[streaming] case class WriteAheadLogBasedStoreResult(
    blockId: StreamBlockId,
    numRecords: Option[Long],
    walRecordHandle: WriteAheadLogRecordHandle
  ) extends ReceivedBlockStoreResult
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle  
  
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
  def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
    streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
  }
}

As you can see, the core piece of information stored is ReceivedBlockInfo, which contains:

  • streamId: the unique identifier of the stream;
  • numRecords: the number of records contained in the block;
  • metadataOption: optional metadata;
  • blockStoreResult: ReceivedBlockStoreResult is a trait; this field tells you whether the receiver side used the WAL, and in the WAL case it also carries the blockId -> (path, offset, length) mapping (see the sketch after this list);
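For illustration, a hypothetical helper (not part of Spark) that inspects blockStoreResult to decide whether a block is WAL-backed, and therefore recoverable from disk:

  // Hypothetical helper: a WriteAheadLogBasedStoreResult means the receiver
  // wrote the block to the WAL; a BlockManagerBasedStoreResult means it only
  // exists in the BlockManager and its replicas.
  def isWalBacked(info: ReceivedBlockInfo): Boolean =
    info.blockStoreResult match {
      case _: WriteAheadLogBasedStoreResult => true
      case _                                => false
    }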

This recovery logic is enabled by default at initialization, and it works like the log replay found in many storage engines. The implementation:

  // Recover block information from write ahead logs
  if (recoverFromWriteAheadLog) {
    recoverPastEvents()
  }
  
  /**
   * Recover all the tracker actions from the write ahead logs to recover the state (unallocated
   * and allocated block info) prior to failure.
   */
  private def recoverPastEvents(): Unit = synchronized {
    // Insert the recovered block information
    def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
      logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
      receivedBlockInfo.setBlockIdInvalid()
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    }

    // Insert the recovered block-to-batch allocations and clear the queue of received blocks
    // (when the blocks were originally allocated to the batch, the queue must have been cleared).
    def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) {
      logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
        s"${allocatedBlocks.streamIdToAllocatedBlocks}")
      streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    }

    // Cleanup the batch allocations
    def cleanupBatches(batchTimes: Seq[Time]) {
      logTrace(s"Recovery: Cleaning up batches $batchTimes")
      timeToAllocatedBlocks --= batchTimes
    }

    writeAheadLogOption.foreach { writeAheadLog =>
      logInfo(s"Recovering from write ahead logs in ${checkpointDirOption.get}")
      writeAheadLog.readAll().asScala.foreach { byteBuffer =>
        logInfo("Recovering record " + byteBuffer)
        Utils.deserialize[ReceivedBlockTrackerLogEvent](
          JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader) match {
          case BlockAdditionEvent(receivedBlockInfo) =>
            insertAddedBlock(receivedBlockInfo)
          case BatchAllocationEvent(time, allocatedBlocks) =>
            insertAllocatedBatch(time, allocatedBlocks)
          case BatchCleanupEvent(batchTimes) =>
            cleanupBatches(batchTimes)
        }
      }
    }
  }

Checkpoint

class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll

  def createSparkConf(): SparkConf = {
    // Body elided in this excerpt; roughly, it rebuilds a SparkConf from
    // sparkConfPairs (the real implementation also drops the driver
    // host/port settings so the recovered driver can rebind).
    new SparkConf(false).setAll(sparkConfPairs)
  }
}

From the Checkpoint class you can see that the information saved to HDFS includes:

  • master: the master URL the application runs on;
  • framework: the application name;
  • jars: the jars the application depends on;
  • graph: the DStream graph the streaming job depends on (as I understand it, the RDD lineage it relies on);
  • checkpointDir: the checkpoint path;
  • checkpointDuration: the checkpoint interval;
  • pendingTimes: the batch times still pending scheduling;
  • sparkConfPairs: the SparkConf key-value pairs;

Its save and recovery logic is fairly simple:

Save: a checkpoint is written at every batch time (the checkpoint interval is also configurable);
Recover: when the driver starts, it first tries to recover from the checkpoint, as sketched below;
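The usual way to wire up that recovery is StreamingContext.getOrCreate: if a checkpoint exists under the directory, the context (DStream graph, pending batches) is rebuilt from it; otherwise the creation function runs. A minimal sketch, where createContext() and all paths are hypothetical:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir = "hdfs:///tmp/app-checkpoint"  // hypothetical path

  // Hypothetical creation function: builds a fresh context, defines the
  // DStream graph (elided), and registers the checkpoint directory.
  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("demo"), Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... define input DStreams and transformations here ...
    ssc
  }

  // Recover from the checkpoint if one exists; otherwise build fresh.
  val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
  ssc.start()
  ssc.awaitTermination()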
