一文搞懂Flink SQL執(zhí)行過程

背景

學(xué)習(xí)了 apache calcite末贾,基本上把 apache calcite 的官網(wǎng)看了一遍凝颇,也寫了幾個小例子筐钟,現(xiàn)在該分析一下 Flink SQL 的執(zhí)行過程了揩瞪,其中關(guān)于 apache calcite 的部分不深究,因為 apache calcite 有些復(fù)雜篓冲,真的要了解清楚需要大量時間李破,本次還是聚焦 Flink.

正文

以 SQL Query 為例 select a.* from a join b on a.id=b.id

sql query 入口方法

// sql query 入口方法
  override def sqlQuery(query: String): Table = {
  // 最后生成是 PlannerQueryOperation,也就是 Flink 算子
    val operations = parser.parse(query)

    if (operations.size != 1) throw new ValidationException(
      "Unsupported SQL query! sqlQuery() only accepts a single SQL query.")

    operations.get(0) match {
      case op: QueryOperation if !op.isInstanceOf[ModifyOperation] =>
        createTable(op)
      case _ => throw new ValidationException(
        "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
    }
  }

接下來具體看一下如何生成 PlannerQueryOperation
首先生成 SqlNode

@Override
    public List<Operation> parse(String statement) {
        CalciteParser parser = calciteParserSupplier.get();
        FlinkPlannerImpl planner = validatorSupplier.get();
        // parse the sql query
        //SQL 解析階段,生成AST(抽象語法樹)壹将,作用是SQL–>SqlNode
        SqlNode parsed = parser.parse(statement);

        Operation operation =
                SqlToOperationConverter.convert(planner, catalogManager, parsed)
                        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
        return Collections.singletonList(operation);
    }

然后將 SqlNode 轉(zhuǎn)化為 RelNode

 private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
        // transform to a relational tree
        //語義分析嗤攻,生成邏輯計劃,作用是SqlNode–>RelNode
        RelRoot relational = planner.rel(validated);
        return new PlannerQueryOperation(relational.project());
    }

在轉(zhuǎn)化 RelNode 的過程會诽俯,基于 Flink 定制的優(yōu)化規(guī)則以及 calcite 自身的一些規(guī)則

/**
  * Support all joins.  Flink定制的優(yōu)化rules
  */
private class FlinkLogicalJoinConverter
  extends ConverterRule(
    classOf[LogicalJoin],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalJoinConverter") {

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
    FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getJoinType)
  }
}

生成 物理執(zhí)行計劃妇菱,對應(yīng)的都是 RelNode

class StreamExecJoinRule
  extends RelOptRule(
    ......

  // 基于Flink rules 將optimized LogicalPlan轉(zhuǎn)成 Flink 物理執(zhí)行計劃
  override def onMatch(call: RelOptRuleCall): Unit = {
    val join: FlinkLogicalJoin = call.rel(0)
    val left = join.getLeft
    val right = join.getRight

    def toHashTraitByColumns(
        columns: util.Collection[_ <: Number],
        inputTraitSets: RelTraitSet): RelTraitSet = {
      val distribution = if (columns.isEmpty) {
        FlinkRelDistribution.SINGLETON
      } else {
        FlinkRelDistribution.hash(columns)
      }
      inputTraitSets
        .replace(FlinkConventions.STREAM_PHYSICAL)
        .replace(distribution)
    }

    val joinInfo = join.analyzeCondition()
    val (leftRequiredTrait, rightRequiredTrait) = (
      toHashTraitByColumns(joinInfo.leftKeys, left.getTraitSet),
      toHashTraitByColumns(joinInfo.rightKeys, right.getTraitSet))

    val providedTraitSet = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)

    val newLeft: RelNode = RelOptRule.convert(left, leftRequiredTrait)
    val newRight: RelNode = RelOptRule.convert(right, rightRequiredTrait)

    // Stream physical RelNode,物理執(zhí)行計劃
    val newJoin = new StreamExecJoin(
      join.getCluster,
      providedTraitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
    call.transformTo(newJoin)
  }
}

然后通過 translateToPlanInternal 生成 Flink 算子

