1. Spark Streaming Program Code Example
The code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]) {
    val conf = new SparkConf() // Create the SparkConf object
    // Set the application name; it is shown in the monitoring UI while the program runs
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
    conf.setMaster("spark://Master:7077") // The program runs on the Spark cluster
    // Set the batchDuration interval to control how often Jobs are generated,
    // and create the entry point for Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val socketDStream = ssc.socketTextStream("Master", 9999)
    // ... business processing logic ...
    ssc.start()
    ssc.awaitTermination()
  }
}
2. Source Code Analysis of Spark Streaming Execution
2.1 Creating the StreamingContext
Based on the example above, we will roughly walk through the Spark source code, pointing out a few targeted details to understand its main execution flow.
1) The code does not use SparkContext directly; instead it uses StreamingContext.
Let's look at a snippet of the StreamingContext source:
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
That's right, createNewSparkContext creates a SparkContext:
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
new SparkContext(conf)
}
This shows that Spark Streaming is itself just an application running on Spark.
2) At the very beginning of the example, an InputDStream must be created from the data stream.
val socketDStream = ssc.socketTextStream("Master", 9999)
The socketTextStream method is defined as follows:
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
3) You can see that at the end, the code calls socketStream.
socketStream is defined as follows:
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
4) This actually creates a SocketInputDStream.
SocketInputDStream extends ReceiverInputDStream and implements the getReceiver method, which returns a SocketReceiver object.
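An abbreviated sketch of the class (constructor parameter names may differ slightly between Spark versions):
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // The only real work of this class: hand back the Receiver that will be
  // started on an Executor to pull data from the socket.
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}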
To summarize SocketInputDStream's inheritance chain:
SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.
5) A DStream is the template from which RDDs are generated; it lives at the logical level. When each interval arrives, the templates are instantiated with the batch data into RDDs and a DAG.
DStream's generatedRDDs field:
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
DStream's getOrCompute mainly generates the RDD for a given batch time and puts the generated RDD into the HashMap above; the detailed RDD-generation process will be analyzed later.
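A simplified sketch of the core of getOrCompute; checkpointing, persistence and call-site handling present in the real source are omitted here:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // Reuse the RDD if this batch time was already computed;
  // otherwise compute it and cache it in generatedRDDs.
  generatedRDDs.get(time).orElse {
    if (isTimeValid(time)) {
      val rddOption = compute(time) // subclasses define how the RDD is produced
      rddOption.foreach { newRDD => generatedRDDs.put(time, newRDD) }
      rddOption
    } else {
      None
    }
  }
}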
So far we have roughly covered how the core concepts DStream and RDD are used in Spark Streaming.
2.2 Starting the StreamingContext
In the code example, ssc.start() starts the StreamingContext, and the main logic happens inside this start method:
Calling start on the StreamingContext actually invokes the JobScheduler's start method, which runs a message loop. Inside JobScheduler.start(), a JobGenerator and a ReceiverTracker are constructed, and the start methods of both are called:
- Once started, the JobGenerator keeps generating Jobs according to the batchDuration.
- The Job here is not the Job of Spark Core; it is only the DAG of RDDs generated from the DStreamGraph. From a Java point of view, it is the equivalent of a Runnable instance. To actually run, such a Job has to be submitted to the JobScheduler, which uses a thread pool to pick a separate thread and submit the Job to the cluster (the real job is triggered inside that thread by an action on the RDD).
- Once started, the ReceiverTracker first launches Receivers in the Spark cluster (strictly speaking, a ReceiverSupervisor is started first on each Executor). When a Receiver receives data, the ReceiverSupervisor stores it on the Executor and sends the data's metadata to the ReceiverTracker on the Driver, which manages the received metadata through a ReceivedBlockTracker.
The key classes that embody the execution flow of a Spark Streaming application are shown in the figure below.
Now let's start the magical journey through the source code. The process is painful, but after the pain comes the delight of thorough understanding...
1) First, look at StreamingContext's start().
The start() method starts the StreamingContext. Since a Spark application cannot have multiple active SparkContext instances, the Spark Streaming framework checks its state at startup.
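An abbreviated sketch of the state check in start(); helper names such as ACTIVATION_LOCK, assertNoOtherContextIsActive and setActiveContext follow the Spark source of this era, and details such as validation, call-site handling and the fact that the scheduler is actually started in a separate "streaming-start" thread are omitted:
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      // Only one StreamingContext may be active in a JVM at a time
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        scheduler.start()                    // start the JobScheduler
        state = StreamingContextState.ACTIVE
        StreamingContext.setActiveContext(this)
      }
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}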
In the initial (INITIALIZED) state, it starts the JobScheduler.
2) Next, let's look at JobScheduler's start() process.
It starts several pieces of work: the EventLoop, the StreamingListenerBus, the ReceiverTracker and the JobGenerator.
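An abbreviated sketch of JobScheduler.start(), based on the Spark source of this era (rate-controller registration and logging are trimmed):
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()                        // message loop for job events

  listenerBus.start(ssc.sparkContext)      // StreamingListenerBus
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}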
3) JobScheduler's message-handling function processEvent.
It handles three kinds of messages: job started, job completed, and error reported.
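Its body is essentially a dispatch on the event type, roughly as in the source:
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}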
4) Let's analyze the work started in JobScheduler.start() in more detail.
4.1) First, the EventLoop, the first piece of work started by JobScheduler.start().
EventLoop is used to handle the JobScheduler's various events.
EventLoop holds an event queue:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
There is also a daemon thread that processes the events in the queue.
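A sketch of that thread, drawn from the EventLoop source (logging details abbreviated):
private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        // Take the next event off the queue and dispatch it
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) =>
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}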
The onReceive and onError invoked by this thread were defined when JobScheduler instantiated the EventLoop.
4.2) The second piece of work started by JobScheduler.start(): the StreamingListenerBus.
- It asynchronously delivers StreamingListenerEvents to the registered StreamingListeners.
- It is used to update the contents of the Streaming tab in the Spark UI.
4.3) The third piece of work started by JobScheduler.start(): the ReceiverTracker.
ReceiverTracker manages all the input streams and the statistics of the data they receive.
This information can be observed through a StreamingListener.
ReceiverTracker's start() internally instantiates the RPC endpoint ReceiverTrackerEndpoint:
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
During the startup of ReceiverTracker, its launchReceivers method is called:
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
endpoint.send(StartAllReceivers(receivers))
}
It calls the runDummySparkJob method to launch the Spark Streaming framework's first Job; the collect action inside it triggers the execution of a Spark job. This method ensures that every slave has registered, so that the Receivers do not all end up on one node and the subsequent computation can be load balanced.
/**
* Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
* receivers to be scheduled on the same node.
*
* TODO Should poll the executor number and wait for executors according to
* "spark.scheduler.minRegisteredResourcesRatio" and
* "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
*/
private def runDummySparkJob(): Unit = {
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
assert(getExecutors.nonEmpty)
}
ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), sending the StartAllReceivers message through the RPC endpoint.
When the ReceiverTrackerEndpoint receives this message, it first uses the scheduling policy to determine which Executors each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receivers.
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
In the startReceiver method, ssc.sparkContext.submitJob is called to submit a Job, passing in the startReceiverFunc function, because startReceiverFunc is executed on an Executor. Inside startReceiverFunc, a ReceiverSupervisorImpl object is instantiated, which manages and monitors the Receiver. This Job is the second Job that the Spark Streaming framework launches for us, and it keeps running, because supervisor.awaitTermination() blocks waiting for termination.
/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
  // ... body omitted (not relevant here) ...
}
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
// Instantiate the Receiver's supervisor
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD,
startReceiverFunc, // startReceiverFunc is passed in when the Job is submitted, because it is executed on the Executor
Seq(0), (_, _) => Unit, ())
// Keep restarting the receiver job until the ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
Next, look at the startup process of ReceiverSupervisorImpl: it first starts all registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and then calls the receiver's onStart method.
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
Its onStart() starts all registered BlockGenerator objects:
override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
}
Its startReceiver() method calls onReceiverStart() and then calls the receiver's onStart method.
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
In onReceiverStart(), a RegisterReceiver message is sent to the ReceiverTrackerEndpoint:
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
When the ReceiverTrackerEndpoint object running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint into a ReceiverTrackingInfo and saves it into the in-memory HashMap receiverTrackingInfos.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Remote messages
case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
The source of the registerReceiver method:
/** Register a receiver */
private def registerReceiver(
streamId: Int,
typ: String,
host: String,
executorId: String,
receiverEndpoint: RpcEndpointRef,
senderAddress: RpcAddress
): Boolean = {
if (!receiverInputStreamIds.contains(streamId)) {
throw new SparkException("Register received for unexpected id " + streamId)
}
// ... irrelevant code omitted ...
if (!isAcceptable) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
val name = s"${typ}-${streamId}"
val receiverTrackingInfo = ReceiverTrackingInfo(
streamId,
ReceiverState.ACTIVE,
scheduledLocations = None,
runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
name = Some(name),
endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
true
}
}
As for starting the Receiver itself, take ssc.socketTextStream("localhost", 9999) as an example: the object created is a SocketReceiver, which internally starts a thread to connect to the socket server, read the socket data and store it.
private[streaming]
class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
// ... exception handling omitted ...
}
}
4.4) Finally, the fourth piece of work started by JobScheduler.start(): the JobGenerator.
JobGenerator has a RecurringTimer member used to drive its message system and timer; it periodically sends GenerateJobs messages at the batchDuration interval.
// Periodically sends GenerateJobs messages according to the batchDuration passed when the StreamingContext was created
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
JobGenerator's start() method:
/** Start generation of jobs */
def start(): Unit = synchronized {
if (eventLoop != null) return // generator has already been started
// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
// See SPARK-10125
checkpointWriter
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
override protected def onError(e: Throwable): Unit = {
jobScheduler.reportError("Error in job generator", e)
}
}
// Start the message-loop processing thread
eventLoop.start()
if (ssc.isCheckpointPresent) {
restart()
} else {
// Start the timer that periodically generates Jobs
startFirstTime()
}
}
The definition of startFirstTime(), called in JobGenerator.start():
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
Next, the processEvent() used by the event loop created in JobGenerator.start().
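A sketch of processEvent, based on the source of this era; it dispatches the timer-generated GenerateJobs event as well as the metadata and checkpoint events:
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}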
The definition of generateJobs, which processEvent calls for each GenerateJobs event:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// Fetch the concrete data for the given batch time
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
// Call DStreamGraph's generateJobs to generate the Jobs
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
DStreamGraph's generateJobs method calls the generateJob method of each output stream to generate the collection of Jobs.
// Output streams: the output operations created by specific actions
private val outputStreams = new ArrayBuffer[DStream[_]]()
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
Now look at DStream's generateJob method. It calls getOrCompute to obtain the RDD for the given time (when the interval arrives, the DStreamGraph is instantiated with the batch data into RDDs). If an RDD exists, it wraps a jobFunc containing context.sparkContext.runJob(rdd, emptyFunc) and returns the wrapped Job.
/**
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* to generate their own jobs.
*/
private[streaming] def generateJob(time: Time): Option[Job] = {
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
Next, look at JobScheduler's submitJobSet method, which submits a JobHandler to the thread pool. JobHandler implements the Runnable interface and ultimately calls job.run(). Looking at the definition of the Job class, the func invoked by run is the jobFunc passed in when the Job was constructed; it contains the context.sparkContext.runJob(rdd, emptyFunc) operation, which finally causes the job to be submitted.
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
JobHandler implements the Runnable interface and ultimately calls job.run():
private class JobHandler(job: Job) extends Runnable with Logging {
import JobScheduler._
def run() {
try {
// ... irrelevant code omitted ...
// We need to assign `eventLoop` to a temp variable. Otherwise, because
// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
// it's possible that when `post` is called, `eventLoop` happens to null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
_eventLoop = eventLoop
if (_eventLoop != null) {
_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
}
} finally {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
}
}
}
A fragment of the Job class:
private[streaming]
class Job(val time: Time, func: () => _) {
private var _id: String = _
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null
private var _callSite: CallSite = null
private var _startTime: Option[Long] = None
private var _endTime: Option[Long] = None
def run() {
_result = Try(func())
}
This concludes the analysis of the main source code.