11 Spark Streaming源碼解讀之Driver中的ReceiverTracker架構(gòu)設(shè)計(jì)以及具體實(shí)現(xiàn)徹底研究

本篇內(nèi)容從ReceiverTracker消息通訊的角度來研究ReceiverTracker的源碼

  1. 在第10篇中介紹過Receiver的啟動伦连、注冊彻坛、數(shù)據(jù)匯報(bào),接著第10篇的內(nèi)容看。從ReceiverSupervisorImpl的pushAndReportBlock方法開始向瓷,代碼如下
def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
}

向trackerEndpoint匯報(bào)AddBlock消息扔字,blockInfo只是一個(gè)簡單的case class征唬,代碼如下

private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {

    require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
    
    @volatile private var _isBlockIdValid = true
    
    def blockId: StreamBlockId = blockStoreResult.blockId
    
    def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
    blockStoreResult match {
      case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
      case _ => None
    }
    }
    
    /** Is the block ID valid, that is, is the block present in the Spark executors. */
    def isBlockIdValid(): Boolean = _isBlockIdValid
    
    /**
    * Set the block ID as invalid. This is useful when it is known that the block is not present
    * in the Spark executors.
    * 當(dāng)block在Executors中不存在時(shí),將block ID 設(shè)置為無效的
    */
    def setBlockIdInvalid(): Unit = {
    _isBlockIdValid = false
    }
}

里面沒什么信息丢烘,看ReceivedBlockStoreResult,代碼如下

private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}

他只是一個(gè)接口些椒,看他的兩個(gè)子類播瞳,分別是WriteAheadLogBasedStoreResult和BlockManagerBasedStoreResult。在WriteAheadLogBasedStoreResult類中多了一個(gè)WriteAheadLogRecordHandle免糕。

  1. 看ReceiverTrackerEndpoint中的receiveAndReply是怎樣接收AddBlock消息的赢乓,代碼如下
case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

首先判斷是否采用WAL的方法保存元數(shù)據(jù),默認(rèn)為true说墨。如果是WAL的方式存儲骏全,WAL采用了一個(gè)線程池來處理操作苍柏。兩種方法最終都是調(diào)用addBlock(receivedBlockInfo)方法尼斧,addBlock的代碼如下

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
}

這里什么也沒做就把任務(wù)交給了receivedBlockTracker,ReceivedBlockTracker在ReceiverTrack實(shí)例化的時(shí)候被創(chuàng)建试吁」卓茫看他的addBlock方法,代碼如下

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
}

首先調(diào)用writeToLog方法熄捍,將receivedBlockInfo放到BlockAdditionEvent類中烛恤,傳遞進(jìn)去 ,writeToLog的代碼如下

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
}

如果是WAL的方式余耽,就是把record序列化后存儲缚柏,返回操作結(jié)果true,否則直接返回true。
回到上面的判斷if (writeResult)碟贾,將receivedBlockInfo放入到getReceivedBlockQueue隊(duì)列中币喧,看一下getReceivedBlockQueue的代碼

private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}

先從streamIdToUnallocatedBlockQueues中獲取ReceivedBlockQueue隊(duì)列,如果沒有放一個(gè)新的ReceivedBlockQueue袱耽,取到隊(duì)列后將receivedBlockInfo放入隊(duì)列杀餐。每一個(gè)receiver對應(yīng)一個(gè)自己的隊(duì)列,streamIdToUnallocatedBlockQueues的代碼如下

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

將receivedBlockInfo放入隊(duì)列后朱巨,返回writeResult(就是true或false),代表元數(shù)據(jù)被接收成功或失敗史翘。

  1. receivedBlockInfo已經(jīng)被放入到隊(duì)列中了,那么在什么時(shí)候被使用了呢冀续?我們在job的動態(tài)生成的時(shí)候好像看到過琼讽,看JobGenerator的generateJobs方法里有這樣一行代碼,代碼如下
// allocate received blocks to batch
// 分配接收到的數(shù)據(jù)給batch
jobScheduler.receiverTracker.allocateBlocksToBatch(time) 

看receiverTracker的allocateBlocksToBatch方法洪唐,代碼如下

def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
}

