淺談Flink SQL代碼生成與UDF重復(fù)調(diào)用的優(yōu)化

前言

之前講解Flink SQL執(zhí)行流程時(shí)留下了代碼生成的坑总棵,在百忙之中抽時(shí)間補(bǔ)一補(bǔ)欣簇。

代碼生成簡(jiǎn)介

代碼生成(code generation)是當(dāng)今各種數(shù)據(jù)庫(kù)和數(shù)據(jù)處理引擎廣泛采用的物理執(zhí)行層技術(shù)之一。通過(guò)代碼生成沛申,可以將原本需要解釋執(zhí)行的算子邏輯轉(zhuǎn)為編譯執(zhí)行(二進(jìn)制代碼)宿崭,充分利用JIT編譯的優(yōu)勢(shì)释牺,克服傳統(tǒng)Volcano模型虛函數(shù)調(diào)用過(guò)多愿阐、對(duì)寄存器不友好的缺點(diǎn),在CPU-bound場(chǎng)景下可以獲得大幅的性能提升趾疚。

在大數(shù)據(jù)領(lǐng)域缨历,看官最為熟知的代碼生成應(yīng)用可能就是Spark 2.x的全階段代碼生成(whole-stage code generation)機(jī)制,它也是筆者兩年前介紹過(guò)的Tungsten Project的一部分糙麦。以常見的FILTER -> JOIN -> AGGREGATE流程為例辛孵,全階段代碼生成只需2個(gè)Stage,而傳統(tǒng)Volcano模型則需要9次虛函數(shù)調(diào)用赡磅,如下圖所示魄缚。

關(guān)于Spark的代碼生成,可以參考其源碼或DataBricks的說(shuō)明文章焚廊,不再贅述冶匹。而Flink作為后起之秀,在Flink SQL (Blink Planner)中也采用了類似的思路咆瘟。本文就來(lái)做個(gè)quick tour嚼隘,并提出一個(gè)小而有用的優(yōu)化。

Flink SQL Codegen三要素

CodeGeneratorContext

顧名思義袒餐,CodeGeneratorContext就是代碼生成器的上下文飞蛹,且同一個(gè)CodeGeneratorContext實(shí)例在相互有關(guān)聯(lián)的代碼生成器之間可以共享。它的作用就是維護(hù)代碼生成過(guò)程中的各種能夠重復(fù)使用的邏輯灸眼,包括且不限于:

  • 對(duì)象引用
  • 構(gòu)造代碼卧檐、初始化代碼
  • 常量、成員變量焰宣、局部變量霉囚、時(shí)間變量
  • 函數(shù)體(即Flink Function)及其配套(open()/close()等等)
  • 類型序列化器
  • etc.

具體代碼暫時(shí)不貼,以下是該類的部分結(jié)構(gòu)匕积。

CodeGenerator

Blink Planner的代碼生成器并沒有統(tǒng)一的基類佛嬉。它們的共同點(diǎn)就是類名大多以CodeGenerator為后綴,并且絕大多數(shù)都要與CodeGeneratorContext打交道闸天。它們的類名也都比較self-explanatory暖呕,如下圖所示。注意筆者使用的是Flink 1.13版本苞氮,所以其中還混雜著少量Old Planner的內(nèi)容湾揽,可以無(wú)視之。

挑選幾個(gè)在流計(jì)算場(chǎng)景下比較重點(diǎn)的,稍微解釋一下库物。

  • AggsHandlerCodeGenerator——負(fù)責(zé)生成普通聚合函數(shù)AggsHandleFunction與帶命名空間(即窗口語(yǔ)義)的聚合函數(shù)NamespaceAggsHandleFunction霸旗。注意它們與DataStream API中的聚合函數(shù)AggregateFunction不是一回事,但大致遵循同樣的規(guī)范戚揭。
  • CollectorCodeGenerator——負(fù)責(zé)生成Collector诱告,即算子內(nèi)將流數(shù)據(jù)向下游發(fā)射的組件∶裆梗看官用過(guò)DataStream API的話會(huì)很熟悉精居。
  • ExprCodeGenerator——負(fù)責(zé)根據(jù)Calcite RexNode生成各類表達(dá)式,Planner內(nèi)部用GeneratedExpression來(lái)表示潜必。由于RexNode很多變(字面量靴姿、變量、函數(shù)調(diào)用等等)磁滚,它巧妙地利用了RexVisitor通過(guò)訪問(wèn)者模式來(lái)將不同類型的RexNode翻譯成對(duì)應(yīng)的代碼佛吓。
  • FunctionCodeGenerator——負(fù)責(zé)根據(jù)SQL邏輯生成各類函數(shù),目前支持的有RichMapFunction垂攘、RichFlatMapFunction维雇、RichFlatJoinFunctionRichAsyncFunctionProcessFunction晒他。
  • OperatorCodeGenerator——負(fù)責(zé)生成OneInputStreamOperatorTwoInputStreamOperator谆沃。

