Foreword
The SQL-ification of the big data world is in full swing ("everybody knows SQL"), and Flink is no exception. Flink SQL is the highest-level API in the Flink stack and the culmination of its unified stream/batch philosophy. With plain, declarative SQL statements, users can run streaming or batch jobs as if they were querying a table, shielded from the low-level details of the DataStream/DataSet APIs, which greatly lowers the barrier to entry.
So how does a Flink SQL statement turn into an executable job? This article walks through the process in detail, based on the latest release at the time of writing, Flink 1.11, split into two parts. Before diving in, let's briefly introduce Apache Calcite and the Blink Planner.
Prerequisite: Apache Calcite & Workflow
Unlike Spark, which went to the trouble of building Catalyst as the bespoke core of its SQL layer, Flink did not reinvent the wheel: it pragmatically adopted a general-purpose SQL parsing and optimization engine, Apache Calcite. Calcite is already battle-tested in Hive, Drill, Phoenix, Kylin and other frameworks, so it was a natural choice for Flink.
The figure below shows where Calcite sits in the overall Flink Table & SQL stack.
Within this stack, Calcite is responsible for the following tasks:
- Parsing: converting the SQL statement into an abstract syntax tree (AST), i.e. a SqlNode tree.
- Validation: checking the AST semantically against the metadata in the Catalog.
- Logical planning: building a logical plan, i.e. a RelNode tree, from the AST and the metadata.
- Logical plan optimization: optimizing the logical plan with predefined optimization rules (RelOptRule). Calcite provides two kinds of optimizer (RelOptPlanner): HepPlanner for rule-based optimization (RBO) and VolcanoPlanner for cost-based optimization (CBO).
- Physical planning: translating the optimized logical plan into a physical plan that corresponds to concrete execution logic.
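To build some intuition for rule-based optimization before we meet the real thing in part two, here is a minimal HepPlanner-style sketch: rewrite rules are applied to a tree over and over until a fixpoint is reached. Every class and rule name below is invented for illustration; this is not Calcite's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;

public class ToyRuleOptimizer {
    /** Toy "plan" node: either a constant leaf or a binary ADD. */
    static class Expr {
        final Integer value;     // non-null for constant leaves
        final Expr left, right;  // non-null for ADD nodes
        Expr(int v) { this.value = v; this.left = null; this.right = null; }
        Expr(Expr l, Expr r) { this.value = null; this.left = l; this.right = r; }
        boolean isConst() { return value != null; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Expr)) return false;
            Expr e = (Expr) o;
            return Objects.equals(value, e.value)
                && Objects.equals(left, e.left) && Objects.equals(right, e.right);
        }
        @Override public int hashCode() { return Objects.hash(value, left, right); }
        @Override public String toString() {
            return isConst() ? String.valueOf(value) : "(" + left + " + " + right + ")";
        }
    }

    /** Rule 1: constant folding, ADD(c1, c2) => c1 + c2. */
    static final UnaryOperator<Expr> CONSTANT_FOLDING = e ->
        (!e.isConst() && e.left.isConst() && e.right.isConst())
            ? new Expr(e.left.value + e.right.value) : e;

    /** Rule 2: identity elimination, ADD(x, 0) => x. */
    static final UnaryOperator<Expr> DROP_ZERO = e ->
        (!e.isConst() && e.right.isConst() && e.right.value == 0) ? e.left : e;

    /** Rewrite the whole tree bottom-up with one rule. */
    static Expr rewrite(Expr e, UnaryOperator<Expr> rule) {
        if (!e.isConst()) e = new Expr(rewrite(e.left, rule), rewrite(e.right, rule));
        return rule.apply(e);
    }

    /** HepPlanner-style driver: apply the rule set until nothing changes. */
    static Expr optimize(Expr e, List<UnaryOperator<Expr>> rules) {
        while (true) {
            Expr before = e;
            for (UnaryOperator<Expr> r : rules) e = rewrite(e, r);
            if (e.equals(before)) return e;
        }
    }

    public static void main(String[] args) {
        // ((1 + 2) + 0) collapses to a single constant after the rules fire.
        Expr plan = new Expr(new Expr(new Expr(1), new Expr(2)), new Expr(0));
        System.out.println(optimize(plan, Arrays.asList(CONSTANT_FOLDING, DROP_ZERO)));
    }
}
```

The real HepPlanner works on RelNode trees and tracks rule matches far more cleverly, but the shape is the same: a fixed rule set driven to a fixpoint, with no cost model involved, which is exactly what distinguishes it from the cost-based VolcanoPlanner.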
The first four stages are in fact Calcite's standard workflow, and together the five stages cover most of the Flink SQL execution pipeline. The Table API differs from SQL only in parsing and validation: it parses an operator tree instead of a SQL string, then builds the logical plan with Calcite's RelBuilder. After physical planning there is one more step, code generation, which finally produces a directly executable DataStream/DataSet program. The analysis of Flink SQL's execution below therefore follows these 5 + 1 = 6 stages.
This first part covers the relatively simple stages: parsing, validation and logical planning. The second part will cover the more involved ones: logical plan optimization, physical planning and code generation.
Calcite's internals have already been explained very well elsewhere, so instead of repeating that work, readers can refer directly to the following articles.
- "Apache Calcite: A Foundational Framework for Optimized Query Processing Over Heterogeneous Data Sources", SIGMOD 2018, E. Begoli et al.
- "Apache Calcite Processing Flow Explained (Part 1)", matt33.com
- "Apache Calcite Optimizer Explained (Part 2)", matt33.com
Prerequisite: Blink Planner
The Planner (i.e. query processor) in the Flink Table/SQL stack is the bridge between Flink and Calcite, providing a complete parsing, optimization and execution environment for the Table/SQL API. The Blink Planner was merged into Flink's master branch in version 1.9 and became the default Planner in version 1.11, while the original Old Planner is being phased out.
The Blink Planner genuinely puts the unified stream/batch approach into practice. It provides two implementations, StreamPlanner for streaming jobs and BatchPlanner for batch jobs. Both share much of their code through the common base class PlannerBase, and both ultimately translate jobs into execution logic based on the DataStream Transformation API; in other words, batch processing is treated as a special case of stream processing. The class diagram below gives a glimpse of this.
The community's announcement of the Blink Planner's official release can be found here; no need to repeat it.
Example Preparation
For ease of explanation, we will use a simple example adapted from the official StreamSQLExample. The complete code is listed below.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object StreamSQLExample {

  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(env, settings)

    val orders: DataStream[Order] = env.fromCollection(Seq(
      Order(20200819177L, 1L, "beer", 3),
      Order(20200819234L, 2L, "diaper", 4),
      Order(20200819239L, 2L, "beef", 6),
      Order(20200820066L, 3L, "rubber", 2),
      Order(20200820100L, 3L, "beer", 5)))
    val users: DataStream[User] = env.fromCollection(Seq(
      User(1L, "Alice", 27),
      User(2L, "Bob", 26),
      User(3L, "Charlie", 25)))

    // register the DataStreams as tables
    tEnv.createTemporaryView("orders", orders, 'id, 'uid, 'product, 'amount)
    tEnv.createTemporaryView("users", users, 'id, 'name, 'age)

    // join the two tables
    val sql =
      s"""
         |SELECT u.name, SUM(o.amount) AS total
         |FROM orders o
         |INNER JOIN users u ON o.uid = u.id
         |WHERE u.age < 27
         |GROUP BY u.name
       """.stripMargin

    print(tEnv.explainSql(sql))
    val result = tEnv.sqlQuery(sql)
    result.toRetractStream[Row].print()
    env.execute()
  }

  case class Order(id: Long, uid: Long, product: String, amount: Int)
  case class User(id: Long, name: String, age: Int)
}
The TableEnvironment.explainSql() method returns the query plan of a SQL statement directly as text, in three sections: the abstract syntax tree, the optimized logical plan and the physical execution plan. Pieces of this plan will be quoted as we go along.
Alright, let's get our hands dirty.
Stage 1: Parsing
We start at the entry point for executing a SQL statement, the TableEnvironmentImpl.sqlQuery() method, whose very first step is a call to Parser.parse() to parse the SQL.
// TableEnvironmentImpl.sqlQuery()
@Override
public Table sqlQuery(String query) {
List<Operation> operations = parser.parse(query);
// ......
}
We then move on to ParserImpl.parse() and the CalciteParser.parse() method it calls.
// ParserImpl.parse()
@Override
public List<Operation> parse(String statement) {
CalciteParser parser = calciteParserSupplier.get();
FlinkPlannerImpl planner = validatorSupplier.get();
// parse the sql query
SqlNode parsed = parser.parse(statement);
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
// CalciteParser.parse()
public SqlNode parse(String sql) {
try {
SqlParser parser = SqlParser.create(sql, config);
return parser.parseStmt();
} catch (SqlParseException e) {
throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
}
}
As we can see, parsing is delegated directly to Calcite's SQL parser, SqlParser. For reasons of space this article will not trace any deeper; the reference articles mentioned above contain a detailed analysis of the Calcite source. Let's look instead at the SqlNode data produced by parsing.
As is well known, Flink's SQL dialect differs considerably from standard SQL, so how is a Flink-specific parser implemented? Note that building the parser's configuration class, SqlParser.Config, requires passing in a parser factory, SqlParserImplFactory. The corresponding code is as follows.
// PlanningConfigurationBuilder.getSqlParserConfig()
public SqlParser.Config getSqlParserConfig() {
return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
// we use Java lex because back ticks are easier than double quotes in programming
// and cases are preserved
SqlParser
.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setConformance(getSqlConformance())
.setLex(Lex.JAVA)
.build());
}
However, the flink-sql-parser module does not contain a FlinkSqlParserImpl class by default. We only need to build that module: JavaCC then compiles Flink SQL's grammar description (Calcite's built-in Parser.jj combined with Flink's customized FreeMarker templates) and prints the following output:
[INFO] --- javacc-maven-plugin:2.4:javacc (javacc) @ flink-sql-parser ---
Java Compiler Compiler Version 4.0 (Parser Generator)
(type "javacc" with no arguments for help)
Reading from file /Users/lmagic/workspace-new/gitee/flink/flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
File "TokenMgrError.java" does not exist. Will create one.
File "ParseException.java" does not exist. Will create one.
File "Token.java" does not exist. Will create one.
File "SimpleCharStream.java" does not exist. Will create one.
Parser generated successfully.
[INFO] Processed 1 grammar
The build finally generates FlinkSqlParserImpl and its companion classes under the generated-sources directory, and Calcite uses them to parse Flink SQL. The grammar description files themselves live in the codegen directory.
// FlinkSqlParserImpl
/**
* SQL parser, generated from Parser.jj by JavaCC.
*
* <p>The public wrapper for this parser is {@link SqlParser}.
*/
public class FlinkSqlParserImpl extends SqlAbstractParserImpl implements FlinkSqlParserImplConstants
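To make the role of the generated parser concrete, here is a drastically simplified, hand-written stand-in that recognizes a single statement shape and produces a little node tree, a toy analogue of a SqlNode tree. The class and its one-rule grammar are invented for illustration; the real FlinkSqlParserImpl is generated by JavaCC from the grammar files and handles the full dialect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToySqlParser {
    /** Toy AST node for "SELECT col[, col...] FROM table". */
    static class SelectNode {
        final List<String> columns; final String table;
        SelectNode(List<String> columns, String table) { this.columns = columns; this.table = table; }
        @Override public String toString() { return "Select(cols=" + columns + ", from=" + table + ")"; }
    }

    static SelectNode parse(String sql) {
        // Split on whitespace and commas; the real lexer is generated by JavaCC.
        List<String> tokens = new ArrayList<>(Arrays.asList(sql.trim().split("[,\\s]+")));
        expect(tokens, "SELECT");
        List<String> cols = new ArrayList<>();
        while (!tokens.get(0).equalsIgnoreCase("FROM")) cols.add(tokens.remove(0));
        expect(tokens, "FROM");
        return new SelectNode(cols, tokens.remove(0));
    }

    static void expect(List<String> tokens, String keyword) {
        if (!tokens.remove(0).equalsIgnoreCase(keyword))
            throw new IllegalArgumentException("expected " + keyword);
    }

    public static void main(String[] args) {
        System.out.println(parse("SELECT name, age FROM users"));
    }
}
```

The output tree here is still purely syntactic: nothing has checked whether `users` exists or whether `name` is one of its columns. That gap is exactly what the validation stage, covered next, is for.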
Stage 2: Validation
After the SQL has been parsed, the ParserImpl.parse() method shown above immediately moves on to validation. Let's look at the code of SqlToOperationConverter.convert().
// SqlToOperationConverter.convert()
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner,
CatalogManager catalogManager,
SqlNode sqlNode) {
// validate the query
final SqlNode validated = flinkPlanner.validate(sqlNode);
// ......
}
FlinkPlannerImpl.validate() and the validateInternal() method it calls are shown below.
def validate(sqlNode: SqlNode): SqlNode = {
val validator = getOrCreateSqlValidator()
validateInternal(sqlNode, validator)
}
private def validateInternal(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
try {
sqlNode.accept(new PreValidateReWriter(
validator.getCatalogReader.unwrap(classOf[CatalogReader]), typeFactory))
// do extended validation.
sqlNode match {
case node: ExtendedSqlNode =>
node.validate()
case _ =>
}
// no need to validate row type for DDL and insert nodes.
if (sqlNode.getKind.belongsTo(SqlKind.DDL)
|| sqlNode.getKind == SqlKind.INSERT
|| sqlNode.getKind == SqlKind.CREATE_FUNCTION
|| sqlNode.getKind == SqlKind.DROP_FUNCTION
|| sqlNode.getKind == SqlKind.OTHER_DDL
|| sqlNode.isInstanceOf[SqlShowCatalogs]
|| sqlNode.isInstanceOf[SqlShowDatabases]
|| sqlNode.isInstanceOf[SqlShowTables]
|| sqlNode.isInstanceOf[SqlShowFunctions]
|| sqlNode.isInstanceOf[SqlShowViews]
|| sqlNode.isInstanceOf[SqlRichDescribeTable]) {
return sqlNode
}
sqlNode match {
case explain: SqlExplain =>
val validated = validator.validate(explain.getExplicandum)
explain.setOperand(0, validated)
explain
case _ =>
validator.validate(sqlNode)
}
}
catch {
case e: RuntimeException =>
throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
}
}
As the code shows, some SqlNode types require no validation and are returned directly. FlinkCalciteSqlValidator extends Calcite's default validator, SqlValidatorImpl, and adds extra validation logic for literals and joins; its code is not reproduced here.
Inspecting the validated SqlNode, we find that catalog and database names have been filled in, confirming that the individual elements (table names, column names and types, function names, etc.) were indeed checked against the metadata.
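The essence of this step can be illustrated with a toy: resolve every referenced name against catalog metadata and reject anything unknown. The class and its catalog contents below are invented for illustration (they merely mirror the example tables) and have nothing to do with Calcite's actual validator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ToyValidator {
    // Toy catalog metadata: table name -> its columns.
    static final Map<String, List<String>> CATALOG = new HashMap<>();
    static {
        CATALOG.put("orders", Arrays.asList("id", "uid", "product", "amount"));
        CATALOG.put("users", Arrays.asList("id", "name", "age"));
    }

    /** Returns normally if every referenced column exists in the table; throws otherwise. */
    static void validate(String table, List<String> columns) {
        List<String> known = CATALOG.get(table);
        if (known == null) throw new IllegalArgumentException("unknown table: " + table);
        for (String c : columns)
            if (!known.contains(c))
                throw new IllegalArgumentException("unknown column " + c + " in " + table);
    }

    public static void main(String[] args) {
        validate("users", Arrays.asList("name", "age"));  // passes silently
        try {
            validate("users", Arrays.asList("price"));    // no such column
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The real validator does much more, of course: it resolves types, checks function signatures, and rewrites the tree with fully qualified names, which is why the validated SqlNode carries the catalog and database prefixes.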
After the parsing and validation stages, our query plan is still in the form of a SqlNode tree. Rendered as an AST, it looks as follows.
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], total=[SUM($1)])
+- LogicalProject(name=[$5], amount=[$3])
+- LogicalFilter(condition=[<($6, 27)])
+- LogicalJoin(condition=[=($1, $4)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database, orders]])
+- LogicalTableScan(table=[[default_catalog, default_database, users]])
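The tree above reads bottom-up: scan both tables, join on o.uid = u.id, filter u.age < 27, project name and amount, then aggregate SUM(amount) per name. A toy evaluation of exactly these operators over the example's sample rows shows what the plan computes; the class and its flattened data layout are invented for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ToyPlanEval {
    // Sample rows from the example, reduced to the fields the plan touches.
    static int[][] ORDERS_UID_AMOUNT = {{1, 3}, {2, 4}, {2, 6}, {3, 2}, {3, 5}};          // (uid, amount)
    static Object[][] USERS = {{1L, "Alice", 27}, {2L, "Bob", 26}, {3L, "Charlie", 25}};  // (id, name, age)

    static Map<String, Integer> run() {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (int[] order : ORDERS_UID_AMOUNT) {                   // LogicalTableScan(orders)
            for (Object[] user : USERS) {                         // LogicalTableScan(users)
                if (order[0] == ((Long) user[0]).intValue()       // LogicalJoin: o.uid = u.id
                        && (Integer) user[2] < 27) {              // LogicalFilter: u.age < 27
                    String name = (String) user[1];               // LogicalProject: name, amount
                    totals.merge(name, order[1], Integer::sum);   // LogicalAggregate: SUM(amount) BY name
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // Alice (age 27) is filtered out; Bob and Charlie remain
    }
}
```

Running this yields Bob with a total of 10 and Charlie with 7, which is exactly the retract stream the example job ultimately prints. The point of the remaining stages is to produce an efficient, distributed version of this naive nested-loop evaluation.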
Stage 3: Logical Planning
In the logical planning stage, the SqlNode tree is converted into a RelNode tree: from a mere statement into concrete data-processing logic, i.e. relational algebra operations such as Scan, Project, Filter and Join. We pick up where we left off in the SqlToOperationConverter.convert() method from the previous section.
// SqlToOperationConverter.convert()
public static Optional<Operation> convert(
FlinkPlannerImpl flinkPlanner,
CatalogManager catalogManager,
SqlNode sqlNode) {
// validate the query
final SqlNode validated = flinkPlanner.validate(sqlNode);
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
if (validated instanceof SqlCreateTable) {
return Optional.of(converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
} else if (validated instanceof SqlDropTable) {
return Optional.of(converter.convertDropTable((SqlDropTable) validated));
} else if (validated instanceof SqlAlterTable) {
return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
} else if (validated instanceof SqlAlterView) {
return Optional.of(converter.convertAlterView((SqlAlterView) validated));
} else if (validated instanceof SqlCreateFunction) {
return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
} else if (/*...*/) {
// ......
} else if (validated instanceof SqlRichDescribeTable) {
return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
return Optional.empty();
}
}
This method uses a long chain of if-else branches to determine which type the validated SqlNode is, and dispatches to a different method to trigger the conversion into a RelNode. Since our example executes a SELECT statement, we continue with the convertSqlQuery() method.
// SqlToOperationConverter.convertSqlQuery()
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
}
// SqlToOperationConverter.toQueryOperation()
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
// transform to a relational tree
RelRoot relational = planner.rel(validated);
return new PlannerQueryOperation(relational.project());
}
The comments say it all: FlinkPlannerImpl.rel() converts the SqlNode tree into a RelNode tree and returns its root as a RelRoot. The rel() method simply delegates to Calcite's built-in SqlToRelConverter component, which performs the conversion recursively; the details can again be found in the reference articles.
// FlinkPlannerImpl.rel()
private def rel(validatedSqlNode: SqlNode, sqlValidator: FlinkCalciteSqlValidator) = {
try {
assert(validatedSqlNode != null)
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
createToRelContext(),
sqlValidator,
sqlValidator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]),
cluster,
convertletTable,
sqlToRelConverterConfig)
sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
// ......
} catch {
case e: RelConversionException => throw new TableException(e.getMessage)
}
}
The figure below shows the RelNode tree structure represented by the RelRoot; note that LogicalAggregate, LogicalProject and the rest are all implementations of Calcite's AbstractRelNode.
To Be Continued...
Good night, everyone.