FlinkSQL字段血緣解決方案及源碼

核心思想是通過Calcite解析SQL生成關(guān)系表達(dá)式RelNode樹隐解,再通過優(yōu)化得到optimized logical paln,最后調(diào)用Calcite RelMetadataQuery獲取字段級別的血緣關(guān)系咕晋。

源碼地址: https://github.com/HamaWhiteGG/flink-sql-lineage

序號 作者 版本 時間 備注
1 HamaWhite 1.0.0 2022-08-09 1. 增加文檔和源碼
2 HamaWhite 2.0.0 2022-11-24 1. 支持Watermark
2. 支持UDTF
3. 改變Calcite源碼修改方式
4. 升級Hudi和Mysql CDC版本
3 HamaWhite 2.0.1 2022-12-01 1. 支持field AS LOCALTIMESTAMP
4 HamaWhite 2.0.2 2022-12-30 1. 支持 Flink CEP SQL
2. 支持 ROW_NUMBER()

一、基礎(chǔ)知識

1.1 Apache Calcite簡介

Apache Calcite是一款開源的動態(tài)數(shù)據(jù)管理框架,它提供了標(biāo)準(zhǔn)的SQL語言蜈项、多種查詢優(yōu)化和連接各種數(shù)據(jù)源的能力,但不包括數(shù)據(jù)存儲续挟、處理數(shù)據(jù)的算法和存儲元數(shù)據(jù)的存儲庫紧卒。Calcite采用的是業(yè)界大數(shù)據(jù)查詢框架的一種通用思路,它的目標(biāo)是“one size fits all”诗祸,希望能為不同計算平臺和數(shù)據(jù)源提供統(tǒng)一的查詢引擎跑芳。Calcite作為一個強(qiáng)大的SQL計算引擎,在Flink內(nèi)部的SQL引擎模塊也是基于Calcite贬媒。

Calcite工作流程如下圖所示聋亡,一般分為Parser、Validator和Converter际乘、Optimizer階段坡倔。


1.1 Calcite workflow diagram.png

詳情請參考How to screw SQL to anything with Apache Calcite

1.2 Calcite RelNode介紹

在CalciteSQL解析中,Parser解析后生成的SqlNode語法樹脖含,經(jīng)過Validator校驗后在Converter階段會把SqlNode抽象語法樹轉(zhuǎn)為關(guān)系運算符樹(RelNode Tree)罪塔,如下圖所示。


1.2 Calcite SqlNode vs RelNode.png

1.3 組件版本信息

組件名稱 版本 備注
Flink 1.14.4
Hadoop 3.2.2
Hive 3.1.2
Hudi-flink1.14-bundle 0.12.1
Flink-connector-mysql-cdc 2.2.1
JDK 1.8
Scala 2.12

二养葵、字段血緣解析核心思想

2.1 FlinkSQL 執(zhí)行流程解析

根據(jù)源碼整理出FlinkSQL的執(zhí)行流程如下圖所示征堪,主要分為五個階段:

  1. Parse階段

語法解析,使用JavaCC把SQL轉(zhuǎn)換成抽象語法樹(AST)关拒,在Calcite中用SqlNode來表示佃蚜。

  1. Validate階段

語法校驗,根據(jù)元數(shù)據(jù)信息進(jìn)行語法驗證着绊,例如查詢的表谐算、字段、函數(shù)是否存在归露,會分別對from洲脂、where、group by剧包、having恐锦、select、orader by等子句進(jìn)行validate疆液,驗證后還是SqlNode構(gòu)成的語法樹AST一铅。

  1. Convert階段

語義分析,根據(jù)SqlNode和元數(shù)據(jù)信息構(gòu)建關(guān)系表達(dá)式RelNode樹枚粘,也就是最初版本的邏輯計劃馅闽。

  1. Optimize階段

邏輯計劃優(yōu)化飘蚯,優(yōu)化器會基于規(guī)則進(jìn)行等價變換,例如謂詞下推福也、列裁剪等局骤,最終得到最優(yōu)的查詢計劃。

  1. Execute階段

把邏輯查詢計劃翻譯成物理執(zhí)行計劃暴凑,依次生成StreamGraph峦甩、JobGraph,最終提交運行现喳。


2.1 FlinkSQL execution flowchart.png

注1: 圖中的Abstract Syntax Tree凯傲、Optimized Physical Plan、Optimized Execution Plan嗦篱、Physical Execution Plan名稱來源于StreamPlanner中的explain()方法冰单。

注2: 相比Calcite官方工作流程圖,此處把Validate和Convert分為兩個階段灸促。

2.2 字段血緣解析思路

2.2 FlinkSQL field lineage analysis thought.png

FlinkSQL字段血緣解析分為三個階段:

  1. 對輸入SQL進(jìn)行Parse诫欠、Validate、Convert浴栽,生成關(guān)系表達(dá)式RelNode樹荒叼,對應(yīng)FlinkSQL 執(zhí)行流程圖中的第1、2和3步驟典鸡。
  2. 在優(yōu)化階段被廓,只生成到Optimized Logical Plan即可,而非原本的Optimized Physical Plan。要修正FlinkSQL 執(zhí)行流程圖中的第4步驟。
