2. Spark Streaming流計(jì)算框架的運(yùn)行流程源碼分析

1. spark streaming 程序代碼實(shí)例

代碼如下:

object OnlineTheTop3ItemForEachCategory2DB {  
  def main(args: Array[String]){   
    val conf = new SparkConf() //創(chuàng)建SparkConf對(duì)象  
    //設(shè)置應(yīng)用程序的名稱浙宜,在程序運(yùn)行的監(jiān)控界面可以看到名稱  
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")  
    conf.setMaster("spark://Master:7077") //此時(shí)佣盒,程序在Spark集群  
    //設(shè)置batchDuration時(shí)間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口  
    val ssc = new StreamingContext(conf, Seconds(5))  
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")  
    val soketDStream = ssc.socketTextStream("Master", 9999)  
    /// 業(yè)務(wù)處理邏輯 .....
      
    ssc.start()  
    ssc.awaitTermination()  
  }  
}  

2. Spark Streaming的運(yùn)行源碼分析

2.1 創(chuàng)建StreamingContext

我們將基于以上實(shí)例黎做,粗略地分析一下Spark源碼鹰晨,提示一些有針對(duì)性的內(nèi)容舶替,以了解其運(yùn)行的主要流程鸡典。

1)代碼沒有直接使用SparkContext芳悲,而是使用StreamingContext。

我們來看看StreamingContext 的源碼片段:

/**  
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.  
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters  
 * @param batchDuration the time interval at which streaming data will be divided into batches  
 */  
def this(conf: SparkConf, batchDuration: Duration) = {  
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)  
}  

沒錯(cuò)馏臭,createNewSparkContext就是創(chuàng)建SparkContext:

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {   
  new SparkContext(conf)   
}  

這說明Spark Streaming也是Spark上的一個(gè)應(yīng)用程序野蝇。

2)案例最開始的地方肯定要通過數(shù)據(jù)流創(chuàng)建一個(gè)InputDStream。
val socketDstram = ssc.socketTextStream("Master", 9999)  

socketTextStream方法定義如下:

/**  
 * Create a input stream from TCP source hostname:port. Data is received using  
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited  
 * lines.  
 * @param hostname      Hostname to connect to for receiving data  
 * @param port          Port to connect to for receiving data  
 * @param storageLevel  Storage level to use for storing the received objects  
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)  
 */  
def socketTextStream(  
    hostname: String,  
    port: Int,  
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2  
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {  
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)  
}  
3)可看到代碼最后面調(diào)用socketStream括儒。

socketStream定義如下:

/**  
 * Create a input stream from TCP source hostname:port. Data is received using  
 * a TCP socket and the receive bytes it interepreted as object using the given  
 * converter.  
 * @param hostname      Hostname to connect to for receiving data  
 * @param port          Port to connect to for receiving data  
 * @param converter     Function to convert the byte stream to objects  
 * @param storageLevel  Storage level to use for storing the received objects  
 * @tparam T            Type of the objects received (after converting bytes to objects)  
 */  
def socketStream[T: ClassTag](  
    hostname: String,  
    port: Int,  
    converter: (InputStream) => Iterator[T],  
    storageLevel: StorageLevel  
  ): ReceiverInputDStream[T] = {  
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)  
}  
4)實(shí)際上生成SocketInputDStream绕沈。

SocketInputDStream類如下:



SocketInputDStream繼承ReceiverInputDStream。其中實(shí)現(xiàn)getReceiver方法帮寻,返回SocketReceiver對(duì)象乍狐。

總結(jié)一下SocketInputDStream的繼承關(guān)系:
SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream。

5)DStream是生成RDD的模板固逗,是邏輯級(jí)別浅蚪,當(dāng)達(dá)到Interval的時(shí)候這些模板會(huì)被batch data實(shí)例化成為RDD和DAG。

DStream的generatedRDDs:

// RDDs generated, marked as private[streaming] so that testsuites can access it  
@transient  
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()  

DStream的getOrCompute:



主要是生成RDD烫罩,再將生成的RDD放在HashMap中惜傲。具體生成RDD過程以后剖析。
目前大致講了DStream和RDD這些核心概念在Spark Streaming中的使用贝攒。

2.2 啟動(dòng)StreamingContext

