HIVE 自定義UDF厢漩、UDTF函數(shù)

HIVE自定義函數(shù)類(lèi)型

1)Hive 自帶了一些函數(shù)蟋座,比如:max/min等拗踢,但是數(shù)量有限,自己可以通過(guò)自定義UDF來(lái)方便的擴(kuò)展向臀。

2)當(dāng)Hive提供的內(nèi)置函數(shù)無(wú)法滿(mǎn)足你的業(yè)務(wù)處理需要時(shí)巢墅,此時(shí)就可以考慮使用用戶(hù)自定義函數(shù)(UDF:user-defined function)。

3)根據(jù)用戶(hù)自定義函數(shù)類(lèi)別分為以下三種:

(1)UDF(User-Defined-Function)

一進(jìn)一出

(2)UDAF(User-Defined Aggregation Function)

聚集函數(shù),多進(jìn)一出

類(lèi)似于:count/max/min

(3)UDTF(User-Defined Table-Generating Functions)

一進(jìn)多出

如lateral view explore()

4)官方文檔地址

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

5)編程步驟:

(1)繼承org.apache.hadoop.hive.ql.exec.UDF

(2)需要實(shí)現(xiàn)evaluate函數(shù)君纫;evaluate函數(shù)支持重載驯遇;

(3)在hive的命令行窗口創(chuàng)建函數(shù)

添加jar

add jar linux_jar_path

創(chuàng)建function

create [temporary] function [dbname.]function_name AS class_name;

(4)在hive的命令行窗口刪除函數(shù)

Drop [temporary] function [if exists] [dbname.]function_name;

6)注意事項(xiàng):UDF必須要有返回類(lèi)型,可以返回null蓄髓,但是返回類(lèi)型不能為void叉庐;

1)在項(xiàng)目中是否自定義過(guò)UDF、UDTF函數(shù)会喝,以及用他們處理了什么問(wèn)題陡叠,及自定義步驟?
(1)用UDF函數(shù)解析公共字段肢执;用UDTF函數(shù)解析事件字段匾竿。
(2)自定義UDF:繼承UDF,重寫(xiě)evaluate方法
(3)自定義UDTF:繼承自GenericUDTF蔚万,重寫(xiě)3個(gè)方法:initialize(自定義輸出的列名和類(lèi)型)岭妖,process(將結(jié)果返回forward(result)),close
2)為什么要自定義UDF/UDTF反璃?
因?yàn)樽远x函數(shù)昵慌,可以自己埋點(diǎn)Log打印日志,出錯(cuò)或者數(shù)據(jù)異常淮蜈,方便調(diào)試斋攀。

示例:

1.Maven 依賴(lài)

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
</dependencies>

UDF 一進(jìn)一出

進(jìn)行數(shù)據(jù)結(jié)構(gòu)的轉(zhuǎn)換
把JSON數(shù)組字符串轉(zhuǎn)換為 hive 中結(jié)構(gòu)體格式

 jsonstr  ==>  array<struct<action_id:string,item:string,item_type:string,ts:bigint>>

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;

@Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
public class JsonArrayToStructArray extends GenericUDF {
    /*
        對(duì)輸入檢測(cè)
        返回輸出的值的對(duì)象檢測(cè)器
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. 對(duì)輸入檢測(cè)
        if (arguments.length < 3) {
            throw new UDFArgumentException("參數(shù)個(gè)數(shù)必須執(zhí)行3個(gè)");
        }
        for (ObjectInspector argument : arguments) {
            if (!"string".equals(argument.getTypeName())) {
                throw new UDFArgumentException("參數(shù)必須是 string");
            }
        }
        // 2. 返回輸出的值的對(duì)象檢測(cè)器
        // array(struct(k:v, k:v), struct(...))
        List<String> fieldNames = new ArrayList<>();  // 結(jié)構(gòu)體的每個(gè)k的名字
        List<ObjectInspector> oiList = new ArrayList<>();  // 結(jié)構(gòu)體的每個(gè)k的名字

        int size = arguments.length;
        /*for (int i = 1; i < (size - 1) / 2 + 1; i++) {
            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);
        }*/

        for (int i = (size - 1) / 2 + 1; i < size; i++) {

            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);