2.2 FlinkSQL field lineage analysis flowchart.png
  1. 針對上步驟優(yōu)化生成的邏輯RelNode,調(diào)用RelMetadataQuery的getColumnOrigins(RelNode rel, int column)查詢原始字段信息。然后構(gòu)造血緣關(guān)系待讳,并返回結(jié)果。

2.3 核心源碼闡述

parseFieldLineage(String sql)方法是對外提供的字段血緣解析API,里面分別執(zhí)行三大步驟谅摄。

public List<FieldLineage> parseFieldLineage(String sql) {
    LOG.info("Input Sql: \n {}", sql);
    // 1. Generate original relNode tree
    Tuple2<String, RelNode> parsed = parseStatement(sql);
    String sinkTable = parsed.getField(0);
    RelNode oriRelNode = parsed.getField(1);

    // 2. Optimize original relNode to generate Optimized Logical Plan
    RelNode optRelNode = optimize(oriRelNode);

    // 3. Build lineage based from RelMetadataQuery
    return buildFiledLineageResult(sinkTable, optRelNode);
}

2.3.1 根據(jù)SQL生成RelNode樹

調(diào)用ParserImpl.List<Operation> parse(String statement) 方法即可,然后返回第一個operation中的calciteTree多律。此代碼限制只支持Insert的血緣關(guān)系痴突。

private Tuple2<String, RelNode> parseStatement(String sql) {
    List<Operation> operations = tableEnv.getParser().parse(sql);
    
    if (operations.size() != 1) {
        throw new TableException(
            "Unsupported SQL query! only accepts a single SQL statement.");
    }
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        CatalogSinkModifyOperation sinkOperation = (CatalogSinkModifyOperation) operation;
        
        PlannerQueryOperation queryOperation = (PlannerQueryOperation) sinkOperation.getChild();
        RelNode relNode = queryOperation.getCalciteTree();
        return new Tuple2<>(
            sinkOperation.getTableIdentifier().asSummaryString(),
            relNode);
    } else {
        throw new TableException("Only insert is supported now.");
    }
}

2.3.2 生成Optimized Logical Plan

在第4步驟的邏輯計劃優(yōu)化階段,根據(jù)源碼可知核心是調(diào)用FlinkStreamProgram的中的優(yōu)化策略狼荞,共包含12個階段(subquery_rewrite辽装、temporal_join_rewrite...logical_rewrite、time_indicator相味、physical拾积、physical_rewrite),優(yōu)化后生成的是Optimized Physical Plan。
根據(jù)SQL的字段血緣解析原理可知拓巧,只要解析到logical_rewrite優(yōu)化后即可斯碌,因此復(fù)制FlinkStreamProgram源碼為FlinkStreamProgramWithoutPhysical類,并刪除time_indicator肛度、physical傻唾、physical_rewrite策略及最后面chainedProgram.addLast相關(guān)代碼。然后調(diào)用optimize方法核心代碼如下:


//  this.flinkChainedProgram = FlinkStreamProgramWithoutPhysical.buildProgram(configuration);

/**
 *  Calling each program's optimize method in sequence.
 */
private RelNode optimize(RelNode relNode) {
    return flinkChainedProgram.optimize(relNode, new StreamOptimizeContext() {
        @Override
        public boolean isBatchMode() {
            return false;
        }

        @Override
        public TableConfig getTableConfig() {
            return tableEnv.getConfig();
        }

        @Override
        public FunctionCatalog getFunctionCatalog() {
            return getPlanner().getFlinkContext().getFunctionCatalog();
        }

        @Override
        public CatalogManager getCatalogManager() {
            return tableEnv.getCatalogManager();
        }

        @Override
        public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
            return getPlanner().getFlinkContext().getSqlExprToRexConverterFactory();
        }

        @Override
        public <C> C unwrap(Class<C> clazz) {
            return getPlanner().getFlinkContext().unwrap(clazz);

        }

        @Override
        public FlinkRelBuilder getFlinkRelBuilder() {
            return getPlanner().getRelBuilder();
        }

        @Override
        public boolean needFinalTimeIndicatorConversion() {
            return true;
        }

        @Override
        public boolean isUpdateBeforeRequired() {
            return false;
        }

        @Override
        public MiniBatchInterval getMiniBatchInterval() {
            return MiniBatchInterval.NONE;
        }


        private PlannerBase getPlanner() {
            return (PlannerBase) tableEnv.getPlanner();
        }
    });
}

注: 此代碼可參考StreamCommonSubGraphBasedOptimizer中的optimizeTree方法來書寫承耿。

2.3.3 查詢原始字段并構(gòu)造血緣

