在上一篇中介紹了Receiver在Driver的精妙實(shí)現(xiàn),本篇內(nèi)容主要介紹Receiver在Executor中的啟動(dòng)蚣驼,數(shù)據(jù)接收和存儲(chǔ)
- 從ReceiverTracker的start方法開始,調(diào)用launchReceivers()方法,給endpoint發(fā)送消息匣屡,endpoint.send(StartAllReceivers(receivers)),endpoint就是ReceiverTrackerEndpoint拇涤,也可以說是給自己的消息通訊體發(fā)送了一條消息捣作。看接收到的消息
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
// 循環(huán)啟動(dòng)receiver
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
//啟動(dòng)receiver
startReceiver(receiver, executors)
}
startReceiver(receiver, executors)循環(huán)調(diào)用鹅士,每一個(gè)receiver會(huì)啟動(dòng)一個(gè)job券躁。startReceiver的代碼如下
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf = new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
// 在worker節(jié)點(diǎn)啟動(dòng)receiver的方法,(就是action中的方法)
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException("Could not start receiver as object not found.")
}
//判斷task的重試次數(shù)為0如绸,就是沒有task失敗后嘱朽,重試運(yùn)行不執(zhí)行以下代碼
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
//這里創(chuàng)建接收器管理者,在start方法里啟動(dòng)receiver接收數(shù)據(jù)
val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
// 創(chuàng)建接收數(shù)據(jù)的RDD
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
//
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
// 根據(jù)數(shù)據(jù)本地性創(chuàng)建receiverRDD
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
// 對(duì)job進(jìn)行一些配置
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
// 到這里就提交了receiverRDD到集群中
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
// 重啟receiver
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
// 重啟receiver
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
在startReceiverFunc函數(shù)中定義了從iterator中取一條記錄怔接,也就是receiver,然后實(shí)例化一個(gè)ReceiverSupervisorImpl,把receiver傳遞進(jìn)入搪泳,然后調(diào)用ReceiverSupervisorImpl的start方法。當(dāng)然這里并沒有啟動(dòng)ReceiverSupervisorImpl扼脐,只是定義了操作而已岸军,真正的執(zhí)行是在Executor中奋刽。
然后提交ReceiverRDD到集群運(yùn)行,代碼如下
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
- 通過startReceiverFunc函數(shù) 來看ReceiverSupervisorImpl在Executor上的運(yùn)行艰赞。
從supervisor.start()開始佣谐,start方法代碼如下
def start() {
onStart()
startReceiver()
}
onStart方法代碼如下
/**
* Called when supervisor is started.
* Note that this must be called before the receiver.onStart() is called to ensure
* things like [[BlockGenerator]]s are started before the receiver starts sending data.
*/
protected def onStart() { }
重點(diǎn)是看onStart的注釋,注釋內(nèi)容說在receiver.onStart()之前方妖,必須BlockGenerator先啟動(dòng)狭魂,以保證接收到的數(shù)據(jù)能夠被存儲(chǔ)起來〉趁伲看onStart方法的子類實(shí)現(xiàn)雌澄,代碼如下
override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
}
registeredBlockGenerators在ReceiverSupervisorImpl實(shí)例化的時(shí)候創(chuàng)建,代碼如下
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
registeredBlockGenerators在createBlockGenerator方法中添加了BlockGenerator杯瞻,代碼如下
override def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
// 每一個(gè)receiver創(chuàng)建一個(gè)BlockGenerator镐牺,因?yàn)閟treamId一一對(duì)應(yīng)receiver
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators += newBlockGenerator
newBlockGenerator
}
那么createBlockGenerator在什么時(shí)候被調(diào)用呢?看代碼
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
registeredBlockGenerators的BlockGenerator已經(jīng)有了魁莉,看BlockGenerator的start()方法,代碼如下
def start(): Unit = synchronized {
if (state == Initialized) {
state = Active
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
} else {
throw new SparkException(
s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
}
}
這里啟動(dòng)了blockIntervalTimer和blockPushingThread睬涧,blockIntervalTimer就是一個(gè)定時(shí)器,默認(rèn)每200ms回調(diào)一下updateCurrentBuffer方法旗唁,回調(diào)時(shí)間通過參數(shù)spark.streaming.blockInterval設(shè)置畦浓,這也是一個(gè)性能調(diào)優(yōu)的參數(shù),時(shí)間過短太造成block碎片太多检疫,時(shí)間過長(zhǎng)可能導(dǎo)致block塊過大宅粥,具體時(shí)間長(zhǎng)短要根據(jù)實(shí)際業(yè)務(wù)而定,updateCurrentBuffer方法作用就是將接收到的數(shù)據(jù)包裝到block存儲(chǔ)电谣,代碼后面再看;blockPushingThread作用是定時(shí)從blocksForPushing隊(duì)列中取block,然后存儲(chǔ)抹蚀,并向ReceiverTrackerEndpoint匯報(bào)剿牺,代碼后面再看
- BlockGenerator啟動(dòng)之后接著看 supervisor.start()方法中的 startReceiver()方法, startReceiver()代碼如下
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
首先判斷onReceiverStart()的返回值环壤,onReceiverStart()代碼在子類中的實(shí)現(xiàn)如下
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
onReceiverStart內(nèi)部向trackerEndpoint發(fā)送了一條RegisterReceiver注冊(cè)receiver的消息晒来,在trackerEndpoint內(nèi)部收到消息后,將注冊(cè)信息包裝到一個(gè)ReceiverTrackingInfo的case class類中郑现,然后把ReceiverTrackingInfo按照k-v的方式put到receiverTrackingInfos中湃崩,key就是streamId,再次說明一個(gè)inputDstream對(duì)應(yīng)一個(gè)receiver接箫。
回到上面的調(diào)用返回true,將receiverState 標(biāo)記為Started攒读,然后調(diào)用了receiver的onStart方法。
- 以SocketReceiver為例辛友,看SocketReceiver的onStart方法 ,啟動(dòng)了一條后臺(tái)線程薄扁,調(diào)用receive()方法接收數(shù)據(jù)剪返,代碼如下
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
接著看receive()方法,代碼如下
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
receiver方法的內(nèi)容就很簡(jiǎn)單了邓梅,啟動(dòng)一個(gè)socket接收數(shù)據(jù)脱盲,接收一行就調(diào)用store方法存儲(chǔ)起來,store方法的代碼如下
def store(dataItem: T) {
supervisor.pushSingle(dataItem)
}
調(diào)用supervisor的pushSingle方法日缨,supervisor就是ReceiverSupervisor的實(shí)現(xiàn)類ReceiverSupervisorImpl的方法钱反,代碼如下
def pushSingle(data: Any) {
defaultBlockGenerator.addData(data)
}
defaultBlockGenerator在上面說過,他是ReceiverSupervisorImpl的一個(gè)成員變量匣距,接著看他的addData方法面哥,代碼如下
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
currentBuffer += data,在currentBuffer 上不斷的累加數(shù)據(jù),那么currentBuffer 的數(shù)據(jù)是怎樣存儲(chǔ)起來的呢墨礁,這時(shí)候就用到了前面介紹的 blockIntervalTimer和blockPushingThread
- 首先看blockIntervalTimer定時(shí)回調(diào)的updateCurrentBuffer()方法幢竹,代碼如下
private def updateCurrentBuffer(time: Long): Unit = {
try {
var newBlock: Block = null
synchronized {
if (currentBuffer.nonEmpty) {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
listener.onGenerateBlock(blockId)
newBlock = new Block(blockId, newBlockBuffer)
}
}
if (newBlock != null) {
blocksForPushing.put(newBlock) // put is blocking when queue is full
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
case e: Exception =>
reportError("Error in block updating thread", e)
}
}
將currentBuffer交給newBlockBuffer ,然后實(shí)例化一個(gè)空的ArrayBuffer給currentBuffer恩静,接著實(shí)例化一個(gè)Block把newBlockBuffer 傳遞進(jìn)去焕毫,最后把newBlock 放入到blocksForPushing隊(duì)列中
- 接下來就是blockPushingThread干的活了,在blockPushingThread線程中調(diào)用keepPushingBlocks方法驶乾,代碼如下
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
def areBlocksBeingGenerated: Boolean = synchronized {
state != StoppedGeneratingBlocks
}
try {
// While blocks are being generated, keep polling for to-be-pushed blocks and push them.
while (areBlocksBeingGenerated) {
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
// At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
while (!blocksForPushing.isEmpty) {
val block = blocksForPushing.take()
logDebug(s"Pushing block $block")
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
}
logInfo("Stopped block pushing thread")
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
case e: Exception =>
reportError("Error in block pushing thread", e)
}
}
從blocksForPushing隊(duì)列中定時(shí)取出block然后pushBlock邑飒,代碼如下
Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
接著看pushBlock(block)方法,代碼如下
listener.onPushBlock(block.id, block.buffer)
這里調(diào)用了listener的onPushBlock方法级乐,那么listener是從哪來的疙咸,查詢一下listener變量,listener是在BlockGenerator實(shí)例化的時(shí)候傳遞進(jìn)來的风科,找BlockGenerator的實(shí)例化撒轮,是通過createBlockGenerator方法接收的參數(shù)并傳遞給BlockGenerator。找createBlockGenerator方法的調(diào)用贼穆,終于看到了defaultBlockGeneratorListener的實(shí)例化题山,代碼如下
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
def onAddData(data: Any, metadata: Any): Unit = { }
def onGenerateBlock(blockId: StreamBlockId): Unit = { }
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}
原來onPushBlock方法在這里,看pushArrayBuffer的調(diào)用 故痊,pushArrayBuffer方法的代碼如下
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}
重磅性的一行代碼出現(xiàn)了 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption),代碼如下
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
這里面做了幾事件事顶瞳,第一調(diào)用receivedBlockHandler來存儲(chǔ)block
第二向trackerEndpoint匯報(bào)block的存儲(chǔ)結(jié)果blockInfo
- receivedBlockHandler是在ReceiverSupervisorImpl實(shí)例化的時(shí)候創(chuàng)建的,代碼如下
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
有兩種類型愕秫,一種的WAL方式慨菱,還有一種普通的方式。WAL的方式以后再看戴甩,這里看BlockManagerBasedBlockHandler符喝,代碼如下
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
var numRecords = None: Option[Long]
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
numRecords = Some(arrayBuffer.size.toLong)
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)
case IteratorBlock(iterator) =>
val countIterator = new CountingIterator(iterator)
val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true)
numRecords = countIterator.count
putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
}
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel")
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
def cleanupOldBlocks(threshTime: Long) {
// this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
// of BlockRDDs.
}
}
這里就是借助BlockManager來存儲(chǔ)block并返回block存儲(chǔ)的元數(shù)據(jù),終于看完了receiver的整個(gè)數(shù)據(jù)接收和存儲(chǔ)甜孤。
- 整個(gè)過程還是很清晰的洲劣,如果有張流程圖就最好了备蚓,流程圖以后補(bǔ)上,謝謝