Initializing SparkContext
SparkContext is the Spark context object created when an application starts. It is the main interface for developing Spark applications and acts as the bridge between upper-layer Spark applications and the underlying implementation (SparkContext is responsible for sending tasks to the executors).
During initialization, SparkContext mainly involves the following components:
- SparkEnv
- DAGScheduler
- TaskScheduler
- SchedulerBackend
- SparkUI
Creating the SparkConf
The most important argument to the SparkContext constructor is the SparkConf. When SparkContext initializes, it first builds a SparkConf object from its constructor arguments and then uses it to create the SparkEnv.

A SparkConf object is created to manage the Spark application's property settings. The SparkConf class is fairly simple: it manages key/value properties through a HashMap container.
The SparkConf class declaration, in which the settings variable is that HashMap container, is sketched below.

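This is an abbreviated reconstruction rather than the verbatim 1.3.1 source; the visibility modifiers, the helper methods shown, and the exact map implementation are approximations:

import scala.collection.mutable.HashMap

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

  /** Create a SparkConf that loads defaults from system properties and the classpath. */
  def this() = this(true)

  // The key/value property store described above
  private[spark] val settings = new HashMap[String, String]()

  /** Set a configuration property; returns this so calls can be chained. */
  def set(key: String, value: String): SparkConf = {
    settings(key) = value
    this
  }

  /** Get a property, throwing if it has not been set. */
  def get(key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))
}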
The SparkContext class copies the incoming SparkConf object roughly as sketched below:

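A sketch of that copy (approximate, not the verbatim source); config is the SparkConf passed into the SparkContext constructor, and cloning it keeps later changes to the caller's object from leaking into the running context:

// Clone the user-supplied conf and make sure the mandatory settings are present.
private[spark] val conf = config.clone()
conf.validateSettings()

if (!conf.contains("spark.master")) {
  throw new SparkException("A master URL must be set in your configuration")
}
if (!conf.contains("spark.app.name")) {
  throw new SparkException("An application name must be set in your configuration")
}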
Creating the LiveListenerBus listener bus
This is a typical observer pattern: SparkListenerEvent events of different types are posted to the LiveListenerBus, and SparkListenerBus walks through all of its registered SparkListeners and invokes the callback that corresponds to each event.

SparkContext creates the LiveListenerBus object as follows:
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
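As a small illustration of that observer pattern (the JobCounter listener below is a made-up example, not part of Spark), a custom SparkListener can be registered on this bus, and every event posted to the bus is then dispatched to the matching callback of each registered listener:

// Hypothetical listener: only the overridden callback reacts; all other events
// fall through to SparkListener's empty default implementations.
class JobCounter extends SparkListener {
  @volatile var completedJobs = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    completedJobs += 1
  }
}

listenerBus.addListener(new JobCounter)  // the same call SparkContext uses for its own listeners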
Creating the SparkEnv runtime environment
SparkEnv creates a whole series of objects, such as the MapOutputTracker, MasterActor, BlockManager, CacheManager, and HttpFileServer.
The SparkContext code that builds the SparkEnv is sketched below.

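A sketch of that code (treat the exact signature as approximate): the driver-side environment is built from the conf and the listener bus, then registered as the process-wide SparkEnv:

// Create the Spark execution environment (cache, map output tracker, etc.)
private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.set(env)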
SparkEnv的構造函數(shù)入?yún)⒘斜頌椋?/p>
class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockTransferService: BlockTransferService,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem,
    val shuffleMemoryManager: ShuffleMemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging
The roles of several of these parameters are:
- cacheManager: stores intermediate computation results
- mapOutputTracker: caches MapStatus information and provides the ability to fetch map output information from the MapOutputTrackerMaster
- shuffleManager: maintains the shuffle routing table
- broadcastManager: manages broadcast variables
- blockManager: block management
- securityManager: security management
- httpFileServer: HTTP file server
- sparkFilesDir: directory for storing files
- metricsSystem: metrics system
- conf: the configuration
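A SparkEnv assembled from these pieces is registered per JVM, so other components look it up instead of passing it around explicitly; a minimal illustration (the local val names are arbitrary):

// On the driver or on an executor, fetch the current environment and use its members.
val env = SparkEnv.get
val blockManager = env.blockManager          // block management
val mapOutputTracker = env.mapOutputTracker  // the MapStatus cache described above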
Creating the SparkUI
The SparkContext code that initializes the SparkUI is sketched below.

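A sketch of that initialization, as I recall it from the 1.3.x line (the createLiveUI arguments in particular should be treated as approximate):

// Build the live UI only when it is enabled, then bind it to a port before the
// task scheduler starts so the bound port can be reported to the cluster manager.
private[spark] val ui: Option[SparkUI] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
      env.securityManager, appName))
  } else {
    None
  }
ui.foreach(_.bind())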
In the SparkUI initialization function, a StorageStatusListener is registered; it monitors storage changes and reflects them on the Spark web UI in a timely fashion. The objects added via the attachTab method are exactly the tabs we see on the Spark web page.
/** Initialize all components of the server. */
def initialize() {
  attachTab(new JobsTab(this))
  val stagesTab = new StagesTab(this)
  attachTab(stagesTab)
  attachTab(new StorageTab(this))
  attachTab(new EnvironmentTab(this))
  attachTab(new ExecutorsTab(this))
  attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
  attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
  attachHandler(
    createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
}
Creating and starting the TaskScheduler and DAGScheduler
The most important initialization work in SparkContext is creating the TaskScheduler and the DAGScheduler; these two are the core of Spark.
Spark's design is very clean: the whole DAG abstraction layer is decoupled from actual task execution. The DAGScheduler parses the Spark operations, generates stages, forms the DAG, and finally splits the work into tasks that it submits to the TaskScheduler; it performs only this static analysis. The TaskScheduler is dedicated to task execution: it handles resource management, task assignment, and reporting of execution status.
The benefit of this design is that Spark can support a variety of resource schedulers and execution platforms simply by providing different TaskScheduler implementations.
The following code selects the appropriate SchedulerBackend according to Spark's deployment mode and then starts the TaskScheduler; an abbreviated sketch is given below.

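An abbreviated sketch of that code (error handling and the heartbeat actor are omitted):

// Pick the TaskScheduler/SchedulerBackend pair that matches the master URL.
private[spark] var (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)

// The DAGScheduler registers itself with the TaskScheduler in its constructor,
// so it is created before the TaskScheduler is started.
@volatile private[spark] var dagScheduler: DAGScheduler = new DAGScheduler(this)

taskScheduler.start()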
Here, the most critical aspect of createTaskScheduler is that it inspects the master variable to determine how Spark is currently deployed and then instantiates the corresponding SchedulerBackend subclass. The resulting SchedulerBackend is placed inside the TaskScheduler, where it plays an important role in the subsequent task dispatching; an abbreviated view of the selection logic is sketched below.
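An abbreviated sketch of that pattern match (only two branches are shown; the local[N], yarn-client, yarn-cluster, and mesos masters have analogous branches in the real method):

master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>   // spark://host:port, i.e. standalone deployment
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  // ... remaining deployment modes elided ...
}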
TaskScheduler.start starts the corresponding SchedulerBackend and also starts a timer that periodically checks for speculatable tasks. The source of this function (defined in TaskSchedulerImpl.scala) is:
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
      Utils.tryOrExit { checkSpeculatableTasks() }
    }
  }
}
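Note that the speculation check is skipped in local mode (the !isLocal guard); on a cluster it is enabled through configuration, for example:

val conf = new SparkConf()
  .setAppName("speculation-demo")    // the app name here is arbitrary
  .set("spark.speculation", "true")  // enables the periodic checkSpeculatableTasks loop above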
Adding the EventLoggingListener
This listener is disabled by default and can be enabled through the spark.eventLog.enabled configuration. Its main function is to record the events that occur, in JSON format:
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
    logger.start()
    listenerBus.addListener(logger)
    Some(logger)
  } else None
}
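For reference, turning it on from the application side looks like this (the log directory is only an example path):

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")             // enables the listener above
  .set("spark.eventLog.dir", "hdfs:///spark-events") // example destination for the JSON event logs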
Posting SparkListenerEvent events
Two kinds of events, SparkListenerEnvironmentUpdate and SparkListenerApplicationStart, are posted to the LiveListenerBus; the listeners subscribed to these events handle them through their onEnvironmentUpdate and onApplicationStart methods.
setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
Key functions in the SparkContext class
textFile
To load the data to be processed, the most commonly used method is textFile, which simply creates a HadoopRDD as the starting RDD.
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
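A minimal usage sketch (local master and a hypothetical input path); the lineage is built lazily and the job only runs when the action at the end is called:

val sc = new SparkContext(new SparkConf().setAppName("textFile-demo").setMaster("local[2]"))

// textFile builds a HadoopRDD lazily; nothing is read yet.
val lines = sc.textFile("/tmp/input.txt")  // hypothetical path
val wordCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.collect().foreach(println)      // the action triggers the actual job
sc.stop()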
runJob
The key point is that it calls dagScheduler.runJob:
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (stopped) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
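Every RDD action eventually funnels into this method. For example, count() in RDD.scala is essentially the following thin wrapper (sketched from memory, so treat it as approximate):

// RDD.count(): run a job that measures the size of each partition's iterator,
// then sum the per-partition results on the driver.
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum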
Notes
The source-code walkthrough above is based on the spark-1.3.1 source tree.
Please credit the author, Jason Ding, and cite the original source when reposting.
GitCafe blog homepage (http://jasonding1354.gitcafe.io/)
Github blog homepage (http://jasonding1354.github.io/)
CSDN blog (http://blog.csdn.net/jasonding1354)
Jianshu homepage (http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Search for jasonding1354 on Google to find my blog homepage