代碼生成器一般會(huì)在物理執(zhí)行節(jié)點(diǎn)(即ExecNode)內(nèi)被調(diào)用,但不是所有的Flink SQL邏輯都會(huì)直接走代碼生成仪芒,例如不久前講過(guò)的Window TVF的切片化窗口(參見這里)以及內(nèi)置的Top-N(參見這里)唁影。

GeneratedClass

GeneratedClass用來(lái)描述代碼生成器生成的各類實(shí)體,如函數(shù)掂名、算子等据沈,它們都位于Runtime層,類圖如下饺蔑。

注意這其中并不包括GeneratedExpression锌介,因?yàn)楸磉_(dá)式的概念僅在Planner層存在。

代碼生成示例

Codegen部分的源碼可以說(shuō)是Blink Planner內(nèi)部最為復(fù)雜的猾警,遠(yuǎn)甚于Optimizer部分孔祸,且可讀性較差,有些東西只可意會(huì)不可言傳(笑

為了偷懶便于理解发皿,筆者僅用一條極簡(jiǎn)的SQL語(yǔ)句SELECT COUNT(orderId) FROM rtdw_dwd.kafka_order_done_log WHERE mainSiteId = 10029來(lái)簡(jiǎn)單走一下流程崔慧。

觀察該語(yǔ)句生成的物理執(zhí)行計(jì)劃:

== Optimized Execution Plan ==
GroupAggregate(select=[COUNT(orderId) AS EXPR$0])
+- Exchange(distribution=[single])
   +- Calc(select=[orderId], where=[(mainSiteId = 10029:BIGINT)])
      +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, tss, tssDay, orderId, /* ... */])

