【Spark】SparkContext源碼解讀

SparkContext的初始化

SparkContext是應用啟動時創(chuàng)建的Spark上下文對象,是進行Spark應用開發(fā)的主要接口,是Spark上層應用與底層實現(xiàn)的中轉(zhuǎn)站(SparkContext負責給executors發(fā)送task)。
SparkContext在初始化過程中,主要涉及一下內(nèi)容:

  • SparkEnv
  • DAGScheduler
  • TaskScheduler
  • SchedulerBackend
  • SparkUI

生成SparkConf

SparkContext的構造函數(shù)中最重要的入?yún)⑹荢parkConf。SparkContext進行初始化的時候,首先要根據(jù)初始化入?yún)順嫿⊿parkConf對象袄琳,進而再去創(chuàng)建SparkEnv。



創(chuàng)建SparkConf對象來管理spark應用的屬性設置。SparkConf類比較簡單,是通過一個HashMap容器來管理key、value類型的屬性。
下圖為SparkConf類聲明,其中setting變量為HashMap容器:



下面是SparkContext類中,關于SparkConf對象的拷貝過程:

創(chuàng)建LiveListenerBus監(jiān)聽器

這是典型的觀察者模式,向LiveListenerBus類注冊不同類型的SparkListenerEvent事件,SparkListenerBus會遍歷它的所有監(jiān)聽者SparkListener,然后找出事件對應的接口進行響應。


下面是SparkContext創(chuàng)建LiveListenerBus對象:

  // An asynchronous listener bus for Spark events
  private[spark] val listenerBus = new LiveListenerBus

創(chuàng)建SparkEnv運行環(huán)境

在SparkEnv中創(chuàng)建了MapOutputTracker、MasterActor扫沼、BlockManager伴找、CacheManager袒炉、HttpFileServer一系列對象。
下圖為生成SparkEnv的代碼:


SparkEnv的構造函數(shù)入?yún)⒘斜頌椋?/p>

class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleManager: ShuffleManager,
    val broadcastManager: BroadcastManager,
    val blockTransferService: BlockTransferService,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem,
    val shuffleMemoryManager: ShuffleMemoryManager,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf) extends Logging

這里說明幾個入?yún)⒌淖饔茫?/p>

  • cacheManager: 用于存儲中間計算結果
  • mapOutputTracker: 用來緩存MapStatus信息肥隆,并提供從MapOutputMaster獲取信息的功能
  • shuffleManager: 路由維護表
  • broadcastManager: 廣播
  • blockManager: 塊管理
  • securityManager: 安全管理
  • httpFileServer: 文件存儲服務器
    *l sparkFilesDir: 文件存儲目錄
  • metricsSystem: 測量
  • conf: 配置文件

創(chuàng)建SparkUI

下面是SparkContext初始化SparkUI的代碼:


其中菌湃,在SparkUI對象初始化函數(shù)中遍略,注冊了StorageStatusListener監(jiān)聽器惧所,負責監(jiān)聽Storage的變化及時的展示到Spark web頁面上绪杏。attachTab方法中添加對象正是我們在Spark Web頁面中看到的那個標簽。

  /** Initialize all components of the server. */
  def initialize() {
    attachTab(new JobsTab(this))
    val stagesTab = new StagesTab(this)
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
  }

創(chuàng)建TaskScheduler和DAGScheduler并啟動運行

在SparkContext中, 最主要的初始化工作就是創(chuàng)建TaskScheduler和DAGScheduler, 這兩個就是Spark的核心所在。

Spark的設計非常的干凈, 把整個DAG抽象層從實際的task執(zhí)行中剝離了出來DAGScheduler, 負責解析spark命令,生成stage, 形成DAG, 最終劃分成tasks, 提交給TaskScheduler, 他只完成靜態(tài)分析TaskScheduler,專門負責task執(zhí)行, 他只負責資源管理, task分配, 執(zhí)行情況的報告。
這樣設計的好處, 就是Spark可以通過提供不同的TaskScheduler簡單的支持各種資源調(diào)度和執(zhí)行平臺

下面代碼是根據(jù)Spark的運行模式來選擇相應的SchedulerBackend余掖,同時啟動TaskScheduler:


其中,createTaskScheduler最為關鍵的一點就是根據(jù)master變量來判斷Spark當前的部署方式宵喂,進而生成相應的SchedulerBackend的不同子類。創(chuàng)建的SchedulerBackend放置在TaskScheduler中泼疑,在后續(xù)的Task分發(fā)過程中扮演著重要角色德绿。

