基于Calcite解析Flink SQL列級數(shù)據(jù)血緣

數(shù)據(jù)血緣

數(shù)據(jù)血緣(data lineage)是數(shù)據(jù)治理(data governance)的重要組成部分液肌,也是元數(shù)據(jù)管理啥繁、數(shù)據(jù)質(zhì)量管理的有力工具荠卷。通俗地講幻林,數(shù)據(jù)血緣就是數(shù)據(jù)在產(chǎn)生终吼、加工镀赌、流轉(zhuǎn)到最終消費過程中形成的有層次的、可溯源的聯(lián)系际跪。成熟的數(shù)據(jù)血緣系統(tǒng)可以幫助開發(fā)者快速定位問題商佛,以及追蹤數(shù)據(jù)的更改,確定上下游的影響等等姆打。

在數(shù)據(jù)倉庫的場景下良姆,數(shù)據(jù)的載體是數(shù)據(jù)庫中的表和列(字段),相應(yīng)地幔戏,數(shù)據(jù)血緣根據(jù)粒度也可以分為較粗的表級血緣和較細的列(字段)級血緣玛追。離線數(shù)倉的數(shù)據(jù)血緣提取已經(jīng)有了成熟的方法,如利用Hive提供的LineageLogger與Execution Hooks機制闲延。本文就來簡要介紹一種在實時數(shù)倉中基于Calcite解析Flink SQL列級血緣的方法痊剖,在此之前,先用幾句話聊聊Calcite的關(guān)系式元數(shù)據(jù)體系垒玲。

Calcite關(guān)系式元數(shù)據(jù)

在Calcite內(nèi)部陆馁,庫表元數(shù)據(jù)由Catalog來處理,關(guān)系式元數(shù)據(jù)才會被冠以[Rel]Metadata的名稱合愈。關(guān)系式元數(shù)據(jù)與RelNode對應(yīng)叮贩,以下是與其相關(guān)的Calcite組件:

  • RelMetadataQuery:為關(guān)系式元數(shù)據(jù)提供統(tǒng)一的訪問接口击狮;
  • RelMetadataProvider:為RelMetadataQuery各接口提供實現(xiàn)的中間層;
  • MetadataFactory:生產(chǎn)并維護RelMetadataProvider的工廠益老;
  • MetadataHandler:處理關(guān)系式元數(shù)據(jù)的具體實現(xiàn)邏輯彪蓬,全部位于org.apache.calcite.rel.metadata包下,且類名均以RelMd作為前綴捺萌。

Calcite內(nèi)置了許多種默認的關(guān)系式元數(shù)據(jù)實現(xiàn)档冬,并以接口的形式統(tǒng)一維護在BuiltInMetadata抽象類里,如下圖所示互婿,名稱都比較直白(如RowCount就表示該RelNode查詢結(jié)果的行數(shù))捣郊。

其中辽狈,ColumnOrigin.Handler就是負責(zé)解析列級血緣的MetadataHandler慈参,對各類RelNode分別定義了相應(yīng)的尋找起源列的方法,其結(jié)構(gòu)如下圖所示刮萌。具體源碼會另外寫文章專門講解驮配,本文先不提。

處理`Snapshot` RelNode的方法是筆者新增的

注意包括ColumnOrigin.Handler在內(nèi)的絕大多數(shù)MetadataHandler都是靠ReflectiveRelMetadataProvider來發(fā)揮作用着茸。顧名思義壮锻,ReflectiveRelMetadataProvider通過反射取得各個MetadataHandler中的方法,并在內(nèi)部維護RelNode具體類型和通過Java Proxy生成的Metadata代理對象(其中包含Handler方法)的映射涮阔。這樣猜绣,通過RelMetadataQuery獲取關(guān)系式元數(shù)據(jù)時,用戶的請求就可以根據(jù)RelNode類型正確地dispatch到對應(yīng)的方法上去敬特。