這里調(diào)用了receivedBlockTracker的allocateBlocksToBatch(batchTime)方法跨琳,接著看allocateBlocksToBatch的代碼

 /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   * 分配所有示分配的blocks給batch
   */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
}

獲取streamIdToBlocks:Map[Int,Seq[ReceiverBlockInfo]],從streamIdToUnallocatedBlockQueues中獲取每一個(gè)receiver對應(yīng)的ReceiverBlockInfo列表。
在writeToLog方法桐罕,判斷如果是WAL方式脉让,就寫日志桂敛,否則直接返回true。
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)這行代碼將allocatedBlocks所有receiver接收的元數(shù)據(jù)按時(shí)間保存到timeToAllocatedBlocks中溅潜,然后更新lastAllocatedBatchTime

  1. 那么timeToAllocatedBlocks中的數(shù)據(jù)在什么時(shí)候被獲取的术唬,我們想一下,timeToAllocatedBlocks在job生成的時(shí)候需要填充數(shù)據(jù)滚澜,數(shù)據(jù)是在RDD中被使用的粗仓,所以猜想是在創(chuàng)建RDD的時(shí)候用到了timeToAllocatedBlocks。
    找第一個(gè)RDD,就是BlockRDD设捐,在ReceiverInputDStream的compute方法中找到了timeToAllocatedBlocks的使用借浊,代碼如下
override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        // 根據(jù)時(shí)間獲取所有receiver接收數(shù)據(jù)的元數(shù)據(jù)列表
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
}

跟蹤receiverTracker.getBlocksOfBatch這個(gè)方法,代碼如下

def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
    receivedBlockTracker.getBlocksOfBatch(batchTime)
}

接著看getBlocksOfBatch方法

def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}

終于看到了timeToAllocatedBlocks被使用

  1. 再看一個(gè)ReceiverTracker的stop方法
    代碼如下
def stop(graceful: Boolean): Unit = synchronized {
    if (isTrackerStarted) {
      // First, stop the receivers
      trackerState = Stopping
      if (!skipReceiverLaunch) {
        // Send the stop signal to all the receivers
        endpoint.askWithRetry[Boolean](StopAllReceivers)

        // Wait for the Spark job that runs the receivers to be over
        // That is, for the receivers to quit gracefully.
        receiverJobExitLatch.await(10, TimeUnit.SECONDS)

        if (graceful) {
          logInfo("Waiting for receiver job to terminate gracefully")
          receiverJobExitLatch.await()
          logInfo("Waited for receiver job to terminate gracefully")
        }

        // Check if all the receivers have been deregistered or not
        val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
        if (receivers.nonEmpty) {
          logWarning("Not all of the receivers have deregistered, " + receivers)
        } else {
          logInfo("All of the receivers have deregistered successfully")
        }
      }

      // Finally, stop the endpoint
      ssc.env.rpcEnv.stop(endpoint)
      endpoint = null
      receivedBlockTracker.stop()
      logInfo("ReceiverTracker stopped")
      trackerState = Stopped
    }
}

向endpoint發(fā)送一條停止所有receiver的消息StopAllReceivers萝招,看接收到消息是怎樣處理的蚂斤,代碼如下

case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)

接著看stopReceivers()方法,代碼如下

private def stopReceivers() {
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
      logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
}

向每一個(gè)receiver發(fā)送一條StopReceiver消息槐沼,看ReceiverSupervisorImpl中的endpoint接收消息后的邏輯代碼

case StopReceiver =>
      logInfo("Received stop signal")
      ReceiverSupervisorImpl.this.stop("Stopped by driver", None)

調(diào)用了ReceiverSupervisorImpl的stop方法曙蒸,stop方法代碼如下

def stop(message: String, error: Option[Throwable]) {
        stoppingError = error.orNull
        stopReceiver(message, error)
        onStop(message, error)
        futureExecutionContext.shutdownNow()
        stopLatch.countDown()
}

先看stopReceiver方法,代碼如下

