目錄
一丘薛、什么是UDF灸撰?
二颤芬、實現(xiàn)一個UDF
三病袄、源碼解讀 -- 注冊UDF 的原理 && 向Apache Calcite添加UDF的多種方式
四、目前kylin中UDF注冊存在的問題
五苞也、UDF對apache calcite優(yōu)化器的影響
一、什么是UDF?
UDF全稱為user defined function(用戶自定義函數(shù))丈秩,是查詢引擎留給用戶的一個口子,用于擴展sql 的能力淳衙,用戶把自己實現(xiàn)的UDF打包部署后放在指定目錄下蘑秽,便可在查詢時使用此函數(shù)。
二箫攀、實現(xiàn)一個UDF
下面舉一個簡單的例子肠牲,演示如何在Kylin 中實現(xiàn)一個UDF
1. 實現(xiàn)一個java類
- 類名隨意,例如ConcatUDF
- 方法名為eval靴跛,方法名不能亂取缀雳,后面會解釋為什么
- 在eval 方法中實現(xiàn)自己的函數(shù)邏輯,例如下面實現(xiàn)的這個函數(shù)的邏輯就是將傳入的兩個字符串進行拼接
import org.apache.calcite.linq4j.function.Parameter;
public class ConcatUDF {
public static String eval(@Parameter(name = "str1") String col1, @Parameter(name = "str2") String col2) {
if (col1 == null) {
return null;
}
if (col2 == null) {
return null;
}
return col1 + col2;
}
}
2. 將實現(xiàn)的java類打成jar包梢睛,部署在Kylin 目錄的 “l(fā)ib” 目錄下
3. 修改配置文件
在kylin.properties中添加 org.apache.kylin.query.udf.ConcatUDF= 類的路徑
三肥印、源碼解讀 注冊UDF 的原理 && 向Apache Calcite添加UDF的多種方式
1. 注冊UDF 的原理
kylin中的查詢引擎使用的是Apache Calcite,因此往kylin 中添加一個UDF 實際上就是往calcite 框架中注冊一個UDF绝葡。 Apache Calcite 是目前比較通用的大數(shù)據(jù)查詢引擎框架深碱,想了解Apache Calcite的可以看看這篇文章 Apache Calcite:Hadoop 中新型大數(shù)據(jù)查詢引擎。
讓我們一起來讀一讀kylin 的源碼藏畅,看一看用戶實現(xiàn)的UDF 是如何注冊進calcite 的敷硅。
從kylin的查詢引擎入口query(@RequestBody PrepareSqlRequest sqlRequest)開始看
query(@RequestBody PrepareSqlRequest sqlRequest)
doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect)
queryWithCache(sqlRequest, startTime, isQueryCacheEnabled);
queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
query(sqlRequest);
queryWithSqlMassage(sqlRequest);
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
Connection conn = null;
// 省略
conn = QueryConnection.getConnection(sqlRequest.getProject());
// 省略
return executeRequest(correctedSql, sqlRequest, conn);
// 省略
}
在queryWithSqlMassage中,會獲取到calcite的jdbc 連接,在獲取連接的同時竞膳,會將schema 等信息傳遞給calcite航瞭,schema信息之一便是我們要注冊的UDF,包括UDF的類路徑和方法名坦辟。
public static Connection getConnection(String project) throws SQLException {
if (!isRegister) {
try {
Class<?> aClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.calcite.jdbc.Driver");
Driver o = (Driver) aClass.getDeclaredConstructor().newInstance();
DriverManager.registerDriver(o);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
e.printStackTrace();
}
isRegister = true;
}
File olapTmp = OLAPSchemaFactory.createTempOLAPJson(project, KylinConfig.getInstanceFromEnv());
Properties info = new Properties();
info.putAll(KylinConfig.getInstanceFromEnv().getCalciteExtrasProperties());
// Import calcite props from jdbc client(override the kylin.properties)
info.putAll(BackdoorToggles.getJdbcDriverClientCalciteProps());
info.put("model", olapTmp.getAbsolutePath());
info.put("typeSystem", "org.apache.kylin.query.calcite.KylinRelDataTypeSystem");
return DriverManager.getConnection("jdbc:calcite:", info);
}
那UDF類路徑是從哪里獲取到的呢刊侯,沒錯,就是來自一開始用戶配置的kylin.properties锉走。schema里的函數(shù)信息除了UDF以外滨彻,還有UDAF的類路徑和方法名,注冊方式和UDF一樣挪蹭,但是目前kylin并未向用戶開放注冊UDAF的入口亭饵,而有關(guān)UDF和UDAF的區(qū)別這里不作介紹,后面想單獨寫一篇關(guān)于UDAF的文章梁厉。
public Map<String, String> getUDFs() {
Map<String, String> udfMap = Maps.newLinkedHashMap();
udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
udfMap.put("concat", "org.apache.kylin.query.udf.ConcatUDF");
udfMap.put("massin", "org.apache.kylin.query.udf.MassInUDF");
Map<String, String> overrideUdfMap = getPropertiesByPrefix("kylin.query.udf.");
udfMap.putAll(overrideUdfMap);
return udfMap;
}
private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
throws IOException {
Map<String, String> udfs = Maps.newHashMap();
if (definedUdfs != null)
udfs.putAll(definedUdfs);
for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
udfs.put(entry.getKey(), entry.getValue().getName());
}
int index = 0;
out.append(" \"functions\": [\n");
for (Map.Entry<String, String> udf : udfs.entrySet()) {
String udfName = udf.getKey().trim().toUpperCase(Locale.ROOT);
String udfClassName = udf.getValue().trim();
out.append(" {\n");
out.append(" name: '" + udfName + "',\n");
out.append(" className: '" + udfClassName + "'\n");
if (index < udfs.size() - 1) {
out.append(" },\n");
} else {
out.append(" }\n");
}
index++;
}
out.append(" ]\n");
}
通過kylin拼接好的schema信息如下所示辜羊,它會被寫入到一個以"olap_model_" 開頭的json 文件中,后續(xù)由calcite自己讀取词顾。
{
"version": "1.0",
"defaultSchema": "DEFAULT",
"schemas": [
{
"type": "custom",
"name": "DEFAULT",
"factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
"operand": {
"project": "bingfeng"
},
"functions":
[{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF"},
{"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF"},
{"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF"},
]
}
實際上八秃,kylin 并沒有把全部的字段都填寫上,還要兩個字段可以使用肉盹,methodName和path昔驱。
填補后的schema 如下所示。methodName字段的值被設(shè)為"*"上忍,這是方便為了后續(xù)calcite的處理骤肛,詳細原因請往后看。還有一個字段叫path窍蓝,這個字段目前已經(jīng)被calcite廢棄腋颠,不需要填寫。
{
"version": "1.0",
"defaultSchema": "DEFAULT",
"schemas": [
{
"type": "custom",
"name": "DEFAULT",
"factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
"operand": {
"project": "bingfeng"
},
"functions":
[{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF","methodName":"*","path":null},
{"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF","methodName":"*","path":null},
{"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF","methodName":"*","path":null},
]
}
2. 向Apache Calcite添加UDF的多種方式
接下來我們再看看calcite這邊是如何處理的吓笙,有人猜到是如何把用戶寫的函數(shù)邏輯注冊進去的嗎淑玫?
沒錯,很簡單观蓄,就是反射混移!
calcite 將json 里的UDF 類路徑和方法名拿到,通過反射侮穿,獲取到用戶實現(xiàn)的UDF方法歌径,然后將此方法注冊進calcite的schema中。在后續(xù)的查詢中亲茅,便可從schema中獲取到此UDF方法
/** Creates and validates a {@link ScalarFunctionImpl}, and adds it to a
* schema. If {@code methodName} is "*", may add more than one function.
*
* @param schema Schema to add to
* @param functionName Name of function; null to derived from method name
* @param path Path to look for functions
* @param className Class to inspect for methods that may be user-defined
* functions
* @param methodName Method name;
* null means use the class as a UDF;
* "*" means add all methods
* @param upCase Whether to convert method names to upper case, so that they
* can be called without using quotes
*/
public static void addFunctions(SchemaPlus schema, @Nullable String functionName,
List<String> path, String className, @Nullable String methodName, boolean upCase) {
final Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("UDF class '"
+ className + "' not found");
}
String methodNameOrDefault = Util.first(methodName, "eval");
String actualFunctionName;
if (functionName != null) {
actualFunctionName = functionName;
} else {
actualFunctionName = methodNameOrDefault;
}
if (upCase) {
actualFunctionName = actualFunctionName.toUpperCase(Locale.ROOT);
}
final TableFunction tableFunction =
TableFunctionImpl.create(clazz, methodNameOrDefault);
if (tableFunction != null) {
schema.add(Util.first(functionName, methodNameOrDefault),
tableFunction);
return;
}
// Must look for TableMacro before ScalarFunction. Both have an "eval"
// method.
final TableMacro macro = TableMacroImpl.create(clazz);
if (macro != null) {
schema.add(actualFunctionName, macro);
return;
}
if (methodName != null && methodName.equals("*")) {
for (Map.Entry<String, Function> entry
: ScalarFunctionImpl.functions(clazz).entries()) {
String name = entry.getKey();
if (upCase) {
name = name.toUpperCase(Locale.ROOT);
}
schema.add(name, entry.getValue());
}
return;
} else {
final ScalarFunction function =
ScalarFunctionImpl.create(clazz, methodNameOrDefault);
if (function != null) {
schema.add(actualFunctionName, function);
return;
}
}
if (methodName == null) {
final AggregateFunction aggFunction = AggregateFunctionImpl.create(clazz);
if (aggFunction != null) {
schema.add(actualFunctionName, aggFunction);
return;
}
}
throw new RuntimeException("Not a valid function class: " + clazz
+ ". Scalar functions and table macros have an 'eval' method; "
+ "aggregate functions have 'init' and 'add' methods, and optionally "
+ "'initAdd', 'merge' and 'result' methods.");
}
從上面的代碼我們可以看出回铛,將UDF被添加到calcite中可以有多種方式狗准,我將所有的情況總結(jié)成了如下表格(假設(shè)用戶實現(xiàn)了一個類名為InStrUDF的UDF),而不同的注冊方式茵肃,被calcite選中的方法會有不一樣腔长。
無論該UDF是否需要重載,都強烈推薦使用方式五验残,methodName設(shè)為"*"捞附,將所有UDF類中需要重載的方法都取名為udf名的全大寫(例如INSTR)。
四您没、目前kylin中UDF注冊存在的問題
1. 注冊方式
目前開源kylin 中使用的是上面那張表格里的方式一鸟召,這種方式要求UDF的方法名必須為eval,并且無法實現(xiàn)重載氨鹏。不夠靈活欧募。
2. 性能問題
通過閱讀kylin 的源碼可以看到,在每一次查詢都會去創(chuàng)建一次schema仆抵,也就是每次都需要從kylin.properties 里面讀取到UDF的類路徑和方法名跟继,每次都需要完成不少字符串拼接的工作,并且還要走一遍json镣丑,再注冊進calcite舔糖。
如果把注冊進calcite之前的步驟提前做好,進行預(yù)計算存放在一個變量中传轰,每次查詢只需要拿這個變量去完成注冊的工作剩盒,那么就可以節(jié)省掉查詢過程中不少的時間谷婆。
或者更極致一點慨蛙,不調(diào)用calcite的addFunctions 進行注冊,如果能拿到schema變量纪挎,提前把反射的代碼也預(yù)計算好期贫,直接調(diào)用最底層的 schema.add()進行注冊,這樣可以把查詢過程中每個UDF反射的時間也省掉异袄。
經(jīng)過性能測試通砍,這種改進可以將kylin查詢qps 從60提升至70。
五烤蜕、UDF對apache calcite優(yōu)化器的影響
優(yōu)化器很難評估UDF的性能封孙,更新中...