This article is based on Spark 2.x (the Scala 2.11 build).
1. Introduction
Spark Streaming (1), which walked through the Spark Streaming workflow, briefly sketched how a running streaming job reads data from the stream:
- The Receiver runs on an executor, receives data, and hands it to the ReceiverSupervisor; the ReceiverSupervisor stores the data via the BlockManager and reports the block information to the ReceiverTracker.
- The ReceiverTracker runs on the driver and records the reported block information; later, when the JobGenerator generates new jobs, it allocates these blocks as the input data of those jobs.
This article covers the above flow in detail.
2 ReceiverTracker
ReceiverTracker has the following core members:
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
private val receivedBlockTracker = ...
- receiverInputStreams: all DStreams in the DStreamGraph that implement ReceiverInputDStream. These DStreams hold a receiver and can launch tasks on executors to receive stream data.
- Each receiver has an id and reports it along with its data. The ReceiverInputDStream at the head of the DStream DAG passes its id when fetching data from the receiverTracker, so it knows which blocks belong to it.
- receivedBlockTracker: covered later.
2.1 Starting the ReceiverTracker
The ReceiverTracker is started on the driver side when the JobScheduler starts, via the following call sequence:
StreamingContext#start
->JobScheduler#start
->ReceiverTracker#start
Below is the ReceiverTracker's start method:
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
As you can see, start registers an RPC endpoint on rpcEnv (for how Spark RPC works, see the article on Spark network communication and the RPC implementation).
Let's look at the registered ReceiverTrackerEndpoint's receive and receiveAndReply methods to see which messages it accepts and which services it provides:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    ...
    // this message is sent by the tracker to itself; it starts receivers on executors
    startReceiver(receiver, executors)
  case RestartReceiver(receiver) =>
    ...
    startReceiver(receiver, scheduledLocations)
  case c: CleanupOldBlocks =>
    // blocks whose batches have already been processed can be cleaned up
    ...
  case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    // after starting on an executor, a receiver registers itself with the
    // ReceiverTracker so the tracker is aware of it
    ...
  case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
  // Local messages
  case AddBlock(receivedBlockInfo) =>
    // receive the block information reported by a receiver
    ...
  case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
  case GetAllReceiverInfo =>
    context.reply(receiverTrackingInfos.toMap)
  case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
}
From the above, the services it provides include registering and starting receivers, and registering and cleaning up reported block data.
Back in the start method, launchReceivers() is called to start the receivers.
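The endpoint's receive method above is just a PartialFunction[Any, Unit] matched against message case classes. A minimal, Spark-free sketch of the same dispatch pattern (the message names mirror Spark's, but the types and handler bodies here are illustrative placeholders):

```scala
// Minimal sketch of the PartialFunction-based message dispatch used by
// ReceiverTrackerEndpoint; message types mirror Spark's, handlers are stubs.
sealed trait TrackerMessage
case class AddBlock(streamId: Int, blockId: String) extends TrackerMessage
case class ReportError(streamId: Int, message: String) extends TrackerMessage

class DemoEndpoint {
  var blocksAdded = 0

  // receive is a PartialFunction over message types, as in ReceiverTrackerEndpoint
  val receive: PartialFunction[Any, Unit] = {
    case AddBlock(streamId, blockId) =>
      blocksAdded += 1 // record the reported block
    case ReportError(streamId, msg) =>
      println(s"stream $streamId failed: $msg")
  }
}
```

Messages the partial function is not defined for are simply not handled, which is why Spark's RPC layer checks `isDefinedAt` before dispatching.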
2.2 How ReceiverTracker handles data reported by receivers
When the ReceiverTracker's RPC endpoint receives an AddBlock message, a receiver has reported block information.
First, the structure of the AddBlock message:
// An AddBlock message carries a ReceivedBlockInfo, which holds the details
// of the block the receiver is reporting
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)

// streamId identifies the reporting stream; numRecords is the number of
// records in the block. blockStoreResult has two implementations:
//   1. BlockManagerBasedStoreResult: the receiver side stores block data with the BlockManager
//   2. WriteAheadLogBasedStoreResult: the receiver side also writes block data to a WAL
// Both are explained in the Receiver section below.
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {...}
Back to the endpoint's handling of AddBlock, which enters the following call sequence:
case AddBlock =>                    the AddBlock message is received
-> ReceiverTracker#addBlock
-> ReceivedBlockTracker#addBlock    the reported block is managed by receivedBlockTracker
ReceivedBlockTracker
Reported block information is ultimately managed by the ReceivedBlockTracker. The two members below are involved in that bookkeeping:
private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
- ReceivedBlockQueue: a type alias; all reported block information is held in queues of this type.
- streamIdToUnallocatedBlockQueues: reported blocks are keyed by streamId; these queues hold blocks that have been reported but not yet allocated to any job.
- timeToAllocatedBlocks: when the JobGenerator generates a job, it asks the receiverTracker to allocate data to it, and the receiverTracker delegates to the ReceivedBlockTracker. Allocated blocks are indexed by batch time (the job's generation time) and streamId, so the job's RDD DAG can fetch its data here by time and streamId.
At this point the work of ReceivedBlockTracker#addBlock is clear: it simply appends the reported block information to streamIdToUnallocatedBlockQueues.
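A minimal, Spark-free sketch of this bookkeeping (the member names follow the real ones, but ReceivedBlockInfo is reduced to the fields that matter here):

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's ReceivedBlockInfo: only the fields needed here
case class ReceivedBlockInfo(streamId: Int, numRecords: Long)

class UnallocatedBlocks {
  private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]
  private val streamIdToUnallocatedBlockQueues =
    new mutable.HashMap[Int, ReceivedBlockQueue]

  // addBlock simply enqueues the reported block under its streamId
  def addBlock(info: ReceivedBlockInfo): Unit = synchronized {
    streamIdToUnallocatedBlockQueues
      .getOrElseUpdate(info.streamId, new ReceivedBlockQueue)
      .enqueue(info)
  }

  // blocks reported for a stream but not yet allocated to any batch
  def unallocated(streamId: Int): Seq[ReceivedBlockInfo] =
    streamIdToUnallocatedBlockQueues.get(streamId).map(_.toSeq).getOrElse(Seq.empty)
}
```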
2.3 How ReceiverTracker allocates data to jobs
Section 3 of Spark Streaming (2) described how the JobGenerator's generateJobs method calls receiverTracker.allocateBlocksToBatch to allocate input data to jobs. The allocation work is likewise delegated to the ReceivedBlockTracker; below is its allocateBlocksToBatch method:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // drain everything in streamIdToUnallocatedBlockQueues, grouped by streamId
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // save into timeToAllocatedBlocks; the job's input DStreams fetch their
      // data from here by batch time and streamId and turn it into a BlockRDD
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  } else {
    logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
  }
}
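The allocation step can be sketched without Spark as follows (simplified types, no WAL write): every stream's unallocated queue is drained and the result is pinned under the batch time.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's ReceivedBlockInfo / AllocatedBlocks
case class BlockInfo(streamId: Int, numRecords: Long)
case class AllocatedBlocks(streamIdToBlocks: Map[Int, Seq[BlockInfo]])

class BlockAllocator(streamIds: Seq[Int]) {
  private val unallocated = mutable.HashMap[Int, mutable.Queue[BlockInfo]]()
  val timeToAllocatedBlocks = mutable.HashMap[Long, AllocatedBlocks]()
  private var lastAllocatedBatchTime = -1L

  def addBlock(info: BlockInfo): Unit = synchronized {
    unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty).enqueue(info)
  }

  // drain every stream's unallocated queue and pin the result to batchTime,
  // mirroring allocateBlocksToBatch (minus the WAL write)
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { id =>
        id -> unallocated.get(id).map(_.dequeueAll(_ => true).toSeq)
                .getOrElse(Seq.empty[BlockInfo])
      }.toMap
      timeToAllocatedBlocks.put(batchTime, AllocatedBlocks(streamIdToBlocks))
      lastAllocatedBatchTime = batchTime
    }
  }
}
```

Because the queues are drained, a block is allocated to exactly one batch; a later batch sees only blocks reported after the previous allocation.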
3. Receiver and ReceiverSupervisor
Section 2.1 mentioned that ReceiverTracker's start method calls launchReceivers to start the receivers. The calls leading up to the receiver start look like this:
ReceiverTracker#start
-> ReceiverTracker#launchReceivers
-> ReceiverTrackerEndpoint#send(StartAllReceivers(receivers))
 |  a StartAllReceivers message is sent to the RPC endpoint
 v
the endpoint receives the message: ReceiverTrackerEndpoint#receive
-> ReceiverTracker#startReceiver    starts the receiver on an executor
Obtaining the receiver:
Section 2.2 of Spark Streaming (1) mentioned that a ReceiverInputDStream must return a receiver.
Starting the receiver:
- launchReceivers obtains all receivers from the ReceiverTracker#receiverInputStreams member,
- then sends a StartAllReceivers message to the tracker's own RPC endpoint;
- on receiving the message, the endpoint calls ReceiverTracker#startReceiver.
The core is startReceiver, shown below:
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // this function is shipped to an executor along with the receiver and runs there
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // start the ReceiverSupervisor
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // create an RDD whose data is the receiver itself; startReceiverFunc above runs
  // over the RDD's data, i.e. it receives the receiver as its argument
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // submit the RDD as a job
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(ThreadUtils.sameThread)
  logInfo(s"Receiver ${receiver.streamId} started")
}
As described in the article on Spark job submission, startReceiverFunc is eventually wrapped into a ResultTask and run on an executor; the ResultTask invokes startReceiverFunc, which creates the ReceiverSupervisor.
3.1 Receiver
From the code above, the Receiver's start sequence is:
ReceiverSupervisor#start
-> ReceiverSupervisor#startReceiver
-> Receiver#onStart
Receiver's main members and methods are:
@transient private var _supervisor: ReceiverSupervisor = null
def storeXXX()
def onStart()
def streamId: Int = id
- _supervisor: the Receiver hands the data it receives over to the ReceiverSupervisor.
- storeXXX: a family of store methods; internally they call ReceiverSupervisor methods, which store the data and report it to the ReceiverTracker.
- onStart: called when the receiver starts; reading from the stream usually begins here.
- streamId: the unique id of the input stream.
Take SocketReceiver as an example:
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  private var socket: Socket = _

  def onStart() {
    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // in case restart thread close it twice
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
        logInfo(s"Closed socket to $host:$port")
      }
    }
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}
- onStart starts a daemon thread that calls receive() to pull data.
- receive calls store for each record it reads.
Here is the store method:
def store(dataItem: T) {
  // the data is handed over to the ReceiverSupervisor
  supervisor.pushSingle(dataItem)
}
3.2 ReceiverSupervisor
ReceiverSupervisor has a single implementation, ReceiverSupervisorImpl. It runs on the executor and starts the Receiver when it starts; it stores the data handed over to it and reports the block information to the ReceiverTracker. Its main members and methods are:
private val receivedBlockHandler: ReceivedBlockHandler
private val trackerEndpoint
private val endpoint
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
private val defaultBlockGeneratorListener
- receivedBlockHandler has two main implementations:
  - WriteAheadLogBasedBlockHandler: data handed over by the receiver is also written to a write-ahead log (WAL), so it can be recovered after an error and restart, ensuring reliability.
  - BlockManagerBasedBlockHandler: data is managed with the BlockManager only.

  The benefit of the WAL approach is that data is written to HDFS, so it is not lost if the driver application exits unexpectedly; with the BlockManager alone, a driver failure or the loss of the executor's node can both lose data. WAL mode is enabled via spark.streaming.receiver.writeAheadLog.enable; even with WAL enabled, the data is still managed by the BlockManager as well.
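For example, WAL mode can be switched on in the application's configuration (a sketch; the config key is Spark's real one, the app name is illustrative, and receiver WAL additionally requires a checkpoint directory on a fault-tolerant filesystem set via StreamingContext#checkpoint):

```scala
import org.apache.spark.SparkConf

// Enable write-ahead logging for receiver data; blocks are then written to
// the checkpoint directory's WAL in addition to the BlockManager.
val conf = new SparkConf()
  .setAppName("wal-demo") // illustrative name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
```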
- trackerEndpoint: a reference to the ReceiverTracker's RPC endpoint, used to communicate with the ReceiverTracker (for Spark RPC, see the article on Spark's RPC implementation).
- endpoint: an RPC endpoint served by the ReceiverSupervisor itself; the messages it handles are mainly:
  - StopReceiver: stop the Receiver and the ReceiverSupervisor
  - CleanupOldBlocks: clean up data saved by receivedBlockHandler
- registeredBlockGenerators: a receiver sometimes reports one record at a time, and creating a block (with its own block id) per record would clearly be inefficient; registeredBlockGenerators collects these single records and, once enough have accumulated, hands them to the ReceiverSupervisor to be saved as one block.
- defaultBlockGeneratorListener: this listener is covered in the BlockGenerator section below; the BlockGenerator uses its callbacks to hand assembled blocks back to the ReceiverSupervisor.
3.2.1 BlockGenerator
As mentioned above, the ReceiverSupervisor registers a defaultBlockGenerator by default when it starts; its class is BlockGenerator.
The class has the following members:
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
  ...
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  ...
- listener: passed in by the ReceiverSupervisor at creation time, i.e. the defaultBlockGeneratorListener described above.
- blockIntervalTimer: as noted, the BlockGenerator assembles records handed over one at a time into blocks; this timer performs one assembly every blockIntervalMs.
- blocksForPushing: assembled blocks are staged here until they are handed to the ReceiverSupervisor for storage and reporting.
- blockPushingThread: a thread that keeps handing blocks from blocksForPushing over to the ReceiverSupervisor.
- currentBuffer: records arriving one at a time from the receiver are buffered here until blockIntervalTimer assembles them into a block.
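The mechanism described by these members can be sketched without Spark as follows (names mirror the real members; the recurring timer and pushing thread are replaced by explicit tick()/pushAll() calls for clarity):

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Minimal sketch of BlockGenerator's batching pipeline: single records go into
// currentBuffer; each "block interval" tick moves the buffer into
// blocksForPushing as one block; a pusher then drains that queue.
class MiniBlockGenerator(onPushBlock: Seq[Any] => Unit) {
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)

  // called for every single record the receiver stores (cf. addData)
  def addData(data: Any): Unit = synchronized {
    currentBuffer += data
  }

  // what blockIntervalTimer does every blockIntervalMs (cf. updateCurrentBuffer)
  def tick(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      blocksForPushing.put(currentBuffer.toSeq)
      currentBuffer = new ArrayBuffer[Any]
    }
  }

  // what blockPushingThread does in a loop (cf. keepPushingBlocks)
  def pushAll(): Unit = {
    while (!blocksForPushing.isEmpty) onPushBlock(blocksForPushing.take())
  }
}
```

In the real class, onPushBlock is the BlockGeneratorListener callback through which blocks reach the ReceiverSupervisor.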
The figure below illustrates the flow of data from the receiver to the ReceiverTracker.