概念
- 自定義標(biāo)量函數(shù)费彼,接收一個(gè)或多個(gè)列柄粹,輸出一個(gè)列,行與行是一一對(duì)應(yīng)的
- 構(gòu)造函數(shù)在jobmanager上創(chuàng)建udf時(shí)就執(zhí)行
- open方法在所有并行子任務(wù)上都執(zhí)行一次项乒,且在調(diào)用該udf時(shí)才會(huì)執(zhí)行open方法
- 通過(guò)DataTypeHint注解和FunctionHint注解可以自定義udf參數(shù)和返回類型
- 通過(guò)重寫getTypeInference方法動(dòng)態(tài)指定udf返回類型
- deterministic為true時(shí),若eval方法無(wú)參或傳入常量參數(shù)葵蒂,則eval方法僅會(huì)執(zhí)行一次交播,所有行的調(diào)用結(jié)果都采用此次執(zhí)行的eval返回值。若deterministic為false時(shí)践付,則無(wú)論何種情況均每行執(zhí)行一次eval方法獲得對(duì)應(yīng)的返回值秦士。deterministic默認(rèn)為true,可通過(guò)重寫isDeterministic方法指定其值永高。
定義
定義udf類隧土,繼承ScalarFunction,并實(shí)現(xiàn)eval方法命爬,參數(shù)自定義
// 實(shí)現(xiàn)nvl函數(shù)曹傀,接收任意類型的參數(shù),若第一個(gè)參數(shù)為null則返回第二個(gè)參數(shù)的值饲宛,否則返回第一個(gè)參數(shù)皆愉,且返回值類型恒等于第一個(gè)參數(shù)類型
import org.apache.flink.table.functions.ScalarFunction;
private Class<?> valueConvertClass;
private Class<?> defaultValueConvertClass;
Constructor<?> convertConstructor = null;
Method staticConvertMethod = null;
Map<Class<?>, Class<?>> typeMap;
public Nvl() {
// 保存引用類型與基本類型的對(duì)應(yīng)關(guān)系, 因?yàn)樗衯alueOf轉(zhuǎn)換方法都要求傳入基本類型, 而defaultValueConvertClass獲取到的有可能是其引用類型
typeMap = new HashMap<>();
typeMap.put(Integer.class, int.class);
typeMap.put(Long.class, long.class);
typeMap.put(Double.class, double.class);
typeMap.put(Character.class, char.class);
typeMap.put(Byte.class, byte.class);
typeMap.put(Short.class, short.class);
typeMap.put(Float.class, float.class);
}
@Override
public void open(FunctionContext context) throws Exception {
if (valueConvertClass != defaultValueConvertClass) {
if (valueConvertClass.equals(BigDecimal.class)) {
// 對(duì)應(yīng)FlinkSQL的DECIMAL類型
// 使用BigDecimal的構(gòu)造函數(shù)把目標(biāo)對(duì)象轉(zhuǎn)為BigDecimal對(duì)象, 源類型BigDecimal,int,long,char[],string,double,BigInteger
convertConstructor = BigDecimal.class.getConstructor(typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(String.class)) {
// 對(duì)應(yīng)flinkSQL的STRING、VARCHAR艇抠、CHAR類型
// 使用String.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為String對(duì)象, 原類型支持所有基本數(shù)據(jù)類型
staticConvertMethod = String.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Integer.class)) {
// 對(duì)應(yīng)FlinkSQL的INT類型
// 使用Integer.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Integer對(duì)象, 源類型僅支持int類型和String類型
staticConvertMethod = Integer.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Boolean.class)) {
// 對(duì)應(yīng)flinkSQL的BOOLEAN類型
// 使用Boolean.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Boolean對(duì)象, 源類型僅支持boolean類型和String類型
staticConvertMethod = Boolean.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Byte.class)) {
// 對(duì)應(yīng)FlinkSQL的TINYINT類型
// 使用Byte.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Byte對(duì)象, 源類型僅支持byte類型和String類型
staticConvertMethod = Byte.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Short.class)) {
// 對(duì)應(yīng)FlinkSQL的SMALLINT類型
// 使用Short.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Short對(duì)象, 源類型僅支持short類型和String類型
staticConvertMethod = Short.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
}else if (valueConvertClass.equals(Long.class)) {
// 對(duì)應(yīng)FlinkSQL的BIGINT類型
// 使用Long.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為L(zhǎng)ong對(duì)象, 源類型僅支持long類型和String類型
staticConvertMethod = Long.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Float.class)) {
// 對(duì)應(yīng)FlinkSQL的FLOAT類型
// 使用Float.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Float對(duì)象, 源類型僅支持float類型和String類型
staticConvertMethod = Float.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Double.class)) {
// 對(duì)應(yīng)FlinkSQL的DOUBLE類型
// 使用Double.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Double對(duì)象, 源類型僅支持double類型和String類型
staticConvertMethod = Double.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
} else if (valueConvertClass.equals(Date.class)) {
// 對(duì)應(yīng)FlinkSQL的DATE類型
// 使用Date.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Date對(duì)象, 源類型僅支持LocalDate類型和String類型
staticConvertMethod = Date.class.getMethod("valueOf", defaultValueConvertClass);
} else if (valueConvertClass.equals(LocalDate.class)) {
// 對(duì)應(yīng)FlinkSQL的DATE類型
// 使用LocalDate.parse方法把目標(biāo)對(duì)象轉(zhuǎn)為L(zhǎng)ocalDate對(duì)象, 源類型僅支持CharSequence類型
staticConvertMethod = LocalDate.class.getMethod("parse", defaultValueConvertClass);
} else if (valueConvertClass.equals(Time.class)) {
// 對(duì)應(yīng)FlinkSQL的TIME(0)類型
// 使用Time.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Time對(duì)象, 源類型僅支持LocalTime類型和String類型
staticConvertMethod = Time.class.getMethod("valueOf", defaultValueConvertClass);
} else {
throw new RuntimeException("unsupported datatype: " + defaultValueConvertClass.getName());
}
}
}
// eval方法幕庐,實(shí)現(xiàn)udf返回
// 重寫getTypeInference方法,以及聲明eval方法返回類型為Object家淤,實(shí)現(xiàn)動(dòng)態(tài)返回類型
// 使用DataTypeHin注解自定義udf參數(shù)類型异剥,inputGroup = InputGroup.ANY時(shí)表示接收任意類型的參數(shù),搭配Object類型的參數(shù)類型絮重,實(shí)現(xiàn)對(duì)任意類型參數(shù)的接收處理
public Object eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object value,
@DataTypeHint(inputGroup = InputGroup.ANY) Object defaultValue) throws InvocationTargetException, InstantiationException, IllegalAccessException {
if (value != null) {
return value;
} else if (staticConvertMethod != null) {
return staticConvertMethod.invoke(null, defaultValue);
} else if (convertConstructor != null) {
return convertConstructor.newInstance(defaultValue);
} else {
return defaultValue;
}
}
// 獲取第一個(gè)參數(shù)的類型, 此類型通過(guò)其字段類型得到冤寿,并將其作為udf返回類型
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
return TypeInference.newBuilder()
.outputTypeStrategy(callContext -> {
// getConversionClass為引用類型
valueConvertClass = callContext.getArgumentDataTypes().get(0).getConversionClass();
defaultValueConvertClass = callContext.getArgumentDataTypes().get(1).getConversionClass();
return Optional.of(callContext.getArgumentDataTypes().get(0));
})
.build();
}
}
使用
// table api
table.select($"nvl",call(new Nvl(),$"col1",$"col2"));
// flink sql
tableEnv.createTemporaryFunction("nvl",Nvl.class);
tableEnv.createTemporaryFunction("nvl",new Nvl());