代碼實(shí)例中的ssc.start() 方法啟動(dòng)StreamingContext,主要的邏輯發(fā)生在這個(gè)start方法中:
在StreamingContext調(diào)用start方法的內(nèi)部其實(shí)是會(huì)啟動(dòng)JobScheduler的Start方法盗誊,進(jìn)行消息循環(huán),在JobScheduler的start內(nèi)部會(huì)構(gòu)造JobGeneratorReceiverTacker隘弊,并且調(diào)用JobGenerator和 ReceiverTacker的start方法

  1. JobGenerator啟動(dòng)后會(huì)不斷的根據(jù)batchDuration生成一個(gè)個(gè)的Job哈踱。
    其實(shí)這里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG而已长捧,從Java角度講嚣鄙,相當(dāng)于Runnable接口實(shí)例,此時(shí)要想運(yùn)行Job需要提交給JobScheduler串结,在JobScheduler中通過線程池的方式找到一個(gè)單獨(dú)的線程來提交Job到集群運(yùn)行(其實(shí)是在線程中基于RDD的Action觸發(fā)真正的作業(yè)的運(yùn)行)

  2. ReceiverTracker啟動(dòng)后首先在Spark Cluster中啟動(dòng)Receiver(其實(shí)是在Executor中先啟動(dòng)ReceiverSupervisor),在Receiver收到數(shù)據(jù)后會(huì)通過ReceiverSupervisor存儲(chǔ)到Executor并且把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會(huì)通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息肌割。

體現(xiàn)Spark Streaming應(yīng)用運(yùn)行流程的關(guān)鍵類如下圖所示卧蜓。


下面開啟神奇的 源碼分析之旅,過程痛苦,痛苦之后是大徹大悟的暢快...........

1)先看看ScreamingContext的start()。

start()方法啟動(dòng)StreamContext把敞,由于Spark應(yīng)用程序不能有多個(gè)SparkContext對(duì)象實(shí)例弥奸,所以Spark Streaming框架在啟動(dòng)時(shí)對(duì)狀態(tài)進(jìn)行判斷。代碼如下:



初始狀態(tài)時(shí)奋早,會(huì)啟動(dòng)JobScheduler盛霎。

2)接著來看下JobScheduler的啟動(dòng)過程start()。

其中啟動(dòng)了EventLoop耽装、StreamListenerBus愤炸、ReceiverTracker和jobGenerator等多項(xiàng)工作。

3)JobScheduler中的消息處理函數(shù)processEvent掉奄。

處理三類消息:Job已開始规个,Job已完成,錯(cuò)誤報(bào)告姓建。


4)我們?cè)俅致缘胤治鲆幌翵obScheduler.start()中啟動(dòng)的工作诞仓。
4.1)先看JobScheduler.start()啟動(dòng)的第一項(xiàng)工作EventLoop。

EventLoop用于處理JobScheduler的各種事件速兔。
EventLoop中有事件隊(duì)列:

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()  

還有一個(gè)線程處理隊(duì)列中的事件:



這個(gè)線程中的onReceive墅拭、onError,在JobScheduler中的EventLoop實(shí)例化時(shí)已定義涣狗。

4.2)JobScheduler.start()啟動(dòng)的第二項(xiàng)工作StreamListenerBus帜矾。
  • 用于異步傳遞StreamingListenerEvents到注冊(cè)的StreamingListeners。
  • 用于更新Spark UI中StreamTab的內(nèi)容屑柔。
4.3)看JobScheduler.start()啟動(dòng)的第三項(xiàng)工作ReceiverTracker屡萤。

ReceiverTracker用于管理所有的輸入的流,以及他們輸入的數(shù)據(jù)統(tǒng)計(jì)掸宛。
這些信息將通過 StreamingListener監(jiān)聽死陆。
ReceiverTracker的start()中,會(huì)內(nèi)部實(shí)例化ReceiverTrackerEndpoint這個(gè)Rpc消息通信體唧瘾。

 1 def start(): Unit = synchronized {
 2   if (isTrackerStarted) {
 3     throw new SparkException("ReceiverTracker already started")
 4   }
 5  
 6   if (!receiverInputStreams.isEmpty) {
 7     endpoint = ssc.env.rpcEnv.setupEndpoint(
 8       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
 9     if (!skipReceiverLaunch) launchReceivers()
10     logInfo("ReceiverTracker started")
11     trackerState = Started
12   }
13 }

在ReceiverTracker啟動(dòng)的過程中會(huì)調(diào)用其launchReceivers方法:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

其中調(diào)用了runDummySparkJob方法來啟動(dòng)Spark Streaming的框架第一個(gè)Job措译,其中collect這個(gè)action操作會(huì)觸發(fā)Spark Job的執(zhí)行。這個(gè)方法是為了確保每個(gè)Slave都注冊(cè)上饰序,避免所有Receiver都在一個(gè)節(jié)點(diǎn)领虹,使后面的計(jì)算能負(fù)載均衡。

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

ReceiverTracker.launchReceivers()還調(diào)用了endpoint.send(StartAllReceivers(receivers))方法求豫,Rpc消息通信體發(fā)送StartAllReceivers消息塌衰。
ReceiverTrackerEndpoint它自己接收到消息后诉稍,先根據(jù)調(diào)度策略獲得Recevier在哪個(gè)Executor上運(yùn)行,然后在調(diào)用startReceiver(receiver, executors)方法最疆,來啟動(dòng)Receiver杯巨。

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

在startReceiver方法中,ssc.sparkContext.submitJob提交Job的時(shí)候傳入startReceiverFunc這個(gè)方法努酸,因?yàn)閟tartReceiverFunc該方法是在Executor上執(zhí)行的服爷。而在startReceiverFunc方法中實(shí)例化ReceiverSupervisorImpl對(duì)象,該對(duì)象是對(duì)Receiver進(jìn)行管理和監(jiān)控获诈。這個(gè)Job是Spark Streaming框架為我們啟動(dòng)的第二個(gè)Job仍源,且一直運(yùn)行。因?yàn)閟upervisor.awaitTermination()該方法會(huì)阻塞等待退出舔涎。

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
 
    // ........... 此處省略1萬(wàn)字 (無(wú)關(guān)代碼) , 呵呵噠 .........
 
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        //實(shí)例化Receiver監(jiān)控者
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }
 
  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
 
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, 
   startReceiverFunc, //提交Job時(shí)候傳入startReceiverFunc這個(gè)方法笼踩,因?yàn)閟tartReceiverFunc該方法是在Executor上執(zhí)行的
  Seq(0), (_, _) => Unit, ())
 
  // 一直重啟 receiver job直到 ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

