淺析Apache Kylin UDF

目錄

一丘薛、什么是UDF灸撰?
二颤芬、實現(xiàn)一個UDF
三病袄、源碼解讀 -- 注冊UDF 的原理 && 向Apache Calcite添加UDF的多種方式
四、目前kylin中UDF注冊存在的問題
五苞也、UDF對apache calcite優(yōu)化器的影響

一、什么是UDF?

UDF全稱為user defined function(用戶自定義函數(shù))丈秩,是查詢引擎留給用戶的一個口子,用于擴展sql 的能力淳衙,用戶把自己實現(xiàn)的UDF打包部署后放在指定目錄下蘑秽,便可在查詢時使用此函數(shù)。

二箫攀、實現(xiàn)一個UDF

下面舉一個簡單的例子肠牲,演示如何在Kylin 中實現(xiàn)一個UDF

1. 實現(xiàn)一個java類

  • 類名隨意,例如ConcatUDF
  • 方法名為eval靴跛,方法名不能亂取缀雳,后面會解釋為什么
  • 在eval 方法中實現(xiàn)自己的函數(shù)邏輯,例如下面實現(xiàn)的這個函數(shù)的邏輯就是將傳入的兩個字符串進行拼接
import org.apache.calcite.linq4j.function.Parameter;

public class ConcatUDF {

    public static String eval(@Parameter(name = "str1") String col1, @Parameter(name = "str2") String col2) {
        if (col1 == null) {
            return null;
        }
        if (col2 == null) {
            return null;
        }
        return col1 + col2;
    }
}

2. 將實現(xiàn)的java類打成jar包梢睛,部署在Kylin 目錄的 “l(fā)ib” 目錄下

3. 修改配置文件
在kylin.properties中添加 org.apache.kylin.query.udf.ConcatUDF= 類的路徑

三肥印、源碼解讀 注冊UDF 的原理 && 向Apache Calcite添加UDF的多種方式

1. 注冊UDF 的原理

kylin中的查詢引擎使用的是Apache Calcite,因此往kylin 中添加一個UDF 實際上就是往calcite 框架中注冊一個UDF绝葡。 Apache Calcite 是目前比較通用的大數(shù)據(jù)查詢引擎框架深碱,想了解Apache Calcite的可以看看這篇文章 Apache Calcite:Hadoop 中新型大數(shù)據(jù)查詢引擎

讓我們一起來讀一讀kylin 的源碼藏畅,看一看用戶實現(xiàn)的UDF 是如何注冊進calcite 的敷硅。

從kylin的查詢引擎入口query(@RequestBody PrepareSqlRequest sqlRequest)開始看

query(@RequestBody PrepareSqlRequest sqlRequest)
  doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect)
    queryWithCache(sqlRequest, startTime, isQueryCacheEnabled);
      queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
        query(sqlRequest);
          queryWithSqlMassage(sqlRequest);
private SQLResponse queryWithSqlMassage(SQLRequest sqlRequest) throws Exception {
        Connection conn = null;
        // 省略
        conn = QueryConnection.getConnection(sqlRequest.getProject());
        // 省略
       return executeRequest(correctedSql, sqlRequest, conn);
        // 省略
}

在queryWithSqlMassage中,會獲取到calcite的jdbc 連接,在獲取連接的同時竞膳,會將schema 等信息傳遞給calcite航瞭,schema信息之一便是我們要注冊的UDF,包括UDF的類路徑和方法名坦辟。

public static Connection getConnection(String project) throws SQLException {
        if (!isRegister) {
            try {
                Class<?> aClass = Thread.currentThread().getContextClassLoader()
                        .loadClass("org.apache.calcite.jdbc.Driver");
                Driver o = (Driver) aClass.getDeclaredConstructor().newInstance();
                DriverManager.registerDriver(o);
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
                e.printStackTrace();
            }
            isRegister = true;
        }
        File olapTmp = OLAPSchemaFactory.createTempOLAPJson(project, KylinConfig.getInstanceFromEnv());
        Properties info = new Properties();
        info.putAll(KylinConfig.getInstanceFromEnv().getCalciteExtrasProperties());
        // Import calcite props from jdbc client(override the kylin.properties)
        info.putAll(BackdoorToggles.getJdbcDriverClientCalciteProps());
        info.put("model", olapTmp.getAbsolutePath());
        info.put("typeSystem", "org.apache.kylin.query.calcite.KylinRelDataTypeSystem");
        return DriverManager.getConnection("jdbc:calcite:", info);
    }