調(diào)用RelMetadataQuery的getColumnOrigins(RelNode rel, int column)查詢原始字段信息冠骄,然后構(gòu)造血緣關(guān)系,并返回結(jié)果加袋。

buildFiledLineageResult(String sinkTable, RelNode optRelNode)

private List<FieldLineage> buildFiledLineageResult(String sinkTable, RelNode optRelNode) {
    // target columns
    List<String> targetColumnList = tableEnv.from(sinkTable)
            .getResolvedSchema()
            .getColumnNames();
    
    // check the size of query and sink fields match
    validateSchema(sinkTable, optRelNode, targetColumnList);

    RelMetadataQuery metadataQuery = optRelNode.getCluster().getMetadataQuery();

    List<FieldLineage> fieldLineageList = new ArrayList<>();

    for (int index = 0; index < targetColumnList.size(); index++) {
        String targetColumn = targetColumnList.get(index);

        LOG.debug("**********************************************************");
        LOG.debug("Target table: {}", sinkTable);
        LOG.debug("Target column: {}", targetColumn);

        Set<RelColumnOrigin> relColumnOriginSet = metadataQuery.getColumnOrigins(optRelNode, index);

        if (CollectionUtils.isNotEmpty(relColumnOriginSet)) {
            for (RelColumnOrigin relColumnOrigin : relColumnOriginSet) {
                // table
                RelOptTable table = relColumnOrigin.getOriginTable();
                String sourceTable = String.join(".", table.getQualifiedName());

                // filed
                int ordinal = relColumnOrigin.getOriginColumnOrdinal();
                List<String> fieldNames = table.getRowType().getFieldNames();
                String sourceColumn = fieldNames.get(ordinal);
                LOG.debug("----------------------------------------------------------");
                LOG.debug("Source table: {}", sourceTable);
                LOG.debug("Source column: {}", sourceColumn);

                // add record
                fieldLineageList.add(buildRecord(sourceTable, sourceColumn, sinkTable, targetColumn));
            }
        }
    }
    return fieldLineageList;
}

三凛辣、測試結(jié)果

詳細(xì)測試用例可查看代碼中的單測,此處只描述部分測試點职烧。

3.1 建表語句

下面新建三張表扁誓,分別是: ods_mysql_users、dim_mysql_company和dwd_hudi_users阳堕。

3.1.1 新建mysql cdc table-ods_mysql_users

DROP TABLE IF EXISTS ods_mysql_users;

CREATE TABLE ods_mysql_users(
  id BIGINT,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  proc_time as proctime()
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.90.xxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxx',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'demo',
  'table-name' = 'users'
);

3.1.2 新建mysql dim table-dim_mysql_company

DROP TABLE IF EXISTS dim_mysql_company;

CREATE TABLE dim_mysql_company (
    user_id BIGINT, 
    company_name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.90.xxx:3306/demo?useSSL=false&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = 'xxx',
    'table-name' = 'company'
);

3.1.3 新建hudi sink table-dwd_hudi_users

DROP TABLE IF EXISTS dwd_hudi_users;

CREATE TABLE dwd_hudi_users (
    id BIGINT,
    name STRING,
    company_name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'COPY_ON_WRITE',
    'path' = 'hdfs://192.168.90.xxx:9000/hudi/dwd_hudi_users',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1'
);

3.2 測試SQL及血緣結(jié)果

3.2.1 測試insert-select

  • 測試SQL
INSERT INTO
    dwd_hudi_users
SELECT
    id,
    name,
    name as company_name,
    birthday,
    ts,
    DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
    ods_mysql_users
  • 測試結(jié)果
sourceTable sourceColumn targetTable targetColumn
ods_mysql_users id dwd_hudi_users id
ods_mysql_users name dwd_hudi_users name
ods_mysql_users name dwd_hudi_users company_name
ods_mysql_users birthday dwd_hudi_users birthday
ods_mysql_users ts dwd_hudi_users ts
ods_mysql_users birthday dwd_hudi_users partition

3.2.2 測試insert-select-join

  • 測試SQL
SELECT
    a.id as id1,
    CONCAT(a.name, b.company_name),
    b.company_name,
    a.birthday,
    a.ts,
    DATE_FORMAT(a.birthday, 'yyyyMMdd') as p
FROM
    ods_mysql_users as a
JOIN 
    dim_mysql_company as b
ON a.id = b.user_id
  • RelNode樹展示

Original RelNode

 LogicalProject(id1=[$0], EXPR$1=[CONCAT($1, $6)], company_name=[$6], birthday=[$2], ts=[$3], p=[DATE_FORMAT($2, _UTF-16LE'yyyyMMdd')])
  LogicalJoin(condition=[=($0, $5)], joinType=[inner])
    LogicalProject(id=[$0], name=[$1], birthday=[$2], ts=[$3], proc_time=[PROCTIME()])
      LogicalTableScan(table=[[hive, flink_demo, ods_mysql_users]])
    LogicalTableScan(table=[[hive, flink_demo, dim_mysql_company]])