在這四個(gè)ExecNode中,StreamExecCalcStreamExecGroupAggregate會(huì)涉及代碼生成穴墅。篇幅所限惶室,本文只分析StreamExecCalc温自,它的主要代碼由CalcCodeGenerator#generateProcessCode()方法生成,該方法全文如下皇钞。

  private[flink] def generateProcessCode(
      ctx: CodeGeneratorContext,
      inputType: RowType,
      outRowType: RowType,
      outRowClass: Class[_ <: RowData],
      projection: Seq[RexNode],
      condition: Option[RexNode],
      inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
      collectorTerm: String = CodeGenUtils.DEFAULT_OPERATOR_COLLECTOR_TERM,
      eagerInputUnboxingCode: Boolean,
      retainHeader: Boolean = false,
      outputDirectly: Boolean = false,
      allowSplit: Boolean = false): String = {

    // according to the SQL standard, every table function should also be a scalar function
    // but we don't allow that for now
    projection.foreach(_.accept(ScalarFunctionsValidator))
    condition.foreach(_.accept(ScalarFunctionsValidator))

    val exprGenerator = new ExprCodeGenerator(ctx, false)
        .bindInput(inputType, inputTerm = inputTerm)

    val onlyFilter = projection.lengthCompare(inputType.getFieldCount) == 0 &&
      projection.zipWithIndex.forall { case (rexNode, index) =>
        rexNode.isInstanceOf[RexInputRef] && rexNode.asInstanceOf[RexInputRef].getIndex == index
      }

    def produceOutputCode(resultTerm: String): String = if (outputDirectly) {
      s"$collectorTerm.collect($resultTerm);"
    } else {
      s"${OperatorCodeGenerator.generateCollect(resultTerm)}"
    }

    def produceProjectionCode: String = {
      val projectionExprs = projection.map(exprGenerator.generateExpression)
      val projectionExpression = exprGenerator.generateResultExpression(
        projectionExprs,
        outRowType,
        outRowClass,
        allowSplit = allowSplit)

      val projectionExpressionCode = projectionExpression.code

      val header = if (retainHeader) {
        s"${projectionExpression.resultTerm}.setRowKind($inputTerm.getRowKind());"
      } else {
        ""
      }

      s"""
         |$header
         |$projectionExpressionCode
         |${produceOutputCode(projectionExpression.resultTerm)}
         |""".stripMargin
    }

    if (condition.isEmpty && onlyFilter) {
      throw new TableException("This calc has no useful projection and no filter. " +
        "It should be removed by CalcRemoveRule.")
    } else if (condition.isEmpty) { // only projection
      val projectionCode = produceProjectionCode
      s"""
         |${if (eagerInputUnboxingCode) ctx.reuseInputUnboxingCode() else ""}
         |$projectionCode
         |""".stripMargin
    } else {
      val filterCondition = exprGenerator.generateExpression(condition.get)
      // only filter
      if (onlyFilter) {
        s"""
           |${if (eagerInputUnboxingCode) ctx.reuseInputUnboxingCode() else ""}
           |${filterCondition.code}
           |if (${filterCondition.resultTerm}) {
           |  ${produceOutputCode(inputTerm)}
           |}
           |""".stripMargin
      } else { // both filter and projection
        val filterInputCode = ctx.reuseInputUnboxingCode()
        val filterInputSet = Set(ctx.reusableInputUnboxingExprs.keySet.toSeq: _*)

        // if any filter conditions, projection code will enter an new scope
        val projectionCode = produceProjectionCode

        val projectionInputCode = ctx.reusableInputUnboxingExprs
          .filter(entry => !filterInputSet.contains(entry._1))
          .values.map(_.code).mkString("\n")
        s"""
           |${if (eagerInputUnboxingCode) filterInputCode else ""}
           |${filterCondition.code}
           |if (${filterCondition.resultTerm}) {
           |  ${if (eagerInputUnboxingCode) projectionInputCode else ""}
           |  $projectionCode
           |}
           |""".stripMargin
      }
    }
  }

從中可以看出明顯的模擬拼接手寫代碼的過(guò)程悼泌。之前講過(guò),Calc就是ProjectFilter的結(jié)合夹界,該方法的入?yún)⒅星『冒藢?duì)應(yīng)的RexNode

  • projection——類型為RexInputRef馆里,值為$3,即源表中index為3的列orderId可柿。
  • condition——類型為RexCall鸠踪,值為=($32, 10029),即mainSiteId = 10029的謂詞趾痘。

接下來(lái)調(diào)用ExprCodeGenerator.generateExpression()方法慢哈,先生成condition對(duì)應(yīng)的GeneratedExpression蔓钟。借助訪問(wèn)者模式永票,會(huì)轉(zhuǎn)到ExprCodeGenerator#visitCall()方法,最終生成帶空值判斷的完整代碼滥沫。部分調(diào)用棧如下:

generateCallWithStmtIfArgsNotNull:98, GenerateUtils$ (org.apache.flink.table.planner.codegen)
generateCallIfArgsNotNull:67, GenerateUtils$ (org.apache.flink.table.planner.codegen)
generateOperatorIfNotNull:2323, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
generateComparison:577, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
generateEquals:429, ScalarOperatorGens$ (org.apache.flink.table.planner.codegen.calls)
generateCallExpression:630, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
visitCall:529, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
visitCall:56, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
accept:174, RexCall (org.apache.calcite.rex)
generateExpression:155, ExprCodeGenerator (org.apache.flink.table.planner.codegen)
generateProcessCode:173, CalcCodeGenerator$ (org.apache.flink.table.planner.codegen)
generateCalcOperator:50, CalcCodeGenerator$ (org.apache.flink.table.planner.codegen)
generateCalcOperator:-1, CalcCodeGenerator (org.apache.flink.table.planner.codegen)
translateToPlanInternal:94, CommonExecCalc (org.apache.flink.table.planner.plan.nodes.exec.common)

