接著上一節，繼續講。還是以 SELECT A.B FROM A 為例。
查看 AstBuilder 的邏輯：遍歷訪問語法樹，最終會訪問到 querySpecification 節點：
/**
 * Build the logical plan for a query specification. The FROM clause is planned when the
 * context has one; otherwise a `OneRowRelation()` serves as the source. The chosen source is
 * then handed to `withQuerySpecification` for WHERE / SELECT / GROUP BY / HAVING handling.
 */
override def visitQuerySpecification(
    ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
  val source: LogicalPlan =
    if (ctx.fromClause == null) OneRowRelation()
    else visitFromClause(ctx.fromClause)
  withQuerySpecification(ctx, source)
}
optional 用得比較多，這裡放一下它（以及 optionalMap）的邏輯，其實很簡單：
/**
 * Evaluate the by-name block `f` to produce a new plan when `ctx` is non-null. When `ctx` is
 * null, `f` is never evaluated and the wrapped `plan` is returned unchanged.
 */
def optional(ctx: AnyRef)(f: => LogicalPlan): LogicalPlan =
  if (ctx == null) plan else f
/**
 * Transform the wrapped `plan` with `f`, which receives the context and the plan, when `ctx`
 * is non-null. When `ctx` is null, the wrapped `plan` is returned as-is.
 */
def optionalMap[C](ctx: C)(f: (C, LogicalPlan) => LogicalPlan): LogicalPlan =
  if (ctx == null) plan else f(ctx, plan)
FROM 子句的解析：因為要處理 join 的情況，所以寫得比較複雜。我們的 SQL 比較簡單，這裡就是返回一個 relation。
/**
 * Create a logical plan for a given 'FROM' clause. Multiple comma-separated relations are
 * supported. The first relation stands alone, and each later one is attached to the plan built
 * so far with a condition-less inner join. Any lateral views are applied on top of the result.
 */
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  val joined = ctx.relation.asScala.foldLeft(Option.empty[LogicalPlan]) { (acc, rel) =>
    val primary = plan(rel.relationPrimary)
    // The plan accumulated so far (if any) becomes the left side of the join.
    val combined = acc.fold(primary)(prev => Join(prev, primary, Inner, None))
    Option(withJoinRelations(combined, rel))
  }
  // orNull keeps the result null when there are no relations at all.
  ctx.lateralView.asScala.foldLeft(joined.orNull)(withGenerate)
}
對 WHERE 等子句的解析有些邏輯還是很複雜，我們只需要關注自己的 SQL 所走到的分支：
/**
 * Add a query specification to a logical plan. The query specification is the core of the logical
 * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
 * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
 *
 * For a regular SELECT, the plan is layered on top of `relation` in this order: lateral views,
 * WHERE filter, aggregation or projection, HAVING filter, DISTINCT, WINDOW definitions, and
 * finally any query hints (folded over the finished plan with `withHints`).
 *
 * @param ctx the parser context of the query specification.
 * @param relation the plan for the FROM clause; visitQuerySpecification passes a
 *                 OneRowRelation when the query has no FROM clause.
 * @return the logical plan for the whole query specification.
 */