經(jīng)過optimize(RelNode relNode)優(yōu)化后的Optimized RelNode結(jié)果如下:

 FlinkLogicalCalc(select=[id AS id1, CONCAT(name, company_name) AS EXPR$1, company_name, birthday, ts, DATE_FORMAT(birthday, _UTF-16LE'yyyyMMdd') AS p])
  FlinkLogicalJoin(condition=[=($0, $4)], joinType=[inner])
    FlinkLogicalTableSourceScan(table=[[hive, flink_demo, ods_mysql_users]], fields=[id, name, birthday, ts])
    FlinkLogicalTableSourceScan(table=[[hive, flink_demo, dim_mysql_company]], fields=[user_id, company_name])
  • 測試結(jié)果
sourceTable sourceColumn targetTable targetColumn
ods_mysql_users id dwd_hudi_users id
dim_mysql_company company_name dwd_hudi_users name
ods_mysql_users name dwd_hudi_users name
dim_mysql_company company_name dwd_hudi_users company_name
ods_mysql_users birthday dwd_hudi_users birthday
ods_mysql_users ts dwd_hudi_users ts
ods_mysql_users birthday dwd_hudi_users partition

3.2.3 測試insert-select-lookup-join

上述步驟完成后還不支持Lookup Join的字段血緣解析跋理,測試情況如下所述。

  • 測試SQL
SELECT
    a.id as id1,
    CONCAT(a.name, b.company_name),
    b.company_name,
    a.birthday,
    a.ts,
    DATE_FORMAT(a.birthday, 'yyyyMMdd') as p
FROM
    ods_mysql_users as a
JOIN 
    dim_mysql_company FOR SYSTEM_TIME AS OF a.proc_time AS b
ON a.id = b.user_id
  • 測試結(jié)果
sourceTable sourceColumn targetTable targetColumn
ods_mysql_users id dwd_hudi_users id
ods_mysql_users name dwd_hudi_users name
ods_mysql_users birthday dwd_hudi_users birthday
ods_mysql_users ts dwd_hudi_users ts
ods_mysql_users birthday dwd_hudi_users partition

可以看到恬总,維表dim_mysql_company的字段血緣關(guān)系都被丟失掉前普,因此繼續(xù)進(jìn)行下面的步驟。

四壹堰、修改Calcite源碼支持Lookup Join

4.1 實現(xiàn)思路

針對Lookup Join拭卿,Parser會把SQL語句'FOR SYSTEM_TIME AS OF'解析成 SqlSnapshot ( SqlNode),validate() 將其轉(zhuǎn)換成 LogicalSnapshot(RelNode)贱纠。

Lookup Join-Original RelNode

 LogicalProject(id1=[$0], EXPR$1=[CONCAT($1, $6)], company_name=[$6], birthday=[$2], ts=[$3], p=[DATE_FORMAT($2, _UTF-16LE'yyyyMMdd')])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 4}])
    LogicalProject(id=[$0], name=[$1], birthday=[$2], ts=[$3], proc_time=[PROCTIME()])
      LogicalTableScan(table=[[hive, flink_demo, ods_mysql_users]])
    LogicalFilter(condition=[=($cor0.id, $0)])
      LogicalSnapshot(period=[$cor0.proc_time])
        LogicalTableScan(table=[[hive, flink_demo, dim_mysql_company]])

但calcite-core中RelMdColumnOrigins這個Handler類里并沒有處理Snapshot類型的RelNode峻厚,導(dǎo)致返回空,繼而丟失Lookup Join字段的血緣關(guān)系谆焊。因此惠桃,需要在RelMdColumnOrigins增加一個處理Snapshot的getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn)方法。

由于flink-table-planner是采用maven-shade-plugin打包的辖试,因此修改calcite-core后要重新打flink包辜王。flink-table/flink-table-planner/pom.xml。


<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  ...
    <artifactSet>
      <includes combine.children="append">
        <include>org.apache.calcite:*</include>
        <include>org.apache.calcite.avatica:*</include>
  ...             

本文在下面的4.2-4.4小節(jié)給出基礎(chǔ)性操作步驟罐孝,分別講述如何修改calcite呐馆、flink源碼,以及如何編譯莲兢、打包汹来。

同時在4.5小節(jié)也提供另外一種實現(xiàn)路徑续膳,即通過動態(tài)編輯Java字節(jié)碼技術(shù)來增加getColumnOrigins方法,源碼已默認(rèn)采用此技術(shù)收班,讀者也可直接跳到4.5小節(jié)進(jìn)行閱讀坟岔。

4.2 重新編譯Calcite源碼

4.2.1 下載源碼及創(chuàng)建分支

flink1.14.4依賴的calcite版本是1.26.0,因此基于tag calcite-1.26.0來修改源碼闺阱。并且在原有3位版本號后面再增加一位版本號炮车,以區(qū)別于官方發(fā)布的版本。

# 下載github上源碼
$ git clone git@github.com:apache/calcite.git