            // 不同的類(lèi)型, 使用不同的檢測(cè)器
            String type = getConstantStringValue(arguments, i).split(":")[1];
            switch (type) {
                case "string":
                    oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                    break;
                case "int":
                    oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
                    break;
                case "bigint":
                    oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
                    break;

                default:
                    throw new UDFArgumentException("未知的不支持的類(lèi)型....");
            }
        }

        return ObjectInspectorFactory
                .getStandardListObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
    }

    /*
       對(duì)傳入的數(shù)據(jù)做計(jì)算, 返回函數(shù)最終的值
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        if (arguments[0].get() == null) {
            return null;
        }

        // 1.獲取傳入的json數(shù)組
        String jsonArrayString = arguments[0].get().toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);

        // 2. 解析數(shù)組中的數(shù)據(jù)
        // 2.1 最終的數(shù)組

        List<List<Object>> result = new ArrayList<>();
        // 2.2 解析出來(lái)需要的每個(gè)結(jié)構(gòu)體
        for(int i = 0; i < jsonArray.length(); i++){
            List<Object> struct = new ArrayList<>();
            result.add(struct);

            JSONObject obj = jsonArray.getJSONObject(i);

            // 表示結(jié)構(gòu)體應(yīng)該有多個(gè)少個(gè)字段
            for(int j = 1; j < (arguments.length - 1)/2 + 1; j++){
                // 獲取字段名
                String name = arguments[j].get().toString();
                if(obj.has(name)){
                    struct.add(obj.get(name));
                }else{
                    struct.add(null);
                }
            }
            /*
            {
                    "displayType":"promotion",
                    "item":"3",
                    "item_type":"sku_id",
                    "order":1
            }

             json_array_to_struct_array(
                       get_json_object(line,'$.actions'),
                       'action_id',
                      'item',
                      'item_type',
                      'ts',
                      'action_id:string',
                      'item:string',
                      'item_type:string',
                      'ts:bigint')
        array(struct(..), struct(....))

             */

        }

        return result;
    }

    /*
    select  a(...)
    返回要展示的字符串
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("json_array_to_struct_array", children);
    }
}
/*
[
        {
            "displayType":"promotion",
            "item":"3",
            "item_type":"sku_id",
            "order":1
        },
        {
            "displayType":"promotion",
            "item":"1",
            "item_type":"sku_id",
            "order":2
        },
        {
            "displayType":"query",
            "item":"7",
            "item_type":"sku_id",
            "order":3
        },
        {
            "displayType":"promotion",
            "item":"5",
            "item_type":"sku_id",
            "order":4
        }
]
 */

UDTF 一進(jìn)多出

例如:把JSON數(shù)組:
[{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]

轉(zhuǎn)換為多行JSON對(duì)象:
{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

/**
 * udtf:  一進(jìn)多出
 *  1進(jìn):   [{}, {}]
 *  多出:  "{}", "{}"
 */
@Description(name = "explode_json_array", value = "explode json array ....")
public class ExplodeJsonArray extends GenericUDTF {
    /*
      作用:
      1.檢測(cè)輸入
      2. 返回期望的數(shù)據(jù)類(lèi)型的檢測(cè)器
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. 檢測(cè)輸入
        // 1.1 獲取到傳入的參數(shù)
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        if(inputFields.size() != 1)
            throw new UDFArgumentException("函數(shù) explode_json_array 需要正好1個(gè)參數(shù)");

        ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
        if(inspector.getCategory() != ObjectInspector.Category.PRIMITIVE || !inspector.getTypeName().equals("string")){
            throw new UDFArgumentException("函數(shù) explode_json_array 參數(shù)類(lèi)型不匹配, 必須是一個(gè)字符串");
        }
        // 2. 返回期望數(shù)據(jù)類(lèi)型的檢測(cè)器
        List<String> names = new ArrayList<>();
        names.add("action");
        List<ObjectInspector> inspectors = new ArrayList<>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);

    }
    /*
        處理數(shù)據(jù)   [{}, {}, ...]
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String jsonArrayString = args[0].toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);
        for (int i = 0; i < jsonArray.length(); i++) {
            String col = jsonArray.getString(i);
            String[] cols = new String[1];
            cols[0] = col;
            forward(cols);  // 為什么要傳遞數(shù)組? 有可能炸裂出來(lái)多列數(shù)據(jù), 所以才需要傳遞數(shù)字
        }
    }
    /**
     * 關(guān)閉資源
     * 不用實(shí)現(xiàn)
     */
    @Override
    public void close() throws HiveException {

    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市梧田,隨后出現(xiàn)的幾起案子淳蔼,更是在濱河造成了極大的恐慌,老刑警劉巖裁眯,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鹉梨,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡穿稳,警方通過(guò)查閱死者的電腦和手機(jī)存皂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)逢艘,“玉大人旦袋,你說(shuō)我怎么就攤上這事∷模” “怎么了疤孕?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)央拖。 經(jīng)常有香客問(wèn)我祭阀,道長(zhǎng)截亦,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任柬讨,我火速辦了婚禮崩瓤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘踩官。我一直安慰自己却桶,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布蔗牡。 她就那樣靜靜地躺著颖系,像睡著了一般。 火紅的嫁衣襯著肌膚如雪辩越。 梳的紋絲不亂的頭發(fā)上嘁扼,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音黔攒,去河邊找鬼趁啸。 笑死,一個(gè)胖子當(dāng)著我的面吹牛督惰,可吹牛的內(nèi)容都是我干的不傅。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼赏胚,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼访娶!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起觉阅,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤变秦,失蹤者是張志新(化名)和其女友劉穎樊拓,沒(méi)想到半個(gè)月后轻腺,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體琉雳,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年痴柔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了沦偎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疫向。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡咳蔚,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出搔驼,到底是詐尸還是另有隱情谈火,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布舌涨,位于F島的核電站糯耍,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜温技,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一革为、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧舵鳞,春花似錦震檩、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至套才,卻和暖如春迂猴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背背伴。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工沸毁, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人傻寂。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓以清,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親崎逃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子掷倔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348