flinksql - operator - udf

概念
  1. 自定義標(biāo)量函數(shù)费彼,接收一個(gè)或多個(gè)列柄粹,輸出一個(gè)列,行與行是一一對(duì)應(yīng)的
  2. 構(gòu)造函數(shù)在jobmanager上創(chuàng)建udf時(shí)就執(zhí)行
  3. open方法在所有并行子任務(wù)上都執(zhí)行一次项乒,且在調(diào)用該udf時(shí)才會(huì)執(zhí)行open方法
  4. 通過(guò)DataTypeHint注解和FunctionHint注解可以自定義udf參數(shù)和返回類型
  5. 通過(guò)重寫getTypeInference方法動(dòng)態(tài)指定udf返回類型
  6. deterministic為true時(shí),若eval方法無(wú)參或傳入常量參數(shù)葵蒂,則eval方法僅會(huì)執(zhí)行一次交播,所有行的調(diào)用結(jié)果都采用此次執(zhí)行的eval返回值。若deterministic為false時(shí)践付,則無(wú)論何種情況均每行執(zhí)行一次eval方法獲得對(duì)應(yīng)的返回值秦士。deterministic默認(rèn)為true,可通過(guò)重寫isDeterministic方法指定其值永高。
定義

定義udf類隧土,繼承ScalarFunction,并實(shí)現(xiàn)eval方法命爬,參數(shù)自定義

// 實(shí)現(xiàn)nvl函數(shù)曹傀,接收任意類型的參數(shù),若第一個(gè)參數(shù)為null則返回第二個(gè)參數(shù)的值饲宛,否則返回第一個(gè)參數(shù)皆愉,且返回值類型恒等于第一個(gè)參數(shù)類型
import org.apache.flink.table.functions.ScalarFunction;

