前言
???????? Flink Table 和 SQL 內(nèi)置了很多 SQL 中支持的函數(shù);如果有無法滿足的需要抖棘,則可以實現(xiàn)用戶自定義的函數(shù)(UDF
)來解決茂腥。
一、系統(tǒng)內(nèi)置函數(shù)
???????? Flink Table API 和 SQL 為用戶提供了一組用于數(shù)據(jù)轉(zhuǎn)換的內(nèi)置函數(shù)切省。SQL 中支持的很多函數(shù)最岗,Table API 和 SQL 都已經(jīng)做了實現(xiàn),其它還在快速開發(fā)擴展中朝捆。
以下是一些典型函數(shù)的舉例般渡,全部的內(nèi)置函數(shù),可以參考官網(wǎng)介紹。
類型TableApiSQLAPI比較函數(shù)ANY1 === ANY2value1 = value2比較函數(shù)NY1 > ANY2value1 > value2邏輯函數(shù)BOOLEAN1 || BOOLEAN2boolean1 OR boolean2邏輯函數(shù)BOOLEAN.isFalseboolean IS FALSE邏輯函數(shù)!BOOLEANNOT boolean算術(shù)函數(shù)NUMERIC1 + NUMERIC2numeric1 + numeric2算術(shù)函數(shù)NUMERIC1.power(NUMERIC2)POWER(numeric1, numeric2)字符串函數(shù)STRING1 + STRING2string1 || string2字符串函數(shù)STRING.upperCase()UPPER(string)字符串函數(shù)STRING.charLength()CHAR_LENGTH(string)時間函數(shù)STRING.toDateDATE string時間函數(shù)STRING.toTimestampTIMESTAMP string時間函數(shù)currentTime()CURRENT_TIME時間函數(shù)NUMERIC.daysINTERVAL string range時間函數(shù)NUMERIC.minutes聚合函數(shù)FIELD.countCOUNT(*)聚合函數(shù)FIELD.sum0SUM([ ALL | DISTINCT ] expression)聚合函數(shù)RANK()聚合函數(shù)ROW_NUMBER()二驯用、Flink UDF
???????? 用戶定義函數(shù)(User-defined Functions脸秽,UDF)是一個重要的特性,因為它們顯著地擴展了查詢(Query)的表達能力。一些系統(tǒng)內(nèi)置函數(shù)無法解決的需求,我們可以用 UDF 來自定義實現(xiàn)
固惯。
2.1 注冊用戶自定義函數(shù) UDF
???????? 在大多數(shù)情況下,用戶定義的函數(shù)必須先注冊
片酝,然后才能在查詢中使用。不需要專門為Scala 的 Table API 注冊函數(shù)挖腰。????????
? ? ? ? ?函數(shù)通過調(diào)用 registerFunction
()方法在 TableEnvironment 中注冊
雕沿。當用戶定義的函數(shù)被注冊時,它被插入到 TableEnvironment 的函數(shù)目錄中曙聂,這樣 Table API 或 SQL 解析器就可以識別并正確地解釋它晦炊。
2.2 標量函數(shù)(Scalar Functions)
???????? 用戶定義的標量函數(shù),可以將 0宁脊、1 或多個標量值断国,映射到新的標量值。?????????
? ? ? ? ?為了定義標量函數(shù)榆苞,必須在 org.apache.flink.table.functions
中擴展基類 Scalar Function稳衬,并實現(xiàn)(一個或多個)求值(evaluation,eval)方法坐漏。標量函數(shù)的行為由求值方法決定薄疚,求值方法必須公開聲明并命名為 eval(直接 def 聲明,沒有 override)赊琳。求值方法的參數(shù)類型和返回類型街夭,確定了標量函數(shù)的參數(shù)和返回類型。
???????? 在下面的代碼中躏筏,我們定義自己的 HashCode 函數(shù)板丽,在 TableEnvironment 中注冊它,并在查詢中調(diào)用它趁尼。準備數(shù)據(jù)
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
代碼如下
package?udf
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.ScalarFunction
import?org.apache.flink.types.Row
/**
*?@Package?udf
*?@File :FlinkSqlUdfHashCode.java
*?@author?大數(shù)據(jù)老哥
*?@date?2020/12/29?21:58
*?@version?V1.0*/
object?FlinkSqlUdfHashCode?{
?def?main(args:?Array[String]):?Unit?=?{
???//1.構(gòu)建運行環(huán)境
???val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
???env.setParallelism(1)?//?設置并行度為1
???//2.構(gòu)建TableEnv
???val?tableEnv?=?StreamTableEnvironment.create(env)
???//3.構(gòu)建數(shù)據(jù)源
???tableEnv.connect(new?FileSystem().path("./data/sensor.txt"))
?????.withFormat(new?Csv())
?????.withSchema(new?Schema()
???????.field("id",?DataTypes.STRING())
???????.field("timestamp",?DataTypes.INT())
???????.field("temperature",?DataTypes.DOUBLE())
?????).createTemporaryTable("sensor")
???//?轉(zhuǎn)為表
???val?tableSensor?=?tableEnv.from("sensor")
???//?床架轉(zhuǎn)換對象
???val?code?=?new?HashCode()
???//使用tableAPI?進行測試
???val?tableRes?=?tableSensor.select('id,?code('id))
???tableEnv.registerFunction("code",code)?//?注冊udf
???val?tableSql?=?tableEnv.sqlQuery(
?????"""
???????|select
???????|id,
???????|code(id)
???????|from
???????|sensor???????|""".stripMargin)
???//?輸出
???tableRes.toAppendStream[Row].print("tableAPI")
???tableSql.toAppendStream[Row].print("tableSql")
???env.execute("FlinkSqlUdfHashCode")
?}
?class?HashCode()?extends?ScalarFunction?{
???def?eval(s:?String):?String?=?{
?????s.hashCode.toString
???}
?}
}
2.3 表函數(shù)(Table Functions)
???????? 與用戶定義的標量函數(shù)類似埃碱,用戶定義的表函數(shù),可以將 0酥泞、1 或多個標量值作為輸入?yún)?shù)砚殿;?????????
? ? ? ? ?與標量函數(shù)不同的是,它可以返回任意數(shù)量的行作為輸出芝囤,而不是單個值似炎。為了定義一個表函數(shù)辛萍,必須擴展 org.apache.flink.table.functions
中的基類 TableFunction
并實現(xiàn)(一個或多個)求值方法。表函數(shù)的行為由其求值方法決定羡藐,求值方法必須是 public
的叹阔,并命名為 eval
。求值方法的參數(shù)類型传睹,決定表函數(shù)的所有有效參數(shù)。?????????
? ? ? ? ?返回表的類型由 TableFunction 的泛型類型確定岸晦。求值方法使用 protected collect(T)方法發(fā)出輸出行欧啤。?????????
? ? ? ? ? 在 Table API 中,Table 函數(shù)需要與.joinLateral
或.leftOuterJoinLateral
一起使用启上。
? ? ? ? ? ? ?joinLateral
算子邢隧,會將外部表中的每一行,與表函數(shù)(TableFunction冈在,算子的參數(shù)是它的表達式)計算得到的所有行連接起來倒慧。? ? ? ? ?
? ? ? ? 而 leftOuterJoinLateral 算子,則是左外連接包券,它同樣會將外部表中的每一行與表函數(shù)計算生成的所有行連接起來纫谅;并且,對于表函數(shù)返回的是空表的外部行溅固,也要保留下來付秕。?????????
? ? ? ? ?在 SQL 中,則需要使用 Lateral Table()侍郭,或者帶有 ON TRUE 條件的左連接询吴。
???????? 下面的代碼中,我們將定義一個表函數(shù)亮元,在表環(huán)境中注冊它猛计,并在查詢中調(diào)用它。
數(shù)據(jù)準備
hello|word,hello|spark
hello|Flink,hello|java,hello|大數(shù)據(jù)老哥
編寫代碼
package?udf
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.functions.TableFunction
import?org.apache.flink.types.Row
/**
?*?@Package?udf
?*?@File :FlinkSqlUDFTableFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/29?23:10
?*?@version?V1.0?*/
object?FlinkSqlUDFTableFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????val?data?=?env.readTextFile("./data/words.txt")
????//?解析數(shù)據(jù)
????val?wordData:?DataStream[String]?=?data.flatMap(_.split(","))
????//?類型轉(zhuǎn)換
????val?tableWord?=?tableEnv.fromDataStream(wordData,'id)
????//?調(diào)用TableFunction
????val?split?=?new?Split()
????//?Table?API?方式一
????val?resTable1?=?tableWord.
??????joinLateral(split('id)?as('word,'length))
??????.select('id,'word,'length?)
????//??Table?API??方式二
????val?resTable2?=?tableWord.
??????leftOuterJoinLateral(split('id)?as('word,'length))
??????.select('id,'word,'length?)
????//?將數(shù)據(jù)注冊成表
?????tableEnv.createTemporaryView("sensor",tableWord)
?????tableEnv.registerFunction("split",split)
????//?SQL?方式一
????val?tableSQL1?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id,
????????|word,
????????|length
????????|from
????????|sensor?,LATERAL?TABLE(split(id))?AS?newsensor(word,?length)????????|""".stripMargin)
????//??SQL?方式二
????val?TableSQL2?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id,
????????|word,
????????|length
????????|from
????????|sensor
????????|?LEFT?JOIN?LATERAL?TABLE(split(id))?AS?newsensor(word,?length)?ON?TRUE????????|""".stripMargin)
????//?調(diào)用數(shù)據(jù)
????resTable1.toAppendStream[Row].print("resTable1")
????resTable2.toAppendStream[Row].print("resTable2")
????tableSQL1.toAppendStream[Row].print("tableSQL1")
????TableSQL2.toAppendStream[Row].print("TableSQL2")
????env.execute("FlinkSqlUDFTableFunction")
??}
??class?Split()?extends?TableFunction[(String,Int)]?{
????def?eval(str:?String):?Unit?=?{
??????str.split("\\|").foreach(
????????word?=>?collect((word,?word.length))
??????)
????}
??}
}
2.4 聚合函數(shù)(Aggregate Functions)
? ? ? ?假設現(xiàn)在有一張表爆捞,包含了各種飲料的數(shù)據(jù)奉瘤。該表由三列(id、name 和 price)嵌削、五行組成數(shù)據(jù)∶茫現(xiàn)在我們需要找到表中所有飲料的最高價格,即執(zhí)行 max()聚合苛秕,結(jié)果將是一個數(shù)值肌访。AggregateFunction 的工作原理如下:
- 首先,它需要一個累加器艇劫,用來保存聚合中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)(狀態(tài))吼驶。可以通過調(diào)用 AggregateFunction 的
createAccumulator()
方法創(chuàng)建空累加器。 - 隨后蟹演,對每個輸入行調(diào)用函數(shù)的
accumulate()
方法來更新累加器风钻。 - 處理完所有行后,將調(diào)用函數(shù)的
getValue()
方法來計算并返回最終結(jié)果酒请。AggregationFunction 要求必須實現(xiàn)的方法:
???????? 除了上述方法之外骡技,還有一些可選擇實現(xiàn)的方法。其中一些方法羞反,可以讓系統(tǒng)執(zhí)行查詢更有效率布朦,而另一些方法,對于某些場景是必需的昼窗。例如是趴,如果聚合函數(shù)應用在會話窗口(session group window)上下文中,則 merge()方法是必需的澄惊。
- retract()
- merge()
- resetAccumulator()
???????? ?接下來我們寫一個自定義AggregateFunction,計算一個每個price的平均值唆途。
數(shù)據(jù)準備
1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4
代碼如下
package?udf
import?org.apache.calcite.rel.`type`.{RelDataType,?RelDataTypeFactory}
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.AggregateFunction
import?org.apache.flink.types.Row
import?java.util
/**
?*?@Package?udf
?*?@File :FlinkSQUDFAggregateFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/30?22:06
?*?@version?V1.0?*/
object?FlinkSQUDFAggregateFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????tableEnv.connect(new?FileSystem().path("./data/datas"))
??????.withFormat(new?Csv)
??????.withSchema(new?Schema()
????????.field("id",?DataTypes.STRING())
????????.field("name",?DataTypes.STRING())
????????.field("price",?DataTypes.DOUBLE())
??????).createTemporaryTable("datas")
????val?AvgTemp?=?new?AvgTemp()
????val?table?=?tableEnv.from("datas")
????val?resTableApi?=?table.groupBy('id)
??????.aggregate(AvgTemp('price)?as?'sumprice)
??????.select('id,?'sumprice)
????tableEnv.registerFunction("avgTemp",AvgTemp)
????val?tablesql?=?tableEnv.sqlQuery(
??????"""
????????|select
????????|id?,avgTemp(price)
????????|from?datas?group?by?id????????|""".stripMargin)
????resTableApi.toRetractStream[Row].print("resTableApi")
????tablesql.toRetractStream[Row].print("tablesql")
????env.execute("FlinkSQUDFAggregateFunction")
??}
??class?AvgTempAcc?{
????var?sum:?Double?=?0.0
????var?count:?Int?=?0
??}
??class?AvgTemp?extends?AggregateFunction[Double,?AvgTempAcc]?{
????override?def?getValue(acc:?AvgTempAcc):?Double?=?{
??????acc.sum?/?acc.count
????}
????override?def?createAccumulator():?AvgTempAcc?=?new?AvgTempAcc()
??}
??def?accumulate(accumulator:?AvgTempAcc,?price:?Double):?Unit?=?{
????accumulator.sum?+=?price
????accumulator.count?+=?1
??}
}
2.5表聚合函數(shù)(Table Aggregate Functions)
- 為首先,它同樣需要一個累加器(Accumulator)掸驱,它是保存聚合中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)肛搬。通過調(diào)用
TableAggregateFunction
的createAccumulator
()方法可以創(chuàng)建空累加器。 - 為隨后亭敢,對每個輸入行調(diào)用函數(shù)的
accumulate
()方法來更新累加器滚婉。 - 為處理完所有行后,將調(diào)用函數(shù)的
emitValue
()方法來計算并返回最終結(jié)果帅刀。除了上述方法之外让腹,還有一些可選擇實現(xiàn)的方法。 - retract()
- merge()
- resetAccumulator()
- emitValue()
- emitUpdateWithRetract()
???????? 接下來我們寫一個自定義 TableAggregateFunction扣溺,用來提取每個 price 最高的兩個平均值骇窍。
數(shù)據(jù)準備
1,Latte,6
2,Milk,3
3,Breve,5
4,Mocha,8
5,Tea,4
代碼如下
package?udf
import?org.apache.flink.streaming.api.scala._
import?org.apache.flink.table.api.DataTypes
import?org.apache.flink.table.api.scala._
import?org.apache.flink.table.descriptors.{Csv,?FileSystem,?Schema}
import?org.apache.flink.table.functions.TableAggregateFunction
import?org.apache.flink.types.Row
import?org.apache.flink.util.Collector
import?udf.FlinkSQUDFAggregateFunction.AvgTemp
/**
?*?@Package?udf
?*?@File :FlinkSqlUDFTableAggregateFunction.java
?*?@author?大數(shù)據(jù)老哥
?*?@date?2020/12/30?22:53
?*?@version?V1.0?*/
object?FlinkSqlUDFTableAggregateFunction?{
??def?main(args:?Array[String]):?Unit?=?{
????//1.構(gòu)建運行環(huán)境
????val?env?=?StreamExecutionEnvironment.getExecutionEnvironment
????env.setParallelism(1)?//?設置并行度為1
????//2.構(gòu)建TableEnv
????val?tableEnv?=?StreamTableEnvironment.create(env)
????//3.構(gòu)建數(shù)據(jù)源
????tableEnv.connect(new?FileSystem().path("./data/datas"))
??????.withFormat(new?Csv)
??????.withSchema(new?Schema()
????????.field("id",?DataTypes.STRING())
????????.field("name",?DataTypes.STRING())
????????.field("price",?DataTypes.DOUBLE())
??????).createTemporaryTable("datas")
????val?table?=?tableEnv.from("datas")
????val?temp?=?new?Top2Temp()
????val?tableApi?=?table.groupBy('id)
??????.flatAggregate(temp('price)?as('tmpprice,?'rank))
??????.select('id,?'tmpprice,?'rank)
????tableEnv.registerFunction("temp",temp)
????tableApi.toRetractStream[Row].print()
????env.execute("FlinkSqlUDFTableAggregateFunction")
??}
??class?Top2TempAcc?{
????var?highestPrice:?Double?=?Int.MinValue
????var?secodeHighestPrice:?Double?=?Int.MinValue
??}
??class?Top2Temp?extends?TableAggregateFunction[(Double,?Int),?Top2TempAcc]?{
????override?def?createAccumulator():?Top2TempAcc?=?new?Top2TempAcc
????def?accumulate(acc:?Top2TempAcc,?temp:?Double):?Unit?=?{
??????if?(temp?>?acc.highestPrice)?{
????????acc.secodeHighestPrice?=?acc.highestPrice
????????acc.highestPrice?=?temp
??????}?else?if?(temp?>?acc.secodeHighestPrice)?{
????????acc.highestPrice?=?temp
??????}
????}
????def?emitValue(acc:?Top2TempAcc,?out:?Collector[(Double,?Int)]):?Unit?=?{
??????out.collect(acc.highestPrice,?1)
??????out.collect(acc.secodeHighestPrice,?2)
????}
??}
}
總結(jié)
???????? ?好了今天的內(nèi)容就分享到這里了。上述主要講解了一個系統(tǒng)自己帶的函數(shù)锥余,但是往往企業(yè)中不光只需要這些函數(shù)腹纳,有好多需求是本身函數(shù)是無法完成的。這時候就要用到我們的自定義函數(shù)了驱犹。他可以根據(jù)我們自己的需要進行編寫代碼來實現(xiàn)我們想要的功能嘲恍。我是大數(shù)據(jù)老哥我們下期見~~~