In this article we will walk through the source code of SparkContext in detail.
Let's start with SparkConf:
SparkConf
SparkContext takes a SparkConf as a constructor argument. SparkConf describes the configuration of the entire Spark application; any setting it contains overrides the corresponding system default. In unit tests we often instantiate it as new SparkConf(false) (the parameter defaults to true) so that the default configuration (e.g. what lives under "conf/") is not loaded; this way unit tests run with exactly the same configuration no matter which cluster environment they run in. Also note that SparkConf must not be modified while the program is running: its settings are kept in a ConcurrentHashMap, and the Spark configuration can only be read through a clone. Finally, SparkConf supports chained calls, for example:
new SparkConf().setMaster("local").setAppName("TestApp")
This works because each of these methods returns the SparkConf instance itself after applying the setting. Part of the SparkConf source code is shown below:
// Stores the key-value configuration entries
private val settings = new ConcurrentHashMap[String, String]()
// By default, load all system properties whose names start with "spark."
if (loadDefaults) {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
}
}
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
settings.put(key, value)
// Note that every setter returns the SparkConf itself, which is what makes chained calls possible
this
}
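Putting the pieces above together, here is a minimal usage sketch (not part of the Spark source) showing the unit-test style constructor, chained setters, and reading the configuration back; the property names used are standard Spark settings:

import org.apache.spark.SparkConf

// Pass false so that "spark.*" system properties are not loaded,
// keeping unit tests independent of the environment they run in.
val conf = new SparkConf(false)
  .setMaster("local[2]")
  .setAppName("TestApp")
  .set("spark.ui.enabled", "false")

// SparkConf should not be mutated while the application runs;
// other components read the configuration through a cloned copy.
val snapshot = conf.clone()
println(snapshot.get("spark.app.name"))                 // TestApp
println(snapshot.getBoolean("spark.ui.enabled", true))  // false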
SparkContext
SparkContext is the entry point to all of Spark's functionality and represents the connection between the application and the cluster. Through a SparkContext you can create RDDs, accumulators, and broadcast variables.
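As a quick illustration of that entry-point role, here is a minimal sketch using the public Spark 1.x API (this example is ours, not part of the source being analyzed):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("SparkContextDemo")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 100)            // create an RDD
val sum = sc.accumulator(0, "sum")            // create an accumulator (1.x API)
val lookup = sc.broadcast(Map("a" -> 1))      // create a broadcast variable

rdd.foreach(x => sum += x)
println(s"sum=${sum.value}, a=${lookup.value("a")}")

sc.stop()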
A Spark application is submitted to the cluster through its SparkContext, and the whole execution of the program is orchestrated around it; when the SparkContext crashes or is stopped, the application is over. This shows how central SparkContext is to Spark. Below we analyze its source code in detail (only the important parts are selected):
// Recorded so that developers can see where the SparkContext was created
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()
// Whether multiple active SparkContexts are allowed; defaults to false
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
config.getBoolean("spark.driver.allowMultipleContexts", false)
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having started construction.
// NOTE: this must be placed at the beginning of the SparkContext constructor.
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
val startTime = System.currentTimeMillis()
// Flag recording whether this context has been stopped
private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
...
// Message/event-bus related; we will cover it separately
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
...
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
...
// Environment variables to pass to our executors.
private[spark] val executorEnvs = HashMap[String, String]()
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Utils.getCurrentUserName()
There are also several overloaded constructors, which we will not cover one by one. Next we look at the most important part of SparkContext: the body of the try block, where most of the initialization happens. Since it is fairly long, feel free to jump to the specific pieces you are interested in:
try {
// Clone the SparkConf (the Spark configuration) and check it for invalid settings
_conf = config.clone()
_conf.validateSettings()
// Throw an exception if no master URL is configured
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
// Throw an exception if no application name is configured
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
// Checks specific to YARN mode
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
// yarn-standalone is deprecated, but still supported
if ((master == "yarn-cluster" || master == "yarn-standalone") &&
!_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
if (_conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + _conf.toDebugString)
}
// Set Spark driver host and port system properties
_conf.setIfMissing("spark.driver.host", Utils.localHostName())
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
.toSeq.flatten
_eventLogDir =
if (isEventLogEnabled) {
// Set spark.eventLog.dir explicitly; otherwise the default path is "/tmp/spark-events", which the system may clean up automatically
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
.stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
} else {
None
}
}
_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
_jobProgressListener = new JobProgressListener(_conf)
// The ListenerBus will be analyzed separately
listenerBus.addListener(jobProgressListener)
// Create the Spark execution environment (cache, map output tracker, etc)
// Create the SparkEnv
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
// Metadata cleaner
_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
// Tracks the execution of jobs and stages
_statusTracker = new SparkStatusTracker(this)
// Shows the progress of stages in the console. It periodically polls the status of active stages from the statusTracker; a stage is shown only after it has run for at least 500 ms. If several stages run at the same time, their statuses are combined and shown on one line, updated every 200 ms
_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}
// Spark UI setup
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
// Determine the executor memory; if nothing is configured, the default is 1024 MB (1 GB)
_executorMemory = _conf.getOption("spark.executor.memory")
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
executorEnvs(envKey) = value
}
Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
executorEnvs("SPARK_PREPEND_CLASSES") = v
}
// Mesos-related configuration
// The Mesos scheduler backend relies on this environment variable to set executor memory.
// TODO: Set this only in the Mesos scheduler.
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// Below is the most important part of SparkContext: creating the set of schedulers
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
// Initialize the application ID
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
// Metrics system setup
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else {
None
}
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
}
_executorAllocationManager =
if (dynamicAllocationEnabled) {
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
}
_executorAllocationManager.foreach(_.start())
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())
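// Register listeners specified via spark.extraListeners, start the listener bus,
// and post the environment-update and application-start events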
setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
} catch {
case NonFatal(e) =>
logError("Error initializing SparkContext.", e)
try {
stop()
} catch {
case NonFatal(inner) =>
logError("Error stopping SparkContext after init error.", inner)
} finally {
throw e
}
}
The three core objects at the heart of SparkContext are the DAGScheduler, the TaskScheduler, and the SchedulerBackend; we will analyze them in detail in the following articles.
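To give a feel for how they fit together before those articles, here is a heavily simplified, illustrative paraphrase of what SparkContext.createTaskScheduler does (the real method handles many more master formats such as local[N], local[N, maxFailures], yarn-client, yarn-cluster, mesos:// and simr://, and the classes used here are private[spark], so treat this as a reading aid rather than runnable user code):

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
import org.apache.spark.scheduler.local.LocalBackend

def createTaskSchedulerSketch(sc: SparkContext, master: String): (SchedulerBackend, TaskScheduler) = {
  val scheduler = new TaskSchedulerImpl(sc)
  val backend = master match {
    case "local" =>
      new LocalBackend(sc.getConf, scheduler, 1)                // run tasks in-process on one core
    case m if m.startsWith("spark://") =>
      new SparkDeploySchedulerBackend(scheduler, sc, Array(m))  // standalone cluster manager
    case other =>
      throw new SparkException(s"Could not parse Master URL: '$other'")
  }
  scheduler.initialize(backend)  // hand the backend to the scheduler before start()
  (backend, scheduler)
}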
SparkEnv
Now let's look at SparkEnv. SparkEnv holds the runtime environment objects for a running Spark instance (master or worker), including the serializer, the Akka actor system (1.6.x already defaults to Netty, but some legacy code remains; from Spark 2.x onward Akka is no longer a dependency), the BlockManager, the MapOutputTracker (crucial during shuffle), and so on. As with SparkContext, each of these components will be covered in its own article; for now we take a quick pass over the SparkEnv source code.
Let's start with createDriverEnv and createExecutorEnv, which, as their names suggest, create the runtime environments for the driver and the executors respectively.
/**
* Create a SparkEnv for the driver.
*/
private[spark] def createDriverEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val port = conf.get("spark.driver.port").toInt
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
hostname,
port,
isDriver = true,
isLocal = isLocal,
numUsableCores = numCores,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
}
/**
* Create a SparkEnv for an executor.
* In coarse-grained mode, the executor provides an actor system that is already instantiated.
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
numCores: Int,
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
hostname,
port,
isDriver = false,
isLocal = isLocal,
numUsableCores = numCores
)
SparkEnv.set(env)
env
}
As you can see, both methods ultimately call the create() method:
private def create(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
numUsableCores: Int,
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// Listener bus is only used on the driver
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}
// Instantiate the SecurityManager
val securityManager = new SecurityManager(conf)
// Legacy Akka-related code left over in Spark 1.6.x; it has been completely removed in Spark 2.x
// Create the ActorSystem for Akka and get the port it binds to.
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
// Create the RpcEnv
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
val actorSystem: ActorSystem =
if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
} else {
val actorSystemPort =
if (port == 0 || rpcEnv.address == null) {
port
} else {
rpcEnv.address.port + 1
}
// Create a ActorSystem for legacy codes
AkkaUtils.createActorSystem(
actorSystemName + "ActorSystem",
hostname,
actorSystemPort,
conf,
securityManager
)._1
}
// Figure out which port Akka actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
// Record the actual port for the driver or the executor
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
}
// Create an instance of the class with the given name, possibly initializing it with our conf
def instantiateClass[T](className: String): T = {
val cls = Utils.classForName(className)
// Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
// SparkConf, then one taking no arguments
try {
cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
.newInstance(conf, new java.lang.Boolean(isDriver))
.asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
try {
cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
} catch {
case _: NoSuchMethodException =>
cls.getConstructor().newInstance().asInstanceOf[T]
}
}
}
// Create an instance of the class named by the given SparkConf property, or defaultClassName
// if the property is not set, possibly initializing it with our conf
def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
instantiateClass[T](conf.get(propertyName, defaultClassName))
}
// Set up the serializer; note that the default is Java serialization (JavaSerializer)
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
val closureSerializer = instantiateClassFromConf[Serializer](
"spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
// On the driver, instantiate a MapOutputTrackerMaster; on an executor, a MapOutputTrackerWorker (covered in detail when we discuss shuffle)
// This shows that the MapOutputTracker also follows a master/slaves structure
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf)
} else {
new MapOutputTrackerWorker(conf)
}
// Create the MapOutputTrackerMasterEndpoint (or look it up on executors) and hand its reference to the MapOutputTracker
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
// Shuffle configuration; the default is the SortShuffleManager
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
// Instantiate the ShuffleManager (SortShuffleManager by default)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
// Whether to use the legacy MemoryManager, i.e. StaticMemoryManager
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
// By default the UnifiedMemoryManager is used
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
// Instantiate the BlockTransferService; the default implementation is Netty-based, i.e. NettyBlockTransferService
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
// BlockManager-related initialization; covered in a separate article
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
// Instantiate the BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
// Instantiate the CacheManager
val cacheManager = new CacheManager(blockManager)
// Metrics system
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
} else {
"."
}
// OutputCommitCoordinator initialization and endpoint registration
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
val envInstance = new SparkEnv(
executorId,
rpcEnv,
actorSystem,
serializer,
closureSerializer,
cacheManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
securityManager,
sparkFilesDir,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
}
envInstance
}
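Once created, the SparkEnv of the current driver or executor can be looked up through its companion object. A small, hedged example (SparkEnv is a DeveloperApi, so this is mostly useful when reading or instrumenting Spark internals; it assumes a SparkContext has already been created):

import org.apache.spark.SparkEnv

val env = SparkEnv.get
println(env.executorId)                    // "driver" on the driver side
println(env.conf.get("spark.app.name"))    // the running application's name
println(env.serializer.getClass.getName)   // JavaSerializer unless spark.serializer is overridden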
This article has only taken a quick tour of the SparkContext and SparkEnv source code; each specific module will be analyzed in its own article.
This is an original article. You are welcome to repost it, but please credit the source and the author. Thanks!