private Class<?> valueConvertClass;
    private Class<?> defaultValueConvertClass;
    Constructor<?> convertConstructor = null;
    Method staticConvertMethod = null;
    Map<Class<?>, Class<?>> typeMap;

    public Nvl() {
        // 保存引用類型與基本類型的對(duì)應(yīng)關(guān)系, 因?yàn)樗衯alueOf轉(zhuǎn)換方法都要求傳入基本類型, 而defaultValueConvertClass獲取到的有可能是其引用類型
        typeMap = new HashMap<>();
        typeMap.put(Integer.class, int.class);
        typeMap.put(Long.class, long.class);
        typeMap.put(Double.class, double.class);
        typeMap.put(Character.class, char.class);
        typeMap.put(Byte.class, byte.class);
        typeMap.put(Short.class, short.class);
        typeMap.put(Float.class, float.class);
    }
    
    @Override
    public void open(FunctionContext context) throws Exception {
        if (valueConvertClass != defaultValueConvertClass) {
            if (valueConvertClass.equals(BigDecimal.class)) {
                // 對(duì)應(yīng)FlinkSQL的DECIMAL類型
                // 使用BigDecimal的構(gòu)造函數(shù)把目標(biāo)對(duì)象轉(zhuǎn)為BigDecimal對(duì)象, 源類型BigDecimal,int,long,char[],string,double,BigInteger
                convertConstructor = BigDecimal.class.getConstructor(typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(String.class)) {
                // 對(duì)應(yīng)flinkSQL的STRING、VARCHAR艇抠、CHAR類型
                // 使用String.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為String對(duì)象, 原類型支持所有基本數(shù)據(jù)類型
                staticConvertMethod = String.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            }  else if (valueConvertClass.equals(Integer.class)) {
                // 對(duì)應(yīng)FlinkSQL的INT類型
                // 使用Integer.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Integer對(duì)象, 源類型僅支持int類型和String類型
                staticConvertMethod = Integer.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Boolean.class)) {
                // 對(duì)應(yīng)flinkSQL的BOOLEAN類型
                // 使用Boolean.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Boolean對(duì)象, 源類型僅支持boolean類型和String類型
                staticConvertMethod = Boolean.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Byte.class)) {
                // 對(duì)應(yīng)FlinkSQL的TINYINT類型
                // 使用Byte.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Byte對(duì)象, 源類型僅支持byte類型和String類型
                staticConvertMethod = Byte.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Short.class)) {
                // 對(duì)應(yīng)FlinkSQL的SMALLINT類型
                // 使用Short.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Short對(duì)象, 源類型僅支持short類型和String類型
                staticConvertMethod = Short.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            }else if (valueConvertClass.equals(Long.class)) {
                // 對(duì)應(yīng)FlinkSQL的BIGINT類型
                // 使用Long.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為L(zhǎng)ong對(duì)象, 源類型僅支持long類型和String類型
                staticConvertMethod = Long.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Float.class)) {
                // 對(duì)應(yīng)FlinkSQL的FLOAT類型
                // 使用Float.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Float對(duì)象, 源類型僅支持float類型和String類型
                staticConvertMethod = Float.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Double.class)) {
                // 對(duì)應(yīng)FlinkSQL的DOUBLE類型
                // 使用Double.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Double對(duì)象, 源類型僅支持double類型和String類型
                staticConvertMethod = Double.class.getMethod("valueOf", typeMap.getOrDefault(defaultValueConvertClass, defaultValueConvertClass));
            } else if (valueConvertClass.equals(Date.class)) {
                // 對(duì)應(yīng)FlinkSQL的DATE類型
                // 使用Date.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Date對(duì)象, 源類型僅支持LocalDate類型和String類型
                staticConvertMethod = Date.class.getMethod("valueOf", defaultValueConvertClass);
            } else if (valueConvertClass.equals(LocalDate.class)) {
                // 對(duì)應(yīng)FlinkSQL的DATE類型
                // 使用LocalDate.parse方法把目標(biāo)對(duì)象轉(zhuǎn)為L(zhǎng)ocalDate對(duì)象, 源類型僅支持CharSequence類型
                staticConvertMethod = LocalDate.class.getMethod("parse", defaultValueConvertClass);
            } else if (valueConvertClass.equals(Time.class)) {
                // 對(duì)應(yīng)FlinkSQL的TIME(0)類型
                // 使用Time.valueOf方法把目標(biāo)對(duì)象轉(zhuǎn)為Time對(duì)象, 源類型僅支持LocalTime類型和String類型
                staticConvertMethod = Time.class.getMethod("valueOf", defaultValueConvertClass);
            } else {
                throw new RuntimeException("unsupported datatype: " + defaultValueConvertClass.getName());
            }
        }
    }

    // eval方法幕庐,實(shí)現(xiàn)udf返回
    // 重寫getTypeInference方法,以及聲明eval方法返回類型為Object家淤,實(shí)現(xiàn)動(dòng)態(tài)返回類型
    // 使用DataTypeHin注解自定義udf參數(shù)類型异剥,inputGroup = InputGroup.ANY時(shí)表示接收任意類型的參數(shù),搭配Object類型的參數(shù)類型絮重,實(shí)現(xiàn)對(duì)任意類型參數(shù)的接收處理
    public Object eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object value,
                       @DataTypeHint(inputGroup = InputGroup.ANY) Object defaultValue) throws InvocationTargetException, InstantiationException, IllegalAccessException {
        if (value != null) {
            return value;
        } else if (staticConvertMethod != null) {
            return staticConvertMethod.invoke(null, defaultValue);
        } else if (convertConstructor != null) {
            return convertConstructor.newInstance(defaultValue);
        } else {
            return defaultValue;
        }
    }

    // 獲取第一個(gè)參數(shù)的類型, 此類型通過(guò)其字段類型得到冤寿,并將其作為udf返回類型
    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(callContext -> {
                    // getConversionClass為引用類型
                    valueConvertClass = callContext.getArgumentDataTypes().get(0).getConversionClass();
                    defaultValueConvertClass = callContext.getArgumentDataTypes().get(1).getConversionClass();
                    return Optional.of(callContext.getArgumentDataTypes().get(0));
                })
                .build();
    }
}
使用
// table api
table.select($"nvl",call(new Nvl(),$"col1",$"col2"));
// flink sql
tableEnv.createTemporaryFunction("nvl",Nvl.class);
tableEnv.createTemporaryFunction("nvl",new Nvl());

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市青伤,隨后出現(xiàn)的幾起案子疚沐,更是在濱河造成了極大的恐慌,老刑警劉巖潮模,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異痴施,居然都是意外死亡擎厢,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門辣吃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)动遭,“玉大人,你說(shuō)我怎么就攤上這事神得±宓耄” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵哩簿,是天一觀的道長(zhǎng)宵蕉。 經(jīng)常有香客問(wèn)我酝静,道長(zhǎng),這世上最難降的妖魔是什么羡玛? 我笑而不...
    開(kāi)封第一講書人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任别智,我火速辦了婚禮,結(jié)果婚禮上稼稿,老公的妹妹穿的比我還像新娘薄榛。我一直安慰自己,他們只是感情好让歼,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布敞恋。 她就那樣靜靜地躺著,像睡著了一般谋右。 火紅的嫁衣襯著肌膚如雪硬猫。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,950評(píng)論 1 291
  • 那天倚评,我揣著相機(jī)與錄音浦徊,去河邊找鬼。 笑死天梧,一個(gè)胖子當(dāng)著我的面吹牛盔性,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播呢岗,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼冕香,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了后豫?” 一聲冷哼從身側(cè)響起悉尾,我...
    開(kāi)封第一講書人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎挫酿,沒(méi)想到半個(gè)月后构眯,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡早龟,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年惫霸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片葱弟。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡壹店,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芝加,到底是詐尸還是另有隱情硅卢,我是刑警寧澤,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站将塑,受9級(jí)特大地震影響脉顿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜抬旺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一弊予、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧开财,春花似錦汉柒、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至历葛,卻和暖如春正塌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背恤溶。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工乓诽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人咒程。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓鸠天,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親帐姻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子稠集,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容