13 Spark Streaming Source Code Walkthrough: Driver Fault Tolerance

Driver fault tolerance in Spark Streaming mainly covers three components: ReceiverTracker, DStream.graph, and JobGenerator.

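  1. First, the fault tolerance of ReceiverTracker. The key point is that the block metadata ReceiverTracker receives is written to the write-ahead log (WAL). ReceiverTracker delegates this to ReceivedBlockTracker, whose addBlock method is shown below: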
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
}

The writeToLog method performs the actual WAL write. Its code is:

 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
}

It first checks whether the WAL is enabled, based on the value of isWriteAheadLogEnabled:

private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

接著看writeAheadLogOption

private val writeAheadLogOption = createWriteAheadLog()

Then look at the createWriteAheadLog() method:

private def createWriteAheadLog(): Option[WriteAheadLog] = {
    checkpointDirOption.map { checkpointDir =>
      val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
      WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
    }
}
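
The createWriteAheadLog method above relies on Option.map: with no checkpoint directory configured the result is None, so isWriteAheadLogEnabled is false and writeToLog simply returns true. The following is a minimal sketch of that pattern, not Spark's code; the object name, helper names, and the "wal" sub-directory are assumptions made up for illustration.

```scala
object WalDirSketch {
  // Hypothetical helper: derive a WAL directory from an optional checkpoint directory.
  // The sub-directory name "wal" is an assumption for illustration only.
  def logDirFor(checkpointDirOption: Option[String]): Option[String] =
    checkpointDirOption.map(dir => s"$dir/wal")

  // Mirrors the idea behind isWriteAheadLogEnabled: the WAL exists only if a directory does.
  def isWalEnabled(checkpointDirOption: Option[String]): Boolean =
    logDirFor(checkpointDirOption).nonEmpty
}
```

Calling WalDirSketch.isWalEnabled(None) yields false, which corresponds to the case where the tracker skips logging entirely.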

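The WAL directory is derived from the configured checkpoint directory: checkpointDirToLogDir maps the checkpoint directory to a separate log directory, so the checkpoint location ends up holding more than one directory. If no checkpoint directory is configured, writeAheadLogOption is None and the WAL is disabled.
Only after the WAL write succeeds is receivedBlockInfo added to the in-memory queue returned by getReceivedBlockQueue.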
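
The ordering in addBlock (log first, update memory only on success) can be illustrated with a small self-contained sketch. Everything here is hypothetical: InMemoryLog stands in for the WAL, Tracker for ReceivedBlockTracker, and block metadata is reduced to a String.

```scala
import scala.collection.mutable

object WalThenQueueSketch {
  // Hypothetical stand-in for a write-ahead log: an append-only buffer that can be told to fail.
  final class InMemoryLog(var failWrites: Boolean = false) {
    val records = mutable.ArrayBuffer.empty[String]
    def write(record: String): Boolean =
      if (failWrites) false else { records += record; true }
  }

  // Hypothetical tracker: a block id is queued only after it has been logged.
  final class Tracker(log: InMemoryLog) {
    val queue = mutable.Queue.empty[String]
    def addBlock(blockId: String): Boolean = synchronized {
      val logged = log.write(s"add:$blockId")
      if (logged) queue += blockId
      logged
    }
  }
}
```

With this ordering, anything visible in the in-memory queue is guaranteed to have a log record, which is what makes rebuilding the queue from the log possible after a driver restart.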

  2. Second, look at the allocateBlocksToBatch method of ReceivedBlockTracker:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
}

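For each receiver, getReceivedBlockQueue returns its queue of received blocks and dequeueAll drains it. The resulting (streamId, blocks) pairs are collected into streamIdToBlocks, which is then wrapped: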

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

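allocatedBlocks is the batch of block metadata selected for a given batch time. It is handed to the job of the corresponding batchDuration, which uses it when it runs. Before it is used, the allocation is written to the WAL, so that if the job fails and the driver recovers, it can tell how far the data had been processed: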

val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
  timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
  lastAllocatedBatchTime = batchTime
} else {
  logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
}
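
The allocation step can be sketched in isolation as follows. This is a simplified, hypothetical model rather than Spark's implementation: batch times are plain Longs, blocks are Strings, and logWrite is an assumed callback standing in for writeToLog.

```scala
import scala.collection.mutable

object AllocateSketch {
  // Hypothetical, simplified tracker; times are plain Longs and blocks are Strings.
  final class Tracker(streamIds: Seq[Int], logWrite: String => Boolean) {
    private val queues = mutable.Map.empty[Int, mutable.Queue[String]]
    val timeToAllocated = mutable.Map.empty[Long, Map[Int, Seq[String]]]
    private var lastAllocated: Option[Long] = None

    def addBlock(streamId: Int, block: String): Unit = synchronized {
      queues.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += block
    }

    // Drains every stream's queue, logs the allocation, then records it in memory.
    // Returns false for a batch time that is not newer than the last allocated one.
    def allocate(batchTime: Long): Boolean = synchronized {
      if (lastAllocated.forall(batchTime > _)) {
        val drained = streamIds.map { id =>
          id -> queues.getOrElseUpdate(id, mutable.Queue.empty[String]).dequeueAll(_ => true).toList
        }.toMap
        if (logWrite(s"allocate:$batchTime")) {
          timeToAllocated.put(batchTime, drained)
          lastAllocated = Some(batchTime)
          true
        } else false
      } else false
    }
  }
}
```

As in the excerpt above, the in-memory map and lastAllocated are updated only when the log write reports success.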
  3. Third, look at the cleanupOldBatches method. It removes batch metadata that is no longer needed from memory and then deletes the corresponding WAL data. Before deleting anything, it also writes the list of batches to be removed to the WAL:
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
    val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
    logInfo("Deleting batches " + timesToCleanup)
    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      timeToAllocatedBlocks --= timesToCleanup
      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
    } else {
      logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
    }
}
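
The same log-then-mutate order applies to cleanup. Below is a minimal sketch under stated assumptions: the function signature, the Long batch times, and the logWrite callback are all hypothetical, and the deletion of old log files is left out.

```scala
import scala.collection.mutable

object CleanupSketch {
  // Hypothetical helper: removes batches older than the threshold and returns their times.
  // Nothing is removed from memory unless the cleanup record was logged first.
  def cleanupOldBatches(
      batches: mutable.Map[Long, Seq[String]],
      threshold: Long,
      logWrite: String => Boolean): Seq[Long] = {
    val toClean = batches.keys.filter(_ < threshold).toList.sorted
    if (logWrite(s"cleanup:${toClean.mkString(",")}")) {
      batches --= toClean
      toClean
    } else Nil
  }
}
```

If the log write fails, the map is left untouched, so the in-memory state never runs ahead of what the log says.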
  4. To summarize, the three WAL writes above correspond to the three events below. Together they make up the fault tolerance of ReceiverTracker:
/** Trait representing any event in the ReceivedBlockTracker that updates its state. */
private[streaming] sealed trait ReceivedBlockTrackerLogEvent
private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)
  extends ReceivedBlockTrackerLogEvent
private[streaming] case class BatchCleanupEvent(times: Seq[Time])
  extends ReceivedBlockTrackerLogEvent
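
Because every state change is logged as one of these three events, the tracker's state can in principle be rebuilt by replaying the log in order. The sketch below shows that idea with simplified, hypothetical event and state types; it is not Spark's recovery code, and the assumption that an allocation drains all pending blocks follows the dequeueAll call shown earlier.

```scala
object ReplaySketch {
  // Simplified, hypothetical event types mirroring the three kinds of log record.
  sealed trait Event
  final case class BlockAdded(streamId: Int, blockId: String) extends Event
  final case class BatchAllocated(time: Long, blocks: Map[Int, List[String]]) extends Event
  final case class BatchCleaned(times: List[Long]) extends Event

  // Recovered state: blocks not yet allocated, plus allocations keyed by batch time.
  final case class State(
      unallocated: Map[Int, List[String]] = Map.empty,
      allocated: Map[Long, Map[Int, List[String]]] = Map.empty)

  def replay(events: Seq[Event]): State =
    events.foldLeft(State()) {
      case (s, BlockAdded(id, b)) =>
        s.copy(unallocated = s.unallocated.updated(id, s.unallocated.getOrElse(id, Nil) :+ b))
      case (s, BatchAllocated(t, blocks)) =>
        // An allocation drains all pending blocks, so the unallocated map is reset.
        State(Map.empty, s.allocated.updated(t, blocks))
      case (s, BatchCleaned(times)) =>
        s.copy(allocated = s.allocated -- times)
    }
}
```

Replaying two additions, one allocation, one more addition, and a cleanup of that allocation leaves a single unallocated block and no allocated batches.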
  5. Now look at the fault tolerance of DStream.graph and JobGenerator, starting from the generateJobs method of JobGenerator:
  private def generateJobs(time: Time) {
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      // allocate received blocks to batch
      jobScheduler.receiverTracker.allocateBlocksToBatch(time)
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        // fetch the input metadata recorded for this batch
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        // submit the JobSet
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
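
The control flow of generateJobs (wrap generation in Try, branch on Success or Failure, then request a checkpoint either way) can be sketched as follows. The function and the message strings are hypothetical and exist only to make the flow observable.

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // Hypothetical: returns the messages that would be emitted for one batch time.
  def generate(time: Long, makeJobs: Long => Seq[String]): List[String] = {
    val outcome = Try(makeJobs(time)) match {
      case Success(jobs) => s"submit:$time:${jobs.size}"
      case Failure(e)    => s"error:$time:${e.getMessage}"
    }
    // The checkpoint request is posted regardless of success or failure.
    List(outcome, s"checkpoint:$time")
  }
}
```

Note that the checkpoint message sits outside the match, so a failed generation still triggers the checkpoint request for that batch time.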

Once the jobs have been generated, a DoCheckpoint message is posted, which eventually invokes the doCheckpoint method:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      ssc.graph.updateCheckpointData(time)
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    }
}
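
The condition in doCheckpoint means a checkpoint is written only when the time elapsed since graph.zeroTime is a whole multiple of the checkpoint interval. A minimal sketch of that arithmetic on plain milliseconds, with a hypothetical helper name:

```scala
object CheckpointTimingSketch {
  // Hypothetical helper: true when (time - zeroTime) is a whole multiple of the interval.
  def shouldCheckpointAt(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
    (timeMs - zeroTimeMs) % checkpointIntervalMs == 0
}
```

For example, with a zero time of 500 ms and a checkpoint interval of 2000 ms, the batch at 1500 ms is skipped and the batch at 2500 ms is checkpointed.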

What updateCheckpointData and checkpointWriter.write actually do will be covered in a later post.
