User-defined functions carry rich semantics and are used very widely in SQL. Flink supports user-defined functions in its Table/SQL API as well, and according to the roadmap presented at Flink Forward Asia 2019, later Flink releases will add Python UDFs and compatibility with Hive UDFs. This article briefly introduces UDF support in Flink and how it is implemented.
Supported function types
Flink's user-defined functions cover UDF, UDTF, and UDAF capabilities. In each case you extend the appropriate subclass of UserDefinedFunction and define the relevant methods; Flink then uses automatic code generation (Code Generation) to produce helper classes that invoke those methods at runtime.
Function type | Purpose | Base class |
---|---|---|
UDF | transforms a field; one record in, one record out | ScalarFunction |
UDTF | one record in, one or more records out | TableFunction |
UDAF | one or more records in, one record out | AggregateFunction |
Implementing user-defined functions
UDF implementation
Scalar UDFs are supported in Flink through ScalarFunction. An application only needs to extend ScalarFunction and define an eval method in it.
Using a scalar UDF is straightforward. Under the hood, Code Generation produces the class that actually calls eval: the function held by the ProcessOperator is this generated class, and its processElement method invokes our user-defined eval method to perform the conversion logic.
A simple UDF looks like this:
```java
import org.apache.flink.table.functions.ScalarFunction;

public class ToUpperCase extends ScalarFunction {
    public String eval(String s) {
        return s.toUpperCase();
    }
}
```
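A usage sketch (not from the original post): assuming `tEnv` is a legacy-planner StreamTableEnvironment and `user_log` is a registered table with a VARCHAR column `user_name`, the function can be registered and called from SQL like this:

```java
// Illustrative only: tEnv, user_log and user_name are assumed names.
tEnv.registerFunction("toUpper", new ToUpperCase());   // register the scalar UDF
Table result = tEnv.sqlQuery(
        "SELECT user_name, toUpper(user_name) AS upper_name FROM user_log");
```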
The call stack during execution is shown in the figure below:
The function source produced by automatic code generation looks like this:
```java
public class DataStreamCalcRule$19 extends org.apache.flink.streaming.api.functions.ProcessFunction {

  final com.cyq.learning.udf.function.ToUpperCase function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd;
  final org.apache.flink.types.Row out = new org.apache.flink.types.Row(4);

  public DataStreamCalcRule$19() throws Exception {
    function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd = (com.cyq.learning.udf.function.ToUpperCase)
      org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
        "rO0ABXNyACljb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5Ub1VwcGVyQ2FzZViZ_w3c2xj-AgAAeHIAL29yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlNjYWxhckZ1bmN0aW9uygzW306qntsCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvboP22EvVTaRLAgAAeHA",
        org.apache.flink.table.functions.UserDefinedFunction.class);
  }

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.open(
        new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
  }

  @Override
  public void processElement(Object _in1, org.apache.flink.streaming.api.functions.ProcessFunction.Context ctx,
      org.apache.flink.util.Collector c) throws Exception {
    org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;

    boolean isNull$18 = (java.lang.Long) in1.getField(2) == null;
    long result$17;
    if (isNull$18) { result$17 = -1L; } else { result$17 = (java.lang.Long) in1.getField(2); }

    boolean isNull$11 = (java.lang.String) in1.getField(0) == null;
    java.lang.String result$10;
    if (isNull$11) { result$10 = ""; } else { result$10 = (java.lang.String) (java.lang.String) in1.getField(0); }

    boolean isNull$13 = (java.lang.String) in1.getField(1) == null;
    java.lang.String result$12;
    if (isNull$13) { result$12 = ""; } else { result$12 = (java.lang.String) (java.lang.String) in1.getField(1); }

    if (isNull$11) { out.setField(0, null); } else { out.setField(0, result$10); }
    if (isNull$13) { out.setField(1, null); } else { out.setField(1, result$12); }

    java.lang.String result$14 = function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.eval(
        isNull$13 ? null : (java.lang.String) result$12);

    boolean isNull$16 = result$14 == null;
    java.lang.String result$15;
    if (isNull$16) { result$15 = ""; } else { result$15 = (java.lang.String) result$14; }

    if (isNull$16) { out.setField(2, null); } else { out.setField(2, result$15); }
    if (isNull$18) { out.setField(3, null); } else { out.setField(3, result$17); }

    c.collect(out);
  }

  @Override
  public void close() throws Exception {
    function_com$cyq$learning$udf$function$ToUpperCase$078037202ef3ddcbc4f27e57d323d0fd.close();
  }
}
```
UDTF implementation
Flink's UDTF is implemented by extending TableFunction. The key to splitting one row into several is TableFunction's collect method, which hands a record to the downstream operator through the collector: when a record arrives, you can split it and call collect once for each resulting record, turning one input into multiple outputs.
A simple UDTF implementation looks like this:
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

public class SplitFun extends TableFunction<Tuple2<String, Integer>> {

    private String separator = ",";

    public SplitFun(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}
```
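A usage sketch (not from the original post): assuming `tEnv` is a StreamTableEnvironment and `logs` is a registered table with a VARCHAR column `message`, the UDTF is joined in with LATERAL TABLE:

```java
// Illustrative only: tEnv, logs and message are assumed names.
tEnv.registerFunction("split", new SplitFun(","));
Table result = tEnv.sqlQuery(
        "SELECT message, word, word_len "
      + "FROM logs, LATERAL TABLE(split(message)) AS T(word, word_len)");
```

With a plain (cross) join as above, input rows for which the UDTF emits nothing are dropped; a `LEFT JOIN LATERAL TABLE(split(message)) AS T(word, word_len) ON TRUE` keeps them and pads the UDTF columns with NULL, which corresponds to the `hasOutput` branch visible in the generated code below.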
The call-stack diagram is as follows:
In this case code generation produces two classes, TableFunctionCollector$42 and DataStreamCorrelateRule$28. TableFunctionCollector is the helper collector that forwards records to the downstream operator, while DataStreamCorrelateRule invokes the TableFunction (SplitFun) from its processElement method.
- The source of the generated DataStreamCorrelateRule$28:

```java
public class DataStreamCorrelateRule$28 extends org.apache.flink.streaming.api.functions.ProcessFunction {

  final org.apache.flink.table.runtime.TableFunctionCollector instance_org$apache$flink$table$runtime$TableFunctionCollector$22;
  final com.cyq.learning.udf.function.SplitFun function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d;
  final org.apache.flink.types.Row out = new org.apache.flink.types.Row(6);

  public DataStreamCorrelateRule$28() throws Exception {
    function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d = (com.cyq.learning.udf.function.SplitFun)
      org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
        "rO0ABXNyACZjb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5TcGxpdEZ1bl5QHqGTdoFOAgABTAAJc2VwYXJhdG9ydAASTGphdmEvbGFuZy9TdHJpbmc7eHIALm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlRhYmxlRnVuY3Rpb27qA9IDIKYg-gIAAUwACWNvbGxlY3RvcnQAIUxvcmcvYXBhY2hlL2ZsaW5rL3V0aWwvQ29sbGVjdG9yO3hyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9ug_bYS9VNpEsCAAB4cHB0AAFA",
        org.apache.flink.table.functions.UserDefinedFunction.class);
  }

  public DataStreamCorrelateRule$28(org.apache.flink.table.runtime.TableFunctionCollector arg0) throws Exception {
    this();
    instance_org$apache$flink$table$runtime$TableFunctionCollector$22 = arg0;
  }

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.open(
        new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
  }

  @Override
  public void processElement(Object _in1, org.apache.flink.streaming.api.functions.ProcessFunction.Context ctx,
      org.apache.flink.util.Collector c) throws Exception {
    org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;

    boolean isNull$15 = (java.lang.Long) in1.getField(2) == null;
    long result$14;
    if (isNull$15) { result$14 = -1L; } else { result$14 = (java.lang.Long) in1.getField(2); }

    boolean isNull$11 = (java.lang.String) in1.getField(0) == null;
    java.lang.String result$10;
    if (isNull$11) { result$10 = ""; } else { result$10 = (java.lang.String) (java.lang.String) in1.getField(0); }

    boolean isNull$13 = (java.lang.String) in1.getField(1) == null;
    java.lang.String result$12;
    if (isNull$13) { result$12 = ""; } else { result$12 = (java.lang.String) (java.lang.String) in1.getField(1); }

    boolean isNull$17 = (java.lang.Long) in1.getField(3) == null;
    long result$16;
    if (isNull$17) { result$16 = -1L; } else { result$16 = (long) org.apache.calcite.runtime.SqlFunctions.toLong((java.lang.Long) in1.getField(3)); }

    function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.setCollector(
        instance_org$apache$flink$table$runtime$TableFunctionCollector$22);

    java.lang.String result$25;
    boolean isNull$26;
    if (false) {
      result$25 = "";
      isNull$26 = true;
    } else {
      boolean isNull$24 = (java.lang.String) in1.getField(1) == null;
      java.lang.String result$23;
      if (isNull$24) { result$23 = ""; } else { result$23 = (java.lang.String) (java.lang.String) in1.getField(1); }
      result$25 = result$23;
      isNull$26 = isNull$24;
    }

    function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.eval(
        isNull$26 ? null : (java.lang.String) result$25);

    boolean hasOutput = instance_org$apache$flink$table$runtime$TableFunctionCollector$22.isCollected();
    if (!hasOutput) {
      if (isNull$11) { out.setField(0, null); } else { out.setField(0, result$10); }
      if (isNull$13) { out.setField(1, null); } else { out.setField(1, result$12); }
      if (isNull$15) { out.setField(2, null); } else { out.setField(2, result$14); }
      java.lang.Long result$27;
      if (isNull$17) { result$27 = null; } else { result$27 = result$16; }
      if (isNull$17) { out.setField(3, null); } else { out.setField(3, result$27); }
      if (true) { out.setField(4, null); } else { out.setField(4, ""); }
      if (true) { out.setField(5, null); } else { out.setField(5, -1); }
      c.collect(out);
    }
  }

  @Override
  public void close() throws Exception {
    function_com$cyq$learning$udf$function$SplitFun$f9ab41da087dbe9d50d61478f4bc638d.close();
  }
}
```
- The source of the generated TableFunctionCollector$42:

```java
public class TableFunctionCollector$42 extends org.apache.flink.table.runtime.TableFunctionCollector {

  final org.apache.flink.types.Row out = new org.apache.flink.types.Row(6);

  public TableFunctionCollector$42() throws Exception {
  }

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
  }

  @Override
  public void collect(Object record) throws Exception {
    super.collect(record);
    org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) getInput();
    org.apache.flink.api.java.tuple.Tuple2 in2 = (org.apache.flink.api.java.tuple.Tuple2) record;

    boolean isNull$34 = (java.lang.Long) in1.getField(2) == null;
    long result$33;
    if (isNull$34) { result$33 = -1L; } else { result$33 = (java.lang.Long) in1.getField(2); }

    boolean isNull$30 = (java.lang.String) in1.getField(0) == null;
    java.lang.String result$29;
    if (isNull$30) { result$29 = ""; } else { result$29 = (java.lang.String) (java.lang.String) in1.getField(0); }

    boolean isNull$32 = (java.lang.String) in1.getField(1) == null;
    java.lang.String result$31;
    if (isNull$32) { result$31 = ""; } else { result$31 = (java.lang.String) (java.lang.String) in1.getField(1); }

    boolean isNull$36 = (java.lang.Long) in1.getField(3) == null;
    long result$35;
    if (isNull$36) { result$35 = -1L; } else { result$35 = (long) org.apache.calcite.runtime.SqlFunctions.toLong((java.lang.Long) in1.getField(3)); }

    if (isNull$30) { out.setField(0, null); } else { out.setField(0, result$29); }
    if (isNull$32) { out.setField(1, null); } else { out.setField(1, result$31); }
    if (isNull$34) { out.setField(2, null); } else { out.setField(2, result$33); }

    java.lang.Long result$41;
    if (isNull$36) { result$41 = null; } else { result$41 = result$35; }
    if (isNull$36) { out.setField(3, null); } else { out.setField(3, result$41); }

    boolean isNull$38 = (java.lang.String) in2.f0 == null;
    java.lang.String result$37;
    if (isNull$38) { result$37 = ""; } else { result$37 = (java.lang.String) (java.lang.String) in2.f0; }
    if (isNull$38) { out.setField(4, null); } else { out.setField(4, result$37); }

    boolean isNull$40 = (java.lang.Integer) in2.f1 == null;
    int result$39;
    if (isNull$40) { result$39 = -1; } else { result$39 = (java.lang.Integer) in2.f1; }
    if (isNull$40) { out.setField(5, null); } else { out.setField(5, result$39); }

    getCollector().collect(out);
  }

  @Override
  public void close() throws Exception {
  }
}
```
UDAF implementation
A UDAF in Flink is implemented by extending AggregateFunction and supplying the aggregation logic. A UDAF involves more methods than the other function types, and which methods you must implement depends on the scenario, so it is also the most involved to write; see UserDefinedFunction for details.
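At a minimum, createAccumulator, getValue, and an accumulate method are needed; retract, merge, and resetAccumulator are only required in particular scenarios (retracting inputs, merging windows or batch jobs, and batch grouping, respectively). A minimal sketch of the contract, with illustrative names and types that are not from the original article:

```java
import org.apache.flink.table.functions.AggregateFunction;

// Minimal sketch of the AggregateFunction contract; CountAgg and its accumulator are illustrative.
public class CountAgg extends AggregateFunction<Long, CountAgg.CountAcc> {

    public static class CountAcc {
        public long count = 0L;               // intermediate state lives in the accumulator
    }

    @Override
    public CountAcc createAccumulator() {     // required: create an empty accumulator
        return new CountAcc();
    }

    @Override
    public Long getValue(CountAcc acc) {      // required: derive the final result
        return acc.count;
    }

    // required in practice (looked up via reflection): fold one input row into the accumulator
    public void accumulate(CountAcc acc, String value) {
        acc.count += 1;
    }

    // optional: only needed when the input can contain retractions
    public void retract(CountAcc acc, String value) {
        acc.count -= 1;
    }

    // optional: only needed when accumulators must be merged (e.g. session windows, batch)
    public void merge(CountAcc acc, Iterable<CountAcc> others) {
        for (CountAcc other : others) {
            acc.count += other.count;
        }
    }
}
```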
A simple end-to-end UDAF example might look like this:
```java
public class AggFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamTableEnvironment streamTableEnvironment = EnvironmentUtil.getStreamTableEnvironment();

        TableSchema tableSchema = new TableSchema(
                new String[]{"product_id", "number", "price", "time"},
                new TypeInformation[]{Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP});

        Map<String, String> kafkaProps = new HashMap<>();
        kafkaProps.put("bootstrap.servers", BROKER_SERVERS);
        FlinkKafkaConsumerBase flinkKafkaConsumerBase = SourceUtil.createKafkaSourceStream(TOPIC, kafkaProps, tableSchema);
        DataStream<Row> stream = streamTableEnvironment.execEnv().addSource(flinkKafkaConsumerBase);

        streamTableEnvironment.registerFunction("wAvg", new WeightedAvg());

        Table streamTable = streamTableEnvironment.fromDataStream(stream, "product_id,number,price,rowtime.rowtime");
        streamTableEnvironment.registerTable("product", streamTable);

        Table result = streamTableEnvironment.sqlQuery(
                "SELECT product_id, wAvg(number,price) as avgPrice FROM product group by product_id");

        final TableSchema tableSchemaResult = new TableSchema(
                new String[]{"product_id", "avg"},
                new TypeInformation[]{Types.STRING, Types.LONG});
        final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();

        // toRetractStream is needed here because the result table is not an append-only table.
        DataStream ds = streamTableEnvironment.toRetractStream(result, typeInfoResult);
        ds.print();

        streamTableEnvironment.execEnv().execute("Flink Agg Function Test");
    }
}
```
The accumulator and aggregate function used above are defined as follows:
```java
public class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}
```
```java
import java.util.Iterator;

import org.apache.flink.table.functions.AggregateFunction;

public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
```
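Since the UDAF is an ordinary class, its logic can be sanity-checked locally by driving the accumulator by hand, without any Flink runtime; a small sketch (the input values are made up for illustration):

```java
// Drive the accumulator directly to verify the weighted-average logic.
WeightedAvg wAvg = new WeightedAvg();
WeightedAvgAccum acc = wAvg.createAccumulator();
wAvg.accumulate(acc, 10L, 1);                 // value 10 with weight 1
wAvg.accumulate(acc, 20L, 3);                 // value 20 with weight 3
// (10*1 + 20*3) / (1 + 3) = 70 / 4 = 17 with long division
System.out.println(wAvg.getValue(acc));       // prints 17
```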
Note: the SQL in AggFunctionTest is a global (unwindowed) aggregation. When converting its result into a stream we therefore use toRetractStream, i.e. retract mode: under a global group-by the aggregated result keeps changing as new records arrive, so an append stream cannot be used. For more on toRetractStream, see TableToStreamConversion.
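Each record of a retract stream is wrapped in a Tuple2 whose boolean flag distinguishes additions from retractions. A consumption sketch, assuming `streamTableEnvironment` exposes the standard StreamTableEnvironment API:

```java
// f0 == true  -> the row is a new or updated aggregation result
// f0 == false -> the row retracts a result that was emitted earlier
DataStream<Tuple2<Boolean, Row>> retractStream =
        streamTableEnvironment.toRetractStream(result, Row.class);
retractStream.print();
```

Filtering on the boolean flag therefore yields the current aggregation results.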
At runtime the aggregation is driven by GroupAggProcessFunction#processElement, whose core logic is shown below:
```scala
override def processElement(
    inputC: CRow,
    ctx: ProcessFunction[CRow, CRow]#Context,
    out: Collector[CRow]): Unit = {

  val currentTime = ctx.timerService().currentProcessingTime()
  // register state-cleanup timer
  processCleanupTimer(ctx, currentTime)

  val input = inputC.row

  // get accumulators and input counter
  var accumulators = state.value()
  var inputCnt = cntState.value()

  if (null == accumulators) {
    // Don't create a new accumulator for a retraction message. This
    // might happen if the retraction message is the first message for the
    // key or after a state clean up.
    if (!inputC.change) {
      return
    }
    // first accumulate message
    firstRow = true
    accumulators = function.createAccumulators()
  } else {
    firstRow = false
  }

  if (null == inputCnt) {
    inputCnt = 0L
  }

  // Set group keys value to the final output
  function.setForwardedFields(input, newRow.row)
  function.setForwardedFields(input, prevRow.row)

  // Set previous aggregate result to the prevRow
  function.setAggregationResults(accumulators, prevRow.row)

  // update aggregate result and set to the newRow
  if (inputC.change) {
    inputCnt += 1
    // accumulate input
    function.accumulate(accumulators, input)
    function.setAggregationResults(accumulators, newRow.row)
  } else {
    inputCnt -= 1
    // retract input
    function.retract(accumulators, input)
    function.setAggregationResults(accumulators, newRow.row)
  }

  if (inputCnt != 0) {
    // we aggregated at least one record for this key

    // update the state
    state.update(accumulators)
    cntState.update(inputCnt)

    // if this was not the first row
    if (!firstRow) {
      if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
        // newRow is the same as before and state cleaning is not enabled.
        // We emit nothing
        // If state cleaning is enabled, we have to emit messages to prevent too early
        // state eviction of downstream operators.
        return
      } else {
        // retract previous result
        if (generateRetraction) {
          out.collect(prevRow)
        }
      }
    }
    // emit the new result
    out.collect(newRow)
  } else {
    // we retracted the last record for this key
    // sent out a delete message
    out.collect(prevRow)
    // and clear all state
    state.clear()
    cntState.clear()
  }
}
```
As the code shows, the actual aggregation work (updating state, deciding what to emit, and so on) is delegated to `function`, which is once again produced by automatic code generation. Its source is as follows:
```java
public final class NonWindowedAggregationHelper$17 extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {

  final com.cyq.learning.udf.function.WeightedAvg function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881;
  private final org.apache.flink.table.runtime.aggregate.SingleElementIterable<com.cyq.learning.udf.function.WeightedAvgAccum> accIt0 =
      new org.apache.flink.table.runtime.aggregate.SingleElementIterable<com.cyq.learning.udf.function.WeightedAvgAccum>();

  public NonWindowedAggregationHelper$17() throws Exception {
    function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881 = (com.cyq.learning.udf.function.WeightedAvg)
      org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
        "rO0ABXNyACljb20uY3lxLmxlYXJuaW5nLnVkZi5mdW5jdGlvbi5XZWlnaHRlZEF2Z7o-ESHvqA7TAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uYPYkANq5VRgCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvboP22EvVTaRLAgAAeHA",
        org.apache.flink.table.functions.UserDefinedFunction.class);
  }

  public final void open(org.apache.flink.api.common.functions.RuntimeContext ctx) throws Exception {
    function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.open(
        new org.apache.flink.table.functions.FunctionContext(ctx));
  }

  public final void setAggregationResults(org.apache.flink.types.Row accs, org.apache.flink.types.Row output) throws Exception {
    org.apache.flink.table.functions.AggregateFunction baseClass0 =
        (org.apache.flink.table.functions.AggregateFunction) function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881;
    com.cyq.learning.udf.function.WeightedAvgAccum acc0 = (com.cyq.learning.udf.function.WeightedAvgAccum) accs.getField(0);
    output.setField(1, baseClass0.getValue(acc0));
  }

  public final void accumulate(org.apache.flink.types.Row accs, org.apache.flink.types.Row input) throws Exception {
    com.cyq.learning.udf.function.WeightedAvgAccum acc0 = (com.cyq.learning.udf.function.WeightedAvgAccum) accs.getField(0);
    function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.accumulate(acc0,
        (java.lang.Long) input.getField(1), (java.lang.Integer) input.getField(2));
  }

  public final void retract(org.apache.flink.types.Row accs, org.apache.flink.types.Row input) throws Exception {
  }

  public final org.apache.flink.types.Row createAccumulators() throws Exception {
    org.apache.flink.types.Row accs = new org.apache.flink.types.Row(1);
    com.cyq.learning.udf.function.WeightedAvgAccum acc0 =
        (com.cyq.learning.udf.function.WeightedAvgAccum) function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.createAccumulator();
    accs.setField(0, acc0);
    return accs;
  }

  public final void setForwardedFields(org.apache.flink.types.Row input, org.apache.flink.types.Row output) {
    output.setField(0, input.getField(0));
  }

  public final org.apache.flink.types.Row createOutputRow() {
    return new org.apache.flink.types.Row(2);
  }

  public final org.apache.flink.types.Row mergeAccumulatorsPair(org.apache.flink.types.Row a, org.apache.flink.types.Row b) {
    return a;
  }

  public final void resetAccumulator(org.apache.flink.types.Row accs) throws Exception {
  }

  public final void cleanup() throws Exception {
  }

  public final void close() throws Exception {
    function_com$cyq$learning$udf$function$WeightedAvg$776eb73449c07a5f410bff97fc0bb881.close();
  }
}
```
The source code shown above is available in the Learning repository.
From the analysis above and the various generated classes, you should now have a rough picture of how user-defined functions are executed. The hardest part to digest is the code generation machinery itself. Because Code Generation offers both excellent performance for SQL execution and great flexibility, it is used extensively in both Spark and Flink; I will analyze Code Generation in a follow-up article.