From Calcite to Tampering with Flink SQL

Today I gave a talk titled "From Calcite to Tampering with Flink SQL" to my team; the Markdown version of the handout is posted below.

The talk packs in a lot: Calcite fundamentals, how the Blink Planner works, the optimizer, and its optimization rules. I will pick out the key topics and cover them in depth in dedicated follow-up articles.


From Calcite to Tampering with Flink SQL

August 26th, 2021

For NiceTuan Real-Time Team


Prerequisites

  • Basic understanding of
    • Flink DataStream runtime (3-layered DAGs, stream partition, etc.)
    • Database system concepts
    • SQL queries
    • Scala language, just in case

(Review) Some Relational Algebra

  • Textbook - Database System Concepts 6th Edition [Abraham Silberschatz et al. 2011]

  • But Wikipedia is more than enough

    • Relational algebra is a theory that uses algebraic structures with a well-founded semantics for modeling data, and defining queries on it
    • The theory was introduced by Edgar F. Codd

  • Projection (Π)

  • Selection (σ)
  • Rename (ρ)
  • Natural join (?) & Equi-join
  • Left outer join (?)
  • Right outer join (?)
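  • For example, SELECT name FROM student WHERE age < 20 is Π_name(σ_age<20(student)) in relational-algebra terms, and the join used in the demo query later (ON a.id = b.student_id) is an equi-join, i.e. a θ-join whose condition is a pure equality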

Calcite In A Nutshell

What is it

  • As you already know, "Flink does not reinvent the wheel, but leverages Apache Calcite to deal with most SQL-related work"

  • Apache Calcite is a foundational software framework that provides query processing, optimization, and query language support to many popular open-source data processing systems such as Apache Hive, Apache Storm, Apache Flink, Druid, and MapD

Architecture

Fundamental Concepts

  • Catalog - A metadata store & handler for schema, tables, etc.

  • SqlNode - A parsed SQL tree (i.e. AST)

    • SqlLiteral - Constant value (1, FALSE, ...)
    • SqlIdentifier - Identifier
    • SqlCall - Call to functions, operators, etc.
    • SqlSelect / SqlJoin / SqlOrderBy / ...

  • RelNode - A relational (algebraic) expression (a minimal RelBuilder sketch follows this list)

    • LogicalTableScan
    • LogicalProject
    • LogicalFilter
    • LogicalCalc
    • ...

  • RexNode - A (typed) row-level expression

    • RexLiteral
    • RexVariable
    • RexCall
    • ...

  • RelTrait & RelTraitDef - A set of physical properties & their definitions carried by a relational expression

    • Convention - Calling convention of an expression, mainly identifying a single data source or engine
    • RelCollation - Ordering method of data (and sort keys)
    • RelDistribution - Distribution method of data

  • RelOptPlanner - A query optimizer, which transforms a relational expression into a semantically equivalent relational expression, according to a given set of rules and a cost model

    • HepPlanner - RBO, greedy, heuristic
    • VolcanoPlanner - CBO, dynamic programming, Volcano-flavored

  • RelOptRule - A (usually empirical) rule which defines the transformation routine for RBO

    • RelOptRuleOperand - Used by the rule to determine the section of RelNodes to be optimized
    • RuleSet - Self-explanatory

  • RelOptCost - An interface for optimizer cost in terms of number of rows processed, CPU cost, and I/O cost

  • RelMetadataProvider - An interface for obtaining metadata about relational expressions to support the optimization process

    • Min / max row count
    • Data size
    • Expression lineage
    • Distinctness / uniqueness
    • ...

  • RelOptCluster - The environment during the optimization of a query
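  • To make RelNode and RexNode concrete before the full demo below, here is a minimal RelBuilder sketch; it assumes a frameworkConfig whose default schema contains the student table registered exactly as in the next section

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.RelBuilder;

// frameworkConfig is assumed to be built as in the demo below (default schema = rootSchema)
RelBuilder builder = RelBuilder.create(frameworkConfig);

RelNode relNode = builder
  .scan("student")                                  // LogicalTableScan
  .filter(builder.call(                             // LogicalFilter over a RexCall
      SqlStdOperatorTable.LESS_THAN,
      builder.field("age"),                         // RexInputRef
      builder.literal(20)))                         // RexLiteral
  .project(builder.field("name"))                   // LogicalProject
  .build();

System.out.println(RelOptUtil.toString(relNode));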

Process Flow


A Quick Calcite Show

Prepare Schema and SQL

SchemaPlus rootSchema = Frameworks.createRootSchema(true);

rootSchema.add("student", new AbstractTable() {
  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);

    builder.add("id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
    builder.add("name", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
    builder.add("class", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.VARCHAR));
    builder.add("age", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.INTEGER));

    return builder.build();
  }
});

rootSchema.add("exam_result", new AbstractTable() {
  @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    RelDataTypeFactory.Builder builder = new Builder(DEFAULT_TYPE_FACTORY);

    builder.add("student_id", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.BIGINT));
    builder.add("score1", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));
    builder.add("score2", new BasicSqlType(DEFAULT_TYPE_SYSTEM, SqlTypeName.FLOAT));

    return builder.build();
  }
});

String sql = /* language=SQL */
  "SELECT a.id, a.name, SUM(b.score1 * 0.7 + b.score2 * 0.3) AS total_score " +
  "FROM student a " +
  "INNER JOIN exam_result b ON a.id = b.student_id " +
  "WHERE a.age < 20 AND b.score1 > 60.0 " +
  "GROUP BY a.id, a.name";

Parsing

FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
  .parserConfig(SqlParser.config().withCaseSensitive(false).withLex(Lex.MYSQL_ANSI))
  .defaultSchema(rootSchema)
  .build();

SqlParser parser = SqlParser.create(sql);
SqlNode originalSqlNode = parser.parseStmt();

System.out.println(originalSqlNode.toString());
--- Original SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`ID` = `B`.`STUDENT_ID`
WHERE `A`.`AGE` < 20 AND `B`.`SCORE1` > 60.0
GROUP BY `A`.`ID`, `A`.`NAME`

Validation

Properties cxnConfig = new Properties();
cxnConfig.setProperty(
  CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
  String.valueOf(frameworkConfig.getParserConfig().caseSensitive()));

CalciteCatalogReader catalogReader = new CalciteCatalogReader(
  CalciteSchema.from(rootSchema),
  CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null),
  DEFAULT_TYPE_FACTORY,
  new CalciteConnectionConfigImpl(cxnConfig)
);

// SqlValidatorImpl1 is presumably a small subclass of Calcite's SqlValidatorImpl created for this demo,
// since SqlValidatorImpl's own constructors are protected
SqlValidator validator = new SqlValidatorImpl1(
  frameworkConfig.getOperatorTable(),
  catalogReader,
  DEFAULT_TYPE_FACTORY
);

SqlNode validatedSqlNode = validator.validate(originalSqlNode);

System.out.println(validatedSqlNode.toString());
--- Validated SqlNode ---
SELECT `A`.`ID`, `A`.`NAME`, SUM(`B`.`SCORE1` * 0.7 + `B`.`SCORE2` * 0.3) AS `TOTAL_SCORE`
FROM `STUDENT` AS `A`
INNER JOIN `EXAM_RESULT` AS `B` ON `A`.`id` = `B`.`student_id`
WHERE `A`.`age` < 20 AND `B`.`score1` > 60.0
GROUP BY `A`.`id`, `A`.`name`
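  • Compared with the parsed tree, the validated tree has resolved every identifier against the registered schema (note the lower-cased column names `A`.`id`, `B`.`student_id`, etc., matching the row types declared above), and each expression has been type-checked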

Planning

RelOptCluster relOptCluster = RelOptCluster.create(new VolcanoPlanner(), new RexBuilder(DEFAULT_TYPE_FACTORY));

SqlToRelConverter relConverter = new SqlToRelConverter(
  null,
  validator,
  catalogReader,
  relOptCluster,
  frameworkConfig.getConvertletTable()
);

RelRoot relRoot = relConverter.convertQuery(validatedSqlNode, false, true);
RelNode originalRelNode = relRoot.rel;

System.out.println(RelOptUtil.toString(originalRelNode));
--- Original RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalFilter(condition=[AND(<($3, 20), >($5, 60.0:DECIMAL(3, 1)))])
        LogicalJoin(condition=[=($0, $4)], joinType=[inner])
          LogicalTableScan(table=[[student]])
          LogicalTableScan(table=[[exam_result]])
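  • Reading the RexInputRef indices: the join's combined row type concatenates student (id=$0, name=$1, class=$2, age=$3) and exam_result (student_id=$4, score1=$5, score2=$6), so <($3, 20) is a.age < 20, =($0, $4) is the join condition, and $f2 is score1 * 0.7 + score2 * 0.3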

Optimization

  • Predicate (filter) pushdown past the join towards the table scans, using HepPlanner and the FILTER_INTO_JOIN rule

σ_{R.a θ a' ∧ S.b θ b'}(R ? S) = (σ_{R.a θ a'} R) ? (σ_{S.b θ b'} S)

  • HepProgram defines the order of rules to be attempted
HepProgram hepProgram = new HepProgramBuilder()
  .addRuleInstance(CoreRules.FILTER_INTO_JOIN)
  .addMatchOrder(HepMatchOrder.BOTTOM_UP)
  .build();

HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(originalRelNode);
RelNode optimizedRelNode = hepPlanner.findBestExp();

System.out.println(RelOptUtil.toString(optimizedRelNode));
--- Optimized RelNode ---
LogicalProject(ID=[$0], NAME=[$1], TOTAL_SCORE=[$2])
  LogicalAggregate(group=[{0, 1}], TOTAL_SCORE=[SUM($2)])
    LogicalProject(id=[$0], name=[$1], $f2=[+(*($5, 0.7:DECIMAL(2, 1)), *($6, 0.3:DECIMAL(2, 1)))])
      LogicalJoin(condition=[=($0, $4)], joinType=[inner])
        LogicalFilter(condition=[<($3, 20)])
          LogicalTableScan(table=[[student]])
        LogicalFilter(condition=[>($1, 60.0:DECIMAL(3, 1))])
          LogicalTableScan(table=[[exam_result]])
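  • Note that after the pushdown the filter on exam_result reads >($1, 60.0): input refs are rewritten relative to each operator's direct input, and score1 is field 1 of the bare exam_result scan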
  • Rules can do a lot more...
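  • As a small illustration, more built-in CoreRules can be chained into the same HepProgram and applied to originalRelNode from the demo above; the rule choice below is only an example sketch
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.CoreRules;

// Each rule instance is attempted (bottom-up) until it no longer matches, in registration order
HepProgram program = new HepProgramBuilder()
  .addMatchOrder(HepMatchOrder.BOTTOM_UP)
  .addRuleInstance(CoreRules.FILTER_INTO_JOIN)           // push filters below the join
  .addRuleInstance(CoreRules.PROJECT_MERGE)              // merge stacked projections
  .addRuleInstance(CoreRules.FILTER_REDUCE_EXPRESSIONS)  // constant-fold filter conditions
  .build();

HepPlanner planner = new HepPlanner(program);
planner.setRoot(originalRelNode);
RelNode rewritten = planner.findBestExp();
System.out.println(RelOptUtil.toString(rewritten));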

Dive Into Blink Stream Planner

Overview

  • Parsing & validation
  • Logical planning
  • All-over optimization w/ physical planning
  • Execution planning & codegen (only a brief today)

SQL for Example

  • Will not cover sophisticated things (e.g. sub-queries, aggregate functions, window TVFs) for now
  • Just an ordinary streaming ETL process, which will be optimized later
INSERT INTO expdb.print_joined_result
SELECT 
  FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, 
  a.userId, a.eventType, 
  a.siteId, b.site_name AS siteName
FROM expdb.kafka_analytics_access_log_app 
/*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='DiveIntoBlinkExp') */ a
LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation 
FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.site_id
WHERE a.userId > 3 + 4;
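  • Note the deliberately unsimplified predicate a.userId > 3 + 4: in the optimized physical plan below it shows up as userId > 7, thanks to expression reduction (constant folding)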

Parsing & Validation

  • Build the flink-sql-parser module, and you'll get the exact parser for the Flink SQL dialect
  • Call stack
// parse
parse:54, CalciteParser (org.apache.flink.table.planner.parse)
parse:96, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)

// validation
-- goes to org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator#validate()
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate:150, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
validate:108, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
convert:201, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • SqlNode tree
    • Note that FOR SYSTEM_TIME AS OF syntax is translated to a SqlSnapshot node
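    • Also note that the /*+ OPTIONS(...) */ hint survives parsing as a table hint and reappears as the hints=[...] attribute of the LogicalTableScan in the plan below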

Logical Planning

  • Call stack
    • Obviously, these are a bunch of recursive processes
-- goes to Calcite SqlToRelConverter
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:168, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:160, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:967, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:936, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:275, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlInsert:595, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:268, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:99, ParserImpl (org.apache.flink.table.planner.delegation)
executeSql:722, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • Logical planning in Flink SQL yields a tree of Operations (e.g. ModifyOperation, QueryOperation)

    • Just wrappers of RelNodes

  • RelNode tree

    • SqlJoin → LogicalCorrelate (in Calcite this means nested-loop join)
    • SqlSnapshot → LogicalSnapshot
    • etc.
  • Output of EXPLAIN statement
-- In fact this is the original logical plan
== Abstract Syntax Tree ==
LogicalSink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$1], eventType=[$2], siteId=[$6], siteName=[$10])
   +- LogicalFilter(condition=[>($1, +(3, 4))])
      +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{6, 8}])
         :- LogicalProject(ts=[$0], userId=[$1], eventType=[$2], columnType=[$3], fromType=[$4], grouponId=[$5], siteId=[$6], merchandiseId=[$7], procTime=[PROCTIME()])
         :  +- LogicalTableScan(table=[[hive, expdb, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
         +- LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $0)])
            +- LogicalSnapshot(period=[$cor0.procTime])
               +- LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]])
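  • The EXPLAIN sections quoted in this talk can also be produced programmatically; a minimal sketch, assuming tableEnv is the StreamTableEnvironment with the catalogs above registered and sql holds the INSERT statement
// tableEnv and sql are assumed from the surrounding example
System.out.println(tableEnv.explainSql(sql));
// or run the SQL form directly:
// tableEnv.executeSql("EXPLAIN PLAN FOR " + sql).print();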

All-Over Optimization w/ Physical Planning

  • Call stack
    • CommonSubGraphBasedOptimizer is a Flink-implemented optimizer that divides the logical plan into sink-rooted sub-graph blocks and reuses common sub-graphs whenever possible
    • For most scenarios, the logical plan is merely a single tree (optimizeTree)
-- goes to org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram#optimize()
optimizeTree:163, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:79, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:284, PlannerBase (org.apache.flink.table.planner.delegation)
translate:168, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1516, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:738, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:854, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:728, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • FlinkChainedProgram breaks down into several FlinkHepPrograms (analogous to Calcite's HepProgram), which define the order in which rules are attempted by HepPlanner

    • This time a lot more rules of course
    • Flink SQL handles the entire physical planning process with RelOptRules, along with logical/physical optimization

  • All RuleSets are defined in FlinkStreamRuleSets; some of the rules ship natively with Calcite

  • FlinkStreamProgram actually builds up the program sequence
    • The names are quite straightforward though
    • At the end of LOGICAL, specialized ConverterRules will convert Calcite RelNodes into FlinkLogicalRels
      • e.g. LogicalCalc → FlinkLogicalCalcConverter → FlinkLogicalCalc
      • i.e. the convention is converted to FLINK_LOGICAL
      • The logical optimization phase is somewhat hard to observe
  • The optimized StreamPhysicalRel tree
    • Physical planning rules are almost all ConverterRules
      • FlinkLogicalRel → StreamPhysicalRel, convention FLINK_LOGICAL → STREAM_PHYSICAL
      • e.g. FlinkLogicalCalc → StreamPhysicalCalcRule → StreamPhysicalCalc

    • HepRelVertex is the wrapper of RelNode in HepPlanner
  • Output of EXPLAIN statement
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
  • Let's pick two rules for a closer look

  • TEMPORAL_JOIN_REWRITE - LogicalCorrelateToJoinFromLookupTableRuleWithFilter

This rule matches

+- LogicalCorrelate
   :- [RelNode related to stream table]
   +- LogicalFilter(condition)
      +- LogicalSnapshot(time_attr)
         +- [RelNode related to temporal table]

and transforms into

+- LogicalJoin(condition)
   :- [RelNode related to stream table]
   +- LogicalSnapshot(time_attr)
      +- [RelNode related to temporal table]
  • PHYSICAL - StreamPhysicalLookupJoinRule - SnapshotOnTableScanRule

This rule matches

+- FlinkLogicalJoin(condition)
   :- [RelNode related to stream table]
   +- FlinkLogicalSnapshot(time_attr)
      +- FlinkLogicalTableSourceScan [w/ LookupTableSource]

and transforms into StreamPhysicalLookupJoin

Execution Planning & Codegen

  • Call stack
-- goes to separate FlinkPhysicalRel#translateToExecNode()
generate:74, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
generate:54, ExecNodeGraphGenerator (org.apache.flink.table.planner.plan.nodes.exec)
translateToExecNodeGraph:312, PlannerBase (org.apache.flink.table.planner.delegation)
translate:164, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)

-- goes to separate ExecNodeBase#translateToPlan() & StreamExecNode#translateToPlanInternal()
translateToPlan:70, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:165, PlannerBase (org.apache.flink.table.planner.delegation)
translate:1518, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:740, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeInternal:856, TableEnvironmentImpl (org.apache.flink.table.api.internal)
executeSql:730, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  • The ExecNodeGraph DAG
    • JSON representation of this DAG can be acquired or executed by tableEnv.asInstanceOf[TableEnvironmentInternal].getJsonPlan(sql) / executeJsonPlan(plan)
  • Output of EXPLAIN statement
== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
         +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
  • StreamExecNode → Transformation → Generated DataStream Operator / Function code

    • e.g. StreamExecCalc → OneInputTransformation → OneInputStreamOperator / FlatMapFunction

  • Generated code is dynamically compiled into Java classes at runtime through Janino

  • We'll leave the detailed explanation of this part to the next lecture


Get Our Hands Dirty

Question

  • Are there any hidden problems in the simple example program shown above?
  • Try focusing on the LookupJoin and consider its cache locality
    • In extreme cases, the same looked-up KV entry may end up being cached N times (once per parallel sub-task)
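    • To see why: the upstream partitioning is effectively arbitrary with respect to the lookup key, so requests for the same site_id are spread over all sub-task caches, duplicating entries and lowering each cache's hit rate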

Define An Option

  • Distributing records to sub-tasks by the hash of the lookup key seems better

  • In ExecutionConfigOptions...

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY =
    key("table.exec.lookup.distribute-by-key")
        .booleanType()
        .defaultValue(false)
        .withDescription("Specifies whether to distribute lookups to sub-tasks by the hash value of the lookup key.");

Customize A Rule

  • When to apply this rule? --- After physical planning

  • What should we do? --- Insert a hash-by-key operation before StreamPhysicalLookupJoin

    • FlinkRelDistribution will do the work
    • Physical redistribution means StreamPhysicalExchange node

  • Note that there are 5 kinds of RelTrait in Flink SQL (as replaced in the code below: Convention, FlinkRelDistribution, RelCollation, ModifyKindSetTrait and UpdateKindTrait)

class HashDistributedLookupJoinRule extends RelOptRule(
  operand(classOf[StreamPhysicalLookupJoin], any()),
  "HashDistributedLookupJoinRule") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val tableConfig = call.getPlanner.getContext.unwrap(classOf[FlinkContext]).getTableConfig
    tableConfig.getConfiguration.getBoolean(ExecutionConfigOptions.TABLE_EXEC_LOOKUP_DISTRIBUTE_BY_KEY)
  }

  override def onMatch(call: RelOptRuleCall): Unit = {
    val originalLookupJoin: StreamPhysicalLookupJoin = call.rel(0)
    val joinInfo = originalLookupJoin.joinInfo
    val traitSet = originalLookupJoin.getTraitSet

    val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys)

    val hashDistributedTraitSet = traitSet
      .replace(requiredDistribution)
      .replace(FlinkConventions.STREAM_PHYSICAL)
      .replace(RelCollations.EMPTY)
      .replace(traitSet.getTrait(ModifyKindSetTraitDef.INSTANCE))
      .replace(traitSet.getTrait(UpdateKindTraitDef.INSTANCE))

    // note: the Exchange must wrap the lookup join's original input (the stream side), not the join itself
    val hashDistributedInput = new StreamPhysicalExchange(
      originalLookupJoin.getCluster,
      hashDistributedTraitSet,
      originalLookupJoin.getInput,
      requiredDistribution
    )

    call.transformTo(
      originalLookupJoin.copy(originalLookupJoin.getTraitSet, util.Arrays.asList(hashDistributedInput))
    )
  }
}

object HashDistributedLookupJoinRule {
  val INSTANCE: RelOptRule = new HashDistributedLookupJoinRule
}
  • There's a helper method FlinkExpandConversionRule#satisfyDistribution() (also used in two-stage aggregation), how lucky
val hashDistributedInput = FlinkExpandConversionRule.satisfyDistribution(
  FlinkConventions.STREAM_PHYSICAL,
  originalLookupJoin.getInput,
  requiredDistribution
)

Put Into Rule Set

  • At the tail of FlinkStreamRuleSets
val PHYSICAL_REWRITE: RuleSet = RuleSets.ofList(
    // hash distributed lookup join rule
    HashDistributedLookupJoinRule.INSTANCE,
    // optimize agg rule
    TwoStageOptimizedAggregateRule.INSTANCE,
    // incremental agg rule
    IncrementalAggregateRule.INSTANCE,
    // optimize window agg rule
    TwoStageOptimizedWindowAggregateRule.INSTANCE
)
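  • Since FlinkStreamProgram applies PHYSICAL_REWRITE right after the PHYSICAL phase, the rule only ever sees StreamPhysical* nodes, which matches the "after physical planning" timing chosen above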

Have A Try

  • Rebuild the flink-table-api-java & flink-table-planner-blink modules
  • SET table.exec.lookup.distribute-by-key=true
== Optimized Physical Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])

== Optimized Execution Plan ==
Sink(table=[hive.expdb.print_joined_result], fields=[tss, userId, eventType, siteId, siteName])
+- Calc(select=[FROM_UNIXTIME((ts / 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
   +- LookupJoin(table=[hive.rtdw_dim.mysql_site_war_zone_mapping_relation], joinType=[LeftOuterJoin], async=[false], lookup=[site_id=siteId0], select=[ts, userId, eventType, siteId, siteId0, site_id, site_name])
      +- Exchange(distribution=[hash[siteId0]])
         +- Calc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[(userId > 7)])
            +- TableSourceScan(table=[[hive, expdb, kafka_analytics_access_log_app]], fields=[ts, userId, eventType, columnType, fromType, grouponId, siteId, merchandiseId], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])

The End
