Spark-Core Source Code Deep Dive (6): SparkContext and SparkEnv

In this article we will analyze the source code of SparkContext in detail.

Let's start with SparkConf:

SparkConf

When a SparkContext is instantiated it takes a SparkConf as a parameter. SparkConf describes the configuration of the whole Spark application, and any value set by the user overrides the corresponding system default. In unit tests we often instantiate it as new SparkConf(false) (the loadDefaults flag defaults to true when no argument is given), so that the defaults from the environment, i.e. the spark.* system properties, are not loaded; the test then runs with exactly the same configuration no matter which cluster environment it executes in. Also note that the SparkConf can no longer be modified once the application is running: the settings are kept in a ConcurrentHashMap, and Spark itself reads the configuration only through a clone of the SparkConf. Finally, SparkConf calls can be chained, for example:

new SparkConf().setMaster("local").setAppName("TestApp")

This works because each of these setters stores the value and then returns itself, i.e. the SparkConf instance. Part of SparkConf's source code is shown below:

// Stores the configuration as key-value pairs
private val settings = new ConcurrentHashMap[String, String]()
// When loadDefaults is true (the default), any system properties starting with "spark." are loaded
if (loadDefaults) {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value)
  }
}
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
  if (key == null) {
    throw new NullPointerException("null key")
  }
  if (value == null) {
    throw new NullPointerException("null value for " + key)
  }
  logDeprecationWarning(key)
  settings.put(key, value)
  // Every set() returns the SparkConf itself, which is what makes method chaining possible
  this
}
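
As a small illustrative sketch of the points above (the loadDefaults flag, chaining and clone); the extra key set here and the assertions are only examples, not taken from the Spark sources:

// Illustrative unit-test style usage: loadDefaults = false ignores any spark.* system
// properties, so the configuration is identical on every machine.
val conf = new SparkConf(false)
  .setMaster("local[2]")
  .setAppName("SparkConfTest")
  .set("spark.ui.enabled", "false")            // example of an extra setting

assert(conf.get("spark.app.name") == "SparkConfTest")
assert(conf.getOption("spark.executor.memory").isEmpty)

// SparkContext works on its own clone of the conf, so mutating `conf` afterwards
// does not affect a context that was already constructed from it.
val copied = conf.clone()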

SparkContext

SparkContext is the entry point to all of Spark's functionality. It represents the connection between the application and the cluster, and through it you create RDDs, accumulators and broadcast variables.
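
Before walking through the source, here is a minimal usage sketch of that role (the application name and the numbers are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkContextDemo")
    val sc = new SparkContext(conf)          // the connection point to the cluster

    val rdd = sc.parallelize(1 to 100)       // create an RDD
    val acc = sc.accumulator(0, "sum")       // create an accumulator (1.6.x API)
    val factor = sc.broadcast(10)            // create a broadcast variable

    rdd.foreach(x => acc += x * factor.value)
    println(s"accumulated value = ${acc.value}")

    sc.stop()                                // releases everything the context holds
  }
}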

A Spark application is submitted to the cluster through its SparkContext, and the whole run of the program is orchestrated with the SparkContext at its core; once the SparkContext stops or crashes, the Spark application is over. That is how central SparkContext is to Spark. Below we go through its source code in detail (only the important parts are shown):

// Records the call site where this SparkContext was constructed, which helps developers trace where it came from
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
// Whether multiple active SparkContexts are allowed; defaults to false
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
  config.getBoolean("spark.driver.allowMultipleContexts", false)
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
val startTime = System.currentTimeMillis()
// Tracks whether this context has been stopped
private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)

...

// The event bus for Spark events; message passing is covered in a separate article
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus

...

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

...

// Environment variables to pass to our executors.
private[spark] val executorEnvs = HashMap[String, String]()
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Utils.getCurrentUserName()

SparkContext also has several overloaded constructors, which we will not go through one by one. Next comes the most important part of SparkContext, the body of the try block, where most of the initialization happens. It is fairly long, so feel free to jump to the specific functionality you are interested in:

try {
  // Clone the SparkConf, i.e. the Spark configuration, and validate it for illegal settings
  _conf = config.clone()
  _conf.validateSettings()
  // Throw an exception if no master URL has been configured
  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  // Throw an exception if no application name has been configured
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }
  // Extra check for the YARN cluster modes
  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  // yarn-standalone is deprecated, but still supported
  if ((master == "yarn-cluster" || master == "yarn-standalone") &&
      !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }
  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }
  
  // Set Spark driver host and port system properties
  _conf.setIfMissing("spark.driver.host", Utils.localHostName())
  _conf.setIfMissing("spark.driver.port", "0")
  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
  _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
    .toSeq.flatten
  _eventLogDir =
    if (isEventLogEnabled) {
      // spark.eventLog.dir should be set explicitly here; otherwise the default is "/tmp/spark-events", which the system may clean up automatically
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }
  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }
  _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
  if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
  // "_jobProgressListener" should be set up before creating SparkEnv because when creating
  // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
  _jobProgressListener = new JobProgressListener(_conf)
  // The ListenerBus is analyzed in a separate article
  listenerBus.addListener(jobProgressListener)
  // Create the Spark execution environment (cache, map output tracker, etc)
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)
  // Periodically cleans up stale metadata
  _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
  // Monitors the execution of jobs and stages
  _statusTracker = new SparkStatusTracker(this)
  // Console progress bar: periodically polls statusTracker for the progress of active stages. A stage is shown only after it has been running for at least 500ms; if several stages run at the same time, their status is merged and shown on a single line, refreshed every 200ms
  _progressBar =
    if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }
  // The Spark UI
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
        _env.securityManager, appName, startTime = startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
  // Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }
  if (files != null) {
    files.foreach(addFile)
  }
  
  // Read the executor memory setting; defaults to 1024 MB if none of the sources below provide a value
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)
  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // Mesos-related configuration
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser
  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
  // The most important part of SparkContext: create and start the schedulers
  // Create and start the scheduler
  val (sched, ts) = SparkContext.createTaskScheduler(this, master)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  _taskScheduler.start()
  // Initialize the application ID
  _applicationId = _taskScheduler.applicationId()
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  _ui.foreach(_.setAppId(_applicationId))
  _env.blockManager.initialize(_applicationId)
  // Metrics system
  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addListener(logger)
      Some(logger)
    } else {
      None
    }
  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
    logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
  }
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      Some(new ExecutorAllocationManager(this, listenerBus, _conf))
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()
  // Post init
  _taskScheduler.postStartHook()
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }
  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    stop()
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}

In fact the three core objects inside SparkContext are the DAGScheduler, the TaskScheduler and the SchedulerBackend; we will analyze them in detail in the following articles.
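
To see how the SchedulerBackend and TaskScheduler returned by SparkContext.createTaskScheduler fit together, here is a heavily simplified sketch of its matching logic as I recall it from the 1.6.x code line; it only covers the local and standalone masters, omits all other cases and error handling, and should be read as an illustration rather than the actual source:

// Simplified sketch: createTaskScheduler pairs a TaskSchedulerImpl with a
// SchedulerBackend chosen from the master URL, then wires the two together.
master match {
  // Run everything in a single thread on the driver JVM
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)   // 1 = max task failures
    val backend = new LocalBackend(sc.getConf, scheduler, 1)       // 1 usable core
    scheduler.initialize(backend)   // the scheduler keeps a reference to its backend
    (backend, scheduler)

  // Standalone cluster, e.g. spark://host:7077
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
}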

SparkEnv

Now let's look at SparkEnv. SparkEnv holds the runtime environment of a running Spark instance (the driver or an executor), including the serializer, the Akka actor system (1.6.x already defaults to Netty but still carries some legacy Akka code; from Spark 2.x the Akka dependency is removed entirely), the BlockManager, the MapOutputTracker (very important during shuffle), and so on. As with SparkContext, each of these components gets its own article; for now we just skim the SparkEnv source:
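
SparkEnv is essentially a per-JVM registry of these services. As a rough sketch of how code running inside a driver or executor JVM can reach them (SparkEnv.get and the fields below are the 1.6.x developer API; this is for illustration only, not something typical application code needs to do):

import org.apache.spark.SparkEnv

// Illustrative only: look up the SparkEnv of the current JVM and touch a few of its components.
val env = SparkEnv.get
println(env.executorId)                      // "driver" on the driver, the executor ID otherwise
val serializer = env.serializer              // default: JavaSerializer
val blockManager = env.blockManager          // local block storage and transfer
val mapOutputTracker = env.mapOutputTracker  // shuffle map-output locations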

Let's start with createDriverEnv and createExecutorEnv. As the names suggest, these two methods create the runtime environment for the driver and for an executor respectively.

/**
 * Create a SparkEnv for the driver.
 */
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val hostname = conf.get("spark.driver.host")
  val port = conf.get("spark.driver.port").toInt
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    hostname,
    port,
    isDriver = true,
    isLocal = isLocal,
    numUsableCores = numCores,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}
/**
 * Create a SparkEnv for an executor.
 * In coarse-grained mode, the executor provides an actor system that is already instantiated.
 */
private[spark] def createExecutorEnv(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    numCores: Int,
    isLocal: Boolean): SparkEnv = {
  val env = create(
    conf,
    executorId,
    hostname,
    port,
    isDriver = false,
    isLocal = isLocal,
    numUsableCores = numCores
  )
  SparkEnv.set(env)
  env
}

As we can see, both methods ultimately call the create() method:

private def create(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    isDriver: Boolean,
    isLocal: Boolean,
    numUsableCores: Int,
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  // Listener bus is only used on the driver
  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }
  // Instantiate the SecurityManager
  val securityManager = new SecurityManager(conf)
  // Legacy Akka code left over in Spark 1.6.x; completely removed in Spark 2.x
  // Create the ActorSystem for Akka and get the port it binds to.
  val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
  // Create the RpcEnv
  val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
    clientMode = !isDriver)
  val actorSystem: ActorSystem =
    if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
      rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
    } else {
      val actorSystemPort =
        if (port == 0 || rpcEnv.address == null) {
          port
        } else {
          rpcEnv.address.port + 1
        }
      // Create a ActorSystem for legacy codes
      AkkaUtils.createActorSystem(
        actorSystemName + "ActorSystem",
        hostname,
        actorSystemPort,
        conf,
        securityManager
      )._1
    }
  // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
  // In the non-driver case, the RPC env's address may be null since it may not be listening
  // for incoming connections.
  // Record the actual port the driver (or executor) bound to
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  } else if (rpcEnv.address != null) {
    conf.set("spark.executor.port", rpcEnv.address.port.toString)
  }
  
  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
    val cls = Utils.classForName(className)
    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
    // SparkConf, then one taking no arguments
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, new java.lang.Boolean(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }
  // Create an instance of the class named by the given SparkConf property, or defaultClassName
  // if the property is not set, possibly initializing it with our conf
  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
    instantiateClass[T](conf.get(propertyName, defaultClassName))
  }
  
  // Create the serializer; the default is Java serialization (JavaSerializer)
  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")
  val closureSerializer = instantiateClassFromConf[Serializer](
    "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
    RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }
  
  // On the driver a MapOutputTrackerMaster is instantiated, on executors a MapOutputTrackerWorker; details in the shuffle article
  // So the MapOutputTracker also follows a master/slaves structure
  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf)
  } else {
    new MapOutputTrackerWorker(conf)
  }
  
  // Create the MapOutputTrackerMasterEndpoint on the driver (or obtain a reference to it on an executor) and assign it to the tracker
  // Have to assign trackerActor after initialization as MapOutputTrackerActor
  // requires the MapOutputTracker itself
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
  // Shuffle configuration; the default is the SortShuffleManager
  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  // Instantiate the ShuffleManager, SortShuffleManager by default
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
  // Whether to use the legacy memory manager, i.e. StaticMemoryManager
  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  // The default is the UnifiedMemoryManager
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }
  // Instantiate the BlockTransferService; the default implementation is Netty-based, i.e. NettyBlockTransferService
  val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
  // BlockManager-related initialization; covered in a separate article
  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)
  // NB: blockManager is not valid until initialize() is called later.
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)
  // Instantiate the BroadcastManager
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
  // Instantiate the CacheManager
  val cacheManager = new CacheManager(blockManager)
  // Metrics system
  val metricsSystem = if (isDriver) {
    // Don't start metrics system right now for Driver.
    // We need to wait for the task scheduler to give us an app ID.
    // Then we can start the metrics system.
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
    // ID into the metrics they report.
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }
  
  // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
  // this is a temporary directory; in distributed mode, this is the executor's current working
  // directory.
  val sparkFilesDir: String = if (isDriver) {
    Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  } else {
    "."
  }
  
  // OutputCommitCoordinator initialization and endpoint registration
  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf, isDriver)
  }
  val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
    new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
  outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
  val envInstance = new SparkEnv(
    executorId,
    rpcEnv,
    actorSystem,
    serializer,
    closureSerializer,
    cacheManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockTransferService,
    blockManager,
    securityManager,
    sparkFilesDir,
    metricsSystem,
    memoryManager,
    outputCommitCoordinator,
    conf)
  // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
  // called, and we only need to do it for driver. Because driver may run as a service, and if we
  // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
  if (isDriver) {
    envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
  }
  envInstance
}
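
The instantiateClassFromConf and short-name lookups above are what make these components pluggable from the application side. As a small sketch of overriding some of the defaults chosen in create() (the values are only examples; the configuration keys all appear in the code above, and KryoSerializer ships with Spark):

// Illustrative driver-side configuration overriding some of the defaults picked in create():
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("EnvConfigDemo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // instead of JavaSerializer
  .set("spark.shuffle.manager", "sort")        // short name resolved to SortShuffleManager
  .set("spark.memory.useLegacyMode", "true")   // fall back to StaticMemoryManager

val sc = new SparkContext(conf)   // createSparkEnv -> SparkEnv.create will pick these up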

This article has only been a quick walk through the SparkContext and SparkEnv source; the individual modules will each be analyzed in a dedicated article.

This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