# 切換到 calcite-1.26.0 tag
$ git checkout calcite-1.26.0

# 新建分支calcite-1.26.0.1
$ git checkout -b calcite-1.26.0.1

4.2.2 修改源碼

  1. 在calcite-core模塊酣溃,給RelMdColumnOrigins類增加方法 getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn)瘦穆。org.apache.calcite.rel.metadata.RelMdColumnOrigins
  /**
   * Support the field blood relationship of lookup join
   */
  public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,
                                               RelMetadataQuery mq, int iOutputColumn) {
      return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
  }
  1. 修改版本號為 1.26.0.1,calcite/gradle.properties
# 修改前
calcite.version=1.26.0
# 修改后
calcite.version=1.26.0.1
  1. 刪除打包名稱上的SNAPSHOT赊豌,由于未研究出Gradlew 打包參數(shù)扛或,此處直接修改build.gradle.kts代碼。

    calcite/build.gradle.kts

# 修改前
val buildVersion = "calcite".v + releaseParams.snapshotSuffix

# 修改后
val buildVersion = "calcite".v

4.2.3 編譯源碼和推送到本地倉庫

# 編譯源碼
$ ./gradlew build -x test 

# 推送到本地倉庫
$ ./gradlew publishToMavenLocal

運行成功后查看本地maven倉庫碘饼,已經(jīng)產(chǎn)生calcite-core-1.26.0.1.jar熙兔。

$ ll ~/.m2/repository/org/apache/calcite/calcite-core/1.26.0.1

-rw-r--r--  1 baisong  staff  8893065  8  9 13:51 calcite-core-1.26.0.1-javadoc.jar
-rw-r--r--  1 baisong  staff  3386193  8  9 13:51 calcite-core-1.26.0.1-sources.jar
-rw-r--r--  1 baisong  staff  2824504  8  9 13:51 calcite-core-1.26.0.1-tests.jar
-rw-r--r--  1 baisong  staff  5813238  8  9 13:51 calcite-core-1.26.0.1.jar
-rw-r--r--  1 baisong  staff     5416  8  9 13:51 calcite-core-1.26.0.1.pom

4.3 重新編譯Flink源碼

4.2.1 下載源碼及創(chuàng)建分支

基于tag release-1.14.4來修改源碼。并且在原有3位版本號后面再增加一位版本號艾恼,以區(qū)別于官方發(fā)布的版本住涉。

# 下載github上flink源碼
$ git clone git@github.com:apache/flink.git

# 切換到 release-1.14.4 tag
$ git checkout release-1.14.4

# 新建分支release-1.14.4.1
$ git checkout -b release-1.14.4.1

4.3.2 修改源碼

  1. 在flink-table模塊,修改calcite.version的版本為 1.26.0.1钠绍,flink-table-planner會引用此版本號舆声。即讓flink-table-planner引用calcite-core-1.26.0.1。flink/flink-table/pom.xml柳爽。
<properties>
    <!-- When updating Janino, make sure that Calcite supports it as well. -->
    <janino.version>3.0.11</janino.version>
    <!--<calcite.version>1.26.0</calcite.version>-->
    <calcite.version>1.26.0.1</calcite.version>
    <guava.version>29.0-jre</guava.version>
</properties>
  1. 修改flink-table-planner版本號為1.14.4.1媳握,包含下面3點。flink/flink-table/flink-table-planner/pom.xml磷脯。

<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<!--1. 新增此行-->
<version>1.14.4.1</version>
<name>Flink : Table : Planner</name>

<!--2. 全局替換${project.version}為${parent.version}-->

<!--3. 新增加此依賴蛾找,強(qiáng)制指定flink-test-utils-junit版本,否則編譯會報錯-->
<dependency>
    <artifactId>flink-test-utils-junit</artifactId>
    <groupId>org.apache.flink</groupId>
    <version>${parent.version}</version>
    <scope>test</scope>
</dependency>

4.3.3 編譯源碼和推送到遠(yuǎn)程倉庫

# 只編譯 flink-table-planner
$ mvn clean install -pl flink-table/flink-table-planner -am -Dscala-2.12 -DskipTests -Dfast -Drat.skip=true -Dcheckstyle.skip=true -Pskip-webui-build

運行成功后查看本地maven倉庫赵誓,已經(jīng)產(chǎn)生flink-table-planner_2.12-1.14.4.1.jar

$ ll ~/.m2/repository/org/apache/flink/flink-table-planner_2.12/1.14.4.1

-rw-r--r--  1 baisong  staff  11514580 11 24 18:27 flink-table-planner_2.12-1.14.4.1-tests.jar
-rw-r--r--  1 baisong  staff  35776592 11 24 18:28 flink-table-planner_2.12-1.14.4.1.jar
-rw-r--r--  1 baisong  staff        40 11 23 17:13 flink-table-planner_2.12-1.14.4.1.jar.sha1
-rw-r--r--  1 baisong  staff     15666 11 24 18:28 flink-table-planner_2.12-1.14.4.1.pom
-rw-r--r--  1 baisong  staff        40 11 23 17:12 flink-table-planner_2.12-1.14.4.1.pom.sha1

