Spark sql analyzer 過程解析

  1. spark sql 為何要進(jìn)行 analyzer ?

    • 通過 antlr 解析出來的抽象語法樹 UnResolved LogicalPlan 僅僅是一種數(shù)據(jù)結(jié)構(gòu)湃望,不包含任何數(shù)據(jù)信息
    • Analyzer 階段會使用事先定義好的 Rule 以及 SessionCatalog 等信息對 Unresolved LogicalPlan 進(jìn)行 transform惩琉,綁定數(shù)據(jù)源以及數(shù)據(jù)類型
  2. spark sql 的 Analyzer 主要干了什么添怔?

    • Analyzer模塊將Unresolved LogicalPlan結(jié)合元數(shù)據(jù)catalog進(jìn)行綁定,最終轉(zhuǎn)化為Resolved LogicalPlan
spark sql 實(shí)現(xiàn) analyzer 的具體細(xì)節(jié)
  1. Spark sql 中的 Rule

    • Spark sql 解析出來的抽象語法樹据悔,每個節(jié)點(diǎn)都是由 TreeNode 構(gòu)成的,而 Rule 作為一條規(guī)則作用在 TreeNode 節(jié)點(diǎn)上進(jìn)行 transform 操作

    • Rule 的定義如下菠隆,apple 方法由子類實(shí)現(xiàn)定義了每個 Rule 該如何 transform

        ```
        abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
      
          /** Name for this rule, automatically inferred based on class name. */
          val ruleName: String = {
            val className = getClass.getName
            if (className endsWith "$") className.dropRight(1) else className
          }
      
          def apply(plan: TreeType): TreeType
        }
        ```
      
  2. spark sql 定義的 Rule

    • spark sql 在 src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 定義了很多 Batch rule
       lazy val batches: Seq[Batch] = Seq(
          Batch("Hints", fixedPoint,
            new ResolveHints.ResolveBroadcastHints(conf),
            ResolveHints.ResolveCoalesceHints,
            ResolveHints.RemoveAllHints),
          Batch("Simple Sanity Check", Once,
            LookupFunctions),
          Batch("Substitution", fixedPoint,
            CTESubstitution,
            WindowsSubstitution,
            EliminateUnions,
            new SubstituteUnresolvedOrdinals(conf)),
          Batch("Resolution", fixedPoint,
            ResolveTableValuedFunctions ::
            ResolveRelations ::
            ResolveReferences ::
            ResolveCreateNamedStruct ::
            ResolveDeserializer ::
            ResolveNewInstance ::
            ResolveUpCast ::
            ResolveGroupingAnalytics ::
            ResolvePivot ::
            ResolveOrdinalInOrderByAndGroupBy ::
            ResolveAggAliasInGroupBy ::
            ResolveMissingReferences ::
            ExtractGenerator ::
            ResolveGenerate ::
            ResolveFunctions ::
            ResolveAliases ::
            ResolveSubquery ::
            ResolveSubqueryColumnAliases ::
            ResolveWindowOrder ::
            ResolveWindowFrame ::
            ResolveNaturalAndUsingJoin ::
            ResolveOutputRelation ::
            ExtractWindowExpressions ::
            GlobalAggregates ::
            ResolveAggregateFunctions ::
            TimeWindowing ::
            ResolveInlineTables(conf) ::
            ResolveHigherOrderFunctions(catalog) ::
            ResolveLambdaVariables(conf) ::
            ResolveTimeZone(conf) ::
            ResolveRandomSeed ::
            TypeCoercion.typeCoercionRules(conf) ++
            extendedResolutionRules : _*),
          Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
          Batch("View", Once,
            AliasViewChild(conf)),
          Batch("Nondeterministic", Once,
            PullOutNondeterministic),
          Batch("UDF", Once,
            HandleNullInputsForUDF),
          Batch("FixNullability", Once,
            FixNullability),
          Batch("Subquery", Once,
            UpdateOuterReferences),
          Batch("Cleanup", fixedPoint,
            CleanupAliases)
        )
      
  3. Analyzer 調(diào)用父類的 execute 方法,遍歷上面定義的 batches羡榴,將 Rule 作用在 Logical plan 上實(shí)現(xiàn) transform

     def execute(plan: TreeType): TreeType = {
        var curPlan = plan
        val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
        val planChangeLogger = new PlanChangeLogger()
    
        batches.foreach { batch =>
          val batchStartPlan = curPlan
          var iteration = 1
          var lastPlan = curPlan
          var continue = true
    
          // Run until fix point (or the max number of iterations as specified in the strategy.
          while (continue) {
            curPlan = batch.rules.foldLeft(curPlan) {
              case (plan, rule) =>
                val startTime = System.nanoTime()
                val result = rule(plan)
                val runTime = System.nanoTime() - startTime
    
                if (!result.fastEquals(plan)) {
                  queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
                  queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
                  planChangeLogger.log(rule.ruleName, plan, result)
                }
                queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
                queryExecutionMetrics.incNumExecution(rule.ruleName)
    
                // Run the structural integrity checker against the plan after each rule.
                if (!isPlanIntegral(result)) {
                  val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                    "the structural integrity of the plan is broken."
                  throw new TreeNodeException(result, message, null)
                }
    
                result
            }
            iteration += 1
            if (iteration > batch.strategy.maxIterations) {
              // Only log if this is a rule that is supposed to run more than once.
              if (iteration != 2) {
                val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
                if (Utils.isTesting) {
                  throw new TreeNodeException(curPlan, message, null)
                } else {
                  logWarning(message)
                }
              }
              continue = false
            }
    
            if (curPlan.fastEquals(lastPlan)) {
              logTrace(
                s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
              continue = false
            }
            lastPlan = curPlan
          }
    
          if (!batchStartPlan.fastEquals(curPlan)) {
            logDebug(
              s"""
                |=== Result of Batch ${batch.name} ===
                |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
              """.stripMargin)
          } else {
            logTrace(s"Batch ${batch.name} has no effect.")
          }
        }
    
        curPlan
      }
    
    • batch 和里面的 rules 都是連續(xù)執(zhí)行的迄沫,每執(zhí)行完一個 batch 都判斷此 batch 執(zhí)行的次數(shù)是否達(dá)到 maxIterations 和執(zhí)行此 batch 前后是否有變化羊瘩,達(dá)到 maxIterations 或者執(zhí)行 batch 前后無變化都不再執(zhí)行此batch尘吗。
  4. 解析 spark sql 如何通過 Catolog 將數(shù)據(jù)信息綁定到 logical plan 上的

    • ResolveRelations 實(shí)現(xiàn)替換 UnresolvedRelation 為 resolveRelation
    • ResolveRelations 繼承自 Rule睬捶,定義: object ResolveRelations extends Rule[LogicalPlan]
    • ResolveRelations 的 apply() 方法定義了 UnsolveRelation 到 ResolveRelation 的轉(zhuǎn)換
      def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
            case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
              EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
                case v: View =>
                  u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
                case other => i.copy(table = other)
              }
            case u: UnresolvedRelation => resolveRelation(u)
          }
      
      • resolveOperatorsUp 是一個柯理化調(diào)用

        {
        case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
                        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
                          case v: View =>
                            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
                          case other => i.copy(table = other)
                        }
                      case u: UnresolvedRelation => resolveRelation(u)
                    }
        
        • resolveOperatorsUp 后面的整個代碼塊作為參數(shù)傳入 resolveOperatorsUp 方法中

        • resolveOperatorsUp 的具體實(shí)現(xiàn)

          def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
              if (!analyzed) {
                AnalysisHelper.allowInvokingTransformsInAnalyzer {
                  /* apply方法中調(diào)用LogicalPlan 的resolveOperatorsUp方法并將后面的偏函數(shù)rule作為參數(shù)觉渴。自下而上的應(yīng)用于邏輯算子樹上的每一個LogicalPlan節(jié)點(diǎn) */
                  val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
                  if (self fastEquals afterRuleOnChildren) {
                    CurrentOrigin.withOrigin(origin) {
                      /* rule.applyOrElse(self, identity[LogicalPlan]) 將偏函數(shù)應(yīng)用到LogicalPlan蜕猫。偏函數(shù)會對傳入的參數(shù)進(jìn)行模式匹配,只有匹配成功的參數(shù)才會進(jìn)行處理漱挚。在rule偏函數(shù)中可以看出旨涝,如果LogicalPlan是UnresolvedRelation類型白华,則調(diào)用resolveRelation(u)方法弧腥。 */
                      rule.applyOrElse(self, identity[LogicalPlan])
                    }
                  } else {
                    CurrentOrigin.withOrigin(origin) {
                      rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
                    }
                  }
                }
              } else {
                self
              }
            }
          
        • 首先判斷此 plan 是否已經(jīng)被 analyzed 過管搪,接著調(diào)用 mapChildren更鲁,并且傳入的是resolveOperators 方法澡为,其實(shí)就是一個遞歸調(diào)用媒至,它會優(yōu)先處理它的子節(jié)點(diǎn)塘慕,然后再處理自己图呢,如果處理后的LogicalPlan 和當(dāng)前的相等就說明他沒有子節(jié)點(diǎn)了蛤织,則處理它自己指蚜,反之處理返回的 plan摊鸡。

        • 如果匹配的是 UnresolvedRelation 結(jié)點(diǎn)免猾,則調(diào)用 resolveRelation 方法進(jìn)行解析猎提,
          經(jīng)過resolveRelation方法之后疙教,返回的 logical plan 是已經(jīng)和實(shí)際元數(shù)據(jù)綁定好的plan贞谓,可能是從globalTempViewManager直接獲取的经宏,可能是從tempTables直接獲取烁兰,也可能是從externalCatalog 獲取的元數(shù)據(jù)沪斟。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末择吊,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子所森,更是在濱河造成了極大的恐慌,老刑警劉巖晴弃,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異枯怖,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)寿冕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人京办,你說我怎么就攤上這事〔萍ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我,道長秽誊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任藻懒,我火速辦了婚禮归敬,結(jié)果婚禮上鄙早,老公的妹妹穿的比我還像新娘汪茧。我一直安慰自己,他們只是感情好限番,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布陆爽。 她就那樣靜靜地躺著,像睡著了一般扳缕。 火紅的嫁衣襯著肌膚如雪慌闭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天躯舔,我揣著相機(jī)與錄音驴剔,去河邊找鬼。 笑死粥庄,一個胖子當(dāng)著我的面吹牛丧失,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播惜互,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼布讹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了训堆?” 一聲冷哼從身側(cè)響起描验,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎坑鱼,沒想到半個月后膘流,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鲁沥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年呼股,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片画恰。...
    茶點(diǎn)故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡彭谁,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出允扇,到底是詐尸還是另有隱情缠局,我是刑警寧澤则奥,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站甩鳄,受9級特大地震影響逞度,放射性物質(zhì)發(fā)生泄漏额划。R本人自食惡果不足惜妙啃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望俊戳。 院中可真熱鬧揖赴,春花似錦、人聲如沸抑胎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽阿逃。三九已至铭拧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間恃锉,已是汗流浹背搀菩。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留破托,地道東北人肪跋。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像土砂,于是被迫代替她去往敵國和親州既。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容