另外掰邢,還有少數(shù)MetadataHandler(如CumulativeCost/NonCumulativeCost對應(yīng)的Handlers)在Calcite工程里找不到具體的實現(xiàn)。它們的代碼是運行時生成的伟阔,并由JaninoRelMetadataProvider做動態(tài)編譯辣之。關(guān)于代碼生成和Janino也在計劃中,暫不贅述皱炉。

當然實際應(yīng)用時我們不需要了解這些細節(jié)怀估,只需要與RelMetadataQuery打交道。下面就來看看如何通過它取得我們想要的Flink SQL列血緣合搅。

解析Flink SQL列級血緣

以Flink SQL任務(wù)中最為常見的單條INSERT INTO ... SELECT ...為例多搀,首先我們需要取得SQL語句生成的RelNode對象,即邏輯計劃樹灾部。

為了方便講解康铭,這里筆者簡單粗暴地在o.a.f.table.api.internal.TableEnvironmentImpl類中定義了一個getInsertOperation()方法。它負責(zé)解析梳猪、驗證SQL語句麻削,生成CatalogSinkModifyOperation蒸痹,并取得它的PlannerQueryOperation子節(jié)點(即SELECT操作)。代碼如下呛哟。

public Tuple3<String, Map<String, String>, QueryOperation> getInsertOperation(String insertStmt) {
    List<Operation> operations = getParser().parse(insertStmt);
    if (operations.size() != 1) {
        throw new TableException(
                "Unsupported SQL query! getInsertOperation() only accepts a single INSERT statement.");
    }
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        CatalogSinkModifyOperation sinkOperation = (CatalogSinkModifyOperation) operation;
        QueryOperation queryOperation = sinkOperation.getChild();
        return new Tuple3<>(
                sinkOperation.getTableIdentifier().asSummaryString(),
                sinkOperation.getDynamicOptions(),
                queryOperation);
    } else {
        throw new TableException("Only INSERT is supported now.");
    }
}

接下來就能夠取得Sink的表名以及對應(yīng)的RelNode根節(jié)點叠荠。示例SQL來自之前的<<From Calcite to Tampering with Flink SQL>>講義。

val tableEnv = StreamTableEnvironment.create(streamEnv, EnvironmentSettings.newInstance().build())
val sql = /* language=SQL */
  s"""
     |INSERT INTO tmp.print_joined_result
     |SELECT FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, a.userId, a.eventType, a.siteId, b.site_name AS siteName
     |FROM rtdw_ods.kafka_analytics_access_log_app /*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='DiveIntoBlinkExp') */ a
     |LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.main_site_id
     |WHERE a.userId > 7
     |""".stripMargin

val insertOp = tableEnv.asInstanceOf[TableEnvironmentImpl].getInsertOperation(sql)
val tableName = insertOp.f0
val relNode = insertOp.f2.asInstanceOf[PlannerQueryOperation].getCalciteTree

然后對取得的RelNode進行邏輯優(yōu)化扫责,即執(zhí)行之前所講過的FlinkStreamProgram榛鼎,但僅執(zhí)行到LOGICAL_REWRITE階段為止。我們在本地將FlinkStreamProgram復(fù)制一份鳖孤,并刪去PHYSICALPHYSICAL_REWRITE兩個階段者娱,即:

object FlinkStreamProgramLogicalOnly {

  val SUBQUERY_REWRITE = "subquery_rewrite"
  val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
  val DECORRELATE = "decorrelate"
  val TIME_INDICATOR = "time_indicator"
  val DEFAULT_REWRITE = "default_rewrite"
  val PREDICATE_PUSHDOWN = "predicate_pushdown"
  val JOIN_REORDER = "join_reorder"
  val PROJECT_REWRITE = "project_rewrite"
  val LOGICAL = "logical"
  val LOGICAL_REWRITE = "logical_rewrite"

  def buildProgram(config: Configuration): FlinkChainedProgram[StreamOptimizeContext] = {
    val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]()