如果要推送到Maven倉庫打毛,修改pom.xml 增加倉庫地址。

<distributionManagement>
    <repository>
        <id>releases</id>
        <url>http://xxx.xxx-inc.com/repository/maven-releases</url>
    </repository>
    <snapshotRepository>
        <id>snapshots</id>
        <url>http://xxx.xxx-inc.com/repository/maven-snapshots</url>
    </snapshotRepository>
</distributionManagement>
# 進(jìn)入flink-table-planner模塊
$ cd flink-table/flink-table-planner

# 推送到到遠(yuǎn)程倉庫
$ mvn clean deploy -Dscala-2.12 -DskipTests -Dfast -Drat.skip=true -Dcheckstyle.skip=true -Pskip-webui-build -T 1C

4.4 修改Flink依賴版本并測試Lookup Join

修改pom.xml中依賴的flink-table-planner的版本為1.14.4.1俩功。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.14.4.1</version>
</dependency>

執(zhí)行第3.2.3章節(jié)的測試用例得到Lookup Join血緣結(jié)果如下隘冲,已經(jīng)包含維表dim_mysql_company的字段血緣關(guān)系。

sourceTable sourceColumn targetTable targetColumn
ods_mysql_users id dwd_hudi_users id
dim_mysql_company company_name dwd_hudi_users name
ods_mysql_users name dwd_hudi_users name
dim_mysql_company company_name dwd_hudi_users company_name
ods_mysql_users birthday dwd_hudi_users birthday
ods_mysql_users ts dwd_hudi_users ts
ods_mysql_users birthday dwd_hudi_users partition

4.5 動態(tài)編輯Java字節(jié)碼增加getColumnOrigins方法

Javassist是可以動態(tài)編輯Java字節(jié)碼的類庫绑雄,它可以在Java程序運行時定義一個新的類并加載到JVM中,還可以在JVM加載時修改一個類文件奥邮。
因此万牺,本文通過Javassist技術(shù)來動態(tài)給RelMdColumnOrigins類增加getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn)方法罗珍。

核心代碼如下:

/**
 * Dynamic add getColumnOrigins method to class RelMdColumnOrigins by javassist:
 *
 * public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn) {
 *      return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
 * }
 */
static {
    try {
        ClassPool classPool = ClassPool.getDefault();
        CtClass ctClass = classPool.getCtClass("org.apache.calcite.rel.metadata.RelMdColumnOrigins");

        CtClass[] parameters = new CtClass[]{classPool.get(Snapshot.class.getName())
                , classPool.get(RelMetadataQuery.class.getName())
                , CtClass.intType
        };
        // add method
        CtMethod ctMethod = new CtMethod(classPool.get("java.util.Set"), "getColumnOrigins", parameters, ctClass);
        ctMethod.setModifiers(Modifier.PUBLIC);
        ctMethod.setBody("{return $2.getColumnOrigins($1.getInput(), $3);}");
        ctClass.addMethod(ctMethod);
        // load the class
        ctClass.toClass();
    } catch (Exception e) {
        throw new TableException("Dynamic add getColumnOrigins() method exception.", e);
    }
}

注1: 也可把RelMdColumnOrigins類及package拷貝到項目中,然后手動增加getColumnOrigins方法脚粟。但是此方法兼容性不夠友好覆旱,后續(xù)calcite源碼進(jìn)行迭代后血緣代碼要跟隨calcite一起修正。

上述代碼增加后核无,執(zhí)行Lookup Join的測試用例后就能看到維表dim_mysql_company的字段血緣關(guān)系扣唱,如4.4節(jié)的表格所示。

五团南、Flink其他高級語法支持

在1.0.0版本發(fā)布后噪沙,經(jīng)過讀者@SinyoWong實踐測試發(fā)現(xiàn)還不支持Table Functions(UDTF)和Watermark語法的字段血緣解析,于是開始進(jìn)一步完善代碼吐根。

詳見issue: https://github.com/HamaWhiteGG/flink-sql-lineage/issues/3正歼,在此表示感謝。

5.1 改變Calcite源碼修改方式

由于下面步驟還需要修改Calcite源碼中的RelMdColumnOrigins類拷橘,第四章節(jié)介紹的兩種修改Calcite源碼重新編譯和動態(tài)編輯字節(jié)碼方法都太過于笨重局义,
因此直接在本項目下新建org.apache.calcite.rel.metadata.RelMdColumnOrigins類,把Calcite的源碼拷貝過來后進(jìn)行修改冗疮。

記得把支持Lookup Join添加的getColumnOrigins(Snapshot rel,RelMetadataQuery mq, int iOutputColumn)增加進(jìn)來萄唇。

  /**
   * Support the field blood relationship of lookup join
   */
  public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,
                                               RelMetadataQuery mq, int iOutputColumn) {
      return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
  }

