Realtime Compute supports three kinds of user-defined functions (UDX):
UDF (User Defined Function): a user-defined scalar function. It takes zero, one, or more values from a single record and returns a single value (a minimal sketch follows this list).
UDAF (User Defined Aggregation Function): a user-defined aggregate function. It aggregates multiple records into a single value.
UDTF (User Defined Table Function): a user-defined table function. It transforms input records into output records, and the number of output records need not correspond one-to-one to the number of input records. It is also the only kind of UDX that can return multiple fields.
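For contrast with the UDTF built below, a scalar UDF is just a class that extends ScalarFunction and exposes an eval method. A minimal sketch (the class name TrimUDF is only illustrative, and the ScalarFunction package path is assumed to mirror the TableFunction import used in section 1; adjust it to your Flink/Blink version):

import org.apache.flink.table.api.functions.ScalarFunction;

// Illustrative scalar UDF: one value in, one value out.
public class TrimUDF extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.trim();
    }
}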
This document uses a UDTF to parse a byte array into multiple fields.
For example, given a byte array storing {"name":"Alice", "age":13, "grade":"A"}, the UDTF produces three columns name, age, grade with the values Alice, 13, A.
1. UDTF
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.functions.TableFunction;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.TypeInfoWrappedDataType;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class KafkaUDTF extends TableFunction<Row> {

    // Output field types and names, parsed from constructor arguments such as "name,VARCHAR".
    private final List<Class> clazzes = new ArrayList<>();
    private final List<String> fieldNames = new ArrayList<>();

    public KafkaUDTF() {
    }

    public KafkaUDTF(String... args) {
        for (String arg : args) {
            if (arg.contains(",")) {
                String[] parts = arg.split(",");
                fieldNames.add(parts[0]);
                // Convert "VARCHAR" to String.class, "INTEGER" to Integer.class, etc.
                clazzes.add(stringConvertClass(parts[1]));
            }
        }
    }

    // Maps a SQL type name to its Java class; extend as needed for more types.
    private static Class stringConvertClass(String sqlType) {
        switch (sqlType.trim().toUpperCase()) {
            case "VARCHAR":
                return String.class;
            case "INTEGER":
            case "INT":
                return Integer.class;
            case "BIGINT":
                return Long.class;
            case "DOUBLE":
                return Double.class;
            case "BOOLEAN":
                return Boolean.class;
            default:
                throw new IllegalArgumentException("Unsupported type: " + sqlType);
        }
    }

    // Quick local smoke test of the constructor and JSON parsing. Note that
    // collect() relies on a collector normally injected by the Flink runtime,
    // so use the job in section 2 for a real end-to-end test.
    public static void main(String[] args) {
        KafkaUDTF kafkaUDTF = new KafkaUDTF("name,VARCHAR", "age,INTEGER", "grade,VARCHAR");
        kafkaUDTF.eval("{\"name\":\"Alice\", \"age\":13, \"grade\":\"A\"}".getBytes());
    }

    public void eval(byte[] message) {
        String mess = new String(message, StandardCharsets.UTF_8);
        JSONObject json = JSON.parseObject(mess);
        // Copy each configured field from the JSON object into one output row.
        Row row = new Row(fieldNames.size());
        for (int i = 0; i < fieldNames.size(); i++) {
            row.setField(i, json.get(fieldNames.get(i)));
        }
        collect(row);
    }

    // When the result type is Row, this method must be overridden to tell the
    // system the field types explicitly, since they cannot be inferred.
    @Override
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
        TypeInformation[] typeInformations = new TypeInformation[clazzes.size()];
        for (int i = 0; i < clazzes.size(); i++) {
            typeInformations[i] = BasicTypeInfo.of(clazzes.get(i));
        }
        RowTypeInfo rowType = new RowTypeInfo(typeInformations);
        return new TypeInfoWrappedDataType(rowType);
    }
}
2. Main
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Simulate a byte-array source holding one JSON record.
DataStreamSource<byte[]> byteSource = env.fromElements("{\"name\":\"Alice\", \"age\":13, \"grade\":\"A\"}".getBytes());
Table byteSourceTable = tableEnv.fromDataStream(byteSource, "message");
tableEnv.registerTable("b", byteSourceTable);

// Register the UDTF with the column names and SQL types it should emit.
tableEnv.registerFunction("kafkaUDTF", new KafkaUDTF("name,VARCHAR", "age,INTEGER", "grade,VARCHAR"));

Table res1 = tableEnv.sqlQuery("select T.name, T.age, T.grade\n" +
        "from b as S\n" +
        "LEFT JOIN LATERAL TABLE(kafkaUDTF(message)) as T(name, age, grade) ON TRUE");
res1.writeToSink(new PrintTableSink(TimeZone.getDefault()));
tableEnv.execute();
// Prints: task-1> (+)Alice,13,A
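Note that with LEFT JOIN LATERAL TABLE(...) ON TRUE, source rows for which the UDTF emits no record are still kept, with the UDTF's columns set to NULL; a cross join against LATERAL TABLE (the comma-style join) would drop such rows instead.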
3. Dependencies
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.5.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.9</version>
</dependency>
4. Extensibility
Because Blink's Kafka source only supports byte arrays, this UDTF can be used to parse the desired fields out of those byte arrays, as sketched below.
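A sketch of feeding real Kafka data through the same pipeline, continuing from the env and tableEnv of section 2. This assumes the flink-connector-kafka-0.11 artifact, a broker at localhost:9092, and a topic named events (all illustrative); the consumer class and deserialization API may differ between Flink and Blink versions.

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "udtf-demo");

// Pass the raw Kafka payload through unchanged so the UDTF can parse it.
FlinkKafkaConsumer011<byte[]> consumer = new FlinkKafkaConsumer011<>(
        "events",
        new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] message) {
                return message;
            }
        },
        props);

DataStream<byte[]> kafkaStream = env.addSource(consumer);
Table kafkaTable = tableEnv.fromDataStream(kafkaStream, "message");
tableEnv.registerTable("k", kafkaTable);
// From here, the lateral-join query from section 2 applies, with "k" in place of "b".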