class StreamExecJoin(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftRel: RelNode,
    rightRel: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
  with StreamPhysicalRel
  with StreamExecNode[RowData] {
  ......

  //  作用是生成 StreamOperator, 即Flink算子
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[RowData] = {

    val tableConfig = planner.getTableConfig
    val returnType = InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))

    val leftTransform = getInputNodes.get(0).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]
    val rightTransform = getInputNodes.get(1).translateToPlan(planner)
      .asInstanceOf[Transformation[RowData]]

    val leftType = leftTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
    val rightType = rightTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]

    val (leftJoinKey, rightJoinKey) =
      JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)

    val leftSelect = KeySelectorUtil.getRowDataSelector(leftJoinKey, leftType)
    val rightSelect = KeySelectorUtil.getRowDataSelector(rightJoinKey, rightType)

    val leftInputSpec = analyzeJoinInput(left)
    val rightInputSpec = analyzeJoinInput(right)

    val generatedCondition = JoinUtil.generateConditionFunction(
      tableConfig,
      cluster.getRexBuilder,
      getJoinInfo,
      leftType.toRowType,
      rightType.toRowType)

    val minRetentionTime = tableConfig.getMinIdleStateRetentionTime

    val operator = if (joinType == JoinRelType.ANTI || joinType == JoinRelType.SEMI) {
      new StreamingSemiAntiJoinOperator(
        joinType == JoinRelType.ANTI,
        leftType,
        rightType,
        generatedCondition,
        leftInputSpec,
        rightInputSpec,
        filterNulls,
        minRetentionTime)
    } else {
      val leftIsOuter = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL
      val rightIsOuter = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL
      new StreamingJoinOperator(
        leftType,
        rightType,
        generatedCondition,
        leftInputSpec,
        rightInputSpec,
        leftIsOuter,
        rightIsOuter,
        filterNulls,
        minRetentionTime)
    }

    val ret = new TwoInputTransformation[RowData, RowData, RowData](
      leftTransform,
      rightTransform,
      getRelDetailedDescription,
      operator,
      returnType,
      leftTransform.getParallelism)

    if (inputsContainSingleton()) {
      ret.setParallelism(1)
      ret.setMaxParallelism(1)
    }

    // set KeyType and Selector for state
    ret.setStateKeySelectors(leftSelect, rightSelect)
    ret.setStateKeyType(leftSelect.getProducedType)
    ret
  }

算子執(zhí)行方式

public class StreamingJoinOperator extends AbstractStreamingJoinOperator {

    private static final long serialVersionUID = -376944622236540545L;

    // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN
    private final boolean leftIsOuter;
    // whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN
    private final boolean rightIsOuter;

    private transient JoinedRowData outRow;
    private transient RowData leftNullRow;
    private transient RowData rightNullRow;

    // left join state
    private transient JoinRecordStateView leftRecordStateView;
    // right join state
    private transient JoinRecordStateView rightRecordStateView;

    public StreamingJoinOperator(
            InternalTypeInfo<RowData> leftType,
            InternalTypeInfo<RowData> rightType,
            GeneratedJoinCondition generatedJoinCondition,
            JoinInputSideSpec leftInputSideSpec,
            JoinInputSideSpec rightInputSideSpec,
            boolean leftIsOuter,
            boolean rightIsOuter,
            boolean[] filterNullKeys,
            long stateRetentionTime) {
       ......

    @Override
    public void processElement1(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), leftRecordStateView, rightRecordStateView, true);
    }

    @Override
    public void processElement2(StreamRecord<RowData> element) throws Exception {
        processElement(element.getValue(), rightRecordStateView, leftRecordStateView, false);
    }

也是 join 最終執(zhí)行的地方

總結(jié)

sql 解析,生成抽象語法樹惊畏,由SQL---> SqlNode恶耽,然后進行語義分析,生成 Logical Plan, SqlNode---->RelNode 未經(jīng)過優(yōu)化的 RelNode ----> 應(yīng)用 Flink 定制的一些優(yōu)化 rule颜启,優(yōu)化 Logical Plan
----> 轉(zhuǎn)化為物理執(zhí)行計劃 Stream physical RelNode -----> 生成 StreamOperator Flink 算子
----> 算子執(zhí)行

本文的主要目的是在大方向上明白 Flink SQL 的解析過程偷俭,具體細(xì)節(jié)讀者感興趣可以自行深入研究

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市缰盏,隨后出現(xiàn)的幾起案子涌萤,更是在濱河造成了極大的恐慌,老刑警劉巖口猜,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件负溪,死亡現(xiàn)場離奇詭異,居然都是意外死亡济炎,警方通過查閱死者的電腦和手機川抡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來须尚,“玉大人崖堤,你說我怎么就攤上這事∧痛玻” “怎么了密幔?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長撩轰。 經(jīng)常有香客問我胯甩,道長昧廷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任偎箫,我火速辦了婚禮木柬,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘淹办。我一直安慰自己弄诲,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布娇唯。 她就那樣靜靜地躺著齐遵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪塔插。 梳的紋絲不亂的頭發(fā)上梗摇,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天,我揣著相機與錄音想许,去河邊找鬼伶授。 笑死,一個胖子當(dāng)著我的面吹牛流纹,可吹牛的內(nèi)容都是我干的糜烹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼漱凝,長吁一口氣:“原來是場噩夢啊……” “哼疮蹦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起茸炒,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤愕乎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后壁公,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體感论,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年紊册,在試婚紗的時候發(fā)現(xiàn)自己被綠了比肄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡囊陡,死狀恐怖芳绩,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情关斜,我是刑警寧澤示括,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布铺浇,位于F島的核電站痢畜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜丁稀,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一吼拥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧线衫,春花似錦凿可、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至白热,卻和暖如春敛助,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背屋确。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工纳击, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人攻臀。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓焕数,卻偏偏與公主長得像,于是被迫代替她去往敵國和親刨啸。 傳聞我的和親對象是個殘疾皇子堡赔,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容