TaskScheduler.start的目的是啟動相應的SchedulerBackend,并啟動定時器進行檢測退渗,下面是該函數(shù)源碼(定義在TaskSchedulerImpl.scala文件中):

  override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

添加EventLoggingListener監(jiān)聽器

這個默認是關閉的移稳,可以通過spark.eventLog.enabled配置開啟。它主要功能是以json格式記錄發(fā)生的事件:

  // Optionally log Spark events
  private[spark] val eventLogger: Option[EventLoggingListener] = {
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
      logger.start()
      listenerBus.addListener(logger)
      Some(logger)
    } else None
  }

加入SparkListenerEvent事件

往LiveListenerBus中加入了SparkListenerEnvironmentUpdate氓辣、SparkListenerApplicationStart兩類事件秒裕,對這兩種事件監(jiān)聽的監(jiān)聽器就會調(diào)用onEnvironmentUpdate、onApplicationStart方法進行處理钞啸。

  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()

SparkContext類中的關鍵函數(shù)

textFile

要載入被處理的數(shù)據(jù), 最常用的textFile, 其實就是生成HadoopRDD, 作為起始的RDD

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }


    /** Get an RDD for a Hadoop file with an arbitrary InputFormat
   *
   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
   * operation will create many references to the same object.
   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
   * copy them using a `map` function.
   */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

runJob

關鍵在于調(diào)用了dagScheduler.runJob

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

說明

以上的源碼解讀基于spark-1.3.1源代碼工程文件

轉(zhuǎn)載請注明作者Jason Ding及其出處
GitCafe博客主頁(http://jasonding1354.gitcafe.io/)
Github博客主頁(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡書主頁(http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進入我的博客主頁

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末几蜻,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子体斩,更是在濱河造成了極大的恐慌梭稚,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,464評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件絮吵,死亡現(xiàn)場離奇詭異弧烤,居然都是意外死亡,警方通過查閱死者的電腦和手機蹬敲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,033評論 3 399
  • 文/潘曉璐 我一進店門暇昂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人伴嗡,你說我怎么就攤上這事急波。” “怎么了瘪校?”我有些...
    開封第一講書人閱讀 169,078評論 0 362
  • 文/不壞的土叔 我叫張陵澄暮,是天一觀的道長名段。 經(jīng)常有香客問我,道長泣懊,這世上最難降的妖魔是什么伸辟? 我笑而不...
    開封第一講書人閱讀 59,979評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮馍刮,結果婚禮上信夫,老公的妹妹穿的比我還像新娘。我一直安慰自己渠退,他們只是感情好忙迁,可當我...
    茶點故事閱讀 69,001評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著碎乃,像睡著了一般姊扔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梅誓,一...
    開封第一講書人閱讀 52,584評論 1 312
  • 那天恰梢,我揣著相機與錄音,去河邊找鬼梗掰。 笑死嵌言,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的及穗。 我是一名探鬼主播摧茴,決...
    沈念sama閱讀 41,085評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼埂陆!你這毒婦竟也來了苛白?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 40,023評論 0 277
  • 序言:老撾萬榮一對情侶失蹤焚虱,失蹤者是張志新(化名)和其女友劉穎购裙,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鹃栽,經(jīng)...
    沈念sama閱讀 46,555評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡躏率,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,626評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了民鼓。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薇芝。...
    茶點故事閱讀 40,769評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖丰嘉,靈堂內(nèi)的尸體忽然破棺而出恩掷,到底是詐尸還是另有隱情,我是刑警寧澤供嚎,帶...
    沈念sama閱讀 36,439評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響克滴,放射性物質(zhì)發(fā)生泄漏逼争。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,115評論 3 335
  • 文/蒙蒙 一劝赔、第九天 我趴在偏房一處隱蔽的房頂上張望誓焦。 院中可真熱鬧,春花似錦着帽、人聲如沸杂伟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,601評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽赫粥。三九已至,卻和暖如春予借,著一層夾襖步出監(jiān)牢的瞬間越平,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,702評論 1 274
  • 我被黑心中介騙來泰國打工灵迫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留秦叛,地道東北人。 一個月前我還...
    沈念sama閱讀 49,191評論 3 378
  • 正文 我出身青樓瀑粥,卻偏偏與公主長得像挣跋,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子狞换,可洞房花燭夜當晚...
    茶點故事閱讀 45,781評論 2 361

推薦閱讀更多精彩內(nèi)容