接下來看下ReceiverSupervisorImpl的啟動(dòng)過程,先啟動(dòng)所有注冊(cè)上的BlockGenerator對(duì)象终抽,然后向ReceiverTrackerEndpoint發(fā)送RegisterReceiver消息戳表,再調(diào)用receiver的onStart方法。

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

其中的onStart():啟動(dòng)所有注冊(cè)上的BlockGenerator對(duì)象

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

其中的startReceiver()方法中調(diào)用onReceiverStart()然后再調(diào)用receiver的onStart方法昼伴。

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

在onReceiverStart()中向ReceiverTrackerEndpoint發(fā)送RegisterReceiver消息

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

其中在Driver運(yùn)行的ReceiverTrackerEndpoint對(duì)象接收到RegisterReceiver消息后匾旭,將streamId, typ, host, executorId, receiverEndpoint封裝為ReceiverTrackingInfo保存到內(nèi)存對(duì)象receiverTrackingInfos這個(gè)HashMap中。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)

registerReceiver方法源碼:

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
 
    // ........... 此處省略1萬(wàn)字 (無(wú)關(guān)代碼) , 呵呵噠 .........
 
  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}

Receiver的啟動(dòng)圃郊,以ssc.socketTextStream("localhost", 9999)為例价涝,創(chuàng)建的是SocketReceiver對(duì)象。內(nèi)部啟動(dòng)一個(gè)線程來連接Socket Server持舆,讀取socket數(shù)據(jù)并存儲(chǔ)色瘩。

private[streaming]
class SocketReceiver[T: ClassTag](
   host: String,
   port: Int,
   bytesToObjects: InputStream => Iterator[T],
   storageLevel: StorageLevel
 ) extends Receiver[T](storageLevel) with Logging {

 def onStart() {
   // Start the thread that receives data over a connection
   new Thread("Socket Receiver") {
     setDaemon(true)
     override def run() { receive() }
   }.start()
 }


 /** Create a socket connection and receive data until receiver is stopped */
 def receive() {
   var socket: Socket = null
   try {
     logInfo("Connecting to " + host + ":" + port)
     socket = new Socket(host, port)
     logInfo("Connected to " + host + ":" + port)
     val iterator = bytesToObjects(socket.getInputStream())
     while(!isStopped && iterator.hasNext) {
       store(iterator.next)
     }
     if (!isStopped()) {
       restart("Socket data stream had no more data")
     } else {
       logInfo("Stopped receiving")
     }
   } catch {
       // ........... 此處省略1萬(wàn)字 (無(wú)關(guān)代碼) , 呵呵噠 .........
 }
}
4.4)接下來看JobScheduler.start()中啟動(dòng)的第四項(xiàng)工作JobGenerator。

JobGenerator有成員RecurringTimer逸寓,用于啟動(dòng)消息系統(tǒng)和定時(shí)器居兆。按照batchInterval時(shí)間間隔定期發(fā)送GenerateJobs消息。

//根據(jù)創(chuàng)建StreamContext時(shí)傳入的batchInterval竹伸,定時(shí)發(fā)送GenerateJobs消息
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
 
