In the overall Spark Streaming architecture, data ingestion is really the most critical stage. Once data has been received, the downstream processing simply reuses Spark Core's processing stack (including BlockManager for managing the data blocks), and the rest falls into place naturally. So, as the first chapter on Spark Streaming, we start with how a Receiver gets started.
Receiver Architecture
Spark Streaming currently supports two ways of receiving data: Receiver-based and Direct. The difference is covered in detail in [Spark-kafka-integration], and we will come back to it later. For now let's stay with the Receiver approach and first look at the Receiver architecture.
When implementing our own Receiver, the main points to watch are the following (a minimal sketch follows the list):
- onStart() holds the data-receiving logic. Here we only care about how to receive data; we do not need to worry about Spark Streaming's batch cycle. It must also be non-blocking, so the usual pattern is to spawn a thread in onStart() and keep receiving data inside it.
- Reliability: after a Receiver receives data, it has to store it into Spark's memory by calling store(). To guarantee reliability, the ACK back to the source should be sent only after store() succeeds, and a WAL (write-ahead log) may even be required here.
- Once the Receiver is written, it is wrapped into a DStream. Each DStream has its own compute() method, which turns the data received by the Receiver into a BlockRDD and feeds it into Spark Core's computation pipeline.
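As a concrete illustration, here is a minimal sketch of a socket-based custom Receiver in the style of Spark's custom-receiver guide. The class name SocketTextReceiver and the host/port parameters are made up for this example; the Receiver API calls (onStart, onStop, store, isStopped, restart) are the standard ones.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal custom receiver: reads text lines from a TCP socket.
class SocketTextReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Must not block: spawn a thread and return immediately.
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to clean up; the receive thread exits once isStopped() is true.
  override def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand the record over to Spark for block storage
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      // Ask the supervisor to restart the receiver once the connection ends.
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
```

In a job this would be plugged in with something like ssc.receiverStream(new SocketTextReceiver("localhost", 9999)), which is exactly the step that wraps the Receiver into a ReceiverInputDStream.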
Receiver Startup Flow
Starting a Receiver is a bit of a journey: along the way we learn a technique for launching our own processes on top of Spark's distributed environment. First, here is a rough flow diagram:
- Everything in Streaming begins with StreamingContext. When it is initialized it creates a JobScheduler (the component that drives Streaming jobs), and when StreamingContext.start() is called the whole JobScheduler starts up.
- ReceiverTracker manages all the Receivers of all the ReceiverInputDStreams in the StreamingContext (a bit of a mouthful).
- Let's jump straight into StartAllReceivers. Scheduling happens here first: taking both the executors currently available to the streaming application and all the Receivers that need to be started into account, it decides which Receiver should be launched on which executor (a naive illustration follows the snippet below):
```scala
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
```
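To make the idea concrete, here is a deliberately naive sketch of such a placement step. It is not the real ReceiverSchedulingPolicy (which also honors each Receiver's preferredLocation and balances load across executors); it just spreads receivers round-robin over the known executor hosts.

```scala
// Naive illustration only, NOT Spark's ReceiverSchedulingPolicy:
// assign receiver ids round-robin across the currently known executor hosts.
def naiveScheduleReceivers(
    receiverIds: Seq[Int],
    executorHosts: Seq[String]): Map[Int, Seq[String]] = {
  if (executorHosts.isEmpty) {
    // No executors registered yet: leave every receiver without a preference.
    receiverIds.map(id => id -> Seq.empty[String]).toMap
  } else {
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> Seq(executorHosts(i % executorHosts.length))
    }.toMap
  }
}

// naiveScheduleReceivers(Seq(0, 1, 2), Seq("host-a", "host-b"))
//   => Map(0 -> Seq("host-a"), 1 -> Seq("host-b"), 2 -> Seq("host-a"))
```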
- Now comes the cleverest part; the essence of it all lives in ReceiverTracker:
```scala
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }

// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(ThreadUtils.sameThread)
logInfo(s"Receiver ${receiver.streamId} started")
```
- First, startReceiverFunc. Note that this is just a function: it wraps the Receiver handed to it into a ReceiverSupervisorImpl, i.e. it creates a supervisor for each Receiver. The supervisor is responsible for starting the Receiver, storing the data the Receiver receives, and keeping in contact with the Driver.
- Then a receiverRDD is created. Based on the earlier receiver-to-executor assignment, the content of this RDD is simply receiver -> the host of the executor it was assigned to.
- Finally, ssc.sparkContext.submitJob submits this RDD together with startReceiverFunc to the cluster. From here on, Spark Core's distributed computing framework takes over and each receiver is started on its preferred host.
This teaches us a trick for launching our own processes as tasks on Spark's distributed computing environment. The key steps: build an RDD that maps each piece of work to the executor(s) it should run on, wrap the startup logic into a function, and hand both to ssc.sparkContext.submitJob().
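Stripped of the streaming specifics, the trick looks roughly like the sketch below. This is hypothetical demo code (the object name, targetHost and bootstrap are made up); it only shows makeRDD with a location preference plus submitJob, and assumes the cluster actually has an executor running on the preferred host (location preferences are hints, not guarantees).

```scala
import org.apache.spark.{SparkConf, SparkContext}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object SubmitJobOnHostDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("submit-job-demo"))

    val targetHost = "worker-1" // hypothetical executor host
    val bootstrap: () => Unit =
      () => println(s"started on ${java.net.InetAddress.getLocalHost}")

    // A one-element RDD whose single partition prefers targetHost.
    val rdd = sc.makeRDD(Seq(bootstrap -> Seq(targetHost)))

    // Run the function as a Spark task on partition 0, i.e. on the preferred host if possible.
    val future = sc.submitJob[() => Unit, Unit, Unit](
      rdd,
      (it: Iterator[() => Unit]) => it.foreach(f => f()),
      Seq(0),          // which partitions to run
      (_, _) => (),    // per-partition result handler: nothing to collect
      ()               // overall result
    )

    // Block only for the demo; ReceiverTracker instead registers an onComplete callback.
    Await.ready(future, Duration.Inf)
    sc.stop()
  }
}
```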
- Inside the ReceiverSupervisor, the data the Receiver receives is handled and turned into blocks via the BlockGenerator (a conceptual sketch follows).
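The real BlockGenerator is private[streaming], so the following is only a conceptual sketch of its batching behaviour: buffer incoming records, cut a block every block interval (spark.streaming.blockInterval, 200 ms by default), and hand the block to a callback that ultimately stores it through the BlockManager. The names NaiveBlockGenerator and pushBlock are made up.

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.mutable.ArrayBuffer

// Conceptual sketch of block batching, not the real private[streaming] BlockGenerator.
class NaiveBlockGenerator[T](blockIntervalMs: Long, pushBlock: Seq[T] => Unit) {
  private val buffer = new ArrayBuffer[T]()
  private val timer = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit =
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = cutBlock()
    }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)

  // Called by the receiver for every record it stores.
  def addData(record: T): Unit = buffer.synchronized { buffer += record }

  // Every interval: swap out the buffered records and push them as one block.
  private def cutBlock(): Unit = {
    val block = buffer.synchronized {
      val copy = buffer.toList
      buffer.clear()
      copy
    }
    if (block.nonEmpty) pushBlock(block)
  }

  def stop(): Unit = {
    timer.shutdown()
    cutBlock() // flush whatever is left
  }
}
```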
Summary
Receiving data is the critical part of Spark Streaming. Above we walked through the Receiver startup flow and also learned how to submit our own jobs onto Spark as a distributed platform. Next we will look at how jobs are started inside Spark Streaming.