那UDF類路徑是從哪里獲取到的呢刊侯,沒錯,就是來自一開始用戶配置的kylin.properties锉走。schema里的函數(shù)信息除了UDF以外滨彻,還有UDAF的類路徑和方法名,注冊方式和UDF一樣挪蹭,但是目前kylin并未向用戶開放注冊UDAF的入口亭饵,而有關(guān)UDF和UDAF的區(qū)別這里不作介紹,后面想單獨寫一篇關(guān)于UDAF的文章梁厉。

public Map<String, String> getUDFs() {
        Map<String, String> udfMap = Maps.newLinkedHashMap();
        udfMap.put("version", "org.apache.kylin.query.udf.VersionUDF");
        udfMap.put("concat", "org.apache.kylin.query.udf.ConcatUDF");
        udfMap.put("massin", "org.apache.kylin.query.udf.MassInUDF");
        Map<String, String> overrideUdfMap = getPropertiesByPrefix("kylin.query.udf.");
        udfMap.putAll(overrideUdfMap);
        return udfMap;
    }
private static void createOLAPSchemaFunctions(Map<String, String> definedUdfs, StringBuilder out)
            throws IOException {
        Map<String, String> udfs = Maps.newHashMap();
        if (definedUdfs != null)
            udfs.putAll(definedUdfs);

        for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
            udfs.put(entry.getKey(), entry.getValue().getName());
        }

        int index = 0;
        out.append("            \"functions\": [\n");
        for (Map.Entry<String, String> udf : udfs.entrySet()) {
            String udfName = udf.getKey().trim().toUpperCase(Locale.ROOT);
            String udfClassName = udf.getValue().trim();
            out.append("               {\n");
            out.append("                   name: '" + udfName + "',\n");
            out.append("                   className: '" + udfClassName + "'\n");
            if (index < udfs.size() - 1) {
                out.append("               },\n");
            } else {
                out.append("               }\n");
            }
            index++;
        }
        out.append("            ]\n");
    }

通過kylin拼接好的schema信息如下所示辜羊,它會被寫入到一個以"olap_model_" 開頭的json 文件中,后續(xù)由calcite自己讀取词顾。

