flink 自定義函數(shù)

用戶自定義的函數(shù)是一個重要的特性,因為它們展示擴展了查詢的表達(dá)能力全蝶。

注冊用戶自定義的函數(shù)

在大多數(shù)情況下雀监,必須先注冊用戶定義的函數(shù),然后才能在查詢中使用它在塔。TableEnvironment通過調(diào)用registerFunction()方法來注冊函數(shù)掀抹。

標(biāo)量函數(shù)(Scalar Functions)

如果內(nèi)置函數(shù)中不包含必需的標(biāo)量函數(shù),則可以為Table API和SQL定義用戶自定義的標(biāo)量函數(shù)心俗。用戶定義的標(biāo)量函數(shù)將零個傲武,一個或多個標(biāo)量值映射到新的標(biāo)量值。
為了定義標(biāo)量函數(shù)城榛,必須擴展org.apache.flink.Table.function中的基類ScalarFunction和實現(xiàn)(一個或多個)評估方法揪利。標(biāo)量函數(shù)的行為由評估方法決定。必須公開聲明一個評估方法并命名為eval狠持。評估方法的參數(shù)類型和返回類型也決定標(biāo)量函數(shù)的參數(shù)和返回類型疟位。還可以通過實現(xiàn)名為eval的多個評估方法來重載評估方法。評估方法也可以支持變量參數(shù)喘垂,比如eval(String…str)甜刻。

以下示例展示如何定義標(biāo)量函數(shù)绍撞,在TableEnvironment中注冊它,并在查詢中調(diào)用它得院。

public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  //一個約定方法
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

默認(rèn)情況下傻铣,評估方法的結(jié)果類型由Flink的類型提取工具決定。對于基本類型或簡單pojo祥绞,這已經(jīng)足夠了非洲。但是對于更復(fù)雜的、自定義的或復(fù)合類型蜕径,這可能是錯誤的两踏。在這些情況下,可以通過通過重寫ScalarFunctiongetResultType()手動定義結(jié)果類型的類型信息兜喻。

表值函數(shù)(Table Functions)

與用戶定義的標(biāo)量函數(shù)類似梦染,用戶定義的表函數(shù)將零個,一個或多個標(biāo)量值作為輸入?yún)?shù)朴皆。但是弓坞,與標(biāo)量函數(shù)相比,它可以返回任意數(shù)量的行作為輸出而不是單個值车荔。返回的行可以包含一個或多個列。
為了定義表函數(shù)戚扳,必須擴展org.apache.flink.table.function中的基類TableFunction和實現(xiàn)(一個或多個)評估方法忧便。表函數(shù)的行為由其評估方法決定。一個評估方法必須聲明為public并命名為eval帽借≈樵觯可以通過實現(xiàn)名為eval的多個評估方法來重載TableFunction。評估方法的參數(shù)類型決定了表函數(shù)的所有有效參數(shù)砍艾。評估方法也可以支持變量參數(shù)蒂教,比如eval(String…str)。返回的表的類型由TableFunction的泛型類型決定脆荷。評估方法使用受保護(hù)的collect(T)方法發(fā)出輸出行凝垛。
在Table API中,表函數(shù)與.join(Table). leftouterjoin(Table)一起使用蜓谋。joinLateral 操作將外部表(操作符左側(cè)的表)中的每一行與表值函數(shù)(操作符右側(cè)的表)生成的所有行關(guān)聯(lián)起來梦皮。leftOuterJoin操作符將外部表(操作符左側(cè)的表)中的每一行與表值函數(shù)(操作符右側(cè)的表)生成的所有行連接起來,并保留表函數(shù)返回空表的外部行桃焕。在SQL中剑肯,使用帶有CROSS JOIN或LEFT JOIN且具有ON TRUE連接條件的LATERAL TABLE(<Table Function>)。

以下示例展示如何定義表值函數(shù)观堂,在TableEnvironment中注冊它让网,并在查詢中調(diào)用它呀忧。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";
    
    public Split(String separator) {
        this.separator = separator;
    }
    //輸入一行一列輸出多行兩個列
    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}

BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table myTable = ...         // table schema: [a: String]

// Register the function.
tableEnv.registerFunction("split", new Split("#"));

// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.join(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");
myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)"))
    .select("a, word, length");

// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

默認(rèn)情況下,評估方法的結(jié)果類型由Flink的類型提取工具決定溃睹。對于基本類型或簡單pojo而账,這已經(jīng)足夠了。但是對于更復(fù)雜的丸凭、自定義的或復(fù)合類型福扬,這可能是錯誤的。在這些情況下惜犀,可以通過通過重寫TableFunctiongetResultType()手動定義結(jié)果類型的類型信息铛碑。

聚合函數(shù)

用戶定義的聚合函數(shù)將一個表的一個或多個行并且具有一個或多個屬性聚合為標(biāo)量值。
用戶定義的聚合函數(shù)是通過擴展AggregateFunction類實現(xiàn)的虽界。AggregateFunction的工作原理如下汽烦。首先,需要定義一個累加器莉御,它是保存聚合的中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)撇吞。然后是利用AggregateFunctioncreateAccumulator()方法創(chuàng)建一個空的累加器。接下來礁叔,對每個輸入行調(diào)用函數(shù)的accumulate()方法來更新累加器牍颈。處理完所有行之后,調(diào)用函數(shù)的getValue()方法來計算并返回最終結(jié)果琅关。
除了上述所必須的方法之外煮岁,還有一些可選擇性實現(xiàn)的約定方法。雖然其中一些方法允許系統(tǒng)更高效地執(zhí)行查詢涣易,但是其他方法對于某些用例則必需的画机。e.g. retract():在有界OVER窗口上聚合是必需的。merge():許多批處理聚合和會話窗口聚合都需要新症。resetAccumulator():許多批處理聚合都需要步氏。
AggregateFunction的所有方法都必須聲明為public的,而不是static徒爹。

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }
    //可選擇性的實現(xiàn)
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
     //可選擇性的實現(xiàn)
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
     //可選擇性的實現(xiàn)
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());

// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");

將UDF與運行時集成

有時荚醒,用戶定義的函數(shù)可能需要獲取全局運行時信息,或者在實際工作之前進(jìn)行一些設(shè)置/清理工作隆嗅。用戶自定義的函數(shù)可以通過覆蓋open()和close()方法實現(xiàn)腌且。
open()方法在評估方法之前調(diào)用一次。在最后一次調(diào)用評估方法之后在調(diào)用close()方法榛瓮。
open()方法提供一個FunctionContext铺董,其中包含有關(guān)用戶定義函數(shù)在其中執(zhí)行的上下文的信息。

public class HashCode extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); 
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);

// register the function
tableEnv.registerFunction("hashCode", new HashCode());

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市精续,隨后出現(xiàn)的幾起案子坝锰,更是在濱河造成了極大的恐慌,老刑警劉巖重付,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件顷级,死亡現(xiàn)場離奇詭異,居然都是意外死亡确垫,警方通過查閱死者的電腦和手機弓颈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來删掀,“玉大人翔冀,你說我怎么就攤上這事∨幔” “怎么了纤子?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長款票。 經(jīng)常有香客問我控硼,道長,這世上最難降的妖魔是什么艾少? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任卡乾,我火速辦了婚禮,結(jié)果婚禮上缚够,老公的妹妹穿的比我還像新娘幔妨。我一直安慰自己,他們只是感情好潮瓶,可當(dāng)我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著钙姊,像睡著了一般毯辅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上煞额,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天思恐,我揣著相機與錄音,去河邊找鬼膊毁。 笑死胀莹,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的婚温。 我是一名探鬼主播描焰,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了荆秦?” 一聲冷哼從身側(cè)響起篱竭,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎步绸,沒想到半個月后掺逼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡瓤介,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年吕喘,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刑桑。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡氯质,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出漾月,到底是詐尸還是另有隱情病梢,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布梁肿,位于F島的核電站蜓陌,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏吩蔑。R本人自食惡果不足惜钮热,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望烛芬。 院中可真熱鬧隧期,春花似錦、人聲如沸赘娄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽遣臼。三九已至性置,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間揍堰,已是汗流浹背鹏浅。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留屏歹,地道東北人隐砸。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像蝙眶,于是被迫代替她去往敵國和親季希。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容