    // rewrite sub-queries to joins
    chainedProgram.addLast(
      SUBQUERY_REWRITE,
      FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
        // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_REF_RULES)
          .build(), "convert table references before rewriting sub-queries to semi-join")
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.SEMI_JOIN_RULES)
          .build(), "rewrite sub-queries to semi-join")
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES)
          .build(), "sub-queries remove")
        // convert RelOptTableImpl (which exists in SubQuery before) to FlinkRelOptTable
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_REF_RULES)
          .build(), "convert table references after sub-queries removed")
        .build())

    // rewrite special temporal join plan
    // ...

    // query decorrelation
    // ...

    // convert time indicators
    // ...

    // default rewrite, includes: predicate simplification, expression reduction, window
    // properties rewrite, etc.
    // ...

    // rule based optimization: push down predicate(s) in where clause, so it only needs to read
    // the required data
    // ...

    // join reorder
    // ...

    // project rewrite
    // ...

    // optimize the logical plan
    chainedProgram.addLast(
      LOGICAL,
      FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
        .build())

    // logical rewrite
    chainedProgram.addLast(
      LOGICAL_REWRITE,
      FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
        .build())

    chainedProgram
  }
}

執(zhí)行FlinkStreamProgramLogicalOnly即可。注意StreamOptimizeContext內(nèi)需要傳入的上下文信息苏揣,通過各種workaround取得(FunctionCatalog可以在TableEnvironmentImpl內(nèi)增加一個Getter拿到)黄鳍。

val logicalProgram = FlinkStreamProgramLogicalOnly.buildProgram(tableEnvConfig)

val optRelNode = logicalProgram.optimize(relNode, new StreamOptimizeContext {
  override def getTableConfig: TableConfig = tableEnv.getConfig

  override def getFunctionCatalog: FunctionCatalog = tableEnv.asInstanceOf[TableEnvironmentImpl].getFunctionCatalog

  override def getCatalogManager: CatalogManager = tableEnv.asInstanceOf[TableEnvironmentImpl].getCatalogManager

  override def getRexBuilder: RexBuilder = relNode.getCluster.getRexBuilder

  override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory =
    relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getSqlExprToRexConverterFactory

  override def isUpdateBeforeRequired: Boolean = false

  override def needFinalTimeIndicatorConversion: Boolean = true

  override def getMiniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE
})

對比一下優(yōu)化前與優(yōu)化后的RelNode

--- Original RelNode ---
LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$3], eventType=[$4], siteId=[$8], siteName=[$46])
  LogicalFilter(condition=[>($3, 7)])
    LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{8, 44}])
      LogicalProject(ts=[$0], tss=[$1], tssDay=[$2], userId=[$3], eventType=[$4], columnType=[$5], fromType=[$6], grouponId=[$7], /* ... */, procTime=[PROCTIME()])
        LogicalTableScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
      LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $8)])
        LogicalSnapshot(period=[$cor0.procTime])
          LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]])

--- Optimized RelNode ---
FlinkLogicalCalc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
  FlinkLogicalJoin(condition=[=($4, $6)], joinType=[left])
    FlinkLogicalCalc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
      FlinkLogicalTableSourceScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], fields=[ts, tss, tssDay, userId, eventType, columnType, fromType, grouponId, /* ... */, latitude, longitude], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
    FlinkLogicalSnapshot(period=[$cor0.procTime])
      FlinkLogicalCalc(select=[site_name, main_site_id])
        FlinkLogicalTableSourceScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]], fields=[site_id, site_name, site_city_id, /* ... */])

這里需要注意兩個問題。

其一平匈,Calcite中RelMdColumnOrigins這個Handler類里并沒有處理Snapshot類型的RelNode框沟,走fallback邏輯則會對所有非葉子節(jié)點的RelNode返回空,所以默認情況下是拿不到Lookup Join字段的血緣關(guān)系的增炭。我們還需要修改它的源碼忍燥,在遇到Snapshot時繼續(xù)深搜:

public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,
    RelMetadataQuery mq, int iOutputColumn) {
  return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}

其二,F(xiàn)link使用的Calcite版本為1.26隙姿,但是該版本不會追蹤派生列(isDerived == true梅垄,例如SUM(col))的血緣。1.27版本修復(fù)了此問題输玷,為避免大版本不兼容队丝,可以將對應(yīng)的issue CALCITE-4251 cherry-pick到內(nèi)部的Calcite 1.26分支上來。當然別忘了重新編譯Calcite Core和Flink Table模塊饲嗽。