結(jié)果如下侣集。其中resultTerm是表達(dá)式結(jié)果字段,nullTerm是表達(dá)式是否為空的boolean字段兰绣。后面的編號(hào)是內(nèi)置計(jì)數(shù)器的值世分,防止重復(fù)。

GeneratedExpression(resultTerm = result$3, nullTerm = isNull$2, code = 


isNull$2 = isNull$1 || false;
result$3 = false;
if (!isNull$2) {
  
  result$3 = field$1 == ((long) 10029L);
  
}
, resultType = BOOLEAN, literalValue = None)

看官可能會(huì)覺得生成的代碼比較冗長(zhǎng)缀辩,有些東西沒必要寫臭埋。但是代碼生成器的設(shè)計(jì)目標(biāo)是兼顧通用性和穩(wěn)定性,因此必須保證生成的代碼在各種情況下都可以正確地運(yùn)行臀玄。另外JVM也可以通過(guò)條件編譯瓢阴、公共子表達(dá)式消除、方法內(nèi)聯(lián)等優(yōu)化手段生成最優(yōu)的字節(jié)碼健无,不用過(guò)于擔(dān)心荣恐。

話說(shuō)回來(lái),上文中過(guò)濾條件的輸入filterInputCode是如何通過(guò)CodeGeneratorContext#reuseInputUnboxingCode()重用的呢累贤?別忘了$32也是一個(gè)RexInputRef叠穆,所以遞歸visit到它時(shí)會(huì)調(diào)用GenerateUtils#generateInputAccess()方法生成對(duì)應(yīng)的代碼,即:

isNull$1 = in1.isNullAt(32);
field$1 = -1L;
if (!isNull$1) {
  field$1 = in1.getLong(32);
}

將它拼在filterCondition的前面臼膏,完成硼被。處理projection的流程類似,看官可套用上面的思路自行追蹤渗磅,不再?gòu)U話了祷嘶。

主處理邏輯生成之后屎媳,還需要將它用Function或者Operator承載才能生效。Calc節(jié)點(diǎn)在執(zhí)行層對(duì)應(yīng)的是一個(gè)OneInputStreamOperator论巍,由OperatorCodeGenerator#generateOneInputStreamOperator()負(fù)責(zé)烛谊。從它的代碼可以看到更清晰的輪廓,如下嘉汰。

  def generateOneInputStreamOperator[IN <: Any, OUT <: Any](
      ctx: CodeGeneratorContext,
      name: String,
      processCode: String,
      inputType: LogicalType,
      inputTerm: String = CodeGenUtils.DEFAULT_INPUT1_TERM,
      endInputCode: Option[String] = None,
      lazyInputUnboxingCode: Boolean = false,
      converter: String => String = a => a): GeneratedOperator[OneInputStreamOperator[IN, OUT]] = {
    addReuseOutElement(ctx)
    val operatorName = newName(name)
    val abstractBaseClass = ctx.getOperatorBaseClass
    val baseClass = classOf[OneInputStreamOperator[IN, OUT]]
    val inputTypeTerm = boxedTypeTermForType(inputType)

    val (endInput, endInputImpl) = endInputCode match {
      case None => ("", "")
      case Some(code) =>
        (s"""
           |@Override
           |public void endInput() throws Exception {
           |  ${ctx.reuseLocalVariableCode()}
           |  $code
           |}
         """.stripMargin, s", ${className[BoundedOneInput]}")
    }

    val operatorCode =
      j"""
      public class $operatorName extends ${abstractBaseClass.getCanonicalName}
          implements ${baseClass.getCanonicalName}$endInputImpl {

        private final Object[] references;
        ${ctx.reuseMemberCode()}

        public $operatorName(
            Object[] references,
            ${className[StreamTask[_, _]]} task,
            ${className[StreamConfig]} config,
            ${className[Output[_]]} output,
            ${className[ProcessingTimeService]} processingTimeService) throws Exception {
          this.references = references;
          ${ctx.reuseInitCode()}
          this.setup(task, config, output);
          if (this instanceof ${className[AbstractStreamOperator[_]]}) {
            ((${className[AbstractStreamOperator[_]]}) this)
              .setProcessingTimeService(processingTimeService);
          }
        }

        @Override
        public void open() throws Exception {
          super.open();
          ${ctx.reuseOpenCode()}
        }

        @Override
        public void processElement($STREAM_RECORD $ELEMENT) throws Exception {
          $inputTypeTerm $inputTerm = ($inputTypeTerm) ${converter(s"$ELEMENT.getValue()")};
          ${ctx.reusePerRecordCode()}
          ${ctx.reuseLocalVariableCode()}
          ${if (lazyInputUnboxingCode) "" else ctx.reuseInputUnboxingCode()}
          $processCode
        }

        $endInput

        @Override
        public void close() throws Exception {
           super.close();
          ${ctx.reuseCloseCode()}
        }

        ${ctx.reuseInnerClassDefinitionCode()}
      }
    """.stripMargin

    LOG.debug(s"Compiling OneInputStreamOperator Code:\n$name")
    new GeneratedOperator(operatorName, operatorCode, ctx.references.toArray)
  }