{
    "version": "1.0",
    "defaultSchema": "DEFAULT",
    "schemas": [
        {
            "type": "custom",
            "name": "DEFAULT",
            "factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
            "operand": {
                "project": "bingfeng"
            },
            "functions": 
 [{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF"},
  {"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF"},
  {"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF"},
    ]
}

實際上八秃,kylin 并沒有把全部的字段都填寫上,還要兩個字段可以使用肉盹,methodName和path昔驱。

填補后的schema 如下所示。methodName字段的值被設(shè)為"*"上忍,這是方便為了后續(xù)calcite的處理骤肛,詳細原因請往后看。還有一個字段叫path窍蓝,這個字段目前已經(jīng)被calcite廢棄腋颠,不需要填寫。

{
    "version": "1.0",
    "defaultSchema": "DEFAULT",
    "schemas": [
        {
            "type": "custom",
            "name": "DEFAULT",
            "factory": "io.kyligence.kap.query.schema.KapSchemaFactory",
            "operand": {
                "project": "bingfeng"
            },
            "functions": 
 [{"name":"MASSIN","className":"org.apache.kylin.query.udf.MassInUDF","methodName":"*","path":null},
  {"name":"CONCAT","className":"org.apache.kylin.query.udf.string.ConcatUDF","methodName":"*","path":null},
  {"name":"VERSION","className":"org.apache.kylin.query.udf.VersionUDF","methodName":"*","path":null},
    ]
}

2. 向Apache Calcite添加UDF的多種方式

接下來我們再看看calcite這邊是如何處理的吓笙,有人猜到是如何把用戶寫的函數(shù)邏輯注冊進去的嗎淑玫?
沒錯,很簡單观蓄,就是反射混移!

calcite 將json 里的UDF 類路徑和方法名拿到,通過反射侮穿,獲取到用戶實現(xiàn)的UDF方法歌径,然后將此方法注冊進calcite的schema中。在后續(xù)的查詢中亲茅,便可從schema中獲取到此UDF方法

/** Creates and validates a {@link ScalarFunctionImpl}, and adds it to a
   * schema. If {@code methodName} is "*", may add more than one function.
   *
   * @param schema Schema to add to
   * @param functionName Name of function; null to derived from method name
   * @param path Path to look for functions
   * @param className Class to inspect for methods that may be user-defined
   *                  functions
   * @param methodName Method name;
   *                  null means use the class as a UDF;
   *                  "*" means add all methods
   * @param upCase Whether to convert method names to upper case, so that they
   *               can be called without using quotes
   */
  public static void addFunctions(SchemaPlus schema, @Nullable String functionName,
      List<String> path, String className, @Nullable String methodName, boolean upCase) {
    final Class<?> clazz;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("UDF class '"
          + className + "' not found");
    }
    String methodNameOrDefault = Util.first(methodName, "eval");
    String actualFunctionName;
    if (functionName != null) {
      actualFunctionName = functionName;
    } else {
      actualFunctionName = methodNameOrDefault;
    }
    if (upCase) {
      actualFunctionName = actualFunctionName.toUpperCase(Locale.ROOT);
    }
    final TableFunction tableFunction =
        TableFunctionImpl.create(clazz, methodNameOrDefault);
    if (tableFunction != null) {
      schema.add(Util.first(functionName, methodNameOrDefault),
          tableFunction);
      return;
    }
    // Must look for TableMacro before ScalarFunction. Both have an "eval"
    // method.
    final TableMacro macro = TableMacroImpl.create(clazz);
    if (macro != null) {
      schema.add(actualFunctionName, macro);
      return;
    }
    if (methodName != null && methodName.equals("*")) {
      for (Map.Entry<String, Function> entry
          : ScalarFunctionImpl.functions(clazz).entries()) {
        String name = entry.getKey();
        if (upCase) {
          name = name.toUpperCase(Locale.ROOT);
        }
        schema.add(name, entry.getValue());
      }
      return;
    } else {
      final ScalarFunction function =
          ScalarFunctionImpl.create(clazz, methodNameOrDefault);
      if (function != null) {
        schema.add(actualFunctionName, function);
        return;
      }
    }
    if (methodName == null) {
      final AggregateFunction aggFunction = AggregateFunctionImpl.create(clazz);
      if (aggFunction != null) {
        schema.add(actualFunctionName, aggFunction);
        return;
      }
    }
    throw new RuntimeException("Not a valid function class: " + clazz
        + ". Scalar functions and table macros have an 'eval' method; "
        + "aggregate functions have 'init' and 'add' methods, and optionally "
        + "'initAdd', 'merge' and 'result' methods.");
  }

從上面的代碼我們可以看出回铛,將UDF被添加到calcite中可以有多種方式狗准,我將所有的情況總結(jié)成了如下表格(假設(shè)用戶實現(xiàn)了一個類名為InStrUDF的UDF),而不同的注冊方式茵肃,被calcite選中的方法會有不一樣腔长。

無論該UDF是否需要重載,都強烈推薦使用方式五验残,methodName設(shè)為"*"捞附,將所有UDF類中需要重載的方法都取名為udf名的全大寫(例如INSTR)。

屏幕快照 2019-07-28 下午11.40.54.png

四您没、目前kylin中UDF注冊存在的問題

1. 注冊方式

目前開源kylin 中使用的是上面那張表格里的方式一鸟召,這種方式要求UDF的方法名必須為eval,并且無法實現(xiàn)重載氨鹏。不夠靈活欧募。

2. 性能問題

通過閱讀kylin 的源碼可以看到,在每一次查詢都會去創(chuàng)建一次schema仆抵,也就是每次都需要從kylin.properties 里面讀取到UDF的類路徑和方法名跟继,每次都需要完成不少字符串拼接的工作,并且還要走一遍json镣丑,再注冊進calcite舔糖。

如果把注冊進calcite之前的步驟提前做好,進行預(yù)計算存放在一個變量中传轰,每次查詢只需要拿這個變量去完成注冊的工作剩盒,那么就可以節(jié)省掉查詢過程中不少的時間谷婆。

或者更極致一點慨蛙,不調(diào)用calcite的addFunctions 進行注冊,如果能拿到schema變量纪挎,提前把反射的代碼也預(yù)計算好期贫,直接調(diào)用最底層的 schema.add()進行注冊,這樣可以把查詢過程中每個UDF反射的時間也省掉异袄。

經(jīng)過性能測試通砍,這種改進可以將kylin查詢qps 從60提升至70。

五烤蜕、UDF對apache calcite優(yōu)化器的影響

優(yōu)化器很難評估UDF的性能封孙,更新中...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市讽营,隨后出現(xiàn)的幾起案子虎忌,更是在濱河造成了極大的恐慌,老刑警劉巖橱鹏,帶你破解...
    沈念sama閱讀 211,423評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膜蠢,死亡現(xiàn)場離奇詭異堪藐,居然都是意外死亡,警方通過查閱死者的電腦和手機挑围,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評論 2 385
  • 文/潘曉璐 我一進店門礁竞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人杉辙,你說我怎么就攤上這事模捂。” “怎么了蜘矢?”我有些...
    開封第一講書人閱讀 157,019評論 0 348
  • 文/不壞的土叔 我叫張陵枫绅,是天一觀的道長。 經(jīng)常有香客問我硼端,道長并淋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,443評論 1 283
  • 正文 為了忘掉前任珍昨,我火速辦了婚禮县耽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘镣典。我一直安慰自己兔毙,他們只是感情好,可當我...
    茶點故事閱讀 65,535評論 6 385
  • 文/花漫 我一把揭開白布兄春。 她就那樣靜靜地躺著澎剥,像睡著了一般。 火紅的嫁衣襯著肌膚如雪赶舆。 梳的紋絲不亂的頭發(fā)上哑姚,一...
    開封第一講書人閱讀 49,798評論 1 290
  • 那天,我揣著相機與錄音芜茵,去河邊找鬼叙量。 笑死,一個胖子當著我的面吹牛九串,可吹牛的內(nèi)容都是我干的绞佩。 我是一名探鬼主播,決...
    沈念sama閱讀 38,941評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼猪钮,長吁一口氣:“原來是場噩夢啊……” “哼品山!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起烤低,我...
    開封第一講書人閱讀 37,704評論 0 266
  • 序言:老撾萬榮一對情侶失蹤肘交,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后拂玻,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酸些,經(jīng)...
    沈念sama閱讀 44,152評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡宰译,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,494評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了魄懂。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沿侈。...
    茶點故事閱讀 38,629評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖市栗,靈堂內(nèi)的尸體忽然破棺而出缀拭,到底是詐尸還是另有隱情,我是刑警寧澤填帽,帶...
    沈念sama閱讀 34,295評論 4 329
  • 正文 年R本政府宣布蛛淋,位于F島的核電站,受9級特大地震影響篡腌,放射性物質(zhì)發(fā)生泄漏褐荷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,901評論 3 313
  • 文/蒙蒙 一嘹悼、第九天 我趴在偏房一處隱蔽的房頂上張望叛甫。 院中可真熱鬧,春花似錦杨伙、人聲如沸其监。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽抖苦。三九已至,卻和暖如春米死,著一層夾襖步出監(jiān)牢的瞬間锌历,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評論 1 266
  • 我被黑心中介騙來泰國打工哲身, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留辩涝,地道東北人贸伐。 一個月前我還...
    沈念sama閱讀 46,333評論 2 360
  • 正文 我出身青樓勘天,卻偏偏與公主長得像,于是被迫代替她去往敵國和親捉邢。 傳聞我的和親對象是個殘疾皇子脯丝,可洞房花燭夜當晚...
    茶點故事閱讀 43,499評論 2 348

推薦閱讀更多精彩內(nèi)容