def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
    try {
      logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
      receiverState match {
        case Initialized =>
          logWarning("Skip stopping receiver because it has not yet stared")
        case Started =>
          receiverState = Stopped
          receiver.onStop()
          logInfo("Called receiver onStop")
          onReceiverStop(message, error)
        case Stopped =>
          logWarning("Receiver has been stopped")
      }
    } catch {
      case NonFatal(t) =>
        logError("Error stopping receiver " + streamId + t.getStackTraceString)
    }
}

第一調(diào)用了receiver的stop方法receiver.onStop()岗钩,看一下KafkaReceiver的onStop()方法纽窟,代碼如下

def onStop() {
        if (consumerConnector != null) {
          consumerConnector.shutdown()
          consumerConnector = null
        }
}

關(guān)閉了consumer的連接,就是停止接收數(shù)據(jù)
第二調(diào)用onReceiverStop兼吓,看ReceiverSupervisor的子類ReceiverSupervisorImpl的onReceiverStop方法臂港,代碼如下

override protected def onReceiverStop(message: String, error: Option[Throwable]) {
    logInfo("Deregistering receiver " + streamId)
    val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
    trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
    logInfo("Stopped receiver " + streamId)
}

向trackerEndpoint發(fā)送了一條 注銷receiver的消息DeregisterReceiver。
再看onStop()方法视搏,在ReceiverSupervisor的子類ReceiverSupervisorImpl的onReceiverStop方法审孽,代碼如下

override protected def onStop(message: String, error: Option[Throwable]) {
    registeredBlockGenerators.foreach { _.stop() }
    env.rpcEnv.stop(endpoint)
}

調(diào)用了每一個(gè)BlockGenerator的stop方法,stop方法代碼如下

def stop(): Unit = {
    // Set the state to stop adding data
    synchronized {
      if (state == Active) {
        state = StoppedAddingData
      } else {
        logWarning(s"Cannot stop BlockGenerator as its not in the Active state [state = $state]")
        return
      }
    }

    // Stop generating blocks and set the state for block pushing thread to start draining the queue
    logInfo("Stopping BlockGenerator")
    blockIntervalTimer.stop(interruptTimer = false)
    synchronized { state = StoppedGeneratingBlocks }

    // Wait for the queue to drain and mark generated as stopped
    logInfo("Waiting for block pushing thread to terminate")
    blockPushingThread.join()
    synchronized { state = StoppedAll }
    logInfo("Stopped BlockGenerator")
}

主要是定時(shí)器的停止blockIntervalTimer.stop凶朗,blockIntervalTimer在上一講有具體的作用講解

  1. ReceiverTracker的其他消息瓷胧,以后再繼
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市棚愤,隨后出現(xiàn)的幾起案子搓萧,更是在濱河造成了極大的恐慌,老刑警劉巖宛畦,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瘸洛,死亡現(xiàn)場離奇詭異,居然都是意外死亡次和,警方通過查閱死者的電腦和手機(jī)反肋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來踏施,“玉大人石蔗,你說我怎么就攤上這事罕邀。” “怎么了养距?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵诉探,是天一觀的道長。 經(jīng)常有香客問我棍厌,道長肾胯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任耘纱,我火速辦了婚禮敬肚,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘束析。我一直安慰自己艳馒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布畸陡。 她就那樣靜靜地躺著鹰溜,像睡著了一般虽填。 火紅的嫁衣襯著肌膚如雪丁恭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天斋日,我揣著相機(jī)與錄音牲览,去河邊找鬼。 笑死恶守,一個(gè)胖子當(dāng)著我的面吹牛第献,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播兔港,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼庸毫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了衫樊?” 一聲冷哼從身側(cè)響起飒赃,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎科侈,沒想到半個(gè)月后载佳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡臀栈,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年蔫慧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片权薯。...
    茶點(diǎn)故事閱讀 38,577評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡姑躲,死狀恐怖睡扬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情黍析,我是刑警寧澤威蕉,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站橄仍,受9級特大地震影響韧涨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜侮繁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一虑粥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧宪哩,春花似錦娩贷、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至品抽,卻和暖如春储笑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背圆恤。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工突倍, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人盆昙。 一個(gè)月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓羽历,卻偏偏與公主長得像,于是被迫代替她去往敵國和親淡喜。 傳聞我的和親對象是個(gè)殘疾皇子秕磷,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容