仍然注意那些能夠通過(guò)CodeGeneratorContext復(fù)用的內(nèi)容丹禀,例如processElement()方法中的本地變量聲明部分,可以通過(guò)reuseLocalVariableCode()取得鞋怀。最終的生成結(jié)果比較冗長(zhǎng)双泪,看官可通過(guò)Pastebin的傳送門查看,并與上面的框架對(duì)應(yīng)密似。

另外焙矛,如果不想每次都通過(guò)Debug查看生成的代碼,可在Log4j配置文件內(nèi)加入以下兩行残腌。

logger.codegen.name = org.apache.flink.table.runtime.generated
logger.codegen.level = DEBUG

這樣村斟,在生成代碼被編譯的時(shí)候,就會(huì)輸出其內(nèi)容抛猫。當(dāng)GeneratedClass被首次實(shí)例化時(shí)蟆盹,就會(huì)調(diào)用Janino進(jìn)行動(dòng)態(tài)編譯,并將結(jié)果緩存在一個(gè)內(nèi)部Cache中闺金,避免重復(fù)編譯逾滥。可通過(guò)查看o.a.f.table.runtime.generated.CompileUtils及其上下文獲得更多信息败匹。

UDF表達(dá)式重用(FLINK-21573)

UDF重復(fù)調(diào)用的問(wèn)題在某些情況下可能會(huì)對(duì)Flink SQL用戶造成困擾寨昙,例如下面的SQL語(yǔ)句:

SELECT
  mp['eventType'] AS eventType,
  mp['fromType'] AS fromType,
  mp['columnType'] AS columnType
  -- A LOT OF other columns...
FROM (
  SELECT SplitQueryParamsAsMap(query_string) AS mp
  FROM rtdw_ods.kafka_analytics_access_log_app
  WHERE CHAR_LENGTH(query_string) > 1
);

假設(shè)從Map中取N個(gè)key對(duì)應(yīng)的value,自定義函數(shù)SplitQueryParamsAsMap就會(huì)被調(diào)用N次掀亩,這顯然是不符合常理的——對(duì)于一個(gè)確定的輸入query_string舔哪,該UDF的輸出就是確定的,沒有必要每次都調(diào)用归榕。如果UDF包含計(jì)算密集型的邏輯尸红,整個(gè)作業(yè)的性能就會(huì)受到很大影響。

如何解決呢刹泄?通過(guò)挖掘代碼外里,可以得知源頭在于Calcite重寫查詢時(shí)不會(huì)考慮函數(shù)的確定性(determinism),也就是說(shuō)FunctionDefinition#isDeterministic()沒有起到應(yīng)有的作用特石≈鸦龋考慮到直接改動(dòng)Calcite難度較大且容易引起兼容性問(wèn)題,我們考慮在SQL執(zhí)行前的最后一步——也就是代碼生成階段來(lái)施工姆蘸。

