用戶自定義的函數(shù)是一個重要的特性,因為它們展示擴展了查詢的表達(dá)能力全蝶。
注冊用戶自定義的函數(shù)
在大多數(shù)情況下雀监,必須先注冊用戶定義的函數(shù),然后才能在查詢中使用它在塔。TableEnvironment
通過調(diào)用registerFunction()
方法來注冊函數(shù)掀抹。
標(biāo)量函數(shù)(Scalar Functions)
如果內(nèi)置函數(shù)中不包含必需的標(biāo)量函數(shù),則可以為Table API和SQL定義用戶自定義的標(biāo)量函數(shù)心俗。用戶定義的標(biāo)量函數(shù)將零個傲武,一個或多個標(biāo)量值映射到新的標(biāo)量值。
為了定義標(biāo)量函數(shù)城榛,必須擴展org.apache.flink.Table.function
中的基類ScalarFunction
和實現(xiàn)(一個或多個)評估方法揪利。標(biāo)量函數(shù)的行為由評估方法決定。必須公開聲明一個評估方法并命名為eval
狠持。評估方法的參數(shù)類型和返回類型也決定標(biāo)量函數(shù)的參數(shù)和返回類型疟位。還可以通過實現(xiàn)名為eval的多個評估方法來重載評估方法。評估方法也可以支持變量參數(shù)喘垂,比如eval(String…str)
甜刻。
以下示例展示如何定義標(biāo)量函數(shù)绍撞,在TableEnvironment
中注冊它,并在查詢中調(diào)用它得院。
public class HashCode extends ScalarFunction {
private int factor = 12;
public HashCode(int factor) {
this.factor = factor;
}
//一個約定方法
public int eval(String s) {
return s.hashCode() * factor;
}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
默認(rèn)情況下傻铣,評估方法的結(jié)果類型由Flink的類型提取工具決定。對于基本類型或簡單pojo祥绞,這已經(jīng)足夠了非洲。但是對于更復(fù)雜的、自定義的或復(fù)合類型蜕径,這可能是錯誤的两踏。在這些情況下,可以通過通過重寫ScalarFunction
的 getResultType()
手動定義結(jié)果類型的類型信息兜喻。
表值函數(shù)(Table Functions)
與用戶定義的標(biāo)量函數(shù)類似梦染,用戶定義的表函數(shù)將零個,一個或多個標(biāo)量值作為輸入?yún)?shù)朴皆。但是弓坞,與標(biāo)量函數(shù)相比,它可以返回任意數(shù)量的行作為輸出而不是單個值车荔。返回的行可以包含一個或多個列。
為了定義表函數(shù)戚扳,必須擴展org.apache.flink.table.function
中的基類TableFunction
和實現(xiàn)(一個或多個)評估方法忧便。表函數(shù)的行為由其評估方法決定。一個評估方法必須聲明為public并命名為eval帽借≈樵觯可以通過實現(xiàn)名為eval的多個評估方法來重載TableFunction
。評估方法的參數(shù)類型決定了表函數(shù)的所有有效參數(shù)砍艾。評估方法也可以支持變量參數(shù)蒂教,比如eval(String…str)
。返回的表的類型由TableFunction
的泛型類型決定脆荷。評估方法使用受保護(hù)的collect(T)
方法發(fā)出輸出行凝垛。
在Table API中,表函數(shù)與.join(Table)
或. leftouterjoin(Table)
一起使用蜓谋。joinLateral 操作將外部表(操作符左側(cè)的表)中的每一行與表值函數(shù)(操作符右側(cè)的表)生成的所有行關(guān)聯(lián)起來梦皮。leftOuterJoin
操作符將外部表(操作符左側(cè)的表)中的每一行與表值函數(shù)(操作符右側(cè)的表)生成的所有行連接起來,并保留表函數(shù)返回空表的外部行桃焕。在SQL中剑肯,使用帶有CROSS JOIN或LEFT JOIN且具有ON TRUE連接條件的LATERAL TABLE(<Table Function>)。
以下示例展示如何定義表值函數(shù)观堂,在TableEnvironment
中注冊它让网,并在查詢中調(diào)用它呀忧。
// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
private String separator = " ";
public Split(String separator) {
this.separator = separator;
}
//輸入一行一列輸出多行兩個列
public void eval(String str) {
for (String s : str.split(separator)) {
// use collect(...) to emit a row
collect(new Tuple2<String, Integer>(s, s.length()));
}
}
}
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ... // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
.select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
.select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
默認(rèn)情況下,評估方法的結(jié)果類型由Flink的類型提取工具決定溃睹。對于基本類型或簡單pojo而账,這已經(jīng)足夠了。但是對于更復(fù)雜的丸凭、自定義的或復(fù)合類型福扬,這可能是錯誤的。在這些情況下惜犀,可以通過通過重寫TableFunction
的 getResultType()
手動定義結(jié)果類型的類型信息铛碑。
聚合函數(shù)
用戶定義的聚合函數(shù)將一個表的一個或多個行并且具有一個或多個屬性聚合為標(biāo)量值。
用戶定義的聚合函數(shù)是通過擴展AggregateFunction
類實現(xiàn)的虽界。AggregateFunction
的工作原理如下汽烦。首先,需要定義一個累加器莉御,它是保存聚合的中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)撇吞。然后是利用AggregateFunction
的createAccumulator()
方法創(chuàng)建一個空的累加器。接下來礁叔,對每個輸入行調(diào)用函數(shù)的accumulate()
方法來更新累加器牍颈。處理完所有行之后,調(diào)用函數(shù)的getValue()
方法來計算并返回最終結(jié)果琅关。
除了上述所必須的方法之外煮岁,還有一些可選擇性實現(xiàn)的約定方法。雖然其中一些方法允許系統(tǒng)更高效地執(zhí)行查詢涣易,但是其他方法對于某些用例則必需的画机。e.g. retract():在有界OVER窗口上聚合是必需的。merge():許多批處理聚合和會話窗口聚合都需要新症。resetAccumulator():許多批處理聚合都需要步氏。
AggregateFunction
的所有方法都必須聲明為public
的,而不是static
徒爹。
/**
* Accumulator for WeightedAvg.
*/
public static class WeightedAvgAccum {
public long sum = 0;
public int count = 0;
}
/**
* Weighted Average user-defined aggregate function.
*/
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
@Override
public WeightedAvgAccum createAccumulator() {
return new WeightedAvgAccum();
}
@Override
public Long getValue(WeightedAvgAccum acc) {
if (acc.count == 0) {
return null;
} else {
return acc.sum / acc.count;
}
}
public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
acc.sum += iValue * iWeight;
acc.count += iWeight;
}
//可選擇性的實現(xiàn)
public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
acc.sum -= iValue * iWeight;
acc.count -= iWeight;
}
//可選擇性的實現(xiàn)
public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
Iterator<WeightedAvgAccum> iter = it.iterator();
while (iter.hasNext()) {
WeightedAvgAccum a = iter.next();
acc.count += a.count;
acc.sum += a.sum;
}
}
//可選擇性的實現(xiàn)
public void resetAccumulator(WeightedAvgAccum acc) {
acc.count = 0;
acc.sum = 0L;
}
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
將UDF與運行時集成
有時荚醒,用戶定義的函數(shù)可能需要獲取全局運行時信息,或者在實際工作之前進(jìn)行一些設(shè)置/清理工作隆嗅。用戶自定義的函數(shù)可以通過覆蓋open()和close()方法實現(xiàn)腌且。
open()方法在評估方法之前調(diào)用一次。在最后一次調(diào)用評估方法之后在調(diào)用close()方法榛瓮。
open()方法提供一個FunctionContext铺董,其中包含有關(guān)用戶定義函數(shù)在其中執(zhí)行的上下文的信息。
public class HashCode extends ScalarFunction {
private int factor = 0;
@Override
public void open(FunctionContext context) throws Exception {
// access "hashcode_factor" parameter
// "12" would be the default value if parameter does not exist
factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);
// register the function
tableEnv.registerFunction("hashCode", new HashCode());
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");