-
Why does Spark SQL need the Analyzer phase?
- The abstract syntax tree produced by ANTLR, the Unresolved LogicalPlan, is merely a data structure; it carries no information about the underlying data
- The Analyzer phase transforms the Unresolved LogicalPlan using predefined Rules together with the SessionCatalog, binding data sources and data types (see the sketch below)
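The difference is easy to observe. Below is a minimal sketch, assuming a local SparkSession (the view name `t` and the query are illustrative): `queryExecution.logical` shows the plan before analysis, `queryExecution.analyzed` the plan after.
```
import org.apache.spark.sql.SparkSession

object AnalyzerDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("analyzer-demo")
    .getOrCreate()

  // Register a temp view so the catalog has something to resolve against.
  spark.range(10).createOrReplaceTempView("t")
  val df = spark.sql("SELECT id + 1 AS x FROM t")

  // Before analysis: the plan should contain an UnresolvedRelation for `t`
  // and an unresolved attribute for `id` -- no schema or types attached yet.
  println(df.queryExecution.logical)

  // After analysis: `id` is bound through the SessionCatalog and typed
  // (LongType for range()), i.e. a Resolved LogicalPlan.
  println(df.queryExecution.analyzed)

  spark.stop()
}
```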
-
What does Spark SQL's Analyzer actually do?
- The Analyzer module binds the Unresolved LogicalPlan against the metadata catalog, ultimately converting it into a Resolved LogicalPlan
How Spark SQL implements the Analyzer
-
Rules in Spark SQL
Every node of the abstract syntax tree that Spark SQL parses is a TreeNode, and a Rule is a single rewrite rule applied to TreeNode nodes to perform a transform
-
Rule is defined as follows; the apply method, implemented by each subclass, defines how that Rule performs its transform:
```
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}
```
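As a concrete illustration, here is a minimal custom rule. This is a hypothetical sketch, not anything shipped with Spark: it pattern-matches Filter nodes whose condition is the literal true and removes them.
```
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical example rule: drop filters whose condition is literally true.
object RemoveAlwaysTrueFilter extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(Literal(true, _), child) => child
  }
}
```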
-
Rules defined by Spark SQL
- Spark SQL defines many Batches of rules in src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
```
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.ResolveCoalesceHints,
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Substitution", fixedPoint,
    CTESubstitution,
    WindowsSubstitution,
    EliminateUnions,
    new SubstituteUnresolvedOrdinals(conf)),
  Batch("Resolution", fixedPoint,
    ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveSubqueryColumnAliases ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ResolveOutputRelation ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveHigherOrderFunctions(catalog) ::
      ResolveLambdaVariables(conf) ::
      ResolveTimeZone(conf) ::
      ResolveRandomSeed ::
      TypeCoercion.typeCoercionRules(conf) ++
      extendedResolutionRules : _*),
  Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
  Batch("View", Once,
    AliasViewChild(conf)),
  Batch("Nondeterministic", Once,
    PullOutNondeterministic),
  Batch("UDF", Once,
    HandleNullInputsForUDF),
  Batch("FixNullability", Once,
    FixNullability),
  Batch("Subquery", Once,
    UpdateOuterReferences),
  Batch("Cleanup", fixedPoint,
    CleanupAliases)
)
```
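Each Batch pairs a name with an execution strategy: Once runs the batch in a single pass, while fixedPoint (a FixedPoint strategy whose iteration limit comes from the SQL config in the Analyzer) re-runs it until the plan stops changing. A minimal sketch of a custom RuleExecutor, reusing the hypothetical RemoveAlwaysTrueFilter rule from above:
```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Batch, Once and FixedPoint are inherited from RuleExecutor,
// exactly as the Analyzer inherits them.
object MyExecutor extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Seq(
    // Re-run until the plan stops changing, or at most 100 iterations.
    Batch("Remove True Filters", FixedPoint(100), RemoveAlwaysTrueFilter)
  )
}
```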
-
The Analyzer invokes the execute method of its parent class, which iterates over the batches defined above and applies each Rule to the logical plan to perform the transform:
```
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
  val planChangeLogger = new PlanChangeLogger()

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy.
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime

          if (!result.fastEquals(plan)) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.log(rule.ruleName, plan, result)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
          if (Utils.isTesting) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan
}
```
- Batches, and the rules within each batch, execute sequentially. After each pass over a batch, execute checks whether the batch has hit its strategy's maxIterations and whether the plan changed during the pass; once maxIterations is reached, or a pass leaves the plan unchanged, the batch stops running (a usage sketch follows below).
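A hypothetical end-to-end use of the MyExecutor sketch above, building a small plan with the catalyst test DSL: the fixed-point batch removes the redundant filter on the first pass and stops once a pass no longer changes the plan.
```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Build Filter(true, LocalRelation(a)) and run the executor over it.
val plan = LocalRelation('a.int).where(Literal(true))
val cleaned = MyExecutor.execute(plan)
println(cleaned) // the Filter node is gone; only the LocalRelation remains
```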
-
How Spark SQL binds data information onto the logical plan through the Catalog
- ResolveRelations replaces UnresolvedRelation nodes with resolved relations (via resolveRelation)
- ResolveRelations extends Rule; it is declared as: object ResolveRelations extends Rule[LogicalPlan]
- The apply() method of ResolveRelations defines the conversion from UnresolvedRelation to a resolved relation:
```
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}
```
-
The call to resolveOperatorsUp uses Scala's block-argument syntax: the { case ... } block that follows is a PartialFunction passed as the method's single argument
```
{
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}
```
The whole code block following resolveOperatorsUp is passed into the resolveOperatorsUp method as its argument; a minimal sketch of this syntax follows.
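A self-contained Scala sketch of the same mechanism (the names rewriteInts and pf are illustrative): any method that takes a PartialFunction can be invoked with a bare { case ... } block, and applyOrElse leaves unmatched values untouched, just as resolveOperatorsUp does.
```
def rewriteInts(pf: PartialFunction[Int, Int]): Seq[Int] =
  Seq(1, 2, 3).map(n => pf.applyOrElse(n, identity[Int]))

// The { case ... } block is the single argument to rewriteInts,
// exactly like the block passed to resolveOperatorsUp above.
rewriteInts {
  case 2 => 20
} // => Seq(1, 20, 3)
```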
-
The implementation of resolveOperatorsUp:
```
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
      // Recurse into the children first, so the rule is applied bottom-up
      // to every LogicalPlan node in the operator tree.
      val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
      if (self fastEquals afterRuleOnChildren) {
        CurrentOrigin.withOrigin(origin) {
          // rule.applyOrElse(self, identity[LogicalPlan]) applies the partial
          // function to this LogicalPlan. The partial function pattern-matches
          // its argument and only transforms nodes it matches; as seen above,
          // an UnresolvedRelation node is handled by resolveRelation(u).
          rule.applyOrElse(self, identity[LogicalPlan])
        }
      } else {
        CurrentOrigin.withOrigin(origin) {
          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
        }
      }
    }
  } else {
    self
  }
}
```
It first checks whether this plan has already been analyzed. It then calls mapChildren, passing resolveOperatorsUp itself, which makes this a recursive, bottom-up traversal: the children are processed before the node itself. If the plan returned from processing the children fastEquals the current plan (the node has no children, or the rule changed none of them), the rule is applied to the node itself; otherwise it is applied to the new plan returned from the children. A toy sketch of this pattern follows.
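A self-contained toy sketch of this bottom-up pattern (Node, Leaf, and Branch are illustrative types, not Spark's): children are rewritten first, then the rule is tried on the resulting node via applyOrElse.
```
sealed trait Node
case class Leaf(v: Int) extends Node
case class Branch(left: Node, right: Node) extends Node

def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
  // Rewrite the children first (bottom-up) ...
  val afterChildren = n match {
    case Branch(l, r) => Branch(transformUp(l)(rule), transformUp(r)(rule))
    case leaf => leaf
  }
  // ... then try the rule on the node itself, falling back to identity.
  rule.applyOrElse(afterChildren, identity[Node])
}

// Doubles every leaf: Branch(Leaf(2), Leaf(4))
transformUp(Branch(Leaf(1), Leaf(2))) { case Leaf(v) => Leaf(v * 2) }
```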
If an UnresolvedRelation node is matched, the resolveRelation method is called to resolve it.
After resolveRelation, the returned logical plan is bound to the actual metadata, which may come directly from the globalTempViewManager, directly from tempTables, or from the externalCatalog; a sketch of this precedence follows.
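A hypothetical, self-contained sketch of that lookup precedence (CatalogSketch and its fields are illustrative, not SessionCatalog's real code): names in the global temp database resolve against global temp views, unqualified names against session-local temp views first, and everything else falls through to the external catalog (the metastore).
```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

class CatalogSketch(
    globalTempViews: Map[String, LogicalPlan],
    tempViews: Map[String, LogicalPlan],
    externalTables: Map[String, LogicalPlan],
    globalTempDb: String = "global_temp") {

  def lookupRelation(db: Option[String], table: String): LogicalPlan =
    if (db.contains(globalTempDb)) globalTempViews(table)
    // an unqualified name resolves to a temp view before a catalog table
    else tempViews.getOrElse(table, externalTables(table))
}
```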
-