觀察調(diào)用UDF生成的代碼墩莫,如下芙委。

    externalResult$8 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
      .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
    isNull$10 = externalResult$8 == null;
    result$10 = null;
    if (!isNull$10) {
      result$10 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$8);
    }

    // ......

    externalResult$24 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
      .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
    isNull$25 = externalResult$24 == null;
    result$25 = null;
    if (!isNull$25) {
      result$25 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$24);
    }

因此,我們可以在UDF滿足確定性的前提下狂秦,重用UDF表達(dá)式產(chǎn)生的結(jié)果灌侣,即形如externalResult$8的term。思路比較直接裂问,首先在CodeGeneratorContext中添加可重用的UDF表達(dá)式及其result term的容器侧啼,以及對(duì)應(yīng)的方法。代碼如下堪簿。

  private val reusableScalarFuncExprs: mutable.Map[String, String] =
    mutable.Map[String, String]()

  private val reusableResultTerms: mutable.Map[String, String] =
    mutable.Map[String, String]()

  def addReusableScalarFuncExpr(code: String, term: String): Unit = {
    if (!reusableScalarFuncExprs.contains(code)) {
      reusableScalarFuncExprs.put(code, term)
    }
  }

  def addReusableResultTerm(term: String, originalTerm: String): Unit = {
    if (!reusableResultTerms.contains(term)) {
      reusableResultTerms.put(term, originalTerm);
    }
  }

  def reuseScalarFuncExpr(code: String) : String = {
    reusableScalarFuncExprs.getOrElse(code, code)
  }

  def reuseResultTerm(term: String) : String = {
    reusableResultTerms.getOrElse(term, term)
  }

注意在保存UDF表達(dá)式時(shí)痊乾,是以生成的代碼為key,result term為value椭更。保存result term的映射時(shí)哪审,是以新的為key,舊的為value虑瀑。

然后從ExprCodeGenerator入手(函數(shù)調(diào)用都屬于RexCall)湿滓,找到UDF代碼生成的方法,即BridgingFunctionGenUtil#generateScalarFunctionCall()缴川,做如下改動(dòng)茉稠。

  private def generateScalarFunctionCall(
      ctx: CodeGeneratorContext,
      functionTerm: String,
      externalOperands: Seq[GeneratedExpression],
      outputDataType: DataType,
      isDeterministic: Boolean)
    : GeneratedExpression = {

    // result conversion
    val externalResultClass = outputDataType.getConversionClass
    val externalResultTypeTerm = typeTerm(externalResultClass)
    // Janino does not fully support the JVM spec:
    // boolean b = (boolean) f(); where f returns Object
    // This is not supported and we need to box manually.
    val externalResultClassBoxed = primitiveToWrapper(externalResultClass)
    val externalResultCasting = if (externalResultClass == externalResultClassBoxed) {
      s"($externalResultTypeTerm)"
    } else {
      s"($externalResultTypeTerm) (${typeTerm(externalResultClassBoxed)})"
    }
    val externalResultTerm = ctx.addReusableLocalVariable(externalResultTypeTerm, "externalResult")

    if (isDeterministic) {
      val funcEvalCode =
        s"""
           |$externalResultCasting $functionTerm
           |  .$SCALAR_EVAL(${externalOperands.map(_.resultTerm).map(ctx.reuseResultTerm).mkString(", ")});
           |""".stripMargin

      val reusableFuncExpr = ctx.reuseScalarFuncExpr(funcEvalCode)
      if (!reusableFuncExpr.equals(funcEvalCode)) {
        ctx.addReusableResultTerm(externalResultTerm, reusableFuncExpr)
      }

      ctx.addReusableScalarFuncExpr(funcEvalCode, externalResultTerm)

      val internalExpr = genToInternalConverterAll(ctx, outputDataType, externalResultTerm)

      // function call
      internalExpr.copy(code =
        s"""
           |${externalOperands.map(_.code).mkString("\n")}
           |$externalResultTerm = $reusableFuncExpr;
           |${internalExpr.code}
           |""".stripMargin)
    } else {
      val internalExpr = genToInternalConverterAll(ctx, outputDataType, externalResultTerm)

      // function call
      internalExpr.copy(code =
        s"""
           |${externalOperands.map(_.code).mkString("\n")}
           |$externalResultTerm = $externalResultCasting $functionTerm
           |  .$SCALAR_EVAL(${externalOperands.map(_.resultTerm).mkString(", ")});
           |${internalExpr.code}
           |""".stripMargin)
    }
  }