5.2 支持Table Functions

5.2.1 新建UDTF

  • 自定義Table Function 類
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class MySplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(Row.of(s, s.length()));
        }
    }
}
  • 新建my_split_udtf函數(shù)
DROP FUNCTION IF EXISTS my_split_udtf;

CREATE FUNCTION IF NOT EXISTS my_split_udtf 
  AS 'com.dtwave.flink.lineage.tablefuncion.MySplitFunction';

5.2.2 測試UDTF SQL

INSERT INTO
  dwd_hudi_users
SELECT
  length,
  name,
  word as company_name,
  birthday,
  ts,
  DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
  ods_mysql_users,
  LATERAL TABLE (my_split_udtf (name))

5.2.3 分析Optimized Logical Plan

生成Optimized Logical Plan的如下:

 FlinkLogicalCalc(select=[length, name, word AS company_name, birthday, ts, DATE_FORMAT(birthday, _UTF-16LE'yyyyMMdd') AS EXPR$5])
  FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{1}])
    FlinkLogicalCalc(select=[id, name, birthday, ts, PROCTIME() AS proc_time])
      FlinkLogicalTableSourceScan(table=[[hive, flink_demo, ods_mysql_users]], fields=[id, name, birthday, ts])
    FlinkLogicalTableFunctionScan(invocation=[my_split_udtf($cor0.name)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) word, INTEGER length)])

可以看到中間生成 FlinkLogicalCorrelate, 源碼調(diào)試過程中的變量信息如下圖:


5.2 Table Function debugging variable.png

分析繼承關(guān)系:

# FlinkLogicalCorrelate
FlinkLogicalCorrelate -> Correlate -> BiRel -> AbstractRelNode -> RelNode

# Join(Join和Correlate有類似,此處也展示下)
Join -> BiRel -> AbstractRelNode -> RelNode

# FlinkLogicalTableSourceScan
FlinkLogicalTableSourceScan -> TableScan ->AbstractRelNode -> RelNode
          
# FlinkLogicalTableFunctionScan
FlinkLogicalTableFunctionScan -> TableFunctionScan ->AbstractRelNode -> RelNode      

5.2.4 新增getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn)方法

在org.apache.calcite.rel.metadata.RelMdColumnOrigins類的getColumnOrigins()的方法中术幔,發(fā)現(xiàn)沒有Correlate作為參數(shù)的方法另萤,因此解析不出UDTF的字段血緣關(guān)系。

由于Correlate和Join都繼承自BiRel特愿,即有l(wèi)eft和right兩個RelNode仲墨。因此在書寫Correlate的解析時可參考下已有的getColumnOrigins(Join rel, RelMetadataQuery mq,int iOutputColumn)方法。

LATERAL TABLE (my_split_udtf (name))生成的臨時表兩個字段word和length揍障,本質(zhì)是來自dwd_hudi_users表的name字段目养。
因此針對右邊的LATERAL TABLE獲取UDTF中的字段,然后再根據(jù)字段名獲取左表信息和索引毒嫡,最終是獲取的是左表的字段血緣關(guān)系癌蚁。

核心代碼如下:

/**
 * Support the field blood relationship of table function
 */
public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) {

    List<RelDataTypeField> leftFieldList = rel.getLeft().getRowType().getFieldList();

    int nLeftColumns = leftFieldList.size();
    Set<RelColumnOrigin> set;
    if (iOutputColumn < nLeftColumns) {
        set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
    } else {
        // get the field name of the left table configured in the Table Function on the right
        TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
        RexCall rexCall = (RexCall) tableFunctionScan.getCall();
        // support only one field in table function
        RexFieldAccess rexFieldAccess = (RexFieldAccess) rexCall.operands.get(0);
        String fieldName = rexFieldAccess.getField().getName();

        int leftFieldIndex = 0;
        for (int i = 0; i < nLeftColumns; i++) {
            if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
                leftFieldIndex = i;
                break;
            }
        }
        /**
         * Get the fields from the left table, don't go to
         * getColumnOrigins(TableFunctionScan rel,RelMetadataQuery mq, int iOutputColumn),
         * otherwise the return is null, and the UDTF field origin cannot be parsed
         */
        set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
    }
    return set;
}

注: 在Logical Plan中可以看到right RelNode是FlinkLogicalTableFunctionScan類型,繼承自TableFunctionScan兜畸,但在已有g(shù)etColumnOrigins(TableFunctionScan rel,RelMetadataQuery mq, int iOutputColumn) 獲取的結(jié)果是null努释。
剛開始也嘗試修改此方法,但一直無法獲取的左表的信息咬摇。因此改為在getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) 獲取右變LATERAL TABLE血緣的代碼伐蒂。

5.2.5 測試結(jié)果

sourceTable sourceColumn targetTable targetColumn
ods_mysql_users name dwd_hudi_users id
ods_mysql_users name dwd_hudi_users name
ods_mysql_users name dwd_hudi_users company_name
ods_mysql_users birthday dwd_hudi_users birthday
ods_mysql_users ts dwd_hudi_users ts
ods_mysql_users birthday dwd_hudi_users partition