private def withQuerySpecification(
ctx: QuerySpecificationContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
// Bring the context's accessors (where, having, kind, hints, ...) into scope unqualified.
import ctx._
// WHERE: wrap `plan` in a Filter on the parsed boolean condition. It is only used through
// optionalMap below, so it is applied only when a WHERE clause exists.
def filter(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
Filter(expression(ctx), plan)
}
// Expressions: the items listed in the select clause (empty when namedExpressionSeq is null).
val expressions = Option(namedExpressionSeq).toSeq
.flatMap(_.namedExpression.asScala)
.map(typedVisit[Expression])
// Create either a transform or a regular query. A missing `kind` is treated as SELECT.
val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
specType match {
case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
// Transform
// Add where.
val withFilter = relation.optionalMap(where)(filter)
// Create the attributes.
val (attributes, schemaLess) = if (colTypeList != null) {
// Typed return columns.
(createSchema(colTypeList).toAttributes, false)
} else if (identifierSeq != null) {
// Untyped return columns.
val attrs = visitIdentifierSeq(identifierSeq).map { name =>
AttributeReference(name, StringType, nullable = true)()
}
(attrs, false)
} else {
// No output columns declared: default to string `key` / `value` columns, schema-less.
(Seq(AttributeReference("key", StringType)(),
AttributeReference("value", StringType)()), true)
}
// Create the transform.
ScriptTransformation(
expressions,
string(script),
attributes,
withFilter,
withScriptIOSchema(
ctx, inRowFormat, recordWriter, outRowFormat, recordReader, schemaLess))
// Regular SELECT: the branch taken by our example query (SELECT A.B FROM A).
case SqlBaseParser.SELECT =>
// Regular select
// Add lateral views.
val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)
// Add where.
val withFilter = withLateralView.optionalMap(where)(filter)
// Add aggregation or a project.
// Expressions without a name are wrapped in UnresolvedAlias so every item is a NamedExpression.
val namedExpressions = expressions.map {
case e: NamedExpression => e
case e: Expression => UnresolvedAlias(e)
}
val withProject = if (aggregation != null) {
withAggregation(aggregation, namedExpressions, withFilter)
} else if (namedExpressions.nonEmpty) {
// Our example query has no aggregation, so its result is this Project over the source plan.
Project(namedExpressions, withFilter)
} else {
// No select expressions: keep the (possibly filtered) plan unchanged.
withFilter
}
// Having: applied only when a HAVING clause exists, on top of the aggregate/project.
val withHaving = withProject.optional(having) {
// Note that we add a cast to non-predicate expressions. If the expression itself is
// already boolean, the optimizer will get rid of the unnecessary cast.
val predicate = expression(having) match {
case p: Predicate => p
case e => Cast(e, BooleanType)
}
Filter(predicate, withProject)
}
// Distinct: only when the set quantifier is DISTINCT.
val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
Distinct(withHaving)
} else {
withHaving
}
// Window: handed to withWindows only when a WINDOW clause is present.
val withWindow = withDistinct.optionalMap(windows)(withWindows)
// Hint: fold hints over the plan; foldRight means withHints sees the last hint first.
hints.asScala.foldRight(withWindow)(withHints)
}
}
最終返回的是 Project(namedExpressions, withFilter)。Project 繼承自 UnaryNode，是 LogicalPlan 的一種：
/**
 * Logical projection node: a single `child` plan whose output is one attribute per entry of
 * `projectList`.
 */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(item => item.toAttribute)

  // The row-count bound is inherited unchanged from the child.
  override def maxRows: Option[Long] = child.maxRows

  // Resolved only when every expression and the child are resolved, and no projection item
  // contains an aggregate, generator, or window expression.
  override lazy val resolved: Boolean = {
    val hasSpecialExpressions = projectList.exists { item =>
      item.collect {
        case special @ (_: AggregateExpression | _: Generator | _: WindowExpression) => special
      }.nonEmpty
    }
    expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
  }

  override def validConstraints: Set[Expression] =
    child.constraints union getAliasedConstraints(projectList)
}
我們斷點調試一下，結果確實是這樣：
從上面也可以看到，LogicalPlan 是一棵樹（Tree），每一個節點都繼承自 TreeNode。Spark SQL 的 LogicalPlan、Expression、PhysicalPlan 全都是用樹表示的，後面都會講到。
這次提到的 LogicalPlan 主要有三種子類：
- UnaryNode：一元節點，即只有一個子節點，如 Project、Limit、Filter 操作。
- BinaryNode：二元節點，即有左右兩個子節點，如 Join、Intersect、Except 操作。（注意：Union 可以有任意多個子節點，並不是 BinaryNode。）
- LeafNode：葉子節點，即沒有子節點的節點，如前面出現的 OneRowRelation 等數據源關係，以及 SetCommand 這類命令操作。
SQL 語句經過解析後，得到由上面這些節點構成的樹，用於後續流程。