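This post studies the ReceiverTracker source code from the angle of its message communication.
- Post 10 of this series covered how a Receiver is started, how it registers, and how it reports data; this post picks up from there. We start from the pushAndReportBlock method of ReceiverSupervisorImpl: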
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
This reports an AddBlock message to trackerEndpoint. The blockInfo it carries is just a simple case class:
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {
  require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
  @volatile private var _isBlockIdValid = true
  def blockId: StreamBlockId = blockStoreResult.blockId
  def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
    blockStoreResult match {
      case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
      case _ => None
    }
  }
  /** Is the block ID valid, that is, is the block present in the Spark executors. */
  def isBlockIdValid(): Boolean = _isBlockIdValid
  /**
   * Set the block ID as invalid. This is useful when it is known that the block is not present
   * in the Spark executors.
   */
  def setBlockIdInvalid(): Unit = {
    _isBlockIdValid = false
  }
}
There is not much in it, so look at ReceivedBlockStoreResult:
private[streaming] trait ReceivedBlockStoreResult {
  // Any implementation of this trait will store a block id
  def blockId: StreamBlockId
  // Any implementation of this trait will have to return the number of records
  def numRecords: Option[Long]
}
It is just an interface (a trait). It has two implementations, WriteAheadLogBasedStoreResult and BlockManagerBasedStoreResult; WriteAheadLogBasedStoreResult additionally carries a WriteAheadLogRecordHandle.
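To make the relationship between the two result types concrete, here is a minimal standalone sketch. It does not use the Spark classes: every name ending in `Sketch` is a hypothetical stand-in, and the handle fields are assumptions chosen only for illustration. It shows why walRecordHandleOption can be written as a single pattern match.

```scala
// Hypothetical stand-ins for StreamBlockId and WriteAheadLogRecordHandle.
final case class BlockIdSketch(streamId: Int, uniqueId: Long)
final case class WalHandleSketch(path: String, offset: Long, length: Int)

// Mirrors the shape of ReceivedBlockStoreResult: a block id plus an optional record count.
sealed trait StoreResultSketch {
  def blockId: BlockIdSketch
  def numRecords: Option[Long]
}

// Result of the BlockManager-only path: no log handle.
final case class BlockManagerResultSketch(
    blockId: BlockIdSketch,
    numRecords: Option[Long]) extends StoreResultSketch

// Result of the write-ahead-log path: carries a handle to the log record.
final case class WalResultSketch(
    blockId: BlockIdSketch,
    numRecords: Option[Long],
    walRecordHandle: WalHandleSketch) extends StoreResultSketch

object StoreResultDemo {
  // Only the WAL-based result yields Some(handle); everything else yields None.
  def walHandleOf(result: StoreResultSketch): Option[WalHandleSketch] = result match {
    case w: WalResultSketch => Some(w.walRecordHandle)
    case _ => None
  }
}
```

Callers that only need the block id or the record count can stay on the trait; only recovery-related code needs to ask for the handle.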
- Now see how receiveAndReply in ReceiverTrackerEndpoint handles the AddBlock message:
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
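It first checks, via WriteAheadLogUtils.isBatchingEnabled, whether WAL batching is enabled on the driver; this defaults to true. When batching is enabled, the work is handed to a thread pool (walBatchingThreadPool), presumably so that a blocking log write does not hold up the endpoint's message loop. Both branches end up calling addBlock(receivedBlockInfo):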
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
Nothing happens here beyond delegating to receivedBlockTracker. The ReceivedBlockTracker is created when ReceiverTracker is instantiated. Its addBlock method:
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
It first calls writeToLog, passing receivedBlockInfo wrapped in a BlockAdditionEvent. The code of writeToLog:
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
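If WAL is enabled, the record is serialized and written to the log; the method returns true when the write succeeds and false when the write throws. If WAL is not enabled, it returns true directly.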
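The "write if a log is configured, otherwise report success" rule is easy to test in isolation. The sketch below is a simplified model, not Spark's API: `LogSketch`, `InMemoryLog`, `FailingLog` and `LogWriterSketch` are hypothetical names, and records are plain strings instead of serialized events.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical minimal log interface; Spark's WriteAheadLog is richer than this.
trait LogSketch {
  def write(record: String): Unit
}

// A log that keeps records in memory, so the example needs no file system.
final class InMemoryLog extends LogSketch {
  val records: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def write(record: String): Unit = records += record
}

// A log whose writes always fail, to exercise the error branch.
final class FailingLog extends LogSketch {
  def write(record: String): Unit = throw new RuntimeException("simulated write failure")
}

final class LogWriterSketch(logOption: Option[LogSketch]) {
  // true:  the record was written, or no log is configured so nothing had to be written
  // false: a log is configured but the write threw a non-fatal exception
  def writeToLog(record: String): Boolean = logOption match {
    case Some(log) =>
      try {
        log.write(record)
        true
      } catch {
        case NonFatal(_) => false
      }
    case None => true
  }
}
```

The consequence for the caller is that false can only mean "the log is enabled and the write failed", which is exactly the case in which addBlock must not enqueue the block.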
Back to the if (writeResult) check above: receivedBlockInfo is appended to the queue returned by getReceivedBlockQueue. Here is getReceivedBlockQueue:
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}
It first looks up the ReceivedBlockQueue in streamIdToUnallocatedBlockQueues, inserting a new one if none exists; receivedBlockInfo is then appended to that queue. Each receiver (stream ID) has its own queue. streamIdToUnallocatedBlockQueues is declared as:
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
addBlock then returns writeResult (true or false), which tells the receiver whether the block metadata was accepted; the block is only queued when the result is true.
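As a rough illustration of the per-stream queue idea, the following standalone sketch keeps one queue per stream ID and creates it lazily with getOrElseUpdate. `BlockInfoSketch` and `UnallocatedQueuesSketch` are hypothetical names, and block ids are plain strings.

```scala
import scala.collection.mutable

// Hypothetical, reduced form of ReceivedBlockInfo.
final case class BlockInfoSketch(streamId: Int, blockId: String)

final class UnallocatedQueuesSketch {
  // One FIFO queue of not-yet-allocated blocks per stream id.
  private val queues = new mutable.HashMap[Int, mutable.Queue[BlockInfoSketch]]

  // Return the stream's queue, creating an empty one on first use.
  private def queueFor(streamId: Int): mutable.Queue[BlockInfoSketch] =
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfoSketch])

  // Append under the lock, as the tracker does when a block is reported.
  def add(info: BlockInfoSketch): Unit = synchronized {
    queueFor(info.streamId) += info
  }

  // Snapshot of what is waiting for a stream, in arrival order.
  def pending(streamId: Int): List[BlockInfoSketch] = synchronized {
    queues.get(streamId).map(_.toList).getOrElse(Nil)
  }
}
```

Keying the map by stream ID means a slow or busy receiver only grows its own queue and never reorders blocks from another stream.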
- receivedBlockInfo is now sitting in a queue, so when is it consumed? We came across this when looking at dynamic job generation: the generateJobs method of JobGenerator contains the following line:
// allocate received blocks to batch
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
Here is the allocateBlocksToBatch method of receiverTracker:
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}
This delegates to receivedBlockTracker.allocateBlocksToBatch(batchTime), whose code is:
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
streamIdToBlocks has type Map[Int, Seq[ReceivedBlockInfo]]: for each receiver, all ReceivedBlockInfo entries are dequeued from its queue in streamIdToUnallocatedBlockQueues.
writeToLog writes the allocation event to the log if WAL is enabled, and otherwise returns true directly.
The line timeToAllocatedBlocks.put(batchTime, allocatedBlocks) stores the metadata received by all receivers in timeToAllocatedBlocks, keyed by batch time; lastAllocatedBatchTime is then updated.
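The allocation step can be modelled without Spark. The sketch below is a simplified assumption-laden version: `AllocatorSketch` is a hypothetical name, batch times are plain Long values, block ids are strings, and the write-ahead-log check is left out. It keeps the two behaviours described above: every queue is drained with dequeueAll, and a batch time that is not newer than the last allocated one is skipped.

```scala
import scala.collection.mutable

final class AllocatorSketch(streamIds: Seq[Int]) {
  private val unallocated = new mutable.HashMap[Int, mutable.Queue[String]]
  private val allocatedByBatch = new mutable.HashMap[Long, Map[Int, Seq[String]]]
  private var lastAllocatedBatchTime: Option[Long] = None

  def addBlock(streamId: Int, blockId: String): Unit = synchronized {
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
  }

  // Returns true if blocks were allocated to the batch,
  // false if the batch time was not newer than the last allocated one.
  def allocateBlocksToBatch(batchTime: Long): Boolean = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val streamIdToBlocks = streamIds.map { id =>
        val queue = unallocated.getOrElseUpdate(id, mutable.Queue.empty[String])
        // dequeueAll with an always-true predicate empties the queue and returns its contents.
        id -> queue.dequeueAll(_ => true).toList
      }.toMap
      allocatedByBatch.put(batchTime, streamIdToBlocks)
      lastAllocatedBatchTime = Some(batchTime)
      true
    } else {
      false
    }
  }

  def blocksOfBatch(batchTime: Long): Map[Int, Seq[String]] = synchronized {
    allocatedByBatch.getOrElse(batchTime, Map.empty[Int, Seq[String]])
  }
}
```

Because the queues are emptied at allocation time, a block that arrives after the call simply waits for the next batch; no block is assigned to two batches.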
- So when is the data in timeToAllocatedBlocks read? Think about it: timeToAllocatedBlocks is filled during job generation, and the data itself is consumed inside an RDD, so a reasonable guess is that timeToAllocatedBlocks is used when the RDD is created.
The first RDD is BlockRDD, and the compute method of ReceiverInputDStream is indeed where timeToAllocatedBlocks gets used (through getBlocksOfBatch):
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      // Look up the block metadata allocated to this batch time and take this stream's entry
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
Follow receiverTracker.getBlocksOfBatch:
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
  receivedBlockTracker.getBlocksOfBatch(batchTime)
}
And then getBlocksOfBatch in ReceivedBlockTracker:
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
  timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}
This is finally where timeToAllocatedBlocks is read.
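The read side is two nested lookups with empty defaults, followed by a sum over the optional record counts. Here is a small standalone sketch of that shape; `BlockMetaSketch` and `BatchLookupSketch` are hypothetical names and batch times are plain Long values.

```scala
// Hypothetical, reduced form of ReceivedBlockInfo: an id and an optional record count.
final case class BlockMetaSketch(blockId: String, numRecords: Option[Long])

object BatchLookupSketch {
  type Allocation = Map[Int, Seq[BlockMetaSketch]]

  // Batch time -> stream id -> blocks, falling back to empty at both levels,
  // in the spirit of getBlocksOfBatch(validTime).getOrElse(id, Seq.empty).
  def blocksFor(
      allocations: Map[Long, Allocation],
      batchTime: Long,
      streamId: Int): Seq[BlockMetaSketch] =
    allocations
      .getOrElse(batchTime, Map.empty[Int, Seq[BlockMetaSketch]])
      .getOrElse(streamId, Seq.empty)

  // Blocks whose record count is unknown (None) contribute nothing to the total.
  def recordCount(blocks: Seq[BlockMetaSketch]): Long =
    blocks.flatMap(_.numRecords).sum
}
```

With empty defaults at both levels, a batch that received nothing still produces an (empty) block list, so the stream can build an empty BlockRDD instead of failing.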
- Next, look at the stop method of ReceiverTracker:
def stop(graceful: Boolean): Unit = synchronized {
  if (isTrackerStarted) {
    // First, stop the receivers
    trackerState = Stopping
    if (!skipReceiverLaunch) {
      // Send the stop signal to all the receivers
      endpoint.askWithRetry[Boolean](StopAllReceivers)
      // Wait for the Spark job that runs the receivers to be over
      // That is, for the receivers to quit gracefully.
      receiverJobExitLatch.await(10, TimeUnit.SECONDS)
      if (graceful) {
        logInfo("Waiting for receiver job to terminate gracefully")
        receiverJobExitLatch.await()
        logInfo("Waited for receiver job to terminate gracefully")
      }
      // Check if all the receivers have been deregistered or not
      val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
      if (receivers.nonEmpty) {
        logWarning("Not all of the receivers have deregistered, " + receivers)
      } else {
        logInfo("All of the receivers have deregistered successfully")
      }
    }
    // Finally, stop the endpoint
    ssc.env.rpcEnv.stop(endpoint)
    endpoint = null
    receivedBlockTracker.stop()
    logInfo("ReceiverTracker stopped")
    trackerState = Stopped
  }
}
It sends a StopAllReceivers message to the endpoint, asking it to stop all receivers. Here is how that message is handled:
case StopAllReceivers =>
  assert(isTrackerStopping || isTrackerStopped)
  stopReceivers()
  context.reply(true)
Next, the stopReceivers() method:
private def stopReceivers() {
  receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
  logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
}
A StopReceiver message is sent to every receiver. Here is how the endpoint in ReceiverSupervisorImpl handles it:
case StopReceiver =>
  logInfo("Received stop signal")
  ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
This calls the supervisor's stop method:
def stop(message: String, error: Option[Throwable]) {
  stoppingError = error.orNull
  stopReceiver(message, error)
  onStop(message, error)
  futureExecutionContext.shutdownNow()
  stopLatch.countDown()
}
Start with stopReceiver:
def stopReceiver(message: String, error: Option[Throwable]): Unit = synchronized {
  try {
    logInfo("Stopping receiver with message: " + message + ": " + error.getOrElse(""))
    receiverState match {
      case Initialized =>
        logWarning("Skip stopping receiver because it has not yet stared")
      case Started =>
        receiverState = Stopped
        receiver.onStop()
        logInfo("Called receiver onStop")
        onReceiverStop(message, error)
      case Stopped =>
        logWarning("Receiver has been stopped")
    }
  } catch {
    case NonFatal(t) =>
      logError("Error stopping receiver " + streamId + t.getStackTraceString)
  }
}
First, it calls the receiver's own stop hook, receiver.onStop(). Take the onStop() method of KafkaReceiver as an example:
def onStop() {
  if (consumerConnector != null) {
    consumerConnector.shutdown()
    consumerConnector = null
  }
}
It shuts down the consumer connection, which stops data from being received.
Second, it calls onReceiverStop. Here is the implementation in ReceiverSupervisorImpl, the subclass of ReceiverSupervisor:
override protected def onReceiverStop(message: String, error: Option[Throwable]) {
  logInfo("Deregistering receiver " + streamId)
  val errorString = error.map(Throwables.getStackTraceAsString).getOrElse("")
  trackerEndpoint.askWithRetry[Boolean](DeregisterReceiver(streamId, message, errorString))
  logInfo("Stopped receiver " + streamId)
}
This sends a DeregisterReceiver message to trackerEndpoint to deregister the receiver.
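The stop path is a small state machine: only a receiver in the Started state is actually stopped and deregistered, and the other two states are no-ops. A minimal standalone sketch of that rule follows. `SupervisorSketch`, its `onDeregister` callback and the `events` buffer are hypothetical; the callback stands in for sending DeregisterReceiver to the tracker.

```scala
import scala.collection.mutable.ArrayBuffer

object ReceiverStateSketch {
  sealed trait State
  case object Initialized extends State
  case object Started extends State
  case object Stopped extends State
}

final class SupervisorSketch(onDeregister: String => Unit) {
  import ReceiverStateSketch._

  private var state: State = Initialized
  // Records what happened, so the behaviour can be inspected afterwards.
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def start(): Unit = synchronized {
    if (state == Initialized) state = Started
  }

  def currentState: State = synchronized { state }

  def stopReceiver(message: String): Unit = synchronized {
    state match {
      case Initialized =>
        events += "skipped: not started"
      case Started =>
        state = Stopped
        events += "receiver.onStop"   // stands in for receiver.onStop()
        onDeregister(message)         // stands in for the DeregisterReceiver message
      case Stopped =>
        events += "skipped: already stopped"
    }
  }
}
```

Flipping the state to Stopped before running the hooks means a second stop request, for example one from the driver arriving after a local failure, cannot deregister the same receiver twice.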
Now for onStop(), again as implemented in ReceiverSupervisorImpl, the subclass of ReceiverSupervisor:
override protected def onStop(message: String, error: Option[Throwable]) {
  registeredBlockGenerators.foreach { _.stop() }
  env.rpcEnv.stop(endpoint)
}
It calls stop on every registered BlockGenerator and then stops the RPC endpoint. The stop method of BlockGenerator:
def stop(): Unit = {
  // Set the state to stop adding data
  synchronized {
    if (state == Active) {
      state = StoppedAddingData
    } else {
      logWarning(s"Cannot stop BlockGenerator as its not in the Active state [state = $state]")
      return
    }
  }
  // Stop generating blocks and set the state for block pushing thread to start draining the queue
  logInfo("Stopping BlockGenerator")
  blockIntervalTimer.stop(interruptTimer = false)
  synchronized { state = StoppedGeneratingBlocks }
  // Wait for the queue to drain and mark generated as stopped
  logInfo("Waiting for block pushing thread to terminate")
  blockPushingThread.join()
  synchronized { state = StoppedAll }
  logInfo("Stopped BlockGenerator")
}
The key step is stopping the timer with blockIntervalTimer.stop; the role of blockIntervalTimer was explained in the previous post.
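The staged shutdown (stop accepting data, stop generating blocks, drain the queue, finish) can be tried out with the simplified sketch below. It is an assumption-heavy model rather than the real BlockGenerator: `GeneratorSketch` is a hypothetical name, there is no timer, and callers enqueue finished "blocks" (strings) directly instead of raw records.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CopyOnWriteArrayList, TimeUnit}

object GeneratorState extends Enumeration {
  val Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
}

final class GeneratorSketch {
  import GeneratorState._

  @volatile private var state: Value = Active
  private val queue = new ArrayBlockingQueue[String](16)
  // Blocks that the pushing thread has handed off, in order.
  val pushed = new CopyOnWriteArrayList[String]()

  private val pusher = new Thread(new Runnable {
    override def run(): Unit = {
      // Keep polling while new blocks may still show up.
      while (state == Active || state == StoppedAddingData) {
        Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(b => pushed.add(b))
      }
      // Nothing new can arrive now: drain whatever is left.
      var next = queue.poll()
      while (next != null) {
        pushed.add(next)
        next = queue.poll()
      }
    }
  })
  pusher.setDaemon(true)
  pusher.start()

  // Accept a block only while the generator is Active.
  def addBlock(block: String): Boolean = synchronized {
    if (state == Active) { queue.put(block); true } else false
  }

  def currentState: GeneratorState.Value = state

  // Returns false if the generator was not Active, mirroring the early return in stop().
  def stop(): Boolean = {
    val proceed = synchronized {
      if (state == Active) { state = StoppedAddingData; true } else false
    }
    if (proceed) {
      // The real code stops a timer here; this sketch only advances the state.
      synchronized { state = StoppedGeneratingBlocks }
      pusher.join() // wait until the queue has been drained
      synchronized { state = StoppedAll }
    }
    proceed
  }
}
```

The ordering is the point of the design: producers are cut off first, so when the pushing thread drains the queue nothing can be added behind its back, and join() returning means every accepted block has been pushed.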
- The other ReceiverTracker messages will be covered in a later post.