注: SQL中的word和length本質(zhì)是來自dwd_hudi_users表的name字段,因此字段血緣關(guān)系展示的是name肛鹏。
即 ods_mysql_users.name -> length -> dwd_hudi_users.id 和 ods_mysql_users.name -> word -> dwd_hudi_users.company_name

5.3 支持Watermark

5.3.1 新建ods_mysql_users_watermark

DROP TABLE IF EXISTS ods_mysql_users_watermark;

CREATE TABLE ods_mysql_users_watermark(
  id BIGINT,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  proc_time as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.90.xxx',
  'port' = '3306',
  'username' = 'root',
  'password' = 'xxx',
  'server-time-zone' = 'Asia/Shanghai',
  'database-name' = 'demo',
  'table-name' = 'users'
);

5.3.2 測試Watermark SQL

INSERT INTO
    dwd_hudi_users
SELECT
    id,
    name,
    name as company_name,
    birthday,
    ts,
    DATE_FORMAT(birthday, 'yyyyMMdd')
FROM
    ods_mysql_users_watermark

5.3.3 分析Optimized Logical Plan

生成Optimized Logical Plan的如下:

 FlinkLogicalCalc(select=[id, name, name AS company_name, birthday, ts, DATE_FORMAT(birthday, _UTF-16LE'yyyyMMdd') AS EXPR$5])
  FlinkLogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 5000:INTERVAL SECOND)])
    FlinkLogicalTableSourceScan(table=[[hive, flink_demo, ods_mysql_users_watermark]], fields=[id, name, birthday, ts])

可以看到中間生成 FlinkLogicalWatermarkAssigner, 分析繼承關(guān)系:

FlinkLogicalWatermarkAssigner -> WatermarkAssigner -> SingleRel -> AbstractRelNode -> RelNode

因此下面增加SingleRel作為參數(shù)的getColumnOrigins方法逸邦。

5.3.4 新增getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn)方法

 /**
   * Support the field blood relationship of watermark
   */
  public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel,
                                               RelMetadataQuery mq, int iOutputColumn) {
      return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
  } 

5.3.5 測試結(jié)果

sourceTable sourceColumn targetTable targetColumn
ods_mysql_users_watermark id dwd_hudi_users id
ods_mysql_users_watermark name dwd_hudi_users name
ods_mysql_users_watermark name dwd_hudi_users company_name
ods_mysql_users_watermark birthday dwd_hudi_users birthday
ods_mysql_users_watermark ts dwd_hudi_users ts
ods_mysql_users_watermark birthday dwd_hudi_users partition

六恩沛、參考文獻(xiàn)

  1. How to screw SQL to anything with Apache Calcite
  2. 使用build.gradle.kts發(fā)布到mavenLocal
  3. Flink SQL LookupJoin終極解決方案及Flink Rule入門
  4. 基于Calcite解析Flink SQL列級數(shù)據(jù)血緣
  5. 干貨|詳解FlinkSQL實現(xiàn)原理
  6. SQL解析框架: Calcite
  7. Flink1.14-table functions doc
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市缕减,隨后出現(xiàn)的幾起案子雷客,更是在濱河造成了極大的恐慌,老刑警劉巖桥狡,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件搅裙,死亡現(xiàn)場離奇詭異,居然都是意外死亡裹芝,警方通過查閱死者的電腦和手機(jī)部逮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來局雄,“玉大人甥啄,你說我怎么就攤上這事【娲睿” “怎么了蜈漓?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長宫盔。 經(jīng)常有香客問我融虽,道長,這世上最難降的妖魔是什么灼芭? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任有额,我火速辦了婚禮,結(jié)果婚禮上彼绷,老公的妹妹穿的比我還像新娘巍佑。我一直安慰自己,他們只是感情好寄悯,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布萤衰。 她就那樣靜靜地躺著,像睡著了一般猜旬。 火紅的嫁衣襯著肌膚如雪脆栋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天洒擦,我揣著相機(jī)與錄音椿争,去河邊找鬼。 笑死熟嫩,一個胖子當(dāng)著我的面吹牛秦踪,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼洋侨,長吁一口氣:“原來是場噩夢啊……” “哼舍扰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起希坚,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎陵且,沒想到半個月后裁僧,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡慕购,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年聊疲,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沪悲。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡获洲,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出殿如,到底是詐尸還是另有隱情贡珊,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布涉馁,位于F島的核電站门岔,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏烤送。R本人自食惡果不足惜寒随,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望帮坚。 院中可真熱鬧妻往,春花似錦、人聲如沸试和。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽灰署。三九已至判帮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間溉箕,已是汗流浹背晦墙。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留肴茄,地道東北人晌畅。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像寡痰,于是被迫代替她去往敵國和親抗楔。 傳聞我的和親對象是個殘疾皇子棋凳,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容