spark-sql Source Code Walkthrough

1. Developing a Spark application

// initialize the SparkSession
    val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate()
    // transform: produces a DataFrame, on which further DataFrame DSL APIs can be chained
    val sqlDf = spark.sql("select count(*) from table")
    // action: this is where spark-core actually starts executing
    sqlDf.show(false)
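To run this end to end, the FROM clause needs something to resolve against (and `table` is in fact a reserved word, used above only as a placeholder). A minimal self-contained variant; the view name `people` and the sample rows are illustrative, not from the original snippet:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("SparkSQL Test").master("local[4]").getOrCreate()
    import spark.implicits._
    // register an in-memory temp view so the FROM clause can be resolved
    Seq(("a", 1), ("b", 2)).toDF("name", "age").createOrReplaceTempView("people")
    val sqlDf = spark.sql("select count(*) from people")
    sqlDf.show(false)   // action: parse -> analyze -> optimize -> plan -> execute
    spark.stop()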

2. Initializing SparkSession: how the SessionState is constructed

// 1: SparkSession lazily initializes its SessionState
        lazy val sessionState: SessionState = {
            parentSessionState
              .map(_.clone(this))
              .getOrElse {
                val state = SparkSession.instantiateSessionState(
                  SparkSession.sessionStateClassName(sparkContext.conf),
                  self)
                initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
                state
              }
          }
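Which branch runs depends on how the session was created: newSession() passes no parent, so a brand-new SessionState is instantiated, while the internal cloneSession() path supplies parentSessionState and takes the clone branch. A quick illustration with the public API:

    // shares the SparkContext and SharedState with `spark`, but gets its own
    // SessionState: isolated SQL conf, temp views and registered UDFs
    val spark2 = spark.newSession()
    spark2.conf.set("spark.sql.shuffle.partitions", "4")  // does not affect `spark`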
// 2: instantiating the SessionState
    /**
       * Helper method to create an instance of `SessionState` based on `className` from conf.
       * The result is either `SessionState` or a Hive based `SessionState`.
       */
      private def instantiateSessionState(
          className: String,
          sparkSession: SparkSession): SessionState = {
        try {
          // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
          val clazz = Utils.classForName(className)
          val ctor = clazz.getConstructors.head
      // default: SessionStateBuilder (in-memory); with Hive support: HiveSessionStateBuilder
          ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
            ...
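The builder class name passed in is decided by spark.sql.catalogImplementation; enableHiveSupport() is the public switch that selects the Hive builder:

    // sets spark.sql.catalogImplementation=hive, so sessionStateClassName
    // resolves to HiveSessionStateBuilder instead of the in-memory default
    // (requires the Hive classes on the classpath)
    val hiveSpark = SparkSession.builder
      .appName("SparkSQL Test")
      .master("local[4]")
      .enableHiveSupport()
      .getOrCreate()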
// 3: build the SessionState, initializing the catalog, sqlParser, analyzer, optimizer, built-in functions, UDFs, and so on
            def build(): SessionState = {
              new SessionState(
                session.sharedState,
                conf,
                experimentalMethods,
                functionRegistry,
                udfRegistration,
                () => catalog,
                sqlParser,
                () => analyzer,
                () => optimizer,
                planner,
                streamingQueryManager,
                listenerManager,
                () => resourceLoader,
                createQueryExecution,
                createClone)
            }

Note: how QueryExecution, SessionState, and BaseSessionStateBuilder relate:
(1) QueryExecution's analyzed and optimizedPlan are lazy; when first accessed they delegate to the analyzer and optimizer held by SessionState to do the actual resolution and optimization work.
(2) SessionState's catalog, analyzer, optimizer, and resourceLoader are lazy as well; when first accessed they invoke the anonymous functions (thunks) that BaseSessionStateBuilder supplied when it constructed the SessionState, as sketched below.
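A minimal sketch of that thunk-plus-lazy-val pattern (the names are illustrative, not Spark's):

    // the builder hands over a () => A thunk; the holder only runs it on first
    // access and caches the result, which is how SessionState defers building
    // its analyzer/optimizer until they are actually needed
    class LazyHolder[A](build: () => A) {
      lazy val value: A = build()
    }
    val holder = new LazyHolder(() => { println("building analyzer"); new Object })
    holder.value  // prints "building analyzer" once
    holder.value  // cached: prints nothing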
3. Transform: generating the DataFrame (resolved LogicalPlan)

/*
1: parsePlan produces an unresolved LogicalPlan; Dataset.ofRows then calls QueryExecution.assertAnalyzed(),
        which is really sparkSession.sessionState.analyzer.executeAndCheck(logical),
        applying the defined resolution rules, resolving unresolved attributes and relations into a resolved LogicalPlan;
        finally new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema)) yields the DataFrame */
def sql(sqlText: String): DataFrame = {Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))}
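All the plan stages this post walks through can be inspected on any Dataset via the public explain API (reusing the `people` view registered earlier):

    val df = spark.sql("select count(*) from people")
    df.explain(true)
    // == Parsed Logical Plan ==     unresolved, straight from the ANTLR visitor
    // == Analyzed Logical Plan ==   after analyzer.executeAndCheck
    // == Optimized Logical Plan ==  after optimizer.execute
    // == Physical Plan ==           after the planner and prepareForExecution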
// 2: using the visitor pattern, astBuilder walks the ANTLR parse tree and translates it into a Catalyst AST, producing the unresolved logical plan
  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }
// 3: hand the SQL text to ANTLR: the SqlBaseLexer (lexer) and SqlBaseParser (parser) generated from SqlBase.g4 validate the tokens and the grammar
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")

    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)
    lexer.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    parser.removeErrorListeners()
    parser.addErrorListener(ParseErrorListener)
    parser.legacy_setops_precedence_enbled = SQLConf.get.setOpsPrecedenceEnforced

    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
        ...
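The elided part is the second half of ANTLR's standard two-stage strategy: if the fast SLL mode fails, the token stream is rewound and parsing is retried in the slower but complete LL mode (abridged continuation of the same method):

      catch {
        case e: ParseCancellationException =>
          // if SLL parsing fails, rewind the input and retry with full LL mode
          tokenStream.seek(0) // rewind input stream
          parser.reset()
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }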

4. Action: triggering execution, optimizing the logical plan, generating the physical plan, converting it to RDDs and submitting through SparkContext

// 1: pulls up to 20 rows to the driver; show() goes through take(), which ultimately calls head()
        def show(): Unit = show(20)
            
        def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)
// 2: wraps an action, tracking the query execution and its elapsed time, and firing user-registered callbacks
        private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
            try {
                /* accessing qe.executedPlan triggers the chain: the optimizer applies its rule
                   batches (e.g. predicate pushdown) to the resolved logicalPlan, SparkPlanner
                   picks the best strategies (e.g. broadcasting a small table) to turn the
                   optimizedPlan into a sparkplan, and a series of preparation rules then turn
                   that sparkplan into the executable physical plan */
              qe.executedPlan.foreach { plan =>
                plan.resetMetrics()
              }
              val start = System.nanoTime()
              val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
                 // calls collectFromPlan, handing off to spark-core: the physical plan is executed as RDD operations
                action(qe.executedPlan)
              }
              val end = System.nanoTime()
              sparkSession.listenerManager.onSuccess(name, qe, end - start)
              result
            } catch {
              case e: Exception =>
                sparkSession.listenerManager.onFailure(name, qe, e)
                throw e
            }
          }
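The onSuccess/onFailure hooks above fan out to listeners registered by the user; registering one is public API:

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        println(s"$funcName finished in ${durationNs / 1e6} ms")
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        println(s"$funcName failed: ${exception.getMessage}")
    })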
// 3: the QueryExecution workflow from optimization to the prepared, executable physical plan
          lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

          lazy val sparkPlan: SparkPlan = {
            SparkSession.setActiveSession(sparkSession)
            // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
            //       but we will implement to choose the best plan.
            planner.plan(ReturnAnswer(optimizedPlan)).next()
          }

          // executedPlan should not be used to initialize any SparkPlan. It should be
          // only used for execution.
          lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
              
          protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
              preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
            }

            /** A sequence of rules that will be applied in order to the physical plan before execution. */
            protected def preparations: Seq[Rule[SparkPlan]] = Seq(
              python.ExtractPythonUDFs,
              PlanSubqueries(sparkSession),
              EnsureRequirements(sparkSession.sessionState.conf),
              CollapseCodegenStages(sparkSession.sessionState.conf),
              ReuseExchange(sparkSession.sessionState.conf),
              ReuseSubquery(sparkSession.sessionState.conf))
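The effect of these preparation rules is easy to observe by comparing the plan before and after they run:

    val qe = spark.sql("select count(*) from people").queryExecution
    println(qe.sparkPlan)     // plan as emitted by the planner
    println(qe.executedPlan)  // after preparations: Exchange nodes inserted by
                              // EnsureRequirements, WholeStageCodegen wrappers
                              // added by CollapseCodegenStages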
// 4: the action callback (collectFromPlan) passed to withAction; under the hood it drives SparkPlan's execute()/doExecute(), turning the physical plan into RDD operations
            /**
               * Collect all elements from a spark plan.
               */
              private def collectFromPlan(plan: SparkPlan): Array[T] = {
                // This projection writes output to a `InternalRow`, which means applying this projection is not
                // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
                val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
                plan.executeCollect().map { row =>
                  // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
                  // parameter of its `get` method, so it's safe to use null here.
                  objProj(row).get(0, null).asInstanceOf[T]
                }
              }
// 5: convert the SparkPlan into an RDD and have SparkContext submit the job
              /**
                 * Runs this query returning the result as an array.
                 */
                def executeCollect(): Array[InternalRow] = {
                    // getByteArrayRdd calls execute(), which in turn calls doExecute(), converting the SparkPlan into an RDD
                  val byteArrayRdd = getByteArrayRdd()

                  val results = ArrayBuffer[InternalRow]()
                      // byteArrayRdd.collect() is an RDD action: it runs sc.runJob(), submitting the job to the Spark cluster
                  byteArrayRdd.collect().foreach { countAndBytes =>
                    decodeUnsafeRows(countAndBytes._2).foreach(results.+=)
                  }
                  results.toArray
                }
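executeCollect() is the collect-specific path; the general SparkPlan-to-RDD bridge is QueryExecution.toRdd (lazy val toRdd = executedPlan.execute()), and any action on that RDD goes through sc.runJob in the same way:

    // triggers doExecute() and submits a job, just like byteArrayRdd.collect()
    val internalRows = spark.sql("select count(*) from people").queryExecution.toRdd
    internalRows.count()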