JobGenerator的start()方法:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started
 
  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter
 
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
 
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
 
  // 啟動(dòng)消息循環(huán)處理線程
  eventLoop.start()
 
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // 開啟定時(shí)生成Job的定時(shí)器
    startFirstTime()
  }
}

JobGenerator.start()中的startFirstTime()的定義:

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

JobGenerator.start()中的processEvent()的定義:


其中g(shù)enerateJobs的定義:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
 
    // 根據(jù)特定的時(shí)間獲取具體的數(shù)據(jù)
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    //調(diào)用DStreamGraph的generateJobs生成Job
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

DStreamGraph的generateJobs方法泥栖,調(diào)用輸出流的generateJob方法來生成Jobs集合。

// 輸出流:具體Action的輸出操作
private val outputStreams = new ArrayBuffer[DStream[_]]()
 
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

來看下DStream的generateJob方法勋篓,調(diào)用getOrCompute方法來獲取當(dāng)Interval的時(shí)候吧享,DStreamGraph會(huì)被BatchData實(shí)例化成為RDD,如果有RDD則封裝jobFunc方法譬嚣,里面包含context.sparkContext.runJob(rdd, emptyFunc)钢颂,然后返回封裝后的Job。

/**  
 * Generate a SparkStreaming job for the given time. This is an internal method that  
 * should not be called directly. This default implementation creates a job  
 * that materializes the corresponding RDD. Subclasses of DStream may override this  
 * to generate their own jobs.  
 */  
private[streaming] def generateJob(time: Time): Option[Job] = {  
  getOrCompute(time) match {  
    case Some(rdd) => {  
      val jobFunc = () => {  
        val emptyFunc = { (iterator: Iterator[T]) => {} }  
        context.sparkContext.runJob(rdd, emptyFunc)  
      }  
      Some(new Job(time, jobFunc))  
    }  
    case None => None  
  }  
}  

接下來看JobScheduler的submitJobSet方法拜银,向線程池中提交JobHandler殊鞭。而JobHandler實(shí)現(xiàn)了Runnable 接口遭垛,最終調(diào)用了job.run()這個(gè)方法∏恚看一下Job類的定義耻卡,其中run方法調(diào)用的func為構(gòu)造Job時(shí)傳入的jobFunc疯汁,其包含了context.sparkContext.runJob(rdd, emptyFunc)操作牲尺,最終導(dǎo)致Job的提交。

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

JobHandler實(shí)現(xiàn)了Runnable 接口幌蚊,最終調(diào)用了job.run()這個(gè)方法:

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._
 
    def run() {
      try {
    
         //  *********** 此處省略無(wú)關(guān)代碼 *******************
 
        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}

Job的代碼片段:

private[streaming]  
class Job(val time: Time, func: () => _) {  
  private var _id: String = _  
  private var _outputOpId: Int = _  
  private var isSet = false  
  private var _result: Try[_] = null  
  private var _callSite: CallSite = null  
  private var _startTime: Option[Long] = None  
  private var _endTime: Option[Long] = None  
  def run() {  
    _result = Try(func())  
  }  

以上是主要源碼的分析.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谤碳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子溢豆,更是在濱河造成了極大的恐慌蜒简,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件漩仙,死亡現(xiàn)場(chǎng)離奇詭異搓茬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)队他,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門卷仑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人麸折,你說我怎么就攤上這事锡凝。” “怎么了垢啼?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵窜锯,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我芭析,道長(zhǎng)锚扎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任馁启,我火速辦了婚禮驾孔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘进统。我一直安慰自己助币,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布螟碎。 她就那樣靜靜地躺著眉菱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪掉分。 梳的紋絲不亂的頭發(fā)上俭缓,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天克伊,我揣著相機(jī)與錄音,去河邊找鬼华坦。 笑死愿吹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的惜姐。 我是一名探鬼主播犁跪,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼歹袁!你這毒婦竟也來了坷衍?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤条舔,失蹤者是張志新(化名)和其女友劉穎枫耳,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體孟抗,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡迁杨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了凄硼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铅协。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖帆喇,靈堂內(nèi)的尸體忽然破棺而出警医,到底是詐尸還是另有隱情,我是刑警寧澤坯钦,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布预皇,位于F島的核電站,受9級(jí)特大地震影響婉刀,放射性物質(zhì)發(fā)生泄漏吟温。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一突颊、第九天 我趴在偏房一處隱蔽的房頂上張望鲁豪。 院中可真熱鬧,春花似錦律秃、人聲如沸爬橡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)糙申。三九已至,卻和暖如春船惨,著一層夾襖步出監(jiān)牢的瞬間柜裸,已是汗流浹背缕陕。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疙挺,地道東北人扛邑。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像铐然,于是被迫代替她去往敵國(guó)和親蔬崩。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容