最后就可以通過RelMetadataQuery取得結(jié)果表中字段的起源列了炭玫。So easy.

val metadataQuery = optRelNode.getCluster.getMetadataQuery

for (i <- 0 to 4) {
  val origins = metadataQuery.getColumnOrigins(optRelNode, i)
  if (origins != null) {
    for (rco <- origins) {
      val table = rco.getOriginTable
      val tableName = table.getQualifiedName.mkString(".")
      val ordinal = rco.getOriginColumnOrdinal
      val fields = table.getRowType.getFieldNames
      println(Seq(tableName, ordinal, fields.get(ordinal)).mkString("\t"))
    }
  } else {
    println("NULL")
  }
}

/* Outputs:
hive.rtdw_ods.kafka_analytics_access_log_app    0   ts
hive.rtdw_ods.kafka_analytics_access_log_app    3   userId
hive.rtdw_ods.kafka_analytics_access_log_app    4   eventType
hive.rtdw_ods.kafka_analytics_access_log_app    8   siteId
hive.rtdw_dim.mysql_site_war_zone_mapping_relation  1   site_name
*/

上面例子中的SQL語句比較簡單,因此產(chǎn)生的ColumnOrigin也只有單列貌虾⊥碳樱看官可自行用多表JOIN或者有聚合邏輯的SQL來測試,多列ColumnOrigin的情況下也很好用尽狠,免去了自行折騰RelVisitor或者RelShuttle的許多麻煩衔憨。

最后的血緣可視化這一步,普遍采用Neo4j袄膏、JanusGraph等圖數(shù)據(jù)庫承載并展示列血緣關(guān)系的數(shù)據(jù)践图。筆者也正在探索將Flink SQL列級血緣集成到Atlas的方法,進度比較慢沉馆,期望值請勿太高码党。

The End

博客荒廢良久德崭,驚動大佬出面催更,慚愧慚愧揖盘。

受疫情影響眉厨,F(xiàn)FA 2021轉(zhuǎn)為線上,不能面基真可惜(

炒雞感謝會務(wù)組發(fā)來的大禮包~

也歡迎大家屆時光臨本鶸的presentation~

民那晚安晚安兽狭。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末憾股,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子箕慧,更是在濱河造成了極大的恐慌服球,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件颠焦,死亡現(xiàn)場離奇詭異斩熊,居然都是意外死亡,警方通過查閱死者的電腦和手機蒸健,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門座享,熙熙樓的掌柜王于貴愁眉苦臉地迎上來婉商,“玉大人似忧,你說我怎么就攤上這事≌芍龋” “怎么了盯捌?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蘑秽。 經(jīng)常有香客問我饺著,道長,這世上最難降的妖魔是什么肠牲? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任幼衰,我火速辦了婚禮,結(jié)果婚禮上缀雳,老公的妹妹穿的比我還像新娘渡嚣。我一直安慰自己,他們只是感情好肥印,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布识椰。 她就那樣靜靜地躺著,像睡著了一般深碱。 火紅的嫁衣襯著肌膚如雪腹鹉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天敷硅,我揣著相機與錄音功咒,去河邊找鬼愉阎。 笑死,一個胖子當著我的面吹牛力奋,可吹牛的內(nèi)容都是我干的诫硕。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼刊侯,長吁一口氣:“原來是場噩夢啊……” “哼章办!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起滨彻,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤改含,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后承冰,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體子寓,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年辜羊,在試婚紗的時候發(fā)現(xiàn)自己被綠了踏兜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡八秃,死狀恐怖碱妆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情昔驱,我是刑警寧澤疹尾,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站骤肛,受9級特大地震影響纳本,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜腋颠,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一繁成、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧淑玫,春花似錦巾腕、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至歌径,卻和暖如春毁嗦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背回铛。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工狗准, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留克锣,地道東北人。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓腔长,卻偏偏與公主長得像袭祟,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子捞附,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容