The questions we care about here: how does the data get received, and how does it get stored?
Data is received by the receiver thread running on an executor; once received, it is handed over to the ReceiverSupervisorImpl on that same executor for processing.
One of JobScheduler's key members takes the stage: ReceiverTracker!!!
A quick introduction to ReceiverTracker
ReceiverTracker's job is to provide input data for each batch's RDD. It does so in three steps (a minimal setup sketch follows this list):
- Distribute the receivers to executors and start the receiving threads.
- Distribute ReceiverSupervisorImpl to executors, start the data-processing threads, and keep track of the stored data's metadata.
- When a job is submitted, provide the data its ETL runs on.
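To ground those steps, here is a minimal driver-side sketch (a hypothetical app using the standard Spark Streaming API): the receiver-based input stream created by socketTextStream is exactly what makes ReceiverTracker launch a receiver once ssc.start() is called.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("receiver-demo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // socketTextStream builds a receiver-based input DStream; its receiver is
    // what ReceiverTracker ships to an executor after ssc.start()
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()
    ssc.start()            // JobScheduler.start() -> ReceiverTracker.start()
    ssc.awaitTermination()
  }
}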
++First, let's see how the receivers get distributed to the executors++
def start(): Unit = synchronized {
//....
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))//用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息
if (!skipReceiverLaunch) launchReceivers() //重要!考點C优搿Nナ!將receiver分發(fā)到executers
//.....
}
}
//Now let's look at launchReceivers in detail
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map {...}//DStreamGraph holds all the input DStreams; here we get each input DStream's receiver
endpoint.send(StartAllReceivers(receivers))//with the receivers in hand, this is where the actual distribution happens
}
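The map body is elided as {...} above; roughly, it just asks every receiver-based input DStream for its Receiver. A hedged sketch of that step (assuming ReceiverInputDStream.getReceiver(), the method by which an input DStream exposes its receiver):

// rough sketch: one Receiver per receiver-based input DStream
val receivers = receiverInputStreams.map { inputDStream =>
  inputDStream.getReceiver() // each input DStream knows how to construct its own Receiver
}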
override def receive: PartialFunction[Any, Unit] = {
// decide which executors each receiver should be sent to
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
//.....
}
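scheduleReceivers is where the receiver-to-executor mapping is decided. As an illustration of the idea only (not Spark's actual ReceiverSchedulingPolicy, which also honours preferredLocation and balances load), a naive round-robin assignment could look like this:

// Hypothetical, simplified stand-in for scheduleReceivers:
// spread receivers over executors round-robin, one candidate executor each.
def naiveScheduleReceivers(
    streamIds: Seq[Int],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  streamIds.zipWithIndex.map { case (streamId, i) =>
    streamId -> Seq(executors(i % executors.size))
  }.toMap
}
// naiveScheduleReceivers(Seq(0, 1, 2), Seq("host-a", "host-b"))
// => Map(0 -> Seq("host-a"), 1 -> Seq("host-b"), 2 -> Seq("host-a"))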
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
// Function to start the receiver on the worker node
//Key point!!! This function is submitted together with the RDD; it news up a ReceiverSupervisorImpl that does the actual handling of the received data (more on that later)!!
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)//the component that actually handles the received data
supervisor.start()//start it
supervisor.awaitTermination()//important! blocks this thread so data keeps flowing in from the receiver
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
//重點薪前!考點H笈!這里把recever和location打包成一個rdd了示括,所以recevier可以在多個executor上運行F探健!垛膝!
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
//.....
//Submit it! ?? at this point the receiver has been shipped to a concrete executor
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
//....
}
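The trick behind this is that a one-element RDD with preferred locations lets an ordinary Spark job pin a long-running function onto a chosen executor. A self-contained sketch of the same pattern (hypothetical hostname, illustrative payload):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("pin-demo").setMaster("local[2]"))
// one element with a preferred location, playing the role of the Receiver
val oneElementRdd = sc.makeRDD(Seq(("payload", Seq("host-a"))))
val future = sc.submitJob[String, Unit, Unit](
  oneElementRdd,
  (it: Iterator[String]) => it.foreach(v => println(s"long-running work with $v")),
  Seq(0),          // run only partition 0, i.e. exactly one task
  (_, _) => (),    // ignore per-partition results
  ())              // overall result: Unit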
++Now let's look at how the data is actually handled on the executor++
Part 1: how is the data received?
After the receiver has been shipped to an executor, how does it get the data handled? The receiver calls the supervisor's push methods (via store())!!! In other words, the receiver only cares about where to receive data from and how to parse it; it does not care how the data is stored, nor who consumes it!!!
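That division of labour is exactly what the Receiver API encodes: a custom receiver only implements onStart/onStop and calls store(); everything after store() belongs to the supervisor. A minimal hypothetical custom receiver:

import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical line-per-record socket receiver: it only knows where to read
// from and how to parse each record; store() hands every record to the supervisor.
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  override def onStart(): Unit = {
    new Thread("simple-socket-receiver") {
      override def run(): Unit = {
        val socket = new Socket(host, port)
        Source.fromInputStream(socket.getInputStream).getLines().foreach(line => store(line))
        socket.close()
      }
    }.start()
  }

  override def onStop(): Unit = { /* the reading thread exits when the socket is closed */ }
}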
//First, how does a receiver hand its data to ReceiverSupervisorImpl? Take KafkaReceiver as an example
class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {
def onStart() {
//where to receive the data from
// Kafka connection properties
// Create the connection to the cluster
//how to decode the received data
val keyDecoder = ...
val valueDecoder = ...
//thread pool that consumes the streams
val executorPool = ...
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
// handle the received data: store() is what ends up calling supervisor.pushSingle!!!
private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message))
}
}
}
}
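For context, this receiver-based KafkaReceiver is what the old receiver-based Kafka API builds under the hood; a typical usage sketch (assuming spark-streaming-kafka 0.8's KafkaUtils.createStream, with hypothetical ZooKeeper/topic names):

import org.apache.spark.streaming.kafka.KafkaUtils

// hypothetical ZooKeeper quorum, consumer group and topic
val kafkaStream = KafkaUtils.createStream(
  ssc,                     // the StreamingContext
  "zk-host:2181",          // ZooKeeper quorum
  "demo-consumer-group",   // consumer group id
  Map("demo-topic" -> 1))  // topic -> number of receiver threads
kafkaStream.map(_._2).count().print()   // values only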
Part 2: now that the data has arrived, how is it stored? This is implemented by ReceiverSupervisorImpl, mainly through three methods:
//push-style entry point: hands records one by one to the BlockGenerator, which aggregates them into blocks
def pushSingle(data: Any) {
defaultBlockGenerator.addData(data)
}
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
//the actual logic that stores the block
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//once the store succeeds, report the new block's info to ReceiverTracker
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
}
//stores each block in memory/on disk via the BlockManager, the same mechanism RDD blocks use
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
//WAL, key point! write-ahead logging to prevent data loss
new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
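The defaultBlockGenerator that pushSingle hands data to is what turns single records into blocks: it buffers records and, on a timer (spark.streaming.blockInterval, 200 ms by default), seals the current buffer into one block and pushes it onward. A simplified hypothetical stand-in that captures the idea:

import java.util.{Timer, TimerTask}
import scala.collection.mutable.ArrayBuffer

// Hypothetical mini "block generator": buffer single records, flush the buffer
// as one block every blockIntervalMs, roughly what BlockGenerator does.
class MiniBlockGenerator[T](blockIntervalMs: Long, pushBlock: Seq[T] => Unit) {
  private val buffer = ArrayBuffer.empty[T]
  private val timer = new Timer("mini-block-generator", true)

  def addData(record: T): Unit = synchronized { buffer += record }

  def start(): Unit = timer.scheduleAtFixedRate(new TimerTask {
    override def run(): Unit = {
      val block = MiniBlockGenerator.this.synchronized {
        val drained = buffer.toList
        buffer.clear()
        drained
      }
      if (block.nonEmpty) pushBlock(block) // in Spark this ends up in pushAndReportBlock
    }
  }, blockIntervalMs, blockIntervalMs)

  def stop(): Unit = timer.cancel()
}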
Part 3: how does the data get used? Once stored, the data is reported to ReceiverTracker, but how is it actually consumed?
//ReceiverTracker doesn't manage blocks itself; its member receivedBlockTracker does the actual work. ReceiverTracker is just the boss!!
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
receivedBlockTracker.addBlock(receivedBlockInfo)
}
//Note ?? when the JobGenerator timer submits a job, it calls ReceiverTracker's allocateBlocksToBatch to associate blocks with that batch; as you can see, the block-to-batch allocation is handled by receivedBlockTracker!!
def allocateBlocksToBatch(batchTime: Time): Unit = {
if (receiverInputStreams.nonEmpty) {
receivedBlockTracker.allocateBlocksToBatch(batchTime)
}
}
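Conceptually, allocateBlocksToBatch drains the queue of not-yet-allocated blocks and records them under the batch time, so the batch's RDDs can later ask which blocks belong to them. A simplified hypothetical sketch of that bookkeeping (not the real ReceivedBlockTracker, which also writes these events to the WAL):

import scala.collection.mutable

// Hypothetical simplified bookkeeping: blocks reported by receivers queue up as
// "unallocated" until a batch is generated, then move into batchTime -> blocks.
class MiniBlockTracker {
  private val unallocated = mutable.Queue.empty[String]        // block ids from receivers
  private val allocated = mutable.Map.empty[Long, Seq[String]] // batchTime -> block ids

  def addBlock(blockId: String): Unit = synchronized { unallocated.enqueue(blockId) }

  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    allocated(batchTime) = unallocated.dequeueAll(_ => true).toList
  }

  def getBlocksOfBatch(batchTime: Long): Seq[String] =
    synchronized { allocated.getOrElse(batchTime, Seq.empty) }
}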
For how the stored data gets associated with RDDs, see the follow-up article "Spark Streaming source analysis: how do jobs, RDDs and blocks map to each other?".