Implementing User-Defined Functions in Flink

Given the expressive power that user-defined functions add to SQL, they are very widely used. Flink supports user-defined functions in its Table/SQL API as well, and according to the roadmap presented at Flink Forward Asia 2019, later Flink versions will add Python UDFs and Hive-compatible UDFs. This article gives a brief overview of Flink's UDF support and how it is implemented.

Supported Function Types

Flink's user-defined functions cover UDF, UDTF, and UDAF capabilities. Each is written by extending a subclass of UserDefinedFunction and defining the required methods; Flink then uses automatic code generation (Code Generation) to produce helper classes that invoke those methods. A minimal skeleton of each type follows the table below.

Function   Purpose                                                    Implementation class
UDF        Transforms a field value; one record in, one record out   ScalarFunction
UDTF       One record in, one or more records out                    TableFunction
UDAF       One or more records in, one record out                    AggregateFunction
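
For orientation, here is a minimal skeleton of each type against the Flink 1.x Table API. The class names and bodies are illustrative only; the base classes and the eval/accumulate/getValue contracts are Flink's:

import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;

// UDF: one record in, one value out.
class MyScalar extends ScalarFunction {
    public String eval(String s) {
        return s;
    }
}

// UDTF: one record in, zero or more records out, emitted via collect().
class MySplit extends TableFunction<String> {
    public void eval(String s) {
        collect(s);
    }
}

// UDAF: many records folded into an accumulator, one value out.
class MySum extends AggregateFunction<Long, long[]> {
    @Override
    public long[] createAccumulator() {
        return new long[]{0L};
    }

    public void accumulate(long[] acc, Long value) {
        acc[0] += value;
    }

    @Override
    public Long getValue(long[] acc) {
        return acc[0];
    }
}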

Function Implementations

UDF Implementation

Flink supports UDFs through the ScalarFunction base class. An application only needs to extend ScalarFunction and define an eval method.

Using one is straightforward. Under the hood, Code Generation produces the wrapper code that calls eval: inside the ProcessOperator, the function is the generated class, and its processElement method invokes our eval method to perform the conversion.

A simple UDF looks like this:

public class ToUpperCase extends ScalarFunction {

    public String eval(String s) {

        return s.toUpperCase();
    }
}
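
To use the function, register it with the table environment and call it by name in SQL. A minimal sketch, assuming a StreamTableEnvironment named tableEnv and a hypothetical registered table user_log with a STRING column name:

// Register the scalar function under "toUpper" and call it in SQL.
// "user_log" and "name" are hypothetical; substitute your own table/column.
tableEnv.registerFunction("toUpper", new ToUpperCase());
Table result = tableEnv.sqlQuery(
        "SELECT name, toUpper(name) AS upper_name FROM user_log");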

The call stack is shown in the figure below:

[Figure: UDF call stack]

The source of the function produced by the code-generation step is as follows:

public class DataStreamCalcRule$19 extends org.apache.flink.streaming.api.functions.ProcessFunction {

    final com.cyq.learning.udf.function.ToUpperCase function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd;


    final org.apache.flink.types.Row out =
            new org.apache.flink.types.Row(4);


    public DataStreamCalcRule$19() throws Exception {

        function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd = (com.cyq.learning.udf.function.ToUpperCase)
                org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
                        "rO0ABXNyACljb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5Ub1VwcGVyQ2FzZViZ_w3c2xj-AgAAeHIAL29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlNjYWxhckZ1bmN0aW9uygzW306qntsCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvboP22EvVTaRLAgAAeHA",
                        org.apache.flink.table.functions.UserDefinedFunction.class);

    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {

        function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));


    }

    @Override
    public void processElement(Object _in1, org.apache.flink.streaming.api.functions.ProcessFunction.Context ctx, org.apache.flink.util.Collector c) throws Exception {
        org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;

        boolean isNull$18 = (java.lang.Long) in1.getField(2) == null;
        long result$17;
        if (isNull$18) {
            result$17 = -1L;
        } else {
            result$17 = (java.lang.Long) in1.getField(2);
        }


        boolean isNull$11 = (java.lang.String) in1.getField(0) == null;
        java.lang.String result$10;
        if (isNull$11) {
            result$10 = "";
        } else {
            result$10 = (java.lang.String) (java.lang.String) in1.getField(0);
        }


        boolean isNull$13 = (java.lang.String) in1.getField(1) == null;
        java.lang.String result$12;
        if (isNull$13) {
            result$12 = "";
        } else {
            result$12 = (java.lang.String) (java.lang.String) in1.getField(1);
        }


        if (isNull$11) {
            out.setField(0, null);
        } else {
            out.setField(0, result$10);
        }


        if (isNull$13) {
            out.setField(1, null);
        } else {
            out.setField(1, result$12);
        }


        java.lang.String result$14 = function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.eval(
                isNull$13 ? null : (java.lang.String) result$12);


        boolean isNull$16 = result$14 == null;
        java.lang.String result$15;
        if (isNull$16) {
            result$15 = "";
        } else {
            result$15 = (java.lang.String) result$14;
        }


        if (isNull$16) {
            out.setField(2, null);
        } else {
            out.setField(2, result$15);
        }


        if (isNull$18) {
            out.setField(3, null);
        } else {
            out.setField(3, result$17);
        }

        c.collect(out);

    }

    @Override
    public void close() throws Exception {

        function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.close();


    }
}

UDTF Implementation

Flink UDTFs are implemented by extending TableFunction. The key to turning one row into several is TableFunction's collect method, which hands a record to the downstream operator through the collector: when a record arrives, eval can split it and emit each resulting piece via collect, achieving one row in, many rows out.

A simple UDTF implementation:

public class SplitFun extends TableFunction<Tuple2<String, Integer>> {

    private String separator = ",";
    public SplitFun(String separator) {
        this.separator = separator;
    }
    public void eval(String str) {
        for (String s : str.split(separator)) {
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}
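
In SQL, a UDTF is applied with LATERAL TABLE, which joins every input row against the rows the function emits for it. A minimal sketch, again assuming a tableEnv and a hypothetical table user_log with a STRING column tags:

// Register the table function, then flatten each row with LATERAL TABLE.
// "user_log" and "tags" are hypothetical names.
tableEnv.registerFunction("split", new SplitFun(","));
Table result = tableEnv.sqlQuery(
        "SELECT name, word, length " +
        "FROM user_log, LATERAL TABLE(split(tags)) AS T(word, length)");

The LEFT JOIN LATERAL TABLE(split(tags)) AS T(word, length) ON TRUE variant additionally keeps input rows for which the function emits nothing.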

The call stack is shown in the figure below:

[Figure: UDTF call stack]

The generated code consists of two classes (TableFunctionCollector$42 and DataStreamCorrelateRule$28). TableFunctionCollector is the helper collector that forwards records to the downstream operator, while DataStreamCorrelateRule invokes the TableFunction (SplitFun) from its processElement method.

  • The source of DataStreamCorrelateRule$28:

    public class DataStreamCorrelateRule$28 extends org.apache.flink.streaming.api.functions.ProcessFunction {
    
        final org.apache.flink.table.runtime.TableFunctionCollector instance_org$apache$flink$table$runtime$TableFunctionCollector$22;
    
        final com.cyq.learning.udf.function.SplitFun function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d;
    
    
        final org.apache.flink.types.Row out =
                new org.apache.flink.types.Row(6);
    
    
        public DataStreamCorrelateRule$28() throws Exception {
    
            function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d = (com.cyq.learning.udf.function.SplitFun)
                    org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
                            "rO0ABXNyACZjb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5TcGxpdEZ1bl5QHqGTdoFOAgABTAAJc2VwYXJhdG9ydAASTGphdmEvbGFuZy9TdHJpbmc7eHIALm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlRhYmxlRnVuY3Rpb27qA9IDIKYg-gIAAUwACWNvbGxlY3RvcnQAIUxvcmcvYXBhY2hlL2ZsaW5rL3V0aWwvQ29sbGVjdG9yO3hyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9ug_bYS9VNpEsCAAB4cHB0AAFA",
                            org.apache.flink.table.functions.UserDefinedFunction.class);
    
    
        }
    
    
        public DataStreamCorrelateRule$28(org.apache.flink.table.runtime.TableFunctionCollector arg0) throws Exception {
            this();
            instance_org$apache$flink$table$runtime$TableFunctionCollector$22 = arg0;
    
        }
    
    
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    
            function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
    
    
        }
    
        @Override
        public void processElement(Object _in1, org.apache.flink.streaming.api.functions.ProcessFunction.Context ctx, org.apache.flink.util.Collector c) throws Exception {
            org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
    
            boolean isNull$15 = (java.lang.Long) in1.getField(2) == null;
            long result$14;
            if (isNull$15) {
                result$14 = -1L;
            } else {
                result$14 = (java.lang.Long) in1.getField(2);
            }
    
    
            boolean isNull$11 = (java.lang.String) in1.getField(0) == null;
            java.lang.String result$10;
            if (isNull$11) {
                result$10 = "";
            } else {
                result$10 = (java.lang.String) (java.lang.String) in1.getField(0);
            }
    
    
            boolean isNull$13 = (java.lang.String) in1.getField(1) == null;
            java.lang.String result$12;
            if (isNull$13) {
                result$12 = "";
            } else {
                result$12 = (java.lang.String) (java.lang.String) in1.getField(1);
            }
    
    
            boolean isNull$17 = (java.lang.Long) in1.getField(3) == null;
            long result$16;
            if (isNull$17) {
                result$16 = -1L;
            } else {
                result$16 = (long) org.apache.calcite.runtime.SqlFunctions.toLong((java.lang.Long) in1.getField(3));
            }
    
    
            function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.setCollector(instance_org$apache$flink$table$runtime$TableFunctionCollector$22);
    
    
            java.lang.String result$25;
            boolean isNull$26;
            if (false) {
                result$25 = "";
                isNull$26 = true;
            } else {
    
                boolean isNull$24 = (java.lang.String) in1.getField(1) == null;
                java.lang.String result$23;
                if (isNull$24) {
                    result$23 = "";
                } else {
                    result$23 = (java.lang.String) (java.lang.String) in1.getField(1);
                }
    
                result$25 = result$23;
                isNull$26 = isNull$24;
            }
    
            function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.eval(isNull$26 ? null : (java.lang.String) result$25);
    
    
            boolean hasOutput = instance_org$apache$flink$table$runtime$TableFunctionCollector$22.isCollected();
            if (!hasOutput) {
    
    
                if (isNull$11) {
                    out.setField(0, null);
                } else {
                    out.setField(0, result$10);
                }
    
    
                if (isNull$13) {
                    out.setField(1, null);
                } else {
                    out.setField(1, result$12);
                }
    
    
                if (isNull$15) {
                    out.setField(2, null);
                } else {
                    out.setField(2, result$14);
                }
    
    
                java.lang.Long result$27;
                if (isNull$17) {
                    result$27 = null;
                } else {
                    result$27 = result$16;
                }
    
                if (isNull$17) {
                    out.setField(3, null);
                } else {
                    out.setField(3, result$27);
                }
    
    
                if (true) {
                    out.setField(4, null);
                } else {
                    out.setField(4, "");
                }
    
    
                if (true) {
                    out.setField(5, null);
                } else {
                    out.setField(5, -1);
                }
    
                c.collect(out);
            }
    
        }
    
        @Override
        public void close() throws Exception {
    
            function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.close();
    
    
        }
    }
    
  • The source of TableFunctionCollector$42:

    public class TableFunctionCollector$42 extends org.apache.flink.table.runtime.TableFunctionCollector {
    
    
        final org.apache.flink.types.Row out =
                new org.apache.flink.types.Row(6);
    
    
        public TableFunctionCollector$42() throws Exception {
    
    
        }
    
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    
    
        }
    
        @Override
        public void collect(Object record) throws Exception {
            super.collect(record);
            org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) getInput();
            org.apache.flink.api.java.tuple.Tuple2 in2 = (org.apache.flink.api.java.tuple.Tuple2) record;
    
            boolean isNull$34 = (java.lang.Long) in1.getField(2) == null;
            long result$33;
            if (isNull$34) {
                result$33 = -1L;
            } else {
                result$33 = (java.lang.Long) in1.getField(2);
            }
    
    
            boolean isNull$30 = (java.lang.String) in1.getField(0) == null;
            java.lang.String result$29;
            if (isNull$30) {
                result$29 = "";
            } else {
                result$29 = (java.lang.String) (java.lang.String) in1.getField(0);
            }
    
    
            boolean isNull$32 = (java.lang.String) in1.getField(1) == null;
            java.lang.String result$31;
            if (isNull$32) {
                result$31 = "";
            } else {
                result$31 = (java.lang.String) (java.lang.String) in1.getField(1);
            }
    
    
            boolean isNull$36 = (java.lang.Long) in1.getField(3) == null;
            long result$35;
            if (isNull$36) {
                result$35 = -1L;
            } else {
                result$35 = (long) org.apache.calcite.runtime.SqlFunctions.toLong((java.lang.Long) in1.getField(3));
            }
    
    
            if (isNull$30) {
                out.setField(0, null);
            } else {
                out.setField(0, result$29);
            }
    
    
            if (isNull$32) {
                out.setField(1, null);
            } else {
                out.setField(1, result$31);
            }
    
    
            if (isNull$34) {
                out.setField(2, null);
            } else {
                out.setField(2, result$33);
            }
    
    
            java.lang.Long result$41;
            if (isNull$36) {
                result$41 = null;
            } else {
                result$41 = result$35;
            }
    
            if (isNull$36) {
                out.setField(3, null);
            } else {
                out.setField(3, result$41);
            }
    
    
            boolean isNull$38 = (java.lang.String) in2.f0 == null;
            java.lang.String result$37;
            if (isNull$38) {
                result$37 = "";
            } else {
                result$37 = (java.lang.String) (java.lang.String) in2.f0;
            }
    
            if (isNull$38) {
                out.setField(4, null);
            } else {
                out.setField(4, result$37);
            }
    
    
            boolean isNull$40 = (java.lang.Integer) in2.f1 == null;
            int result$39;
            if (isNull$40) {
                result$39 = -1;
            } else {
                result$39 = (java.lang.Integer) in2.f1;
            }
    
            if (isNull$40) {
                out.setField(5, null);
            } else {
                out.setField(5, result$39);
            }
    
            getCollector().collect(out);
    
        }
    
        @Override
        public void close() throws Exception {
    
    
        }
    }
    

UDAF Implementation

Flink UDAFs are implemented by extending AggregateFunction and implementing the methods that carry out the aggregation. A UDAF requires more methods than the other function types, and which methods are needed differs from scenario to scenario, so the implementation is more involved; see UserDefinedFunction for the details.

A simple UDAF might look like this:

public class AggFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamTableEnvironment streamTableEnvironment = EnvironmentUtil.getStreamTableEnvironment();

        TableSchema tableSchema = new TableSchema(new String[]{"product_id", "number", "price", "time"},
                new TypeInformation[]{Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP});

        Map<String, String> kafkaProps = new HashMap<>();
        kafkaProps.put("bootstrap.servers", BROKER_SERVERS);

        FlinkKafkaConsumerBase flinkKafkaConsumerBase = SourceUtil.createKafkaSourceStream(TOPIC, kafkaProps, tableSchema);

        DataStream<Row> stream = streamTableEnvironment.execEnv().addSource(flinkKafkaConsumerBase);

        streamTableEnvironment.registerFunction("wAvg", new WeightedAvg());

        Table streamTable = streamTableEnvironment.fromDataStream(stream, "product_id,number,price,rowtime.rowtime");

        streamTableEnvironment.registerTable("product", streamTable);
        Table result = streamTableEnvironment.sqlQuery("SELECT product_id, wAvg(number,price) as avgPrice FROM product group by product_id");


        final TableSchema tableSchemaResult = new TableSchema(new String[]{"product_id", "avg"},
                new TypeInformation[]{Types.STRING, Types.LONG});
        final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();

        // toRetractStream is needed here, because the result table is not an append-only table.
        DataStream ds = streamTableEnvironment.toRetractStream(result, typeInfoResult);
        ds.print();
        streamTableEnvironment.execEnv().execute("Flink Agg Function Test");


    }
}

The accompanying accumulator and function classes:

public class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}
public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }
    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

Note: the SQL executed in AggFunctionTest is a global aggregation. When converting the result to a stream for output we therefore use toRetractStream, i.e. retract mode: under a global aggregation the result keeps changing as records arrive, so an append stream cannot be used. See TableToStreamConversion for more on toRetractStream.
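
Each element of a retract stream is a Tuple2<Boolean, Row>: the flag is true for an accumulate (add) message and false for the retraction of a previously emitted result. A minimal sketch of inspecting that flag, reusing result and typeInfoResult from the example above:

// Consume the retract stream: t.f0 == true means add, false means retract.
DataStream<Tuple2<Boolean, Row>> retractStream =
        streamTableEnvironment.toRetractStream(result, typeInfoResult);
retractStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
    @Override
    public String map(Tuple2<Boolean, Row> t) {
        return (t.f0 ? "ADD" : "RETRACT") + " " + t.f1;
    }
}).print();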

Execution is driven by the processElement method of GroupAggProcessFunction; its core logic is:

override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  val currentTime = ctx.timerService().currentProcessingTime()
  // register state-cleanup timer
  processCleanupTimer(ctx, currentTime)

  val input = inputC.row

  // get accumulators and input counter
  var accumulators = state.value()
  var inputCnt = cntState.value()

  if (null == accumulators) {
    // Don't create a new accumulator for a retraction message. This
    // might happen if the retraction message is the first message for the
    // key or after a state clean up.
    if (!inputC.change) {
      return
    }
    // first accumulate message
    firstRow = true
    accumulators = function.createAccumulators()
  } else {
    firstRow = false
  }

  if (null == inputCnt) {
    inputCnt = 0L
  }

  // Set group keys value to the final output
  function.setForwardedFields(input, newRow.row)
  function.setForwardedFields(input, prevRow.row)

  // Set previous aggregate result to the prevRow
  function.setAggregationResults(accumulators, prevRow.row)

  // update aggregate result and set to the newRow
  if (inputC.change) {
    inputCnt += 1
    // accumulate input
    function.accumulate(accumulators, input)
    function.setAggregationResults(accumulators, newRow.row)
  } else {
    inputCnt -= 1
    // retract input
    function.retract(accumulators, input)
    function.setAggregationResults(accumulators, newRow.row)
  }

  if (inputCnt != 0) {
    // we aggregated at least one record for this key

    // update the state
    state.update(accumulators)
    cntState.update(inputCnt)

    // if this was not the first row
    if (!firstRow) {
      if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
        // newRow is the same as before and state cleaning is not enabled.
        // We emit nothing
        // If state cleaning is enabled, we have to emit messages to prevent too early
        // state eviction of downstream operators.
        return
      } else {
        // retract previous result
        if (generateRetraction) {
          out.collect(prevRow)
        }
      }
    }
    // emit the new result
    out.collect(newRow)

  } else {
    // we retracted the last record for this key
    // sent out a delete message
    out.collect(prevRow)
    // and clear all state
    state.clear()
    cntState.clear()
  }
}

As the code above shows, the actual aggregation work, updating the state and deciding what to emit, is delegated to function, which is again produced by code generation. Its source:

public final class NonWindowedAggregationHelper$17 extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {

    final com.cyq.learning.udf.function.WeightedAvg function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881;

    private final org.apache.flink.table.runtime.aggregate.SingleElementIterable<com.cyq.learning.udf.function.WeightedAvgAccum> accIt0 =
            new org.apache.flink.table.runtime.aggregate.SingleElementIterable<com.cyq.learning.udf.function.WeightedAvgAccum>();

    public NonWindowedAggregationHelper$17() throws Exception {

        function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881 = (com.cyq.learning.udf.function.WeightedAvg)
                org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
                        "rO0ABXNyACljb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5XZWlnaHRlZEF2Z7o-ESHvqA7TAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uYPYkANq5VRgCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvboP22EvVTaRLAgAAeHA",
                        org.apache.flink.table.functions.UserDefinedFunction.class);

    }

    public final void open(
            org.apache.flink.api.common.functions.RuntimeContext ctx) throws Exception {

        function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.open(new org.apache.flink.table.functions.FunctionContext(ctx));

    }

    public final void setAggregationResults(
            org.apache.flink.types.Row accs,
            org.apache.flink.types.Row output) throws Exception {

        org.apache.flink.table.functions.AggregateFunction baseClass0 =
                (org.apache.flink.table.functions.AggregateFunction) function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881;
        com.cyq.learning.udf.function.WeightedAvgAccum acc0 = (com.cyq.learning.udf.function.WeightedAvgAccum) accs.getField(0);

        output.setField(
                1,
                baseClass0.getValue(acc0));

    }

    public final void accumulate(
            org.apache.flink.types.Row accs,
            org.apache.flink.types.Row input) throws Exception {

        com.cyq.learning.udf.function.WeightedAvgAccum acc0 = (com.cyq.learning.udf.function.WeightedAvgAccum) accs.getField(0);

        function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.accumulate(acc0,
                (java.lang.Long) input.getField(1), (java.lang.Integer) input.getField(2));

    }

    public final void retract(
            org.apache.flink.types.Row accs,
            org.apache.flink.types.Row input) throws Exception {
    }

    public final org.apache.flink.types.Row createAccumulators() throws Exception {

        org.apache.flink.types.Row accs =
                new org.apache.flink.types.Row(1);
        com.cyq.learning.udf.function.WeightedAvgAccum acc0 = (com.cyq.learning.udf.function.WeightedAvgAccum) function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.createAccumulator();
        accs.setField(
                0,
                acc0);
        return accs;
    }

    public final void setForwardedFields(
            org.apache.flink.types.Row input,
            org.apache.flink.types.Row output) {

        output.setField(
                0,
                input.getField(0));
    }

    public final org.apache.flink.types.Row createOutputRow() {
        return new org.apache.flink.types.Row(2);
    }

    public final org.apache.flink.types.Row mergeAccumulatorsPair(
            org.apache.flink.types.Row a,
            org.apache.flink.types.Row b) {

        return a;

    }

    public final void resetAccumulator(
            org.apache.flink.types.Row accs) throws Exception {
    }

    public final void cleanup() throws Exception {
    }

    public final void close() throws Exception {

        function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.close();

    }
}

The full source for the examples above is available in the Learning repository.

From the analysis and the generated code above, readers should now have a general picture of how user-defined functions are executed. The hardest part to grasp is the code generation itself. Because Code Generation offers both strong performance for SQL execution and great flexibility, the technique is used extensively in both Spark and Flink; a follow-up article will look at Code Generation in detail.