if (isDeterministic)塊內(nèi)的代碼實(shí)現(xiàn)了UDF表達(dá)式重用描馅,即重用生成的第一個(gè)result term把夸。筆者就不多解釋了,畢竟與上一節(jié)的相比已經(jīng)算是很好理解了(笑

重新編譯flink-table模塊并執(zhí)行相同的SQL铭污,就會(huì)發(fā)現(xiàn)生成的代碼發(fā)生了變化:

    externalResult$8 = (java.util.Map) function_com$sht$bigdata$rt$udf$scalar$SplitQueryParamsAsMap$5cccfdc891a58463898db753288ed577
      .eval(isNull$0 ? null : ((java.lang.String) converter$7.toExternal((org.apache.flink.table.data.binary.BinaryStringData) field$2)));
    isNull$10 = externalResult$8 == null;
    result$10 = null;
    if (!isNull$10) {
      result$10 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$8);
    }
    
    // ......

    externalResult$24 = externalResult$8;
    isNull$25 = externalResult$24 == null;
    result$25 = null;
    if (!isNull$25) {
      result$25 = (org.apache.flink.table.data.MapData) converter$9.toInternalOrNull((java.util.Map) externalResult$24);
    }

大功告成恋日。

The End

本文是用新機(jī)器正經(jīng)寫的第一篇博客。M1 yyds(

準(zhǔn)備FFA的演講稿去了嘹狞。

民那晚安晚安岂膳。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市磅网,隨后出現(xiàn)的幾起案子谈截,更是在濱河造成了極大的恐慌,老刑警劉巖涧偷,帶你破解...
    沈念sama閱讀 221,695評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件簸喂,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡燎潮,警方通過(guò)查閱死者的電腦和手機(jī)喻鳄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)确封,“玉大人除呵,你說(shuō)我怎么就攤上這事再菊。” “怎么了颜曾?”我有些...
    開封第一講書人閱讀 168,130評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵纠拔,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我泛豪,道長(zhǎng)绿语,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評(píng)論 1 297
  • 正文 為了忘掉前任候址,我火速辦了婚禮吕粹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘岗仑。我一直安慰自己匹耕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評(píng)論 6 397
  • 文/花漫 我一把揭開白布荠雕。 她就那樣靜靜地躺著稳其,像睡著了一般。 火紅的嫁衣襯著肌膚如雪炸卑。 梳的紋絲不亂的頭發(fā)上既鞠,一...
    開封第一講書人閱讀 52,268評(píng)論 1 309
  • 那天,我揣著相機(jī)與錄音盖文,去河邊找鬼嘱蛋。 笑死,一個(gè)胖子當(dāng)著我的面吹牛五续,可吹牛的內(nèi)容都是我干的洒敏。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼疙驾,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼凶伙!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起它碎,我...
    開封第一講書人閱讀 39,740評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤函荣,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后扳肛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體傻挂,經(jīng)...
    沈念sama閱讀 46,286評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評(píng)論 3 340
  • 正文 我和宋清朗相戀三年敞峭,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了踊谋。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡旋讹,死狀恐怖殖蚕,靈堂內(nèi)的尸體忽然破棺而出轿衔,到底是詐尸還是另有隱情,我是刑警寧澤睦疫,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布害驹,位于F島的核電站,受9級(jí)特大地震影響蛤育,放射性物質(zhì)發(fā)生泄漏宛官。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評(píng)論 3 333
  • 文/蒙蒙 一瓦糕、第九天 我趴在偏房一處隱蔽的房頂上張望底洗。 院中可真熱鬧,春花似錦咕娄、人聲如沸亥揖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)费变。三九已至,卻和暖如春圣贸,著一層夾襖步出監(jiān)牢的瞬間挚歧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工吁峻, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留滑负,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,921評(píng)論 3 376
  • 正文 我出身青樓锡搜,卻偏偏與公主長(zhǎng)得像橙困,于是被迫代替她去往敵國(guó)和親瞧掺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子耕餐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容